Package org.mom4j.xcp.impl

Source Code of org.mom4j.xcp.impl.XcpListener$WorkerThreads

package org.mom4j.xcp.impl;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

import org.mom4j.xcp.XcpConfig;
import org.mom4j.xcp.util.BlockingQueue;


public class XcpListener extends Thread {
   
    /** logger */
    private static Log log = LogFactory.getLog(XcpListener.class);   

   
    private XcpConfig     config;
    private BlockingQueue queue;
    private ServerSocket  server;
    private XcpWorker[]   workers;
    private WorkerThreads workerThreads;
    private boolean       stopped;
    private boolean       running;
   

    XcpListener(XcpConfig cfg) {
        super("XcpListener");
   
        this.config  = cfg;
        this.queue   = new BlockingQueue();
        this.server  = null;
        this.stopped = false;
        this.running = false;
    }
   
   
    public void listen() {
        this.workerThreads = new WorkerThreads("xcp-workers");
        try {
            this.server = new ServerSocket(this.config.getPort());
            //On Linux, e.g. ServerSocket.accept doesn't return, even
            //if the server-socket is closed. Setting an SoTimeout helps!
            this.server.setSoTimeout(5000);
        } catch(IOException ioex) {
            throw new IllegalStateException(ioex.getMessage());
        }
        this.workers = new XcpWorker[this.config.getWorkersCount()];
        for(int i = 0; i < this.workers.length; i++) {
            this.workers[i] = new XcpWorker(this.queue,
                                            i + 1,
                                            this.config,
                                            this.workerThreads);
        }
        this.start();
    }
   
   
    public boolean isRunning() {
        return this.running;
    }
   
   
    public void shutdown() {
        this.stopped = true;
        try {
            this.server.close();
        } catch(Exception ex) {
            log.error("server.close() failed", ex);
        }
        while(this.running) {
            try {
                Thread.sleep(500);
            } catch(InterruptedException ex) {
                break;
            }
        }
    }
   
   
    public void run() {
        this.running = true;
        try {
            Socket s;
            for(int i = 0; i < this.workers.length; i++) {
                this.workers[i].start();
            }
            while(true) {
                if(this.stopped) {
                    break;
                }
                try {
                    s = this.server.accept();
                } catch(IOException ioex) {
                    continue;
                }
                if(s != null) {
                    queue.add(s);
                }
            }
        } finally {
            this.running = false;
        }
    }
   
   
    class WorkerThreads extends ThreadGroup {
       
        public WorkerThreads(String name) {
            super(name);
        }
       
        public void uncaughtException(Thread t, Throwable th) {
            if(XcpListener.this.stopped) {
                return;
            }
            if(th instanceof ThreadDeath) {
                return;
            }
            if(!(t instanceof XcpWorker)) {
                return;
            }
           
            log.error(t.getName() +
                      " is dying due to uncaught" +
                      " exception " + th.getClass().getName());
            log.error("try to spawn a new WorkerThread ...");
           
            int i;
            for(i = 0; i < XcpListener.this.workers.length; i++) {
                if(System.identityHashCode(XcpListener.this.workers[i]) ==
                   System.identityHashCode(t))
                {
                    break;
                }
            }
            if(i == XcpListener.this.workers.length) {
                log.warn("Confused. Thread is not a current XcpWorker");
                log.warn("ignore.");
                return;
            }
            XcpWorker w = new XcpWorker(XcpListener.this.queue,
                                        i + 1,
                                        XcpListener.this.config,
                                        this);
            XcpListener.this.workers[i] = w;
            w.start();
        }
    }
   
}
TOP

Related Classes of org.mom4j.xcp.impl.XcpListener$WorkerThreads

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.