Package backtype.hadoop

Source Code of backtype.hadoop.ElasticSearchInputFormat$ElasticSearchRecordReader

package backtype.hadoop;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.search.SearchHit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
* A Hadoop InputFormat to read data from an Elasticsearch index. The RecordReader divulges records
* where the key is the record id in elasticsearch and the value is a json string of the (source)
* record contents.
*/
public class ElasticSearchInputFormat implements Configurable, InputFormat<Text, Text> {

    static Log LOG = LogFactory.getLog(ElasticSearchInputFormat.class);
    private Configuration conf = null;

    private Node node;
    private Client client;

    private Integer requestSize;
    private Long numHits;
    private Long numSplits;
    private Long numSplitRecords;
    private String indexName;
    private String objType;
    private String queryString;

    private static final String ES_REQUEST_SIZE = "elasticsearch.request.size";
    // number of records to fetch at one time
    private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits";
    // number of hadoop map tasks to launch
    private static final String ES_QUERY_STRING = "elasticsearch.query.string";

    private static final String ES_CONFIG_NAME = "elasticsearch.yml";
    private static final String ES_PLUGINS_NAME = "plugins";
    private static final String ES_INDEX_NAME = "elasticsearch.index.name";
    private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
    private static final String ES_CONFIG = "es.config";
    private static final String ES_PLUGINS = "es.path.plugins";
    private static final String SLASH = "/";

    public RecordReader<Text, Text> getRecordReader(InputSplit is, JobConf jc, Reporter reporter) {
        return new ElasticSearchRecordReader();
    }

    /** The number of splits is specified in the Hadoop configuration object. */
    public InputSplit[] getSplits(JobConf conf, int ignored) throws IOException {

        setConf(conf);

        List<InputSplit> splits = new ArrayList<InputSplit>(numSplits.intValue());

        for (int i = 0; i < numSplits; i++) {
            Long size = (numSplitRecords == 1) ? 1 : numSplitRecords - 1;
            splits.add(new ElasticSearchSplit(queryString, i * numSplitRecords, size));
        }
        if (numHits % numSplits > 0) {
            splits.add(new ElasticSearchSplit(queryString,
                numSplits * numSplitRecords, numHits % numSplits - 1));
        }
        LOG.info("Created [" + splits.size() + "] splits for [" + numHits + "] hits");

        return splits.toArray(new InputSplit[splits.size()]);
    }

    /**
     * Sets the configuration object, opens a connection to elasticsearch, and initiates the initial
     * search request.
     */
    public void setConf(Configuration configuration) {
        this.conf = configuration;
        this.indexName = conf.get(ES_INDEX_NAME);
        this.objType = conf.get(ES_OBJECT_TYPE);
        this.requestSize = Integer.parseInt(conf.get(ES_REQUEST_SIZE));
        this.numSplits = Long.parseLong(conf.get(ES_NUM_SPLITS));
        this.queryString = conf.get(ES_QUERY_STRING);

        //
        // Need to ensure that this is set in the hadoop configuration so we can
        // instantiate a local client. The reason is that no files are in the
        // distributed cache when this is called.
        //
        System.setProperty(ES_CONFIG, conf.get(ES_CONFIG));
        System.setProperty(ES_PLUGINS, conf.get(ES_PLUGINS));

        start_embedded_client();

        initiate_search();
    }

    public Configuration getConf() {
        return conf;
    }

    /** Starts an embedded elasticsearch client (ie. data = false) */
    private void start_embedded_client() {
        LOG.info("Starting embedded elasticsearch client ...");
        this.node = NodeBuilder.nodeBuilder().client(true).node();
        this.client = node.client();
    }

    private void initiate_search() {
        SearchResponse response =
            client.prepareSearch(indexName).setTypes(objType).setSearchType(SearchType.COUNT)
                .setQuery(QueryBuilders.queryString(queryString)).setSize(requestSize).execute()
                .actionGet();
        this.numHits = response.hits().totalHits();
        if (numSplits > numHits) {
            numSplits = numHits; // This could be bad
        }
        this.numSplitRecords = (numHits / numSplits);
    }

