0%

rocketmq

幂等

  • 最多一次(At-most-once)
  • 至少一次(At-least-once)
  • 精确一次(Exactly-once)

消息推送服务,消息队列保证至少一次投递, 应用保证精确一次消费,底层通道保证做多触达一次。

实际上,没有引擎能够保证正好只处理一次。在面对任意故障时,不可能保证每个算子中的用户定义逻辑在每个事件中只执行一次,因为用户代码被部分执行的可能性是永远存在的。

参考文献

rocketmq

为什么使用rocketmq

为什么RocketMQ是金融核心系统消息中间件的第一选择

在企业整体架构中解耦,主要设计两个方面:一是简化减少交互,二是增加一个中间层实现两方的隔离。

资料补充

github仓库地址

github仓库的中文文档

Spring Boot集成RocketMQ以及RocketMQ的基本使用

具体使用中的疑问

%DLQ%为前缀的topic是什么意思

死信队列 (Dead letter queue)

如果重试消息的最大重试次数超过 16 次(默认),则将消息放入 %DLQ% 队列(死信队列)。等待人工处理。

rocketMQ读队列个数,写队列个数的作用?

生产者与消费者的配置

生产者组?

producer.group:这个value有什么用

consumer.instanceName

consumer.messageListener:listener的类

consumer.group

consumer.subscribes:

@Autowired
@Qualifier(“xxxProducer”)
public Producer producer;

这个producer是什么时候注入的

消费者组、生产者组

RocketMQ消费者组用于将一组消费者组织在一起共同消费消息。每个消费者组中的消费者可以并行地消费来自不同消息队列的消息,并且RocketMQ确保在同一时间只有一个消费者组中的一个消费者消费队列中的消息,以确保消息的顺序性。消费者组还可以提供高可用性和负载均衡,因为当一个消费者失败或离线时,其他消费者可以接管其消费队列,确保仍然能够进行消息的消费。消费者组还可以实现消息的广播消费,即每个消费者都可以消费相同的消息,这在某些场景下很有用。

在RocketMQ中,如果不配置消费者组名称,会导致消费者无法正常消费消息。消费者组名称是必填项,它用于标识一组消费者,协同消费消息和进行负载均衡。

如果不配置消费者组名称,RocketMQ将抛出异常或警告,并且消费者无法注册到Broker上,无法获取到消息队列并消费消息。这样会导致消息无法被处理,可能会造成消息积压或丢失的情况。因此,为了正常消费消息,应该确保正确配置每个消费者的消费者组名称。

比较好理解,但是这个配置需要怎么设置

maxReconsumeTimes

consumeThreadMax

consumeThreadMin

RocketMQ中Topic、Tag如何正确使用

1、消息类型是否一致:如普通消息,事务消息,定时消息,顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。

2、业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。

3、消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会会慢一些,不同优先级的消息用不同的 Topic 进行区分。

4、消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而『饿死』,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

参考文献

rocketmq的instantName

rocketmq设置instanceName

rocketmq InstanceName 作用和配置解析

RocketMQ消费者设置了instanceName属性后消息竟不翼而飞

文档中的疑问

「提交或回溯消费」应该是「提交或回溯消息」?

问题定位

普通顺序消息和严格顺序消息的具体实践区别是什么?

疑问定位

能够不指定tag就消费消息吗?

疑问定位

安装

docker 安装完root账号的密码如何查看:sudo docker exec -it gitlab grep 'Password:' /etc/gitlab/initial_root_password

参考文献

运行小代码来测试

bug:RocketMQLog:WARN Please initialize the logger system properly.

错误日志详情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
Exception in thread "main" java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <dnow.com/192.168.1.104:49159> failed
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:716)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:580)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1398)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:330)
at Main$SyncProducer.main(Main.java:39)
at Main.main(Main.java:77)
Caused by: org.apache.rocketmq.remoting.exception.RemotingSendRequestException: send request to <dnow.com/192.168.1.104:49159> failed
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.invokeSyncImpl(NettyRemotingAbstract.java:439)
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:377)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1365)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1355)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622)
... 8 more
Caused by: io.netty.channel.StacklessClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)

问题排查

官方说查看docker有没有跑起来的方法

结果这行代码都执行不了

代码详情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# docker exec -it 08ed21ce2d5b ./mqadmin clusterList -n localhost:9876

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.
org.apache.rocketmq.tools.command.SubCommandException: ClusterListSubCommand command failed
at org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand.execute(ClusterListSubCommand.java:93)
at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:145)
at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:96)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [localhost:9876] failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel(NettyRemotingClient.java:445)
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(NettyRemotingClient.java:400)
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:369)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1335)
at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:327)
at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:258)
at org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand.printClusterBaseInfo(ClusterListSubCommand.java:172)
at org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand.execute(ClusterListSubCommand.java:88)

