Package org.apache.bookkeeper.client

Source Code of org.apache.bookkeeper.client.LedgerHandle$RetCounter

package org.apache.bookkeeper.client;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/


import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.TreeMap;

import org.apache.bookkeeper.client.BKDefs;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieHandle;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.LedgerManagementProcessor.CloseLedgerOp;
import org.apache.bookkeeper.client.QuorumEngine.Operation;
import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
import org.apache.bookkeeper.client.QuorumEngine.Operation.StopOp;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;

/**
* Ledger handle on the client side. Contains ledger metadata
* used to access it. This api exposes the read and write
* to a ledger and also exposes a streaming api for the ledger.
*/
public class LedgerHandle implements ReadCallback, AddCallback {
    /**
     * the call stack looks like --
     * ledgerhandle->write->bookeeper->quorumengine->bookiehandle
     * ->bookieclient
     */
   static Logger LOG = Logger.getLogger(LedgerHandle.class);
   
    public enum QMode {VERIFIABLE, GENERIC, FREEFORM};
   
   
    private long ledger;
    private volatile long last;
    private volatile long lastAddConfirmed = 0;
    private HashMap<Integer, Long> lastRecvCorrectly;
    private volatile ArrayList<BookieHandle> bookies;
    private ArrayList<InetSocketAddress> bookieAddrList;
    private TreeMap<Long, ArrayList<BookieHandle> > bookieConfigMap;
    private long[] entryChange;
    private BookKeeper bk;
    private QuorumEngine qe;
    private int qSize;
    private QMode qMode = QMode.VERIFIABLE;
    private int lMode;

    private int threshold;
    private String digestAlg = "SHA1";
   
    private byte[] macKey;
    private byte[] ledgerKey;
    private byte[] passwd;
   
    /**
     * @param bk the bookkeeper handle
     * @param ledger the id for this ledger
     * @param last the last id written
     * @param passwd the passwd to encode
     * the entries
     * @throws InterruptedException
     */
    LedgerHandle(BookKeeper bk,
            long ledger,
            long last,
            byte[] passwd) throws InterruptedException {
        this.bk = bk;
        this.ledger = ledger;
        this.last = last;
        this.bookies = new ArrayList<BookieHandle>();
        this.lastRecvCorrectly = new HashMap<Integer, Long>();
        this.passwd = passwd;
        genLedgerKey(passwd);
        genMacKey(passwd);
        this.qSize = (bookies.size() + 1)/2;
        this.qe = new QuorumEngine(this);
    }
   
    /**
     * @param bk the bookkeeper handle
     * @param ledger the id for this ledger
     * @param last the last entree written
     * @param qSize the queuing size
     * for this ledger
     * @param mode the quueuing mode
     * for this ledger
     * @param passwd the passwd to encode
     * @throws InterruptedException
     */
    LedgerHandle(BookKeeper bk,
            long ledger,
            long last,
            int qSize,
            QMode mode,
            byte[] passwd) throws InterruptedException {
        this.bk = bk;
        this.ledger = ledger;
        this.last = last;
        this.bookies = new ArrayList<BookieHandle>();
        this.lastRecvCorrectly = new HashMap<Integer, Long>();


        this.qSize = qSize;
        this.qMode = mode;
        this.passwd = passwd;
        genLedgerKey(passwd);
        genMacKey(passwd);
        this.qe = new QuorumEngine(this);
    }
       
    /**
     *
     * @param bk the bookkeeper handle
     * @param ledger the id for this ledger
     * @param last the last entree written
     * @param qSize the queuing size
     * for this ledger
     * @param passwd the passwd to encode
     * @throws InterruptedException
     */
    LedgerHandle(BookKeeper bk,
            long ledger,
            long last,
            int qSize,
            byte[] passwd) throws InterruptedException {
        this.bk = bk;
        this.ledger = ledger;
        this.last = last;
        this.bookies = new ArrayList<BookieHandle>();
        this.lastRecvCorrectly = new HashMap<Integer, Long>();


        this.qSize = qSize;
        this.passwd = passwd;
        genLedgerKey(passwd);
        genMacKey(passwd);
        this.qe = new QuorumEngine(this);
    }
   
