Package org.apache.flume.sink.solr.morphline

Source Code of org.apache.flume.sink.solr.morphline.MorphlineSink

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.sink.solr.morphline;

import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.kitesdk.morphline.api.Command;

/**
* Flume sink that extracts search documents from Flume events and processes them using a morphline
* {@link Command} chain.
*/
public class MorphlineSink extends AbstractSink implements Configurable {

  private int maxBatchSize = 1000;
  private long maxBatchDurationMillis = 1000;
  private String handlerClass;
  private MorphlineHandler handler;
  private Context context;
  private SinkCounter sinkCounter;

  public static final String BATCH_SIZE = "batchSize";
  public static final String BATCH_DURATION_MILLIS = "batchDurationMillis";
  public static final String HANDLER_CLASS = "handlerClass";
 
  private static final Logger LOGGER = LoggerFactory.getLogger(MorphlineSink.class);

  public MorphlineSink() {
    this(null);
  }

  /** For testing only */
  protected MorphlineSink(MorphlineHandler handler) {
    this.handler = handler;
  }

  @Override
  public void configure(Context context) {
    this.context = context;
    maxBatchSize = context.getInteger(BATCH_SIZE, maxBatchSize);
    maxBatchDurationMillis = context.getLong(BATCH_DURATION_MILLIS, maxBatchDurationMillis);
    handlerClass = context.getString(HANDLER_CLASS, MorphlineHandlerImpl.class.getName());   
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }
  }

  /**
   * Returns the maximum number of events to take per flume transaction;
   * override to customize
   */
  private int getMaxBatchSize() {
    return maxBatchSize;
  }

  /** Returns the maximum duration per flume transaction; override to customize */
  private long getMaxBatchDurationMillis() {
    return maxBatchDurationMillis;
  }

  @Override
  public synchronized void start() {
    LOGGER.info("Starting Morphline Sink {} ...", this);
    sinkCounter.start();
    if (handler == null) {
      MorphlineHandler tmpHandler;
      try {
        tmpHandler = (MorphlineHandler) Class.forName(handlerClass).newInstance();
      } catch (Exception e) {
        throw new ConfigurationException(e);
      }
      tmpHandler.configure(context);
      handler = tmpHandler;
    }   
    super.start();
    LOGGER.info("Morphline Sink {} started.", getName());
  }

  @Override
  public synchronized void stop() {
    LOGGER.info("Morphline Sink {} stopping...", getName());
    try {
      if (handler != null) {
        handler.stop();
      }
      sinkCounter.stop();
      LOGGER.info("Morphline Sink {} stopped. Metrics: {}, {}", getName(), sinkCounter);
    } finally {
      super.stop();
    }
  }

  @Override
  public Status process() throws EventDeliveryException {
    int batchSize = getMaxBatchSize();
    long batchEndTime = System.currentTimeMillis() + getMaxBatchDurationMillis();
    Channel myChannel = getChannel();
    Transaction txn = myChannel.getTransaction();
    txn.begin();
    boolean isMorphlineTransactionCommitted = true;
    try {
      int numEventsTaken = 0;
      handler.beginTransaction();
      isMorphlineTransactionCommitted = false;

      // repeatedly take and process events from the Flume queue
      for (int i = 0; i < batchSize; i++) {
        Event event = myChannel.take();
        if (event == null) {
          break;
        }
        sinkCounter.incrementEventDrainAttemptCount();
        numEventsTaken++;
        LOGGER.debug("Flume event: {}", event);
        //StreamEvent streamEvent = createStreamEvent(event);
        handler.process(event);
        if (System.currentTimeMillis() >= batchEndTime) {
          break;
        }
      }

      // update metrics
      if (numEventsTaken == 0) {
        sinkCounter.incrementBatchEmptyCount();
      }
      if (numEventsTaken < batchSize) {
        sinkCounter.incrementBatchUnderflowCount();
      } else {
        sinkCounter.incrementBatchCompleteCount();
      }
      handler.commitTransaction();
      isMorphlineTransactionCommitted = true;
      txn.commit();
      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
      return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
    } catch (Throwable t) {
      // Ooops - need to rollback and back off
      LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + myChannel.getName()
            + ". Exception follows.", t);
      try {
        if (!isMorphlineTransactionCommitted) {
          handler.rollbackTransaction();
        }
      } catch (Throwable t2) {
        LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback morphline transaction. " +
            "Exception follows.", t2);
      } finally {
        try {
          txn.rollback();
        } catch (Throwable t4) {
          LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback Flume transaction. " +
              "Exception follows.", t4);
        }
      }

      if (t instanceof Error) {
        throw (Error) t; // rethrow original exception
      } else if (t instanceof ChannelException) {
        return Status.BACKOFF;
      } else {
        throw new EventDeliveryException("Failed to send events", t); // rethrow and backoff
      }
    } finally {
      txn.close();
    }
  }
 
  @Override
  public String toString() {
    int i = getClass().getName().lastIndexOf('.') + 1;
    String shortClassName = getClass().getName().substring(i);
    return getName() + " (" + shortClassName + ")";
  }

}
TOP

Related Classes of org.apache.flume.sink.solr.morphline.MorphlineSink

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.