Package com.ikanow.infinit.e.processing.generic.store_and_index

Source Code of com.ikanow.infinit.e.processing.generic.store_and_index.StoreAndIndexManager

/*******************************************************************************
* Copyright 2012, The Infinit.e Open Source Project.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
******************************************************************************/
package com.ikanow.infinit.e.processing.generic.store_and_index;


import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.log4j.Logger;
import org.bson.types.ObjectId;
import org.elasticsearch.index.query.BaseQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

import com.google.gson.reflect.TypeToken;
import com.ikanow.infinit.e.data_model.index.ElasticSearchManager;
import com.ikanow.infinit.e.data_model.index.IndexManager;
import com.ikanow.infinit.e.data_model.index.document.DocumentPojoIndexMap;
import com.ikanow.infinit.e.data_model.store.DbManager;
import com.ikanow.infinit.e.data_model.store.MongoDbManager;
import com.ikanow.infinit.e.data_model.store.config.source.SourceHarvestStatusPojo;
import com.ikanow.infinit.e.data_model.store.config.source.SourcePojo;
import com.ikanow.infinit.e.data_model.store.document.CompressedFullTextPojo;
import com.ikanow.infinit.e.data_model.store.document.DocCountPojo;
import com.ikanow.infinit.e.data_model.store.document.DocumentPojo;
import com.ikanow.infinit.e.data_model.utils.PropertiesManager;
import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;

/**
* Class used to commit records to backend storage during harvest process
* @author cmorgan
*
*/
public class StoreAndIndexManager {

  // Initialize the Logger
  private static final Logger logger = Logger.getLogger(StoreAndIndexManager.class)
 
  private static boolean _diagnosticMode = false;
  public static void setDiagnosticMode(boolean bMode) { _diagnosticMode = bMode; }
 
  private int nMaxContentLen_bytes = 100000; // (100KB default max)
  private boolean bStoreRawContent = false; // (store the raw as well as the processed data)
  private boolean bStoreMetadataAsContent = false; // (store the metadata in the content block)
 
  public final static String DELETION_INDICATOR = "?DEL?";
  private String harvesterUUID = null;
  public String getUUID() { return harvesterUUID; }
 
  public StoreAndIndexManager() {
    com.ikanow.infinit.e.processing.generic.utils.PropertiesManager pm =
      new com.ikanow.infinit.e.processing.generic.utils.PropertiesManager();
   
    int nMaxContent = pm.getMaxContentSize();
    if (nMaxContent > -1) {
      nMaxContentLen_bytes = nMaxContent;
    }   
    bStoreRawContent = pm.storeRawContent();
    bStoreMetadataAsContent = pm.storeMetadataAsContent();
   
    try {
      StringBuffer sb = new StringBuffer(DELETION_INDICATOR).append(java.net.InetAddress.getLocalHost().getHostName());
      harvesterUUID = sb.toString();
    } catch (UnknownHostException e) {
      harvesterUUID = DELETION_INDICATOR + "UNKNOWN";
    }
  }
 
///////////////////////////////////////////////////////////////////////////////////////////////// 
///////////////////////////////////////////////////////////////////////////////////////////////// 
 
// Datastore addition   
 
  /**
   * Add a list of doc documents to the data store
   * @param feeds
   */
  public void addToDatastore(List<DocumentPojo> docs, boolean bSaveContent, SourcePojo source) {
    try {
      // Create collection manager
      // Add to data store
      addToDatastore(DbManager.getDocument().getMetadata(), docs);
    } catch (Exception e) {
      // If an exception occurs log the error
      logger.error("Exception Message: " + e.getMessage(), e);
    }
    // (note: currently modifies docs, see DocumentIndexPojoMap, so beware if using after this point)
    if (bSaveContent) {
      saveContent(docs);
    }
    boolean index = true;
    if ((null != source) && (null != source.getSearchIndexFilter())) {
      if (null != source.getSearchIndexFilter().indexOnIngest) {
        index = source.getSearchIndexFilter().indexOnIngest;
      }
    }
    if (index) {
      this.addToSearch(docs);
    }
   
  }//TESTED
 
  /////////////////////////////////////////////////////////////////////////////////////////////////
 
  // Utilities
 
  /**
   * Add a single doc document to the datastore
   * @param col
   * @param doc
   */
  private void addToDatastore(DBCollection col, DocumentPojo doc) {
    if (!_diagnosticMode) {
      if (!docHasExternalContent(doc.getUrl(), doc.getSourceUrl())) {
        doc.makeFullTextNonTransient(); // (ie store full text in this case)
      }
      col.save(doc.toDb());
    }
    else {
      System.out.println("StoreAndIndexManager.addToDatastore: " + ((BasicDBObject)doc.toDb()).toString());
    }
  }//TESTED
 
