/*
* Copyright (C) 2006 http://www.chaidb.org
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
*/
package org.chaidb.db.index.btree;
import org.apache.log4j.Logger;
import org.chaidb.db.KernelContext;
import org.chaidb.db.exception.ChaiDBException;
import org.chaidb.db.exception.ErrorCode;
import org.chaidb.db.helper.ByteTool;
import org.chaidb.db.index.IDBIndex;
import org.chaidb.db.index.btree.bufmgr.Page;
import org.chaidb.db.index.btree.bufmgr.PageBufferManager;
import org.chaidb.db.index.btree.bufmgr.PageNumber;
import org.chaidb.db.log.logrecord.BTreeAddRemoveLogRecord;
import org.chaidb.db.log.logrecord.BTreeFreeOverflowPageLogRecord;
import org.chaidb.db.log.logrecord.BTreeNewPageLogRecord;
import org.chaidb.db.log.logrecord.BTreeReplLogRecord;
/**
* A single data page read from the database.
* <p/>
* A page in a data file has the following layout:</P>
* <PRE>
* +--------+------------+---------------------------+
* | Header | Free space | Nodes |
* +--------+------------+---------------------------+
* </PRE>
* <p/>
* <P><B>Header:</B> A data page has the following metadata:</P>
* <PRE>
* Size Description
* 4 Number of this page
* 4 Page number of preceding page
* 4 Page number of following page
* 3 Doc id of this page
* 1 Key type of this page
* 4 Reserved 4 bytes
* 4 Flags:
* 0x01 - BTree internal page
* 0x02 - BTree leaf page
* 0x04 - Overflow page
* 0x08 - Recno internal page
* 0x10 - Recno leaf page
* 0x20 - Never delete this chain of pages
* 2 Number of nodes on this page
* 2 Upper bound of free space on page
* </PRE>
* <p/>
* <p><b>Page number of preceding page</b>: For an overflow page, this field stores the original page number.
* The parent page number of the root page is 0.
* <p/>
* <p><b>Page number of following page</b>: For overflow pages, it is used to store next overflow page
* number.
* <p/>
* <P><B>Nodes:</B>
* Each node has a key and a data item. Ordinarily, the key and/or data
* immediately follow the flags. If the key or data is too big,
* it is stored on a separate 'overflow' page (or pages, if it is really
* big). In that case, the node contains the 4-byte page number of the first overflow page.</P>
* <p/>
* <P>Overflow pages have the same header as normal pages, but
* simply hold raw key/data bytes. They use the 'next page' value
* to chain successive pages of data together.</P>
* <p/>
* <P>Each node has the following format:</P>
* <PRE>
* Size Description
* 4 Key Size
* 4 Data Size
* 1 Flags: 0x01 for overflow data, 0x02 for overflow key
* 2 size of space allocated for this node within the page
* n Key/Data (4 bytes overflow page number in case key/data overflow)
* </PRE>
*/
public class DataPage {
/**
 * Logger for this class. It was previously created for AbstractBTree,
 * which filed DataPage messages under the wrong category.
 */
private static final Logger logger = Logger.getLogger(DataPage.class);
/**
 * Buffer manager the raw page bytes are fetched from and released to.
 */
PageBufferManager buffer;
/**
 * Raw bytes of the page as returned by the buffer manager; the header
 * setters below write straight into this array.
 */
byte[] page;
/**
 * Specification of the owning B-tree (page size, byte order, tree reference).
 */
BTreeSpec btreeSpec;
/**
 * Header: number of this page.
 */
PageNumber pageNumber;
/**
 * Header: number of the preceding page.
 */
PageNumber prevPage;
/**
 * Header: number of the following page.
 */
PageNumber nextPage;
/**
 * Header: document id of this page; -1 until it is read or set.
 */
int docid = -1;
/**
 * Header flags. Only regular data pages (0) and overflow pages (0x04)
 * are supported now.
 */
int flags;
/**
 * Header: upper bound of the free space on the page.
 */
short upperBound;
/**
 * Header: number of nodes stored on the page.
 */
short nodes;
/**
 * Transaction id used for the log records written by the setters.
 */
int txnId;
/**
 * Whether the setters write a log record before changing the page.
 */
boolean needLog = false;
/**
 * Creates a page object that is not yet bound to raw page bytes.
 * All three page numbers start out as (-1, -1, -1) and {@code page}
 * is left unset.
 *
 * @param btreeSpec specification of the owning B-tree
 * @param buffer    buffer manager used to fetch and release pages
 */
public DataPage(BTreeSpec btreeSpec, PageBufferManager buffer) {
    this.buffer = buffer;
    this.btreeSpec = btreeSpec;
    this.nextPage = new PageNumber(-1, -1, -1);
    this.prevPage = new PageNumber(-1, -1, -1);
    this.pageNumber = new PageNumber(-1, -1, -1);
}
/**
 * Binds this object to an existing page: fetches the raw bytes from the
 * buffer manager and, when they are available, decodes the header.
 * The class only keeps a reference to the raw bytes and decodes them as
 * necessary.
 *
 * @param id         tree id stamped on the page numbers and used for the lookup
 * @param pageNumber number of the page to load; its tree id is set to
 *                   {@code id} on the passed-in object before it is copied
 * @param btreeSpec  specification of the owning B-tree
 * @param buffer     buffer manager used to fetch the page
 * @throws ChaiDBException declared by the signature, presumably raised by
 *                         the buffer manager lookup
 */
public DataPage(int id, PageNumber pageNumber, BTreeSpec btreeSpec, PageBufferManager buffer) throws ChaiDBException {
    this(btreeSpec, buffer);
    this.prevPage.setTreeId(id);
    this.nextPage.setTreeId(id);
    // This is the parameter, not the field: the caller's object is updated.
    pageNumber.setTreeId(id);
    this.pageNumber = new PageNumber(pageNumber);
    this.page = buffer.getPage(id, pageNumber);
    if (this.page == null) {
        // No raw bytes available: header fields keep their defaults.
        return;
    }
    readMetaData();
}
/**
 * Decodes the page header from {@code page} into the fields of this object
 * (previous/next page, doc id, flags, node count and upper bound), using
 * the byte order configured in the B-tree spec.
 */
private void readMetaData() {
    final boolean msbFirst = btreeSpec.isMsbFirst();
    prevPage.setPageNumber(ByteTool.bytesToInt(page, BTreeSpec.OFF_PREVPAGE, msbFirst));
    nextPage.setPageNumber(ByteTool.bytesToInt(page, BTreeSpec.OFF_NEXTPAGE, msbFirst));

    // The doc id occupies DOCID_SIZE bytes on the page; right-align it in a
    // 4-byte buffer so it can be decoded as an int.
    byte[] storedDocId = ByteTool.subByteArray(page, BTreeSpec.OFF_DOCID, BTree.DOCID_SIZE);
    byte[] widened = new byte[4];
    System.arraycopy(storedDocId, 0, widened, widened.length - BTree.DOCID_SIZE, BTree.DOCID_SIZE);
    docid = ByteTool.bytesToInt(widened, 0, msbFirst);

    flags = ByteTool.bytesToInt(page, BTreeSpec.OFF_FLAGS, msbFirst);
    nodes = ByteTool.bytesToShort(page, BTreeSpec.OFF_LOWERBOUND, msbFirst);
    upperBound = ByteTool.bytesToShort(page, BTreeSpec.OFF_UPPERBOUND, msbFirst);
}
/**
 * Obtains a free page from the buffer manager and initialises its header:
 * zero nodes, upper bound at the page size, invalid previous/next page
 * numbers, flags 0 (regular) or 4 (overflow) and doc id 0.
 * <p/>
 * When logging is enabled a single {@link BTreeNewPageLogRecord} is written;
 * the individual header writes are performed with logging switched off and
 * the page's log settings are restored before it is returned.
 *
 * @param btreeSpec      specification of the owning B-tree
 * @param buffer         buffer manager that supplies the free page
 * @param isOverflowPage true to mark the new page as an overflow page
 * @param kContext       supplies the transaction id and the need-log flag
 * @param docid          doc id passed to the new-page log record.
 *                       NOTE(review): the page header itself is set to doc
 *                       id 0, not to this value - confirm this is intended.
 * @return the initialised page
 * @throws ChaiDBException if logging or a header setter fails
 */
public static DataPage newPage(BTreeSpec btreeSpec, PageBufferManager buffer, boolean isOverflowPage, KernelContext kContext, int docid) throws ChaiDBException {
    int txnId = kContext.getLocker();
    boolean needLog = kContext.getNeedLog();
    DataPage newPage = new DataPage(btreeSpec, buffer);
    Page freePageInfo = buffer.getFreePage(btreeSpec, false, kContext);

    // One log record describes the whole page creation.
    int newPageFlag = isOverflowPage ? 4 : 0;
    if (needLog) {
        int newPageNo = freePageInfo.getPageNumber().getPageNumber();
        BTreeNewPageLogRecord logRec = new BTreeNewPageLogRecord(freePageInfo.getPageNumber().getTreeId(), newPageNo, txnId, newPageFlag, docid, btreeSpec.btree.getType());
        logRec.log();
    }

    newPage.page = freePageInfo.getPageData();
    newPage.setPageNumber(freePageInfo.getPageNumber());

    // Header initialisation is covered by the record above, so the
    // individual setters must not log.
    newPage.setLogInfo(txnId, false);

    // setNodes() returns early when the value is unchanged, and a fresh
    // DataPage already has nodes == 0, so it would never touch the buffer.
    // Write the count explicitly in case the free page carries a stale value.
    newPage.nodes = 0;
    System.arraycopy(ByteTool.shortToBytes((short) 0), 0, newPage.page, BTreeSpec.OFF_LOWERBOUND, 2);

    newPage.setUpperBound((short) newPage.btreeSpec.getPageSize());
    newPage.setNextPage(new PageNumber(BTreeSpec.INVALID_PAGENO));
    newPage.setPrevPage(new PageNumber(BTreeSpec.INVALID_PAGENO));
    if (isOverflowPage) {
        newPage.setOverflow();
    } else {
        newPage.setFlags(0);
    }
    newPage.setDocID(0);

    // Hand the page back with the caller's logging settings.
    newPage.setLogInfo(txnId, needLog);
    return newPage;
}
/**
 * @return the raw page bytes held by this object (the array itself, not a copy)
 */
public byte[] getPage() {
    return this.page;
}
/**
 * Updates the node count in this object and in the page header.
 * Does nothing when the count is unchanged; otherwise writes a replace log
 * record first when logging is enabled.
 *
 * @param nodes new number of nodes on the page
 * @throws ChaiDBException if writing the log record fails
 */
void setNodes(short nodes) throws ChaiDBException {
    if (nodes != this.nodes) {
        if (needLog) {
            byte[] before = ByteTool.shortToBytes(this.nodes);
            byte[] after = ByteTool.shortToBytes(nodes);
            BTreeReplLogRecord record = new BTreeReplLogRecord(pageNumber.getTreeId(), pageNumber.getPageNumber(), txnId, BTreeSpec.OFF_LOWERBOUND, before, after, btreeSpec.btree.getType());
            record.log();
        }
        this.nodes = nodes;
        byte[] encoded = ByteTool.shortToBytes(nodes);
        System.arraycopy(encoded, 0, page, BTreeSpec.OFF_LOWERBOUND, 2);
    }
}
/**
 * Updates the upper bound of free space in this object and in the page
 * header. The header bytes are always rewritten; a replace log record is
 * written only when logging is enabled and the value actually changes.
 *
 * @param upperBound new upper bound
 * @throws ChaiDBException if writing the log record fails
 */
void setUpperBound(short upperBound) throws ChaiDBException {
    final short previous = this.upperBound;
    if (needLog && previous != upperBound) {
        BTreeReplLogRecord record = new BTreeReplLogRecord(pageNumber.getTreeId(), pageNumber.getPageNumber(), txnId, BTreeSpec.OFF_UPPERBOUND, ByteTool.shortToBytes(previous), ByteTool.shortToBytes(upperBound), btreeSpec.btree.getType());
        record.log();
    }
    this.upperBound = upperBound;
    byte[] encoded = ByteTool.shortToBytes(this.upperBound);
    System.arraycopy(encoded, 0, this.page, BTreeSpec.OFF_UPPERBOUND, 2);
}
/**
 * Copies the given page number into this object and writes it into the
 * page header. No log record is written for this change.
 *
 * @param pageNumber page number to adopt
 */
void setPageNumber(PageNumber pageNumber) {
    this.pageNumber.setPageNumber(pageNumber);
    byte[] encoded = ByteTool.intToBytes(this.pageNumber.getPageNumber());
    System.arraycopy(encoded, 0, this.page, BTreeSpec.OFF_PAGENUMBER, 4);
}
/**
 * @return the page number object of this page (the internal instance, not a copy)
 */
public PageNumber getPageNumber() {
    return this.pageNumber;
}
/**
 * Updates the previous-page number in this object and in the page header.
 * A replace log record is written first when logging is enabled and the
 * number actually changes; the header bytes are always rewritten.
 * <p/>
 * The header is written in the byte order configured in the B-tree spec,
 * the same order used for the logged before/after images and for decoding
 * the header in readMetaData(). Previously the write used the default
 * order of {@code ByteTool.intToBytes(int)}.
 *
 * @param prevPage new previous page number
 * @throws ChaiDBException if writing the log record fails
 */
void setPrevPage(PageNumber prevPage) throws ChaiDBException {
    final boolean msbFirst = btreeSpec.isMsbFirst();
    int oldV = this.prevPage.getPageNumber();
    int newV = prevPage.getPageNumber();
    if (needLog && (oldV != newV)) {
        int pgno = pageNumber.getPageNumber();
        BTreeReplLogRecord lr = new BTreeReplLogRecord(pageNumber.getTreeId(), pgno, txnId, BTreeSpec.OFF_PREVPAGE, ByteTool.intToBytes(oldV, msbFirst), ByteTool.intToBytes(newV, msbFirst), btreeSpec.btree.getType());
        lr.log();
    }
    this.prevPage.setPageNumber(prevPage);
    System.arraycopy(ByteTool.intToBytes(this.prevPage.getPageNumber(), msbFirst), 0, this.page, BTreeSpec.OFF_PREVPAGE, 4);
}
/**
 * Updates the next-page number in this object and in the page header.
 * A replace log record is written first when logging is enabled and the
 * number actually changes; the header bytes are always rewritten.
 * <p/>
 * The header is written in the byte order configured in the B-tree spec,
 * the same order used for the logged before/after images and for decoding
 * the header in readMetaData(). Previously the write used the default
 * order of {@code ByteTool.intToBytes(int)}.
 *
 * @param nextPage new next page number
 * @throws ChaiDBException if writing the log record fails
 */
void setNextPage(PageNumber nextPage) throws ChaiDBException {
    final boolean msbFirst = btreeSpec.isMsbFirst();
    int oldV = this.nextPage.getPageNumber();
    int newV = nextPage.getPageNumber();
    if (needLog && (oldV != newV)) {
        int pgno = pageNumber.getPageNumber();
        BTreeReplLogRecord lr = new BTreeReplLogRecord(pageNumber.getTreeId(), pgno, txnId, BTreeSpec.OFF_NEXTPAGE, ByteTool.intToBytes(oldV, msbFirst), ByteTool.intToBytes(newV, msbFirst), btreeSpec.btree.getType());
        lr.log();
    }
    this.nextPage.setPageNumber(nextPage);
    System.arraycopy(ByteTool.intToBytes(this.nextPage.getPageNumber(), msbFirst), 0, this.page, BTreeSpec.OFF_NEXTPAGE, 4);
}
/**
 * @param docID document id to compare against
 * @return true when this page's doc id equals {@code docID}
 */
boolean isOfDoc(int docID) {
    return this.docid == docID;
}
/**
 * Updates the doc id in this object and in the page header.
 * Does nothing when the id is unchanged. Only the last
 * {@code BTree.DOCID_SIZE} bytes of the 4-byte encoded int are stored;
 * a replace log record with the old and new bytes is written first when
 * logging is enabled.
 *
 * @param docID new document id
 * @throws ChaiDBException if writing the log record fails
 */
void setDocID(int docID) throws ChaiDBException {
    if (docid != docID) {
        final int skipped = 4 - BTree.DOCID_SIZE;
        byte[] before = ByteTool.subByteArray(ByteTool.intToBytes(docid), skipped, BTree.DOCID_SIZE);
        byte[] after = ByteTool.subByteArray(ByteTool.intToBytes(docID), skipped, BTree.DOCID_SIZE);
        if (needLog) {
            BTreeReplLogRecord record = new BTreeReplLogRecord(pageNumber.getTreeId(), pageNumber.getPageNumber(), txnId, BTreeSpec.OFF_DOCID, before, after, btreeSpec.btree.getType());
            record.log();
        }
        docid = docID;
        System.arraycopy(after, 0, this.page, BTreeSpec.OFF_DOCID, BTree.DOCID_SIZE);
    }
}
/**
 * Updates the flags in this object and in the page header.
 * A replace log record is written first when logging is enabled and the
 * value actually changes; the header bytes are always rewritten.
 * <p/>
 * The header is written in the byte order configured in the B-tree spec,
 * the same order used for the logged before/after images and for decoding
 * the header in readMetaData(). Previously the write used the default
 * order of {@code ByteTool.intToBytes(int)}.
 *
 * @param flags new page flags
 * @throws ChaiDBException if writing the log record fails
 */
void setFlags(int flags) throws ChaiDBException {
    final boolean msbFirst = btreeSpec.isMsbFirst();
    int oldV = this.flags;
    int newV = flags;
    if (needLog && (oldV != newV)) {
        int pgno = pageNumber.getPageNumber();
        BTreeReplLogRecord lr = new BTreeReplLogRecord(pageNumber.getTreeId(), pgno, txnId, BTreeSpec.OFF_FLAGS, ByteTool.intToBytes(oldV, msbFirst), ByteTool.intToBytes(newV, msbFirst), btreeSpec.btree.getType());
        lr.log();
    }
    this.flags = flags;
    System.arraycopy(ByteTool.intToBytes(this.flags, msbFirst), 0, this.page, BTreeSpec.OFF_FLAGS, 4);
}
/**
 * @return the number of bytes between the end of the page header and the
 *         current upper bound, truncated to a short
 */
short getFreeSpace() {
    final short free = (short) (upperBound - BTreeSpec.PAGE_HEADER_SIZE);
    return free;
}
/**
 * Marks this page as an overflow page by setting its flags to 0x04.
 *
 * @throws ChaiDBException if the flag change cannot be logged
 */
void setOverflow() throws ChaiDBException {
    setFlags(0x04);
}
/**
 * @return true when the overflow bit (0x04) is set in this page's flags
 */
final boolean isOverflow() {
    return 0 != (flags & 0x04);
}
/**
 * Tells whether the given page flags describe a normal (non-overflow)
 * data page, i.e. whether no flag bit is set.
 * Currently only used by BTreeFreePageLogRecord.
 *
 * @param flag page flags to inspect
 * @return true when {@code flag} is 0
 */
public static final boolean isNormalDataPage(int flag) {
    return 0 == flag;
}
/**
 * @return the number of nodes currently recorded for this page
 */
protected short getCurrNodeNumbers() {
    return this.nodes;
}
/**
* insert a node in a page sorted by the key; guaranteed the node has enough
* space. If key already exist, and mode == STORE_REPLACE, the data is replaced
* by the new data; otherwise, do not touch anything.
* @param key
* @param data Can be the value or the pageNumber
* @param mode STORE_REPLACE, or STORE_INSERT
* @param kContext KernelContext of this caller
* @param nodeOff location of an existed node or -1 of nonexisted ones
*/
// public short insertNode(byte[] key,byte[] data, short mode,KernelContext kContext,int nodeOff) throws ChaiDBException {
// int txnId = kContext.getLocker();
// boolean needLog = kContext.getNeedLog();
// byte[] oldData=null;
//
// try {
// /***** added by marriane *****/
// setLogInfo(txnId,needLog);
// /*****************************/
//
// // fix the current page
// page = buffer.getPage(btreeSpec.btree.getBtreeId(),pageNumber);
// if (page==null)
// throw new ChaiDBException(ErrorCode.BTREE_INVALID_DATAPAGE,
// "Page is null: "+pageNumber.toHexString()+" of "+btreeSpec.getBtreeName());
//
// //====>>> 1. calculate the place to insert the node
// // sort insert
// boolean keyExist = nodeOff>0;
// // check mode to replace existing key's value or just return
// if ( keyExist && mode == IDBIndex.STORE_INSERT ) {
// // untouch the node, may throw exception later &&&
// // unfix this page
// buffer.releasePage(pageNumber.getTreeId() ,pageNumber, false);
//
// // details -ranjeet
// String details = "The key value is " + new String(key) + ".";
// throw new ChaiDBException(ErrorCode.ENTRY_ALREADY_EXIST, details);
// }
//
// //=====>>> 2. insert / replace key/data pair
// DataNode dataNode;
// // size in byte of the node needed
// int newNodeSize = BTreeSpec.DATA_NODE_HEADER_SIZE + data.length;
//
// /* If data.length < PageNumberSzie, we must enlarge it to ensure
// * the update operation in the furture, because more data may be
// * used to replace current data, if the size is too few to cotain
// * PageNumber, it can't store overflow page number.
// * Kurt 2004-9-8*/
// if (data.length < BTreeSpec.PAGENUMBER_BYTE_SIZE) {
// newNodeSize += (BTreeSpec.PAGENUMBER_BYTE_SIZE - data.length);
// }
//
// if (keyExist) {
// // get the existing node
// dataNode = getNode(nodeOff);
// //remember oldData
// oldData=ByteTool.copyByteArray(page,dataNode.getNodeOffset(),(int)dataNode.getNodeSpace());
// //short reUsedSpace=oldData.length;
// if ( dataNode.isOverflow()) {
// // free the overflow pages
// PageNumber overflowPageNumber = new PageNumber(ByteTool.bytesToInt(page,
// dataNode.getNodeOffset() + BTreeSpec.DATA_NODE_HEADER_SIZE,
// btreeSpec.isMsbFirst()));
// overflowPageNumber.setTreeId(btreeSpec.btree.getBtreeId());
//
// DataPage overflowPage =
// new DataPage(btreeSpec.btree.getBtreeId() ,overflowPageNumber,btreeSpec,buffer);
//
// // put into freeList
// /* begin:added by marriane 2001-9-29 for logging free overflow page log record */
// if (needLog) {
// int pgno = overflowPage.pageNumber.getPageNumber();
// short upBound = overflowPage.upperBound ;
// short lowBound = (short)BTreeSpec.PAGE_HEADER_SIZE ;
// BTreeFreeOverflowPageLogRecord lr = new BTreeFreeOverflowPageLogRecord(
// pageNumber.getTreeId(),
// pgno,
// txnId,
// ByteTool.copyByteArray(overflowPage.page, 0,lowBound),
// ByteTool.copyByteArray(overflowPage.page,
// upBound,BTreeSpec.PAGE_SIZE-upBound),
// btreeSpec.btree.getType());
//
// lr.log();
// }
// /* end:added by marriane 2001-9-29 for logging free overflow page log record */
//
// buffer.addToFreeList(btreeSpec.btree.getBtreeId(),overflowPage.pageNumber
// ,needLog?new Integer(txnId):null);
//
// while ( overflowPage.nextPage.getPageNumber() > 0 ) {
//
// overflowPage = new DataPage(btreeSpec.btree.getBtreeId() ,overflowPage.nextPage,
// btreeSpec, buffer);
// // put into freeList
// /* begin:added by marriane 2001-9-29 for logging free overflow page log record */
// if (needLog) {
// int pgno = overflowPage.pageNumber.getPageNumber();
// short upBound = overflowPage.upperBound ;
// short lowBound = (short)BTreeSpec.PAGE_HEADER_SIZE ;
// BTreeFreeOverflowPageLogRecord lr = new BTreeFreeOverflowPageLogRecord(
// pageNumber.getTreeId(),
// pgno,
// txnId,
// ByteTool.copyByteArray(overflowPage.page, 0,lowBound),
// ByteTool.copyByteArray(overflowPage.page,
// upBound,BTreeSpec.PAGE_SIZE-upBound),
// btreeSpec.btree.getType());
//
// lr.log();
// }
// /* end:added by marriane 2001-9-29 for logging free overflow page log record */
//
// //v.add(overflowPage.pageNumber);
// buffer.addToFreeList(btreeSpec.btree.getBtreeId(),overflowPage.pageNumber
// ,needLog?new Integer(txnId):null);
// }
// }
//
// //here we don't set node sapce because when a large node is replaced by a little size node
// //, setting node space may cause later replace this little one with a larger one
// //result to overflow because the below condition
// if (newNodeSize <= dataNode.getNodeSpace()){
// // if the page has enough space to hold the new node
// dataNode.setFlags((byte)0);
//
// } else {
// // The page doesn't have enough space to hold data
// dataNode.setFlags((byte)1);
// }
//
// } else {
//
// boolean overflow = false;
//
// /** in BTreePage's insertNode() function, we gurantee this data page has enough space
// * to store a data node with minimum space
// */
// /* begin : added by marriane 2001-12-28 for bind all insert node log records*/
// setLogInfo(txnId,false);
// if (getFreeSpace() >= newNodeSize){// + 2){ // node size + 2 bytes long offset
// setUpperBound((short) (upperBound - newNodeSize));
// } else {
// setUpperBound((short) (upperBound - BTreeSpec.DATA_NODE_HEADER_SIZE - 4));
// overflow = true;
// }
//
// dataNode = new DataNode(this, upperBound, data);
//
// if (overflow) {
// dataNode.setFlags((byte)1);
// dataNode.setNodeSpace((short)(BTreeSpec.DATA_NODE_HEADER_SIZE + 4));
// }
// else {
// dataNode.setFlags((byte)0);
// dataNode.setNodeSpace((short)newNodeSize);
// }
// //setNodes((short) (nodes + 2));
// setLogInfo(txnId,needLog);
//
// }
//
// // write the new key/data to the node
// if (keyExist) {
// dataNode.setDataSize(data.length);
// }else
// setNodes((short)(nodes+1));
// dataNode.storeNode(data,oldData,kContext);
//
// // unfix this page
// if (Debug.DEBUG_DATAPAGE)
// logger.debug("insertNode - release the page " + pageNumber);
// buffer.releasePage(pageNumber.getTreeId() ,pageNumber, true);
// return (short)dataNode.getNodeOffset();
// }finally {
// /**todo release lock in order not to leak lock*/
// }
// }
/**
* Inserts a new node into this page, or replaces the data of an existing one.
* <p/>
* The page is fetched (fixed) from the buffer manager first. A positive
* <code>nodeOff</code> is treated as "the key already exists at that offset":
* with <code>mode == IDBIndex.STORE_INSERT</code> the page is released untouched
* and ENTRY_ALREADY_EXIST is thrown; otherwise any overflow pages of the old
* node are freed and its data is replaced. A non-positive <code>nodeOff</code>
* allocates a new node and increments the node count.
* <p/>
* NOTE(review): the page is released on the duplicate-key path and on success
* only; if a later call throws, no release happens here - confirm callers
* handle that.
*
* @param key       key bytes; only used to build the duplicate-key error message
* @param data      Can be the value or the pageNumber
* @param mode      STORE_REPLACE, or STORE_INSERT
* @param kContext  KernelContext of this caller; supplies the transaction id and need-log flag
* @param nodeOff   location of an existing node, or -1 for a nonexistent one
* @param nodeFlags node flags to store; bit 0x01 is overridden here (set when
*                  the data does not fit the node's space, cleared otherwise)
* @return the offset of the written node, cast to short
* @throws ChaiDBException BTREE_INVALID_DATAPAGE when the page cannot be fetched,
*                         ENTRY_ALREADY_EXIST on a duplicate key in STORE_INSERT mode,
*                         or any failure raised by the helpers called here
*/
public short insertNode(byte[] key, byte[] data, short mode, KernelContext kContext, int nodeOff, byte nodeFlags) throws ChaiDBException {
int txnId = kContext.getLocker();
boolean needLog = kContext.getNeedLog();
byte[] oldData = null;
try {
/***** added by marriane *****/
// make the header setters log (or not) on behalf of this transaction
setLogInfo(txnId, needLog);
/*****************************/
// fix the current page
page = buffer.getPage(btreeSpec.btree.getBtreeId(), pageNumber);
if (page == null)
throw new ChaiDBException(ErrorCode.BTREE_INVALID_DATAPAGE, "Page is null: " + pageNumber.toHexString() + " of " + btreeSpec.getBtreeName());
//====>>> 1. decide whether this is an insert or a replace
// a positive offset means the caller already located the node
boolean keyExist = nodeOff > 0;
// check mode to replace existing key's value or just return
if (keyExist && mode == IDBIndex.STORE_INSERT) {
// leave the node untouched
// unfix this page (not dirty)
buffer.releasePage(pageNumber.getTreeId(), pageNumber, false);
// details -ranjeet
// note: new String(key) decodes with the platform default charset
String details = "The key value is " + new String(key) + ".";
throw new ChaiDBException(ErrorCode.ENTRY_ALREADY_EXIST, details);
}
//=====>>> 2. insert / replace key/data pair
DataNode dataNode;
// size in bytes of the node needed
int newNodeSize = BTreeSpec.DATA_NODE_HEADER_SIZE + data.length;
/* If data.length < PAGENUMBER_BYTE_SIZE, we must enlarge the node to allow
* for future updates: larger data may later replace the current data,
* and if the node is too small to contain a page number it cannot
* store the overflow page number.
* Kurt 2004-9-8*/
if (data.length < BTreeSpec.PAGENUMBER_BYTE_SIZE) {
newNodeSize += (BTreeSpec.PAGENUMBER_BYTE_SIZE - data.length);
}
if (keyExist) {
// get the existing node
dataNode = getNode(nodeOff);
// remember the old node image (its whole allocated space)
oldData = ByteTool.copyByteArray(page, dataNode.getNodeOffset(), (int) dataNode.getNodeSpace());
//short reUsedSpace=oldData.length;
if (dataNode.isOverflow()) {
// the old value lives on overflow pages: return them to the free list
insertWhenKeyExistAndOverflow(dataNode, needLog, txnId);
}
// The node space is deliberately not shrunk here: if a large node is
// replaced by a small one, shrinking the space would force a later,
// larger replacement to overflow because of the condition below.
if (newNodeSize <= dataNode.getNodeSpace()) {
// the node's allocated space can hold the new data: clear the overflow bit
// dataNode.setFlags((byte)0);
dataNode.setFlags((byte) (0xFE & nodeFlags));
} else {
// the node's allocated space cannot hold the data: set the overflow bit
// dataNode.setFlags((byte)1);
dataNode.setFlags((byte) (1 | nodeFlags));
}
} else {
dataNode = insertNodeWhenKeyNotExist(txnId, newNodeSize, data, nodeFlags, needLog);
}
// write the new key/data to the node
if (keyExist) {
dataNode.setDataSize(data.length);
} else setNodes((short) (nodes + 1));
dataNode.storeNode(data, oldData, kContext);
// unfix this page (dirty)
if (Debug.DEBUG_DATAPAGE) logger.debug("insertNode - release the page " + pageNumber);
buffer.releasePage(pageNumber.getTreeId(), pageNumber, true);
return (short) dataNode.getNodeOffset();
} finally {
/**todo release lock in order not to leak lock*/
}
}
/**
 * Allocates a new node at the top of the used area of this page.
 * <p/>
 * When the free space can hold {@code newNodeSize} bytes the node gets that
 * much space and its overflow bit is cleared; otherwise it gets only a node
 * header plus 4 bytes and its overflow bit is set. For every tree type
 * except HYPER_BTREE, logging by the header setters is switched off while
 * the node is allocated and restored to {@code needLog} afterwards.
 *
 * @param txnId       transaction id used for the log settings
 * @param newNodeSize number of bytes wanted for the node
 * @param data        data passed on to the new DataNode
 * @param nodeFlags   node flags; bit 0x01 is overridden here
 * @param needLog     log setting restored after the allocation
 * @return the newly created node
 * @throws ChaiDBException if a setter or the node constructor fails
 */
private DataNode insertNodeWhenKeyNotExist(int txnId, int newNodeSize, byte[] data, byte nodeFlags, boolean needLog) throws ChaiDBException {
    // BTreePage's insertNode() guarantees that this data page has enough
    // room for a node of minimum size.
    if (btreeSpec.btree.getType() != IDBIndex.HYPER_BTREE) {
        setLogInfo(txnId, false);
    }

    final boolean fitsInline = getFreeSpace() >= newNodeSize;
    final int allocated = fitsInline ? newNodeSize : BTreeSpec.DATA_NODE_HEADER_SIZE + 4;
    setUpperBound((short) (upperBound - allocated));

    DataNode created = new DataNode(this, upperBound, data);
    if (fitsInline) {
        created.setFlags((byte) (0xFE & nodeFlags));
    } else {
        created.setFlags((byte) (1 | nodeFlags));
    }
    created.setNodeSpace((short) allocated);

    if (btreeSpec.btree.getType() != IDBIndex.HYPER_BTREE) {
        setLogInfo(txnId, needLog);
    }
    return created;
}
/**
 * Frees the chain of overflow pages referenced by the given node.
 * <p/>
 * The first overflow page number is read from the 4 bytes following the
 * node header. Each page of the chain is loaded, logged with a
 * free-overflow-page record when {@code needLog} is set, and added to the
 * buffer manager's free list; the chain is followed through each page's
 * next-page number until that number is no longer positive.
 *
 * @param dataNode node whose overflow pages are freed
 * @param needLog  whether to write a log record per freed page
 * @param txnId    transaction id for the log records and the free list
 * @throws ChaiDBException if loading a page or logging fails
 */
private void insertWhenKeyExistAndOverflow(DataNode dataNode, boolean needLog, int txnId) throws ChaiDBException {
    int firstOverflowNo = ByteTool.bytesToInt(page, dataNode.getNodeOffset() + BTreeSpec.DATA_NODE_HEADER_SIZE, btreeSpec.isMsbFirst());
    PageNumber current = new PageNumber(firstOverflowNo);
    current.setTreeId(btreeSpec.btree.getBtreeId());

    DataPage overflowPage;
    do {
        overflowPage = new DataPage(btreeSpec.btree.getBtreeId(), current, btreeSpec, buffer);
        if (needLog) {
            // Log the header and the used area of the page being freed.
            short headerEnd = (short) BTreeSpec.PAGE_HEADER_SIZE;
            short dataStart = overflowPage.upperBound;
            BTreeFreeOverflowPageLogRecord record = new BTreeFreeOverflowPageLogRecord(pageNumber.getTreeId(), overflowPage.pageNumber.getPageNumber(), txnId, ByteTool.copyByteArray(overflowPage.page, 0, headerEnd), ByteTool.copyByteArray(overflowPage.page, dataStart, BTreeSpec.PAGE_SIZE - dataStart), btreeSpec.btree.getType());
            record.log();
        }
        buffer.addToFreeList(btreeSpec.btree.getBtreeId(), overflowPage.pageNumber, needLog ? new Integer(txnId) : null);
        current = overflowPage.nextPage;
    } while (current.getPageNumber() > 0);
}
/**
 * Builds a node object for the node stored at the given offset of this page.
 * The page is not unfixed by this method.
 *
 * @param nodeOffset offset of the node within the page
 * @return an object representing the indicated node on this page
 * @throws ChaiDBException if the node cannot be constructed
 */
public DataNode getNode(int nodeOffset) throws ChaiDBException {
    DataNode node = new DataNode(this, nodeOffset);
    return node;
}
/**
 * Deletes the node at the given offset from this page.
 * <p/>
 * The page is fetched (fixed) from the buffer manager, the removed node is
 * logged when logging is enabled, any overflow pages of the node are put on
 * the free list, and the page header (upper bound, node count) is updated
 * before the page is released as dirty.
 * <p/>
 * The upper bound of a page that has become empty is now reset before the
 * page is released. Previously this happened in a <code>finally</code> block,
 * i.e. after the release, and also on the failure paths, where writing to a
 * null page would replace the original exception with a NullPointerException.
 *
 * @param kContext KernelContext of this caller; supplies the transaction id and need-log flag
 * @param nodeOff  location of the node to delete
 * @return always true; failures are reported by exception
 * @throws ChaiDBException BTREE_INVALID_DATAPAGE when the page cannot be
 *                         fetched, or any failure raised while logging or
 *                         freeing overflow pages
 */
public boolean deleteNode(KernelContext kContext, int nodeOff) throws ChaiDBException {
    int txnId = kContext.getLocker();
    boolean needLog = kContext.getNeedLog();
    /***** added by marriane *****/
    setLogInfo(txnId, needLog);
    /*****************************/
    // fix the page
    page = buffer.getPage(btreeSpec.btree.getBtreeId(), pageNumber);
    if (page == null) {
        throw new ChaiDBException(ErrorCode.BTREE_INVALID_DATAPAGE, "Page is null: " + pageNumber.toHexString() + " of " + btreeSpec.getBtreeName());
    }
    DataNode node = getNode(nodeOff);

    // 1. log the removed node and free its overflow pages, if any
    if (node.isOverflow()) {
        if (needLog) {
            // For an overflow node the logged payload is the node header
            // plus the first overflow page number.
            PageNumber overflowPageNumber = new PageNumber(ByteTool.bytesToInt(page, node.getNodeOffset() + BTreeSpec.DATA_NODE_HEADER_SIZE, btreeSpec.isMsbFirst()));
            byte[] removeData = ByteTool.append(node.getHeader(), ByteTool.intToBytes(overflowPageNumber.getPageNumber()));
            BTreeAddRemoveLogRecord logRec = new BTreeAddRemoveLogRecord(pageNumber.getTreeId(), pageNumber.getPageNumber(), txnId, BTreeAddRemoveLogRecord.REMOVE_FLAG, node.getNodeOffset(), removeData, btreeSpec.btree.getType());
            logRec.log();
        }
        // Same chain-freeing steps as the replace path of insertNode, so the
        // existing helper is reused instead of duplicating them here.
        insertWhenKeyExistAndOverflow(node, needLog, txnId);
    } else if (needLog) {
        /* logging remove data log record */
        byte[] removeData = ByteTool.append(node.getHeader(), node.getData());
        BTreeAddRemoveLogRecord logRec = new BTreeAddRemoveLogRecord(pageNumber.getTreeId(), pageNumber.getPageNumber(), txnId, BTreeAddRemoveLogRecord.REMOVE_FLAG, node.getNodeOffset(), removeData, btreeSpec.btree.getType());
        logRec.log();
    }

    // 2. rest of the clean up: upperBound, nodes
    // If this node sits at the upper bound, give its space back to the free area.
    if (node.getNodeOffset() == upperBound) {
        setUpperBound((short) (upperBound + node.getNodeSpace()));
    }
    this.setLogInfo(txnId, needLog);
    setNodes((short) (nodes - 1));
    // An empty page gets its whole body back as free space. This must be
    // done while the page is still fixed.
    if (this.getCurrNodeNumbers() == 0) {
        setUpperBound((short) BTreeSpec.PAGE_SIZE);
    }
    buffer.releasePage(pageNumber.getTreeId(), pageNumber, true);
    /**todo release lock in order not to leak lock*/
    // The node has been deleted.
    return true;
}
/**
 * Sets the transaction id and the need-log flag consulted by the header
 * setters of this page.
 *
 * @param txnId   transaction id to put into log records
 * @param needLog whether the setters should write log records
 */
void setLogInfo(int txnId, boolean needLog) {
    this.needLog = needLog;
    this.txnId = txnId;
}
}