Package co.cask.cdap.explore.client

Source Code of co.cask.cdap.explore.client.ExploreHttpClient

/*
* Copyright © 2014 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package co.cask.cdap.explore.client;

import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.http.HttpMethod;
import co.cask.cdap.common.http.HttpRequest;
import co.cask.cdap.common.http.HttpRequests;
import co.cask.cdap.common.http.HttpResponse;
import co.cask.cdap.explore.service.Explore;
import co.cask.cdap.explore.service.ExploreException;
import co.cask.cdap.explore.service.HandleNotFoundException;
import co.cask.cdap.explore.service.MetaDataInfo;
import co.cask.cdap.explore.service.TableNotFoundException;
import co.cask.cdap.explore.utils.ColumnsArgs;
import co.cask.cdap.explore.utils.FunctionsArgs;
import co.cask.cdap.explore.utils.SchemasArgs;
import co.cask.cdap.explore.utils.TablesArgs;
import co.cask.cdap.proto.ColumnDesc;
import co.cask.cdap.proto.QueryHandle;
import co.cask.cdap.proto.QueryInfo;
import co.cask.cdap.proto.QueryResult;
import co.cask.cdap.proto.QueryStatus;
import co.cask.cdap.proto.TableInfo;
import co.cask.cdap.proto.TableNameInfo;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSyntaxException;
import com.google.gson.reflect.TypeToken;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.net.URL;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
* The methods of this class call the HTTP APIs exposed by explore and return the raw information
* contained in their json responses. This class is only meant to be extended by classes
* which implement ExploreClient.
*/
abstract class ExploreHttpClient implements Explore {
  private static final Logger LOG = LoggerFactory.getLogger(ExploreHttpClient.class);
  private static final Gson GSON = new Gson();

  private static final Type MAP_TYPE_TOKEN = new TypeToken<Map<String, String>>() { }.getType();
  private static final Type TABLES_TYPE = new TypeToken<List<TableNameInfo>>() { }.getType();
  private static final Type COL_DESC_LIST_TYPE = new TypeToken<List<ColumnDesc>>() { }.getType();
  private static final Type QUERY_INFO_LIST_TYPE = new TypeToken<List<QueryInfo>>() { }.getType();
  private static final Type ROW_LIST_TYPE = new TypeToken<List<QueryResult>>() { }.getType();

  protected abstract InetSocketAddress getExploreServiceAddress();

  protected abstract String getAuthorizationToken();

  protected boolean isAvailable() {
    try {
      HttpResponse response = doGet("explore/status");
      return HttpResponseStatus.OK.getCode() == response.getResponseCode();
    } catch (Exception e) {
      LOG.info("Caught exception when checking Explore availability", e);
      return false;
    }
  }

  protected QueryHandle doEnableExplore(String datasetInstance) throws ExploreException {
    HttpResponse response = doPost(String.format("data/explore/datasets/%s/enable", datasetInstance), null, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot enable explore on dataset " + datasetInstance + ". Reason: " +
                                 getDetails(response));
  }

  protected QueryHandle doDisableExplore(String datasetInstance) throws ExploreException {
    HttpResponse response = doPost(String.format("data/explore/datasets/%s/disable", datasetInstance), null, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot disable explore on dataset " + datasetInstance + ". Reason: " +
                                 getDetails(response));
  }

  @Override
  public QueryHandle execute(String statement) throws ExploreException {
    HttpResponse response = doPost("data/explore/queries", GSON.toJson(ImmutableMap.of("query", statement)), null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot execute query. Reason: " + getDetails(response));
  }

  @Override
  public QueryStatus getStatus(QueryHandle handle) throws ExploreException, HandleNotFoundException {
    HttpResponse response = doGet(String.format("data/explore/queries/%s/%s", handle.getHandle(), "status"));
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return parseJson(response, QueryStatus.class);
    } else if (HttpResponseStatus.NOT_FOUND.getCode() == response.getResponseCode()) {
      throw new HandleNotFoundException("Handle " + handle.getHandle() + "not found.");
    }
    throw new ExploreException("Cannot get status. Reason: " + getDetails(response));
  }

