Package org.apache.sling.replication.queue.impl.jobhandling

Source Code of org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueueProvider

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sling.replication.queue.impl.jobhandling;

import javax.annotation.Nonnull;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.replication.queue.ReplicationQueue;
import org.apache.sling.replication.queue.ReplicationQueueException;
import org.apache.sling.replication.queue.ReplicationQueueProcessor;
import org.apache.sling.replication.queue.ReplicationQueueProvider;
import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(metatype = false, label = "Sling Job handling based Replication Queue Provider")
@Service(value = ReplicationQueueProvider.class)
@Property(name = "name", value = JobHandlingReplicationQueueProvider.NAME)
public class JobHandlingReplicationQueueProvider extends AbstractReplicationQueueProvider
        implements ReplicationQueueProvider {

    public static final String NAME = "sjh";

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Reference
    private JobManager jobManager;


    private final Map<String, ServiceRegistration> jobConsumers = new ConcurrentHashMap<String, ServiceRegistration>();

    private BundleContext context;


    protected JobHandlingReplicationQueueProvider(JobManager jobManager, BundleContext context) {
        this.jobManager = jobManager;
        this.context = context;
    }

    public JobHandlingReplicationQueueProvider() {
    }

    @Override
    protected ReplicationQueue getInternalQueue(String agentName, String queueName)
            throws ReplicationQueueException {
        String name = agentName;
        if (queueName.length() > 0) {
            name += "/" + queueName;
        }
        String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
        return new JobHandlingReplicationQueue(name, topic, jobManager);
    }


    public void enableQueueProcessing(@Nonnull String agentName, @Nonnull ReplicationQueueProcessor queueProcessor) {
        // eventually register job consumer for sling job handling based queues
        Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
        String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agentName;
        String childTopic = topic + "/*";
        jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
        synchronized (jobConsumers) {
            log.info("registering job consumer for agent {}", agentName);
            ServiceRegistration jobReg = context.registerService(JobConsumer.class.getName(),
                    new ReplicationAgentJobConsumer(queueProcessor), jobProps);
            if (jobReg != null) {
                jobConsumers.put(agentName, jobReg);
            }
            log.info("job consumer for agent {} registered", agentName);
        }
    }

    public void disableQueueProcessing(@Nonnull String agentName) {
        synchronized (jobConsumers) {
            log.info("unregistering job consumer for agent {}", agentName);
            ServiceRegistration jobReg = jobConsumers.remove(agentName);
            if (jobReg != null) {
                jobReg.unregister();
                log.info("job consumer for agent {} unregistered", agentName);
            }
        }
    }

    @Activate
    private void activate(BundleContext context) {
        this.context = context;
    }

    @Deactivate
    private void deactivate() {
        for (ServiceRegistration jobReg : jobConsumers.values()) {
            jobReg.unregister();
        }
        this.context = null;
    }
}
TOP

Related Classes of org.apache.sling.replication.queue.impl.jobhandling.JobHandlingReplicationQueueProvider

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.