Package com.alibaba.ons.api.impl.rocketmq

Source Code of com.alibaba.ons.api.impl.rocketmq.OrderProducerImpl

package com.alibaba.ons.api.impl.rocketmq;

import java.util.List;
import java.util.Properties;

import com.alibaba.ons.api.Message;
import com.alibaba.ons.api.PropertyKeyConst;
import com.alibaba.ons.api.SendResult;
import com.alibaba.ons.api.exception.ONSClientException;
import com.alibaba.ons.api.order.MessageQueueSelector;
import com.alibaba.ons.api.order.OrderProducer;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.message.MessageQueue;


public class OrderProducerImpl extends ONSClientAbstract implements OrderProducer {
    private final DefaultMQProducer defaultMQProducer;


    public OrderProducerImpl(final Properties properties) {
        super(properties);
        this.defaultMQProducer = new DefaultMQProducer();

        String producerGroup =
                properties.getProperty(PropertyKeyConst.ProducerId, "__ONS_PRODUCER_DEFAULT_GROUP");
        this.defaultMQProducer.setProducerGroup(producerGroup);

        String sendMsgTimeoutMillis = properties.getProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        this.defaultMQProducer.setSendMsgTimeout(Integer.parseInt(sendMsgTimeoutMillis));

        this.defaultMQProducer.setInstanceName(this.buildIntanceName());
    }


    @Override
    public Properties getProperties() {
        return this.properties;
    }


    @Override
    public void start() {
        try {
            this.defaultMQProducer.start();
            if (!this.sessionCredentials.equals(this.defaultMQProducer.getDefaultMQProducerImpl()
                .getmQClientFactory().getMQClientAPIImpl().getSessionCredentials())) {
                this.defaultMQProducer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl()
                    .setSessionCredentials(this.sessionCredentials);
            }
        }
        catch (Exception e) {
            throw new ONSClientException("defaultMQProducer start exception", e);
        }

    }


    @Override
    public void shutdown() {
        this.defaultMQProducer.shutdown();
    }


    @Override
    public SendResult send(final Message message, final MessageQueueSelector selector, final Object arg) {
        final com.alibaba.rocketmq.common.message.Message msgRMQ = ONSUtil.msgConvert(message);
        try {
            com.alibaba.rocketmq.client.producer.SendResult sendResultRMQ =
                    this.defaultMQProducer.send(msgRMQ,
                        new com.alibaba.rocketmq.client.producer.MessageQueueSelector() {

                            @Override
                            public MessageQueue select(List<MessageQueue> arg0,
                                    com.alibaba.rocketmq.common.message.Message arg1, Object arg2) {
                                int select = selector.select(arg0.size(), message, arg);
                                return arg0.get(select);
                            }
                        }, arg);
            SendResult sendResult = new SendResult();
            sendResult.setMessageId(sendResultRMQ.getMsgId());
            return sendResult;
        }
        catch (Exception e) {
            throw new ONSClientException("defaultMQProducer send order exception", e);
        }
    }
}
TOP

Related Classes of com.alibaba.ons.api.impl.rocketmq.OrderProducerImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.