Package com.linkedin.databus.core.util

Examples of com.linkedin.databus.core.util.BufferPositionParser


    log.info("starting");
    final DbusEventBuffer dbusBuf =
        new DbusEventBuffer(TestDbusEventBuffer.getConfig(
            1200, 500, 100, 500, AllocationPolicy.HEAP_MEMORY, QueuePolicy.OVERWRITE_ON_WRITE,
            AssertLevel.ALL));
    BufferPositionParser parser = dbusBuf.getBufferPositionParser();
    log.info("append initial events");
    DbusEventGenerator generator = new DbusEventGenerator();
    Vector<DbusEvent> events = new Vector<DbusEvent>();
    generator.generateEvents(11, 1, 120, 39, events);

    // Add events to the EventBuffer. Now the buffer is full
    DbusEventAppender appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread
    log.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));
    log.info("Num buffers :" + dbusBuf.getBuffer().length);
    log.info("Buffer :" + Arrays.toString(dbusBuf.getBuffer()));

    long headPos = dbusBuf.getHead();
    long tailPos = dbusBuf.getTail();
    long headGenId = parser.bufferGenId(headPos);
    long headIndexId = parser.bufferIndex(headPos);
    long headOffset = parser.bufferOffset(headPos);
    long tailGenId = parser.bufferGenId(tailPos);
    long tailIndexId = parser.bufferIndex(tailPos);
    long tailOffset = parser.bufferOffset(tailPos);

    // current writing position should be 222 (id=1)
    assertEquals(0, headGenId);
    assertEquals(1, headIndexId);
    assertEquals(383, headOffset);
    assertEquals(1, tailGenId);
    assertEquals(1, tailIndexId);
    assertEquals(222, tailOffset);


    log.info("append event to stretch beyond limit but less than capacity");
    generator = new DbusEventGenerator(100);
    events = new Vector<DbusEvent>();
    generator.generateEvents(1, 1, 400, 150, events); // will add two event 61 + 150

    // Add events. this will cause limit increased
    appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run();
    log.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));
    log.info("Num buffers :" + dbusBuf.getBuffer().length);
    log.info("Buffer :" + Arrays.toString(dbusBuf.getBuffer()));


    headPos = dbusBuf.getHead();
    tailPos = dbusBuf.getTail();
    headGenId = parser.bufferGenId(headPos);
    headIndexId = parser.bufferIndex(headPos);
    headOffset = parser.bufferOffset(headPos);
    tailGenId = parser.bufferGenId(tailPos);
    tailIndexId = parser.bufferIndex(tailPos);
    tailOffset = parser.bufferOffset(tailPos);

    assertEquals(1, headGenId);
    assertEquals(0, headIndexId);
    assertEquals(61, headOffset);
    assertEquals(2, tailIndexId);
View Full Code Here


  {
    final DbusEventBuffer dbusBuf =
        new DbusEventBuffer(getConfig(1145,5000,100,500,AllocationPolicy.HEAP_MEMORY,
                                      QueuePolicy.OVERWRITE_ON_WRITE,
                                      AssertLevel.ALL));
    BufferPositionParser parser = dbusBuf.getBufferPositionParser();
    DbusEventGenerator generator = new DbusEventGenerator();
    Vector<DbusEvent> events = new Vector<DbusEvent>();
    generator.generateEvents(9, 3, 120, 39, events);

    // Add events to the EventBuffer. Now the buffer is full
    DbusEventAppender appender = new DbusEventAppender(events,dbusBuf,null);
    //Logger.getRootLogger().setLevel(Level.ALL);
    appender.run(); // running in the same thread

    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) +
             ",Tail:" + parser.toString(dbusBuf.getTail()));

    long headPos = dbusBuf.getHead();
    long tailPos = dbusBuf.getTail();
    long scnIndexHead = dbusBuf.getScnIndex().getHead();
    long scnIndexTail = dbusBuf.getScnIndex().getTail();
    long headGenId = parser.bufferGenId(headPos);
    long headIndexId = parser.bufferIndex(headPos);
    long headOffset = parser.bufferOffset(headPos);
    long tailGenId = parser.bufferGenId(tailPos);
    long tailIndexId = parser.bufferIndex(tailPos);
    long tailOffset = parser.bufferOffset(tailPos);

    assertEquals("Head GenId", 0, headGenId);
    assertEquals("Head Index", 0, headIndexId);
    assertEquals("Head Offset", 0, headOffset);
    assertEquals("Tail GenId", 0, tailGenId);
    assertEquals("Tail Index", 0, tailIndexId);
    assertEquals("Tail Offset", 1144, tailOffset);
    assertEquals("SCNIndex Head",0,scnIndexHead);
    assertEquals("SCNIndex Tail",80,scnIndexTail);

    LOG.info("ScnIndex Head is :" + scnIndexHead + ", ScnIndex Tail is :" + scnIndexTail);


    events = new Vector<DbusEvent>();
    generator = new DbusEventGenerator(100);
    /*
     * The event size is carefully created such that after adding 2nd
     * event CWP and tail points to the same location. Now the 3rd event corrupts the EVB and index (in the presence of bug).
     */

    generator.generateEvents(3, 2, 150, 89, events);

    appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread

    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));

    headPos = dbusBuf.getHead();
    tailPos = dbusBuf.getTail();
    headGenId = parser.bufferGenId(headPos);
    headIndexId = parser.bufferIndex(headPos);
    headOffset = parser.bufferOffset(headPos);
    tailGenId = parser.bufferGenId(tailPos);
    tailIndexId = parser.bufferIndex(tailPos);
    tailOffset = parser.bufferOffset(tailPos);
    scnIndexHead = dbusBuf.getScnIndex().getHead();
    scnIndexTail = dbusBuf.getScnIndex().getTail();
    assertEquals("Head GenId", 0, headGenId);
    assertEquals("Head Index", 0, headIndexId);
    assertEquals("Head Offset", 783, headOffset);
