
Consumer groups

All consumers belong to a consumer group. Consumer groups let you parallelize consumption. Each partition is assigned to only one member of a group at a time, so two members of the same group do not normally process the same message. Kafka also stores each group's committed offsets.

Choosing a consumer group name

A consumer group name must be a string of at most 255 characters and may contain only alphanumeric characters. You do not need to register it ahead of time. If your consumer group id does not exist when a consumer connects, it is created automatically.

Consumer group names should be easily recognizable, preferably named after the domain and service they belong to. Kafka provides no facility for determining who listens to a topic apart from the consumer group id, so having readily identifiable id strings makes support and troubleshooting simpler.
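The following sketch applies the naming rule above. It checks a name against the stated constraint (1 to 255 alphanumeric characters, interpreted here as ASCII letters and digits) and derives a recognizable name from a domain and a service. The helper names isValidGroupName and groupNameFor, and the camelCase convention, are hypothetical illustrations. They are not part of any Kafka or in-house API.

```java
import java.util.Locale;
import java.util.regex.Pattern;

class GroupNameSketch {
    // Rule as stated above: 1 to 255 characters, alphanumeric only (ASCII assumed).
    private static final Pattern VALID_NAME = Pattern.compile("[A-Za-z0-9]{1,255}");

    static boolean isValidGroupName(String name) {
        return name != null && VALID_NAME.matcher(name).matches();
    }

    // Hypothetical convention: camelCase "<domain><Service>" with separators dropped,
    // e.g. ("payments", "invoice-service") -> "paymentsInvoiceService".
    static String groupNameFor(String domain, String service) {
        StringBuilder name = new StringBuilder();
        for (String part : (domain + " " + service).split("[^A-Za-z0-9]+")) {
            if (part.isEmpty()) {
                continue; // a leading separator produces an empty first element
            }
            String lower = part.toLowerCase(Locale.ROOT);
            if (name.length() == 0) {
                name.append(lower);
            } else {
                name.append(Character.toUpperCase(lower.charAt(0))).append(lower.substring(1));
            }
        }
        String result = name.toString();
        if (!isValidGroupName(result)) {
            throw new IllegalArgumentException("Invalid consumer group name: '" + result + "'");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(groupNameFor("payments", "invoice-service")); // paymentsInvoiceService
    }
}
```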

How do consumer groups work?

Kafka breaks topics into a number of partitions, which is set when the topic is created. When a consumer connects, it is assigned a subset of these partitions. The partition is the unit of parallelization, so a consumer group can never have more active consumers than the topic has partitions. Any additional consumers that join the group stay idle. They receive no messages unless one of the active consumers leaves the group.
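This toy round-robin assignment shows why consumers beyond the partition count sit idle. It is a simplified, single-topic model. In Kafka, assignment is performed by a configurable assignor (partition.assignment.strategy), and the actual algorithms differ in detail.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AssignmentSketch {
    // Round-robin: partition p goes to consumer (p mod number of consumers).
    // Each partition has exactly one owner, so consumers beyond the partition count get nothing.
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment; // nobody to assign partitions to
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions, four consumers: c4 stays idle.
        System.out.println(assign(3, List.of("c1", "c2", "c3", "c4")));
        // prints {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```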

Kafka tries to spread partitions as evenly as possible across the consumers in a group. When a new consumer joins the group or an existing consumer leaves, Kafka performs a rebalance and redistributes the partitions among the current members to keep the distribution even. Once redistribution is complete and every member has acknowledged its new assignment, the group returns to its normal, stable state.
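The rebalance described above can be modelled as a small state machine. This sketch borrows the group state names used by Kafka's group coordinator (PreparingRebalance, CompletingRebalance, Stable), but the transitions are heavily simplified and are not the real protocol. It only shows that consumption pauses from the moment membership changes until every member has rejoined and acknowledged its new assignment.

```java
import java.util.HashSet;
import java.util.Set;

class RebalanceSketch {
    // State names follow Kafka's group coordinator; transitions are simplified.
    enum GroupState { STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE }

    static final class Group {
        private final Set<String> members = new HashSet<>();
        private final Set<String> rejoined = new HashSet<>();
        private final Set<String> acknowledged = new HashSet<>();
        private GroupState state = GroupState.STABLE;

        Group(Set<String> initialMembers) {
            members.addAll(initialMembers);
        }

        GroupState state() {
            return state;
        }

        // Any membership change starts a new rebalance round.
        void join(String member) {
            members.add(member);
            startRebalance();
            rejoin(member); // the newcomer's join request counts as its rejoin
        }

        void leave(String member) {
            members.remove(member);
            startRebalance();
        }

        // Phase 1: every current member must rejoin before partitions are redistributed.
        void rejoin(String member) {
            if (state != GroupState.PREPARING_REBALANCE || !members.contains(member)) return;
            rejoined.add(member);
            if (rejoined.containsAll(members)) state = GroupState.COMPLETING_REBALANCE;
        }

        // Phase 2: every member acknowledges its new partitions; then the group is stable again.
        void acknowledge(String member) {
            if (state != GroupState.COMPLETING_REBALANCE || !members.contains(member)) return;
            acknowledged.add(member);
            if (acknowledged.containsAll(members)) state = GroupState.STABLE;
        }

        // In this model, no member receives messages unless the group is stable.
        boolean canConsume() {
            return state == GroupState.STABLE;
        }

        private void startRebalance() {
            state = GroupState.PREPARING_REBALANCE;
            rejoined.clear();
            acknowledged.clear();
        }
    }

    public static void main(String[] args) {
        Group g = new Group(Set.of("c1", "c2"));
        g.join("c3");
        System.out.println(g.canConsume()); // false: rebalance in progress
        g.rejoin("c1");
        g.rejoin("c2");
        g.acknowledge("c1");
        g.acknowledge("c2");
        g.acknowledge("c3");
        System.out.println(g.canConsume()); // true
    }
}
```

A member that has died never calls rejoin. In this model, the round therefore cannot complete until that member is removed from the group.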

No consumer in the group receives messages while the group is rebalancing. This matters when a consumer has a very long session timeout and exits non-gracefully, without explicitly leaving the group. In that case, the next rebalance cannot complete until the dead consumer’s session times out.
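The session timeout is configured on the consumer. This minimal sketch builds the relevant properties with java.util.Properties. The property names group.id, session.timeout.ms and heartbeat.interval.ms are standard Kafka consumer configs. The example values and the group name are assumptions. The sketch also follows the Kafka documentation's guidance to keep the heartbeat interval no higher than one third of the session timeout.

```java
import java.util.Properties;

class LivenessConfigSketch {
    // Illustrative liveness settings for a Kafka consumer; values are examples, not recommendations.
    // A shorter session timeout lets the group evict a dead consumer (and finish rebalancing) sooner,
    // at the cost of more spurious evictions during brief pauses.
    static Properties livenessConfig(String groupId, int sessionTimeoutMs) {
        if (sessionTimeoutMs <= 0) {
            throw new IllegalArgumentException("session timeout must be positive");
        }
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // Heartbeat at least three times per session timeout, per the Kafka docs' guidance.
        props.setProperty("heartbeat.interval.ms", Integer.toString(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        // "paymentsInvoiceService" is a hypothetical group name.
        livenessConfig("paymentsInvoiceService", 30_000).list(System.out);
    }
}
```

The broker only accepts session timeouts within its group.min.session.timeout.ms and group.max.session.timeout.ms bounds, so extreme values may be rejected when the consumer joins the group.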

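Note also that a consumer group acts as a single unit across all the topics its members consume. The group stays active as long as any of its members is active, whichever topic that member reads from, and a membership change anywhere in the group affects the whole group. For common use cases, we therefore recommend using one consumer group per consumed topic.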

Offsets and consumer groups

Every message in a partition is assigned an offset. Kafka stores, per partition, the position each consumer group has committed. When a consumer is assigned a partition that another group member previously consumed from, it resumes from the committed position, which is the message after the last committed one. This lets the group avoid re-reading messages it has already processed and committed. With auto-commit enabled (enable.auto.commit, the client default), consumed offsets are committed to the cluster periodically. With auto-commit disabled, the application must commit offsets itself. Some advanced use cases, such as when consuming a message triggers several side effects, call for committing the offset only after the business logic has completed successfully. This provides an "at least once" delivery guarantee: if the consumer fails before committing, the message is delivered again.
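The following in-memory sketch illustrates the at-least-once behaviour. It commits only after processing and simulates a crash between the two steps. The Partition class and consume method are hypothetical stand-ins, not the Kafka client API. As with Kafka commits, the committed offset is the offset of the next record to read.

```java
import java.util.ArrayList;
import java.util.List;

class AtLeastOnceSketch {
    // Hypothetical in-memory partition; not the Kafka client API.
    static final class Partition {
        final List<String> log;
        long committed = 0; // offset of the NEXT record to read, as with Kafka commits

        Partition(List<String> log) {
            this.log = log;
        }
    }

    // Reads from the committed offset, runs the business logic, then commits.
    // crashAt simulates a crash after processing that offset but before committing it (-1 = no crash).
    static List<String> consume(Partition p, long crashAt) {
        List<String> processed = new ArrayList<>();
        for (long offset = p.committed; offset < p.log.size(); offset++) {
            processed.add(p.log.get((int) offset)); // business logic succeeds
            if (offset == crashAt) {
                return processed; // crash: this offset is never committed
            }
            p.committed = offset + 1; // commit only after processing
        }
        return processed;
    }

    public static void main(String[] args) {
        Partition p = new Partition(List.of("a", "b", "c"));
        System.out.println(consume(p, 1));  // [a, b], then a "crash" before committing b
        System.out.println(consume(p, -1)); // [b, c]: b is delivered a second time
    }
}
```

Committing before processing would flip the trade-off to at-most-once. A crash would then skip b rather than repeat it.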

Non-graceful shutdown

The JVM-based Apache Kafka clients used from Kotlin, as well as our Spring starter library clients, are all Closeable. When the application begins to shut down, they therefore shut down gracefully: they leave their consumer group and ensure that the latest offset has been committed. However, if an application is killed without waiting for a graceful shutdown (e.g. kill -9 <pid> or similar), the client dies without leaving the consumer group. The cluster still considers it a member of the group until its session timeout (session.timeout.ms) has passed without a heartbeat. Only then does the cluster consider the client dead and remove it from the group, which in turn triggers a rebalance. No consumer receives messages during a rebalance, so every client in the same consumer group is affected until the rebalance completes. A non-graceful shutdown can therefore stall consumption for the whole group for up to the session timeout, which should be avoided.
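A toy model of the coordinator's view makes the difference concrete. A graceful close removes the member immediately. A killed member stays in the group until its session timeout has passed. The Coordinator class and its methods are hypothetical and heavily simplified. The 45-second value matches the default session.timeout.ms in recent Kafka versions, but your configuration may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class LivenessSketch {
    // Hypothetical, simplified model of how a group coordinator tracks member liveness.
    static final class Coordinator {
        private final long sessionTimeoutMs;
        private final Map<String, Long> lastHeartbeatMs = new HashMap<>();

        Coordinator(long sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
        }

        void heartbeat(String member, long nowMs) {
            lastHeartbeatMs.put(member, nowMs);
        }

        // Graceful shutdown (close()): the member leaves explicitly and is removed at once.
        void leave(String member) {
            lastHeartbeatMs.remove(member);
        }

        // A killed member (kill -9) sends nothing more; it is evicted only once its
        // session timeout has elapsed since its last heartbeat.
        Set<String> evictExpired(long nowMs) {
            Set<String> evicted = new TreeSet<>();
            lastHeartbeatMs.entrySet().removeIf(e -> {
                boolean expired = nowMs - e.getValue() > sessionTimeoutMs;
                if (expired) {
                    evicted.add(e.getKey());
                }
                return expired;
            });
            return evicted;
        }

        Set<String> members() {
            return new TreeSet<>(lastHeartbeatMs.keySet());
        }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator(45_000);
        c.heartbeat("graceful", 0);
        c.heartbeat("killed", 0);
        c.leave("graceful");                        // removed immediately
        System.out.println(c.members());            // [killed]
        System.out.println(c.evictExpired(30_000)); // []: still counted as a member
        System.out.println(c.evictExpired(45_001)); // [killed]
    }
}
```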

Resetting offsets

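A convenient way to reset a consumer group's offsets is by time. This uses the timestamp stored in each record alongside the message and its other metadata. The group must have no active members while its offsets are reset, so stop its consumers first. Replacing --execute with --dry-run prints the resulting offsets without applying them. From the CLI, use the following command: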

    kafka-consumer-groups --bootstrap-server <bootstrap-server-url> --command-config <path-to-config-file.properties> --reset-offsets --group <consumer-group> --topic <topic-name> --to-datetime YYYY-MM-DDTHH:mm:SS.sss --execute
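Conceptually, a reset to a datetime moves each partition to the earliest offset whose record timestamp is at or after the given time. The following sketch mimics that lookup over an in-memory list of timestamps. Several details are assumptions of this sketch, not a description of the tool's internals: the helper names, the UTC interpretation of the datetime string, the linear scan, and falling back to the end of the log when no record is late enough.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;

class ResetToDatetimeSketch {
    // Parses a datetime such as "2024-01-15T10:30:00.000" and interprets it as UTC (an assumption).
    static long toEpochMillisUtc(String datetime) {
        return LocalDateTime.parse(datetime).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // timestamps.get(i) is the timestamp of the record at offset i in one partition (hypothetical data).
    // Returns the earliest offset whose timestamp is >= targetMs, or the log end offset if there is none.
    static long offsetForTime(List<Long> timestamps, long targetMs) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= targetMs) {
                return offset;
            }
        }
        return timestamps.size();
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(1_000L, 2_000L, 3_000L);
        long target = toEpochMillisUtc("1970-01-01T00:00:01.500"); // 1500 ms
        System.out.println(offsetForTime(timestamps, target)); // 1
    }
}
```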

Deleting a consumer group

Sometimes you may simply want to delete a consumer group. For example, during development your team may want to keep using the same consumer group name while discarding everything committed for it, including but not limited to its offsets for a topic. To do this, use the same CLI tool as above with the --delete option. As with resetting offsets, the group must have no active members:

    kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group <group_name>