  /**
   * Add a list of doc documents to the data store
   * @param feeds
   */
  private void addToDatastore(DBCollection col, List<DocumentPojo> docs) {
    // Store the knowledge in the feeds collection in the harvester db     
    int errors = 0;
    Exception savedException = null;
    Iterator<DocumentPojo> it = docs.iterator();
    while (it.hasNext()) {
      DocumentPojo f = it.next();
     
      // Set an _id before writing it to the datastore,
      // so the same _id gets written to the index
      // NOTE WE OVERWRITE ANY TRANSIENT IDS THAT MIGHT HAVE BEEN SET eg BY REMOVE CODE
      f.setId(new ObjectId());
     
      // Check geo-size: need to add to a different index if so, for memory usage reasons
      if (null == f.getLocs()) { // (can be set by update/deletion code also)
        if (DocumentPojoIndexMap.hasManyGeos(f)) {
          f.setIndex(DocumentPojoIndexMap.manyGeoDocumentIndex_);
          // (note this check isn't stateless, it actually populates "locs" at the same time)
          // therefore...
        }
      }
      Set<String> locs = f.getLocs();
      f.setLocs(null);
     
      try {
        addToDatastore(col, f);
      }
      catch (Exception e) {
        errors++;
        if ((errors > 0) || (null == e.getMessage())) {
          savedException = e;
        }
        it.remove();
        continue;
      }
     
      f.setLocs(locs);
    }
    if (errors > 0) {
      logger.error("addToDatastore: errors=" + errors + " 1sterror=" + savedException.getMessage());
    }
  }//TESTED

  //////////////////////////////////////////////////////////////////////////////////////

  /**
   * Save the fulltext of a pojo to mongo for using later
   *
   * @param docs
   */
  private void saveContent(List<DocumentPojo> docs)
  {
    try
    {
      DBCollection contentDb = DbManager.getDocument().getContent();
     
      for ( DocumentPojo doc : docs )
      {
        boolean bStoreContent = true;
        bStoreContent &= (0 != nMaxContentLen_bytes); // (otherwise it's turned off)
        bStoreContent &= this.bStoreMetadataAsContent || ((null != doc.getFullText()) && !doc.getFullText().isEmpty());       
        boolean bDocHasExternalContent = docHasExternalContent(doc.getUrl(), doc.getSourceUrl());
       
        if (bStoreContent && bDocHasExternalContent) {
          try
          {
            String rawText = this.bStoreRawContent ? doc.getRawFullText() : null;
            DocumentPojo meta = bStoreMetadataAsContent ? doc : null;
            CompressedFullTextPojo gzippedContent = new CompressedFullTextPojo(doc.getUrl(), doc.getSourceKey(), doc.getCommunityId(),
                                              doc.getFullText(), rawText, meta, nMaxContentLen_bytes);
           
            if (null != gzippedContent.getUrl())  {
              // Be efficient and write field-by-field vs using JSON conversion
              BasicDBObject query = new BasicDBObject(CompressedFullTextPojo.url_, gzippedContent.getUrl());
              query.put(CompressedFullTextPojo.sourceKey_, gzippedContent.getSourceKey());
              BasicDBObject update = gzippedContent.getUpdate();
              if (!_diagnosticMode) {
                contentDb.update(query, update, true, false); // (ie upsert, supported because query includes shard key==url)
              }
              else {
                System.out.println("StoreAndIndexManager.savedContent, save content: " + gzippedContent.getUrl());
              }
            }
          }
          catch (Exception ex)
          {
            // Do nothing, just carry on
            ex.printStackTrace();
          }
        }//TESTED
      }
    }
    catch (Exception ex)
    {
      // This is a more serious error
      logger.error(ex.getMessage());
    }
  }//TESTED (not changed since by-eye testing in Beta)
 
 
///////////////////////////////////////////////////////////////////////////////////////////////// 
///////////////////////////////////////////////////////////////////////////////////////////////// 
 
// Datastore removal   

  /**
   * This function removes documents "soft deleted" by this harvester
   */
 
  public void removeSoftDeletedDocuments()
  {
    BasicDBObject query = new BasicDBObject(DocumentPojo.url_, harvesterUUID);
   
    if (_diagnosticMode) {
      System.out.println("Soft delete: " + DbManager.getDocument().getMetadata().count(query));     
    }
    else {
      DbManager.getDocument().getMetadata().remove(query);     
    }
  }//TESTED
 
  /**
   * Low level utility to abstract soft deletion
   * We're using URL because 1) we cant' use a shard key
   * 2) it needs to be an indexed field
   * 3) ideally one that is likely to be cached in memory
   * 4) one that minimizes the chance of having to move the document when modifying the field
   * (I also considered sourceUrl or an all new field, they _might_ be better because smaller, but conversely
   *  would be less likely to be cached and most importantly there's the risk of 4)
   */
 
  private BasicDBObject _softDeleter = null;
 
  private BasicDBObject getSoftDeleteUpdate()
  {
    if (null == _softDeleter) {
      BasicDBObject softDeleter = new BasicDBObject(DocumentPojo.url_, harvesterUUID);
      softDeleter.put(DocumentPojo.index_, DELETION_INDICATOR);
        // (used in CustomHadoopTaskLauncher.createConfigXML)
      _softDeleter = new BasicDBObject(DbManager.set_, softDeleter);
    }
    return _softDeleter;
  }//TESTED

  /**
   * Remove a list of doc documents from the data store (you have their _id and sourceKey)
   *
   * CALLED FROM:  resizeDB() <- FILLS IN _ID, SOURCEKEY, INDEX, URL, SOURCEURL
   */
  public void removeFromDatastore_byId(List<DocumentPojo> docs) {
    try {
      // Remove from data store
      removeFromDatastore_byId(DbManager.getDocument().getMetadata(), docs);     
      this.removeFromSearch(docs);
     
    } catch (Exception e) {
      // If an exception occurs log the error
      logger.error("Exception Message: " + e.getMessage(), e);
    }
  }//TESTED
 
