Package com.cloudera.flume.handlers.text

Source Code of com.cloudera.flume.handlers.text.TailSource

/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.flume.handlers.text;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSource;
import com.cloudera.util.Clock;
import com.google.common.base.Preconditions;

/**
* This "tail"s a filename. Like a unix tail utility, it will wait for more
* information to come to the file and periodically dump data as it is written.
* It assumes that each line is a separate event.
*
* This is for legacy log files where the file system is the only mechanism
* flume has to get events. It assumes that there is one entry per line (per
* \n). If a file currently does not end with \n, it will remain buffered
* waiting for more data until either a different file with the same name has
* appeared, or the tail source is closed.
*
* It also has logic to deal with file rotations -- if a file is renamed and
* then a new file is created, it will shift over to the new file. The current
* file is read until the file pointer reaches the end of the file. It will wait
* there until periodic checks notice that the file has become longer. If the
* file "shrinks" we assume that the file has been replaced with a new log file.
*
* TODO (jon) This is not perfect.
*
* This reads bytes and does not assume any particular character encoding other
* than that entry are separated by new lines ('\n').
*
* There is a possibility for inconsistent conditions when logs are rotated.
*
* 1) If rotation periods are faster than periodic checks, a file may be missed.
* (this mimics gnu-tail semantics here)
*
* 2) Truncations of files will reset the file pointer. This is because the Java
* file api does not a mechanism to get the inode of a particular file, so there
* is no way to differentiate between a new file or a truncated file!
*
* 3) If a file is being read, is moved, and replaced with another file of
* exactly the same size in a particular window and the last mod time of the two
* are identical (this is often at the second granularity in FS's), the data in
* the new file may be lost. If the original file has been completely read and
* then replaced with a file of the same length this problem will not occur.
* (See TestTailSource.readRotatePrexistingFailure vs
* TestTailSource.readRotatePrexistingSameSizeWithNewModetime)
*
* Ideally this would use the inode number of file handle number but didn't find
* java api to get these, or Java 7's WatchService file watcher API.
*/
public class TailSource extends EventSource.Base {
  private static final Logger LOG = LoggerFactory.getLogger(TailSource.class);
  public static final String A_TAILSRCFILE = "tailSrcFile";

  private static int thdCount = 0;
  private volatile boolean done = false;

  private final long sleepTime; // millis
  final List<Cursor> cursors = new ArrayList<Cursor>();
  private final List<Cursor> newCursors = new ArrayList<Cursor>();
  private final List<Cursor> rmCursors = new ArrayList<Cursor>();

  // We "queue" only allowing a single Event.
  final SynchronousQueue<Event> sync = new SynchronousQueue<Event>();
  private TailThread thd = null;

  /**
   * Constructor for backwards compatibility.
   */
  public TailSource(File f, long offset, long waitTime) {
    this(f, offset, waitTime, false);
  }

  /**
   * Specify the file, the starting offset (something >=0) and wait time between
   * checks in millis. If startFromEnd is set, begin reading the file at the
   * end, not the beginning.
   */
  public TailSource(File f, long offset, long waitTime, boolean startFromEnd) {
    Preconditions.checkArgument(offset >= 0 || startFromEnd,
        "offset needs to be >=0 or startFromEnd needs to be true");
    Preconditions.checkNotNull(f);
    Preconditions.checkArgument(waitTime > 0);
    this.sleepTime = waitTime;

    // add initial cursor.
    long fileLen = f.length();
    long readOffset = startFromEnd ? fileLen : offset;
    long modTime = f.lastModified();
    Cursor c = new Cursor(sync, f, readOffset, fileLen, modTime);
    addCursor(c);
  }

  /**
   * This creates an empty tail source. It expects something else to add cursors
   * to it
   */
  public TailSource(long waitTime) {
    this.sleepTime = waitTime;
  }

  /**
   * To support multiple tail readers, we have a Cursor for each file name
   *
   * It takes a File and optionally a starting offset in the file. From there it
   * attempts to open the file as a RandomAccessFile, and reads data using the
   * RAF's FileChannel into a ByteBuffer. As \n's are found in the buffer, new
   * event bodies are created and new Events are create and put into the sync
   * queue.
   *
   * If a file rotate is detected, the previous RAF is closed, and the File with
   * the specified name is opened.
   */
  static class Cursor {
    final BlockingQueue<Event> sync;
    // For following a file name
    final File file;
    // For buffering reads
    final ByteBuffer buf = ByteBuffer.allocateDirect(Short.MAX_VALUE);
    // For closing file handles and getting FileChannels
    RandomAccessFile raf = null;
    // For reading data
    FileChannel in = null;