  @Override
  public List<ColumnDesc> getResultSchema(QueryHandle handle) throws ExploreException, HandleNotFoundException {
    HttpResponse response = doGet(String.format("data/explore/queries/%s/%s", handle.getHandle(), "schema"));
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return parseJson(response, COL_DESC_LIST_TYPE);
    } else if (HttpResponseStatus.NOT_FOUND.getCode() == response.getResponseCode()) {
      throw new HandleNotFoundException("Handle " + handle.getHandle() + "not found.");
    }
    throw new ExploreException("Cannot get result schema. Reason: " + getDetails(response));
  }

  @Override
  public List<QueryResult> nextResults(QueryHandle handle, int size) throws ExploreException, HandleNotFoundException {
    HttpResponse response = doPost(String.format("data/explore/queries/%s/%s", handle.getHandle(), "next"),
                                   GSON.toJson(ImmutableMap.of("size", size)), null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return parseJson(response, ROW_LIST_TYPE);
    } else if (HttpResponseStatus.NOT_FOUND.getCode() == response.getResponseCode()) {
      throw new HandleNotFoundException("Handle " + handle.getHandle() + "not found.");
    }
    throw new ExploreException("Cannot get next results. Reason: " + getDetails(response));
  }

  @Override
  public List<QueryResult> previewResults(QueryHandle handle)
    throws ExploreException, HandleNotFoundException, SQLException {
    HttpResponse response = doPost(String.format("data/explore/queries/%s/%s", handle.getHandle(), "preview"),
                                   null, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return parseJson(response, ROW_LIST_TYPE);
    } else if (HttpResponseStatus.NOT_FOUND.getCode() == response.getResponseCode()) {
      throw new HandleNotFoundException("Handle " + handle.getHandle() + "not found.");
    }
    throw new ExploreException("Cannot get results preview. Reason: " + getDetails(response));
  }

  @Override
  public void close(QueryHandle handle) throws ExploreException, HandleNotFoundException {
    HttpResponse response = doDelete(String.format("data/explore/queries/%s", handle.getHandle()));
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return;
    } else if (HttpResponseStatus.NOT_FOUND.getCode() == response.getResponseCode()) {
      throw new HandleNotFoundException("Handle " + handle.getHandle() + "not found.");
    }
    throw new ExploreException("Cannot close operation. Reason: " + getDetails(response));
  }

  @Override
  public List<QueryInfo> getQueries() throws ExploreException, SQLException {

    HttpResponse response = doGet("data/explore/queries/");
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return parseJson(response, QUERY_INFO_LIST_TYPE);
    }
    throw new ExploreException("Cannot get list of queries. Reason: " + getDetails(response));
  }

  public QueryHandle getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
    throws ExploreException, SQLException {
    String body = GSON.toJson(new ColumnsArgs(catalog, schemaPattern,
                                                                tableNamePattern, columnNamePattern));
    HttpResponse response = doPost("data/explore/jdbc/columns", body, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot get the columns. Reason: " + getDetails(response));
  }

  @Override
  public QueryHandle getCatalogs() throws ExploreException, SQLException {
    HttpResponse response = doPost("data/explore/jdbc/catalogs", null, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot get the catalogs. Reason: " + getDetails(response));
  }

  @Override
  public QueryHandle getSchemas(String catalog, String schemaPattern) throws ExploreException, SQLException {
    String body = GSON.toJson(new SchemasArgs(catalog, schemaPattern));
    HttpResponse response = doPost("data/explore/jdbc/schemas", body, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot get the schemas. Reason: " + getDetails(response));
  }

  @Override
  public QueryHandle getFunctions(String catalog, String schemaPattern, String functionNamePattern)
    throws ExploreException, SQLException {
    String body = GSON.toJson(new FunctionsArgs(catalog, schemaPattern, functionNamePattern));
    HttpResponse response = doPost("data/explore/jdbc/functions", body, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot get the functions. Reason: " + getDetails(response));
  }

  @Override
  public MetaDataInfo getInfo(MetaDataInfo.InfoType infoType) throws ExploreException, SQLException {
    HttpResponse response = doGet(String.format("data/explore/jdbc/info/%s", infoType.name()));
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return parseJson(response, MetaDataInfo.class);
    }
    throw new ExploreException("Cannot get information " + infoType.name() + ". Reason: " + getDetails(response));
  }

  @Override
  public QueryHandle getTables(String catalog, String schemaPattern,
                               String tableNamePattern, List<String> tableTypes) throws ExploreException, SQLException {
    String body = GSON.toJson(new TablesArgs(catalog, schemaPattern, tableNamePattern, tableTypes));
    HttpResponse response = doPost("data/explore/jdbc/tables", body, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot get the tables. Reason: " + getDetails(response));
  }

  @Override
  public List<TableNameInfo> getTables(@Nullable String database) throws ExploreException {
    HttpResponse response = doGet(String.format("data/explore/tables%s", (database != null) ? "?db=" + database : ""));
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return parseJson(response, TABLES_TYPE);
    }
    throw new ExploreException("Cannot get the tables. Reason: " + getDetails(response));
  }

  @Override
  public TableInfo getTableInfo(@Nullable String database, String table)
    throws ExploreException, TableNotFoundException {
    String tableNamePrefix = (database != null) ? database + "." : "";
    HttpResponse response = doGet(String.format("data/explore/tables/%s/info", tableNamePrefix + table));
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return parseJson(response, TableInfo.class);
    } else if (HttpResponseStatus.NOT_FOUND.getCode() == response.getResponseCode()) {
      throw new TableNotFoundException("Table " + tableNamePrefix + table + " not found.");
    }
    throw new ExploreException("Cannot get the schema of table " + tableNamePrefix + table +
                               ". Reason: " + getDetails(response));
  }

  @Override
  public QueryHandle getTableTypes() throws ExploreException, SQLException {
    HttpResponse response = doPost("data/explore/jdbc/tableTypes", null, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot get the tables. Reason: " + getDetails(response));
  }

  @Override
  public QueryHandle getTypeInfo() throws ExploreException, SQLException {
    HttpResponse response = doPost("data/explore/jdbc/types", null, null);
    if (HttpResponseStatus.OK.getCode() == response.getResponseCode()) {
      return QueryHandle.fromId(parseResponseAsMap(response, "handle"));
    }
    throw new ExploreException("Cannot get the tables. Reason: " + getDetails(response));
  }

  private String parseResponseAsMap(HttpResponse response, String key) throws ExploreException {
    Map<String, String> responseMap = parseJson(response, MAP_TYPE_TOKEN);
    if (responseMap.containsKey(key)) {
      return responseMap.get(key);
    }

    String message = String.format("Cannot find key %s in server response: %s", key,
                                   new String(response.getResponseBody(), Charsets.UTF_8));
    LOG.error(message);
    throw new ExploreException(message);
  }

  private <T> T parseJson(HttpResponse response, Type type) throws ExploreException {
    String responseString = new String(response.getResponseBody(), Charsets.UTF_8);
    try {
      return GSON.fromJson(responseString, type);
    } catch (JsonSyntaxException e) {
      String message = String.format("Cannot parse server response: %s", responseString);
      LOG.error(message, e);
      throw new ExploreException(message, e);
    } catch (JsonParseException e) {
      String message = String.format("Cannot parse server response as map: %s", responseString);
      LOG.error(message, e);
      throw new ExploreException(message, e);
    }
  }

  private HttpResponse doGet(String resource) throws ExploreException {
    return doRequest(resource, "GET", null, null);
  }

  private HttpResponse doPost(String resource, String body, Map<String, String> headers) throws ExploreException {
    return doRequest(resource, "POST", headers, body);
  }

  private HttpResponse doDelete(String resource) throws ExploreException {
    return doRequest(resource, "DELETE", null, null);
  }

  private HttpResponse doRequest(String resource, String requestMethod,
                                 @Nullable Map<String, String> headers,
                                 @Nullable String body) throws ExploreException {
    Map<String, String> newHeaders = headers;
    if (getAuthorizationToken() != null && !getAuthorizationToken().isEmpty()) {
      newHeaders = (headers != null) ? Maps.newHashMap(headers) : Maps.<String, String>newHashMap();
      newHeaders.put("Authorization", "Bearer " + getAuthorizationToken());
    }
    String resolvedUrl = resolve(resource);
    try {
      URL url = new URL(resolvedUrl);
      if (body != null) {
        return HttpRequests.execute(HttpRequest.builder(HttpMethod.valueOf(requestMethod), url)
                                      .addHeaders(newHeaders).withBody(body).build());
      } else {
        return HttpRequests.execute(HttpRequest.builder(HttpMethod.valueOf(requestMethod), url)
                                      .addHeaders(newHeaders).build());
      }
    } catch (IOException e) {
      throw new ExploreException(
        String.format("Error connecting to Explore Service at %s while doing %s with headers %s and body %s",
                      resolvedUrl, requestMethod,
                      newHeaders == null ? "null" : Joiner.on(",").withKeyValueSeparator("=").join(newHeaders),
                      body == null ? "null" : body), e);
    }
  }

  private String getDetails(HttpResponse response) {
    return String.format("Response code: %s, message:'%s', body: '%s'",
                         response.getResponseCode(), response.getResponseMessage(),
                         response.getResponseBody() == null ?
                           "null" : new String(response.getResponseBody(), Charsets.UTF_8));

  }

  private String resolve(String resource) {
    InetSocketAddress addr = getExploreServiceAddress();
    String url = String.format("http://%s:%s%s/%s", addr.getHostName(), addr.getPort(),
                               Constants.Gateway.GATEWAY_VERSION, resource);
    LOG.trace("Explore URL = {}", url);
    return url;
  }

}
TOP

Related Classes of co.cask.cdap.explore.client.ExploreHttpClient

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.