Package org.switchyard.deployment.torquebox

Source Code of org.switchyard.deployment.torquebox.ServiceInvoker

/*
* Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, 
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.switchyard.deployment.torquebox;

import org.jruby.RubyHash;
import org.jruby.runtime.builtin.IRubyObject;
import org.switchyard.Exchange;
import org.switchyard.ExchangeHandler;
import org.switchyard.ExchangePattern;
import org.switchyard.ExchangeState;
import org.switchyard.HandlerException;
import org.switchyard.Message;
import org.switchyard.ServiceReference;
import org.switchyard.metadata.BaseExchangeContract;
import org.switchyard.metadata.ServiceOperation;
import org.switchyard.metadata.java.JavaService;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/**
* Service Invoker.
*
* @author <a href="mailto:tom.fennelly@gmail.com">tom.fennelly@gmail.com</a>
*/
public class ServiceInvoker {

    private ServiceReference _serviceReference;

    /**
     * Public constructor.
     * @param serviceReference Reference to the target Service.
     */
    public ServiceInvoker(ServiceReference serviceReference) {
        if (serviceReference == null) {
            throw new IllegalArgumentException("null 'serviceReference' arg in method call.");
        }
        this._serviceReference = serviceReference;
    }

    /**
     * Send a message hash to the specified operation.
     * @param operationName The operation name.
     * @param rubyHash The message hash.
     * @return The response object if an IN_OUT operation was invoked, otherwise null.
     * @throws Throwable An exception occurred while invoking the target operation.
     */
    public Object send(String operationName, RubyHash rubyHash) throws Throwable {
        if (operationName == null) {
            throw new IllegalArgumentException("null 'operationName' argument.");
        }
        if (rubyHash == null) {
            throw new IllegalArgumentException("null 'rubyHash' argument.");
        }

        ServiceOperation operation = _serviceReference.getInterface().getOperation(operationName);

        if (operation == null) {
            throw new IllegalArgumentException("Unknown operation name '" + operationName + "' on Service '" + _serviceReference.getName() + "'.");
        }

        // Clone the RubyHash to convert it to a normal Map based graph.  This makes it possible
        // to more safely transport the payload data out of the ruby app via a SwitchYard Exchange...
        Map<String,Object> payload = deepClone(rubyHash);

        // Create the exchange contract...
        BaseExchangeContract exchangeContract = new BaseExchangeContract(operation);

        // Set the input type...
        exchangeContract.getInvokerInvocationMetaData().setInputType(JavaService.toMessageType(payload.getClass()));

        if (operation.getExchangePattern() == ExchangePattern.IN_OUT) {
            final BlockingQueue<Exchange> responseQueue = new ArrayBlockingQueue<Exchange>(1);

            AtomicReference<ExchangeHandler> responseExchangeHandler = new AtomicReference<ExchangeHandler>(new ExchangeHandler() {
                public void handleMessage(Exchange exchange) throws HandlerException {
                    responseQueue.offer(exchange);
                }

                public void handleFault(Exchange exchange) {
                    responseQueue.offer(exchange);
                }
            });

            Exchange exchange = _serviceReference.createExchange(exchangeContract, responseExchangeHandler.get());
            Message message = exchange.createMessage().setContent(payload);

            exchange.send(message);
            Exchange exchangeOut = null;
            try {
                exchangeOut = responseQueue.take();
            } catch (InterruptedException e) {
                throw new SwitchYardException("Operation '" + operationName + "' on Service '" + _serviceReference.getName() + "' interrupted.", e);
            }

            if (exchangeOut.getState() == ExchangeState.OK) {
                return exchangeOut.getMessage().getContent();
            } else {
                Object failureObj = exchangeOut.getMessage().getContent();
                if (failureObj instanceof Throwable) {
                    if (failureObj instanceof InvocationTargetException) {
                        throw ((Throwable)failureObj).getCause();
                    } else {
                        throw (Throwable) failureObj;
                    }
                } else {
                    throw new SwitchYardException("Service invocation failure.  Service '" + _serviceReference.getName() + "', operation '" + operationName + "'.  Non Throwable failure message payload: " + failureObj);
                }
            }
        } else {
            Exchange exchange = _serviceReference.createExchange(exchangeContract);
            Message message = exchange.createMessage().setContent(payload);

            exchange.send(message);
        }

        return null;
    }

    /**
     * Create a deep clone of the supplied {@link Map} instance.
     * @param sourceMap The Map to be cloned.
     * @return The cloned Map.
     */
    public static Map<String, Object> deepClone(Map<String, Object> sourceMap) {
        Map<String, Object> map = new LinkedHashMap<String, Object>();
        Set<Map.Entry<String,Object>> mapEntries = sourceMap.entrySet();

        for (Map.Entry<String,Object> entry : mapEntries) {
            map.put(entry.getKey(), deepClone(entry.getValue()));
        }

        return map;
    }

    private static Collection<Object> deepClone(Collection<Object> sourceCollection) {
        Collection<Object> collection = createCollection(sourceCollection.getClass());

        for (Object entry : sourceCollection) {
            collection.add(deepClone(entry));
        }

        return collection;
    }

    private static Object deepClone(Object object) {
        if (object instanceof Map) {
            return deepClone((Map)object);
        } else if (object instanceof Collection) {
            return deepClone((Collection)object);
        } else if (object instanceof IRubyObject) {
            return object.toString();
        }
        return object;
    }

    private static Collection<Object> createCollection(Class<? extends Collection> colClass) {
        if (Set.class.isAssignableFrom(colClass)) {
            return new LinkedHashSet();
        } else if (List.class.isAssignableFrom(colClass)) {
            return new ArrayList();
        } else {
            return new ArrayList();
        }
    }
}
TOP

Related Classes of org.switchyard.deployment.torquebox.ServiceInvoker

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.