    long lastFileMod;
    long lastChannelPos;
    long lastChannelSize;
    int readFailures;

    Cursor(BlockingQueue<Event> sync, File f) {
      this(sync, f, 0, 0, 0);
    }

    Cursor(BlockingQueue<Event> sync, File f, long lastReadOffset,
        long lastFileLen, long lastMod) {
      this.sync = sync;
      this.file = f;
      this.lastChannelPos = lastReadOffset;
      this.lastChannelSize = lastFileLen;
      this.lastFileMod = lastMod;
      this.readFailures = 0;
    }

    /**
     * Setup the initial cursor position.
     */
    void initCursorPos() throws InterruptedException {
      try {
        LOG.debug("initCursorPos " + file);
        raf = new RandomAccessFile(file, "r");
        raf.seek(lastChannelPos);
        in = raf.getChannel();
      } catch (FileNotFoundException e) {
        resetRAF();
      } catch (IOException e) {
        resetRAF();
      }
    }

    /**
     * Flush any buffering the cursor has done. If the buffer does not end with
     * '\n', the remainder will get turned into a new event.
     *
     * This assumes that any remainders in buf are a single event -- this
     * corresponds to the last lines of files that do not end with '\n'
     */
    void flush() throws InterruptedException {
      if (raf != null) {
        try {
          raf.close(); // release handles
        } catch (IOException e) {
          LOG.error("problem closing file " + e.getMessage(), e);
        }
      }

      buf.flip(); // buffer consume mode
      int remaining = buf.remaining();
      if (remaining > 0) {
        byte[] body = new byte[remaining];
        buf.get(body, 0, remaining); // read all data

        Event e = new EventImpl(body);
        e.set(A_TAILSRCFILE, file.getName().getBytes());
        try {
          sync.put(e);
        } catch (InterruptedException e1) {
          LOG.error("interruptedException! " + e1.getMessage(), e1);
          throw e1;
        }
      }
      in = null;
      buf.clear();
    }

    /**
     * Closes cursor and releases all resources used by it. NOTE: to flush any
     * buffering the cursor has done and close cursor use {@link #flush()}
     * instead.
     */
    void close() {
      if (raf != null) {
        try {
          raf.close(); // release handles
        } catch (IOException e) {
          LOG.error("problem closing file " + e.getMessage(), e);
        }
      }

      in = null;
      buf.clear();
    }

    /**
     * Restart random accessfile cursor
     *
     * This assumes that the buf is in write mode, and that any remainders in
     * buf are last bytes of files that do not end with \n
     *
     * @throws InterruptedException
     */
    void resetRAF() throws InterruptedException {
      LOG.debug("reseting cursor");
      flush();
      lastChannelPos = 0;
      lastFileMod = 0;
      readFailures = 0;
    }

    /**
     * There is a race here on file system states -- a truly evil dir structure
     * can change things between these calls. Nothing we can really do though.
     *
     * Ideally we would get and compare inode numbers, but in java land, we
     * can't do that.
     *
     * returns true if changed, false if not.
     */
    boolean checkForUpdates() throws IOException {
      LOG.debug("tail " + file + " : recheck");
      if (file.isDirectory()) { // exists but not a file
        IOException ioe = new IOException("Tail expects a file '" + file
            + "', but it is a dir!");
        LOG.error(ioe.getMessage());
        throw ioe;
      }

      if (!file.exists()) {
        LOG.debug("Tail '" + file + "': nothing to do, waiting for a file");
        return false; // do nothing
      }

      if (!file.canRead()) {
        throw new IOException("Permission denied on " + file);
      }

      // oh! f exists and is a file
      try {
        // let's open the file
        raf = new RandomAccessFile(file, "r");
        lastFileMod = file.lastModified();
        in = raf.getChannel();
        lastChannelPos = in.position();
        lastChannelSize = in.size();

        LOG.debug("Tail '" + file + "': opened last mod=" + lastFileMod
            + " lastChannelPos=" + lastChannelPos + " lastChannelSize="
            + lastChannelSize);
        return true;
      } catch (FileNotFoundException fnfe) {
        // possible because of file system race, we can recover from this.
        LOG.debug("Tail '" + file
            + "': a file existed then disappeared, odd but continue");
        return false;
      }
    }

