MockMasterRPC mock = new MockMasterRPC();
FlumeNode node = new FlumeNode(mock, false /* starthttp */, false /* onshot */);
AckListener pending = node.getAckChecker().getAgentAckQueuer();
// initial source of data.
MemorySinkSource mem = new MemorySinkSource();
byte[] tag = "my tag".getBytes();
AckChecksumInjector<EventSink> ackinj = new AckChecksumInjector<EventSink>(
mem, tag, pending);
ackinj.open();
for (int i = 0; i < 5; i++) {
ackinj.append(new EventImpl(("Event " + i).getBytes()));
}
ackinj.close();
// there now are now 7 events in the mem
// consume data.
EventSink es1 = new ConsoleEventSink();
EventUtil.dumpAll(mem, es1);
System.out.println("---");
String rpt = "foo";
String snk = " { intervalDroppyAppend(5) => { ackChecker => [console, counter(\""
+ rpt + "\") ] } } ";
EventSink es = FlumeBuilder.buildSink(new Context(), snk);
mem.open(); // resets index.
es.open();
EventUtil.dumpAll(mem, es);
node.getAckChecker().checkAcks();
assertEquals(1, node.getAckChecker().pending.size());
mem.open(); // resets index.
// send to collector, collector updates master state (bypassing flakeyness)
EventUtil.dumpAll(mem, ((EventSinkDecorator<EventSink>) es).getSink());
// agent gets state from master, updates it ack states
try {
node.getAckChecker().checkAcks();