/*******************************************************************************
* Copyright 2013 butor.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.butor.mule.component;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.Map;
import com.google.common.base.Strings;
import org.butor.json.CommonRequestArgs;
import org.butor.json.JsonHelper;
import org.butor.json.JsonResponseHeader;
import org.butor.json.JsonResponseMessage;
import org.butor.json.JsonServiceRequest;
import org.butor.json.service.BinResponseHandler;
import org.butor.json.service.Context;
import org.butor.json.service.ResponseHandler;
import org.butor.utils.Message;
import org.mule.api.MuleMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Executor component that runs a service invocation and streams its response
 * through a {@link PipedOutputStream}.
 * <p>
 * Two kinds of workers are produced:
 * <ul>
 * <li>a binary worker, whose response handler writes raw byte rows (optionally
 * preceded by a textual content-type preamble) to the pipe;</li>
 * <li>a JSON worker, whose response handler serializes every row or message to
 * JSON and writes it to the pipe followed by a single null byte delimiter.</li>
 * </ul>
 * All text written to the pipe is encoded with {@link #RESPONSE_CHARSET}.
 */
public class ButorJsonServiceExecutorComponent extends ButorServiceExecutorComponent {
    /**
     * Charset used for every string written to the response pipe. Fixed to
     * UTF-8 so the output does not depend on the platform default charset.
     */
    private static final Charset RESPONSE_CHARSET = Charset.forName("UTF-8");

    protected Logger logger = LoggerFactory.getLogger(getClass());
    private final JsonHelper jsh = new JsonHelper();

    /**
     * Creates a worker that invokes the service with a binary response handler
     * writing directly to {@code pos}.
     * <p>
     * The pipe is closed when the worker finishes, whether the invocation
     * succeeded or failed, so the reading side is not left waiting for data
     * that will never come.
     *
     * @param mmsg the Mule message being processed (not used by this worker)
     * @param pos the pipe the response bytes are written to
     * @param req the service request exposed to the invoked service
     * @param logReqInfo request description used in log statements
     * @return the worker to run
     */
    @Override
    public Runnable createBinWorker(final MuleMessage mmsg, final PipedOutputStream pos,
            final JsonServiceRequest req, final String logReqInfo) {
        final ResponseHandler<byte[]> streamer = new BinResponseHandler() {
            // The content-type preamble is emitted at most once per response.
            boolean contentTypeSet = false;

            @Override
            public OutputStream getOutputStream() {
                return pos;
            }

            /**
             * Writes the content-type preamble: a marker line, the
             * Content-Type line, one line per extra header and an empty line.
             * Ignored when the content type is null/empty or when the
             * preamble was already written.
             */
            @Override
            public void setContentType(String contentType, Map<String, String> headers) {
                if (Strings.isNullOrEmpty(contentType)) {
                    return;
                }
                if (contentTypeSet) {
                    return;
                }
                contentTypeSet = true;
                this.addRow("___content_type___\n".getBytes(RESPONSE_CHARSET));
                this.addRow(("Content-Type:" + contentType + "\n").getBytes(RESPONSE_CHARSET));
                if (headers != null) {
                    for (Map.Entry<String, String> header : headers.entrySet()) {
                        this.addRow((header.getKey() + ":" + header.getValue() + "\n")
                                .getBytes(RESPONSE_CHARSET));
                    }
                }
                this.addRow("\n".getBytes(RESPONSE_CHARSET));
            }

            @Override
            public void end() {
                //ok
            }

            /**
             * Writes the row to the pipe and flushes it.
             *
             * @return true when written, false for a null row or on I/O failure
             */
            @Override
            public boolean addRow(byte[] row_) {
                if (row_ == null) {
                    return false;
                }
                try {
                    pos.write(row_);
                    pos.flush();
                    return true;
                } catch (IOException e) {
                    logger.warn("Failed while writing a response row!", e);
                }
                return false;
            }

            /** Messages are not supported on a binary response; always returns false. */
            @Override
            public boolean addMessage(Message message_) {
                logger.error("Unsupported! return Message on binary reponse handler");
                return false;
            }

            @Override
            public Type getResponseType() {
                return byte[].class;
            }
        };
        final Context ctx = new Context() {
            @Override
            public ResponseHandler getResponseHandler() {
                return streamer;
            }

            @Override
            public CommonRequestArgs getRequest() {
                return req;
            }
        };
        Runnable worker = new Runnable() {
            @Override
            public void run() {
                long time = System.currentTimeMillis();
                boolean success = false;
                try {
                    invoke(ctx);
                    pos.close();
                    success = true;
                } catch (Exception e) {
                    logger.warn("Failed while invoking service!", e);
                } finally {
                    if (!success) {
                        // Failure path: still close the pipe so the reader sees end-of-stream.
                        closePipeQuietly(pos);
                    }
                    logWorkerStats(logReqInfo, success, time);
                }
            }
        };
        return worker;
    }