    boolean extractLines(ByteBuffer buf) throws IOException,
        InterruptedException {
      boolean madeProgress = false;
      int start = buf.position();
      buf.mark();
      while (buf.hasRemaining()) {
        byte b = buf.get();
        // TODO windows: ('\r\n') line separators
        if (b == '\n') {
          int end = buf.position();
          int sz = end - start;
          byte[] body = new byte[sz - 1];
          buf.reset(); // go back to mark
          buf.get(body, 0, sz - 1); // read data
          buf.get(); // read '\n'
          buf.mark(); // new mark.
          start = buf.position();

          Event e = new EventImpl(body);
          e.set(A_TAILSRCFILE, file.getName().getBytes());
          sync.put(e);
          madeProgress = true;
        }
      }

      // rewind for any left overs
      buf.reset();
      buf.compact(); // shift leftovers to front.
      return madeProgress;
    }

    /**
     * Attempt to get new data.
     *
     * Returns true if cursor's state has changed (progress was made)
     */
    boolean tailBody() throws InterruptedException {
      try {
        // no file named f currently, needs to be opened.
        if (in == null) {
          LOG.debug("tail " + file + " : cur file is null");
          return checkForUpdates();
        }

        long chlen = in.size();
        boolean madeProgress = readAllFromChannel();

        if (madeProgress) {
          lastChannelSize = lastChannelPos; // this is more safe than in.size()
          // due to possible race conditions
          // NOTE: this is racy (but very very small chance): if file was
          // rotated right before execution of next line with the file of the
          // same length and this new file is never modified the logic in this
          // method will never discover the rotation.
          lastFileMod = file.lastModified();
          LOG.debug("tail " + file + " : new data found");
          return true;
        }

        // this may seem racy but race conds handled properly with
        // extra checks below
        long fmod = file.lastModified();
        long flen = file.length(); // length of filename

        // If nothing can be read from channel, then cases:
        // 1) no change -> return
        if (flen == lastChannelSize && fmod == lastFileMod) {
          LOG.debug("tail " + file + " : no change");
          return false;
        }

        // 2) file rotated
        LOG.debug("tail " + file + " : file rotated?");
        // a) rotated with file of same length
        if (flen == lastChannelSize && fmod != lastFileMod) {
          // this is not trivial situation: it can
          // be "false positive", so we want to be sure rotation with same
          // file length really happened by additional checks.
          // Situation is ultimately rare so doing time consuming/heavy
          // things is OK here
          LOG.debug("tail " + file + " : file rotated with new one with " +
                  "same length?");
          raf.getFD().sync(); // Alex: not sure this helps at all...
          Thread.sleep(1000); // sanity interval: more data may be written
        }

        // b) "false positive" for file rotation due to race condition
        // during fetching file stats: actually more data was added into the
        // file (and hence it is visible in channel)
        if (in.size() != chlen) {
          LOG.debug("tail " + file + " : there's extra data to be read from " +
                  "file, aborting file rotation handling");
          return true;
        }

        // c) again "false positive" for file rotation: file was truncated
        if (chlen < lastChannelSize) {
          LOG.debug("tail " + file + " : file was truncated, " +
                  "aborting file rotation handling");
          lastChannelSize = chlen;
          lastChannelPos = chlen;
          lastFileMod = file.lastModified();
          in.position(chlen); // setting cursor to the last position of
          // truncated file
          return false;
        }

        LOG.debug("tail " + file + " : file rotated!");
        resetRAF(); // resetting raf to catch up new file

        // if file is not empty report true to start reading from it without a
        // delay
        return flen > 0;

      } catch (IOException e) {
        LOG.debug(e.getMessage(), e);
        in = null;
        readFailures++;

        /*
         * Back off on retries after 3 failures so we don't burn cycles. Note
         * that this can exacerbate the race condition illustrated above where a
         * file is truncated, created, written to, and truncated / removed while
         * we're sleeping.
         */
        if (readFailures > 3) {
          LOG.warn("Encountered " + readFailures + " failures on "
              + file.getAbsolutePath() + " - sleeping");
          return false;
        }
      }
      return true;
    }

    private boolean readAllFromChannel() throws IOException, InterruptedException {
      // I make this a rendezvous because this source is being pulled
      // copy data from current file pointer to EOF to dest.
      boolean madeProgress = false;

      int rd;
      while ((rd = in.read(buf)) > 0) {
        madeProgress = true;

        // need char encoder to find line breaks in buf.
        lastChannelPos += (rd < 0 ? 0 : rd); // rd == -1 if at end of
        // stream.

        int lastRd = 0;
        boolean progress = false;
        do {

          if (lastRd == -1 && rd == -1) {
            return true;
          }

          buf.flip();

          // extract lines
          extractLines(buf);

          lastRd = rd;
        } while (progress); // / potential race

        // if the amount read catches up to the size of the file, we can fall
        // out and let another fileChannel be read. If the last buffer isn't
        // read, then it remain in the byte buffer.

      }

      LOG.debug("tail " + file + ": last read position " + lastChannelPos + ", madeProgress: " + madeProgress);

      return madeProgress;
    }
  }

