Source Code of org.elasticsearch.hadoop.pig.EsStorage

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.pig;

import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.util.IOUtils;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;

/**
* Pig storage for reading and writing data into an Elasticsearch index.
* Uses the schema implied by the tuple to create the resulting JSON string sent to Elasticsearch.
* <p/>
* Typical usage is:
*
* <pre>
* A = LOAD 'twitter/_search?q=kimchy' USING org.elasticsearch.hadoop.pig.EsStorage();
* </pre>
* <pre>
* STORE A INTO '<index>' USING org.elasticsearch.hadoop.pig.EsStorage();
* </pre>
*
* The Elasticsearch host/port can be specified through Hadoop properties (see the package description)
* or passed to the {@link #EsStorage(String...)} constructor.
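*
* A minimal sketch of the constructor form (the {@code es.nodes}/{@code es.port} option
* names below are standard {@code es.*} settings, shown here only for illustration):
*
* <pre>
* DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage('es.nodes=localhost', 'es.port=9200');
* A = LOAD 'twitter/_search?q=kimchy' USING EsStorage();
* </pre>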
*/
public class EsStorage extends LoadFunc implements LoadMetadata, LoadPushDown, StoreFuncInterface, StoreMetadata {

    private static final Log log = LogFactory.getLog(EsStorage.class);
    private final boolean trace = log.isTraceEnabled();

    private Properties properties = new Properties();

    private String relativeLocation;
    private String signature;
    private ResourceSchema schema;
    private RecordReader<String, Map<?, ?>> reader;
    private RecordWriter<Object, Object> writer;
    private PigTuple pigTuple;

    private List<String> aliasesTupleNames;
    private boolean IS_ES_10;

    public EsStorage() {
        this(new String[0]);
    }

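    /**
     * Creates a storage instance configured through the given strings. Each argument is
     * parsed in {@link Properties} format, i.e. as {@code key=value} entries (several
     * entries may be packed into a single argument, one per line).
     *
     * @param configuration ad-hoc configuration entries merged with the job configuration
     */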
    public EsStorage(String... configuration) {
        if (!ObjectUtils.isEmpty(configuration)) {
            try {
                for (String string : configuration) {
                    // each argument is parsed in Properties format (key=value entries, one per line)
                    properties.load(new StringReader(string));
                }
            } catch (IOException ex) {
                throw new EsHadoopIllegalArgumentException("Cannot parse options " + Arrays.toString(configuration), ex);
            }
        }
    }

    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
        return location;
    }

    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        this.signature = signature;
    }

    private Properties getUDFProperties() {
        return UDFContext.getUDFContext().getUDFProperties(getClass(), new String[] { signature });
    }

    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        Properties props = getUDFProperties();

        // save schema to back-end for JSON translation
        if (!StringUtils.hasText(props.getProperty(ResourceSchema.class.getName()))) {
            // save the schema as a String (uses JDK serialization since toString() mangles the signature - see the test case)
            props.setProperty(ResourceSchema.class.getName(), IOUtils.serializeToBase64(s));
        }
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        init(location, job, false);
    }

    private void init(String location, Job job, boolean read) {
        Settings settings = SettingsManager.loadFrom(job.getConfiguration()).merge(properties);

        settings = (read ? settings.setResourceRead(location) : settings.setResourceWrite(location));

        boolean changed = false;
        InitializationUtils.checkIdForOperation(settings);

        changed |= InitializationUtils.setValueWriterIfNotSet(settings, PigValueWriter.class, log);
        changed |= InitializationUtils.setValueReaderIfNotSet(settings, PigValueReader.class, log);
        changed |= InitializationUtils.setBytesConverterIfNeeded(settings, PigBytesConverter.class, log);
        changed |= InitializationUtils.setFieldExtractorIfNotSet(settings, PigFieldExtractor.class, log);

        IS_ES_10 = SettingsUtils.isEs10(settings);
    }

    @SuppressWarnings("unchecked")
    @Override
    public OutputFormat<Object, Map<Writable, Writable>> getOutputFormat() throws IOException {
        return new EsOutputFormat();
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;

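        // restore the schema saved by checkSchema() on the front-end; if none was stored,
        // fall back to an empty schema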
        Properties props = getUDFProperties();
        String s = props.getProperty(ResourceSchema.class.getName());
        if (!StringUtils.hasText(s)) {
            log.warn("No resource schema found; using an empty one....");
            this.schema = new ResourceSchema();
        }
        else {
            this.schema = IOUtils.deserializeFromBase64(s);
        }
        this.pigTuple = new PigTuple(schema);
    }

    // TODO: make put more lenient (if the schema is not available just shove everything on the existing type or as a big charray)
    @Override
    public void putNext(Tuple t) throws IOException {
        pigTuple.setTuple(t);

        if (trace) {
            log.trace("Writing out tuple " + t);
        }
        try {
            writer.write(null, pigTuple);
        } catch (InterruptedException ex) {
            throw new EsHadoopIllegalArgumentException("interrupted", ex);
        }
    }

    @Override
    public void cleanupOnFailure(String location, Job job) throws IOException {
        // no special clean-up required
    }

    // added in Pig 0.11.x
    public void cleanupOnSuccess(String location, Job job) throws IOException {
        //no-op
    }

    //
    // Store metadata - kind of useless due to its life-cycle
    //

    @Override
    public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException {
        // no-op
    }

    @Override
    public void storeSchema(ResourceSchema schema, String location, Job job) throws IOException {
        // no-op
        // this method is called _after_ the data has been written (instead of before), which makes it useless here
    }


    //
    // LoadFunc
    //
    public void setLocation(String location, Job job) throws IOException {
        init(location, job, true);

        Configuration cfg = job.getConfiguration();

        Settings settings = SettingsManager.loadFrom(cfg);
        IS_ES_10 = SettingsUtils.isEs10(settings);

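        // the fields to return were already specified in the settings (scroll fields),
        // so skip the pushed-down projection and use them as-is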
        if (settings.getScrollFields() != null) {
            return;
        }

        extractProjection(cfg);
    }

    @Override
    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
        // TODO: potentially do additional parsing here
        relativeLocation = location;
        return relativeLocation;
    }

    @SuppressWarnings("rawtypes")
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new EsPigInputFormat();
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.reader = reader;
        aliasesTupleNames = StringUtils.tokenize(getUDFProperties().getProperty(
                InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS));
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }

            Map dataMap = reader.getCurrentValue();
            Tuple tuple = TupleFactory.getInstance().newTuple(dataMap.size());

            if (dataMap.isEmpty()) {
                return tuple;
            }

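            // a projection was pushed down - populate the tuple from the projected aliases
            // (possibly multi-level, e.g. "user.name", when reading _source on ES 1.x);
            // otherwise copy every value of the returned document map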
            if (!aliasesTupleNames.isEmpty()) {
                for (int i = 0; i < aliasesTupleNames.size(); i++) {
                    if (IS_ES_10) {
                        Object result = dataMap;
                        // check for multi-level alias
                        for (String level : StringUtils.tokenize(aliasesTupleNames.get(i), ".")) {
                            if (result instanceof Map) {
                                result = ((Map) result).get(level);
                                if (result == null) {
                                    break;
                                }
                            }
                        }
                        tuple.set(i, result);
                    }
                    // ES 0.90.x / fields
                    else {
                        tuple.set(i, dataMap.get(aliasesTupleNames.get(i)));
                    }
                }
            }
            else {
                int i = 0;
                Set<Entry<?, ?>> entrySet = dataMap.entrySet();
                for (Map.Entry entry : entrySet) {
                    tuple.set(i++, entry.getValue());
                }
            }

            if (trace) {
                log.trace("Reading out tuple " + tuple);
            }
            return tuple;

        } catch (InterruptedException ex) {
            throw new IOException("interrupted", ex);
        }
    }

    //
    // LoadPushDown
    //
    @Override
    public List<OperatorSet> getFeatures() {
        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
    }

    @Override
    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
        String fields = PigUtils.asProjection(requiredFieldList, properties);
        getUDFProperties().setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, fields);
        if (log.isTraceEnabled()) {
            log.trace(String.format("Given push projection; saving field projection [%s]", fields));
        }
        return new RequiredFieldResponse(true);
    }

    @Override
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        return null;
    }

    @Override
    public ResourceStatistics getStatistics(String location, Job job) throws IOException {
        return null;
    }

    @Override
    public String[] getPartitionKeys(String location, Job job) throws IOException {
        return null;
    }

    @Override
    public void setPartitionFilter(Expression partitionFilter) throws IOException {
        //
    }

    @Override
    public void setUDFContextSignature(String signature) {
        this.signature = signature;
    }


    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void extractProjection(Configuration cfg) throws IOException {
        String fields = getUDFProperties().getProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS);
        if (fields != null) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Found field projection [%s] in UDF properties", fields));
            }

            cfg.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, fields);
            return;
        }

        return;
//
//        This discovery process is unreliable since the schema is not passed over but rather the next extraction operation
//        As such, the store will be forced to load all the fields
//
//        if (log.isTraceEnabled()) {
//            log.trace("No field projection specified, looking for existing stores...");
//        }
//
//        List pigInputs = (List) ObjectSerializer.deserialize(cfg.get("pig.inputs"));
//        // can't determine alias
//        if (pigInputs == null || pigInputs.size() != 1) {
//            return;
//        }
//
//        String mapValues = cfg.get(JobControlCompiler.PIG_MAP_STORES);
//        String reduceValues = cfg.get(JobControlCompiler.PIG_REDUCE_STORES);
//
//        List<POStore> mapStore = Collections.emptyList();
//        List<POStore> reduceStore = Collections.emptyList();
//
//        if (StringUtils.hasText(mapValues)) {
//            mapStore = (List<POStore>) ObjectSerializer.deserialize(mapValues);
//        }
//        if (StringUtils.hasText(reduceValues)) {
//            reduceStore = (List<POStore>) ObjectSerializer.deserialize(reduceValues);
//        }
//        if (mapStore.size() + reduceStore.size() > 1) {
//            log.warn("Too many POstores - cannot properly determine Pig schema");
//        }
//        else if (mapStore.size() + reduceStore.size() == 0) {
//            log.warn("No POstores - cannot properly determine Pig schema");
//        }
//        else {
//            POStore store = (reduceStore.isEmpty() ? mapStore.get(0) : reduceStore.get(0));
//            // no schema specified - load all fields (or the default)
//            if (store.getSchema() == null) {
//                if (log.isTraceEnabled()) {
//                    log.trace(String.format("Store [%s] defines no schema; falling back to default projection", store));
//                }
//                return;
//            }
//            else {
//                fields = PigUtils.asProjection(store.getSchema(), properties);
//            }
//            if (log.isDebugEnabled()) {
//                log.debug(String.format("Found field projection [%s] in store %s", fields, store));
//            }
//            cfg.set(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, fields);
//            getUDFProperties().setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, fields);
//        }
    }
}