Package com.ikanow.infinit.e.processing.custom.scheduler

Source Code of com.ikanow.infinit.e.processing.custom.scheduler.CustomScheduleManager

/*******************************************************************************
* Copyright 2012, The Infinit.e Open Source Project.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
******************************************************************************/
package com.ikanow.infinit.e.processing.custom.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Set;

import org.bson.types.ObjectId;

import com.ikanow.infinit.e.data_model.api.knowledge.AdvancedQueryPojo;
import com.ikanow.infinit.e.data_model.control.DocumentQueueControlPojo;
import com.ikanow.infinit.e.data_model.store.DbManager;
import com.ikanow.infinit.e.data_model.store.MongoDbManager;
import com.ikanow.infinit.e.data_model.store.custom.mapreduce.CustomMapReduceJobPojo;
import com.ikanow.infinit.e.data_model.store.custom.mapreduce.CustomMapReduceJobPojo.SCHEDULE_FREQUENCY;
import com.ikanow.infinit.e.data_model.store.social.sharing.SharePojo;
import com.ikanow.infinit.e.data_model.store.social.sharing.SharePojo.ShareCommunityPojo;
import com.ikanow.infinit.e.data_model.utils.MongoApplicationLock;
import com.ikanow.infinit.e.processing.custom.utils.AuthUtils;
import com.ikanow.infinit.e.processing.custom.utils.PropertiesManager;
import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DBObject;

public class CustomScheduleManager {

  /**
   * Check there are available slots for running
   */ 
  public static boolean availableSlots(PropertiesManager prop_custom)
  {
    int nMaxConcurrent = prop_custom.getHadoopMaxConcurrent();
    if (Integer.MAX_VALUE != nMaxConcurrent) {
      BasicDBObject maxQuery = new BasicDBObject(CustomMapReduceJobPojo.jobidS_, new BasicDBObject(DbManager.ne_, null));
      int nCurrRunningJobs = (int) DbManager.getCustom().getLookup().count(maxQuery);
      if (nCurrRunningJobs >= nMaxConcurrent) {
        return false;
      }
    }
    return true;
  }//TESTED
 
  ////////////////////////////////////////////////////////////////////
 
  /**
   * Look for jobs that have not started yet but are scheduled for some point in the future
   */
  public static CustomMapReduceJobPojo getJobsToRun(PropertiesManager prop_custom, boolean bLocalMode, boolean bHadoopEnabled)
  {
    try
    {
      // First off, check the number of running jobs - don't exceed the max
      // (seem to run into memory problems if this isn't limited?)
      if (!availableSlots(prop_custom)) {
        return null;
      }
     
      BasicDBObject query = new BasicDBObject();
      query.append(CustomMapReduceJobPojo.jobidS_, null);
      query.append(CustomMapReduceJobPojo.waitingOn_, new BasicDBObject(MongoDbManager.size_, 0));
      query.append(CustomMapReduceJobPojo.nextRunTime_, new BasicDBObject(MongoDbManager.lt_, new Date().getTime()));
      if (!bHadoopEnabled && !bLocalMode) {
        // Can only get shared queries:
        query.append("jarURL", null);
      }
      BasicDBObject updates = new BasicDBObject(CustomMapReduceJobPojo.jobidS_, "");
      updates.append("lastRunTime", new Date());
      BasicDBObject update = new BasicDBObject(MongoDbManager.set_, updates);
      DBObject dbo = DbManager.getCustom().getLookup().findAndModify(query,null,null,false,update,true,false);

      if ( dbo != null )
      {   
        return CustomMapReduceJobPojo.fromDb(dbo, CustomMapReduceJobPojo.class);
      }
    }
    catch(Exception ex)
    {
      //oh noes!
      ex.printStackTrace();
    }
   
    return null;
  }
 
