/*
Copyright (C) 2007 Mobixess Inc. http://www.java-objects-database.com
This file is part of the JODB (Java Objects Database) open source project.
JODB is free software; you can redistribute it and/or modify it under
the terms of version 2 of the GNU General Public License as published
by the Free Software Foundation.
JODB is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package com.mobixess.jodb.core.index;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ConcurrentModificationException;
import java.util.WeakHashMap;
import com.mobixess.jodb.core.JODBTransient;
import com.mobixess.jodb.core.JodbIOException;
import com.mobixess.jodb.core.agent.JODBAgent;
import com.mobixess.jodb.core.io.JODBOperationContext;
import com.mobixess.jodb.core.transaction.TransactionContainer;
import com.mobixess.jodb.util.PrimitiveJavaTypesUtil;
import com.mobixess.jodb.util.PrimitiveJavaTypesUtil.PRIMITIVES_ENUMERATION;
/**
 * Sorted index over the values of a single field.
 *
 * <p>Entries are kept in ascending value order and are stored in a list of
 * fixed-capacity chunks. Each chunk consists of two parallel arrays: object
 * offsets ({@code _offsetArrays[chunk]}) and, when value bytes are stored,
 * the raw values ({@code _dataArrays[chunk]}, {@code _dataElementSize} bytes
 * per entry). {@code _totalIndexesWithinChunks[chunk]} holds the number of
 * used slots in each chunk. Chunks may become empty after removals; every
 * traversal in this class therefore skips chunks with zero entries.
 *
 * <p>Several methods pack a pair of ints into one long as
 * {@code high << 32 | low}; the meaning of each half is documented on the
 * method that returns it.
 */
public class JODBIndexingAgent extends JODBAgent {

    private final static int CHUNK_HOLDER_DEFAULT_CAPACITY = 1000;
    private final static int CHUNK_HOLDER_DEFAULT_INCREMENT = 500;
    private final static int CHUNK_CAPACITY = 1000;
    private final static double PREFERRED_CHUNK_LOAD_FACTOR = 0.75;
    private final static boolean DEEP_SANITY_CHECK = false;

    private int _fieldId;
    private int _fieldTypeEnumIndex;
    private int _classId;
    private int _dataElementSize;
    private long[][] _offsetArrays;
    private byte[][] _dataArrays;
    private int[] _totalIndexesWithinChunks;
    private int _totalChunks;
    private int _totalIndexes;
    @JODBTransient private transient boolean _activated;
    @JODBTransient private transient WeakHashMap<long[], ByteBuffer> _chunksWrappersCache;
    @JODBTransient private transient PRIMITIVES_ENUMERATION _fieldTypeEnum;

    /** Creates an empty, non-activated agent; no chunk storage is allocated. */
    public JODBIndexingAgent() {
    }

    /**
     * Creates an index for the given field with one empty chunk allocated.
     *
     * @param field   the field to index; must not be {@code null}
     * @param context operation context used to resolve class and field ids;
     *                must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     * @throws IOException declared by the id lookups performed through the context
     */
    public JODBIndexingAgent(Field field, JODBOperationContext context) throws IOException {
        super();
        if (field == null || context == null) {
            throw new NullPointerException();
        }
        _classId = context.getBase().getOrSetClassTypeSubstitutionID(field.getDeclaringClass());
        _fieldId = context.getBase().getOrSetFieldSubstitutionID(field);
        _fieldTypeEnum = PrimitiveJavaTypesUtil.getEnumeratedType(field.getType().getName());
        _fieldTypeEnumIndex = _fieldTypeEnum.ordinal();
        _totalIndexesWithinChunks = new int[CHUNK_HOLDER_DEFAULT_CAPACITY];
        _offsetArrays = new long[CHUNK_HOLDER_DEFAULT_CAPACITY][];
        _offsetArrays[0] = new long[CHUNK_CAPACITY];
        _totalChunks = 1;
        Class<?> type = field.getType();
        if (type.isPrimitive()) {
            // Value bytes are only stored for primitive fields.
            try {
                _dataElementSize = PrimitiveJavaTypesUtil.getDataOutputWriteLen(type);
                _dataArrays = new byte[CHUNK_HOLDER_DEFAULT_CAPACITY][];
                _dataArrays[0] = new byte[CHUNK_CAPACITY * _dataElementSize];
            } catch (JodbIOException e) {
                // TODO log
                // NOTE(review): on failure the agent is left without value storage.
                e.printStackTrace();
            }
        }
        _activated = true;
    }

