StreamsConfig — Configuration Properties for Kafka Clients

StreamsConfig is an Apache Kafka AbstractConfig with the configuration properties for a Kafka Streams application.

StreamsConfig is used to reference property names through constants (e.g. to avoid typos and for better type safety).

import org.apache.kafka.streams.StreamsConfig
val conf = new java.util.Properties()
conf.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId")
conf.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092,localhost:9192")

StreamsConfig uses the admin. prefix for the configuration properties that are meant to be used for a Kafka AdminClient. Use the adminClientPrefix method to add the prefix to an AdminClient property.

import org.apache.kafka.clients.admin.AdminClientConfig
scala> println(AdminClientConfig.RETRIES_CONFIG)
retries

import org.apache.kafka.streams.StreamsConfig.adminClientPrefix
scala> println(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG))
admin.retries

Table 1. Constants for Configuration Properties

Name                                                    Property
APPLICATION_ID_CONFIG                                   application.id
APPLICATION_SERVER_CONFIG                               application.server
BOOTSTRAP_SERVERS_CONFIG                                bootstrap.servers (Kafka client's property)
BUFFERED_RECORDS_PER_PARTITION_CONFIG                   buffered.records.per.partition
CACHE_MAX_BYTES_BUFFERING_CONFIG                        cache.max.bytes.buffering
CLIENT_ID_CONFIG                                        client.id
COMMIT_INTERVAL_MS_CONFIG                               commit.interval.ms
DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS                  default.windowed.key.serde.inner
DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS                default.windowed.value.serde.inner
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG                default.timestamp.extractor
MAX_TASK_IDLE_MS_CONFIG                                 max.task.idle.ms
METRICS_RECORDING_LEVEL_CONFIG                          metrics.recording.level
NUM_STANDBY_REPLICAS_CONFIG                             num.standby.replicas
NUM_STREAM_THREADS_CONFIG                               num.stream.threads
PARTITION_GROUPER_CLASS_CONFIG                          partition.grouper
POLL_MS_CONFIG                                          poll.ms
PROCESSING_GUARANTEE_CONFIG                             processing.guarantee
REPLICATION_FACTOR_CONFIG                               replication.factor
STATE_CLEANUP_DELAY_MS_CONFIG                           state.cleanup.delay.ms
STATE_DIR_CONFIG                                        state.dir
UPGRADE_FROM_CONFIG                                     upgrade.from
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG  windowstore.changelog.additional.retention.ms

StreamsConfig is also used to create the configuration of the Kafka clients that Kafka Streams uses under the covers (admin client, consumers and producer), e.g. for the main consumer:

import org.apache.kafka.streams.StreamsConfig
val props = new java.util.Properties()
// ...
val conf = new StreamsConfig(props)

// Consumer Properties
val consumerConfigs = conf.getMainConsumerConfigs("groupId", "clientId")
import collection.JavaConverters._
scala> consumerConfigs.asScala.map { case (key, value) => s"$key -> $value" }.foreach(println)
replication.factor -> 1
num.standby.replicas -> 0
max.poll.records -> 1000
group.id -> groupId
partition.assignment.strategy -> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor
bootstrap.servers -> localhost:8082
enable.auto.commit -> false
admin.retries -> 5
application.server ->
max.poll.interval.ms -> 2147483647
auto.offset.reset -> earliest
windowstore.changelog.additional.retention.ms -> 86400000
internal.leave.group.on.close -> false
application.id -> groupId
client.id -> clientId-consumer

StreamsConfig does not allow users to override certain Kafka configurations (e.g. for the consumer). Such user-provided values are simply removed, with a WARN message in the logs.

Table 2. Kafka Consumer Non-Overridable Configurations

Name                            Value              Description
max.poll.records                1000
auto.offset.reset               earliest
buffered.records.per.partition  1000               The maximum number of records to buffer per partition
enable.auto.commit              false
internal.leave.group.on.close   false
max.poll.interval.ms            Integer.MAX_VALUE

StreamsConfig uses the consumer. prefix for custom Kafka configurations of a Kafka consumer.

StreamsConfig defines the InternalConfig inner class with the internal properties.

Note
InternalStreamsConfig is an extension of StreamsConfig that is used in StreamsPartitionAssignor to specify custom configuration properties and turn the doLog flag off.

StreamsConfig.EXACTLY_ONCE for Exactly-Once Support (EOS)

String EXACTLY_ONCE = "exactly_once"

StreamsConfig defines the StreamsConfig.EXACTLY_ONCE (exactly_once) constant, i.e. the value of the processing.guarantee configuration property that turns the eosEnabled internal flag on.

postProcessParsedConfig Method

Map<String, Object> postProcessParsedConfig(
  Map<String, Object> parsedValues)

postProcessParsedConfig…​FIXME

Note
postProcessParsedConfig is used when…​FIXME

eosEnabled Internal Flag for Exactly-Once Support (EOS)

StreamsConfig defines the eosEnabled internal flag that is enabled (true) when StreamsConfig.PROCESSING_GUARANTEE_CONFIG is EXACTLY_ONCE.
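
The rule above can be modelled without any Kafka dependency. The following is only a minimal sketch: the EosFlagSketch object and its member names are hypothetical, and only the two string values (processing.guarantee and exactly_once) come from the text above.

```scala
// Hypothetical stand-alone model of the eosEnabled rule (not the Kafka Streams code).
object EosFlagSketch {
  // String values as named in this document.
  val ProcessingGuaranteeConfig = "processing.guarantee"
  val ExactlyOnce = "exactly_once"

  // True only when processing.guarantee is set to exactly_once.
  def eosEnabled(props: Map[String, String]): Boolean =
    props.get(ProcessingGuaranteeConfig).contains(ExactlyOnce)
}
```

Any other value of processing.guarantee, as well as a missing property, leaves the flag off in this model.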

eosEnabled is used for the following:

  • …​FIXME

getProducerConfigs Method

Map<String, Object> getProducerConfigs(final String clientId)

getProducerConfigs…​FIXME

Note
getProducerConfigs is used when…​FIXME

Getting Configuration for (Creating) Kafka AdminClient — getAdminConfigs Method

Map<String, Object> getAdminConfigs(final String clientId)

getAdminConfigs first finds the client properties for a Kafka AdminClient (with the admin. prefix).

getAdminConfigs takes the getClientCustomProps and copies the AdminClient properties over.

In the end, getAdminConfigs adds the clientId with -admin suffix as the client.id configuration property.

import org.apache.kafka.streams.StreamsConfig
val props = new java.util.Properties()
// required configurations
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")

// Define a custom configuration with admin prefix
import org.apache.kafka.clients.admin.AdminClientConfig
props.put("admin." + AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
val streamsConf = new StreamsConfig(props)
val adminConfigs = streamsConf.getAdminConfigs("my-client-id")

import scala.collection.JavaConverters._
scala> adminConfigs.asScala.map { case (k,v) => s"$k = $v" }.foreach(println)
bootstrap.servers = :9092
metadata.max.age.ms = 10
client.id = my-client-id-admin

Note
getAdminConfigs is used when:

Collecting Properties per Key — clientProps Internal Method

Map<String, Object> clientProps(
  final Set<String> configNames,
  final Map<String, Object> originals)

clientProps collects the configuration properties from originals whose names are listed in the input configNames.

Note
clientProps is used exclusively when StreamsConfig is requested to getClientPropsWithPrefix.
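
As an illustration of the filtering described above, here is a minimal sketch over immutable Scala maps. It is an assumption-laden model, not the StreamsConfig source: the ClientPropsSketch object is hypothetical and the real method works on java.util collections.

```scala
// Hypothetical model of clientProps using Scala collections instead of java.util ones.
object ClientPropsSketch {
  // Keep only the entries of `originals` whose keys are listed in `configNames`.
  def clientProps(
      configNames: Set[String],
      originals: Map[String, Any]): Map[String, Any] =
    originals.filter { case (name, _) => configNames.contains(name) }
}
```

Names in configNames that are absent from originals are simply not part of the result.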

Getting Subset of User Configuration by Given Names and Prefix — getClientPropsWithPrefix Internal Method

Map<String, Object> getClientPropsWithPrefix(
  final String prefix,
  final Set<String> configNames)

getClientPropsWithPrefix takes only the properties (as passed in by a user) that have their keys in configNames and adds all properties with the given prefix.

Internally, getClientPropsWithPrefix collects the configuration properties from the original values of Kafka properties as passed in by a user that have their names in configNames.

getClientPropsWithPrefix then copies all original settings with the given prefix to the collected properties (stripping the prefix first and possibly overwriting properties collected in the previous step).

Note
getClientPropsWithPrefix uses AbstractConfig.originals to get the original values of Kafka properties as passed in by the user.
Note
getClientPropsWithPrefix is used when StreamsConfig is requested for getAdminConfigs, getCommonConsumerConfigs, getMainConsumerConfigs and getProducerConfigs.
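
The two steps described above (collect by name, then overlay the prefixed properties with the prefix stripped) can be sketched as follows. This is a simplified model under stated assumptions: the PrefixedPropsSketch object is hypothetical, originals is passed in explicitly (the real method reads it from AbstractConfig.originals), and immutable Scala maps stand in for java.util ones.

```scala
// Hypothetical model of getClientPropsWithPrefix (not the StreamsConfig source).
object PrefixedPropsSketch {

  // Entries of `originals` whose key starts with `prefix`, re-keyed without the prefix.
  def originalsWithPrefix(
      originals: Map[String, Any],
      prefix: String): Map[String, Any] =
    originals.collect {
      case (name, value) if name.startsWith(prefix) && name.length > prefix.length =>
        name.stripPrefix(prefix) -> value
    }

  def getClientPropsWithPrefix(
      originals: Map[String, Any],
      prefix: String,
      configNames: Set[String]): Map[String, Any] = {
    // Step 1: keep the unprefixed properties listed in configNames.
    val byName = originals.filter { case (name, _) => configNames.contains(name) }
    // Step 2: add the prefixed properties; on a key clash the prefixed value wins.
    byName ++ originalsWithPrefix(originals, prefix)
  }
}
```

In this model, when a user sets both max.poll.records and consumer.max.poll.records, the value given with the consumer. prefix is the one that ends up in the result.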

Getting Common Consumer Configuration — getCommonConsumerConfigs Internal Method

Map<String, Object> getCommonConsumerConfigs()

getCommonConsumerConfigs gets a subset of user configuration for a Kafka consumer as well as the properties with the consumer. prefix.

Note
getCommonConsumerConfigs uses ConsumerConfig.configNames for the list of the Kafka Consumer-specific configuration keys.
Caution
FIXME
Note
getCommonConsumerConfigs is used when StreamsConfig is requested for getMainConsumerConfigs and getRestoreConsumerConfigs.

Removing "Illegal" User-Defined Configuration Properties — checkIfUnexpectedUserSpecifiedConsumerConfig Internal Method

void checkIfUnexpectedUserSpecifiedConsumerConfig(
  Map<String, Object> clientProvidedProps,
  String[] nonConfigurableConfigs)

checkIfUnexpectedUserSpecifiedConsumerConfig removes non-configurable configuration properties (nonConfigurableConfigs) from user-defined configurations (clientProvidedProps) and logs a warning for every violation.

Internally, checkIfUnexpectedUserSpecifiedConsumerConfig iterates over nonConfigurableConfigs…​FIXME

Note
checkIfUnexpectedUserSpecifiedConsumerConfig is used when StreamsConfig is requested for getCommonConsumerConfigs and getProducerConfigs.
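
The remove-and-warn behaviour described above might look like the sketch below. Everything in it is illustrative: the NonConfigurableSketch object, the dropNonConfigurable name and the warning text are hypothetical, and the sketch returns a cleaned copy plus the warnings, whereas the real method returns void and so presumably changes clientProvidedProps in place and writes to the log.

```scala
// Hypothetical model of dropping non-configurable properties (not the StreamsConfig source).
object NonConfigurableSketch {
  // Returns the cleaned properties together with the warnings that would be logged.
  def dropNonConfigurable(
      clientProvidedProps: Map[String, Any],
      nonConfigurableConfigs: Seq[String]): (Map[String, Any], Seq[String]) = {
    // A violation is a non-configurable name that the user has set anyway.
    val violations = nonConfigurableConfigs.filter(clientProvidedProps.contains)
    val warnings = violations.map { name =>
      s"Unexpected user-specified config $name=${clientProvidedProps(name)} is ignored"
    }
    (clientProvidedProps -- violations, warnings)
  }
}
```

Returning the warnings instead of logging them keeps the sketch free of a logging dependency and easy to check in a REPL.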

getRestoreConsumerConfigs Method

Map<String, Object> getRestoreConsumerConfigs(final String clientId)

getRestoreConsumerConfigs…​FIXME

Note
getRestoreConsumerConfigs is used when…​FIXME

Configuration for Kafka Consumer — getMainConsumerConfigs Method

Map<String, Object> getMainConsumerConfigs(
  final String groupId,
  final String clientId)

getMainConsumerConfigs gets the base configuration for a Kafka Consumer first.

getMainConsumerConfigs then…​FIXME

Note
getMainConsumerConfigs is used exclusively when StreamThread is requested to create a StreamThread instance (and requests the KafkaClientSupplier for a Kafka Consumer).

defaultValueSerde Method

Serde defaultValueSerde()

defaultValueSerde…​FIXME

Note
defaultValueSerde is used when…​FIXME

defaultKeySerde Method

Serde defaultKeySerde()

defaultKeySerde…​FIXME

Note
defaultKeySerde is used when…​FIXME

originalsWithPrefix Method

Map<String, Object> originalsWithPrefix(String prefix)

originalsWithPrefix…​FIXME

Note
originalsWithPrefix is used when…​FIXME

adminClientPrefix Static Method

static String adminClientPrefix(final String adminClientProp)

adminClientPrefix simply adds the admin. prefix to a given adminClientProp.

Creating StreamsConfig Instance

StreamsConfig takes the following to be created:

  • Configuration properties

  • doLog flag

StreamsConfig initializes the eosEnabled internal property.

Internal Configuration Properties

Name                                 Value                      Description
ASSIGNMENT_ERROR_CODE                __assignment.error.code__
TASK_MANAGER_FOR_PARTITION_ASSIGNOR  __task.manager.instance__  Used to associate the TaskManager (of StreamThread) with StreamsPartitionAssignor (when configured)
