Package com.alibaba.otter.canal.protocol.position

Examples of com.alibaba.otter.canal.protocol.position.LogPosition


                if (!successed) {
                    throw new CanalParseException("consume failed!");
                }

                LogPosition position = buildLastTranasctionPosition(transaction);
                if (position != null) { // 可能position为空
                    logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
                }
            }
        });
View Full Code Here


    protected LogPosition buildLastTranasctionPosition(List<CanalEntry.Entry> entries) { // 初始化一下
        for (int i = entries.size() - 1; i > 0; i--) {
            CanalEntry.Entry entry = entries.get(i);
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {// 尽量记录一个事务做为position
                LogPosition logPosition = new LogPosition();
                EntryPosition position = new EntryPosition();
                position.setJournalName(entry.getHeader().getLogfileName());
                position.setPosition(entry.getHeader().getLogfileOffset());
                position.setTimestamp(entry.getHeader().getExecuteTime());
                logPosition.setPostion(position);

                LogIdentity identity = new LogIdentity(runningInfo.getAddress(), -1L);
                logPosition.setIdentity(identity);
                return logPosition;
            }
        }

        return null;
View Full Code Here

        return null;
    }

    protected LogPosition buildLastPosition(CanalEntry.Entry entry) { // 初始化一下
        LogPosition logPosition = new LogPosition();
        EntryPosition position = new EntryPosition();
        position.setJournalName(entry.getHeader().getLogfileName());
        position.setPosition(entry.getHeader().getLogfileOffset());
        position.setTimestamp(entry.getHeader().getExecuteTime());
        logPosition.setPostion(position);

        LogIdentity identity = new LogIdentity(runningInfo.getAddress(), -1L);
        logPosition.setIdentity(identity);
        return logPosition;
    }
