package org.mom4j.xcp.impl;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.mom4j.xcp.XcpConfig;
import org.mom4j.xcp.util.BlockingQueue;
/**
 * Accept thread of the XCP server.
 *
 * <p>{@link #listen()} opens a {@link ServerSocket} on the configured port,
 * creates the configured number of {@link XcpWorker}s and starts this thread.
 * The thread then starts the workers and hands every accepted {@link Socket}
 * to them through a shared {@link BlockingQueue}. {@link #shutdown()} asks
 * the accept loop to end and waits for it to do so.
 *
 * <p>The {@code stopped} and {@code running} flags are read and written by
 * different threads (the caller of {@code shutdown()}, the accept thread and
 * whichever thread reports an uncaught exception), so both are
 * {@code volatile}.
 */
public class XcpListener extends Thread {

    /** logger */
    private static final Log log = LogFactory.getLog(XcpListener.class);

    /** How long shutdown() sleeps between checks of the running flag (ms). */
    private static final long SHUTDOWN_POLL_MILLIS = 500L;

    /** Accept timeout in milliseconds; bounds how long a stop request can go unnoticed. */
    private static final int ACCEPT_TIMEOUT_MILLIS = 5000;

    private final XcpConfig config;
    private final BlockingQueue queue;
    private ServerSocket server;
    private XcpWorker[] workers;
    private WorkerThreads workerThreads;

    /** Set by shutdown(); polled by the accept loop and by the worker thread group. */
    private volatile boolean stopped;

    /** True while run() is executing; polled by shutdown() and isRunning(). */
    private volatile boolean running;

    /**
     * Creates a listener that is not yet bound to a port.
     *
     * @param cfg configuration supplying the port and the number of workers
     */
    XcpListener(XcpConfig cfg) {
        super("XcpListener");
        this.config  = cfg;
        this.queue   = new BlockingQueue();
        this.server  = null;
        this.stopped = false;
        this.running = false;
    }

    /**
     * Binds the server socket, creates the workers and starts the accept
     * thread.
     *
     * @throws IllegalStateException if the server socket cannot be opened or
     *         configured; the underlying {@link IOException} is attached as
     *         the cause
     */
    public void listen() {
        this.workerThreads = new WorkerThreads("xcp-workers");
        try {
            this.server = new ServerSocket(this.config.getPort());
            //On Linux, e.g. ServerSocket.accept doesn't return, even
            //if the server-socket is closed. Setting an SoTimeout helps!
            this.server.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
        } catch(IOException ioex) {
            // Same message as before, but keep the original exception as cause.
            throw new IllegalStateException(ioex.getMessage(), ioex);
        }
        this.workers = new XcpWorker[this.config.getWorkersCount()];
        for(int i = 0; i < this.workers.length; i++) {
            this.workers[i] = new XcpWorker(this.queue,
                                            i + 1,
                                            this.config,
                                            this.workerThreads);
        }
        this.start();
    }

    /**
     * @return {@code true} while the accept loop in {@link #run()} is active
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Requests the accept loop to stop, closes the server socket and waits
     * until {@link #run()} has finished.
     *
     * <p>If the waiting thread is interrupted, the wait is abandoned and the
     * interrupt status is restored for the caller.
     */
    public void shutdown() {
        // Raise the flag before closing so the accept loop sees it as soon
        // as accept() fails or times out.
        this.stopped = true;
        ServerSocket socket = this.server;
        if(socket != null) {
            try {
                socket.close();
            } catch(IOException ex) {
                log.error("server.close() failed", ex);
            }
        }
        while(this.running) {
            try {
                Thread.sleep(SHUTDOWN_POLL_MILLIS);
            } catch(InterruptedException ex) {
                // Do not swallow the interrupt; let the caller observe it.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Starts all workers, then accepts connections and queues them until
     * {@link #shutdown()} is called or the server socket has been closed.
     */
    public void run() {
        this.running = true;
        try {
            for(int i = 0; i < this.workers.length; i++) {
                this.workers[i].start();
            }
            while(!this.stopped) {
                Socket s;
                try {
                    s = this.server.accept();
                } catch(IOException ioex) {
                    // Accept timeouts are expected (see setSoTimeout in
                    // listen()). A closed socket can never accept again, so
                    // leave the loop instead of spinning on the failure.
                    if(this.server.isClosed()) {
                        break;
                    }
                    continue;
                }
                if(s != null) {
                    this.queue.add(s);
                }
            }
        } finally {
            this.running = false;
        }
    }

    /**
     * Thread group handed to every {@link XcpWorker} on construction
     * (presumably the workers run as members of this group - confirm in
     * XcpWorker). When an XcpWorker dies from an uncaught exception while
     * the listener is not stopped, a replacement worker is created for the
     * same slot and started.
     */
    class WorkerThreads extends ThreadGroup {

        public WorkerThreads(String name) {
            super(name);
        }

        /**
         * Replaces a worker that terminated with an uncaught exception.
         *
         * <p>Nothing is done when the listener has been stopped, when the
         * throwable is a {@link ThreadDeath}, or when the thread is not an
         * {@link XcpWorker}.
         *
         * @param t  the thread that is terminating
         * @param th the throwable that was not caught
         */
        public void uncaughtException(Thread t, Throwable th) {
            if(XcpListener.this.stopped) {
                return;
            }
            if(th instanceof ThreadDeath) {
                return;
            }
            if(!(t instanceof XcpWorker)) {
                return;
            }
            // Pass the throwable along so the stack trace reaches the log.
            log.error(t.getName() +
                      " is dying due to uncaught" +
                      " exception " + th.getClass().getName(), th);
            log.error("try to spawn a new WorkerThread ...");

            // Find the slot by reference identity. Identity hash codes are
            // not guaranteed to be unique, so they must not be used for this.
            XcpWorker[] pool = XcpListener.this.workers;
            int slot = -1;
            for(int i = 0; i < pool.length; i++) {
                if(pool[i] == t) {
                    slot = i;
                    break;
                }
            }
            if(slot < 0) {
                log.warn("Confused. Thread is not a current XcpWorker");
                log.warn("ignore.");
                return;
            }
            XcpWorker w = new XcpWorker(XcpListener.this.queue,
                                        slot + 1,
                                        XcpListener.this.config,
                                        this);
            pool[slot] = w;
            w.start();
        }
    }
}