  /**
   * Remove a list of doc documents from the data store (you have their url) AND ALSO the search index
   *
   * @param docs - child function needs url (optionally sourceUrl) set - child function requires sourceKey
   *           this function needs id and index both of which are set by the child stack
   *
   * CALLED FROM: MongoDocumentTxfer.doDelete(...) <- SETS URL, SOURCE URL, SOURCE KEY, COMMUNITY ID, INDEX, _ID
   *         processDocuments(...) [ always called after harvester: have sourceUrl, sourceKey,
   *                     DON'T have _id, BUT do have updateId and index (correct except in many geo cases)]
   *         pruneSource(source, ...) <- SETS URL, SOURCE URL, SOURCE KEY, INDEX, _ID
   *           updateHarvestStatus(...)
   */
  public ObjectId removeFromDatastore_byURL(List<DocumentPojo> docs) {
   
    // Remove from data store:
    ObjectId nextId = null;
    try {
      nextId = removeFromDatastore_byURL(DbManager.getDocument().getMetadata(), docs);     
        // ^^^ adds "created" (if updateId set), "_id" and "index" to the doc and expands "sourceUrl" docs (adding "_id" and "index")
     
    } catch (Exception e) {
      // If an exception occurs log the error
      logger.error("Exception Message: " + e.getMessage(), e);
    }
   
    // Remove from index:
   
    try {
      this.removeFromSearch(docs);
     
    } catch (Exception e) {
      // If an exception occurs log the error
      logger.error("Exception Message: " + e.getMessage(), e);
    }
   
    return nextId;
  }//TESTED
 
  /**
   * Remove a list of doc documents from the data store (you have a source key, so you can go much quicker)
   * CALLED FROM:  deleteSource(...)
   * @returns the number of docs deleted
   */
  public long removeFromDatastoreAndIndex_bySourceKey(String sourceKey, ObjectId lessThanId, boolean definitelyNoContent, String communityId) {
       
    try {     
      if (!definitelyNoContent) {
        DbManager.getDocument().getContent().remove(new BasicDBObject(CompressedFullTextPojo.sourceKey_, sourceKey));
          // (will just check index and pull out if the doc has no external content)
      }
      BasicDBObject query = new BasicDBObject(DocumentPojo.sourceKey_, sourceKey);
      if (null != lessThanId) { // Multiple threads running for this source
        // First check whether one of the other threads has already deleted the source:
        BasicDBObject oneFinalCheckQuery = new BasicDBObject(DocumentPojo.sourceKey_, sourceKey);
        BasicDBObject oneFinalCheckFields = new BasicDBObject(DocumentPojo.index_, 1);
        BasicDBObject firstDocToBeUpdated = (BasicDBObject) DbManager.getDocument().getMetadata().findOne(oneFinalCheckQuery, oneFinalCheckFields);
        if ((null == firstDocToBeUpdated) || firstDocToBeUpdated.getString(DocumentPojo.index_, "").equals(DELETION_INDICATOR))
        {
          //(ie grab the first doc in natural order and tell me if it's been soft-deleted yet, if so do nothing)
          return 0;
        }//TESTED
       
        // That check isn't perfect because of race conditions, so we'll still add the !="?DEL?" check to the
        // update as well:       
        query.put(DocumentPojo._id_, new BasicDBObject(DbManager.lte_, lessThanId));
        query.put(DocumentPojo.index_, new BasicDBObject(DbManager.ne_, DELETION_INDICATOR));
      }//TESTED
     
      BasicDBObject softDeleter = getSoftDeleteUpdate();
      DbManager.getDocument().getMetadata().update(query, softDeleter, false, true);
      // (don't do getLastError just yet since it can block waiting for completion)
     
      // Quick delete for index though:
      StringBuffer sb = new StringBuffer();
      if (null == lessThanId) {// slower version, be slightly more thorough...
        sb.append(DocumentPojoIndexMap.globalDocumentIndexCollection_).append(",");
      }
      sb.append(DocumentPojoIndexMap.manyGeoDocumentIndexCollection_).append(",docs_").append(communityId).append('/').append(DocumentPojoIndexMap.documentType_);
      ElasticSearchManager indexManager = IndexManager.getIndex(sb.toString());
      BaseQueryBuilder soloOrCombinedQuery = QueryBuilders.termQuery(DocumentPojo.sourceKey_, sourceKey);
      if (null != lessThanId) {
        //(_id isn't indexed - _uid is and == _type + "#" + _id)
        soloOrCombinedQuery = QueryBuilders.boolQuery().must(soloOrCombinedQuery).
                    must(QueryBuilders.rangeQuery("_uid").lte("document_index#" + lessThanId.toString()));
       
      }//TESTED
      indexManager.doDeleteByQuery(soloOrCombinedQuery);           
     
      CommandResult result = DbManager.getDocument().getLastError("metadata");
      return result.getLong("n", 0);
     
    } catch (Exception e) {
      // If an exception occurs log the error
      logger.error("Exception Message: " + e.getMessage(), e);     
    }
    return 0;
  }//TESTED
 
 
  /**
   * Remove a list of doc documents from the data store and index (you have a source URL, so you can go much quicker)
   *
   * CALLED FROM: removeFromDataStore_byURL(List<doc>, bDeleteContent) [ALSO DELETES FROM INDEX AFTER ADDED FROM HERE]
   *           MongoDocumentTxfer.doDelete(...)  <- SETS URL, SOURCE URL, SOURCE KEY, COMMUNITY ID, INDEX, _ID
   *           processDocuments(...) [ always called after harvester: have sourceUrl, sourceKey,
   *                       DON'T have _id, BUT do have updateId and index (correct except in many geo cases)]
   * @returns the number of docs deleted
   */
 
