Package com.datasift.dropwizard.hbase

Source Code of com.datasift.dropwizard.hbase.HBaseClientFactory

package com.datasift.dropwizard.hbase;

import io.dropwizard.util.Duration;
import io.dropwizard.util.Size;
import com.codahale.metrics.MetricRegistry;
import com.datasift.dropwizard.zookeeper.ZooKeeperFactory;
import io.dropwizard.setup.Environment;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

/**
* A factory for creating and managing {@link HBaseClient} instances.
* <p/>
* The resulting {@link HBaseClient} will have its lifecycle managed by an {@link Environment} and
* will have {@link com.codahale.metrics.health.HealthCheck}s installed for the {@code .META.} and
* {@code -ROOT-} tables.
*
* @see HBaseClient
*/
public class HBaseClientFactory {

    private static final String DEFAULT_NAME = "hbase-default";

    @NotNull
    @Valid
    protected ZooKeeperFactory zookeeper = new ZooKeeperFactory();

    @NotNull
    protected Duration flushInterval = Duration.seconds(1);

    @NotNull
    protected Size incrementBufferSize = Size.kilobytes(64);

    @Min(0)
    protected int maxConcurrentRequests = 0;

    @NotNull
    protected Duration connectionTimeout = Duration.seconds(5);

    protected boolean instrumented = true;

    /**
     * Returns the ZooKeeper quorum co-ordinating the HBase cluster.
     *
     * @return the factory for connecting to the ZooKeeper quorum co-ordinating the HBase cluster.
     */
    @JsonProperty
    public ZooKeeperFactory getZookeeper() {
        return zookeeper;
    }

    /**
     * Sets the ZooKeeper quorum co-ordinating the HBase cluster.
     *
     * @param factory a factory for the ZooKeeper quorum co-ordinating the HBase cluster.
     */
    @JsonProperty
    public void setZookeeper(final ZooKeeperFactory factory) {
        this.zookeeper = factory;
    }

    /**
     * Returns the maximum amount of time requests may be buffered client-side before sending them
     * to the server.
     *
     * @return the maximum amount of time requests may be buffered.
     *
     * @see org.hbase.async.HBaseClient#getFlushInterval()
     */
    @JsonProperty
    public Duration getFlushInterval() {
        return flushInterval;
    }

    /**
     * Sets the maximum amount of time requests may be buffered client-side before sending them
     * to the server.
     *
     * @param flushInterval the maximum amount of time requests may be buffered.
     *
     * @see org.hbase.async.HBaseClient#setFlushInterval(short)
     */
    @JsonProperty
    public void setFlushInterval(final Duration flushInterval) {
        this.flushInterval = flushInterval;
    }

    /**
     * Returns the maximum size of the buffer for increment operations.
     * <p/>
     * Once this buffer is full, a flush is forced irrespective of the {@link #getFlushInterval()
     * flushInterval}.
     *
     * @return the maximum number of increments to buffer.
     *
     * @see org.hbase.async.HBaseClient#getIncrementBufferSize()
     */
    @JsonProperty
    public Size getIncrementBufferSize() {
        return incrementBufferSize;
    }

    /**
     * Sets the maximum size of the buffer for increment operations.
     * <p/>
     * Once this buffer is full, a flush is forced irrespective of the {@link #getFlushInterval()
     * flushInterval}.
     *
     * @param incrementBufferSize the maximum number of increments to buffer.
     *
     * @see org.hbase.async.HBaseClient#setIncrementBufferSize(int)
     */
    @JsonProperty
    public void setIncrementBufferSize(final Size incrementBufferSize) {
        this.incrementBufferSize = incrementBufferSize;
    }

    /**
     * Returns maximum number of concurrent asynchronous requests for the client.
     * <p/>
     * Useful for throttling high-throughput applications when HBase is the bottle-neck to prevent
     * the client running out of memory.
     * <p/>
     * With this is zero ("0"), no limit will be placed on the number of concurrent asynchronous
     * requests.
     *
     * @return the maximum number of requests that may be executing concurrently.
     *
     * @see BoundedHBaseClient
     */
    @JsonProperty
    public int getMaxConcurrentRequests() {
        return maxConcurrentRequests;
    }

    /**
     * Sets the maximum number of concurrent asynchronous requests for the client.
     * <p/>
     * Useful for throttling high-throughput applications when HBase is the bottle-neck to prevent
     * the client running out of memory.
     * <p/>
     * With this is zero ("0"), no limit will be placed on the number of concurrent asynchronous
     * requests.
     *
     * @param maxConcurrentRequests the maximum number of requests that may execute concurrently.
     *
     * @see BoundedHBaseClient
     */
    @JsonProperty
    public void setMaxConcurrentRequests(final int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
    }

