package com.alibaba.otter.canal.store.memory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.protocol.position.Position;
import com.alibaba.otter.canal.protocol.position.PositionRange;
import com.alibaba.otter.canal.store.AbstractCanalStoreScavenge;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.CanalStoreException;
import com.alibaba.otter.canal.store.CanalStoreScavenge;
import com.alibaba.otter.canal.store.helper.CanalEventUtils;
import com.alibaba.otter.canal.store.model.Event;
import com.alibaba.otter.canal.store.model.Events;
/**
* 基于内存buffer构建内存memory store
*
* @author jianghang 2012-6-20 上午09:46:31
* @version 1.0.0
*/
public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {
private static final long INIT_SQEUENCE = -1;
private int bufferSize = 16 * 1024;
private int indexMask;
private Event[] entries;
// 记录下put/get/ack操作的三个下标
private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前put操作最后一次写操作发生的位置
private AtomicLong getSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前get操作读取的最后一条的位置
private AtomicLong ackSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前ack操作的最后一条的位置
// 阻塞put/get操作控制信号
private ReentrantLock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public void start() throws CanalStoreException {
super.start();
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
indexMask = bufferSize - 1;
entries = new Event[bufferSize];
}
public void stop() throws CanalStoreException {
super.stop();
cleanAll();
}
public void put(List<Event> data) throws InterruptedException, CanalStoreException {
if (data == null || data.isEmpty()) {
return;
}
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (!checkFreeSlotAt(putSequence.get() + data.size())) { // 检查是否有空位
notFull.await(); // wait until not full
}
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
doPut(data);
if (Thread.interrupted()) {
throw new InterruptedException();
}
} finally {
lock.unlock();
}
}
public boolean put(List<Event> data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
if (data == null || data.isEmpty()) {
return true;
}
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (checkFreeSlotAt(putSequence.get() + data.size())) {
doPut(data);
return true;
}
if (nanos <= 0) {
return false;
}
try {
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
public boolean tryPut(List<Event> data) throws CanalStoreException {
if (data == null || data.isEmpty()) {
return true;
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (!checkFreeSlotAt(putSequence.get() + data.size())) {
return false;
} else {
doPut(data);
return true;
}
} finally {
lock.unlock();
}
}
public void put(Event data) throws InterruptedException, CanalStoreException {
put(Arrays.asList(data));
}
public boolean put(Event data, long timeout, TimeUnit unit) throws InterruptedException, CanalStoreException {
return put(Arrays.asList(data), timeout, unit);
}
public boolean tryPut(Event data) throws CanalStoreException {
return tryPut(Arrays.asList(data));
}
/**
* 执行具体的put操作
*/
private void doPut(List<Event> data) {
long current = putSequence.get();
long end = current + data.size();
// 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值
for (long next = current + 1; next <= end; next++) {
entries[getIndex(next)] = data.get((int) (next - current - 1));
}
putSequence.set(end);
// tell other threads that store is not empty
notEmpty.signal();
}
public Events<Event> get(Position start, int batchSize) throws InterruptedException, CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (!checkUnGetSlotAt((LogPosition) start, batchSize))
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
return doGet(start, batchSize);
} finally {
lock.unlock();
}
}
public Events<Event> get(Position start, int batchSize, long timeout, TimeUnit unit) throws InterruptedException,
CanalStoreException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (checkUnGetSlotAt((LogPosition) start, batchSize)) {
return doGet(start, batchSize);
}
if (nanos <= 0) {
// 如果时间到了,有多少取多少
return doGet(start, batchSize);
}
try {
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
}
} finally {
lock.unlock();
}
}
public Events<Event> tryGet(Position start, int batchSize) throws CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (checkUnGetSlotAt((LogPosition) start, 1)) {// 注意这里直接使用get操作,有数据就取
return doGet(start, batchSize);
} else {
return new Events<Event>();
}
} finally {
lock.unlock();
}
}
private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {
LogPosition startPosition = (LogPosition) start;
long current = getSequence.get();
long maxAbleSequence = putSequence.get();
long next = current;
// 如果startPosition为null,说明是第一次,默认+1处理
if (startPosition == null || !startPosition.getPostion().isIncluded()) { // 第一次订阅之后,需要包含一下start位置,防止丢失第一条记录
next = next + 1;
}
if (current >= maxAbleSequence) {
return new Events<Event>();
}
long end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence;
Events<Event> result = new Events<Event>();
if (getSequence.compareAndSet(current, end)) {
// 提取数据并返回
List<Event> entrys = result.getEvents();
for (; next <= end; next++) {
entrys.add(entries[getIndex(next)]);
}
PositionRange<LogPosition> range = new PositionRange<LogPosition>();
result.setPositionRange(range);
range.setStart(CanalEventUtils.createPosition(entrys.get(0)));
range.setEnd(CanalEventUtils.createPosition(entrys.get(result.getEvents().size() - 1)));
// 记录一下是否存在可以被ack的点
for (int i = entrys.size() - 1; i >= 0; i--) {
Event event = entrys.get(i);
if (CanalEntry.EntryType.TRANSACTIONBEGIN == event.getEntry().getEntryType()
|| CanalEntry.EntryType.TRANSACTIONEND == event.getEntry().getEntryType()) {
// 将事务头/尾设置可被为ack的点
range.setAck(CanalEventUtils.createPosition(event));
break;
}
}
notFull.signal();
}
return result;
}
public LogPosition getFirstPosition() throws CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
long firstSeqeuence = ackSequence.get();
if (firstSeqeuence == INIT_SQEUENCE && firstSeqeuence < putSequence.get()) {
// 没有ack过数据
Event event = entries[getIndex(firstSeqeuence + 1)]; // 最后一次ack为-1,需要移动到下一条,included = false
return CanalEventUtils.createPosition(event, false);
} else if (firstSeqeuence > INIT_SQEUENCE && firstSeqeuence < putSequence.get()) {
// ack未追上put操作
Event event = entries[getIndex(firstSeqeuence + 1)]; // 最后一次ack的位置数据 + 1
return CanalEventUtils.createPosition(event, true);
} else if (firstSeqeuence > INIT_SQEUENCE && firstSeqeuence == putSequence.get()) {
// 已经追上,store中没有数据
Event event = entries[getIndex(firstSeqeuence)]; // 最后一次ack的位置数据,和last为同一条,included = false
return CanalEventUtils.createPosition(event, false);
} else {
// 没有任何数据
return null;
}
} finally {
lock.unlock();
}
}
public LogPosition getLatestPosition() throws CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
long latestSequence = putSequence.get();
if (latestSequence > INIT_SQEUENCE && latestSequence != ackSequence.get()) {
Event event = entries[(int) putSequence.get() & indexMask]; // 最后一次写入的数据,最后一条未消费的数据
return CanalEventUtils.createPosition(event, true);
} else if (latestSequence > INIT_SQEUENCE && latestSequence == ackSequence.get()) {
// ack已经追上了put操作
Event event = entries[(int) putSequence.get() & indexMask]; // 最后一次写入的数据,included = false
return CanalEventUtils.createPosition(event, false);
} else {
// 没有任何数据
return null;
}
} finally {
lock.unlock();
}
}
public void ack(Position position) throws CanalStoreException {
cleanUntil(position);
}
public void cleanUntil(Position position) throws CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
long sequence = ackSequence.get();
long maxSequence = getSequence.get();
boolean hasMatch = false;
for (long next = sequence + 1; next <= maxSequence; next++) {
Event event = entries[getIndex(next)];
boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position);
if (match) {// 找到对应的position,更新ack seq
hasMatch = true;
if (ackSequence.compareAndSet(sequence, next)) {// 避免并发ack
notFull.signal();
return;
}
}
}
if (!hasMatch) {// 找不到对应需要ack的position
throw new CanalStoreException("no match ack position" + position.toString());
}
} finally {
lock.unlock();
}
}
public void rollback() throws CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
getSequence.set(ackSequence.get());
} finally {
lock.unlock();
}
}
public void cleanAll() throws CanalStoreException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
putSequence.set(INIT_SQEUENCE);
getSequence.set(INIT_SQEUENCE);
ackSequence.set(INIT_SQEUENCE);
entries = null;
// for (int i = 0; i < entries.length; i++) {
// entries[i] = null;
// }
} finally {
lock.unlock();
}
}
// =================== helper method =================
private long getMinimumGetOrAck() {
long get = getSequence.get();
long ack = ackSequence.get();
return ack <= get ? ack : get;
}
/**
* 查询是否有空位
*/
private boolean checkFreeSlotAt(final long sequence) {
final long wrapPoint = sequence - bufferSize;
if (wrapPoint > getMinimumGetOrAck()) { // 刚好追上一轮
return false;
} else {
return true;
}
}
/**
* 检查是否存在需要get的数据,并且数量>=batchSize
*/
private boolean checkUnGetSlotAt(LogPosition startPosition, int batchSize) {
long current = getSequence.get();
long maxAbleSequence = putSequence.get();
long next = current;
if (startPosition == null || !startPosition.getPostion().isIncluded()) { // 第一次订阅之后,需要包含一下start位置,防止丢失第一条记录
next = next + 1;// 少一条数据
}
if (current < maxAbleSequence && next + batchSize - 1 <= maxAbleSequence) {
return true;
} else {
return false;
}
}
private int getIndex(long sequcnce) {
return (int) sequcnce & indexMask;
}
// ================ setter / getter ==================
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
}