Package org.apache.sling.event.impl

Source Code of org.apache.sling.event.impl.JobEventHandler

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.event.impl;

import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import javax.jcr.ItemExistsException;
import javax.jcr.Node;
import javax.jcr.NodeIterator;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.observation.EventIterator;
import javax.jcr.query.Query;
import javax.jcr.query.QueryManager;

import org.apache.jackrabbit.util.ISO8601;
import org.apache.sling.commons.osgi.OsgiUtil;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.JobStatusProvider;
import org.osgi.framework.BundleEvent;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;


/**
* An event handler handling special job events.
*
* @scr.component label="%job.events.name" description="%job.events.description"
* @scr.service interface="org.apache.sling.event.JobStatusProvider"
* @scr.property name="event.topics" refValues="EventUtil.TOPIC_JOB"
*               values.updated="org/osgi/framework/BundleEvent/UPDATED"
*               values.started="org/osgi/framework/BundleEvent/STARTED"
*               private="true"
* @scr.property name="repository.path" value="/var/eventing/jobs" private="true"
* We schedule this event handler to run in the background and clean up
* obsolete events.
* @scr.service interface="java.lang.Runnable"
* @scr.property name="scheduler.period" value="600" type="Long"
* @scr.property name="scheduler.concurrent" value="false" type="Boolean" private="true"
*/
public class JobEventHandler
    extends AbstractRepositoryEventHandler
    implements EventUtil.JobStatusNotifier, JobStatusProvider, Runnable {

    /** The topic prefix for bundle events. */
    protected static final String BUNDLE_EVENT_PREFIX = BundleEvent.class.getName().replace('.', '/') + '/';

    /** A map for keeping track of currently processed job topics. */
    protected final Map<String, Boolean> processingMap = new HashMap<String, Boolean>();

    /** Default sleep time. */
    protected static final long DEFAULT_SLEEP_TIME = 20;

    /** @scr.property valueRef="DEFAULT_SLEEP_TIME" */
    protected static final String CONFIG_PROPERTY_SLEEP_TIME = "sleep.time";

    /** Default number of job retries. */
    protected static final int DEFAULT_MAX_JOB_RETRIES = 10;

    /** @scr.property valueRef="DEFAULT_MAX_JOB_RETRIES" */
    protected static final String CONFIG_PROPERTY_MAX_JOB_RETRIES = "max.job.retries";

    /** We check every 20 secs by default. */
    protected long sleepTime;

    /** How often should a job be retried by default. */
    protected int maxJobRetries;

    /** Background session. */
    protected Session backgroundSession;

    /** Unloaded jobs. */
    protected Set<String>unloadedJobs = new HashSet<String>();

    /** List of deleted jobs. */
    protected Set<String>deletedJobs = new HashSet<String>();

    /** Default clean up time is 10 minutes. */
    protected static final int DEFAULT_CLEANUP_PERIOD = 10;

    /** @scr.property valueRef="DEFAULT_CLEANUP_PERIOD" type="Integer" */
    protected static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";

    /** We remove everything which is older than 5 min by default. */
    protected int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;

    /**
     * Activate this component.
     * @param context
     * @throws RepositoryException
     */
    protected void activate(final ComponentContext context)
    throws Exception {
        @SuppressWarnings("unchecked")
        final Dictionary<String, Object> props = context.getProperties();
        this.cleanupPeriod = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
        this.sleepTime = OsgiUtil.toLong(props.get(CONFIG_PROPERTY_SLEEP_TIME), DEFAULT_SLEEP_TIME);
        this.maxJobRetries = OsgiUtil.toInteger(props.get(CONFIG_PROPERTY_MAX_JOB_RETRIES), DEFAULT_MAX_JOB_RETRIES);
        super.activate(context);
    }

    /**
     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#deactivate(org.osgi.service.component.ComponentContext)
     */
    protected void deactivate(final ComponentContext context) {
        super.deactivate(context);
        if ( this.backgroundSession != null ) {
            try {
                this.backgroundSession.getWorkspace().getObservationManager().removeEventListener(this);
            } catch (RepositoryException e) {
                // we just ignore it
                this.logger.warn("Unable to remove event listener.", e);
            }
            this.backgroundSession.logout();
            this.backgroundSession = null;
        }
    }

    /**
     * Return the query string for the clean up.
     */
    protected String getCleanUpQueryString() {
        final Calendar deleteBefore = Calendar.getInstance();
        deleteBefore.add(Calendar.MINUTE, -this.cleanupPeriod);
        final String dateString = ISO8601.format(deleteBefore);

        final StringBuffer buffer = new StringBuffer("/jcr:root");
        buffer.append(this.repositoryPath);
        buffer.append("//element(*, ");
        buffer.append(getEventNodeType());
        buffer.append(")[@");
        buffer.append(EventHelper.NODE_PROPERTY_FINISHED);
        buffer.append(" < xs:dateTime('");
        buffer.append(dateString);
        buffer.append("')]");

        return buffer.toString();
    }

    /**
     * This method is invoked periodically.
     * @see java.lang.Runnable#run()
     */
    public void run() {
        if ( this.cleanupPeriod > 0 ) {
            this.logger.debug("Cleaning up repository, removing all finished jobs older than {} minutes.", this.cleanupPeriod);

            final String queryString = this.getCleanUpQueryString();
            // we create an own session for concurrency issues
            Session s = null;
            try {
                s = this.createSession();
                final Node parentNode = (Node)s.getItem(this.repositoryPath);
                logger.debug("Executing query {}", queryString);
                final Query q = s.getWorkspace().getQueryManager().createQuery(queryString, Query.XPATH);
                final NodeIterator iter = q.execute().getNodes();
                int count = 0;
                while ( iter.hasNext() ) {
                    final Node eventNode = iter.nextNode();
                    eventNode.remove();
                    count++;
                }
                parentNode.save();
                logger.debug("Removed {} entries from the repository.", count);

            } catch (RepositoryException e) {
                // in the case of an error, we just log this as a warning
                this.logger.warn("Exception during repository cleanup.", e);
            } finally {
                if ( s != null ) {
                    s.logout();
                }
            }
        }
    }
    /**
     * @see org.apache.sling.event.impl.AbstractRepositoryEventHandler#processWriteQueue()
     */
    protected void processWriteQueue() {
        while ( this.running ) {
            // so let's wait/get the next job from the queue
            Event event = null;
            try {
                event = this.writeQueue.take();
            } catch (InterruptedException e) {
                // we ignore this
                this.ignoreException(e);
            }
            if ( event != null && this.running ) {
                try {
                    this.writerSession.refresh(false);
                } catch (RepositoryException re) {
                    // we just ignore this
                    this.ignoreException(re);
                }
                final EventInfo info = new EventInfo();
                info.event = event;
                final String nodeName = this.getNodeName(event);

                // if the job has no job id, we can just write the job to the repo and don't
                // need locking
                final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
                if ( jobId == null ) {
                    try {
                        final Node eventNode = this.writeEvent(event, nodeName);
                        info.nodePath = eventNode.getPath();
                    } catch (RepositoryException re ) {
                        // something went wrong, so let's log it
                        this.logger.error("Exception during writing new job '" + nodeName + "' to repository.", re);
                    }
                } else {
                    try {
                        // let's first search for an existing node with the same id
                        final Node parentNode = (Node)this.writerSession.getItem(this.repositoryPath);
                        Node foundNode = null;
                        if ( parentNode.hasNode(nodeName) ) {
                            foundNode = parentNode.getNode(nodeName);
                        }
                        if ( foundNode != null ) {
                            // if the node is locked, someone else was quicker
                            // and we don't have to process this job
                            if ( !foundNode.isLocked() ) {
                                // node is already in repository, so if not finished we just use it
                                // otherwise it has already been processed
                                try {
                                    if ( !foundNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED) ) {
                                        info.nodePath = foundNode.getPath();
                                    }
                                } catch (RepositoryException re) {
                                    // if anything goes wrong, it means that (hopefully) someone
                                    // else is processing this node
                                }
                            }
                        } else {
                            // We now write the event into the repository
                            try {
                                final Node eventNode = this.writeEvent(event, nodeName);
                                info.nodePath = eventNode.getPath();
                            } catch (ItemExistsException iee) {
                                // someone else did already write this node in the meantime
                                // nothing to do for us
                            }
                        }
                    } catch (RepositoryException re ) {
                        // something went wrong, so let's log it
                        this.logger.error("Exception during writing new job '" + nodeName + "' to repository.", re);
                    }
                }
                // if we were able to write the event into the repository
                // we will queue it for processing
                if ( info.nodePath != null ) {
                    try {
                        this.queue.put(info);
                    } catch (InterruptedException e) {
                        // this should never happen
                        this.ignoreException(e);
                    }
                }
            }
        }
    }

    /**
     * This method runs in the background and processes the local queue.
     */
    protected void runInBackground() throws RepositoryException {
        this.backgroundSession = this.createSession();
        this.backgroundSession.getWorkspace().getObservationManager()
        .addEventListener(this,
                          javax.jcr.observation.Event.PROPERTY_REMOVED,
                          this.repositoryPath,
                          true,
                          null,
                          new String[] {this.getEventNodeType()},
                          true);
        // load unprocessed jobs from repository
        this.loadJobs();
        while ( this.running ) {
            // so let's wait/get the next job from the queue
            EventInfo info = null;
            try {
                info = this.queue.take();
            } catch (InterruptedException e) {
                // we ignore this
                this.ignoreException(e);
            }

            if ( info != null && this.running ) {
                // check if the node still exists
                synchronized (this.backgroundSession) {
                    try {
                        this.backgroundSession.refresh(false);
                        if ( this.backgroundSession.itemExists(info.nodePath) ) {
                            final Event event = info.event;
                            final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
                            final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;

                            // check how we can process this job
                            // if parallel processing is allowed, we can just process
                            // if not we should check if any other job with the same topic is currently running
                            boolean process = parallelProcessing;
                            if ( !process ) {
                                synchronized ( this.processingMap ) {
                                    final Boolean value = this.processingMap.get(jobTopic);
                                    if ( value == null || !value.booleanValue() ) {
                                        this.processingMap.put(jobTopic, Boolean.TRUE);
                                        process = true;
                                    }
                                }

                            }
                            if ( process ) {
                                boolean unlock = true;
                                try {
                                    final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
                                    if ( !eventNode.isLocked() ) {
                                        // lock node
                                        try {
                                            eventNode.lock(false, true);
                                        } catch (RepositoryException re) {
                                            // lock failed which means that the node is locked by someone else, so we don't have to requeue
                                            process = false;
                                        }
                                        if ( process ) {
                                            unlock = false;
                                            this.processJob(info.event, eventNode);
                                        }
                                    }
                                } catch (RepositoryException e) {
                                    // ignore
                                    this.ignoreException(e);
                                } finally {
                                    if ( unlock && !parallelProcessing ) {
                                        synchronized ( this.processingMap ) {
                                            this.processingMap.put(jobTopic, Boolean.FALSE);
                                        }
                                    }
                                }
                            } else {
                                try {
                                    // check if the node is in processing or already finished
                                    final Node eventNode = (Node) this.backgroundSession.getItem(info.nodePath);
                                    if ( !eventNode.isLocked() ) {
                                        try {
                                            this.queue.put(info);
                                        } catch (InterruptedException e) {
                                            // ignore
                                            this.ignoreException(e);
                                        }
                                        // wait time before we restart the cycle, if there is only one job in the queue!
                                        if ( this.queue.size() == 1 ) {
                                            try {
                                                Thread.sleep(this.sleepTime);
                                            } catch (InterruptedException e) {
                                                // ignore
                                                this.ignoreException(e);
                                            }
                                        }
                                    }
                                } catch (RepositoryException e) {
                                    // ignore
                                    this.ignoreException(e);
                                }
                            }
                        }
                    } catch (RepositoryException re) {
                        this.ignoreException(re);
                    }

                }
            }
        }
    }

    /**
     * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#getContainerNodeType()
     */
    protected String getContainerNodeType() {
        return EventHelper.JOBS_NODE_TYPE;
    }

    /**
     * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#getEventNodeType()
     */
    protected String getEventNodeType() {
        return EventHelper.JOB_NODE_TYPE;
    }

    /**
     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
     */
    public void handleEvent(final Event event) {
        // we ignore remote job events
        if ( EventUtil.isLocal(event) ) {
            // check for bundle event
            if ( event.getTopic().equals(EventUtil.TOPIC_JOB)) {
                // job event
                final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);

                //  job topic must be set, otherwise we ignore this event!
                if ( jobTopic != null ) {
                    // queue the event in order to respond quickly
                    try {
                        this.writeQueue.put(event);
                    } catch (InterruptedException e) {
                        // this should never happen
                        this.ignoreException(e);
                    }
                } else {
                    this.logger.warn("Event does not contain job topic: {}", event);
                }

            } else {
                // bundle event started or updated
                boolean doIt = false;
                synchronized ( this.unloadedJobs ) {
                    if ( this.unloadedJobs.size() > 0 ) {
                        doIt = true;
                    }
                }
                if ( doIt ) {
                    final Runnable t = new Runnable() {

                        public void run() {
                            synchronized (unloadedJobs) {
                                Session s = null;
                                final Set<String> newUnloadedJobs = new HashSet<String>();
                                newUnloadedJobs.addAll(unloadedJobs);
                                try {
                                    s = createSession();
                                    for(String path : unloadedJobs ) {
                                        newUnloadedJobs.remove(path);
                                        try {
                                            if ( s.itemExists(path) ) {
                                                final Node eventNode = (Node) s.getItem(path);
                                                if ( !eventNode.isLocked() ) {
                                                    try {
                                                        final EventInfo info = new EventInfo();
                                                        info.event = readEvent(eventNode);
                                                        info.nodePath = path;
                                                        try {
                                                            queue.put(info);
                                                        } catch (InterruptedException e) {
                                                            // we ignore this exception as this should never occur
                                                            ignoreException(e);
                                                        }
                                                    } catch (ClassNotFoundException cnfe) {
                                                        newUnloadedJobs.add(path);
                                                        ignoreException(cnfe);
                                                    }
                                                }
                                            }
                                        } catch (RepositoryException re) {
                                            // we ignore this and readd
                                            newUnloadedJobs.add(path);
                                            ignoreException(re);
                                        }
                                    }
                                } catch (RepositoryException re) {
                                    // unable to create session, so we try it again next time
                                    ignoreException(re);
                                } finally {
                                    if ( s != null ) {
                                        s.logout();
                                    }
                                    unloadedJobs.clear();
                                    unloadedJobs.addAll(newUnloadedJobs);
                                }
                            }
                        }

                    };
                    this.threadPool.execute(t);
                }
            }
        }
    }

    public static final String ALLOWED_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+*#!�$%&()=[]?";
    public static final char REPLACEMENT_CHAR = '_';

    public static String filter(final String nodeName) {
        final StringBuffer sb  = new StringBuffer();
        char lastAdded = 0;

        for(int i=0; i < nodeName.length(); i++) {
            final char c = nodeName.charAt(i);
            char toAdd = c;

            if (ALLOWED_CHARS.indexOf(c) < 0) {
                if (lastAdded == REPLACEMENT_CHAR) {
                    // do not add several _ in a row
                    continue;
                }
                toAdd = REPLACEMENT_CHAR;

            } else if(i == 0 && Character.isDigit(c)) {
                sb.append(REPLACEMENT_CHAR);
            }

            sb.append(toAdd);
            lastAdded = toAdd;
        }

        if (sb.length()==0) {
            sb.append(REPLACEMENT_CHAR);
        }

        return sb.toString();
    }

    /**
     * Create a unique node name for the job.
     */
    protected String getNodeName(Event event) {
        final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
        final String name;
        if ( jobId != null ) {
            final String jobTopic = ((String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC));
            name = jobTopic + " " + jobId;
        } else {
            name = "Job " + UUID.randomUUID().toString();
        }
        return filter(name);
    }

    /**
     * Process a job and unlock the node in the repository.
     * @param event The original event.
     * @param eventNode The node in the repository where the job is stored.
     */
    protected void processJob(Event event, Node eventNode)  {
        final boolean parallelProcessing = event.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
        final String jobTopic = (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
        boolean unlock = true;
        try {
            final Event jobEvent = this.getJobEvent(event, eventNode.getPath());
            eventNode.setProperty(EventHelper.NODE_PROPERTY_PROCESSOR, this.applicationId);
            eventNode.save();
            final EventAdmin localEA = this.eventAdmin;
            if ( localEA != null ) {
                localEA.sendEvent(jobEvent);
                // do not unlock if sending was successful
                unlock = false;
            } else {
                this.logger.error("Job event can't be sent as no event admin is available.");
            }
        } catch (RepositoryException re) {
            // if an exception occurs, we just log
            this.logger.error("Exception during job processing.", re);
        } finally {
            if ( unlock ) {
                if ( !parallelProcessing ) {
                    synchronized ( this.processingMap ) {
                        this.processingMap.put(jobTopic, Boolean.FALSE);
                    }
                }
                // unlock node
                try {
                    eventNode.unlock();
                } catch (RepositoryException e) {
                    // if unlock fails, we silently ignore this
                    this.ignoreException(e);
                }
            }
        }
    }

    /**
     * Create the job event.
     * @param e
     * @return
     */
    protected Event getJobEvent(Event e, String nodePath) {
        final String eventTopic = (String)e.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
        final Dictionary<String, Object> properties = new Hashtable<String, Object>();
        final String[] propertyNames = e.getPropertyNames();
        for(int i=0; i<propertyNames.length; i++) {
            properties.put(propertyNames[i], e.getProperty(propertyNames[i]));
        }
        // put properties for finished job callback
        properties.put(EventUtil.JobStatusNotifier.CONTEXT_PROPERTY_NAME, new EventUtil.JobStatusNotifier.NotifierContext(this, nodePath));
        return new Event(eventTopic, properties);
    }

    /**
     * @see org.apache.sling.engine.event.impl.JobPersistenceHandler#addNodeProperties(javax.jcr.Node, org.osgi.service.event.Event)
     */
    protected void addNodeProperties(Node eventNode, Event event)
    throws RepositoryException {
        super.addNodeProperties(eventNode, event);
        eventNode.setProperty(EventHelper.NODE_PROPERTY_TOPIC, (String)event.getProperty(EventUtil.PROPERTY_JOB_TOPIC));
        final String jobId = (String)event.getProperty(EventUtil.PROPERTY_JOB_ID);
        if ( jobId != null ) {
            eventNode.setProperty(EventHelper.NODE_PROPERTY_JOBID, jobId);
        }
    }

    /**
     * @see javax.jcr.observation.EventListener#onEvent(javax.jcr.observation.EventIterator)
     */
    public void onEvent(EventIterator iter) {
        // we create an own session here
        Session s = null;
        try {
            s = this.createSession();
            while ( iter.hasNext() ) {
                final javax.jcr.observation.Event event = iter.nextEvent();
                if ( event.getType() == javax.jcr.observation.Event.PROPERTY_CHANGED
                   || event.getType() == javax.jcr.observation.Event.PROPERTY_REMOVED) {
                    try {
                        final String propPath = event.getPath();
                        int pos = propPath.lastIndexOf('/');
                        final String nodePath = propPath.substring(0, pos);
                        final String propertyName = propPath.substring(pos+1);

                        // we are only interested in unlocks
                        if ( "jcr:lockOwner".equals(propertyName) ) {
                            boolean doNotProcess = false;
                            synchronized ( this.deletedJobs ) {
                                doNotProcess = this.deletedJobs.remove(nodePath);
                            }
                            if ( !doNotProcess ) {
                                final Node eventNode = (Node) s.getItem(nodePath);
                                if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
                                    try {
                                        final EventInfo info = new EventInfo();
                                        info.event = this.readEvent(eventNode);
                                        info.nodePath = nodePath;
                                        try {
                                            this.queue.put(info);
                                        } catch (InterruptedException e) {
                                            // we ignore this exception as this should never occur
                                            this.ignoreException(e);
                                        }
                                    } catch (ClassNotFoundException cnfe) {
                                        // store path for lazy loading
                                        synchronized ( this.unloadedJobs ) {
                                            this.unloadedJobs.add(nodePath);
                                        }
                                        this.ignoreException(cnfe);
                                    }
                                }
                            }
                        }
                    } catch (RepositoryException re) {
                        this.logger.error("Exception during jcr event processing.", re);
                    }
                }
            }
        } catch (RepositoryException re) {
            this.logger.error("Unable to create a session.", re);
        } finally {
            if ( s != null ) {
                s.logout();
            }
        }
    }

    /**
     * Load all active jobs from the repository.
     * @throws RepositoryException
     */
    protected void loadJobs() {
        try {
            final QueryManager qManager = this.backgroundSession.getWorkspace().getQueryManager();
            final StringBuffer buffer = new StringBuffer("/jcr:root");
            buffer.append(this.repositoryPath);
            buffer.append("//element(*, ");
            buffer.append(this.getEventNodeType());
            buffer.append(")");
            final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
            final NodeIterator result = q.execute().getNodes();
            while ( result.hasNext() ) {
                final Node eventNode = result.nextNode();
                if ( !eventNode.isLocked() && !eventNode.hasProperty(EventHelper.NODE_PROPERTY_FINISHED)) {
                    final String nodePath = eventNode.getPath();
                    try {
                        final Event event = this.readEvent(eventNode);
                        final EventInfo info = new EventInfo();
                        info.event = event;
                        info.nodePath = nodePath;
                        try {
                            this.queue.put(info);
                        } catch (InterruptedException e) {
                            // we ignore this exception as this should never occur
                            this.ignoreException(e);
                        }
                    } catch (ClassNotFoundException cnfe) {
                        // store path for lazy loading
                        synchronized ( this.unloadedJobs ) {
                            this.unloadedJobs.add(nodePath);
                        }
                        this.ignoreException(cnfe);
                    } catch (RepositoryException re) {
                        this.logger.error("Unable to load stored job from " + nodePath, re);
                    }
                }
            }
        } catch (RepositoryException re) {
            this.logger.error("Exception during initial loading of stored jobs.", re);
        }
    }

    /**
     * @see org.apache.sling.event.EventUtil.JobStatusNotifier#finishedJob(org.osgi.service.event.Event, String, boolean)
     */
    public boolean finishedJob(Event job, String eventNodePath, boolean shouldReschedule) {
        boolean reschedule = shouldReschedule;
        if ( shouldReschedule ) {
            // check if we exceeded the number of retries
            int retries = this.maxJobRetries;
            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRIES) != null ) {
                retries = (Integer) job.getProperty(EventUtil.PROPERTY_JOB_RETRIES);
            }
            int retryCount = 0;
            if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT) != null ) {
                retryCount = (Integer)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_COUNT);
            }
            retryCount++;
            if ( retryCount > retries ) {
                reschedule = false;
            }
            // update event with retry count
            final Dictionary<String, Object> newProperties;
            // create a new dictionary
            newProperties = new Hashtable<String, Object>();
            final String[] names = job.getPropertyNames();
            for(int i=0; i<names.length; i++ ) {
                newProperties.put(names[i], job.getProperty(names[i]));
            }
            newProperties.put(EventUtil.PROPERTY_JOB_RETRY_COUNT, retryCount);
            job = new Event(job.getTopic(), newProperties);
        }
        final boolean parallelProcessing = job.getProperty(EventUtil.PROPERTY_JOB_PARALLEL) != null;
        // we have to use the same session for unlocking that we used for locking!
        synchronized ( this.backgroundSession ) {
            try {
                this.backgroundSession.refresh(false);
                final Node eventNode = (Node) this.backgroundSession.getItem(eventNodePath);
                boolean unlock = true;
                try {
                    if ( !reschedule ) {
                        synchronized ( this.deletedJobs ) {
                            this.deletedJobs.add(eventNodePath);
                        }
                        // unlock node
                        try {
                            eventNode.unlock();
                        } catch (RepositoryException e) {
                            // if unlock fails, we silently ignore this
                            this.ignoreException(e);
                        }
                        unlock = false;
                        final String jobId = (String)job.getProperty(EventUtil.PROPERTY_JOB_ID);
                        if ( jobId == null ) {
                            // remove node from repository if no job id set
                            final Node parentNode = eventNode.getParent();
                            eventNode.remove();
                            parentNode.save();
                        } else {
                            eventNode.setProperty(EventHelper.NODE_PROPERTY_FINISHED, Calendar.getInstance());
                            eventNode.save();
                        }
                    }
                } catch (RepositoryException re) {
                    // if an exception occurs, we just log
                    this.logger.error("Exception during job finishing.", re);
                } finally {
                    if ( !parallelProcessing) {
                        final String jobTopic = (String)job.getProperty(EventUtil.PROPERTY_JOB_TOPIC);
                        synchronized ( this.processingMap ) {
                            this.processingMap.put(jobTopic, Boolean.FALSE);
                        }
                    }
                    if ( unlock ) {
                        synchronized ( this.deletedJobs ) {
                            this.deletedJobs.add(eventNodePath);
                        }
                        // unlock node
                        try {
                            eventNode.unlock();
                        } catch (RepositoryException e) {
                            // if unlock fails, we silently ignore this
                            this.ignoreException(e);
                        }
                    }
                }
                if ( reschedule ) {
                    final EventInfo info = new EventInfo();
                    try {
                        info.event = job;
                        info.nodePath = eventNode.getPath();
                    } catch (RepositoryException e) {
                        // this should never happen
                        this.ignoreException(e);
                    }
                    // delay rescheduling?
                    if ( job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY) != null ) {
                        final long delay = (Long)job.getProperty(EventUtil.PROPERTY_JOB_RETRY_DELAY);
                        final Runnable t = new Runnable() {
                            public void run() {
                                try {
                                    Thread.sleep(delay);
                                } catch (InterruptedException e) {
                                    // this should never happen
                                    ignoreException(e);
                                }
                                try {
                                    queue.put(info);
                                } catch (InterruptedException e) {
                                    // this should never happen
                                    ignoreException(e);
                                }
                            }
                        };
                        this.threadPool.execute(t);
                    } else {
                        // put directly into queue
                        try {
                            this.queue.put(info);
                        } catch (InterruptedException e) {
                            // this should never happen
                            this.ignoreException(e);
                        }
                    }
                }
                if ( !shouldReschedule ) {
                    return true;
                }
                return reschedule;
            } catch (RepositoryException re) {
                this.logger.error("Unable to create new session.", re);
                return false;
            }
        }
    }

    /**
     * @see org.apache.sling.event.EventUtil.JobStatusNotifier#execute(java.lang.Runnable)
     */
    public void execute(Runnable job) {
        this.threadPool.execute(job);
    }

    /**
     * Search for active nodes
     * @param topic
     * @return
     * @throws RepositoryException
     */
    protected Collection<Event> queryCurrentJobs(String topic, boolean locked)  {
        // we create a new session
        Session s = null;
        final List<Event> jobs = new ArrayList<Event>();
        try {
            s = this.createSession();
            final QueryManager qManager = s.getWorkspace().getQueryManager();
            final StringBuffer buffer = new StringBuffer("/jcr:root");
            buffer.append(this.repositoryPath);
            buffer.append("//element(*, ");
            buffer.append(this.getEventNodeType());
            buffer.append(")");
            if ( topic != null ) {
                buffer.append(" [");
                buffer.append(EventHelper.NODE_PROPERTY_TOPIC);
                buffer.append(" = '");
                buffer.append(topic);
                buffer.append("'");
            }
            if ( locked ) {
                buffer.append(" and ");
                buffer.append("jcr:lockOwner");
            }
            buffer.append("]");
            final Query q = qManager.createQuery(buffer.toString(), Query.XPATH);
            final NodeIterator iter = q.execute().getNodes();
            while ( iter.hasNext() ) {
                final Node eventNode = iter.nextNode();
                try {
                    final Event event = this.readEvent(eventNode);
                    jobs.add(event);
                } catch (ClassNotFoundException cnfe) {
                    // in the case of a class not found exception we just ignore the exception
                    this.ignoreException(cnfe);
                }
            }
        } catch (RepositoryException e) {
            // in the case of an error, we return an empty list
            this.ignoreException(e);
        } finally {
            if ( s != null) {
                s.logout();
            }
        }
        return jobs;
    }

    /**
     * @see org.apache.sling.event.JobStatusProvider#getCurrentJobs(java.lang.String)
     */
    public Collection<Event> getCurrentJobs(String topic) {
        return this.queryCurrentJobs(topic, true);
    }

    /**
     * @see org.apache.sling.event.JobStatusProvider#scheduledJobs(java.lang.String)
     */
    public Collection<Event> scheduledJobs(String topic) {
        return this.queryCurrentJobs(topic, false);
    }
}
TOP

Related Classes of org.apache.sling.event.impl.JobEventHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.