  private ElasticSearchManager _cachedIndexManagerForSourceXxxDeletion = null;
  private ObjectId _cachedCommunityIdForSourceXxxDeletion = null;
  public long removeFromDatastoreAndIndex_bySourceUrl(String sourceUrl, String sourceKey, ObjectId communityId) {
       
    try {     
      // (never any content)
      BasicDBObject query = new BasicDBObject(DocumentPojo.sourceUrl_, sourceUrl);
      query.put(DocumentPojo.sourceKey_, sourceKey);
      BasicDBObject softDeleter = getSoftDeleteUpdate();
      DbManager.getDocument().getMetadata().update(query, softDeleter, false, true);
      CommandResult result = DbManager.getDocument().getLastError("metadata");
     
      // Quick delete for index though:
      if (!communityId.equals(_cachedCommunityIdForSourceXxxDeletion)) {
        StringBuffer sb = new StringBuffer(DocumentPojoIndexMap.manyGeoDocumentIndexCollection_).append(",docs_").append(communityId).append('/').append(DocumentPojoIndexMap.documentType_);
        _cachedIndexManagerForSourceXxxDeletion = IndexManager.getIndex(sb.toString());
        _cachedCommunityIdForSourceXxxDeletion = communityId;
      }//TESTED
      _cachedIndexManagerForSourceXxxDeletion.doDeleteByQuery(
          QueryBuilders.boolQuery()
            .must(QueryBuilders.termQuery(DocumentPojo.sourceUrl_, sourceUrl))
            .must(QueryBuilders.termQuery(DocumentPojo.sourceKey_, sourceKey))
          );
     
      return result.getLong("n", 0);
     
    } catch (Exception e) {
      // If an exception occurs log the error
      logger.error("Exception Message: " + e.getMessage(), e);     
    }
    return 0;
  }//TESTED
 
 
  /////////////////////////////////////////////////////////////////////////////////////////////////
 
  // Utility
 
  /**
   * Remove a list of doc documents from the data store + adds _id and index doc fields to retrieve to support de-index
   * (also adds created to docs with an updateId so the created remains ~the same)
   * (Will in theory support arbitrary sourceUrl/sourceKey operators but in practice these will always be from a single source)
   * @param docs - needs url (optionally sourceUrl) set - child function requires sourceKey
   * @param col
   * @param feeds
   *
   * CALLED FROM: removeFromDataStore_byURL(List<doc>, bDeleteContent) [ALSO DELETES FROM INDEX AFTER ADDED FROM HERE]
   *           MongoDocumentTxfer.doDelete(...)  <- SETS URL, SOURCE URL, SOURCE KEY, COMMUNITY ID, INDEX, _ID
   *           processDocuments(...) [ always called after harvester: have sourceUrl, sourceKey,
   *                       DON'T have _id, BUT do have updateId and index (correct except in many geo cases)]
   *           pruneSource(source, ...) <- SETS URL, SOURCE URL, SOURCE KEY, INDEX, _ID
   *             updateHarvestStatus(...)
   */
  private ObjectId removeFromDatastore_byURL(DBCollection col, List<DocumentPojo> docs) {
    ObjectId nextId = null;
    BasicDBObject fields = new BasicDBObject();
    fields.put(DocumentPojo.created_, 1); // (If we're getting the deleted doc fields, get this and have exact created time)
    fields.put(DocumentPojo.index_, 1); // This is needed for the removeFromSearch() called from parent removeFromDatastore_URL
       
    TreeMap<String,DocumentPojo> sourceUrlToKeyMap = null;
    HashSet<String> deletedSources = null;
    // Store the knowledge in the feeds collection in the harvester db
    Iterator<DocumentPojo> docIt = docs.iterator();
    while (docIt.hasNext()) {
      DocumentPojo f = docIt.next();
      nextId = f.getId(); // (only interested in the pruneSource case, in which case _id is set on input)
     
      if ((null != f.getSourceUrl()) && (null == f.getUrl())) { // special case ... delete all these documents...
        if ((null == deletedSources) || !deletedSources.contains(f.getSourceKey())) { // (don't bother deleting sourceURL if deleting source)
          if (null == sourceUrlToKeyMap) {
            sourceUrlToKeyMap = new TreeMap<String,DocumentPojo>();
          }
          sourceUrlToKeyMap.put(f.getSourceUrl(), f);       
        }//TESTED

        docIt.remove(); // (so don't miscount number of docs; processed below)
      }
      else if (null != f.getSourceKey() && (null == f.getSourceUrl()) && (null == f.getUrl())) {
        // Even more special case: delete entire sourceKey
        if (null == deletedSources) {
          deletedSources = new HashSet<String>();
        }
        if (!deletedSources.contains(f.getSourceKey())) {
          deletedSources.add(f.getSourceKey());
          long srcRemoved = removeFromDatastoreAndIndex_bySourceKey(f.getSourceKey(), f.getId(), true, f.getCommunityId().toString());
          if (srcRemoved > 0) {
            updateDocCountsOnTheFly(-srcRemoved, f.getSourceKey(), f.getCommunityId());           
          }
        }
        docIt.remove(); // (so don't miscount number of docs)
      }//TESTED
      else {
        removeFromDatastore_byURL(col, f, fields,
            StoreAndIndexManager.docHasExternalContent(f.getUrl(), f.getSourceUrl()));
          // (adds "_id", "index")
      }
    }//TESTED

    // Now tidy up sourceUrls, do some caching across sourceKey/community for performance
    String sourceKey = null; // (if deleting sourceKey don't bother deleting any sourceUrls)
    long removed = 0; // (from special operations)
    String cachedSourceKey = null; // (will handle multiple source keys, although that can't currently happen in practice)
    ObjectId communityId = null;
    if (null != sourceUrlToKeyMap) for (Map.Entry<String, DocumentPojo> entry: sourceUrlToKeyMap.entrySet()) {
      String srcUrl = entry.getKey();
      DocumentPojo doc = entry.getValue();
      sourceKey = doc.getSourceKey();
      communityId = doc.getCommunityId();
      if (sourceKey != cachedSourceKey) { // ptr comparison by design
        if (removed > 0) {
          updateDocCountsOnTheFly(-removed, sourceKey, communityId);
          removed = 0;
        }//TESTED
        cachedSourceKey = sourceKey;
      }
      removed += removeFromDatastoreAndIndex_bySourceUrl(srcUrl, sourceKey, communityId);
    }//TESTED
    if ((removed > 0) && (null != sourceKey)) {
      updateDocCountsOnTheFly(-removed, sourceKey, communityId);
    }//TESTED
    return nextId;
  }//TESTED
 
