Package ru.decipher

Source Code of ru.decipher.Task

package ru.decipher;

import org.junit.Before;
import org.junit.Test;
import ru.decipher.exception.ProcessorTaskSubmissionException;
import ru.decipher.extraction.CallbackHandler;
import ru.decipher.extraction.Provider;
import ru.decipher.extraction.http.impl.BasicHttpRequest;
import ru.decipher.extraction.impl.GeneralProcessor;
import ru.decipher.mock.http.*;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static java.lang.Thread.sleep;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;

/**
* User: Alexander Paderin (apocarteres@gmail.com)
* Date: 10/23/13
* Time: 8:53 AM
*/
public class GeneralProcessorTest {

    private GeneralProcessor<BasicHttpRequest> processor;
    private MockHttpProviderWithProxySupportFactory factory;
    private BasicHttpRequest request;
    private MockHttpHandlerWithSuccessResult handler;
    private MockHttpTask task;

    @Before
    public void setUp() throws Exception {
        processor = new GeneralProcessor<>(10);
        factory = new MockHttpProviderWithProxySupportFactory();
        request = new BasicHttpRequest();
        handler = new MockHttpHandlerWithSuccessResult();
        task = new MockHttpTask();
        task.registerHandler(handler);
        task.setFactory(factory);
    }

    @Test
    public void testThatTaskWillBeSubmittedWell() throws Exception {
        final String url = "test://index";
        request.setUrl(url);
        task.setRequest(request);
        task.registerHandler(new CallbackHandler<BasicHttpRequest>() {
            @Override
            public void onProcessorResponse(BasicHttpRequest request) throws Exception {
                sleep(1000);
            }

            @Override
            public <U extends Provider<? extends BasicHttpRequest>> void beforeExecution(BasicHttpRequest request, U provider) {

            }
        });
        processor.submit(task);
        assertEquals(1, processor.getTaskCount());
    }

    @Test
    public void testThatTaskWillBeReleasedFromQueueWhenReady() throws Exception {
        final String url = "test://index";
        request.setUrl(url);
        task.setRequest(request);
        processor.submit(task);
        processor.shutdown();
        assertEquals(0, processor.getTaskCount());
    }

    @Test
    public void testThatTaskWillBeReleasedFromQueueWhenReadyIfPreviouslySubmittedTaskAreStillActive() throws Exception {
        final String url = "test://index";
        final MockHttpTask task1 = new MockHttpTask();
        task1.setRequest(new BasicHttpRequest(url));
        task1.registerHandler(new CallbackHandler<BasicHttpRequest>() {
            @Override
            public void onProcessorResponse(BasicHttpRequest request) throws Exception {
                sleep(2000);
            }

            @Override
            public <U extends Provider<? extends BasicHttpRequest>> void beforeExecution(BasicHttpRequest request, U provider) {

            }
        });
        final MockHttpTask task2 = new MockHttpTask();
        task2.setRequest(new BasicHttpRequest(url));
        task1.setFactory(factory);
        task2.setFactory(factory);
        processor.submit(task1);
        processor.submit(task2);
        sleep(500);//task2 should be already done here
        final long taskCount = processor.getTaskCount();
        processor.shutdown();
        assertEquals(1, taskCount);
    }

    @Test
    public void testThatCallbackInvokesWhenTaskExecuted() throws Exception {
        final String url = "test://index";
        request.setUrl(url);
        task.setRequest(request);
        processor.submit(task);
        processor.shutdown();
        assertEquals("response from localhost: content on index", handler.findContent(url));
    }

    @Test
    public void testThatActiveTaskCounterDecrementsWhnTaskCanceled() throws Exception {
        final String url = "test://index";
        final MockHttpTask task1 = new MockHttpTask();
        task1.setRequest(new BasicHttpRequest(url));
        task1.registerHandler(new CallbackHandler<BasicHttpRequest>() {
            @Override
            public void onProcessorResponse(BasicHttpRequest request) throws Exception {
                sleep(2000);
            }

            @Override
            public <U extends Provider<? extends BasicHttpRequest>> void beforeExecution(BasicHttpRequest request, U provider) {

            }
        });
        final MockHttpTask task2 = new MockHttpTask();
        task2.setRequest(new BasicHttpRequest(url));
        task1.setFactory(factory);
        task2.setFactory(factory);
        final Future<BasicHttpRequest> task1future = processor.submit(task1);
        processor.submit(task2);
        sleep(500);//task2 should be already done here
        task1future.cancel(true);
        processor.shutdown();
        assertEquals(0, processor.getTaskCount());
    }

    @Test
    public void testThatHandledExactThatCountWhichWasSubmitted() throws Exception {
        for (int i = 0; i < 1000; i++) {
            final MockHttpTask task = new MockHttpTask();
            task.setFactory(factory);
            task.registerHandler(handler);
            final BasicHttpRequest request = new BasicHttpRequest();
            request.setUrl("test://index");
            task.setRequest(request);
            processor.submit(task);
        }
        processor.shutdown();
        assertEquals(1000, handler.getHandledRequestsCount());
    }

    @Test
    public void testThatProcessorWIllBeShutDownWellWhenAllTasksDone() throws Exception {
        for (int i = 0; i < 1000; i++) {
            final MockHttpTask task = new MockHttpTask();
            task.setFactory(factory);
            final BasicHttpRequest request = new BasicHttpRequest();
            request.setUrl("test://index");
            task.setRequest(request);
            processor.submit(task);
        }
        processor.shutdown();
        final long actual = processor.getTaskCount();
        assertEquals(0, actual);
    }

