Package net.greghaines.jesque.meta.dao.impl

Source Code of net.greghaines.jesque.meta.dao.impl.QueueInfoDAORedisImpl

/*
* Copyright 2011 Greg Haines
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.greghaines.jesque.meta.dao.impl;

import static net.greghaines.jesque.utils.ResqueConstants.PROCESSED;
import static net.greghaines.jesque.utils.ResqueConstants.QUEUE;
import static net.greghaines.jesque.utils.ResqueConstants.QUEUES;
import static net.greghaines.jesque.utils.ResqueConstants.STAT;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.meta.QueueInfo;
import net.greghaines.jesque.meta.dao.QueueInfoDAO;
import net.greghaines.jesque.utils.JedisUtils;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.utils.PoolUtils;
import net.greghaines.jesque.utils.PoolUtils.PoolWork;
import redis.clients.jedis.Jedis;
import redis.clients.util.Pool;

/**
* QueueInfoDAORedisImpl gets queue information from Redis.
*
* @author Greg Haines
* @author Animesh Kumar
*/
public class QueueInfoDAORedisImpl implements QueueInfoDAO {
   
    private final Config config;
    private final Pool<Jedis> jedisPool;

    /**
     * Constructor.
     * @param config the Jesque configuration
     * @param jedisPool the pool of Jedis connections
     */
    public QueueInfoDAORedisImpl(final Config config, final Pool<Jedis> jedisPool) {
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        if (jedisPool == null) {
            throw new IllegalArgumentException("jedisPool must not be null");
        }
        this.config = config;
        this.jedisPool = jedisPool;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public List<String> getQueueNames() {
        return PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, List<String>>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public List<String> doWork(final Jedis jedis) throws Exception {
                final List<String> queueNames = new ArrayList<String>(jedis.smembers(key(QUEUES)));
                Collections.sort(queueNames);
                return queueNames;
            }
        });
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long getPendingCount() {
        final List<String> queueNames = getQueueNames();
        return PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Long>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public Long doWork(final Jedis jedis) throws Exception {
                long pendingCount = 0L;
                for (final String queueName : queueNames) {
                    pendingCount += size(jedis, queueName);
                }
                return pendingCount;
            }
        });
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long getProcessedCount() {
        return PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Long>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public Long doWork(final Jedis jedis) throws Exception {
                final String processedStr = jedis.get(key(STAT, PROCESSED));
                return (processedStr == null) ? 0L : Long.parseLong(processedStr);
            }
        });
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public List<QueueInfo> getQueueInfos() {
        final List<String> queueNames = getQueueNames();
        return PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, List<QueueInfo>>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public List<QueueInfo> doWork(final Jedis jedis) throws Exception {
                final List<QueueInfo> queueInfos = new ArrayList<QueueInfo>(queueNames.size());
                for (final String queueName : queueNames) {
                    final QueueInfo queueInfo = new QueueInfo();
                    queueInfo.setName(queueName);
                    queueInfo.setSize(size(jedis, queueName));
                    queueInfos.add(queueInfo);
                }
                Collections.sort(queueInfos);
                return queueInfos;
            }
        });
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public QueueInfo getQueueInfo(final String name, final long jobOffset, final long jobCount) {
        return PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, QueueInfo>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public QueueInfo doWork(final Jedis jedis) throws Exception {
                final QueueInfo queueInfo = new QueueInfo();
                queueInfo.setName(name);
                queueInfo.setSize(size(jedis, name));
                final Collection<String> payloads = paylods(jedis, name, jobOffset, jobCount);
                final List<Job> jobs = new ArrayList<Job>(payloads.size());
                for (final String payload : payloads) {
                    jobs.add(ObjectMapperFactory.get().readValue(payload, Job.class));
                }
                queueInfo.setJobs(jobs);
                return queueInfo;
            }
        });
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeQueue(final String name) {
        PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public Void doWork(final Jedis jedis) throws Exception {
                jedis.srem(key(QUEUES), name);
                jedis.del(key(QUEUE, name));
                return null;
            }
        });
    }

    /**
     * Builds a namespaced Redis key with the given arguments.
     *
     * @param parts
     *            the key parts to be joined
     * @return an assembled String key
     */
    private String key(final String... parts) {
        return JesqueUtils.createKey(this.config.getNamespace(), parts);
    }

    /**
     * Size of a queue.
     *
     * @param jedis
     * @param queueName
     * @return
     */
    private long size(final Jedis jedis, final String queueName) {
        final String key = key(QUEUE, queueName);
        final long size;
        if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZCARD
            size = jedis.zcard(key);
        } else { // Else, use LLEN
            size = jedis.llen(key);
        }
        return size;
    }

    /**
     * Get list of payload from a queue.
     *
     * @param jedis
     * @param queueName
     * @param jobOffset
     * @param jobCount
     * @return
     */
    private Collection<String> paylods(final Jedis jedis, final String queueName, final long jobOffset, final long jobCount) {
        final String key = key(QUEUE, queueName);
        final Collection<String> payloads;
        if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZRANGE
            payloads = jedis.zrange(key, jobOffset, jobOffset + jobCount - 1);
        } else { // Else, use LRANGE
            payloads = jedis.lrange(key, jobOffset, jobOffset + jobCount - 1);
        }
        return payloads;
    }
}
TOP

Related Classes of net.greghaines.jesque.meta.dao.impl.QueueInfoDAORedisImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.