Package com.cloudera.flume.handlers.text

Source Code of com.cloudera.flume.handlers.text.TestMultiLineCursor

/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.flume.handlers.text;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.LoggerFactory;

import com.cloudera.flume.core.Event;
import com.cloudera.flume.handlers.text.CustomDelimCursor.ByteBufferAsCharSequence;
import com.cloudera.flume.handlers.text.CustomDelimCursor.DelimMode;
import com.cloudera.util.Clock;
import com.cloudera.util.FileUtil;
import com.cloudera.util.OSUtils;

public class TestMultiLineCursor {
  public static final org.slf4j.Logger LOG = LoggerFactory
      .getLogger(TestMultiLineCursor.class);

  @Test
  public void testByteBufferCharSequence() {
    ByteBuffer buf = ByteBuffer.wrap("This is a test".getBytes());
    ByteBufferAsCharSequence bbcs = new ByteBufferAsCharSequence(buf);

    assertEquals('T', bbcs.charAt(0));
    assertEquals('h', bbcs.charAt(1));
    assertEquals('i', bbcs.charAt(2));
    assertEquals('s', bbcs.charAt(3));
    assertEquals('s', bbcs.charAt(12));
    assertEquals('t', bbcs.charAt(13));

  }

  @Test(expected = IndexOutOfBoundsException.class)
  public void testByteBufferCharSequenceIOOBE() {
    ByteBuffer buf = ByteBuffer.wrap("This is a test".getBytes());
    ByteBufferAsCharSequence bbcs = new ByteBufferAsCharSequence(buf);
    bbcs.charAt(14);
  }

  @Test(expected = IndexOutOfBoundsException.class)
  public void testByteBufferCharSequenceIOOBE2() {
    ByteBuffer buf = ByteBuffer.wrap("This is a test".getBytes());
    ByteBufferAsCharSequence bbcs = new ByteBufferAsCharSequence(buf);
    bbcs.charAt(-1);
  }

  @Test
  public void testBBCSSubsequence() {
    ByteBuffer buf = ByteBuffer.wrap("This is a test".getBytes());
    ByteBufferAsCharSequence bbcs = new ByteBufferAsCharSequence(buf);

    CharSequence cs = bbcs.subSequence(5, 13);
    assertEquals('i', cs.charAt(0));
    assertEquals('s', cs.charAt(1));
    assertEquals('s', cs.charAt(7));

    // original still sane?
    assertEquals('T', bbcs.charAt(0));
    assertEquals('t', bbcs.charAt(13));

  }

  @Test(expected = IndexOutOfBoundsException.class)
  public void testBBCSSubsequenceIOOBE() {
    ByteBuffer buf = ByteBuffer.wrap("This is a test".getBytes());
    ByteBufferAsCharSequence bbcs = new ByteBufferAsCharSequence(buf);

    CharSequence cs = bbcs.subSequence(5, 13);
    cs.charAt(8);
  }

  @Before
  public void setDebug() {
    Logger.getLogger(TailSource.class).setLevel(Level.DEBUG);
  }

  File createDataFile(int count) throws IOException {
    File f = FileUtil.createTempFile("tail", ".tmp");
    f.deleteOnExit();
    FileWriter fw = new FileWriter(f);
    for (int i = 0; i < count; i++) {
      fw.write("test " + i + "blah");
      fw.flush();
    }
    fw.close();
    return f;
  }

  void appendData(File f, int start, int count) throws IOException {
    FileWriter fw = new FileWriter(f, true);
    for (int i = start; i < start + count; i++) {
      fw.write("test " + i + "blah");
      fw.flush();
    }
    fw.close();
  }

  /**
   * Pre-existing file, start cursor, and check we get # of events we expected
   */
  @Test
  public void testCursorPreexisting() throws IOException, InterruptedException {
    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    File f = createDataFile(5);
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open the file
    assertTrue(c.tailBody()); // read the file
    assertFalse(c.tailBody()); // no new data.
    assertEquals(5, q.size()); // should be 5 in queue.
  }