View Full Code Here

  public void testAppendEventOverlapNgt0() throws Exception
  {
    final DbusEventBuffer dbusBuf =
        new DbusEventBuffer(getConfig(1145,5000,100,500,AllocationPolicy.HEAP_MEMORY,
                                      QueuePolicy.OVERWRITE_ON_WRITE, AssertLevel.ALL));
    BufferPositionParser parser = dbusBuf.getBufferPositionParser();
    DbusEventGenerator generator = new DbusEventGenerator();
    Vector<DbusEvent> events = new Vector<DbusEvent>();
    generator.generateEvents(9, 3, 120, 39, events);

    // Add events to the EventBuffer. Now the buffer is full
    DbusEventAppender appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread

    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));

    long headPos = dbusBuf.getHead();
    long tailPos = dbusBuf.getTail();
    long scnIndexHead = dbusBuf.getScnIndex().getHead();
    long scnIndexTail = dbusBuf.getScnIndex().getTail();
    long headGenId = parser.bufferGenId(headPos);
    long headIndexId = parser.bufferIndex(headPos);
    long headOffset = parser.bufferOffset(headPos);
    long tailGenId = parser.bufferGenId(tailPos);
    long tailIndexId = parser.bufferIndex(tailPos);
    long tailOffset = parser.bufferOffset(tailPos);

    assertEquals("Head GenId", 0, headGenId);
    assertEquals("Head Index", 0, headIndexId);
    assertEquals("Head Offset", 0, headOffset);
    assertEquals("Tail GenId", 0, tailGenId);
    assertEquals("Tail Index", 0, tailIndexId);
    assertEquals("Tail Offset", 1144, tailOffset);
    assertEquals("SCNIndex Head",0,scnIndexHead);
    assertEquals("SCNIndex Tail",80,scnIndexTail);


    headPos = parser.setGenId(headPos, 300);
    tailPos = parser.setGenId(tailPos, 300);
    dbusBuf.setHead(headPos);
    dbusBuf.setTail(tailPos);
    dbusBuf.recreateIndex();

    events = new Vector<DbusEvent>();
    generator = new DbusEventGenerator(1000);
    /*
     * The event size is carefully created such that after adding 2nd
     * event CWP and tail points to the same location. Now the 3rd event corrupts the EVB and index (in the presence of bug).
     */
    generator.generateEvents(3, 2, 150, 89, events);

    appender = new DbusEventAppender(events,dbusBuf,null);
    LOG.info("1");
    //Logger.getRootLogger().setLevel(Level.ALL);
    appender.run(); // running in the same thread
    LOG.info("2");

    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));

    headPos = dbusBuf.getHead();
    tailPos = dbusBuf.getTail();
    headGenId = parser.bufferGenId(headPos);
    headIndexId = parser.bufferIndex(headPos);
    headOffset = parser.bufferOffset(headPos);
    tailGenId = parser.bufferGenId(tailPos);
    tailIndexId = parser.bufferIndex(tailPos);
    tailOffset = parser.bufferOffset(tailPos);
    scnIndexHead = dbusBuf.getScnIndex().getHead();
    scnIndexTail = dbusBuf.getScnIndex().getTail();
    assertEquals("Head GenId", 300, headGenId);
    assertEquals("Head Index", 0, headIndexId);
    assertEquals("Head Offset", 783, headOffset);
