package org.gridkit.coherence.offheap.storage.memlog;
import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PagedMemoryBinaryStoreManager2 implements BinaryStoreManager {
private static int EMPTY = 0;
private static int HASH_POS = 0;
private static int KEY_SIZE_POS = 4;
private static int VALUE_SIZE_POS = 8;
private static int DATA_POS = 12;
private static int ALLOC_NEW_VALUE = 0;
private static int ALLOC_NEW_LIST = 1;
private static int ALLOC_RELOCATE_VALUE = 0;
private static long MEM_DIAG_REPORT_PERIOD = TimeUnit.SECONDS.toNanos(10);
private final String name;
private List<BinaryHashTable> tables = new ArrayList<BinaryHashTable>();
private MemoryStoreBackend pageManager;
private Thread maintenanceDaemon;
public PagedMemoryBinaryStoreManager2(String name, MemoryStoreBackend pageManager) {
this.name = name;
this.pageManager = pageManager;
this.maintenanceDaemon = createMaintenanceThread();
}
private Thread createMaintenanceThread() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
maintenanceCycle();
}
});
thread.setName("PagedMemoryBinaryStore-" + name + "-ServiceThread");
thread.setDaemon(true);
return thread;
}
@Override
public synchronized BinaryStore create() {
BinaryHashTable hash = new BinaryHashTable();
tables.add(hash);
if (maintenanceDaemon.getState() == State.NEW) {
maintenanceDaemon.start();
}
return hash;
}
@Override
public synchronized void destroy(BinaryStore store) {
BinaryHashTable hash = (BinaryHashTable) store;
// TODO check owner
int n = tables.indexOf(store);
tables.remove(n);
hash.clear();
}
@SuppressWarnings("deprecation")
public synchronized void close() {
List<BinaryHashTable> tables = new ArrayList<BinaryHashTable>(this.tables);
for(BinaryHashTable table: tables) {
destroy(table);
}
if (maintenanceDaemon.getState() != State.NEW) {
// TODO graceful death
maintenanceDaemon.stop();
// try {
// maintenanceDaemon.join();
// } catch (InterruptedException e) {
// // ignore
// }
}
}
private void maintenanceCycle() {
int n = 0;
int idle = 0;
long diagTimestamp = System.nanoTime();
int[] evacuationHashes = new int[1024];
BinaryHashTable[] tableSet = new BinaryHashTable[0];
while(true) {
if (n % 500 == 0) {
synchronized(this) {
tableSet = tables.toArray(tableSet);
}
}
if (diagTimestamp + MEM_DIAG_REPORT_PERIOD < System.nanoTime()) {
pageManager.dumpStatistics();
synchronized (this) {
int x = 0;
for(BinaryHashTable table : tables) {
StringBuilder buf = new StringBuilder();
buf.append("Hashtable #" + x).append("\n");
buf.append("Size: ").append(table.size.get()).append("\n");
buf.append("Capacity: ").append(table.capacity).append("\n");
buf.append("Load factor: ").append(String.format("%f", 1.0d * table.size.get() / table.capacity)).append("\n");
System.out.println(buf.toString());
++x;
}
}
diagTimestamp = System.nanoTime();
}
if (tableSet.length == 0) {
++idle;
}
else {
int len = pageManager.collectHashesForEvacuation(evacuationHashes, 0);
if (len == 0) {
++idle;
}
else {
evacuateEntries(tableSet, evacuationHashes, len);
Thread.yield();
}
}
++n;
if (idle > 10) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(300));
idle = 0;
}
}
}
private void evacuateEntries(BinaryHashTable[] tableSet, int[] evacuationHashes, int hashCount) {
for(BinaryHashTable table: tableSet) {
table.tableLock.readLock().lock();
try {
int pervHash = evacuationHashes[0] + 1;
for (int i = 0; i != hashCount; ++i) {
int hash = evacuationHashes[i];
if (hash != pervHash) {
table.recycleHash(hash);
}
pervHash = hash;
}
}
finally {
table.tableLock.readLock().unlock();
}
}
}
// private void doTableMaintenance(BinaryHashTable table) {
// int n = 0;
//
// try {
// table.checkTablePhysicalSize();
// refresh:
// while(true) {
// table.tableLock.readLock().lock();
// try {
// for(int i = 0; i != 16; ++i) {
// table.recycleEntry(n++);
// if (n > table.capacity) {
// break refresh;
// }
// }
// }
// finally {
// table.tableLock.readLock().unlock();
// }
// if (n % (1 << 10) == 0) {
// table.checkTablePhysicalSize();
// }
// }
//
// table.maintenanceTimestamp = System.nanoTime();
// }
// catch(NullPointerException e) {
// // may happen if table is erased
// }
// }
private class BinaryHashTable implements BinaryStore {
AtomicIntegerArray hashtable = new AtomicIntegerArray(1024);
AtomicIntegerArray locktable = createLocktable(hashtable.length());
volatile int capacity = hashtable.length() >> 1;
ReadWriteLock tableLock = new ReentrantReadWriteLock();
AtomicInteger size = new AtomicInteger();
float targetLoadFactor = 0.8f;
float thresholdLoadFactor = 0.99f;
// lock assumed
private int[] getEntries(int index) {
int pointer;
pointer = hashtable.get(index);
if (pointer == 0) {
return null;
}
else if (pointer > 0) {
return new int[]{pointer};
}
else {
pointer = -pointer;
ByteChunk chunk = pageManager.get(pointer);
int[] entries = new int[chunk.lenght() / 4 - 1];
for(int i = 0; i != entries.length; ++i) {
entries[i] = chunk.intAt(4 + i * 4);
}
return entries;
}
}
// lock assumed
private void setEntries(int index, int[] entries) {
int pointer;
pointer = hashtable.get(index);
if (pointer != EMPTY && pointer < 0) {
pointer = -pointer;
pageManager.release(pointer);
}
if (entries == null || entries.length == 0) {
hashtable.set(index, EMPTY);
}
else if (entries.length == 1) {
hashtable.set(index, entries[0]);
}
else {
ByteChunk first = pageManager.get(entries[0]);
int hash = first.intAt(0);
int npp = pageManager.allocate(4 + 4 * entries.length, ALLOC_NEW_LIST);
ByteChunk list = pageManager.get(npp);
try {
list.assertEmpty();
}
catch(AssertionError e) {
System.out.println("Problem pointer is " + pageManager.page(npp) + ":" + pageManager.offset(npp));
throw e;
}
list.putInt(0, hash);
for(int i = 0; i != entries.length; ++i) {
list.putInt(4 + 4 * i, entries[i]);
}
// not required for in-heap backend
pageManager.update(npp, list);
hashtable.set(index, -npp);
}
}
public void clear() {
tableLock.writeLock().lock();
try {
for(int i = 0; i != capacity; ++i) {
int[] list = getEntries(i);
if (list != null) {
for(int pp: list) {
pageManager.release(pp);
}
setEntries(i, null);
}
}
hashtable = new AtomicIntegerArray(1024);
capacity = hashtable.length() >> 1;
size.set(0);
}
finally {
tableLock.writeLock().unlock();
}
}
private boolean sameKey(ByteChunk entry, ByteChunk key) {
int keySize = entry.intAt(KEY_SIZE_POS);
if (keySize == key.lenght()) {
for (int i = 0; i != keySize; ++i) {
if (entry.at(DATA_POS + i) != key.at(i)) {
return false;
}
}
return true;
}
else {
return false;
}
}
private ByteChunk getValue(ByteChunk entry) {
int keySize = entry.intAt(KEY_SIZE_POS);
int valueSize = entry.intAt(VALUE_SIZE_POS);
return entry.subChunk(DATA_POS + keySize, valueSize);
}
@Override
public int size() {
return size.get();
}
@Override
public ByteChunk get(ByteChunk key) {
tableLock.readLock().lock();
try {
int index = hashIndex(key, capacity);
readLock(index);
try {
int[] entries = getEntries(index);
if (entries != null) {
for(int pp : entries) {
ByteChunk entry = pageManager.get(pp);
if (sameKey(entry, key)) {
return getValue(entry);
}
}
}
return null;
}
finally {
readUnlock(index);
}
}
finally {
tableLock.readLock().unlock();
}
}
@Override
public void put(ByteChunk key, ByteChunk value) {
tableLock.readLock().lock();
try {
internalPut(key, value);
}
finally {
tableLock.readLock().unlock();
}
checkTableSize();
}
@Override
public void remove(ByteChunk key) {
tableLock.readLock().lock();
try {
int index = hashIndex(key, capacity);
writeLock(index);
try {
int[] entries = getEntries(index);
if (entries != null) {
for(int pp : entries) {
ByteChunk entry = pageManager.get(pp);
if (sameKey(entry, key)) {
pageManager.release(pp);
if (entries.length == 1) {
setEntries(index, null);
}
else {
int[] newEntries = new int[entries.length - 1];
int n = 0;
for(int pi : entries) {
if (pi != pp) {
newEntries[n++] = pi;
}
}
setEntries(index, newEntries);
}
size.decrementAndGet();
return;
}
}
}
}
finally {
writeUnlock(index);
}
}
finally {
tableLock.readLock().unlock();
}
checkTableSize();
}
// table lock is assumed
private void internalPut(ByteChunk key, ByteChunk value) {
int hash = BinHash.hash(key);
int index = splitHash(hash, capacity);
writeLock(index);
try {
int[] entries = getEntries(index);
if (entries != null) {
for(int i = 0; i != entries.length; ++i) {
int pp = entries[i];
ByteChunk entry = pageManager.get(pp);
if (sameKey(entry, key)) {
// overriding value
pageManager.release(pp);
int npp = pageManager.allocate(DATA_POS + key.lenght() + value.lenght(), ALLOC_NEW_VALUE);
createEntry(npp, key, value, hash);
entries[i] = npp;
setEntries(index, entries);
return;
}
}
}
// TODO refactoring, move allocation to createEntry method
// add new entry
int npp = pageManager.allocate(DATA_POS + key.lenght() + value.lenght(), ALLOC_NEW_VALUE);
createEntry(npp, key, value, hash);
int[] newEntries;
if (entries == null || entries.length == 0) {
newEntries = new int[]{npp};
}
else {
newEntries = Arrays.copyOf(entries, entries.length + 1);
newEntries[entries.length] = npp;
}
setEntries(index, newEntries);
size.incrementAndGet();
}
finally {
writeUnlock(index);
}
}
private void createEntry(int npp, ByteChunk key, ByteChunk value, int hash) {
ByteChunk chunk = pageManager.get(npp);
try {
chunk.assertEmpty();
}
catch(AssertionError e) {
System.out.println("Problem pointer is " + pageManager.page(npp) + ":" + pageManager.offset(npp));
throw e;
}
chunk.putInt(HASH_POS, hash);
chunk.putInt(KEY_SIZE_POS, key.lenght());
chunk.putInt(VALUE_SIZE_POS, value.lenght());
chunk.putBytes(DATA_POS, key);
chunk.putBytes(DATA_POS + key.lenght(), value);
// no need for in-heap storage
pageManager.update(npp, chunk);
}
// tableLock assumed
void recycleHash(int hash) {
while(true) {
int index = splitHash(hash, capacity);
writeLock(index);
try {
if (splitHash(hash, capacity) != index) {
// capacity has been updated
// need to recalculate index
continue;
}
int[] entries = getEntries(index);
if (entries != null && entries.length > 0) {
boolean modified = false;
for(int i = 0; i != entries.length; ++i) {
int pp = entries[i];
if (needRecycle(pp)) {
ByteChunk chunk = pageManager.get(pp);
int npp = pageManager.allocate(chunk.lenght(), ALLOC_RELOCATE_VALUE);
ByteChunk newChunk = pageManager.get(npp);
newChunk.putBytes(chunk);
pageManager.release(pp);
// not required for in-heap storage
pageManager.update(npp, newChunk);
entries[i] = npp;
modified = true;
}
}
if (!modified) {
int pe = hashtable.get(index);
pe = pe > 0 ? pe : -pe;
if (needRecycle(pe)) {
modified = true;
}
}
if (modified) {
setEntries(index, entries);
}
}
}
finally {
writeUnlock(index);
}
break;
}
}
private boolean needRecycle(int pointer) {
return pageManager.isMarkedForRecycle(pointer);
}
private void checkTableSize() {
float loadFactor = ((float)size.get()) / capacity;
if (loadFactor > thresholdLoadFactor && (capacity == hashtable.length())){
checkTablePhysicalSize();
}
if (loadFactor > targetLoadFactor && (capacity < hashtable.length())) {
growTable(4);
}
}
void checkTablePhysicalSize() {
if (capacity > ((hashtable.length() * 8) / 10)) {
// need to resize table
int delta = (hashtable.length() / 2) & 0xFFFFF800;
if (delta < 1024) {
delta = 1024;
}
tableLock.writeLock().lock();
try {
AtomicIntegerArray newTable = new AtomicIntegerArray(hashtable.length() + delta);
for(int i = 0; i != hashtable.length(); ++i) {
newTable.set(i, hashtable.get(i));
}
hashtable = newTable;
locktable = createLocktable(hashtable.length());
}
finally {
tableLock.writeLock().unlock();
}
}
}
private void growTable(int n) {
tableLock.readLock().lock();
//checkHashConsistency();
try {
for(int i = 0; i != n; ++i) {
if (capacity == hashtable.length()) {
return;
}
int nRound = Integer.highestOneBit(capacity);
int nSplit = (capacity) & ~nRound;
int nLast = capacity;
writeLock(nSplit);
writeLock(nLast);
try {
++capacity;
int[] entries = getEntries(nSplit);
if (entries != null) {
int n1 = 0;
int[] el1 = new int[entries.length];
int n2 = 0;
int[] el2 = new int[entries.length];
for(int pp: entries) {
ByteChunk chunk = pageManager.get(pp);
int hash = chunk.intAt(HASH_POS);
int index = splitHash(hash, capacity);
if (index == nSplit) {
el1[n1++] = pp;
}
else if (index == nLast) {
el2[n2++] = pp;
}
else {
throw new AssertionError("New index of hash " + Integer.toHexString(hash) +" is " + index + ", expected values eigther " + nSplit + " or " + nLast);
}
}
el1 = Arrays.copyOf(el1, n1);
el2 = Arrays.copyOf(el2, n2);
setEntries(nSplit, el1);
setEntries(nLast, el2);
}
}
finally {
writeUnlock(nSplit);
writeUnlock(nLast);
}
//checkHashConsistency();
}
}
finally {
tableLock.readLock().unlock();
}
}
@SuppressWarnings("unused") // for testing
private void checkHashConsistency() {
tableLock.readLock().lock();
try {
for(int i = 0; i != capacity; ++i) {
int[] entries = getEntries(i);
if (entries != null) {
for(int pp : entries) {
ByteChunk entry = pageManager.get(pp);
int keySize = entry.intAt(0);
ByteChunk key = entry.subChunk(8, keySize);
if (hashIndex(key, capacity) != i) {
throw new AssertionError();
}
}
}
}
}
finally {
tableLock.readLock().unlock();
}
}
private int hashIndex(ByteChunk key, int capacity) {
int hash = BinHash.hash(key);
return splitHash(hash, capacity);
}
public AtomicIntegerArray createLocktable(int size) {
AtomicIntegerArray table = new AtomicIntegerArray(size / 4); // 8 bits per lock
return table;
}
private void readLock(int index) {
int n = 0;
while(true) {
byte c = byte_get(locktable, index);
if (c >= 0 && c < 126) {
byte u = (byte) (c + 1) ;
if (byte_compareAndSet(locktable, index, c, u)) {
return;
}
}
++n;
if (n % 10 == 0) {
Thread.yield();
}
}
}
private void readUnlock(int index) {
int n = 0;
while(true) {
byte c = byte_get(locktable, index);
if (c > 0) {
byte u = (byte) (c - 1) ;
if (byte_compareAndSet(locktable, index, c, u)) {
return;
}
}
else if (c < 0) {
byte u = (byte) (c + 1);
if (byte_compareAndSet(locktable, index, c, u)) {
return;
}
}
else {
throw new IllegalStateException("Invalid lock state");
}
++n;
if (n % 10 == 0) {
Thread.yield();
}
}
}
private void writeLock(int index) {
int n = 0;
while(true) {
byte c = byte_get(locktable, index);
if (c == 0) {
byte u = (byte) -1;
if (byte_compareAndSet(locktable, index, c, u)) {
return;
}
}
else if (c < 0) {
// another writer is pending
}
else if (c > 0){
byte u = (byte) (-c - 1);
if (byte_compareAndSet(locktable, index, c, u)) {
break;
}
}
++n;
if (n % 10 == 0) {
Thread.yield();
}
}
// waiting read locks to get released
while(true) {
byte c = byte_get(locktable, index);
if (c == -1) {
return;
}
++n;
if (n % 10 == 0) {
Thread.yield();
}
}
}
private void writeUnlock(int index) {
int n = 0;
while(true) {
byte c = byte_get(locktable, index);
if (c == -1) {
byte u = (byte) 0;
if (byte_compareAndSet(locktable, index, c, u)) {
return;
}
}
else {
throw new IllegalStateException("Broken lock");
}
++n;
if (n % 10 == 0) {
Thread.yield();
}
}
}
private byte byte_get(AtomicIntegerArray table, int index) {
int x = index / 4;
int xx = index % 4;
int word = table.get(x);
return getByte(word, xx);
}
private boolean byte_compareAndSet(AtomicIntegerArray table, int index, byte expected, byte newValue) {
int x = index / 4;
int xx = index % 4;
while(true) {
int word = table.get(x);
byte val = getByte(word, xx);
if (val == expected) {
int newWord = setByte(word, xx, newValue);
if (table.compareAndSet(x, word, newWord)) {
return true;
}
else {
continue;
}
}
else {
return false;
}
}
}
private byte getByte(int word, int i) {
switch(i) {
case 0:
return (byte) (0xFF & word);
case 1:
return (byte) (0xFF & (word >> 8));
case 2:
return (byte) (0xFF & (word >> 16));
case 3:
return (byte) (0xFF & (word >> 24));
default:
throw new IllegalArgumentException("4 bytes per int");
}
}
private int setByte(int word,int i, byte value) {
switch(i) {
case 0:
word &= 0xFFFFFF00;
word |= 0xFF & (int)value;
return word;
case 1:
word &= 0xFFFF00FF;
word |= (0xFF & (int)value) << 8;
return word;
case 2:
word &= 0xFF00FFFF;
word |= (0xFF & (int)value) << 16;
return word;
case 3:
word &= 0x00FFFFFF;
word |= (0xFF & (int)value) << 24;
return word;
default:
throw new IllegalArgumentException("4 bytes per int");
}
}
@Override
public Iterator<ByteChunk> keys() {
return null;
}
}
static int splitHash(int hash, int capacity) {
int round = Integer.highestOneBit(capacity);
int split = capacity & ~round;
long idx = (0xFFFFFFFFl & hash) % (round);
if (idx < split) {
idx = (0xFFFFFFFFl & hash) % (round << 1);
}
return (int) idx;
}
private class HashIterator implements Iterator<ByteChunk> {
private final AtomicLongArray hashtable;
private int position = 0;
private final List<ByteChunk> buffer = new ArrayList<ByteChunk>();
public HashIterator(AtomicLongArray hashtable) {
this.hashtable = hashtable;
}
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return false;
}
@Override
public ByteChunk next() {
if (hasNext()) {
return buffer.remove(0);
}
else {
throw new NoSuchElementException();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}