Configuration Properties

Table 1. Configuration Properties
Name Description

application.id

Application ID that is the required identifier of a Kafka Streams stream processing application

Default: (empty)

application.id must be unique within the Kafka cluster as it is used as a namespace for the default client-id prefix, the group-id for membership management, and the prefix for internal topics (that Kafka Streams creates internally).

application.id is used as a Kafka consumer group ID (that you can check out using kafka-consumer-groups.sh script)

Use StreamsConfig.APPLICATION_ID_CONFIG to reference the property.

application.server

A host:port pair of a user-defined endpoint of a single instance in multi-instance Kafka Streams application that can be used for discovering the locations of local state stores

Default: (empty)

Used exclusively when StreamPartitionAssignor is requested to configure itself.

Use StreamsConfig.APPLICATION_SERVER_CONFIG to reference the property.

buffered.records.per.partition

Maximum number of records to buffer per partition

Default: 1000

Used exclusively when StreamTask is requested to process a single record and buffer new records to resume and pause a partition, respectively

Use StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG to reference the property.

cache.max.bytes.buffering

Maximum number of memory bytes to be used for buffering across all threads (StreamThreads and GlobalStreamThread)

Default: 10 * 1024 * 1024L (i.e. 10MB)

Use StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to reference the property.

client.id

Client ID of internal consumer, producer and restore-consumer, with pattern <client.id>-StreamThread-<threadSequenceNumber>-<consumer,producer,restore-consumer>

Default: (empty)

Used when:

  • KafkaStreams is created (and initializes the clientId internal property)

  • StreamsPartitionAssignor is requested to configure

Use StreamsConfig.CLIENT_ID_CONFIG to reference the property.

commit.interval.ms

Flush interval or commit interval (in millis), i.e. how often StreamThread attempts to commit the position of processors and flush state stores

Default:

Must be at least 0L

Use StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to reference the property.

default.windowed.key.serde.inner

Inner serde class that implements the org.apache.kafka.common.serialization.Serde interface

Default: (empty)

default.windowed.value.serde.inner

Inner serde class that implements the org.apache.kafka.common.serialization.Serde interface

Default: (empty)

default.timestamp.extractor

Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface.

max.task.idle.ms

How long a stream task will stay idle when not all of its partition buffers contain records, to avoid potential out-of-order record processing across multiple input streams.

Default: 0L

Use StreamsConfig.MAX_TASK_IDLE_MS_CONFIG to reference the property.

metrics.recording.level

The highest recording level for metrics

Default: INFO

Possible values are:

  • INFO

  • DEBUG

Use StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG to reference the property.

num.standby.replicas

The number of standby replicas per processing task

Default: 0

  • Used when StreamsPartitionAssignor is requested to configure

Use StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG to reference the property.

num.stream.threads

The number of stream processor threads (that KafkaStreams uses for stream processing)

Default: 1

Use StreamsConfig.NUM_STREAM_THREADS_CONFIG to reference the property.

partition.grouper

poll.ms

Polling interval (in milliseconds), i.e. the time spent waiting in Consumer.poll (unless data is available in the buffer already). If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.

Default: 100

Used when:

Use StreamsConfig.POLL_MS_CONFIG to reference the property.

processing.guarantee

Processing guarantee (aka Exactly-Once Support or EOS support)

Default: at_least_once

Possible values are:

  • at_least_once

  • exactly_once

Note

exactly-once processing guarantee requires a Kafka cluster with at least three brokers (which is the recommended setting for production).

For development you can change this by adjusting broker setting transaction.state.log.replication.factor.

Use StreamsConfig.PROCESSING_GUARANTEE_CONFIG to reference the property.

replication.factor

The replication factor for changelog topics and repartition topics created by a stream processing application

Default: 1

Use StreamsConfig.REPLICATION_FACTOR_CONFIG to reference the property.

state.cleanup.delay.ms

The amount of time (in milliseconds) to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least state.cleanup.delay.ms will be removed.

Default: 10 * 60 * 1000 (i.e. 10 mins)

Used exclusively when KafkaStreams is created

Use StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG to reference the property.

state.dir

Path to the base directory for a state storage

Default: /tmp/kafka-streams

Used when StateDirectory is created

Use StreamsConfig.STATE_DIR_CONFIG to reference the property.

windowstore.changelog.additional.retention.ms

Added to a Window maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.

Default: 24 * 60 * 60 * 1000L (i.e. 1 day)

results matching ""

    No results matching ""