Package org.fcrepo.server.journal

Source Code of org.fcrepo.server.journal.JournalConsumerThread

/* The contents of this file are subject to the license and copyright terms
* detailed in the license directory at the root of the source tree (also
* available online at http://fedora-commons.org/license/).
*/
package org.fcrepo.server.journal;

import java.util.Map;

import org.fcrepo.server.journal.entry.ConsumerJournalEntry;
import org.fcrepo.server.journal.helpers.JournalHelper;
import org.fcrepo.server.journal.recoverylog.JournalRecoveryLog;
import org.fcrepo.server.management.ManagementDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Process the journal entries as a separate Thread, while the JournalConsumer
* is blocking all calls from outside.
*
* @author Jim Blake
*/
public class JournalConsumerThread
        extends Thread {

    private static final Logger logger =
            LoggerFactory.getLogger(JournalConsumerThread.class);

    private final ServerInterface server;

    private final JournalReader reader;

    private final JournalRecoveryLog recoveryLog;

    private ManagementDelegate delegate;

    private boolean shutdown = false;

    /**
     * Store references to all of this stuff, but we can't start work without a
     * ManagementDelegate is provided, and we won't get that until the
     * post-initialization stage.
     */
    public JournalConsumerThread(Map<String, String> parameters,
                                 String role,
                                 ServerInterface server,
                                 JournalReader reader,
                                 JournalRecoveryLog recoveryLog) {
        this.server = server;
        this.reader = reader;
        this.recoveryLog = recoveryLog;
    }

    /**
     * Now that we have a ManagementDelegate to perform the operations, we can
     * start working.
     */
    public void setManagementDelegate(ManagementDelegate delegate) {
        this.delegate = delegate;
        start();
    }

    /**
     * Wait until the server completes its initialization, then process journal
     * entries until the reader says there are no more, or until a shutdown is
     * requested.
     */
    @Override
    public void run() {
        try {
            waitUntilServerIsInitialized();

            recoveryLog.log("Start recovery.");

            while (true) {
                if (shutdown) {
                    break;
                }
                ConsumerJournalEntry cje = reader.readJournalEntry();
                if (cje == null) {
                    break;
                }
                cje.invokeMethod(delegate, recoveryLog);
                cje.close();
            }
            reader.shutdown();

            recoveryLog.log("Recovery complete.");
        } catch (Throwable e) {
            /*
             * It makes sense to catch Exception here, because any uncaught
             * exception will not be reported - there is no console to print the
             * stack trace! It might not be appropriate to catch Throwable, but
             * it's the only way we can know about missing class files and such.
             * Of course, if we catch an OutOfMemoryError or a
             * VirtualMachineError, all bets are off.
             */
            logger.error("Error during Journal recovery", e);
            String stackTrace = JournalHelper.captureStackTrace(e);
            recoveryLog.log("PROBLEM: " + stackTrace);
            recoveryLog.log("Recovery terminated prematurely.");
        } finally {
            recoveryLog.shutdown();
        }
    }

    /**
     * Wait for the server to initialize. If we wait too long, give up and shut
     * down the thread.
     */
    private void waitUntilServerIsInitialized() {
        int i = 0;
        for (; i < 60; i++) {
            if (server.hasInitialized() || shutdown) {
                return;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.warn("Thread was interrupted");
            }
        }
        logger.error("Can't recover from the Journal - "
                + "the server hasn't initialized after " + i + " seconds.");
        shutdown = true;
    }

    /**
     * Set the flag saying that it's time to quit.
     */
    public void shutdown() {
        recoveryLog.log("Shutdown requested by server");
        shutdown = true;
    }
}
TOP

Related Classes of org.fcrepo.server.journal.JournalConsumerThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.