/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static junit.framework.Assert.*;
public class LeaseDatabaseLockerTest {
private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLockerTest.class);
JDBCPersistenceAdapter jdbc;
BrokerService brokerService;
EmbeddedDataSource dataSource;
@Before
public void setUpStore() throws Exception {
dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc = new JDBCPersistenceAdapter();
jdbc.setDataSource(dataSource);
brokerService = new BrokerService();
jdbc.setBrokerService(brokerService);
jdbc.getAdapter().doCreateTables(jdbc.getTransactionContext());
}
@Test
public void testLockInterleave() throws Exception {
LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
brokerService.setBrokerName("First");
lockerA.configure(jdbc);
final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
brokerService.setBrokerName("Second");
lockerB.configure(jdbc);
final AtomicBoolean blocked = new AtomicBoolean(true);
final Connection connection = dataSource.getConnection();
printLockTable(connection);
lockerA.start();
printLockTable(connection);
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
try {
lockerB.start();
blocked.set(false);
printLockTable(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
});
assertTrue("B is blocked", blocked.get());
assertTrue("A is good", lockerA.keepAlive());
printLockTable(connection);
lockerA.stop();
printLockTable(connection);
TimeUnit.MILLISECONDS.sleep(2 * lockerB.getLockAcquireSleepInterval());
assertFalse("lockerB has the lock", blocked.get());
lockerB.stop();
printLockTable(connection);
}
@Test
public void testLockAcquireRace() throws Exception {
// build a fake lock
final String fakeId = "Anon";
final Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(jdbc.getStatements().getLeaseObtainStatement());
final long now = System.currentTimeMillis();
statement.setString(1,fakeId);
statement.setLong(2, now + 30000);
statement.setLong(3, now);
assertEquals("we got the lease", 1, statement.executeUpdate());
printLockTable(connection);
final LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
lockerA.setLeaseHolderId("A");
lockerA.configure(jdbc);
final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
lockerB.setLeaseHolderId("B");
lockerB.configure(jdbc);
final Set<LeaseDatabaseLocker> lockedSet = new HashSet<LeaseDatabaseLocker>();
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
try {
lockerA.start();
lockedSet.add(lockerA);
printLockTable(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
});
executor.execute(new Runnable() {
@Override
public void run() {
try {
lockerB.start();
lockedSet.add(lockerB);
printLockTable(connection);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// sleep for a bit till both are alive
TimeUnit.SECONDS.sleep(2);
assertTrue("no start", lockedSet.isEmpty());
assertFalse("A is blocked", lockerA.keepAlive());
assertFalse("B is blocked", lockerB.keepAlive());
LOG.info("releasing phony lock " + fakeId);
statement = connection.prepareStatement(jdbc.getStatements().getLeaseUpdateStatement());
statement.setString(1, null);
statement.setLong(2, 0l);
statement.setString(3, fakeId);
assertEquals("we released " + fakeId, 1, statement.executeUpdate());
LOG.info("released " + fakeId);
printLockTable(connection);
TimeUnit.MILLISECONDS.sleep(AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
assertEquals("one locker started", 1, lockedSet.size());
assertTrue("one isAlive", lockerA.keepAlive() || lockerB.keepAlive());
LeaseDatabaseLocker winner = lockedSet.iterator().next();
winner.stop();
lockedSet.remove(winner);
TimeUnit.MILLISECONDS.sleep(AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
assertEquals("one locker started", 1, lockedSet.size());
lockedSet.iterator().next().stop();
printLockTable(connection);
}
private void printLockTable(Connection connection) throws Exception {
((DefaultJDBCAdapter)jdbc.getAdapter()).printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
}
}