    protected class ElasticSearchRecordReader implements RecordReader<Text, Text> {

        private Node node;
        private Client client;

        private String indexName;
        private String objType;
        private Long numSplitRecords;
        private Integer requestSize;
        private Text currentKey;
        private Text currentValue;
        private Integer recordsRead;
        private Iterator<SearchHit> hitsItr = null;


        private String queryString;
        private Long from;
        private Long recsToRead;

        public ElasticSearchRecordReader() {
        }

        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
            Configuration conf = context.getConfiguration();
            this.indexName = conf.get(ES_INDEX_NAME);
            this.objType = conf.get(ES_OBJECT_TYPE);
            LOG.info("Initializing elasticsearch record reader on index [" + indexName
                     + "] and object type [" + objType + "]");

            //
            // Fetches elasticsearch.yml and the plugins directory from the distributed cache
            //
            try {
                String taskConfigPath = ElasticUtil.fetchFileFromCache(ES_CONFIG_NAME, conf);
                LOG.info("Using [" + taskConfigPath + "] as es.config");
                String taskPluginsPath = ElasticUtil.fetchArchiveFromCache(ES_PLUGINS_NAME, conf);
                LOG.info("Using [" + taskPluginsPath + "] as es.plugins.dir");
                System.setProperty(ES_CONFIG, taskConfigPath);
                System.setProperty(ES_PLUGINS, taskPluginsPath + SLASH + ES_PLUGINS_NAME);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }

            queryString = ((ElasticSearchSplit) split).getQueryString();
            from = ((ElasticSearchSplit) split).getFrom();
            recsToRead = ((ElasticSearchSplit) split).getSize();

            LOG.info("elasticsearch record reader: query [" + queryString + "], from [" + from
                     + "], size [" + recsToRead + "]");
            start_embedded_client();
            recordsRead = 0;
        }

        /** Starts an embedded elasticsearch client (ie. data = false) */
        private void start_embedded_client() {
            LOG.info("Starting embedded elasticsearch client ...");
            this.node = NodeBuilder.nodeBuilder().client(true).node();
            this.client = node.client();
        }

        private Iterator<SearchHit> fetchNextHits() {
            SearchResponse response =
                client.prepareSearch(indexName).setTypes(objType).setFrom(from.intValue())
                    .setSize(recsToRead.intValue()).setQuery(QueryBuilders.queryString(queryString))
                    .execute().actionGet();
            return response.hits().iterator();
        }

        public boolean next(Text key, Text val) throws IOException {
            if (hitsItr != null) {
                if (recordsRead < recsToRead) {
                    if (hitsItr.hasNext()) {
                        SearchHit hit = hitsItr.next();
                        key.set(hit.id());
                        val.set(hit.sourceAsString());
                        recordsRead += 1;
                        return true;
                    }
                } else {
                    hitsItr = null;
                }
            } else {
                if (recordsRead < recsToRead) {
                    hitsItr = fetchNextHits();
                    if (hitsItr.hasNext()) {
                        SearchHit hit = hitsItr.next();
                        key.set(hit.id());
                        val.set(hit.sourceAsString());
                        recordsRead += 1;
                        return true;
                    }
                }
            }
            return false;
        }

        public Text createKey() {
            return new Text();
        }

        public Text createValue() {
            return new Text();
        }

        public long getPos() throws IOException {
            return 0;
        }

        public float getProgress() throws IOException {
            return recordsRead;
        }

        public void close() throws IOException {
            LOG.info("Closing record reader");
            client.close();
            LOG.info("Client is closed");
            if (node != null) {
                node.close();
            }
            LOG.info("Record reader closed.");
        }


    }
}
TOP

Related Classes of backtype.hadoop.ElasticSearchInputFormat$ElasticSearchRecordReader

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.