Package org.elasticsearch.benchmark.search.child

Source Code of org.elasticsearch.benchmark.search.child.ChildSearchAndIndexingBenchmark$SearchThread

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.search.child;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.node.Node;

import java.util.Arrays;
import java.util.Random;

import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.FilterBuilders.hasChildFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

/**
*
*/
public class ChildSearchAndIndexingBenchmark {

    static int PARENT_COUNT = (int) SizeValue.parseSizeValue("1m").singles();
    static int NUM_CHILDREN_PER_PARENT = 12;
    static int QUERY_VALUE_RATIO_PER_PARENT = 3;
    static int QUERY_COUNT = 50;
    static String indexName = "test";
    static Random random = new Random();

    public static void main(String[] args) throws Exception {
        Settings settings = settingsBuilder()
                .put("refresh_interval", "-1")
                .put("gateway.type", "local")
                .put(SETTING_NUMBER_OF_SHARDS, 1)
                .put(SETTING_NUMBER_OF_REPLICAS, 0)
                .build();

        String clusterName = ChildSearchAndIndexingBenchmark.class.getSimpleName();
        Node node1 = nodeBuilder().settings(settingsBuilder().put(settings).put("name", "node1"))
                .clusterName(clusterName)
                .node();
        Client client = node1.client();

        client.admin().cluster().prepareHealth(indexName).setWaitForGreenStatus().setTimeout("10s").execute().actionGet();
        try {
            client.admin().indices().create(createIndexRequest(indexName)).actionGet();
            client.admin().indices().preparePutMapping(indexName).setType("child").setSource(XContentFactory.jsonBuilder().startObject().startObject("child")
                    .startObject("_parent").field("type", "parent").endObject()
                    .endObject().endObject()).execute().actionGet();
            Thread.sleep(5000);

            long startTime = System.currentTimeMillis();
            ParentChildIndexGenerator generator = new ParentChildIndexGenerator(client, PARENT_COUNT, NUM_CHILDREN_PER_PARENT, QUERY_VALUE_RATIO_PER_PARENT);
            generator.index();
            System.out.println("--> Indexing took " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds.");
        } catch (IndexAlreadyExistsException e) {
            System.out.println("--> Index already exists, ignoring indexing phase, waiting for green");
            ClusterHealthResponse clusterHealthResponse = client.admin().cluster().prepareHealth(indexName).setWaitForGreenStatus().setTimeout("10m").execute().actionGet();
            if (clusterHealthResponse.isTimedOut()) {
                System.err.println("--> Timed out waiting for cluster health");
            }
        }
        client.admin().indices().prepareRefresh().execute().actionGet();
        System.out.println("--> Number of docs in index: " + client.prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount());

        SearchThread searchThread = new SearchThread(client);
        new Thread(searchThread).start();
        IndexThread indexThread = new IndexThread(client);
        new Thread(indexThread).start();

        System.in.read();

        indexThread.stop();
        searchThread.stop();
        client.close();
        node1.close();
    }

    static class IndexThread implements Runnable {

        private final Client client;
        private volatile boolean run = true;

        IndexThread(Client client) {
            this.client = client;
        }

        @Override
        public void run() {
            while (run) {
                int childIdLimit = PARENT_COUNT * NUM_CHILDREN_PER_PARENT;
                for (int childId = 1; run && childId < childIdLimit;) {
                    try {
                        for (int j = 0; j < 8; j++) {
                            GetResponse getResponse = client
                                    .prepareGet(indexName, "child", String.valueOf(++childId))
                                    .setFields("_source", "_parent")
                                    .setRouting("1") // Doesn't matter what value, since there is only one shard
                                    .get();
                            client.prepareIndex(indexName, "child", Integer.toString(childId) + "_" + j)
                                    .setParent(getResponse.getField("_parent").getValue().toString())
                                    .setSource(getResponse.getSource())
                                    .get();
                        }
                        client.admin().indices().prepareRefresh(indexName).execute().actionGet();
                        Thread.sleep(1000);
                        if (childId % 500 == 0) {
                            NodesStatsResponse statsResponse = client.admin().cluster().prepareNodesStats()
                                    .clear().setIndices(true).execute().actionGet();
                            System.out.println("Deleted docs: " + statsResponse.getAt(0).getIndices().getDocs().getDeleted());
                        }
                    } catch (Throwable e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void stop() {
            run = false;
        }

    }

    static class SearchThread implements Runnable {

        private final Client client;
        private final int numValues;
        private volatile boolean run = true;

        SearchThread(Client client) {
            this.client = client;
            this.numValues = NUM_CHILDREN_PER_PARENT / NUM_CHILDREN_PER_PARENT;
        }

        @Override
        public void run() {
            while (run) {
                try {
                    long totalQueryTime = 0;
                    for (int j = 0; j < QUERY_COUNT; j++) {
                        SearchResponse searchResponse = client.prepareSearch(indexName)
                                .setQuery(
                                        filteredQuery(
                                                matchAllQuery(),
                                                hasChildFilter("child", termQuery("field2", "value" + random.nextInt(numValues)))
                                        )
                                )
                                .execute().actionGet();
                        if (searchResponse.getFailedShards() > 0) {
                            System.err.println("Search Failures " + Arrays.toString(searchResponse.getShardFailures()));
                        }
                        totalQueryTime += searchResponse.getTookInMillis();
                    }
                    System.out.println("--> has_child filter with term filter Query Avg: " + (totalQueryTime / QUERY_COUNT) + "ms");

                    totalQueryTime = 0;
                    for (int j = 1; j <= QUERY_COUNT; j++) {
                        SearchResponse searchResponse = client.prepareSearch(indexName)
                                .setQuery(
                                        filteredQuery(
                                                matchAllQuery(),
                                                hasChildFilter("child", matchAllQuery())
                                        )
                                )
                                .execute().actionGet();
                        if (searchResponse.getFailedShards() > 0) {
                            System.err.println("Search Failures " + Arrays.toString(searchResponse.getShardFailures()));
                        }
                        totalQueryTime += searchResponse.getTookInMillis();
                    }
                    System.out.println("--> has_child filter with match_all child query, Query Avg: " + (totalQueryTime / QUERY_COUNT) + "ms");

                    NodesStatsResponse statsResponse = client.admin().cluster().prepareNodesStats()
                            .setJvm(true).execute().actionGet();
                    System.out.println("--> Committed heap size: " + statsResponse.getNodes()[0].getJvm().getMem().getHeapCommitted());
                    System.out.println("--> Used heap size: " + statsResponse.getNodes()[0].getJvm().getMem().getHeapUsed());
                    Thread.sleep(1000);
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }

        public void stop() {
            run = false;
        }

    }

}
TOP

Related Classes of org.elasticsearch.benchmark.search.child.ChildSearchAndIndexingBenchmark$SearchThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.