Package org.kitesdk.data.crunch

Source Code of org.kitesdk.data.crunch.DatasetSourceTarget

/**
* Copyright 2014 Cloudera Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kitesdk.data.crunch;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.apache.avro.generic.GenericData;
import org.apache.crunch.ReadableData;
import org.apache.crunch.Source;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.impl.mr.run.CrunchMapper;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.CrunchInputs;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.View;
import org.kitesdk.data.mapreduce.DatasetKeyInputFormat;
import org.kitesdk.data.spi.LastModifiedAccessor;
import org.kitesdk.data.spi.SizeAccessor;
import org.kitesdk.data.spi.filesystem.FileSystemDataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DatasetSourceTarget<E> extends DatasetTarget<E> implements ReadableSourceTarget<E> {

  private static final Logger LOG = LoggerFactory
      .getLogger(DatasetSourceTarget.class);

  private View<E> view;
  private FormatBundle formatBundle;
  private AvroType<E> avroType;

  public DatasetSourceTarget(View<E> view) {
    this(view, view.getType());
  }

  @SuppressWarnings("unchecked")
  public DatasetSourceTarget(View<E> view, Class<E> type) {
    this(view, toAvroType(view, type));
  }

  public DatasetSourceTarget(URI uri, Class<E> type) {
    this(Datasets.load(uri, type));
  }

  public DatasetSourceTarget(View<E> view, AvroType<E> avroType) {
    super(view);

    this.view = view;
    this.avroType = avroType;

    Configuration temp = new Configuration(false /* use an empty conf */ );
    DatasetKeyInputFormat.configure(temp).readFrom(view);
    this.formatBundle = inputBundle(temp);

    Dataset<E> dataset = view.getDataset();

    // Disable CombineFileInputFormat in Crunch unless we're dealing with Avro or Parquet files
    Format format = dataset.getDescriptor().getFormat();
    boolean isAvroOrParquetFile = (dataset instanceof FileSystemDataset)
        && (Formats.AVRO.equals(format) || Formats.PARQUET.equals(format));
    formatBundle.set(RuntimeParameters.DISABLE_COMBINE_FILE, Boolean.toString(!isAvroOrParquetFile));
  }

  public DatasetSourceTarget(URI uri, AvroType<E> avroType) {
    this(Datasets.load(uri, avroType.getTypeClass()), avroType);
  }

  @SuppressWarnings("unchecked")
  private static <E> AvroType<E> toAvroType(View<E> view, Class<E> type) {
    if (type.isAssignableFrom(GenericData.Record.class)) {
      return (AvroType<E>) Avros.generics(
        view.getDataset().getDescriptor().getSchema());
    } else {
      return Avros.records(type);
    }
  }

  @Override
  public Source<E> inputConf(String key, String value) {
    formatBundle.set(key, value);
    return this;
  }

  @Override
  public PType<E> getType() {
    return avroType;
  }

  @Override
  public Converter<?, ?, ?, ?> getConverter() {
    return new KeyConverter<E>(avroType);
  }

  @Override
  @SuppressWarnings("unchecked")
  public void configureSource(Job job, int inputId) throws IOException {
    Configuration conf = job.getConfiguration();
    if (inputId == -1) {
      job.setMapperClass(CrunchMapper.class);
      job.setInputFormatClass(formatBundle.getFormatClass());
      formatBundle.configure(conf);
    } else {
      Path dummy = new Path("/view/" + view.getDataset().getName());
      CrunchInputs.addInputPath(job, dummy, formatBundle, inputId);
    }
  }

  @Override
  public long getSize(Configuration configuration) {
    if (view instanceof SizeAccessor) {
      return ((SizeAccessor) view).getSize();
    }
    LOG.warn("Cannot determine size for view: " + toString());
    return 1000L * 1000L * 1000L; // fallback to HBase default size
  }

  @Override
  public long getLastModifiedAt(Configuration configuration) {
    if (view instanceof LastModifiedAccessor) {
      return ((LastModifiedAccessor) view).getLastModified();
    }
    LOG.warn("Cannot determine last modified time for source: " + toString());
    return -1;
  }

  @Override
  public Iterable<E> read(Configuration configuration) throws IOException {
    // TODO: what to do with Configuration? create new view?
    return view.newReader(); // TODO: who calls close?
  }

  @Override
  public ReadableData<E> asReadable() {
    return new ReadableData<E>() {

      @Override
      public Set<SourceTarget<?>> getSourceTargets() {
        return ImmutableSet.<SourceTarget<?>>of(DatasetSourceTarget.this);
      }

      @Override
      public void configure(Configuration conf) {
        // TODO: optimize for file-based datasets by using distributed cache
      }

      @Override
      public Iterable<E> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
        return view.newReader();
      }
    };
  }

  @Override
  public SourceTarget<E> conf(String key, String value) {
    inputConf(key, value);
    outputConf(key, value);
    return this;
  }

  /**
   * Builds a FormatBundle for DatasetKeyInputFormat by copying a temp config.
   *
   * All properties will be copied from the temporary configuration
   *
   * @param conf A Configuration that will be copied
   * @return a FormatBundle with the contents of conf
   */
  private static FormatBundle<DatasetKeyInputFormat> inputBundle(Configuration conf) {
    FormatBundle<DatasetKeyInputFormat> bundle = FormatBundle
        .forInput(DatasetKeyInputFormat.class);
    for (Map.Entry<String, String> entry : conf) {
      bundle.set(entry.getKey(), entry.getValue());
    }
    return bundle;
  }
}
TOP

Related Classes of org.kitesdk.data.crunch.DatasetSourceTarget

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.