  /**
   * Have a pre-existing file, rename th efile, make sure still follow handle.
   **/
  @Test
  public void testCursorMovedPreexisting() throws IOException,
      InterruptedException {
    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    File f2 = FileUtil.createTempFile("move", ".tmp");
    f2.delete();
    f2.deleteOnExit();
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    File f = createDataFile(5);
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open the file

    f.renameTo(f2); // move the file (should be no problem).

    assertTrue(c.tailBody()); // finish reading the file
    assertEquals(5, q.size()); // should be 5 in queue.

    assertFalse(c.tailBody()); // No more to read
    assertEquals(5, q.size()); // should be 5 in queue.

    assertFalse(c.tailBody()); // attempt to open file again.

  }

  /**
   * This test case exercises the weakness of the current tail implementation,
   * due to not having an java API to get an inode for a particular file. This
   * new implementation is about as close as we can get.
   *
   * It is incorrect behavior but should be rare because it requires a file to
   * change at a higher frequency than generally expected and because file would
   * have to have exactly the same length, and because there is only a small
   * window where this is possible.
   */
  @Test
  @Ignore("When a file rotates in with the same size, we cannot tell!")
  public void testCursorRotatePreexistingFailure() throws IOException,
      InterruptedException {
    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    File f2 = FileUtil.createTempFile("move", ".tmp");
    f2.delete();
    f2.deleteOnExit();
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    File f = createDataFile(5);
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open the file

    f.renameTo(f2); // move the file (should be no problem).

    // wait a second to force a new last modified time.
    Clock.sleep(1000);
    appendData(f, 5, 5);

    assertTrue(c.tailBody()); // finish reading the first file
    assertEquals(5, q.size()); // should be 5 in queue.

    // If we had Inode info we could detect if it is a different file, however,
    // we don't. Thus, this tail cannot tell that a new file with the same
    // size is different with available metadata.
    assertTrue(c.tailBody()); // attempt to open file again.
    assertTrue(c.tailBody()); // read 2nd file.
    // This should be 10, but actually results in 5.
    assertEquals(5, q.size()); // should be 5 in queue.
  }

  /**
   * rotate with a new file that is longer than the original.
   **/
  @Test
  public void testCursorRotatePreexistingNewLonger() throws IOException,
      InterruptedException {

    // Windows rename semantics different than unix
    Assume.assumeTrue(!OSUtils.isWindowsOS());

    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    File f2 = FileUtil.createTempFile("move", ".tmp");
    f2.delete();
    f2.deleteOnExit();
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    File f = createDataFile(5);
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open the file

    f.renameTo(f2); // move the file (should be no problem).

    // wait a second to force a new last modified time.
    Clock.sleep(1000);
    appendData(f, 5, 6); // should be a new file.

    assertTrue(c.tailBody()); // finish reading the first file
    assertEquals(5, q.size()); // should be 5 in queue.

    assertTrue(c.tailBody()); // notice raflen!= filelen, reset
    assertTrue(c.tailBody()); // open new file
    assertTrue(c.tailBody()); // read
    assertFalse(c.tailBody()); // no more to read
    assertEquals(11, q.size()); // should be 5 in queue.

    assertFalse(c.tailBody()); // no change this time
    assertEquals(11, q.size());
  }

  /**
   * rotate with a new file that is shorter. This works
   */
  @Test
  public void testCursorRotatePreexistingNewShorter() throws IOException,
      InterruptedException {

    // Windows rename semantics different than unix
    Assume.assumeTrue(!OSUtils.isWindowsOS());

    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    File f2 = FileUtil.createTempFile("move", ".tmp");
    f2.delete();
    f2.deleteOnExit();
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    File f = createDataFile(5);
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open the file

    f.renameTo(f2); // move the file (should be no problem).

    // wait a second to force a new last modified time.
    Clock.sleep(1000);
    appendData(f, 5, 4);

    assertTrue(c.tailBody()); // finish reading the first file
    assertEquals(5, q.size()); // should be 5 in queue.

    assertTrue(c.tailBody()); // notice file rotation, reset
    assertTrue(c.tailBody()); // attempt to open file again.
    assertTrue(c.tailBody()); // read 4 lines from new file
    assertFalse(c.tailBody()); // no more to read

    assertEquals(9, q.size()); // should be 5 + 4 in queue.
  }