  public void updateDocCountsOnTheFly(long docIncrement, String sourceKey, ObjectId communityId)
  {
    DbManager.getDocument().getCounts().update(new BasicDBObject(DocCountPojo._id_, communityId),
        new BasicDBObject(DbManager.inc_, new BasicDBObject(DocCountPojo.doccount_, docIncrement)));
    DbManager.getIngest().getSource().update(new BasicDBObject(SourcePojo.key_, sourceKey),
        new BasicDBObject(MongoDbManager.inc_,
            new BasicDBObject(SourceHarvestStatusPojo.sourceQuery_doccount_, docIncrement))
        );   
  }//TESTED
 
  /**
   * Remove a doc from the data store
   * @param col
   * @param doc
   * @param fields - fields to retrieve
   *
   * CALLED FROM:  removeFromDataStore_byId(List<doc>, bDeleteContent)
   *           resizeDB() <- FILLS IN _ID, SOURCEKEY, INDEX, SOURCEURL
   */
  private void removeFromDatastore_byId(DBCollection col, List<DocumentPojo> docs) {
    // Store the knowledge in the feeds collection in the harvester db     
    for ( DocumentPojo f : docs) {
      removeFromDatastore_byId(col, f);
    }
  }//TESTED
 
  /**
   * Remove a doc from the data store
   * @param col
   * @param doc - assumes _id set
   * @param fields - fields to retrieve (set in outside the doc loop for performance, url, index, sourceKey)
   *
   * CALLED FROM:  removeFromDataStore_byId(col, List<doc>, bDeleteContent)
   *           removeFromDataStore_byId(List<doc>, bDeleteContent)
   *             resizeDB() <- _ID, SOURCEKEY, INDEX, SOURCEURL
   */
  private void removeFromDatastore_byId(DBCollection col, DocumentPojo doc) {
   
    boolean bDeleteContent =  docHasExternalContent(doc.getUrl(), doc.getSourceUrl());
   
    if (bDeleteContent) {
      // Remove its content also:
      if (!_diagnosticMode) {
        BasicDBObject contentQuery = new BasicDBObject(DocumentPojo.url_, doc.getUrl());
        contentQuery.put(DocumentPojo.sourceKey_, doc.getSourceKey());
        DbManager.getDocument().getContent().remove(contentQuery);
      }
      else {
        System.out.println("StoreAndIndexManager.removeFromDatastore_byId, delete content: " + doc.getSourceKey() + "/" + doc.getUrl());
      }
    }
   
    // Update Mongodb with the data
    BasicDBObject query = new BasicDBObject();
    query.put(DocumentPojo.sourceKey_, doc.getSourceKey());
    query.put(DocumentPojo._id_, doc.getId());
    query.put(DocumentPojo.sourceKey_, doc.getSourceKey()); // (needed because on newer machines this is the shard key)
   
    if (!_diagnosticMode) {
      BasicDBObject softDelete = getSoftDeleteUpdate();
      col.update(query, softDelete);
        // (can do this on sharded collections because it uses sourceKey+_id, the shard key)
    }
    else { // (diagnostic mode)
      if (null != col.findOne(query)) {
        System.out.println("StoreAndIndexManager.removeFromDatastore_byId, delete: " + doc.toDb().toString());
      }
      else {
        System.out.println("StoreAndIndexManager.removeFromDatastore_byId, delete: DOC NOT FOUND");       
      }
    }
  }//TESTED (1.1)
 