  /**
   * This is the main driver thread that runs through the file cursor list
   * checking for updates and sleeping if there are none.
   */
  class TailThread extends Thread {

    TailThread() {
      super("TailThread-" + thdCount++);
    }

    @Override
    public void run() {
      try {
        // initialize based on initial settings.
        for (Cursor c : cursors) {
          c.initCursorPos();
        }

        while (!done) {
          synchronized (newCursors) {
            cursors.addAll(newCursors);
            newCursors.clear();
          }

          synchronized (rmCursors) {
            cursors.removeAll(rmCursors);
            for (Cursor c : rmCursors) {
              c.flush();
            }
            rmCursors.clear();
          }

          boolean madeProgress = false;
          for (Cursor c : cursors) {
            LOG.debug("Progress loop: " + c.file);
            if (c.tailBody()) {
              madeProgress = true;
            }
          }

          if (!madeProgress) {
            Clock.sleep(sleepTime);
          }
        }
        LOG.debug("Tail got done flag");
      } catch (InterruptedException e) {
        LOG.error("tail unexpected interrupted: " + e.getMessage(), e);
      } finally {
        LOG.info("TailThread has exited");
      }

    }
  }

  /**
   * Add another file Cursor to tail concurrently.
   */
  synchronized void addCursor(Cursor cursor) {
    Preconditions.checkArgument(cursor != null);

    if (thd == null) {
      cursors.add(cursor);
      LOG.debug("Unstarted Tail has added cursor: " + cursor.file.getName());

    } else {
      synchronized (newCursors) {
        newCursors.add(cursor);
      }
      LOG.debug("Tail added new cursor to new cursor list: "
          + cursor.file.getName());
    }

  }

  /**
   * Remove an existing cursor to tail.
   */
  synchronized public void removeCursor(Cursor cursor) {
    Preconditions.checkArgument(cursor != null);
    if (thd == null) {
      cursors.remove(cursor);
    } else {

      synchronized (rmCursors) {
        rmCursors.add(cursor);
      }
    }

  }

  @Override
  public void close() throws IOException {
    synchronized (this) {
      done = true;
      thd = null;
    }
  }

  /**
   * This function will block when the end of all the files it is trying to tail
   * is reached.
   */
  @Override
  public Event next() throws IOException {
    try {
      while (!done) {
        // This blocks on the synchronized queue until a new event arrives.
        Event e = sync.poll(100, TimeUnit.MILLISECONDS);
        if (e == null)
          continue; // nothing there, retry.
        updateEventProcessingStats(e);
        return e;
      }
      return null; // closed
    } catch (InterruptedException e1) {
      LOG.warn("next unexpectedly interrupted :" + e1.getMessage(), e1);
      Thread.currentThread().interrupt();
      throw new IOException(e1.getMessage());
    }
  }

  @Override
  synchronized public void open() throws IOException {
    if (thd != null) {
      throw new IllegalStateException("Attempted to open tail source twice!");
    }
    thd = new TailThread();
    thd.start();
  }

  public static SourceBuilder builder() {
    return new SourceBuilder() {

      @Override
      public EventSource build(Context ctx, String... argv) {
        if (argv.length != 1 && argv.length != 2) {
          throw new IllegalArgumentException(
              "usage: tail(filename, [startFromEnd]) ");
        }
        boolean startFromEnd = false;
        if (argv.length == 2) {
          startFromEnd = Boolean.parseBoolean(argv[1]);
        }
        return new TailSource(new File(argv[0]), 0, FlumeConfiguration.get()
            .getTailPollPeriod(), startFromEnd);
      }
    };
  }

  public static SourceBuilder multiTailBuilder() {
    return new SourceBuilder() {

      @Override
      public EventSource build(Context ctx, String... argv) {
        Preconditions.checkArgument(argv.length >= 1,
            "usage: multitail(file1[, file2[, ...]]) ");
        boolean startFromEnd = false;
        long pollPeriod = FlumeConfiguration.get().getTailPollPeriod();
        TailSource src = null;
        for (int i = 0; i < argv.length; i++) {
          if (src == null) {
            src = new TailSource(new File(argv[i]), 0, pollPeriod, startFromEnd);
          } else {
            src.addCursor(new Cursor(src.sync, new File(argv[i])));
          }
        }
        return src;
      }
    };
  }

}
TOP

Related Classes of com.cloudera.flume.handlers.text.TailSource

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.