    /**
     * Returns the maximum time to wait for a connection to a region server before failing.
     *
     * @return the maximum time to spend connecting to a server before failing.
     */
    @JsonProperty
    public Duration getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * Returns the maximum time to wait for a connection to a region server before failing.
     *
     * @param connectionTimeout the maximum time to spend connecting to a server before failing.
     */
    @JsonProperty
    public void setConnectionTimeout(final Duration connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    /**
     * Returns whether the {@link HBaseClient} should be instrumented with metrics.
     *
     * @return whether the {@link HBaseClient} should be instrumented with metrics.
     */
    @JsonProperty
    public boolean isInstrumented() {
        return instrumented;
    }

    /**
     * Sets whether the {@link HBaseClient} should be instrumented with metrics.
     *
     * @param isInstrumented whether the {@link HBaseClient} should be instrumented with metrics.
     */
    @JsonProperty
    public void setInstrumented(final boolean isInstrumented) {
        this.instrumented = isInstrumented;
    }

    /**
     * Builds a default {@link HBaseClient} instance from the specified {@link
     * HBaseClientFactory}.
     *
     * @param environment the {@link Environment} to build {@link HBaseClient} instances for.
     * @return an {@link HBaseClient}, managed and configured according to the {@code configuration}
     */
    public HBaseClient build(final Environment environment) {
        return build(environment, DEFAULT_NAME);
    }

    /**
     * Builds an {@link HBaseClient} instance from the specified {@link HBaseClientFactory}
     * with the given {@code name}.
     *
     * @param environment the {@link Environment} to build {@link HBaseClient} instances for.
     * @param name the name for the {@link HBaseClient}.
     *
     * @return an {@link HBaseClient}, managed and configured according to the {@code
     *         configuration}.
     */
    public HBaseClient build(final Environment environment, final String name) {
        final ZooKeeperFactory zkFactory = getZookeeper();

        final HBaseClient proxy = new HBaseClientProxy(
                new org.hbase.async.HBaseClient(zkFactory.getQuorumSpec(), zkFactory.getNamespace()));

        // optionally instrument and bound requests for the client
        final HBaseClient client = instrument(boundRequests(proxy), environment.metrics(), name);

        // configure client
        client.setFlushInterval(getFlushInterval());
        client.setIncrementBufferSize(getIncrementBufferSize());

        // add healthchecks for META and ROOT tables
        environment.healthChecks().register(name + "-meta", new HBaseHealthCheck(client, ".META."));
        environment.healthChecks().register(name + "-root", new HBaseHealthCheck(client, "-ROOT-"));

        // manage client
        environment.lifecycle().manage(new ManagedHBaseClient(
                client, getConnectionTimeout()));

        return client;
    }

    /**
     * Builds a new {@link HBaseClient} according to the given {@link HBaseClientFactory}.
     * <p/>
     * If instrumentation {@link #instrumented is enabled} in the
     * configuration, this will build an {@link InstrumentedHBaseClient} wrapping the given {@link
     * HBaseClient}.
     * <p/>
     * If instrumentation is not enabled, the given {@link HBaseClient} will be returned verbatim.
     *
     * @param client an underlying {@link HBaseClient} implementation.
     * @param registry the {@link MetricRegistry} to register metrics with.
     * @param name the name of the client that is being instrumented.
     * @return an {@link HBaseClient} that satisfies the configuration of instrumentation.
     */
    private HBaseClient instrument(final HBaseClient client,
                                   final MetricRegistry registry,
                                   final String name) {
        return isInstrumented()
                ? new InstrumentedHBaseClient(client, registry, name)
                : client;
    }

    /**
     * Builds a new {@link HBaseClient} according to the given {@link HBaseClientFactory}.
     * <p/>
     * If the {@link #maxConcurrentRequests} is non-zero in the
     * configuration, this will build a {@link BoundedHBaseClient} that wraps the given client.
     * <p/>
     * If {@link #maxConcurrentRequests} is zero, the given {@link
     * HBaseClient} will be returned verbatim.
     *
     * @param client an underlying {@link HBaseClient} implementation.
     *
     * @return an {@link HBaseClient} that satisfies the configuration of the maximum concurrent
     *         requests.
     */
    private HBaseClient boundRequests(final HBaseClient client) {
        return getMaxConcurrentRequests() > 0
                ? new BoundedHBaseClient(client, getMaxConcurrentRequests())
                : client;
    }
}
TOP

Related Classes of com.datasift.dropwizard.hbase.HBaseClientFactory

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.