View Full Code Here

  public void testAppendEventOverlapMany() throws Exception
  {
    final DbusEventBuffer dbusBuf =
        new DbusEventBuffer(getConfig(1145,5000,100,500,AllocationPolicy.HEAP_MEMORY,
                                      QueuePolicy.OVERWRITE_ON_WRITE, AssertLevel.ALL));
    BufferPositionParser parser = dbusBuf.getBufferPositionParser();
    DbusEventGenerator generator = new DbusEventGenerator();
    Vector<DbusEvent> events = new Vector<DbusEvent>();
    generator.generateEvents(9, 3, 120, 39, events);

    // Add events to the EventBuffer. Now the buffer is full
    DbusEventAppender appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread

    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));

    long headPos = dbusBuf.getHead();
    long tailPos = dbusBuf.getTail();
    long scnIndexHead = dbusBuf.getScnIndex().getHead();
    long scnIndexTail = dbusBuf.getScnIndex().getTail();
    long headGenId = parser.bufferGenId(headPos);
    long headIndexId = parser.bufferIndex(headPos);
    long headOffset = parser.bufferOffset(headPos);
    long tailGenId = parser.bufferGenId(tailPos);
    long tailIndexId = parser.bufferIndex(tailPos);
    long tailOffset = parser.bufferOffset(tailPos);


    assertEquals("Head GenId", 0, headGenId);
    assertEquals("Head Index", 0, headIndexId);
    assertEquals("Head Offset", 0, headOffset);
    assertEquals("Tail GenId", 0, tailGenId);
    assertEquals("Tail Index", 0, tailIndexId);
    assertEquals("Tail Offset", 1144, tailOffset);
    assertEquals("SCNIndex Head",0,scnIndexHead);
    assertEquals("SCNIndex Tail",80,scnIndexTail);

    LOG.info("ScnIndex Head is :" + scnIndexHead + ", ScnIndex Tail is :" + scnIndexTail);


    /*
     * Dump lots of events
     */
    events = new Vector<DbusEvent>();
    generator = new DbusEventGenerator(100);
    generator.generateEvents(655, 3, 150, 89, events);
    appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread
    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));

    headPos = dbusBuf.getHead();
    tailPos = dbusBuf.getTail();
    headGenId = parser.bufferGenId(headPos);
    headIndexId = parser.bufferIndex(headPos);
    headOffset = parser.bufferOffset(headPos);
    tailGenId = parser.bufferGenId(tailPos);
    tailIndexId = parser.bufferIndex(tailPos);
    tailOffset = parser.bufferOffset(tailPos);
    scnIndexHead = dbusBuf.getScnIndex().getHead();
    scnIndexTail = dbusBuf.getScnIndex().getTail();
    assertEquals("Head GenId", 109, headGenId);
    assertEquals("Head Index", 0, headIndexId);
    assertEquals("Head Offset", 511, headOffset);
    assertEquals("Tail GenId", 110, tailGenId);
    assertEquals("Tail Index", 0, tailIndexId);
    assertEquals("Tail Offset", 211, tailOffset);
    assertEquals("SCNIndex Head",32,scnIndexHead);
    assertEquals("SCNIndex Tail",16,scnIndexTail);

    /*
     * The event size is carefully created such that after adding 2nd
     * event CWP and tail points to the same location.
     */
    events = new Vector<DbusEvent>();
    generator = new DbusEventGenerator(10000);
    generator.generateEvents(3, 5, 100, 28, events);

    appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread

    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));

    events = new Vector<DbusEvent>();
    generator = new DbusEventGenerator(10000);
    generator.generateEvents(3, 3, 120, 19, events);

    headPos = dbusBuf.getHead();
    tailPos = dbusBuf.getTail();
    headGenId = parser.bufferGenId(headPos);
    headIndexId = parser.bufferIndex(headPos);
    headOffset = parser.bufferOffset(headPos);
    tailGenId = parser.bufferGenId(tailPos);
    tailIndexId = parser.bufferIndex(tailPos);
    tailOffset = parser.bufferOffset(tailPos);
    scnIndexHead = dbusBuf.getScnIndex().getHead();
    scnIndexTail = dbusBuf.getScnIndex().getTail();
    assertEquals("Head GenId", 110, headGenId);
    assertEquals("Head Index", 0, headIndexId);
    assertEquals("Head Offset", 0, headOffset);
