Package org.butor.mule.component

Source Code of org.butor.mule.component.ButorJsonServiceExecutorComponent

/*******************************************************************************
* Copyright 2013 butor.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/

package org.butor.mule.component;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.lang.reflect.Type;
import java.util.Map;

import com.google.common.base.Strings;

import org.butor.json.CommonRequestArgs;
import org.butor.json.JsonHelper;
import org.butor.json.JsonResponseHeader;
import org.butor.json.JsonResponseMessage;
import org.butor.json.JsonServiceRequest;
import org.butor.json.service.BinResponseHandler;
import org.butor.json.service.Context;
import org.butor.json.service.ResponseHandler;
import org.butor.utils.Message;
import org.mule.api.MuleMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
*/
public class ButorJsonServiceExecutorComponent extends ButorServiceExecutorComponent {
  protected Logger logger = LoggerFactory.getLogger(getClass());
 
  private JsonHelper jsh = new JsonHelper();
 
 

  @Override
  public Runnable createBinWorker(final MuleMessage mmsg, final PipedOutputStream pos,
      final JsonServiceRequest req, final String logReqInfo) {

    final ResponseHandler<byte[]> streamer = new BinResponseHandler() {
      boolean contentTypeSet = false;
      @Override
      public OutputStream getOutputStream() {
        return pos;
      }
      @Override
      public void setContentType(String contentType, Map<String, String> headers) {
        if (Strings.isNullOrEmpty(contentType)) {
          return;
        }
        if (contentTypeSet) {
          return;
        }
        contentTypeSet = true;
        this.addRow("___content_type___\n".getBytes());
        this.addRow(("Content-Type:" +contentType +"\n").getBytes());
        if (headers != null) {
          for (String h : headers.keySet()) {
            this.addRow((h +":" +headers.get(h) +"\n").getBytes());
          }
        }
        this.addRow("\n".getBytes());
      }
      @Override
      public void end() {
        //ok
      }
      @Override
      public boolean addRow(byte[] row_) {
        if (row_ == null) {
          return false;
        }
        try {
          pos.write(row_);
          pos.flush();
          return true;
        } catch (IOException e) {
          logger.warn("Failed while writing a response row!", e);
        }
        return false;
      }
      @Override
      public boolean addMessage(Message message_) {
        logger.error("Unsupported! return Message on binary reponse handler");
        return false;
      }
      @Override
      public Type getResponseType() {
        return byte[].class;
      }
    };
    final Context ctx = new Context() {
      @Override
      public ResponseHandler getResponseHandler() {
        return streamer;
      }
      @Override
      public CommonRequestArgs getRequest() {
        return req;
      }
    };
    Runnable worker = new Runnable() {
      @Override
      public void run() {
        long time = System.currentTimeMillis();
        boolean success = false;
        try {
          invoke(ctx);
          pos.close();
          success = true;
        } catch (Exception e) {
          logger.warn("Failed while invoking service!", e);
        } finally {
          long elapsed = System.currentTimeMillis() - time;
          Object[] args = new Object[] {logReqInfo, Boolean.valueOf(success), Long.valueOf(elapsed) };
          logger.info("STATS: {}, success: {}, elapsed: {} ms", args);
        }
      }
    };

    return worker;
  }

  @Override
  public Runnable createJsonWorker(final MuleMessage mmsg, final PipedOutputStream pos,
      final JsonServiceRequest req, final String logReqInfo) {
    final boolean logResponse = !this.servicesToNotLogResponses.contains(req.getNamespace() +";" +req.getService());

    mmsg.setInvocationProperty("Content-Type", "text/json");

    final ResponseHandler<Object> streamer = new ResponseHandler<Object>() {
      @Override
      public void end() {
        //ok
      }
      @Override
      public boolean addRow(Object row_) {
        if (row_ == null) {
          return false;
        }
        try {
          String chunk = jsh.serialize(row_);
          if (logResponse) {
            logger.info("RESPONSE: {}, row: {}", logReqInfo, chunk);
          }
          pos.write(chunk.getBytes());
          pos.write(0);// null Delimiter "//---".getBytes()
          pos.flush();
          return true;
        } catch (IOException e) {
          logger.warn("Failed while writing a response row!", e);
        }
        return false;
      }
      @Override
      public boolean addMessage(Message message_) {
        try {
          JsonResponseMessage msg = new JsonResponseMessage(req.getReqId());
          msg.setMessage(message_);
          String chunk = jsh.serialize(msg);
          if (logResponse) {
            logger.info("RESPONSE: {}, message: {}", logReqInfo, chunk);
          }
          pos.write(chunk.getBytes());
          pos.write(0);// null Delimiter
          pos.flush();
          return true;
        } catch (IOException e) {
          logger.warn("Failed while writing a response message!", e);
        }
        return false;
      }
      @Override
      public Type getResponseType() {
        return Object.class;
      }
    };
    final Context ctx = new Context() {
      @Override
      public ResponseHandler<Object> getResponseHandler() {
        return streamer;
      }
      @Override
      public CommonRequestArgs getRequest() {
        return req;
      }
    };
    Runnable worker = new Runnable() {
      @Override
      public void run() {
        long time = System.currentTimeMillis();
        boolean success = false;
        try {
          // delimiter to start response
          pos.write(0);// null Delimiter

          JsonResponseHeader header = new JsonResponseHeader(req.getReqId());
          pos.write(jsh.serialize(header).getBytes());
          pos.write(0);// null Delimiter
          pos.flush();

          invoke(ctx);
          pos.close();
          success = true;
        } catch (Exception e) {
          logger.warn("Failed while invoking service!", e);
        } finally {
          long elapsed = System.currentTimeMillis() - time;
          Object[] args = new Object[] {logReqInfo, Boolean.valueOf(success), Long.valueOf(elapsed) };
          logger.info("STATS: {}, success: {}, elapsed: {} ms", args);
        }
      }
    };

    return worker;
  }
}
TOP

Related Classes of org.butor.mule.component.ButorJsonServiceExecutorComponent

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.