Package org.apache.phoenix.trace

Source Code of org.apache.phoenix.trace.PhoenixTracingEndToEndIT

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.trace;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.end2end.HBaseManagedTimeTest;
import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.metrics.TracingTestCompat;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.trace.Hadoop1TracingTestEnabler.Hadoop1Disabled;
import org.apache.phoenix.trace.TraceReader.SpanInfo;
import org.apache.phoenix.trace.TraceReader.TraceHolder;
import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.SpanReceiver;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
* Test that the logging sink stores the expected metrics/stats
*/
@RunWith(Hadoop1TracingTestEnabler.class)
@Hadoop1Disabled("tracing")
@Category(HBaseManagedTimeTest.class)
public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {

    private static final Log LOG = LogFactory.getLog(PhoenixTracingEndToEndIT.class);
    private static final int MAX_RETRIES = 10;
    private final String table = "ENABLED_FOR_LOGGING";
    private final String index = "ENABALED_FOR_LOGGING_INDEX";

    private static DisableableMetricsWriter sink;

    @BeforeClass
    public static void setupMetrics() throws Exception {
        if (shouldEarlyExitForHadoop1Test()) {
            return;
        }
        PhoenixTableMetricsWriter pWriter = new PhoenixTableMetricsWriter();
        Connection conn = getConnectionWithoutTracing();
        pWriter.initForTesting(conn);
        sink = new DisableableMetricsWriter(pWriter);

        TracingTestCompat.registerSink(sink);
    }

    @After
    public void cleanup() {
        sink.disable();
        sink.clear();
        sink.enable();

        // LISTENABLE.clearListeners();
    }

    private static void waitForCommit(CountDownLatch latch) throws SQLException {
        Connection conn = new CountDownConnection(getConnectionWithoutTracing(), latch);
        replaceWriterConnection(conn);
    }

    private static void replaceWriterConnection(Connection conn) throws SQLException {
        // disable the writer
        sink.disable();

        // swap the connection for one that listens
        sink.getDelegate().initForTesting(conn);

        // enable the writer
        sink.enable();
    }

    /**
     * Simple test that we can correctly write spans to the phoenix table
     * @throws Exception on failure
     */
    @Test
    public void testWriteSpans() throws Exception {
        // get a receiver for the spans
        SpanReceiver receiver = TracingCompat.newTraceMetricSource();
        // which also needs to a source for the metrics system
        Metrics.getManager().registerSource("testWriteSpans-source", "source for testWriteSpans",
            receiver);

        // watch our sink so we know when commits happen
        CountDownLatch latch = new CountDownLatch(1);
        waitForCommit(latch);

        // write some spans
        TraceScope trace = Trace.startSpan("Start write test", Sampler.ALWAYS);
        Span span = trace.getSpan();

        // add a child with some annotations
        Span child = span.child("child 1");
        child.addTimelineAnnotation("timeline annotation");
        TracingCompat.addAnnotation(child, "test annotation", 10);
        child.stop();

        // sleep a little bit to get some time difference
        Thread.sleep(100);

        trace.close();

        // pass the trace on
        receiver.receiveSpan(span);

        // wait for the tracer to actually do the write
        latch.await();

        // look for the writes to make sure they were made
        Connection conn = getConnectionWithoutTracing();
        checkStoredTraces(conn, new TraceChecker() {
            @Override
            public boolean foundTrace(TraceHolder trace, SpanInfo info) {
                if (info.description.equals("child 1")) {
                    assertEquals("Not all annotations present", 1, info.annotationCount);
                    assertEquals("Not all tags present", 1, info.tagCount);
                    boolean found = false;
                    for (String annotation : info.annotations) {
                        if (annotation.startsWith("test annotation")) {
                            found = true;
                        }
                    }
                    assertTrue("Missing the annotations in span: " + info, found);
                    found = false;
                    for (String tag : info.tags) {
                        if (tag.endsWith("timeline annotation")) {
                            found = true;
                        }
                    }
                    assertTrue("Missing the tags in span: " + info, found);
                    return true;
                }
                return false;
            }
        });
    }

    /**
     * Test that span will actually go into the this sink and be written on both side of the wire,
     * through the indexing code.
     * @throws Exception
     */
    @Test
    public void testClientServerIndexingTracing() throws Exception {
        // one call for client side, one call for server side
        final CountDownLatch updated = new CountDownLatch(2);
        waitForCommit(updated);

        // separate connection so we don't create extra traces
        Connection conn = getConnectionWithoutTracing();
        createTestTable(conn, true);

        // trace the requests we send
        Connection traceable = getTracingConnection();
        LOG.debug("Doing dummy the writes to the tracked table");
        String insert = "UPSERT INTO " + table + " VALUES (?, ?)";
        PreparedStatement stmt = traceable.prepareStatement(insert);
        stmt.setString(1, "key1");
        stmt.setLong(2, 1);
        // this first trace just does a simple open/close of the span. Its not doing anything
        // terribly interesting because we aren't auto-committing on the connection, so it just
        // updates the mutation state and returns.
        stmt.execute();
        stmt.setString(1, "key2");
        stmt.setLong(2, 2);
        stmt.execute();
        traceable.commit();

        // wait for the latch to countdown, as the metrics system is time-based
        LOG.debug("Waiting for latch to complete!");
        updated.await(200, TimeUnit.SECONDS);// should be way more than GC pauses

        // read the traces back out

        /* Expected:
         * 1. Single element trace - for first PreparedStatement#execute span
         * 2. Two element trace for second PreparedStatement#execute span
         *  a. execute call
         *  b. metadata lookup*
         * 3. Commit trace.
         *  a. Committing to tables
         *    i. Committing to single table
         *    ii. hbase batch write*
         *    i.I. span on server
         *    i.II. building index updates
         *    i.III. waiting for latch
         * where '*' is a generically named thread (e.g phoenix-1-thread-X)
         */
        boolean indexingCompleted = checkStoredTraces(conn, new TraceChecker() {
            @Override
            public boolean foundTrace(TraceHolder trace, SpanInfo span) {
                String traceInfo = trace.toString();
                // skip logging traces that are just traces about tracing
                if (traceInfo.contains(QueryServicesOptions.DEFAULT_TRACING_STATS_TABLE_NAME)) {
                    return false;
                }
                if (traceInfo.contains("Completing index")) {
                    return true;
                }
                return false;
            }
        });

        assertTrue("Never found indexing updates", indexingCompleted);
    }

    private void createTestTable(Connection conn, boolean withIndex) throws SQLException {
        // create a dummy table
        String ddl =
                "create table if not exists " + table + "(" + "k varchar not null, " + "c1 bigint"
                        + " CONSTRAINT pk PRIMARY KEY (k))";
        conn.createStatement().execute(ddl);

        // early exit if we don't need to create an index
        if (!withIndex) {
            return;
        }
        // create an index on the table - we know indexing has some basic tracing
        ddl = "CREATE INDEX IF NOT EXISTS " + index + " on " + table + " (c1)";
        conn.createStatement().execute(ddl);
        conn.commit();
    }

    @Test
    public void testScanTracing() throws Exception {
        // separate connections to minimize amount of traces that are generated
        Connection traceable = getTracingConnection();
        Connection conn = getConnectionWithoutTracing();

        // one call for client side, one call for server side
        CountDownLatch updated = new CountDownLatch(2);
        waitForCommit(updated);

        // create a dummy table
        createTestTable(conn, false);

        // update the table, but don't trace these, to simplify the traces we read
        LOG.debug("Doing dummy the writes to the tracked table");
        String insert = "UPSERT INTO " + table + " VALUES (?, ?)";
        PreparedStatement stmt = conn.prepareStatement(insert);
        stmt.setString(1, "key1");
        stmt.setLong(2, 1);
        stmt.execute();
        conn.commit();
        conn.rollback();

        // setup for next set of updates
        stmt.setString(1, "key2");
        stmt.setLong(2, 2);
        stmt.execute();
        conn.commit();
        conn.rollback();

        // do a scan of the table
        String read = "SELECT * FROM " + table;
        ResultSet results = traceable.createStatement().executeQuery(read);
        assertTrue("Didn't get first result", results.next());
        assertTrue("Didn't get second result", results.next());
        results.close();

        assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS));
        // don't trace reads either
        boolean tracingComplete = checkStoredTraces(conn, new TraceChecker(){

            @Override
            public boolean foundTrace(TraceHolder currentTrace) {
                String traceInfo = currentTrace.toString();
                return traceInfo.contains("Parallel scanner");
            }
        });
        assertTrue("Didn't find the parallel scanner in the tracing", tracingComplete);
    }

    @Test
    public void testScanTracingOnServer() throws Exception {
        // separate connections to minimize amount of traces that are generated
        Connection traceable = getTracingConnection();
        Connection conn = getConnectionWithoutTracing();

        // one call for client side, one call for server side
        CountDownLatch updated = new CountDownLatch(2);
        waitForCommit(updated);

        // create a dummy table
        createTestTable(conn, false);

        // update the table, but don't trace these, to simplify the traces we read
        LOG.debug("Doing dummy the writes to the tracked table");
        String insert = "UPSERT INTO " + table + " VALUES (?, ?)";
        PreparedStatement stmt = conn.prepareStatement(insert);
        stmt.setString(1, "key1");
        stmt.setLong(2, 1);
        stmt.execute();
        conn.commit();
        conn.rollback();

        // setup for next set of updates
        stmt.setString(1, "key2");
        stmt.setLong(2, 2);
        stmt.execute();
        conn.commit();
        conn.rollback();

        // do a scan of the table
        String read = "SELECT COUNT(*) FROM " + table;
        ResultSet results = traceable.createStatement().executeQuery(read);
        assertTrue("Didn't get count result", results.next());
        // make sure we got the expected count
        assertEquals("Didn't get the expected number of row", 2, results.getInt(1));
        results.close();

        assertTrue("Get expected updates to trace table", updated.await(200, TimeUnit.SECONDS));
        // don't trace reads either
        boolean found = checkStoredTraces(conn, new TraceChecker() {
            @Override
            public boolean foundTrace(TraceHolder trace) {
                String traceInfo = trace.toString();
                return traceInfo.contains(BaseScannerRegionObserver.SCANNER_OPENED_TRACE_INFO);
            }
        });
        assertTrue("Didn't find the parallel scanner in the tracing", found);
    }

    private boolean checkStoredTraces(Connection conn, TraceChecker checker) throws Exception {
        TraceReader reader = new TraceReader(conn);
        int retries = 0;
        boolean found = false;
        outer: while (retries < MAX_RETRIES) {
            Collection<TraceHolder> traces = reader.readAll(100);
            for (TraceHolder trace : traces) {
                LOG.info("Got trace: " + trace);
                found = checker.foundTrace(trace);
                if (found) {
                    break outer;
                }
                for (SpanInfo span : trace.spans) {
                    found = checker.foundTrace(trace, span);
                    if (found) {
                        break outer;
                    }
                }
            }
            LOG.info("======  Waiting for tracing updates to be propagated ========");
            Thread.sleep(1000);
            retries++;
        }
        return found;
    }

    private abstract class TraceChecker {
        public boolean foundTrace(TraceHolder currentTrace) {
            return false;
        }

        public boolean foundTrace(TraceHolder currentTrace, SpanInfo currentSpan) {
            return false;
        }
    }

    private static class CountDownConnection extends DelegatingConnection {
        private CountDownLatch commit;

        @SuppressWarnings("unchecked")
        public CountDownConnection(Connection conn, CountDownLatch commit) {
            super(conn);
            this.commit = commit;
        }

        @Override
        public void commit() throws SQLException {
            commit.countDown();
            super.commit();
        }

    }
}
TOP

Related Classes of org.apache.phoenix.trace.PhoenixTracingEndToEndIT

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.