Package com.alibaba.otter.canal.parse.inbound

Source Code of com.alibaba.otter.canal.parse.inbound.EventTransactionBufferTest

package com.alibaba.otter.canal.parse.inbound;

import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import junit.framework.Assert;

import org.junit.Test;

import com.alibaba.otter.canal.parse.inbound.EventTransactionBuffer.TransactionFlushCallback;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;

public class EventTransactionBufferTest {

    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String messgae     = "{0} [{1}:{2}:{3}] {4}.{5}";

    @Test
    public void testTransactionFlush() {
        final int bufferSize = 64;
        final int transactionSize = 5;
        EventTransactionBuffer buffer = new EventTransactionBuffer();
        buffer.setBufferSize(bufferSize);
        buffer.setFlushCallback(new TransactionFlushCallback() {

            public void flush(List<Entry> transaction) throws InterruptedException {
                Assert.assertEquals(transactionSize, transaction.size());
                System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
                for (Entry data : transaction) {

                    CanalEntry.Header header = data.getHeader();
                    Date date = new Date(header.getExecuteTime());
                    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
                    if (data.getEntryType() == EntryType.TRANSACTIONBEGIN
                        || data.getEntryType() == EntryType.TRANSACTIONEND) {
                        System.out.println(data.getEntryType());

                    } else {
                        System.out.println(MessageFormat.format(messgae, new Object[] {
                                Thread.currentThread().getName(), header.getLogfileName(), header.getLogfileOffset(),
                                format.format(date), header.getSchemaName(), header.getTableName() }));
                    }

                }
                System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
            }
        });
        buffer.start();

        try {
            for (int i = 0; i < transactionSize * 10; i++) {
                if (i % transactionSize == 0) {
                    buffer.add(buildEntry("1", 1L + i, 40L + i, EntryType.TRANSACTIONBEGIN));
                } else if ((i + 1) % transactionSize == 0) {
                    buffer.add(buildEntry("1", 1L + i, 40L + i, EntryType.TRANSACTIONEND));
                } else {
                    buffer.add(buildEntry("1", 1L + i, 40L + i));
                }
            }
        } catch (InterruptedException e) {
            Assert.fail(e.getMessage());
        }

        buffer.stop();
    }

    @Test
    public void testForceFlush() {
        final int bufferSize = 64;
        EventTransactionBuffer buffer = new EventTransactionBuffer();
        buffer.setBufferSize(bufferSize);
        buffer.setFlushCallback(new TransactionFlushCallback() {

            public void flush(List<Entry> transaction) throws InterruptedException {
                Assert.assertEquals(bufferSize, transaction.size());
                System.out.println("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
                for (Entry data : transaction) {

                    CanalEntry.Header header = data.getHeader();
                    Date date = new Date(header.getExecuteTime());
                    SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
                    if (data.getEntryType() == EntryType.TRANSACTIONBEGIN
                        || data.getEntryType() == EntryType.TRANSACTIONEND) {
                        // System.out.println(MessageFormat.format(messgae, new Object[] {
                        // Thread.currentThread().getName(),
                        // header.getLogfilename(), header.getLogfileoffset(), format.format(date),
                        // data.getEntry().getEntryType(), "" }));
                        System.out.println(data.getEntryType());

                    } else {
                        System.out.println(MessageFormat.format(messgae, new Object[] {
                                Thread.currentThread().getName(), header.getLogfileName(), header.getLogfileOffset(),
                                format.format(date), header.getSchemaName(), header.getTableName() }));
                    }

                }
                System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n");
            }
        });
        buffer.start();

        try {
            for (int i = 0; i < bufferSize * 2 + 1; i++) {
                buffer.add(buildEntry("1", 1L + i, 40L + i));
            }
        } catch (InterruptedException e) {
            Assert.fail(e.getMessage());
        }

        buffer.stop();
    }

    private static Entry buildEntry(String binlogFile, long offset, long timestamp) {
        Header.Builder headerBuilder = Header.newBuilder();
        headerBuilder.setLogfileName(binlogFile);
        headerBuilder.setLogfileOffset(offset);
        headerBuilder.setExecuteTime(timestamp);
        Entry.Builder entryBuilder = Entry.newBuilder();
        entryBuilder.setHeader(headerBuilder.build());
        return entryBuilder.build();
    }

    private static Entry buildEntry(String binlogFile, long offset, long timestamp, EntryType type) {
        Header.Builder headerBuilder = Header.newBuilder();
        headerBuilder.setLogfileName(binlogFile);
        headerBuilder.setLogfileOffset(offset);
        headerBuilder.setExecuteTime(timestamp);
        Entry.Builder entryBuilder = Entry.newBuilder();
        entryBuilder.setHeader(headerBuilder.build());
        entryBuilder.setEntryType(type);
        return entryBuilder.build();
    }
}
TOP

Related Classes of com.alibaba.otter.canal.parse.inbound.EventTransactionBufferTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.