Package backtype.storm.spout

Source Code of backtype.storm.spout.ShellSpout

package backtype.storm.spout;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.generated.ShellComponent;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.ShellProcess;
import backtype.storm.utils.Utils;



public class ShellSpout implements ISpout {
  public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);

  private SpoutOutputCollector _collector;
  private String[] _command;
  private ShellProcess _process;

  public ShellSpout(ShellComponent component) {
    this(component.get_execution_command(), component.get_script());
  }

  public ShellSpout(String... command) {
    _command = command;
  }

  public void open(Map stormConf, TopologyContext context,
      SpoutOutputCollector collector) {
    _process = new ShellProcess(_command);
    _collector = collector;

    try {
      Number subpid = _process.launch(stormConf, context);
      LOG.info("Launched subprocess with pid " + subpid);
    } catch (IOException e) {
      throw new RuntimeException(
          "Error when launching multilang subprocess\n"
              + _process.getErrorsString(), e);
    }
  }

  public void close() {
    _process.destroy();
  }

  private Map _next;

  public void nextTuple() {
    if (_next == null) {
      _next = new HashMap();
      _next.put("command", "next");
    }

    querySubprocess(_next);
  }

  private Map _ack;

  public void ack(Object msgId) {
    if (_ack == null) {
      _ack = new HashMap();
      _ack.put("command", "ack");
    }

    _ack.put("id", msgId);
    querySubprocess(_ack);
  }

  private Map _fail;

  public void fail(Object msgId) {
    if (_fail == null) {
      _fail = new HashMap();
      _fail.put("command", "fail");
    }

    _fail.put("id", msgId);
    querySubprocess(_fail);
  }

  private void querySubprocess(Object query) {
    try {
      _process.writeMessage(query);

      while (true) {
        Map action = _process.readMessage();
        String command = (String) action.get("command");
        if (command.equals("sync")) {
          return;
        } else if (command.equals("log")) {
          String msg = (String) action.get("msg");
          LOG.info("Shell msg: " + msg);
        } else if (command.equals("emit")) {
          String stream = (String) action.get("stream");
          if (stream == null)
            stream = Utils.DEFAULT_STREAM_ID;
          Long task = (Long) action.get("task");
          List<Object> tuple = (List) action.get("tuple");
          Object messageId = (Object) action.get("id");
          if (task == null) {
            List<Integer> outtasks = _collector.emit(stream, tuple,
                messageId);
            Object need_task_ids = action.get("need_task_ids");
            if (need_task_ids == null
                || ((Boolean) need_task_ids).booleanValue()) {
              _process.writeMessage(outtasks);
            }
          } else {
            _collector.emitDirect((int) task.longValue(), stream,
                tuple, messageId);
          }
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void activate() {
  }

  @Override
  public void deactivate() {
  }
}
TOP

Related Classes of backtype.storm.spout.ShellSpout

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.