Package com.cloudera.flume.handlers.syslog

Source Code of com.cloudera.flume.handlers.syslog.SyslogWireExtractor

/**
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.flume.handlers.syslog;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.handlers.text.EventExtractException;
import com.cloudera.flume.handlers.text.Extractor;
import com.google.common.base.Preconditions;

/**
* This uses the Extractor interface to take a DataInputStream and extract
* Events out of it. This, in combination with removing syscalls to unixtime when
* instantiating EventImpls, significantly improved performance when compared to
* the previous regex-based approach.
*/
public class SyslogWireExtractor implements Extractor, SyslogConsts {
  static final Logger LOG =
      LoggerFactory.getLogger(SyslogWireExtractor.class);

  static SyslogWireExtractor format = new SyslogWireExtractor();

  static int calcSyslogPrio(Event e) {
    int slPrio = 0;

    byte[] fac = e.get(SYSLOG_FACILITY);
    if (fac == null || fac.length != 1) {
      slPrio = 1 * 8; // default to syslog facility.
    } else {
      slPrio = fac[0] * 8;
    }

    byte[] sev = e.get(SYSLOG_SEVERITY);
    if (sev == null || sev.length != 1) {
      slPrio += PRIO2SEVERITY[e.getPriority().ordinal()];
    } else {
      slPrio += sev[0];
    }
    return slPrio;
  }

  /**
   * This is a version that removes unneeded character encoding and decoding
   * steps.
   */
  public byte[] toBytes(Event e) {
    try {
      int slPrio = calcSyslogPrio(e);

      ByteArrayOutputStream bais = new ByteArrayOutputStream();
      bais.write('<');
      bais.write(("" + slPrio).getBytes());
      bais.write('>');
      bais.write(e.getBody());
      bais.write('\n');
      return bais.toByteArray();
    } catch (IOException e1) {
      // TODO Auto-generated catch block
      LOG.warn("Ran out of bytes during extraction", e1);
    }
    return null;
  }

  public static Event extractEvent(DataInputStream in)
      throws EventExtractException {
    return format.extract(in);
    // return format.extract(in);

  }

  enum Mode {
    START, PRIO, DATA, ERR
  };

  static Event buildEvent(StringBuilder prio, ByteArrayOutputStream baos) {

    int pri = Integer.parseInt(prio.toString());
    byte[] facility = { (byte) (pri / 8) };
    byte[] sev = { (byte) (pri % 8) };

    // 15.2s 14.9s 15.2s
    // Event e = new EventImpl(empty, 0,
    // SyslogWireFormat.SEVERITY[sev[0]], 0, "localhost");

    // 15.0s 15.0s 15.2s
    // Event e =
    // new EventImpl(baos.toByteArray(), 0, SEVERITY[sev[0]], 0, NetUtils
    // .localhost());

    // 15.7s 15.3s 14.9s
    // Event e = new EventImpl(baos.toByteArray(), 0,
    // SyslogWireFormat.SEVERITY[sev[0]], 0, "localhost");

    // // Pick correctness over efficiency

    // 27.1s (due to sys calls).
    Event e = new EventImpl(baos.toByteArray());

    // 24.5s 24.9s 25.6s (due to sys calls)
    // Event e = new EventImpl(empty);

    e.set(SYSLOG_FACILITY, facility);
    e.set(SYSLOG_SEVERITY, sev);

    return e;
  }

  /**
   * This is basically a state machine implementation of the extract function.
   * It uses a DataInputStream instead of a string to avoid the cost of string
   * and character encoding
   */
  public Event extract(DataInputStream in) throws EventExtractException {
    Preconditions.checkNotNull(in);
    Mode m = Mode.START;
    StringBuilder prio = new StringBuilder();
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte b = 0;
    long cnt = 0;
    try {
      while (true) {
        b = in.readByte();
        cnt++;
        switch (m) {
        case START:
          if (b == '<') {
            m = Mode.PRIO;
          } else {
            m = Mode.ERR;
          }
          break;
        case PRIO:
          if (b == '>') {
            m = Mode.DATA;
          } else {
            char ch = (char) b;
            if (Character.isDigit(ch)) {
              prio.append(ch); // stay in PRIO mode
            } else {
              m = Mode.ERR;
            }
          }
          break;
        case DATA:
          if (b == '\n') {
            Event e = buildEvent(prio, baos);
            return e;
          }

          baos.write(b);
          break;
        case ERR:
          // read until we get to a \n
          if (b == '\n') {
            throw new EventExtractException(
                "Failed to extract syslog wire entry");
          }
          // stay in Mode.ERR;
          break;
        }
      }
    } catch (EOFException e) {
      switch (m) {
      case ERR:
        // end of stream but was in error state? Throw extraction exception
        throw new EventExtractException("Failed to extract syslog wire entry");
      case DATA:
        // end of stream but had data, return it.
        return buildEvent(prio, baos);
      default:
        // if not in error state just return done;
        return null;
      }
    } catch (IOException e) {
      throw new EventExtractException("Failed to extract syslog wire entry: "
              + e.getMessage());
    }
  }

  public static byte[] formatEventToBytes(Event e) {
    return format.toBytes(e);
  }

}
TOP

Related Classes of com.cloudera.flume.handlers.syslog.SyslogWireExtractor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.