Package org.apache.bookkeeper.client

Source Code of org.apache.bookkeeper.client.LedgerRecoveryMonitor

package org.apache.bookkeeper.client;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/


import java.lang.Math;
import java.lang.InterruptedException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashMap;
import java.util.TreeMap;

//import org.apache.bookkeeper.client.AsyncCallback.FailCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerSequence;
import org.apache.bookkeeper.client.LedgerHandle.QMode;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.ReadEntryCallback;
import org.apache.log4j.Logger;

import org.apache.zookeeper.KeeperException;

/**
* Implements the mechanism to recover a ledger that was not closed
* correctly. It reads entries from the ledger using the hint field
* until it finds the last entry written. It then writes to ZooKeeper.
*
*/

class LedgerRecoveryMonitor implements ReadEntryCallback {
    Logger LOG = Logger.getLogger(LedgerRecoveryMonitor.class);
   
    BookKeeper self;
    long lId;
    int qSize;
    QMode qMode;
    ArrayList<InetSocketAddress> bookies;
    ArrayList<BookieClient> clients;
    HashMap<Long, ArrayList<ByteBuffer> > votes;
    TreeMap<Long, Integer > hints;
    AtomicInteger counter;
   
    private int minimum;
   
    /**
     * Constructor simply initiates data structures
     *
     * @param self  Instance of BookKeeper
     * @param lId   Ledger identifier
     * @param qSize Quorum size
     * @param bookies   List of bookie addresses
     * @param qMode     Quorum mode
     */
    LedgerRecoveryMonitor(BookKeeper self,
            long lId,
            int qSize,
            ArrayList<InetSocketAddress> bookies,
            QMode qMode){
        this.self = self;
        this.lId = lId;
        this.qSize = qSize;
        this.qMode = qMode;
        this.bookies = bookies;
        this.clients = new ArrayList<BookieClient>();
        this.votes = new HashMap<Long, ArrayList<ByteBuffer> >();
        this.hints = new TreeMap<Long, Integer>();
        this.counter = new AtomicInteger(0);
       
        this.minimum = bookies.size();
        if(qMode == QMode.VERIFIABLE){
            this.minimum += 1 - qSize;
        } else if(qMode == QMode.GENERIC){
            this.minimum -= Math.floor(qSize/2);
        }
       
    }
   
   
    /**
     * Determines the last entry written to a ledger not closed properly
     * due to a client crash
     *
     * @param   passwd 
     */
    boolean recover(byte[] passwd) throws
    IOException, InterruptedException, BKException, KeeperException {
        /*
         * Create BookieClient objects and send a request to each one.
         */
       
        for(InetSocketAddress s : bookies){
            LOG.info(s);
            BookieClient client = new BookieClient(s, 3000);
            clients.add(client);
            client.readEntry(lId,
                    -1,
                    this,
                    null);
        }       
       
        /*
         * Wait until I have received enough responses
         */
        synchronized(counter){
            LOG.info("Counter: " + counter.get() + ", " + minimum + ", " + qMode);
            if(counter.get() < minimum){
                LOG.info("Waiting...");
                counter.wait(5000);
            }
        }
       
        /*
         * Obtain largest hint
         */
        LedgerHandle lh = new LedgerHandle(self, lId, 0, qSize, qMode, passwd);
        for(InetSocketAddress addr : bookies){
            lh.addBookieForReading(addr);
        }
       
        boolean notLegitimate = true;
        long readCounter = 0;
        while(notLegitimate){
            readCounter = getNextHint();
            if(readCounter > -1){
                lh.setLast(readCounter);
                boolean hasMore = true;
                while(hasMore){
                    hasMore = false;
                    LOG.debug("Recovering: " + lh.getLast());
                    LedgerSequence ls = lh.readEntries(lh.getLast(), lh.getLast());
                    LOG.debug("Received entry for: " + lh.getLast());
                   
                    byte[] le = ls.nextElement().getEntry();
                    if(le != null){
                        if(notLegitimate) notLegitimate = false;
                        lh.addEntry(le);
                        hasMore = true;
                    }
                }
            } else break;  
        }
       
        /*
         * Write counter as the last entry of ledger
         */
        if(!notLegitimate){
            lh.setAddConfirmed(readCounter);
            lh.close();
           
            return true;
        } else {
          lh.setLast(0);
          lh.close();
         
          return false;
        }
               
    }
   
    /**
     * Read callback implementation
     *
     * @param rc    return code
     * @param ledgerId  Ledger identifier
     * @param entryId   Entry identifier
     * @param bb        Data
     * @param ctx       Control object
     *
     */
    public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuffer bb, Object ctx){
        if(rc == 0){
            bb.rewind();
       
            /*
             * Collect new vote
             */
            if(!votes.containsKey(entryId)){           
                votes.put(entryId, new ArrayList<ByteBuffer>());
            }
            votes.get(entryId).add(bb);
        
            /*
             * Extract hint
             */
       
            bb.position(16);
            long hint = bb.getLong();
       
            LOG.info("Received a response: " + rc + ", " + entryId + ", " + hint);
       
            if(!hints.containsKey(hint)){
                hints.put(hint, 0);
            }
            hints.put(hint, hints.get(hint) + 1);
       
            synchronized(counter){
                if(counter.incrementAndGet() >= minimum);
                counter.notify();
            }
        } else {
            LOG.debug("rc != 0");
        }
       
    }
   
    /**
     * Returns one hint at a time. We add a new hint to
     * the "hints" TreeMap used in this method upon a read
     * callback. Such callbacks correspond to returned values from bookies upon a request
     * for the last entry written hint.
     *
     * @return long next hint
     */
    private long getNextHint(){
        if(hints.size() == 0) return -1;
       
        long hint = hints.lastKey();
        hints.remove(hint);
       
        return hint;
    }
   
}
TOP

Related Classes of org.apache.bookkeeper.client.LedgerRecoveryMonitor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.