【RocketMQ】顺序消息

  |   0 评论   |   284 浏览

前言

RocketMQ使用FIFO(先进先出)顺序提供有序消息,有顺序的消息在实际业务中是比较常见的,比如下单,下面让我们来看看代码如何实现。

rocketmq支持全局顺序和分区顺序,全局顺序只是分区顺序的一种特例,但是对性能会有很大的影响,建议使用分区顺序,以下示例也是使用了分区顺序。

生产者

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //初始化生产者group名称
        DefaultMQProducer producer = new DefaultMQProducer("order_producer_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            //创建消息实例,指定主题、标签、消息体
            Message msg = new Message("TopicOrderTest", "tagA", "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                System.out.println("producer:queueId:"+id+";content:"+new String(msg.getBody()));
                return mqs.get(index);
            }, 3);

        }
        //生产者关闭
        producer.shutdown();
    }
}

执行结果
image.png

消费者

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_name");

        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicOrderTest", "*");

        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            System.out.println("consumer:queueId:"+msgs.get(0).getQueueId()+";content:"+new String(msgs.get(0).getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

执行结果
image.png

源代码

传送门

也可以关注我的公众号:程序之声
图片
关注公众号,领取更多资源

本文为博主原创文章,未经博主允许不得转载。

评论

发表评论