    private void setBookies(ArrayList<InetSocketAddress> bookies)
    throws InterruptedException {
      try{
        for(InetSocketAddress a : bookies){
          LOG.debug("Opening bookieHandle: " + a);
           
          //BookieHandle bh = new BookieHandle(this, a);
          this.bookies.add(bk.getBookieHandle(this, a));
        }
      } catch(ConnectException e){
        LOG.error(e);
        InetSocketAddress addr = bk.getNewBookie(bookies);
        if(addr != null){
            bookies.add(addr);
        }
      } catch(IOException e) {
        LOG.error(e);
      }
    }
   
    /**
     * set the quorum engine
     * @param qe the quorum engine
     */
    void setQuorumEngine(QuorumEngine qe) {
        this.qe = qe;
    }
   
    /** get the quorum engine
     * @return return the quorum engine
     */
    QuorumEngine getQuorumEngine() {
        return this.qe;
    }
   
    /**
     * Create bookie handle and add it to the list
     *
     * @param addr  socket address
     */
    int addBookieForWriting(InetSocketAddress addr)
    throws IOException {
        LOG.debug("Bookie address: " + addr);
        lMode = BKDefs.WRITE;
        //BookieHandle bh = new BookieHandle(this, addr);
        this.bookies.add(bk.getBookieHandle(this, addr));
        if(bookies.size() > qSize) setThreshold();
        return (this.bookies.size() - 1);
    }
   
    /**
     * Create bookie handle and add it to the list
     *
     * @param addr  socket address
     */
    int addBookieForReading(InetSocketAddress addr)
    throws IOException {
        LOG.debug("Bookie address: " + addr);
        lMode = BKDefs.READ;
        //BookieHandle bh = new BookieHandle(this, addr);
        try{
            this.bookies.add(bk.getBookieHandle(this, addr));
        } catch (IOException e){
            LOG.info("Inserting a decoy bookie handle");
            this.bookies.add(new BookieHandle(addr, false));
        }
        if(bookies.size() > qSize) setThreshold();
        return (this.bookies.size() - 1);
    }

   
    private void setThreshold() {
        switch(qMode){
        case GENERIC:
            threshold = bookies.size() - qSize/2;
            break;
        case VERIFIABLE:
            threshold = bookies.size() - qSize + 1;
            break;
        default:
            threshold = bookies.size();
        }
       
    }
   
    public int getThreshold() {
        return threshold;
    }
   
   
    /**
     * Writes to BookKeeper changes to the ensemble.
     *        
     * @param addr  Address of faulty bookie
     * @param entry Last entry written before change of ensemble.
     */
   