    /**
     * Creates a worker that invokes the service with a JSON response handler.
     * <p>
     * The worker first writes a null byte, then the serialized
     * {@link JsonResponseHeader} followed by a null byte, and then invokes the
     * service. Every row and message produced by the service is serialized to
     * JSON and written followed by a null byte. Responses are logged unless the
     * "namespace;service" key is listed in {@code servicesToNotLogResponses}.
     * <p>
     * The pipe is closed when the worker finishes, whether the invocation
     * succeeded or failed.
     *
     * @param mmsg the Mule message; its "Content-Type" invocation property is set to "text/json"
     * @param pos the pipe the response is written to
     * @param req the service request exposed to the invoked service
     * @param logReqInfo request description used in log statements
     * @return the worker to run
     */
    @Override
    public Runnable createJsonWorker(final MuleMessage mmsg, final PipedOutputStream pos,
            final JsonServiceRequest req, final String logReqInfo) {
        final boolean logResponse = !this.servicesToNotLogResponses.contains(req.getNamespace() + ";" + req.getService());
        mmsg.setInvocationProperty("Content-Type", "text/json");
        final ResponseHandler<Object> streamer = new ResponseHandler<Object>() {
            @Override
            public void end() {
                //ok
            }

            /**
             * Serializes the row and writes it as one null-delimited chunk.
             *
             * @return true when written, false for a null row or on I/O failure
             */
            @Override
            public boolean addRow(Object row_) {
                if (row_ == null) {
                    return false;
                }
                try {
                    String chunk = jsh.serialize(row_);
                    if (logResponse) {
                        logger.info("RESPONSE: {}, row: {}", logReqInfo, chunk);
                    }
                    writeJsonChunk(pos, chunk);
                    return true;
                } catch (IOException e) {
                    logger.warn("Failed while writing a response row!", e);
                }
                return false;
            }

            /**
             * Wraps the message in a {@link JsonResponseMessage} carrying the
             * request id and writes it as one null-delimited chunk.
             *
             * @return true when written, false on I/O failure
             */
            @Override
            public boolean addMessage(Message message_) {
                try {
                    JsonResponseMessage msg = new JsonResponseMessage(req.getReqId());
                    msg.setMessage(message_);
                    String chunk = jsh.serialize(msg);
                    if (logResponse) {
                        logger.info("RESPONSE: {}, message: {}", logReqInfo, chunk);
                    }
                    writeJsonChunk(pos, chunk);
                    return true;
                } catch (IOException e) {
                    logger.warn("Failed while writing a response message!", e);
                }
                return false;
            }

            @Override
            public Type getResponseType() {
                return Object.class;
            }
        };
        final Context ctx = new Context() {
            @Override
            public ResponseHandler<Object> getResponseHandler() {
                return streamer;
            }

            @Override
            public CommonRequestArgs getRequest() {
                return req;
            }
        };
        Runnable worker = new Runnable() {
            @Override
            public void run() {
                long time = System.currentTimeMillis();
                boolean success = false;
                try {
                    // delimiter to start response
                    pos.write(0);// null Delimiter
                    JsonResponseHeader header = new JsonResponseHeader(req.getReqId());
                    writeJsonChunk(pos, jsh.serialize(header));
                    invoke(ctx);
                    pos.close();
                    success = true;
                } catch (Exception e) {
                    logger.warn("Failed while invoking service!", e);
                } finally {
                    if (!success) {
                        // Failure path: still close the pipe so the reader sees end-of-stream.
                        closePipeQuietly(pos);
                    }
                    logWorkerStats(logReqInfo, success, time);
                }
            }
        };
        return worker;
    }

    /** Writes one JSON chunk followed by the null byte delimiter, then flushes. */
    private void writeJsonChunk(OutputStream out, String chunk) throws IOException {
        out.write(chunk.getBytes(RESPONSE_CHARSET));
        out.write(0);// null Delimiter
        out.flush();
    }

    /** Closes the pipe, logging (not propagating) any failure to do so. */
    private void closePipeQuietly(PipedOutputStream pos) {
        try {
            pos.close();
        } catch (IOException e) {
            logger.warn("Failed while closing the response stream!", e);
        }
    }

    /** Logs the STATS line (request info, outcome, elapsed milliseconds) for a finished worker. */
    private void logWorkerStats(String logReqInfo, boolean success, long startMillis) {
        long elapsed = System.currentTimeMillis() - startMillis;
        Object[] args = new Object[] {logReqInfo, Boolean.valueOf(success), Long.valueOf(elapsed) };
        logger.info("STATS: {}, success: {}, elapsed: {} ms", args);
    }
}