  /**
   * This has an explicit deletion between creating a new file with the same
   * size.
   */
  @Test
  public void testCursorRotatePreexistingSameSizeWithDelete()
      throws IOException, InterruptedException {

    // Windows rename semantics different than unix
    Assume.assumeTrue(!OSUtils.isWindowsOS());

    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    File f2 = FileUtil.createTempFile("move", ".tmp");
    f2.delete();
    f2.deleteOnExit();
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    File f = createDataFile(5);
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open the file

    f.renameTo(f2); // move the file (should be no problem).

    assertTrue(c.tailBody()); // finish reading the first file
    assertEquals(5, q.size()); // should be 5 in queue.

    assertFalse(c.tailBody()); // attempt to file again. (not there, no
    // progress)

    // wait a second to force a new last modified time.
    Clock.sleep(1000);
    appendData(f, 5, 5);

    assertTrue(c.tailBody()); // open the new file

    assertTrue(c.tailBody()); // read new file
    assertFalse(c.tailBody()); // fails this time
    assertEquals(10, q.size());
  }

  /**
   * Here, we complete reading a file and then replace it with a new file that
   * has a different modification time.
   */
  @Test
  public void testCursorRotatePreexistingSameSizeWithNewModtime()
      throws IOException, InterruptedException {

    // Windows rename semantics different than unix
    Assume.assumeTrue(!OSUtils.isWindowsOS());

    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    File f2 = FileUtil.createTempFile("move", ".tmp");
    f2.delete();
    f2.deleteOnExit();
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    File f = createDataFile(5);
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open the file
    assertTrue(c.tailBody()); // finish reading the first file
    assertEquals(5, q.size()); // should be 5 in queue.

    assertFalse(c.tailBody()); // no more to read
    assertFalse(c.tailBody()); // no more to read

    // wait a second to force a new last modified time.
    f.renameTo(f2); // move the file (should be no problem).
    Clock.sleep(1000);
    appendData(f, 5, 5);

    assertTrue(c.tailBody()); // notice new mod time and reset, file has data to
                              // read
    assertTrue(c.tailBody()); // open the new file
    assertTrue(c.tailBody()); // read new file
    assertFalse(c.tailBody()); // no more to read
    assertEquals(10, q.size());
  }

  /**
   * read, write to file, read more
   */
  @Test
  public void testCursorNewAppendPreexisting() throws IOException,
      InterruptedException {
    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(10);
    File f = createDataFile(5);
    f.deleteOnExit();
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open the file

    assertTrue(c.tailBody()); // finish reading the file

    appendData(f, 5, 5);

    assertTrue(c.tailBody()); // attempt to open file again.
    assertEquals(10, q.size()); // should be 5 in queue.
  }

  /**
   * no file, file appears, read
   */
  @Test
  public void testCursorFileAppear() throws IOException, InterruptedException {
    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(10);
    File f = FileUtil.createTempFile("appear", ".tmp");
    f.delete();
    f.deleteOnExit();
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertFalse(c.tailBody()); // attempt to open, nothing there.
    assertFalse(c.tailBody()); // attempt to open, nothing there.
    assertEquals(0, c.lastChannelSize);
    assertEquals(null, c.in);

    appendData(f, 0, 5);
    assertTrue(c.tailBody()); // finish reading the file
    assertEquals(0, c.lastChannelPos);
    assertTrue(null != c.in);

    assertTrue(c.tailBody()); // finish reading the file
    assertTrue(0 != c.lastChannelSize);
    assertTrue(null != c.in);

    assertFalse(c.tailBody()); // attempt to open file again.
    assertEquals(5, q.size()); // should be 5 in queue.
  }

  /**
   * read end of file with no newline
   */
  @Test
  public void testCursorFileNoNL() throws IOException, InterruptedException {
    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(10);
    File f = FileUtil.createTempFile("appear", ".tmp");
    f.delete();
    f.deleteOnExit();
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertFalse(c.tailBody()); // attempt to open, nothing there.
    assertFalse(c.tailBody()); // attempt to open, nothing there.
    assertEquals(0, c.lastChannelSize);
    assertEquals(null, c.in);

    FileWriter fw = new FileWriter(f);
    fw.append("No new line");
    fw.close();

    assertTrue(c.tailBody()); // find and load file
    assertEquals(0, c.lastChannelPos);
    assertTrue(null != c.in);

    assertTrue(c.tailBody()); // read but since of EOL, buffer (no progress)
    assertEquals(0, q.size()); // no events since no EOL found
    assertTrue(0 != c.lastChannelSize);
    assertTrue(null != c.in);

    assertFalse(c.tailBody()); // try to read, but in buffer, no progress

    c.flush();
    assertEquals(1, q.size());

    boolean append = true;
    FileWriter fw2 = new FileWriter(f, append);
    fw2.append("more no new line");
    fw2.close();

    assertTrue(c.tailBody()); // open file
    assertTrue(0 != c.lastChannelSize);
    assertTrue(null != c.in);

    assertTrue(c.tailBody()); // read file.
    assertEquals(1, q.size());
    c.flush();
    assertEquals(2, q.size());

  }