  /**
   * Remove a doc from the data store, ensures all the fields specified in "fields" are populated (ready for index deletion)
   * @param col
   * @param doc - needs  url, sourceKey set
   * @param fields - fields to retrieve (index, created), set in calling function outside of loop for performance
   *
   * CALLED FROM: removeFromDatastore_byURL(col, List<doc>, bDeleteContent) <- ADDS INDEX, CREATED TO FIELDS
   *           removeFromDataStore_byURL(List<doc>, bDeleteContent) [ALSO DELETES FROM INDEX AFTER ADDED FROM HERE]
   *             MongoDocumentTxfer.doDelete(...)  <- SETS URL, SOURCE URL, SOURCE KEY, COMMUNITY ID, INDEX, _ID
   *             processDocuments(...) [ always called after harvester: have sourceUrl, sourceKey,
   *                         DON'T have _id, BUT do have updateId and index (correct except in many geo cases)]
   *             pruneSource(source, ...) <- SETS URL, SOURCE URL, SOURCE KEY, INDEX
   *               updateHarvestStatus(...)
   */
  private void removeFromDatastore_byURL(DBCollection col, DocumentPojo doc, BasicDBObject fields, boolean bDeleteContent) {
   
    // 1] Create the query to soft delete the document
   
    BasicDBObject query = new BasicDBObject();
    query.put(DocumentPojo.url_, doc.getUrl());
    query.put(DocumentPojo.sourceKey_, doc.getSourceKey());

    // 2] Delete the content if needed
   
    if (bDeleteContent) {
      if (docHasExternalContent(doc.getUrl(), doc.getSourceUrl())) {
        if (!_diagnosticMode) {
          DbManager.getDocument().getContent().remove(query);
        }
        else {
          System.out.println("StoreAndIndexManager.removeFromDatastore_byUrl(2), delete content: " + doc.getSourceKey() + "/" + doc.getUrl());
        }
      }
    }
    //TESTED
   
    // 3] Work out which fields we have and which (if any we need to go and fetch):
   
    boolean needToFindAndModify = false;
   
    if (null == doc.getId()) { // This is called from processDocuments
     
      if (null != doc.getUpdateId()) { // update case...
        doc.setId(doc.getUpdateId()); // (note this is overwritten by addToDatastore later, in update case, so we're good)

        // (doc.index is populated but may not be correct because of the "many geos" workaround):
        if (DocumentPojoIndexMap.hasManyGeos(doc)) {
          doc.setIndex(DocumentPojoIndexMap.manyGeoDocumentIndex_);
          // (note this check isn't stateless, it actually populates "locs" at the same time
          //  this is handled in addToDatastore (update case), temp removed when adding to DB
        }//TESTED (2.1.2, diagnostic mode, doc2)
      }
      else { // Not an update case, we're going to have to grab the document after all, which is a bit slower
        needToFindAndModify = true;
      }
    }//TESTED (2.1.2, diagnostic mode, doc2)
    if (!needToFindAndModify) { // set created if we need to, since we're not grabbing it from the datastore
      if (null != doc.getUpdateId()) { // (this means we have an approx created if we don't need to go fetch the deleted doc)
        doc.setCreated(new Date(doc.getUpdateId().getTime()));
      }//TESTED (2.1.2, diagnostic mode, doc2)         
    }
    // (if we're here and index is not set, then it is intended to be null)
   
    // 4] Update the doc_metadata collection
   
    BasicDBObject softDelete = getSoftDeleteUpdate();
    BasicDBObject deadDoc = null; // (not normally needed)
   
    if (needToFindAndModify) { // less pleasant, need to go grab the doc
      deadDoc = (BasicDBObject) col.findOne(query, fields);       
    }//TESTED (2.1.2)
   
    if (!_diagnosticMode) {
      col.update(query, softDelete, false, true); // (needs to be multi- even though there's a single element for sharding reasons)     
    }//TESTED (2.1.2)
   
    // 5] Add fields if necessary
   
    if (null != deadDoc) {
      doc.setCreated((Date) deadDoc.get(DocumentPojo.created_));
        // (if getting this doc anyway then might as well get the created)
      doc.setId((ObjectId) deadDoc.get(DocumentPojo._id_));
      doc.setIndex((String) deadDoc.get(DocumentPojo.index_));
     
      if (_diagnosticMode) {
        System.out.println("StoreAndIndexManager.removeFromDatastore_byUrl(2): found " + deadDoc.toString());
      }
    }//TESTED (2.1.2)
    else if (_diagnosticMode) {
      if (!needToFindAndModify) {
        System.out.println("StoreAndIndexManager.removeFromDatastore_byUrl(2): straight deleted " + doc.toDb().toString());
      }
      else {
        System.out.println("StoreAndIndexManager.removeFromDatastore_byUrl(2): didn't find " + query.toString());
      }
    }//TESTED (2.1.2)
  }//TESSTED (2.1.2)

///////////////////////////////////////////////////////////////////////////////////////////////// 
///////////////////////////////////////////////////////////////////////////////////////////////// 

// Synchronize database with index 
 
