Package org.apache.flume.client.avro

Source Code of org.apache.flume.client.avro.TestReliableSpoolingFileEventReader

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.client.avro;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.flume.Event;
import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;

public class TestReliableSpoolingFileEventReader {

  private static final Logger logger = LoggerFactory.getLogger
      (TestReliableSpoolingFileEventReader.class);

  private static final File WORK_DIR = new File("target/test/work/" +
      TestReliableSpoolingFileEventReader.class.getSimpleName());

  @Before
  public void setup() throws IOException, InterruptedException {
    if (!WORK_DIR.isDirectory()) {
      Files.createParentDirs(new File(WORK_DIR, "dummy"));
    }

    // write out a few files
    for (int i = 0; i < 4; i++) {
      File fileName = new File(WORK_DIR, "file"+i);
      StringBuilder sb = new StringBuilder();

      // write as many lines as the index of the file
      for (int j = 0; j < i; j++) {
        sb.append("file" + i + "line" + j + "\n");
      }
      Files.write(sb.toString(), fileName, Charsets.UTF_8);
    }
    Thread.sleep(1500L); // make sure timestamp is later
    Files.write("\n", new File(WORK_DIR, "emptylineFile"), Charsets.UTF_8);
  }

  @After
  public void tearDown() {
    deleteDir(WORK_DIR);
  }

  private void deleteDir(File dir) {
    // delete all the files & dirs we created
    File[] files = dir.listFiles();
    for (File f : files) {
      if (f.isDirectory()) {
        File[] subDirFiles = f.listFiles();
        for (File sdf : subDirFiles) {
          if (!sdf.delete()) {
            logger.warn("Cannot delete file {}", sdf.getAbsolutePath());
          }
        }
        if (!f.delete()) {
          logger.warn("Cannot delete directory {}", f.getAbsolutePath());
        }
      } else {
        if (!f.delete()) {
          logger.warn("Cannot delete file {}", f.getAbsolutePath());
        }
      }
    }
    if (!dir.delete()) {
      logger.warn("Cannot delete work directory {}", dir.getAbsolutePath());
    }
  }

  @Test
  public void testIgnorePattern() throws IOException {
    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(WORK_DIR)
        .ignorePattern("^file2$")
        .deletePolicy(DeletePolicy.IMMEDIATE.toString())
        .build();

    List<File> before = listFiles(WORK_DIR);
    Assert.assertEquals("Expected 5, not: " + before, 5, before.size());

    List<Event> events;
    do {
      events = reader.readEvents(10);
      reader.commit();
    } while (!events.isEmpty());

    List<File> after = listFiles(WORK_DIR);
    Assert.assertEquals("Expected 1, not: " + after, 1, after.size());
    Assert.assertEquals("file2", after.get(0).getName());
    List<File> trackerFiles = listFiles(new File(WORK_DIR,
        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR));
    Assert.assertEquals("Expected 0, not: " + trackerFiles, 0,
        trackerFiles.size());
  }

  @Test
  public void testRepeatedCallsWithCommitAlways() throws IOException {
    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(WORK_DIR).build();

    final int expectedLines = 0 + 1 + 2 + 3 + 1;
    int seenLines = 0;
    for (int i = 0; i < 10; i++) {
      List<Event> events = reader.readEvents(10);
      seenLines += events.size();
      reader.commit();
    }

    Assert.assertEquals(expectedLines, seenLines);
  }

  @Test
  public void testRepeatedCallsWithCommitOnSuccess() throws IOException {
    String trackerDirPath =
        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
    File trackerDir = new File(WORK_DIR, trackerDirPath);

    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build();

    final int expectedLines = 0 + 1 + 2 + 3 + 1;
    int seenLines = 0;
    for (int i = 0; i < 10; i++) {
      List<Event> events = reader.readEvents(10);
      int numEvents = events.size();
      if (numEvents > 0) {
        seenLines += numEvents;
        reader.commit();

        // ensure that there are files in the trackerDir
        File[] files = trackerDir.listFiles();
        Assert.assertNotNull(files);
        Assert.assertTrue("Expected tracker files in tracker dir " + trackerDir
            .getAbsolutePath(), files.length > 0);
      }
    }

    Assert.assertEquals(expectedLines, seenLines);
  }

