Package org.apache.accumulo.master.tableOps

Source Code of org.apache.accumulo.master.tableOps.CompactRange

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.master.tableOps;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException.NoNodeException;

class CompactionDriver extends MasterRepo {

  private static final long serialVersionUID = 1L;

  private long compactId;
  private String tableId;
  private byte[] startRow;
  private byte[] endRow;
  private String namespaceId;

  public CompactionDriver(long compactId, String tableId, byte[] startRow, byte[] endRow) {

    this.compactId = compactId;
    this.tableId = tableId;
    this.startRow = startRow;
    this.endRow = endRow;
    Instance inst = HdfsZooInstance.getInstance();
    this.namespaceId = Tables.getNamespaceId(inst, tableId);
  }

  @Override
  public long isReady(long tid, Master master) throws Exception {

    String zCancelID = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId
        + Constants.ZTABLE_COMPACT_CANCEL_ID;

    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();

    if (Long.parseLong(new String(zoo.getData(zCancelID, null))) >= compactId) {
      // compaction was canceled
      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER, "Compaction canceled");
    }

    MapCounter<TServerInstance> serversToFlush = new MapCounter<TServerInstance>();
    Connector conn = master.getConnector();

    Scanner scanner;

    if (tableId.equals(MetadataTable.ID)) {
      scanner = new IsolatedScanner(conn.createScanner(RootTable.NAME, Authorizations.EMPTY));
      scanner.setRange(MetadataSchema.TabletsSection.getRange());
    } else {
      scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
      Range range = new KeyExtent(new Text(tableId), null, startRow == null ? null : new Text(startRow)).toMetadataRange();
      scanner.setRange(range);
    }

    TabletsSection.ServerColumnFamily.COMPACT_COLUMN.fetch(scanner);
    TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);
    scanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);

    long t1 = System.currentTimeMillis();
    RowIterator ri = new RowIterator(scanner);

    int tabletsToWaitFor = 0;
    int tabletCount = 0;

    while (ri.hasNext()) {
      Iterator<Entry<Key,Value>> row = ri.next();
      long tabletCompactID = -1;

      TServerInstance server = null;

      Entry<Key,Value> entry = null;
      while (row.hasNext()) {
        entry = row.next();
        Key key = entry.getKey();

        if (TabletsSection.ServerColumnFamily.COMPACT_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier()))
          tabletCompactID = Long.parseLong(entry.getValue().toString());

        if (TabletsSection.CurrentLocationColumnFamily.NAME.equals(key.getColumnFamily()))
          server = new TServerInstance(entry.getValue(), key.getColumnQualifier());
      }

      if (tabletCompactID < compactId) {
        tabletsToWaitFor++;
        if (server != null)
          serversToFlush.increment(server, 1);
      }

      tabletCount++;

      Text tabletEndRow = new KeyExtent(entry.getKey().getRow(), (Text) null).getEndRow();
      if (tabletEndRow == null || (endRow != null && tabletEndRow.compareTo(new Text(endRow)) >= 0))
        break;
    }

    long scanTime = System.currentTimeMillis() - t1;

    Instance instance = master.getInstance();
    Tables.clearCache(instance);
    if (tabletCount == 0 && !Tables.exists(instance, tableId))
      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);

    if (serversToFlush.size() == 0 && Tables.getTableState(instance, tableId) == TableState.OFFLINE)
      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, null);

    if (tabletsToWaitFor == 0)
      return 0;

    for (TServerInstance tsi : serversToFlush.keySet()) {
      try {
        final TServerConnection server = master.getConnection(tsi);
        if (server != null)
          server.compact(master.getMasterLock(), tableId, startRow, endRow);
      } catch (TException ex) {
        Logger.getLogger(CompactionDriver.class).error(ex.toString());
      }
    }

    long sleepTime = 500;

    if (serversToFlush.size() > 0)
      sleepTime = Collections.max(serversToFlush.values()) * sleepTime; // make wait time depend on the server with the most to
                                                                        // compact

    sleepTime = Math.max(2 * scanTime, sleepTime);

    sleepTime = Math.min(sleepTime, 30000);

    return sleepTime;
  }

  @Override
  public Repo<Master> call(long tid, Master environment) throws Exception {
    CompactRange.removeIterators(tid, tableId);
    Utils.getReadLock(tableId, tid).unlock();
    Utils.getReadLock(namespaceId, tid).unlock();
    return null;
  }

  @Override
  public void undo(long tid, Master environment) throws Exception {

  }

}

public class CompactRange extends MasterRepo {

  private static final long serialVersionUID = 1L;
  private String tableId;
  private byte[] startRow;
  private byte[] endRow;
  private byte[] iterators;
  private String namespaceId;

  public static class CompactionIterators implements Writable {
    byte[] startRow;
    byte[] endRow;
    List<IteratorSetting> iterators;

