Source Code of org.elasticsearch.gateway.local.SimpleRecoveryLocalGatewayTests

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.gateway.local;

import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.Test;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;

/**
 * Integration tests for recovery through the local gateway: indexing documents, restarting single
 * nodes or the whole cluster, and verifying that data, mappings, templates and aliases are restored
 * from the nodes' local on-disk state.
 */
@ClusterScope(numDataNodes = 0, scope = Scope.TEST)
@Slow
@TestLogging("index.shard.service:TRACE,index.gateway.local:TRACE")
public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTest {

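    /** Base node settings for these tests: every node uses the local gateway so its state survives a full restart. */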
    private ImmutableSettings.Builder settingsBuilder() {
        return ImmutableSettings.settingsBuilder().put("gateway.type", "local");
    }

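    /**
     * Single node: index a handful of documents, then perform two full cluster restarts and
     * verify the documents are recovered from the local gateway each time.
     */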
    @Test
    @Slow
    public void testOneNodeRecoverFromGateway() throws Exception {

        internalCluster().startNode(settingsBuilder().build());

        String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
                .startObject("properties").startObject("appAccountIds").field("type", "string").endObject().endObject()
                .endObject().endObject().string();
        assertAcked(prepareCreate("test").addMapping("type1", mapping));


        client().prepareIndex("test", "type1", "10990239").setSource(jsonBuilder().startObject()
                .field("_id", "10990239")
                .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
        client().prepareIndex("test", "type1", "10990473").setSource(jsonBuilder().startObject()
                .field("_id", "10990473")
                .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
        client().prepareIndex("test", "type1", "10990513").setSource(jsonBuilder().startObject()
                .field("_id", "10990513")
                .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
        client().prepareIndex("test", "type1", "10990695").setSource(jsonBuilder().startObject()
                .field("_id", "10990695")
                .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
        client().prepareIndex("test", "type1", "11026351").setSource(jsonBuilder().startObject()
                .field("_id", "11026351")
                .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();

        refresh();
        assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
        ensureYellow("test"); // wait for primary allocations here otherwise if we have a lot of shards we might have a
        // shard that is still in post recovery when we restart and the ensureYellow() below will timeout
        internalCluster().fullRestart();

        logger.info("Running Cluster Health (wait for the shards to startup)");
        ensureYellow();

        client().admin().indices().prepareRefresh().execute().actionGet();
        assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);

        internalCluster().fullRestart();

        logger.info("Running Cluster Health (wait for the shards to startup)");
        ensureYellow();

        client().admin().indices().prepareRefresh().execute().actionGet();
        assertHitCount(client().prepareCount().setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
    }

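    /**
     * Single node, no explicit flush: the documents only exist in the translog when the node is
     * restarted, so recovery must replay the translog. Verified across two full restarts.
     */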
    @Test
    @Slow
    public void testSingleNodeNoFlush() throws Exception {

        internalCluster().startNode(settingsBuilder().build());

        String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
                .startObject("properties").startObject("field").field("type", "string").endObject().startObject("num").field("type", "integer").endObject().endObject()
                .endObject().endObject().string();
        // note: the default replica count is tied to (#data nodes - 1), which is 0 here; 0 or 1 replicas is fine for this test.
        int numberOfShards = numberOfShards();
        assertAcked(prepareCreate("test").setSettings(
                SETTING_NUMBER_OF_SHARDS, numberOfShards,
                SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1)
        ).addMapping("type1", mapping));

        int value1Docs;
        int value2Docs;
        boolean indexToAllShards = randomBoolean();

        if (indexToAllShards) {
            // insert enough docs so all shards will have a doc
            value1Docs = randomIntBetween(numberOfShards * 10, numberOfShards * 20);
            value2Docs = randomIntBetween(numberOfShards * 10, numberOfShards * 20);

        } else {
            // insert only two docs, so some shards will not have anything
            value1Docs = 1;
            value2Docs = 1;
        }


        for (int i = 0; i < 1 + randomInt(100); i++) {
            for (int id = 0; id < Math.max(value1Docs, value2Docs); id++) {
                if (id < value1Docs) {
                    index("test", "type1", "1_" + id,
                            jsonBuilder().startObject().field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()
                    );
                }
                if (id < value2Docs) {
                    index("test", "type1", "2_" + id,
                            jsonBuilder().startObject().field("field", "value2").startArray("num").value(14).endArray().endObject()
                    );
                }
            }

        }

        refresh();

        for (int i = 0; i <= randomInt(10); i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), value1Docs + value2Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).get(), value1Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).get(), value2Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).get(), value1Docs);
        }
        if (!indexToAllShards) {
            // we have to verify primaries are started for them to be restored
            logger.info("Ensure all primaries have been started");
            ensureYellow();
        }
        internalCluster().fullRestart();

        logger.info("Running Cluster Health (wait for the shards to startup)");
        ensureYellow();

        for (int i = 0; i <= randomInt(10); i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), value1Docs + value2Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).get(), value1Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).get(), value2Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).get(), value1Docs);
        }

        internalCluster().fullRestart();


        logger.info("Running Cluster Health (wait for the shards to startup)");
        ensureYellow();

        for (int i = 0; i <= randomInt(10); i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), value1Docs + value2Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).get(), value1Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).get(), value2Docs);
            assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).get(), value1Docs);
        }
    }


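    /**
     * Single node: one document is flushed to the Lucene index and a second one is left in the
     * translog; both must survive two full restarts.
     */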
    @Test
    @Slow
    public void testSingleNodeWithFlush() throws Exception {

        internalCluster().startNode(settingsBuilder().build());
        client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
        flush();
        client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
        refresh();

        assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);

        ensureYellow("test"); // wait for primary allocations here otherwise if we have a lot of shards we might have a
        // shard that is still in post recovery when we restart and the ensureYellow() below will timeout

        internalCluster().fullRestart();

        logger.info("Running Cluster Health (wait for the shards to startup)");
        ensureYellow();

        for (int i = 0; i < 10; i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
        }

        internalCluster().fullRestart();

        logger.info("Running Cluster Health (wait for the shards to startup)");
        ensureYellow();

        for (int i = 0; i < 10; i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
        }
    }

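    /**
     * Two nodes: the first node's data is cleared during the full restart, so the documents
     * must be recovered from the copy held by the second node.
     */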
    @Test
    @Slow
    public void testTwoNodeFirstNodeCleared() throws Exception {

        final String firstNode = internalCluster().startNode(settingsBuilder().build());
        internalCluster().startNode(settingsBuilder().build());

        client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
        flush();
        client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
        refresh();

        logger.info("Running Cluster Health (wait for the shards to startup)");
        ensureGreen();

        for (int i = 0; i < 10; i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
        }

        internalCluster().fullRestart(new RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                return settingsBuilder().put("gateway.recover_after_nodes", 2).build();
            }

            @Override
            public boolean clearData(String nodeName) {
                return firstNode.equals(nodeName);
            }

        });

        logger.info("Running Cluster Health (wait for the shards to startup)");
        ensureGreen();

        for (int i = 0; i < 10; i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
        }
    }

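    /**
     * Two nodes: while only one node is up during the restart, more documents plus a new mapping,
     * template and alias are added; once both nodes are back, the cluster must expose that latest
     * metadata (and keep the same metadata UUID) rather than the stale state of the restarted node.
     */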
    @Test
    @Slow
    public void testLatestVersionLoaded() throws Exception {
        // start two clean nodes
        internalCluster().startNodesAsync(2, settingsBuilder().put("gateway.recover_after_nodes", 2).build()).get();

        client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).execute().actionGet();
        client().admin().indices().prepareFlush().execute().actionGet();
        client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).execute().actionGet();
        client().admin().indices().prepareRefresh().execute().actionGet();

        logger.info("--> running cluster_health (wait for the shards to startup)");
        ensureGreen();

        for (int i = 0; i < 10; i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
        }

        String metaDataUuid = client().admin().cluster().prepareState().execute().get().getState().getMetaData().uuid();
        assertThat(metaDataUuid, not(equalTo("_na_")));

        logger.info("--> closing first node, and indexing more data to the second node");
        internalCluster().fullRestart(new RestartCallback() {

            @Override
            public void doAfterNodes(int numNodes, Client client) throws Exception {
                if (numNodes == 1) {
                    logger.info("--> one node is closed - start indexing data into the second one");
                    client.prepareIndex("test", "type1", "3").setSource(jsonBuilder().startObject().field("field", "value3").endObject()).execute().actionGet();
                    client.admin().indices().prepareRefresh().execute().actionGet();

                    for (int i = 0; i < 10; i++) {
                        assertHitCount(client.prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3);
                    }

                    logger.info("--> add some metadata, additional type and template");
                    client.admin().indices().preparePutMapping("test").setType("type2")
                            .setSource(jsonBuilder().startObject().startObject("type2").startObject("_source").field("enabled", false).endObject().endObject().endObject())
                            .execute().actionGet();
                    client.admin().indices().preparePutTemplate("template_1")
                            .setTemplate("te*")
                            .setOrder(0)
                            .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
                                    .startObject("field1").field("type", "string").field("store", "yes").endObject()
                                    .startObject("field2").field("type", "string").field("store", "yes").field("index", "not_analyzed").endObject()
                                    .endObject().endObject().endObject())
                            .execute().actionGet();
                    client.admin().indices().prepareAliases().addAlias("test", "test_alias", FilterBuilders.termFilter("field", "value")).execute().actionGet();
                    logger.info("--> starting two nodes back, verifying we got the latest version");
                }

            }

        });

        logger.info("--> running cluster_health (wait for the shards to startup)");
        ensureGreen();

        assertThat(client().admin().cluster().prepareState().execute().get().getState().getMetaData().uuid(), equalTo(metaDataUuid));

        for (int i = 0; i < 10; i++) {
            assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 3);
        }

        ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
        assertThat(state.metaData().index("test").mapping("type2"), notNullValue());
        assertThat(state.metaData().templates().get("template_1").template(), equalTo("te*"));
        assertThat(state.metaData().index("test").aliases().get("test_alias"), notNullValue());
        assertThat(state.metaData().index("test").aliases().get("test_alias").filter(), notNullValue());
    }

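    /**
     * Four nodes with rebalancing disabled: after full restarts (with allocation disabled while the
     * nodes shut down), replica shards must reuse their existing on-disk files instead of copying
     * them again from the primary.
     */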
    @Test
    @Slow
    public void testReusePeerRecovery() throws Exception {
        final Settings settings = settingsBuilder()
                .put("action.admin.cluster.node.shutdown.delay", "10ms")
                .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false)
                .put("gateway.recover_after_nodes", 4)
                .put(MockDirectoryHelper.CRASH_INDEX, false).build();

        internalCluster().startNodesAsync(4, settings).get();
        // prevent any rebalance actions during the peer recovery
        // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
        // we reuse the files on disk after full restarts for replicas.
        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()).put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)));
        logger.info("--> indexing docs");
        for (int i = 0; i < 1000; i++) {
            client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
            if ((i % 200) == 0) {
                client().admin().indices().prepareFlush().execute().actionGet();
            }
        }
        client().admin().indices().prepareFlush().execute().actionGet();

        logger.info("Running Cluster Health");
        ensureGreen();

        logger.info("--> shutting down the nodes");

        // Disable allocations while we are closing nodes
        client().admin().cluster().prepareUpdateSettings()
                .setTransientSettings(settingsBuilder()
                        .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE))
                .get();
        internalCluster().fullRestart();

        logger.info("Running Cluster Health");
        ensureGreen();
        logger.info("--> shutting down the nodes");
        // Disable allocations while we are closing nodes
        client().admin().cluster().prepareUpdateSettings()
                .setTransientSettings(settingsBuilder()
                        .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE))
                .get();
        internalCluster().fullRestart();


        logger.info("Running Cluster Health");
        ensureGreen();

        RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();

        for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
            RecoveryState recoveryState = response.recoveryState();
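            // replicas are expected to reuse all of their local files and copy nothing from the primary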
            if (!recoveryState.getPrimary()) {
                logger.info("--> shard {}, recovered {}, reuse {}", response.getShardId(), recoveryState.getIndex().recoveredTotalSize(), recoveryState.getIndex().reusedByteCount());
                assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(0L));
                assertThat(recoveryState.getIndex().reusedByteCount(), greaterThan(0L));
                assertThat(recoveryState.getIndex().reusedByteCount(), equalTo(recoveryState.getIndex().totalByteCount()));
                assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
                assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
                assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0));
                assertThat(recoveryState.getIndex().reusedByteCount(), greaterThan(recoveryState.getIndex().numberOfRecoveredBytes()));
            } else {
                assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(recoveryState.getIndex().reusedByteCount()));
                assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(recoveryState.getIndex().reusedFileCount()));
            }
        }

    }

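    /**
     * Two nodes with separate data paths: after the full restart only the node that was started
     * second is brought back, and the index plus its document must still be recoverable from it.
     */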
    @Test
    @Slow
    public void testRecoveryDifferentNodeOrderStartup() throws Exception {
        // we need different data paths so we make sure we start the second node fresh

        final String node_1 = internalCluster().startNode(settingsBuilder().put("path.data", "data/data1").build());

        client().prepareIndex("test", "type1", "1").setSource("field", "value").execute().actionGet();

        internalCluster().startNode(settingsBuilder().put("path.data", "data/data2").build());

        ensureGreen();

        internalCluster().fullRestart(new RestartCallback() {

            @Override
            public boolean doRestart(String nodeName) {
                return !node_1.equals(nodeName);
            }
        });

        ensureYellow();

        assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
        assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);
    }

}