  /**
   * Add a list of feeds to the full text index
   * @param docs
   */
  public void addToSearch(List<DocumentPojo> docs)
  {   
    String sSavedIndex = null;
    ElasticSearchManager indexManager = null;
    LinkedList<DocumentPojo> tmpDocs = new LinkedList<DocumentPojo>();
    int nTmpDocs = 0;
    for ( DocumentPojo doc : docs )
    {     
      String sThisDocIndex = doc.getIndex();
     
      if ((null == sSavedIndex) || (null == sThisDocIndex) || !sSavedIndex.equals(sThisDocIndex)) { // Change index
       
        if (null != indexManager) { // ie not first time through, bulk add what docs we have
          sendToIndex(indexManager, tmpDocs);
            // (ie with the *old* index manager)
          nTmpDocs = 0;
        }
        sSavedIndex = sThisDocIndex;
        if ((null == sSavedIndex) || (sSavedIndex.equals(DocumentPojoIndexMap.globalDocumentIndex_))) {
          indexManager = IndexManager.getIndex(DocumentPojoIndexMap.globalDocumentIndex_);
        }
        else {
          indexManager = IndexManager.getIndex(new StringBuffer(sSavedIndex).append('/').
                                append(DocumentPojoIndexMap.documentType_).toString());         
        }
      }//TESTED   
     
      tmpDocs.add(doc);
      nTmpDocs++;
     
      if (nTmpDocs > 5000) { // some sensible upper limit
        sendToIndex(indexManager, tmpDocs);
        nTmpDocs = 0;
      }
     
      if (_diagnosticMode) {
        System.out.println("StoreAndIndexManager.addToSearch, add: " + doc.getId() + " + " +
            ((null != doc.getEntities())?("" + doc.getEntities().size()):"0") + " entities, " +
            ((null != doc.getAssociations())?("" + doc.getAssociations().size()):"0") + " assocs, " +
            ((null != doc.getLocs())?("" + doc.getLocs().size()):"0") + " locs"
                );
      }
    }// (end loop over docs)
   
    // Bulk add remaining docs   
    sendToIndex(indexManager, tmpDocs);
     
  }//TESTED (not change since by-eye testing in Beta)
 
  // Utility required by the above function
 
  private void sendToIndex(ElasticSearchManager indexManager, LinkedList<DocumentPojo> docsToAdd) {
    try {
      if (!docsToAdd.isEmpty()) {
        if (!_diagnosticMode) {
          indexManager.bulkAddDocuments(IndexManager.mapListToIndex(docsToAdd, new TypeToken<LinkedList<DocumentPojo>>(){},
              new DocumentPojoIndexMap()), DocumentPojo._id_, null, true);
        }
        else {
          System.out.println("StoreAndIndexManager.addToSearch: index " + docsToAdd.size() + " documents to " + indexManager.getIndexName());
        }             
        docsToAdd.clear();       
      }
    }
    catch (Exception ex)
    {
      ex.printStackTrace();
      logger.error("Exception Message saving document to ES: " + ex.getMessage(), ex);
    }
  }//TESTED
 
  /**
   *
   * @param docs (just need the id and the index and any events)
   */

  public void removeFromSearch(List<DocumentPojo> docs)
  {
    String sIndex = null;
    ElasticSearchManager indexManager = null;
    LinkedList<String> tmpDocs = new LinkedList<String>();
    int nTmpDocs = 0;
    for ( DocumentPojo doc : docs )
    { 
      if (null == doc.getId()) { // Normally this will be sourceUrls, eg files pointing to many docs
        continue; // (ie can just ignore)
      }
      if ((null != doc.getIndex()) && doc.getIndex().equals("?DEL?"))  {
        continue; //(must have already been deleted, so can ignore)
      }
      if ((null == sIndex) || (null == doc.getIndex()) || !sIndex.equals(doc.getIndex())) { // Change index
       
        if (null != indexManager) { // ie not first time through, bulk delete what docs we have
          deleteFromIndex(indexManager, tmpDocs); // (clears tmpDocs)
            // (ie with the *old* index manager)
          nTmpDocs = 0;
        }
        sIndex = doc.getIndex();
        if ((null == sIndex) || (sIndex.equals(DocumentPojoIndexMap.globalDocumentIndex_))) {
          indexManager = IndexManager.getIndex(DocumentPojoIndexMap.globalDocumentIndex_);
        }
        else {
          indexManager = IndexManager.getIndex(new StringBuffer(sIndex).append('/').append(DocumentPojoIndexMap.documentType_).toString());         
        }
      }//TESTED   
     
      tmpDocs.add(doc.getId().toString());
      nTmpDocs++;
     
      if (nTmpDocs > 5000) { // some sensible upper limit
        deleteFromIndex(indexManager, tmpDocs); // (clears tmpDocs)
        nTmpDocs = 0;
      }
     
      //delete from event search
      if (_diagnosticMode) {
        System.out.println("StoreAndIndexManager.removeFromSearch, remove: " + doc.getId() + " + " +
            ((null != doc.getEntities())?("" + doc.getEntities().size()):"0") + " entities, " +
            ((null != doc.getAssociations())?("" + doc.getAssociations().size()):"0") + " assocs, " +
            ((null != doc.getLocs())?("" + doc.getLocs().size()):"0") + " locs"
            );
      }     
    } // (end loop over docs)
   
    // Bulk remove remaining docs   
    deleteFromIndex(indexManager, tmpDocs);
   
  }//TESTED (not change since by-eye testing in Beta)
 
  /////////////////////////////////////////////////////////////////////////////////////////////////
 
  // Utility required by the above function
 
  private void deleteFromIndex(ElasticSearchManager indexManager, LinkedList<String> docsToDelete) {
    try {
      if (!docsToDelete.isEmpty()) {
        if (!_diagnosticMode) {
          indexManager.bulkDeleteDocuments(docsToDelete);
        }
        else {
          System.out.println("StoreAndIndexManager.removeFromSearch: index " + docsToDelete.size() + " documents from " + indexManager.getIndexName());
        }             
        docsToDelete.clear();       
      }
    }
    catch (Exception ex)
    {
      ex.printStackTrace();
      logger.error("Exception Message deleting document from ES: " + ex.getMessage(), ex);
    }
  }//TESTED
 
///////////////////////////////////////////////////////////////////////////////////////////////// 
///////////////////////////////////////////////////////////////////////////////////////////////// 
 
// Handle resizing the DB if it gets too large 

