Package net.tomp2p.connection

Source Code of net.tomp2p.connection.HeartBeat

package net.tomp2p.connection;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import net.tomp2p.futures.BaseFuture;
import net.tomp2p.p2p.builder.PingBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Striped down version of the IdleStateHandler.
*/
public class HeartBeat extends ChannelDuplexHandler {
 
  private static int MIN_TIME_TO_HEARTBEAT_MILLIS = 500;
   
    private static final Logger LOG = LoggerFactory.getLogger(HeartBeat.class);

    private final long timeToHeartBeatMillis;
   
    private volatile long lastReadTime;

    private volatile long lastWriteTime;
   
    private volatile ScheduledFuture<?> heartBeatFuture;
   
    private volatile int state; // 0 - none, 1 - initialized, 2 - destroyed
   
    private final PingBuilderFactory pingBuilderFactory;
    //may be set from other threads
    private volatile PeerConnection peerConnection;

    /**
     * Creates a new instance firing {@link IdleStateEvent}s.
     *
     * @param allIdleTime
     *            a ping will be triggered when neither
     *            read nor write was performed for the specified period of time. Specify {@code 0} to disable.
     * @param unit
     *            the {@link TimeUnit} of {@code readerIdleTime}, {@code writeIdleTime}, and {@code allIdleTime}
     * @param pingBuilderFactory
     *            the pingBuilderFactory
     */
    public HeartBeat(long allIdleTime, TimeUnit unit, PingBuilderFactory pingBuilderFactory) {
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (allIdleTime <= 0) {
            timeToHeartBeatMillis = 0;
        } else {
            timeToHeartBeatMillis = Math.max(unit.toMillis(allIdleTime), MIN_TIME_TO_HEARTBEAT_MILLIS);
        }
        this.pingBuilderFactory = pingBuilderFactory;
    }
   
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        lastReadTime = System.currentTimeMillis();
        ctx.fireChannelRead(msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                lastWriteTime = System.currentTimeMillis();
            }
        });
        ctx.write(msg, promise);
    }

    /**
     * Return the allIdleTime that was given when instance this class in milliseconds.
     *
     */
    public long getAllIdleTimeInMillis() {
        return timeToHeartBeatMillis;
    }
   
    public PeerConnection peerConnection() {
        return peerConnection;
    }
   
    public HeartBeat peerConnection(PeerConnection peerConnection) {
        this.peerConnection = peerConnection;
        return this;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
            // channelActvie() event has been fired already, which means this.channelActive() will
            // not be invoked. We have to initialize here instead.
            initialize(ctx);
        } else {
            // channelActive() event has not been fired yet. this.channelActive() will be invoked
            // and initialization will occur there.
        }
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Initialize early if channel is active already.
        if (ctx.channel().isActive()) {
            initialize(ctx);
        }
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired. If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }
   
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        destroy();
    }
   
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        destroy();
        super.channelInactive(ctx);
    }

    private void initialize(ChannelHandlerContext ctx) {
       
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;

        EventExecutor loop = ctx.executor();

        lastReadTime = lastWriteTime = System.currentTimeMillis();
       
        heartBeatFuture = loop.scheduleAtFixedRate(new HeartBeating(ctx), timeToHeartBeatMillis, timeToHeartBeatMillis, TimeUnit.MILLISECONDS);
    }
   
    private void destroy() {
        state = 2;

        if (heartBeatFuture != null) {
            heartBeatFuture.cancel(false);
            heartBeatFuture = null;
        }
    }

    private final class HeartBeating implements Runnable {

        private final ChannelHandlerContext ctx;

        HeartBeating(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            if (!ctx.channel().isOpen()) {
                return;
            }
           
            long currentTime = System.currentTimeMillis();
            long lastIoTime = Math.max(lastReadTime, lastWriteTime);
            long nextDelay = timeToHeartBeatMillis - (currentTime - lastIoTime);
           
            if(peerConnection!=null && nextDelay <= 0) {
                LOG.debug("sending heart beat to {}, {}", peerConnection.remotePeer(),
                    (peerConnection.channelFuture() != null ? peerConnection.channelFuture().channel() : null));
                PingBuilder builder = pingBuilderFactory.create();
                BaseFuture baseFuture = builder.peerConnection(peerConnection).start();
                builder.notifyAutomaticFutures(baseFuture);
            } else {
              LOG.debug("Not sending heart beat to {}, {}", peerConnection.remotePeer(),
                    (peerConnection.channelFuture() != null ? peerConnection.channelFuture().channel() : null));
            }
        }
    }
}
TOP

Related Classes of net.tomp2p.connection.HeartBeat

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.