private final Cluster cluster;
private final Map<String, Errors> errors;
public MetadataResponse(Cluster cluster) {
super(new Struct(curSchema));
List<Struct> brokerArray = new ArrayList<Struct>();
for (Node node: cluster.nodes()) {
Struct broker = struct.instance(BROKERS_KEY_NAME);
broker.set(NODE_ID_KEY_NAME, node.id());
broker.set(HOST_KEY_NAME, node.host());
broker.set(PORT_KEY_NAME, node.port());
brokerArray.add(broker);
}
struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
List<Struct> topicArray = new ArrayList<Struct>();
for (String topic: cluster.topics()) {
Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
topicData.set(TOPIC_ERROR_CODE_KEY_NAME, (short)0); // no error
topicData.set(TOPIC_KEY_NAME, topic);
List<Struct> partitionArray = new ArrayList<Struct>();
for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, (short)0); // no error
partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
ArrayList<Integer> replicas = new ArrayList<Integer>();
for (Node node: fetchPartitionData.replicas())
replicas.add(node.id());
partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
ArrayList<Integer> isr = new ArrayList<Integer>();
for (Node node: fetchPartitionData.inSyncReplicas())
isr.add(node.id());
partitionData.set(ISR_KEY_NAME, isr.toArray());
partitionArray.add(partitionData);
}
topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
topicArray.add(topicData);
}