8586878889909192939495
// Drain the channel into a per-thread scratch file, writing one event body per line.
Event event;
FileOutputStream outputStream = new FileOutputStream(
    "/tmp/flume-execsource." + Thread.currentThread().getId());
try {
  // The loop ends when take() returns null.
  while ((event = channel.take()) != null) {
    outputStream.write(event.getBody());
    outputStream.write('\n');
  }
} finally {
  // Close on every path: previously an exception from take() or write()
  // skipped close() and leaked the open file descriptor.
  outputStream.close();
}
132133134135136137138139140141142
// Open the transaction and note the wall-clock time before taking events.
transaction.begin();
long start = System.currentTimeMillis();
// Take five events; each must exist, carry a body, and decode (UTF-8) to "flume".
for (int remaining = 5; remaining > 0; remaining--) {
  Event taken = channel.take();
  assertNotNull(taken);
  assertNotNull(taken.getBody());
  String bodyText = new String(taken.getBody(), Charsets.UTF_8);
  assertEquals("flume", bodyText);
}
123124125126127128129130131132133
// Collect 1000 events from the channel inside a single transaction.
List<Event> channelEvents = new ArrayList<Event>();
Transaction txn = channel.getTransaction();
txn.begin();
for (int taken = 0; taken < 1000; taken++) {
  Event next = channel.take();
  // A null take means fewer events were available than expected: fail immediately.
  if (next == null) {
    throw new NullPointerException("Event is null");
  }
  channelEvents.add(next);
}
124125126127128129130131132133134
8081828384858687888990
127128129130131132133134135136137
7172737475767778798081
// Pass event1 to cp, then take one event from memCh inside a transaction and
// assert that its headers have no value under "Bad-Words" before committing
// and closing the transaction.
// NOTE(review): event1a is dereferenced without a null check; if take() returned
// null this would fail with a NullPointerException rather than an assertion error.
cp.processEvent(event1); Transaction tx = memCh.getTransaction(); tx.begin(); Event event1a = memCh.take(); Assert.assertNull(event1a.getHeaders().get("Bad-Words")); tx.commit(); tx.close();
7980818283848586878889
6566676869707172737475
// Copy every taken event body to a per-thread scratch file, newline-terminated,
// while counting how many events were taken.
int numEvents = 0;
FileOutputStream outputStream = new FileOutputStream(
    "/tmp/flume-execsource." + Thread.currentThread().getId());
for (event = channel.take(); event != null; event = channel.take()) {
  outputStream.write(event.getBody());
  outputStream.write('\n');
  numEvents += 1;
}
// NOTE(review): no close() on outputStream is visible in this excerpt; confirm
// the stream is closed by the code that follows.