Package org.apache.hadoop.hive.metastore

Source Code of org.apache.hadoop.hive.metastore.TestMetaStoreEventListener

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.metastore;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.PreAlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.PreAlterTableEvent;
import org.apache.hadoop.hive.metastore.events.PreCreateDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.PreCreateTableEvent;
import org.apache.hadoop.hive.metastore.events.PreDropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.PreDropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.PreDropTableEvent;
import org.apache.hadoop.hive.metastore.events.PreEventContext;
import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;

/**
* TestMetaStoreEventListener. Test case for
* {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} and
* {@link org.apache.hadoop.hive.metastore.MetaStorePreEventListener}
*/
public class TestMetaStoreEventListener extends TestCase {
  private HiveConf hiveConf;
  private HiveMetaStoreClient msc;
  private Driver driver;

  private static final String dbName = "hive2038";
  private static final String tblName = "tmptbl";
  private static final String renamed = "tmptbl2";

  @Override
  protected void setUp() throws Exception {

    super.setUp();

    System.setProperty("hive.metastore.event.listeners",
        DummyListener.class.getName());
    System.setProperty("hive.metastore.pre.event.listeners",
        DummyPreListener.class.getName());

    int port = MetaStoreUtils.findFreePort();
    MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge());

    hiveConf = new HiveConf(this.getClass());
    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
    SessionState.start(new CliSessionState(hiveConf));
    msc = new HiveMetaStoreClient(hiveConf, null);
    driver = new Driver(hiveConf);

    driver.run("drop database if exists " + dbName + " cascade");