    @Test
    public void testThatProcessorWillBeShutDownWellWhenAllTasksDoneEvenExceptionInTaskOccurred() throws Exception {
        for (int i = 0; i < 1000; i++) {
            final MockHttpTask task = new MockHttpTask();
            task.setFactory(factory);
            final BasicHttpRequest request = new BasicHttpRequest();
            request.setUrl("test://index");
            task.setRequest(request);
            processor.submit(task);
        }
        processor.shutdown();
        final long actual = processor.getTaskCount();
        assertEquals(0, actual);
    }

    @Test
    public void testThatActiveTaskCountLimitWillBeProcessedWell() throws Exception {
        processor = new GeneralProcessor<>(5);
        MockHttpHandlerWithDelay handler = new MockHttpHandlerWithDelay(1000); // one second

        long time = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            final MockHttpTask task = new MockHttpTask();
            final BasicHttpRequest request = new BasicHttpRequest();
            request.setUrl("test://index");
            task.setRequest(request);
            task.setFactory(factory);
            task.registerHandler(handler);
            processor.submit(task);
        }
        processor.shutdown();
        time = System.currentTimeMillis() - time;
        //10 request attempted to submit, but only 5 concurrent requests are allowed (each takes ~1 sec)
        // so summary they will be taking about 2 sec or bit more
        assertTrue(time >= 2000);
    }

    @Test
    public void testThatRetrialSubmissionWillBePerformedWellInCaseOfActiveTaskLimitIsSet() throws Exception {
        processor = new GeneralProcessor<>(5);
        MockHttpHandlerWith3Retry handler = new MockHttpHandlerWith3Retry(processor, factory);

        for (int i = 0; i < 1000; i++) {
            BasicHttpRequest request = new BasicHttpRequest();
            request.setUrl("test://index");
            processor.submit(handler.buildTask(request));
        }
        processor.shutdown();
        assertEquals(1000, handler.getDone());
    }

    @Test
    public void testThatRequestPriorityWorksWell() throws Exception {
        for (int i = 0; i < 1000; i++) {
            final MockHttpTask task = new MockHttpTask();
            task.setFactory(factory);
            task.registerHandler(handler);
            final BasicHttpRequest request = new BasicHttpRequest("test://index-" + i, 1);
            task.setRequest(request);
            processor.submit(task);
        }
        processor.shutdown();
        assertEquals(1000, handler.getHandledRequestsCount());
    }

    @Test
    public void testThatThereIsNoDeadLockOnConcurrentAccess() throws Exception {
        final MockHttpHandlerWith3ConcurrentRetry handler = new MockHttpHandlerWith3ConcurrentRetry(processor, factory);
        class Task implements Runnable {
            private final int page;

            public Task(int page) {
                this.page = page;
            }

            @Override
            public void run() {
                try {
                    processor.submit(handler.buildTask(new BasicHttpRequest("test://index?" + page)));
                    sleep(10);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        ExecutorService service = Executors.newFixedThreadPool(10);
        Collection<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            tasks.add(Executors.callable(new Task(i)));
        }
        service.invokeAll(tasks);
        processor.shutdown();
        service.shutdown();
        assertTrue(handler.getDone() <= 500);
    }

    @Test
    public void testThatDuplicatesAreAllowedToSubmitWhenNoLimit() throws Exception {
        processor = new GeneralProcessor<>(1);
        final MockHttpHandlerWithDelay delayedHandler = new MockHttpHandlerWithDelay(1000);
        MockHttpTask task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        BasicHttpRequest request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);

        task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);
        long actual = processor.getTaskCount();
        processor.shutdown();
        assertEquals(2, actual);
    }

    @Test
    public void testThatDuplicatesCounterDecrementsWhenTaskIsDone() throws Exception {
        processor = new GeneralProcessor<>(1, 0, "duplicates test processor");
        final MockHttpHandlerWithDelay delayedHandler = new MockHttpHandlerWithDelay(1000);
        MockHttpTask task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        BasicHttpRequest request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);

        sleep(2000);

        task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);
        long actual = processor.getTaskCount();
        processor.shutdown();
        assertEquals(1, actual);
    }

    @Test(expected = ProcessorTaskSubmissionException.class)
    public void testThatDuplicatesAreNotAllowedToSubmitWhenZeroDuplicateLimitIsSet() throws Exception {
        processor = new GeneralProcessor<>(1, 0, "duplicates test processor");
        final MockHttpHandlerWithDelay delayedHandler = new MockHttpHandlerWithDelay(1000);
        MockHttpTask task;
        BasicHttpRequest request;

        task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);

        task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);
        processor.shutdown();
    }

    @Test(expected = ProcessorTaskSubmissionException.class)
    public void testThatDuplicatesNoExceedSpecifiedLimit() throws Exception {
        processor = new GeneralProcessor<>(1, 1, "duplicates test processor");
        final MockHttpHandlerWithDelay delayedHandler = new MockHttpHandlerWithDelay(1000);
        MockHttpTask task;
        BasicHttpRequest request;

        task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);

        task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);

        task = new MockHttpTask();
        task.setFactory(factory);
        task.registerHandler(delayedHandler);
        request = new BasicHttpRequest("test://index", 1);
        task.setRequest(request);
        processor.submit(task);
        processor.shutdown();
    }
}
TOP

Related Classes of ru.decipher.Task

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.