Package com.alibaba.otter.shared.arbitrate.impl.setl.monitor

Source Code of com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor

/*
* Copyright (C) 2010-2101 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.otter.shared.arbitrate.impl.setl.monitor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.lang.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import com.alibaba.otter.shared.arbitrate.impl.ArbitrateConstants;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.ArbitrateLifeCycle;
import com.alibaba.otter.shared.arbitrate.impl.setl.helper.StagePathUtils;
import com.alibaba.otter.shared.arbitrate.impl.setl.monitor.listener.PermitListener;
import com.alibaba.otter.shared.arbitrate.impl.zookeeper.ZooKeeperClient;
import com.alibaba.otter.shared.arbitrate.model.MainStemEventData;
import com.alibaba.otter.shared.common.model.config.channel.ChannelStatus;
import com.alibaba.otter.shared.common.utils.JsonUtils;
import com.alibaba.otter.shared.common.utils.lock.BooleanMutex;
import com.alibaba.otter.shared.common.utils.zookeeper.ZkClientx;

/**
* 同步任务状态的监控
*
* <pre>
* 监控数据内容:
* 1. channel的status状态
* 2. 当前pipeline的mainStem状态 & 反向同步的pipeline的mainStem状态
* </pre>
*
* @author jianghang
*/
public class PermitMonitor extends ArbitrateLifeCycle implements Monitor {

    private static final Logger      logger                 = LoggerFactory.getLogger(PermitMonitor.class);

    private ZkClientx                zookeeper              = ZooKeeperClient.getInstance();
    private ChannelStatus            channelStatus          = ChannelStatus.STOP;                                           // 标识channel的状态
    private MainStemEventData.Status mainStemStatus         = MainStemEventData.Status.TAKEING;                             // 当前pipeline的mainStem状态
    private MainStemEventData.Status oppositeMainStemStatus = MainStemEventData.Status.TAKEING;                             // 反方向的pipeline的mainStem状态

    private ExecutorService          arbitrateExecutor;
    private BooleanMutex             permitMutex            = new BooleanMutex(false);                                      // 控制器
    private BooleanMutex             channelMutex           = new BooleanMutex(false);
    private List<PermitListener>     listeners              = Collections.synchronizedList(new ArrayList<PermitListener>());
    private volatile boolean         existOpposite          = false;
    private IZkDataListener          channelDataListener;
    private IZkDataListener          mainstemDataListener;
    private IZkDataListener          oppositeMainstemDataListener;