  /**
   * Look for running jobs, decide if they are complete
   */
  public static CustomMapReduceJobPojo getJobsToMakeComplete(boolean bHadoopEnabled)
  {
    try
    {           
      BasicDBObject query = new BasicDBObject();
      BasicDBObject nors[] = new BasicDBObject[3];
      nors[0] = new BasicDBObject(CustomMapReduceJobPojo.jobidS_, null);
      nors[1] = new BasicDBObject(CustomMapReduceJobPojo.jobidS_, "CHECKING_COMPLETION");
      nors[2] = new BasicDBObject(CustomMapReduceJobPojo.jobidS_, "");
      query.put(MongoDbManager.nor_, Arrays.asList(nors));         
      BasicDBObject updates = new BasicDBObject(CustomMapReduceJobPojo.jobidS_, "CHECKING_COMPLETION");     
      BasicDBObject update = new BasicDBObject(MongoDbManager.set_, updates);
      if (!bHadoopEnabled) {
        // Can only get shared queries:
        query.append(CustomMapReduceJobPojo.jarURL_, null);
      }
      DBObject dbo = DbManager.getCustom().getLookup().findAndModify(query, update);

      if ( dbo != null )
      {   
        return CustomMapReduceJobPojo.fromDb(dbo, CustomMapReduceJobPojo.class);
      }
    }
    catch(Exception ex)
    {
      //oh noes!
      ex.printStackTrace();
    }
   
    return null;
  }
  /**
   * Uses a map reduce jobs schedule frequency to determine when the next
   * map reduce job should be ran.
   *
   * @param scheduleFreq
   * @param firstSchedule
   * @param iterations
   * @return
   */
  public static long getNextRunTime(SCHEDULE_FREQUENCY scheduleFreq, Date firstSchedule, long nextRuntime, int iterations)
  {
    if ((null == firstSchedule) || (0 == firstSchedule.getTime())) {
      firstSchedule = new Date(nextRuntime);
      iterations = 1; // recover...
    }//TESTED
   
    if ( scheduleFreq == null || SCHEDULE_FREQUENCY.NONE == scheduleFreq)
    {
      return Long.MAX_VALUE;
    }
    Calendar cal = new GregorianCalendar();
    cal.setTime(firstSchedule);
   
    if ( SCHEDULE_FREQUENCY.HOURLY == scheduleFreq)
    {
      cal.add(Calendar.HOUR, 1*iterations);
    }
    else if ( SCHEDULE_FREQUENCY.DAILY == scheduleFreq)
    {
      cal.add(Calendar.HOUR, 24*iterations);
    }
    else if ( SCHEDULE_FREQUENCY.WEEKLY == scheduleFreq)
    {
      cal.add(Calendar.DATE, 7*iterations);
    }
    else if ( SCHEDULE_FREQUENCY.MONTHLY == scheduleFreq)
    {
      cal.add(Calendar.MONTH, 1*iterations);
    }
    return cal.getTimeInMillis();
  }

  ////////////////////////////////////////////////////////////////////
  ////////////////////////////////////////////////////////////////////
  ////////////////////////////////////////////////////////////////////
 
  // Saved Query Handling

  // Somewhat confusingly there are now 2 different types of saved query:
  // The original one, handled by CustomSavedQueryTaskLauncher
  // - this "re-uses" the CustomMapReducePojo object
  // - stores the entire query result in one object, including aggregations (can easily >16MB and break)
  // - you can store snapshots in append mode
  // - never really found a use
  // The new one, handled via DocumentQueueControlPojos embedded in shares
  // - writes the _ids into the shares, the existing API query is required to get them out
 
  private static MongoApplicationLock _appLock = null;
 
  public static void doneWithSavedQueryCache()
  {
    _appLock.release();
  }
 
  public static void createOrUpdatedSavedQueryCache()
  {
    if (null == _appLock) {
      _appLock = MongoApplicationLock.getLock(DbManager.getCustom().getSavedQueryCache().getDB().getName());
    }
    // the built-in applock acquisition code requires more persistent threads so we use the alternate mechanism that
    // just wipes out anything that hasn't been used in the last 2 minutes and then uses the existing contention handling to
    // allow one thread to grab it
    _appLock.clearStaleLocksOnTime(120);
    if (_appLock.acquire(100)) {
      BasicDBObject query = new BasicDBObject(SharePojo.type_, DocumentQueueControlPojo.SavedQueryQueue);
      List<SharePojo> savedQueries = SharePojo.listFromDb(DbManager.getSocial().getShare().find(query), SharePojo.listType());
      if (null != savedQueries) {
        for (SharePojo savedQueryShare: savedQueries) {
          if (null != savedQueryShare.getShare()) {
            DocumentQueueControlPojo savedQuery = DocumentQueueControlPojo.fromApi(savedQueryShare.getShare(), DocumentQueueControlPojo.class);
           
            // Is this query well formed?
            if ((null != savedQuery.getQueryInfo()) &&
                ((null != savedQuery.getQueryInfo().getQuery()) || (null != savedQuery.getQueryInfo().getQueryId())))
            {
              Date now = new Date();
              long freqOffset;
              // Check if it's time to run the query
              if (savedQuery.getQueryInfo().getFrequency() == DocumentQueueControlPojo.SavedQueryInfo.DocQueueFrequency.Hourly)
              {
                freqOffset = 3600L*1000L;
                //(nothing to do here, just run whenever)
              }//TESTED (test3)
              else if (savedQuery.getQueryInfo().getFrequency() == DocumentQueueControlPojo.SavedQueryInfo.DocQueueFrequency.Daily)
              {
                if (null != savedQuery.getQueryInfo().getFrequencyOffset()) { // hour of day
                  freqOffset = 12L*3600L*1000L; // (already check vs hour of day so be more relaxed)
                  Calendar calendar = GregorianCalendar.getInstance();
                  calendar.setTime(now);

                  //DEBUG
                  //System.out.println("DAILY: " + calendar.get(Calendar.HOUR_OF_DAY) + " VS " + savedQuery.getQueryInfo().getFrequencyOffset()); 
                 
                  if (calendar.get(Calendar.HOUR_OF_DAY) != savedQuery.getQueryInfo().getFrequencyOffset()) {
                    continue;
                  }//TESTED (test4)
                }
                else {
                  freqOffset = 24L*3600L*1000L; // (just run every 24 hours)                  
                }
              }//TESTED (test4)
              else if (savedQuery.getQueryInfo().getFrequency() == DocumentQueueControlPojo.SavedQueryInfo.DocQueueFrequency.Weekly)
              {
                if (null != savedQuery.getQueryInfo().getFrequencyOffset()) { // day of week
                  freqOffset = 3L*24L*3600L*1000L; // (already check vs day of week so be more relaxed)
                  Calendar calendar = GregorianCalendar.getInstance();
                  calendar.setTime(now);

                  //DEBUG
                  //System.out.println("WEEKLY: " + calendar.get(Calendar.DAY_OF_WEEK) + " VS " + savedQuery.getQueryInfo().getFrequencyOffset());
                 
                  if (calendar.get(Calendar.DAY_OF_WEEK) != savedQuery.getQueryInfo().getFrequencyOffset()) {
                    continue;
                  }
                }
                else {
                  freqOffset = 7L*24L*3600L*1000L//(just run every 7 days)
                }
              }//TESTED (test5)
              else continue; // (no -supported- frequency, don't run)
             
              long nowTime = now.getTime();

              //DEBUG
              //System.out.println("Comparing: " + savedQuery.getQueryInfo().getLastRun() + " VS " + now + " @ " + freqOffset/1000L);
             
              if ((null == savedQuery.getQueryInfo().getLastRun()) ||
                  ((nowTime - savedQuery.getQueryInfo().getLastRun().getTime()) > freqOffset))
              {
                //(does nothing if the share already exists)
                DbManager.getCustom().getSavedQueryCache().insert(savedQueryShare.toDb());
                CommandResult cr = DbManager.getCustom().getSavedQueryCache().getDB().getLastError();
               
                if (null == cr.get("err")) { // if we've actually done something, update the main share table also
                  savedQuery.getQueryInfo().setLastRun(now);
                  savedQueryShare.setShare(savedQuery.toApi());
                  // (this will overwrite the existing version)
                  DbManager.getSocial().getShare().save(savedQueryShare.toDb());               
                }//TESTED (by hand with prints)
               
              }//TESTED (test3-5)
             
            } // (end saved query actually has a query)
          }
        }//(end loop over saved queries)
      }
    }//(end acquired app lock)
  }//TESTED