    DummyListener.notifyList.clear();
    DummyPreListener.notifyList.clear();
  }

  @Override
  protected void tearDown() throws Exception {
    super.tearDown();
  }

  private void validateCreateDb(Database expectedDb, Database actualDb) {
    assertEquals(expectedDb.getName(), actualDb.getName());
    assertEquals(expectedDb.getLocationUri(), actualDb.getLocationUri());
  }

  private void validateTable(Table expectedTable, Table actualTable) {
    assertEquals(expectedTable.getTableName(), actualTable.getTableName());
    assertEquals(expectedTable.getDbName(), actualTable.getDbName());
    assertEquals(expectedTable.getSd().getLocation(), actualTable.getSd().getLocation());
  }

  private void validateCreateTable(Table expectedTable, Table actualTable) {
    validateTable(expectedTable, actualTable);
  }

  private void validateAddPartition(Partition expectedPartition, Partition actualPartition) {
    assertEquals(expectedPartition, actualPartition);
  }

  private void validateTableInAddPartition(Table expectedTable, Table actualTable) {
    assertEquals(expectedTable, actualTable);
  }

  private void validatePartition(Partition expectedPartition, Partition actualPartition) {
    assertEquals(expectedPartition.getValues(), actualPartition.getValues());
    assertEquals(expectedPartition.getDbName(), actualPartition.getDbName());
    assertEquals(expectedPartition.getTableName(), actualPartition.getTableName());
  }

  private void validateAlterPartition(Partition expectedOldPartition,
      Partition expectedNewPartition, String actualOldPartitionDbName,
      String actualOldPartitionTblName,List<String> actualOldPartitionValues,
      Partition actualNewPartition) {
    assertEquals(expectedOldPartition.getValues(), actualOldPartitionValues);
    assertEquals(expectedOldPartition.getDbName(), actualOldPartitionDbName);
    assertEquals(expectedOldPartition.getTableName(), actualOldPartitionTblName);

    validatePartition(expectedNewPartition, actualNewPartition);
  }

  private void validateAlterTable(Table expectedOldTable, Table expectedNewTable,
      Table actualOldTable, Table actualNewTable) {
    validateTable(expectedOldTable, actualOldTable);
    validateTable(expectedNewTable, actualNewTable);
  }

  private void validateAlterTableColumns(Table expectedOldTable, Table expectedNewTable,
      Table actualOldTable, Table actualNewTable) {
    validateAlterTable(expectedOldTable, expectedNewTable, actualOldTable, actualNewTable);

    assertEquals(expectedOldTable.getSd().getCols(), actualOldTable.getSd().getCols());
    assertEquals(expectedNewTable.getSd().getCols(), actualNewTable.getSd().getCols());
  }

  private void validateLoadPartitionDone(String expectedTableName,
      Map<String,String> expectedPartitionName, String actualTableName,
      Map<String,String> actualPartitionName) {
    assertEquals(expectedPartitionName, actualPartitionName);
    assertEquals(expectedTableName, actualTableName);
  }

  private void validateDropPartition(Partition expectedPartition, Partition actualPartition) {
    validatePartition(expectedPartition, actualPartition);
  }

  private void validateTableInDropPartition(Table expectedTable, Table actualTable) {
    validateTable(expectedTable, actualTable);
  }

  private void validateDropTable(Table expectedTable, Table actualTable) {
    validateTable(expectedTable, actualTable);
  }

  private void validateDropDb(Database expectedDb, Database actualDb) {
    assertEquals(expectedDb, actualDb);
  }

  public void testListener() throws Exception {
    int listSize = 0;

    List<ListenerEvent> notifyList = DummyListener.notifyList;
    assertEquals(notifyList.size(), listSize);
    List<PreEventContext> preNotifyList = DummyPreListener.notifyList;
    assertEquals(preNotifyList.size(), listSize);

    driver.run("create database " + dbName);
    listSize++;
    Database db = msc.getDatabase(dbName);
    assertEquals(listSize, notifyList.size());
    assertEquals(listSize, preNotifyList.size());

    CreateDatabaseEvent dbEvent = (CreateDatabaseEvent)(notifyList.get(listSize - 1));
    assert dbEvent.getStatus();
    validateCreateDb(db, dbEvent.getDatabase());

    PreCreateDatabaseEvent preDbEvent = (PreCreateDatabaseEvent)(preNotifyList.get(listSize - 1));
    validateCreateDb(db, preDbEvent.getDatabase());

    driver.run("use " + dbName);
    driver.run(String.format("create table %s (a string) partitioned by (b string)", tblName));
    listSize++;
    Table tbl = msc.getTable(dbName, tblName);
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    CreateTableEvent tblEvent = (CreateTableEvent)(notifyList.get(listSize - 1));
    assert tblEvent.getStatus();
    validateCreateTable(tbl, tblEvent.getTable());

    PreCreateTableEvent preTblEvent = (PreCreateTableEvent)(preNotifyList.get(listSize - 1));
    validateCreateTable(tbl, preTblEvent.getTable());

    driver.run("alter table tmptbl add partition (b='2011')");
    listSize++;
    Partition part = msc.getPartition("hive2038", "tmptbl", "b=2011");
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    AddPartitionEvent partEvent = (AddPartitionEvent)(notifyList.get(listSize-1));
    assert partEvent.getStatus();
    validateAddPartition(part, partEvent.getPartitions().get(0));
    validateTableInAddPartition(tbl, partEvent.getTable());

    PreAddPartitionEvent prePartEvent = (PreAddPartitionEvent)(preNotifyList.get(listSize-1));
    validateAddPartition(part, prePartEvent.getPartitions().get(0));

    // Test adding multiple partitions in a single partition-set, atomically.
    int currentTime = (int)System.currentTimeMillis();
    HiveMetaStoreClient hmsClient = new HiveMetaStoreClient(hiveConf);
    Table table = hmsClient.getTable(dbName, "tmptbl");
    Partition partition1 = new Partition(Arrays.asList("20110101"), dbName, "tmptbl", currentTime,
                                        currentTime, table.getSd(), table.getParameters());
    Partition partition2 = new Partition(Arrays.asList("20110102"), dbName, "tmptbl", currentTime,
                                        currentTime, table.getSd(), table.getParameters());
    Partition partition3 = new Partition(Arrays.asList("20110103"), dbName, "tmptbl", currentTime,
                                        currentTime, table.getSd(), table.getParameters());
    hmsClient.add_partitions(Arrays.asList(partition1, partition2, partition3));
    ++listSize;
    AddPartitionEvent multiplePartitionEvent = (AddPartitionEvent)(notifyList.get(listSize-1));
    assertEquals("Unexpected number of partitions in event!", 3, multiplePartitionEvent.getPartitions().size());
    assertEquals("Unexpected table value.", table, multiplePartitionEvent.getTable());
    assertEquals("Unexpected partition value.", partition1.getValues(), multiplePartitionEvent.getPartitions().get(0).getValues());
    assertEquals("Unexpected partition value.", partition2.getValues(), multiplePartitionEvent.getPartitions().get(1).getValues());
    assertEquals("Unexpected partition value.", partition3.getValues(), multiplePartitionEvent.getPartitions().get(2).getValues());

    driver.run(String.format("alter table %s touch partition (%s)", tblName, "b='2011'"));
    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    //the partition did not change,
    // so the new partition should be similar to the original partition
    Partition origP = msc.getPartition(dbName, tblName, "b=2011");

    AlterPartitionEvent alterPartEvent = (AlterPartitionEvent)notifyList.get(listSize - 1);
    assert alterPartEvent.getStatus();
    validateAlterPartition(origP, origP, alterPartEvent.getOldPartition().getDbName(),
        alterPartEvent.getOldPartition().getTableName(),
        alterPartEvent.getOldPartition().getValues(), alterPartEvent.getNewPartition());

    PreAlterPartitionEvent preAlterPartEvent =
        (PreAlterPartitionEvent)preNotifyList.get(listSize - 1);
    validateAlterPartition(origP, origP, preAlterPartEvent.getDbName(),
        preAlterPartEvent.getTableName(), preAlterPartEvent.getNewPartition().getValues(),
        preAlterPartEvent.getNewPartition());

    List<String> part_vals = new ArrayList<String>();
    part_vals.add("c=2012");
    Partition newPart = msc.appendPartition(dbName, tblName, part_vals);

    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    AddPartitionEvent appendPartEvent =
        (AddPartitionEvent)(notifyList.get(listSize-1));
    validateAddPartition(newPart, appendPartEvent.getPartitions().get(0));

    PreAddPartitionEvent preAppendPartEvent =
        (PreAddPartitionEvent)(preNotifyList.get(listSize-1));
    validateAddPartition(newPart, preAppendPartEvent.getPartitions().get(0));

    driver.run(String.format("alter table %s rename to %s", tblName, renamed));
    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    Table renamedTable = msc.getTable(dbName, renamed);

    AlterTableEvent alterTableE = (AlterTableEvent) notifyList.get(listSize-1);
    assert alterTableE.getStatus();
    validateAlterTable(tbl, renamedTable, alterTableE.getOldTable(), alterTableE.getNewTable());

    PreAlterTableEvent preAlterTableE = (PreAlterTableEvent) preNotifyList.get(listSize-1);
    validateAlterTable(tbl, renamedTable, preAlterTableE.getOldTable(),
        preAlterTableE.getNewTable());

    //change the table name back
    driver.run(String.format("alter table %s rename to %s", renamed, tblName));
    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    driver.run(String.format("alter table %s ADD COLUMNS (c int)", tblName));
    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    Table altTable = msc.getTable(dbName, tblName);

    alterTableE = (AlterTableEvent) notifyList.get(listSize-1);
    assert alterTableE.getStatus();
    validateAlterTableColumns(tbl, altTable, alterTableE.getOldTable(), alterTableE.getNewTable());

    preAlterTableE = (PreAlterTableEvent) preNotifyList.get(listSize-1);
    validateAlterTableColumns(tbl, altTable, preAlterTableE.getOldTable(),
        preAlterTableE.getNewTable());

    Map<String,String> kvs = new HashMap<String, String>(1);
    kvs.put("b", "2011");
    msc.markPartitionForEvent("hive2038", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    LoadPartitionDoneEvent partMarkEvent = (LoadPartitionDoneEvent)notifyList.get(listSize - 1);
    assert partMarkEvent.getStatus();
    validateLoadPartitionDone("tmptbl", kvs, partMarkEvent.getTable().getTableName(),
        partMarkEvent.getPartitionName());

    PreLoadPartitionDoneEvent prePartMarkEvent =
        (PreLoadPartitionDoneEvent)preNotifyList.get(listSize - 1);
    validateLoadPartitionDone("tmptbl", kvs, prePartMarkEvent.getTableName(),
        prePartMarkEvent.getPartitionName());

    driver.run(String.format("alter table %s drop partition (b='2011')", tblName));
    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    DropPartitionEvent dropPart = (DropPartitionEvent)notifyList.get(listSize - 1);
    assert dropPart.getStatus();
    validateDropPartition(part, dropPart.getPartition());
    validateTableInDropPartition(tbl, dropPart.getTable());

    PreDropPartitionEvent preDropPart = (PreDropPartitionEvent)preNotifyList.get(listSize - 1);
    validateDropPartition(part, preDropPart.getPartition());
    validateTableInDropPartition(tbl, preDropPart.getTable());

    driver.run("drop table " + tblName);
    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    DropTableEvent dropTbl = (DropTableEvent)notifyList.get(listSize-1);
    assert dropTbl.getStatus();
    validateDropTable(tbl, dropTbl.getTable());

    PreDropTableEvent preDropTbl = (PreDropTableEvent)preNotifyList.get(listSize-1);
    validateDropTable(tbl, preDropTbl.getTable());

    driver.run("drop database " + dbName);
    listSize++;
    assertEquals(notifyList.size(), listSize);
    assertEquals(preNotifyList.size(), listSize);

    DropDatabaseEvent dropDB = (DropDatabaseEvent)notifyList.get(listSize-1);
    assert dropDB.getStatus();
    validateDropDb(db, dropDB.getDatabase());

    PreDropDatabaseEvent preDropDB = (PreDropDatabaseEvent)preNotifyList.get(listSize-1);
    assert dropDB.getStatus();
    validateDropDb(db, preDropDB.getDatabase());
  }

}
TOP

Related Classes of org.apache.hadoop.hive.metastore.TestMetaStoreEventListener

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.