Package com.spotify.helios.servicescommon.coordination

Source Code of com.spotify.helios.servicescommon.coordination.PersistentPathChildrenCacheTest$DataPojo

/*
* Copyright (c) 2014 Spotify AB.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.helios.servicescommon.coordination;

import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.SettableFuture;

import com.spotify.helios.Parallelized;
import com.spotify.helios.Polling;
import com.spotify.helios.ZooKeeperTestManager;
import com.spotify.helios.ZooKeeperTestingServerManager;
import com.spotify.helios.common.Json;

import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.zookeeper.KeeperException.NoNodeException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

@RunWith(Parallelized.class)
public class PersistentPathChildrenCacheTest {

  public static class DataPojo {

    public DataPojo() {
    }

    public DataPojo(final String name) {
      this.name = name;
    }

    public String name;
    public int bar = 17;
    public Map<String, String> baz = ImmutableMap.of("foos", "bars");

    @Override
    public boolean equals(final Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      final DataPojo dataPojo = (DataPojo) o;

      if (bar != dataPojo.bar) {
        return false;
      }
      if (baz != null ? !baz.equals(dataPojo.baz) : dataPojo.baz != null) {
        return false;
      }
      if (name != null ? !name.equals(dataPojo.name) : dataPojo.name != null) {
        return false;
      }

      return true;
    }

    @Override
    public int hashCode() {
      int result = name != null ? name.hashCode() : 0;
      result = 31 * result + bar;
      result = 31 * result + (baz != null ? baz.hashCode() : 0);
      return result;
    }

    @Override
    public String toString() {
      return Objects.toStringHelper(this)
          .add("name", name)
          .add("bar", bar)
          .add("baz", baz)
          .toString();
    }
  }

  // ZooKeeper path that the cache under test is created for (see startCache()).
  private static final String PATH = "/foos";

  // ZooKeeper test fixture; created in setup() and closed in teardown().
  private ZooKeeperTestManager zk;

  // Cache under test; (re)created by every call to startCache().
  private PersistentPathChildrenCache<DataPojo> cache;
  // Temporary directory created in setup() and deleted in teardown().
  private Path directory;
  // File inside the temporary directory that is handed to the cache constructor.
  private Path stateFile;

  // Mockito mock registered as listener on every cache created by startCache().
  private PersistentPathChildrenCache.Listener listener =
      mock(PersistentPathChildrenCache.Listener.class);

  @Before
  public void setup() throws Exception {
    zk = new ZooKeeperTestingServerManager();
    zk.ensure("/foos");
    directory = Files.createTempDirectory("helios-test");
    stateFile = directory.resolve("persistent-path-children-cache-test-nodes.json");
    startCache();
  }

  @After
  public void teardown() throws Exception {
    FileUtils.deleteQuietly(directory.toFile());
    zk.close();
  }

  @Test
  public void verifyListenerCalledOnNodeAdd() throws Exception {
    final DataPojo created = new DataPojo("foo");
    ensure("/foos/foo", created);
    verify(listener, timeout(60000).atLeastOnce()).nodesChanged(cache);
    final DataPojo read = Iterables.getOnlyElement(cache.getNodes().values());
    assertEquals(created, read);
  }

  @Test
  public void verifyListenerCalledOnNodeChange() throws Exception {
    final DataPojo created = new DataPojo("foo");
    ensure("/foos/foo", created);
    verify(listener, timeout(60000).atLeastOnce()).nodesChanged(cache);
    reset(listener);
    final DataPojo changed = new DataPojo("foo-changed");
    zk.curator().setData().forPath("/foos/foo", Json.asBytesUnchecked(changed));
    verify(listener, timeout(60000).atLeastOnce()).nodesChanged(cache);
    final DataPojo read = Iterables.getOnlyElement(cache.getNodes().values());
    assertEquals(changed, read);
  }

  @Test
  public void verifyListenerCalledOnNodeRemoved() throws Exception {
    ensure("/foos/foo", new DataPojo("foo"));
    verify(listener, timeout(60000).atLeastOnce()).nodesChanged(cache);
    reset(listener);
    try {
      zk.curator().delete().forPath("/foos/foo");
    } catch (NoNodeException ignore) {
    }
    verify(listener, timeout(60000).atLeastOnce()).nodesChanged(cache);
    assertTrue(cache.getNodes().isEmpty());
  }

  @Test
  public void verifyNodesAreRetainedWhenZKGoesDown() throws Exception {
    // Create two nodes
    final String FOO1 = "/foos/foo1";
    final String FOO2 = "/foos/foo2";
    final Set<String> paths = ImmutableSet.of(FOO1, FOO2);
    for (String path : paths) {
      ensure(path, new DataPojo(path));
    }

    // Wait for the cache to pick them up
    Polling.await(5, MINUTES, new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return cache.getNodes().keySet().equals(paths) ? true : null;
      }
    });

    verify(listener, atLeastOnce()).nodesChanged(cache);

    // Take down zk
    zk.stop();

    // Wait for connection to be lost
    final SettableFuture<Void> connectionLost = SettableFuture.create();
    doAnswer(new Answer<Object>() {
      @Override
      public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
        if (invocationOnMock.getArguments()[0] == ConnectionState.LOST) {
          connectionLost.set(null);
        }
        return null;
      }
    }).when(listener).connectionStateChanged(any(ConnectionState.class));
    connectionLost.get(5, MINUTES);

    // Keep probing for 30 seconds to build some confidence that the snapshot is not going away
    for (int i = 0; i < 30; i++) {
      Thread.sleep(1000);
      assertEquals(paths, cache.getNodes().keySet());
    }
  }

  @Test
  public void verifyNodesRemovedWhilePathChildrenCacheIsDownAreDetected() throws Exception {
    // Create two nodes
    final String FOO1 = "/foos/foo1";
    final String FOO2 = "/foos/foo2";
    final Set<String> paths = ImmutableSet.of(FOO1, FOO2);
    for (String path : paths) {
      ensure(path, new DataPojo(path));
    }

    // Wait for the cache to pick them up
    Polling.await(5, MINUTES, new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return cache.getNodes().keySet().equals(paths) ? true : null;
      }
    });

    verify(listener, atLeastOnce()).nodesChanged(cache);

    // Stop the cache
    stopCache();

    // Remove a node
    try {
      zk.curator().delete().forPath(FOO1);
    } catch (NoNodeException ignore) {
    }

    // Start the cache
    startCache();

    // Wait for the cache to reflect that there's only one node left
    final Set<String> postDeletePaths = ImmutableSet.of(FOO2);
    Polling.await(5, MINUTES, new Callable<Object>() {
      @Override
      public Object call() throws Exception {
        return cache.getNodes().keySet().equals(postDeletePaths) ? true : null;
      }
    });

    verify(listener, atLeastOnce()).nodesChanged(cache);
  }

  private void startCache() throws IOException, InterruptedException {
    reset(listener);
    cache = new PersistentPathChildrenCache<>(zk.curator(), PATH, null, stateFile, Json.type(DataPojo.class));
    cache.addListener(listener);
    cache.startAsync().awaitRunning();
  }

  /**
   * Asks the current cache to stop and waits for it to terminate.
   */
  private void stopCache() {
    cache.stopAsync().awaitTerminated();
  }

  private void ensure(final String path, final Object value) throws Exception {
    zk.ensure(ZKPaths.getPathAndNode(path).getPath());
    try {
      zk.curator().create().forPath(path, Json.asBytesUnchecked(value));
    } catch (KeeperException.NodeExistsException ignore) {
    }

  }

}
TOP

Related Classes of com.spotify.helios.servicescommon.coordination.PersistentPathChildrenCacheTest$DataPojo

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle, Inc. Contact coftware#gmail.com.