Package com.alibaba.otter.canal.server.netty.handler

Source Code of com.alibaba.otter.canal.server.netty.handler.ClientAuthenticationHandler

package com.alibaba.otter.canal.server.netty.handler;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
import org.jboss.netty.handler.timeout.IdleStateEvent;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningMonitor;
import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningMonitors;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.CanalPacket.ClientAuth;
import com.alibaba.otter.canal.protocol.CanalPacket.Packet;
import com.alibaba.otter.canal.server.embeded.CanalServerWithEmbeded;
import com.alibaba.otter.canal.server.netty.NettyUtils;

/**
* 客户端身份认证处理
*
* @author jianghang 2012-10-24 上午11:12:45
* @version 1.0.0
*/
public class ClientAuthenticationHandler extends SimpleChannelHandler {

    private static final Logger    logger                                  = LoggerFactory.getLogger(ClientAuthenticationHandler.class);
    private final int              SUPPORTED_VERSION                       = 3;
    private final int              defaultSubscriptorDisconnectIdleTimeout = 5 * 60 * 1000;
    private CanalServerWithEmbeded embededServer;

    public ClientAuthenticationHandler(){

    }

    public ClientAuthenticationHandler(CanalServerWithEmbeded embededServer){
        this.embededServer = embededServer;
    }

    public void messageReceived(final ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
        final Packet packet = Packet.parseFrom(buffer.readBytes(buffer.readableBytes()).array());
        switch (packet.getVersion()) {
            case SUPPORTED_VERSION:
            default:
                final ClientAuth clientAuth = ClientAuth.parseFrom(packet.getBody());
                // 如果存在订阅信息
                if (StringUtils.isNotEmpty(clientAuth.getDestination())
                    && StringUtils.isNotEmpty(clientAuth.getClientId())) {
                    ClientIdentity clientIdentity = new ClientIdentity(clientAuth.getDestination(),
                                                                       Short.valueOf(clientAuth.getClientId()),
                                                                       clientAuth.getFilter());
                    try {
                        MDC.put("destination", clientIdentity.getDestination());
                        embededServer.subscribe(clientIdentity);
                        ctx.setAttachment(clientIdentity);// 设置状态数据
                        // 尝试启动,如果已经启动,忽略
                        if (!embededServer.isStart(clientIdentity.getDestination())) {
                            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(clientIdentity.getDestination());
                            if (!runningMonitor.isStart()) {
                                runningMonitor.start();
                            }
                        }
                    } finally {
                        MDC.remove("destination");
                    }
                }

                NettyUtils.ack(ctx.getChannel(), new ChannelFutureListener() {

                    public void operationComplete(ChannelFuture future) throws Exception {
                        logger.info("remove unused channel handlers after authentication is done successfully.");
                        ctx.getPipeline().remove(HandshakeInitializationHandler.class.getName());
                        ctx.getPipeline().remove(ClientAuthenticationHandler.class.getName());

                        int readTimeout = defaultSubscriptorDisconnectIdleTimeout;
                        int writeTimeout = defaultSubscriptorDisconnectIdleTimeout;
                        if (clientAuth.getNetReadTimeout() > 0) {
                            readTimeout = clientAuth.getNetReadTimeout();
                        }
                        if (clientAuth.getNetWriteTimeout() > 0) {
                            writeTimeout = clientAuth.getNetWriteTimeout();
                        }
                        IdleStateHandler idleStateHandler = new IdleStateHandler(NettyUtils.hashedWheelTimer,
                                                                                 readTimeout, writeTimeout, 0);
                        ctx.getPipeline().addBefore(SessionHandler.class.getName(), IdleStateHandler.class.getName(),
                                                    idleStateHandler);

                        IdleStateAwareChannelHandler idleStateAwareChannelHandler = new IdleStateAwareChannelHandler() {

                            public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {
                                logger.warn(
                                            "channel:{} idle timeout exceeds, close channel to save server resources...",
                                            ctx.getChannel());
                                ctx.getChannel().close();
                            }

                        };
                        ctx.getPipeline().addBefore(SessionHandler.class.getName(),
                                                    IdleStateAwareChannelHandler.class.getName(),
                                                    idleStateAwareChannelHandler);
                    }

                });
                break;
        }
    }

    public void setEmbededServer(CanalServerWithEmbeded embededServer) {
        this.embededServer = embededServer;
    }

}
TOP

Related Classes of com.alibaba.otter.canal.server.netty.handler.ClientAuthenticationHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.