Package org.xtreemfs.babudb.replication.service

Source Code of org.xtreemfs.babudb.replication.service.ServiceLayer

/*
* Copyright (c) 2010 - 2011, Jan Stender, Bjoern Kolbeck, Mikael Hoegqvist,
*                     Felix Hupfeld, Felix Langner, Zuse Institute Berlin
*
* Licensed under the BSD License, see LICENSE file for details.
*
*/
package org.xtreemfs.babudb.replication.service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.CRC32;

import org.xtreemfs.babudb.BabuDBRequestResultImpl;
import org.xtreemfs.babudb.api.exception.BabuDBException;
import org.xtreemfs.babudb.api.exception.BabuDBException.ErrorCode;
import org.xtreemfs.babudb.config.ReplicationConfig;
import org.xtreemfs.babudb.log.LogEntry;
import org.xtreemfs.babudb.log.SyncListener;
import org.xtreemfs.babudb.lsmdb.LSN;
import org.xtreemfs.babudb.replication.BabuDBInterface;
import org.xtreemfs.babudb.replication.FleaseMessageReceiver;
import org.xtreemfs.babudb.replication.Layer;
import org.xtreemfs.babudb.replication.control.ControlLayerInterface;
import org.xtreemfs.babudb.replication.proxy.ProxyRequestHandler;
import org.xtreemfs.babudb.replication.service.accounting.LatestLSNUpdateListener;
import org.xtreemfs.babudb.replication.service.accounting.ParticipantsOverview;
import org.xtreemfs.babudb.replication.service.accounting.ParticipantsStates;
import org.xtreemfs.babudb.replication.service.accounting.ParticipantsStates.UnknownParticipantException;
import org.xtreemfs.babudb.replication.service.accounting.ReplicateResponse;
import org.xtreemfs.babudb.replication.service.clients.ClientInterface;
import org.xtreemfs.babudb.replication.service.clients.ClientResponseFuture;
import org.xtreemfs.babudb.replication.service.clients.ClientResponseFuture.ClientResponseAvailableListener;
import org.xtreemfs.babudb.replication.service.clients.ConditionClient;
import org.xtreemfs.babudb.replication.service.clients.MasterClient;
import org.xtreemfs.babudb.replication.service.clients.SlaveClient;
import org.xtreemfs.babudb.replication.transmission.TransmissionToServiceInterface;
import org.xtreemfs.foundation.LifeCycleListener;
import org.xtreemfs.foundation.buffer.BufferPool;
import org.xtreemfs.foundation.buffer.ReusableBuffer;
import org.xtreemfs.foundation.flease.Flease;
import org.xtreemfs.foundation.logging.Logging;

/**
* Contains the service logic for all necessary operations used by the
* replication.
*
* @author flangner
* @since 04/12/2010
*/
public class ServiceLayer extends Layer implements  ServiceToControlInterface, SlaveView {
   
    /** states of all the other participants */
    private final ParticipantsStates             participantsStates;
   
    /** continuously sends heartbeat-messages to all participants */
    private final HeartbeatThread                heartbeatThread;
   
    /** thread to process replication requests from a master server */
    private final ReplicationStage               replicationStage;
   
    /** the last acknowledged LSN of the last view */
    private final AtomicReference<LSN>           lastOnView = new AtomicReference<LSN>();
   
    /** interface for operation performed on BabuDB */
    private final BabuDBInterface                babuDB;
       
    /** interface to the underlying layer */
    private final TransmissionToServiceInterface transmissionInterface;
       
    /** thread safe checksum object for serialization of LogEntries to replicate */
    private final AtomicReference<CRC32>         checksum = new AtomicReference<CRC32>(new CRC32());
       
    private final ReplicationConfig              config;
   
