import org.apache.kafka.streams.StreamsConfig
val conf = new java.util.Properties()
conf.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId")
conf.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092,localhost:9192")
StreamsConfig — Configuration Properties for Kafka Clients
StreamsConfig is an Apache Kafka AbstractConfig with the configuration properties for a Kafka Streams application.
StreamsConfig is used to reference property names by constant (e.g. to avoid typos and for better type safety).
StreamsConfig uses the admin. prefix for the configuration properties that are meant for a Kafka AdminClient. Use the adminClientPrefix method to add the prefix to an AdminClient property.
import org.apache.kafka.clients.admin.AdminClientConfig
scala> println(AdminClientConfig.RETRIES_CONFIG)
retries
import org.apache.kafka.streams.StreamsConfig.adminClientPrefix
scala> println(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG))
admin.retries
StreamsConfig is also used to build the configuration for the Kafka clients that Kafka Streams uses under the covers (the consumer, producer and admin clients):
import org.apache.kafka.streams.StreamsConfig
val props = new java.util.Properties()
// ...
val conf = new StreamsConfig(props)
// Consumer Properties
val consumerConfigs = conf.getMainConsumerConfigs("groupId", "clientId")
import collection.JavaConverters._
scala> consumerConfigs.asScala.map { case (key, value) => s"$key -> $value" }.foreach(println)
replication.factor -> 1
num.standby.replicas -> 0
max.poll.records -> 1000
group.id -> groupId
partition.assignment.strategy -> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor
bootstrap.servers -> localhost:8082
enable.auto.commit -> false
admin.retries -> 5
application.server ->
max.poll.interval.ms -> 2147483647
auto.offset.reset -> earliest
windowstore.changelog.additional.retention.ms -> 86400000
internal.leave.group.on.close -> false
application.id -> groupId
client.id -> clientId-consumer
StreamsConfig does not allow users to override certain Kafka client configurations (e.g. for the consumer). Such user-provided settings are simply removed (with a WARN message in the logs).
StreamsConfig uses the consumer. prefix for custom Kafka configurations of a Kafka consumer.
StreamsConfig defines the InternalConfig inner class with the internal properties.
Note: InternalStreamsConfig is an extension of StreamsConfig that is used in StreamsPartitionAssignor to specify custom configuration properties and turn the doLog flag off.
StreamsConfig.EXACTLY_ONCE for Exactly-Once Support (EOS)
String EXACTLY_ONCE = "exactly_once"
StreamsConfig defines the StreamsConfig.EXACTLY_ONCE (exactly_once) constant value that is used for the following:
- eosEnabled internal flag
- AbstractTask is created
- GlobalStateManagerImpl is created
- StreamThread is created
postProcessParsedConfig Method
Map<String, Object> postProcessParsedConfig(
  Map<String, Object> parsedValues)
postProcessParsedConfig…FIXME
Note: postProcessParsedConfig is used when…FIXME
eosEnabled Internal Flag for Exactly-Once Support (EOS)
StreamsConfig defines the eosEnabled internal flag that is enabled (true) when StreamsConfig.PROCESSING_GUARANTEE_CONFIG is EXACTLY_ONCE.
eosEnabled is used for the following:
- …FIXME
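As a rough illustration of the rule that turns the flag on, here is a minimal, dependency-free Java sketch that uses a plain map in place of StreamsConfig. The class name EosFlagSketch, the helper isEosEnabled and the at_least_once fallback for a missing key are assumptions made for this example; it is a simplified model, not the Kafka source.

```java
import java.util.Map;

// Simplified model (not the Kafka source): derives an "exactly-once enabled"
// flag from a plain map of properties.
final class EosFlagSketch {
    // Property key and values as plain strings, so no Kafka dependency is needed
    static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    static final String EXACTLY_ONCE = "exactly_once";
    static final String AT_LEAST_ONCE = "at_least_once";

    // The flag is true only when the processing guarantee is exactly_once.
    // Assumption: a missing key falls back to at_least_once.
    static boolean isEosEnabled(Map<String, Object> props) {
        Object guarantee = props.getOrDefault(PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE);
        return EXACTLY_ONCE.equals(guarantee);
    }
}
```

With an empty map the helper returns false; after putting processing.guarantee = exactly_once into the map it returns true.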
getProducerConfigs Method
Map<String, Object> getProducerConfigs(final String clientId)
getProducerConfigs…FIXME
Note: getProducerConfigs is used when…FIXME
Getting Configuration for (Creating) Kafka AdminClient — getAdminConfigs
Method
Map<String, Object> getAdminConfigs(final String clientId)
getAdminConfigs first finds the client properties for a Kafka AdminClient (with the admin. prefix).
getAdminConfigs then takes the custom client properties (getClientCustomProps) and copies the AdminClient properties over them.
In the end, getAdminConfigs adds the clientId with the -admin suffix as the client.id configuration property.
import org.apache.kafka.streams.StreamsConfig
val props = new java.util.Properties()
// required configurations
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
// Define a custom configuration with admin prefix
import org.apache.kafka.clients.admin.AdminClientConfig
props.put("admin." + AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
val streamsConf = new StreamsConfig(props)
val adminConfigs = streamsConf.getAdminConfigs("my-client-id")
import scala.collection.JavaConverters._
scala> adminConfigs.asScala.map { case (k,v) => s"$k = $v" }.foreach(println)
bootstrap.servers = :9092
metadata.max.age.ms = 10
client.id = my-client-id-admin
Collecting Properties per Key — clientProps
Internal Method
Map<String, Object> clientProps(
final Set<String> configNames,
final Map<String, Object> originals)
clientProps collects the configuration properties from originals that have their names in the input configNames, i.e. includes only the properties that are listed in configNames.
Note: clientProps is used exclusively when StreamsConfig is requested to getClientPropsWithPrefix.
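The filtering described above can be sketched in plain Java as follows. This is a simplified stand-alone model that follows the signature shown in this section; the class name ClientPropsSketch is hypothetical and the code is not copied from Kafka.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified model of the "collect by name" step (class name is hypothetical)
final class ClientPropsSketch {
    // Keeps only the entries of originals whose key is listed in configNames
    static Map<String, Object> clientProps(Set<String> configNames, Map<String, Object> originals) {
        Map<String, Object> collected = new HashMap<>();
        for (String name : configNames) {
            // Names the user did not set are skipped rather than added with a null value
            if (originals.containsKey(name)) {
                collected.put(name, originals.get(name));
            }
        }
        return collected;
    }
}
```

For example, with originals holding bootstrap.servers and application.id, and configNames holding bootstrap.servers and client.id, only bootstrap.servers ends up in the result.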
Getting Subset of User Configuration by Given Names and Prefix — getClientPropsWithPrefix
Internal Method
Map<String, Object> getClientPropsWithPrefix(
final String prefix,
final Set<String> configNames)
getClientPropsWithPrefix takes only the properties (as passed in by a user) that have their keys in configNames and adds all properties with the given prefix.
Internally, getClientPropsWithPrefix collects the configuration properties from the original values of Kafka properties as passed in by a user that have their names in configNames.
getClientPropsWithPrefix then copies all original settings with the given prefix (stripping the prefix before adding them) to the collected properties (and possibly overwriting some).
Note: getClientPropsWithPrefix uses AbstractConfig.originals to get the original values of Kafka properties as passed in by the user.
Note: getClientPropsWithPrefix is used when StreamsConfig is requested for getAdminConfigs, getCommonConsumerConfigs, getMainConsumerConfigs and getProducerConfigs.
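The two steps of getClientPropsWithPrefix (collect by name, then overlay the prefixed settings with the prefix stripped) can be sketched as below. This is a simplified model under stated assumptions: the class PrefixedPropsSketch is hypothetical, and the user's original properties are passed in as an explicit originals parameter, whereas the documented method takes only prefix and configNames and reads the originals itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified model (not the Kafka source); originals is an explicit parameter here
final class PrefixedPropsSketch {
    static Map<String, Object> clientPropsWithPrefix(
            String prefix, Set<String> configNames, Map<String, Object> originals) {
        Map<String, Object> props = new HashMap<>();
        // Step 1: keep the unprefixed user settings whose names are in configNames
        for (String name : configNames) {
            if (originals.containsKey(name)) {
                props.put(name, originals.get(name));
            }
        }
        // Step 2: overlay every setting that carries the prefix, with the prefix stripped;
        // a prefixed setting overwrites the unprefixed one collected in step 1
        for (Map.Entry<String, Object> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                props.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return props;
    }
}
```

In this model, a user who sets both max.poll.records and consumer.max.poll.records gets the value of the prefixed key when the prefix is consumer., which is the "possibly overwriting some" case mentioned above.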
Getting Common Consumer Configuration — getCommonConsumerConfigs
Internal Method
Map<String, Object> getCommonConsumerConfigs()
getCommonConsumerConfigs gets a subset of user configuration for a Kafka consumer as well as the properties with the consumer. prefix.
Note: getCommonConsumerConfigs uses ConsumerConfig.configNames for the list of the Kafka Consumer-specific configuration keys.
Caution: FIXME
Note: getCommonConsumerConfigs is used when StreamsConfig is requested for getMainConsumerConfigs and getRestoreConsumerConfigs.
Removing "Illegal" User-Defined Configuration Properties — checkIfUnexpectedUserSpecifiedConsumerConfig
Internal Method
void checkIfUnexpectedUserSpecifiedConsumerConfig(
Map<String, Object> clientProvidedProps,
String[] nonConfigurableConfigs)
checkIfUnexpectedUserSpecifiedConsumerConfig removes non-configurable configuration properties (nonConfigurableConfigs) from user-defined configurations (clientProvidedProps) and prints out a warning for any violation.
Internally, checkIfUnexpectedUserSpecifiedConsumerConfig iterates over nonConfigurableConfigs…FIXME
Note: checkIfUnexpectedUserSpecifiedConsumerConfig is used when StreamsConfig is requested for getCommonConsumerConfigs and getProducerConfigs.
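The remove-and-warn behaviour described above can be modelled in a few lines of plain Java. This is only a sketch under assumptions: the class NonConfigurableSketch and the helper removeNonConfigurable are hypothetical, the warnings are returned as a list instead of being logged at WARN level, and the message wording is illustrative rather than the text Kafka prints.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified model (not the Kafka source) of removing non-configurable properties
final class NonConfigurableSketch {
    // Removes every non-configurable key found in clientProvidedProps (in place)
    // and returns one warning message per removed key
    static List<String> removeNonConfigurable(
            Map<String, Object> clientProvidedProps, String[] nonConfigurableConfigs) {
        List<String> warnings = new ArrayList<>();
        for (String config : nonConfigurableConfigs) {
            if (clientProvidedProps.containsKey(config)) {
                Object ignored = clientProvidedProps.remove(config);
                // Illustrative wording; the real method logs its own message
                warnings.add("Unexpected user-specified config: " + config
                        + " found. User setting (" + ignored + ") will be ignored");
            }
        }
        return warnings;
    }
}
```

Calling the helper with a mutable map that contains enable.auto.commit (used here purely as an example key) and a nonConfigurableConfigs array listing that key removes the entry and yields a single warning; all other entries are left untouched.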
getRestoreConsumerConfigs Method
Map<String, Object> getRestoreConsumerConfigs(final String clientId)
getRestoreConsumerConfigs…FIXME
Note: getRestoreConsumerConfigs is used when…FIXME
Configuration for Kafka Consumer — getMainConsumerConfigs
Method
Map<String, Object> getMainConsumerConfigs(
final String groupId,
final String clientId)
getMainConsumerConfigs first gets the common configuration for a Kafka Consumer (getCommonConsumerConfigs).
getMainConsumerConfigs then…FIXME
Note: getMainConsumerConfigs is used exclusively when StreamThread is requested to create a StreamThread instance (and requests the KafkaClientSupplier for a Kafka Consumer).
defaultValueSerde Method
Serde defaultValueSerde()
defaultValueSerde…FIXME
Note: defaultValueSerde is used when…FIXME
defaultKeySerde Method
Serde defaultKeySerde()
defaultKeySerde…FIXME
Note: defaultKeySerde is used when…FIXME
originalsWithPrefix Method
Map<String, Object> originalsWithPrefix(String prefix)
originalsWithPrefix…FIXME
Note: originalsWithPrefix is used when…FIXME
adminClientPrefix Static Method
static String adminClientPrefix(final String adminClientProp)
adminClientPrefix simply adds the admin. prefix to a given adminClientProp.
Creating StreamsConfig Instance
StreamsConfig takes the following to be created:
StreamsConfig initializes the eosEnabled internal property.
Internal Configuration Properties
Name | Value |
---|---|
 | Used to associate the TaskManager (of StreamThread) with StreamsPartitionAssignor (when configured) |