  @Test
  public void testFileDeletion() throws IOException {
    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
        .spoolDirectory(WORK_DIR)
        .deletePolicy(DeletePolicy.IMMEDIATE.name())
        .build();

    List<File> before = listFiles(WORK_DIR);
    Assert.assertEquals("Expected 5, not: " + before, 5, before.size());

    List<Event> events;
    do {
      events = reader.readEvents(10);
      reader.commit();
    } while (!events.isEmpty());

    List<File> after = listFiles(WORK_DIR);
    Assert.assertEquals("Expected 0, not: " + after, 0, after.size());
    List<File> trackerFiles = listFiles(new File(WORK_DIR,
        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR));
    Assert.assertEquals("Expected 0, not: " + trackerFiles, 0,
        trackerFiles.size());
  }

  @Test(expected = NullPointerException.class)
  public void testNullConsumeOrder() throws IOException {
    new ReliableSpoolingFileEventReader.Builder()
    .spoolDirectory(WORK_DIR)
    .consumeOrder(null)
    .build();
  }
 
  @Test
  public void testConsumeFileRandomly() throws IOException {
    ReliableEventReader reader
      = new ReliableSpoolingFileEventReader.Builder()
    .spoolDirectory(WORK_DIR)
    .consumeOrder(ConsumeOrder.RANDOM)
    .build();
    File fileName = new File(WORK_DIR, "new-file");
    FileUtils.write(fileName,
      "New file created in the end. Shoud be read randomly.\n");
    Set<String> actual = Sets.newHashSet();
    readEventsForFilesInDir(WORK_DIR, reader, actual);
    Set<String> expected = Sets.newHashSet();
    createExpectedFromFilesInSetup(expected);
    expected.add("");
    expected.add(
      "New file created in the end. Shoud be read randomly.");
    Assert.assertEquals(expected, actual);   
  }

