TransactionManager

TransactionManager is created only for a transactional or idempotent KafkaProducer, i.e. when the user has configured a transactional id or enabled idempotence.

Tip

A transactional KafkaProducer uses the [Producer clientId=[clientId], transactionalId=[transactionalId]] prefix for logging.

Tip

You should see one of the following INFO messages in the logs with a transactional KafkaProducer:

Instantiated a transactional producer.

or

Instantiated an idempotent producer.

// Enable TRACE logging level for KafkaProducer (in the logging configuration)
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
val clientId = this.getClass.getSimpleName.replace("$", "")
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)
// Setting a transactional id makes the producer transactional
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txId")
val producer = new KafkaProducer[String, String](props)

// Observe the logs, then release the producer's resources
producer.close()

KafkaProducer uses the following configuration properties when creating a TransactionManager:

TransactionManager is transactional when the Transactional Id is assigned.
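
The distinction between a transactional and an idempotent-only manager can be pictured with a small self-contained model. This is a sketch, not Kafka's code: `TxnManagerSketch` and `describe` are hypothetical names, and the model assumes that a missing transactional id is represented by `null`. The two strings are the INFO messages quoted in the tip above.

```java
// Hypothetical model; this is NOT the org.apache.kafka TransactionManager class.
final class TxnManagerSketch {
    // Assumption: null stands for "no transactional id assigned" (idempotent only).
    private final String transactionalId;

    TxnManagerSketch(String transactionalId) {
        this.transactionalId = transactionalId;
    }

    // Transactional exactly when a transactional id has been assigned.
    boolean isTransactional() {
        return transactionalId != null;
    }

    // Picks the INFO message that matches the kind of producer.
    String describe() {
        return isTransactional()
            ? "Instantiated a transactional producer."
            : "Instantiated an idempotent producer.";
    }

    public static void main(String[] args) {
        System.out.println(new TxnManagerSketch("txId").describe());
        System.out.println(new TxnManagerSketch(null).describe());
    }
}
```

With the model above, passing "txId" (as in the earlier example) selects the transactional message, while passing no id selects the idempotent one.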

TransactionManager is in one of the following states:

  • UNINITIALIZED when created

  • INITIALIZING

  • READY

  • IN_TRANSACTION

  • COMMITTING_TRANSACTION

  • ABORTING_TRANSACTION

  • ABORTABLE_ERROR

  • FATAL_ERROR
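
The states above suggest a small state machine. The following is a minimal sketch of that idea, not Kafka's implementation: `TxnState`, `allowedSources`, `canTransitionTo` and `TxnStateMachineSketch` are hypothetical names, and the transition rules are an assumption about a plausible lifecycle (initialize, begin, then commit or abort back to READY) rather than something stated on this page.

```java
// Hypothetical model of the states listed above; the transition rules are assumptions.
enum TxnState {
    UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION,
    COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR;

    // States from which this state may be entered (assumed rules).
    java.util.Set<TxnState> allowedSources() {
        switch (this) {
            case INITIALIZING:
                return java.util.EnumSet.of(UNINITIALIZED);
            case READY:
                return java.util.EnumSet.of(INITIALIZING, COMMITTING_TRANSACTION, ABORTING_TRANSACTION);
            case IN_TRANSACTION:
                return java.util.EnumSet.of(READY);
            case COMMITTING_TRANSACTION:
                return java.util.EnumSet.of(IN_TRANSACTION);
            case ABORTING_TRANSACTION:
                return java.util.EnumSet.of(IN_TRANSACTION, ABORTABLE_ERROR);
            case ABORTABLE_ERROR:
                return java.util.EnumSet.of(IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTABLE_ERROR);
            case FATAL_ERROR:
                // Assumed to be reachable from any state.
                return java.util.EnumSet.allOf(TxnState.class);
            default:
                // UNINITIALIZED is only the starting state; nothing transitions into it.
                return java.util.EnumSet.noneOf(TxnState.class);
        }
    }

    boolean canTransitionTo(TxnState target) {
        return target.allowedSources().contains(this);
    }
}

final class TxnStateMachineSketch {
    private TxnState current = TxnState.UNINITIALIZED; // "UNINITIALIZED when created"

    TxnState current() {
        return current;
    }

    // Moves to the target state or rejects the transition.
    void transitionTo(TxnState target) {
        if (!current.canTransitionTo(target)) {
            throw new IllegalStateException("Invalid transition " + current + " -> " + target);
        }
        current = target;
    }

    public static void main(String[] args) {
        TxnStateMachineSketch machine = new TxnStateMachineSketch();
        machine.transitionTo(TxnState.INITIALIZING);
        machine.transitionTo(TxnState.READY);
        machine.transitionTo(TxnState.IN_TRANSACTION);
        machine.transitionTo(TxnState.COMMITTING_TRANSACTION);
        machine.transitionTo(TxnState.READY);
        System.out.println(machine.current());
    }
}
```

Keeping the rules on the target state (which sources may enter it) makes each rule a single line and lets an illegal call, such as committing when no transaction is open, fail fast with an exception.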

Table 1. TransactionManager’s Internal Properties (e.g. Registries, Counters and Flags)
Name                               Description

consumerGroupCoordinator           Kafka Node. Used when…FIXME

inflightBatchesBySequence          Map<TopicPartition, PriorityQueue<ProducerBatch>>. Used when…FIXME

lastAckedOffset                    Map<TopicPartition, Long>. Used when…FIXME

lastAckedSequence                  Map<TopicPartition, Integer>. Used when…FIXME

newPartitionsInTransaction         Set<TopicPartition>. Used when…FIXME

nextSequence                       Map<TopicPartition, Integer>. Used when…FIXME

partitionsInTransaction            Set<TopicPartition>. Used when…FIXME

partitionsWithUnresolvedSequences  Set<TopicPartition>. Used when…FIXME

pendingPartitionsInTransaction     Set<TopicPartition>. Used when…FIXME

pendingRequests                    java.util.PriorityQueue of TxnRequestHandlers. Used when…FIXME

pendingTxnOffsetCommits            Map<TopicPartition, CommittedOffset>. Used when…FIXME

producerIdAndEpoch                 ProducerIdAndEpoch. Used when…FIXME

transactionCoordinator             Kafka Node. Used when…FIXME

beginTransaction Method

void beginTransaction()

beginTransaction…​FIXME

Note
beginTransaction is used when…​FIXME

Creating TransactionManager Instance

TransactionManager takes the following to be created:

TransactionManager initializes the internal registries and counters.

initializeTransactions Method

TransactionalRequestResult initializeTransactions()

initializeTransactions…​FIXME

Note
initializeTransactions is used when…​FIXME

sendOffsetsToTransaction Method

TransactionalRequestResult sendOffsetsToTransaction(
  Map<TopicPartition, OffsetAndMetadata> offsets,
  String consumerGroupId)

sendOffsetsToTransaction…​FIXME

Note
sendOffsetsToTransaction is used when…​FIXME

beginCommit Method

TransactionalRequestResult beginCommit()

beginCommit…​FIXME

Note
beginCommit is used when…​FIXME

beginAbort Method

TransactionalRequestResult beginAbort()

beginAbort…​FIXME

Note
beginAbort is used when…​FIXME

maybeAddPartitionToTransaction Method

void maybeAddPartitionToTransaction(TopicPartition topicPartition)

maybeAddPartitionToTransaction…​FIXME

Note
maybeAddPartitionToTransaction is used when…​FIXME
