// Package org.kiji.schema.impl.hbase
//
// Source code of org.kiji.schema.impl.hbase.HBaseTableLayoutUpdater$UpdaterUsersUpdateHandler

/**
* (c) Copyright 2013 WibiData, Inc.
*
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kiji.schema.impl.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.kiji.schema.InternalKijiError;
import org.kiji.schema.KijiMetaTable;
import org.kiji.schema.KijiURI;
import org.kiji.schema.RuntimeInterruptedException;
import org.kiji.schema.avro.TableLayoutDesc;
import org.kiji.schema.layout.InvalidLayoutException;
import org.kiji.schema.layout.KijiTableLayout;
import org.kiji.schema.layout.impl.TableLayoutUpdateValidator;
import org.kiji.schema.util.Lock;
import org.kiji.schema.util.Time;
import org.kiji.schema.zookeeper.TableLayoutTracker;
import org.kiji.schema.zookeeper.TableLayoutUpdateHandler;
import org.kiji.schema.zookeeper.UsersTracker;
import org.kiji.schema.zookeeper.UsersUpdateHandler;
import org.kiji.schema.zookeeper.ZooKeeperUtils;

/**
* Updates the layout of an HBase Kiji table.
*
* <p>
*   The parameters of the updater include a function to compute a layout update given the
*   current layout of the table. This function may be invoked several times during the update
*   process. For instance, it may be invoked first to pre-validate the update, and then a second
*   time after the table layout lock has been acquired, to re-validate the layout update.
* </p>
*/
public class HBaseTableLayoutUpdater {
  /** Logger of this class; also used by the nested notification handlers. */
  private static final Logger LOG = LoggerFactory.getLogger(HBaseTableLayoutUpdater.class);

  /** Kiji instance the table belongs to; retained in the constructor and released by close(). */
  private final HBaseKiji mKiji;
  /** URI of the table whose layout is being updated. */
  private final KijiURI mTableURI;
  /** ZooKeeper client obtained from the Kiji instance. */
  private final CuratorFramework mZKClient;

  /** Handler registered on the table users tracker during update(). */
  private final UpdaterUsersUpdateHandler mUsersUpdateHandler = new UpdaterUsersUpdateHandler();
  /** Handler given to the table layout tracker during update(). */
  private final UpdaterLayoutUpdateHandler mLayoutUpdateHandler = new UpdaterLayoutUpdateHandler();

  /** Function computing the layout update descriptor from the current layout of the table. */
  private final Function<KijiTableLayout, TableLayoutDesc> mLayoutUpdate;

  /**
   * New table layout. Null until writeMetaTable() has written the update to the meta-table,
   * which happens before the new layout ID is pushed to ZooKeeper.
   */
  private KijiTableLayout mNewLayout = null;

  // -----------------------------------------------------------------------------------------------

  /** Handles update notifications of the users list of the table. */
  private final class UpdaterUsersUpdateHandler implements UsersUpdateHandler {
    /** Monitor for table users notifications. */
    private final Object mLock = new Object();

    /** Map: user ID -> layout ID. */
    private Multimap<String, String> mUserMap = null;

    /** {@inheritDoc} */
    @Override
    public void update(Multimap<String, String> userMap) {
      LOG.debug("Layout updater received user map update for table {}: {}.",
          mTableURI, userMap);
      synchronized (mLock) {
        mUserMap = userMap;
        mLock.notifyAll();
      }
    }

    /**
     * Waits for all users of the table to have a consistent view on the table layout.
     *
     * @return the layout ID as seen consistently by all users.
     */
    public String waitForConsistentView() {
      synchronized (mLock) {
        while (true) {
          if (mUserMap != null) {
            final Map<String, List<String>> mLayoutMap = Maps.newHashMap();
            for (Map.Entry<String, String> entry : mUserMap.entries()) {
              final String userId = entry.getKey();
              final String layoutId = entry.getValue();
              List<String> userIds = mLayoutMap.get(layoutId);
              if (null == userIds) {
                userIds = Lists.newArrayList();
                mLayoutMap.put(layoutId, userIds);
              }
              userIds.add(userId);
            }
            LOG.info("User map for table {}: {}", mTableURI, mLayoutMap);
            switch (mLayoutMap.size()) {
              case 0: return null;
              case 1: return mLayoutMap.keySet().iterator().next();
              default: break;
            }
          } else {
            LOG.debug("Waiting for table users notification.");
          }
          try {
            mLock.wait();
          } catch (InterruptedException ie) {
            throw new RuntimeInterruptedException(ie);
          }
        }
      }
    }
  }

