您现在的位置是:首页 > 后台技术 > JavaJava

rocketMQ实现消息集群消费(图文)

第十三双眼睛2021-06-26【Java】人已围观

简介本节记录如何实现rocketMQ下消息的集群消费

在公司上班有一次遇到一个问题,rocketMQ消息太多,单个应用消费速度慢,这时就应该增加消费者来提高消息消费速度了。特意记录一下此次的过程:
数据由一个生产者定时产生,每秒产生一条,弄两个消费者去消费,让消费者线程睡一会,模拟处理时间,项目结构如下:
一个消费者

项目为springboo项目,pom文件如下:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.5.RELEASE</version>
  </parent>
  
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>
  
  <dependencies>
      <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
         <!-- <exclusions>
             <exclusion>
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-starter-tomcat</artifactId>
             </exclusion>
         </exclusions> -->
      </dependency>
      <dependency>
        <groupId>javax.servlet</groupId>
        <artifactId>javax.servlet-api</artifactId>
        <version>3.0.1</version>
      </dependency>
      <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.6</version>
       </dependency>
        <!-- mqtt -->     
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>     
        <!-- rocket mq -->     
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>ons-client</artifactId>
        <version>1.7.9.Final</version>
    </dependency>

    <!-- mqtt 管控API-->         
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <!-- <optional>true</optional> -->
        <version>4.3.3</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-ons</artifactId>
        <version>3.1.0</version>
    </dependency>
    
    <!-- lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
        <scope>provided</scope>
    </dependency>
  </dependencies>
生产者:
package com.xinchen.producer.mqtt;

import java.util.Properties;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.xinchen.producer.message.MessageTest;
@Component
public class MQTTProducer {

    
    @Value("${aliyun.mq.topic}")
    private String topic;

    @Value("${aliyun.mq.group}")
    private String group;

    @Value("${aliyun.mqtt.group}")
    private String mqttGroup;

    @Value("${aliyun.mq.ak}")
    private String ak;

    @Value("${aliyun.mq.sk}")
    private String sk;

    @Value("${aliyun.mq.nameserver}")
    private String nameserver;

    @Value("${aliyun.mq.regionId}")
    private String regionId;

    @Value("${aliyun.mq.instanceId}")
    private String instanceId;

    private Producer producer;
    
    
    
    @PostConstruct
    public void initProducer() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, group);
        properties.put(PropertyKeyConst.AccessKey, ak);
        properties.put(PropertyKeyConst.SecretKey, sk);
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameserver);
        properties.put(PropertyKeyConst.MqttQOS, 1);
        producer = ONSFactory.createProducer(properties);
        try {
            producer.start();
            System.out.println("producer 已启动");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    
    public SendResult send(MessageTest messageTest) {
        SendResult sendResult = null;
        try {
            String clientId = mqttGroup + "@@@" + messageTest.getClientId();
            Message msg = new Message(topic,"MQTT_TAG",messageTest.getDesc().getBytes());
            msg.setKey("UNIQUE_KEY_" + clientId);
            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId + "/" + messageTest.getOperate());
            sendResult = producer.send(msg);
        } catch (ONSClientException e) {
            e.printStackTrace();
        }
        return sendResult;
    }    
}
定时产生消息:
@Scheduled(cron = "0/1 * * * * *")
    public void testList() {
        while(true) {
            MessageTest message = new MessageTest();
            message.setClientId("producer");
            message.setOperate("open");
            message.setDesc("producer");
            
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult.getMessageId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            count ++;
            System.out.println("发送消息:" + count);
        }   
}
消费者项目结构:
package com.xinchen.consumer.mqtt;

import java.util.Properties;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PropertyValueConst;
@Component
public class MQTTConsumer {

    
    @Value("${aliyun.mq.topic}")
    private String topic;
    @Value("${aliyun.mq.group}")
    private String group;
    @Value("${aliyun.mqtt.group}")
    private String mqttGroup;
    @Value("${aliyun.mq.ak}")
    private String ak;
    @Value("${aliyun.mq.sk}")
    private String sk;
    @Value("${aliyun.mq.nameserver}")
    private String nameserver;
    private Consumer consumer;
    @Autowired
    private MQTTCallBack mqttCallBack;
    
    
    
    @PostConstruct
    public void initConsumer() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, group);
        properties.put(PropertyKeyConst.AccessKey, ak);
        properties.put(PropertyKeyConst.SecretKey, sk);
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameserver);
        properties.put(PropertyKeyConst.ConsumeThreadNums, 10);
        properties.put(PropertyKeyConst.MaxReconsumeTimes, 10);
        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(topic, "*", mqttCallBack);
        try {
            consumer.start();
            System.out.println("Consumer启动成功");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
}
先后启动生产者,消费者1,消费者2
生产者发送10条消息:
C0A800645D5401D16E9385296F9B0000
发送消息:1
C0A800645D5401D16E93852974D90003
发送消息:2
C0A800645D5401D16E93852978E30006
发送消息:3
C0A800645D5401D16E9385297CEC0009
发送消息:4
C0A800645D5401D16E93852980FE000C
发送消息:5
C0A800645D5401D16E938529850D000F
发送消息:6
C0A800645D5401D16E938529891B0012
发送消息:7
C0A800645D5401D16E9385298D2F0015
发送消息:8
C0A800645D5401D16E93852991620018
发送消息:9
C0A800645D5401D16E938529956C001B
发送消息:10
消费者1消费的消息
消费者1处理中......
消费者11111消费消息:1
消费者1处理中......
消费者11111消费消息:2
subtopic:/p2p/GID_COMMON@@@producer/open
msgID:C0A800645D5401D16E93852978E30006
body:producer
消费者1处理中......
消费者11111消费消息:3
subtopic:/p2p/GID_COMMON@@@producer/open
msgID:C0A800645D5401D16E9385297CEC0009
body:producer
消费者1处理中......
消费者11111消费消息:4
subtopic:/p2p/GID_COMMON@@@producer/open
msgID:C0A800645D5401D16E93852980FE000C
body:producer
消费者1处理中......
消费者11111消费消息:5
subtopic:/p2p/GID_COMMON@@@producer/open
msgID:C0A800645D5401D16E938529850D000F
body:producer
消费者1处理中......
消费者11111消费消息:6
subtopic:/p2p/GID_COMMON@@@producer/open
msgID:C0A800645D5401D16E938529891B0012
body:producer
消费者1处理中......
消费者11111消费消息:7
消费者2消费的消息
消费者2处理中......
消费者2222消费消息:1
subtopic:/p2p/GID_COMMON@@@producer/open
msgID:C0A800645D5401D16E93852991620018
body:producer
消费者2处理中......
消费者2222消费消息:2
subtopic:/p2p/GID_COMMON@@@producer/open
msgID:C0A800645D5401D16E938529956C001B
body:producer
消费者2处理中......
消费者2222消费消息:3
subtopic:/p2p/GID_COMMON@@@producer/open
msgID:C0A800645D5401D16E938529997A001E
body:producer
消费者2处理中......
消费者2222消费消息:4
可以看到生产者产生的消息由消费者1和消费者2共同消费,这样,就可以提供消费速度,消息不会再积压了。




 

Tags:

很赞哦! ()

文章评论

    共有条评论来说两句吧...

    用户名:

    验证码:

站点信息

  • 网站名称:JavaStudy
  • 建站时间:2019-1-14
  • 网站程序:帝国CMS7.5
  • 文章统计247篇文章
  • 标签管理标签云
  • 统计数据百度统计
  • 微信公众号:扫描二维码,关注我们