@Test
public void testWatchesWithClientSessionTimeout() throws Exception {
NIOServerCnxnFactory serverCnxnFactory = new NIOServerCnxnFactory();
ZKDatabase database = new ZKDatabase(null);
database.setlastProcessedZxid(2L);
QuorumPeer quorumPeer = mock(QuorumPeer.class);
FileTxnSnapLog logfactory = mock(FileTxnSnapLog.class);
// Directories are not used but we need it to avoid NPE
when(logfactory.getDataDir()).thenReturn(new File("/tmp"));
when(logfactory.getSnapDir()).thenReturn(new File("/tmp"));
FollowerZooKeeperServer fzks = null;
try {
fzks = new FollowerZooKeeperServer(logfactory, quorumPeer, null,
database);
fzks.startup();
fzks.setServerCnxnFactory(serverCnxnFactory);
quorumPeer.follower = new MyFollower(quorumPeer, fzks);
final SelectionKey sk = new FakeSK();
// Simulate a socket channel between a client and a follower
final SocketChannel socketChannel = createClientSocketChannel();
// Create the NIOServerCnxn that will handle the client requests
final MockNIOServerCnxn nioCnxn = new MockNIOServerCnxn(fzks,
socketChannel, sk, serverCnxnFactory);
// Send the connection request as a client do
nioCnxn.doIO(sk);
// Send the invalid session packet to the follower
QuorumPacket qp = createInvalidSessionPacket();
quorumPeer.follower.processPacket(qp);
// OK, now the follower knows that the session is invalid, let's try
// to
// send it the watches
nioCnxn.doIO(sk);
// wait for the the request processor to do his job
Thread.sleep(1000L);
// Session has not been re-validated !
// If session has not been validated, there must be NO watches
int watchCount = database.getDataTree().getWatchCount();
LOG.info("watches = " + watchCount);
assertEquals(0, watchCount);
} finally {
if (fzks != null) {
fzks.shutdown();