    /**
     * @param config
     * @param babuDB
     * @param transLayer
     *
     * @throws IOException if {@link Flease} could not be initialized.
     */
    public ServiceLayer(ReplicationConfig config, BabuDBInterface babuDB,
            TransmissionToServiceInterface transLayer) throws IOException {
       
        this.config = config;
        this.transmissionInterface = transLayer;
        this.babuDB = babuDB;
       
        // ----------------------------------
        // initialize the participants states
        // ----------------------------------
        try {
            participantsStates = new ParticipantsStates(config.getSyncN(), config.getParticipants(),
                    transLayer);
        } catch (UnknownParticipantException e) {
            throw new IOException("The address of at least one participant could not have been " +
                "resolved, because: " + e.getMessage());
        }
       
        // ----------------------------------
        // initialize the heartbeat
        // ----------------------------------
        heartbeatThread = new HeartbeatThread(participantsStates, config.getPort());
       
        // ----------------------------------
        // initialize replication stage
        // ----------------------------------
        replicationStage = new ReplicationStage(
                config.getBabuDBConfig().getMaxQueueLength(), heartbeatThread, this,
                transLayer.getFileIOInterface(), babuDB, lastOnView, config.getChunkSize());
    }
   
    /**
     * @param <T>
     * @param receiver
     */
    public <T extends ControlLayerInterface & FleaseMessageReceiver> void init(T receiver) {
        assert (receiver != null);
       
        // ----------------------------------
        // initialize request logic for
        // handling BabuDB remote calls
        // ----------------------------------
        ProxyRequestHandler rqCtrl = new ProxyRequestHandler(babuDB,
                config.getBabuDBConfig().getMaxQueueLength());
        transmissionInterface.addRequestHandler(rqCtrl);
        receiver.registerProxyRequestControl(rqCtrl);
       
        // ----------------------------------
        // coin the dispatcher of the
        // underlying layer
        // ----------------------------------
        transmissionInterface.addRequestHandler(
                new ReplicationRequestHandler(participantsStates, receiver, babuDB,
                        replicationStage, lastOnView, config.getChunkSize(),
                        transmissionInterface.getFileIOInterface(),
                        config.getBabuDBConfig().getMaxQueueLength()));
       
        receiver.registerReplicationControl(replicationStage);
    }
   
/*
* Overridden methods
*/
   
    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.service.ServiceToControlInterface#replicate(
     *          org.xtreemfs.babudb.log.LogEntry)
     */
    @Override
    public ReplicateResponse replicate(LogEntry le) {
       
        // update the LSN of the heartbeat
        heartbeatThread.updateLSN(le.getLSN());
       
        // replicate the entry at the slaves
        List<SlaveClient> slaves;
        try {
            slaves = participantsStates.getAvailableParticipants();
        } catch (Exception e) {
            return new ReplicateResponse(le, e);
       
        final ReplicateResponse result = new ReplicateResponse(le,
                slaves.size() - participantsStates.getLocalSyncN());
       
        // make the replicate call at the clients
        if (slaves.size() == 0) {
            Logging.logMessage(Logging.LEVEL_DEBUG, this,
                    "There are no slaves available anymore! " +
                    "BabuDB runs if it would be in non-replicated mode.");
        } else {
           
            // serialize the LogEntry
            ReusableBuffer payload = null;
            synchronized(checksum) {
                CRC32 csumAlgo = checksum.get();
                try {
                    payload = le.serialize(csumAlgo);
                } finally {
                    csumAlgo.reset();
                }
            }
           
            // send the LogEntry to the other servers
            for (final SlaveClient slave : slaves) {
                slave.replicate(le.getLSN(), payload.createViewBuffer()).registerListener(
                        new ClientResponseAvailableListener<Object>() {
               
                    @Override
                    public void responseAvailable(Object r) {
                       
                        // evaluate the response
                        participantsStates.requestFinished(slave);
                    }

                    @Override
                    public void requestFailed(Exception e) {
                       
                        participantsStates.markAsDead(slave);
                        result.decrementPermittedFailures();
                       
                        Logging.logMessage(Logging.LEVEL_INFO, this,
                                "'%s' was marked as dead, because %s",
                                slave.getDefaultServerAddress().toString(),
                                e.getMessage());
                        if (e.getMessage() == null) {
                            Logging.logError(Logging.LEVEL_DEBUG, this, e);
                        }
                    }
                });
            }
           
            BufferPool.free(payload);
        }   
        return result;
    }
   
    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.service.ServiceToControlInterface#synchronize(
     *          org.xtreemfs.babudb.log.SyncListener, int)
     */
    @Override
    public void synchronize(SyncListener listener, int port) throws BabuDBException,
            InterruptedException, IOException {
           
        final int localSyncN = participantsStates.getLocalSyncN();
        List<ConditionClient> slaves = participantsStates.getConditionClients();
        if (localSyncN > 0) {
           
            // always get the latest available state of all servers
            LSN latest = null;
            Map<ClientInterface ,LSN> states = getStates(slaves, true);
       
            // getting enough slaves has failed
            if (states.size() < localSyncN) {
                throw new BabuDBException(ErrorCode.REPLICATION_FAILURE,
                        "Not enough slaves available to synchronize with!");
               
            } else {
               
                List<LSN> values = new LinkedList<LSN>(states.values());
                Collections.sort(values, Collections.reverseOrder());
                latest = values.get(0);
            }
       
            // synchronize with the most up-to-date slave, if necessary
            LSN localState = babuDB.getState();
            if (localState.compareTo(latest) < 0) {
                for (Entry<ClientInterface, LSN> entry : states.entrySet()) {
                    if (entry.getValue().equals(latest)) {  
                               
                        Logging.logMessage(Logging.LEVEL_INFO, this,
                                "Starting synchronization from '%s' to '%s'.",
                                localState.toString(), latest.toString());
                   
                        BabuDBRequestResultImpl<Object> ready = babuDB.createRequestFuture();
                       
                        replicationStage.manualLoad(ready, latest);
                        ready.get();
                   
                        assert(latest.equals(babuDB.getState())) :
                            "Synchronization failed: (expected=" +
                            latest.toString() + ") != (acknowledged=" +
                            babuDB.getState() + ")";
                        break;
                    }
                }
            }
        }
       
        // take a checkpoint on master-failover (inclusive viewId incrementation), if necessary
        LSN beforeCP = lastOnView.get();
        if (babuDB.getState().getSequenceNo() > 0L) {
            Logging.logMessage(Logging.LEVEL_DEBUG, this, "taking a checkpoint");
            beforeCP = babuDB.checkpoint();
            lastOnView.set(beforeCP);
        }
        Logging.logMessage(Logging.LEVEL_INFO, this,
                "Agreed to synchronize to %s before the next view.", beforeCP.toString());
       
        // wait for the slaves to recognize the master-change and for at least N servers to
        // establish a stable state
        LSN syncState = babuDB.getState();
        final ReplicateResponse result = new ReplicateResponse(syncState, listener,
                slaves.size() - localSyncN);
       
        subscribeListener(result);
       
        for (ConditionClient slave : slaves) {
            slave.synchronize(beforeCP, port).registerListener(
                    new ClientResponseAvailableListener<Object>() {

                        @Override
                        public void responseAvailable(Object r) { /* ignored */ }

                        @Override
                        public void requestFailed(Exception e) {
                            result.decrementPermittedFailures();
                        }
            });
        }
       
        Logging.logMessage(Logging.LEVEL_INFO, this,
                "The next view will start with %s.", syncState.toString());
    }
 
    /*
     * (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.service.SlaveView#getSynchronizationPartner(org.xtreemfs.babudb.lsmdb.LSN)
     */
    @Override
    public MasterClient getSynchronizationPartner(LSN progressAtLeast) {
       
        List<ConditionClient> servers = participantsStates.getConditionClients();
       
        if (servers.size() > 0) {
            Map<ClientInterface, LSN> states = getStates(servers, false);
           
            for (Entry<ClientInterface, LSN> e : states.entrySet()) {
                if (e.getValue().compareTo(progressAtLeast) >= 0)
                    return (MasterClient) e.getKey();
            }
        } else {
            return null;
        }
       
        return (MasterClient) servers.get(0);
    }
   
    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.service.ServiceToControlInterface#getParticipantOverview()
     */
    @Override
    public ParticipantsOverview getParticipantOverview() {
        return participantsStates;
    }
   
    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.service.ServiceToControlInterface#reset()
     */
    @Override
    public void reset() {
        participantsStates.reset();
    }
   
    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.Layer#_setLifeCycleListener(org.xtreemfs.foundation.LifeCycleListener)
     */
    @Override
    public void _setLifeCycleListener(LifeCycleListener listener) {
        this.heartbeatThread.setLifeCycleListener(listener);
        this.replicationStage.setLifeCycleListener(listener);
    }
   
    /**
     * Method to initialize all ServiceLayer services. Passes the userService to be available at the
     * {@link ReplicationStage}.
     *
     * @param userService
     */
    public void start(ControlLayerInterface topLayer) {
        LSN latest = babuDB.getState();
        LSN normalized = (latest.getSequenceNo() == 0L) ? new LSN(0,0L) : latest;
           
        // the sequence number of the initial LSN before incrementing the viewID must not be 0
        Logging.logMessage(Logging.LEVEL_DEBUG, this, "Setting last on view " +
           "LSN to '%s', initial was '%s'.", normalized, latest);
       
        lastOnView.set(normalized);
               
        try {
            replicationStage.start(topLayer);
            heartbeatThread.start(latest);
            heartbeatThread.waitForStartup();
            replicationStage.waitForStartup();
        } catch (Exception e) {
            listener.crashPerformed(e);
        }
    }
   
    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.Layer#asyncShutdown()
     */
    @Override
    public void asyncShutdown() {
        this.heartbeatThread.shutdown();
        this.replicationStage.shutdown();
    }

    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.Layer#shutdown()
     */
    @Override
    public void shutdown() {
        this.heartbeatThread.shutdown();
        this.replicationStage.shutdown();
       
        try {
            this.heartbeatThread.waitForShutdown();
            this.replicationStage.waitForShutdown();
        } catch (Exception e) {
            this.listener.crashPerformed(e);
        }
    }
   
    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.service.ServiceToControlInterface#subscribeListener(
     *          org.xtreemfs.babudb.replication.service.accounting.LatestLSNUpdateListener)
     */
    @Override
    public void subscribeListener(LatestLSNUpdateListener rp) {
        participantsStates.subscribeListener(rp);
    }
   
