package ru.decipher;
import org.junit.Before;
import org.junit.Test;
import ru.decipher.exception.ProcessorTaskSubmissionException;
import ru.decipher.extraction.CallbackHandler;
import ru.decipher.extraction.Provider;
import ru.decipher.extraction.http.impl.BasicHttpRequest;
import ru.decipher.extraction.impl.GeneralProcessor;
import ru.decipher.mock.http.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static java.lang.Thread.sleep;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
/**
* User: Alexander Paderin (apocarteres@gmail.com)
* Date: 10/23/13
* Time: 8:53 AM
*/
public class GeneralProcessorTest {
private GeneralProcessor<BasicHttpRequest> processor;
private MockHttpProviderWithProxySupportFactory factory;
private BasicHttpRequest request;
private MockHttpHandlerWithSuccessResult handler;
private MockHttpTask task;
@Before
public void setUp() throws Exception {
processor = new GeneralProcessor<>(10);
factory = new MockHttpProviderWithProxySupportFactory();
request = new BasicHttpRequest();
handler = new MockHttpHandlerWithSuccessResult();
task = new MockHttpTask();
task.registerHandler(handler);
task.setFactory(factory);
}
@Test
public void testThatTaskWillBeSubmittedWell() throws Exception {
final String url = "test://index";
request.setUrl(url);
task.setRequest(request);
task.registerHandler(new CallbackHandler<BasicHttpRequest>() {
@Override
public void onProcessorResponse(BasicHttpRequest request) throws Exception {
sleep(1000);
}
@Override
public <U extends Provider<? extends BasicHttpRequest>> void beforeExecution(BasicHttpRequest request, U provider) {
}
});
processor.submit(task);
assertEquals(1, processor.getTaskCount());
}
@Test
public void testThatTaskWillBeReleasedFromQueueWhenReady() throws Exception {
final String url = "test://index";
request.setUrl(url);
task.setRequest(request);
processor.submit(task);
processor.shutdown();
assertEquals(0, processor.getTaskCount());
}
@Test
public void testThatTaskWillBeReleasedFromQueueWhenReadyIfPreviouslySubmittedTaskAreStillActive() throws Exception {
final String url = "test://index";
final MockHttpTask task1 = new MockHttpTask();
task1.setRequest(new BasicHttpRequest(url));
task1.registerHandler(new CallbackHandler<BasicHttpRequest>() {
@Override
public void onProcessorResponse(BasicHttpRequest request) throws Exception {
sleep(2000);
}
@Override
public <U extends Provider<? extends BasicHttpRequest>> void beforeExecution(BasicHttpRequest request, U provider) {
}
});
final MockHttpTask task2 = new MockHttpTask();
task2.setRequest(new BasicHttpRequest(url));
task1.setFactory(factory);
task2.setFactory(factory);
processor.submit(task1);
processor.submit(task2);
sleep(500);//task2 should be already done here
final long taskCount = processor.getTaskCount();
processor.shutdown();
assertEquals(1, taskCount);
}
@Test
public void testThatCallbackInvokesWhenTaskExecuted() throws Exception {
final String url = "test://index";
request.setUrl(url);
task.setRequest(request);
processor.submit(task);
processor.shutdown();
assertEquals("response from localhost: content on index", handler.findContent(url));
}
@Test
public void testThatActiveTaskCounterDecrementsWhnTaskCanceled() throws Exception {
final String url = "test://index";
final MockHttpTask task1 = new MockHttpTask();
task1.setRequest(new BasicHttpRequest(url));
task1.registerHandler(new CallbackHandler<BasicHttpRequest>() {
@Override
public void onProcessorResponse(BasicHttpRequest request) throws Exception {
sleep(2000);
}
@Override
public <U extends Provider<? extends BasicHttpRequest>> void beforeExecution(BasicHttpRequest request, U provider) {
}
});
final MockHttpTask task2 = new MockHttpTask();
task2.setRequest(new BasicHttpRequest(url));
task1.setFactory(factory);
task2.setFactory(factory);
final Future<BasicHttpRequest> task1future = processor.submit(task1);
processor.submit(task2);
sleep(500);//task2 should be already done here
task1future.cancel(true);
processor.shutdown();
assertEquals(0, processor.getTaskCount());
}
@Test
public void testThatHandledExactThatCountWhichWasSubmitted() throws Exception {
for (int i = 0; i < 1000; i++) {
final MockHttpTask task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(handler);
final BasicHttpRequest request = new BasicHttpRequest();
request.setUrl("test://index");
task.setRequest(request);
processor.submit(task);
}
processor.shutdown();
assertEquals(1000, handler.getHandledRequestsCount());
}
@Test
public void testThatProcessorWIllBeShutDownWellWhenAllTasksDone() throws Exception {
for (int i = 0; i < 1000; i++) {
final MockHttpTask task = new MockHttpTask();
task.setFactory(factory);
final BasicHttpRequest request = new BasicHttpRequest();
request.setUrl("test://index");
task.setRequest(request);
processor.submit(task);
}
processor.shutdown();
final long actual = processor.getTaskCount();
assertEquals(0, actual);
}
@Test
public void testThatProcessorWillBeShutDownWellWhenAllTasksDoneEvenExceptionInTaskOccurred() throws Exception {
for (int i = 0; i < 1000; i++) {
final MockHttpTask task = new MockHttpTask();
task.setFactory(factory);
final BasicHttpRequest request = new BasicHttpRequest();
request.setUrl("test://index");
task.setRequest(request);
processor.submit(task);
}
processor.shutdown();
final long actual = processor.getTaskCount();
assertEquals(0, actual);
}
@Test
public void testThatActiveTaskCountLimitWillBeProcessedWell() throws Exception {
processor = new GeneralProcessor<>(5);
MockHttpHandlerWithDelay handler = new MockHttpHandlerWithDelay(1000); // one second
long time = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
final MockHttpTask task = new MockHttpTask();
final BasicHttpRequest request = new BasicHttpRequest();
request.setUrl("test://index");
task.setRequest(request);
task.setFactory(factory);
task.registerHandler(handler);
processor.submit(task);
}
processor.shutdown();
time = System.currentTimeMillis() - time;
//10 request attempted to submit, but only 5 concurrent requests are allowed (each takes ~1 sec)
// so summary they will be taking about 2 sec or bit more
assertTrue(time >= 2000);
}
@Test
public void testThatRetrialSubmissionWillBePerformedWellInCaseOfActiveTaskLimitIsSet() throws Exception {
processor = new GeneralProcessor<>(5);
MockHttpHandlerWith3Retry handler = new MockHttpHandlerWith3Retry(processor, factory);
for (int i = 0; i < 1000; i++) {
BasicHttpRequest request = new BasicHttpRequest();
request.setUrl("test://index");
processor.submit(handler.buildTask(request));
}
processor.shutdown();
assertEquals(1000, handler.getDone());
}
@Test
public void testThatRequestPriorityWorksWell() throws Exception {
for (int i = 0; i < 1000; i++) {
final MockHttpTask task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(handler);
final BasicHttpRequest request = new BasicHttpRequest("test://index-" + i, 1);
task.setRequest(request);
processor.submit(task);
}
processor.shutdown();
assertEquals(1000, handler.getHandledRequestsCount());
}
@Test
public void testThatThereIsNoDeadLockOnConcurrentAccess() throws Exception {
final MockHttpHandlerWith3ConcurrentRetry handler = new MockHttpHandlerWith3ConcurrentRetry(processor, factory);
class Task implements Runnable {
private final int page;
public Task(int page) {
this.page = page;
}
@Override
public void run() {
try {
processor.submit(handler.buildTask(new BasicHttpRequest("test://index?" + page)));
sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
ExecutorService service = Executors.newFixedThreadPool(10);
Collection<Callable<Object>> tasks = new ArrayList<>();
for (int i = 0; i < 500; i++) {
tasks.add(Executors.callable(new Task(i)));
}
service.invokeAll(tasks);
processor.shutdown();
service.shutdown();
assertTrue(handler.getDone() <= 500);
}
@Test
public void testThatDuplicatesAreAllowedToSubmitWhenNoLimit() throws Exception {
processor = new GeneralProcessor<>(1);
final MockHttpHandlerWithDelay delayedHandler = new MockHttpHandlerWithDelay(1000);
MockHttpTask task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
BasicHttpRequest request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
long actual = processor.getTaskCount();
processor.shutdown();
assertEquals(2, actual);
}
@Test
public void testThatDuplicatesCounterDecrementsWhenTaskIsDone() throws Exception {
processor = new GeneralProcessor<>(1, 0, "duplicates test processor");
final MockHttpHandlerWithDelay delayedHandler = new MockHttpHandlerWithDelay(1000);
MockHttpTask task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
BasicHttpRequest request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
sleep(2000);
task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
long actual = processor.getTaskCount();
processor.shutdown();
assertEquals(1, actual);
}
@Test(expected = ProcessorTaskSubmissionException.class)
public void testThatDuplicatesAreNotAllowedToSubmitWhenZeroDuplicateLimitIsSet() throws Exception {
processor = new GeneralProcessor<>(1, 0, "duplicates test processor");
final MockHttpHandlerWithDelay delayedHandler = new MockHttpHandlerWithDelay(1000);
MockHttpTask task;
BasicHttpRequest request;
task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
processor.shutdown();
}
@Test(expected = ProcessorTaskSubmissionException.class)
public void testThatDuplicatesNoExceedSpecifiedLimit() throws Exception {
processor = new GeneralProcessor<>(1, 1, "duplicates test processor");
final MockHttpHandlerWithDelay delayedHandler = new MockHttpHandlerWithDelay(1000);
MockHttpTask task;
BasicHttpRequest request;
task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
task = new MockHttpTask();
task.setFactory(factory);
task.registerHandler(delayedHandler);
request = new BasicHttpRequest("test://index", 1);
task.setRequest(request);
processor.submit(task);
processor.shutdown();
}
}