群晖启动印象的时候没有启动namesrv

docker exec -it {your container id} bash

./mqnamesrv &

参考文献

bug: Exception in thread “main” org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest

  • 修改broker版本
  • 启动broker./mqbroker -n localhost:9876 autoCreateTopicEnable=true &

bug:Exception in thread “main” org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

将超时时间设的长一点

参考文献

bug:Exception in thread “main” org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6902]ms, Topic: TopicTest, BrokersSent: [apache-rocketmq, apache-rocketmq, apache-rocketmq]

绑定的ip地址错误,端口没有暴露出去

./mqbroker autoCreateTopicEnable=true -n localhost:9876 -c /home/rocketmq/rocketmq-4.9.2/conf/broker.conf &

在配置文件中添加本机的公网ip

代码详情
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [9905]ms, Topic: TopicTest, BrokersSent: [apache-rocketmq, apache-rocketmq, apache-rocketmq]
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:681)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1391)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1335)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:336)
at Main$SyncProducer.main(Main.java:40)
at Main.main(Main.java:78)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 172.17.0.6:10911 failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:505)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:489)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:433)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:870)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:606)
... 5 more

参考文献

spring boot集成

@Qualifier的作用

通过将 @Qualifier 注解与我们想要使用的特定 Spring bean 的名称一起进行装配

一定有个地方注入了这个名字的bean

PushConsumer

参考文献

有哪些Consumer?

源码阅读

class.this的作用

去掉class.this在1.8版本也能编译通过,是否真的要拿到这个

ClassName.this这个用法多用于在nested class(内部类)中,当inner class(内部类)必顺使用到outer class(外部类)的this instance(实例)时,就用OuterClassName.this

仓库地址

Java/android 里ClassName.this和this的使用

What is ClassName.this? [ duplicate ]

遇到过的问题

SLAVE_NOT_AVAILABLE

最近在 RocketMQ 钉钉官方群中看到有人反馈说 broker 主从部署,在发布消息的时候会报 SLAVE_NOT_AVAILABLE 异常,报这个异常的前提 master 的模式一定为 SYNC_MASTER(同步复制),从 异常码可以直接判断的一种原因就是因为 slave 挂掉了,导致 slave 不可用,但是他说 slave 一切正常。

最后总结,如果 slave 正常运行,报这个错是正常的,你可以适当调整 haSendHeartbeatInterval 参数(1000 * 5)的大小,它决定 slave 上报同步位移的心跳频率,以及 haSlaveFallbehindMax 参数值(默认 1024 * 1024 * 256),它决定允许 slave 最多落后 master 的位移。

问题

MessageExt对象的结构

MessageExt 是 RocketMQ 中消息扩展对象的类,它扩展了 Message 类,添加了一些供消费者使用的额外属性和方法。以下是 MessageExt 中一些常见字段的含义:

msgId: 消息的全局唯一标识,由 Broker 生成。

topic: 消息的主题,即消息分类的名称。

flag: RocketMQ 在 Message 类中定义了一个整型字段 flag 来放置不同的标记位,比如是否压缩、是否是批量消息等。

queueId: 消息所属的队列 ID,每个主题可以拥有多个队列。

storeSize: 消息存储时的大小,以字节为单位。

queueOffset: 队列中的偏移量,表示这个消息是队列中的第几条消息。

sysFlag: 系统标志位,用于标识一些标志位比如是否是事务型消息等。

bornTimestamp: 消息在客户端创建的时间戳。

storeTimestamp: 消息存储到 Broker 的时间戳。

bornHost: 消息发送者的源地址,格式通常是 IP:端口。

storeHost: 消息存储到 Broker 的服务器地址,也是 IP:端口格式。

bodyCRC: 消息体的 CRC 校验码,用于确保消息在传输过程中不被损坏。

reconsumeTimes: 消息重消费次数。如果消费者处理消息失败,消息可能会被重试,该字段则表示该消息被重新消费的次数。

commitLogOffset: 消息在 CommitLog 文件中的偏移量。

在 RocketMQ 中,可以通过检查 MessageExt 对象的 reconsumeTimes 属性来判断某条消息是否正在重试。reconsumeTimes 表示该消息已经被消费但失败并重新排队的次数。

可以用msgIdreconsumeTimes做幂等