View Full Code Here

     * enough. In this case, the readEvents should block until an event is removed by the other thread.
     */
    final DbusEventBuffer dbusBuf =
        new DbusEventBuffer(getConfig(1000,1000,100,500,AllocationPolicy.HEAP_MEMORY,
                                      QueuePolicy.BLOCK_ON_WRITE, AssertLevel.NONE));
    BufferPositionParser parser = dbusBuf.getBufferPositionParser();
    DbusEventGenerator generator = new DbusEventGenerator();
    Vector<DbusEvent> events = new Vector<DbusEvent>();
    generator.generateEvents(11, 11, 100, 10, events);

    // Add events to the EventBuffer
    DbusEventAppender appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread

    LOG.info("Head:" + dbusBuf.getHead() + ",Tail:" +dbusBuf.getTail());
    assertEquals("Head Check",0,dbusBuf.getHead());
    assertEquals("Tail Check",903,dbusBuf.getTail());

    // Remove the first event
    DbusEventIterator itr = dbusBuf.acquireIterator("dummy");
    assertTrue(itr.hasNext());
    DbusEvent event = itr.next();
    assertTrue(event.isValid());
    itr.remove();
    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));
    assertEquals("Head Check",61,dbusBuf.getHead());
    assertEquals("Tail Check",903,dbusBuf.getTail());

    for (DbusEvent e : events)
    {
      assertTrue("invalid event", e.isValid());
    }

    // set up the ReadChannel with 2 events
    ByteArrayOutputStream oStream = new ByteArrayOutputStream();
    WritableByteChannel oChannel = Channels.newChannel(oStream);
    for (int i = 0; i < 2; ++i)
    {
      ((DbusEventInternalReadable)events.get(i)).writeTo(oChannel,Encoding.BINARY);
    }
    byte[] writeBytes = oStream.toByteArray();
    ByteArrayInputStream iStream = new ByteArrayInputStream(writeBytes);
    final ReadableByteChannel rChannel = Channels.newChannel(iStream);

    // Create a Thread to call readEvents on the channel
    Runnable writer = new Runnable() {
      @Override
      public void run()
      {
        try
        {
          dbusBuf.readEvents(rChannel);
        } catch (InvalidEventException ie) {
          ie.printStackTrace();
          throw new RuntimeException(ie);
        }
      }
    };

    Thread writerThread = new Thread(writer);
    writerThread.start();

    //Check if the thread is alive (blocked) and head/tail is not overlapped
    trySleep(1000);
    assertTrue(writerThread.isAlive());
    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));
    assertEquals("Head Check",61,dbusBuf.getHead());
    assertEquals("Tail Check",2048,dbusBuf.getTail()); //GenId set here but tail is not yet overlapped

    //Read the next event to unblock the writer
    event = itr.next();
    assertTrue(event.isValid());
    itr.remove();
    try
    {
      writerThread.join(1000);
    } catch (InterruptedException ie) {
      ie.printStackTrace();
    }
    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));

    assertFalse(writerThread.isAlive());
    assertEquals("Head Check",132,dbusBuf.getHead());
    assertEquals("Tail Check",2119,dbusBuf.getTail());

    while (itr.hasNext())
    {
      assertTrue(itr.next().isValid(true));
      itr.remove();
    }
    LOG.info("Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));
    assertEquals("Head Check",dbusBuf.getHead(),dbusBuf.getTail());
  }
