import org.apache.kafka.streams.StreamsConfig
val conf = new java.util.Properties()
conf.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId")
conf.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092,localhost:9192")
StreamsConfig — Configuration Properties for Kafka Clients
StreamsConfig is a Apache Kafka AbstractConfig with the configuration properties for a Kafka Streams application.
StreamsConfig is used to reference the properties names (e.g. to avoid any typos or a better type safety).
StreamsConfig uses admin. prefix for the configuration properties that are meant to be used for a Kafka AdminClient. Use adminClientPrefix method to add the prefix to a AdminClient property.
import org.apache.kafka.clients.admin.AdminClientConfig
scala> println(AdminClientConfig.RETRIES_CONFIG)
retries
import org.apache.kafka.streams.StreamsConfig.adminClientPrefix
scala> println(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG))
admin.retries
| Name | Property |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StreamsConfig is also used the configuration for the following Kafka clients (that Kafka Streams uses under the covers):
import org.apache.kafka.streams.StreamsConfig
val props = new java.util.Properties()
// ...
val conf = new StreamsConfig(props)
// Consumer Properties
val consumerConfigs = conf.getMainConsumerConfigs("groupId", "clientId")
import collection.JavaConverters._
scala> consumerConfigs.asScala.map { case (key, value) => s"$key -> $value" }.foreach(println)
replication.factor -> 1
num.standby.replicas -> 0
max.poll.records -> 1000
group.id -> groupId
partition.assignment.strategy -> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor
bootstrap.servers -> localhost:8082
enable.auto.commit -> false
admin.retries -> 5
application.server ->
max.poll.interval.ms -> 2147483647
auto.offset.reset -> earliest
windowstore.changelog.additional.retention.ms -> 86400000
internal.leave.group.on.close -> false
application.id -> groupId
client.id -> clientId-consumer
StreamsConfig does not allow users to configure certain Kafka configurations (e.g. for consumer) that are simply removed (with a WARN message in the logs).
| Name | Value | Description |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StreamsConfig uses consumer prefix for custom Kafka configurations of a Kafka consumer.
StreamsConfig defines the InternalConfig inner class with the internal properties.
|
Note
|
InternalStreamsConfig is an extension of StreamsConfig that is used in StreamsPartitionAssignor to specify custom configuration properties and turn the doLog flag off.
|
StreamsConfig.EXACTLY_ONCE for Exactly-Once Support (EOS)
String EXACTLY_ONCE = "exactly_once"
StreamsConfig defines StreamsConfig.EXACTLY_ONCE (exactly_once) constant value that is used for the following:
-
eosEnabled internal flag
-
AbstractTask is created
-
GlobalStateManagerImpl is created
-
StreamThread is created
postProcessParsedConfig Method
Map<String, Object> postProcessParsedConfig(
Map<String, Object> parsedValues)
postProcessParsedConfig…FIXME
|
Note
|
postProcessParsedConfig is used when…FIXME
|
eosEnabled Internal Flag for Exactly-Once Support (EOS)
StreamsConfig defines eosEnabled internal flag that is enabled (true) when StreamsConfig.PROCESSING_GUARANTEE_CONFIG is EXACTLY_ONCE.
eosEnabled is used for the following:
-
…FIXME
getProducerConfigs Method
Map<String, Object> getProducerConfigs(final String clientId)
getProducerConfigs…FIXME
|
Note
|
getProducerConfigs is used when…FIXME
|
Getting Configuration for (Creating) Kafka AdminClient — getAdminConfigs Method
Map<String, Object> getAdminConfigs(final String clientId)
getAdminConfigs firstly finds the client properties for a Kafka AdminClient (with admin. prefix).
getAdminConfigs takes the getClientCustomProps and copies the AdminClient properties over.
In the end, getAdminConfigs adds the clientId with -admin suffix as the client.id configuration property.
import org.apache.kafka.streams.StreamsConfig
val props = new java.util.Properties()
// required configurations
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
// Define a custom configuration with admin prefix
import org.apache.kafka.clients.admin.AdminClientConfig
props.put("admin." + AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
val streamsConf = new StreamsConfig(props)
val adminConfigs = streamsConf.getAdminConfigs("my-client-id")
import scala.collection.JavaConverters._
scala> adminConfigs.asScala.map { case (k,v) => s"$k = $v" }.foreach(println)
bootstrap.servers = :9092
metadata.max.age.ms = 10
client.id = my-client-id-admin
Collecting Properties per Key — clientProps Internal Method
Map<String, Object> clientProps(
final Set<String> configNames,
final Map<String, Object> originals)
clientProps collects the configuration properties from originals that have their names in the input configNames, i.e. includes the properties that have been listed in configNames.
|
Note
|
clientProps is used exclusively when StreamsConfig is requested to getClientPropsWithPrefix.
|
Getting Subset of User Configuration by Given Names and Prefix — getClientPropsWithPrefix Internal Method
Map<String, Object> getClientPropsWithPrefix(
final String prefix,
final Set<String> configNames)
getClientPropsWithPrefix takes only the properties (as passed in by a user) that have their keys in configNames and adds all properties with the given prefix.
Internally, getClientPropsWithPrefix collects the configuration properties from the original values of Kafka properties as passed in by a user that have their names in configNames.
getClientPropsWithPrefix then copies all original settings with the given prefix (stripping the prefix before adding them) to the collected properties (and possibly overwriting some).
|
Note
|
getClientPropsWithPrefix uses AbstractConfig.originals to get the original values of Kafka properties as passed in by the user.
|
|
Note
|
getClientPropsWithPrefix is used when StreamsConfig is requested for getAdminConfigs, getCommonConsumerConfigs, getMainConsumerConfigs and getProducerConfigs.
|
Getting Common Consumer Configuration — getCommonConsumerConfigs Internal Method
Map<String, Object> getCommonConsumerConfigs()
getCommonConsumerConfigs gets a subset of user configuration for a Kafka consumer as well as the properties with consumer prefix.
|
Note
|
getCommonConsumerConfigs uses ConsumerConfig.configNames for the list of the Kafka Consumer-specific configuration keys.
|
|
Caution
|
FIXME |
|
Note
|
getCommonConsumerConfigs is used when StreamsConfig is requested for getMainConsumerConfigs and getRestoreConsumerConfigs.
|
Removing "Illegal" User-Defined Configuration Properties — checkIfUnexpectedUserSpecifiedConsumerConfig Internal Method
void checkIfUnexpectedUserSpecifiedConsumerConfig(
Map<String, Object> clientProvidedProps,
String[] nonConfigurableConfigs)
checkIfUnexpectedUserSpecifiedConsumerConfig removes non-configurable configuration properties (nonConfigurableConfigs) from user-defined configurations (clientProvidedProps) and prints out a warning for any violation.
Internally, checkIfUnexpectedUserSpecifiedConsumerConfig iterates over nonConfigurableConfigs…FIXME
|
Note
|
checkIfUnexpectedUserSpecifiedConsumerConfig is used when StreamsConfig is requested for getCommonConsumerConfigs and getProducerConfigs.
|
getRestoreConsumerConfigs Method
Map<String, Object> getRestoreConsumerConfigs(final String clientId)
getRestoreConsumerConfigs…FIXME
|
Note
|
getRestoreConsumerConfigs is used when…FIXME
|
Configuration for Kafka Consumer — getMainConsumerConfigs Method
Map<String, Object> getMainConsumerConfigs(
final String groupId,
final String clientId)
getMainConsumerConfigs gets the base configuration for a Kafka Consumer first.
getMainConsumerConfigs then…FIXME
|
Note
|
getMainConsumerConfigs is used exclusively when StreamThread is requested to create a StreamThread instance (and requests the KafkaClientSupplier for a Kafka Consumer).
|
defaultValueSerde Method
Serde defaultValueSerde()
defaultValueSerde…FIXME
|
Note
|
defaultValueSerde is used when…FIXME
|
defaultKeySerde Method
Serde defaultKeySerde()
defaultKeySerde…FIXME
|
Note
|
defaultKeySerde is used when…FIXME
|
originalsWithPrefix Method
Map<String, Object> originalsWithPrefix(String prefix)
originalsWithPrefix…FIXME
|
Note
|
originalsWithPrefix is used when…FIXME
|
adminClientPrefix Static Method
static String adminClientPrefix(final String adminClientProp)
adminClientPrefix simply adds the admin. prefix to a given adminClientProp.
Creating StreamsConfig Instance
StreamsConfig takes the following to be created:
StreamsConfig initializes the eosEnabled internal property.
Internal Configuration Properties
| Name | Value |
|---|---|
|
|
|
Used to associate the TaskManager (of StreamThread) with StreamsPartitionAssignor (when configured) |