    /**
     * Activates this agent through the session of the given context unless it
     * has already been activated.
     */
    private synchronized void checkIfActivated(JODBOperationContext context) throws IOException {
        if (_activated) {
            return;
        }
        context.getSession().activate(this, Integer.MAX_VALUE);
        _activated = true;
    }

    /** Returns the field type enum, resolving it lazily from the stored ordinal. */
    private PRIMITIVES_ENUMERATION getFieldTypeEnum() {
        if (_fieldTypeEnum == null) {
            _fieldTypeEnum = PRIMITIVES_ENUMERATION.values()[_fieldTypeEnumIndex];
        }
        return _fieldTypeEnum;
    }

    /**
     * Debug-only consistency check: verifies that the per-chunk counters add
     * up to the total, that no offset is stored twice, and that values are in
     * non-descending order.
     *
     * @throws RuntimeException if any of the checks fails
     */
    private void sanityCheck() {
        int indexesInChunks = 0;
        for (int i = 0; i < _totalChunks; i++) {
            indexesInChunks += _totalIndexesWithinChunks[i];
        }
        if (indexesInChunks != _totalIndexes) {
            throw new RuntimeException("Sanity check failed indexesInChunks!=_totalIndexes");
        }

        // Every offset must be unique: compare each entry with all entries after it.
        for (int i = 0; i < _totalChunks; i++) {
            int usedInChunk = _totalIndexesWithinChunks[i];
            for (int j = 0; j < usedInChunk; j++) {
                int nextChunk = i;
                int nextIndex = j + 1;
                if (nextIndex == usedInChunk) {
                    nextChunk++;
                    nextIndex = 0;
                }
                checkOffsetNotExists(_offsetArrays[i][j], nextChunk, nextIndex);
            }
        }

        // Values must be sorted. Fresh wrappers are requested so that the two
        // buffers being compared never share position/limit state.
        for (int i = 0; i < _totalIndexes - 1; i++) {
            ByteBuffer first = getValueForIndex(i, true);
            ByteBuffer next = getValueForIndex(i + 1, true);
            int compareResult;
            try {
                compareResult = PrimitiveJavaTypesUtil.comparePrimitives(first, getFieldTypeEnum(), next);
            } catch (IOException e) {
                throw new RuntimeException("Sanity check failed: cannot compare values at index " + i, e);
            }
            if (compareResult > 0) {
                throw new RuntimeException("Sanity check failed: values out of order at index " + i);
            }
        }
    }

    /**
     * Throws if {@code offset} is stored anywhere from the given position to
     * the end of the index.
     *
     * @param offset                 the offset that must not occur
     * @param startChunk             first chunk to scan
     * @param startOffsetWithinChunk first slot to scan inside {@code startChunk};
     *                               all following chunks are scanned from slot 0
     */
    private void checkOffsetNotExists(long offset, int startChunk, int startOffsetWithinChunk) {
        int firstSlot = startOffsetWithinChunk;
        for (int i = startChunk; i < _totalChunks; i++) {
            for (int j = firstSlot; j < _totalIndexesWithinChunks[i]; j++) {
                if (offset == _offsetArrays[i][j]) {
                    throw new RuntimeException("Sanity check failed: duplicate offset " + offset);
                }
            }
            // The start slot only applies to the first scanned chunk.
            firstSlot = 0;
        }
    }

    public int getFieldId() {
        return _fieldId;
    }

    public int getClassId() {
        return _classId;
    }

