您现在的位置是:首页 > 后台技术 > JavaJava
rocketMQ实现消息集群消费(图文)
第十三双眼睛2021-06-26【Java】人已围观
简介本节记录如何实现rocketMQ下消息的集群消费
在公司上班有一次遇到一个问题,rocketMQ消息太多,单个应用消费速度慢,这时就应该增加消费者来提高消息消费速度了。特意记录一下此次的过程:
数据由一个生产者定时产生,每秒产生一条,弄两个消费者去消费,让消费者线程睡一会,模拟处理时间,项目结构如下:
一个消费者
项目为springboo项目,pom文件如下:
生产者:
定时产生消息:
消费者项目结构:
先后启动生产者,消费者1,消费者2
生产者发送10条消息:
消费者1消费的消息
消费者2消费的消息
可以看到生产者产生的消息由消费者1和消费者2共同消费,这样,就可以提供消费速度,消息不会再积压了。
数据由一个生产者定时产生,每秒产生一条,弄两个消费者去消费,让消费者线程睡一会,模拟处理时间,项目结构如下:
一个消费者
项目为springboo项目,pom文件如下:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.5.RELEASE</version> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!-- <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> --> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.0.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.6</version> </dependency> <!-- mqtt --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> <!-- rocket mq --> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.7.9.Final</version> </dependency> <!-- mqtt 管控API--> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <!-- <optional>true</optional> --> <version>4.3.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-ons</artifactId> <version>3.1.0</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.10</version> <scope>provided</scope> </dependency> </dependencies> |
package com.xinchen.producer.mqtt; import java.util.Properties; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.xinchen.producer.message.MessageTest; @Component public class MQTTProducer { @Value("${aliyun.mq.topic}") private String topic; @Value("${aliyun.mq.group}") private String group; @Value("${aliyun.mqtt.group}") private String mqttGroup; @Value("${aliyun.mq.ak}") private String ak; @Value("${aliyun.mq.sk}") private String sk; @Value("${aliyun.mq.nameserver}") private String nameserver; @Value("${aliyun.mq.regionId}") private String regionId; @Value("${aliyun.mq.instanceId}") private String instanceId; private Producer producer; @PostConstruct public void initProducer() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, group); properties.put(PropertyKeyConst.AccessKey, ak); properties.put(PropertyKeyConst.SecretKey, sk); properties.put(PropertyKeyConst.NAMESRV_ADDR, nameserver); properties.put(PropertyKeyConst.MqttQOS, 1); producer = ONSFactory.createProducer(properties); try { producer.start(); System.out.println("producer 已启动"); } catch (Exception e) { e.printStackTrace(); } } public SendResult send(MessageTest messageTest) { SendResult sendResult = null; try { String clientId = mqttGroup + "@@@" + messageTest.getClientId(); Message msg = new Message(topic,"MQTT_TAG",messageTest.getDesc().getBytes()); msg.setKey("UNIQUE_KEY_" + clientId); msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId + "/" + messageTest.getOperate()); sendResult = producer.send(msg); } catch (ONSClientException e) { e.printStackTrace(); } return sendResult; } } |
@Scheduled(cron = "0/1 * * * * *") public void testList() { while(true) { MessageTest message = new MessageTest(); message.setClientId("producer"); message.setOperate("open"); message.setDesc("producer"); SendResult sendResult = producer.send(message); System.out.println(sendResult.getMessageId()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } count ++; System.out.println("发送消息:" + count); } } |
package com.xinchen.consumer.mqtt; import java.util.Properties; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.PropertyValueConst; @Component public class MQTTConsumer { @Value("${aliyun.mq.topic}") private String topic; @Value("${aliyun.mq.group}") private String group; @Value("${aliyun.mqtt.group}") private String mqttGroup; @Value("${aliyun.mq.ak}") private String ak; @Value("${aliyun.mq.sk}") private String sk; @Value("${aliyun.mq.nameserver}") private String nameserver; private Consumer consumer; @Autowired private MQTTCallBack mqttCallBack; @PostConstruct public void initConsumer() { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.GROUP_ID, group); properties.put(PropertyKeyConst.AccessKey, ak); properties.put(PropertyKeyConst.SecretKey, sk); properties.put(PropertyKeyConst.NAMESRV_ADDR, nameserver); properties.put(PropertyKeyConst.ConsumeThreadNums, 10); properties.put(PropertyKeyConst.MaxReconsumeTimes, 10); properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, "*", mqttCallBack); try { consumer.start(); System.out.println("Consumer启动成功"); } catch (Exception e) { e.printStackTrace(); } } } |
生产者发送10条消息:
C0A800645D5401D16E9385296F9B0000 发送消息:1 C0A800645D5401D16E93852974D90003 发送消息:2 C0A800645D5401D16E93852978E30006 发送消息:3 C0A800645D5401D16E9385297CEC0009 发送消息:4 C0A800645D5401D16E93852980FE000C 发送消息:5 C0A800645D5401D16E938529850D000F 发送消息:6 C0A800645D5401D16E938529891B0012 发送消息:7 C0A800645D5401D16E9385298D2F0015 发送消息:8 C0A800645D5401D16E93852991620018 发送消息:9 C0A800645D5401D16E938529956C001B 发送消息:10 |
消费者1处理中...... 消费者11111消费消息:1 消费者1处理中...... 消费者11111消费消息:2 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E93852978E30006 body:producer 消费者1处理中...... 消费者11111消费消息:3 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E9385297CEC0009 body:producer 消费者1处理中...... 消费者11111消费消息:4 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E93852980FE000C body:producer 消费者1处理中...... 消费者11111消费消息:5 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E938529850D000F body:producer 消费者1处理中...... 消费者11111消费消息:6 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E938529891B0012 body:producer 消费者1处理中...... 消费者11111消费消息:7 |
消费者2处理中...... 消费者2222消费消息:1 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E93852991620018 body:producer 消费者2处理中...... 消费者2222消费消息:2 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E938529956C001B body:producer 消费者2处理中...... 消费者2222消费消息:3 subtopic:/p2p/GID_COMMON@@@producer/open msgID:C0A800645D5401D16E938529997A001E body:producer 消费者2处理中...... 消费者2222消费消息:4 |
Tags:
很赞哦! ()