jsonMapper.registerSubtypes(new NamedType(KafkaSink.class, "kafka"));
KafkaSink sink = jsonMapper.readValue(description, new TypeReference<Sink>(){});
sink.open();
Iterator<Message> msgIterator = new MessageSetReader(TestKafkaSink.createMessageSet(topicName, 2)).iterator();
while (msgIterator.hasNext()) {
sink.writeTo(new StringMessage(msgIterator.next()));
}
sink.close();
}
}