Kafka client configuration
Common client properties
Kafka has defaults for most properties. These aim to provide safe, useful values for most use cases, but it is up to you to judge whether your case requires more specific configuration. Certain properties, such as protocols, security settings and endpoint URLs, are mandatory.
Note that entur-kafka-spring-starter sets some otherwise mandatory properties for you; see that library’s documentation for details.
Several of the settings below pertain to the Kafka Java libraries specifically. For non-JVM languages, examine that language’s client library for the equivalents.
Mandatory properties
- bootstrap.servers: should be set to the bootstrap URL of the Kafka cluster your application connects to.
- schema.registry.url: should be set to the URL of the schema registry for the Kafka cluster your application connects to. While not strictly mandatory from a technical point of view, it is policy that all topics have a schema associated with their values. This property should thus always be set for producer clients, and is highly recommended for consumers as well.
- security.protocol: must be set to SASL_SSL. We require one-way SSL authentication for our clusters; no plaintext listeners are enabled.
- sasl.mechanism: must be set to SCRAM-SHA-512.
- sasl.jaas.config: must be set to org.apache.kafka.common.security.scram.ScramLoginModule required username="<username>" password="<password>"; (including the trailing semicolon, which the JAAS config parser requires), populated with the username and password for your application's Kafka user.
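Put together, a minimal sketch of these properties for a JVM client could look as follows. The broker URL, registry URL and credentials are placeholders, not actual Entur endpoints:

```java
import java.util.Properties;

public class KafkaClientConfig {

    // Base properties shared by all clients. All URLs and credentials
    // below are placeholders; substitute the values for your cluster.
    static Properties baseProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.org:9092");
        props.put("schema.registry.url", "https://schema-registry.example.org");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        // Note the trailing semicolon; the JAAS config parser requires it.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"<username>\" password=\"<password>\";");
        return props;
    }
}
```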
Mandatory properties on Aiven only
- basic.auth.credentials.source: should be set to USER_INFO. This is necessary because Aiven requires authentication for all schema registry operations.
- basic.auth.user.info: should be set to username:password, with username and password being the same as in sasl.jaas.config above.
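Continuing the sketch above, the Aiven-only properties would be added to the same Properties object before it is returned:

```java
// Aiven only: the schema registry requires the same credentials as the
// broker connection. <username> and <password> are placeholders as above.
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "<username>:<password>");
```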
Mandatory consumer-only properties
- group.id: should be set to your application's consumer group ID. For more on consumer groups and how they are used, see this page.
- key.deserializer (for JVM clients): which class to use for deserialization of keys. Commonly org.apache.kafka.common.serialization.StringDeserializer is used, but there are deserializers for more specific types, as well as io.confluent.kafka.serializers.KafkaAvroDeserializer for keys with Avro schemas.
- value.deserializer (for JVM clients): which class to use for deserialization of values. Under normal circumstances this should be io.confluent.kafka.serializers.KafkaAvroDeserializer, although Protobuf is also supported through io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer. Schemaless deserializers should only ever be used for legacy cases where migration to a schema type is not feasible.
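Building on the baseProperties() sketch above, a minimal consumer could be wired up like this. The group ID and topic name are hypothetical examples:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Builds on baseProperties() from the earlier sketch. The group ID and
// topic name are hypothetical examples.
static void consumeOnce() {
    Properties props = KafkaClientConfig.baseProperties();
    props.put("group.id", "my-application-v1");
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");

    // KafkaAvroDeserializer yields Object: a GenericRecord, or a generated
    // SpecificRecord subclass when specific.avro.reader is enabled.
    try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(List.of("my-topic"));
        consumer.poll(Duration.ofSeconds(1))
                .forEach(r -> System.out.println(r.key() + " -> " + r.value()));
    }
}
```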
Mandatory producer-only properties
- key.serializer (for JVM clients): which class to use for serialization of keys. Commonly org.apache.kafka.common.serialization.StringSerializer is used, but there are serializers for more specific types, as well as io.confluent.kafka.serializers.KafkaAvroSerializer for keys with Avro schemas.
- value.serializer (for JVM clients): which class to use for serialization of values. Under normal circumstances this should be io.confluent.kafka.serializers.KafkaAvroSerializer, although Protobuf is also supported through io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer. Schemaless serializers should only ever be used for legacy cases where migration to a schema type is not feasible.
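Correspondingly, a minimal producer sketch on top of baseProperties() might look like this. The topic, key and value are hypothetical; with KafkaAvroSerializer the value would normally be an instance of a class generated from the topic's Avro schema:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Builds on baseProperties() from the earlier sketch. Topic, key and value
// are hypothetical examples.
static void produceOne(Object avroValue) {
    Properties props = KafkaClientConfig.baseProperties();
    props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
            "io.confluent.kafka.serializers.KafkaAvroSerializer");

    try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>("my-topic", "my-key", avroValue));
    }
}
```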
Optional properties
Kafka provides a fairly large number of configurable properties. This section describes some of the settings more commonly used at Entur. For a complete listing, see Kafka Consumer Configurations for Confluent Platform and Kafka Producer Configurations for Confluent Platform in the Confluent documentation.
Optional consumer properties
- specific.avro.reader: determines whether to use SpecificRecord or GenericRecord when deserializing Avro messages. SpecificRecord is nearly always preferred in Entur; see "Using Avro Schemas" for details.
- session.timeout.ms: the maximum time between received heartbeats before the consumer is considered failed by Kafka, which then triggers a rebalance of the consumer group. Too low a value may cause unnecessary rebalancing and rejoining, impacting performance. Too high a value introduces long delays in failure detection (such as an ungraceful shutdown of consumers), potentially causing lengthy delays before partitions assigned to failed consumers are rebalanced to fresh ones.
- max.poll.interval.ms: the maximum interval between polls from the consumer. This is useful for recovering from a consumer stuck in some sort of processing failure or loop, even if it is still emitting heartbeats.
- max.poll.records: the maximum number of records returned by Kafka in each poll response. Use this to balance the time between calls against latency. Decreasing this setting can be useful if processing of records is expensive enough to cause a large batch to exceed the max.poll.interval.ms setting.
- auto.offset.reset: determines behavior when no prior offset has been committed for the consumer group and partition in question. latest sets the starting offset to the latest message, while earliest starts from the lowest offset present on the partition. (An offset is the unique, sequential position of a message within a partition; consumers read from, and commit, offsets as they consume.)
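As an illustration only, these optional properties could be tuned like so in the consumer sketch above. The numbers are examples, not recommendations:

```java
// Illustrative values only, not recommendations; tune per workload.
props.put("specific.avro.reader", "true");   // deserialize into generated SpecificRecord classes
props.put("session.timeout.ms", "30000");    // consider the consumer failed after 30 s without heartbeats
props.put("max.poll.interval.ms", "300000"); // allow up to 5 min between polls before rebalancing
props.put("max.poll.records", "100");        // smaller batches when per-record processing is slow
props.put("auto.offset.reset", "earliest");  // start from the oldest message when no offset is committed
```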
Optional producer properties
- acks: the number of acknowledgements of a message from separate brokers the producer requires before considering a request complete. This provides a degree of control over the balance between performance and guaranteed delivery. Common settings are 0, 1 or all.
- retries: the number of times to resend a request that fails with a potentially transient error, such as a network failure. Typically set to either 0 or Integer.MAX_VALUE; in the latter case, the delivery.timeout.ms setting controls retry behavior.
- enable.idempotence: whether to guarantee that exactly one copy of each message is written. This requires retries greater than 0 (otherwise, idempotence is implicitly guaranteed), acks set to all, and max.in.flight.requests.per.connection no greater than 5.
- max.in.flight.requests.per.connection: the maximum number of unacknowledged requests the client will send before blocking.
- delivery.timeout.ms: an upper bound on the time to report success or failure after a call to send() returns, covering retries as well as the initial attempt.
- compression.type: the compression type to use, if any. Valid values are none, gzip, snappy, lz4 and zstd. Compression saves both network capacity and disk storage space, especially for repetitive structures like XML or JSON. When using it, consider the CPU implications for producers and consumers, as well as your batch.size setting. For more on compression, see "Compression".
- batch.size: the upper bound, in bytes, on the size of a batch before it is sent to the cluster. If a batch contains fewer than batch.size bytes, the producer waits at most linger.ms before sending it. Note that linger.ms defaults to 0, meaning all messages are sent immediately without regard for the batch.size setting unless linger.ms is also set.
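As an illustration only, the producer sketch above could be extended with these optional properties. Again, the values are examples, not recommendations:

```java
// Illustrative values only, not recommendations; tune per workload.
props.put("acks", "all");                                  // wait for all in-sync replicas
props.put("enable.idempotence", "true");                   // exactly one copy of each message
props.put("retries", Integer.toString(Integer.MAX_VALUE)); // retry until delivery.timeout.ms expires
props.put("max.in.flight.requests.per.connection", "5");   // must be <= 5 for idempotence
props.put("delivery.timeout.ms", "120000");                // give up on a record after 2 min
props.put("compression.type", "zstd");                     // compress batches before sending
props.put("linger.ms", "20");                              // wait up to 20 ms to fill a batch...
props.put("batch.size", "65536");                          // ...of at most 64 KiB
```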