View Full Code Here

    final DbusEventBuffer dbusBuf2 = new DbusEventBuffer(
      getConfig(1000,1000,100,3000,AllocationPolicy.HEAP_MEMORY,
                QueuePolicy.BLOCK_ON_WRITE, AssertLevel.NONE));
    //dbusBuf2.getLog().setLevel(Level.DEBUG);

    BufferPositionParser parser = dbusBuf.getBufferPositionParser();
    final BufferPositionParser parser2 = dbusBuf2.getBufferPositionParser();

    DbusEventGenerator generator = new DbusEventGenerator();
    Vector<DbusEvent> events = new Vector<DbusEvent>();
    generator.generateEvents(24, 24, 100, 10, events);
    log.info("Num Events :" + events.size());

    // Add events to the EventBuffer
    DbusEventAppender appender = new DbusEventAppender(events, dbusBuf, null);
    appender.run();

    final AtomicBoolean stopReader = new AtomicBoolean(false);
    log.info("dbusBuf : Head:" + parser.toString(dbusBuf.getHead()) +
             ",Tail:" + parser.toString(dbusBuf.getTail()));

    class EvbReader implements Runnable
    {
      private int _count = 0;
      public EvbReader()
      {
        _count = 0;
      }

      public int getCount()
      {
        return _count;
      }

      @Override
      public void run()
      {
        try
        {
          Thread.sleep(5*1000);
        }
        catch (InterruptedException ie)
        {
        }
        DbusEventBuffer.DbusEventIterator itr = dbusBuf2.acquireIterator("dummy");
        log.info("Reader iterator:" + itr);
        while (!stopReader.get() || itr.hasNext())
        {
          while (itr.hasNext())
          {
            itr.next();
            itr.remove();
            _count++;
          }
          itr.await(100, TimeUnit.MILLISECONDS);
        }
        log.info("Reader Thread: dbusBuf2 : Head:" + parser2.toString(dbusBuf2.getHead()) +
                 ",Tail:" + parser2.toString(dbusBuf2.getTail()));
        ByteBuffer[] buf = dbusBuf2.getBuffer();
        log.info("Reader Thread : dbusBuf2 : Buffer :" + buf[0]);
        log.info("Count is :" + _count);
        log.info("Reader iterator:" + itr);
      }
View Full Code Here

    final DbusEventBuffer dbusBuf2 =
        new DbusEventBuffer(getConfig(2000, 2000, 100, 1000, AllocationPolicy.HEAP_MEMORY,
                            QueuePolicy.BLOCK_ON_WRITE, AssertLevel.NONE));

    BufferPositionParser parser = dbusBuf.getBufferPositionParser();
    final BufferPositionParser parser2 = dbusBuf2.getBufferPositionParser();

    DbusEventGenerator generator = new DbusEventGenerator();
    Vector<DbusEvent> events = new Vector<DbusEvent>();
    generator.generateEvents(12, 12, 100, 10, events);

    log.info("generate sample events to the EventBuffer");
    DbusEventAppender appender = new DbusEventAppender(events, dbusBuf, null);
    appender.run();

    log.info("dbusBuf : Head:" + parser.toString(dbusBuf.getHead()) + ",Tail:" + parser.toString(dbusBuf.getTail()));
    ByteBuffer[] buf = dbusBuf.getBuffer();
    byte[] b = new byte[(int)dbusBuf.getTail()];
    buf[0].position(0);
    buf[0].get(b);

    log.info("copy data to the destination buffer: 1");
    ReadableByteChannel rChannel = Channels.newChannel(new ByteArrayInputStream(b));

    dbusBuf2.readEvents(rChannel);
    log.info("dbusBuf2 : Head:" + parser2.toString(dbusBuf2.getHead()) + ",Tail:" + parser2.toString(dbusBuf2.getTail()));
    rChannel.close();

    log.info("copy data to the destination buffer: 2");
    rChannel = Channels.newChannel(new ByteArrayInputStream(b));
    dbusBuf2.readEvents(rChannel);
    log.info("dbusBuf2 : Head:" + parser2.toString(dbusBuf2.getHead()) + ",Tail:" + parser2.toString(dbusBuf2.getTail()));
    log.info("Buffer Size is :" + dbusBuf2.getBuffer().length);
    rChannel.close();

    log.info("process data in destination buffer: 1");
    DbusEventBuffer.DbusEventIterator itr = dbusBuf2.acquireIterator("dummy1");

    for (int i = 0 ; i < 15; i++)
    {
      itr.next();
      itr.remove();
    }
    itr.close();
    log.info("dbusBuf2 : Head:" + parser2.toString(dbusBuf2.getHead()) + ",Tail:" + parser2.toString(dbusBuf2.getTail()));

    log.info("copy data to the destination buffer: 3");
    rChannel = Channels.newChannel(new ByteArrayInputStream(b));
    dbusBuf2.readEvents(rChannel);
    ByteBuffer[] buf2 = dbusBuf2.getBuffer();
    log.info("dbusBuf2 : Head:" + parser2.toString(dbusBuf2.getHead()) + ",Tail:" + parser2.toString(dbusBuf2.getTail()));
    log.info("dbusBuf2 : Buffer :" + buf2[0]);
    rChannel.close();

    log.info("process data in destination buffer: 2");
    itr = dbusBuf2.acquireIterator("dummy2");
    for (int i = 0 ; i < 15; i++)
    {
      itr.next();
      itr.remove();
    }
    itr.close();
    log.info("dbusBuf2 : Head:" + parser2.toString(dbusBuf2.getHead()) +
             ",Tail:" + parser2.toString(dbusBuf2.getTail()));
    log.info("dbusBuf2 : Buffer :" + buf2[0]);

    log.info("generate more sample events to the EventBuffer");
    dbusBuf = new DbusEventBuffer(
    getConfig(2000,2000,100,500,AllocationPolicy.HEAP_MEMORY,
              QueuePolicy.BLOCK_ON_WRITE, AssertLevel.ALL));
    events = new Vector<DbusEvent>();
    generator.generateEvents(8, 9, 150, 52, events);
    log.info("Events Size is :" + events.get(0).size());
    appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run();

    final AtomicBoolean stopReader = new AtomicBoolean(false);

    Runnable reader = new Runnable()
    {
      @Override
      public void run()
      {
        try
        {
          Thread.sleep(5*1000);
        }
        catch (InterruptedException ie)
        {
        }
        DbusEventBuffer.DbusEventIterator itr =  dbusBuf2.acquireIterator("dummy3");
        log.info("Reader iterator:" + itr);
        while (!stopReader.get() || itr.hasNext())
        {
          while (itr.hasNext())
          {
            itr.next();
            itr.remove();
          }
          itr.await(100, TimeUnit.MILLISECONDS);
        }
        itr.close();
        log.info("Reader Thread: dbusBuf2 : Head:" +
                 parser2.toString(dbusBuf2.getHead()) +
                 ",Tail:" + parser2.toString(dbusBuf2.getTail()));
        ByteBuffer[] buf = dbusBuf2.getBuffer();
        log.info("Reader Tread : dbusBuf2 : Buffer :" + buf[0]);
        log.info("Reader iterator:" + itr);
      }
    };

    log.info("generate sample events to the EventBuffer");
    Thread t = new Thread(reader, "BufferOverflowReader");

    b = new byte[(int)dbusBuf.getTail()];
    buf = dbusBuf.getBuffer();
    buf[0].position(0);
    buf[0].get(b);

    log.info("copy data to the destination buffer: 4");
    log.info("Size is :" + b.length);
    rChannel = Channels.newChannel(new ByteArrayInputStream(b));
    dbusBuf2.readEvents(rChannel);    // <=== Overflow happened at this point
    rChannel.close();

    log.info("dbusBuf2 : Head:" + parser2.toString(dbusBuf2.getHead()) +
             ",Tail:" + parser2.toString(dbusBuf2.getTail()));
    log.info("dbusBuf2 : Buffer :" + buf2[0]);

    log.info("test if the readEvents can allow reader to proceed while it is blocked");
    rChannel = Channels.newChannel(new ByteArrayInputStream(b));
    log.info("start reader thread");
    t.start();
    log.info("copy data to the destination buffer: 5");
    dbusBuf2.readEvents(rChannel);
    rChannel.close();
    log.info("data copied to the destination buffer: 5");
    stopReader.set(true);

    t.join(20000);
    log.info("check if dbusBuf2 is empty");
    Assert.assertTrue(!t.isAlive());
    if (!dbusBuf2.empty())
    {
      log.error("dbusBuf2 not empty: " + dbusBuf2);
    }
    Assert.assertTrue(dbusBuf2.toString(), dbusBuf2.empty());

    log.info("dbusBuf2 : Head:" + parser2.toString(dbusBuf2.getHead()) +
             ",Tail:" + parser2.toString(dbusBuf2.getTail()));
    log.info("dbusBuf2 : Buffer :" + buf2[0]);
    log.info("done");
  }
