StateStoreCoordinator RPC Endpoint — Tracking Locations of StateStores for StateStoreRDD

StateStoreCoordinator keeps track of state stores on Spark executors (per host and executor ID).

StateStoreCoordinator is used by StateStoreRDD when requested for the preferred locations of its partitions (based on where the corresponding state stores are located).
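
StateStoreRDD can only prefer an executor that the coordinator already knows about. Each partition therefore gets at most one preferred location, and none when its store has not been reported yet. The following is a minimal Python sketch of that idea, not Spark's code. `ProviderId`, `Coordinator` and `preferred_locations` are hypothetical stand-ins for StateStoreProviderId, the coordinator reference and the RDD's location lookup. The provider ID fields (checkpoint location, operator ID, partition index, query run ID) and the `executor_<host>_<executorId>` location string are assumptions of this sketch.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class ProviderId:
    """Hypothetical stand-in for StateStoreProviderId (fields are assumptions)."""
    checkpoint_location: str
    operator_id: int
    partition_index: int
    query_run_id: str


class Coordinator:
    """Hypothetical in-process stand-in for the coordinator's GetLocation reply."""

    def __init__(self) -> None:
        # provider id -> (host, executor id), like the instances registry
        self.instances: Dict[ProviderId, Tuple[str, str]] = {}

    def get_location(self, provider_id: ProviderId) -> Optional[str]:
        location = self.instances.get(provider_id)
        if location is None:
            return None  # store not reported (yet)
        host, executor_id = location
        # Assumed string form of an executor cache location.
        return f"executor_{host}_{executor_id}"


def preferred_locations(coordinator: Optional[Coordinator], checkpoint_location: str,
                        operator_id: int, partition_index: int,
                        query_run_id: str) -> List[str]:
    """At most one preferred location per partition; empty if unknown."""
    if coordinator is None:  # no coordinator reference available
        return []
    provider_id = ProviderId(checkpoint_location, operator_id, partition_index, query_run_id)
    location = coordinator.get_location(provider_id)
    return [location] if location is not None else []


coord = Coordinator()
coord.instances[ProviderId("/ckpt", 0, 3, "run-1")] = ("host-a", "7")
print(preferred_locations(coord, "/ckpt", 0, 3, "run-1"))  # ['executor_host-a_7']
print(preferred_locations(coord, "/ckpt", 0, 4, "run-1"))  # []
```

A preference is only a hint to the scheduler. An empty list simply means the task may run anywhere.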

StateStoreCoordinator is a ThreadSafeRpcEndpoint RPC endpoint that manipulates the instances internal registry in response to RPC messages.
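
Being a ThreadSafeRpcEndpoint means the endpoint finishes processing one message before it starts the next. That is what lets the instances registry be an ordinary, unsynchronized map. The Python sketch below only imitates that guarantee: a single worker thread drains a FIFO mailbox. `SerialEndpoint`, `send` (one-way) and `ask` (request-reply) are hypothetical names, not Spark's RpcEnv API.

```python
import queue
import threading
from concurrent.futures import Future
from typing import Any, Callable, Dict


class SerialEndpoint:
    """Hypothetical endpoint: handlers run strictly one after another on one
    worker thread, so self.instances is never touched concurrently."""

    def __init__(self) -> None:
        self.instances: Dict[str, str] = {}
        self._mailbox = queue.Queue()  # FIFO of (handler, reply-future-or-None)
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self) -> None:
        while True:
            handler, reply = self._mailbox.get()
            if handler is None:  # poison pill: stop processing messages
                break
            try:
                result = handler()
            except Exception as exc:
                if reply is not None:
                    reply.set_exception(exc)
                continue  # one-way failures are simply dropped in this sketch
            if reply is not None:
                reply.set_result(result)

    def send(self, handler: Callable[[], Any]) -> None:
        """One-way (fire-and-forget) message."""
        self._mailbox.put((handler, None))

    def ask(self, handler: Callable[[], Any], timeout: float = 5.0) -> Any:
        """Request-reply message; blocks until the handler has run."""
        reply: Future = Future()
        self._mailbox.put((handler, reply))
        return reply.result(timeout=timeout)

    def stop(self) -> None:
        self._mailbox.put((None, None))
        self._worker.join()


ep = SerialEndpoint()
ep.send(lambda: ep.instances.__setitem__("store-0", "executor_host-a_1"))
print(ep.ask(lambda: ep.instances.get("store-0")))  # executor_host-a_1
ep.stop()
```

Because the mailbox is FIFO and has a single consumer, the one-way update is guaranteed to be applied before the later request is answered.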

Table 1. StateStoreCoordinator RPC Endpoint’s Messages and Message Handlers
Message | Message Handler

DeactivateInstances

Removes the StateStoreProviderIds of a streaming query run (given runId)

Internally, StateStoreCoordinator finds the StateStoreProviderIds whose queryRunId is the given runId and removes them from the instances internal registry.

StateStoreCoordinator prints out the following DEBUG message to the logs:

Deactivating instances related to checkpoint location [runId]: [storeIdsToRemove]

GetLocation

Replies with the location of the given StateStoreProviderId (looked up in instances), i.e. the host and the ID of an executor on that host, if the store is registered.

You should see the following DEBUG message in the logs:

Got location of the state store [id]: [executorId]

ReportActiveInstance

One-way asynchronous (fire-and-forget) message to register a new StateStoreProviderId on an executor (given host and executorId).

Sent out exclusively when a StateStoreCoordinatorRef (the RPC endpoint reference) is requested to reportActiveInstance. That happens when the StateStore utility is requested to look up a StateStore by provider ID, right after the StateStore and its StateStoreProvider have been created and initialized.

Internally, StateStoreCoordinator prints out the following DEBUG message to the logs:

Reported state store [id] is active at [executorId]

In the end, StateStoreCoordinator adds the StateStoreProviderId to the instances internal registry.

StopCoordinator

Stops the StateStoreCoordinator RPC endpoint

You should see the following DEBUG message in the logs:

StateStoreCoordinator stopped

VerifyIfInstanceActive

Verifies whether the given StateStoreProviderId is registered (in instances) on the given executorId

You should see the following DEBUG message in the logs:

Verified that state store [id] is active: [response]
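
The handlers in Table 1 all boil down to simple operations on the instances registry. The Python sketch below covers the four registry messages (StopCoordinator is left out) and emits DEBUG lines worded after the messages above. It is an illustration, not Spark's implementation. All class names are hypothetical, and the `executor_<host>_<executorId>` location string is assumed. The source only says that ReportActiveInstance is one-way. Treating the other three as request-reply, and replying `True` to DeactivateInstances, are assumptions of this sketch.

```python
import logging
from dataclasses import dataclass
from typing import Dict, Optional

log = logging.getLogger("StateStoreCoordinatorSketch")


@dataclass(frozen=True)
class ProviderId:
    """Hypothetical stand-in for StateStoreProviderId."""
    store: str          # store identity, simplified to a string (assumption)
    query_run_id: str


@dataclass(frozen=True)
class Location:
    """Hypothetical stand-in for ExecutorCacheTaskLocation."""
    host: str
    executor_id: str

    def __str__(self) -> str:
        return f"executor_{self.host}_{self.executor_id}"  # assumed format


# Hypothetical message types named after the table entries.
@dataclass(frozen=True)
class ReportActiveInstance:
    provider_id: ProviderId
    host: str
    executor_id: str


@dataclass(frozen=True)
class GetLocation:
    provider_id: ProviderId


@dataclass(frozen=True)
class VerifyIfInstanceActive:
    provider_id: ProviderId
    executor_id: str


@dataclass(frozen=True)
class DeactivateInstances:
    run_id: str


class CoordinatorSketch:
    def __init__(self) -> None:
        self.instances: Dict[ProviderId, Location] = {}

    def receive(self, msg) -> None:
        """One-way (fire-and-forget) messages: nothing is sent back."""
        if isinstance(msg, ReportActiveInstance):
            log.debug("Reported state store %s is active at %s", msg.provider_id, msg.executor_id)
            self.instances[msg.provider_id] = Location(msg.host, msg.executor_id)
        else:
            raise ValueError(f"unexpected one-way message: {msg!r}")

    def receive_and_reply(self, msg):
        """Request-reply messages: the return value plays the role of the reply."""
        if isinstance(msg, GetLocation):
            loc = self.instances.get(msg.provider_id)
            location: Optional[str] = str(loc) if loc is not None else None
            log.debug("Got location of the state store %s: %s", msg.provider_id, location)
            return location
        if isinstance(msg, VerifyIfInstanceActive):
            loc = self.instances.get(msg.provider_id)
            response = loc is not None and loc.executor_id == msg.executor_id
            log.debug("Verified that state store %s is active: %s", msg.provider_id, response)
            return response
        if isinstance(msg, DeactivateInstances):
            # Collect first, then delete, so the dict is not mutated while iterating.
            to_remove = [pid for pid in self.instances if pid.query_run_id == msg.run_id]
            for pid in to_remove:
                del self.instances[pid]
            log.debug("Deactivating instances related to checkpoint location %s: %s",
                      msg.run_id, ", ".join(str(pid) for pid in to_remove))
            return True  # acknowledgement value is an assumption of this sketch
        raise ValueError(f"unexpected request: {msg!r}")
```

Enable DEBUG for the `StateStoreCoordinatorSketch` logger (for example with `logging.basicConfig(level=logging.DEBUG)`) to see the messages.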
Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.state.StateStoreCoordinator=ALL

Refer to Logging.

instances Internal Registry

instances: HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]

instances is an internal registry of the locations of StateStoreProviders (as ExecutorCacheTaskLocations, i.e. a host and an executorId), keyed by their StateStoreProviderIds.
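
instances is a map, so each StateStoreProviderId has exactly one location at a time. A later report for the same ID replaces the earlier location, while the same store reported under a different query run is a separate entry. The Python sketch below illustrates these map semantics. The fields of `StoreId` and `ProviderId` are assumptions (the text above only implies that a provider ID carries a query run ID). `register` is a hypothetical helper standing in for the ReportActiveInstance handler.

```python
import uuid
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class StoreId:
    """Hypothetical store identity (field names are assumptions)."""
    checkpoint_root: str
    operator_id: int
    partition_id: int


@dataclass(frozen=True)
class ProviderId:
    """Hypothetical stand-in for StateStoreProviderId: store identity + query run ID."""
    store_id: StoreId
    query_run_id: uuid.UUID


@dataclass(frozen=True)
class ExecutorLocation:
    """Hypothetical stand-in for ExecutorCacheTaskLocation."""
    host: str
    executor_id: str


def register(instances: Dict[ProviderId, ExecutorLocation],
             provider_id: ProviderId, host: str, executor_id: str) -> None:
    """Hypothetical helper: record (or overwrite) where a provider lives."""
    instances[provider_id] = ExecutorLocation(host, executor_id)


instances: Dict[ProviderId, ExecutorLocation] = {}
store = StoreId("/checkpoint/state", 0, 5)
run1, run2 = uuid.uuid4(), uuid.uuid4()

register(instances, ProviderId(store, run1), "host-a", "1")
register(instances, ProviderId(store, run1), "host-b", "4")  # same key: replaced
register(instances, ProviderId(store, run2), "host-a", "1")  # new run: new key
print(len(instances))  # 2
```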