    public PermitMonitor(Long pipelineId){
        super(pipelineId);
        existOpposite = (ArbitrateConfigUtils.getOppositePipeline(getPipelineId()) != null);
        // 开始同步
        channelDataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                initChannelStatus((byte[]) data);
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                channelStatus = ChannelStatus.STOP;
                permitSem();
            }
        };
        String path = StagePathUtils.getChannel(getPipelineId());
        zookeeper.subscribeDataChanges(path, channelDataListener);

        mainstemDataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                initMainStemStatus((byte[]) data);
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                // mainstem节点挂了后,状态直接修改为taking
                mainStemStatus = MainStemEventData.Status.TAKEING;
                permitSem();
            }
        };

        path = StagePathUtils.getMainStem(getPipelineId());
        zookeeper.subscribeDataChanges(path, mainstemDataListener);

        initChannelStatus();
        initMainStemStatus();
        // syncChannelStatus();
        // syncMainStemStatus();
        if (existOpposite) {
            oppositeMainstemDataListener = new IZkDataListener() {

                public void handleDataChange(String dataPath, Object data) throws Exception {
                    initOppositeMainStemStatus((byte[]) data);
                }

                public void handleDataDeleted(String dataPath) throws Exception {
                    // mainstem节点挂了后,状态直接修改为taking
                    oppositeMainStemStatus = MainStemEventData.Status.TAKEING;
                    permitSem();
                }
            };

            path = StagePathUtils.getOppositeMainStem(getPipelineId());
            zookeeper.subscribeDataChanges(path, oppositeMainstemDataListener);
            initOppositeMainStemStatus();
            // syncOppositeMainStemStatus();
        }
        MonitorScheduler.register(this);
    }

    public void reload() {
        if (logger.isDebugEnabled()) {
            logger.debug("## reload Permit pipeline[{}]", getPipelineId());
        }

        try {
            initChannelStatus();
        } catch (Exception e) {// 处理下异常
        }

        try {
            initMainStemStatus();
        } catch (Exception e) {// 处理下异常
        }

        boolean prev = existOpposite;
        existOpposite = (ArbitrateConfigUtils.getOppositePipeline(getPipelineId()) != null);
        if (existOpposite) {
            if (prev == false) {
                // syncOppositeMainStemStatus();// 是个变化的过程,开启反向同步
                String path = StagePathUtils.getOppositeMainStem(getPipelineId());
                zookeeper.subscribeDataChanges(path, oppositeMainstemDataListener);
            }

            try {
                initOppositeMainStemStatus();
            } catch (Exception e) {// 处理下异常
            }
        }

    }

    public void destory() {
        super.destory();

        if (logger.isDebugEnabled()) {
            logger.debug("## destory Permit pipeline[{}]", getPipelineId());
        }
        String path = StagePathUtils.getChannel(getPipelineId());
        zookeeper.unsubscribeDataChanges(path, channelDataListener);

        path = StagePathUtils.getMainStem(getPipelineId());
        zookeeper.unsubscribeDataChanges(path, mainstemDataListener);

        if (existOpposite) {
            path = StagePathUtils.getOppositeMainStem(getPipelineId());
            zookeeper.unsubscribeDataChanges(path, oppositeMainstemDataListener);
        }

        MonitorScheduler.unRegister(this);
    }

    /**
     * 查询是否允许授权处理,非阻塞
     */
    public boolean isPermit() {
        return isPermit(false);
    }

    /**
     * 查询是否允许授权处理,非阻塞,指定是否强制刷新
     */
    public boolean isPermit(boolean reload) {
        if (reload) {// 判断是否需要重新reload
            reload();
        }

        boolean result = channelStatus.isStart() && mainStemStatus.isOverTake();
        if (existOpposite) {// 判断是否存在反向同步
            result &= oppositeMainStemStatus.isOverTake();
        }

        return result;
    }

    /**
     * 查询对应的channel授权状态
     */
    public ChannelStatus getChannelPermit() {
        return getChannelPermit(false);
    }

    /**
     * 查询对应的mainstem授权状态
     */
    public MainStemEventData.Status getMainStemPermit() {
        return getMainStemPermit(false);
    }

    /**
     * 查询对应的channel授权状态,指定是否强制刷新
     */
    public ChannelStatus getChannelPermit(boolean reload) {
        if (reload) {
            initChannelStatus();
        }

        return channelStatus;
    }

    /**
     * 查询对应的mainstem授权状态,指定是否强制刷新
     */
    public MainStemEventData.Status getMainStemPermit(boolean reload) {
        if (reload) {
            initMainStemStatus();
        }

        return mainStemStatus;
    }

    /**
     * 阻塞等待允许授权处理, 支持线程中断信号
     *
     * @return
     */
    public void waitForPermit() throws InterruptedException {
        permitMutex.get();
    }

    /**
     * 阻塞等待允许channel的授权处理, 支持线程中断信号
     *
     * @throws InterruptedException
     */
    public void waitForChannelPermit() throws InterruptedException {
        channelMutex.get();
    }

    // ================ 状态数据同步 ================

    private void initChannelStatus() {
        String path = null;
        try {
            path = StagePathUtils.getChannel(getPipelineId());
            byte[] bytes = zookeeper.readData(path);
            initChannelStatus(bytes);
        } catch (ZkNoNodeException e) {
            channelStatus = ChannelStatus.STOP;
            permitSem();
        } catch (ZkException e) {
            logger.error(path, e);
        }
    }

    private void initChannelStatus(byte[] bytes) {
        ChannelStatus newChannelStatus = JsonUtils.unmarshalFromByte(bytes, ChannelStatus.class);

        if (logger.isDebugEnabled()) {
            logger.debug("pipeline[{}] newChannelStatus is [{}]", getPipelineId(), newChannelStatus);
        }

        synchronized (this) {
            // 发生变化,才触发权限检查
            if (!newChannelStatus.equals(channelStatus)) {
                channelStatus = newChannelStatus;
                permitSem();
            }
        }
    }

    // private void syncChannelStatus() {
    // final String path = StagePathUtils.getChannel(getPipelineId());
    // try {
    // zookeeper.exists(path, new AsyncWatcher() {
    //
    // public void asyncProcess(WatchedEvent event) {
    // MDC.put(ArbitrateConstants.splitPipelineLogFileKey, String.valueOf(getPipelineId()));
    // if (isStop()) {// 如果已关闭,停止递归同步
    // return;
    // }
    // // 出现session expired/connection losscase下,会触发所有的watcher响应,同时老的watcher会继续保留,所以会导致出现多次watcher响应
    // boolean dataChanged = event.getType() == EventType.NodeDataChanged
    // || event.getType() == EventType.NodeDeleted
    // || event.getType() == EventType.NodeCreated
    // || event.getType() == EventType.NodeChildrenChanged;
    // if (dataChanged) {
    // syncChannelStatus();// 开始同步channel状态
    // }
    // }
    // });
    //
    // // 防止 watcher 错过事件
    // initChannelStatus();
    // } catch (KeeperException e) {
    // syncChannelStatus();// 开始同步channel状态
    // logger.error(path, e);
    // } catch (InterruptedException e) {
    // // ignore
    // }
    // }

    private void initMainStemStatus() {
        String path = null;
        try {
            path = StagePathUtils.getMainStem(getPipelineId());
            byte[] bytes = zookeeper.readData(path);
            initMainStemStatus(bytes);
        } catch (ZkNoNodeException e) {
            // mainstem节点挂了后,状态直接修改为taking
            mainStemStatus = MainStemEventData.Status.TAKEING;
            permitSem();
        } catch (ZkException e) {
            logger.error(path, e);
        }
    }

    private void initMainStemStatus(byte[] bytes) {
        MainStemEventData eventData = JsonUtils.unmarshalFromByte(bytes, MainStemEventData.class);
        MainStemEventData.Status newStatus = eventData.getStatus();

        if (logger.isDebugEnabled()) {
            logger.debug("pipeline[{}] new mainStemStatus is [{}]", getPipelineId(), newStatus);
        }

        synchronized (this) {
            if (!mainStemStatus.equals(newStatus)) {
                mainStemStatus = newStatus;
                permitSem();
            }
        }
    }

    // private void syncMainStemStatus() {
    // final String path = StagePathUtils.getMainStem(getPipelineId());
    // try {
    // // exists同样在data发生变化时会触发
    // zookeeper.exists(path, new AsyncWatcher() {
    //
    // public void asyncProcess(WatchedEvent event) {
    // MDC.put(ArbitrateConstants.splitPipelineLogFileKey, String.valueOf(getPipelineId()));
    // if (isStop()) {// 如果已关闭,停止递归同步
    // return;
    // }
    // // 出现session expired/connection losscase下,会触发所有的watcher响应,同时老的watcher会继续保留,所以会导致出现多次watcher响应
    // boolean dataChanged = event.getType() == EventType.NodeDataChanged
    // || event.getType() == EventType.NodeDeleted
    // || event.getType() == EventType.NodeCreated
    // || event.getType() == EventType.NodeChildrenChanged;
    // if (dataChanged) {
    // syncMainStemStatus();// 开始同步mainStem状态
    // }
    // }
    // });
    //
    // initMainStemStatus();
    // } catch (NoNodeException e) {
    // // 可能不存在对应的节点,忽略
    // } catch (KeeperException e) {
    // syncMainStemStatus();// 开始同步mainStem状态
    // logger.error(path, e);
    // } catch (InterruptedException e) {
    // // ignore
    // }
    // }

    private void initOppositeMainStemStatus() {
        String path = null;
        try {
            path = StagePathUtils.getOppositeMainStem(getPipelineId());
            byte[] bytes = zookeeper.readData(path);
            initOppositeMainStemStatus(bytes);
        } catch (ZkNoNodeException e) {
            // mainstem节点挂了后,状态直接修改为taking
            oppositeMainStemStatus = MainStemEventData.Status.TAKEING;
            permitSem();
        } catch (ZkException e) {
            logger.error(path, e);
        }
    }

    private void initOppositeMainStemStatus(byte[] bytes) {
        MainStemEventData eventData = JsonUtils.unmarshalFromByte(bytes, MainStemEventData.class);
        MainStemEventData.Status newStatus = eventData.getStatus();

        if (logger.isDebugEnabled()) {
            logger.debug("pipeline[{}] new oppositeMainStemStatus is [{}]", getPipelineId(), newStatus);
        }

        synchronized (this) {
            if (!oppositeMainStemStatus.equals(newStatus)) {
                oppositeMainStemStatus = newStatus;
                permitSem();
            }
        }
    }

    // private void syncOppositeMainStemStatus() {
    // final String path = StagePathUtils.getOppositeMainStem(getPipelineId());
    // try {
    // // exists同样在data发生变化时会触发
    // zookeeper.exists(path, new AsyncWatcher() {
    //
    // public void asyncProcess(WatchedEvent event) {
    // MDC.put(ArbitrateConstants.splitPipelineLogFileKey, String.valueOf(getPipelineId()));
    // if (existOpposite == false || isStop()) {// 如果已关闭,停止递归同步
    // return;
    // }
    // // 出现session expired/connection losscase下,会触发所有的watcher响应,同时老的watcher会继续保留,所以会导致出现多次watcher响应
    // boolean dataChanged = event.getType() == EventType.NodeDataChanged
    // || event.getType() == EventType.NodeDeleted
    // || event.getType() == EventType.NodeCreated
    // || event.getType() == EventType.NodeChildrenChanged;
    // if (dataChanged) {
    // syncOppositeMainStemStatus();// 开始同步mainStem状态
    // }
    // }
    // });
    //
    // initOppositeMainStemStatus();
    // } catch (NoNodeException e) {
    // // 可能不存在对应的opposite节点,忽略
    // } catch (KeeperException e) {
    // syncOppositeMainStemStatus();// 开始同步mainStem状态
    // logger.error(path, e);
    // } catch (InterruptedException e) {
    // // ignore
    // }
    // }

    /**
     * permit信号的处理
     */
    private void permitSem() {
        if (channelStatus.isStart()) {
            channelMutex.set(true);
            logger.debug("channel status is ok!");
        } else {
            channelMutex.set(false);
            logger.debug("channel status is fail!");
        }

        boolean permit = isPermit(false);
        if (permit == false) {
            if (logger.isDebugEnabled()) {
                logger.debug("Permit is fail!");
            }
            // 如果未授权,则设置信号量为0
            permitMutex.set(false);
        } else {
            // 信号量+1
            if (logger.isDebugEnabled()) {
                logger.debug("Permit is Ok!");
            }
            permitMutex.set(true);
        }

        processChanged(permit);// 通知下变化
    }

    // ======================== listener处理 ======================

    public void addListener(PermitListener listener) {
        if (logger.isDebugEnabled()) {
            logger.debug("## pipeline[{}] add listener [{}]", getPipelineId(),
                         ClassUtils.getShortClassName(listener.getClass()));
        }

        this.listeners.add(listener);
    }

    public void removeListener(PermitListener listener) {
        if (logger.isDebugEnabled()) {
            logger.debug("## pipeline[{}] remove listener [{}]", getPipelineId(),
                         ClassUtils.getShortClassName(listener.getClass()));
        }

        this.listeners.remove(listener);
    }

    private void processChanged(final boolean isPermit) {
        for (final PermitListener listener : listeners) {
            // 异步处理
            arbitrateExecutor.submit(new Runnable() {

                public void run() {
                    MDC.put(ArbitrateConstants.splitPipelineLogFileKey, String.valueOf(getPipelineId()));
                    listener.processChanged(isPermit);
                }
            });
        }
    }

    public void setArbitrateExecutor(ExecutorService arbitrateExecutor) {
        this.arbitrateExecutor = arbitrateExecutor;
    }

}
TOP

Related Classes of com.alibaba.otter.shared.arbitrate.impl.setl.monitor.PermitMonitor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.