}
// Build the consumer's instance name as "<groupId>@<value of JStormUtils.process_pid()>".
// NOTE(review): presumably this keeps instance names distinct across worker
// processes that share the same group — confirm against the client library.
String instanceName = groupId +"@" + JStormUtils.process_pid();
consumer.setInstanceName(instanceName);
// Request ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET as the starting position.
// NOTE(review): exact semantics (e.g. whether it only applies when no offset
// has been stored yet) depend on the client library — verify.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// Subscribe using the topic and sub-expression supplied by config.
// NOTE(review): the sub-expression is presumably a tag filter — confirm.
consumer.subscribe(config.getTopic(), config.getSubExpress());
// Hand the listener to the consumer.
consumer.registerMessageListener(listener);
// Sizing knobs, all taken from config:
//   pull threshold per queue   <- config.getQueueSize()
//   max consume batch size     <- config.getSendBatchSize()
//   pull batch size            <- config.getPullBatchSize()
consumer.setPullThresholdForQueue(config.getQueueSize());
consumer.setConsumeMessageBatchMaxSize(config.getSendBatchSize());
consumer.setPullBatchSize(config.getPullBatchSize());