Using Avro Schemas

When using the Kafka Avro Serialization/Deserialization classes, you may choose between GenericRecord and SpecificRecord. GenericRecord presents the fields of your Avro record in a nested, map-like fashion, with field names as entry keys. SpecificRecord presents data as instances of specific, pre-generated classes. This generally makes SpecificRecord both simpler to interact with and less prone to unintended schema updates.

Team Data recommends client applications use SpecificRecord wherever possible.
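
To illustrate the difference, here is a minimal sketch (assuming the generated TestEvent class from the schema defined below; exact accessor types depend on your generator settings):

import org.apache.avro.generic.GenericRecord

// GenericRecord: fields are looked up by name and come back untyped,
// so typos and type mismatches only surface at runtime
fun readGeneric(record: GenericRecord): String =
    record.get("message").toString()

// SpecificRecord: the generated TestEvent class exposes typed accessors,
// so the compiler catches such mistakes
fun readSpecific(event: TestEvent): String =
    event.message.toString()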

Generating Avro SpecificRecord classes

While it is possible to implement SpecificRecord manually, it is generally tidier and easier to generate these classes from .avsc files.

Writing a .avsc file

An Avro schema is represented by a JSON file with a particular format. While an Avro schema can specify either a primitive type or a record, we typically use records for values. A typical schema for a simple Avro record looks as follows:

{
  "namespace": "io.entur.data.test",
  "type": "record",
  "name": "TestEvent",
  "fields": [
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-micros"
      }
    },
    {
      "name": "message",
      "type": "string"
    }
  ]
}

Supported field types are boolean, int, long, float, double, bytes, string, record, enum, array, map and fixed. For details on these, see the Avro Specification.
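
As an illustration, the sketch below (schema and field names are illustrative, not an Entur convention) combines an enum, an array, and an optional field expressed as a union with null:

{
  "namespace": "io.entur.data.test",
  "type": "record",
  "name": "ExampleEvent",
  "fields": [
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "Status",
        "symbols": ["ACTIVE", "INACTIVE"]
      }
    },
    {
      "name": "tags",
      "type": {
        "type": "array",
        "items": "string"
      }
    },
    {
      "name": "note",
      "type": ["null", "string"],
      "default": null
    }
  ]
}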

Save your <filename>.avsc under src/main/avro or equivalent. If your .avsc file is only used for testing, consider using src/test/avro instead.

Generating Java classes from .avsc

Once you have a schema defined, Java classes can be generated using Maven or Gradle plugins.

Gradle

plugins {
    //...
    id("com.github.davidmc24.gradle.plugin.avro") version "latest"
    //...
}

dependencies {
    implementation("org.apache.avro:avro:latest")
    implementation("io.confluent:kafka-avro-serializer:latest")
}

This provides the Gradle tasks generateAvroJava and generateTestAvroJava, which generate Java code from your .avsc files. The generated code can be found under build/generated-avro-java/<namespace from .avsc>/<schemaname>.java. Note that you will likely want to pin specific versions of the plugin and libraries rather than using "latest".
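
Once generated, instances can be created with the constructor (as in the examples below) or with the generated builder. A minimal sketch, assuming Avro 1.10+ so that the timestamp-micros logical type maps to java.time.Instant:

import java.time.Instant

// The Avro plugin generates a builder on every SpecificRecord class
val event: TestEvent = TestEvent.newBuilder()
    .setTimestamp(Instant.now())
    .setMessage("Trains are neat")
    .build()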

Maven

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>6.2.4</version>
</dependency>

<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>1.10.2</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
        </execution>
    </executions>
</plugin>
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
        <source>1.8</source>
        <target>1.8</target>
    </configuration>
</plugin>

Using SpecificRecord classes

Once you have your SpecificRecord classes, usage becomes fairly simple.

Using entur-kafka-spring-starter

@Autowired
lateinit var producer: EnturKafkaProducer<TestEvent>

fun produce() {
    producer.send(
        MY_TOPIC_NAME,
        myKey,
        TestEvent(java.time.Instant.now(), "Trains are neat"),
        myCorrelationId
    )
}

@KafkaListener(topics = [MY_TOPIC_NAME], containerFactory = "enturListenerFactory")
fun receiveTestRecord(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) key: String,
    @Payload value: TestEvent
) {
    // do something with the received record
}

Using the base Kafka client library

To produce a message, pass the SpecificRecord instance directly to the Kafka producer's send method. As an example, using TestEvent from earlier:

val myProducer = KafkaProducer<String, TestEvent>(configurationProperties)

myProducer.send(
    ProducerRecord(
        MY_TOPIC_NAME,
        myKey,
        TestEvent(
            timestamp = java.time.Instant.now(),
            message = "Trains are neat"
        )
    )
)

When consuming messages, specify the class when instantiating your consumer. For example:

val consumer = KafkaConsumer<String, TestEvent>(
    mapOf(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to "true",
        // (other configuration properties)
    )
)

//...
consumer.poll(Duration.ofSeconds(pollTimeout))
    .map { it.value() }
    .forEach { record ->
        // do something with record, which will be of type TestEvent
    }

If you did not write the Avro schema, but your team is simply consuming from a topic, you may download the schema from the schema registry in order to generate Java classes for deserialization. For example, if you want the latest schema for the topic "topic-team-ownership" from the test-int cluster, you can simply download it from the schema registry.
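
A minimal sketch of fetching a schema programmatically, assuming the Confluent schema-registry client is on the classpath and the topic uses the default TopicNameStrategy (subject "<topic>-value"); the helper function name and cache size are illustrative:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

fun fetchLatestValueSchema(registryUrl: String, topic: String): String {
    // identityMapCapacity = 10 is an arbitrary cache size for this sketch
    val client = CachedSchemaRegistryClient(registryUrl, 10)
    val metadata = client.getLatestSchemaMetadata("$topic-value")
    return metadata.schema // raw schema JSON; save it as a .avsc file and generate classes from it
}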

Avro primitives as top-level types in schemas

There are cases where Avro primitives are used as keys in Entur. This is considered a legacy pattern and is not recommended by Team Data. Consider whether you really need a schema for key values consisting of a single primitive; it may be simpler and sufficient to use a plain, schemaless String in these cases. Avro primitives as keys also suffer from occasional obscure compatibility issues with various tooling, requiring special measures to work properly.
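
If you opt for plain String keys, no key schema is needed at all; a sketch of the corresponding producer configuration (property names from the standard Kafka and Confluent libraries):

import org.apache.kafka.clients.producer.ProducerConfig

// String keys need no key schema; only the value goes through Avro
val producerProps = mapOf(
    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroSerializer",
    // (other configuration properties)
)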