Package com.google.appengine.tools.mapreduce.inputs

Source Code of com.google.appengine.tools.mapreduce.inputs.GoogleCloudStorageLevelDbInputReader

package com.google.appengine.tools.mapreduce.inputs;

import static com.google.appengine.tools.mapreduce.impl.MapReduceConstants.GCS_RETRY_PARAMETERS;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.appengine.tools.cloudstorage.GcsFileMetadata;
import com.google.appengine.tools.cloudstorage.GcsFilename;
import com.google.appengine.tools.cloudstorage.GcsService;
import com.google.appengine.tools.cloudstorage.GcsServiceFactory;
import com.google.appengine.tools.cloudstorage.GcsServiceOptions;
import com.google.appengine.tools.mapreduce.impl.util.LevelDbConstants;
import com.google.common.collect.ImmutableMap;

import java.io.IOException;
import java.nio.channels.ReadableByteChannel;

/**
* A simple wrapper of LevelDb wrapper for GCS to provide getProgress() and do lazy initialization.
*/
public final class GoogleCloudStorageLevelDbInputReader extends LevelDbInputReader {

  private static final GcsService gcsService = GcsServiceFactory.createGcsService(
      new GcsServiceOptions.Builder()
          .setRetryParams(GCS_RETRY_PARAMETERS)
          // TODO(user): include version once b/12689661 is fixed
          .setHttpHeaders(ImmutableMap.of("User-Agent", "App Engine MR"))
          .build());

  private static final long serialVersionUID = 1014960525070958327L;

  private final GcsFilename file;
  private final int bufferSize;
  private double length = -1;

  /**
   * @param file File to be read.
   * @param bufferSize The buffersize to be used by the Gcs prefetching read channel.
   */
  public GoogleCloudStorageLevelDbInputReader(GcsFilename file, int bufferSize) {
    this.file = checkNotNull(file, "Null file");
    this.bufferSize = bufferSize;
    checkArgument(bufferSize > 0, "Buffersize must be > 0");
  }

  @Override
  public Double getProgress() {
    if (length == -1) {
      GcsFileMetadata metadata = null;
      try {
        metadata = gcsService.getMetadata(file);
      } catch (IOException e) {
        // It is just an estimate so it's probably not worth throwing.
      }
      if (metadata == null) {
        return null;
      }
      length = metadata.getLength();
    }
    if (length == 0f) {
      return null;
    }
    return getBytesRead() / length;
  }

  @Override
  public ReadableByteChannel createReadableByteChannel() {
    length = -1;
    return gcsService.openPrefetchingReadChannel(file, 0, bufferSize);
  }

  @Override
  public long estimateMemoryRequirement() {
    return LevelDbConstants.BLOCK_SIZE + bufferSize * 2; // Double buffered
  }
}
TOP

Related Classes of com.google.appengine.tools.mapreduce.inputs.GoogleCloudStorageLevelDbInputReader

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.