Package com.ikanow.infinit.e.application.data_model

Examples of com.ikanow.infinit.e.application.data_model.TestLogstashExtractorPojo
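
The class itself isn't reproduced on this page, but the two snippets below exercise most of its fields. From that usage it presumably looks roughly like the following reconstruction (field names are taken verbatim from the snippets; the types are inferred from the calling code, so treat them as assumptions). The fromDb()/toDb() calls are presumably inherited from the project's base DB pojo.

    import org.bson.types.ObjectId;

    // Hypothetical reconstruction - inferred from the usage below, not the real source
    public class TestLogstashExtractorPojo /* extends the project's base DB pojo (assumed) */ {
      public ObjectId _id;                   // test id - also names the "ingest.<_id>" output collection
      public String sourceKey;               // required - identifies the source under test/deletion
      public Boolean deleteDocsOnly;         // null/false => delete the entire source (conf file etc)
      public ObjectId deleteOnlyCommunityId; // community whose indexes get cleansed
      public Boolean distributed;            // if true, deletion messages are fanned out to slaves
      public String forSlave;                // set per-slave when distributing deletion messages
      public Integer maxDocs;                // 0 => config-test only ("-t"), >0 => wait for this many docs
      public Boolean isAdmin;                // affects logstash config validation
      public String error;                   // set on the reply message pushed back onto the queue
      public LogstashConfigPojo logstash;    // nested logstash settings (nested class name assumed)

      public static class LogstashConfigPojo {
        public String config;                      // raw logstash config to validate/run
        public Boolean testDebugOutput;            // --debug vs --verbose
        public Integer testInactivityTimeout_secs; // overrides the 10s inactivity default
      }
    }

The first snippet below is the cleanup path run by the log harvester (deleting test sources, sincedb files, and index records); the second launches logstash itself to test a source.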


    {
      //DEBUG
      //System.out.println("HOST: " + slaveHostname + ": RECEIVED: " + nextElement.toString() + " FROM " + queueQuery);
      _logger.info("host=" + slaveHostname + " received=" + nextElement.toString() + " from=" + queueQuery);
     
      TestLogstashExtractorPojo testInfo = TestLogstashExtractorPojo.fromDb(nextElement, TestLogstashExtractorPojo.class);
      if (null == testInfo.sourceKey) {
        continue; // need a sourceKey parameter...
      }
      if (!isSlave) { // slaves don't need to delete anything from the index, only files
        secondaryQueue.add(testInfo);
      }//(end if master)
       
      try {
        // First off - need to remove the conf file and restart logstash if we're actually deleting this...
        boolean deletedSource = false;
        if ((null == testInfo.deleteDocsOnly) || !testInfo.deleteDocsOnly) { // (default = delete entire source)
          deletedSources = true;
          deletedSource = true;
         
          String fileToDelete = new StringBuffer(LOGSTASH_CONFIG).append(testInfo._id.toString()).append(LOGSTASH_CONFIG_EXTENSION).toString();
         
          boolean deleted = false;
          try {
            deleted = new File(fileToDelete).delete();
          }
          catch (Exception e) {} // (conf file may not exist - ignore)
         
          //DEBUG
          //System.out.println("DELETED CONF FILE" + fileToDelete + " ? " + deleted);
          _logger.info("delete conf_file=" + fileToDelete + " success=" + deleted);
        }//TESTED (docs-only + source deletion)
       
        // If _not_ deleting the source, then do delete the sincedb file
        // (else let it get cleaned up separately - minimizes race conditions where the source starts ingesting again)
        String fileToDelete = new StringBuffer(LOGSTASH_WD).append(".sincedb_").append(testInfo._id.toString()).toString();
        if (!deletedSource) {
         
          boolean deleted = false;
          try {
            deleted = new File(fileToDelete).delete();
            deletedSinceDbs |= deleted;
          }
          catch (Exception e) {} // (sincedb file may not exist - ignore)
         
          //DEBUG
          //System.out.println("DELETED SINCEDB" + fileToDelete + " ? " + deletedSinceDb);
          _logger.info("primary delete sincedb_file=" + fileToDelete + " success=" + deleted);
        }
        else {
          deleteAfterRestartQueue.add(fileToDelete);
        }//TESTED (primary + secondary deletes)
       
      }
      catch (Exception e) {
        //e.printStackTrace();       
      } // probably just doesn't exist       
     
      // Get next element and carry on
      nextElement = _logHarvesterQ.pop(queueQuery);
     
    }//TESTED (end first loop over elements to delete)
   
    if (deletedSources || deletedSinceDbs) { // a conf/sincedb file actually existed - need to restart logstash, unfortunately
      _logger.info("Restarting logstash, and sleeping until logstash is restarted");
      try {
        new File(LOGSTASH_RESTART_FILE).createNewFile();
        for (int i = 0; i < 12; ++i) {
          Thread.sleep(10L*1000L);
          if (!new File(LOGSTASH_RESTART_FILE).exists()) {
            Thread.sleep(5L*1000L); // (extra wait for it to shut down)
            break; // (early exit)
          }
        }
      }
      catch (Exception e) {} // (interrupted/file error - carry on regardless)
    }//TESTED (from doc deletion and from src deletion)
   
    for (String fileToDelete: deleteAfterRestartQueue) {
      boolean deleted = false;
      try {
        deleted = new File(fileToDelete).delete();
      }
      catch (Exception e) {} // (file may already have been deleted - ignore)
     
      //DEBUG
      //System.out.println("DELETED SINCEDB" + fileToDelete + " ? " + deletedSinceDb);
      _logger.info("secondary delete sincedb_file=" + fileToDelete + " success=" + deleted);     
    }//TESTED (primary and secondary deletion)
   
    for (TestLogstashExtractorPojo testInfo: secondaryQueue) {
     
      String commIdStr = testInfo.deleteOnlyCommunityId.toString();
     
      // Get all the indexes that might need to be cleansed:
      ElasticSearchManager indexMgr = ElasticSearchManager.getIndex(DUMMY_INDEX);
     
      // Stashed index
     
      ArrayList<String> indices = new ArrayList<String>();
     
      String stashedIndex = "recs_" + commIdStr;
      ClusterStateResponse retVal = indexMgr.getRawClient().admin().cluster().prepareState()
          .setIndices(stashedIndex)
          .setRoutingTable(false).setNodes(false).setListenerThreaded(false).get();
     
      if (!retVal.getState().getMetaData().getIndices().isEmpty()) {
        indices.add(stashedIndex);
      } // (else doesn't exist...)
     
      // Live indexes:
     
      String indexPattern = new StringBuffer("recs_t_").append(commIdStr).append("*").toString();
      retVal = indexMgr.getRawClient().admin().cluster().prepareState()
          .setIndices(indexPattern)
          .setRoutingTable(false).setNodes(false).setListenerThreaded(false).get();

      for (IndexMetaData indexMetadata: retVal.getState().getMetaData()) {
        //DEBUG
        //System.out.println("INDEX=" + indexMetadata.index());
        indices.add(indexMetadata.index());           
      }
      deleteSourceKeyRecords(indexMgr, indices.toArray(new String[0]), testInfo.sourceKey);
     
      _logger.info("Deleted key=" + testInfo.sourceKey + " from indexes=" + ArrayUtils.toString(indices.toArray()));
     
      // Now I've deleted, go and distribute the deletion messages to the slaves
      if ((null != testInfo.distributed) && testInfo.distributed) {
        // Copy into the slaves' queue
        DBCursor dbc = DbManager.getIngest().getLogHarvesterSlaves().find();
        while (dbc.hasNext()) {           
          BasicDBObject slave = (BasicDBObject) dbc.next();
          testInfo.forSlave = slave.getString("_id");
          _logHarvesterQ.push(testInfo.toDb());
          testInfo.forSlave = null;

          //DEBUG
          //System.out.println("DISTRIBUTING DELETION MESSAGE TO " + slave.toString());
          _logger.info("distributing deletion message to host=" + slave.toString());
          // ... (snippet truncated)
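
The deleteSourceKeyRecords(...) helper called above isn't shown in this listing. Given the ES 1.x-era client API the snippet already uses (prepareState()/setListenerThreaded()), it would plausibly be a delete-by-query on the source key, along these lines - a sketch under that assumption, not the project's actual implementation:

    import org.elasticsearch.index.query.QueryBuilders;

    // Hypothetical sketch - assumes the ES 1.x delete-by-query client API and a
    // "sourceKey" document field; the real helper may scroll/bulk-delete instead.
    private static void deleteSourceKeyRecords(ElasticSearchManager indexMgr, String[] indices, String sourceKey) {
      if (0 == indices.length) return; // nothing to cleanse
      indexMgr.getRawClient().prepareDeleteByQuery(indices)
          .setQuery(QueryBuilders.termQuery("sourceKey", sourceKey))
          .execute().actionGet();
    }

The second example, below, validates the submitted logstash config, launches a logstash process against a temporary output collection, and polls that collection until maxDocs arrive, the run goes inactive, or the process exits.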


    while ( nextElement != null )
    {
      //DEBUG
      //System.out.println("FOUND: " + nextElement.toString());

      TestLogstashExtractorPojo testInfo = TestLogstashExtractorPojo.fromDb(nextElement, TestLogstashExtractorPojo.class);
      if ((null == testInfo.maxDocs) || (null == testInfo.logstash.config) || (null == testInfo.isAdmin) || (null == testInfo.sourceKey))
      {
        TestLogstashExtractorPojo testErr = new TestLogstashExtractorPojo();
        testErr._id = testInfo._id;
        testErr.error = "Internal Logic Error. Missing one of: maxDocs, isAdmin, sourceKey, logstash.config";
        _logHarvesterQ.push(testErr.toDb());

        return;
      }//TESTED
     
      // Validate/transform the configuration:
      StringBuffer errMessage = new StringBuffer();
      String logstashConfig = LogstashConfigUtils.validateLogstashInput(testInfo.sourceKey, testInfo.logstash.config, errMessage, testInfo.isAdmin);
      if (null == logstashConfig) { // Validation error...
        TestLogstashExtractorPojo testErr = new TestLogstashExtractorPojo();
        testErr._id = testInfo._id;
        testErr.error = "Validation error: " + errMessage.toString();
        _logHarvesterQ.push(testErr.toDb());

        return;       
      }//TESTED
     
      // Replacement for #LOGSTASH{host} - currently the only replacement supported (+ #IKANOW{} in main code)
      try {
        logstashConfig = logstashConfig.replace("#LOGSTASH{host}", java.net.InetAddress.getLocalHost().getHostName());
      }
      catch (Exception e) {
        logstashConfig = logstashConfig.replace("#LOGSTASH{host}", "localhost.localdomain");       
      }
      //TESTED
     
      String outputConf = _testOutputTemplate.replace("_XXX_COLLECTION_XXX_", testInfo._id.toString()); //TESTED
      String sinceDbPath = LOGSTASH_WD + ".sincedb_" + testInfo._id.toString();
      String conf = logstashConfig.replace("_XXX_DOTSINCEDB_XXX_", sinceDbPath) + outputConf.replace("_XXX_SOURCEKEY_XXX_", testInfo.sourceKey);

      boolean allWorked = false;
      Process logstashProcess = null;
      try {
        // 1] Create the process

        ArrayList<String> args = new ArrayList<String>(4);
        args.addAll(Arrays.asList(LOGSTASH_BINARY, "-e", conf));
        if (0 == testInfo.maxDocs) {
          args.add("-t"); // test mode, must faster
        }//TESTED
       
        if ((null != testInfo.logstash.testDebugOutput) && testInfo.logstash.testDebugOutput) {
          args.add("--debug");
        }
        else {
          args.add("--verbose");         
        }
        ProcessBuilder logstashProcessBuilder = new ProcessBuilder(args);
        logstashProcessBuilder = logstashProcessBuilder.directory(new File(LOGSTASH_WD)).redirectErrorStream(true);
        logstashProcessBuilder.environment().put("JAVA_OPTS", "");

        //DEBUG
        //System.out.println("STARTING: " + ArrayUtils.toString(logstashProcessBuilder.command().toArray()));


        // 2] Kick off the process
        logstashProcess = logstashProcessBuilder.start();
        StringWriter outputAndError = new StringWriter();
        OutputCollector outAndErrorStream = new OutputCollector(logstashProcess.getInputStream(), new PrintWriter(outputAndError));
        outAndErrorStream.start();
        final int toWait_s = 240;

        boolean exited = false;

        // 3] Check the output collection for records
       
        int errorVal = 0;
        long priorCount = 0L;
        int priorLogCount = 0;
       
        int timeOfLastLoggingChange = 0;
        int timeOfLastDocCountChange = 0;
       
        String reasonForExit = "";
       
        int inactivityTimeout_s = 10; // (default)
        if (null != testInfo.logstash.testInactivityTimeout_secs) {
          inactivityTimeout_s = testInfo.logstash.testInactivityTimeout_secs;
        }
        for (int i = 0; i < toWait_s; i += 5) {
          try {
            Thread.sleep(5000);
          }
          catch (Exception e) {}

          long count = DbManager.getCollection("ingest", testInfo._id.toString()).count();
         
          // 3.1] Do we have all the records (or is the number staying static)
         
          //DEBUG
          //System.out.println("FOUND: " + count + " VS " + priorCount + " , " + priorPriorCount);
         
          // 3.1a] All done?
         
          if ((count >= testInfo.maxDocs) && (count > 0))
          {
            allWorked = true;
            break;
          }//TESTED         
         
          // 3.1b] If not, has anything changed?
         
          if (priorCount != count) {
            timeOfLastDocCountChange = i;
          }
          if (priorLogCount != outAndErrorStream.getLines()) {
            timeOfLastLoggingChange = i;
          }
         
          // 3.1c] Check for inactivity
         
          if ((timeOfLastDocCountChange > 0) &&
              (i - timeOfLastDocCountChange) >= inactivityTimeout_s)
          {
            // Delay between events: treat as success
            allWorked = true;           
            break;
          }//TESTED
         
          if ((0 == count) && outAndErrorStream.getPipelineStarted() &&
              ((timeOfLastLoggingChange > 0) &&
              (i - timeOfLastLoggingChange) >= inactivityTimeout_s))
          {
            // Delay between log messages after pipeline started, no documents, treat as failure
           
            //DEBUG
            //System.out.println("LOG LINES! " + i + " NUM = " + outAndErrorStream.getLines());
           
            errorVal = 1;
            reasonForExit = "No records received and logging inactive.\n";
            break;
          }//TESTED         
         
          // 3.2] Has the process exited unexpectedly?
         
          try {
            errorVal = logstashProcess.exitValue();
            reasonForExit = "Logstash process exited with error: " + errorVal + ".\n";
            exited = true;

            //DEBUG
            //System.out.println("GOT EXIT VALUE: " + errorVal);
            break;

          }//TESTED
          catch (Exception e) {} // that's OK - the process is simply still running
         
          priorCount = count;   
          priorLogCount = outAndErrorStream.getLines();
         
        } //(end loop while waiting for job to complete)       
       
        // 4] If the process is still running then kill it
       
        if (!exited) {
          //DEBUG
          //System.out.println("EXITED WITHOUT FINISHING");

          logstashProcess.destroy();
        }//TESTED

        // 5] Things to do when the job is done: (worked or not)
        //    Send a message to the harvester
       
        outAndErrorStream.join(); // (if we're here then must have closed the process, wait for it to die)
       
        TestLogstashExtractorPojo testErr = new TestLogstashExtractorPojo();
        testErr._id = testInfo._id;
        if ((testInfo.maxDocs > 0) || (0 != errorVal)) {
          testErr.error = reasonForExit + outputAndError.toString();
            // (note this is capped at well below the BSON limit in the thread below)
        }
        else { // maxDocs==0 (ie pre-publish test) AND no error returned
          testErr.error = null;
        }
        _logHarvesterQ.push(testErr.toDb());
        //TESTED       
      }
      catch (Exception e) {
        //DEBUG
        //e.printStackTrace();       

        TestLogstashExtractorPojo testErr = new TestLogstashExtractorPojo();
        testErr._id = testInfo._id;
        testErr.error = "Internal Logic Error: " + e.getMessage();
        _logHarvesterQ.push(testErr.toDb());

      }//TOTEST
      finally {
        // If we created a sincedb path then remove it:
        try {
          // ... (snippet truncated)
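
The OutputCollector used in step 2] above is another helper that isn't shown. From its usage - a Thread wrapping the process's merged stdout/stderr, exposing getLines(), getPipelineStarted() and join() - a minimal version might look like the following. This is a hedged reconstruction: the "Pipeline started" match is an assumption about the logstash log format, and the real class evidently also caps the collected output (see the BSON-limit note in step 5]), which this sketch omits.

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;

    // Hypothetical reconstruction: copies process output to a writer, counts the
    // lines seen, and flags when logstash reports that its pipeline has started.
    public class OutputCollector extends Thread {
      private final BufferedReader _reader;
      private final PrintWriter _out;
      private volatile int _lines = 0;
      private volatile boolean _pipelineStarted = false;

      public OutputCollector(InputStream in, PrintWriter out) {
        _reader = new BufferedReader(new InputStreamReader(in));
        _out = out;
      }
      public int getLines() { return _lines; }
      public boolean getPipelineStarted() { return _pipelineStarted; }

      @Override
      public void run() {
        try {
          String line;
          while (null != (line = _reader.readLine())) {
            _out.println(line);
            _lines++;
            if (line.contains("Pipeline started")) { // (assumed logstash log message)
              _pipelineStarted = true;
            }
          }
        }
        catch (Exception e) {} // stream closes when the process dies - we're done
      }
    }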
