Package com.datasift.dropwizard.kafka

Source Code of com.datasift.dropwizard.kafka.KafkaProducerFactory

package com.datasift.dropwizard.kafka;

import com.datasift.dropwizard.kafka.producer.InstrumentedProducer;
import com.datasift.dropwizard.kafka.producer.KafkaProducer;
import com.datasift.dropwizard.kafka.producer.ManagedProducer;
import com.datasift.dropwizard.kafka.producer.ProxyProducer;
import com.datasift.dropwizard.kafka.util.Compression;
import com.datasift.dropwizard.zookeeper.ZooKeeperFactory;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.dropwizard.setup.Environment;
import io.dropwizard.util.Duration;
import io.dropwizard.util.Size;
import io.dropwizard.validation.ValidationMethod;
import kafka.javaapi.producer.Producer;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.serializer.Encoder;

import javax.validation.Valid;
import javax.validation.constraints.Min;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Properties;

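/**
 * A Dropwizard factory for configuring and building Kafka producers.
 * <p>
 * Broker discovery is configured either through a ZooKeeper quorum ({@code zookeeper}) or a
 * static broker map ({@code brokers}); validation enforces that exactly one of the two is set.
 */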
public class KafkaProducerFactory extends KafkaClientFactory {

    static final int DEFAULT_BROKER_PORT = 9092;

    @Valid
    private Optional<ZooKeeperFactory> zookeeper = Optional.absent();

    @Valid
    private ImmutableMap<Integer, InetSocketAddress> brokers = ImmutableMap.of();

    private boolean async = false;

    private Size sendBufferSize = Size.kilobytes(100);

    private Duration connectionTimeout = Duration.seconds(5);

    @Min(1)
    private long reconnectInterval = 30000;

    private Optional<Duration> reconnectTimeInterval = Optional.of(Duration.seconds(10000));

    private Size maxMessageSize = Size.megabytes(1);

    private Compression compression = Compression.parse("none");

    private ImmutableSet<String> compressedTopics = ImmutableSet.of();

    private int zookeeperReadRetries = 3;

    private Duration queueTime = Duration.seconds(5);

    private long queueSize = 10000;

    private long batchSize = 200;

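    // Bean Validation checks: together they enforce that exactly one broker discovery
    // mechanism (a ZooKeeper quorum or a static broker map) is configured.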
    @JsonIgnore
    @ValidationMethod(message = "only one of 'zookeeper' and 'brokers' may be set")
    public boolean isOneDiscoveryType() {
        return zookeeper.isPresent() ^ (!brokers.isEmpty());
    }

    @JsonIgnore
    @ValidationMethod(message = "one of 'zookeeper' and 'brokers' must be set")
    public boolean isZookeeperOrBrokers() {
        return zookeeper.isPresent() || (!brokers.isEmpty());
    }

    @JsonProperty
    public Optional<ZooKeeperFactory> getZookeeper() {
        return zookeeper;
    }

    @JsonProperty
    public void setZookeeper(final Optional<ZooKeeperFactory> zookeeper) {
        this.zookeeper = zookeeper;
    }

    @JsonProperty
    public ImmutableMap<Integer, InetSocketAddress> getBrokers() {
        return brokers;
    }

    @JsonProperty
    public void setBrokers(final ImmutableMap<Integer, InetSocketAddress> brokers) {
        this.brokers = brokers;
    }

    @JsonProperty
    public boolean isAsync() {
        return async;
    }

    @JsonProperty
    public void setAsync(final boolean async) {
        this.async = async;
    }

    @JsonProperty
    public Size getSendBufferSize() {
        return sendBufferSize;
    }

    @JsonProperty
    public void setSendBufferSize(final Size sendBufferSize) {
        this.sendBufferSize = sendBufferSize;
    }

    @JsonProperty
    public Duration getConnectionTimeout() {
        return connectionTimeout;
    }

    @JsonProperty
    public void setConnectionTimeout(final Duration connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    @JsonProperty
    public long getReconnectInterval() {
        return reconnectInterval;
    }

    @JsonProperty
    public void setReconnectInterval(final long reconnectInterval) {
        this.reconnectInterval = reconnectInterval;
    }

    @JsonProperty
    public Optional<Duration> getReconnectTimeInterval() {
        return reconnectTimeInterval;
    }

    @JsonProperty
    public void setReconnectTimeInterval(final Optional<Duration> reconnectTimeInterval) {
        this.reconnectTimeInterval = reconnectTimeInterval;
    }

    @JsonProperty
    public Size getMaxMessageSize() {
        return maxMessageSize;
    }

    @JsonProperty
    public void setMaxMessageSize(final Size maxMessageSize) {
        this.maxMessageSize = maxMessageSize;
    }

    @JsonProperty
    public Compression getCompression() {
        return compression;
    }

    @JsonProperty
    public void setCompression(final Compression compression) {
        this.compression = compression;
    }

    @JsonProperty
    public ImmutableSet<String> getCompressedTopics() {
        return compressedTopics;
    }

    @JsonProperty
    public void setCompressedTopics(final ImmutableSet<String> compressedTopics) {
        this.compressedTopics = compressedTopics;
    }

    @JsonProperty
    public int getZookeeperReadRetries() {
        return zookeeperReadRetries;
    }

    @JsonProperty
    public void setZookeeperReadRetries(final int zookeeperReadRetries) {
        this.zookeeperReadRetries = zookeeperReadRetries;
    }

    @JsonProperty
    public Duration getQueueTime() {
        return queueTime;
    }

    @JsonProperty
    public void setQueueTime(final Duration queueTime) {
        this.queueTime = queueTime;
    }

    @JsonProperty
    public long getQueueSize() {
        return queueSize;
    }

    @JsonProperty
    public void setQueueSize(final long queueSize) {
        this.queueSize = queueSize;
    }

    @JsonProperty
    public long getBatchSize() {
        return batchSize;
    }

    @JsonProperty
    public void setBatchSize(final long batchSize) {
        this.batchSize = batchSize;
    }

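    /**
     * Builds a {@link KafkaProducer}, registering it with the given {@link Environment} so that
     * its lifecycle is managed (via {@link ManagedProducer}) and it is instrumented with metrics
     * (via {@link InstrumentedProducer}) under the given name.
     */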
    public <K, T> KafkaProducer<K, T> build(final Encoder<T> encoder,
                                            final Environment environment,
                                            final String name) {
        return build(encoder, null, environment, name);
    }

    public <K, T> KafkaProducer<K, T> build(final Encoder<T> encoder,
                                            final Partitioner<K> partitioner,
                                            final Environment environment,
                                            final String name) {
        final KafkaProducer<K, T> producer = build(encoder, partitioner);
        environment.lifecycle().manage(new ManagedProducer(producer));
        return new InstrumentedProducer<>(producer, environment.metrics(), name);
    }

    public <K, T> KafkaProducer<K, T> build(final Encoder<T> encoder) {
        return build(encoder, null);
    }

    public <K, T> KafkaProducer<K, T> build(final Encoder<T> encoder,
                                            final Partitioner<K> partitioner) {
        return new ProxyProducer<>(new Producer<>(
                toProducerConfig(this, encoder, partitioner),
                encoder,
                null,
                null,
                partitioner));
    }

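    /**
     * Translates the given factory's settings into a Kafka {@link ProducerConfig}, mapping each
     * configured value onto its corresponding producer property (e.g. {@code producer.type},
     * {@code zk.connect} or {@code broker.list}, {@code compression.codec}).
     */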
    static ProducerConfig toProducerConfig(final KafkaProducerFactory factory,
                                           final Encoder<?> encoder,
                                           final Partitioner<?> partitioner) {
        final Properties props = new Properties();

        props.setProperty("serializer.class", encoder.getClass().getCanonicalName());

        if (partitioner != null && factory.getBrokers().isEmpty()) {
            props.setProperty("partitioner.class", partitioner.getClass().getCanonicalName());
        }

        props.setProperty("producer.type", factory.isAsync() ? "async" : "sync");

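        // Broker discovery: use the ZooKeeper quorum when configured, otherwise build a static
        // "broker.list" of the form "id:host:port,..." from the configured broker map.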
        final Optional<ZooKeeperFactory> zooKeeperFactory = factory.getZookeeper();
        if (zooKeeperFactory.isPresent()) {
            final ZooKeeperFactory zk = zooKeeperFactory.get();
            props.setProperty("zk.connect", zk.getQuorumSpec() + zk.getNamespace());
        } else {
            final StringBuilder sb = new StringBuilder(10 * factory.getBrokers().size());
            for (final Map.Entry<Integer, InetSocketAddress> e : factory.getBrokers().entrySet()) {
                final String host = e.getValue().getHostString();
                final int port = e.getValue().getPort() == 0 ? DEFAULT_BROKER_PORT : e.getValue().getPort();
                sb.append(e.getKey()).append(':').append(host).append(':').append(port).append(',');
            }
            props.setProperty("broker.list", sb.substring(0, sb.length() - 1));
        }

        props.setProperty("buffer.size", Long.toString(factory.getSendBufferSize().toBytes()));
        props.setProperty("connect.timeout.ms",
                Long.toString(factory.getConnectionTimeout().toMilliseconds()));
        props.setProperty("socket.timeout.ms",
                Long.toString(factory.getSocketTimeout().toMilliseconds()));
        props.setProperty("reconnect.interval", Long.toString(factory.reconnectInterval));
        props.setProperty("reconnect.time.interval.ms",
                Long.toString(factory.reconnectTimeInterval
                        .or(Duration.milliseconds(-1)).toMilliseconds()));
        props.setProperty("max.message.size", Long.toString(factory.maxMessageSize.toBytes()));
        props.setProperty("compression.codec",
                Integer.toString(factory.compression.getCodec().codec()));
        props.setProperty("zk.read.num.retries",
                Integer.toString(factory.getZookeeperReadRetries()));

        final ImmutableSet<String> compressedTopics = factory.getCompressedTopics();
        if (!compressedTopics.isEmpty()) {
            props.setProperty("compressed.topics", Joiner.on(',').join(compressedTopics));
        }

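        // Queueing and batching settings only apply to the asynchronous producer.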
        if (factory.isAsync()) {
            props.setProperty("queue.time", Long.toString(factory.getQueueTime().toMilliseconds()));
            props.setProperty("queue.size", Long.toString(factory.getQueueSize()));
            props.setProperty("batch.size", Long.toString(factory.getBatchSize()));
        }

        return new ProducerConfig(props);
    }
}
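
A minimal usage sketch follows, assuming a hypothetical Dropwizard configuration class that exposes this factory under a "producer" key, that kafka.serializer.StringEncoder is available for the value type, and that the KafkaProducer wrapper exposes a send(topic, message) method; the class names, topic, and metric name below are illustrative only.

import com.datasift.dropwizard.kafka.KafkaProducerFactory;
import com.datasift.dropwizard.kafka.producer.KafkaProducer;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.Application;
import io.dropwizard.Configuration;
import io.dropwizard.setup.Environment;
import kafka.serializer.StringEncoder;

import javax.validation.Valid;
import javax.validation.constraints.NotNull;

// Hypothetical configuration class exposing the factory under a "producer" key.
class ExampleConfiguration extends Configuration {

    @Valid
    @NotNull
    @JsonProperty("producer")
    private KafkaProducerFactory producer = new KafkaProducerFactory();

    public KafkaProducerFactory getProducer() {
        return producer;
    }
}

// Hypothetical application wiring the producer into the Dropwizard lifecycle and metrics.
public class ExampleApplication extends Application<ExampleConfiguration> {

    @Override
    public void run(final ExampleConfiguration configuration, final Environment environment) {
        // Builds a producer whose lifecycle is managed by the environment and whose metrics
        // are registered under the name "example-producer".
        final KafkaProducer<String, String> producer = configuration.getProducer()
                .build(new StringEncoder(), environment, "example-producer");

        // Assumed: the KafkaProducer wrapper exposes a send(topic, message) method.
        producer.send("example-topic", "hello, kafka");
    }

    public static void main(final String[] args) throws Exception {
        new ExampleApplication().run(args);
    }
}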