Package org.chaidb.db.transaction.recover

Source Code of org.chaidb.db.transaction.recover.ForwardRecovery

/*
* Copyright (C) 2006  http://www.chaidb.org
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
*/
package org.chaidb.db.transaction.recover;

import org.apache.log4j.Logger;
import org.chaidb.db.Db;
import org.chaidb.db.DBState;
import org.chaidb.db.exception.ChaiDBException;
import org.chaidb.db.exception.ErrorCode;
import org.chaidb.db.helper.SafeUpdateUtils;
import org.chaidb.db.index.btree.bufmgr.PageBufferManager;
import org.chaidb.db.log.*;
import org.chaidb.db.log.logrecord.TxnFuzzyCkpLogRecord;
import org.chaidb.db.transaction.TransactionManager;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;


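/**
 * Forward (redo) recovery over a log directory: scans log records forward
 * from the last recorded LSN, re-applies the records of committed
 * transactions, and persists the recovery position in a control file.
 *
 * @author Kurt Sung
 */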
public final class ForwardRecovery {
    // log4j logger for this class.
    private static final Logger logger = Logger.getLogger(ForwardRecovery.class);

    // redo() applies the pending committed transactions early once redoList
    // holds more than this many queues.
    private static final int MAX_TXN_IN_LIST = 25;
    // Number of processed log records between two progress recalculations.
    private static final int PRINT_PROGRESS_INTERVAL = 1000;
    // Share (in percent) of the reported progress attributed to scanning in redo().
    public static final int SCAN_PERCENT = 50;
    // Directory handed to every TxnQueue created in redo().
    private final String tmpPath;
    // Log directory; also used to locate the FrControlFile.
    private final String logPath;
    // Log file access object opened on logPath.
    private LogFile logWr;
    // Reads LogRecords by Lsn from logWr; its direction flag is set to true
    // before redo and to false in undo().
    private ReadCache readCache;
    // Sentinel (-1, -1); redo() compares it with a record's previous LSN to
    // detect the first record of a transaction.
    private Lsn nullLsn = new Lsn(-1, -1);
    // LSN after which redo() resumes scanning. redo() updates this object in
    // place; it is stored in the control file through setLastLsn().
    private Lsn prevLastLsn = null;
    // Replaced in redo() by the smallest LSN of each fuzzy checkpoint read;
    // lower bound of undo(), argument of deleteOutdatedLogFiles(), stored in
    // the control file through setLastSLsn().
    private Lsn lastSLsn = null;
    // Text printed in front of every progress message.
    private String prefix = "";
    // Integer txnId -> TxnQueue of transactions seen by redo() whose end
    // record has not been read yet.
    private HashMap txnList = new HashMap();
    // TxnQueues of finished transactions waiting to be replayed by apply().
    private ArrayList redoList = new ArrayList();
    // Integer txnId -> txnId of the transactions being rolled back by undo().
    private HashMap undoList = new HashMap();
    // Integer txnId -> TxnQueue of transactions that ended with
    // LOG_DELETE_FILES; an entry is removed when a later LOG_TXN_REGOP with
    // the same id is read while no queue is open for it.
    private HashMap ddlList = new HashMap();
    // True while recover() is executing; used to reject overlapping calls.
    private boolean running;
    // Last progress percentage that was printed.
    private int progress;

    /**
     * Create a recovery instance. Should be created before copy any new log
     *
     * @param logPath
     * @param tmpPath
     * @throws ChaiDBException
     */
    public ForwardRecovery(String logPath, String tmpPath, boolean backup) throws ChaiDBException {
        this.tmpPath = tmpPath;
        this.logPath = logPath;
        logWr = new LogFile(logPath);
        readCache = new ReadCache(logWr);
        readCache.setDirection(true);

        // check whether the first time or backup server crashed
        // should be called before get new log
        FrControlFile frc = FrControlFile.getControlFile(logPath);

        if (frc != null) {
            //backup server crashed
            lastSLsn = new Lsn(frc.getLastSLsn());
            prevLastLsn = new Lsn(frc.getLastSLsn());

            if (backup) {
                logWr.truncate(prevLastLsn.getFileId(), prevLastLsn.getOffset());
            }
        } else {
            if (!backup) {
                throw new ChaiDBException(ErrorCode.RECOVER_ERROR_BASE, "Can't get control file. It must be transfer from backup server first.");
            }

            //the first time
            int lastid = logWr.getLastFileIdInDir();
            boolean copyWhole = false;
            if (lastid == 0) {
                copyWhole = true;
            } else if (lastid == 1) {
                File lastf = new File(logWr.getFullFileName(lastid));
                if (lastf.length() < 200 * 1024) {
                    copyWhole = true;
                }
            }
            if (copyWhole) {
                prevLastLsn = new Lsn(1, 0);
            } else {
                prevLastLsn = removeEndChkp();
            }

            if (prevLastLsn == null) {
                throw new ChaiDBException(ErrorCode.LOG_FILE_NOT_EXIST);
            }

            lastSLsn = prevLastLsn;

            frc = FrControlFile.createControlFile(logPath);
            frc.setLastLsn(prevLastLsn.toString());
            frc.setLastSLsn(prevLastLsn.toString());
            frc.writeToDisk();
        }
    }

    private Lsn removeEndChkp() throws ChaiDBException {
        int[] endoffset = new int[1];
        Lsn lastLsn = logWr.getLastLsn(endoffset);

        while (true) {
            LogRecord record = readCache.get(lastLsn);

            if (record.getType() == LogRecord.LOG_TXN_FUZZY_CHECKPOINT) {
                endoffset[0] = lastLsn.getOffset();

                int offset = record.getHeader().getPrevOffset();

                if (offset < 0) {
                    offset = -offset;
                    lastLsn.setFileId(lastLsn.getFileId() - 1);
                }

                lastLsn.setOffset(offset);
            } else {
                break;
            }
        }

        File f = new File(logWr.getFullFileName(lastLsn.getFileId()));

        if (f.exists() && (f.length() != endoffset[0])) {
            logWr.truncate(lastLsn.getFileId(), endoffset[0]);
        }

        return lastLsn;
    }

    public boolean recover() throws ChaiDBException {
        logger.debug("Fall into ForwardRecovery.recover");

        synchronized (this.getClass()) {
            if (running) {
                return false;
            }

            running = true;
        }

        try {
            //        PageBufferManager.getInstance().setInOnlineRecovery(true);
            try {
                Db.getTxnManager().setFlags(TransactionManager.TXN_IN_RECOVERY);
                logger.debug("lastLsn=" + prevLastLsn.toString());

                readCache.setDirection(true);
                progress = 0;

                //Scan and apply log
                redo();

                PageBufferManager.getInstance().dump(true);

                //After apply, close all
                PageBufferManager.getInstance().closeAllBTrees();

                DBState.getInstance().setNeedRecovery(false);
            } finally {
                // Remove TXN_IN_RECOVERY flag from txn manager.
                Db.getTxnManager().clearFlags(TransactionManager.TXN_IN_RECOVERY);
            }

            FrControlFile frc = FrControlFile.getControlFile(logPath);

            if (frc == null) {
                frc = FrControlFile.createControlFile(logPath);
            }

            SafeUpdateUtils bu = new SafeUpdateUtils();

            try {
                bu.backupFiles(new String[]{frc.getControlFilePath()}, "icb");
            } catch (IOException e) {
                return false;
            }

            try {
                frc.setLastLsn(prevLastLsn.toString());
                frc.setLastSLsn(lastSLsn.toString());
                frc.writeToDisk();
            } finally {
                bu.clearFiles();
            }

            logger.debug("Run out of ForwardRecovery.recover");

            return true;
        } finally {
            synchronized (this.getClass()) {
                running = false;
            }
        }
    }

    /**
     * Scans the log forward, starting with the record that follows
     * {@code prevLastLsn}, and replays finished transactions.
     * <p>
     * Records are collected per transaction id in {@code TxnQueue} objects held
     * in {@code txnList}. When a transaction's {@code LOG_TXN_REGOP} or
     * {@code LOG_DELETE_FILES} record is read, its queue moves to
     * {@code redoList}; the list is replayed through {@code apply} during the
     * scan once it grows beyond {@code MAX_TXN_IN_LIST}, and once more after the
     * scan. {@code prevLastLsn} is advanced in place to the last record read.
     *
     * @throws ChaiDBException if a log record cannot be read or applying fails
     */
    void redo() throws ChaiDBException {
        LogRecord cursorLogRecord;
        logger.debug("Fall into ForwardRecovery.redo");
        logger.debug("prevLastLsn:" + prevLastLsn);
        readCache.clear();

        // End of the log as it currently is on disk: last file id and its length.
        final int lastFid = logWr.getLastFileIdInDir();
        final long lastFlen = new File(logWr.getFullFileName(lastFid)).length();
        Lsn lastLsn = new Lsn(lastFid, (int) lastFlen);

        /* This group of variables is used to compute the scan progress ratio
        */
        int counter = 0; // avoids recalculating the progress for every record
        long allDistance = getDistance(prevLastLsn, lastLsn);
        int redoProgress = 0;

        /* Begin from the next lsn after prevLastLsn
        */
        Lsn cursorLsn = new Lsn(prevLastLsn);

        try {
            cursorLogRecord = readCache.get(cursorLsn);
        } catch (ChaiDBException e) {
            logger.error("Can't continue the replication from the lsn " + prevLastLsn);
            throw e;
        }

        // Position (1, 0) means "start from the very first record"; any other
        // position names a record that is skipped, so scanning starts after it.
        int offset = 0;
        if (!(prevLastLsn.getFileId() == 1 && prevLastLsn.getOffset() == 0)) {
            offset = cursorLsn.getOffset() + cursorLogRecord.getRecordLength();
        }
        int fid = cursorLsn.getFileId();
        long flen = new File(logWr.getFullFileName(fid)).length();

        // Snapshot of the start position; prevLastLsn itself moves during the scan.
        Lsn oldLastLsn = new Lsn(prevLastLsn);
        int headerLen = Hdr.getHdrLength();
        System.out.print("\r" + prefix + "1% completed.    ");
        progress = 1;

        while (true) {
            // Fewer bytes than one header remain in this file: move to the next
            // file, or stop when this is the last one.
            if ((offset >= flen) || ((flen - offset) <= headerLen)) {
                if (fid >= logWr.getLastFileIdInDir()) {
                    //is last log file
                    break;
                }

                fid++;
                offset = 0;
                flen = new File(logWr.getFullFileName(fid)).length();

                if (flen <= headerLen) {
                    //new file has no enough bytes
                    break;
                }

                cursorLsn.setFileId(fid);
            }

            cursorLsn.setOffset(offset);

            try {
                cursorLogRecord = readCache.get(cursorLsn);
            } catch (ChaiDBException e) {
                logger.error(e);
                throw e;
            }

            final byte type = cursorLogRecord.getType();

            switch (type) {
                case LogRecord.LOG_TXN_CHILD:
                    break;

                case LogRecord.LOG_TXN_CHECKPOINT:
                    break;

                case LogRecord.LOG_TXN_FUZZY_CHECKPOINT: //V3.1 fuzzy checkpoint
                    // Remember the smallest LSN recorded by the newest checkpoint.
                    lastSLsn = ((TxnFuzzyCkpLogRecord) cursorLogRecord).getSmallestLsn();

                    break;

                default:

                    final Integer txnId = new Integer(cursorLogRecord.getTxnId());
                    TxnQueue txnQueue;

                    // A record without a previous LSN opens a new transaction queue.
                    // NOTE(review): cursorLsn is reused and mutated on every
                    // iteration; presumably TxnQueue copies it - confirm.
                    if (cursorLogRecord.getPrevLsn().equals(nullLsn)) {
                        txnQueue = new TxnQueue(txnId, cursorLsn, tmpPath);
                        txnList.put(txnId, txnQueue);

                        //                    System.out.println("Found transaction: " + txnId);
                    }

                    txnQueue = (TxnQueue) txnList.get(txnId);

                    if (txnQueue == null) {
                        // No open queue for this id: either the REGOP that follows
                        // an already finished LOG_DELETE_FILES transaction, or a
                        // record of a transaction whose start was not scanned.
                        if (ddlList.containsKey(txnId) && (type == LogRecord.LOG_TXN_REGOP)) {
                            ddlList.remove(txnId);
                        } else {
                            //after undo to sLsn, some committed txn may across the
                            //sLsn, so just ignore this case
                            ;
                        }
                    } else {
                        if ((type == LogRecord.LOG_TXN_REGOP) || (type == LogRecord.LOG_DELETE_FILES)) {
                            // End record: the queue is complete and ready to be replayed.
                            txnList.remove(txnId);
                            redoList.add(txnQueue);
                            logger.debug("Transaction " + Integer.toHexString(txnId.intValue()) + " committed.");

                            if (type == LogRecord.LOG_DELETE_FILES) {
                                ddlList.put(txnId, txnQueue);
                            }
                        } else {
                            // Ordinary record: store its serialized bytes in the queue.
                            byte[] data = new byte[cursorLogRecord.getRecordLength()];
                            cursorLogRecord.toBytes(data, 0);
                            txnQueue.offer(data);
                        }
                    }
            }

            // Record the position just processed, then step over the record.
            prevLastLsn.setFileId(fid);
            prevLastLsn.setOffset(offset);
            offset += cursorLogRecord.getRecordLength();

            //show progress
            counter++;

            if (counter == PRINT_PROGRESS_INTERVAL) {
                long dis = getDistance(oldLastLsn, cursorLsn);
                int scanProgress = (int) ((dis * SCAN_PERCENT) / allDistance);

                int nprogress = scanProgress + redoProgress;

                if ((nprogress > progress) && (scanProgress < SCAN_PERCENT) && (nprogress < 100) && (nprogress > 0)) {
                    System.out.print("\r" + prefix + nprogress + "% completed.    ");
                    progress = nprogress;
                }

                counter = 0;

                // The backlog is only checked here, i.e. once per
                // PRINT_PROGRESS_INTERVAL records.
                if (redoList.size() > MAX_TXN_IN_LIST) {
                    logger.debug("redoList is full and need apply now");
                    apply(scanProgress);
                    redoProgress = scanProgress;
                }
            }
        }

        // Drop unfinished transactions that started before the smallest LSN.
        if (lastSLsn != null) {
            Iterator it = txnList.entrySet().iterator();

            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                final TxnQueue txnQueue = (TxnQueue) entry.getValue();

                if (txnQueue.getFirstLsn().compare(lastSLsn) < 0) {
                    txnQueue.abandon();
                    it.remove();
                }
            }
        }

        //apply
        logger.debug("apply the log to data");

        if (progress < SCAN_PERCENT) {
            System.out.print("\r" + prefix + "50% completed.    ");
            progress = SCAN_PERCENT + redoProgress;
        }

        apply(SCAN_PERCENT);
        System.out.print("\r" + prefix + "100% completed.    ");
        logger.debug("Run out of ForwardRecovery.redo");
    }

    /**
     * @param scanProgress 0 - 100
     * @throws ChaiDBException
     */
    private void apply(int scanProgress) throws ChaiDBException {
        int logIndex = 0;
        int logCount = 0;
        int counter = 0;
        int diff = (scanProgress * 2) - progress;
        int startProgress = progress;
        final int redoCount = redoList.size();

        for (int i = 0; i < redoCount; i++) {
            logCount += ((TxnQueue) redoList.get(i)).getSize();
        }

        for (int i = 0; i < redoCount; i++) {
            TxnQueue txnQueue = (TxnQueue) redoList.get(i);

            //            System.out.println("Recovering transaction: " +
            //                txnQueue.getTxnId());
            byte[] data = txnQueue.poll();

            while (data != null) {
                LogRecord cursorLogRecord = RecordFactory.createRecord(data, 0);

                if (cursorLogRecord == null) {
                    break;
                }

                //prepare data for next loop
                data = txnQueue.poll();

                //show progress
                logIndex++;
                counter++;

                if (counter == PRINT_PROGRESS_INTERVAL) {
                    final int curper = (logIndex * diff) / logCount;
                    int nprogress = curper + startProgress;

                    if ((nprogress > progress) && (nprogress < (scanProgress * 2)) && (nprogress > 0) && (nprogress < 100)) {
                        System.out.print("\r" + prefix + nprogress + "% completed.    ");
                        progress = nprogress;
                    }

                    counter = 0;
                }

                try {
                    cursorLogRecord.recover(LogRecord.REDO);
                } catch (ChaiDBException e) {
                    logger.error(e.toString());
                    //continue;
                }
            }
        }

        redoList.clear();
    }

    public void setPrefix(String prefix) {
        this.prefix = prefix;
    }

    private long getDistance(Lsn start, Lsn end) {
        long fd = end.getFileId() - start.getFileId();
        long od = end.getOffset() - start.getOffset();

        if (end.getFileId() < start.getFileId()) {
            fd = -fd;
            od = -od;
        } else if ((end.getFileId() == start.getFileId()) && (od < 0)) {
            od = -od;
        }

        return (fd * LogManagerImpl.LOG_FILE_MAX_SIZE) + od;
    }

    public Lsn getPrevLastLsn() {
        return new Lsn(prevLastLsn);
    }

    public void deleteOutdatedLogFiles() {
        logWr.deleteOutdatedLogFiles(lastSLsn);
    }

    public Lsn getLastSLsn() {
        return new Lsn(lastSLsn);
    }

    /**
     * Undo from lastLsn to lastSLsn. Then truncate log to prevLastLsn.
     * the log finished between lastSLsn and the last Checkpoint before prevLastLsn
     * will be ignore.
     * ____ lastSLsn (sLsn of the checkpoint)
     * |  |
     * |__|_ checkpoint
     * |__|_ prevLastLsn
     * |  |
     * |  |
     * |__|_ lastLsn
     *
     * @throws ChaiDBException
     */
    public void undo() throws ChaiDBException {
        boolean foundCheckpoint = false;
        LogRecord cursorLogRecord = null;

        Lsn lastLsn = logWr.getLastLsn(null);

        if (lastLsn == null) {
            throw new ChaiDBException(ErrorCode.LOG_FILE_NO_RECORDS);
        }

        Lsn cursorLsn = new Lsn(lastLsn);

        /* This group of variables are used to show percent ratio
        */
        int counter = 0; // this counter is used to avoid frequently calculate percent
        long allDistance = getDistance(lastSLsn, lastLsn);
        long percent = 0;
        long curper = 0;
        long dis = 0;

        int offset = cursorLsn.getOffset();
        int fid = cursorLsn.getFileId();

        readCache.clear();
        readCache.setDirection(false);
        undoList.clear();

        try {
            Db.getTxnManager().setFlags(TransactionManager.TXN_IN_RECOVERY);

            while (cursorLsn.compare(lastSLsn) >= 0) {
                try {
                    cursorLogRecord = readCache.get(cursorLsn);

                    if (cursorLogRecord == null) {
                        break;
                    }

                    //show progress
                    counter++;

                    if (counter == PRINT_PROGRESS_INTERVAL) {
                        dis = getDistance(cursorLsn, lastLsn);
                        curper = (dis * 100) / allDistance;

                        if ((curper != percent)) {
                            System.out.print("\r" + prefix + curper + "% completed.    ");
                            percent = curper;
                        }

                        counter = 0;
                    }
                } catch (ChaiDBException e) {
                    logger.error(e);
                    throw e;
                }

                final byte type = cursorLogRecord.getType();

                switch (type) {
                    case LogRecord.LOG_TXN_CHILD:

                        //go through
                    case LogRecord.LOG_TXN_CHECKPOINT:
                        break;

                    case LogRecord.LOG_TXN_FUZZY_CHECKPOINT: //V3.1 fuzzy checkpoint

                        if (!foundCheckpoint && (cursorLsn.compare(prevLastLsn) < 0)) {
                            foundCheckpoint = true;
                        }

                        break;

                    default:

                        final Integer txnId = new Integer(cursorLogRecord.getTxnId());

                        if (undoList.containsKey(txnId)) {
                            try {
                                cursorLogRecord.recover(LogRecord.UNDO);
                            } catch (ChaiDBException e) {
                                logger.error(e);
                            }
                        } else {
                            if (!foundCheckpoint) {
                                undoList.put(txnId, txnId);

                                try {
                                    cursorLogRecord.recover(LogRecord.UNDO);
                                } catch (ChaiDBException e) {
                                    logger.error(e);
                                }
                            }
                        }

                        break;
                }

                if ((cursorLsn.getFileId() == 1) && (cursorLsn.getOffset() == 0)) {
                    break;
                }

                offset = cursorLogRecord.getHeader().getPrevOffset();

                if (offset < 0) {
                    fid--;
                    offset = -offset;
                    cursorLsn.setFileId(fid);
                }

                cursorLsn.setOffset(offset);
            }
        } finally {
            Db.getTxnManager().clearFlags(TransactionManager.TXN_IN_RECOVERY);
        }

        System.out.print("\r" + prefix + "100% completed.    ");

        //        System.out.println("logCount:" + logCount);
        logWr.truncate(prevLastLsn.getFileId(), prevLastLsn.getOffset());

        prevLastLsn = lastSLsn;

        FrControlFile frc = FrControlFile.getControlFile(logPath);

        if (frc == null) {
            frc = FrControlFile.createControlFile(logPath);
        }

        SafeUpdateUtils bu = new SafeUpdateUtils();

        try {
            bu.backupFiles(new String[]{frc.getControlFilePath()}, "icb");
        } catch (IOException e) {
            throw new ChaiDBException(ErrorCode.RECOVER_ERROR_BASE, "Can't backup " + frc.getControlFilePath());
        }

        try {
            frc.setLastLsn(prevLastLsn.toString());
            frc.setLastSLsn(lastSLsn.toString());
            frc.writeToDisk();
        } finally {
            bu.clearFiles();
        }
    }

    public void shutdown() throws ChaiDBException {
        if (redoList.size() > 0) {
            apply(50);
        }

        if (txnList != null) {
            Iterator it = txnList.entrySet().iterator();

            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                TxnQueue queue = (TxnQueue) entry.getValue();
                queue.abandon();
            }
        }

        txnList.clear();
        ddlList.clear();
    }
}
TOP

Related Classes of org.chaidb.db.transaction.recover.ForwardRecovery

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.