Package com.blacklocus.qs.aws.sqs

Source Code of com.blacklocus.qs.aws.sqs.AmazonSQSPrioritizedMessageProvider

/**
* Copyright 2013 BlackLocus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.blacklocus.qs.aws.sqs;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.ListQueuesRequest;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.blacklocus.qs.Message;
import com.blacklocus.qs.MessageProvider;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Provide a prefix-based SQS priority queue. Each queue matching a prefix is
* drained in lexicographically sorted order. New queues that are created that
* match the given prefix are automatically picked up and processed in the same
* priority order based on a configurable interval. Note that this
* MessageProvider iterator implementation is not thread-safe. Calls to next()
* are not intended to be called outside of a single queue reading thread.
* You will need to externally synchronize calls to next() if you intend to
* access it in more than one thread. The delete() method is thread-safe.
*/
public class AmazonSQSPrioritizedMessageProvider implements MessageProvider {

    private static final Logger LOG = LoggerFactory.getLogger(AmazonSQSPrioritizedMessageProvider.class);

    public static final Comparator<String> DEFAULT_QUEUE_COMPARATOR = new Comparator<String>() {
        @Override
        public int compare(String s1, String s2) {
            return s1.compareTo(s2);
        }
    };

    private AmazonSQS sqs;
    private String queuePrefix;
    private long updateIntervalNs;

    private Predicate<String> include;
    private Comparator<String> queueComparator;

    private List<AmazonSQSMessageProvider> messageProviders;
    private long lastNanoTime;

    /**
     * Create a new MessageProvider instance.
     *
     * @param sqs              AmazonSQS instance to use for communicating with AWS
     * @param queuePrefix      the prefix to use when searching for queues
     * @param updateIntervalMs time in milliseconds between when new queues should be checked
     */
    public AmazonSQSPrioritizedMessageProvider(AmazonSQS sqs, String queuePrefix, long updateIntervalMs) {
        this.sqs = sqs;
        this.queuePrefix = queuePrefix;
        this.updateIntervalNs = TimeUnit.MILLISECONDS.toNanos(updateIntervalMs);

        this.include = Predicates.alwaysTrue();
        this.queueComparator = DEFAULT_QUEUE_COMPARATOR;

        this.messageProviders = Lists.newArrayList();

        // ensure first next() causes an updateAvailableQueues()
        this.lastNanoTime = currentNanoTime() - updateIntervalNs - 1;
    }

    /**
     * Queue <b>URLs</b> that return true for this predicate are included in the list
     * of available queues to be prioritized. It defaults to including all
     * available queues pulled for a prefix.
     *
     * @param include the predicate to satisfy
     * @return this instance for chaining
     */
    public AmazonSQSPrioritizedMessageProvider withInclude(Predicate<String> include) {
        this.include = include;
        return this;
    }

    /**
     * This comparator controls the sort order of the returned queues. It
     * defaults to generic String.compareTo().
     *
     * @param queueComparator the custom sorting comparator
     * @return this instance for chaining
     */
    public AmazonSQSPrioritizedMessageProvider withQueueComparator(Comparator<String> queueComparator) {
        this.queueComparator = queueComparator;
        return this;
    }

    /**
     * Returns true if an update to the queues should occur based on the time
     * since this method was last called.
     *
     * @return true if an update to the queues should occur
     */
    public boolean shouldUpdateQueues() {
        long currentNanoTime = currentNanoTime();
        if (currentNanoTime - lastNanoTime >= updateIntervalNs) {
            lastNanoTime = currentNanoTime;
            return true;
        } else {
            return false;
        }
    }

    /**
     * A wrapper around System.nanoTime() to encapsulate elapsed time handling
     * in one place.
     *
     * @return the current value of the running Java Virtual Machine's
     *         high-resolution time source, in nanoseconds
     */
    public long currentNanoTime() {
        return System.nanoTime();
    }

    @Override
    public List<Message> next() {

        if (shouldUpdateQueues()) {
            updateAvailableQueues();
        }

        // iterate until we find a queue with messages
        List<Message> messages = null;
        for (int i = 0; i < messageProviders.size() && messages == null; i++) {
            final AmazonSQSMessageProvider currentProvider = messageProviders.get(i);
            List<Message> current = currentProvider.next();
            if (current.size() > 0) {
                messages = Lists.transform(current, new Function<Message, Message>() {
                    @Override
                    public OriginatingMessage apply(Message input) {
                        return new OriginatingMessage(currentProvider.getQueueUrl(), input);
                    }
                });
            } else {
                // empty queues are discarded from the list of available queues to minimize receive requests
                messageProviders.remove(i);

                // adjust i for shifting of the list
                i--;
            }
        }

        // always return a list, even an empty one when there are no messages
        return messages == null ? Collections.<Message>emptyList() : messages;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("remove() not supported");
    }

    @Override
    public boolean hasNext() {
        return true;
    }

    @Override
    public Iterator<Collection<Message>> iterator() {
        return this;
    }

    @Override
    public void delete(Message message) {
        if (message instanceof OriginatingMessage) {
            OriginatingMessage originatingMessage = (OriginatingMessage) message;
            sqs.deleteMessage(new DeleteMessageRequest(originatingMessage.getOriginatingQueueUrl(), message.getReceipt()));
        } else {
            throw new RuntimeException("Unsupported message type: " + message.getBody());
        }
    }

    private void updateAvailableQueues() {
        try {
            ListQueuesResult result = sqs.listQueues(new ListQueuesRequest(queuePrefix));
            List<String> availableQueues = Lists.newArrayList(Iterables.filter(result.getQueueUrls(), include));
            Collections.sort(availableQueues, queueComparator);
            messageProviders.clear();
            for (String queueUrl : availableQueues) {
                messageProviders.add(new AmazonSQSMessageProvider(sqs, queueUrl));
            }
        } catch (AmazonClientException e) {
            LOG.error("An error occurred while listing SQS queues: {}", e);
        }
    }

    /**
     * Wrap a {@link Message} and tack on the originating queue url.
     */
    class OriginatingMessage implements Message {

        private String originatingQueueUrl;
        private Message message;

        public OriginatingMessage(String originatingQueueUrl, Message message) {
            this.originatingQueueUrl = originatingQueueUrl;
            this.message = message;
        }

        @Override
        public String getId() {
            return message.getId();
        }

        @Override
        public String getReceipt() {
            return message.getReceipt();
        }

        @Override
        public String getBody() {
            return message.getBody();
        }

        @Override
        public Map<String, String> getAttributes() {
            return message.getAttributes();
        }

        public String getOriginatingQueueUrl() {
            return originatingQueueUrl;
        }
    }
}
TOP

Related Classes of com.blacklocus.qs.aws.sqs.AmazonSQSPrioritizedMessageProvider

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.