  @Test
  public void testConsumeFileRandomlyNewFile() throws Exception {
    // Atomic moves are not supported in Windows.
    if (SystemUtils.IS_OS_WINDOWS) {
      return;
    }
    final ReliableEventReader reader
      = new ReliableSpoolingFileEventReader.Builder()
      .spoolDirectory(WORK_DIR)
      .consumeOrder(ConsumeOrder.RANDOM)
      .build();
    File fileName = new File(WORK_DIR, "new-file");
    FileUtils.write(fileName,
      "New file created in the end. Shoud be read randomly.\n");
    Set<String> expected = Sets.newHashSet();
    File tempDir = Files.createTempDir();
    File tempFile = new File(tempDir, "t");
    File finalFile = new File(WORK_DIR, "t-file");
    int totalFiles = WORK_DIR.listFiles().length;
    FileUtils.write(tempFile, "Last file");
    final Set<String> actual = Sets.newHashSet();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    final Semaphore semaphore = new Semaphore(0);
    Future<Void> wait = executor.submit(
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore);
          return null;
        }
      }
    );
    semaphore.acquire();
    tempFile.renameTo(finalFile);
    wait.get();
    int listFilesCount = ((ReliableSpoolingFileEventReader)reader)
      .getListFilesCount();
    finalFile.delete();
    FileUtils.deleteQuietly(tempDir);
    createExpectedFromFilesInSetup(expected);
    expected.add("");
    expected.add(
      "New file created in the end. Shoud be read randomly.");
    expected.add("Last file");
    Assert.assertTrue(listFilesCount < (totalFiles + 2));
    Assert.assertEquals(expected, actual);
  }


  @Test
  public void testConsumeFileOldest() throws IOException, InterruptedException {
    ReliableEventReader reader
      = new ReliableSpoolingFileEventReader.Builder()
      .spoolDirectory(WORK_DIR)
      .consumeOrder(ConsumeOrder.OLDEST)
      .build();
    File file1 = new File(WORK_DIR, "new-file1");  
    File file2 = new File(WORK_DIR, "new-file2");   
    File file3 = new File(WORK_DIR, "new-file3");
    Thread.sleep(1000L);
    FileUtils.write(file2, "New file2 created.\n");
    Thread.sleep(1000L);
    FileUtils.write(file1, "New file1 created.\n");
    Thread.sleep(1000L);
    FileUtils.write(file3, "New file3 created.\n");
    // order of age oldest to youngest (file2, file1, file3)
    List<String> actual = Lists.newLinkedList();   
    readEventsForFilesInDir(WORK_DIR, reader, actual);       
    List<String> expected = Lists.newLinkedList();
    createExpectedFromFilesInSetup(expected);
    expected.add(""); // Empty file was added in the last in setup.
    expected.add("New file2 created.");
    expected.add("New file1 created.");
    expected.add("New file3 created.");   
    Assert.assertEquals(expected, actual);
  }
 
  @Test
  public void testConsumeFileYoungest()
    throws IOException, InterruptedException {
    ReliableEventReader reader
      = new ReliableSpoolingFileEventReader.Builder()
      .spoolDirectory(WORK_DIR)
      .consumeOrder(ConsumeOrder.YOUNGEST)
      .build();
    File file1 = new File(WORK_DIR, "new-file1");
    File file2 = new File(WORK_DIR, "new-file2");
    File file3 = new File(WORK_DIR, "new-file3");
    Thread.sleep(1000L);
    FileUtils.write(file2, "New file2 created.\n");
    Thread.sleep(1000L);
    FileUtils.write(file3, "New file3 created.\n");
    Thread.sleep(1000L);
    FileUtils.write(file1, "New file1 created.\n");
    // order of age youngest to oldest (file2, file3, file1)
    List<String> actual = Lists.newLinkedList();   
    readEventsForFilesInDir(WORK_DIR, reader, actual);       
    List<String> expected = Lists.newLinkedList();
    createExpectedFromFilesInSetup(expected);
    Collections.sort(expected);
    // Empty Line file was added in the last in Setup.
    expected.add(0, "");
    expected.add(0, "New file2 created.");   
    expected.add(0, "New file3 created.");
    expected.add(0, "New file1 created.");
       
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void testConsumeFileOldestWithLexicographicalComparision()
    throws IOException, InterruptedException {
    ReliableEventReader reader
      = new ReliableSpoolingFileEventReader.Builder()
      .spoolDirectory(WORK_DIR)
      .consumeOrder(ConsumeOrder.OLDEST)
      .build();
    File file1 = new File(WORK_DIR, "new-file1");
    File file2 = new File(WORK_DIR, "new-file2");
    File file3 = new File(WORK_DIR, "new-file3");
    Thread.sleep(1000L);
    FileUtils.write(file3, "New file3 created.\n");
    FileUtils.write(file2, "New file2 created.\n");
    FileUtils.write(file1, "New file1 created.\n");
    file1.setLastModified(file3.lastModified());
    file1.setLastModified(file2.lastModified());
    // file ages are same now they need to be ordered
    // lexicographically (file1, file2, file3).
    List<String> actual = Lists.newLinkedList();
    readEventsForFilesInDir(WORK_DIR, reader, actual);
    List<String> expected = Lists.newLinkedList();
    createExpectedFromFilesInSetup(expected);
    expected.add(""); // Empty file was added in the last in setup.
    expected.add("New file1 created.");
    expected.add("New file2 created.");
    expected.add("New file3 created.");
    Assert.assertEquals(expected, actual);
  }

  @Test
  public void testConsumeFileYoungestWithLexicographicalComparision()
    throws IOException, InterruptedException {
    ReliableEventReader reader
      = new ReliableSpoolingFileEventReader.Builder()
      .spoolDirectory(WORK_DIR)
      .consumeOrder(ConsumeOrder.YOUNGEST)
      .build();
    File file1 = new File(WORK_DIR, "new-file1");
    File file2 = new File(WORK_DIR, "new-file2");
    File file3 = new File(WORK_DIR, "new-file3");
    Thread.sleep(1000L);
    FileUtils.write(file1, "New file1 created.\n");
    FileUtils.write(file2, "New file2 created.\n");
    FileUtils.write(file3, "New file3 created.\n");
    file1.setLastModified(file3.lastModified());
    file1.setLastModified(file2.lastModified());
    // file ages are same now they need to be ordered
    // lexicographically (file1, file2, file3).
    List<String> actual = Lists.newLinkedList();
    readEventsForFilesInDir(WORK_DIR, reader, actual);
    List<String> expected = Lists.newLinkedList();
    createExpectedFromFilesInSetup(expected);
    expected.add(0, ""); // Empty file was added in the last in setup.
    expected.add(0, "New file3 created.");
    expected.add(0, "New file2 created.");
    expected.add(0, "New file1 created.");
    Assert.assertEquals(expected, actual);
  }

  @Test public void testLargeNumberOfFilesOLDEST() throws IOException {   
    templateTestForLargeNumberOfFiles(ConsumeOrder.OLDEST, null, 1000);
  }
  @Test public void testLargeNumberOfFilesYOUNGEST() throws IOException {   
    templateTestForLargeNumberOfFiles(ConsumeOrder.YOUNGEST, new Comparator<Long>() {

      @Override
      public int compare(Long o1, Long o2) {
        return o2.compareTo(o1);
      }
    }, 1000);
  }
  @Test public void testLargeNumberOfFilesRANDOM() throws IOException {   
    templateTestForLargeNumberOfFiles(ConsumeOrder.RANDOM, null, 1000);
  }

  @Test
  public void testZeroByteTrackerFile() throws IOException {
    String trackerDirPath =
            SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
    File trackerDir = new File(WORK_DIR, trackerDirPath);
    if(!trackerDir.exists()) {
      trackerDir.mkdir();
    }
    File trackerFile = new File(trackerDir, ReliableSpoolingFileEventReader.metaFileName);
    if(trackerFile.exists()) {
      trackerFile.delete();
    }
    trackerFile.createNewFile();

    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
            .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build();
    final int expectedLines = 1;
    int seenLines = 0;
    List<Event> events = reader.readEvents(10);
    int numEvents = events.size();
    if (numEvents > 0) {
      seenLines += numEvents;
      reader.commit();
    }
    // This line will fail, if the zero-byte tracker file has not been handled
    Assert.assertEquals(expectedLines, seenLines);
  }

  private void templateTestForLargeNumberOfFiles(ConsumeOrder order,
      Comparator<Long> comparator,
      int N) throws IOException {
    File dir = null;
    try {
      dir = new File(
        "target/test/work/" + this.getClass().getSimpleName() +
          "_large");
      Files.createParentDirs(new File(dir, "dummy"));
      ReliableEventReader reader
        = new ReliableSpoolingFileEventReader.Builder()
      .spoolDirectory(dir).consumeOrder(order).build();
      Map<Long, List<String>> expected;
      if (comparator == null) {
        expected = new TreeMap<Long, List<String>>();
      } else {
        expected = new TreeMap<Long, List<String>>(comparator);
      }
      for (int i = 0; i < N; i++) {
        File f = new File(dir, "file-" + i);
        String data = "file-" + i;
        Files.write(data, f, Charsets.UTF_8);
        if (expected.containsKey(f.lastModified())) {
          expected.get(f.lastModified()).add(data);
        } else {
          expected.put(f.lastModified(), Lists.newArrayList(data));
        }
      }
      Collection<String> expectedList;
      if (order == ConsumeOrder.RANDOM) {
        expectedList = Sets.newHashSet();
      } else {
        expectedList = Lists.newArrayList();
      }
      for (Entry<Long, List<String>> entry : expected.entrySet()) {
        Collections.sort(entry.getValue());
        expectedList.addAll(entry.getValue());
      }
      for (int i = 0; i < N; i++) {
        List<Event> events;
        events = reader.readEvents(10);
        for (Event e : events) {
          if (order == ConsumeOrder.RANDOM) {           
            Assert.assertTrue(expectedList.remove(new String(e.getBody())));
          } else {
            Assert.assertEquals(
              ((ArrayList<String>) expectedList).get(0),
              new String(e.getBody()));
            ((ArrayList<String>) expectedList).remove(0);
          }
        }
        reader.commit();       
      }
    } finally {
      deleteDir(dir);
    }
  }

  private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
    Collection<String> actual) throws IOException {
    readEventsForFilesInDir(dir, reader, actual, null);
  }
   
  /* Read events, one for each file in the given directory. */
  private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
      Collection<String> actual, Semaphore semaphore) throws IOException {
    List<Event> events;
    for (int i=0; i < listFiles(dir).size(); i++) {
      events = reader.readEvents(10);
      for (Event e : events) {
        actual.add(new String(e.getBody()));
      }
      reader.commit();
      try {
        if (semaphore != null) {
          semaphore.release();
        }
      } catch (Exception ex) {
        throw new IOException(ex);
      }
    }
  }   
  /* Create expected results out of the files created in the setup method. */
  private void createExpectedFromFilesInSetup(Collection<String> expected) {
    for (int i = 0; i < 4; i++) {     
      for (int j = 0; j < i; j++) {       
        expected.add("file" + i + "line" + j);
      }     
    }
  }
 
  private static List<File> listFiles(File dir) {
    List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter
        () {
      @Override
      public boolean accept(File pathname) {
        return !pathname.isDirectory();
      }
    }));
    return files;
  }
}
TOP

Related Classes of org.apache.flume.client.avro.TestReliableSpoolingFileEventReader

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.