Package com.trendmicro.mist.session

Source Code of com.trendmicro.mist.session.ProducerSession

package com.trendmicro.mist.session;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import javax.jms.Message;

import com.trendmicro.mist.Client;
import com.trendmicro.mist.Daemon;
import com.trendmicro.mist.ExchangeMetric;
import com.trendmicro.mist.MistException;
import com.trendmicro.mist.mfr.RouteFarm;
import com.trendmicro.mist.proto.GateTalk;
import com.trendmicro.mist.proto.MistMessage;
import com.trendmicro.mist.proto.MistMessage.KeyValuePair;
import com.trendmicro.mist.util.Exchange;
import com.trendmicro.mist.util.MessageFilter;
import com.trendmicro.mist.util.Packet;
import com.trendmicro.spn.common.util.Utils;

public class ProducerSession extends Session {
    /**
     * After the producer receives a message from socket, it transforms the raw
     * content by create a MessagePrepared object
     */
    public static class MessagePrepared {
        /**
         * The message's destination exchange.
         */
        public Exchange dest;

        /**
         * The TTL of the message. It will remain the default TTL of JMS if not
         * specified in message
         */
        public long ttl = Message.DEFAULT_TIME_TO_LIVE;

        /**
         * The message body
         */
        public byte[] msg;

        /**
         * The JMS property map, if not in the mist message then it will be null
         * Note that if ttl in MessageBlock is set, then a property named
         * "MIST_TTL" will be added to carry over the TTL value
         */
        public HashMap<String, String> props = null;

        /**
         * Where a raw message gets prepared. A MistException will be thrown if
         * the incoming message cannot be delivered
         *
         * @param raw
         *            The incoming raw message from socket
         * @throws MistException
         *             It contains the reason why the message cannot be
         *             delivered
         */
        public MessagePrepared(byte[] raw) throws MistException {
            // Try to parse the raw message as MistMessage.MessageBlock
            MistMessage.MessageBlock mBlock = null;
            try {
                MistMessage.MessageBlock.Builder mblockBuilder = MistMessage.MessageBlock.newBuilder();
                mblockBuilder.mergeFrom(raw);
                mBlock = mblockBuilder.build();
            }
            catch(Exception e) {
                throw new MistException(MistException.UNABLE_TO_PARSE_MIST_MESSAGE);
            }

            // Unpack the attributes
            dest = new Exchange(mBlock.getId());
            if(mBlock.hasTtl()) {
                props = new HashMap<String, String>();
                ttl = mBlock.getTtl();
                props.put(Session.MIST_MESSAGE_TTL, new Long(ttl).toString());
            }

            msg = mBlock.getMessage().toByteArray();
            if(mBlock.getPropertiesCount() > 0) {
                if(props == null)
                    props = new HashMap<String, String>();
                for(KeyValuePair pair : mBlock.getPropertiesList())
                    props.put(pair.getKey(), pair.getValue());
            }

            for(MessageFilter filter : Daemon.messageFilters) {
                filter.preSend(this);
            }

            if(msg.length > Daemon.MAX_TRANSMIT_MESSAGE_SIZE)
                throw new MistException(MistException.sizeTooLarge(msg.length));
        }
    }

    private static final long ROUTE_CACHE_TTL_MILLIS = 2000;
    private long lastRouteUpdate;
    private boolean retryDeliver = true;

    private HashMap<Exchange, List<Exchange>> routeCacheMap = new HashMap<Exchange, List<Exchange>>();

    /**
     * Update the routing information from RouteFarm
     */
    private void updateRoute(Exchange ex) {
        // If the current cache has not expired, skip the update
        if((System.currentTimeMillis() - lastRouteUpdate) < ROUTE_CACHE_TTL_MILLIS)
            return;

        // Get a copied list from the RouteFarm
        List<Exchange> destList = RouteFarm.getInstance().getDestList(ex.getName());
       
        if(destList != null) {
            routeCacheMap.put(ex, destList);
        }
        else
            routeCacheMap.remove(ex);

        lastRouteUpdate = System.currentTimeMillis();
    }

    public ProducerSession(int sessId, GateTalk.Session sessConfig) throws MistException {
        super(sessId, sessConfig);
        // TODO Auto-generated constructor stub
    }

    /**
     * The helper function to ack the client with status and fail reason
     *
     * @param packet
     *            The Packet object used to communicate with client
     * @param success
     *            If the attempt to deliver the message is success or not
     * @param reason
     *            If success, this parameter is ignored; if fail, this parameter
     *            is set into the Exception field of the response message
     */
    private void ackClient(Packet packet, boolean success, String reason) {
        final byte[] successAck = GateTalk.Response.newBuilder().setSuccess(true).build().toByteArray();
        if(success)
            packet.setPayload(successAck);
        else
            packet.setPayload(GateTalk.Response.newBuilder().setSuccess(false).setException(reason).build().toByteArray());
        try {
            packet.write(socketOutput);
        }
        catch(Exception e) {
            logger.error(e.getMessage());
        }
    }

