Package org.openstreetmap.osmosis.plugin.elasticsearch

Source Code of org.openstreetmap.osmosis.plugin.elasticsearch.ElasticSearchWriterTask

package org.openstreetmap.osmosis.plugin.elasticsearch;

import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
import org.openstreetmap.osmosis.core.domain.v0_6.Entity;
import org.openstreetmap.osmosis.core.domain.v0_6.EntityType;
import org.openstreetmap.osmosis.core.task.v0_6.Sink;
import org.openstreetmap.osmosis.plugin.elasticsearch.builder.AbstractIndexBuilder;
import org.openstreetmap.osmosis.plugin.elasticsearch.utils.Endpoint;
import org.openstreetmap.osmosis.plugin.elasticsearch.utils.EntityCounter;
import org.openstreetmap.osmosis.plugin.elasticsearch.utils.Parameters;
import org.openstreetmap.osmosis.plugin.elasticsearch.worker.WorkerPool;

public class ElasticSearchWriterTask implements Sink {

  private static final Logger LOG = Logger.getLogger(ElasticSearchWriterTask.class.getName());

  private final Endpoint endpoint;
  private final Set<AbstractIndexBuilder> indexBuilders;
  private final EntityCounter entityCounter;
  private final WorkerPool workerPool;

  public ElasticSearchWriterTask(Endpoint endpoint, Set<AbstractIndexBuilder> indexBuilders, Parameters params) {
    this.endpoint = endpoint;
    this.indexBuilders = indexBuilders;
    this.entityCounter = new EntityCounter();
    this.workerPool = new WorkerPool(endpoint.getEntityDao(), params);
  }

  @Override
  public void initialize(Map<String, Object> metadata) {
    LOG.fine("initialize() with metadata: " + metadata.toString());
  }

  @Override
  public void process(EntityContainer entityContainer) {
    Entity entity = entityContainer.getEntity();
    EntityType type = entity.getType();
    workerPool.submit(entity);
    entityCounter.increment(type);
  }

  @Override
  public void complete() {
    workerPool.shutdown();
    LOG.info("OSM indexing completed!\n" +
        "total processed nodes: ....... " + entityCounter.getCount(EntityType.Node) + "\n" +
        "total processed ways: ........ " + entityCounter.getCount(EntityType.Way) + "\n" +
        "total processed relations: ... " + entityCounter.getCount(EntityType.Relation) + "\n" +
        "total processed bounds: ...... " + entityCounter.getCount(EntityType.Bound));
    buildSpecializedIndex();
  }

  protected void buildSpecializedIndex() {
    for (AbstractIndexBuilder indexBuilder : indexBuilders) {
      try {
        String indexName = indexBuilder.getSpecializedIndexName();
        LOG.info("Creating selected index [" + indexName + "]");
        indexBuilder.createIndex();
        LOG.info("Building selected index [" + indexName + "]");
        long time = System.currentTimeMillis();
        indexBuilder.buildIndex();
        time = System.currentTimeMillis() - time;
        LOG.info("Index [" + indexName + "] successfully built in " + time + " milliseconds!");
      } catch (Exception e) {
        LOG.log(Level.SEVERE, "Unable to build index", e);
        break;
      }
    }
  }

  @Override
  public void release() {
    float consumedMemoryMb = (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())
        / (float) Math.pow(1024, 2);
    LOG.info(String.format("Estimated memory consumption: %.2f MB", consumedMemoryMb));
    endpoint.getClient().close();
  }

}
TOP

Related Classes of org.openstreetmap.osmosis.plugin.elasticsearch.ElasticSearchWriterTask

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.