ThreadCache

ThreadCache is created when:

Tip

Enable ALL logging level for org.apache.kafka.streams.state.internals.ThreadCache logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.state.internals.ThreadCache=ALL

Creating ThreadCache Instance

ThreadCache takes the following to be created:

ThreadCache initializes the internal properties.

StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG Configuration Property

When created, ThreadCache is given the maxCacheSizeBytes that is configured using StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG configuration property.

The maxCacheSizeBytes is used when ThreadCache is requested to maybeEvict (when requested to put and putIfAbsent).

Building Cache Namespace — nameSpaceFromTaskIdAndStore Static Method

String nameSpaceFromTaskIdAndStore(
  String taskIDString,
  String underlyingStoreName)

nameSpaceFromTaskIdAndStore builds a cache namespace to be of the format:

[taskIDString]-[underlyingStoreName]
Note
nameSpaceFromTaskIdAndStore is used exclusively when CachingKeyValueStore is requested to initialize (via initInternal).

Registering DirtyEntryFlushListener For Namespace — addDirtyEntryFlushListener Method

void addDirtyEntryFlushListener(
  String namespace,
  DirtyEntryFlushListener listener)

addDirtyEntryFlushListener gets or creates a new named cache (by the namespace) and requests it to set the flush listener with the DirtyEntryFlushListener.

Note
addDirtyEntryFlushListener is used when the state stores with caching enabled (i.e. CachingKeyValueStore, CachingSessionStore, and CachingWindowStore) are requested to initialize.

Closing NamedCache by Namespace — close Method

void close(String namespace)

close…​FIXME

Note
close is used when…​FIXME

Putting Key And Value Into Namespace — put Method

void put(
  String namespace,
  Bytes key,
  LRUCacheEntry value)

put…​FIXME

Note
put is used when…​FIXME

putIfAbsent Method

LRUCacheEntry putIfAbsent(
  String namespace,
  Bytes key,
  LRUCacheEntry value)

putIfAbsent…​FIXME

Note
putIfAbsent is used when…​FIXME

flush Method

void flush(String namespace)

flush…​FIXME

Note
flush is used when…​FIXME

Getting Or Creating New Named Cache by Namespace — getOrCreateCache Internal Method

NamedCache getOrCreateCache(String name)

getOrCreateCache…​FIXME

Note
getOrCreateCache is used when ThreadCache is requested to addDirtyEntryFlushListener, put a key and a value into a namespace, putIfAbsent, and maybeEvict.

maybeEvict Internal Method

void maybeEvict(String namespace)

maybeEvict…​FIXME

Note
maybeEvict is used when ThreadCache is requested to put and putIfAbsent.

sizeBytes Method

long sizeBytes()

sizeBytes…​FIXME

In the end, sizeBytes prints out the following TRACE message to the logs:

Evicted [numEvicted] entries from cache [namespace]
Note
sizeBytes is used exclusively when ThreadCache is requested to maybeEvict (when requested to put and putIfAbsent).

Internal Properties

Name Description

caches

Collection of NamedCaches by namespace (Map<String, NamedCache>)

Used when…​FIXME

numPuts

Number of put and putIfAbsent

Default: 0

Used exclusively in flush to print the current value with TRACE logging level enabled

results matching ""

    No results matching ""