    void changeEnsemble(long entry){
        String path = BKDefs.prefix +
        bk.getZKStringId(getId())
        BKDefs.quorumEvolution + "/" +
        String.format("%010d", entry);
       
        LOG.info("Report failure: " + String.format("%010d", entry));
        try{
            if(bk.getZooKeeper().exists(BKDefs.prefix +
                    bk.getZKStringId(getId())
                    BKDefs.quorumEvolution, false) == null)
                bk.getZooKeeper().create(BKDefs.prefix + bk.getZKStringId(getId()) +
                        BKDefs.quorumEvolution, new byte[0], Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
       
            boolean first = true;
            String addresses = "";
            for(BookieHandle bh : bookies){
                if(first){
                    addresses = bh.addr.toString();
                    first = false;
                }
                else
                    addresses = addresses + " " + bh.addr.toString();
            }
           
            bk.getZooKeeper() .create(path, addresses.getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch(Exception e){
            LOG.error("Could not write to ZooKeeper: " + path + ", " + e);
        }
    }
   
    /**
     * Replace bookie in the case of a failure
     */
    void replaceBookie(int index)
    throws BKException {
        InetSocketAddress addr = null;
        try{
            addr = bk.getNewBookie(bookieAddrList);
        } catch(InterruptedException e){
            LOG.error(e);
        }
       
        if(addr == null){
            throw BKException.create(Code.NoBookieAvailableException);
        } else {          
            try{
                //BookieHandle bh = new BookieHandle(this, addr);
               
                /*
                 * TODO: Read from current bookies, and write to this one
                 */
               
                /*
                 * If successful in writing to new bookie, add it to the set
                 */
                this.bookies.set(index, bk.getBookieHandle(this, addr));
            } catch(ConnectException e){
                bk.blackListBookie(addr);
                LOG.error(e);
            } catch(IOException e) {
                bk.blackListBookie(addr);
                LOG.error(e);
            }
        }
    }
   
    /**
     * This method is used when BK cannot find a bookie
     * to replace the current faulty one. In such cases,
     * we simply remove the bookie.
     *
     *
     * @param BookieHandle
     */
    synchronized void removeBookie(BookieHandle bh){
       if(lMode == BKDefs.WRITE){
           LOG.info("Removing bookie: " + bh.addr);
           int index = bookies.indexOf(bh);
           if(index >= 0){
               Long tmpLastRecv = lastRecvCorrectly.get(index);
               bookies.remove(index);
       
               if(tmpLastRecv == null)
                   changeEnsemble(0);
               else
                   changeEnsemble(tmpLastRecv);
           }
       }
    }
   
   
    /**
     * Returns the ledger identifier
     * @return long
     */
    public long getId(){
        return ledger;
    }
   
    /**
     * Returns the last entry identifier submitted
     * @return long
     */
    public long getLast(){
        return last;  
    }
   
    /**
     * Returns the last entry identifier submitted and increments it.
     * @return long
     */
    long incLast(){
        return last++;
    }
   
    /**
     * Sets the last entry identifier submitted.
     *
     * @param   last    last entry
     * @return  long    returns the value just set
     */
    long setLast(long last){
        this.last = last;
        return this.last;
    }
   
    /**
     * Sets the value of the last add confirmed. This is used
     * when adding new entries, since we use this value as a hint
     * to recover from failures of the client.
     */
    void setAddConfirmed(long entryId){
        if(entryId > lastAddConfirmed)
            lastAddConfirmed = entryId;
    }
   
    long getAddConfirmed(){
        return lastAddConfirmed;
    }
   
    void setLastRecvCorrectly(int sId, long entry){
        //LOG.info("Setting last received correctly: " + entry);
        lastRecvCorrectly.put(sId, entry);
    }
   
    /**
     * Returns the list of bookies
     * @return ArrayList<BookieHandle>
     */
    ArrayList<BookieHandle> getBookies(){
        return bookies;
    }
   
    /**
     * For reads, there might be multiple operations.
     *
     * @param entry
     * @return ArrayList<BookieHandle>  returns list of bookies
     */
    ArrayList<BookieHandle> getBookies(long entry){
        return getConfig(entry);
    }
   
    /**
     * Returns the bookie handle corresponding to the addresses in the input.
     *
     * @param addr
     * @return
     */
    BookieHandle getBookieHandleDup(InetSocketAddress addr){
        for(BookieHandle bh : bookies){
            if(bh.addr.equals(addr))
                return bh;
        }
       
        return null;
    }
   
    /**
     * Sets a new bookie configuration corresponding to a failure during
     * writes to the ledger. We have one configuration for every failure.
     *
     * @param entry
     * @param list
     */
   
    void setNewBookieConfig(long entry, ArrayList<BookieHandle> list){
        if(bookieConfigMap == null)
            bookieConfigMap = new TreeMap<Long, ArrayList<BookieHandle> >();
       
        /*
         * If initial config is not in the list, we include it.
         */
        if(!bookieConfigMap.containsKey(new Long(0))){
            bookieConfigMap.put(new Long(0), bookies);
        }
       
        LOG.info("Adding new entry: " + entry + ", " + bookies.size() + ", " + list.size());
        bookieConfigMap.put(entry, list);
    }
   
    /**
     * Once we read all changes to the bookie configuration, we
     * have to call this method to generate an array that we use
     * to determine the bookie configuration for an entry.
     *
     * Note that this array is a performance optimization and
     * it is not necessary for correctness. We could just use
     * bookieConfigMap but it would be slower.
     */
   
    void prepareEntryChange(){
        entryChange = new long[bookieConfigMap.size()];
   
        int counter = 0;
        for(Long l : bookieConfigMap.keySet()){
            entryChange[counter++] = l;
        }
    }
   
    /**
     * Return the quorum size. By default, the size of a quorum is (n+1)/2,
     * where n is the size of the set of bookies.
     * @return int
     */
    int getQuorumSize(){
        return qSize;  
    }
   
   
    /**
     *  Returns the config corresponding to the entry
     * 
     * @param entry
     * @return
     */
    private ArrayList<BookieHandle> getConfig(long entry){
        if(bookieConfigMap == null)
            return bookies;
       
        int index = Arrays.binarySearch(entryChange, entry);
       
        /*
         * If not on the map, binarySearch returns a negative value
         */
        int before = index;
        index = index >= 0? index : ((-1) - index);

        if(index == 0){
            if((entry % 10) == 0){
                LOG.info("Index: " + index + ", " + before + ", " + entry + ", " + bookieConfigMap.get(entryChange[index]).size());
            }
            return bookieConfigMap.get(entryChange[index]);
        } else{
            //LOG.warn("IndexDiff " + entry);
            return bookieConfigMap.get(entryChange[index - 1]);
        }
    }
   
    /**
     * Returns the quorum mode for this ledger: Verifiable or Generic
     */
    QMode getQMode(){
        return qMode;  
    }
   
    /**
     * Sets message digest algorithm.
     */
   
    void setDigestAlg(String alg){
        this.digestAlg = alg;
    }
   
    /**
     * Get message digest algorithm.
     */
   
    String getDigestAlg(){
        return digestAlg;
    }
   
    /**
     * Generates and stores Ledger key.
     *
     * @param passwd
     */
   
    private void genLedgerKey(byte[] passwd){
        try{
            MessageDigest digest = MessageDigest.getInstance("SHA");
            String pad = "ledger";
           
            byte[] toProcess = new byte[passwd.length + pad.length()];
            System.arraycopy(pad.getBytes(), 0, toProcess, 0, pad.length());
            System.arraycopy(passwd, 0, toProcess, pad.length(), passwd.length);
       
            digest.update(toProcess);
            this.ledgerKey = digest.digest();
        } catch(NoSuchAlgorithmException e){
            this.passwd = passwd;
            LOG.error("Storing password as plain text because secure hash implementation does not exist");
        }
    }
   
    /**
     * Generates and stores Mac key.
     *
     * @param passwd
     */
   
    private void genMacKey(byte[] passwd){
        try{
            MessageDigest digest = MessageDigest.getInstance("SHA");
            String pad = "mac";
           
            byte[] toProcess = new byte[passwd.length + pad.length()];
            System.arraycopy(pad.getBytes(), 0, toProcess, 0, pad.length());
            System.arraycopy(passwd, 0, toProcess, pad.length(), passwd.length);
       
            digest.update(toProcess);
            this.macKey = digest.digest();
        } catch(NoSuchAlgorithmException e){
            this.passwd = passwd;
            LOG.error("Storing password as plain text because secure hash implementation does not exist");
        }
    }
   
    /**
     * Returns password in plain text
     */
    byte[] getPasswd(){
      return passwd;
    }
   
   
    /**
     * Returns MAC key
     *
     * @return byte[]
     */
    byte[] getMacKey(){
       return macKey;
    }
  
    /**
     * Returns Ledger key
     *
     * @return byte[]
     */
    byte[] getLedgerKey(){
       return ledgerKey;
    }
   
    void closeUp(){
        ledger = -1;
        last = -1;
        bk.haltBookieHandles(this, bookies);
    }
   
    /**
     * Close ledger.
     *
     */
    public void close()
    throws KeeperException, InterruptedException, BKException {
        //Set data on zookeeper
        ByteBuffer last = ByteBuffer.allocate(8);
        last.putLong(lastAddConfirmed);
        LOG.info("Last saved on ZK is: " + lastAddConfirmed);
        String closePath = BKDefs.prefix + bk.getZKStringId(getId()) + BKDefs.close;
        if(bk.getZooKeeper().exists(closePath, false) == null){
           bk.getZooKeeper().create(closePath,
                   last.array(),
                   Ids.OPEN_ACL_UNSAFE,
                   CreateMode.PERSISTENT);
        }
       
        closeUp();
        StopOp sOp = new StopOp();
        qe.sendOp(sOp);
        LOG.info("##### CB worker queue size: " + qe.cbWorker.pendingOps.size());
    }
   
    /**
     * Asynchronous close
     *
     * @param cb    callback implementation
     * @param ctx   control object
     * @throws InterruptedException
     */
    public void asyncClose(CloseCallback cb, Object ctx)
    throws InterruptedException {
        CloseLedgerOp op = new CloseLedgerOp(this, cb, ctx);
        LedgerManagementProcessor lmp = bk.getMngProcessor();
        lmp.addOp(op)
    }
      
    /**
     * Read a sequence of entries asynchronously.
     *
     * @param firstEntry    id of first entry of sequence
     * @param lastEntry     id of last entry of sequence
     * @param cb    object implementing read callback interface
     * @param ctx   control object
     */
    public void asyncReadEntries(long firstEntry,
            long lastEntry, ReadCallback cb, Object ctx)
    throws BKException, InterruptedException {
        // Little sanity check
        if((firstEntry > getLast()) || (firstEntry > lastEntry))
            throw BKException.create(Code.ReadException);
       
        Operation r = new ReadOp(this, firstEntry, lastEntry, cb, ctx);
        qe.sendOp(r);
        //qeMap.get(lh.getId()).put(r);
    }
   
   
    /**
     * Read a sequence of entries synchronously.
     *
     * @param firstEntry    id of first entry of sequence
     * @param lastEntry     id of last entry of sequence
     *
     */
    public LedgerSequence readEntries(long firstEntry, long lastEntry)
    throws InterruptedException, BKException {
        // Little sanity check
        if((firstEntry > getLast()) || (firstEntry > lastEntry))
            throw BKException.create(Code.ReadException);
       
        RetCounter counter = new RetCounter();
        counter.inc();
    
        Operation r = new ReadOp(this, firstEntry, lastEntry, this, counter);
        qe.sendOp(r);
       
        LOG.debug("Going to wait for read entries: " + counter.i);
        counter.block(0);
        LOG.debug("Done with waiting: " + counter.i + ", " + firstEntry);
       
        if(counter.getSequence() == null){
            LOG.error("Failed to read entries: " + firstEntry + ", " + lastEntry);
            throw BKException.create(Code.ReadException);
        }
        return counter.getSequence();
    }
  
    /**
     * Add entry asynchronously to an open ledger.
     *
     * @param data  array of bytes to be written
     * @param cb    object implementing callbackinterface
     * @param ctx   some control object
     */
    public void asyncAddEntry(byte[] data, AddCallback cb, Object ctx)
    throws InterruptedException, BKException {
        AddOp r = new AddOp(this, data, cb, ctx);
        qe.sendOp(r);
    }
   
   
    /**
     * Add entry synchronously to an open ledger.
     *
     * @param   data byte[]
     */
   
    public long addEntry(byte[] data)
    throws InterruptedException, BKException{
        LOG.debug("Adding entry " + data);
        RetCounter counter = new RetCounter();
        counter.inc();
       
        Operation r = new AddOp(this, data, this, counter);
        qe.sendOp(r);  
        //qeMap.get(lh.getId()).put(r);
        counter.block(0);
        return counter.getrc();
    }
   
   
    /**
     * Implementation of callback interface for synchronous read method.
     *
     * @param rc    return code
     * @param leder ledger identifier
     * @param seq   sequence of entries
     * @param ctx   control object
     */
    public void readComplete(int rc,
            LedgerHandle lh,
            LedgerSequence seq, 
            Object ctx){       
       
        RetCounter counter = (RetCounter) ctx;
        counter.setSequence(seq);
        LOG.debug("Read complete: " + seq.size() + ", " + counter.i);
        counter.dec();
    }
   
    /**
     * Implementation of callback interface for synchronous read method.
     *
     * @param rc    return code
     * @param leder ledger identifier
     * @param entry entry identifier
     * @param ctx   control object
     */
    public void addComplete(int rc,
            LedgerHandle lh,
            long entry,
            Object ctx){         
        RetCounter counter = (RetCounter) ctx;
       
        counter.setrc(rc);
        counter.dec();
    }
   
   
   
    /**
     * Implements objects to help with the synchronization of asynchronous calls
     *
     */
   
    private static class RetCounter {
        int i;
        int rc;
        int total;
        LedgerSequence seq = null;
       
        synchronized void inc() {
            i++;
            total++;
        }
        synchronized void dec() {
            i--;
            notifyAll();
        }
        synchronized void block(int limit) throws InterruptedException {
            while(i > limit) {
                int prev = i;
                wait(15000);
                if(i == prev){
                    break;
                }
            }
        }
        synchronized int total() {
            return total;
        }
       
        void setrc(int rc){
            this.rc = rc;
        }
       
        int getrc(){
            return rc;
        }
       
        void setSequence(LedgerSequence seq){
            this.seq = seq;
        }
       
        LedgerSequence getSequence(){
            return seq;
        }
    }
}
TOP

Related Classes of org.apache.bookkeeper.client.LedgerHandle$RetCounter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.