/*
* Copyright © 2014 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package co.cask.cdap.data2.datafabric.dataset.service;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.data2.datafabric.dataset.type.DatasetModuleConflictException;
import co.cask.cdap.data2.datafabric.dataset.type.DatasetTypeManager;
import co.cask.cdap.proto.DatasetModuleMeta;
import co.cask.cdap.proto.DatasetTypeMeta;
import co.cask.http.AbstractHttpHandler;
import co.cask.http.HandlerContext;
import co.cask.http.HttpResponder;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.inject.Inject;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
/**
 * Handles dataset type management calls.
 *
 * <p>Exposes HTTP endpoints for listing, adding, fetching and deleting dataset modules, and for
 * listing and fetching dataset types. Module and type metadata operations are delegated to the
 * injected {@link DatasetTypeManager}; uploaded module jars are written through the injected
 * {@link LocationFactory}.
 */
// todo: do we want to make it authenticated? or do we treat it always as "internal" piece?
@Path(Constants.Gateway.GATEWAY_VERSION)
public class DatasetTypeHandler extends AbstractHttpHandler {
  /** Name of the request header that carries the dataset module class name in {@link #addModule}. */
  public static final String HEADER_CLASS_NAME = "X-Class-Name";

  private static final Logger LOG = LoggerFactory.getLogger(DatasetTypeHandler.class);

  /** Orders module metadata by module name. */
  private static final Comparator<DatasetModuleMeta> MODULE_BY_NAME = new Comparator<DatasetModuleMeta>() {
    @Override
    public int compare(DatasetModuleMeta o1, DatasetModuleMeta o2) {
      return o1.getName().compareTo(o2.getName());
    }
  };

  /** Orders type metadata by type name. */
  private static final Comparator<DatasetTypeMeta> TYPE_BY_NAME = new Comparator<DatasetTypeMeta>() {
    @Override
    public int compare(DatasetTypeMeta o1, DatasetTypeMeta o2) {
      return o1.getName().compareTo(o2.getName());
    }
  };

  private final DatasetTypeManager manager;
  private final LocationFactory locationFactory;
  private final String archiveDir;

  /**
   * Creates the handler.
   *
   * @param manager manager that all module/type operations are delegated to
   * @param locationFactory factory used to create the location where module jars are stored
   * @param conf configuration; {@link Constants.Dataset.Manager#OUTPUT_DIR} is read to determine the base
   *             directory for archives, falling back to the {@code java.io.tmpdir} system property
   */
  @Inject
  public DatasetTypeHandler(DatasetTypeManager manager,
                            LocationFactory locationFactory,
                            CConfiguration conf) {
    this.manager = manager;
    this.locationFactory = locationFactory;
    String dataFabricDir = conf.get(Constants.Dataset.Manager.OUTPUT_DIR, System.getProperty("java.io.tmpdir"));
    this.archiveDir = dataFabricDir + "/archive";
  }

  @Override
  public void init(HandlerContext context) {
    LOG.info("Starting DatasetTypeHandler");
  }

  @Override
  public void destroy(HandlerContext context) {
    LOG.info("Stopping DatasetTypeHandler");
  }

  /**
   * Responds with the JSON list of all dataset modules known to the manager, sorted by module name.
   *
   * @param request the incoming request (not inspected)
   * @param responder used to send the response
   */
  @GET
  @Path("/data/modules")
  public void listModules(HttpRequest request, final HttpResponder responder) {
    // Sorting by name for convenience
    List<DatasetModuleMeta> list = Lists.newArrayList(manager.getModules());
    Collections.sort(list, MODULE_BY_NAME);
    responder.sendJson(HttpResponseStatus.OK, list);
  }

  /**
   * Deletes all dataset modules. Responds with 200 on success, or 409 with the exception message if the
   * manager reports a conflict.
   *
   * @param request the incoming request (not inspected)
   * @param responder used to send the response
   */
  @DELETE
  @Path("/data/modules")
  public void deleteModules(HttpRequest request, final HttpResponder responder) {
    try {
      manager.deleteModules();
      responder.sendStatus(HttpResponseStatus.OK);
    } catch (DatasetModuleConflictException e) {
      LOG.warn("Cannot delete all modules: {}", e.getMessage());
      responder.sendError(HttpResponseStatus.CONFLICT, e.getMessage());
    }
  }