  // Utility function for diagnostic prints etc

  public long getDatabaseSize() {
    return DbManager.getDocument().getMetadata().count();
  }

  /**
   * This function checks if DB storage requirements are met,
   * if not it will start removing docs based on least used/oldest
   *
   * @return true once DB is within bounds, false if an error occurs
   */
  public boolean resizeDB()
  {
    return resizeDB(-1);
  }

  public boolean resizeDB(long capacityOverride)
  {
    //Do quick check to check if we are already under storage requirements
    if ( checkStorageCapacity(capacityOverride) ) {
      return false;
    }
    else
    {
      //if quick check fails, start removing docs to get under requirement
      try
      {
        long currDocsInDB = DbManager.getDocument().getMetadata().count();
        long storageCap = (capacityOverride == -1L) ? new PropertiesManager().getStorageCapacity() : capacityOverride;

        List<DocumentPojo> docsToRemove = getLeastActiveDocs((int) (currDocsInDB-storageCap));
          // (populates docsToRemove with _id and sourceKey - needed to support doc_metadata sharding)
       
        removeFromDatastore_byId(docsToRemove); // (remove content since don't know if it exists)
        //(^ this also removes from index)

        return true;
      }
      catch (Exception e)
      {
        // If an exception occurs log the error
        logger.error("Exception Message: " + e.getMessage(), e);
        return true;
      }
    }
  }//TESTED

  /////////////////////////////////////////////////////////////////////////////////////////////////
 
  // Utility
 
  /**
   * This method checks if doc count is
   * below threshhold set in properties
   * @return true is below threshhold, false if not
   */
  private boolean checkStorageCapacity(long capacityOverride)
  {
    long currDocsInDB = 0;
    try {
      currDocsInDB = DbManager.getDocument().getMetadata().count();
    } catch (Exception e ) {
      // If an exception occurs log the error
      logger.error("Exception Message: " + e.getMessage(), e);
    }
    long storageCapacity = (-1L == capacityOverride) ? new PropertiesManager().getStorageCapacity() : capacityOverride;
    return (currDocsInDB <= storageCapacity);
  }

  /**
   * Returns a list of the least active documents
   * List is of length numDocs
   *
   * @param numDocs Number of documents to return that are least active
   * @return a list of documents that are least active in DB (populates docsToRemove with _id and sourceKey - needed to support doc_metadata sharding)
)
   */
  private List<DocumentPojo> getLeastActiveDocs(int numDocs)
  {
    List<DocumentPojo> olddocs = null;

    //TODO (INF-1301): WRITE AN ALGORITHM TO CALCULATE THIS BASED ON USAGE, just using time last accessed currently
    //give a weight to documents age and documents activity to calculate
    //least active (current incarnation doesn't work)
    try
    {
      BasicDBObject fields = new BasicDBObject(DocumentPojo._id_, 1);
      fields.put(DocumentPojo.sourceKey_, 1);
      fields.put(DocumentPojo.index_, 1);
      fields.put(DocumentPojo.sourceUrl_, 1);
      fields.put(DocumentPojo.url_, 1);
      DBCursor dbc = DbManager.getDocument().getMetadata().find(new BasicDBObject(), fields).
                                  sort(new BasicDBObject(DocumentPojo._id_,1)).limit(numDocs);
      // (note, just retrieve _id and sourceKey fields: _id starts with timestamp so these are approximately oldest created)

      olddocs = DocumentPojo.listFromDb(dbc, DocumentPojo.listType());

    }
    catch (Exception e )
    {
      // If an exception occurs log the error
      logger.error("Exception Message: " + e.getMessage(), e);
    }
    return olddocs;
  }//TESTED (1.1)     

////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////// 

// Utility

    // Utility function to decide if we need to add/remove content via the external table
    // (ie JDBC and XML have their content as part of their metadata, eg fields
    //  others like HTTP and Files can have large amounts of content that we don't want to store in the DB object)

    // Called from: (noted here because this needs to be tidied up at some point)
    // StoreAndIndexManager.addToDatastore
    // MongoDocumentTxfer.doTransfer
    // SourceUtils.pruneSource
    // StoreAndIndexManager.removeFromDataStore_by(Id|SourceKey|Url)
    // StoreAndIndexManager.saveContent
 
    static public boolean docHasExternalContent(String url, String srcUrl) {
      //TODO: INF-1367: there's an issue with this .. suppose it's some enormous JSON file
      // and we excise a bunch of JSON files from the metadata (after using them for processing)
      // seems like we should have an optional keepExternalContent that defaults to the return value
      // of this function, but you can override from the SAH or whatever
     
      if (null != srcUrl) { // must be either JSON or XML or *sv
        return false;
      }
      else if (null == url) { // no idea, pathological case?!
        return true;
      }
      else if (url.startsWith("jdbc:")) { // DB entry
        return false;
      }
      else if (url.startsWith("inf://custom/")) { // custom entry
        return false;
      }
      else if ((url.startsWith("smb://") || url.startsWith("file:") || url.startsWith("s3://") || url.startsWith("inf://")) &&
                  (url.endsWith(".xml") || url.endsWith(".json") || url.endsWith("sv")))
        // JSON/XML/*sv but 1 doc/file        
      {
        return false;
      }
      return true;
   

}
TOP

Related Classes of com.ikanow.infinit.e.processing.generic.store_and_index.StoreAndIndexManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.