initStage(p3, ArbitrateConstants.NODE_SELECTED);
initStage(p3, ArbitrateConstants.NODE_EXTRACTED);
initStage(p3, ArbitrateConstants.NODE_TRANSFORMED, getData(nid));
// 初始化
LoadStageListener load = new LoadStageListener(pipelineId);
// 开始变化
// initStage(p1, ArbitrateConstants.NODE_LOADED);
destoryStage(p1, ArbitrateConstants.NODE_SELECTED);
destoryStage(p1, ArbitrateConstants.NODE_EXTRACTED);
destoryStage(p1, ArbitrateConstants.NODE_TRANSFORMED);
// destoryStage(p1, ArbitrateConstants.NODE_LOADED);
destoryProcess(p1);
// 准备清理数据
initProcessIds.add(p2);
initProcessIds.add(p3);
List<String> p1Stages = Lists.newArrayList();
stages.put(p1, p1Stages);
List<String> p2Stages = Arrays.asList(ArbitrateConstants.NODE_SELECTED, ArbitrateConstants.NODE_EXTRACTED,
ArbitrateConstants.NODE_TRANSFORMED);
stages.put(p2, p2Stages);
List<String> p3Stages = Arrays.asList(ArbitrateConstants.NODE_SELECTED, ArbitrateConstants.NODE_EXTRACTED,
ArbitrateConstants.NODE_TRANSFORMED);
stages.put(p3, p3Stages);
sleep();
// 进行验证
Long processId = load.waitForProcess();
want.number(processId).isEqualTo(p2);
// 验证下process信息
StageMonitor monitor = ArbitrateFactory.getInstance(pipelineId, StageMonitor.class);
List<Long> processIds = monitor.getCurrentProcessIds();
// 获取下stage信息
List<String> currentP1Stages = monitor.getCurrentStages(p1);
List<String> currentP2Stages = monitor.getCurrentStages(p2);
List<String> currentP3Stages = monitor.getCurrentStages(p3);
want.collection(processIds).isEqualTo(initProcessIds);
want.collection(currentP1Stages).isEqualTo(stages.get(p1));
want.collection(currentP2Stages).isEqualTo(stages.get(p2));
want.collection(currentP3Stages).isEqualTo(stages.get(p3));
// 继续变化
// initStage(p2, ArbitrateConstants.NODE_LOADED);
destoryStage(p2, ArbitrateConstants.NODE_SELECTED);
destoryStage(p2, ArbitrateConstants.NODE_EXTRACTED);
destoryStage(p2, ArbitrateConstants.NODE_TRANSFORMED);
// destoryStage(p2, ArbitrateConstants.NODE_LOADED);
destoryProcess(p2);
// 准备清理数据
initProcessIds.remove(p2);
p2Stages = Lists.newArrayList();
stages.put(p2, p2Stages);
sleep();
// 进行验证
processId = load.waitForProcess();
want.number(processId).isEqualTo(p3);
// 验证下process信息
processIds = monitor.getCurrentProcessIds();
// 获取下stage信息
currentP1Stages = monitor.getCurrentStages(p1);
currentP2Stages = monitor.getCurrentStages(p2);
currentP3Stages = monitor.getCurrentStages(p3);
want.collection(processIds).isEqualTo(initProcessIds);
want.collection(currentP1Stages).isEqualTo(stages.get(p1));
want.collection(currentP2Stages).isEqualTo(stages.get(p2));
want.collection(currentP3Stages).isEqualTo(stages.get(p3));
load.destory();
ArbitrateFactory.destory(pipelineId);
} catch (InterruptedException e) {
want.fail();
} finally {
for (Long processId : initProcessIds) {