Package org.apache.bookkeeper.client

Source Code of org.apache.bookkeeper.client.QuorumOpMonitor$PendingReadOp

package org.apache.bookkeeper.client;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/


import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.InvalidKeyException;
import javax.crypto.Mac;


import org.apache.bookkeeper.client.BookieHandle;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.QuorumEngine.Operation.AddOp;
import org.apache.bookkeeper.client.QuorumEngine.Operation.ReadOp;
import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubAddOp;
import org.apache.bookkeeper.client.QuorumEngine.SubOp.SubReadOp;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.ReadEntryCallback;
import org.apache.bookkeeper.proto.WriteCallback;
import org.apache.log4j.Logger;


/**
* Monitors reponses from bookies to requests of a client. It implements
* two interfaces of the proto package that correspond to callbacks from
* BookieClient objects.
*
*/
public class QuorumOpMonitor implements WriteCallback, ReadEntryCallback {
    static Logger LOG = Logger.getLogger(QuorumOpMonitor.class);
   
    LedgerHandle lh;
   
    static final int MAXRETRIES = 2;
   
   
    /**
     * Message disgest instance
     *
     */
    MessageDigest digest = null;
    int dLength;
   
    /**
     * Get digest instance if there is none.
     *
     */
    MessageDigest getDigestInstance(String alg)
    throws NoSuchAlgorithmException {
        if(digest == null){
            digest = MessageDigest.getInstance(alg);
        }
       
        return digest;
    }
   
    public static class PendingOp{
        //Operation op = null;
        HashSet<Integer> bookieIdSent;
        HashSet<Integer> bookieIdRecv;
        int retries = 0;
     
        PendingOp(){
            this.bookieIdSent = new HashSet<Integer>();
            this.bookieIdRecv = new HashSet<Integer>();
        }
       
    };
   
   
    /**
     * Objects of this type are used to keep track of the status of
     * a given read request.
     *
     */
   
    public static class PendingReadOp extends PendingOp{
        /*
         * Values for ongoing reads
         */

        ArrayList<ByteBuffer> proposedValues;
               
        PendingReadOp(LedgerHandle lh){
            this.proposedValues =
                new ArrayList<ByteBuffer>();
        }   
     
    }
   
    QuorumOpMonitor(LedgerHandle lh){
        this.lh = lh;
        try{
            this.dLength = getDigestInstance(lh.getDigestAlg()).getDigestLength();
        } catch(NoSuchAlgorithmException e){
            LOG.error("Problem with message digest: " + e);
            this.dLength = 0;
        }
    }
   
  
    /**
     * Callback method for write operations. There is one callback for
     * each write to a server.
     *
     */
   
    public void writeComplete(int rc, long ledgerId, long entryId, Object ctx){
        //PendingAddOp pOp;
        //synchronized(pendingAdds){
        //pOp = pendingAdds.get(entryId);
        //}
        SubAddOp sAdd = (SubAddOp) ctx;
        PendingOp pOp = sAdd.pOp;
        Integer sId = sAdd.bIndex;
       
        if(pOp == null){
            LOG.error("No such an entry ID: " + entryId + "(" + ledgerId + ")");
            return;
        }
       
        ArrayList<BookieHandle> list = lh.getBookies();
        if(rc == 0){
            // Everything went ok with this op
            synchronized(pOp){
                pOp.bookieIdRecv.add(sId);
                lh.setLastRecvCorrectly(sId, entryId);
                if(pOp.bookieIdRecv.size() >= lh.getQuorumSize()){
                    sAdd.op.setReady();    
                }
            }
        } else {
            //LOG.warn("Error sending write request: " + rc + " : " + ledgerId + ": " + lh.getBookies().size());
            /*
             * If ledger is closed already, then simply return
             */
            if(lh.getId() == -1){
                LOG.warn("Ledger identifier is not valid");
                return;
            }
           
            HashSet<Integer> ids;
             
            synchronized(pOp){
                pOp.bookieIdSent.add(sId);
                ids = pOp.bookieIdSent;               
                //Check if we tried all possible bookies already
                if(ids.size() == lh.getBookies().size()){
                    if(pOp.retries++ >= MAXRETRIES){
                        //Call back with error code
 
                        sAdd.op.setErrorCode(BKDefs.ENR);
                        sAdd.op.setReady();
                        return;
                    }
                   
                    ids.clear();
                }
                // Select another bookie that we haven't contacted yet
                try{
                    //LOG.info("Selecting another bookie " + entryId);
                    int bCounter;
                    if(sId >= (entryId % (lh.getBookies().size() + 1))){
                        bCounter = sId - (((int) entryId) % (lh.getBookies().size() + 1));
                    } else {
                        bCounter = (lh.getBookies().size() + 1) - (((int) entryId) % (lh.getBookies().size() + 1)) - sId;
                    }
                   
                    int tmpId = (((int) entryId) + lh.getQuorumSize()) % (lh.getBookies().size() + 1);
                    int newId = tmpId % lh.getBookies().size();
                    //LOG.info("Sending a new add operation to bookie: " + newId + ", " + lh.getBookies().get(newId).addr);
                   
                    BookieHandle bh = lh.getBookies().get(newId);
                   
                    //LOG.info("Got handle for " + newId);
                   
                    bh.sendAdd(lh, new SubAddOp(sAdd.op,
                            pOp,
                            newId,
                            this), entryId);
              
                    //LOG.info("Ended " + entryId + ", " + newId);
                } catch(IOException e){
                    LOG.error(e);
                } catch(BKException e){
                    LOG.error(e);
                }
            }
        }      
    }

   
    /**
     * Callback method for read operations. There is one callback for
     * each entry of a read request.
     *
     * TODO: We might want to change the way a client application specify
     * the quorum size. It is really loose now, and it allows an application
     * to set any quorum size the client wants.
     */
   
