Source Code of com.ikanow.infinit.e.utility.MongoEntityFeatureTxfer

/*******************************************************************************
* Copyright 2012, The Infinit.e Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package com.ikanow.infinit.e.utility;

import java.io.IOException;
import java.lang.reflect.Type;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.bson.BSONObject;
import org.bson.types.ObjectId;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.ImmutableSettings.Builder;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.ikanow.infinit.e.data_model.index.ElasticSearchManager;
import com.ikanow.infinit.e.data_model.index.IndexManager;
import com.ikanow.infinit.e.data_model.index.feature.entity.EntityFeaturePojoIndexMap;
import com.ikanow.infinit.e.data_model.store.DbManager;
import com.ikanow.infinit.e.data_model.store.feature.entity.EntityFeaturePojo;
import com.ikanow.infinit.e.processing.generic.GenericProcessingController;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoEntityFeatureTxfer
{

  //___________________________________________________________________________________________________
 
  // MAIN
 
  /**
   * Entry point (invoked programmatically rather than via a String[] args array).
   * Three ways to run:
   *         Transfer: query (optional), bDelete=false, chunksDescription=null
   *         Delete:   query, bDelete=true
   *         Split:    query (optional), chunksDescription non-null (transfer chunk-by-chunk)
   *
   * @param sConfigPath location of the configuration (null to use the default)
   * @param sQuery JSON query selecting the entity features to process (null means all)
   * @param bDelete if true, delete the matching features from MongoDB and the index
   * @param bRebuildIndex if true, re-initialize the index first
   * @param nSkip number of records to skip
   * @param nLimit maximum number of records to process (0 for no limit)
   * @param chunksDescription if non-null, perform the transfer in chunks ("split" mode)
   *
   * @throws MongoException
   * @throws NumberFormatException
   * @throws IOException
   */
  public static void main(String sConfigPath, String sQuery, boolean bDelete, boolean bRebuildIndex, int nSkip, int nLimit, String chunksDescription) throws NumberFormatException, MongoException, IOException {
   
    MongoEntityFeatureTxfer txferManager = new MongoEntityFeatureTxfer();
   
    // Command line processing
    com.ikanow.infinit.e.data_model.Globals.setIdentity(com.ikanow.infinit.e.data_model.Globals.Identity.IDENTITY_SERVICE);
    if (null != sConfigPath) {
      com.ikanow.infinit.e.data_model.Globals.overrideConfigLocation(sConfigPath);
    }
    if (bRebuildIndex) {
      new GenericProcessingController().InitializeIndex(false, true, false);
    }
   
    BasicDBObject query = null;   
    if (null == sQuery) {
      query = new BasicDBObject();
    }
    else {
      query = (BasicDBObject) com.mongodb.util.JSON.parse(sQuery);
    }
   
    if (bDelete) {
      MongoEntityFeatureTxfer.doDelete(query, nLimit);
    }
    else {
      if (null == chunksDescription) {
        txferManager.doTransfer(query, nSkip, nLimit, null);
      }
      else {
        txferManager.doChunkedTransfer(query, nSkip, nLimit, chunksDescription);       
      }
    }
  }
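
  // Example invocations (a hedged sketch, not part of the original class; the config path and
  // query values below are hypothetical):
  //
  //   // Transfer: re-index all entity features matching the query
  //   MongoEntityFeatureTxfer.main("/path/to/config", "{\"type\":\"Person\"}", false, false, 0, 0, null);
  //
  //   // Delete: remove matching features from MongoDB and from the entity index
  //   MongoEntityFeatureTxfer.main("/path/to/config", "{\"type\":\"Person\"}", true, false, 0, 0, null);
  //
  //   // Split: transfer chunk-by-chunk, in whatever format MongoIndexerUtils.getChunks expects
  //   MongoEntityFeatureTxfer.main("/path/to/config", null, false, false, 0, 0, "<chunks description>");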
 
  //___________________________________________________________________________________________________
 
  // Wrapper for doing transfer in chunks:
 
  private void doChunkedTransfer(BasicDBObject query, int nSkip, int nLimit, String chunksDescription) throws IOException
  {
    List<BasicDBObject> chunkList = MongoIndexerUtils.getChunks("feature.entity", chunksDescription);
    System.out.println("CHUNKS: Found " + chunkList.size() + " chunks");
    //DEBUG
    //System.out.println("Chunklist= " + chunkList);
    for (BasicDBObject chunk: chunkList) {
      BasicDBObject cleanQuery = new BasicDBObject();
      cleanQuery.putAll((BSONObject)query);
      String id = null;
      try {
        id = (String) chunk.remove("$id");
        System.out.println("CHUNK: " + id);
        doTransfer(cleanQuery, 0, 0, chunk);
      }
      catch (Exception e) {
        System.out.println("FAILED CHUNK: " + id + " ... " + e.getMessage());
      }
    }
  }//TESTED
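
  // A hedged sketch of the chunk objects the loop above consumes (the exact format is produced by
  // MongoIndexerUtils.getChunks, which is not shown here): each chunk is assumed to carry an "$id"
  // label plus min/max key bounds that doTransfer() forwards to the cursor as range specials, e.g.
  //   { "$id" : "feature.entity-...", "$min" : { "_id" : <lower bound> }, "$max" : { "_id" : <upper bound> } }
  // (here "$min"/"$max" stand in for whatever keys DbManager.min_/DbManager.max_ actually hold)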
 
  //___________________________________________________________________________________________________
 
  // PROCESSING LOOP (new interface)
 
  private void doTransfer(BasicDBObject query, int nSkip, int nLimit, BasicDBObject chunk)
  {   
    ElasticSearchManager elasticManager = null;
       
    // Initialize the DB:
    DBCollection entityFeatureDB = DbManager.getFeature().getEntity();
   
    // Initialize the ES (create the index if it doesn't already exist):
         
// 1. Set-up the entity feature index

    String indexName = "entity_index";
    ElasticSearchManager.setDefaultClusterName("infinite-aws");
   
    // (delete the index)
    //elasticManager = ElasticSearchManager.getIndex(indexName);
    //elasticManager.deleteMe();
   
    // Create the index if necessary
    String sMapping = new Gson().toJson(new EntityFeaturePojoIndexMap.Mapping(), EntityFeaturePojoIndexMap.Mapping.class);
    Builder localSettings = ImmutableSettings.settingsBuilder();
    localSettings.put("number_of_shards", 1).put("number_of_replicas", 0);     
    localSettings.put("index.analysis.analyzer.suggestAnalyzer.tokenizer", "standard");
    localSettings.putArray("index.analysis.analyzer.suggestAnalyzer.filter", "standard", "lowercase");
   
    elasticManager = ElasticSearchManager.createIndex(indexName, null, false,  null, sMapping, localSettings);
   
    // Get the index (necessary if already created)
    if (null == elasticManager)
    {
      elasticManager = ElasticSearchManager.getIndex(indexName);
    }
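
    // (For reference, a sketch of the index settings JSON the builder above roughly corresponds to,
    //  assuming the standard elasticsearch settings-builder semantics:
    //    { "number_of_shards" : 1, "number_of_replicas" : 0,
    //      "index.analysis.analyzer.suggestAnalyzer.tokenizer" : "standard",
    //      "index.analysis.analyzer.suggestAnalyzer.filter" : ["standard", "lowercase"] } )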
   
// Now query the DB:
   
    DBCursor dbc = null;
    dbc = entityFeatureDB.find(query);
    if (null != chunk) {
      if (chunk.containsField(DbManager.min_)) {
        dbc = dbc.addSpecial(DbManager.min_, chunk.get(DbManager.min_));
      }
      if (chunk.containsField(DbManager.max_)) {
        dbc = dbc.addSpecial(DbManager.max_, chunk.get(DbManager.max_));
      }
    }
    dbc = dbc.skip(nSkip).limit(nLimit).batchSize(1000);
    if (null == chunk) {
      int nCount = dbc.count() - nSkip;
      if (nCount < 0) nCount = 0;
      System.out.println("Found " + nCount + " records to sync, process first " + (0==nLimit?nCount:nLimit));
      if (0 == nCount) { // Nothing to do...
        return;
      }     
    }   
   
    int nSynced = 0;
   
    List<EntityFeaturePojo> entities = new ArrayList<EntityFeaturePojo>();
    while ( dbc.hasNext() )
    {
      EntityFeaturePojo feature = EntityFeaturePojo.fromDb(dbc.next(),EntityFeaturePojo.class);
       
      if (null != feature.getAlias()) { // (some corrupt gazetteer entry)

        // Handle groups (system group is: "4c927585d591d31d7b37097a")
        // if there is no community id, add system group (something is wrong if this happens?)
        if (null == feature.getCommunityId())
        {
          feature.setCommunityId(new ObjectId("4c927585d591d31d7b37097a"));           
        }
      }
     
      entities.add(feature);
      nSynced++;
     
      // Add the entities
      if ( entities.size() > 1000 )
      {
        elasticManager.bulkAddDocuments(
            IndexManager.mapListToIndex(entities, EntityFeaturePojo.listType(), new EntityFeaturePojoIndexMap()),
            "_id", null, true);
          // (note EntityFeaturePojoIndexMap creates an "_id" field of the format index:community)
       
        entities = new ArrayList<EntityFeaturePojo>();
      }
    }
    // write whatever's left
    elasticManager.bulkAddDocuments(
        IndexManager.mapListToIndex(entities, EntityFeaturePojo.listType(), new EntityFeaturePojoIndexMap()),
        "_id", null, true);
      // (note EntityFeaturePojoIndexMap creates an "_id" field of the format index:community)
   
    if (null != chunk) {
      System.out.println("Found " + nSynced + " records to sync in chunk");
    }       
  }
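
  // (For reference, a hedged sketch of the composite "_id" noted above: EntityFeaturePojoIndexMap is
  //  assumed to join the entity's index value and its community id with a ':', matching the esid built
  //  in internalDelete() below, e.g. "<entity index>:4c927585d591d31d7b37097a")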
 
//___________________________________________________________________________________________________
 
  // DELETE DOCUMENTS FROM A QUERY
 
  static void doDelete(BasicDBObject query, int nLimit)
  {
    doDelete(query, nLimit, false);
  }
  static void doDelete(BasicDBObject query, int nLimit, boolean automatedRequest)
  {   
    try
    {
      // Initialize the DB: 
      DBCollection entityFeatureDB = DbManager.getFeature().getEntity();
      ElasticSearchManager elasticManager = ElasticSearchManager.getIndex("entity_index");
     
      BasicDBObject fields = new BasicDBObject();
      fields.put(EntityFeaturePojo.index_, 1);
      fields.put(EntityFeaturePojo.communityId_, 1);
     
      DBCursor cur = entityFeatureDB.find(query, fields).limit(nLimit);
        // (this internally works in batches of 1000)
      if (automatedRequest) {
        System.out.println("Found " + cur.count() + " records to delete from _id list");
      }
      else {
        System.out.println("Found " + cur.count() + " records to delete from " + query.toString());       
      }
      if (nLimit > 0) {
        System.out.println("(limited to " + nLimit + " records)");
      }
      int nArraySize = (cur.count() > 1000) ? 1000 : cur.count();
      ArrayList<EntityFeaturePojo> batchList = new ArrayList<EntityFeaturePojo>(nArraySize);     
     
      while (cur.hasNext())
      {
        EntityFeaturePojo gp = EntityFeaturePojo.fromDb(cur.next(),EntityFeaturePojo.class);
        batchList.add(gp);
        if (batchList.size() >= nArraySize) {
          internalDelete(batchList, elasticManager);
          batchList.clear();
        }
      }     
      if (!batchList.isEmpty()) {
        internalDelete(batchList, elasticManager);       
      }
      entityFeatureDB.remove(query);
     
    } catch (NumberFormatException e) {
      e.printStackTrace();
    } catch (MongoException e) {
      e.printStackTrace();
    }   
    finally
    {
    }
   
  }//TESTED
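
  // Example calls (a hedged sketch; the queries below are hypothetical - the "automatedRequest" flag
  // only changes the log message, for the case where the caller passes in an "_id" list):
  //   doDelete((BasicDBObject) com.mongodb.util.JSON.parse("{\"_id\":{\"$in\":[/*...*/]}}"), 0, true);
  //   doDelete((BasicDBObject) com.mongodb.util.JSON.parse("{\"disambiguated_name\":\"Smith\"}"), 1000, false);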
 
  // Batch delete
 
  private static void internalDelete(List<EntityFeaturePojo> entitiesToDelete, ElasticSearchManager esMgr) {
   
    List<String> esids = new ArrayList<String>(entitiesToDelete.size());
    for (EntityFeaturePojo gp: entitiesToDelete) {
      esids.add(new StringBuffer(gp.getIndex()).append(':').append(gp.getCommunityId().toString()).toString());
    }   
    esMgr.bulkDeleteDocuments(esids);
  }//TESTED
 
  //___________________________________________________________________________________________________
 
  // TEST CODE
 
  @SuppressWarnings("unused")
  private void doUnitTestCode(String sMongoDbHost, String sMongoDbPort, String sElasticHost, String sElasticPort,
      BasicDBObject query, int nLimit)
  {   
    Mongo mongoDB = null;
    ElasticSearchManager elasticManager = null;
   
   
    try {
      // Initialize the DB:
     
      mongoDB = new Mongo(sMongoDbHost, Integer.parseInt(sMongoDbPort));
      DBCollection gazDB = mongoDB.getDB("feature").getCollection("entity");
     
      // Initialize the ES (create the index if it doesn't already):
           
// 1. Set-up the entity feature index

      String indexName = "entity_index";
     
      //TEST: delete the index:
//      elasticManager = ElasticSearchManager.getIndex(indexName, sElasticHost + ":" + sElasticPort);
//      elasticManager.deleteMe();

      //TEST: create the index
//      String sMapping = new Gson().toJson(new GazateerPojo.Mapping(), GazateerPojo.Mapping.class);
//      Builder localSettings = ImmutableSettings.settingsBuilder();
//      localSettings.put("number_of_shards", 1).put("number_of_replicas", 0);       q
//      elasticManager = ElasticSearchManager.createIndex
//                (indexName, false,
//                    sElasticHost + ":" + sElasticPort,
//                    sMapping, localSettings);
     
      //TEST: delete the index:
//      elasticManager.deleteMe();
     
      //TEST: get the index:
//      elasticManager = ElasticSearchManager.getIndex(indexName, sElasticHost + ":" + sElasticPort);
     
// Now query the DB:
     
      DBCursor dbc = null;
      if (nLimit > 0) {
        dbc = gazDB.find(query).limit(nLimit);
      }
      else { // Everything!
        dbc = gazDB.find(query);      
      }
     
      Type listType = new TypeToken<ArrayList<EntityFeaturePojo>>() {}.getType();   
      List<EntityFeaturePojo> entities = new Gson().fromJson(dbc.toArray().toString(), listType);   
         
      //Debug:
      List<String> entIds = new LinkedList<String>();
     
      // Loop over array and invoke the cleansing function for each one

      for (EntityFeaturePojo ent: entities) {
       
        if (null != ent.getAlias()) { // (some corrupt gazetteer entry)

          //Debug:
          //System.out.println("entity=" + ent.getGazateerIndex());
          //System.out.println("aliases=" + Arrays.toString(ent.getAlias().toArray()));
         
          // Insert into the elasticsearch index
         
          //Debug:
          //System.out.println(new Gson().toJson(ent, GazateerPojo.class));
         
          // Handle groups (system group is: "4c927585d591d31d7b37097a")
          if (null == ent.getCommunityId()) {
            ent.setCommunityId(new ObjectId("4c927585d591d31d7b37097a"));
          }
         
          //TEST: index document
//          ent.synchronizeWithIndex();
//          boolean b = elasticManager.addDocument(ent, ent.getGazateerIndex(), true);

          //TEST: remove document
          //b = elasticManager.removeDocument(ent.getGazateerIndex());
         
          //TEST: (part of get, bulk add/delete)
          entIds.add(ent.getIndex());
         
          // Debug:
//          if (!b) {
//            System.out.println("Didn't add " + ent.getGazateerIndex());           
//          }         
        }
       
      } // End loop over entities
     
      //TEST: bulk add/delete
      //elasticManager.bulkAddDocuments(entities, "index", null);
      //elasticManager.bulkDeleteDocuments(entIds);
     
      //TEST: get document
//      elasticManager.getRawClient().admin().indices().refresh(Requests.refreshRequest(indexName)).actionGet();
//      for (String id: entIds) {
//        Map<String, GetField> results = elasticManager.getDocument(id,"doccount", "disambiguated_name");
//        System.out.println(id + ": " + results.get("doccount").values().get(0) + " , " + results.get("disambiguated_name").values().get(0));
//      }
     
      //TEST: search
//      elasticManager.getRawClient().admin().indices().refresh(Requests.refreshRequest(indexName)).actionGet();
//      SearchRequestBuilder searchOptions = elasticManager.getSearchOptions();
//      XContentQueryBuilder queryObj = QueryBuilders.matchAllQuery();
//      searchOptions.addSort("doccount", SortOrder.DESC);
//      searchOptions.addFields("doccount", "type");
//      SearchResponse rsp = elasticManager.doQuery(queryObj, searchOptions);
//      SearchHit[] docs = rsp.getHits().getHits();
//      for (SearchHit hit: docs) {
//        String id = hit.getId();
//        Long doccount = (Long) hit.field("doccount").value();
//        String type = (String) hit.field("type").value();
//        System.out.println(id + ": " + doccount + ", " + type);
//      }     
     
    } catch (NumberFormatException e) {
      e.printStackTrace();
    } catch (UnknownHostException e) {
      e.printStackTrace();
    } catch (MongoException e) {
      e.printStackTrace();
    }   
    finally {
     
      if (null != mongoDB) {
        mongoDB.close();
      }
      if (null != elasticManager) {
        //NB not sure when exactly to call this - probably can just not bother?
        //elasticManager.getRawClient().close();
      }
    }
  }
 
}