【RocketMQ】简单消息示例

  |   0 评论   |   217 浏览

引入依赖

项目中用的是maven,代码如下

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.6.0</version>
    </dependency>

因为我的RocketMQ版本是4.6.0,jar包也用4.6.0,尽量保持一致。

同步消息

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //初始化生产者group名称
        DefaultMQProducer producer = new
                DefaultMQProducer("simple_producer_name");
        //指定nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            //创建消息实例,指定主题、标签、消息体
            Message msg = new Message("TopicTest" /* 主题 */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
            );
            //调用发送消息方法进行消息发送
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //生产者发送完之后需要给他关闭掉
        producer.shutdown();
    }
}

异步消息

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //初始化生产者group名称
        DefaultMQProducer producer = new DefaultMQProducer("simple_producer_name");
        //指定nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //启动实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 100; i++) {
            final int index = i;
            //创建消息实例,指定主题、标签、消息体
            Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                //发送完之后会有回调
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    //出现异常可以做一些事情
                }
            });
        }
        //生产者发送完之后需要给他关闭掉,需要把这个代码注释掉或者sleep几秒再关闭
//        producer.shutdown();
    }
}

单向消息

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //初始化生产者group名称
        DefaultMQProducer producer = new DefaultMQProducer("simple_producer_name");
        //指定nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //启动实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            //创建消息实例,指定主题、标签、消息体
            Message msg = new Message("TopicTest" /* 主题 */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
            );
            //调用发送消息方法进行消息发送
            producer.sendOneway(msg);

        }
        //生产者发送完之后需要给他关闭掉
        producer.shutdown();
    }
}

消费消息

public class SimpleConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        //初始化指定消费者的group名称
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("simple_consumer_name");

        //指定name server地址
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

可以通过控制台俩看发送消息、消费消息的日志。

源代码

传送门

也可以关注我的公众号:程序之声
图片
关注公众号,领取更多资源

本文为博主原创文章,未经博主允许不得转载。

评论

发表评论