RocketMQ 是一款開源的分布式消息系統,基于高可用分布式集群技術,提供低延時的、高可靠的消息發布與訂閱服務。同時,廣泛應用于多個領域,包括異步通信解耦、企業解決方案、金融支付、電信、電子商務、快遞物流、廣告營銷、社交、即時通信、移動應用、手遊、視頻、物聯網、車聯網等。
RocketMQ 前身叫做MetaQ,在MetaQ發布3.0版本的時候改名為 RocketMQ
RocketMQ本質上的設計思路和Kafka類似,但是和Kafka不同的是其使用Java進行開發,由于在國内的Java受衆群體遠遠多于Scala、Erlang,所以RocketMQ是很多以Java語言為主的公司的首選。同樣的RocketMQ和Kafka都是Apache基金會中的頂級項目,他們社區的活躍度都非常高,項目更新叠代也非常快。
RocketMQ是阿裡review kafka的java版,如果消息性能要求高,用 RocketMQ 與 Kafka 可以更優
消息隊列在實際應用中常用的使用場景,包含應用解耦、異步處理、流量削鋒、消息通訊、日志處理等。
RocketMQ 官網:http://rocketmq.apache.org
RocketMQ Github:https://github./apache/rocketmq
Kafka 官網:http://kafka.apache.org
ActiveMQ 官網:http://activemq.apache.org
RabbitMQ 官網:https://www.rabbitmq.
ZeroMQ 官網:https://zeromq.org
MetaMQ RocketMQ的前世今生
某公司一直用的消息中間件是MetaMQ現在,網上相關的資料也不是很多,今天去想淘寶為什會把MetaMQ給替換成了RocketMQ。就網上搜索了一下,這兩個居然是爺孫關系。
阿裡巴巴消息中間件起源于2001年的五彩石項目,Notify在這期間應運而生,用于交易核心消息的流轉。
至2010年,B2B開始大規模使用ActiveMQ作為消息内核,随着阿裡業務的快速發展,急需一款支持順序消息,擁有海量消息堆積能力的消息中間件,MetaQ 1.0在2011年誕生。
到2012年,MetaQ已經發展到了MetaQ 3.0,并抽象出了通用的消息引擎RocketMQ。
随後,将RocketMQ進行了開源,阿裡的消息中間件正式走入了公衆的視野。
到2015年,RocketMQ已經經曆了多年雙十一的洗禮,在可用性、可靠性以及穩定性等方面都有出色的表現。與此同時,雲計算大行其道,阿裡消息中間件基于RocketMQ推出了Aliware MQ 1.0,開始為阿裡雲上成千上萬家企業提供消息服務。
到今年,MetaQ在2016年雙十一承載了萬億級消息的流轉,跨越了一個新的裡程碑,同時RocketMQ進入Apache 孵化。
RocketMQ 産品發展曆史
大約經曆了三個主要版本叠代:
1)Metaq 1.x(Metamorphosis)
由開源社區killme2008維護,開源社區非常活躍 https://github./killme2008/Metamorphosis
2)Metaq 2.x
于2012年10月份上線,在淘寶内部被廣泛使用。
3)RocketMQ 3.x
基于公司内部開源共建原則, RocketMQ項目隻維護核心功能,且去除了所有其他運行時依賴,核心功能最簡化。每個BU的個性化需求都在RocketMQ項目之上進行深度定制。RocketMQ向其他BU提供的僅僅是Jar包,例如要定制一個Broker,那麼隻需要依賴rocketmq-broker這個jar包即可,可通過API進行交互,如果定制client,則依賴rocketmq-client這個jar包,對其提供的api進行再封裝。
在RocketMQ項目基礎上衍生的項目如下
.taobao.metaq v3.0 = RocketMQ + 淘寶個性化需求,為淘寶應用提供消息服務。
.alipay.zpullmsg v1.0 = RocketMQ + 支付寶個性化需求,為支付寶應用提供消息服務。
.alibaba.monmq v1.0 = Notify + RocketMQ + B2B個性化需求,為B2B應用提供消息服務。
一、 MQ背景&選型
消息隊列作為分布式、高并發系統的核心組件之一,能夠幫助業務系統解構,提升開發效率和系統穩定性。
MQ 主要具有以下優勢:
1)削峰填谷:主要解決瞬時寫壓力大于應用服務能力導緻消息丢失、系統奔潰等問題
2)系統解耦:解決不同重要程度、不同能力級别系統之間依賴導緻一死全死
3)提升性能:當存在一對多調用時,可以發一條消息給消息系統,讓消息系統通知相關系統
4)蓄流壓測:線上有些鍊路不好壓測,可以通過堆積一定量消息再放開來壓測
目前主流的MQ主要是RocketMQ、kafka、RabbitMQ等
RocketMQ相比于RabbitMQ、kafka具有主要優勢特性有:
1)支持事務型消息(消息發送和DB操作保持兩方的最終一緻性,rabbitmq和kafka不支持)
2)支持結合rocketmq的多個系統之間數據最終一緻性(多方事務,二方事務是前提)
3)支持18個級别的延遲消息(rabbitmq和kafka不支持)
4)支持指定次數和時間間隔的失敗消息重發(kafka不支持,rabbitmq需要手動确認)
5)支持consumer端tag過濾,減少不必要的網絡傳輸(rabbitmq和kafka不支持)
6)支持重複消費(rabbitmq不支持,kafka支持)
Rocketmq、kafka、Rabbitmq的詳細對比,請參照下表格:
二、RocketMQ集群概述
1. RocketMQ集群部署結構
Name Server是一個幾乎無狀态節點,可集群部署,節點之間無任何信息同步。
Broker部署相對複雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave隻能對應一個Master,Master與Slave的對應關系通過指定相同的Broker Name,不同的Broker Id來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。
每個Broker與Name Server集群中的所有節點建立長連接,定時(每隔30s)注冊Topic信息到所有Name Server。Name Server定時(每隔10s)掃描所有存活broker的連接,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接。
Producer與Name Server集群中的其中一個節點(随機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀态,可集群部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊列的最新情況,這意味着如果Broker不可用,Producer最多30s能夠感知,在此期間内發往Broker的所有消息都會失敗。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關聯的broker發送心跳,Broker每隔10s中掃描所有存活的連接,如果Broker在2分鐘内沒有收到心跳數據,則關閉與Producer的連接。
Consumer與Name Server集群中的其中一個節點(随機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
Consumer每隔30s從Name server獲取topic的最新隊列情況,這意味着Broker不可用時,Consumer最多最需要30s才能感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關聯的broker發送心跳,Broker每隔10s掃描所有存活的連接,若某個連接2分鐘内沒有發送心跳數據,則關閉連接;并向該Consumer Group的所有Consumer發出通知,Group内的Consumer重新分配隊列,然後繼續消費。
當Consumer得到master宕機通知後,轉向slave消費,slave不能保證master的消息100%都同步過來了,因此會有少量的消息丢失。但是一旦master恢複,未同步過去的消息會被最終消費掉。
消費者隊列是消費者連接之後(或者之前有連接過)才創建的。我們将原生的消費者标識由 {IP}@{消費者group}擴展為 {IP}@{消費者group}{topic}{tag},例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk。任何一個元素不同,都認為是不同的消費端,每個消費端會擁有一份自己消費對列(默認是broker對列數量*broker數量)。新挂載的消費者對列中擁有mitlog中的所有數據。
如果有需要,可以查看Rocketmq更多源碼解析
三、 Rocketmq如何支持分布式事務消息
A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務一緻性,通過引入中間層MQ,
A和MQ保持事務一緻性(異常情況下通過MQ反查A接口實現check),
B和MQ保證事務一緻(通過重試),從而達到最終事務一緻性。
原理:大事務 = 小事務 + 異步
流程圖
上圖是RocketMQ提供的保證MQ消息、DB事務一緻性的方案。
MQ消息、DB操作一緻性方案:
1)發送消息到MQ服務器,此時消息狀态為SEND_OK。此消息為consumer不可見。
2)執行DB操作;DB執行成功Commit DB操作,DB執行失敗Rollback DB操作。
3)如果DB執行成功,回複MQ服務器,将狀态為COMMIT_MESSAGE;如果DB執行失敗,回複MQ服務器,将狀态改為ROLLBACK_MESSAGE。注意此過程有可能失敗。
4)MQ内部提供一個名為“事務狀态服務”的服務,此服務會檢查事務消息的狀态,如果發現消息未COMMIT,則通過Producer啟動時注冊的TransactionCheckListener來回調業務系統,業務系統在checkLocalTransactionState方法中檢查DB事務狀态,如果成功,則回複COMMIT_MESSAGE,否則回複ROLLBACK_MESSAGE。
說明:
上面以DB為例,其實此處可以是任何業務或者數據源。
以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE 均是client jar提供的狀态,在MQ服務器内部是一個數字。
TransactionCheckListener 是在消息的mit或者rollback消息丢失的情況下才會回調(上圖中灰色部分)。這種消息丢失隻存在于斷網或者RocketMQ集群挂了的情況下。當RocketMQ集群挂了,如果采用異步刷盤,存在1s内數據丢失風險,異步刷盤場景下保障事務沒有意義。所以如果要核心業務用RocketMQ解決分布式事務問題,建議選擇同步刷盤模式。
當需要保證多方(超過2方)的分布式一緻性,上面的兩方事務一緻性(通過RocketMQ的事務性消息解決)已經無法支持。這個時候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。
以上圖交易系統為例:
1)交易系統創建訂單(往DB插入一條記錄),同時發送訂單創建消息。通過RocketMQ事務性消息保證一緻性
2)接着執行完成訂單所需的同步核心RPC服務(非核心的系統通過監聽MQ消息自行處理,處理結果不會影響交易狀态)。執行成功更改訂單狀态,同時發送MQ消息。
3)交易系統接受自己發送的訂單創建消息,通過定時調度系統創建延時回滾任務(或者使用RocketMQ的重試功能,設置第二次發送時間為定時任務的延遲創建時間。在非消息堵塞的情況下,消息第一次到達延遲為1ms左右,這時可能RPC還未執行完,訂單狀态還未設置為完成,第二次消費時間可以指定)。延遲任務先通過查詢訂單狀态判斷訂單是否完成,完成則不創建回滾任務,否則創建。 PS:多個RPC可以創建一個回滾任務,通過一個消費組接受一次消息就可以;也可以通過創建多個消費組,一個消息消費多次,每次消費創建一個RPC的回滾任務。 回滾任務失敗,通過MQ的重發來重試。
以上是交易系統和其他系統之間保持最終一緻性的解決方案。
如下為A給B轉賬的例子。
1鎖定A的賬戶
2鎖定B的賬戶
3檢查A賬戶是否有1元
4A的賬戶扣減1元
5給B的賬戶加1元
6解鎖B的賬戶
7解鎖A的賬戶
步驟動作
以上過程在代碼層面,甚至可以簡化到在一個事物中,執行兩條sql語句。
2) 分布式環境下事務
和單機事務不同,A、B賬戶可能不在同一個DB中,此時無法像在單機情況下使用事務來實現。
此時可以通過一下方式實現,将轉賬操作分成兩個操作。
a) A賬戶
1鎖定A的賬戶
2檢查A賬戶是否有1元
3A的賬戶扣減1元
4解鎖A的賬戶
步驟動作
b) MQ消息
A賬戶數據發生變化時,發送MQ消息,MQ服務器将消息推送給轉賬系統,轉賬系統來給B賬号加錢。
c) B賬戶
1鎖定B的賬戶
2給B的賬戶加1元
3解鎖B的賬戶
步驟動作
四、 順序消息
1. 普通消費
2. 順序消費
3. 事務消費
順序消費場景:在網購的時候,我們需要下單,那麼下單需要假如有三個順序:
第一、創建訂單
第二:訂單付款
第三:訂單完成
也就是這三個環節要有順序,這個訂單才有意義,RocketMQ可以保證順序消費。
RocketMQ 實現順序消費的原理:produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者注冊消息監聽器為MessageListenerOrderly,這樣就可以保證消費端隻有一個線程去消費消息
注意:是把把消息發到同一個隊列(queue),不是同一個topic,默認情況下一個topic包括4個queue
1. 順序消息缺陷
發送順序消息,無法利用集群Fail Over特性,消費順序消息的并行度依賴于隊列數量隊列熱點問題,個别隊列由于哈希不均導緻消息過多,消費速度跟不上,産生消息堆積問題遇到消息失敗的消息,無法跳過,當前隊列消費暫停。
2. 原理
produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者注冊消息監聽器為MessageListenerOrderly,這樣就可以保證消費端隻有一個線程去消費消息。
注意:把消息發到同一個隊列(queue),不是同一個topic,默認情況下一個topic包括4個queue
3. 擴展
可以通過實現發送消息的隊列選擇器方法,實現部分順序消息。
舉例:比如一個數據庫通過MQ來同步,隻需要保證每個表的數據是同步的就可以。解析binlog,将表名作為隊列選擇器的參數,這樣就可以保證每個表的數據到同一個對列裡面,從而保證表數據的順序消費
五、 最佳實踐
一個應用盡可能用一個Topic,消息子類型用tags來标識,tags可以由應用自由設置。
隻有發送消息設置了tags,消費方在訂閱消息時,才可以利用tags 在broker做消息過濾。
每個消息在業務層面的唯一标識碼,要設置到 keys 字段,方便将來定位消息丢失問題。服務器會為每個消息創建索引(哈希索引),應用可以通過 topic,key來查詢這條消息内容,以及消息被誰消費。由于是哈希索引,請務必保證key 盡可能唯一,這樣可以避免潛在的哈希沖突。
// 訂單Id
String orderId= '20034568923546';
message.setKeys(orderId);
消息發送成功或者失敗,要打印消息日志,務必要打印 send result 和key 字段。
send消息方法,隻要不抛異常,就代表發送成功。但是發送成功會有多個狀态,在sendResult裡定義。
SEND_OK:消息發送成功
FLUSH_DISK_TIMEOUT:消息發送成功,但是服務器刷盤超時,消息已經進入服務器隊列,隻有此時服務器宕機,消息才會丢失
FLUSH_SLAVE_TIMEOUT:消息發送成功,但是服務器同步到Slave時超時,消息已經進入服務器隊列,隻有此時服務器宕機,消息才會丢失
SLAVE_NOT_AVAILABLE:消息發送成功,但是此時slave不可用,消息已經進入服務器隊列,隻有此時服務器宕機,消息才會丢失
RocketMQ使用的消息原語是At Least Once,所以consumer可能多次收到同一個消息,此時務必做好幂等。
幂等(idempotent、idempotence)是一個數學與計算機學概念,常見于抽象代數中。
在編程中一個幂等操作的特點是:其任意多次執行所産生的影響(結果),均與一次執行的影響相同。
幂等函數,或幂等方法,是指可以使用相同參數重複執行,并能獲得相同結果的函數。這些函數不會影響系統狀态,也不用擔心重複執行會對系統造成改變。
例如,“setTrue()”函數就是一個幂等函數,無論多次執行,其結果都是一樣的。更複雜的操作幂等保證,是利用唯一交易号(流水号)實現。
消費時記錄日志,以便後續定位問題。
盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。
MQ 消息隊列的底層原理
1、生産者
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer('ProducerGroupName');
producer.start();
for (int i = 0; i < 128; i++)
try {
Message msg = new Message('TopicTest',
'TagA',
'OrderID188',
'Hello world'.getBytes
(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf('%s%n', sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
}
2、消費者
public class PushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer('CID_JODIE_1');
consumer.subscribe('TopicTest', '*');
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// wrong time format 2017_0422_221800
consumer.setConsumeTimestamp('20181109221800');
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf('%s Receive New Messages: %s %n', Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf('Consumer Started.%n');
}
}
3、RocketMQ架構原理
對于RocketMQ先抛出幾個問題:
RocketMQ的topic和隊列是什麼樣的,和Kafka的分區有什麼不同?
RocketMQ網絡模型是什麼樣的,和Kafka對比如何?
RocketMQ消息存儲模型是什麼樣的,如何保證高可靠的存儲,和Kafka對比如何?
3.1 RocketMQ架構圖
對于RocketMQ的架構圖,在大體上來看和Kafka并沒有太多的差别,
但是在很多細節上是有很多差别的,接下來會一一進行講述。
3.2 RocketMQ名詞解釋
在3.1的架構中我們有 多個Producer,多個主Broker,多個從Broker
每個Producer可以對應多個Topic,每個Consumer也可以消費多個Topic,多對多的關系
Broker信息會上報至NameServer,Consumer會從NameServer中拉取Broker和Topic的信息。
Producer:消息生産者,向Broker發送消息的客戶端
Consumer:消息消費者,從Broker讀取消息的客戶端
Broker:消息中間的處理節點,這裡和kafka不同,kafka的Broker沒有主從的概念,都可以寫入請求以及備份其他節點數據,RocketMQ隻有主Broker節點才能寫,一般也通過主節點讀,當主節點有故障或者一些其他特殊情況才會使用從節點讀,有點類似- 于mysql的主從架構。
Topic:消息主題,一級消息類型,生産者向其發送消息, 消費者讀取其消息。
Group:分為ProducerGroup,ConsumerGroup,代表某一類的生産者和消費者,一般來說同一個服務可以作為Group,同一個Group一般來說發送和消費的消息都是一樣的。
Tag:Kafka中沒有這個概念,Tag是屬于二級消息類型,一般來說業務有關聯的可以使用同一個Tag,比如訂單消息隊列,使用Topic_Order,Tag可以分為Tag_食品訂單,Tag_服裝訂單等等。
Queue: 在kafka中叫Partition,每個Queue内部是有序的,在RocketMQ中分為讀和寫兩種隊列,一般來說讀寫隊列數量一緻,如果不一緻就會出現很多問題。
NameServer:Kafka中使用的是ZooKeeper保存Broker的地址信息,以及Broker的Leader的選舉,在RocketMQ中并沒有采用選舉Broker的策略,所以采用了無狀态的NameServer來存儲,由于NameServer是無狀态的,集群節點之間并不會通信,所以上傳數據的時候都需要向所有節點進行發送。
很多朋友都在問什麼是無狀态呢?狀态的有無實際上就是數據是否會做存儲,有狀态的話數據會被持久化,無狀态的服務可以理解就是一個内存服務,NameServer本身也是一個内存服務,所有數據都存儲在内存中,重啟之後都會丢失。
3.3 Topic 和 Queue
在RocketMQ中的每一條消息,都有一個Topic,用來區分不同的消息。一個Topic主題一般會有多個消息的訂閱者,當生産者發布消息到某個主題時,訂閱了這個主題的消費者都可以接收到生産者寫入的新消息。
在Topic中有分為了多個Queue,這其實是我們發送/讀取消息通道的最小單位,我們發送消息都需要指定某個寫入某個Queue,拉取消息的時候也需要指定拉取某個Queue,所以我們的順序消息可以基于我們的Queue維度保持隊列有序,如果想做到全局有序那麼需要将Queue大小設置為1,這樣所有的數據都會在Queue中有序。
在上圖中我們的Producer會通過一些策略進行Queue的選擇:
1)非順序消息:非順序消息一般直接采用輪訓發送的方式進行發送。
2)順序消息:根據某個Key,比如我們常見的訂單Id,用戶Id,進行Hash,将同一類數據放在同一個隊列中,保證我們的順序性。
我們同一組Consumer也會根據一些策略來選Queue,常見的比如平均分配或者一緻性Hash分配。
要注意的是當Consumer出現下線或者上線的時候,這裡需要做重平衡,也就是Rebalance,RocketMQ的重平衡機制如下:
1)定時拉取broker,topic的最新信息
2)每隔20s做重平衡
3)随機選取當前Topic的一個主Broker,這裡要注意的是不是每次重平衡所有主Broker都會被選中,因為會存在一個Broker再多個Broker的情況
4)獲取當前Broker,當前ConsumerGroup的所有機器ID
5)然後進行策略分配
由于重平衡是定時做的,所以這裡有可能會出現某個Queue同時被兩個Consumer消費,所以會出現消息重複投遞。
Kafka的重平衡機制和RocketMQ不同,Kafka的重平衡是通過Consumer和Coordinator聯系來完成的,當Coordinator感知到消費組的變化,會在心跳過程中發送重平衡的信号,然後由一個ConsumerLeader進行重平衡選擇,然後再由Coordinator将結果通知給所有的消費者。
Queue 讀寫數量不一緻
在RocketMQ中Queue被分為讀和寫兩種,在最開始接觸RocketMQ的時候,一直以為讀寫隊列數量配置不一緻不會出現什麼問題的,比如當消費者機器很多的時候我們配置很多讀的隊列,但是實際過程中發現會出現消息無法消費和根本沒有消息消費的情況。
當寫的隊列數量大于讀的隊列的數量,當大于讀隊列這部分ID的寫隊列的數據會無法消費,因為不會将其分配給消費者。
當讀的隊列數量大于寫的隊列數量,那麼多的隊列數量就不會有消息被投遞進來。
這個功能在RocketMQ在我看來明顯沒什麼用,因為基本上都會設置為讀寫隊列大小一樣,這個為啥不直接将其進行統一,反而容易讓用戶配置不一樣出現錯誤。
這個問題在RocketMQ的Issue裡也沒有收到好的答案。
3.4 消費模型
一般來說消息隊列的消費模型分為兩種:MQPullConsumer 和 MQPushConsumer,基于推送的消息(push)模型和基于拉取(poll)的消息模型。
1)基于推送模型的消息系統,由消息代理記錄消費狀态。
消息代理将消息推送到消費者後,标記這條消息為已經被消費,但是這種方式無法很好地保證消費的處理語義。比如當我們把已經把消息發送給消費者之後,由于消費進程挂掉或者由于網絡原因沒有收到這條消息,如果我們在消費代理将其标記為已消費,這個消息就永久丢失了。如果我們利用生産者收到消息後回複這種方法,消息代理需要記錄消費狀态,這種不可取。
用過RocketMQ的同學肯定不禁會想到,在RocketMQ中不是提供了兩種消費者嗎?
MQPullConsumer和MQPushConsumer,其中MQPushConsumer不就是我們的推模型嗎?
其實這兩種模型都是客戶端主動去拉消息,其中的實現區别如下:
1)MQPullConsumer
每次拉取消息需要傳入拉取消息的offset和每次拉取多少消息量,具體拉取哪裡的消息,拉取多少是由客戶端控制。
2)MQPushConsumer
同樣也是客戶端主動拉取消息,但是消息進度是由服務端保存,Consumer會定時上報自己消費到哪裡,所以Consumer下次消費的時候是可以找到上次消費的點,一般來說使用PushConsumer我們不需要關心offset和拉取多少數據,直接使用即可。
集群消費和廣播消費
消費模式我們分為兩種,集群消費,廣播消費:
1)集群消費
同一個GroupId都屬于一個集群,一般來說一條消息,隻會被任意一個消費者處理。
2)廣播消費
廣播消費的消息會被集群中所有消費者進行消息,但是要注意:因為廣播消費的offset在服務端保存成本太高,所以客戶端每一次重啟都會從最新消息消費,而不是上次保存的offset。
3.5 網絡模型
在Kafka中使用的原生的socket實現網絡通信,而RocketMQ使用的是Netty網絡框架,現在越來越多的中間件都不會直接選擇原生的socket,而是使用的Netty框架,主要得益于下面幾個原因:
1)API使用簡單,不需要關心過多的網絡細節,更專注于中間件邏輯。
2)性能高。
3)成熟穩定,jdk nio的bug都被修複了。
選擇框架是一方面,而想要保證網絡通信的高效,網絡線程模型也是一方面,我們常見的有
1+N(1個Acceptor線程,N個IO線程)
1+N+M(1個acceptor線程,N個IO線程,M個worker線程)等模型
RocketMQ使用的是 1+N1+N2+M 的模型,如下圖所示:
1個acceptor線程,N1個IO線程,N2個線程用來做Shake-hand,SSL驗證,編解碼,M個線程用來做業務處理。這樣的好處将編解碼,和SSL驗證等一些可能耗時的操作放在了一個單獨的線程池,不會占據我們業務線程和IO線程。
3.6 高可靠的分布式存儲模型
做為一個好的消息系統,高性能的存儲,高可用都不可少。
3.6.1 高性能日志存儲
RocketMQ和Kafka的存儲核心設計有很大的不同,所以其在寫入性能方面也有很大的差别,這是2016年阿裡中間件團隊對RocketMQ和Kafka不同Topic下做的性能測試:
産品Topic數量發送端并發數發送端RT(ms)發送端TPS消費端TPS
RocketMQ6480089000086000
12880097800077000
256800107500075000
Kafka648005136000136000
1282562385008500
25625613322152352
從上可以看出:
Kafka在Topic數量由64增長到256時,吞吐量下降了98.37%
RocketMQ在Topic數量由64增長到256時,吞吐量隻下降了16%
這是為什麼呢?kafka一個topic下面的所有消息都是以partition的方式分布式的存儲在多個節點上。同時在kafka的機器上,每個Partition其實都會對應一個日志目錄,在目錄下面會對應多個日志分段。所以如果Topic很多的時候Kafka雖然寫文件是順序寫,但實際上文件過多,會造成磁盤IO競争非常激烈。
那RocketMQ為什麼在多Topic的情況下,依然還能很好的保持較多的吞吐量呢?
我們首先來看一下RocketMQ中比較關鍵的文件:
這裡有四個目錄(這裡的解釋就直接用RocketMQ官方的了):
mitLog:消息主體以及元數據的存儲主體,存儲Producer端寫入的消息主體内容,消息内容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件;
config:保存一些配置信息,包括一些Group,Topic以及Consumer消費offset等信息。
consumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍曆mitlog文件中根據topic檢索消息是非常低效的。
Consumer即可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的mitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:HOME storeindex${fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。 我們發現我們的消息主體數據并沒有像Kafka一樣寫入多個文件,而是寫入一個文件,這樣我們的寫入IO競争就非常小,可以在很多Topic的時候依然保持很高的吞吐量。有同學說這裡的ConsumeQueue寫是在不停的寫入呢,并且ConsumeQueue是以Queue維度來創建文件,那麼文件數量依然很多,在這裡ConsumeQueue的寫入的數據量很小,每條消息隻有20個字節,30W條數據也才6M左右,所以其實對我們的影響相對Kafka的Topic之間影響是要小很多的。我們整個的邏輯可以如下:
Producer不斷的再往CommitLog添加新的消息,有一個定時任務ReputService會不斷的掃描新添加進來的CommitLog,然後不斷的去構建ConsumerQueue和Index。
注意:這裡指的都是普通的硬盤,在SSD上面多個文件并發寫入和單個文件寫入影響不大。
讀取消息
Kafka中每個Partition都會是一個單獨的文件,所以當消費某個消息的時候,會很好的出現順序讀,我們知道OS從物理磁盤上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取,将數據放入PageCache,所以Kafka的讀取消息性能比較好。
RocketMQ讀取流程如下:
先讀取ConsumerQueue中的offset對應CommitLog物理的offset
根據offset讀取CommitLog
ConsumerQueue也是每個Queue一個單獨的文件,并且其文件體積小,所以很容易利用PageCache提高性能。而CommitLog,由于同一個Queue的連續消息在CommitLog其實是不連續的,所以會造成随機讀,RocketMQ對此做了幾個優化:
Mmap映射讀取,Mmap的方式減少了傳統IO将磁盤文件數據在操作系統内核地址空間的緩沖區和用戶應用程序地址空間的緩沖區之間來回進行拷貝的性能開銷
使用DeadLine調度算法+SSD存儲盤
由于Mmap映射受到内存限制,當不在Mmmap映射這部分數據的時候(也就是消息堆積過多),默認是内存的40%,會将請求發送到SLAVE,減緩Master的壓力
3.6.2 可用性
3.6.2.1 集群模式
我們首先需要選擇一種集群模式,來适應我們可忍耐的可用程度,一般來說分為四種:
1)單Master
這種模式,可用性最低,但是成本也是最低,一旦宕機,所有都不可用。這種一般隻适用于本地測試。
2)單Master多SLAVE
這種模式,可用性一般,如果主宕機,那麼所有寫入都不可用,讀取依然可用,如果master磁盤損壞,可以依賴slave的數據。
3)多Master
這種模式,可用性一般,如果出現部分master宕機,那麼這部分master上的消息都不可消費,也不可寫數據,如果一個Topic的隊列在多個Master上都有,那麼可以保證沒有宕機的那部分可以正常消費,寫入。如果master的磁盤損壞會導緻消息丢失。
4)多Master多Slave
這種模式,可用性最高,但是維護成本也最高,當master宕機了之後,隻會出現在這部分master上的隊列不可寫入,但是讀取依然是可以的,并且如果master磁盤損壞,可以依賴slave的數據。
一般來說投入生産環境的話都會選擇第四種,來保證最高的可用性。
3.6.2.2 消息的可用性
當我們選擇好了集群模式之後,那麼我們需要關心的就是怎麼去存儲和複制這個數據,rocketMQ對消息的刷盤提供了同步和異步的策略來滿足我們的,當我們選擇同步刷盤之後,如果刷盤超時會給返回FLUSH_DISK_TIMEOUT,如果是異步刷盤不會返回刷盤相關信息,選擇同步刷盤可以盡最大程度滿足我們的消息不會丢失。
除了存儲有選擇之後,我們的主從同步提供了同步和異步兩種模式來進行複制,當然選擇同步可以提升可用性,但是消息的發送RT時間會下降10%左右。
3.6.3 Dleger
我們上面對于master-slave部署模式已經做了很多分析,我們發現,當master出現問題的時候,我們的寫入怎麼都會不可用,除非恢複master,或者手動将我們的slave切換成master,導緻了我們的Slave在多數情況下隻有讀取的作用。RocketMQ在最近的幾個版本中推出了Dleger-RocketMQ,使用Raft協議複制CommitLog,并且自動進行選主,這樣master宕機的時候,寫入依然保持可用。
有關Dleger-RocketMQ的信息更多的可以查看這篇文章:Dledger-RocketMQ 基于Raft協議的mitlog存儲庫
3.7 定時/延時消息
定時消息和延時消息在實際業務場景中使用的比較多,比如下面的一些場景:
1)訂單超時未支付自動關閉,因為在很多場景中下單之後庫存就被鎖定了,這裡需要将其進行超時關閉。
2)需要一些延時的操作,比如一些兜底的邏輯,當做完某個邏輯之後,可以發送延時消息比如延時半個小時,進行兜底檢查補償。
3)在某個時間給用戶發送消息,同樣也可以使用延時消息。
在開源版本的RocketMQ中延時消息并不支持任意時間的延時,需要設置幾個固定的延時等級,目前默認設置為:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,從1s到2h分别對應着等級1到18,而阿裡雲中的版本(要付錢)是可以支持40天内的任何時刻(毫秒級别)。
我們先看下在RocketMQ中定時任務原理圖:
Step1:Producer在自己發送的消息上設置好需要延時的級别。
Step2: Broker發現此消息是延時消息,将Topic進行替換成延時Topic,每個延時級别都會作為一個單獨的queue,将自己的Topic作為額外信息存儲。
Step3: 構建ConsumerQueue
Step4: 定時任務定時掃描每個延時級别的ConsumerQueue。
Step5: 拿到ConsumerQueue中的CommitLog的Offset,獲取消息,判斷是否已經達到執行時間
Step6: 如果達到,那麼将消息的Topic恢複,進行重新投遞。如果沒有達到則延遲沒有達到的這段時間執行任務。
可以看見延時消息是利用新建單獨的Topic和Queue來實現的,如果我們要實現40天之内的任意時間度,基于這種方案,那麼需要402460601000個queue,這樣的成本是非常之高的,那阿裡雲上面的支持任意時間是怎麼實現的呢?這裡猜測是持久化二級TimeWheel時間輪,二級時間輪用于替代我們的ConsumeQueue,保存Commitlog-Offset,然後通過時間輪不斷的取出當前已經到了的時間,然後再次投遞消息。具體的實現邏輯需要後續會單獨寫一篇文章。
3.8 事務消息
事務消息同樣的也是RocketMQ中的一大特色,其可以幫助我們完成分布式事務的最終一緻性
具體使用事務消息步驟如下:
Step1:調用sendMessageInTransaction發送事務消息
Step2: 如果發送成功,則執行本地事務。
Step3: 如果執行本地事務成功則發送mit,如果失敗則發送rollback。
Step4: 如果其中某個階段比如mit發送失敗,rocketMQ會進行定時從Broker回查,本地事務的狀态。
事務消息的使用整個流程相對之前幾種消息使用比較複雜,下面是事務消息實現的原理圖:
Step1: 發送事務消息,這裡也叫做halfMessage,會将Topic替換為HalfMessage的Topic。
Step2: 發送mit或者rollback,如果是mit這裡會查詢出之前的消息,然後将消息複原成原Topic,并且發送一個OpMessage用于記錄當前消息可以删除。如果是rollback這裡會直接發送一個OpMessage删除。
Step3: 在Broker有個處理事務消息的定時任務,定時對比halfMessage和OpMessage,如果有OpMessage且狀态為删除,那麼該條消息必定mit或者rollback,所以就可以删除這條消息。
Step4: 如果事務超時(默認是6s),還沒有opMessage,那麼很有可能mit信息丢了,這裡會去反查我們的Producer本地事務狀态。
Step5: 根據查詢出來的信息做Step2。
我們發現RocketMQ實現事務消息也是通過修改原Topic信息,和延遲消息一樣,然後模拟成消費者進行消費,做一些特殊的業務邏輯。當然我們還可以利用這種方式去做RocketMQ更多的擴展。
總結以上是真正的電腦專家為你收集整理的Rocketmq原則和實踐(RocketMQ設計原則和最佳實踐)的全部内容,希望文章能夠幫你解決所遇到的問題。
如果覺得真正的電腦專家網站内容還不錯,歡迎将真正的電腦專家推薦給好友。
有話要說...