package org.yaac.server.egql.processor;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
import java.util.List;
import java.util.logging.Logger;
import org.yaac.server.egql.evaluator.EvaluationResult;
import org.yaac.server.egql.processor.ProcessData.ProcessDataRecord;
import org.yaac.shared.SharedConstants;
import org.yaac.shared.SharedConstants.Datastore;
import org.yaac.shared.egql.ResultStatus;
import org.yaac.shared.file.FileDownloadPath;
import com.google.appengine.api.datastore.Cursor;
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.FetchOptions;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.datastore.QueryResultList;
/**
 * Processor that loads entities of one kind from the App Engine datastore in batches.
 *
 * <p>Each call to {@link #process(ProcessContext, ProcessData)} fetches at most
 * {@code batchSize} entities, starting from the cursor remembered by the previous
 * call, and wraps every entity in a {@link ProcessDataRecord}. The loader keeps the
 * cursor in its own state, so the same instance has to be used for consecutive batches.
 *
 * @author Max Zhu (thebbsky@gmail.com)
 */
public class DatastoreLoader implements Processor {

    private static final long serialVersionUID = 1L;

    @SuppressWarnings("unused")
    private static final Logger logger = Logger.getLogger(DatastoreLoader.class.getName());

    /**
     * kind of the entities to load
     */
    private String kind;

    /**
     * maximum number of entities fetched per {@code process} call
     */
    private Integer batchSize;

    /**
     * web-safe string form of the cursor the next fetch starts from;
     * null or empty until the first batch has been read
     */
    private String startCursor;

    /**
     * No-argument constructor; not used by the code in this class.
     */
    @SuppressWarnings("unused")
    private DatastoreLoader() {}

    /**
     * @param kind      kind of the entities to load
     * @param batchSize maximum number of entities to fetch per batch
     */
    public DatastoreLoader(String kind, Integer batchSize) {
        super();
        this.kind = kind;
        this.batchSize = batchSize;
    }

    /**
     * Reads the next batch of entities and exposes each of them as a record.
     *
     * <p>The context status is set to {@link ResultStatus#FINISHED} when the batch is
     * empty or shorter than {@code batchSize}, i.e. when there is nothing left to read.
     *
     * @param context process context, only used to report the finished status
     * @param input   ignored, the loader produces data on its own
     * @return the records of the batch that was just read, possibly empty
     * @throws IllegalStateException if more batches are expected but the datastore
     *         did not provide a cursor to continue from
     */
    @Override
    public ProcessData process(ProcessContext context, ProcessData input) {
        // step 1 : read data, resuming from the previous batch if there was one
        DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();
        FetchOptions fetchOptions = FetchOptions.Builder.withLimit(this.batchSize);
        if (!isNullOrEmpty(this.startCursor)) {
            fetchOptions.startCursor(Cursor.fromWebSafeString(this.startCursor));
        }
        QueryResultList<Entity> entities = datastore.prepare(new Query(kind)).asQueryResultList(fetchOptions);

        // an empty batch always means the end, this also stops a batch size of 0 from looping forever
        boolean finished = entities.isEmpty() || entities.size() < this.batchSize;

        // step 2 : remember where the next batch starts
        // NOTE(review): getCursor() is believed to return null for queries that do not
        // support cursors - confirm against the datastore API documentation
        Cursor nextCursor = entities.getCursor();
        if (nextCursor != null) {
            this.startCursor = nextCursor.toWebSafeString();
        } else if (!finished) {
            // without a cursor the next call would read the very same batch again
            throw new IllegalStateException(
                    "No cursor returned for kind " + kind + ", unable to continue loading");
        }

        // step 3 : convert data into evaluation results
        ProcessData output = new ProcessData();
        for (final Entity entity : entities) {
            output.addRecord(new ProcessDataRecord() {

                /**
                 * Resolves a single field of the entity without any special-name
                 * dispatching other than the reserved key name.
                 */
                private EvaluationResult resultFor(String name) {
                    Object payload = Datastore.KEY_RESERVED_NAME.equals(name)
                            ? entity.getKey() : entity.getProperty(name);
                    return new EvaluationResult(entity.getKey(), name, null, payload);
                }

                @Override
                public EvaluationResult lookup(String name) {
                    if (isNullOrEmpty(name)) {
                        return null;
                    }

                    if (Datastore.SELECT_ALL_ASTERISK.equals(name)) { // special handling for select all case
                        List<EvaluationResult> resultList =
                                newArrayListWithExpectedSize(entity.getProperties().size() + 1);
                        resultList.add(resultFor(Datastore.KEY_RESERVED_NAME));
                        for (String propertyName : entity.getProperties().keySet()) {
                            // resolved directly instead of through lookup(), so a property whose
                            // name equals the select-all marker cannot recurse endlessly
                            resultList.add(resultFor(propertyName));
                        }

                        EvaluationResult[] resultArray =
                                resultList.toArray(new EvaluationResult[resultList.size()]);
                        return new EvaluationResult(resultArray).withTitle(name);
                    }

                    // normal case, include key selection and property selection
                    return resultFor(name);
                }

                @Override
                public Iterable<EvaluationResult> asIterable() {
                    List<EvaluationResult> result =
                            newArrayListWithExpectedSize(entity.getProperties().size() + 1);
                    // the key always comes first, followed by every property of the entity
                    result.add(new EvaluationResult(entity.getKey(), SharedConstants.Datastore.KEY_RESERVED_NAME,
                            null, entity.getKey()));
                    for (String propertyName : entity.getProperties().keySet()) {
                        result.add(new EvaluationResult(
                                entity.getKey(), propertyName, null, entity.getProperty(propertyName)));
                    }
                    return result;
                }

                @Override
                public FileDownloadPath lookupFileReference(Integer index) {
                    // records built from datastore entities do not provide file references
                    throw new IllegalArgumentException(
                            "File references are not available for datastore records");
                }
            });
        }

        // step 4 : check if there are more entities to go
        if (finished) {
            context.setStatus(ResultStatus.FINISHED);
        }

        return output;
    }
}