rocketmq怎么设置唯一标识 rocketmq任意时间队列实现原理?

[更新]
·
·
分类:互联网
1986 阅读

rocketmq怎么设置唯一标识

rocketmq怎么设置唯一标识 rocketmq任意时间队列实现原理?

rocketmq任意时间队列实现原理?

rocketmq任意时间队列实现原理?

RocketMQ是一个开源的分布式消息系统,基于高可用的分布式集群技术,提供低延迟、高可靠性、万亿级容量、灵活可扩展的消息发布和订阅服务。

它的前身是MetaQ,是阿里基于Kafka 的设计和使用Java。2012年,阿里将其开源。2016年,阿里将其捐赠给阿帕奇软件基金会(ASF),正式成为孵化项目。2017年,Apache Software Foundation宣布RocketMQ已经孵化为Apache(简称TLP)的一个顶层项目,这是国内首个Apache上的互联网中间件顶层项目。

延迟消息

生产者将消息发送到消息队列后,它并不期望立即被消费,而是可以在等待指定时间后被消费者消费。这种消息通常被称为延迟消息。

在RocketMQ中,支持延迟消息,但不支持任何时间精度,只支持特定级别。如果你想支持任意的时间精度,你可以 如果不避免在代理级别进行消息排序,那么就涉及到持久性的考虑,那么消息排序必然会产生巨大的性能开销。

消息延迟级别为1s、5s、10s、30s、1m、2m、3m、4m、6m、7m、10m、20m、30m、1h和2h,共18个级别。发送消息时,只需设置消息延迟级别。设置消息延迟级别有三种情况:

当消息延迟级别设置为0时,消息是非延迟消息。

当消息延迟级别设置为大于或等于1且小于或等于18时,消息将延迟一段特定时间。例如,如果消息延迟级别设置为等于1,则延迟1s;如果消息延迟级别设置为2,将延迟5s,以此类推。

当消息延迟级别设置为大于18时,消息延迟级别为18;如果消息延迟级别设置为等于20,它将延迟2小时。

延迟消息的示例

首先,编写一个消费者来消费延迟的消息:

编写另一个延迟消息的生产者来发送延迟消息:

运行生成器后,将发送一条延迟消息:

十秒钟后,消费者收到了这条延迟的消息:

延迟消息的原理分析

下面分析的RocketMQ源代码版本号是4.7.1,版本和源代码略有不同。

CommitLog

在中,对延迟的消息进行了一些处理:

可以看出,每个延迟消息的主题都临时更改为SCHEDULE_TOPIC_XXXX,并且根据延迟级别更改新的队列Id。接下来,处理延迟消息的是。

ScheduleMessag消息电子服务

ScheduleMessageService由初始化,包括构造一个对象和调用load方法。最后,执行ScheduleMessageService的start方法:

遍历所有延迟级别,根据延迟级别得到对应队列的偏移量,如果偏移量不存在,则设置为0。然后,为每个延迟级别创建一个定时任务。第一次启动任务的延迟为1秒,第二次及后续启动任务的延迟为延迟级别对应的延迟时间。

然后,创建一个定时任务来保存每个队列消耗的偏移量。持续的频率由flushDelayOffsetInterval属性配置,默认值为10秒。

计时任务

在执行ScheduleMessageService的start方法后,每个延迟级别都会创建自己的计时任务。这里计时任务的具体实现在delivedelayedmessagegetimertask类中,其核心代码是executeOnTimeup方法。让 让我们来看看主要部分:

如果没有获得对应的消息队列,任务将在DELAY_FOR_A_WHILE(默认值为100)毫秒后执行。如果是,继续执行以下操作:

如果没有获得有效消息,任务将在DELAY_FOR_A_WHILE(默认值为100)毫秒后执行。如果是,继续执行以下操作:

如果当前消息小于消耗时间,任务将在倒计时毫秒后执行。如果到了消耗时间,继续执行以下操作:

如果获得消息,继续执行以下操作:

清除消息的延迟级别,并恢复真实的消息主题和队列Id。消息被重新发送到真实消息队列后,消费者可以立即消费它。

摘要

经过以上对源代码的分析,我们可以总结出延迟消息的实现步骤:

如果消息的延迟级别大于0,说明该消息是延迟消息,消息主题修改为SCHEDULE_TOPIC_XXXX,队列Id为延迟级别减1。

消息进入SCHEDULE_TOPIC_XXXX的队列。

计时任务根据最后的拉取偏移量从队列中连续取出所有消息。

根据消息的物理偏移量和大小再次获取消息。

根据消息属性重新创建消息,清除延迟级别,并恢复原始主题和队列Id。

将消息重新发送到原主题的队列中,供消费者消费。

rocketmq nameser是否可设置密码?

可以设置密码,方便用户使用。