Package com.taobao.metamorphosis.client.consumer

Source Code of com.taobao.metamorphosis.client.consumer.FetchRequestQueue

/*
* (C) 2007-2012 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Authors:
*   wuhua <wq163@163.com> , boyan <killme2008@gmail.com>
*/
package com.taobao.metamorphosis.client.consumer;

import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
* �ȶ������delay queue���̰߳�ȫ
*
* @author boyan
* @Date 2011-4-27
*
*/
class FetchRequestQueue {
    private final LinkedList<FetchRequest> queue = new LinkedList<FetchRequest>();
    private final Lock lock = new ReentrantLock();
    private final Condition available = this.lock.newCondition();

    /**
     * Thread designated to wait for the element at the head of the queue. This
     * variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to minimize
     * unnecessary timed waiting. When a thread becomes the leader, it waits
     * only for the next delay to elapse, but other threads await indefinitely.
     * The leader thread must signal some other thread before returning from
     * take() or poll(...), unless some other thread becomes leader in the
     * interim. Whenever the head of the queue is replaced with an element with
     * an earlier expiration time, the leader field is invalidated by being
     * reset to null, and some waiting thread, but not necessarily the current
     * leader, is signalled. So waiting threads must be prepared to acquire and
     * lose leadership while waiting.
     */
    private Thread leader = null;


    public FetchRequest take() throws InterruptedException {
        final Lock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                FetchRequest first = this.queue.peek();
                if (first == null) {
                    this.available.await();
                }
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return this.queue.poll();
                    }
                    else if (this.leader != null) {
                        this.available.await();
                    }
                    else {
                        Thread thisThread = Thread.currentThread();
                        this.leader = thisThread;
                        try {
                            this.available.awaitNanos(delay);
                        }
                        finally {
                            if (this.leader == thisThread) {
                                this.leader = null;
                            }
                        }
                    }
                }
            }
        }
        finally {
            if (this.leader == null && this.queue.peek() != null) {
                this.available.signal();
            }
            lock.unlock();
        }
    }


    public void offer(FetchRequest e) {
        final Lock lock = this.lock;
        lock.lock();
        try {
            /**
             * A request is not referenced by this queue,so we don't want to add
             * it.
             */
            if (e.getRefQueue() != null && e.getRefQueue() != this) {
                return;
            }
            // Reference to request.
            e.setRefQueue(this);
            this.queue.offer(e);
            Collections.sort(this.queue);
            // Leader is changed.
            if (this.queue.peek() == e) {
                this.leader = null;
                this.available.signal();
            }
        }
        finally {
            lock.unlock();
        }
    }


    public int size() {
        final Lock lock = this.lock;
        lock.lock();
        try {
            return this.queue.size();
        }
        finally {
            lock.unlock();
        }
    }

}
TOP

Related Classes of com.taobao.metamorphosis.client.consumer.FetchRequestQueue

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.