Source code of com.facebook.presto.hive.AbstractTestHiveClientS3 and its inner class TestingHiveMetastore

/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.CachingHiveMetastore;
import com.facebook.presto.hive.shaded.org.apache.thrift.TException;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSink;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TupleDomain;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import static com.facebook.presto.hadoop.HadoopFileStatus.isDirectory;
import static com.facebook.presto.hive.util.Types.checkType;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.TimeZoneKey.UTC_KEY;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

@Test(groups = "hive-s3")
public abstract class AbstractTestHiveClientS3
{
    private static final ConnectorSession SESSION = new ConnectorSession("user", "test", "default", "default", UTC_KEY, Locale.ENGLISH, null, null);

    protected String database;
    protected SchemaTableName tableS3;
    protected SchemaTableName temporaryCreateTable;

    protected HdfsEnvironment hdfsEnvironment;
    protected TestingHiveMetastore metastoreClient;
    protected HiveClient client;

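    // Derive the table names used by the tests: the pre-loaded S3 table and a randomized temporary table for the creation test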
    protected void setupHive(String databaseName)
    {
        database = databaseName;
        tableS3 = new SchemaTableName(database, "presto_test_s3");

        String random = UUID.randomUUID().toString().toLowerCase().replace("-", "");
        temporaryCreateTable = new SchemaTableName(database, "tmp_presto_test_create_s3_" + random);
    }

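    // Build the client stack against a live Hive metastore: S3 credentials,
    // an optional SOCKS proxy, the HDFS environment, the location-rewriting
    // metastore wrapper below, and the HiveClient under test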
    protected void setup(String host, int port, String databaseName, String awsAccessKey, String awsSecretKey, String writableBucket)
    {
        setupHive(databaseName);

        HiveClientConfig hiveClientConfig = new HiveClientConfig()
                .setS3AwsAccessKey(awsAccessKey)
                .setS3AwsSecretKey(awsSecretKey);

        String proxy = System.getProperty("hive.metastore.thrift.client.socks-proxy");
        if (proxy != null) {
            hiveClientConfig.setMetastoreSocksProxy(HostAndPort.fromString(proxy));
        }

        HiveCluster hiveCluster = new TestingHiveCluster(hiveClientConfig, host, port);
        ExecutorService executor = newCachedThreadPool(daemonThreadsNamed("hive-s3-%s"));

        hdfsEnvironment = new HdfsEnvironment(new HdfsConfiguration(hiveClientConfig));
        metastoreClient = new TestingHiveMetastore(hiveCluster, executor, hiveClientConfig, writableBucket);
        client = new HiveClient(
                new HiveConnectorId("hive-test"),
                hiveClientConfig,
                metastoreClient,
                new NamenodeStats(),
                new HdfsEnvironment(new HdfsConfiguration(hiveClientConfig)),
                new HadoopDirectoryLister(),
                sameThreadExecutor(),
                new TypeRegistry());
    }

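    // Scans every split of the S3-backed test table and verifies the sum of the t_bigint column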
    @Test
    public void testGetRecordsS3()
            throws Exception
    {
        ConnectorTableHandle table = getTableHandle(tableS3);
        List<ConnectorColumnHandle> columnHandles = ImmutableList.copyOf(client.getColumnHandles(table).values());
        Map<String, Integer> columnIndex = indexColumns(columnHandles);

        ConnectorPartitionResult partitionResult = client.getPartitions(table, TupleDomain.<ConnectorColumnHandle>all());
        assertEquals(partitionResult.getPartitions().size(), 1);
        ConnectorSplitSource splitSource = client.getPartitionSplits(table, partitionResult.getPartitions());

        long sum = 0;
        for (ConnectorSplit split : getAllSplits(splitSource)) {
            try (RecordCursor cursor = client.getRecordSet(split, columnHandles).cursor()) {
                while (cursor.advanceNextPosition()) {
                    sum += cursor.getLong(columnIndex.get("t_bigint"));
                }
            }
        }
        assertEquals(sum, 78300);
    }

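    // Checks directory and file detection through the S3 file system, using the fixed presto-test-hive bucket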
    @Test
    public void testGetFileStatus()
            throws Exception
    {
        Path basePath = new Path("s3://presto-test-hive/");
        Path tablePath = new Path(basePath, "presto_test_s3");
        Path filePath = new Path(tablePath, "test1.csv");
        FileSystem fs = hdfsEnvironment.getFileSystem(basePath);

        assertTrue(isDirectory(fs.getFileStatus(basePath)));
        assertTrue(isDirectory(fs.getFileStatus(tablePath)));
        assertFalse(isDirectory(fs.getFileStatus(filePath)));
        assertFalse(fs.exists(new Path(basePath, "foo")));
    }

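    // Round-trips a table on S3: create it, write three rows, read them back, then drop it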
    @Test
    public void testTableCreation()
            throws Exception
    {
        try {
            doCreateTable(temporaryCreateTable, "presto_test");
        }
        finally {
            dropTable(temporaryCreateTable);
        }
    }

    private void doCreateTable(SchemaTableName tableName, String tableOwner)
            throws InterruptedException
    {
        // begin creating the table
        List<ColumnMetadata> columns = ImmutableList.<ColumnMetadata>builder()
                .add(new ColumnMetadata("id", BIGINT, 1, false))
                .build();

        ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(tableName, columns, tableOwner);
        HiveOutputTableHandle outputHandle = client.beginCreateTable(SESSION, tableMetadata);

        // write the records
        RecordSink sink = client.getRecordSink(outputHandle);

        sink.beginRecord(1);
        sink.appendLong(1);
        sink.finishRecord();

        sink.beginRecord(1);
        sink.appendLong(3);
        sink.finishRecord();

        sink.beginRecord(1);
        sink.appendLong(2);
        sink.finishRecord();

        String fragment = sink.commit();

        // commit the table
        client.commitCreateTable(outputHandle, ImmutableList.of(fragment));

        // Hack to work around the metastore not being configured for S3.
        // The metastore tries to validate the location when creating the
        // table, which fails without explicit configuration for S3.
        // We work around that by using a dummy location when creating the
        // table and update it here to the correct S3 location.
        metastoreClient.updateTableLocation(database, tableName.getTableName(), outputHandle.getTargetPath());

        // load the new table
        ConnectorTableHandle tableHandle = getTableHandle(tableName);
        List<ConnectorColumnHandle> columnHandles = ImmutableList.copyOf(client.getColumnHandles(tableHandle).values());

        // verify the data
        ConnectorPartitionResult partitionResult = client.getPartitions(tableHandle, TupleDomain.<ConnectorColumnHandle>all());
        assertEquals(partitionResult.getPartitions().size(), 1);
        ConnectorSplitSource splitSource = client.getPartitionSplits(tableHandle, partitionResult.getPartitions());
        ConnectorSplit split = getOnlyElement(getAllSplits(splitSource));

        try (RecordCursor cursor = client.getRecordSet(split, columnHandles).cursor()) {
            assertTrue(cursor.advanceNextPosition());
            assertEquals(cursor.getLong(0), 1);

            assertTrue(cursor.advanceNextPosition());
            assertEquals(cursor.getLong(0), 3);

            assertTrue(cursor.advanceNextPosition());
            assertEquals(cursor.getLong(0), 2);

            assertFalse(cursor.advanceNextPosition());
        }
    }

    private void dropTable(SchemaTableName table)
    {
        try {
            metastoreClient.dropTable(table.getSchemaName(), table.getTableName());
        }
        catch (RuntimeException e) {
            // this usually occurs because the table was not created
        }
    }

    private ConnectorTableHandle getTableHandle(SchemaTableName tableName)
    {
        ConnectorTableHandle handle = client.getTableHandle(SESSION, tableName);
        checkArgument(handle != null, "table not found: %s", tableName);
        return handle;
    }

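    // Drain the split source into a single list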
    private static List<ConnectorSplit> getAllSplits(ConnectorSplitSource source)
            throws InterruptedException
    {
        ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
        while (!source.isFinished()) {
            splits.addAll(source.getNextBatch(1000));
        }
        return splits.build();
    }

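    // Map each column name to its ordinal position in the handle list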
    private static ImmutableMap<String, Integer> indexColumns(List<ConnectorColumnHandle> columnHandles)
    {
        ImmutableMap.Builder<String, Integer> index = ImmutableMap.builder();
        int i = 0;
        for (ConnectorColumnHandle columnHandle : columnHandles) {
            HiveColumnHandle hiveColumnHandle = checkType(columnHandle, HiveColumnHandle.class, "columnHandle");
            index.put(hiveColumnHandle.getName(), i);
            i++;
        }
        return index.build();
    }

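    // Metastore wrapper that rewrites table locations so that a metastore
    // without S3 configuration can still create, alter, and drop S3-backed tables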
    private static class TestingHiveMetastore
            extends CachingHiveMetastore
    {
        private final String writableBucket;

        public TestingHiveMetastore(HiveCluster hiveCluster, ExecutorService executor, HiveClientConfig hiveClientConfig, String writableBucket)
        {
            super(hiveCluster, executor, hiveClientConfig);
            this.writableBucket = writableBucket;
        }

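        // report the writable S3 bucket as the database location so that new tables are created on S3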
        @Override
        public Database getDatabase(String databaseName)
                throws NoSuchObjectException
        {
            Database database = super.getDatabase(databaseName);
            database.setLocationUri("s3://" + writableBucket + "/");
            return database;
        }

        @Override
        public void createTable(Table table)
        {
            // hack to work around the metastore not being configured for S3
            table.getSd().setLocation("/");
            super.createTable(table);
        }

        @Override
        public void dropTable(String databaseName, String tableName)
        {
            try {
                // hack to work around the metastore not being configured for S3
                Table table = getTable(databaseName, tableName);
                table.getSd().setLocation("/");
                try (HiveMetastoreClient client = clientProvider.createMetastoreClient()) {
                    client.alter_table(databaseName, tableName, table);
                    client.drop_table(databaseName, tableName, false);
                }
            }
            catch (TException e) {
                throw Throwables.propagate(e);
            }
        }

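        // point the table at its real S3 location after the dummy-location create (see doCreateTable above)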
        public void updateTableLocation(String databaseName, String tableName, String location)
        {
            try {
                Table table = getTable(databaseName, tableName);
                table.getSd().setLocation(location);
                try (HiveMetastoreClient client = clientProvider.createMetastoreClient()) {
                    client.alter_table(databaseName, tableName, table);
                }
            }
            catch (TException e) {
                throw Throwables.propagate(e);
            }
        }
    }
}
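
A concrete test class would subclass this abstract base and supply connection details before the tests run. The sketch below is illustrative only: the host, port, database, system-property names, and bucket are hypothetical placeholders, not values taken from this source.

public class TestHiveClientS3
        extends AbstractTestHiveClientS3
{
    public TestHiveClientS3()
    {
        // Hypothetical values: point these at a running Hive metastore and
        // an S3 bucket that the supplied AWS credentials can write to.
        setup("localhost", 9083, "default",
                System.getProperty("hive.s3.aws-access-key"),
                System.getProperty("hive.s3.aws-secret-key"),
                "my-writable-test-bucket");
    }
}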