  /**
   * file truncated. no file, file appears, read
   */
  @Test
  public void testCursorTruncate() throws IOException, InterruptedException {
    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(10);
    File f = createDataFile(5);
    Cursor c = new CustomDelimCursor(q, f, "blah", DelimMode.EXCLUDE);

    assertTrue(c.tailBody()); // open file the file
    assertEquals(0, c.lastChannelPos);
    assertTrue(null != c.in);

    assertTrue(c.tailBody()); // finish reading the file
    assertTrue(0 != c.lastChannelSize);
    assertTrue(null != c.in);

    assertFalse(c.tailBody()); // attempt to open file again.
    assertEquals(5, q.size()); // should be 5 in queue.

    // truncate the file -- there will be 1 full event and one unproperly closed
    // event.
    RandomAccessFile raf = new RandomAccessFile(f, "rw");
    raf.setLength(10);
    raf.close();

    assertFalse(c.tailBody()); // detect file truncation, no data to read
    assertEquals(5, q.size()); // should be 5 in queue.
    assertEquals(10, c.lastChannelPos);
    assertTrue(null != c.in);

    assertFalse(c.tailBody()); // no data to read
    assertEquals(5, q.size()); // should be 5 in queue.

    appendData(f, 5, 5); // appending data after truncation

    assertTrue(c.tailBody()); // reading appended data
    assertEquals(10, q.size()); // should be 5 + 5 in queue.

    assertFalse(c.tailBody()); // no data to read
    assertEquals(10, q.size()); // should be 5 + 5 in queue.
  }

  /**
   * multiple files
   */
  @Test
  public void testMultiCursor() throws IOException, InterruptedException {

    // normal implementation uses synchronous queue, but we use array blocking
    // queue for single threaded testing
    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    File f1 = createDataFile(5);
    Cursor c1 = new CustomDelimCursor(q, f1, "blah", DelimMode.EXCLUDE);

    File f2 = createDataFile(5);
    Cursor c2 = new CustomDelimCursor(q, f2, "blah", DelimMode.EXCLUDE);

    assertTrue(c1.tailBody()); // open the file
    assertTrue(c2.tailBody()); // open the file
    assertTrue(c1.tailBody()); // open the file
    assertTrue(c2.tailBody()); // open the file
    assertFalse(c1.tailBody()); // read the file
    assertFalse(c2.tailBody()); // no new data.
    assertEquals(10, q.size()); // should be 5 in queue.

    appendData(f1, 5, 5);
    assertTrue(c1.tailBody()); // open the file
    assertFalse(c2.tailBody()); // open the file
    assertEquals(15, q.size()); // should be 5 in queue.

    appendData(f2, 5, 5);
    assertFalse(c1.tailBody()); // open the file
    assertTrue(c2.tailBody()); // open the file
    assertEquals(20, q.size()); // should be 5 in queue.

    assertFalse(c1.tailBody()); // open the file
    assertFalse(c2.tailBody()); // open the file

  }

  // ///////////////////////////////////////////////////////////////////