    public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx){
        /*
         * Collect responses, and reply when there are sufficient
         * answers.
         */
        if(rc == 0){
            SubReadOp sRead = (SubReadOp) ctx;
            ReadOp rOp = (ReadOp) sRead.op;
            PendingReadOp pOp = sRead.pOp;
            if(pOp != null){
                HashSet<Integer> received = pOp.bookieIdRecv;
               
                boolean result = received.add(sRead.bIndex);
                int counter = -1;
                if(result){
                   
                    ByteBuffer voted = null;
                    ArrayList<ByteBuffer> list;
                    switch(lh.getQMode()){
                    case VERIFIABLE:
                        if(rOp.seq[(int) (entryId % (rOp.lastEntry - rOp.firstEntry + 1))] == null)
                           try{
                                voted = voteVerifiable(bb);
                            } catch(NoSuchAlgorithmException e){
                                LOG.error("Problem with message digest: " + e);
                            } catch(BKException bke) {
                                LOG.error(bke.toString() + "( " + ledgerId + ", " + entryId + ", " + pOp.bookieIdRecv + ")");
                                countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
                            } catch(InvalidKeyException e){
                                LOG.error(e);
                            }
                            if(voted != null) {
                                if(voted.capacity() - dLength > 0){
                                    byte[] data = new byte[voted.capacity() - dLength - 24];
                                    voted.position(24);                                   
                                    voted.get(data, 0, data.length);
                                    //LOG.warn("Data length (" + entryId + "): " + data.length);
                                    counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
                                }
                            }
                              
                        break;
                    case GENERIC:
                        list = pOp.proposedValues;
                       
                        synchronized(list){
                            if(rOp.seq[(int) (entryId % (rOp.lastEntry - rOp.firstEntry + 1))] == null){
                                list.add(bb);
                                bb.position(24);
                                if(list.size() >= ((lh.getQuorumSize() + 1)/2)){
                                    voted = voteGeneric(list, (lh.getQuorumSize() + 1)/2);
                                }
                            }
                        }
                       
                                   
                        if(voted != null){
                            LOG.debug("Voted: " + voted.array());
                            byte[] data = new byte[voted.capacity() - 24];
                            voted.position(24);
                            voted.get(data, 0, data.length);
                            counter = addNewEntry(new LedgerEntry(ledgerId, entryId, data), rOp);
                        }
                               
                               
                        break;
                    case FREEFORM:
                        list = pOp.proposedValues;
                        LOG.debug("List length before: " + list.size());
                        synchronized(list){
                            if(list.size() == lh.getQuorumSize()){
                                voted = voteFree(list);
                            }
                        }
                       
                        if(voted != null){
                            LOG.debug("Voted: " + voted.array());
                            byte[] data = new byte[voted.capacity() - 24];
                            voted.position(24);
                            voted.get(data, 0, data.length);
                            counter = addNewEntry(new LedgerEntry(ledgerId, entryId, voted.array()), rOp);
                        }                     
                    }  
       
                    if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) &&
                            !sRead.op.isReady()){
                        sRead.op.setReady();
                    }
           
                   
                    //long diff = rOp.lastEntry - rOp.firstEntry;
                    //LOG.debug("Counter: " + rOp.counter + ", " + diff);
                }
            }
        } else {
            /*
             * Have to count the number of negative responses
             */
            countNacks((ReadOp) ((SubReadOp) ctx).op, (SubReadOp) ctx, ledgerId, entryId);
           
        }
    }
   
   
    /**
     * Counts negative responses
     *
     * @param   rOp read operation
     * @param   sRead   specific read sub-operation
     */
   
    synchronized void countNacks(ReadOp rOp, SubReadOp sRead, long ledgerId, long entryId){
       
        if(!rOp.nacks.containsKey(entryId)){
            rOp.nacks.put(entryId, new AtomicInteger(0));
        }
       
        if(rOp.nacks.get(entryId).incrementAndGet() >= lh.getThreshold()){
            int counter = -1;
            //LOG.warn("Giving up on " + entryId + "(" + lh.getThreshold() + ")");
            counter = addNewEntry(new LedgerEntry(ledgerId, entryId, null), rOp);
           
            if((counter == (rOp.lastEntry - rOp.firstEntry + 1)) &&
                    !sRead.op.isReady()){
               
                sRead.op.setReady();
            }
        }
    }
   
    /**
     * Verify if the set of votes in the list can produce a correct answer
     * for verifiable data.
     *
     * @param list
     * @return
     */
   
   
    private ByteBuffer voteVerifiable(ByteBuffer bb)
    throws NoSuchAlgorithmException, InvalidKeyException, BKException{
        /*
         * Check if checksum matches
         */
       
        Mac mac = ((BookieClient) Thread.currentThread()).getMac("HmacSHA1", lh.getMacKey());
        int dlength = mac.getMacLength();
      
        if(bb.capacity() <= dlength){
            LOG.warn("Something wrong with this entry, length smaller than digest length");
            return null;
        }
       
        byte[] data = new byte[bb.capacity() - dlength];
        bb.get(data, 0, bb.capacity() - dlength);
       
       
        byte[] sig = new byte[dlength];
        bb.position(bb.capacity() - dlength);
        bb.get(sig, 0, dlength);

        bb.rewind();
       
        byte[] msgDigest = mac.doFinal(data);
        if(Arrays.equals(msgDigest, sig)){
            return bb;
        } else {
            LOG.error("Entry id: " + new String(msgDigest) + new String(sig));
            throw BKException.create(Code.DigestMatchException);
        }
       
    }
   
    /**
     * Verify if the set of votes in the list can produce a correct answer
     * for generic data.
     *
     * @param list
     * @return
     */
       
    private ByteBuffer voteGeneric(ArrayList<ByteBuffer> list, int threshold){ 
        HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
        for(ByteBuffer bb : list){ 
            if(!map.containsKey(bb)){
                map.put(bb, new Integer(0));
            } else LOG.debug("Not equal");
           
            if(bb != null)
                map.put(bb, map.get(bb) + 1);
           
            if(map.get(bb) >= threshold)
                return bb; 
        }
       
        return null;  
    }

    /**
     * Verify if the set of votes in the list can produce a correct answer
     * for generic data.
     *
     * @param list
     * @return
     */
       
    private ByteBuffer voteFree(ArrayList<ByteBuffer> list){ 
        HashMap<ByteBuffer, Integer> map = new HashMap<ByteBuffer, Integer>();
        for(ByteBuffer bb : list){
            bb.position(24);
            if(!map.containsKey(bb)){
                map.put(bb, Integer.valueOf(0));
            }
            map.put(bb, map.get(bb) + 1);
           
            if(map.get(bb) == list.size())
                return bb;
        }
       
        return null;  
    }
   
    /**
     * Add new entry to the list of received.
     *
     * @param le  ledger entry to add to list
     * @param op  read operation metadata
     */
   
    private int addNewEntry(LedgerEntry le, ReadOp op){
        long index = le.getEntryId() % (op.lastEntry - op.firstEntry + 1);
        if(op.seq[(int) index] == null){
            if(le.getEntry() == null) LOG.warn("Ledger entry is null (" + le.getEntryId() + ")");
            //if(le.getEntryId() % 100 == 0) LOG.info("New entry: " + le.getEntryId() + ")");
            op.seq[(int) index] = le;
           
            return op.counter.incrementAndGet();
        }
       
        return -1;
    }
}
TOP

Related Classes of org.apache.bookkeeper.client.QuorumOpMonitor$PendingReadOp

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.