/*
* private methods
*/
   
    /**
     * Performs a network broadcast to get the latest LSN from every available DB.
     *
     * @param clients
     * @param hard - decide whether the requested state should be preserved (true), or not (false).
     * @return the LSNs of the latest written LogEntries for the <code>babuDBs
     *         </code> that were available (could be empty).
     */
    private Map<ClientInterface, LSN> getStates(List<ConditionClient> clients, boolean hard) {
        Map<ClientInterface, LSN> result = new HashMap<ClientInterface, LSN>();
        int numRqs = clients.size();
       
        if (numRqs == 0) {
            return result;
        }
       
        List<ClientResponseFuture<LSN, org.xtreemfs.babudb.pbrpc.GlobalTypes.LSN>> rps =
            new ArrayList<ClientResponseFuture<LSN, org.xtreemfs.babudb.pbrpc.GlobalTypes.LSN>>(
                    numRqs);
       
        if (numRqs > 0) {
            // send the requests
            for (int i = 0; i < numRqs; i++) {
                if (hard) {
                    rps.add(i, clients.get(i).state());
                } else {
                    rps.add(i, clients.get(i).volatileState());
                }
            }
           
            // get the responses
            for (int i = 0; i < numRqs; i++) {
                try{
                    LSN val = rps.get(i).get();
                    result.put(clients.get(i), val);
                } catch (Exception e) {
                    Logging.logMessage(Logging.LEVEL_INFO, this,
                            "Could not receive state of '%s', because: %s.",
                            clients.get(i), e.getMessage());
                    if (e.getMessage() == null)
                        Logging.logError(Logging.LEVEL_INFO, this, e);
                }
            }
        }
       
        return result;
    }

    /* (non-Javadoc)
     * @see org.xtreemfs.babudb.replication.Layer#start()
     */
    @Override
    public void start() {
        throw new UnsupportedOperationException("Use start(LockableService userService) instead!");
    }
}
TOP

Related Classes of org.xtreemfs.babudb.replication.service.ServiceLayer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.