  // -----------------------------------------------------------------------------------------------

  /** Handles update notifications of the table layout. */
  private final class UpdaterLayoutUpdateHandler implements TableLayoutUpdateHandler {
    /** Monitor for table layout notifications. */
    private final Object mLock = new Object();

    /** Current layout. */
    private String mCurrentLayoutId = null;

    /** {@inheritDoc} */
    @Override
    public void update(String layout) {
      synchronized (mLock) {
        mCurrentLayoutId = layout;
        LOG.debug("Layout updater received layout update for table {}: {}.",
            mTableURI, mCurrentLayoutId);
        mLock.notifyAll();
      }
    }

    /**
     * Reports the ID of the current table layout.
     *
     * @return the ID of the current table layout.
     */
    public String getCurrentLayoutId() {
      synchronized (mLock) {
        while (null == mCurrentLayoutId) {
          try {
            mLock.wait();
          } catch (InterruptedException ie) {
            throw new RuntimeInterruptedException(ie);
          }
        }
        return mCurrentLayoutId;
      }
    }

    /**
     * Waits for the current table layout to switch to the specified layout ID.
     *
     * @param layoutId ID of the layout to wait for.
     */
    public void waitForLayoutNotification(String layoutId) {
      synchronized (mLock) {
        while (!Objects.equal(getCurrentLayoutId(), layoutId)) {
          LOG.info("Waiting for layout notification with ID {}, current layout ID is {}.",
              layoutId, mCurrentLayoutId);
          try {
            mLock.wait();
          } catch (InterruptedException ie) {
            throw new RuntimeInterruptedException(ie);
          }
        }
      }
    }
  }

  // -----------------------------------------------------------------------------------------------

  /**
   * Initializes a new layout updater for the specified table and with the specified layout update.
   *
   * @param kiji Opened Kiji instance the table belongs to.
   * @param tableURI Update the layout of this table.
   * @param layoutUpdate Function to generate the layout update descriptor based on the current
   *     layout of the table.
   * @throws IOException on I/O error.
   * @throws KeeperException on ZooKeeper error.
   */
  public HBaseTableLayoutUpdater(
      final HBaseKiji kiji,
      final KijiURI tableURI,
      final Function<KijiTableLayout, TableLayoutDesc> layoutUpdate)
      throws IOException, KeeperException {
    mKiji = kiji;
    mKiji.retain();
    mTableURI = tableURI;
    mZKClient = mKiji.getZKClient();
    mLayoutUpdate = layoutUpdate;
  }

  /**
   * Initializes a new layout updater for the specified table and with the specified layout update.
   *
   * @param kiji Opened Kiji instance the table belongs to.
   * @param tableURI Update the layout of this table.
   * @param layoutUpdate Static layout update descriptor to update the table with.
   * @throws IOException on I/O error.
   * @throws KeeperException on ZooKeeper error.
   */
  public HBaseTableLayoutUpdater(
      final HBaseKiji kiji,
      final KijiURI tableURI,
      final TableLayoutDesc layoutUpdate)
      throws IOException, KeeperException {
    this(kiji, tableURI, new Function<KijiTableLayout, TableLayoutDesc>() {
      /** {@inheritDoc} */
      @Override
      public TableLayoutDesc apply(KijiTableLayout input) {
        return layoutUpdate;
      }
    });
  }

  /**
   * Releases the resources maintained by this updater.
   *
   * @throws IOException on I/O error.
   */
  public void close() throws IOException {
    mKiji.release();
  }