    /**
     * Inserts an entry at the position determined by a binary search for
     * {@code value}, keeping the index sorted.
     *
     * @param offsetId the object offset to store
     * @param value    buffer positioned at the value bytes; when non-null and
     *                 value storage exists, {@code _dataElementSize} bytes are
     *                 read from it
     * @param context  operation context, passed on when a new chunk is needed
     * @throws IOException if the value comparison during the search fails
     */
    public synchronized void insertIndex(long offsetId, ByteBuffer value, JODBOperationContext context) throws IOException {
        int insertionIndex = searchIndex(value);
        if (insertionIndex < 0) {
            // Not found: decode the insertion point from -(point + 1).
            insertionIndex = -insertionIndex - 1;
        }
        long chunkIndexAndChunkStart = getChunkForIndex(insertionIndex);
        int chunkIndex = (int) (chunkIndexAndChunkStart >> 32);
        int chunkStart = (int) chunkIndexAndChunkStart;
        if (chunkIndex == _totalChunks) {
            insertChunk(chunkIndex, context);
        }
        int indexInChunk = insertionIndex - chunkStart;
        reserveSpace(chunkIndex, indexInChunk, context);
        _offsetArrays[chunkIndex][indexInChunk] = offsetId;
        if (value != null && _dataArrays != null) {
            value.get(_dataArrays[chunkIndex], indexInChunk * _dataElementSize, _dataElementSize);
        }
    }

    /**
     * Removes the entry with the given object id from the run of entries whose
     * value equals {@code value}.
     *
     * @param objectId the object offset to remove
     * @param value    the indexed value of that object
     * @param context  operation context (not used by the removal itself)
     * @return {@code true} if an entry was removed, {@code false} if none was found
     * @throws IOException if the value comparison during the search fails
     */
    public synchronized boolean removeIndex(long objectId, ByteBuffer value, JODBOperationContext context) throws IOException {
        long compositeIndex = searchIndex(objectId, value);
        if (compositeIndex == -1) {
            if (DEEP_SANITY_CHECK) {
                sanityCheck();
            }
            return false;
        }
        int chunkIndex = (int) (compositeIndex >> 32);
        int indexInChunk = (int) compositeIndex;
        int entriesAfter = _totalIndexesWithinChunks[chunkIndex] - indexInChunk - 1;

        // Close the gap by shifting the tail of the chunk one slot to the left.
        long[] offsetsChunk = _offsetArrays[chunkIndex];
        System.arraycopy(offsetsChunk, indexInChunk + 1, offsetsChunk, indexInChunk, entriesAfter);
        if (_dataArrays != null) {
            byte[] dataChunk = _dataArrays[chunkIndex];
            System.arraycopy(dataChunk, (indexInChunk + 1) * _dataElementSize, dataChunk,
                    indexInChunk * _dataElementSize, entriesAfter * _dataElementSize);
        }
        _totalIndexesWithinChunks[chunkIndex]--;
        _totalIndexes--;
        return true;
    }

    /**
     * Opens a free slot at {@code indexInChunk} of the given chunk and updates
     * the counters. If the chunk is full, part of its tail is first moved to
     * the front of the following chunk; a new chunk is inserted after it when
     * this is the last chunk or the following chunk lacks room.
     */
    private void reserveSpace(int chunkIndex, int indexInChunk, JODBOperationContext context) {
        long[] currentOffsetsChunk = _offsetArrays[chunkIndex];
        int chunkLen = currentOffsetsChunk.length;
        if (_totalIndexesWithinChunks[chunkIndex] == chunkLen) {
            // Chunk is full: spill into the next chunk or split.
            int indexesToMove = Math.min((int) (chunkLen * PREFERRED_CHUNK_LOAD_FACTOR), chunkLen - indexInChunk);
            boolean isLastChunk = chunkIndex == _totalChunks - 1;
            if (isLastChunk
                    || (_offsetArrays[chunkIndex + 1].length - _totalIndexesWithinChunks[chunkIndex + 1]) <= indexesToMove) {
                insertChunk(chunkIndex + 1, context);
                indexesToMove = Math.min((int) (chunkLen * 0.5), chunkLen - indexInChunk);
            }
            int nextChunkIndex = chunkIndex + 1;
            int usedInNextChunk = _totalIndexesWithinChunks[nextChunkIndex];
            // The chunk is full here, so its used count equals chunkLen.
            int srcIndex = chunkLen - indexesToMove;

            // Shift the next chunk's entries right, then copy our tail to its front.
            long[] nextOffsetsChunk = _offsetArrays[nextChunkIndex];
            System.arraycopy(nextOffsetsChunk, 0, nextOffsetsChunk, indexesToMove, usedInNextChunk);
            System.arraycopy(currentOffsetsChunk, srcIndex, nextOffsetsChunk, 0, indexesToMove);
            if (_dataArrays != null) {
                byte[] nextDataChunk = _dataArrays[nextChunkIndex];
                byte[] currentDataChunk = _dataArrays[chunkIndex];
                System.arraycopy(nextDataChunk, 0, nextDataChunk, indexesToMove * _dataElementSize,
                        usedInNextChunk * _dataElementSize);
                System.arraycopy(currentDataChunk, srcIndex * _dataElementSize, nextDataChunk, 0,
                        indexesToMove * _dataElementSize);
            }
            _totalIndexesWithinChunks[chunkIndex] = srcIndex; // new chunk length
            _totalIndexesWithinChunks[nextChunkIndex] = usedInNextChunk + indexesToMove;
        }

        // Space is guaranteed from here: shift the tail right by one slot.
        int entriesToShift = _totalIndexesWithinChunks[chunkIndex] - indexInChunk;
        System.arraycopy(currentOffsetsChunk, indexInChunk, currentOffsetsChunk, indexInChunk + 1, entriesToShift);
        if (_dataArrays != null) {
            byte[] currentDataChunk = _dataArrays[chunkIndex];
            System.arraycopy(currentDataChunk, indexInChunk * _dataElementSize, currentDataChunk,
                    (indexInChunk + 1) * _dataElementSize, entriesToShift * _dataElementSize);
        }
        _totalIndexesWithinChunks[chunkIndex]++;
        _totalIndexes++;
    }

