Package org.apache.flume.sink.hdfs

Source Code of org.apache.flume.sink.hdfs.HDFSEventSink$WriterLinkedHashMap

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flume.sink.hdfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.FlumeFormatter;
import org.apache.flume.sink.hdfs.BucketWriter.BucketFlushStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HDFSEventSink extends AbstractSink implements PollableSink,
    Configurable {
  private static final Logger LOG = LoggerFactory
      .getLogger(HDFSEventSink.class);

  static final long defaultRollInterval = 30;
  static final long defaultRollSize = 1024;
  static final long defaultRollCount = 10;
  static final String defaultFileName = "FlumeData";
  static final String defaultBucketFormat = "%yyyy-%mm-%dd/%HH";
  static final long defaultBatchSize = 1;
  static final long defaultTxnEventMax = 100;
  static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
  static final int defaultMaxOpenFiles = 5000;
  static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
  static final long defaultAppendTimeout = 1000;

  private long rollInterval;
  private long rollSize;
  private long rollCount;
  private long txnEventMax;
  private long batchSize;
  private CompressionCodec codeC;
  private CompressionType compType;
  private String fileType;
  private String path;
  private int maxOpenFiles;
  private String writeFormat;
  private HDFSWriterFactory myWriterFactory;
  private ExecutorService executor;
  private long appendTimeout;

  /*
   * Extended Java LinkedHashMap for open file handle LRU queue We want to clear
   * the oldest file handle if there are too many open ones
   */
  private class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
    private static final long serialVersionUID = 1L;

    protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
      /*
       * FIXME: We probably shouldn't shared state this way. Make this class
       * private static and explicitly expose maxOpenFiles.
       */
      if (super.size() > maxOpenFiles) {
        // If we have more that max open files, then close the last one and
        // return true
        try {
          eldest.getValue().close();
        } catch (IOException eI) {
          LOG.warn(eldest.getKey().toString(), eI);
        }
        return true;
      } else {
        return false;
      }
    }
  }

  final WriterLinkedHashMap sfWriters = new WriterLinkedHashMap();

  // Used to short-circuit around doing regex matches when we know there are
  // no templates to be replaced.
  // private boolean shouldSub = false;

  public HDFSEventSink() {
    myWriterFactory = new HDFSWriterFactory();
  }
 
  public HDFSEventSink(HDFSWriterFactory newWriterFactory) {
    myWriterFactory = newWriterFactory;
  }

  // read configuration and setup thresholds
  @Override
  public void configure(Context context) {
    String dirpath = context.get("hdfs.path", String.class);
    String fileName = context.get("hdfs.filePrefix", String.class);
    String rollInterval = context.get("hdfs.rollInterval", String.class);
    String rollSize = context.get("hdfs.rollSize", String.class);
    String rollCount = context.get("hdfs.rollCount", String.class);
    String batchSize = context.get("hdfs.batchSize", String.class);
    String txnEventMax = context.get("hdfs.txnEventMax", String.class);
    String codecName = context.get("hdfs.codeC", String.class);
    String fileType = context.get("hdfs.fileType", String.class);
    String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
    String writeFormat = context.get("hdfs.writeFormat", String.class);
    String appendTimeout = context.get("hdfs.appendTimeout", String.class);

    if (fileName == null)
      fileName = defaultFileName;
    // FIXME: Not transportable.
    this.path = new String(dirpath + "/" + fileName);

    if (rollInterval == null) {
      this.rollInterval = defaultRollInterval;
    } else {
      this.rollInterval = Long.parseLong(rollInterval);
    }

    if (rollSize == null) {
      this.rollSize = defaultRollSize;
    } else {
      this.rollSize = Long.parseLong(rollSize);
    }

    if (rollCount == null) {
      this.rollCount = defaultRollCount;
    } else {
      this.rollCount = Long.parseLong(rollCount);
    }

    if ((batchSize == null) || batchSize.equals("0")) {
      this.batchSize = defaultBatchSize;
    } else {
      this.batchSize = Long.parseLong(batchSize);
    }

    if ((txnEventMax == null) || txnEventMax.equals("0")) {
      this.txnEventMax = defaultTxnEventMax;
    } else {
      this.txnEventMax = Long.parseLong(txnEventMax);
    }

    if (codecName == null) {
      codeC = null;
      compType = CompressionType.NONE;
    } else {
      codeC = getCodec(codecName);
      // TODO : set proper compression type
      compType = CompressionType.BLOCK;
    }

    if (fileType == null) {
      this.fileType = defaultFileType;
    } else {
      this.fileType = fileType;
    }

    if (maxOpenFiles == null) {
      this.maxOpenFiles = defaultMaxOpenFiles;
    } else {
      this.maxOpenFiles = Integer.parseInt(maxOpenFiles);
    }

    if (writeFormat == null) {
      this.writeFormat = defaultWriteFormat;
    } else {
      this.writeFormat = writeFormat;
    }

    if (appendTimeout == null) {
      this.appendTimeout = defaultAppendTimeout;
    } else {
      this.appendTimeout = Long.parseLong(appendTimeout);
    }
  }

  private static boolean codecMatches(Class<? extends CompressionCodec> cls,
      String codecName) {
    String simpleName = cls.getSimpleName();
    if (cls.getName().equals(codecName)
        || simpleName.equalsIgnoreCase(codecName)) {
      return true;
    }
    if (simpleName.endsWith("Codec")) {
      String prefix = simpleName.substring(0,
          simpleName.length() - "Codec".length());
      if (prefix.equalsIgnoreCase(codecName)) {
        return true;
      }
    }
    return false;
  }

  private static CompressionCodec getCodec(String codecName) {
    Configuration conf = new Configuration();
    List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
        .getCodecClasses(conf);
    // Wish we could base this on DefaultCodec but appears not all codec's
    // extend DefaultCodec(Lzo)
    CompressionCodec codec = null;
    ArrayList<String> codecStrs = new ArrayList<String>();
    codecStrs.add("None");
    for (Class<? extends CompressionCodec> cls : codecs) {
      codecStrs.add(cls.getSimpleName());

      if (codecMatches(cls, codecName)) {
        try {
          codec = cls.newInstance();
        } catch (InstantiationException e) {
          LOG.error("Unable to instantiate " + cls + " class");
        } catch (IllegalAccessException e) {
          LOG.error("Unable to access " + cls + " class");
        }
      }
    }

    if (codec == null) {
      if (!codecName.equalsIgnoreCase("None")) {
        throw new IllegalArgumentException("Unsupported compression codec "
            + codecName + ".  Please choose from: " + codecStrs);
      }
    } else if (codec instanceof org.apache.hadoop.conf.Configurable) {
      // Must check instanceof codec as BZip2Codec doesn't inherit Configurable
      // Must set the configuration for Configurable objects that may or do use
      // native libs
      ((org.apache.hadoop.conf.Configurable) codec).setConf(conf);
    }
    return codec;
  }

  /*
   * Execute the append on a separate thread and wait for the completion for the specified amount of time
   * In case of timeout, cancel the append and throw an IOException
   */
  private BucketFlushStatus backgroundAppend(final BucketWriter bw, final Event e) throws IOException, InterruptedException {
    Future<BucketFlushStatus> future = executor.submit(new Callable<BucketFlushStatus>() {
      public BucketFlushStatus call() throws Exception {
        return bw.append(e);
      }
    });

    try {
      if (appendTimeout > 0) {
        return future.get(appendTimeout, TimeUnit.MILLISECONDS);
      } else {
        return future.get();
      }
    } catch (TimeoutException eT) {
      future.cancel(true);
      throw new IOException("Append timed out", eT);
    } catch (ExecutionException e1) {
      Throwable cause = e1.getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      } else if (cause instanceof InterruptedException) {
        throw (InterruptedException) cause;
      } else if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      } else {
        // we have a throwable that is not an exception. (such as a
        // NoClassDefFoundError)
        LOG.error("Got a throwable that is not an exception! Bailing out!",
            e1.getCause());
        throw new RuntimeException(e1.getCause());
      }
    } catch (CancellationException ce) {
      throw new InterruptedException(
          "Blocked append interrupted by rotation event");
    } catch (InterruptedException ex) {
      LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
      throw (InterruptedException) ex;
    }
  }

  /**
   * Pull events out of channel and send it to HDFS - take at the most
   * txnEventMax, that's the maximum #events to hold in channel for a given
   * transaction - find the corresponding bucket for the event, ensure the file
   * is open - extract the pay-load and append to HDFS file
   */
  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Map<String, BucketWriter> batchMap = new HashMap<String, BucketWriter>();
    BucketFlushStatus syncedUp;

    try {
      transaction.begin();
      for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
        Event event = channel.take();
        if (event == null)
          break;

        // reconstruct the path name by substituting place holders
        String realPath = BucketPath.escapeString(path, event.getHeaders());
        BucketWriter bw = sfWriters.get(realPath);

        // we haven't seen this file yet, so open it and cache the handle
        if (bw == null) {
          HDFSWriter writer = myWriterFactory.getWriter(fileType);
          FlumeFormatter formatter = HDFSFormatterFactory
              .getFormatter(writeFormat);
          bw = new BucketWriter(rollInterval, rollSize, rollCount, batchSize);
          bw.open(realPath, codeC, compType, writer, formatter);
          sfWriters.put(realPath, bw);
        }

        // Write the data to HDFS
        syncedUp = backgroundAppend(bw, event);

        // keep track of the files in current batch that are not flushed
        // we need to flush all those at the end of the transaction
        if (syncedUp == BucketFlushStatus.BatchStarted)
          batchMap.put(bw.getFilePath(), bw);
        else if ((batchSize > 1)
            && (syncedUp == BucketFlushStatus.BatchFlushed))
          batchMap.remove(bw.getFilePath());
      }

      // flush any pending writes in the given transaction
      for (Entry<String, BucketWriter> e : batchMap.entrySet()) {
        e.getValue().flush();
      }
      batchMap.clear();
      transaction.commit();
      return Status.READY;
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Exception e) {
      transaction.rollback();
      LOG.error("process failed", e);
      throw new EventDeliveryException(e.getMessage());
    } finally {
      // clear any leftover writes in the given transaction
      for (Entry<String, BucketWriter> e : batchMap.entrySet()) {
        e.getValue().abort();
      }
      transaction.close();
    }
  }

  @Override
  public void stop() {
    try {
      for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
        LOG.info("Closing " + e.getKey());
        e.getValue().close();
      }
    } catch (IOException eIO) {
      LOG.warn("IOException in opening file", eIO);
    }
    executor.shutdown();
    try {
      while (executor.isTerminated() == false) {
        executor.awaitTermination(defaultAppendTimeout, TimeUnit.MILLISECONDS);
      }
    } catch (InterruptedException ex) {
      LOG.warn("shutdown interrupted" + ex.getMessage(), ex);
    }

    executor = null;
    super.stop();
  }

  @Override
  public void start() {
    executor = Executors.newFixedThreadPool(1);
    for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
      try {
        e.getValue().open();
      } catch (IOException eIO) {
        LOG.warn("IOException in opening file", eIO);
      }
    }
    super.start();
  }

}
TOP

Related Classes of org.apache.flume.sink.hdfs.HDFSEventSink$WriterLinkedHashMap

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.