View Full Code Here

    if (0 >= _initReadBufferSize)
    {
      throw new DatabusRuntimeException("invalid initial event staging buffer size: " + _initReadBufferSize);
    }

    _bufferPositionParser = new BufferPositionParser((int)(Math.min(_maxBufferSize, maxEventBufferSize)), buffers.size());

    _scnIndex = new ScnIndex(maxIndexSize, maxEventBufferSize, _maxBufferSize, _bufferPositionParser,
                             allocationPolicy, restoreBuffers, _mmapSessionDirectory, _assertLevel,
                             enableScnIndex, _eventFactory.getByteOrder());
View Full Code Here

    log.info("starting");
    final DbusEventBuffer dbusBuf =
        new DbusEventBuffer(TestDbusEventBuffer.getConfig(
            1200, 500, 100, 500, AllocationPolicy.HEAP_MEMORY, QueuePolicy.OVERWRITE_ON_WRITE,
            AssertLevel.ALL));
    BufferPositionParser parser = dbusBuf.getBufferPositionParser();
    log.info("append initial events");
    DbusEventGenerator generator = new DbusEventGenerator();
    Vector<DbusEvent> events = new Vector<DbusEvent>();
    generator.generateEvents(7, 1, 120, 39, events);

    // Add events to the EventBuffer. Now the buffer is full
    DbusEventAppender appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread
    log.info("Head:" + parser.toString(dbusBuf.getHead()) + ", Tail:" + parser.toString(dbusBuf.getTail()));
    log.info("Num buffers: " + dbusBuf.getBuffer().length);
    log.info("Buffer: " + Arrays.toString(dbusBuf.getBuffer()));

    long headPos = dbusBuf.getHead();
    long tailPos = dbusBuf.getTail();
    long headGenId = parser.bufferGenId(headPos);
    long headIndexId = parser.bufferIndex(headPos);
    long headOffset = parser.bufferOffset(headPos);
    long tailGenId = parser.bufferGenId(tailPos);
    long tailIndexId = parser.bufferIndex(tailPos);
    long tailOffset = parser.bufferOffset(tailPos);

    assertEquals(0, headGenId);
    assertEquals(0, headIndexId);
    assertEquals(222, headOffset);
    assertEquals(1, tailGenId);
    assertEquals(0, tailIndexId);
    assertEquals(61, tailOffset);

    log.info("append windows with one small and one big event");
    generator = new DbusEventGenerator(100);
    events = new Vector<DbusEvent>();
    generator.generateEvents(1, 1, 280, 139, events);
    generator.generateEvents(1, 1, 480, 339, events);

    // Add events to the EventBuffer. Now the buffer is full
    appender = new DbusEventAppender(events,dbusBuf,null);
    appender.run(); // running in the same thread
    log.info("Head:" + parser.toString(dbusBuf.getHead()) + ", Tail:" + parser.toString(dbusBuf.getTail()));
    log.info("Num buffers: " + dbusBuf.getBuffer().length);
    log.info("Buffer: " + Arrays.toString(dbusBuf.getBuffer()));

    headPos = dbusBuf.getHead();
    tailPos = dbusBuf.getTail();
    headGenId = parser.bufferGenId(headPos);
    headIndexId = parser.bufferIndex(headPos);
    headOffset = parser.bufferOffset(headPos);
    tailGenId = parser.bufferGenId(tailPos);
    tailIndexId = parser.bufferIndex(tailPos);
    tailOffset = parser.bufferOffset(tailPos);

    assertEquals(0, headGenId);
    assertEquals(2, headIndexId);
    assertEquals(61, headOffset);
    assertEquals(1, tailIndexId);
View Full Code Here

  {
    /**
     * Index is broken up into 3 entries for offsets
     * 0-1000, 1001-2000, 2001-3000
     */
  BufferPositionParser parser = new BufferPositionParser(3000,3);
  ScnIndex index = new ScnIndex(3 * ScnIndex.SIZE_OF_SCN_OFFSET_RECORD, 3000, 10000,
                                parser, AllocationPolicy.DIRECT_MEMORY, false, null, DEFAULT_ASSERT_LEVEL,
                                true /* enabled */, ByteOrder.BIG_ENDIAN);
  DbusEvent eopEvent = EasyMock.createNiceMock(DbusEvent.class);
  EasyMock.expect(eopEvent.isEndOfPeriodMarker()).andReturn(true).anyTimes();
View Full Code Here

TOP

Related Classes of com.linkedin.databus.core.util.BufferPositionParser

Copyright © 2018 www.massapicom. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.