今天來聊一聊 RocketMQ 的延時消息是怎么實現(xiàn)的。
延時消息是指發(fā)送到 RocketMQ 后不會馬上被消費者拉取到,而是等待固定的時間,才能被消費者拉取到。
延時消息的使用場景很多,比如電商場景下關(guān)閉超時未支付的訂單,某些場景下需要在固定時間后發(fā)送提示消息。
1.生產(chǎn)者
首先看一個生產(chǎn)者發(fā)送延時消息的官方示例代碼:
public static void main(String[] args) throws Exception { // Instantiate a producer to send scheduled messages DefaultMQProducer producer = new DefaultMQProducer(“ExampleProducerGroup”); // Launch producer producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // This message will be delivered to consumer 10 seconds later. message.setDelayTimeLevel(3); // Send the message producer.send(message); } // Shutdown producer after use. producer.shutdown();}
從上面的代碼可以看到,跟普通消息不一樣的是,消息設(shè)置 setDelayTimeLevel 屬性值,這里設(shè)置為 3,這里最終將 3 這個延時級別復(fù)制給了 DELAY 屬性。
關(guān)于延時級別,可以看下面這個定義:
//MessageStoreConfig類private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
這里延時級別有 18 個,上面的示例代碼中延遲級別是 3,消息會延遲 10s 后消費者才能拉取。
2.Broker 處理
2.1 寫入消息
Broker 收到消息后,會將消息寫入 CommitLog。在寫入時,會判斷消息 DELAY 屬性是否大于 0。代碼如下:
//CommitLog 類if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId);}
從上面的代碼可以看到,CommitLog 寫入時并沒有直接寫入,而是把 Topic 改為 SCHEDULE_TOPIC_XXXX,把 queueId 改為延時級別減 1。因為延時級別有 18 個,所以這里有 18 個隊列。如下圖:
2.2 調(diào)度消息
延時消息寫入后,會有一個調(diào)度任務(wù)不停地拉取這些延時消息,這個邏輯在類 ScheduleMessageService。這個類的初始化代碼如下:
public void start() { if (started.compareAndSet(false, true)) { this.load(); this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl(“ScheduleMessageTimerThread_”)); //省略部分邏輯 for (Map.Entry entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //省略部分邏輯 this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } //省略持久化的邏輯 }}
上面的 load() 方法會加載一個 delayLevelTable(ConcurrentHashMap類型),key 保存延時級別(從 1 開始),value 保存延時時間(單位是 ms)。
load() 方法結(jié)束后,創(chuàng)建了一個有 18 個核心線程的定時線程池,然后遍歷 delayLevelTable,創(chuàng)建 18 個任務(wù)(DeliverDelayedMessageTimerTask)進行每個延時級別的任務(wù)調(diào)度。任務(wù)調(diào)度的代碼邏輯如下:
public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); if (cq == null) { this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE); return; } SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ == null) { //省略部分邏輯 this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE); return; } long nextOffset = this.offset; try { int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i 0) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt == null) { continue; } MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt); //事務(wù)消息判斷省略 boolean deliverSuc; //只保留同步 deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy); if (!deliverSuc) { this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); return; } } nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); } catch (Exception e) { log.error(“ScheduleMessageService, messageTimeup execute error, offset = {}”, nextOffset, e); } finally { bufferCQ.release(); } this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);}
這段代碼可以參考下面的流程圖來進行理解:
上面有一個修正投遞時間的函數(shù),這個函數(shù)的意義是如果已經(jīng)過了投遞時間,那么立即投遞。代碼如下:
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) { long result = deliverTimestamp; long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel); if (deliverTimestamp > maxTimestamp) { result = now; } return result;}
注意:消息從 CommitLog 轉(zhuǎn)發(fā)到 ConsumeQueue 時,會判斷是否是延時消息(Topic = SCHEDULE_TOPIC_XXXX 并且延時級別大于 0),如果是延時消息,就會修改 tagsCode 值為消息投遞的時間戳,而 tagsCode 原值是 tag 的 HashCode。代碼如下:
//CommitLog類checkMessageAndReturnSize方法if (delayLevel > 0) { tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel, storeTimestamp);}
如下圖:
而 ScheduleMessageService 調(diào)度線程將消息從 ConsumeQueue 重新投遞到原始隊列中時,會把 tagsCode 再次修改為 tag 的 HashCode,代碼如下:
//類MessageExtBrokerInner,這個方法被 messageTimeup 方法調(diào)用。public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) { return 0; } return tags.hashCode();}
如下圖:
2.3 一個問題
如果有一個業(yè)務(wù)場景,要求延時消息 3 小時才能消費,而 RocketMQ 的延時消息最大延時級別只支持延時 2 小時,怎么處理?
這里提供兩個思路供大家參考:
在 Broker 上修改 messageDelayLevel 的默認配置;
在客戶端緩存 msgId,先設(shè)置延時級別是 18(2h),當(dāng)客戶端拉取到消息后首先判斷有沒有緩存,如果有緩存則再次發(fā)送延時消息,這次延時級別是 17(1h),如果沒有緩存則進行消費。
3 總結(jié)
經(jīng)過上面的講解,延時消息的處理流程如下:
最后,延時消息的延時時間并不精確,這個時間是 Broker 調(diào)度線程把消息重新投遞到原始的 MessageQueue 的時間,如果發(fā)生消息積壓或者 RocketMQ 客戶端發(fā)生流量管控,客戶端拉取到消息后進行處理的時間可能會超出預(yù)設(shè)的延時時間