/**
* Copyright (C) 2010-2013 Alibaba Group Holding Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.rocketmq.broker.slave;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.broker.BrokerController;
import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
import com.alibaba.rocketmq.common.MixAll;
import com.alibaba.rocketmq.common.constant.LoggerName;
import com.alibaba.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
import com.alibaba.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
/**
* Slave从Master同步信息(非消息)
*
* @author shijia.wxr<vintage.wang@gmail.com>
* @author manhong.yqd<manhong.yqd@taobao.com>
* @since 2013-7-8
*/
public class SlaveSynchronize {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BrokerLoggerName);
private final BrokerController brokerController;
private volatile String masterAddr = null;
public SlaveSynchronize(BrokerController brokerController) {
this.brokerController = brokerController;
}
public String getMasterAddr() {
return masterAddr;
}
public void setMasterAddr(String masterAddr) {
this.masterAddr = masterAddr;
}
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion()
.equals(topicWrapper.getDataVersion())) {
this.brokerController.getTopicConfigManager().getDataVersion()
.assignNewOne(topicWrapper.getDataVersion());
this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
this.brokerController.getTopicConfigManager().getTopicConfigTable()
.putAll(topicWrapper.getTopicConfigTable());
this.brokerController.getTopicConfigManager().persist();
log.info("update slave topic config from master, {}", masterAddrBak);
}
}
catch (Exception e) {
log.error("syncTopicConfig Exception, " + masterAddrBak, e);
}
}
}
private void syncConsumerOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
ConsumerOffsetSerializeWrapper offsetWrapper =
this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
this.brokerController.getConsumerOffsetManager().getOffsetTable()
.putAll(offsetWrapper.getOffsetTable());
this.brokerController.getConsumerOffsetManager().persist();
log.info("update slave consumer offset from master, {}", masterAddrBak);
}
catch (Exception e) {
log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
}
}
}
private void syncDelayOffset() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
String delayOffset =
this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
if (delayOffset != null) {
String fileName = this.brokerController.getMessageStoreConfig().getDelayOffsetStorePath();
try {
MixAll.string2File(delayOffset, fileName);
}
catch (IOException e) {
log.error("persist file Exception, " + fileName, e);
}
}
log.info("update slave delay offset from master, {}", masterAddrBak);
}
catch (Exception e) {
log.error("syncDelayOffset Exception, " + masterAddrBak, e);
}
}
}
private void syncSubscriptionGroupConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null) {
try {
SubscriptionGroupWrapper subscriptionWrapper =
this.brokerController.getBrokerOuterAPI()
.getAllSubscriptionGroupConfig(masterAddrBak);
if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
.equals(subscriptionWrapper.getDataVersion())) {
SubscriptionGroupManager subscriptionGroupManager =
this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(
subscriptionWrapper.getDataVersion());
subscriptionGroupManager.getSubscriptionGroupTable().clear();
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
subscriptionWrapper.getSubscriptionGroupTable());
subscriptionGroupManager.persist();
log.info("update slave Subscription Group from master, {}", masterAddrBak);
}
}
catch (Exception e) {
log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
}
}
}
}