  @Test
  public void testDelimIncludeNext() throws IOException, InterruptedException {
    File f = FileUtil.createTempFile("tail", ".tmp");
    f.deleteOnExit();
    FileWriter fw = new FileWriter(f);
    fw.append("2011-03-07 00:26:53,918 [exec-thread] WARN commands.SetChokeLimitForm: PhysicalNode: physNode not present yet!\n"
        + "2011-03-07 00:26:54,920 [Thrift server:class org.apache.thrift.TProcessorFactory on class org.apache.thrift.transport.TSaneServerSocket]"
        + " WARN server.TSaneThreadPoolServer: Transport error occurred during acceptance of message.\n"
        + "org.apache.thrift.transport.TTransportException: java.net.SocketException: Socket closed\n"
        + "      at org.apache.thrift.transport.TSaneServerSocket.acceptImpl(TSaneServerSocket.java:137)\n"
        + "      at org.apache.thrift.transport.TServerTransport.accept(TServerTransport.java:31)\n"
        + "      at org.apache.thrift.server.TSaneThreadPoolServer$1.run(TSaneThreadPoolServer.java:175)\n"
        + "Caused by: java.net.SocketException: Socket closed\n"
        + "      at java.net.PlainSocketImpl.socketAccept(Native Method)\n"
        + "      at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:408)\n"
        + "      at java.net.ServerSocket.implAccept(ServerSocket.java:462)\n"
        + "      at java.net.ServerSocket.accept(ServerSocket.java:430)\n"
        + "      at org.apache.thrift.transport.TSaneServerSocket.acceptImpl(TSaneServerSocket.java:132)\n"
        + "      ... 2 more\n"
        + "2011-03-07 00:26:55,924 [main] INFO server.ThriftReportServer: Stopping ReportServer...");
    fw.close();

    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    Cursor c1 = new CustomDelimCursor(q, f, "\\n\\d\\d\\d\\d",
        DelimMode.INCLUDE_NEXT);
    assertTrue(c1.tailBody());
    assertTrue(c1.tailBody());
    assertFalse(c1.tailBody());
    c1.flush();

    // output for debugging.
    List<byte[]> l = new ArrayList<byte[]>(3);
    while (!q.isEmpty()) {
      System.out.println("====");
      byte bs[] = q.poll().getBody();
      l.add(bs);
      System.out.println(new String(bs));
    }

    // should be three events
    assertEquals(3, l.size());
    assertEquals(110, l.get(0).length);
    assertEquals(1008, l.get(1).length);
    assertEquals(88, l.get(2).length);
  }

  @Test
  public void testDelimIncludePrev() throws IOException, InterruptedException {
    File f = FileUtil.createTempFile("tail", ".tmp");
    f.deleteOnExit();
    FileWriter fw = new FileWriter(f);
    fw.append("<?xml version=\"1.0\"?><?xml-stylesheet type=\"text/xsl\"  href=\"logs.xsl\"?>"
        + "<a><b><c/></b><b><c/></b></a>");
    fw.close();

    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    Cursor c1 = new CustomDelimCursor(q, f, "</b>", DelimMode.INCLUDE_PREV);
    assertTrue(c1.tailBody());
    assertTrue(c1.tailBody());
    assertFalse(c1.tailBody());
    c1.flush();

    // output for debugging.
    List<byte[]> l = new ArrayList<byte[]>(3);
    while (!q.isEmpty()) {
      System.out.println("====");
      byte bs[] = q.poll().getBody();
      l.add(bs);
      System.out.println(new String(bs));
    }

    // should be three events
    assertEquals(3, l.size());
    assertEquals(86, l.get(0).length);
    assertEquals(11, l.get(1).length);
    assertEquals(4, l.get(2).length);
  }

  @Test
  public void testDelimExclude() throws IOException, InterruptedException {
    File f = FileUtil.createTempFile("tail", ".tmp");
    f.deleteOnExit();
    FileWriter fw = new FileWriter(f);
    fw.append("a\nb\n\n" + "c\nd\ne\n\n\n" + "f\ng\n\n\n\n" + "h");
    fw.close();

    BlockingQueue<Event> q = new ArrayBlockingQueue<Event>(100);
    // must have more than 1 \n, + is greedy (consumes longest possible match)
    Cursor c1 = new CustomDelimCursor(q, f, "\n\n+", DelimMode.EXCLUDE);
    assertTrue(c1.tailBody());
    assertTrue(c1.tailBody());
    assertFalse(c1.tailBody());
    c1.flush();

    // output for debugging.
    List<byte[]> l = new ArrayList<byte[]>(3);
    while (!q.isEmpty()) {
      System.out.println("====");
      byte bs[] = q.poll().getBody();
      l.add(bs);
      System.out.println(new String(bs));
    }

    // should be three events
    assertEquals(4, l.size());
    assertEquals(3, l.get(0).length);
    assertEquals(5, l.get(1).length);
    assertEquals(3, l.get(2).length);
    assertEquals(1, l.get(3).length);

  }
}
TOP

Related Classes of com.cloudera.flume.handlers.text.TestMultiLineCursor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.