View Full Code Here

        executor = Executors.newScheduledThreadPool(1);
        positions = new MapMaker().makeComputingMap(new Function<String, LogPosition>() {

            public LogPosition apply(String destination) {
                LogPosition logPosition = loadDataFromFile(dataFileCaches.get(destination));
                if (logPosition == null) {
                    return nullPosition;
                } else {
                    return logPosition;
                }
View Full Code Here

        persistTasks.add(destination);// 添加到任务队列中进行触发
        super.persistLogPosition(destination, logPosition);
    }

    public LogPosition getLatestIndexBy(String destination) {
        LogPosition logPostion = super.getLatestIndexBy(destination);
        if (logPostion == nullPosition) {
            return null;
        } else {
            return logPostion;
        }
View Full Code Here

    private void flushDataToFile(String destination) {
        flushDataToFile(destination, dataFileCaches.get(destination));
    }

    private void flushDataToFile(String destination, File dataFile) {
        LogPosition position = positions.get(destination);
        if (position != null && position != nullPosition) {
            String json = JsonUtils.marshalToString(position);
            try {
                FileUtils.writeStringToFile(dataFile, json);
            } catch (IOException e) {
View Full Code Here

        }
        executor = Executors.newFixedThreadPool(1);
        positions = new MapMaker().makeComputingMap(new Function<String, LogPosition>() {

            public LogPosition apply(String destination) {
                LogPosition logPosition = zooKeeperLogPositionManager.getLatestIndexBy(destination);
                if (logPosition == null) {
                    return nullPosition;
                } else {
                    return logPosition;
                }
View Full Code Here

        });

    }

    public LogPosition getLatestIndexBy(String destination) {
        LogPosition logPosition = super.getLatestIndexBy(destination);
        if (logPosition == nullPosition) {
            return null;
        } else {
            return logPosition;
        }
View Full Code Here

        return startPosition;
    }

    protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
        MysqlConnection mysqlConnection = (MysqlConnection) connection;
        LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
        if (logPosition == null) {// 找不到历史成功记录
            EntryPosition entryPosition = null;
            if (masterInfo != null && mysqlConnection.getConnector().getAddress().equals(masterInfo.getAddress())) {
                entryPosition = masterPosition;
            } else if (standbyInfo != null
                       && mysqlConnection.getConnector().getAddress().equals(standbyInfo.getAddress())) {
                entryPosition = standbyPosition;
            }

            if (entryPosition == null) {
                entryPosition = findEndPosition(mysqlConnection); // 默认从当前最后一个位置进行消费
            }

            // 判断一下是否需要按时间订阅
            if (StringUtils.isEmpty(entryPosition.getJournalName())) {
                // 如果没有指定binlogName,尝试按照timestamp进行查找
                if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
                    logger.warn("prepare to find start position {}:{}:{}",
                        new Object[] { "", "", entryPosition.getTimestamp() });
                    return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
                } else {
                    logger.warn("prepare to find start position just show master status");
                    return findEndPosition(mysqlConnection); // 默认从当前最后一个位置进行消费
                }
            } else {
                if (entryPosition.getPosition() != null && entryPosition.getPosition() > 0L) {
                    // 如果指定binlogName + offest,直接返回
                    logger.warn("prepare to find start position {}:{}:{}",
                        new Object[] { entryPosition.getJournalName(), entryPosition.getPosition(), "" });
                    return entryPosition;
                } else {
                    EntryPosition specificLogFilePosition = null;
                    if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
                        // 如果指定binlogName +
                        // timestamp,但没有指定对应的offest,尝试根据时间找一下offest
                        EntryPosition endPosition = findEndPosition(mysqlConnection);
                        if (endPosition != null) {
                            logger.warn("prepare to find start position {}:{}:{}",
                                new Object[] { entryPosition.getJournalName(), "", entryPosition.getTimestamp() });
                            specificLogFilePosition = findAsPerTimestampInSpecificLogFile(mysqlConnection,
                                entryPosition.getTimestamp(),
                                endPosition,
                                entryPosition.getJournalName());
                        }
                    }

                    if (specificLogFilePosition == null) {
                        // position不存在,从文件头开始
                        entryPosition.setPosition(BINLOG_START_OFFEST);
                        return entryPosition;
                    } else {
                        return specificLogFilePosition;
                    }
                }
            }
        } else {
            if (logPosition.getIdentity().getSourceAddress().equals(mysqlConnection.getConnector().getAddress())) {
                logger.warn("prepare to find start position just last position");
                return logPosition.getPostion();
            } else {
                // 针对切换的情况,考虑回退时间
                long newStartTimestamp = logPosition.getPostion().getTimestamp() - fallbackIntervalInSeconds * 1000;
                logger.warn("prepare to find start position by switch {}:{}:{}", new Object[] { "", "",
                        logPosition.getPostion().getTimestamp() });
                return findByStartTimeStamp(mysqlConnection, newStartTimestamp);
            }
        }
    }
View Full Code Here

    private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,
                                                              final Long startTimestamp,
                                                              final EntryPosition endPosition,
                                                              final String searchBinlogFile) {

        final LogPosition logPosition = new LogPosition();
        try {
            mysqlConnection.reconnect();
            // 开始遍历文件
            mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction<LogEvent>() {

                private LogPosition lastPosition;

                public boolean sink(LogEvent event) {
                    EntryPosition entryPosition = null;
                    try {
                        CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);
                        if (entry == null) {
                            return true;
                        }

                        String logfilename = entry.getHeader().getLogfileName();
                        Long logfileoffset = entry.getHeader().getLogfileOffset();
                        Long logposTimestamp = entry.getHeader().getExecuteTime();

                        if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
                            logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] {
                                    logfilename, logfileoffset, logposTimestamp, startTimestamp });
                            // 寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
                            if (logposTimestamp >= startTimestamp) {
                                return false;
                            }
                        }

                        if (StringUtils.equals(endPosition.getJournalName(), logfilename)
                            && endPosition.getPosition() <= (logfileoffset + event.getEventLen())) {
                            return false;
                        }

                        // 记录一下上一个事务结束的位置,即下一个事务的position
                        // position = current +
                        // data.length,代表该事务的下一条offest,避免多余的事务重复
                        if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                            entryPosition = new EntryPosition(logfilename,
                                logfileoffset + event.getEventLen(),
                                logposTimestamp);
                            logger.debug("set {} to be pending start position before finding another proper one...",
                                entryPosition);
                            logPosition.setPostion(entryPosition);
                        }

                        lastPosition = buildLastPosition(entry);
                    } catch (Exception e) {
                        processError(e, lastPosition, searchBinlogFile, 4L);
                    }

                    return running;
                }
            });

        } catch (IOException e) {
            logger.error("ERROR ## findAsPerTimestampInSpecificLogFile has an error", e);
        }

        if (logPosition.getPostion() != null) {
            return logPosition.getPostion();
        } else {
            return null;
        }
    }
View Full Code Here

TOP

Related Classes of com.alibaba.otter.canal.protocol.position.LogPosition

Copyright © 2018 www.massapicom. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.