// Package org.chaidb.db.index.btree.bufmgr
//
// Source Code of org.chaidb.db.index.btree.bufmgr.PageBufferManager$ThreadRecorder

/*
* Copyright (C) 2006  http://www.chaidb.org
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
*/
package org.chaidb.db.index.btree.bufmgr;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.chaidb.db.Db;
import org.chaidb.db.DbEnvironment;
import org.chaidb.db.KernelContext;
import org.chaidb.db.exception.ChaiDBException;
import org.chaidb.db.exception.ErrorCode;
import org.chaidb.db.helper.AbstractDaemonThread;
import org.chaidb.db.helper.ByteTool;
import org.chaidb.db.helper.Config;
import org.chaidb.db.helper.DaemonThreadManager;
import org.chaidb.db.helper.cache.storage.BTreePathStorage;
import org.chaidb.db.helper.cache.storage.SecondaryStorageException;
import org.chaidb.db.index.BTreeFactory;
import org.chaidb.db.index.IBTreeConst;
import org.chaidb.db.index.IDBIndex;
import org.chaidb.db.index.btree.*;
import org.chaidb.db.lock.LockManager;
import org.chaidb.db.log.DefaultLogFile;
import org.chaidb.db.log.logrecord.BTreeSpecLogRecord;

import java.io.*;
import java.util.*;


/**
* This class manages a buffer that caches a certain number of pages.
* It keeps a buffered page pool as a byte[][], a hash table that maps a pageNumber
* to its page information, and a look-aside linked list of all the pages that are
* not accessed by any thread at the moment.
* <p> When a thread tries to fetch a page, it first gets the page info from the
* hash table. <br>
* If the page info indicates that the page is a fixed page, which means it is being
* accessed by some thread at the moment, the page is returned directly. <br>
* If the page is in the look-aside linked list, the page is removed from there,
* becomes a fixed page, and is returned. <br>
* If the page is not found, it is read from the pagedFile and put in as fixed. <br>
* If there is no slot in the buffer for a new page, the first node (header) in the
* look-aside linked list is kicked out. If it is dirty, it is written through.<br>
* <p/>
* After reading / writing has finished, the page is inserted at the end of the
* look-aside linked list. <br>
*
* @version 1.0
*/
public class PageBufferManager {
    /**
     * the logger
     */
    private static Logger logger = Logger.getLogger(PageBufferManager.class);

    // ------------------------------------------------------ constants

    /**
     * it can be removed
     */
    public static final boolean PROTECT_PAGE = true;

    // ------------------------------------------------------ class variables

    /**
     * is in recovery
     */
    private static boolean isRecovery;

    /**
     * the singleton instance
     */
    private static PageBufferManager _instance;

    private static final int MAX_FREEPAGE_NUMBERS = 50000;

    /**
     * The threshold of clean list.
     */
    private static final int RESERVED_CLEAN_SIZE = 50;

    /**
     * The size of dirty list which will be kicked out
     */
    private static final int KICKOUT_DIRTY_SIZE = 100;
    public static final int BTREEID_BASE = 1000000;

    /**
     * To get a existed page
     */
    private static LinkedList _tempBuf = new LinkedList();
    private static final String TMP_DIR = Config.getDbHome() + File.separator + "TEMP";

    // ------------------------------------------------------ instance variables

    /**
     * the buffer size, default is 16M
     */
    private final int bufferSize = Config.getConfigAsInt("chaidb.db.buffersize", 16) * 0x100000 / BTreeSpec.PAGE_SIZE;

    /**
     * the reference to the pagedFile
     */
    private Storage _storage = new Storage(BTreeSpec.PAGE_SIZE);

    //    private BTree btreeName;
    private BTreePathStorage idStorage = null;

    /**
     * the list to hold the buffered pages
     */
    private byte[][] bufferedPages = new byte[bufferSize][BTreeSpec.PAGE_SIZE];

    /**
     * The LRU list which item is clean and fix = 0
     */
    DList cleanList = new DList();

    /**
     * The list which item is dirty and fix = 0
     */
    DList dirtyList = new DList();

    /**
     * this map btree name to treeid
     */
    private BTreeIdManager btreeName2Id = BTreeIdManager.getInstance();

    /**
     * keeps a list of free locations in the buffer pool
     */
    private LinkedList freeSlots = new LinkedList();

    /**
     * keeps pageNumber and bufferedPageInfo pair
     */
    private HashMap bufferedPageInfos = new HashMap();

    /**
     * free page numbers
     */
    private Hashtable freePageNumbers = new Hashtable();

    /**
     * thread recorders
     */
    private Hashtable threadRecorders = new Hashtable();

    /**
     * All resources used by active  txns
     */
    private Hashtable resources = new Hashtable();

    /**
     * The temporary memory block that holds the dirty pages. It is used by the daemon thread.
     */
    private byte[][] writeBufferedPages = null;
    private PageNumber[] writePageNumber = null;

    /**
     * The number of dirty pages which the daemon thread will flush.
     */
    private int writeSize = 0;

    // The write lock used to coordinate the daemon thread with other threads.
    // It prevents other threads from kicking out pages that the daemon thread is flushing to disk.
    private Object writeLock = new Object();

    // justin add 2004-04-24
    // page change tracker, one instance here for performance reason
    // can be changed to container for more trackers
    private IChangeTracker tracker;
    private Hashtable idStorageList = new Hashtable();

    /**
     * the page size
     */
    private int pageSize = BTreeSpec.PAGE_SIZE;

    // ------------------------------------------------------------- constructor

    /**
     * Default constructor
     * Inits the freeSlots with all buffer slots.
     */
    private PageBufferManager() {
        for (int i = 0; i < bufferSize; i++) {
            freeSlots.addLast(new Integer(i));
        }

        writeBufferedPages = new byte[KICKOUT_DIRTY_SIZE][BTreeSpec.PAGE_SIZE];
        writePageNumber = new PageNumber[KICKOUT_DIRTY_SIZE];
        writeSize = 0;
        /**
         * The deamon thread which be responsible for flushing dirty pages when the size of cleanList reaches threshold
         */
        BufferWriter bufWriter = new BufferWriter();
        bufWriter.start();
    }

    public synchronized static PageBufferManager getInstance() {
        if (_instance == null) {
            _instance = new PageBufferManager();
        }

        return _instance;
    }

    /**
     * btree file name save to a mapping file
     *
     * @return name of btree map file
     */
    public String getBTreeMapFileName() {
        return DefaultLogFile.getInstance().getLogFilePath() + IDBIndex.BTREE_NAME_INDEX;
    }

    private BTreePathStorage initIdStorage(String path) throws ChaiDBException {
        BTreePathStorage ids = new BTreePathStorage(path + File.separator + IDBIndex.BTREE_NAME_INDEX);

        try {
            ids.init();
        } catch (SecondaryStorageException e) {
            logger.fatal(new Throwable("Init id Storage failed on path " + path));
            throw new ChaiDBException(ErrorCode.RUNTIME_IO_ERROR, e);
        }

        return ids;
    }

    /**
     * Inits id storage.
     *
     * @throws ChaiDBException
     */
    private void initIdStorage() throws ChaiDBException {
        idStorage = initIdStorage(DefaultLogFile.getInstance().getLogFilePath());
    }

    // --- method for change trackers justin added 2004-04-24

    /**
     * register a tracker to PageBufferManager
     *
     * @param track the tracker
     */
    public void register(IChangeTracker track) {
        tracker = track;
    }

    /**
     * unregister a tracker from PageBufferManager
     *
     * @param track the tracker that would be unregistered
     */
    public void unregister(IChangeTracker track) {
        if (tracker == track) {
            tracker = null;
        }
    }

    private void onChange(PageNumber pno) {
        if (tracker != null) {
            tracker.onChange(pno.getTreeId(), pno);
        }
    }

    private void onClose(int treeid) {
        if (tracker != null) {
            tracker.onClose(treeid);
        }
    }

    // justin add to retrieve storage, package level
    Storage getStorage() {
        return _storage;
    }

    public String getRelatviePath(String filename) {
        String rName = filename;

        if (filename.startsWith(DbEnvironment.getDataHome())) {
            rName = filename.substring(DbEnvironment.getDataHome().length());
        }

        return rName;
    }

    public String getFullPath(String relatvieName) {
        String fName = relatvieName;

        if (relatvieName.indexOf(':') < 0) {
            fName = DbEnvironment.getDataHome() + relatvieName;
        }

        return fName;
    }

    /**
     * Allocates btree id.
     *
     * @param btreeFileName The btree file name.
     * @return The tree id.
     * @throws ChaiDBException
     */
    public int allocateBTreeId(String btreeFileName) throws ChaiDBException {
        if (DbEnvironment.READ_ONLY) {
            throw new ChaiDBException(ErrorCode.DATABASE_IS_READONLY);
        }

        int treeId = -1;

        try {
            if (idStorage == null) {
                initIdStorage();
            }

            String rName = getRelatviePath(btreeFileName);

            treeId = idStorage.store(rName);

            if (Debug.DEBUG_BUFFEREDPAGEINFOS_REMOVE_TRACE) {
                logger.error("PageBufferManager::allocateBTreeId: the tree id is " + treeId);
                logger.error("PageBufferManager::allocateBTreeId: the tree name is " + btreeFileName);
            }
        } catch (SecondaryStorageException e) {
            logger.fatal(e);
            throw new ChaiDBException(ErrorCode.RUNTIME_IO_ERROR, e);
        }

        return treeId;
    }

    /**
     * Retrieve btree file name by tree id.
     *
     * @param treeId The tree id.
     * @return The btree file name.
     * @throws ChaiDBException
     */
    private String retrieveBTreeFileName(int treeId) throws ChaiDBException {
        try {
            String rName = "";

            if (treeId > BTREEID_BASE) {
                //in reporting server mode, treeId > BTREEID_BASE
                int server = (treeId / BTREEID_BASE);
                int trueId = treeId % BTREEID_BASE;

                String name = org.chaidb.db.helper.DbConstants.REPORTING_SERVER_PREFIX + server;
                BTreePathStorage ids;

                if ((ids = (BTreePathStorage) idStorageList.get(name)) == null) {
                    ids = initIdStorage(DbEnvironment.getDataHome() + name + File.separator + "datalog");
                    idStorageList.put(name, ids);
                }

                rName = ids.retrieve(trueId);
                logger.debug("treeId:" + treeId + " name:" + rName);

                return DbEnvironment.getDataHome() + name + File.separator + rName;
            } else {
                if (idStorage == null) {
                    initIdStorage();
                }

                rName = idStorage.retrieve(treeId);
                logger.debug("treeId:" + treeId + " name:" + rName);
                if (rName == null) {
                    return "";
                }

                return getFullPath(rName);
            }
        } catch (SecondaryStorageException e) {
            logger.fatal(e);
            throw new ChaiDBException(ErrorCode.RUNTIME_IO_ERROR, e);
        }
    }

    /**
     * It is only used by recovery.
     *
     * @param btree
     * @param treeid
     * @return
     * @throws ChaiDBException
     */
    public synchronized boolean open(IBTree btree, File f, int treeid) throws ChaiDBException {
        String btreeName = f.getAbsolutePath();
        Integer id = btreeName2Id.getBTreeId(btreeName);

        if (id != null) {
            throw new ChaiDBException(ErrorCode.BTREE_ALREADY_OPENED, f.getPath());
        }

        _storage.open(f);
        btree.setBtreeId(treeid);
        btreeName2Id.put(btreeName, treeid);
        btreeName2Id.putOpenedBTree(treeid, new OpenedBTree(btree));

        return false;
    }

    public synchronized boolean open(IBTree btree, File f) throws ChaiDBException {
        String btreeName = f.getAbsolutePath();
        Integer id = btreeName2Id.getBTreeId(btreeName);

        if (id != null) {
            btree.setBtreeId(id.intValue());
            throw new ChaiDBException(ErrorCode.BTREE_ALREADY_OPENED, f.getPath());
        }

        _storage.open(f);

        byte[] metaPage = new byte[BTreeSpec.PAGE_SIZE];
        boolean ret = _storage.readPage(btreeName, new PageNumber(0, 0, 0), metaPage);
        int treeid = ByteTool.bytesToInt(metaPage, BTreeSpec.BTREEID_OFF, btree.getBTreeSpec().getMsbFirst());

        if (!ret) {
            treeid = allocateBTreeId(btreeName);
        } else {
            if (treeid == 0) {
                treeid = allocateBTreeId(btreeName);
            } else {
                String rpath = getRelatviePath(f.getAbsolutePath());

                int slash = 0;

                if ((slash = rpath.indexOf(File.separatorChar)) > -1) {
                    String sName = rpath.substring(0, slash);

                    if (sName.startsWith(org.chaidb.db.helper.DbConstants.REPORTING_SERVER_PREFIX)) {
                        int i = Integer.parseInt(sName.substring(6));

                        if (i >= 1) {
                            treeid += (i * BTREEID_BASE);
                        }
                    }
                }

                /* After defrag, btreename.key and btreename.dat will be removed.
                 * We should deal with this case and re-allocate btreeid for
                 * each btree. -- Kurt 2004-1-5
                 */
                String storedname = retrieveBTreeFileName(treeid);

                /* We should use File.getAbsolutePath to verify filename because
                * when the data is from linux, relatvie path stored in btreename
                * is "2/0_a.idb", we use File to convert '/' or '\' to File.seperator
                * according to current OS. -- Kurt 2004-11-18
                */
                File storedFile = new File(storedname);

                if ((storedFile == null) || !storedFile.getAbsolutePath().equals(btreeName)) {
                    treeid = allocateBTreeId(btreeName);
                }
            }
        }

        btree.setBtreeId(treeid);
        btreeName2Id.put(btreeName, treeid);
        btreeName2Id.putOpenedBTree(treeid, new OpenedBTree(btree));

        return false;
    }

    /**
     * dump pages in buffer to file, then close the database
     *
     * @param treeid
     * @throws ChaiDBException
     */
    public synchronized void close(int treeid) throws ChaiDBException {
        logger.debug("close btree, id=" + treeid);

        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);

        if (openedBTree == null) {
            String btreeName = btreeName2Id.getBTreeName(treeid);
            if (btreeName != null) {
                btreeName2Id.remove(btreeName);
            }
            logger.debug("close btree, id=" + treeid + ", btree not found.");

            return;
        }

        synchronized (writeLock) {
            while (writeSize != 0) {
                try {
                    if (Debug.DEBUG_PERFORMANCE) {
                        Debug.blockOnRealFlush++;
                    }

                    writeLock.wait();
                } catch (InterruptedException e) {
                    logger.warn("Buffer: close tree process is interrupted.");
                    return;
                }
            }
        }

        //if it's a special tree, close the index tree
        if (!Db.getTxnManager().isOnRecovery()) {
            if (openedBTree.btree.getType() == IDBIndex.ID2NODE_BTREE) {
                ((Id2nodeBTree) (openedBTree.btree)).indexBTree.close();
            }
        }

        flushMetaPage(treeid);
        dump(treeid);

        String name = getBTreeName(treeid);

        //Flush the page which is isFix() && isDirty().
        BufferedPageInfo bpi = null;
        LinkedList fixedPageNumbers = new LinkedList();
        LinkedList cleanedPageNumbers = new LinkedList();
        Collection values = this.bufferedPageInfos.values();
        Iterator iter = values.iterator();

        while (iter.hasNext()) {
            bpi = (BufferedPageInfo) iter.next();

            if ((bpi != null) && (bpi.getPageNumber().getTreeId() == treeid)) {
                if (bpi.isFix()) {
                    if (bpi.isDirty()) {
                        String filename = btreeName2Id.getBTreeName(treeid);

                        if (filename == null) {
                            logger.error("PageBufferManager::close The file name is null. The tree id " + treeid);
                        } else {
                            if (filename.equals("")) {
                                logger.error("PageBufferManager::close The file name is empty. The tree id " + treeid);
                            }
                        }

                        logger.error(new Throwable("There still exist fixed page is dirty."));
                        logger.error("PageBufferManager::close: The fixed dirty page: " + bpi.toString());

                        if (Debug.DEBUG_CHECKPAGENUMBER) {
                            if (!Db.getTxnManager().isOnRecovery()) {
                                if (!Debug.checkPageNumber(bpi.getPageNumber(), bufferedPages[bpi.getPageLocation()])) {
                                    logger.fatal(new Throwable("there is hole in GetPage!"));
                                    new Throwable().printStackTrace(System.err);
                                    dump();
                                    Db.getLogManager().flush();

                                    if (Debug.DEBUG_DUMP_MEMORY) {
                                        dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                                    }

                                    Debug.dumpPageToLog(bufferedPages[bpi.getPageLocation()]);

                                    if (Debug.DEBUG_PAGEINFO_TRACE) {
                                        Debug.pageHistory(bpi.getPageNumber());
                                    }

                                    System.exit(-1);
                                }
                            }
                        }

                        _storage.writePage(filename, bpi.getPageNumber(), bufferedPages[bpi.getPageLocation()]);
                        bpi.clean();
                    }

                    bpi.resetFix();
                    fixedPageNumbers.add(bpi.getPageNumber());
                } else { // isnot fixed

                    if (!bpi.isDirty()) {
                        cleanedPageNumbers.add(bpi.getPageNumber());
                    }
                }
            }
        }

        PageNumber tmpPN = null;
        BufferedPageInfo tmpBPI = null;

        for (int i = 0; i < fixedPageNumbers.size(); i++) {
            tmpPN = (PageNumber) fixedPageNumbers.get(i);
            tmpBPI = (BufferedPageInfo) bufferedPageInfos.get(tmpPN);
            freeSlots.add(new Integer(tmpBPI.getPageLocation()));
            bufferedPageInfos.remove(tmpPN);
        }

        for (int i = 0; i < cleanedPageNumbers.size(); i++) {
            tmpPN = (PageNumber) cleanedPageNumbers.get(i);
            tmpBPI = (BufferedPageInfo) bufferedPageInfos.get(tmpPN);
            freeSlots.add(new Integer(tmpBPI.getPageLocation()));
            bufferedPageInfos.remove(tmpPN);
            cleanList.removeItem(tmpBPI);

            if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                Debug.write(tmpBPI.getPageNumber(), 11, "[Remove item]");
            }
        }

        openedBTree.btree.setReady(false);
        _storage.close(name);

        if (Debug.DEBUG_BUFFEREDPAGEINFOS_REMOVE_TRACE) {
            logger.error(new Throwable("PageBufferManager::close: the tree name " + name));
            logger.error(new Throwable("PageBufferManager::close: the tree id " + btreeName2Id.getBTreeId(name)));
        }

        btreeName2Id.remove(name);

        //      justin add for track btree change
        onClose(treeid);
        logger.debug("close btree, id=" + treeid + ", btree closed successfully.");
    }

    /**
     * add one freed page number to the freeList
     *
     * @param treeid  id of tree to which this freed page belongs
     * @param pNumber freed page no
     * @param txnId   id of txn does this operation. If null, no txn and this will operate
     *                global variables directly.
     */
    public synchronized void addToFreeList(int treeid, PageNumber pNumber, Integer txnId) throws ChaiDBException {
        pNumber.setTreeId(treeid);

        BufferedPageInfo pageInfo = (BufferedPageInfo) bufferedPageInfos.remove(pNumber);

        // this page still in buffer
        if (pageInfo != null) {
            if (!pageInfo.isFix()) {
                logger.error("Error: why pg[0x" + Integer.toHexString(pNumber.getPageNumber()) + "] is fixed=" + pageInfo.getFixed());
            } else {
                unFix(pageInfo, false);
            }

            if (freePageNumbers.get(pNumber) == null) {
                //@todo - Kurt  50%
                if (freePageNumbers.size() < MAX_FREEPAGE_NUMBERS) {
                    if (txnId != null) { //belongs a txn

                        ResourceControlBlock rcb = (ResourceControlBlock) resources.get(txnId);

                        if (rcb == null) {
                            rcb = new ResourceControlBlock(txnId.intValue());
                            resources.put(txnId, rcb);
                        }

                        rcb.addToFreeList(pNumber);
                    } else { //no txn

                        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
                        openedBTree.putFreePage(pNumber);
                    }

                    freePageNumbers.put(pNumber, pNumber);
                }

                freeSlots.addLast(new Integer(pageInfo.getPageLocation()));

                return;
            } else {
                freeSlots.addLast(new Integer(pageInfo.getPageLocation()));

                //when recovery, it may be already in FPL by the former correct recovery.
                if (isRecovery) {
                    return;
                }

                logger.error("Error: why pg[" + pNumber.toHexString() + "] is in freelist??");
            }
        } else {
            logger.error("Error: why pg[" + pNumber.toHexString() + "] is not in pagepool??");
        }
    }

    public synchronized void addToFreeListWhenOpenBTree(int treeid, PageNumber pNumber) {
        //@todo - Kurt
        if (freePageNumbers.size() < MAX_FREEPAGE_NUMBERS) {
            OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
            openedBTree.putFreePage(pNumber);
            freePageNumbers.put(pNumber, pNumber);
        }
    }

    public synchronized void addToLatestDataPageListWhenOpenBTree(int treeid, PageNumber pNumber) throws ChaiDBException {
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);

        /* although there's no transaction while system restart,the first parameter
         * still input 'true' for adding latest datapage to openBTree's latest datapage
         * list,and we could judge it different from SYSTEM_TXN_OFF
         */
        byte[] page = getPage(treeid, pNumber);

        if (page != null) {
            byte[] docBytes = new byte[4];
            System.arraycopy(ByteTool.subByteArray(page, BTreeSpec.OFF_DOCID, BTree.DOCID_SIZE), 0, docBytes, docBytes.length - BTree.DOCID_SIZE, BTree.DOCID_SIZE);

            int docid = ByteTool.bytesToInt(docBytes, 0, openedBTree.btree.getBTreeSpec().getMsbFirst());
            releasePage(treeid, pNumber, false);

            openedBTree.putLatestDataPage(docid, true, pNumber);
        }
    }

    /**
     * If the specified page is current used latest Datapage.
     *
     * @param p     specified page
     * @param txnId id of txn call this. If there is no txn, it is null.
     */
    public boolean isLatestDataPage(int docid, PageNumber p, Integer txnId) {
        if (txnId != null) {
            ResourceControlBlock rcb = (ResourceControlBlock) resources.get(txnId);

            if (rcb != null) {
                return rcb.isLatestDataPage(docid, p);
            }
        }

        //Get a free page from global free page list
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(p.getTreeId());

        return openedBTree.isLatestDataPage(docid, p);
    }

    /**
     * Get the latestDataPageNumber of a btree associated with the specified txn.
     * If no txn, get one from latestDataPageList
     *
     * @param treeid id of a btree
     */
    public PageNumber getLatestDataPage(int docid, int treeid, Integer txnId) throws ChaiDBException {
        PageNumber pn = null;

        if (txnId != null) {
            ResourceControlBlock rcb = (ResourceControlBlock) resources.get(txnId);

            if (rcb == null) {
                rcb = new ResourceControlBlock(txnId.intValue());
                resources.put(txnId, rcb);
            }

            return rcb.getLatestDataPage(docid, treeid);
        }

        //Get a free page from global free page list
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
        pn = openedBTree.getLatestDataPage(docid, false);

        return pn;
    }

    /**
     * Get another latestDataPageNumber of a btree associated with the specified txn.
     * If no txn, get one from latestDataPageList
     *
     * @param treeid id of a btree
     */
    public PageNumber getANewLatestDataPage(int docid, int treeid, Integer txnId) throws ChaiDBException {
        PageNumber pn = null;

        if (txnId != null) {
            ResourceControlBlock rcb = (ResourceControlBlock) resources.get(txnId);

            if (rcb == null) {
                rcb = new ResourceControlBlock(txnId.intValue());
                resources.put(txnId, rcb);
            }

            return rcb.getANewLatestDataPage(docid, treeid);
        }

        //Get a free page from global free page list
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
        pn = openedBTree.getANewLatestDataPage(docid, false);

        return pn;
    }

    /**
     * Replace the latestDataPageNumber of a btree associated with the specified txn.
     * If no txn, get one from latestDataPageList
     */
    public void putLatestDataPage(int docid, PageNumber newPage, Integer txnId) throws ChaiDBException {
        if (txnId != null) {
            ResourceControlBlock rcb = (ResourceControlBlock) resources.get(txnId);

            if (rcb == null) {
                rcb = new ResourceControlBlock(txnId.intValue());
                resources.put(txnId, rcb);
            }

            //if this txn has free page, then return it.
            rcb.putLatestDataPage(docid, newPage, true);

            return;
        }

        //Replace that in global free page list
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(newPage.getTreeId());
        openedBTree.putLatestDataPage(docid, false, newPage);
    }

    /**
     * This is called by BTreeNewPageLogRecord.doUndo and BTreeFreePage.doRedo.
     * if a txn new a latestDataPage, its undo will put it to
     * freelist, so that the corresponding ldp should be deleted
     */
    public void removeLatestDataPage(int docid, int treeId, int pgNo, Integer txnId) throws ChaiDBException {
        PageNumber p = new PageNumber(pgNo);
        p.setTreeId(treeId);
        removeLatestDataPage(docid, treeId, p, txnId);
    }

    public void removeLatestDataPage(int docid, int tid, PageNumber p, Integer txnId) throws ChaiDBException {
        if (txnId != null) {
            ResourceControlBlock rcb = (ResourceControlBlock) resources.get(txnId);

            if (rcb == null) {
                rcb = new ResourceControlBlock(txnId.intValue());
                resources.put(txnId, rcb);
            }

            rcb.removeLatestDataPage(docid, p);
        }

        //the following is called by BTreeFreePage.doRedo

        /*During recovery, a previous txn1 may add a ldp to GLDP list, which may be
          discarded by another txn2 when acquiring a new LDP.
          Later the page is freed after txn3 deltes all nodes in it
          and gernerate a BTreeFreePage log record. All the three txn commits.
          Later recovery, the following scenario occurs:
            (1) txn1 put this ldp to GLDP list after its redo done
            (2) txn2 do nothing without any log record relating to this page although
                it does dispose this ldp in normal flow
            (3) txn3 add it to freelist by BTreeFreePage.doRedo().
           Therefore, we must clear it form GLDP to guranteen this page ONLY be permitted
           exist within one list,either GLDP list or free page list.
        */
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(tid);
        openedBTree.removeLatestDataPage(docid, p);
    }

    private boolean checkPageNumber(PageNumber pn, byte[] page) {
        if (pn.getPageNumber() == 0) { //o page is right
            return true;
        }
        int i = ByteTool.bytesToInt(page, 0, true);

        if (pn.getPageNumber() != i) {
            logger.debug(new StringBuffer().append("Expected pagenumber is: ").append(pn).append("(").append(pn.toHexString()).append(")").toString());
            logger.debug(new StringBuffer().append("Actual pagenumber is: ").append(i).append("(0x").append(Integer.toHexString(i)).append(")").toString());
        }
        return (pn.getPageNumber() == i);
    }

    private byte[] getPageFromCache(PageNumber pNumber) throws ChaiDBException {
        long startTime = 0;
        long endTime = 0;
        long beginTime = 0;
        BufferedPageInfo pageInfo = null;

        if (freePageNumbers.containsKey(pNumber)) {
            if (isRecovery) {
                return null;
            }

            if (Debug.DEBUG_PAGEINFO_TRACE) {
                logger.fatal("The page in freePageNumbers when getPageFromCache " + pNumber.toHexString() + " of " + getBTreeName(pNumber.getTreeId()));
                logger.fatal(new Throwable("The page in freePageNumbers when getPageFromCache"));
                dump();
                Db.getLogManager().flush();

                if (Debug.DEBUG_PAGEINFO_TRACE) {
                    Debug.pageHistory(pNumber);
                }

                if (Debug.DEBUG_DUMP_MEMORY) {
                    dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                }

                System.exit(-1);
            }

            throw new ChaiDBException(ErrorCode.BTREE_USE_FREEPAGE, pNumber.toHexString() + " of " + getBTreeName(pNumber.getTreeId()));
        }

        if (Debug.DEBUG_PERFORMANCE) {
            Debug.increaseBufferHitCount();
        }

        if (Debug.DEBUG_PERFORMANCE) {
            beginTime = System.currentTimeMillis();
        }

        synchronized (this) {
            try {
                if (Debug.DEBUG_PERFORMANCE) {
                    startTime = System.currentTimeMillis();
                }

                pageInfo = (BufferedPageInfo) bufferedPageInfos.get(pNumber);

                if (pageInfo == null) { // found the page in buffer

                    return null;
                }

                pageInfo.incHitCount();

                if (!(pageInfo.isFix())) {
                    // remove it from look aside list
                    if (pageInfo.isDirty()) {
                        dirtyList.removeItem(pageInfo);

                        if (Debug.DEBUG_DIRTY_LIST_TRACE) {
                            Debug.write(pageInfo.getPageNumber(), 11, "[Remove item]");
                        }
                    } else {
                        cleanList.removeItem(pageInfo);

                        if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                            Debug.write(pageInfo.getPageNumber(), 11, "[Remove item]");
                        }
                    }
                }

                pageInfo.fix();
            } finally {
                if (Debug.DEBUG_PERFORMANCE) {
                    endTime = System.currentTimeMillis();
                    Debug.getPageTime += (endTime - startTime);
                    Debug.getTotalPageTime += (endTime - beginTime);
                }
            }
        }

        //end sync
        if (PROTECT_PAGE) {
            ThreadRecorder tr = (ThreadRecorder) threadRecorders.get(Thread.currentThread());

            if (tr == null) {
                tr = new ThreadRecorder();
                threadRecorders.put(Thread.currentThread(), tr);
            }

            tr.fixPage(pNumber);
        }

        if (Debug.DEBUG_CHECKPAGENUMBER) {
            if (!Db.getTxnManager().isOnRecovery()) {
                if (!Debug.checkPageNumber(pNumber, bufferedPages[pageInfo.getPageLocation()])) {
                    logger.fatal(new Throwable("there is hole in GetPage!"));
                    new Throwable().printStackTrace(System.err);
                    dump();
                    Db.getLogManager().flush();

                    if (Debug.DEBUG_DUMP_MEMORY) {
                        dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                    }

                    Debug.dumpPageToLog(bufferedPages[pageInfo.getPageLocation()]);

                    if (Debug.DEBUG_PAGEINFO_TRACE) {
                        Debug.pageHistory(pNumber);
                    }

                    System.exit(-1);
                }
            }
        }

        return bufferedPages[pageInfo.getPageLocation()];
    }

    /**
     * Get a page.
     *
     * @param treeid
     * @param pNumber
     * @return
     * @throws ChaiDBException
     */
    public byte[] getPage(int treeid, PageNumber pNumber) throws ChaiDBException {
        long startTime = 0;
        long endTime = 0;
        long beginTime = 0;

        if (pNumber.getPageInFile() == BTreeSpec.INVALID_PAGENO) {
            return null;
        }

        BufferedPageInfo pageInfo = null;

        //When no record, root page no is BTreeSpec.INVALID_PAGENO
        //Usually the page we get should not stay at freelist. But there is one exception:
        //With txn, each txn get LDP from GLDP list. Thus assuming
        //(1) txn1 get a LDP (pageNumber) from GLDP list and remove it from the list and
        //paused due to OS scheduler
        //(2) At this moment, txn2 deletes all nodes in this page
        //, finds that it is not a LDP by isLatestDataPage() (checking a page in its own LDP
        //list and GLDP list), then frees this page.
        //(3) Later txn1 resumes getting the LDP page with the pagenumber it got at (1), then
        // finds it has been freed. The following exception occurs!!!!!!!!
        //Hereby, when we get a LDP, we must catch this exception and get a new LDP. See codes
        //BTreePage.insertNode().
        byte[] page = null;

        if (Debug.DEBUG_PERFORMANCE) {
            Debug.increaseHitCount();
        }

        if ((page = getPageFromCache(pNumber)) != null) {
            if (!checkPageNumber(pNumber, page)) {
                throw new ChaiDBException(ErrorCode.BTREE_INVALID, "inconsistent page number:" + pNumber);
            }
            return page;
        }

        // read raw page from hard storage
        byte[] page_temp = null;

        synchronized (this) {
            if (!_tempBuf.isEmpty()) {
                page_temp = (byte[]) _tempBuf.removeFirst();
            } else {
                page_temp = new byte[BTreeSpec.PAGE_SIZE];
            }
        }

        if (Debug.DEBUG_PERFORMANCE) {
            beginTime = System.currentTimeMillis();
        }

        String filename = btreeName2Id.getBTreeName(treeid);

        if (filename == null) {
            logger.error("PageBufferManager::getPage The file name is null. The tree id " + treeid);
        } else {
            if (filename.equals("")) {
                logger.error("PageBufferManager::getPage The file name is empty. The tree id " + treeid);
            }
        }

        if (!_storage.readPage(filename, pNumber, page_temp)) { //xxx

            return null;
        }

        if (Debug.DEBUG_CHECKPAGENUMBER) {
            if (!Db.getTxnManager().isOnRecovery()) {
                if (!Debug.checkPageNumber(pNumber, page_temp)) {
                    logger.fatal(new Throwable("there is hole in GetPage!"));
                    logger.fatal("The file name is :" + filename);
                    new Throwable().printStackTrace(System.err);
                    dump();
                    Db.getLogManager().flush();

                    if (Debug.DEBUG_DUMP_MEMORY) {
                        dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                    }

                    Debug.dumpPageToLog(page_temp);

                    if (Debug.DEBUG_PAGEINFO_TRACE) {
                        Debug.pageHistory(pNumber);
                    }

                    if (Debug.DEBUG_BUFFEREDPAGEINFOS_REMOVE_TRACE) {
                        Debug.pageHistory(pNumber);
                    }

                    System.exit(-1);
                }
            }
        }

        synchronized (this) {
            try {
                if (Debug.DEBUG_PERFORMANCE) {
                    startTime = System.currentTimeMillis();
                }

                pageInfo = (BufferedPageInfo) bufferedPageInfos.get(pNumber);

                if (pageInfo != null) { // found the page in buffer
                    fix(pageInfo, true);
                    pageInfo.incHitCount();
                    page = bufferedPages[pageInfo.getPageLocation()];
                    _tempBuf.addLast(page_temp);

                    if (Debug.DEBUG_CHECKPAGENUMBER) {
                        if (!Db.getTxnManager().isOnRecovery()) {
                            if (!Debug.checkPageNumber(pNumber, page)) {
                                logger.fatal(new Throwable("there is hole in GetPage!"));
                                new Throwable().printStackTrace(System.err);
                                dump();
                                Db.getLogManager().flush();

                                if (Debug.DEBUG_DUMP_MEMORY) {
                                    dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                                }

                                Debug.dumpPageToLog(page);

                                if (Debug.DEBUG_PAGEINFO_TRACE) {
                                    Debug.pageHistory(pNumber);
                                }

                                System.exit(-1);
                            }
                        }
                    }
                    if (!checkPageNumber(pNumber, page)) {
                        throw new ChaiDBException(ErrorCode.BTREE_INVALID, "inconsistent page number:" + pNumber);
                    }

                    return page;
                }

                // if buffer has space to hold more page
                if (freeSlots.size() > 0) {
                    // check free slots
                    int freeSlot = ((Integer) freeSlots.removeFirst()).intValue();
                    page = bufferedPages[freeSlot];
                    System.arraycopy(page_temp, 0, page, 0, BTreeSpec.PAGE_SIZE);
                    pageInfo = new BufferedPageInfo(treeid, freeSlot, pNumber);
                    fix(pageInfo, false);
                    bufferedPageInfos.put(pNumber, pageInfo);
                    _tempBuf.addLast(page_temp);

                    if (!checkPageNumber(pNumber, page)) {
                        throw new ChaiDBException(ErrorCode.BTREE_INVALID, "inconsistent page number:" + pNumber);
                    }
                    return page;
                }

                BufferedPageInfo tmpPageInfo = null;

                if (cleanList.getSize() <= RESERVED_CLEAN_SIZE) {
                    //If the size of dirtyList = 0 and the size of cleanList != 0
                    //Just jump to "kick out tailer from LRU clean list" diretly.
                    if ((dirtyList.getSize() == 0) && (cleanList.getSize() == 0)) {
                        //debug header==null exception
                        logger.error("FreePNUmbers=" + freePageNumbers.size());
                        logger.error("freeSlots=" + freeSlots.size());
                        this.dumpMemInfo(Debug.DUMP_ALL, "memory info");
                        Debug.pageHistoryAll();
                        System.exit(-1);

                        ////////////////////////////////////////////////////////////////
                        // If all the information printed above is important then
                        // let us return it in the details of the error message -ranjeet
                        String details = "FreePNUmbers=" + Integer.toString(freePageNumbers.size()) + "freeSlots=" + Integer.toString(freeSlots.size());

                        throw new ChaiDBException(ErrorCode.BTREE_BUFFER_OVERFLOW, details);
                    } else {
                        // NO space, need to kick out one page,
                        // Remember always kick out one clean page to replace one new space.
                        // Once the clean list is empty, the dirty page list will be flushed to underlying storage,
                        // And we need to move dirty page list into clean page list.
                        flushDirtyPages();
                    }
                }

                // kick out tailer from LRU clean list
                tmpPageInfo = cleanList.popBack();

                if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                    Debug.write(tmpPageInfo.getPageNumber(), 11, "[Pop back]");
                }

                checkExistInWriteBuffer(tmpPageInfo);

                try {
                    page = bufferedPages[tmpPageInfo.getPageLocation()]; //old page
                } catch (Exception e) {
                    if (tmpPageInfo == null) {
                        logger.error(new Throwable("PageBufferManager::getPage: tmpPageInfo is NULL."));
                    }

                    logger.error("PageBufferManager::getPage: The clean size is " + cleanList.getSize());
                    logger.error("PageBufferManager::getPage: The dirty size is " + dirtyList.getSize());

                    logger.error("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
                    logger.error("PageBufferManager::getPage: The clean list is: ");
                    logger.error(cleanList.toString());

                    logger.error("PageBufferManager::getPage: The dirty list is: ");
                    logger.error(dirtyList.toString());
                    logger.error("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");

                    //                    this.dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                    if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                        Debug.pageHistoryAll();
                    }
                }

                // remove it from hashmap
                bufferedPageInfos.remove(tmpPageInfo.getPageNumber());

                if (Debug.DEBUG_BUFFEREDPAGEINFOS_REMOVE_TRACE) {
                    Debug.write(tmpPageInfo.getPageNumber(), 11, "[bufferedPageInfos remove, and new page is ]" + pNumber.toHexString());
                    Debug.dumpPageToLog(page);
                }

                //Replace the old page by new page
                System.arraycopy(page_temp, 0, page, 0, BTreeSpec.PAGE_SIZE);
                pageInfo = new BufferedPageInfo(treeid, tmpPageInfo.getPageLocation(), pNumber);
                fix(pageInfo, false);
                bufferedPageInfos.put(pageInfo.getPageNumber(), pageInfo);
                _tempBuf.addLast(page_temp);

                if (!checkPageNumber(pNumber, page)) {
                    throw new ChaiDBException(ErrorCode.BTREE_INVALID, "inconsistent page number:" + pNumber);
                }

                return page;
            } finally {
                if (Debug.DEBUG_PERFORMANCE) {
                    endTime = System.currentTimeMillis();
                    Debug.getPageTime += (endTime - startTime);
                    Debug.getTotalPageTime += (endTime - beginTime);
                }
            }
        }

        //end synchronized
    }

    /**
     * Flush dirty pages to underlying storage
     * Before flush dirty pages, the log info will be flush at first.
     */
    private void flushDirtyPages() {
        long startTime = 0;
        long endTime;

        if (Debug.DEBUG_PERFORMANCE) {
            startTime = System.currentTimeMillis();
        }

        if (cleanList.getSize() <= RESERVED_CLEAN_SIZE) {
            synchronized (writeLock) {
                while (writeSize != 0) {
                    try {
                        if (Debug.DEBUG_PERFORMANCE) {
                            Debug.blockOnRealFlush++;
                        }

                        writeLock.wait();
                    } catch (InterruptedException e) {
                        logger.warn("Buffer: flush dirty page process is interrupted.");
                        return;
                    }
                }
            }
        }

        BufferedPageInfo[] tmpBPIs = null;

        if (dirtyList.getSize() <= KICKOUT_DIRTY_SIZE) {
            writeSize = dirtyList.getSize();
        } else {
            writeSize = KICKOUT_DIRTY_SIZE;
        }

        tmpBPIs = new BufferedPageInfo[writeSize];

        Iterator iter = dirtyList.getIterator();

        for (int j = 0; j < writeSize; j++) {
            tmpBPIs[j] = (BufferedPageInfo) iter.next();
            writeBufferedPages[j] = ByteTool.copyByteArray(bufferedPages[tmpBPIs[j].getPageLocation()], 0, BTreeSpec.PAGE_SIZE);
            writePageNumber[j] = new PageNumber(tmpBPIs[j].getPageNumber());
        }

        for (int j = 0; j < writeSize; j++) {
            dirtyList.removeItem(tmpBPIs[j]);

            if (Debug.DEBUG_DIRTY_LIST_TRACE) {
                Debug.write(tmpBPIs[j].getPageNumber(), 11, "[Remove Item]");
            }

            tmpBPIs[j].clean();
            cleanList.addFront(tmpBPIs[j]);

            if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                Debug.write(tmpBPIs[j].getPageNumber(), 11, "[Add front]");
            }
        }
        AbstractDaemonThread bufWriter = DaemonThreadManager.getInstance().getThread(BufferWriter.THREADNAME);

        ((BufferWriter) bufWriter).flushBuffer();

        if (Debug.DEBUG_PERFORMANCE) {
            endTime = System.currentTimeMillis();
            Debug.flushTime += (endTime - startTime);
            Debug.flushTimes++;
        }
    }

    /**
     * Get a reference to a freed page in the buffer, with fixed bit on, dirty bit on;
     * when the page got kicked off, write it into pagedFile if it is dirty.
     *
     * @param btreeSpec
     * @param fix       Whether force to used the suggested pageNumber
     * @return The real pageNumber, and byte[] page in buffer
     */
    public Page getFreePage(BTreeSpec btreeSpec, boolean fix, KernelContext kc) throws ChaiDBException {
        int treeid = btreeSpec.btree.getBtreeId();

        Page pinfo = new Page();
        BufferedPageInfo pageInfo = null;
        long startTime = 0;
        long endTime = 0;
        long beginTime = 0;

        try {
            // get the free page number in the file if any
            PageNumber freePageNumber = null;

            while ((freePageNumber == null) || isReservedPage(freePageNumber)) {
                if (fix || ((freePageNumber = getFreePageNumber(treeid, kc)) == null)) {
                    freePageNumber = btreeSpec.expandANewPage(kc);
                }
            }

            pinfo.pageNumber = new PageNumber(freePageNumber);

            if (Debug.DEBUG_PERFORMANCE) {
                beginTime = System.currentTimeMillis();
            }

            //here we synchronize, avoiding deadlock with btreeSpec.expandANewPage(kc)
            synchronized (this) {
                try {
                    if (Debug.DEBUG_PERFORMANCE) {
                        startTime = System.currentTimeMillis();
                    }

                    if (freeSlots.size() > 0) {
                        int freeLoc = ((Integer) freeSlots.removeFirst()).intValue();
                        pinfo.pageData = bufferedPages[freeLoc];
                        pageInfo = new BufferedPageInfo(treeid, freeLoc, pinfo.pageNumber);
                        fix(pageInfo, false);
                        bufferedPageInfos.put(pinfo.pageNumber, pageInfo);

                        return pinfo;
                    }

                    // NO space, need to kick out one page,
                    // Remember always kick out one clean page to replace one new space.
                    // Once the clean list is empty, the dirty page list will be flushed to underlying storage,
                    // And we need to move dirty page list into clean page list.
                    BufferedPageInfo tmpPageInfo = null;

                    if (cleanList.getSize() <= RESERVED_CLEAN_SIZE) {
                        //If the size of dirtyList = 0 and the size of cleanList != 0
                        //Just jump to "kick out tailer from LRU clean list" diretly.
                        if ((dirtyList.getSize() == 0) && (cleanList.getSize() == 0)) {
                            String details = "Run out of buffer!!";
                            throw new ChaiDBException(ErrorCode.BTREE_BUFFER_OVERFLOW, details);
                        } else {
                            flushDirtyPages();
                        }
                    }

                    tmpPageInfo = cleanList.popBack();

                    if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                        Debug.write(tmpPageInfo.getPageNumber(), 11, "[Pop back]");
                    }

                    checkExistInWriteBuffer(tmpPageInfo);

                    pinfo.pageData = bufferedPages[tmpPageInfo.getPageLocation()]; //old page

                    // remove it from hashmap
                    bufferedPageInfos.remove(tmpPageInfo.getPageNumber());

                    if (Debug.DEBUG_BUFFEREDPAGEINFOS_REMOVE_TRACE) {
                        Debug.write(tmpPageInfo.getPageNumber(), 11, "[bufferedPageInfos remove and new page is ]" + pinfo.pageNumber.toHexString());
                        Debug.dumpPageToLog(pinfo.pageData);
                    }

                    pageInfo = new BufferedPageInfo(treeid, tmpPageInfo.getPageLocation(), pinfo.pageNumber);

                    fix(pageInfo, false);
                    bufferedPageInfos.put(pinfo.pageNumber, pageInfo);

                    return pinfo;
                } finally {
                    if (Debug.DEBUG_PERFORMANCE) {
                        endTime = System.currentTimeMillis();
                        Debug.getPageTime += (endTime - startTime);
                        Debug.getTotalPageTime += (endTime - beginTime);
                    }
                }
            }
        } finally {
            if (Debug.DEBUG_PAGEINFO_TRACE1) {
                Debug.write(pinfo.getPageNumber(), Debug._CALLER_LEVEL, " [C] pg=" + Integer.toHexString(pinfo.getPageNumber().getPageNumber()) + " fix=" + pageInfo.getFixed());
            }
        }
    }

    /**
     * add by Stanley
     * check whecher the Current Directory has the treeid file whose filenumber is fileno
     * if it has it return true else false;
     *
     * @param treeid
     * @param fileno
     * @return boolean
     */
    public boolean checkTreeExistinCurrentDir(int treeid, int fileno) {
        String filePath = null;
        filePath = btreeName2Id.getBTreeName(treeid);
        return _storage.curDirHasFile(filePath, fileno);
    }

    public void addVirtualFile(int treeid, int fileno) {
        String filePath = null;
        filePath = btreeName2Id.getBTreeName(treeid);
        _storage.addVirtualFile(filePath, fileno);
    }

    /**
     * Check whether the special btree page is flushing by deamon thread.
     * If true, the thread will be blocked. Otherwise continue.
     *
     * @param tmpPageInfo The special btree page.
     */
    private void checkExistInWriteBuffer(BufferedPageInfo tmpPageInfo) {
        synchronized (writeLock) {
            while (writeSize != 0) {
                int i = 0;
                boolean waitCondition = false;

                for (; i < writeSize; i++) {
                    if (tmpPageInfo.getPageNumber().equ(writePageNumber[i])) {
                        try {
                            if (Debug.DEBUG_PERFORMANCE) {
                                Debug.blockOnWrite++;
                            }

                            if (Debug.DEBUG_BUFFEREDPAGEINFOS_REMOVE_TRACE) {
                                Debug.write(tmpPageInfo.getPageNumber(), 11, "[checkExistInWriteBuffer ]");
                            }

                            writeLock.wait();
                            waitCondition = true;

                            break;
                        } catch (InterruptedException e) {
                            logger.warn("Buffer: check buffer process is interrupted.");
                            return;
                        }
                    }
                }

                if ((i == writeSize) && (waitCondition == false)) {
                    break;
                }
            }
        }
    }

    /**
     * Changes a page from fixed to unfixed, and move it to look aside link list.
     * If the page number is not found in the buffer, no action is taken.
     *
     * @param pNumber The page number
     * @param dirty   Whether the page has become dirty
     */
    public void releasePage(int treeid, PageNumber pNumber, boolean dirty) {
        long startTime = 0;
        long endTime = 0;
        long beginTime = 0;

        if (Debug.DEBUG_PERFORMANCE) {
            beginTime = System.currentTimeMillis();
        }

        synchronized (this) {
            try {
                if (Debug.DEBUG_PERFORMANCE) {
                    startTime = System.currentTimeMillis();
                }

                BufferedPageInfo pageInfo = (BufferedPageInfo) bufferedPageInfos.get(pNumber);

                if (pageInfo == null) {
                    return;
                }

                if (!(pageInfo.isFix())) {
                    return;
                }

                if (dirty) {
                    pageInfo.dirty();

                    // JUSTIN ADD for track page changes
                    onChange(pNumber);
                }

                pageInfo.unFix();

                if (!(pageInfo.isFix())) {
                    if (pageInfo.isDirty()) {
                        dirtyList.addFront(pageInfo);

                        if (Debug.DEBUG_DIRTY_LIST_TRACE) {
                            Debug.write(pageInfo.getPageNumber(), 11, "[Add Front]");
                        }
                    } else {
                        cleanList.addFront(pageInfo);

                        if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                            Debug.write(pageInfo.getPageNumber(), 11, "[Add Front]");
                        }
                    }
                }
            } finally {
                if (Debug.DEBUG_PERFORMANCE) {
                    endTime = System.currentTimeMillis();
                    Debug.getPageTime += (endTime - startTime);
                    Debug.getTotalPageTime += (endTime - beginTime);
                }
            }
        }

        if (PROTECT_PAGE) {
            ThreadRecorder tr = (ThreadRecorder) threadRecorders.get(Thread.currentThread());

            if (tr != null) {
                tr.releasePage(pNumber);
            }
        }
    }

    /**
     * Write meta data of a btree
     *
     * @param treeid
     * @param forceWrite true to write forcely. false, only if btree is ready,
     *                   metapage is written to disk.
     * @throws ChaiDBException
     */
    public synchronized void writeMetaPage(int treeid, boolean forceWrite) throws ChaiDBException {
        OpenedBTree ob = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);

        if (ob == null) {
            String details = "Fatal error in writeMetaPage. Btree not found. TreeId=" + treeid + ".";
            throw new ChaiDBException(ErrorCode.BTREE_NOT_FOUND, details);
        }

        IBTree t = ob.btree;

        if (!forceWrite && !t.isReady()) {
            return;
        }

        BTreeSpec btreeSpec = t.getBTreeSpec();

        //        btreeSpec.setModified(true);
        byte[] metaPage = fillMetaPage(treeid);
        boolean msbFirst = btreeSpec.getMsbFirst();

        /* save latest dataPage PageNumber to new page */
        PageNumber savedPage = saveLatestDataPageNumber(ob, btreeSpec);
        System.arraycopy(ByteTool.intToBytes(savedPage.getPageNumber(), msbFirst), 0, metaPage, BTreeSpec.LATEST_DATA_PAGE_LIST_OFF, 4); // latest data page number **XZ**

        /* save freelist to new Page */
        LinkedList freeList = ob.getFreePageList();

        if (freeList != null) {
            int size = freeList.size();

            // if there are more free pages than the meta page can hold, just forget
            // at this point; later might add overflow pages just for free page list #######
            int leftSpace = (BTreeSpec.PAGE_SIZE - BTreeSpec.FREE_PAGE_LIST_OFF - 4) / 4;

            boolean moreFreeList = false;
            PageNumber newPageNum = new PageNumber();
            int begin = 0;

            if (size > leftSpace) {
                moreFreeList = true;

                int[] freeListPointer = new int[1]; //for return value

                /* tpnlist'size will be modified while save freePage to new page */
                newPageNum = saveFreeListToNewPage(treeid, btreeSpec, leftSpace - 1, freeList, freeList, freeListPointer);
                size = leftSpace - 1;
                begin = freeListPointer[0];
            }

            for (int i = 0; i < size; i++) {
                PageNumber freePage = (PageNumber) freeList.get(i + begin);
                System.arraycopy(ByteTool.intToBytes(freePage.getPageNumber(), msbFirst), 0, metaPage, BTreeSpec.FREE_PAGE_LIST_OFF + 4 + (i * 4), 4);
            }

            /* the last 4 bytes to save the next page to contain free page number */
            System.arraycopy(ByteTool.intToBytes(freeList.size(), msbFirst), 0, metaPage, BTreeSpec.FREE_PAGE_LIST_OFF, 4);

            if (moreFreeList) {
                System.arraycopy(ByteTool.intToBytes(newPageNum.getPageNumber(), msbFirst), 0, metaPage, BTreeSpec.FREE_PAGE_LIST_OFF + 4 + (size * 4), 4);
            }
        } else {
            System.arraycopy(ByteTool.intToBytes(0, msbFirst), 0, metaPage, BTreeSpec.FREE_PAGE_LIST_OFF, 4);
        }

        System.arraycopy(ByteTool.intToBytes(btreeSpec.getPageNumber().getPageNumber(), msbFirst), 0, metaPage, BTreeSpec.NEXT_AVAILABLE_PAGE_OFF, 4);

        String filename = btreeName2Id.getBTreeName(treeid);

        if (filename == null) {
            logger.error("PageBufferManager::writeMetaPage The file name is null. The tree id " + treeid);
        } else {
            if (filename.equals("")) {
                logger.error("PageBufferManager::writeMetaPage The file name is empty. The tree id " + treeid);
            }
        }

        _storage.writePage(filename, new PageNumber(treeid, 0, 0), metaPage);

        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
        openedBTree.resetReservedList();
    }

    public synchronized void dump(boolean sync) {
        Iterator trees = btreeName2Id.getAllBTreeIds();

        while (trees.hasNext()) {
            int tid = ((Integer) trees.next()).intValue();

            try {
                flushMetaPage(tid);
                dump(tid);
            } catch (Exception e) {
                logger.error(e);
            }
        }

        if (sync) {
            _storage.sync();
        }
    }

    public synchronized void dump() {
        dump(false);
    }

    /**
     * Dump all pages in the buffer to pagedFile; if the page is dirty, need to
     * write the page; otherwise just skip it.
     */
    public void dump(int treeid) throws ChaiDBException {
        synchronized (this) {
            _dump(treeid);
        }
    }

    private void _dump(int treeid) throws ChaiDBException {
        // Although this method seems to dump all pages of one tree
        // to disk. But since one page with fixed>0 may be dirty later
        // and is rewritten to disk, write ONLY fixed==0(in look-aside list)
        // && dirty page is meaningful. Maybe in future we will adopt
        // this strategy.
        if (dirtyList.getSize() == 0) {
            return;
        }

        synchronized (writeLock) {
            while (writeSize != 0) {
                try {
                    if (Debug.DEBUG_PERFORMANCE) {
                        Debug.blockOnRealFlush++;
                    }

                    writeLock.wait();
                } catch (InterruptedException e) {
                    logger.warn("Buffer: dump process is interrupted.");
                    return;
                }
            }
        }

        Db.getLogManager().flush();

        HashMap tmpBPIs = new HashMap();

        int dirtySize = dirtyList.getSize();
        BufferedPageInfo tmpBPI = null;
        Iterator iter = dirtyList.getIterator();

        for (int j = 0; j < dirtySize; j++) {
            tmpBPI = (BufferedPageInfo) iter.next();

            if (tmpBPI.getPageNumber().getTreeId() == treeid) {
                String filename = btreeName2Id.getBTreeName(treeid);

                if (filename == null) {
                    logger.error("PageBufferManager::_dump(int) The file name is null. The tree id " + treeid);
                } else {
                    if (filename.equals("")) {
                        logger.error("PageBufferManager::_dump(int) The file name is empty. The tree id " + treeid);
                    }
                }

                if (Debug.DEBUG_CHECKPAGENUMBER) {
                    if (!Db.getTxnManager().isOnRecovery()) {
                        if (!Debug.checkPageNumber(tmpBPI.getPageNumber(), bufferedPages[tmpBPI.getPageLocation()])) {
                            logger.fatal(new Throwable("there is hole in GetPage!"));
                            new Throwable().printStackTrace(System.err);
                            dump();
                            Db.getLogManager().flush();

                            if (Debug.DEBUG_DUMP_MEMORY) {
                                dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                            }

                            Debug.dumpPageToLog(bufferedPages[tmpBPI.getPageLocation()]);

                            if (Debug.DEBUG_PAGEINFO_TRACE) {
                                Debug.pageHistory(tmpBPI.getPageNumber());
                            }

                            System.exit(-1);
                        }
                    }
                }

                _storage.writePage(filename, tmpBPI.getPageNumber(), bufferedPages[tmpBPI.getPageLocation()]);
                tmpBPIs.put(tmpBPI, tmpBPI);
            }
        }

        Iterator iter1 = tmpBPIs.values().iterator();

        while (iter1.hasNext()) {
            tmpBPI = (BufferedPageInfo) iter1.next();
            dirtyList.removeItem(tmpBPI);

            if (Debug.DEBUG_DIRTY_LIST_TRACE) {
                Debug.write(tmpBPI.getPageNumber(), 11, "[Remove Item]");
            }

            tmpBPI.clean();
            cleanList.addFront(tmpBPI);

            if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                Debug.write(tmpBPI.getPageNumber(), 11, "[Add front]");
            }
        }

        if (Debug._DEBUG) {
            logger.debug("Flush dirty pages of " + getBTreeName(treeid));
        }
    }

    /**
     * Gets btree spec instance by given btreeId. for recovery
     *
     * @param treeId
     * @param btreeType
     * @return
     * @throws ChaiDBException
     */
    public BTreeSpec getBTreeSpec(int treeId, short btreeType) throws ChaiDBException {
        try {
            IDBIndex tree;
            BTreeSpec treeSpec = null;

            if (btreeName2Id.getBTreeName(treeId) == null) {
                tree = BTreeFactory.createBTree(btreeType);

                String btreeFileName = retrieveBTreeFileName(treeId);
                tree.openForRecovery(btreeFileName, treeId);

                if (tree instanceof IBTree) {
                    IBTree tmpTree = (IBTree) tree;
                    treeSpec = tmpTree.getBTreeSpec();
                }
            } else {
                treeSpec = getBTreeSpec(treeId);
            }

            return treeSpec;
        } catch (ChaiDBException e) {
            logger.error(e);
            throw e;
        }
    }

    /**
     * Gets btree spec instance by given btree file name.
     *
     * @param btreeFileName The btree file name.
     * @return The btree spec associating with the btree with btree file name.
     */
    public BTreeSpec getBTreeSpec(String btreeFileName, short btreeType, KernelContext kc) throws ChaiDBException {
        try {
            Integer tID;
            int tid;
            IDBIndex tree;
            BTreeSpec treeSpec = null;

            if ((tID = btreeName2Id.getBTreeId(btreeFileName)) == null) {
                //                tree = new BTree();
                tree = BTreeFactory.createBTree(btreeType);
                tree.open(btreeFileName, kc);

                if (tree instanceof IBTree) {
                    IBTree tmpTree = (IBTree) tree;
                    tid = tmpTree.getBtreeId();
                    treeSpec = tmpTree.getBTreeSpec();
                }
            } else {
                tid = tID.intValue();
                treeSpec = getBTreeSpec(tid);
            }

            return treeSpec;
        } catch (ChaiDBException e) {
            logger.error(e);
            throw e;
        }
    }

    /**
     * Get page specified by fileName, fileId and pgno.
     * Only used in recovery. Here we may:
     * (1) get this BTree from buffer
     * (2) if (1) fail, load BTree from disk
     * (3) if (2) fail too, create BTree and load it in buffer
     * todo fix new btree() problem
     */
    public byte[] getPage(int txnId, int treeId, short btreeType, int pgno, boolean redo, ArrayList locks) throws ChaiDBException {
        IDBIndex tree;

        if (btreeName2Id.getBTreeName(treeId) == null) {
            tree = BTreeFactory.createBTree(btreeType);

            String btreeFileName = retrieveBTreeFileName(treeId);
            tree.openForRecovery(btreeFileName, treeId);
        }

        PageNumber p = new PageNumber(pgno);
        p.setTreeId(treeId);

        //if redo, we must get a free page!
        //if undo and the specified page does not exist, return null to do nothing for undo.
        byte[] page = null;
        isRecovery = true;

        if (redo) {
            page = getSpecificPage(treeId, p, txnId);
        } else {
            page = getPage(treeId, p);
        }

        isRecovery = false;

        return page;
    }

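    /**
     * Gets the btree name for the given tree id, taken from the spec of the opened btree.
     *
     * @return the btree name recorded in the tree's spec
     * @throws ChaiDBException with BTREE_NOT_FOUND if no opened btree has this id
     */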
    public String getBTreeName(int tid) throws ChaiDBException {
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(tid);

        if (openedBTree == null) {
            throw new ChaiDBException(ErrorCode.BTREE_NOT_FOUND, "BTree not found, treeid=" + tid + ", btreename is " + retrieveBTreeFileName(tid) + ": Collection may be deleted");
        }

        IBTree tree = openedBTree.btree;
        BTreeSpec spec = tree.getBTreeSpec();

        return spec.getBtreeName();
    }

    public void releasePage(int treeId, int pgno, boolean dirty) {
        if (treeId != -1) {
            PageNumber p = new PageNumber(pgno);
            p.setTreeId(treeId);
            releasePage(treeId, p, dirty);
        }
    }

    /**
     * add one freed page number to the freeList for recover
     *
     * @param treeId
     * @param pgNo
     * @param txnId
     */
    public synchronized void addToFreeList(int treeId, int pgNo, Integer txnId) throws ChaiDBException {
        PageNumber p = new PageNumber(pgNo);
        p.setTreeId(treeId);
        isRecovery = true;
        addToFreeList(treeId, p, txnId);
        isRecovery = false;
    }

    /**
     * Set reserved list of a btree. this is called when open a btree.
     *
     * @param treeid
     * @param list
     */
    public void setReserveredList(int treeid, LinkedList list) {
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
        openedBTree.setReservedList(list);
    }

    /**
     * update BTreeSpec while recover
     */
    public boolean updateBTreeSpec(int treeId, int savedPageNum, int flag, Integer txnId, byte[] docID, short btreeType) {
        // Applies one BTreeSpec change, selected by `flag`, to the spec of tree `treeId`.
        // Returns true when the change was applied; any exception is logged and reported as false.
        try {
            BTreeSpec btreeSpec = getBTreeSpec(treeId, btreeType);
            btreeSpec.setModified(true);

            PageNumber pgNum = null;

            // savedPageNum is turned into a PageNumber for every flag except LAYER_FLAG and
            // NODE_SIZE_FLAG; for those two it is used below as the layer / node size value.
            if ((flag != BTreeSpecLogRecord.LAYER_FLAG) && (flag != BTreeSpecLogRecord.NODE_SIZE_FLAG)) {
                pgNum = new PageNumber(savedPageNum);
                pgNum.setTreeId(btreeSpec.btree.getBtreeId());
            }

            // Outside of recovery, take a write lock for this txn on the id of page (treeId, 0, 0)
            // before touching the spec.
            if (!Db.getTxnManager().isOnRecovery()) {
                PageNumber p0 = new PageNumber(btreeSpec.btree.getBtreeId(), 0, 0);

                Db.getLockManager().get(txnId.intValue(), LockManager.LOCK_WAITING, LockManager.OBJECT_BTREE, p0.uniqueID(), LockManager.LOCK_WRITE);
            }

            switch (flag) {
                case BTreeSpecLogRecord.PAGE_NUMBER_FLAG: {
                    // Overwrite the spec's page number unconditionally (the guard below is disabled).
                    //                    if (btreeSpec.getPageNumber().getPageInFile() <= pgNum.getPageInFile()){
                    btreeSpec.getPageNumber().setPageNumber(pgNum);

                    //                        }
                    break;
                }

                case BTreeSpecLogRecord.ROOT_PAGE_NUMBER_FLAG: {
                    btreeSpec.setRootPageNumber(pgNum);

                    break;
                }

                case BTreeSpecLogRecord.DOC_ROOT_PAGE_NUMBER_FLAG: {
                    // Doc root is only rewritten outside of recovery; during recovery this flag is a no-op here.
                    if (!Db.getTxnManager().isOnRecovery()) {
                        ((Id2nodeBTree) (btreeSpec.btree)).updateDocRootForRollBack(docID, pgNum);
                    }

                    break;
                }

                case BTreeSpecLogRecord.LATEST_DATA_PAGE_NUMBER_FLAG: {
                    try {
                        // Find or create the resource control block of this txn.
                        ResourceControlBlock rcb = (ResourceControlBlock) resources.get(txnId);

                        if (rcb == null) {
                            rcb = new ResourceControlBlock(txnId.intValue());
                            resources.put(txnId, rcb);
                        }

                        // Right-align the DOCID_SIZE bytes of docID in a 4-byte buffer so it can be read as an int.
                        byte[] docBytes = new byte[4];
                        System.arraycopy(docID, 0, docBytes, docBytes.length - BTree.DOCID_SIZE, BTree.DOCID_SIZE);

                        int docid = ByteTool.bytesToInt(docBytes, 0, btreeSpec.getMsbFirst());

                        // Record pgNum as the latest data page of this document in the txn's control block.
                        rcb.putLatestDataPage(docid, pgNum, false);
                    } catch (ChaiDBException e) {
                        // Logged only: the method still returns true in this case.
                        logger.error(e);
                    }

                    break;
                }

                case BTreeSpecLogRecord.NODE_SIZE_FLAG: {
                    btreeSpec.setNodeSize(savedPageNum);

                    break;
                }

                case BTreeSpecLogRecord.LAYER_FLAG:
                    btreeSpec.setLayer((byte) savedPageNum, null);
                    // no break: falls through into the empty default, which is harmless

                default:
                    // unknown flags are ignored
            }

            return true;
        } catch (Exception e) {
            logger.error(e);

            return false;
        }
    }

    /**
     * Release resources occupied by a txn. This is called in the end of
     * a txn.
     *
     * @param txnId txn Id
     */
    public void releaseResource(int txnId) throws ChaiDBException {
        ResourceControlBlock rcb = (ResourceControlBlock) resources.remove(new Integer(txnId));

        /* if transaction delete node and abort in the end,there's no new resource */
        if (rcb != null) {
            rcb.release();
        }
    }

    public void flushChangedDocRoot(KernelContext kc) throws ChaiDBException {
        ResourceControlBlock rcb = (ResourceControlBlock) resources.get(new Integer(kc.getLocker()));

        if (rcb != null) {
            rcb.flushChangedDocRoot(kc);
        }
    }

    public void addChangedDoc(int txnId, Integer docID, int treeid) {
        ResourceControlBlock rcb = (ResourceControlBlock) resources.get(new Integer(txnId));

        /* if transaction delete node and abort in the end,there's no new resource */
        if (rcb == null) {
            rcb = new ResourceControlBlock(txnId);
            resources.put(new Integer(txnId), rcb);
        }

        rcb.addChangedDoc(docID, treeid);
    }

    /**
     * Release all resources of all txns. Called just after recovery.
     */
    public void releaseAllTxnResources() throws ChaiDBException {
        Enumeration resourcesenum = resources.elements();

        while (resourcesenum.hasMoreElements()) {
            ResourceControlBlock rcb = (ResourceControlBlock) resourcesenum.nextElement();
            rcb.release();
        }

        resources = new Hashtable();
    }

    /**
     * Releases all pages held by a wounded thread, i.e. a thread that hit an exception
     * or error during its operations on this btree.
     *
     * @param treeid the id of the tree the current thread is leaving
     */
    public void releaseHoldedPage(int treeid) {
        ThreadRecorder tr = (ThreadRecorder) threadRecorders.get(Thread.currentThread());

        if (tr == null) {
            return;
        }

        tr.end(treeid);

        //if tr still hold pages of other btree, don't remove it.
        //This may occurs when a thread call BTree.store() or BTree.delete()
        //recursively, which is at BTree1 -> LogManagerImpl -> BTree2, with txn.
        if (!tr.hasMoreFixedPage()) {
            threadRecorders.remove(Thread.currentThread());
        }
    }

    public synchronized void dCloseBTree(String path) throws ChaiDBException {
        Integer id = btreeName2Id.getBTreeId(path);

        if (id != null) {
            close(id.intValue());
        }
    }

    /**
     * Forcibly closes all opened btrees whose file names lie under a collection directory.
     *
     * @param fpath the collection directory
     */
    public synchronized void dCloseAllTreesofCollection(File fpath) {
        String path = fpath.getAbsolutePath();

        if (!path.endsWith(File.separator)) {
            path += File.separator;
        }

        Iterator it = btreeName2Id.getAllBTreeNames();

        logger.debug("Begin close files under the collection:" + path);

        int i = 0;
        StringBuffer res = new StringBuffer();

        while (it.hasNext()) {
            String files = (String) it.next();

            if (files.startsWith(path)) {
                try {
                    Integer tID = btreeName2Id.getBTreeId(files);

                    if (tID == null) {
                        logger.error(" Close unopened btree " + files);
                    } else {
                        close(tID.intValue());
                    }

                    i++;
                    res.append(files + "->");
                } catch (Exception e) {
                    logger.error(e);
                }
            }
        }

        logger.debug("Succeed closing " + i + " files:" + res.toString());
    }

    /**
     * close all btrees for finishing recovery
     */
    public synchronized void closeAllBTrees() {
        Iterator it = btreeName2Id.getAllBTreeNames();

        /* add by Stanley to fix ConcurrentModificationException*/
        ArrayList nameList = new ArrayList();
        while (it.hasNext()) {
            nameList.add(it.next());
        }
        it = nameList.iterator();
        /*end by Stanley*/

        logger.debug("Begin close all BTree files ");

        int i = 0;
//        StringBuffer res = new StringBuffer();

        while (it.hasNext()) {
            String files = (String) it.next();

            try {
                if (btreeName2Id.getBTreeId(files) != null) {
                    close(btreeName2Id.getBTreeId(files).intValue());
                    i++;
//                    res.append(files + "->");
                }
            } catch (Exception e) {
                logger.error(e);
            }
        }

        /**
         * Clear id storage. this is for recovery getting id storage
         * from different data home.
         */
        idStorageList = new Hashtable();
        idStorage = null;

        logger.debug("Succeed closing " + i + " files.");
    }

    /**
     * Build the first meta page 0 from BTreeSpec, and write it back to disk.
     */
    private synchronized void flushMetaPage(int treeid) {
        OpenedBTree ob = null;
        IBTree t = null;

        try {
            ob = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
            t = ob.btree;

            if (t.getBTreeSpec().isModified()) {
                writeMetaPage(treeid, false);
            }
        } catch (ChaiDBException e) {
            logger.error(e);
        } finally {
            if (t != null) {
                t.getBTreeSpec().setModified(false);
            }
        }
    }

    /**
     * Do page unfix. If none refer this page, remove it from usedlist and
     * add in look-aside list.
     *
     * @param isMove if it needs add into look-aside list
     */
    private synchronized void unFix(BufferedPageInfo pageInfo, boolean isMove) {
        if (isMove) {
            pageInfo.unFix();
        } else {
            pageInfo.resetFix(); //called by addToFreeList to free this page
        }

        if (PROTECT_PAGE) {
            if (isMove) {
                ((ThreadRecorder) threadRecorders.get(Thread.currentThread())).releasePage(pageInfo.getPageNumber());
            } else {
                ((ThreadRecorder) threadRecorders.get(Thread.currentThread())).resetPage(pageInfo.getPageNumber());
            }
        }

        if (!pageInfo.isFix()) {
            if (isMove) {
                // insert it to the header of the clean list if the page info is clean,
                // otherwise to the header of the dirty list.
                if (pageInfo.isDirty()) {
                    dirtyList.addFront(pageInfo);

                    if (Debug.DEBUG_DIRTY_LIST_TRACE) {
                        Debug.write(pageInfo.getPageNumber(), 11, "[Add Front]");
                    }
                } else {
                    cleanList.addFront(pageInfo);

                    if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                        Debug.write(pageInfo.getPageNumber(), 11, "[Add front]");
                    }
                }
            }
        }
    }

    private synchronized BTreeSpec getBTreeSpec(int treeid) {
        OpenedBTree ob = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);

        if (ob != null) {
            return ob.btree.getBTreeSpec();
        }

        return null;
    }

    /**
     * remove a pageInfo from look aside list, add into used list and fix it
     *
     * @param isMove if it needs removing from look-aside list
     */
    private void fix(BufferedPageInfo pageInfo, boolean isMove) {
        if (!pageInfo.isFix()) {
            if (isMove) {
                logger.debug("fix page and remove it from look-aside list.");
                // remove it from dirty list if the pageinfo is dirty, otherwise removing it from clean list.
                if (pageInfo.isDirty()) {
                    dirtyList.removeItem(pageInfo);

                    if (Debug.DEBUG_DIRTY_LIST_TRACE) {
                        Debug.write(pageInfo.getPageNumber(), 11, "[remove item]");
                    }
                } else {
                    cleanList.removeItem(pageInfo);

                    if (Debug.DEBUG_CLEAN_LIST_TRACE) {
                        Debug.write(pageInfo.getPageNumber(), 11, "[Remove item]");
                    }
                }
            }
        }

        pageInfo.fix();

        if (PROTECT_PAGE) {
            ThreadRecorder tr = (ThreadRecorder) threadRecorders.get(Thread.currentThread());

            if (tr == null) {
                tr = new ThreadRecorder();
                threadRecorders.put(Thread.currentThread(), tr);
            }

            tr.fixPage(pageInfo.getPageNumber());
        }
    }

    /**
     * @return Free page number; or null if not any more
     */
    private synchronized PageNumber getFreePageNumber(int treeid, KernelContext kc) {
        PageNumber pn = null;

        try {
            if ((kc != null) && (kc.getTxn() != null)) {
                Integer txnId = new Integer(kc.getLocker());
                ResourceControlBlock rcb = (ResourceControlBlock) resources.get(txnId);

                //if this txn has free page, then return it.
                if ((rcb != null) && ((pn = rcb.removeFreePage(treeid)) != null)) {
                    freePageNumbers.remove(pn);

                    return pn;
                }
            }

            //Get a free page from global free page list
            OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
            pn = openedBTree.removeFreePage();

            if ((pn != null) && (freePageNumbers.remove(pn) == null)) {
                logger.error("[" + Thread.currentThread().getName() + "] time=" + System.currentTimeMillis() + "Error in BufferedPage.getFreePageNumber pg=" + pn.toHexString());
            }

            return pn;
        } finally {
            if (pn != null) {
                pn.setTreeId(treeid);
            }
        }
    }

    /**
     * Remove the page from btree's freelist and txn's freelist
     */
    private synchronized void makeituseful(int treeid, PageNumber pNumber, int txnId) {
        PageNumber t = (PageNumber) freePageNumbers.remove(pNumber);

        if (t != null) {
            ResourceControlBlock rcb = (ResourceControlBlock) resources.get(new Integer(txnId));

            if (rcb != null) {
                rcb.removeFreePage(pNumber);
            }

            //remove t from btree
            OpenedBTree ob = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);
            ob.removeFreePage(t);
        }
    }

    private boolean isReservedPage(PageNumber pn) {
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(pn.getTreeId());

        return openedBTree.isReservedPage(pn);
    }

    /**
     * save latest dataPage PageNumber to page
     * 0-4 bytes current page number
     * 4-8 bytes latest datapage pageNumber in current page
     * 4*nbytes latest datapage PageNumber per 4 bytes
     * and delete the new page from freelist
     *
     * @param ob
     * @return first page to save latest datapage PageNumber
     */
    private PageNumber saveLatestDataPageNumber(OpenedBTree ob, BTreeSpec btreeSpec) throws ChaiDBException {
        int treeId = ob.btree.getBtreeId();
        LinkedList pageNumberList = ob.getLatestDataPageList();

        if (pageNumberList.size() == 0) {
            return new PageNumber(treeId, -1, -1);
        }

        LinkedList freePageList = ob.getFreePageList();
        int[] freeListPointer = new int[1];

        return saveFreeListToNewPage(treeId, btreeSpec, 0, pageNumberList, freePageList, freeListPointer);
    }

    private Page getAReservedFreePage(int treeid) throws ChaiDBException {
        OpenedBTree openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);

        return openedBTree.getAReservedFreePage();
    }

    /**
     * for BufferedPage.writeMetaPage()
     * save left freelist page number except those in metapage
     * or save latest datapage pagenumber to new page
     * page structure:
     * 0-4bytes page number
     * 4-8bytes total freepage number on current page
     * 4*nbytes freepage number per 4 bytes
     * lastest 4 bytes save next page number if need more page.
     *
     * @param treeId                  btree id
     * @param btreeSpec
     * @param metaPageSaveFreeListNum
     * @param saveList                freePageList or latestDataPageList
     * @param allFreeList             FreePage's PageNumber list
     * @param freeListPointer         pointer to left FreePage location in allFreeList,
     *                                it's just one number for return value.
     * @return int the first new page number
     */
    private PageNumber saveFreeListToNewPage(int treeId, BTreeSpec btreeSpec, int metaPageSaveFreeListNum, LinkedList saveList, LinkedList allFreeList, int[] freeListPointer) throws ChaiDBException {
        // Writes the entries of saveList into a chain of newly obtained pages and returns the
        // number of the first page of that chain; freeListPointer[0] receives an out-value.
        PageNumber firstPageNum = new PageNumber(treeId, 0, 0);
        boolean msbFirst = btreeSpec.getMsbFirst();
        // offset of the per-page entry count, directly after the page-number field
        int freePageNumOffset = BTreeSpec.OFF_PAGENUMBER + BTreeSpec.PAGENUMBER_BYTE_SIZE;

        // number of 4-byte slots left on a page after the page-number field and the count field
        int pageTotalSpace = (btreeSpec.getPageSize() - BTreeSpec.PAGENUMBER_BYTE_SIZE - 4) / 4; //save total free page number in 4->8 bytes
        int newPageNum = 0; // how many new pages are needed
        int endLeftPageNum = 0; // how many entries go on the last page

        if (saveList == allFreeList) {
            /* saving the free list itself: the pages used for storage are removed from that
               same list further down, so the entry count shrinks by the number of pages used */
            /* original note (partly garbled): n - x = a * x, with n: total freelist size,
               a: presumably pageTotalSpace - 1, x: newPageNum */
            if (((saveList.size() - metaPageSaveFreeListNum) % pageTotalSpace) > 0) {
                newPageNum = ((allFreeList.size() - metaPageSaveFreeListNum) / pageTotalSpace) + 1;
            } else {
                newPageNum = (allFreeList.size() - metaPageSaveFreeListNum) / pageTotalSpace;
            }

            endLeftPageNum = (allFreeList.size() - metaPageSaveFreeListNum) - (((newPageNum - 1) * (pageTotalSpace - 1)) + newPageNum);
        } else {
            /* saving the latest data page list: each page holds pageTotalSpace - 1 entries
               (one slot is kept for the next-page pointer) */
            newPageNum = (saveList.size() - metaPageSaveFreeListNum) / (pageTotalSpace - 1);
            endLeftPageNum = (saveList.size() - metaPageSaveFreeListNum) % (pageTotalSpace - 1);

            if (endLeftPageNum > 0) {
                newPageNum++;
            }
        }

        byte[][] pageList = new byte[newPageNum][btreeSpec.getPageSize()];

        /* obtain all pages first, and remove them from allFreeList */
        for (int i = 0; i < newPageNum; i++) {
            Page pi = getAReservedFreePage(treeId);
            byte[] page = pi.getPageData();

            /* end: get a new page */
            pageList[i] = page;

            if ((allFreeList != null) && (allFreeList.size() > 0)) {
                allFreeList.remove(pi.pageNumber);
            }

            releasePage(treeId, pi.pageNumber, true);
        }

        /* fill the obtained pages with the entries and write each one out */
        for (int i = 0; i < newPageNum; i++) {
            byte[] page = pageList[i];
            int size = 0; //ByteTool.bytesToInt(page,freePageNumOffset,msbFirst);
            // the page's own number is read back from its header bytes
            PageNumber currentPageNum = new PageNumber(ByteTool.bytesToInt(page, BTreeSpec.OFF_PAGENUMBER, msbFirst));
            currentPageNum.setTreeId(treeId);

            if (i == 0) {
                firstPageNum = new PageNumber(currentPageNum);
            }

            int nextPageNum;

            if (i == (newPageNum - 1)) {
                // last page of the chain: -1 marks "no next page"
                nextPageNum = -1;
                size = endLeftPageNum;
            } else {
                nextPageNum = ByteTool.bytesToInt(pageList[i + 1], BTreeSpec.OFF_PAGENUMBER, msbFirst);
                size = pageTotalSpace - 1;
            }

            if (Config.DEBUG_CODE) {
                if (size > 1000) {
                    logger.error("check point store too many pages:size = " + size);
                }
            }

            // entries of page i start at index i * (pageTotalSpace - 1) in saveList
            for (int j = 0; j < size; j++) {
                PageNumber freePage = (PageNumber) saveList.get(j + (i * (pageTotalSpace - 1)));
                System.arraycopy(ByteTool.intToBytes(freePage.getPageNumber(), msbFirst), 0, page, freePageNumOffset + 4 + (j * 4), 4);
            }

            /* store the number of entries held on this page */
            // NOTE(review): this is the only write here that does not pass msbFirst to
            // intToBytes - confirm the one-argument overload uses the intended byte order.
            System.arraycopy(ByteTool.intToBytes(size), 0, page, freePageNumOffset, 4);

            /* set next page number, directly after the last entry */
            if ((nextPageNum > 0) || ((nextPageNum < 0) && (size < pageTotalSpace))) {
                System.arraycopy(ByteTool.intToBytes(nextPageNum, msbFirst), 0, page, freePageNumOffset + 4 + (size * 4), 4);
            }

            /* write page to file */
            String filename = btreeName2Id.getBTreeName(treeId);

            // a missing or empty file name is only logged; the write below is attempted anyway
            if (filename == null) {
                logger.error("PageBufferManager::saveFreeListToNewPage The file name is null. The tree id " + treeId);
            } else {
                if (filename.equals("")) {
                    logger.error("PageBufferManager::saveFreeListToNewPage The file name is empty. The tree id " + treeId);
                }
            }

            // debug-only consistency check; on failure it dumps state and terminates the JVM
            if (Debug.DEBUG_CHECKPAGENUMBER) {
                if (!Db.getTxnManager().isOnRecovery()) {
                    if (!Debug.checkPageNumber(currentPageNum, page)) {
                        logger.fatal(new Throwable("there is hole in GetPage!"));
                        new Throwable().printStackTrace(System.err);
                        dump();
                        Db.getLogManager().flush();

                        if (Debug.DEBUG_DUMP_MEMORY) {
                            dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                        }

                        Debug.dumpPageToLog(page);

                        if (Debug.DEBUG_PAGEINFO_TRACE) {
                            Debug.pageHistory(currentPageNum);
                        }

                        System.exit(-1);
                    }
                }
            }

            _storage.writePage(filename, currentPageNum, page);
        }

        // out-value: the total number of entries written across all pages
        freeListPointer[0] = ((newPageNum - 1) * (pageTotalSpace - 1)) + endLeftPageNum;

        return firstPageNum;
    }

    private byte[] fillMetaPage(int treeid) throws ChaiDBException {
        byte[] metaPage = new byte[BTreeSpec.PAGE_SIZE];
        OpenedBTree ob = (OpenedBTree) btreeName2Id.getOpenedBTree(treeid);

        if (ob == null) {
            String details = "Btree not found. TreeId=" + treeid + ".";
            throw new ChaiDBException(ErrorCode.BTREE_NOT_FOUND, details);
        }

        IBTree t = ob.btree;
        BTreeSpec btreeSpec = t.getBTreeSpec();

        if (btreeSpec == null) {
            throw new ChaiDBException(ErrorCode.BTREE_INVALID, "btreeSpec is null treeid = " + treeid);
        }

        boolean msbFirst = btreeSpec.getMsbFirst();
        int magic = IBTreeConst.MAGIC_NUMBER;
        System.arraycopy(ByteTool.intToBytes(magic, msbFirst), 0, metaPage, BTreeSpec.MAGIC_OFF, 4); // msbFirst
        System.arraycopy(ByteTool.intToBytes(t.getBtreeId(), msbFirst), 0, metaPage, BTreeSpec.BTREEID_OFF, 4); // version
        System.arraycopy(ByteTool.intToBytes(btreeSpec.getPageNumber().getPageNumber(), msbFirst), 0, metaPage, BTreeSpec.NEXT_AVAILABLE_PAGE_OFF, 4);
        metaPage[BTreeSpec.LAYER_OFF] = btreeSpec.getLayers();

        int[] keyTypes;

        if (t.getType() == IDBIndex.SUPER_BTREE) {
            keyTypes = ((SuperBTree) t).getKeyTypes();
        } else {
            keyTypes = new int[1];
        }

        for (int i = 0; (keyTypes != null) && (i < btreeSpec.getLayers()); i++)
            metaPage[BTreeSpec.LAYER_OFF + 1 + i] = (byte) keyTypes[i];

        metaPage[BTreeSpec.FLAG_OFF] = 0;
        metaPage[BTreeSpec.BTREE_TYPE_OFF] = btreeSpec.treeType;
        System.arraycopy(ByteTool.intToBytes(btreeSpec.getRootPageNumber().getPageNumber(), msbFirst), 0, metaPage, BTreeSpec.ROOT_OFF, 4); // root page number
        System.arraycopy(ByteTool.intToBytes(btreeSpec.getNodeSize(), msbFirst), 0, metaPage, BTreeSpec.NODE_SIZE_OFF, 4); // nod? size

        return metaPage;
    }

    /**
     * Get the page specified by treeid and pNumber. This method is only called
     * during recovery.
     */
    private synchronized byte[] getSpecificPage(int treeid, PageNumber pNumber, int txnId) throws ChaiDBException {
        BufferedPageInfo pageInfo = null;
        Page pinfo = null;

        int freeLoc = -1;

        //first we remove it from freelist
        makeituseful(treeid, pNumber, txnId);

        pageInfo = (BufferedPageInfo) bufferedPageInfos.get(pNumber);

        if (pageInfo != null) {
            // found the page in buffer
            fix(pageInfo, true);
            pageInfo.incHitCount();

            return bufferedPages[pageInfo.getPageLocation()];
        }

        pinfo = new Page(pNumber);

        //here we set the btreespec pagenumber to real value
        BTreeSpec btreeSpec = getBTreeSpec(treeid);

        if ((pNumber.getPageNumber() + 1) > btreeSpec.getPageNumber().getPageNumber()) {
            btreeSpec.getPageNumber().setPageNumber(pNumber);
            btreeSpec.getPageNumber().add(1);
        }

        if (freeSlots.size() > 0) {
            freeLoc = ((Integer) freeSlots.getFirst()).intValue();
            freeSlots.removeFirst();

            // fix the new page
            pinfo.pageData = bufferedPages[freeLoc];

            // read raw page from file
            String filename = btreeName2Id.getBTreeName(treeid);

            if (filename == null) {
                logger.error("PageBufferManager::getSpecificPage The file name is null. The tree id " + treeid);
            } else {
                if (filename.equals("")) {
                    logger.error("PageBufferManager::getSpecificPage The file name is empty. The tree id " + treeid);
                }
            }

            _storage.readPage(filename, pNumber, pinfo.pageData);

            pageInfo = new BufferedPageInfo(treeid, freeLoc, pNumber);
            fix(pageInfo, false);
            bufferedPageInfos.put(pNumber, pageInfo);

            System.arraycopy(ByteTool.intToBytes(pNumber.getPageNumber()), 0, pinfo.pageData, BTreeSpec.OFF_PAGENUMBER, 4);

            return pinfo.pageData;
        }

        // NO space, need to kick out one page,
        // Remember always kick out one clean page to replace one new space.
        // Once the clean list is empty, the dirty page list will be flushed to underlying storage,
        // And we need to move dirty page list into clean page list.
        BufferedPageInfo tmpPageInfo = null;

        if (cleanList.getSize() <= RESERVED_CLEAN_SIZE) {
            //If the size of dirtyList = 0 and the size of cleanList != 0
            //Just jump to "kick out tailer from LRU clean list" diretly.
            if ((dirtyList.getSize() == 0) && (cleanList.getSize() == 0)) {
                String details = "Run out of buffer!!";
                Debug.pagesToFile("pages");
                throw new ChaiDBException(ErrorCode.BTREE_BUFFER_OVERFLOW, details);
            } else {
                flushDirtyPages();
            }
        }

        // kick out tailer from LRU clean list
        tmpPageInfo = cleanList.popBack();

        if (Debug.DEBUG_CLEAN_LIST_TRACE) {
            Debug.write(tmpPageInfo.getPageNumber(), 11, "[Pop back]");
        }

        checkExistInWriteBuffer(tmpPageInfo);
        freeLoc = tmpPageInfo.getPageLocation();

        // remove it from hashmap
        bufferedPageInfos.remove(tmpPageInfo.getPageNumber());

        if (Debug.DEBUG_BUFFEREDPAGEINFOS_REMOVE_TRACE) {
            Debug.write(tmpPageInfo.getPageNumber(), 11, "[bufferedPageInfos remove and new page is ]" + pNumber.toHexString());
            Debug.dumpPageToLog(pinfo.pageData);
        }

        // fix the new page
        pinfo.pageData = bufferedPages[freeLoc];

        // read raw page from file
        String filename = btreeName2Id.getBTreeName(treeid);

        if (filename == null) {
            logger.error("PageBufferManager::close The file name is null. The tree id " + treeid);
        } else {
            if (filename.equals("")) {
                logger.error("PageBufferManager::close The file name is empty. The tree id " + treeid);
            }
        }

        _storage.readPage(filename, pNumber, pinfo.pageData);

        pageInfo = new BufferedPageInfo(treeid, freeLoc, pNumber);
        fix(pageInfo, false);
        bufferedPageInfos.put(pNumber, pageInfo);

        System.arraycopy(ByteTool.intToBytes(pNumber.getPageNumber()), 0, pinfo.pageData, BTreeSpec.OFF_PAGENUMBER, 4);

        return pinfo.pageData;
    }

    /**
     * Dumps special memory info according to the flag. Just for debug.
     *
     * @param flag     The available value is Debug.DUMP_PAGEPOOL, Deubg.DUMP_LOOKASIDE, Debug.DUMP_FREESLOTS,
     *                 Debug.DUMP_FREELIST, Debug.DUMP_ALL
     * @param dumpfile The dump file name.
     * @see Debug#DUMP_PAGEPOOL
     * @see Debug#DUMP_CLEANLIST
     * @see Debug#DUMP_DIRTYLIST     *
     * @see Debug#DUMP_FREESLOTS
     * @see Debug#DUMP_FREELIST
     * @see Debug#DUMP_ALL
     */
    public synchronized void dumpMemInfo(byte flag, String dumpfile) {
        String tmpdir = TMP_DIR + File.separator + "ERR" + File.separator;
        BufferedWriter out = null;

        try {
            File dir = new File(tmpdir);

            if (!dir.exists() || dir.isFile()) {
                dir.mkdirs();
            }

            out = new BufferedWriter(new PrintWriter(new FileOutputStream(tmpdir + dumpfile + System.currentTimeMillis(), true)));

            if (flag > 0) {
                out.write(">>>>>>>>>>>>> Time=" + System.currentTimeMillis());
                out.newLine();
            }

            StringBuffer sb = new StringBuffer();

            //Dump page pool
            if ((flag & Debug.DUMP_PAGEPOOL) != 0) {
                if ((bufferedPageInfos == null) || (bufferedPageInfos.size() == 0)) {
                    sb.append("<Component name=\"PagePool\"/>");
                } else {
                    sb.append("<Component name=\"PagePool\">");

                    Iterator keyEnum = bufferedPageInfos.keySet().iterator();

                    while (keyEnum.hasNext()) {
                        sb.append("<bufferedPageInfos>");
                        sb.append("<pageNumber>");

                        PageNumber pn = (PageNumber) keyEnum.next();
                        sb.append(pn.toHexString());
                        sb.append("</pageNumber>");
                        sb.append("<PageInfo>");

                        BufferedPageInfo bpi = (BufferedPageInfo) bufferedPageInfos.get(pn);
                        String info = bpi.toString();
                        info += ("  " + ByteTool.toHexString(2, bufferedPages[bpi.getPageLocation()], 0, pageSize));
                        sb.append(info);
                        sb.append("</PageInfo>");
                        sb.append("</bufferedPageInfos>");
                        out.write(sb.toString()); //write one page to bufferwriter
                        sb.setLength(0); // clean up
                    }

                    sb.append("</Component>");
                }
            }

            out.write(sb.toString());
            sb.setLength(0);

            //dump clean list
            if (((flag & Debug.DUMP_CLEANLIST) != 0) && (cleanList.getSize() != 0)) {
                out.write("  \r\n>>>>>>>>> Clean list " + cleanList.getSize());

                Iterator iter = cleanList.getIterator();
                BufferedPageInfo pinfo = null;

                while (iter.hasNext()) {
                    pinfo = (BufferedPageInfo) iter.next();
                    sb.append(pinfo.getPageNumber().toHexString());
                    sb.append("->");
                }

                out.write(sb.toString());
            }

            sb.setLength(0);

            //dump dirty list
            if (((flag & Debug.DUMP_DIRTYLIST) != 0) && (dirtyList.getSize() != 0)) {
                out.write("  \r\n>>>>>>>>> Dirty list " + dirtyList.getSize());

                Iterator iter = dirtyList.getIterator();
                BufferedPageInfo pinfo = null;

                while (iter.hasNext()) {
                    pinfo = (BufferedPageInfo) iter.next();
                    sb.append(pinfo.getPageNumber().toHexString());
                    sb.append("->");
                }

                out.write(sb.toString());
            }

            sb.setLength(0);

            //dump free slot.
            if (((flag & Debug.DUMP_FREESLOTS) != 0) && (freeSlots != null)) {
                out.write(">>>>>>>>> Free Slots = " + freeSlots.size());
                out.newLine();

                for (int i = 0; i < freeSlots.size(); i++)
                    sb.append(((Integer) freeSlots.get(i)).intValue()).append(",");

                out.write(sb.toString());
            }

            sb.setLength(0);

            //dump free page numbers.
            if ((flag & Debug.DUMP_FREELIST) != 0) {
                out.write(">>>>>>>>> Free PageNumbers = " + freePageNumbers.size());
                out.newLine();

                Enumeration pageNumbers = freePageNumbers.elements();

                while (pageNumbers.hasMoreElements()) {
                    PageNumber p = (PageNumber) pageNumbers.nextElement();
                    sb.append(p.toHexString()).append("->");
                }

                out.write(sb.toString());
            }

            sb.setLength(0);

            out.flush();
        } catch (Exception e) {
            logger.error(e);
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    logger.error(e);
                }
            }
        }
    }

    public int getBufferedPageInfosSize() {
        return bufferedPageInfos.size();
    }

    public void clear() {
        bufferedPages = null;
        threadRecorders.clear();
    }

    /**
     * appended by Stanley
     *
     * @param collectionID
     * @param extendedDir
     * @throws ChaiDBException
     */
    public void addExtendedDir(int collectionID, String extendedDir) throws ChaiDBException {
        _storage.addExtendedDir(collectionID, extendedDir);
    }

    public String listCollectionDir(int collectionID) throws ChaiDBException {
        return _storage.listCollectionDirs(collectionID);
    }

    public Iterator listColDirs(int collectionID) throws ChaiDBException {
        return _storage.listColDirs(collectionID);
    }

    public Iterator removeExtendedColDirs(int collectionID) throws ChaiDBException {
        return _storage.removeExtendedColDirs(collectionID);
    }

    public CollectionStorage getColStorage(int collectionId) {
        return _storage.getColStorage(collectionId);
    }
    // --------------------------------------------------------- inner classes

    /**
     * This is designed especially for supporting Long Lock of txn.
     * Each user process(thread), here is txn, has its own space to
     * hold frequent competed resource.
     */
    private class ResourceControlBlock {
        /**
         * User process id.(txnid)
         */
        private int uid;

        /**
         * List of documents whose root change by this txn
         */
        private Hashtable trees = new Hashtable();

        /**
         * Latest Data Page Number of BTree where it fills data.
         * Treeid->Docid->ldp
         */
        private Hashtable LDPs = new Hashtable();

        /**
         * Free page list
         */
        private ArrayList freePageList = new ArrayList();

        /**
         * Constructor
         */
        ResourceControlBlock(int uid) {
            this.uid = uid;
        }

        /**
         * Get a free page from its Free Page List(FPL).
         */
        PageNumber removeFreePage(int treeid) {
            PageNumber pt = null;

            for (int i = 0; i < freePageList.size(); i++) {
                pt = (PageNumber) freePageList.get(i);

                if (treeid == pt.getTreeId()) {
                    freePageList.remove(i);

                    //freePageNumbers.remove(pt);
                    return pt;
                }
            }

            return null;
        }

        void addChangedDoc(Integer docID, int treeid) {
            Integer treeID = new Integer(treeid);
            ArrayList doclist = (ArrayList) trees.get(treeID);

            if (doclist == null) {
                doclist = new ArrayList();
                trees.put(treeID, doclist);
            }

            if (!doclist.contains(docID)) {
                doclist.add(docID);
            }
        }

        void flushChangedDocRoot(KernelContext kc) throws ChaiDBException {
            Enumeration tids = trees.keys();

            while (tids.hasMoreElements()) {
                Integer treeID = (Integer) tids.nextElement();
                IBTree bt = ((OpenedBTree) btreeName2Id.getOpenedBTree(treeID.intValue())).btree;

                if (bt.getType() == IDBIndex.ID2NODE_BTREE) {
                    Id2nodeBTree id2node = (Id2nodeBTree) bt;
                    ArrayList doclist = (ArrayList) trees.get(treeID);

                    for (int i = 0; i < doclist.size(); i++) {
                        Integer docID = (Integer) doclist.get(i);
                        id2node.flushDocRoot(docID, kc);
                    }
                }
            }
        }

        /**
         * remove input PageNumber from freePageList
         *
         * @param p
         * @return boolean
         */
        boolean removeFreePage(PageNumber p) {
            return freePageList.remove(p);
        }

        /**
         * Add a free page into its own FPL
         */
        void addToFreeList(PageNumber pNumber) {
            freePageList.add(pNumber);
        }

        /**
         * If the specified page is the latest data page
         */
        boolean isLatestDataPage(int docid, PageNumber p) {
            Integer tID = new Integer(p.getTreeId());
            Hashtable docLDPs = (Hashtable) LDPs.get(tID);

            if (docLDPs != null) {
                PageNumber ldp = (PageNumber) docLDPs.get(new Integer(docid));

                if ((ldp != null) && ldp.equ(p)) {
                    return true;
                }
            }

            return ((OpenedBTree) btreeName2Id.getOpenedBTree(tID.intValue())).isLatestDataPage(docid, p);
        }

        /**
         * This is called by BTreeNewPageLogRecord.doUndo.
         * if a txn new a latestDataPage, its undo will put it to
         * freelist, so that the corresponding ldp should be deleted
         */
        void removeLatestDataPage(int docid, PageNumber p) {
            Integer tID = new Integer(p.getTreeId());
            Hashtable docLDPs = (Hashtable) LDPs.get(tID);

            if (docLDPs != null) {
                PageNumber ldp = (PageNumber) docLDPs.get(new Integer(docid));

                if ((ldp != null) && ldp.equ(p)) {
                    docLDPs.remove(new Integer(docid));
                }
            }
        }

        /**
         * Get a latest page of the specified btree.
         *
         * @param treeid id of btree
         */
        PageNumber getLatestDataPage(int docid, int treeid) throws ChaiDBException {
            PageNumber pt = null;
            Integer tID = new Integer(treeid);
            Hashtable docLDPs = (Hashtable) LDPs.get(tID);

            if (docLDPs != null) {
                pt = (PageNumber) docLDPs.get(new Integer(docid));

                if (pt != null) {
                    return pt;
                }
            }

            pt = getANewLatestDataPage(docid, treeid);

            return pt;
        }

        /**
         * Get a new latestDataPage.
         */
        PageNumber getANewLatestDataPage(int docid, int treeid) throws ChaiDBException {
            //At first, Remove old latest data page.
            PageNumber oldPage = null;

            //At first, Remove old latest data page.
            PageNumber newPage = null;
            Integer tID = new Integer(treeid);
            Hashtable docLDPs = (Hashtable) LDPs.get(tID);
            Integer docID = new Integer(docid);

            if (docLDPs != null) {
                oldPage = (PageNumber) docLDPs.remove(docID);
            }

            OpenedBTree ob = (OpenedBTree) btreeName2Id.getOpenedBTree(tID.intValue());
            newPage = ob.getLatestDataPage(docid, true);

            if (newPage.getPageInFile() > BTreeSpec.INVALID_PAGENO) {
                if (docLDPs == null) {
                    docLDPs = new Hashtable();
                }

                docLDPs.put(docID, newPage);
                LDPs.put(tID, docLDPs);
            } else {
                return newPage; //Todo: ben zhang why  need return invalid ???
            }

            if (oldPage == null) {
                oldPage = new PageNumber(BTreeSpec.INVALID_PAGENO);
            }

            if (oldPage.getPageNumber() != newPage.getPageNumber()) {
                BTreeSpecLogRecord logRec = new BTreeSpecLogRecord(treeid, oldPage.getPageNumber(), newPage.getPageNumber(), BTreeSpecLogRecord.LATEST_DATA_PAGE_NUMBER_FLAG, uid, docid, ob.btree.getType());
                logRec.log();
            }

            if (Debug.DEBUG_RESOURCE_CONTROL_BLOCK) {
                logger.fatal(Integer.toHexString(uid) + " New LDP" + newPage.toHexString());
            }

            return newPage;
        }

        /**
         * Replace the old latest page of a btree with the new one
         *
         * @param pNumber the new LDP
         * @param normal  false if this is called in REDO/UNDO. Otherwise in normal, true
         */
        void putLatestDataPage(int docid, PageNumber pNumber, boolean normal) throws ChaiDBException {
            PageNumber oldPage = null;
            Integer tID = new Integer(pNumber.getTreeId());
            Hashtable docLDPs = (Hashtable) LDPs.get(tID);
            Integer docID = new Integer(docid);

            if (docLDPs != null) {
                oldPage = (PageNumber) docLDPs.remove(docID);
            } else {
                docLDPs = new Hashtable();
                LDPs.put(tID, docLDPs);
            }

            if (pNumber.getPageInFile() > 0) {
                docLDPs.put(docID, pNumber);
            }

            if (!normal) {
                return;
            }

            OpenedBTree ob = (OpenedBTree) btreeName2Id.getOpenedBTree(tID.intValue());

            if (oldPage == null) {
                oldPage = new PageNumber(BTreeSpec.INVALID_PAGENO);
            }

            if (!oldPage.equ(pNumber)) {
                BTreeSpecLogRecord logRec = new BTreeSpecLogRecord(pNumber.getTreeId(), oldPage.getPageNumber(), pNumber.getPageNumber(), BTreeSpecLogRecord.LATEST_DATA_PAGE_NUMBER_FLAG, uid, docid, ob.btree.getType());
                logRec.log();
            }
        }

        /**
         * Release all resources occupied by this txn after it ends
         */

        //void release(boolean redo) throws ChaiDBException {
        void release() throws ChaiDBException {
            PageNumber pt;
            OpenedBTree openedBTree;

            StringBuffer res = null;

            if (Debug.DEBUG_SHARE_MEMORY) {
                res = new StringBuffer("TXN:" + Integer.toHexString(uid));
                res.append("\r\n LDP List:");
            }

            Enumeration trees = LDPs.keys();

            while (trees.hasMoreElements()) {
                Integer tID = (Integer) trees.nextElement();
                openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(tID.intValue());

                if (openedBTree == null) {
                    continue;
                }

                Hashtable docLDPs = (Hashtable) LDPs.get(tID);
                Enumeration docIds = docLDPs.keys();

                while (docIds.hasMoreElements()) {
                    Integer docId = (Integer) docIds.nextElement();
                    PageNumber ldp = (PageNumber) docLDPs.get(docId);
                    openedBTree.putLatestDataPage(docId.intValue(), true, ldp);

                    if (Debug.DEBUG_SHARE_MEMORY) {
                        res.append("->" + ldp.toHexString());
                    }
                }
            }

            if (Debug.DEBUG_SHARE_MEMORY) {
                res.append("\r\n FreeList:");
            }

            for (int i = 0; i < freePageList.size(); i++) {
                pt = (PageNumber) freePageList.get(i);
                openedBTree = (OpenedBTree) btreeName2Id.getOpenedBTree(pt.getTreeId());

                if (openedBTree != null) {
                    openedBTree.putFreePage(pt);
                }

                if (Debug.DEBUG_SHARE_MEMORY) {
                    res.append("->" + pt.toHexString());
                }
            }

            if (Debug.DEBUG_SHARE_MEMORY) {
                logger.fatal(res.toString());
            }

            synchronized (resources) {
                resources.remove(new Integer(uid));
            }
        }
    }

    //end of class ResourceControlBlock

    /**
     * Every opened BTree has one and ONLY one instance of OpenedBTree.
     */
    private class OpenedBTree {
        //this variable means there are at most 100 docLdps in memory
        private final int LDP_BOX_SIZE = 100;
        private final double KICKOUT_PERCENT = 0.8;
        private final int BUCKET_NUMBER = 4096; //12 bits

        /**
         * instance of BTree
         */
        IBTree btree;
        private DocLDPBox docLdpBox;
        private HashMap[] buckets = new HashMap[BUCKET_NUMBER];

        /**
         * list of reserved pages storing Lastest DataPages and Free pages. Never used
         * by normal data page(BTreePage or DataPage).
         */
        private LinkedList reservedList;
        private int index;

        OpenedBTree(IBTree btree) {
            this.btree = btree;
            docLdpBox = new DocLDPBox();
        }

        void setReservedList(LinkedList list) {
            reservedList = list;
            resetReservedList();
        }

        void resetReservedList() {
            index = 0;
        }

        boolean isReservedPage(PageNumber pn) {
            if ((reservedList != null) && reservedList.contains(pn)) {
                return true;
            } else {
                return false;
            }
        }

        Page getAReservedFreePage() throws ChaiDBException {
            Page pi;

            if ((reservedList != null) && (index < reservedList.size())) {
                pi = new Page((PageNumber) reservedList.get(index++));

                //remove it from free page.
                //Assume one committed txn freed a page that later used
                //as a reserved page. Then during recovery, this page is
                //re-freed as part of operations of that committed txn.
                //So that if it's not removed from free pages list, getPage()
                //will throw out "Used freed pages" exception.
                removeFreePage((PageNumber) freePageNumbers.remove(pi.pageNumber));

                pi.pageData = getPage(btree.getBtreeId(), pi.pageNumber);
            } else {
                pi = _instance.getFreePage(btree.getBTreeSpec(), false, null);
                setReservedList(new LinkedList());
                reservedList.addLast(pi.pageNumber);
                index++;
            }

            System.arraycopy(ByteTool.intToBytes(pi.pageNumber.getPageNumber()), 0, pi.pageData, BTreeSpec.OFF_PAGENUMBER, BTreeSpec.PAGENUMBER_BYTE_SIZE);

            return pi;
        }

        //Only called when this btree is goint to be closed
        synchronized LinkedList getLatestDataPageList() {
            return docLdpBox.getAllLDPList();
        }

        synchronized LinkedList getFreePageList() {
            LinkedList freePageList = new LinkedList();

            for (int i = 0; i < BUCKET_NUMBER; i++) {
                if (buckets[i] != null) {
                    java.util.Iterator pages = buckets[i].keySet().iterator();

                    while (pages.hasNext()) freePageList.add(pages.next());
                }
            }

            return freePageList;
        }

        /**
         * Get a free page number from freePageList
         *
         * @return If freePageList's size is 0, return null.Otherwise, return one.
         */
        synchronized PageNumber removeFreePage() {
            for (int i = 0; i < BUCKET_NUMBER; i++) {
                if (buckets[i] != null) {
                    if (buckets[i].size() > 0) {
                        return (PageNumber) buckets[i].remove(buckets[i].keySet().iterator().next());
                    }
                }
            }

            return null;
        }

        /**
         * Get specified page
         *
         * @param pgno specified page number
         */
        synchronized void removeFreePage(PageNumber pgno) {
            if (pgno == null) {
                return;
            }

            findBucket(pgno).remove(pgno);
        }

        private HashMap findBucket(PageNumber pgno) {
            //index= (high 7 bits | file number <<7) & 0xfff
            //then it must be lower than 0x1000 (4096).
            int index = pgno.getPageInFile() >> 12; // index = pgno / 4096
            index |= (pgno.getFileNumber() << 7);
            index &= 0xfff; //index %= 4096

            if (buckets[index] == null) {
                buckets[index] = new HashMap();
            }

            return buckets[index];
        }

        /**
         * Put a free page of this btree into free page list
         *
         * @return list size after put
         */
        synchronized int putFreePage(PageNumber pn) {
            pn.setTreeId(btree.getBtreeId());

            HashMap freePageList = findBucket(pn);

            freePageList.put(pn, pn);

            return freePageList.size();
        }

        /**
         * If the specified page is current used latest Datapage.
         * We must traverse the LDP list in order not to free future ldp
         *
         * @param p specified page
         */
        boolean isLatestDataPage(int docid, PageNumber p) {
            return docLdpBox.isLatestDataPage(docid, p);
        }

        /**
         * Get a latest data page from latestDataPageList. If the list run out,
         * acquire a new page and add it to list.
         *
         * @param txnExists true if caller is a txn. Otherwise no txn
         * @return return a page.
         */
        synchronized PageNumber getLatestDataPage(int docid, boolean txnExists) throws ChaiDBException {
            return docLdpBox.getLatestDataPage(docid, txnExists);
        }

        /**
         * Get a new latestDataPage.
         *
         * @param docid the last one which will be removed because it is useless
         */
        synchronized PageNumber getANewLatestDataPage(int docid, boolean txnExists) throws ChaiDBException {
            return docLdpBox.getANewLatestDataPage(docid, txnExists);
        }

        void removeLatestDataPage(int docid, PageNumber ldp) {
            docLdpBox.removeLatestDataPage(docid, ldp);
        }

        /**
         * Put a latest page of this btree into free page list
         *
         * @param txnExists true if caller is a txn. Otherwise no txn
         */
        synchronized void putLatestDataPage(int docid, boolean txnExists, PageNumber newPage) {
            docLdpBox.putLDP(docid, txnExists, newPage);
        }

        private class DocLDPBox extends Hashtable {
            LinkedList docUsedFrequencys = new LinkedList();

            //check and kick out some if this box is full
            private synchronized void checkBoxSize() {
                if (docUsedFrequencys.size() == LDP_BOX_SIZE) {
                    StringBuffer buf = new StringBuffer("Removing docs:");

                    for (int i = 0; i < (LDP_BOX_SIZE * KICKOUT_PERCENT); i++) {
                        Integer docID = (Integer) docUsedFrequencys.removeLast();

                        if (logger.getLevel() == Level.DEBUG) {
                            buf.append(docID.intValue() + "->");
                        }

                        this.remove(docID);
                    }

                    if (logger.getLevel() == Level.DEBUG) {
                        buf.append(". List size=" + docUsedFrequencys.size() + ", Box size=" + size());
                        logger.debug(buf.toString());
                    }
                }
            }

            void putLDP(int docid, boolean txnExists, PageNumber ldp) {
                if ((ldp == null) || (ldp.getPageInFile() == BTreeSpec.INVALID_PAGENO)) {
                    return;
                }

                checkBoxSize();

                Integer docID = new Integer(docid);
                DocLDP docldp = (DocLDP) get(docID);

                if (docldp == null) {
                    docldp = new DocLDP(docid);
                    put(docID, docldp);
                }

                docldp.putLDP(txnExists, ldp);
                adjustFrequency(docID);
            }

            //adjust this element's index according to LRU
            private synchronized void adjustFrequency(Integer docID) {
                int index = docUsedFrequencys.indexOf(docID);

                if (index == 0) {
                    return;
                }

                if (index > 0) {
                    docUsedFrequencys.remove(index);
                }

                docUsedFrequencys.addFirst(docID);
            }

            PageNumber getANewLatestDataPage(int docid, boolean txnExists) throws ChaiDBException {
                Integer docID = new Integer(docid);
                DocLDP docldp = (DocLDP) get(docID);

                return ((docldp == null) ? new PageNumber(BTreeSpec.INVALID_PAGENO) : docldp.getANewLatestDataPage(txnExists));
            }

            PageNumber getLatestDataPage(int docid, boolean txnExists) throws ChaiDBException {
                Integer docID = new Integer(docid);
                DocLDP docldp = (DocLDP) get(docID);

                return ((docldp == null) ? new PageNumber(BTreeSpec.INVALID_PAGENO) : docldp.getLatestDataPage(txnExists));
            }

            boolean isLatestDataPage(int docid, PageNumber p) {
                DocLDP docldp = (DocLDP) get(new Integer(docid));

                return ((docldp == null) ? false : docldp.isLatestDataPage(p));
            }

            void removeLatestDataPage(int docid, PageNumber p) {
                DocLDP docldp = (DocLDP) get(new Integer(docid));

                if (docldp != null) {
                    docldp.removeLatestDataPage(p);
                }
            }

            LinkedList getAllLDPList() {
                LinkedList allList = new LinkedList();
                Enumeration docLDPs = this.elements();

                while (docLDPs.hasMoreElements()) {
                    DocLDP docldp = (DocLDP) docLDPs.nextElement();

                    if (docldp.ldpList != null) {
                        allList.addAll(docldp.ldpList);
                    }
                }

                return allList;
            }

            //The inner class which maintains the LDP list if txn exist, or latest
            //data page if txn not exist.
            private class DocLDP {
                //LDP LIST from which txn operations get LDP
                LinkedList ldpList;

                //LDP used by no_txn operations
                PageNumber ldp_notxn;
                int docid;

                DocLDP(int docid) {
                    ldpList = new LinkedList();
                    this.docid = docid;
                }

                synchronized void putLDP(boolean txnExists, PageNumber p) {
                    if (txnExists) {
                        if (!ldpList.contains(p)) {
                            ldpList.add(p);
                        }
                    } else if (ldp_notxn == null) {
                        ldp_notxn = new PageNumber(p);
                    } else { //Todo: ben zhang. why need this branch

                        if (!ldp_notxn.equ(p)) {
                            ldpList.add(p);
                        }
                    }
                }

                public synchronized boolean isLatestDataPage(PageNumber p) {
                    if ((ldp_notxn != null) && p.equ(ldp_notxn)) {
                        return true;
                    }

                    for (int i = 0; i < ldpList.size(); i++)
                        if (p.equals(ldpList.get(i))) {
                            return true;
                        }

                    return false;
                }

                public synchronized void removeLatestDataPage(PageNumber p) {
                    if ((ldp_notxn != null) && p.equ(ldp_notxn)) {
                        ldp_notxn = null;
                    }

                    ldpList.remove(p);
                }

                /**
                 * Get a latest data page from latestDataPageList. If the list run out,
                 * acquire a new page and add it to list.
                 *
                 * @param txnExists true if caller is a txn. Otherwise no txn
                 * @return return a page.
                 */
                synchronized PageNumber getLatestDataPage(boolean txnExists) throws ChaiDBException {
                    //if no txn and ldp_notxn!=null, return ldp_notxn
                    if (!txnExists && (ldp_notxn != null)) {
                        return ldp_notxn;
                    }

                    //if there is no availabe ldp,return a invalid one.
                    PageNumber pn = new PageNumber(BTreeSpec.INVALID_PAGENO);

                    if (ldpList.size() > 0) {
                        pn = (PageNumber) ldpList.removeFirst();

                        if (!txnExists) { // If no txn, and   ldp_notxn == null, evaluate ldp_notxn with pn which removed from ldpList
                            ldp_notxn = pn; //Todo: Ben zhang why need to evaluate.
                        }
                    }

                    return pn;
                }

                /**
                 * Get a new latestDataPage.
                 *
                 * @param txnExists the last one which will be removed because it is useless
                 */
                synchronized PageNumber getANewLatestDataPage(boolean txnExists) throws ChaiDBException {
                    if (!txnExists) {
                        ldp_notxn = null;
                    }

                    return this.getLatestDataPage(txnExists);
                }
            }

            //end of DocLDP
        }

        //end of class DocLDPBox
    }

    //end of class OpenedBTree
    private class ThreadRecorder {
        //the page a thread refer between a btree operation.
        Hashtable referedPages = new Hashtable();

        void fixPage(PageNumber pn) {
            PageCount pc = (PageCount) referedPages.get(pn);

            if (pc == null) {
                referedPages.put(pn, new PageCount());
            } else {
                pc.count++;
            }
        }

        void releasePage(PageNumber pn) {
            PageCount pc = (PageCount) referedPages.get(pn);

            if (pc != null) {
                if ((--pc.count) == 0) {
                    referedPages.remove(pn);
                }
            }
        }

        void resetPage(PageNumber pn) {
            referedPages.remove(pn);
        }

        //To see if this thread still fixed pages
        boolean hasMoreFixedPage() {
            return referedPages.size() > 0;
        }

        void end(int treeid) {
            if (referedPages.size() > 0) {
                Enumeration pageKyes = referedPages.keys();

                while (pageKyes.hasMoreElements()) {
                    PageNumber pn = (PageNumber) pageKyes.nextElement();
                    PageCount pc = (PageCount) referedPages.get(pn);
                    BufferedPageInfo bpi = (BufferedPageInfo) bufferedPageInfos.get(pn);

                    if ((bpi == null) || (pc == null) || (pc.count == 0) || (pn.getTreeId() != treeid)) {
                        continue;
                    }

                    //here we substract the number directly to enhance performance
                    synchronized (PageBufferManager.this) {
                        bpi.unFix(pc.count);
                        bpi.fix(); //since unFix substract fixed by 1, increase 1 to balance
                    }

                    unFix(bpi, true);
                    referedPages.remove(pn);
                }
            }
        }

        /**
         * this class's count can be manupulated directly, not as Integer
         * which can be done the same thing.
         */
        private class PageCount {
            int count = 1;
        }

        //end of class PageCount
    }

    //end of ThreadRecorder
    private class BufferWriter extends AbstractDaemonThread {
        static final String THREADNAME = "Page Buffer Writer thread";

        public BufferWriter() {
            super(BufferWriter.THREADNAME);
        }

        public void flushBuffer() {
            synchronized (this) {
                this.notify();
            }
        }

        protected void runningBody() {
            try {
                synchronized (this) {
                    if (writeSize == 0) {
                        try {
                            this.wait();
                        } catch (InterruptedException ie) {
                            return;
                        }
                    }

                    try {
                        Db.getLogManager().flush();

                        for (int i = 0; i < writeSize; i++) {
                            String filename = btreeName2Id.getBTreeName(writePageNumber[i].getTreeId());

                            if (filename == null) {
                                logger.error("BufferWriter::run The file name is null. The page number is " + writePageNumber[i].toHexString());
                            } else {
                                if (filename.equals("")) {
                                    logger.error("BufferWriter::run The file name is empty. The page number is " + writePageNumber[i].toHexString());
                                }
                            }

                            if (Debug.DEBUG_CHECKPAGENUMBER) {
                                if (!Db.getTxnManager().isOnRecovery()) {
                                    if (!Debug.checkPageNumber(writePageNumber[i], writeBufferedPages[i])) {
                                        logger.fatal(new Throwable("There is hole in GetPage!"));
                                        new Throwable().printStackTrace(System.err);
                                        dump();
                                        Db.getLogManager().flush();

                                        if (Debug.DEBUG_DUMP_MEMORY) {
                                            dumpMemInfo(Debug.DUMP_ALL, "memoryinfo");
                                        }

                                        Debug.dumpPageToLog(writeBufferedPages[i]);

                                        if (Debug.DEBUG_PAGEINFO_TRACE) {
                                            Debug.pageHistory(writePageNumber[i]);
                                        }

                                        System.exit(-1);
                                    }
                                }
                            }

                            _storage.writePage(filename, writePageNumber[i], writeBufferedPages[i]);
                        }
                    } catch (Throwable e) {
                        logger.fatal(e);
                    } finally {
                        synchronized (writeLock) {
                            writeSize = 0;
                            writeLock.notifyAll();
                        }
                    }
                }

                //sync(this)
            } catch (Throwable t) {
                logger.fatal(t);
            }
        }
    }
}
TOP

Related Classes of org.chaidb.db.index.btree.bufmgr.PageBufferManager$ThreadRecorder

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.