}
}
private void removeQueuedTupleEntry() {
TupleEntry tupleEntry = pmem.getTupleQueue().remove();
PropagationContext originalPctx = tupleEntry.getPropagationContext();
boolean repeat = true;
while (repeat) {
if (log.isTraceEnabled()) {
log.trace("Stream removed entry {} {} size {}", System.identityHashCode(pmem.getTupleQueue()), tupleEntry, pmem.getTupleQueue().size());
}
if (tupleEntry.getLeftTuple() != null) {
SegmentMemory sm = tupleEntry.getNodeMemory().getSegmentMemory();
LeftTupleSets tuples = sm.getStagedLeftTuples();
tupleEntry.getLeftTuple().setPropagationContext(tupleEntry.getPropagationContext());
switch (tupleEntry.getPropagationType()) {
case PropagationContext.INSERTION:
case PropagationContext.RULE_ADDITION:
tuples.addInsert(tupleEntry.getLeftTuple());
break;
case PropagationContext.MODIFICATION:
tuples.addUpdate(tupleEntry.getLeftTuple());
break;
case PropagationContext.DELETION:
case PropagationContext.EXPIRATION:
case PropagationContext.RULE_REMOVAL:
tuples.addDelete(tupleEntry.getLeftTuple());
break;
}
} else {
BetaMemory bm = (BetaMemory) tupleEntry.getNodeMemory();
tupleEntry.getRightTuple().setPropagationContext(tupleEntry.getPropagationContext());
switch (tupleEntry.getPropagationType()) {
case PropagationContext.INSERTION:
case PropagationContext.RULE_ADDITION:
bm.getStagedRightTuples().addInsert(tupleEntry.getRightTuple());
break;
case PropagationContext.MODIFICATION:
bm.getStagedRightTuples().addUpdate(tupleEntry.getRightTuple());
break;
case PropagationContext.DELETION:
case PropagationContext.EXPIRATION:
case PropagationContext.RULE_REMOVAL:
bm.getStagedRightTuples().addDelete(tupleEntry.getRightTuple());
break;
}
}
if (!pmem.getTupleQueue().isEmpty()) {
tupleEntry = pmem.getTupleQueue().peek();
PropagationContext pctx = tupleEntry.getPropagationContext();
// repeat if either the pctx number is the same, or the event time is the same or before
if (pctx.getPropagationNumber() == originalPctx.getPropagationNumber()) {
repeat = true;
} else {
repeat = false;
}