  /**
   * Performs the specified table layout update.
   *
   * @throws IOException on I/O error.
   * @throws KeeperException on ZooKeeper error.
   */
  public void update() throws IOException, KeeperException {
    final KijiMetaTable metaTable = mKiji.getMetaTable();

    final Lock lock = ZooKeeperUtils.newTableLayoutLock(mZKClient, mTableURI);
    lock.lock();
    try {
      final NavigableMap<Long, KijiTableLayout> layoutMap =
          metaTable.getTimedTableLayoutVersions(mTableURI.getTable(), Integer.MAX_VALUE);

      final KijiTableLayout currentLayout = layoutMap.lastEntry().getValue();
      final TableLayoutDesc update = mLayoutUpdate.apply(currentLayout);
      if (!Objects.equal(currentLayout.getDesc().getLayoutId(), update.getReferenceLayout())) {
        throw new InvalidLayoutException(String.format(
            "Reference layout ID %s does not match current layout ID %s.",
            update.getReferenceLayout(), currentLayout.getDesc().getLayoutId()));
      }

      final TableLayoutUpdateValidator validator = new TableLayoutUpdateValidator(mKiji);
      validator.validate(
          currentLayout,
          KijiTableLayout.createUpdatedLayout(update , currentLayout));

      final TableLayoutTracker layoutTracker =
          new TableLayoutTracker(mZKClient, mTableURI, mLayoutUpdateHandler);
      try {
        layoutTracker.start();
        final UsersTracker usersTracker =
            ZooKeeperUtils
                .newTableUsersTracker(mZKClient, mTableURI)
                .registerUpdateHandler(mUsersUpdateHandler);
        try {
          usersTracker.start();
          final String currentLayoutId = mLayoutUpdateHandler.getCurrentLayoutId();
          LOG.info("Table {} has current layout ID {}.", mTableURI, currentLayoutId);
          if (!Objects.equal(currentLayoutId, currentLayout.getDesc().getLayoutId())) {
            throw new InternalKijiError(String.format(
                "Inconsistency between meta-table and ZooKeeper: "
                + "meta-table layout has ID %s while ZooKeeper has layout ID %s.",
                currentLayout.getDesc().getLayoutId(), currentLayoutId));
          }

          final String consistentLayoutId = waitForConsistentView();
          if ((consistentLayoutId != null) && !Objects.equal(consistentLayoutId, currentLayoutId)) {
            throw new InternalKijiError(String.format(
                "Consistent layout ID %s does not match current layout %s for table %s.",
                consistentLayoutId, currentLayout, mTableURI));
          }

          writeMetaTable(update);
          final TableLayoutDesc newLayoutDesc = mNewLayout.getDesc();
          writeZooKeeper(newLayoutDesc);

          mLayoutUpdateHandler.waitForLayoutNotification(newLayoutDesc.getLayoutId());

          // The following is not necessary:
          while (true) {
            final String newLayoutId = waitForConsistentView();
            if (newLayoutId == null) {
              LOG.info("Layout update complete for table {}: table has no users.", mTableURI);
              break;
            } else if (Objects.equal(newLayoutId, newLayoutDesc.getLayoutId())) {
              LOG.info("Layout update complete for table {}: all users switched to layout ID {}.",
                  mTableURI, newLayoutId);
              break;
            } else {
              LOG.info("Layout update in progress for table {}: users still using layout ID {}.",
                  mTableURI, newLayoutId);
              Time.sleep(1.0);
            }
          }

        } finally {
          usersTracker.close();
        }
      } finally {
        layoutTracker.close();
      }
    } finally {
      lock.unlock();
      lock.close();
    }
  }

  /**
   * Waits for all clients of the table to have a consistent view on the table layout.
   *
   * @return the layout ID being used consistently by all users, or null if no users.
   * @throws IOException on I/O error.
   */
  private String waitForConsistentView() throws IOException {
    return mUsersUpdateHandler.waitForConsistentView();
  }

  /**
   * Writes the new table layout to the meta-table.
   *
   * @param update Layout update to write to the meta-table.
   * @throws IOException on I/O error.
   */
  private void writeMetaTable(TableLayoutDesc update) throws IOException {
    LOG.info("Updating layout for table {} from layout ID {} to layout ID {} in meta-table.",
        mTableURI, update.getReferenceLayout(), update.getLayoutId());
    final String table = update.getName();
    mNewLayout = mKiji.getMetaTable().updateTableLayout(table, update);
  }

  /**
   * Writes the new layout to ZooKeeper.
   *
   * <p> This pushes a layout update to all table users. </p>
   *
   * @param update Layout update to push to ZooKeeper.
   *
   * @throws IOException on I/O error.
   * @throws KeeperException on ZooKeeper error.
   */
  private void writeZooKeeper(TableLayoutDesc update) throws IOException, KeeperException {
    LOG.info("Updating layout for table {} from layout ID {} to layout ID {} in ZooKeeper.",
        mTableURI, update.getReferenceLayout(), update.getLayoutId());
    ZooKeeperUtils.setTableLayout(mZKClient, mTableURI, update.getLayoutId());
  }

  /**
   * Returns the new layout, after it has been applied to the table.
   *
   * @return the new layout, after it has been applied to the table.
   *     Null before the update completes.
   */
  public KijiTableLayout getNewLayout() {
    return mNewLayout;
  }
}
// TOP
//
// Related classes of org.kiji.schema.impl.hbase.HBaseTableLayoutUpdater$UpdaterUsersUpdateHandler
//
// TOP
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.