Instantiated a transactional producer.
TransactionManager
TransactionManager is created exclusively for a transactional KafkaProducer (that is, when transactions or idempotence are enabled in the user configuration).
Tip: A transactional KafkaProducer uses the [Producer clientId=[clientId], transactionalId=[transactionalId]] prefix for logging.
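To make the shape of the logging prefix concrete, the following minimal sketch formats it with plain string formatting. The class and method names (LogPrefixSketch, prefix) and the sample values are hypothetical, and the sketch only mimics the prefix quoted above; it is not how KafkaProducer itself builds it.

```java
/** Hypothetical helper that reproduces the prefix format quoted above. */
final class LogPrefixSketch {

    // Fills the two placeholders of the documented prefix with concrete values.
    static String prefix(String clientId, String transactionalId) {
        return String.format("[Producer clientId=%s, transactionalId=%s]", clientId, transactionalId);
    }

    public static void main(String[] args) {
        // Sample values only; any client ID and transactional ID would do.
        System.out.println(prefix("demo", "txId") + " Instantiated a transactional producer.");
    }
}
```

Searching the logs for such a prefix is a quick way to tell the output of several producers apart.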
Tip: You should see one of the following INFO messages in the logs with a transactional KafkaProducer: "Instantiated a transactional producer." or "Instantiated an idempotent producer."
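// Enable TRACE logging level for KafkaProducer (in the logging configuration) before running
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

val props = new java.util.Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
// Use the enclosing object's name (without the trailing $) as the client ID
val clientId = this.getClass.getSimpleName.replace("$", "")
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)
// Setting transactional.id is what makes the producer transactional
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txId")
val producer = new KafkaProducer[String, String](props)
// Observe the logs, then release the producer's resources
producer.close()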
KafkaProducer uses the following configuration properties when creating a TransactionManager: transactional.id, transaction.timeout.ms and retry.backoff.ms.
TransactionManager is transactional when a transactional ID is assigned.
TransactionManager is in one of the following states:
- UNINITIALIZED (when created)
- INITIALIZING
- READY
- IN_TRANSACTION
- COMMITTING_TRANSACTION
- ABORTING_TRANSACTION
- ABORTABLE_ERROR
- FATAL_ERROR
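The states above can be read as a small state machine. The following is a minimal, self-contained sketch of that idea. Only the state names come from the list; the class TxStateSketch, its methods, and above all the transition rules are assumptions chosen to model a plausible commit/abort lifecycle, so they may differ from what TransactionManager actually enforces.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical, simplified model of the states listed above; not Kafka's actual code. */
final class TxStateSketch {

    enum State {
        UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION,
        COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR
    }

    // Assumed rule set: the states from which each target state may be entered.
    static Set<State> allowedSources(State target) {
        switch (target) {
            case INITIALIZING:
                return EnumSet.of(State.UNINITIALIZED);
            case READY:
                return EnumSet.of(State.INITIALIZING, State.COMMITTING_TRANSACTION, State.ABORTING_TRANSACTION);
            case IN_TRANSACTION:
                return EnumSet.of(State.READY);
            case COMMITTING_TRANSACTION:
                return EnumSet.of(State.IN_TRANSACTION);
            case ABORTING_TRANSACTION:
                return EnumSet.of(State.IN_TRANSACTION, State.ABORTABLE_ERROR);
            case ABORTABLE_ERROR:
                return EnumSet.of(State.IN_TRANSACTION, State.COMMITTING_TRANSACTION, State.ABORTABLE_ERROR);
            case FATAL_ERROR:
                return EnumSet.allOf(State.class);   // a fatal error can happen at any point
            default:
                return EnumSet.noneOf(State.class);  // simplification: nothing re-enters UNINITIALIZED
        }
    }

    private State current = State.UNINITIALIZED;  // the state "when created"

    State current() {
        return current;
    }

    // Moves to the target state or fails if the assumed rules do not allow it.
    void transitionTo(State target) {
        if (!allowedSources(target).contains(current)) {
            throw new IllegalStateException("Invalid transition from " + current + " to " + target);
        }
        current = target;
    }

    public static void main(String[] args) {
        TxStateSketch tx = new TxStateSketch();
        tx.transitionTo(State.INITIALIZING);
        tx.transitionTo(State.READY);
        tx.transitionTo(State.IN_TRANSACTION);
        tx.transitionTo(State.COMMITTING_TRANSACTION);
        tx.transitionTo(State.READY);
        System.out.println(tx.current());  // prints READY
    }
}
```

Keeping the rules in a single lookup makes an illegal move (for example, committing before a transaction has begun) fail fast with an IllegalStateException instead of silently corrupting the state.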
| Name | Description |
|---|---|
| | Kafka Node. Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | java.util.PriorityQueue of Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Kafka Node. Used when…FIXME |
beginTransaction Method
void beginTransaction()
beginTransaction…FIXME
Note: beginTransaction is used when…FIXME
Creating TransactionManager Instance
TransactionManager takes the following to be created:
- Transactional ID (as transactional.id)
- transactionTimeoutMs (as transaction.timeout.ms)
- retryBackoffMs (as retry.backoff.ms)
TransactionManager initializes the internal registries and counters.
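As an illustration of where these three values come from, the sketch below collects them into a java.util.Properties object of the kind that is handed to a producer. The helper class TxConfigSketch, its method, and the sample values are hypothetical; only the three property names are taken from the list above.

```java
import java.util.Properties;

/** Hypothetical helper; only the three property names come from the list above. */
final class TxConfigSketch {

    // Builds the subset of producer properties that feed the TransactionManager.
    static Properties transactionalProps(String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
        Properties props = new Properties();
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("transaction.timeout.ms", Integer.toString(transactionTimeoutMs));
        props.setProperty("retry.backoff.ms", Long.toString(retryBackoffMs));
        return props;
    }

    public static void main(String[] args) {
        // Sample values for illustration only.
        Properties props = transactionalProps("txId", 60_000, 100L);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A real producer also needs bootstrap servers and serializers; they are left out here because they are not part of what TransactionManager takes.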
initializeTransactions Method
TransactionalRequestResult initializeTransactions()
initializeTransactions…FIXME
Note: initializeTransactions is used when…FIXME
sendOffsetsToTransaction Method
TransactionalRequestResult sendOffsetsToTransaction(
Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId)
sendOffsetsToTransaction…FIXME
Note: sendOffsetsToTransaction is used when…FIXME
beginCommit Method
TransactionalRequestResult beginCommit()
beginCommit…FIXME
Note: beginCommit is used when…FIXME