Instantiated a transactional producer.
TransactionManager is created exclusively for a transactional KafkaProducer (that is, when transactions are configured by the user or idempotence is enabled).
Tip: A transactional KafkaProducer uses the [Producer clientId=[clientId], transactionalId=[transactionalId]] prefix for logging.
Tip: With a transactional KafkaProducer, you should see an INFO message in the logs such as "Instantiated a transactional producer."
// Enable TRACE logging level for KafkaProducer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

val props = new java.util.Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
// Use the enclosing object's name as the client ID
val clientId = this.getClass.getSimpleName.replace("$", "")
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)
// Setting a transactional ID makes the producer transactional
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txId")
val producer = new KafkaProducer[String, String](props)
// Observe the logs
KafkaProducer uses the transactional.id, transaction.timeout.ms and retry.backoff.ms configuration properties when creating a TransactionManager.
TransactionManager is transactional when the Transactional Id is assigned.
TransactionManager is in one of the following states:
- UNINITIALIZED (when created)
- INITIALIZING
- READY
- IN_TRANSACTION
- COMMITTING_TRANSACTION
- ABORTING_TRANSACTION
- ABORTABLE_ERROR
- FATAL_ERROR
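The states above suggest a simple lifecycle. The following is a minimal sketch of such a state machine, not Kafka's actual implementation: the enum reuses the state names listed above, while the TxStateSketch class and its transition table (the happy path plus the error states) are assumptions made purely for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// State names as listed above; everything else here is hypothetical.
enum TxState {
    UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION,
    COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR
}

class TxStateSketch {
    // Assumed transition table: source state -> allowed target states.
    private static final Map<TxState, Set<TxState>> ALLOWED = new EnumMap<>(TxState.class);
    static {
        ALLOWED.put(TxState.UNINITIALIZED, EnumSet.of(TxState.INITIALIZING));
        ALLOWED.put(TxState.INITIALIZING, EnumSet.of(TxState.READY));
        ALLOWED.put(TxState.READY, EnumSet.of(TxState.IN_TRANSACTION));
        ALLOWED.put(TxState.IN_TRANSACTION, EnumSet.of(
            TxState.COMMITTING_TRANSACTION,
            TxState.ABORTING_TRANSACTION,
            TxState.ABORTABLE_ERROR));
        ALLOWED.put(TxState.COMMITTING_TRANSACTION,
            EnumSet.of(TxState.READY, TxState.ABORTABLE_ERROR));
        ALLOWED.put(TxState.ABORTING_TRANSACTION, EnumSet.of(TxState.READY));
        ALLOWED.put(TxState.ABORTABLE_ERROR, EnumSet.of(TxState.ABORTING_TRANSACTION));
        ALLOWED.put(TxState.FATAL_ERROR, EnumSet.noneOf(TxState.class));
    }

    // A new instance starts in UNINITIALIZED, as described above.
    private TxState current = TxState.UNINITIALIZED;

    static boolean canTransition(TxState from, TxState to) {
        // Assumption: a fatal error can be entered from any state.
        return to == TxState.FATAL_ERROR || ALLOWED.get(from).contains(to);
    }

    void transitionTo(TxState target) {
        if (!canTransition(current, target)) {
            throw new IllegalStateException(
                "Invalid transition " + current + " -> " + target);
        }
        current = target;
    }

    TxState current() {
        return current;
    }
}
```

In this sketch, a commit attempted straight from READY is rejected with an IllegalStateException because no transaction has been begun.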
| Name | Description |
|---|---|
| | Kafka Node. Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | java.util.PriorityQueue of Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Kafka Node. Used when…FIXME |
beginTransaction Method
void beginTransaction()
beginTransaction…FIXME
Note: beginTransaction is used when…FIXME
Creating TransactionManager Instance
TransactionManager takes the following to be created:
- Transactional ID (as transactional.id)
- transactionTimeoutMs (as transaction.timeout.ms)
- retryBackoffMs (as retry.backoff.ms)
TransactionManager initializes the internal registries and counters.
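To make the mapping between the three constructor arguments and producer configuration concrete, here is a minimal sketch that sets the corresponding properties by their string names (as listed above). The TxConfigSketch class, the helper method and the concrete values are illustrative assumptions, not recommended settings.

```java
import java.util.Properties;

// Hypothetical helper; only the three property names come from the text above.
class TxConfigSketch {
    static Properties transactionalProps(String transactionalId) {
        Properties props = new Properties();
        // Becomes the Transactional ID of the TransactionManager
        props.put("transactional.id", transactionalId);
        // Becomes transactionTimeoutMs (value chosen for illustration)
        props.put("transaction.timeout.ms", "60000");
        // Becomes retryBackoffMs (value chosen for illustration)
        props.put("retry.backoff.ms", "100");
        return props;
    }
}
```

Such a Properties object would still need the bootstrap servers and serializers before it could be used to create a KafkaProducer.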
initializeTransactions Method
TransactionalRequestResult initializeTransactions()
initializeTransactions…FIXME
Note: initializeTransactions is used when…FIXME
sendOffsetsToTransaction Method
TransactionalRequestResult sendOffsetsToTransaction(
  Map<TopicPartition, OffsetAndMetadata> offsets,
  String consumerGroupId)
sendOffsetsToTransaction…FIXME
Note: sendOffsetsToTransaction is used when…FIXME
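The offsets argument maps each partition to the offset to commit. By Kafka convention, a committed offset points at the next record to consume (last processed offset + 1). The following is a minimal sketch of building such a map. Consumed and the "topic-partition" string keys are simplified, hypothetical stand-ins for Kafka's record, TopicPartition and OffsetAndMetadata types, used only so that the example stays self-contained.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetsSketch {
    // Hypothetical stand-in for a consumed record: topic, partition, offset.
    static final class Consumed {
        final String topic;
        final int partition;
        final long offset;

        Consumed(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Returns "topic-partition" -> offset to commit,
    // that is, the highest consumed offset per partition plus one.
    static Map<String, Long> offsetsToCommit(List<Consumed> records) {
        Map<String, Long> result = new HashMap<>();
        for (Consumed r : records) {
            String key = r.topic + "-" + r.partition;
            result.merge(key, r.offset + 1, Long::max);
        }
        return result;
    }
}
```

With real Kafka types, the keys would be TopicPartition instances and the values OffsetAndMetadata instances wrapping the same numbers.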
beginCommit Method
TransactionalRequestResult beginCommit()
beginCommit…FIXME
Note: beginCommit is used when…FIXME
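How beginTransaction and beginCommit are reached is left open above. As a rough illustration of the begin, commit and abort flow of a transactional producer, here is a minimal sketch. TransactionalOps is a hypothetical interface (not a Kafka type) whose method names mirror the public transactional methods of KafkaProducer. TxLifecycleSketch and runInTransaction are likewise made up for this example.

```java
// Hypothetical stand-in; method names mirror KafkaProducer's
// beginTransaction, commitTransaction and abortTransaction.
interface TransactionalOps {
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
}

class TxLifecycleSketch {
    // Runs the work inside a transaction: commit on success, abort on failure.
    static void runInTransaction(TransactionalOps producer, Runnable work) {
        producer.beginTransaction();
        try {
            work.run();
            producer.commitTransaction();
        } catch (RuntimeException e) {
            // Simplification: with a real producer, some errors are fatal
            // and call for closing the producer instead of aborting.
            producer.abortTransaction();
            throw e;
        }
    }
}
```

A real KafkaProducer would also need initTransactions() to be called once before the first transaction. That step is omitted here.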