110111112113114115116117118119120
private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) { ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount]; for (int i = 0; i < msgs.size(); i++) { TaskMessage message = msgs.get(i); int task = message.task(); if (task == -1) { closing = true; return null; }
6566676869707172737475
@Override public void handleEvent(Object event, boolean endOfBatch) throws Exception { TaskMessage message = (TaskMessage) event; int task = message.task(); DisruptorQueue queue = deserializeQueues.get(task); if (queue == null) { LOG.warn("Received invalid message directed at port " + task + ". Dropping...");
302303304305306307308309310311312
Assert.assertEquals(String.valueOf(i + base), new String(message.message())); if (i % 1000 == 0) { System.out.println("Receive " + message.task()); } } System.out.println("Finish Receive ");
386387388389390391392393394395396
457458459460461462463464465466467
for (int i = 1; i < base; i++) { TaskMessage message = server.recv(0); JStormUtils.sleepMs(100); Assert.assertEquals(req_msg, new String(message.message())); System.out.println("receive " + message.task()); } System.out.println("Finish Receive ");