// This software is released into the Public Domain. See copying.txt for details.
package org.openstreetmap.osmosis.apidb.v0_6;
import java.util.Collections;
import java.util.Date;
import org.openstreetmap.osmosis.core.OsmosisConstants;
import org.openstreetmap.osmosis.apidb.common.DatabaseContext2;
import org.openstreetmap.osmosis.apidb.v0_6.impl.AllEntityDao;
import org.openstreetmap.osmosis.apidb.v0_6.impl.EntitySnapshotReader;
import org.openstreetmap.osmosis.apidb.v0_6.impl.SchemaVersionValidator;
import org.openstreetmap.osmosis.core.container.v0_6.BoundContainer;
import org.openstreetmap.osmosis.core.container.v0_6.EntityContainer;
import org.openstreetmap.osmosis.core.database.DatabaseLoginCredentials;
import org.openstreetmap.osmosis.core.database.DatabasePreferences;
import org.openstreetmap.osmosis.core.domain.v0_6.Bound;
import org.openstreetmap.osmosis.core.lifecycle.ReleasableIterator;
import org.openstreetmap.osmosis.core.task.v0_6.RunnableSource;
import org.openstreetmap.osmosis.core.task.v0_6.Sink;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
/**
 * A runnable OSM data source backed by an API database. The history iterator exposed by
 * {@link AllEntityDao} is wrapped in an {@link EntitySnapshotReader} for the configured
 * snapshot instant, and every entity it yields is passed to the sink.
 *
 * @author Brett Henderson
 */
public class ApidbReader implements RunnableSource {

    private final DatabaseLoginCredentials loginCredentials;
    private final DatabasePreferences preferences;
    private final Date snapshotInstant;
    private Sink sink;


    /**
     * Creates a new instance.
     *
     * @param loginCredentials
     *            Contains all information required to connect to the database.
     * @param preferences
     *            Contains preferences configuring database behaviour.
     * @param snapshotInstant
     *            The point in time handed to the snapshot reader; the data is dumped as it
     *            stood at this instant, which ensures a consistent snapshot.
     */
    public ApidbReader(DatabaseLoginCredentials loginCredentials, DatabasePreferences preferences,
            Date snapshotInstant) {
        this.loginCredentials = loginCredentials;
        this.preferences = preferences;
        this.snapshotInstant = snapshotInstant;
    }


    /**
     * {@inheritDoc}
     */
    public void setSink(Sink sink) {
        this.sink = sink;
    }


    /**
     * Performs the actual read. The run method invokes this from inside a transaction.
     * <p>
     * The sink is initialised first, then the schema version is validated, a bound entry is
     * emitted, all snapshot entities are emitted, and the sink is completed. The sink is
     * released on every exit path, including failures.
     *
     * @param dbCtx
     *            Used to access the database.
     */
    protected void runImpl(DatabaseContext2 dbCtx) {
        try {
            sink.initialize(Collections.<String, Object>emptyMap());

            // Check the schema before any data is read from it.
            SchemaVersionValidator versionValidator =
                    new SchemaVersionValidator(loginCredentials, preferences);
            versionValidator.validateVersion(ApidbVersionConstants.SCHEMA_MIGRATIONS);

            AllEntityDao allEntities = new AllEntityDao(dbCtx.getJdbcTemplate());

            // The bound entry precedes all entity data and carries the Osmosis version as origin.
            Bound origin = new Bound("Osmosis " + OsmosisConstants.VERSION);
            sink.process(new BoundContainer(origin));

            streamSnapshot(allEntities);

            sink.complete();
        } finally {
            sink.release();
        }
    }


    /**
     * Pushes every entity of the snapshot to the sink and releases the underlying iterator
     * afterwards, whether or not processing succeeded.
     *
     * @param entityDao
     *            Supplies the history that the snapshot reader is built on.
     */
    private void streamSnapshot(AllEntityDao entityDao) {
        ReleasableIterator<EntityContainer> snapshot =
                new EntitySnapshotReader(entityDao.getHistory(), snapshotInstant);

        try {
            while (snapshot.hasNext()) {
                EntityContainer entity = snapshot.next();
                sink.process(entity);
            }
        } finally {
            snapshot.release();
        }
    }


    /**
     * Reads all data from the database and sends it to the sink. A database context is
     * created for the duration of the call, the read is executed within a transaction on
     * that context, and the context is released afterwards.
     */
    public void run() {
        final DatabaseContext2 dbCtx = new DatabaseContext2(loginCredentials);

        try {
            // The callback captures the final local directly; no extra field is needed.
            TransactionCallbackWithoutResult readTask = new TransactionCallbackWithoutResult() {
                @Override
                protected void doInTransactionWithoutResult(TransactionStatus status) {
                    runImpl(dbCtx);
                }
            };

            dbCtx.executeWithinTransaction(readTask);
        } finally {
            dbCtx.release();
        }
    }
}