(2) Transactionality and idempotency of Kafka

ape come so dj · 2022-08-06 07:54:37


Transactionality: when a producer delivers messages to different topics in the same cluster, it guarantees data consistency — the writes as a whole are neither duplicated, lost, nor reordered, and they are atomic (either all succeed or all fail). In Kafka, transactionality has three levels of meaning: first, support for idempotency (idempotency is the foundation of transactionality); second, support for transactions; third, the exactly-once implementation in Kafka Streams.
Idempotency: Producer idempotency means that when the same message is sent to the same topic, the data is persisted only once on the server side — nothing is lost and nothing is duplicated.


Idempotency guarantees that the data a single producer delivers to the same topic in the same cluster is neither duplicated, lost, nor reordered. Idempotency is the foundation of the transactional implementation.

1:Turning on idempotency

Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
//The following three parameters are set automatically when idempotence is enabled
//and do not need to be configured manually:
//max.in.flight.requests.per.connection — max unacknowledged requests per connection before blocking (default 5, must be <= 5)
//retries — number of retries after a send failure (default Integer.MAX_VALUE)
//acks — forced to "all" so that the write is confirmed by all in-sync replicas

2:How idempotency is implemented
When the Producer is configured for at least once delivery, the retry mechanism triggered by exceptions can write duplicate data; idempotency exists to solve exactly this duplication problem. In short:
at least once + idempotency = exactly once

The Kafka Producer implementation relies on the following two important mechanisms:

  • PID (Producer ID), which identifies each producer client and is unique per producer client;
  • sequence numbers — every message the client sends carries a sequence number, which the server uses to decide whether the data is a duplicate; it starts at 0 and increases monotonically.

Client side: before the data is sent, ProducerBatch provides a setProducerState() method that attaches some meta information (pid, baseSequence, isTransactional) to a batch. This information travels with the ProduceRequest to the server, and the server uses the same metadata to perform the corresponding checks.

Server-side handling: when the Broker receives a ProduceRequest, it processes it in handleProduceRequest():
1:check whether transactionality is enabled;
2:check whether the data carries a PID, and validate the data;
3:then write the corresponding data.
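To make the PID + sequence-number check concrete, here is a toy, self-contained sketch of the broker-side duplicate detection described above. The class and field names (`BrokerDedup`, `lastSequence`) are illustrative, not Kafka's actual code; the real broker tracks sequence numbers per topic-partition and batch, but the core rule is the same: a retried message whose sequence number was already persisted is dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of broker-side idempotency: remember, per producer id (PID),
// the last sequence number that was persisted, and reject replays.
public class BrokerDedup {
    private final Map<Long, Long> lastSequence = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Returns true if the message was appended, false if it was a duplicate. */
    public boolean append(long pid, long seq, String msg) {
        long last = lastSequence.getOrDefault(pid, -1L);
        if (seq <= last) {
            return false;                // duplicate caused by a retry: drop it
        }
        if (seq != last + 1) {           // a gap means data was lost in between
            throw new IllegalStateException("out-of-order sequence: " + seq);
        }
        lastSequence.put(pid, seq);
        log.add(msg);
        return true;
    }

    public List<String> log() { return log; }

    public static void main(String[] args) {
        BrokerDedup broker = new BrokerDedup();
        broker.append(1L, 0L, "a");
        broker.append(1L, 1L, "b");
        boolean accepted = broker.append(1L, 1L, "b"); // client retried "b"
        System.out.println(accepted);       // false: duplicate rejected
        System.out.println(broker.log());   // [a, b]
    }
}
```

This is how "at least once + idempotency = exactly once" works in practice: the client may send the same batch twice, but the server persists it once.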


Transactionality: when a producer delivers messages to different topics in the same cluster, it guarantees data consistency — the writes as a whole are neither duplicated, lost, nor reordered, and they are atomic (either all succeed or all fail). It is enabled by setting transactional.id, the transaction id (which must be unique).

When the user enables Kafka's transactionality, Kafka can guarantee:

  • Idempotent writes across sessions: even after a failure in the middle, idempotency still holds after recovery;
  • Transaction recovery across sessions: if one application instance dies, the next instance started can still ensure the previous transaction is finished (committed or aborted);
  • Idempotent writes across multiple Topic-Partitions: Kafka guarantees that data written across multiple Topic-Partitions either all succeeds or all fails, with no intermediate state.

1:Turning on transactions

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Register the transactional.id with the coordinator; must be called once before any transaction
producer.initTransactions();
try {
    producer.beginTransaction();
    String msg = "test";
    producer.send(new ProducerRecord<>(topic, "0", msg));
    producer.send(new ProducerRecord<>(topic, "1", msg));
    producer.send(new ProducerRecord<>(topic, "2", msg));
    producer.commitTransaction();
} catch (ProducerFencedException e1) {
    // Another producer with the same transactional.id has taken over; this producer must close
    e1.printStackTrace();
    producer.close();
} catch (KafkaException e2) {
    // When any other exception occurs, the transaction is rolled back
    producer.abortTransaction();
}
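The producer-side transaction only pays off end-to-end if the consumer also skips data from aborted transactions. That is controlled by the consumer's isolation.level setting (default read_uncommitted). A minimal configuration sketch, assuming the same local broker as above (the group.id value is made up for illustration):

```java
import java.util.Properties;

// Minimal consumer configuration for reading only committed transactional data.
public class ReadCommittedConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "transactional-example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Only messages from committed transactions are returned by poll();
        // records belonging to aborted transactions are filtered out.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("isolation.level")); // read_committed
    }
}
```

Pass these Properties to a KafkaConsumer to consume the transactional topic written above.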

2:Problems transactionality solves
Transactionality mainly addresses problems that idempotency does not solve, for example:

  • 2.1: when writing to multiple Topic-Partitions, some of a batch of write operations may succeed while others fail (for example, when the retry limit is reached); this leaves an intermediate state, which is not the result we expect;
  • 2.2: after the Producer application crashes mid-stream and is restarted, the Exactly-Once semantic guarantee cannot be achieved — for example, in Kafka + Flink or Kafka + Spark pipelines.

Exactly-Once cannot be achieved by Kafka alone; the application itself also needs the corresponding fault-tolerance design. Take Flink as an example: its fault-tolerance design is the checkpoint mechanism. The job guarantees that, every time a checkpoint succeeds, everything processed before it was Exactly-Once. If the job fails in the middle, then after recovery it only needs to resume from the last checkpoint's records and roll back (abort) the transaction that was unfinished before the failure. This is how Flink + Kafka achieves end-to-end Exactly-Once.
3:How transactions are implemented

The first idea that comes to mind here is the coordinator role from the 2PC protocol (which mainly solves data-consistency problems in distributed systems): its job is to tally the votes of all participants — if everyone agrees the transaction can commit, it executes commit; otherwise it executes abort.

Consider whether Kafka could likewise introduce a similar role to manage transaction state: the transaction commits only when the Producer actually commits; otherwise it remains in progress (the real implementation also has to consider timeouts) and never reaches a completed state.

When the Producer starts a transaction, it tells the coordinator that the transaction has begun, then starts writing data to multiple Topic-Partitions. Only when this batch of data has all been written (with no exception in between) does the Producer call the commit interface, and only then does the transaction actually commit. Otherwise, if an exception occurs in between, the transaction is aborted (the Producer tells the coordinator to perform the abort via the abort interface).

The coordinator here differs slightly from the one in 2PC: it mainly manages transaction-related state information. This is the TransactionCoordinator role on the Kafka server side.

To keep the TransactionCoordinator highly available and fault tolerant, the transaction data (transaction log) lives in the internal topic __transaction_state: all transaction state information is persisted to this topic, and the TransactionCoordinator also recovers its data from this topic during failover.
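The state the coordinator persists can be pictured as a small state machine per transaction. The sketch below is a simplified illustration: the state names loosely follow Kafka's transaction states, but the class, method names, and transition rules here are made up for illustration and omit details the real coordinator tracks (epochs, timeouts, the partition set).

```java
// Simplified model of one transaction's lifecycle as tracked by a coordinator.
// Illustrative only; not Kafka's actual TransactionCoordinator code.
public class TxnLifecycle {
    public enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    private State state = State.EMPTY;

    public State state() { return state; }

    public void begin()         { require(State.EMPTY);   state = State.ONGOING; }
    public void prepareCommit() { require(State.ONGOING); state = State.PREPARE_COMMIT; }
    public void prepareAbort()  { require(State.ONGOING); state = State.PREPARE_ABORT; }

    // The "prepare" state is persisted first, so that after a coordinator
    // failover the decision (commit or abort) can still be completed.
    public void complete() {
        if (state == State.PREPARE_COMMIT)      state = State.COMPLETE_COMMIT;
        else if (state == State.PREPARE_ABORT)  state = State.COMPLETE_ABORT;
        else throw new IllegalStateException("cannot complete from " + state);
    }

    private void require(State expected) {
        if (state != expected)
            throw new IllegalStateException("expected " + expected + " but was " + state);
    }

    public static void main(String[] args) {
        TxnLifecycle txn = new TxnLifecycle();
        txn.begin();                     // producer called beginTransaction()
        txn.prepareCommit();             // producer called commitTransaction()
        txn.complete();                  // coordinator finishes the commit
        System.out.println(txn.state()); // COMPLETE_COMMIT
    }
}
```

The two-step prepare/complete transition mirrors why the __transaction_state topic matters: the prepared decision survives a coordinator crash and can be replayed on failover.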
