Package org.apache.flume.sink

Source Code of org.apache.flume.sink.AvroSink

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flume.sink;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;

import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSink;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* <p>
* A {@link Sink} implementation that can send events to an Avro server (such as
* Flume's <tt>AvroSource</tt>).
* </p>
* <p>
* This sink forms one half of Flume's tiered collection support. Events sent to
* this sink are turned into {@link AvroFlumeEvent}s and sent to the configured
* hostname / port pair using Avro's {@link NettyTransceiver}. The intent is
* that the destination is an instance of Flume's <tt>AvroSource</tt> which
* allows Flume to nodes to forward to other Flume nodes forming a tiered
* collection infrastructure. Of course, nothing prevents one from using this
* sink to speak to other custom built infrastructure that implements the same
* Avro protocol (specifically {@link AvroSourceProtocol}).
* </p>
* <p>
* Events are taken from the configured {@link Channel} in batches of the
* configured <tt>batch-size</tt>. The batch size has no theoretical limits
* although all events in the batch <b>must</b> fit in memory. Generally, larger
* batches are far more efficient, but introduce a slight delay (measured in
* millis) in delivery. The batch behavior is such that underruns (i.e. batches
* smaller than the configured batch size) are possible. This is a compromise
* made to maintain low latency of event delivery. If the channel returns a null
* event, meaning it is empty, the batch is immediately sent, regardless of
* size. Batch underruns are tracked in the metrics. Empty batches do not incur
* an RPC roundtrip.
* </p>
* <p>
* <b>Configuration options</b>
* </p>
* <table>
* <tr>
* <th>Parameter</th>
* <th>Description</th>
* <th>Unit / Type</th>
* <th>Default</th>
* </tr>
* <tr>
* <td><tt>hostname</tt></td>
* <td>The hostname to which events should be sent.</td>
* <td>Hostname or IP / String</td>
* <td>none (required)</td>
* </tr>
* <tr>
* <td><tt>port</tt></td>
* <td>The port to which events should be sent on <tt>hostname</tt>.</td>
* <td>TCP port / int</td>
* <td>none (required)</td>
* </tr>
* <tr>
* <td><tt>batch-size</tt></td>
* <td>The maximum number of events to send per RPC.</td>
* <td>events / int</td>
* <td>100</td>
* </tr>
* </table>
* <p>
* <b>Metrics</b>
* </p>
* <p>
* TODO
* </p>
*/
public class AvroSink extends AbstractSink implements PollableSink,
    Configurable {

  private static final Logger logger = LoggerFactory.getLogger(AvroSink.class);
  private static final Integer defaultBatchSize = 100;

  private String hostname;
  private Integer port;
  private Integer batchSize;

  private AvroSourceProtocol client;
  private Transceiver transceiver;
  private CounterGroup counterGroup;

  public AvroSink() {
    counterGroup = new CounterGroup();
  }

  @Override
  public void configure(Context context) {
    hostname = context.get("hostname", String.class);
    port = Integer.parseInt(context.get("port", String.class));
    batchSize = Integer.parseInt(context.get("batch-size", String.class));

    if (batchSize == null) {
      batchSize = defaultBatchSize;
    }

    Preconditions.checkState(hostname != null, "No hostname specified");
    Preconditions.checkState(port != null, "No port specified");
  }

  private void createConnection() throws IOException {
    if (transceiver == null) {
      logger.debug("Creating new tranceiver connection to hostname:{} port:{}",
          hostname, port);
      transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
    }

    if (client == null) {
      logger.debug("Creating Avro client with tranceiver:{}", transceiver);
      client = SpecificRequestor.getClient(AvroSourceProtocol.class,
          transceiver);
    }
  }

  private void destroyConnection() {
    if (transceiver != null) {
      logger.debug("Destroying tranceiver:{}", transceiver);
      try {
        transceiver.close();
      } catch (IOException e) {
        logger
            .error(
                "Attempt to clean up avro tranceiver after client error failed. Exception follows.",
                e);
      }

      transceiver = null;
    }

    client = null;
  }

  @Override
  public void start() {
    logger.info("Avro sink starting");

    try {
      createConnection();
    } catch (Exception e) {
      logger.error("Unable to create avro client using hostname:" + hostname
          + " port:" + port + ". Exception follows.", e);

      /* Try to prevent leaking resources. */
      destroyConnection();

      /* FIXME: Mark ourselves as failed. */
      return;
    }

    super.start();

    logger.debug("Avro sink started");
  }

  @Override
  public void stop() {
    logger.info("Avro sink stopping");

    destroyConnection();

    super.stop();

    logger.debug("Avro sink stopped. Metrics:{}", counterGroup);
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();

    try {
      transaction.begin();
      createConnection();

      List<AvroFlumeEvent> batch = new LinkedList<AvroFlumeEvent>();

      for (int i = 0; i < batchSize; i++) {
        Event event = channel.take();

        if (event == null) {
          counterGroup.incrementAndGet("batch.underflow");
          break;
        }

        AvroFlumeEvent avroEvent = new AvroFlumeEvent();

        avroEvent.body = ByteBuffer.wrap(event.getBody());
        avroEvent.headers = new HashMap<CharSequence, CharSequence>();

        for (Entry<String, String> entry : event.getHeaders().entrySet()) {
          avroEvent.headers.put(entry.getKey(), entry.getValue());
        }

        batch.add(avroEvent);
      }

      if (batch.isEmpty()) {
        counterGroup.incrementAndGet("batch.empty");
        status = Status.BACKOFF;
      } else {
        if (!client.appendBatch(batch).equals(
            org.apache.flume.source.avro.Status.OK)) {
          throw new AvroRemoteException("RPC communication returned FAILED");
        }
      }

      transaction.commit();
      counterGroup.incrementAndGet("batch.success");
    } catch (ChannelException e) {
      transaction.rollback();
      logger.error("Unable to get event from channel. Exception follows.", e);
      status = Status.BACKOFF;
    } catch (AvroRemoteException e) {
      transaction.rollback();
      logger.error("Unable to send event batch. Exception follows.", e);
      status = Status.BACKOFF;
    } catch (Exception e) {
      transaction.rollback();
      logger.error(
          "Unable to communicate with Avro server. Exception follows.", e);
      status = Status.BACKOFF;
      destroyConnection();
    } finally {
      transaction.close();
    }

    return status;
  }

}
TOP

Related Classes of org.apache.flume.sink.AvroSink

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.