public void init(final MetaMorphosisBroker metaMorphosisBroker, final Properties props) {
final MetaConfig metaConfig = metaMorphosisBroker.getMetaConfig();
// Added slave status topic for master to check it.
TopicConfig topicConfig = new TopicConfig(Constants.TEST_SLAVE_TOPIC, metaConfig);
topicConfig.setNumPartitions(1);
metaConfig.addTopic(Constants.TEST_SLAVE_TOPIC, topicConfig);
// slave��ע�ᵽzk,ǿ��
metaMorphosisBroker.getBrokerZooKeeper().getZkConfig().zkEnable = false;
this.orderedPutExecutor =
new OrderedThreadPoolExecutor(metaConfig.getPutProcessThreadCount(),
metaConfig.getPutProcessThreadCount(), 60, TimeUnit.SECONDS, new NamedThreadFactory("putProcessor"));