    /**
     * Locates the chunk that holds (or should receive) the entry at a global
     * index.
     *
     * @param index global, zero-based index into the whole sorted index
     * @return {@code chunkIndex << 32 | chunkStart}, where {@code chunkStart}
     *         is the global index of the chunk's first slot. When
     *         {@code index} lies past the last entry and the last chunk is
     *         full, {@code chunkIndex} equals the current number of chunks,
     *         i.e. the caller has to append a chunk first.
     */
    public final long getChunkForIndex(int index) {
        int chunkIndex = -1;
        int totalInPreviousChunks = 0;
        int currentChunkStart = 0;
        do {
            chunkIndex++;
            currentChunkStart = totalInPreviousChunks;
            totalInPreviousChunks += _totalIndexesWithinChunks[chunkIndex];
        } while (totalInPreviousChunks < index + 1 && chunkIndex < _totalChunks - 1);
        if (totalInPreviousChunks < index + 1
                && _totalIndexesWithinChunks[chunkIndex] == _offsetArrays[chunkIndex].length) {
            currentChunkStart = totalInPreviousChunks;
            chunkIndex++;
        }
        return (long) chunkIndex << 32 | (long) currentChunkStart;
    }

    /** Returns the shared (cached) chunk wrapper positioned at the value with the given global index. */
    private ByteBuffer getValueForIndex(int index) {
        return getValueForIndex(index, false);
    }

    /**
     * Returns a buffer positioned at the value with the given global index.
     *
     * @param newChunkWrapper {@code true} to wrap the chunk in a fresh buffer
     *                        instead of reusing the cached one
     */
    private ByteBuffer getValueForIndex(int index, boolean newChunkWrapper) {
        long compositeIndex = getChunkForIndex(index);
        int chunkIndex = (int) (compositeIndex >> 32);
        int chunkStart = (int) compositeIndex;
        return getValueForIndex(chunkIndex, index - chunkStart, newChunkWrapper);
    }

    /** Returns the shared (cached) chunk wrapper positioned at the given slot of the given chunk. */
    private ByteBuffer getValueForIndex(int chunkIndex, int offsetIndexInChunk) {
        return getValueForIndex(chunkIndex, offsetIndexInChunk, false);
    }

    /**
     * Returns a buffer over the chunk's value bytes whose position and limit
     * delimit the value at the given slot.
     *
     * @param newByteBuffer {@code true} to wrap the chunk in a fresh buffer;
     *                      {@code false} to reuse the cached wrapper, whose
     *                      position/limit are overwritten by every call
     */
    private ByteBuffer getValueForIndex(int chunkIndex, int offsetIndexInChunk, boolean newByteBuffer) {
        ByteBuffer byteBuffer = newByteBuffer
                ? ByteBuffer.wrap(_dataArrays[chunkIndex])
                : getWrapperForChunk(chunkIndex);
        getValueForIndex(chunkIndex, offsetIndexInChunk, byteBuffer);
        return byteBuffer;
    }

