Kafka Properties

Table 1. Properties

advertised.listeners

Comma-separated list of URIs to publish to ZooKeeper for clients to use, if different from the listeners config property.

Default: null

In IaaS environments (e.g. Docker, Kubernetes, a cloud), advertised.listeners may need to be different from the interface to which a Kafka broker binds. That ensures that the Kafka broker advertises an address that is accessible from both local and external hosts. If not set, the value for listeners is used.

Unlike listeners, it is invalid to advertise the 0.0.0.0 non-routable meta-address.
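
For example, a minimal sketch of a containerized setup where the broker binds to all interfaces but must advertise an externally resolvable address (kafka.example.com is a hypothetical hostname):

# bind to all interfaces inside the container
listeners=PLAINTEXT://0.0.0.0:9092
# advertise an externally resolvable address to clients
advertised.listeners=PLAINTEXT://kafka.example.com:9092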

authorizer.class.name

Fully-qualified class name of the Authorizer for request authorization

Default: (empty)

Supports authorizers that implement the deprecated kafka.security.auth.Authorizer trait that was used for authorization before Kafka 2.4.0 (KIP-504 - Add new Java Authorizer Interface).

auto.commit.interval.ms

How often (in milliseconds) consumer offsets should be auto-committed when enable.auto.commit is enabled

auto.create.topics.enable

Enables auto creation of topics on the server

Default: true

auto.leader.rebalance.enable

Enables auto leader balancing

Default: true

A background thread checks and triggers leader balance if required.

auto.offset.reset

Reset policy — what to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest — automatically reset the offset to the earliest offset

  • latest — automatically reset the offset to the latest offset

  • none — throw an exception to the consumer if no previous offset is found for the consumer’s group

  • anything else — throw an exception to the consumer

Default: latest

background.threads

The number of threads to use for various background processing tasks

Default: 10

Must be at least 1

bootstrap.servers

A comma-separated list of host:port pairs that are the addresses of one or more brokers in a Kafka cluster, e.g. localhost:9092 or localhost:9092,another.host:9092.

Default: (empty)
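
As an illustration, a minimal Java consumer wired to a local broker (demo-group and demo-topic are hypothetical names; the deserializer and auto.offset.reset settings are covered elsewhere in this table):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MinimalConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    // one or two brokers are enough to bootstrap the metadata of the whole cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singleton("demo-topic"));
      consumer.poll(Duration.ofSeconds(1)).forEach(r -> System.out.println(r.value()));
    }
  }
}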

broker.id

The broker id of a Kafka broker for identification purposes

If unset, a unique broker id will be generated. To avoid conflicts between ZooKeeper-generated broker IDs and user-configured broker IDs, generated broker IDs start from reserved.broker.max.id + 1.

Use KafkaConfig.brokerId to access the current value.

broker.id.generation.enable

Enables automatic broker id generation of a Kafka broker

Default: true

When enabled (true) the value configured for reserved.broker.max.id should be checked.

broker.rack

check.crcs

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption of the messages has occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.

Use ConsumerConfig.CHECK_CRCS_CONFIG

client.id

Default: (randomly-generated)

connection.failed.authentication.delay.ms

Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure. This must be configured to be less than connections.max.idle.ms to prevent connection timeout.

Default: 100

Has to be at least 0

connections.max.idle.ms

Idle connections timeout: the server socket processor threads close the connections that have been idle for more than this

Default: 10 * 60 * 1000L

control.plane.listener.name

Name of the listener for communication between controller and brokers

Default: null (undefined), i.e. no dedicated endpoint

Must be defined in listener.security.protocol.map

Note
FIXME Is this true? Must be defined among advertised.listeners.

Must be different from interBrokerListenerName

The broker uses the name to locate the endpoint in listeners, to listen for connections from the controller.

For example, if a broker’s config is:

listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROLLER:SSL
control.plane.listener.name=CONTROLLER

On startup, the broker will start listening on "192.1.1.8:9094" with security protocol "SSL". On the controller side, when it discovers a broker’s published endpoints through ZooKeeper, it uses the name to find the endpoint, which it then uses to establish a connection to the broker.

For example, if the broker’s published endpoints on zookeeper are:

"endpoints": [
  "INTERNAL://broker1:9092",
  "EXTERNAL://broker1:9093",
  "CONTROLLER://broker1:9094"
]

and the controller’s config is:

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROLLER:SSL
control.plane.listener.name=CONTROLLER

then a controller will use "broker1:9094" with security protocol "SSL" to connect to the broker.

default.replication.factor

The default replication factor that is used for auto-created topics

Default: 1

Consider increasing the default value to at least 2 so that auto-created topics survive a single broker failure.

delegation.token.master.key

delete.topic.enable

Enables topic deletion

Note
Deleting a topic through the admin tool has no effect when this property is disabled.

Default: true

enable.auto.commit

When enabled (i.e. true) consumer offsets are committed automatically in the background (aka consumer auto commit) every auto.commit.interval.ms

Default: true

When disabled, offsets have to be committed manually (synchronously using KafkaConsumer.commitSync or asynchronously using KafkaConsumer.commitAsync). On restart, restore the position of a consumer using KafkaConsumer.seek.
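
For example, a minimal sketch of the manual-commit pattern (the consumer is assumed to be created with enable.auto.commit=false and already subscribed; the processing step is just a print):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class ManualCommitLoop {
  static void run(KafkaConsumer<String, String> consumer) {
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
      }
      // commit the offsets of the records just processed, synchronously
      consumer.commitSync();
    }
  }
}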

Used when KafkaConsumer is created and creates a ConsumerCoordinator.

fetch.max.bytes

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel.

Use ConsumerConfig.FETCH_MAX_BYTES_CONFIG

fetch.max.wait.ms

The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.

Use ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG

fetch.min.bytes

The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.

Use ConsumerConfig.FETCH_MIN_BYTES_CONFIG
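
As a sketch, trading a little latency for throughput by letting fetches batch up (the values are illustrative, not recommendations):

# wait for at least 64 KB of data per fetch...
fetch.min.bytes=65536
# ...but return after at most 500 ms even if less has accumulated
fetch.max.wait.ms=500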

file.delete.delay.ms

group.id

The name of the consumer group the consumer is part of.

heartbeat.interval.ms

The expected time between heartbeats to the group coordinator when using Kafka’s group management facilities.

host.name

The hostname a Kafka broker listens on

Default: (empty)

inter.broker.listener.name

Name of the listener for inter-broker communication (resolved per listener.security.protocol.map)

It is an error to use together with security.inter.broker.protocol

inter.broker.protocol.version

Version of the inter-broker protocol

Default: the latest ApiVersion (e.g. 2.1-IV2)

Typically bumped after all brokers have been upgraded to a new version

interceptor.classes

Comma-separated list of ConsumerInterceptor class names.

Default: (empty)

key.deserializer

How to deserialize message keys.

leader.imbalance.check.interval.seconds

How often the active KafkaController schedules the auto-leader-rebalance-task (aka AutoLeaderRebalance or AutoPreferredReplicaLeaderElection or auto leader balancing)

Default: 300

leader.imbalance.per.broker.percentage

Allowed ratio of leader imbalance per broker. The controller triggers a leader rebalance if the imbalance goes above this value for a broker. The value is specified as a percentage.

Default: 10

listeners

Comma-separated list of URIs and listener names that a Kafka broker will listen on

Default: PLAINTEXT://host.name:port

Use 0.0.0.0 to bind to all the network interfaces on a machine or leave it empty to bind to the default interface.

listener.security.protocol.map

Map of listener names and security protocols (key and value are separated by a colon and map entries are separated by commas). Each listener name should only appear once in the map.

Default: Map with PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL keys

This map must be defined for the same security protocol to be usable in more than one port or IP. For example, internal and external traffic can be separated even if SSL is required for both. Concretely, the user could define listeners with names INTERNAL and EXTERNAL and this property as: INTERNAL:SSL,EXTERNAL:SSL.

Different security (SSL and SASL) settings can be configured for each listener by adding a normalised prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the INTERNAL listener, a config with name listener.name.internal.ssl.keystore.location would be set. If the config for the listener name is not set, the config will fallback to the generic config (ssl.keystore.location).
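
For example, separating internal and external traffic with SSL on both and a listener-scoped keystore (hosts and paths are hypothetical):

listeners=INTERNAL://192.168.0.10:9092,EXTERNAL://10.0.0.10:9093
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL
inter.broker.listener.name=INTERNAL
# listener-scoped override; falls back to ssl.keystore.location when unset
listener.name.internal.ssl.keystore.location=/etc/kafka/internal.keystore.jks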

log.cleaner.backoff.ms

log.cleaner.dedupe.buffer.size

log.cleaner.enable

Enables the log cleaner process to run on a Kafka broker (true). Should be enabled when using topics with cleanup.policy=compact, including the internal offsets topic. If disabled, those topics will not be compacted and will continually grow in size.

Default: true

log.cleaner.io.buffer.load.factor

log.cleaner.io.buffer.size

log.cleaner.io.max.bytes.per.second

log.cleaner.threads

log.cleanup.policy

log.dir

The directory in which the log data is kept

Default: /tmp/kafka-logs

log.dirs

The directories in which the log data is kept

Default: log.dir

Use KafkaConfig.logDirs to access the current value.

log.flush.interval.messages

The number of messages accumulated on a log partition before messages are flushed to disk (by forcing an fsync)

Default: Long.MaxValue (maximum possible long value)

E.g. if this was set to 1 we would fsync after every message; if it were 5 we would fsync after every five messages.

It is recommended not to set this and to use replication for durability, allowing the operating system’s background flush capabilities, as that is more efficient.

Must be at least 0

Topic-level configuration: flush.messages

log.flush.interval.ms

How long (in millis) a message written to a log partition is kept in memory before flushing to disk (by forcing an fsync). If not set, the value in log.flush.scheduler.interval.ms is used.

Default: null (undefined)

E.g. if this was set to 1000 we would fsync after 1000 ms had passed.

Used exclusively when LogManager is requested to flushDirtyLogs.

It is recommended not to set this and to use replication for durability, allowing the operating system’s background flush capabilities, as that is more efficient.

Must be undefined or at least 0

Topic-level configuration: flush.ms

log.flush.offset.checkpoint.interval.ms

log.flush.scheduler.interval.ms

log.flush.start.offset.checkpoint.interval.ms

log.index.size.max.bytes

Maximum size (in bytes) of the offset index file (that maps offsets to file positions). It is preallocated and shrunk only after log rolls.

Default: 10 * 1024 * 1024

You generally should not need to change this setting.

Must be at least 0

log.retention.bytes

Maximum size a partition (which consists of log segments) can grow to before old segments are discarded to free up space.

log.retention.bytes is enforced at the partition level; multiply it by the number of partitions to compute the topic retention in bytes.

Default: -1L

Must be at least -1
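
A worked example, assuming a hypothetical topic with 8 partitions:

# cap every partition at 1 GiB
log.retention.bytes=1073741824
# effective topic retention: 8 partitions * 1 GiB = 8 GiB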

log.retention.check.interval.ms

How often (in millis) the LogManager (as kafka-log-retention task) checks whether any log is eligible for deletion

Default: 5 * 60 * 1000L (millis)

Must be at least 1

log.retention.ms

How long (in millis) to keep a log file before deleting it. -1 denotes no time limit

Default: 24 * 7 * 60 * 60 * 1000L (7 days)

Must be at least -1

Unless set, the value of log.retention.minutes is used.

log.retention.minutes

How long (in mins) to keep a log file before deleting it. -1 denotes no time limit

Unless set, the value of log.retention.hours is used. Secondary to log.retention.ms.

log.retention.hours

How long (in hours) to keep a log file before deleting it. -1 denotes no time limit

Considered last, i.e. used only when neither log.retention.ms nor log.retention.minutes is set.
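
A sketch of the precedence of the three retention settings, with illustrative values (the most precise unit wins):

# last resort: 7 days
log.retention.hours=168
# overrides log.retention.hours: 2 days
log.retention.minutes=2880
# wins over both when set: 1 day
log.retention.ms=86400000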

log.roll.ms

Time (in millis) after which Kafka forces the log to roll even if the segment file isn’t full to ensure that retention can delete or compact old data.

Default: 604800000 (7 days)

Must be at least 1

log.segment.bytes

The maximum size of a segment file of logs. Retention and cleaning are always done one file at a time so a larger segment size means fewer files but less granular control over retention.

Default: 1 * 1024 * 1024 * 1024

Must be at least 14 bytes (LegacyRecord.RECORD_OVERHEAD_V0)

Use KafkaConfig.logSegmentBytes to access the current value.

max.block.ms

max.partition.fetch.bytes

The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

Use ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG

Note
Use fetch.max.bytes for limiting the consumer request size.

max.poll.records

(KafkaConsumer) The maximum number of records returned from a Kafka Consumer when polling topics for records.

Default: 500 (since Kafka 0.10.1; the initial default of Integer.MAX_VALUE put no upper bound on the number of records). Consumer.poll() returns as soon as either any data is available or the passed timeout expires.

max.poll.records was added to Kafka in 0.10.0.0 by KIP-41: KafkaConsumer Max Records.

From kafka-clients mailing list:

max.poll.records only controls the number of records returned from poll, but does not affect fetching. The consumer will try to prefetch records from all partitions it is assigned. It will then buffer those records and return them in batches of max.poll.records each (either all from the same topic partition if there are enough left to satisfy the number of records, or from multiple topic partitions if the data from the last fetch for one of the topic partitions does not cover the max.poll.records).

Use ConsumerConfig.MAX_POLL_RECORDS_CONFIG.


Internally, max.poll.records is used exclusively when KafkaConsumer is created (to create a Fetcher).
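
As a sketch, capping the batch returned by each poll (the value 100 is illustrative):

# each poll() returns at most 100 records; fetching and prefetching are unaffected
max.poll.records=100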

message.max.bytes

metadata.max.age.ms

metric.reporters

The list of fully-qualified class names of the metrics reporters.

Default: JmxReporter

metrics.num.samples

The number of samples maintained to compute metrics.

metrics.sample.window.ms

Time window (in milliseconds) a metrics sample is computed over.

min.insync.replicas

The minimum number of replicas in ISR needed to commit a produce request with acks=-1 (or all)

Default: 1

Must be at least 1

When a Kafka producer sets acks to all (or -1), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.

If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).

Used together with acks allows you to enforce greater durability guarantees.

A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
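
The same scenario as config lines (min.insync.replicas is set on the broker or per topic, acks on the producer; a replication factor of 3 is assumed at topic creation):

# broker or topic side: require two in-sync replicas to acknowledge a write
min.insync.replicas=2
# producer side
acks=all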

num.io.threads

The number of threads that KafkaServer uses for processing requests, which may include disk I/O

Default: 8

Must be at least 1

max.connections.per.ip

The maximum number of connections allowed from each IP address.

Default: Int.MaxValue

Must be at least 0 (0 is allowed only if there are overrides configured using the max.connections.per.ip.overrides property)

max.connections.per.ip.overrides

A comma-separated list of per-IP or per-hostname overrides to the default maximum number of connections, e.g. hostName:100,127.0.0.1:200

Default: (empty)

num.network.threads

The number of threads (network processors) that SocketServer uses per endpoint for receiving requests from the network and sending responses to the network

Default: 3

Must be at least 1

num.partitions

The number of log partitions for auto-created topics

Default: 1

Increase the default value (1) since it is better to over-partition a topic; over-partitioning leads to better data balancing and aids consumer parallelism.

num.recovery.threads.per.data.dir

The number of threads per log data directory for log recovery at startup and flushing at shutdown

Default: 1

Must be at least 1

num.replica.alter.log.dirs.threads

The number of threads that can move replicas between log directories, which may include disk I/O

Default: null

num.replica.fetchers

The number of fetcher threads that ReplicaFetcherManager uses for replicating messages from a source broker

Default: 1

The higher the value, the higher the degree of I/O parallelism in a follower broker.

port

The port a Kafka broker listens on

Default: 9092

principal.builder.class

Fully-qualified name of KafkaPrincipalBuilder implementation to build the KafkaPrincipal object for authorization

Default: null (i.e. DefaultKafkaPrincipalBuilder)

Supports the deprecated PrincipalBuilder interface which was previously used for client authentication over SSL.

If no principal builder is defined, the default behavior depends on the security protocol in use:

  • For SSL authentication, the principal will be derived using the rules defined by ssl.principal.mapping.rules applied on the distinguished name from the client certificate if one is provided; otherwise, if client authentication is not required, the principal name will be ANONYMOUS.

  • For SASL authentication, the principal will be derived using the rules defined by sasl.kerberos.principal.to.local.rules if GSSAPI is in use, and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.

Used when ChannelBuilders is requested to create a KafkaPrincipalBuilder

replica.fetch.backoff.ms

How long (in millis) a fetcher thread is going to sleep when there are no active partitions (while sending a fetch request) or after a fetch partition error and handlePartitionsWithErrors

Default: 1000 (millis)

Must be at least 0

replica.fetch.max.bytes

The number of bytes of messages to attempt to fetch for each partition

Default: 1024 * 1024 (1 MB)

Must be at least 0

This is not an absolute maximum, if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

replica.fetch.response.max.bytes

Maximum bytes expected for the entire fetch response

Default: 10 * 1024 * 1024

Must be at least 0

Records are fetched in batches, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config).

queued.max.requests

The number of queued requests allowed before blocking the network threads

Default: 500

Must be at least 1

rebalance.timeout.ms

The maximum allowed time for each worker to join the group once a rebalance has begun.

receive.buffer.bytes

The hint about the size of the TCP network receive buffer (SO_RCVBUF) to use (for a socket) when reading data. If the value is -1, the OS default will be used.

replica.fetch.wait.max.ms

replica.lag.time.max.ms

How long to wait for a follower to consume up to the leader’s log end offset (LEO) before the leader removes the follower from the ISR of a partition

Default: 10000L (millis)

Note
replica.fetch.wait.max.ms should always be less than or equal to replica.lag.time.max.ms to prevent frequent changes in ISR.

replica.socket.timeout.ms

Socket timeout of ReplicaFetcherBlockingSend when sending network requests to partition leader brokers

Default: 30 * 1000 (30 seconds)

Should always be at least replica.fetch.wait.max.ms to prevent unnecessary socket timeouts

request.timeout.ms

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.

Use ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG

reserved.broker.max.id

Maximum number that can be used for broker.id. Has to be at least 0.

Default: 1000

  • Use KafkaConfig.MaxReservedBrokerIdProp to reference the property

  • Use KafkaConfig.maxReservedBrokerId to access the current value

retry.backoff.ms

Time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

Use ConsumerConfig.RETRY_BACKOFF_MS_CONFIG

sasl.mechanism.inter.broker.protocol

sasl.jaas.config

sasl.enabled.mechanisms

sasl.kerberos.service.name

sasl.kerberos.kinit.cmd

sasl.kerberos.ticket.renew.window.factor

sasl.kerberos.ticket.renew.jitter

sasl.kerberos.min.time.before.relogin

sasl.kerberos.principal.to.local.rules

sasl.login.refresh.window.factor

sasl.login.refresh.window.jitter

sasl.login.refresh.min.period.seconds

sasl.login.refresh.buffer.seconds

security.inter.broker.protocol

Security protocol for inter-broker communication

Default: PLAINTEXT

Possible values:

  • PLAINTEXT

  • SSL

  • SASL_PLAINTEXT

  • SASL_SSL

It is an error when defined with inter.broker.listener.name (as it then should only be in listener.security.protocol.map).

It is validated when the inter-broker communication uses a SASL protocol (SASL_PLAINTEXT or SASL_SSL) for… FIXME

send.buffer.bytes

The hint about the size of the TCP network send buffer (SO_SNDBUF) to use (for a socket) when sending data. If the value is -1, the OS default will be used.

session.timeout.ms

The timeout used to detect worker failures.

Default: 10000

socket.request.max.bytes

The maximum number of bytes in a socket request

Default: 100 * 1024 * 1024

Must be at least 1

ssl.principal.mapping.rules

Rules for mapping from the distinguished name of a client certificate to a short name.

Default: DEFAULT (i.e. the distinguished name of an X.500 certificate is the principal)

The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored.

This configuration is ignored for a custom KafkaPrincipalBuilder as defined by the principal.builder.class configuration.

Used when SslChannelBuilder is configured (to create a SslPrincipalMapper)
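
For example, a hypothetical rule that extracts the CN from the distinguished name and falls back to the full DN:

# DNs like CN=alice,OU=Eng,O=Example map to the principal alice
ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=.*$/$1/,DEFAULT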

ssl.protocol

ssl.provider

ssl.cipher.suites

ssl.enabled.protocols

ssl.keystore.type

ssl.keystore.location

ssl.keystore.password

ssl.key.password

ssl.truststore.type

ssl.truststore.location

ssl.truststore.password

ssl.keymanager.algorithm

ssl.trustmanager.algorithm

ssl.endpoint.identification.algorithm

ssl.secure.random.implementation

ssl.client.auth

Client authentication

Default: none

Supported values (case-insensitive): required, requested, none

transactional.id.expiration.ms

transaction.max.timeout.ms

The maximum allowed timeout for transactions (in millis).

If a client’s requested transaction time exceeds this, the broker returns an error in InitProducerIdRequest. This prevents a client from using a timeout so large that it can stall consumers reading from topics included in the transaction.

Default: 15 minutes

Must be at least 1

unclean.leader.election.enable

value.deserializer

How to deserialize message values

zookeeper.connect

Comma-separated list of ZooKeeper hosts (as host:port pairs) that brokers register to, e.g. localhost:2181 or 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002

Default: (empty)

Zookeeper URIs can have an optional chroot path suffix at the end, e.g. 127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a

If the optional chroot path suffix is used, all paths are relative to this path.

It is recommended to include all the hosts of a ZooKeeper ensemble (cluster).
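
For example, a three-node ensemble with a chroot (the hosts are hypothetical):

# all of the broker's znodes are created under the /kafka chroot
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka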

zookeeper.connection.timeout.ms

The maximum time that the client waits to establish a connection to ZooKeeper

zookeeper.max.in.flight.requests

The maximum number of unacknowledged requests the client will send to Zookeeper before blocking. Has to be at least 1

Default: 10

zookeeper.session.timeout.ms

Zookeeper session timeout

Default: 6000

zookeeper.set.acl

Enables secure ACLs

Default: false
