RocketMQ source code analysis: how does RocketMQ store messages?

Java knowledge hall 2021-11-25 16:41:16


Files used on the Broker side

[Figure: RocketMQ message storage diagram]
Let's walk through how RocketMQ stores messages. When a message is sent to RocketMQ, it is appended sequentially to the CommitLog file, which gives message storage high performance and high throughput.

But messages are consumed by topic. If consumers had to scan the CommitLog to find the messages of a topic, consumption would be slow. To make consumption efficient, RocketMQ groups the messages of the same topic into a ConsumeQueue. Each topic's ConsumeQueue is divided into several write queues, one file per queue.

Suppose you create a topic named TopicTest with 4 write queues. RocketMQ then stores it in the following form
[Figure: storage layout for TopicTest with 4 write queues]
Note that CommitLog and ConsumeQueue do not store two copies of the same message. CommitLog stores the original content of the message, while ConsumeQueue mainly stores the message's offset in CommitLog. See the following figure for the specific message format
[Figure: message formats in CommitLog and ConsumeQueue]
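To make the relationship concrete, here is a minimal in-memory sketch. It is not RocketMQ's own code: `MiniStore`, `append`, and `read` are hypothetical names I made up for illustration. All messages go into one shared append-only log, and each queue only keeps (offset, size) pointers back into that log.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; the names are illustrative, not RocketMQ's API.
class MiniStore {
    // One shared, append-only log standing in for CommitLog.
    private final ByteArrayOutputStream commitLog = new ByteArrayOutputStream();
    // "topic-queueId" -> list of {offset in commitLog, size}, standing in for ConsumeQueue.
    private final Map<String, List<long[]>> consumeQueues = new HashMap<>();

    // Appends the body to the shared log and records only its position in the queue.
    long append(String topic, int queueId, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        long offset = commitLog.size();
        commitLog.write(bytes, 0, bytes.length);
        consumeQueues.computeIfAbsent(topic + "-" + queueId, k -> new ArrayList<>())
                     .add(new long[] {offset, bytes.length});
        return offset;
    }

    // Reads the n-th message of a queue by following its pointer into the log.
    String read(String topic, int queueId, int n) {
        long[] entry = consumeQueues.get(topic + "-" + queueId).get(n);
        int from = (int) entry[0];
        int to = from + (int) entry[1];
        return new String(Arrays.copyOfRange(commitLog.toByteArray(), from, to),
                          StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        MiniStore store = new MiniStore();
        store.append("TopicTest", 0, "a");
        store.append("OtherTopic", 0, "bb");   // interleaved with TopicTest in the log
        store.append("TopicTest", 0, "ccc");
        System.out.println(store.read("TopicTest", 0, 1));
    }
}
```

The point of the design is that writes stay sequential on a single log, while a consumer of one topic only walks its own small queue of pointers.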
The format of a message stored on the Broker side is as follows

| Field | Description | Length |
|---|---|---|
| TOTALSIZE | Total message length | 4 bytes |
| MAGICCODE | Magic code, fixed value 0xdaa320a7 | 4 bytes |
| BODYCRC | CRC checksum of the message body | 4 bytes |
| QUEUEID | Message queue ID | 4 bytes |
| FLAG | Message flag, reserved for application use | 4 bytes |
| QUEUEOFFSET | Offset of the message in the consume queue | 8 bytes |
| PHYSICALOFFSET | Offset of the message in the CommitLog file | 8 bytes |
| SYSFLAG | System flag, for example whether the message is compressed or is a transaction message | 4 bytes |
| BORNTIMESTAMP | Timestamp at which the producer called the send API | 8 bytes |
| BORNHOST | IP and port of the message sender | 8 bytes |
| STORETIMESTAMP | Timestamp at which the message was stored | 8 bytes |
| STOREHOSTADDRESS | IP and port of the Broker server | 8 bytes |
| RECONSUMETIMES | Number of message retries | 4 bytes |
| Prepared Transaction Offset | Physical offset of the transaction message | 8 bytes |
| BodyLength | Length of the message body | 4 bytes |
| Body | Message body content | BodyLength bytes |
| TopicLength | Topic length; stored in 1 byte, so the topic name cannot exceed 255 characters | 1 byte |
| Topic | Topic name | TopicLength bytes |
| PropertiesLength | Length of the message properties | 2 bytes |
| Properties | Message properties | PropertiesLength bytes |
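As a rough sketch of how the table translates into bytes, the class below writes the fields in table order into a `ByteBuffer`. It is my own illustration, not RocketMQ's encoder: `MessageLayout`, `totalSize`, and `encode` are hypothetical names, the host fields are filled with zero placeholders, and I assume big-endian order (the `ByteBuffer` default) and UTF-8 strings.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Illustrative encoder that follows the field table above; not RocketMQ's own code.
class MessageLayout {
    static final int MAGIC_CODE = 0xdaa320a7;
    // Sum of all fixed-width fields in the table, including the three length prefixes (= 91).
    static final int FIXED_BYTES =
            4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 8 + 8 + 8 + 4 + 8 + 4 + 1 + 2;

    static int totalSize(int bodyLen, int topicLen, int propsLen) {
        return FIXED_BYTES + bodyLen + topicLen + propsLen;
    }

    static ByteBuffer encode(String topic, int queueId, long queueOffset, long physicalOffset,
                             long timestampMillis, byte[] body, String properties) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        byte[] propBytes = properties.getBytes(StandardCharsets.UTF_8);
        if (topicBytes.length > 255 || propBytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("topic or properties too long for their length fields");
        }
        CRC32 crc = new CRC32();
        crc.update(body);

        ByteBuffer buf = ByteBuffer.allocate(totalSize(body.length, topicBytes.length, propBytes.length));
        buf.putInt(buf.capacity());             // TOTALSIZE
        buf.putInt(MAGIC_CODE);                 // MAGICCODE
        buf.putInt((int) crc.getValue());       // BODYCRC
        buf.putInt(queueId);                    // QUEUEID
        buf.putInt(0);                          // FLAG (unused in this sketch)
        buf.putLong(queueOffset);               // QUEUEOFFSET
        buf.putLong(physicalOffset);            // PHYSICALOFFSET
        buf.putInt(0);                          // SYSFLAG (unused in this sketch)
        buf.putLong(timestampMillis);           // BORNTIMESTAMP
        buf.putLong(0L);                        // BORNHOST (placeholder)
        buf.putLong(timestampMillis);           // STORETIMESTAMP
        buf.putLong(0L);                        // STOREHOSTADDRESS (placeholder)
        buf.putInt(0);                          // RECONSUMETIMES
        buf.putLong(0L);                        // Prepared Transaction Offset
        buf.putInt(body.length);                // BodyLength
        buf.put(body);                          // Body
        buf.put((byte) topicBytes.length);      // TopicLength (1 byte)
        buf.put(topicBytes);                    // Topic
        buf.putShort((short) propBytes.length); // PropertiesLength (2 bytes)
        buf.put(propBytes);                     // Properties
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer msg = encode("TopicTest", 0, 0L, 0L, System.currentTimeMillis(), body, "");
        System.out.println("encoded bytes: " + msg.remaining());
    }
}
```

Because TOTALSIZE comes first, a reader can take 4 bytes, learn the length of the whole record, and then jump straight to the next message in the file.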

The format of an entry in ConsumeQueue is as follows
[Figure: ConsumeQueue entry format]
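The figure is not reproduced in this text, so the sketch below relies on what I recall from the RocketMQ source rather than on the article: each ConsumeQueue entry is a fixed 20 bytes, made of an 8-byte CommitLog offset, a 4-byte message size, and an 8-byte hash code of the message tag. Treat those sizes as an assumption and check them against your version. `ConsumeQueueEntry` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

// Sketch of a fixed-size ConsumeQueue entry; the sizes are an assumption, see the note above.
class ConsumeQueueEntry {
    static final int ENTRY_SIZE = 8 + 4 + 8; // CommitLog offset + message size + tag hash code

    // Writes the entry with the given logical index into a buffer standing in for a queue file.
    static void write(ByteBuffer queueFile, int index, long commitLogOffset, int size, long tagsCode) {
        int pos = index * ENTRY_SIZE;
        queueFile.putLong(pos, commitLogOffset);
        queueFile.putInt(pos + 8, size);
        queueFile.putLong(pos + 12, tagsCode);
    }

    // Fixed-size entries make lookup by logical index a simple multiplication.
    static long commitLogOffsetAt(ByteBuffer queueFile, int index) {
        return queueFile.getLong(index * ENTRY_SIZE);
    }

    static int sizeAt(ByteBuffer queueFile, int index) {
        return queueFile.getInt(index * ENTRY_SIZE + 8);
    }

    public static void main(String[] args) {
        ByteBuffer queueFile = ByteBuffer.allocate(3 * ENTRY_SIZE);
        write(queueFile, 2, 1234L, 105, "TagA".hashCode());
        System.out.println(commitLogOffsetAt(queueFile, 2) + " / " + sizeAt(queueFile, 2));
    }
}
```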
The storage diagram also shows another file that is related to CommitLog: IndexFile.

The main purpose of IndexFile is to find a message by its Message Key.

The IndexFile file structure is as follows

[Figure: IndexFile file structure]
As the diagram shows, IndexFile consists of 3 parts: IndexHead, hash slots, and Index entries.

The IndexHead format is as follows

| Field | Description |
|---|---|
| beginTimestamp | Minimum message storage time |
| endTimestamp | Maximum message storage time |
| beginPhyOffset | Minimum message offset (offset in the commitLog file) |
| endPhyOffset | Maximum message offset (offset in the commitLog file) |
| hashSlotCount | Number of hash slots |
| indexCount | Number of Index entries currently in use |

Each hash slot stores the index of the most recent Index entry that falls into that slot (the diagram further down makes this clear).

The format of each Index entry is as follows

| Field | Description |
|---|---|
| hashcode | Hash code of the key |
| phyoffset | Offset of the message (offset in the commitLog file) |
| timedif | Difference between the message's storage time and the timestamp of the first message; if it is less than 0, the message is invalid |
| preIndexNo | Index of the previous entry in the same slot; used to build a linked list when hashes conflict |
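Since all three parts are fixed-size, any slot or entry can be located with plain arithmetic. The article does not list the field widths, so the sizes below are what I recall from the RocketMQ source and should be read as assumptions: a 40-byte header (four 8-byte values plus two 4-byte counters), 4 bytes per hash slot, and 20 bytes per Index entry (4 + 8 + 4 + 4). `IndexFileLayout` is a hypothetical helper.

```java
// Position arithmetic for an IndexFile-like layout; the sizes are assumptions, see the note above.
class IndexFileLayout {
    static final int HEADER_SIZE = 8 * 4 + 4 * 2;  // 4 timestamps/offsets + 2 counters = 40
    static final int SLOT_SIZE = 4;                // each slot holds one entry number
    static final int ENTRY_SIZE = 4 + 8 + 4 + 4;   // hashcode + phyoffset + timedif + preIndexNo = 20

    // Byte position of hash slot number `slot`.
    static long slotPosition(int slot) {
        return HEADER_SIZE + (long) slot * SLOT_SIZE;
    }

    // Byte position of Index entry number `entryNo`; entries sit after all the slots.
    static long entryPosition(int slotCount, int entryNo) {
        return HEADER_SIZE + (long) slotCount * SLOT_SIZE + (long) entryNo * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        System.out.println("slot 1 at byte " + slotPosition(1));
        System.out.println("entry 1 (10 slots) at byte " + entryPosition(10, 1));
    }
}
```

Entry number 0 is best left unused in this scheme, because a slot value of 0 is what marks an empty slot.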

How an IndexFile is built is a little involved, so let me illustrate it with a picture. You can think of IndexFile as a file-based implementation of HashMap.

Suppose we insert 3 entries with keys 11, 34, and 21, in that order, into a HashMap whose array length is 10. The HashMap structure is as follows
[Figure: HashMap structure after inserting keys 11, 34, 21]
Putting the entries with keys 11, 34, and 21 into an IndexFile works as follows (assuming the number of hash slots is 10)
[Figure: IndexFile contents after inserting keys 11, 34, 21]
The specific process is

  1. Messages are placed into the Index entries in arrival order, so 11 goes to position index=1 (written index[1]). 11%10=1, so the hash slot is 1. The value stored in that slot is 0 (every slot starts at 0), written hash[1]=0. Set index[1].preIndexNo=hash[1]=0, then hash[1]=1 (1 is the subscript in the index array).
  2. Put 34 in index[2]. 34%10=4, so index[2].preIndexNo=hash[4]=0, then hash[4]=2.
  3. Put 21 in index[3]. 21%10=1, so index[3].preIndexNo=hash[1]=1, then hash[1]=3.

As the picture shows, when a hash conflict occurs, the preIndexNo property of the Index entry plays the role of the linked list pointer. Lookup works much like HashMap: locate the slot first, then follow the list.
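The three steps and the lookup can be sketched with plain arrays. This is a simplified in-memory model, not RocketMQ's `IndexFile` class: `MiniIndex` and its methods are hypothetical, integer keys are used directly as hash codes as in the example above, and the timedif field is left out.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of hash slots plus linked Index entries; not RocketMQ's IndexFile class.
class MiniIndex {
    private final int[] slots;       // slots[s] = number of the newest entry in slot s, 0 = empty
    private final int[] keyHash;     // per entry: hash code of the key
    private final long[] phyOffset;  // per entry: offset of the message in CommitLog
    private final int[] preIndexNo;  // per entry: previous entry in the same slot, 0 = end of list
    private int count = 0;           // entries in use; entry 0 is reserved to mean "none"

    MiniIndex(int slotCount, int maxEntries) {
        slots = new int[slotCount];
        keyHash = new int[maxEntries + 1];
        phyOffset = new long[maxEntries + 1];
        preIndexNo = new int[maxEntries + 1];
    }

    // Appends an entry and makes it the new head of its slot's list.
    boolean put(int hash, long offset) {
        if (count >= keyHash.length - 1) {
            return false; // index is full
        }
        int no = ++count;
        int slot = Math.floorMod(hash, slots.length);
        keyHash[no] = hash;
        phyOffset[no] = offset;
        preIndexNo[no] = slots[slot]; // link to the previous head (0 if the slot was empty)
        slots[slot] = no;
        return true;
    }

    // Locates the slot, then follows preIndexNo until it reaches 0.
    List<Long> lookup(int hash) {
        List<Long> result = new ArrayList<>();
        for (int no = slots[Math.floorMod(hash, slots.length)]; no != 0; no = preIndexNo[no]) {
            if (keyHash[no] == hash) {
                result.add(phyOffset[no]);
            }
        }
        return result;
    }

    int slotValue(int slot) { return slots[slot]; }
    int preIndexNoOf(int entryNo) { return preIndexNo[entryNo]; }

    public static void main(String[] args) {
        MiniIndex index = new MiniIndex(10, 100);
        index.put(11, 100L);
        index.put(34, 200L);
        index.put(21, 300L);
        System.out.println("hash[1]=" + index.slotValue(1)
                + ", index[3].preIndexNo=" + index.preIndexNoOf(3));
        System.out.println(index.lookup(11));
    }
}
```

Looking up 11 starts at slot 1, visits entry 3 (key 21, no match), then follows preIndexNo to entry 1 and finds it.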

If you are interested in the specific algorithm, see the source code. I won't post the code here because there is quite a lot of it.

// How IndexFile is built
// How IndexFile is searched


Other files

commitLog: message storage directory
consumequeue: consume queue storage directory
index: index files, used to look up messages
lock: sometimes more than one broker runs on one machine. If their data files are placed in the same directory, the lock lets a broker detect this and prompt you to use another directory, which prevents conflicts
checkpoint: checkpoint file; stores the timestamps of the last flush to disk of commitLog, consumeQueue, and IndexFile
config: some configuration information saved at runtime
abort: if the abort file exists, the Broker was shut down abnormally. The file is created by default at startup and deleted on normal exit
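The abort file is a simple marker-file trick, and a small sketch shows the idea. This is my own illustration under the behaviour described above, not the Broker's code: `AbortMarker`, `start`, `shutdown`, and `demo` are hypothetical names, and a temporary directory stands in for the store directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Marker-file sketch of the abort mechanism; names are illustrative, not RocketMQ's API.
class AbortMarker {
    private final Path abortFile;

    AbortMarker(Path storeDir) {
        this.abortFile = storeDir.resolve("abort");
    }

    // Returns true if the marker is still there, meaning the last run did not exit normally.
    // The marker is created if it is missing, so it exists for the whole run.
    boolean start() {
        try {
            boolean lastExitAbnormal = Files.exists(abortFile);
            if (!lastExitAbnormal) {
                Files.createFile(abortFile);
            }
            return lastExitAbnormal;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called only on a normal exit; a crash never reaches this, so the marker survives.
    void shutdown() {
        try {
            Files.deleteIfExists(abortFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs three simulated starts: fresh directory, after a crash, after a clean exit.
    static boolean[] demo() {
        try {
            Path dir = Files.createTempDirectory("store");
            boolean first = new AbortMarker(dir).start();       // nothing there yet
            boolean afterCrash = new AbortMarker(dir).start();  // previous run skipped shutdown()
            new AbortMarker(dir).shutdown();                     // clean exit removes the marker
            boolean afterCleanExit = new AbortMarker(dir).start();
            new AbortMarker(dir).shutdown();
            Files.delete(dir);
            return new boolean[] {first, afterCrash, afterCleanExit};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```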

