HeartbeatThread is a daemon Kafka thread that AbstractCoordinator starts to send HeartbeatRequests to the group coordinator (aka heartbeating) regularly, every heartbeat.interval.ms (default: 3000 ms).

HeartbeatThread is created and immediately started by AbstractCoordinator only, when it is requested to start a heartbeat thread (which happens when ConsumerCoordinator is requested to poll for coordinator events after KafkaConsumer was requested to poll for records).

HeartbeatThread uses the kafka-coordinator-heartbeat-thread prefix, followed by " | " and the group ID, as the thread name.

kafka-coordinator-heartbeat-thread | [groupId]
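
The naming scheme above can be sketched as follows. HeartbeatThreadName and its of method are hypothetical helpers written for illustration, not Kafka classes; only the prefix and the " | " separator come from the text above.

```java
// Hypothetical helper (not part of Kafka): builds the thread name described above.
final class HeartbeatThreadName {
    static final String PREFIX = "kafka-coordinator-heartbeat-thread";

    // Joins the prefix and the group ID with the " | " separator.
    static String of(String groupId) {
        return PREFIX + " | " + groupId;
    }

    public static void main(String[] args) {
        // A daemon thread carrying the generated name, similar in spirit to HeartbeatThread.
        Thread thread = new Thread(() -> { }, of("my-group"));
        thread.setDaemon(true);
        System.out.println(thread.getName());
    }
}
```

Such a name makes the heartbeat thread of a given consumer group easy to spot in a thread dump.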

HeartbeatThread uses the closed flag to indicate that run should exit its loop (FIXME: other uses of the flag are yet to be described).

HeartbeatThread uses the enabled flag to decide whether to keep heartbeating (true) or to wait until AbstractCoordinator notifies (releases) it (false). The flag is off (false) initially and can be turned on (true) and off by enable and disable, respectively.

HeartbeatThread takes no arguments to be created.

Starting Thread — run Method

void run()

run is part of the java.lang.Runnable contract and is executed when HeartbeatThread is started as a separate thread.

run prints out the following DEBUG message to the logs:

Heartbeat thread started

In an infinite while loop, run does the following:

  1. Checks the internal state to exit or wait:

    1. Exits when closed

    2. Waits for AbstractCoordinator to wake it up when not enabled

  2. Requests the ConsumerNetworkClient to pollNoWakeup

  3. Sends a HeartbeatRequest to the group coordinator

  4. (when the response eventually arrives) Requests the Heartbeat to record a success (receiveHeartbeat) or a failure (failHeartbeat)

In the end, when the infinite while loop is terminated for any reason, run prints out the following DEBUG message to the logs:

Heartbeat thread has closed

run also performs other state checks that are not covered here.
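
The loop described in this section can be approximated with plain Java monitors. This is a simplified sketch and not Kafka's code: HeartbeatLoopSketch, its lock object (standing in for the AbstractCoordinator monitor), the BooleanSupplier used as a fake heartbeat sender, and the demo method are all assumptions made for illustration. Steps 2 to 4 are collapsed into a single synchronous call, whereas the real response arrives asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.BooleanSupplier;

// Simplified stand-in for the loop described above. None of these types are Kafka's.
final class HeartbeatLoopSketch implements Runnable {
    private final Object lock = new Object();      // plays the role of the coordinator's monitor
    private final List<String> events = new ArrayList<>();
    private final CountDownLatch firstHeartbeat = new CountDownLatch(1);
    private final BooleanSupplier sendHeartbeat;   // hypothetical sender: true = success, false = failure
    private final long intervalMs;
    private boolean enabled = false;               // off initially
    private boolean closed = false;

    HeartbeatLoopSketch(BooleanSupplier sendHeartbeat, long intervalMs) {
        this.sendHeartbeat = sendHeartbeat;
        this.intervalMs = intervalMs;
    }

    void enable() { synchronized (lock) { enabled = true; lock.notify(); } }
    void close()  { synchronized (lock) { closed = true; lock.notify(); } }

    @Override
    public void run() {
        try {
            record("Heartbeat thread started");
            while (true) {
                synchronized (lock) {
                    if (closed) return;            // step 1.1: exit when closed
                    if (!enabled) {                // step 1.2: wait to be woken up
                        lock.wait();
                        continue;                  // re-check the flags after waking up
                    }
                    // Steps 2-4, collapsed: "send" a heartbeat and record the outcome.
                    record(sendHeartbeat.getAsBoolean() ? "receiveHeartbeat" : "failHeartbeat");
                    firstHeartbeat.countDown();
                    lock.wait(intervalMs);         // pause until the next heartbeat is due or a notify arrives
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            record("Heartbeat thread has closed");
        }
    }

    private void record(String event) { synchronized (lock) { events.add(event); } }

    // Runs the loop on a daemon thread until at least one heartbeat was sent, then closes it.
    static List<String> demo() {
        HeartbeatLoopSketch loop = new HeartbeatLoopSketch(() -> true, 10);
        Thread thread = new Thread(loop, "heartbeat-loop-sketch");
        thread.setDaemon(true);
        thread.start();
        loop.enable();
        try {
            loop.firstHeartbeat.await();
            loop.close();
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (loop.lock) { return new ArrayList<>(loop.events); }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Checking the flags and calling wait under the same lock is what prevents a lost wake-up: enable and close cannot slip in between the check and the wait.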

Disabling HeartbeatThread — disable Method

void disable()

disable prints out the following DEBUG message to the logs and turns the enabled flag off (false).

Disabling heartbeat thread

disable is used when:

Enabling HeartbeatThread — enable Method

void enable()

enable prints out the following DEBUG message to the logs:

Enabling heartbeat thread

enable turns the enabled flag on (true) and requests the Heartbeat to resetTimeouts.

In the end, enable calls notify on the owning AbstractCoordinator, which wakes up the heartbeat thread if it is waiting on the coordinator.

enable is used exclusively when AbstractCoordinator is requested to initiateJoinGroup.
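
The interplay of enable and disable with the enabled flag can be sketched as below. EnableDisableSketch, HeartbeatStub and coordinatorMonitor are hypothetical stand-ins (for HeartbeatThread, Kafka's Heartbeat and the AbstractCoordinator instance, respectively); the use of a shared monitor for the notification is an assumption of this sketch.

```java
// Hypothetical stand-ins; only the shape of enable and disable follows the text above.
final class EnableDisableSketch {
    // Minimal stand-in for Kafka's Heartbeat: it only counts how often timeouts were reset.
    static final class HeartbeatStub {
        private int resets = 0;
        void resetTimeouts() { resets++; }
        int resets() { return resets; }
    }

    private final Object coordinatorMonitor = new Object(); // stands in for the AbstractCoordinator instance
    private final HeartbeatStub heartbeat = new HeartbeatStub();
    private boolean enabled = false;                         // off (false) initially

    void enable() {
        synchronized (coordinatorMonitor) {
            enabled = true;                  // turn the flag on
            heartbeat.resetTimeouts();       // start timing from a clean slate
            coordinatorMonitor.notify();     // wake up a thread waiting on the monitor, if any
        }
    }

    void disable() {
        synchronized (coordinatorMonitor) {
            enabled = false;                 // the loop goes back to waiting on its next iteration
        }
    }

    boolean isEnabled() {
        synchronized (coordinatorMonitor) { return enabled; }
    }

    HeartbeatStub heartbeat() { return heartbeat; }

    public static void main(String[] args) {
        EnableDisableSketch sketch = new EnableDisableSketch();
        sketch.enable();
        System.out.println("enabled=" + sketch.isEnabled() + ", resets=" + sketch.heartbeat().resets());
        sketch.disable();
        System.out.println("enabled=" + sketch.isEnabled());
    }
}
```

Note that disable does not need to notify anyone: a running loop simply observes the flag the next time it checks its state.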