  public static DocumentQueueControlPojo getSavedQueryToRun()
  {
    DocumentQueueControlPojo toReturn = null;
    try {
      SharePojo savedQueryShare = SharePojo.fromDb(DbManager.getCustom().getSavedQueryCache().findAndRemove(new BasicDBObject()), SharePojo.class);
      if (null == savedQueryShare) { // nothing to process
        return null;
      }//TESTED (test1)
      toReturn = DocumentQueueControlPojo.fromApi(savedQueryShare.getShare(), DocumentQueueControlPojo.class);
     
      // Get the user communities and append the query if possible
      Set<ObjectId> userAccess = AuthUtils.getCommunities(savedQueryShare.getOwner().get_id());
      if (userAccess.isEmpty()) {
        return toReturn; // (_parentShare is null so will be discarded)
      }
      if (null != toReturn.getQueryInfo().getQueryId()) {
        BasicDBObject queryQuery = new BasicDBObject(SharePojo._id_, toReturn.getQueryInfo().getQueryId());
        queryQuery.put(ShareCommunityPojo.shareQuery_id_, new BasicDBObject(DbManager.in_, userAccess));
        SharePojo shareContainingQuery = SharePojo.fromDb(DbManager.getSocial().getShare().findOne(queryQuery), SharePojo.class);

        if (null == shareContainingQuery) {
          return toReturn; // (_parentShare is null so will be discarded)
        }
        toReturn.getQueryInfo().setQuery(AdvancedQueryPojo.fromApi(shareContainingQuery.getShare(), AdvancedQueryPojo.class));
      }//TESTED (test6)
     
      if (null != toReturn.getQueryInfo().getQuery()) {
        // Check the communityIds...
        if (null != toReturn.getQueryInfo().getQuery().communityIds) {
          ArrayList<ObjectId> revisedCommunityList = new ArrayList<ObjectId>(toReturn.getQueryInfo().getQuery().communityIds.size());
          for (ObjectId commId: toReturn.getQueryInfo().getQuery().communityIds) {
            if (userAccess.contains(commId)) {
              revisedCommunityList.add(commId);
            }
          }//(end loop over unchecked communities)
          toReturn.getQueryInfo().getQuery().communityIds = revisedCommunityList;
        }
      }//TESTED (test1)
      toReturn._parentShare = savedQueryShare; // (if this is null then the subsequent processing will ignore this)
    }
    catch (Exception e) { // (this is some internal horror so log)
      e.printStackTrace();
    }
    return toReturn;
  }//TESTED (test1,test6)
 
}
TOP

Related Classes of com.ikanow.infinit.e.processing.custom.scheduler.CustomScheduleManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.