  /**
   * Adds a dataset module: stores the request body as {@code <name>.jar} under the archive directory and
   * registers the module with the manager.
   *
   * <p>Responses:
   * <ul>
   *   <li>400 if the {@link #HEADER_CLASS_NAME} header is absent or the request content is null;</li>
   *   <li>409 if a module with the same name already exists or the manager reports a conflict;</li>
   *   <li>200 once the module has been added.</li>
   * </ul>
   *
   * @param request the incoming request; its content is the module jar and its
   *                {@link #HEADER_CLASS_NAME} header is the module class name
   * @param responder used to send the response
   * @param name name of the module to add
   * @throws IOException if the archive location cannot be checked, created or written
   */
  @PUT
  @Path("/data/modules/{name}")
  public void addModule(HttpRequest request, final HttpResponder responder,
                        @PathParam("name") String name) throws IOException {
    String className = request.getHeader(HEADER_CLASS_NAME);
    if (className == null) {
      // A missing header is a client error: answer with 400 instead of throwing out of the handler.
      String message = String.format("Required header '%s' is absent.", HEADER_CLASS_NAME);
      LOG.warn("Cannot add module {}: {}", name, message);
      responder.sendError(HttpResponseStatus.BAD_REQUEST, message);
      return;
    }

    LOG.info("Adding module {}, class name: {}", name, className);

    DatasetModuleMeta existing = manager.getModule(name);
    if (existing != null) {
      String message = String.format("Cannot add module %s: module with same name already exists: %s",
                                     name, existing);
      LOG.warn(message);
      responder.sendError(HttpResponseStatus.CONFLICT, message);
      return;
    }

    ChannelBuffer content = request.getContent();
    if (content == null) {
      LOG.warn("Cannot add module {}: content is null", name);
      responder.sendError(HttpResponseStatus.BAD_REQUEST, "Content is null");
      return;
    }

    // NOTE(review): 'name' comes straight from the request path and is used to build the archive file
    // name; confirm that callers/framework guarantee it cannot contain path separators.
    Location uploadDir = locationFactory.create(archiveDir).append("account_placeholder");
    String archiveName = name + ".jar";
    Location archive = uploadDir.append(archiveName);
    LOG.info("Storing module {} jar at {}", name, archive.toURI().toString());

    // Best effort: a failure here is only logged; opening the archive stream below will report the
    // real problem if the directory is truly unusable.
    if (!uploadDir.exists() && !uploadDir.mkdirs()) {
      LOG.warn("Unable to create directory '{}'", uploadDir.getName());
    }

    // todo: store to temp file first and do some verifications? Or even datasetFramework should persist file?
    storeArchive(content, archive);

    try {
      manager.addModule(name, className, archive);
    } catch (DatasetModuleConflictException e) {
      LOG.warn("Cannot add module {}: {}", name, e.getMessage());
      responder.sendError(HttpResponseStatus.CONFLICT, e.getMessage());
      return;
    }

    // todo: response with DatasetModuleMeta of just added module (and log this info)
    LOG.info("Added module {}", name);
    responder.sendStatus(HttpResponseStatus.OK);
  }

  /**
   * Deletes the named dataset module. Responds with 200 if the manager deleted it, 404 if the manager
   * reports nothing was deleted, or 409 with the exception message on conflict.
   *
   * @param request the incoming request (not inspected)
   * @param responder used to send the response
   * @param name name of the module to delete
   */
  @DELETE
  @Path("/data/modules/{name}")
  public void deleteModule(HttpRequest request, final HttpResponder responder, @PathParam("name") String name) {
    boolean deleted;
    try {
      deleted = manager.deleteModule(name);
    } catch (DatasetModuleConflictException e) {
      LOG.warn("Cannot delete module {}: {}", name, e.getMessage());
      responder.sendError(HttpResponseStatus.CONFLICT, e.getMessage());
      return;
    }

    if (!deleted) {
      responder.sendStatus(HttpResponseStatus.NOT_FOUND);
      return;
    }

    responder.sendStatus(HttpResponseStatus.OK);
  }

  /**
   * Responds with the JSON metadata of the named dataset module, or 404 if the manager returns none.
   *
   * @param request the incoming request (not inspected)
   * @param responder used to send the response
   * @param name name of the module to look up
   */
  @GET
  @Path("/data/modules/{name}")
  public void getModuleInfo(HttpRequest request, final HttpResponder responder, @PathParam("name") String name) {
    DatasetModuleMeta moduleMeta = manager.getModule(name);
    if (moduleMeta == null) {
      responder.sendStatus(HttpResponseStatus.NOT_FOUND);
    } else {
      responder.sendJson(HttpResponseStatus.OK, moduleMeta);
    }
  }

  /**
   * Responds with the JSON list of all dataset types known to the manager, sorted by type name.
   *
   * @param request the incoming request (not inspected)
   * @param responder used to send the response
   */
  @GET
  @Path("/data/types")
  public void listTypes(HttpRequest request, final HttpResponder responder) {
    // Sorting by name for convenience
    List<DatasetTypeMeta> list = Lists.newArrayList(manager.getTypes());
    Collections.sort(list, TYPE_BY_NAME);
    responder.sendJson(HttpResponseStatus.OK, list);
  }

  /**
   * Responds with the JSON metadata of the named dataset type, or 404 if the manager returns none.
   *
   * @param request the incoming request (not inspected)
   * @param responder used to send the response
   * @param name name of the type to look up
   */
  @GET
  @Path("/data/types/{name}")
  public void getTypeInfo(HttpRequest request, final HttpResponder responder,
                          @PathParam("name") String name) {
    DatasetTypeMeta typeMeta = manager.getTypeInfo(name);
    if (typeMeta == null) {
      responder.sendStatus(HttpResponseStatus.NOT_FOUND);
    } else {
      responder.sendJson(HttpResponseStatus.OK, typeMeta);
    }
  }

  /**
   * Copies the given request content into the archive location, closing both streams afterwards.
   *
   * @param content buffer holding the uploaded bytes
   * @param archive location to write the bytes to
   * @throws IOException if the output stream cannot be opened or the copy fails
   */
  private void storeArchive(ChannelBuffer content, Location archive) throws IOException {
    InputStream inputStream = new ChannelBufferInputStream(content);
    try {
      OutputStream outStream = archive.getOutputStream();
      try {
        ByteStreams.copy(inputStream, outStream);
      } finally {
        outStream.close();
      }
    } finally {
      inputStream.close();
    }
  }
}