logger.info("init eventSink end! \n\t load CanalEventSink:{}", eventSink.getClass().getName());
}
protected void initEventParser() {
logger.info("init eventParser begin...");
SourcingType type = parameters.getSourcingType();
List<List<DataSourcing>> groupDbAddresses = parameters.getGroupDbAddresses();
if (!CollectionUtils.isEmpty(groupDbAddresses)) {
int size = groupDbAddresses.get(0).size();// 取第一个分组的数量,主备分组的数量必须一致
List<CanalEventParser> eventParsers = new ArrayList<CanalEventParser>();
for (int i = 0; i < size; i++) {
List<InetSocketAddress> dbAddress = new ArrayList<InetSocketAddress>();
SourcingType lastType = null;
for (List<DataSourcing> groupDbAddress : groupDbAddresses) {
if (lastType != null && !lastType.equals(groupDbAddress.get(i).getType())) {
throw new CanalException(String.format("master/slave Sourcing type is unmatch. %s vs %s",
lastType, groupDbAddress.get(i).getType()));
}
lastType = groupDbAddress.get(i).getType();