    public CompactionIterators(byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) {
      this.startRow = startRow;
      this.endRow = endRow;
      this.iterators = iterators;
    }

    public CompactionIterators() {
      startRow = null;
      endRow = null;
      iterators = Collections.emptyList();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeBoolean(startRow != null);
      if (startRow != null) {
        out.writeInt(startRow.length);
        out.write(startRow);
      }

      out.writeBoolean(endRow != null);
      if (endRow != null) {
        out.writeInt(endRow.length);
        out.write(endRow);
      }

      out.writeInt(iterators.size());
      for (IteratorSetting is : iterators) {
        is.write(out);
      }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      if (in.readBoolean()) {
        startRow = new byte[in.readInt()];
        in.readFully(startRow);
      } else {
        startRow = null;
      }

      if (in.readBoolean()) {
        endRow = new byte[in.readInt()];
        in.readFully(endRow);
      } else {
        endRow = null;
      }

      int num = in.readInt();
      iterators = new ArrayList<IteratorSetting>(num);

      for (int i = 0; i < num; i++) {
        iterators.add(new IteratorSetting(in));
      }
    }

    public Text getEndRow() {
      if (endRow == null)
        return null;
      return new Text(endRow);
    }

    public Text getStartRow() {
      if (startRow == null)
        return null;
      return new Text(startRow);
    }

    public List<IteratorSetting> getIterators() {
      return iterators;
    }
  }

  public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators) throws ThriftTableOperationException {
    this.tableId = tableId;
    this.startRow = startRow.length == 0 ? null : startRow;
    this.endRow = endRow.length == 0 ? null : endRow;
    Instance inst = HdfsZooInstance.getInstance();
    this.namespaceId = Tables.getNamespaceId(inst, tableId);

    if (iterators.size() > 0) {
      this.iterators = WritableUtils.toByteArray(new CompactionIterators(this.startRow, this.endRow, iterators));
    } else {
      iterators = null;
    }

    if (this.startRow != null && this.endRow != null && new Text(startRow).compareTo(new Text(endRow)) >= 0)
      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.BAD_RANGE,
          "start row must be less than end row");
  }

  @Override
  public long isReady(long tid, Master environment) throws Exception {
    return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.COMPACT)
        + Utils.reserveTable(tableId, tid, false, true, TableOperation.COMPACT);
  }

  @Override
  public Repo<Master> call(final long tid, Master environment) throws Exception {
    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;

    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();
    byte[] cid;
    try {
      cid = zoo.mutate(zTablePath, null, null, new Mutator() {
        @Override
        public byte[] mutate(byte[] currentValue) throws Exception {
          String cvs = new String(currentValue, Constants.UTF8);
          String[] tokens = cvs.split(",");
          long flushID = Long.parseLong(tokens[0]);
          flushID++;

          String txidString = String.format("%016x", tid);

          for (int i = 1; i < tokens.length; i++) {
            if (tokens[i].startsWith(txidString))
              continue; // skip self

            throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.OTHER,
                "Another compaction with iterators is running");
          }

          StringBuilder encodedIterators = new StringBuilder();

          if (iterators != null) {
            Hex hex = new Hex();
            encodedIterators.append(",");
            encodedIterators.append(txidString);
            encodedIterators.append("=");
            encodedIterators.append(new String(hex.encode(iterators), Constants.UTF8));
          }

          return (Long.toString(flushID) + encodedIterators).getBytes(Constants.UTF8);
        }
      });

      return new CompactionDriver(Long.parseLong(new String(cid, Constants.UTF8).split(",")[0]), tableId, startRow, endRow);
    } catch (NoNodeException nne) {
      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
    }

  }

  static void removeIterators(final long txid, String tableId) throws Exception {
    String zTablePath = Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_COMPACT_ID;

    IZooReaderWriter zoo = ZooReaderWriter.getRetryingInstance();

    zoo.mutate(zTablePath, null, null, new Mutator() {
      @Override
      public byte[] mutate(byte[] currentValue) throws Exception {
        String cvs = new String(currentValue, Constants.UTF8);
        String[] tokens = cvs.split(",");
        long flushID = Long.parseLong(tokens[0]);

        String txidString = String.format("%016x", txid);

        StringBuilder encodedIterators = new StringBuilder();
        for (int i = 1; i < tokens.length; i++) {
          if (tokens[i].startsWith(txidString))
            continue;
          encodedIterators.append(",");
          encodedIterators.append(tokens[i]);
        }

        return (Long.toString(flushID) + encodedIterators).getBytes(Constants.UTF8);
      }
    });

  }

  @Override
  public void undo(long tid, Master environment) throws Exception {
    try {
      removeIterators(tid, tableId);
    } finally {
      Utils.unreserveNamespace(namespaceId, tid, false);
      Utils.unreserveTable(tableId, tid, false);
    }
  }

}
TOP

Related Classes of org.apache.accumulo.master.tableOps.CompactRange

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.