Package com.ikanow.infinit.e.application.handlers.polls

Source Code of com.ikanow.infinit.e.application.handlers.polls.LogstashSourceDeletionPollHandler

/*******************************************************************************
* Copyright 2012 The Infinit.e Open Source Project sponsored by IKANOW.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package com.ikanow.infinit.e.application.handlers.polls;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;

import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.query.QueryBuilders;

import com.ikanow.infinit.e.application.data_model.TestLogstashExtractorPojo;
import com.ikanow.infinit.e.application.utils.MongoQueue;
import com.ikanow.infinit.e.data_model.index.ElasticSearchManager;
import com.ikanow.infinit.e.data_model.store.DbManager;
import com.ikanow.infinit.e.data_model.store.document.DocumentPojo;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class LogstashSourceDeletionPollHandler implements PollHandler {

  private static final Logger _logger = Logger.getLogger(LogstashSourceDeletionPollHandler.class);
 
  public static String LOGSTASH_CONFIG = "/opt/logstash-infinite/logstash.conf.d/";
  public static String LOGSTASH_CONFIG_DISTRIBUTED = "/opt/logstash-infinite/dist.logstash.conf.d/";
  public static String LOGSTASH_WD = "/opt/logstash-infinite/logstash/";
  public static String LOGSTASH_RESTART_FILE = "/opt/logstash-infinite/RESTART_LOGSTASH";
  public static String LOGSTASH_CONFIG_EXTENSION = ".auto.conf";
 
  final static private String DUMMY_INDEX = "doc_dummy"; // (guaranteed to exist)
   
  private MongoQueue _logHarvesterQ = null;
  @Override
  public void performPoll() {
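    // Overall flow: work out whether this node is the logstash master or a distributed slave,
    // register/clean up slave heartbeats, then drain the deletion queue - removing the generated
    // logstash .conf and sincedb files, restarting logstash if needed, and (master only) queuing
    // the corresponding elasticsearch cleanup and re-distributing the request to any slaves.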
   
    boolean isSlave = false;
   
    if (null == LOGSTASH_CONFIG) { // (static memory not yet initialized)
      try {
        Thread.sleep(1000); // (brief sleep while waiting for static initialization)
      }
      catch (Exception e) {}
      return;     
    }
   
    File logstashDirectory = new File(LOGSTASH_CONFIG);
    String slaveHostname = null;
    if (!logstashDirectory.isDirectory() || !logstashDirectory.canRead() || !logstashDirectory.canWrite())
    {
      logstashDirectory = new File(LOGSTASH_CONFIG_DISTRIBUTED);
      isSlave = true;
      if (!logstashDirectory.isDirectory() || !logstashDirectory.canRead() || !logstashDirectory.canWrite())
      {       
        try {
          Thread.sleep(10000); // (extend the sleep time a bit)
        }
        catch (Exception e) {}
        return;
      }
      try {
        slaveHostname = java.net.InetAddress.getLocalHost().getHostName();
      }
      catch (Exception e) { // too complex if we don't have a hostname, just return
        return;
      }
    }
   
    // Deletion of distributed sources requires some coordination, so it is handled by the master
   
    if (isSlave) { // register my existence
      BasicDBObject existence = new BasicDBObject("_id", slaveHostname);
      existence.put("ping", new Date());
      DbManager.getIngest().getLogHarvesterSlaves().save(existence);
    }//TESTED (by hand)
    else { // MASTER: clear out old slaves
      // (if it hasn't pinged for more than 30 minutes)
      long now = new Date().getTime();
      BasicDBObject deadSlaveQuery = new BasicDBObject("ping", new BasicDBObject(DbManager.lt_, new Date(now - 1000L*1800L)));
      boolean found = false;
      DBCursor dbc = DbManager.getIngest().getLogHarvesterSlaves().find(deadSlaveQuery);
      while (dbc.hasNext()) {
        BasicDBObject deadSlave = (BasicDBObject) dbc.next();
        found = true;
        String hostname = deadSlave.getString("_id");
        if (null != hostname) {
          DbManager.getIngest().getLogHarvesterQ().remove(new BasicDBObject("forSlave", hostname));     
         
          _logger.info("Removing unresponsive slave host=" + hostname);
        }
      }
      if (found) {
        DbManager.getIngest().getLogHarvesterSlaves().remove(deadSlaveQuery);
      }     
    }//TESTED (by hand)

    // Read delete elements from the Q...
   
    if (null == _logHarvesterQ) {
      _logHarvesterQ = new MongoQueue(DbManager.getIngest().getLogHarvesterQ().getDB().getName(), DbManager.getIngest().getLogHarvesterQ().getName());
    }
    BasicDBObject queueQuery = new BasicDBObject("deleteOnlyCommunityId", new BasicDBObject(DbManager.exists_, true));
    if (!isSlave) { // only get master messages
      queueQuery.put("forSlave", new BasicDBObject(DbManager.exists_, false));
    }
    else { // only get messages intended for me
      queueQuery.put("forSlave", slaveHostname);
    }
    DBObject nextElement = _logHarvesterQ.pop(queueQuery);
    LinkedList<TestLogstashExtractorPojo> secondaryQueue = new LinkedList<TestLogstashExtractorPojo>();
    LinkedList<String> deleteAfterRestartQueue = new LinkedList<String>();
    boolean deletedSources = false;
    boolean deletedSinceDbs = false;
    while ( nextElement != null )
    {
      //DEBUG
      //System.out.println("HOST: " + slaveHostname + ": RECEIVED: " + nextElement.toString() + " FROM " + queueQuery);
      _logger.info("host=" + slaveHostname + " received=" + nextElement.toString() + " from=" + queueQuery);
     
      TestLogstashExtractorPojo testInfo = TestLogstashExtractorPojo.fromDb(nextElement, TestLogstashExtractorPojo.class);
      if (null == testInfo.sourceKey) {
        // need a sourceKey parameter... (pop the next element before skipping, otherwise this would loop forever on the same message)
        nextElement = _logHarvesterQ.pop(queueQuery);
        continue;
      }
      if (!isSlave) { // slaves don't need to delete anything from the index, only files
        secondaryQueue.add(testInfo);
      }//(end if master)
       
      try {
        // First off - need to remove the conf file and restart logstash if we're actually deleting this...
        boolean deletedSource = false;
        if ((null == testInfo.deleteDocsOnly) || !testInfo.deleteDocsOnly) { // (default = delete entire source)
          deletedSources = true;
          deletedSource = true;
         
          String fileToDelete = new StringBuffer(LOGSTASH_CONFIG).append(testInfo._id.toString()).append(LOGSTASH_CONFIG_EXTENSION).toString();
         
          boolean deleted = false;
          try {
            deleted = new File(fileToDelete).delete();
          }
          catch (Exception e) {}
         
          //DEBUG
          //System.out.println("DELETED CONF FILE" + fileToDelete + " ? " + deleted);
          _logger.info("delete conf_file=" + fileToDelete + " success=" + deleted);
        }//TESTED (docs-only + source deletion)
       
        // If _not_ deleting the source, then do delete the sincedb file
        // (else let it get cleaned up separately - minimizes race conditions where the source starts ingesting again)
        String fileToDelete = new StringBuffer(LOGSTASH_WD).append(".sincedb_").append(testInfo._id.toString()).toString();
        if (!deletedSource) {
         
          boolean deleted = false;
          try {
            deleted = new File(fileToDelete).delete();
            deletedSinceDbs |= deleted;
          }
          catch (Exception e) {}
         
          //DEBUG
          //System.out.println("DELETED SINCEDB" + fileToDelete + " ? " + deletedSinceDb);
          _logger.info("primary delete sincedb_file=" + fileToDelete + " success=" + deleted);
        }
        else {
          deleteAfterRestartQueue.add(fileToDelete);
        }//TESTED (primary + secondary deletes)
       
      }
      catch (Exception e) {
        //e.printStackTrace();       
      } // probably just doesn't exist       
     
      // Get next element and carry on
      nextElement = _logHarvesterQ.pop(queueQuery);
     
    }//TESTED (end first loop over elements to delete)
   
    if (deletedSources || deletedSinceDbs) { // this file actually existed - need to restart the logstash unfortunately
      _logger.info("Restarting logstash, and sleeping until logstash is restarted");
      try {
        new File(LOGSTASH_RESTART_FILE).createNewFile();
        for (int i = 0; i < 12; ++i) {
          Thread.sleep(10L*1000L);
          if (!new File(LOGSTASH_RESTART_FILE).exists()) {
            Thread.sleep(5L*1000L); // (extra wait for it to shut down)
            break; // (early exit)
          }
        }
      }
      catch (Exception e) {}         
    }//TESTED (from doc deletion and from src deletion)
   
    for (String fileToDelete: deleteAfterRestartQueue) {
      boolean deleted = false;
      try {
        deleted = new File(fileToDelete).delete();
      }
      catch (Exception e) {}
     
      //DEBUG
      //System.out.println("DELETED SINCEDB" + fileToDelete + " ? " + deletedSinceDb);
      _logger.info("secondary delete sincedb_file=" + fileToDelete + " success=" + deleted);     
    }//TESTED (primary and secondary deletion)
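    // MASTER ONLY: for each queued deletion, find the stashed and live elasticsearch indexes for the
    // community, remove all records with the matching source key, then fan the deletion message out
    // to any registered slaves so they can clean up their local conf/sincedb files.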
   
    for (TestLogstashExtractorPojo testInfo: secondaryQueue) {
     
      String commIdStr = testInfo.deleteOnlyCommunityId.toString();
     
      // Get all the indexes that might need to be cleansed:
      ElasticSearchManager indexMgr = ElasticSearchManager.getIndex(DUMMY_INDEX);
     
      // Stashed index
     
      ArrayList<String> indices = new ArrayList<String>();
     
      String stashedIndex = "recs_" + commIdStr;
      ClusterStateResponse retVal = indexMgr.getRawClient().admin().cluster().prepareState()
          .setIndices(stashedIndex)
          .setRoutingTable(false).setNodes(false).setListenerThreaded(false).get();
     
      if (!retVal.getState().getMetaData().getIndices().isEmpty()) {
        indices.add(stashedIndex);
      } // (else doesn't exist...)
     
      // Live indexes:
     
      String indexPattern = new StringBuffer("recs_t_").append(commIdStr).append("*").toString();
      retVal = indexMgr.getRawClient().admin().cluster().prepareState()
          .setIndices(indexPattern)
          .setRoutingTable(false).setNodes(false).setListenerThreaded(false).get();

      for (IndexMetaData indexMetadata: retVal.getState().getMetaData()) {
        //DEBUG
        //System.out.println("INDEX=" + indexMetadata.index());
        indices.add(indexMetadata.index());           
      }
      deleteSourceKeyRecords(indexMgr, indices.toArray(new String[0]), testInfo.sourceKey);
     
      _logger.info("Deleted key=" + testInfo.sourceKey + " from indexes=" + ArrayUtils.toString(indices.toArray()));
     
      // Now I've deleted, go and distribute the deletion messages to the slaves
      if ((null != testInfo.distributed) && testInfo.distributed) {
        // Copy into the slaves' queue
        DBCursor dbc = DbManager.getIngest().getLogHarvesterSlaves().find();
        while (dbc.hasNext()) {           
          BasicDBObject slave = (BasicDBObject) dbc.next();
          testInfo.forSlave = slave.getString("_id");
          _logHarvesterQ.push(testInfo.toDb());
          testInfo.forSlave = null;

          //DEBUG
          //System.out.println("DISTRIBUTING DELETION MESSAGE TO " + slave.toString());
          _logger.info("distributing deletion message to host=" + slave.toString());
        }
      }//TESTED (by hand)
    }//(end loop over secondary queue, ie to actually delete the indexes)   
   
  }
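  // Removes every record with the given source key from the supplied indexes (delete-by-query).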
  protected void deleteSourceKeyRecords(ElasticSearchManager indexMgr, String[] indices, String sourceKey) {
    indexMgr.getRawClient().prepareDeleteByQuery().setIndices(indices)
                  .setQuery(QueryBuilders.termQuery(DocumentPojo.sourceKey_, sourceKey))
                  .setConsistencyLevel(WriteConsistencyLevel.ONE)
                  .execute().actionGet();
  }//TESTED
}
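
Below is a minimal, hypothetical sketch of how a PollHandler implementation such as LogstashSourceDeletionPollHandler could be driven by a scheduling loop. The PollDriver class name and the fixed polling interval are assumptions for illustration only, and the sketch assumes the PollHandler interface from this package is on the classpath; the actual Infinit.e application may schedule its poll handlers differently.

package com.ikanow.infinit.e.application.handlers.polls;

// Hypothetical driver (illustrative only): repeatedly invokes a PollHandler on a fixed interval.
public class PollDriver implements Runnable {

  private final PollHandler _handler;
  private final long _intervalMs;

  public PollDriver(PollHandler handler, long intervalMs) {
    _handler = handler;
    _intervalMs = intervalMs;
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        _handler.performPoll(); // e.g. a LogstashSourceDeletionPollHandler instance
      }
      catch (Exception e) {} // swallow and keep polling, mirroring the handler's own defensive style

      try {
        Thread.sleep(_intervalMs); // assumed interval, e.g. 10 seconds
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}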