    /** Sets position and limit of {@code chunkWrapper} so that it spans exactly the value at the given slot. */
    private void getValueForIndex(int chunkIndex, int offsetIndexInChunk, ByteBuffer chunkWrapper) {
        int byteOffset = offsetIndexInChunk * _dataElementSize;
        // Limit first: it may only be lowered to a value >= the new position.
        chunkWrapper.limit(byteOffset + _dataElementSize);
        chunkWrapper.position(byteOffset);
    }

    /**
     * Returns the cached wrapper around a chunk's value bytes, creating it on
     * first use. The cache is keyed by the chunk's offsets array.
     */
    private ByteBuffer getWrapperForChunk(int chunkIndex) {
        if (_chunksWrappersCache == null) {
            _chunksWrappersCache = new WeakHashMap<long[], ByteBuffer>();
        }
        long[] cacheKey = _offsetArrays[chunkIndex];
        ByteBuffer result = _chunksWrappersCache.get(cacheKey);
        if (result == null) {
            result = ByteBuffer.wrap(_dataArrays[chunkIndex]);
            _chunksWrappersCache.put(cacheKey, result);
        }
        return result;
    }

    /** Debug helper: scans all entries for a value equal to {@code valueKey}; returns its global index or -1. */
    private int linearValueSearch(ByteBuffer valueKey) {
        for (int i = 0; i < _totalIndexes; i++) {
            ByteBuffer arrayKey = getValueForIndex(i);
            if (valueKey.equals(arrayKey)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Scans all entries for the given object id.
     *
     * @return the global index of the first matching entry, or -1 if absent
     */
    public int linearIdSearch(long id) {
        int index = 0;
        for (int i = 0; i < _totalChunks; i++) {
            for (int j = 0; j < _totalIndexesWithinChunks[i]; j++) {
                if (id == _offsetArrays[i][j]) {
                    return index;
                }
                index++;
            }
        }
        return -1;
    }

    /**
     * Finds the entry for {@code objectId} among the entries whose value
     * equals {@code valueKey}. A binary search locates one entry with that
     * value; the run of equal values is then walked backward and forward from
     * there, skipping empty chunks.
     *
     * @param objectId the object offset to look for
     * @param valueKey the indexed value of that object
     * @return {@code chunkIndex << 32 | indexInChunk} of the entry, or -1 if
     *         the value is not indexed or no entry in the run has that id
     * @throws IOException if the value comparison fails
     */
    private long searchIndex(long objectId, ByteBuffer valueKey) throws IOException {
        int index = binarySearch(valueKey);
        if (index < 0) {
            // Value not present at all; a negative index must not be used as a subscript.
            return -1;
        }
        long compositeIndex = getChunkForIndex(index);
        int startChunk = (int) (compositeIndex >> 32);
        int chunkStart = (int) compositeIndex;
        int startIndexInChunk = index - chunkStart;

        // Walk backward, starting at the entry found by the binary search.
        int chunk = startChunk;
        int slot = startIndexInChunk;
        while (true) {
            if (_offsetArrays[chunk][slot] == objectId) {
                return (long) chunk << 32 | slot;
            }
            slot--;
            while (slot < 0 && --chunk >= 0) {
                // Empty chunks yield -1 again and are skipped by the loop.
                slot = _totalIndexesWithinChunks[chunk] - 1;
            }
            if (chunk < 0 || !valueKey.equals(getValueForIndex(chunk, slot))) {
                break;
            }
        }

        // Walk forward, starting right after the entry found by the binary search.
        chunk = startChunk;
        slot = startIndexInChunk + 1;
        while (true) {
            while (chunk < _totalChunks && slot >= _totalIndexesWithinChunks[chunk]) {
                chunk++;
                slot = 0;
            }
            if (chunk >= _totalChunks) {
                break;
            }
            if (_offsetArrays[chunk][slot] == objectId) {
                return (long) chunk << 32 | slot;
            }
            if (!valueKey.equals(getValueForIndex(chunk, slot))) {
                break;
            }
            slot++;
        }
        return -1;
    }

    /** Same contract as {@link #binarySearch(ByteBuffer)}. */
    private int searchIndex(ByteBuffer key) throws IOException {
        return binarySearch(key);
    }

    /**
     * Binary search over all entries by value.
     *
     * @return the global index of an entry equal to {@code key}, or
     *         {@code -(insertionPoint + 1)} if there is none
     * @throws IOException if the value comparison fails
     */
    private int binarySearch(ByteBuffer key) throws IOException {
        int low = 0;
        int high = _totalIndexes - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1; // unsigned shift: no overflow for large indexes
            ByteBuffer arrayKey = getValueForIndex(mid);
            // comparePrimitives yields "stored vs. key"; negate to get "key vs. stored".
            int result = -PrimitiveJavaTypesUtil.comparePrimitives(arrayKey, arrayKey.position(), getFieldTypeEnum(), key);
            if (result < 0) {
                high = mid - 1;
            } else if (result > 0) {
                low = mid + 1;
            } else {
                return mid;
            }
        }
        return -(low + 1);
    }

    /**
     * Compares two values of this index's field type by delegating to
     * {@code PrimitiveJavaTypesUtil.comparePrimitives}; {@code thisKey} is
     * read starting at offset 0.
     *
     * @return the result of {@code comparePrimitives} for the two values
     * @throws IOException if the comparison fails
     */
    public int compareTo(ByteBuffer thisKey, ByteBuffer that) throws IOException {
        return PrimitiveJavaTypesUtil.comparePrimitives(thisKey, 0, getFieldTypeEnum(), that);
    }

    /**
     * Inserts a new, empty chunk at the given position, shifting later chunks
     * up by one and growing the holder arrays when necessary.
     *
     * @param index   position of the new chunk, {@code 0.._totalChunks}
     * @param context operation context supplying the transaction container;
     *                may be {@code null}
     */
    private synchronized void insertChunk(int index, JODBOperationContext context) {
        TransactionContainer transactionContainer = context != null ? context.getTransactionContainer() : null;
        int chunksToShift = _totalChunks - index;

        // The per-chunk counters must grow together with the chunk holders,
        // otherwise the chunk after the initial capacity overflows the array.
        // NOTE(review): the grown array is not registered with the transaction
        // container here; confirm it is persisted together with this agent.
        if (_totalIndexesWithinChunks.length < _totalChunks + 1) {
            int[] grown = new int[_totalIndexesWithinChunks.length + CHUNK_HOLDER_DEFAULT_INCREMENT];
            System.arraycopy(_totalIndexesWithinChunks, 0, grown, 0, _totalChunks);
            _totalIndexesWithinChunks = grown;
        }

        _offsetArrays = JODBIndexingRootAgent.ensurePersistentArrayCapacity(_offsetArrays, _totalChunks + 1,
                transactionContainer, CHUNK_HOLDER_DEFAULT_CAPACITY);
        if (chunksToShift != 0) {
            System.arraycopy(_offsetArrays, index, _offsetArrays, index + 1, chunksToShift);
            System.arraycopy(_totalIndexesWithinChunks, index, _totalIndexesWithinChunks, index + 1, chunksToShift);
        }
        _offsetArrays[index] = new long[CHUNK_CAPACITY];
        if (transactionContainer != null) { // make sure the new array is registered in transaction container
            try {
                transactionContainer.set(_offsetArrays, Integer.MAX_VALUE);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        _totalIndexesWithinChunks[index] = 0;

        if (_dataArrays != null) {
            _dataArrays = JODBIndexingRootAgent.ensurePersistentArrayCapacity(_dataArrays, _totalChunks + 1,
                    transactionContainer, CHUNK_HOLDER_DEFAULT_CAPACITY);
            if (chunksToShift != 0) {
                System.arraycopy(_dataArrays, index, _dataArrays, index + 1, chunksToShift);
            }
            _dataArrays[index] = new byte[CHUNK_CAPACITY * _dataElementSize];
            if (transactionContainer != null) { // make sure the new array is registered in transaction container
                try {
                    transactionContainer.set(_dataArrays, Integer.MAX_VALUE);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        _totalChunks++;
    }

    /**
     * Creates an iterator over all entries.
     *
     * @param isForwardIterator {@code true} for ascending order, {@code false}
     *                          for descending order
     */
    public IndexDataIterator getIndexIterator(boolean isForwardIterator) {
        return isForwardIterator ? new ForwardIndexIteratorImpl() : new BackwardIndexIteratorImpl();
    }

    /**
     * Iterates entries from the first slot of the first chunk to the end.
     * Fails with {@link ConcurrentModificationException} when the number of
     * entries changed since the iterator was created.
     */
    class ForwardIndexIteratorImpl implements IndexDataIterator {
        int _initialTotalIndexes;
        int _indexesLeft;
        int _chunkOffset;
        int _offsetInChunk;

        public ForwardIndexIteratorImpl() {
            _initialTotalIndexes = _indexesLeft = _totalIndexes;
            _chunkOffset = 0;
            _offsetInChunk = -1; // advanced to 0 by the first next()
        }

        public boolean hasNext() {
            return _indexesLeft > 0;
        }

        public int length() {
            return _initialTotalIndexes;
        }

        /**
         * Advances to the next entry.
         *
         * @param result if non-null and value bytes are stored, receives the
         *               entry's value ({@code _dataElementSize} bytes)
         * @return the entry's object offset
         * @throws IllegalStateException if no entries are left
         * @throws ConcurrentModificationException if the index size changed
         */
        public long next(ByteBuffer result) {
            if (_indexesLeft <= 0) {
                throw new IllegalStateException();
            }
            if (_initialTotalIndexes != _totalIndexes) {
                throw new ConcurrentModificationException();
            }
            _offsetInChunk++;
            while (_offsetInChunk >= _totalIndexesWithinChunks[_chunkOffset]) {
                // End of chunk (or empty chunk): continue with the next one.
                _chunkOffset++;
                _offsetInChunk = 0;
            }
            _indexesLeft--;
            if (result != null && _dataArrays != null) {
                result.put(_dataArrays[_chunkOffset], _offsetInChunk * _dataElementSize, _dataElementSize);
            }
            return _offsetArrays[_chunkOffset][_offsetInChunk];
        }

        public long next() {
            return next(null);
        }
    }

    /**
     * Iterates entries from the last used slot of the last chunk to the
     * beginning. Fails with {@link ConcurrentModificationException} when the
     * number of entries changed since the iterator was created.
     */
    class BackwardIndexIteratorImpl implements IndexDataIterator {
        int _initialTotalIndexes;
        int _indexesLeft;
        int _chunkOffset;
        int _offsetInChunk;

        public BackwardIndexIteratorImpl() {
            _initialTotalIndexes = _indexesLeft = _totalIndexes;
            _chunkOffset = _totalChunks - 1;
            // One past the last used slot; decremented by the first next().
            _offsetInChunk = _totalIndexesWithinChunks[_chunkOffset];
        }

        public boolean hasNext() {
            return _indexesLeft > 0;
        }

        public int length() {
            return _initialTotalIndexes;
        }

        /**
         * Steps back to the previous entry.
         *
         * @param result if non-null and value bytes are stored, receives the
         *               entry's value ({@code _dataElementSize} bytes)
         * @return the entry's object offset
         * @throws IllegalStateException if no entries are left
         * @throws ConcurrentModificationException if the index size changed
         */
        public long next(ByteBuffer result) {
            if (_indexesLeft <= 0) {
                throw new IllegalStateException();
            }
            if (_initialTotalIndexes != _totalIndexes) {
                throw new ConcurrentModificationException();
            }
            _offsetInChunk--;
            while (_offsetInChunk < 0 || _totalIndexesWithinChunks[_chunkOffset] <= 0) {
                // Start of chunk (or empty chunk): continue with the previous one.
                _chunkOffset--;
                _offsetInChunk = _totalIndexesWithinChunks[_chunkOffset] - 1;
            }
            _indexesLeft--;
            if (result != null && _dataArrays != null) {
                result.put(_dataArrays[_chunkOffset], _offsetInChunk * _dataElementSize, _dataElementSize);
            }
            return _offsetArrays[_chunkOffset][_offsetInChunk];
        }

        public long next() {
            return next(null);
        }
    }
}