    private Client mountAndAddProducer(Exchange exchange) throws MistException {
        GateTalk.Channel.Builder ch_builder = GateTalk.Channel.newBuilder();
        if(exchange.isQueue()) {
            ch_builder.setType(GateTalk.Channel.Type.QUEUE);
            ch_builder.setName(exchange.getName());
        }
        else {
            ch_builder.setType(GateTalk.Channel.Type.TOPIC);
            ch_builder.setName(exchange.getName());
        }

        GateTalk.Client.Builder cl_builder = GateTalk.Client.newBuilder();
        cl_builder.setSessionId(sessionId);
        cl_builder.setChannel(ch_builder.build());
        cl_builder.setType(GateTalk.Client.Type.PRODUCER);
        cl_builder.setAction(GateTalk.Client.Action.MOUNT);
        GateTalk.Client client_config = cl_builder.build();

        if(!Exchange.isValidExchange(client_config.getChannel().getName()))
            throw new MistException(String.format("exchange `%s' not valid", client_config.getChannel().getName()));

        Client client = null;
        try {
            client = addClient(client_config);
            logger.info(String.format("session %d: create exchange `%s:%s'", sessionId, client.isQueue() ? "queue": "topic", client.getChannelName()));
        }
        catch(MistException e) {
            logger.error(e.getMessage());
            throw e;
        }
        return client;
    }

    protected void deliverMessage(byte[] msg, long ttl, HashMap<String, String> props, List<Exchange> destList) {
        for(Exchange dest : destList) {
            if(dest.getName().length() == 0)
                continue;

            while(retryDeliver) {
                try {
                    Client c = null;
                    synchronized(allClients) {
                        c = findClient(dest);
                        if(c == null || c.getProducer() == null)
                            c = mountAndAddProducer(dest);
                    }
                    c.getProducer().setTimeToLive(ttl);
                    if(props != null)
                        c.sendMessageBytes(msg, props);
                    else
                        c.sendMessageBytes(msg);

                    ExchangeMetric metric = ExchangeMetric.getExchangeMetric(dest);
                    metric.increaseMessageOut(msg.length);
                    break;
                }
                catch(Exception e) {
                    logger.warn("problem in deliverMessage(): ", e);
                    Utils.justSleep(1000);
                }
            }
        }
    }

    private void sendLoop() {
        lastRouteUpdate = -1;
        routeCacheMap.clear();
        // Accept the incoming connection and setup socket IO streams
        if(!acceptConnection())
            return;
        open(false);
        retryDeliver = true;
        isReady = true;

        Packet packet = new Packet();
        List<Exchange> notRoutedDest = new ArrayList<Exchange>();

        // The main loop to handle incoming messages to be sent
        while(!detachNow) {
            try {
                int rdcnt;
                // Reads a message from socket
                // If negative, indicates the other side shutdowns
                if((rdcnt = packet.read(socketInput)) < 0)
                    return;
                // If zero, which means invalid message size, ackClient and
                // continue
                else if(rdcnt == 0) {
                    ackClient(packet, false, MistException.INVALID_MESSAGE_SIZE);
                    continue;
                }
            }
            catch(IOException e) {
                // Socket exception happens, if it is not caused by detach, log
                // the error
                if(!detachNow)
                    logger.error(e.getMessage(), e);
                return;
            }

            // Received a message from socket, try to deliver it

            // Prepare the message to be send
            MessagePrepared mp = null;
            try {
                mp = new MessagePrepared(packet.getPayload());
            }
            catch(MistException e) {
                // If any error occurs, acknowledge fail response and reason
                // to client and continue to handle next message
                ackClient(packet, false, e.getMessage());
                logger.error(e.getMessage());
                continue;
            }

            // Updates the routing cache before delivering the message
            updateRoute(mp.dest);

            // Get the routing destination list
            List<Exchange> destList = routeCacheMap.get(mp.dest);
            if(destList == null) {
                // If the message is not routed, use the not routed
                // destination
                notRoutedDest.clear();
                notRoutedDest.add(mp.dest);
                destList = notRoutedDest;
            }

            // deliver the message
            try {
                deliverMessage(mp.msg, mp.ttl, mp.props, destList);
            }
            catch(Exception e) {
                ackClient(packet, false, e.getMessage());
                logger.error(e.getMessage());
                continue;
            }

            ackClient(packet, true, null);
        }
    }

    @Override
    public void run() {
        sendLoop();
        close(false);
    }

    @Override
    public void addClientIfAttached(Client c) {
        // TODO Auto-generated method stub

    }

    @Override
    protected void detach() {
        // Do not get more messages from socket
        try {
            socketInput.close();
        }
        catch(Exception e) {
        }

        // Wait for 10 seconds to deliver the final message
        try {
            sessionThread.join(10000);
        }
        catch(InterruptedException e) {
        }
        if(sessionThread.isAlive()) {
            retryDeliver = false;
            logger.error("force closing the producer, the message might not have been delivered!");
        }
    }

    @Override
    public boolean isAttached() {
        if(sessionThread == null)
            return false;
        else
            return sessionThread.isAlive();
    }
}
TOP

Related Classes of com.trendmicro.mist.session.ProducerSession

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.