Getting the dependency

To use kafka4s in an existing SBT project with Scala 2.13 or a later version, add the following dependency to your build.sbt:

libraryDependencies ++= Seq(
  "com.banno" %% "kafka4s" % "<version>"
)
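sbt’s `%%` operator appends your project’s Scala binary version to the artifact name. So, assuming kafka4s is published for Scala 2.13, the two declarations below should resolve the same artifact on a 2.13 project. Use one or the other, not both. `<version>` remains a placeholder for the release you want.

```scala
// Cross-versioned: sbt picks the suffix matching scalaVersion (e.g. _2.13).
libraryDependencies += "com.banno" %% "kafka4s" % "<version>"

// Equivalent on Scala 2.13, with the binary-version suffix written out by hand.
libraryDependencies += "com.banno" % "kafka4s_2.13" % "<version>"
```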

Some quick examples

First, some initial imports:

import cats._, cats.effect._, cats.implicits._, scala.concurrent.duration._

Define our data

We’ll define toy key and value types for the data we want to store in our Kafka topic.

case class Customer(name: String, address: String)
case class CustomerId(id: String)

Create our Kafka topic

Now we’ll tell Kafka to create a topic that we’ll write our Kafka records to.

First, let’s bring some types and implicits into scope:

import com.banno.kafka._, com.banno.kafka.admin._
import org.apache.kafka.clients.admin.NewTopic

Now we can create a topic named customers.v1 with 1 partition and 1 replica:

val topic = new NewTopic("customers.v1", 1, 1.toShort)
// topic: NewTopic = (name=customers.v1, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs=null)
val kafkaBootstrapServers = "localhost:9092" // Change as needed
// kafkaBootstrapServers: String = "localhost:9092"
import cats.effect.unsafe.implicits.global
AdminApi.createTopicsIdempotent[IO](kafkaBootstrapServers, topic :: Nil).unsafeRunSync()
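Because the topic is created with `createTopicsIdempotent`, rerunning this example should be harmless. Assuming, as the name suggests, that the call only creates topics that do not exist yet, the idea can be sketched without Kafka at all. `TopicSpec` and `InMemoryCluster` are hypothetical names, and this is a toy model rather than the kafka4s implementation:

```scala
// Hypothetical, simplified topic description (compare Kafka's NewTopic).
final case class TopicSpec(name: String, partitions: Int, replicationFactor: Short)

// Toy in-memory "cluster"; not a Kafka client and not kafka4s code.
final class InMemoryCluster {
  private var topics = Map.empty[String, TopicSpec]

  def topicNames: Set[String] = topics.keySet

  // Creates only the requested topics that do not exist yet and returns them,
  // so repeating the call with the same input changes nothing and never fails.
  def createTopicsIdempotent(requested: Seq[TopicSpec]): Seq[TopicSpec] = {
    val missing = requested.filterNot(t => topics.contains(t.name))
    topics = topics ++ missing.map(t => t.name -> t)
    missing
  }
}
```

Calling `createTopicsIdempotent` twice with the `customers.v1` spec creates the topic on the first call and returns an empty list on the second.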

Register our topic schema

Let’s register a schema for our topic with the schema registry!

First, we bring types and implicits into scope:

import com.banno.kafka.schemaregistry._

We’ll use the name of the topic we created above:

val topicName = topic.name
// topicName: String = "customers.v1"

Now we can register our topic key and topic value schemas:

val schemaRegistryUri = "http://localhost:8091" // Change as needed
// schemaRegistryUri: String = "http://localhost:8091"
import cats.effect.unsafe.implicits.global
SchemaRegistryApi.register[IO, CustomerId, Customer](
  schemaRegistryUri, topicName
).unsafeRunSync()
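Confluent Schema Registry’s default subject naming strategy (TopicNameStrategy) stores a topic’s key schema under `<topic>-key` and its value schema under `<topic>-value`. Assuming `SchemaRegistryApi.register` follows that convention, the sketch below computes the subjects to expect for `customers.v1`, plus the registry REST path for fetching the latest version of a subject, which is handy for checking the registration by hand. `SubjectNames` is a hypothetical helper, not part of kafka4s:

```scala
object SubjectNames {
  // Subject for the key schema under TopicNameStrategy.
  def keySubject(topic: String): String = s"$topic-key"

  // Subject for the value schema under TopicNameStrategy.
  def valueSubject(topic: String): String = s"$topic-value"

  // Schema Registry REST path for the latest schema version of a subject,
  // e.g. to inspect what was registered with curl or a browser.
  def latestVersionUrl(registryUri: String, subject: String): String =
    s"${registryUri.stripSuffix("/")}/subjects/$subject/versions/latest"
}
```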

Write our records to Kafka

Now let’s create a producer and send some records to our Kafka topic!

We first bring our Kafka producer utils into scope:

import com.banno.kafka.producer._

Now we can create our producer instance:

val producer = ProducerApi.Avro.Generic.resource[IO](
  BootstrapServers(kafkaBootstrapServers),
  SchemaRegistryUrl(schemaRegistryUri),
  ClientId("producer-example")
)
// producer: Resource[IO, ProducerApi[IO, org.apache.avro.generic.GenericRecord, org.apache.avro.generic.GenericRecord]] = Allocate(
//   resource = cats.effect.kernel.Resource$$$Lambda$7182/1167825769@7d87d305
// )

And we’ll define some customer records to be written:

import org.apache.kafka.clients.producer.ProducerRecord
val recordsToBeWritten = (1 to 10).map(a => new ProducerRecord(topicName, CustomerId(a.toString), Customer(s"name-${a}", s"address-${a}"))).toVector
// recordsToBeWritten: Vector[ProducerRecord[CustomerId, Customer]] = Vector(
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(1), value=Customer(name-1,address-1), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(2), value=Customer(name-2,address-2), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(3), value=Customer(name-3,address-3), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(4), value=Customer(name-4,address-4), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(5), value=Customer(name-5,address-5), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(6), value=Customer(name-6,address-6), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(7), value=Customer(name-7,address-7), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(8), value=Customer(name-8,address-8), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(9), value=Customer(name-9,address-9), timestamp=null),
//   ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(10), value=Customer(name-10,address-10), timestamp=null)
// )

And now we can (attempt to) write our records to Kafka:

producer.use(p => recordsToBeWritten.traverse_(p.sendSync))
// error: type mismatch;
//   org.apache.kafka.clients.producer.ProducerRecord[org.apache.avro.generic.GenericRecord|repl.MdocSession.App.CustomerId, org.apache.avro.generic.GenericRecord|repl.MdocSession.App.Customer] => cats.effect.IO[org.apache.kafka.clients.producer.RecordMetadata]
// producer.use(p => recordsToBeWritten.traverse_(p.sendSync))
//                                                ^^^^^^^^^^

The above fails to compile, however! Our producer sends ProducerRecords whose keys and values are Avro GenericRecords, but our records have CustomerId keys and Customer values. We’d rather send typed records, so that the compiler can check that our CustomerId keys and Customer values are compatible with our topic. For this, we can use Kafka4s’ avro4s integration!

Writing typed records with an Avro4s producer

Turning a generic producer into a typed producer is simple. We first ensure that com.sksamuel.avro4s.RecordFormat instances for our data are in scope:

implicit val CustomerRecordFormat: com.sksamuel.avro4s.RecordFormat[Customer] = com.sksamuel.avro4s.RecordFormat[Customer]
// CustomerRecordFormat: com.sksamuel.avro4s.RecordFormat[Customer] = com.sksamuel.avro4s.RecordFormat$$anon$1@18d1882f
implicit val CustomerIdRecordFormat: com.sksamuel.avro4s.RecordFormat[CustomerId] = com.sksamuel.avro4s.RecordFormat[CustomerId]
// CustomerIdRecordFormat: com.sksamuel.avro4s.RecordFormat[CustomerId] = com.sksamuel.avro4s.RecordFormat$$anon$1@4b954863

And with those implicits in scope, we can create our producer:

val avro4sProducer = producer.map(_.toAvro4s[CustomerId, Customer])
// avro4sProducer: Resource[IO, ProducerApi[IO, CustomerId, Customer]] = Bind(
//   source = Allocate(
//     resource = cats.effect.kernel.Resource$$$Lambda$7182/1167825769@7d87d305
//   ),
//   fs = cats.effect.kernel.Resource$$Lambda$7269/760167558@1bd99667
// )
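Conceptually, `toAvro4s` lets us send typed records through a producer that only understands generic ones: each key and value is converted to a `GenericRecord` with its `RecordFormat`, and the result is handed to the generic producer. The dependency-free model below assumes that is roughly what happens; `Generic`, `ToGeneric`, `Record` and `Producer` are hypothetical stand-ins for `GenericRecord`, avro4s’s `RecordFormat`, Kafka’s `ProducerRecord` and kafka4s’s `ProducerApi`:

```scala
import scala.collection.mutable.ListBuffer

object TypedProducerModel {
  // Stand-in for Avro's GenericRecord: field names mapped to values.
  type Generic = Map[String, Any]

  // Stand-in for avro4s's RecordFormat (only the "to generic" direction).
  trait ToGeneric[A] { def to(a: A): Generic }

  // Stand-in for Kafka's ProducerRecord.
  final case class Record[K, V](topic: String, key: K, value: V)

  // Stand-in for kafka4s's ProducerApi, reduced to a single send method.
  trait Producer[K, V] { def send(r: Record[K, V]): Unit }

  // Wrap a generic producer so that it accepts typed records: each key and
  // value is converted with its ToGeneric instance, then sent as-is.
  def typed[K, V](generic: Producer[Generic, Generic])(implicit
      fk: ToGeneric[K],
      fv: ToGeneric[V]
  ): Producer[K, V] =
    new Producer[K, V] {
      def send(r: Record[K, V]): Unit =
        generic.send(Record(r.topic, fk.to(r.key), fv.to(r.value)))
    }
}

object TypedProducerDemo {
  import TypedProducerModel._

  final case class CustomerId(id: String)
  final case class Customer(name: String, address: String)

  // Hand-written instances; avro4s derives real RecordFormats automatically.
  implicit val customerIdToGeneric: ToGeneric[CustomerId] =
    new ToGeneric[CustomerId] { def to(a: CustomerId): Generic = Map("id" -> a.id) }
  implicit val customerToGeneric: ToGeneric[Customer] =
    new ToGeneric[Customer] {
      def to(a: Customer): Generic = Map("name" -> a.name, "address" -> a.address)
    }

  // A "generic producer" that only records what it was asked to send.
  val sent: ListBuffer[Record[Generic, Generic]] = ListBuffer.empty
  val genericProducer: Producer[Generic, Generic] = new Producer[Generic, Generic] {
    def send(r: Record[Generic, Generic]): Unit = sent += r
  }

  // The typed view, analogous to producer.map(_.toAvro4s[CustomerId, Customer]).
  val customerProducer: Producer[CustomerId, Customer] =
    typed[CustomerId, Customer](genericProducer)
}
```

The design choice this illustrates is that the conversion lives in implicit instances, so a record with the wrong key or value type is rejected at compile time, much as the generic `sendSync` call above was.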

We can now write our typed customer records successfully!

import cats.effect.unsafe.implicits.global
avro4sProducer.use(p =>
  recordsToBeWritten.traverse_(r => p.sendSync(r).flatMap(rmd => IO(println(s"Wrote record to ${rmd}"))))
).unsafeRunSync()

Read our records from Kafka

Now that we’ve stored some records in Kafka, let’s read them as an fs2.Stream!

We first import our Kafka consumer utilities:

import com.banno.kafka.consumer._

Now we can create our consumer instance.

By default, kafka4s consumers shift blocking calls to a dedicated ExecutionContext backed by a single thread. This serves two purposes: it keeps blocking calls from tying up the threads of the main work pool (typically ExecutionContext.global), and it acts as a simple synchronization mechanism, because the underlying Java client, KafkaConsumer, is not thread-safe. After records are received, work is shifted back to the work pool.
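The confinement strategy described above can be sketched with only the Scala standard library. This illustrates the general technique and is not kafka4s’s code: every call to a hypothetical non-thread-safe `UnsafeClient` runs on one dedicated thread, so calls never overlap, and the combined result is then processed on `ExecutionContext.global`:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SingleThreadConfinement {
  // Hypothetical stand-in for a non-thread-safe client such as KafkaConsumer.
  final class UnsafeClient {
    private var polls = 0 // unsynchronized mutable state

    def poll(): Int = { polls += 1; polls }
  }

  // Runs n polls, each confined to a single dedicated thread, then hands the
  // results back to the global pool for follow-up work.
  def runPolls(n: Int): Int = {
    require(n > 0, "n must be positive")
    val pool = Executors.newSingleThreadExecutor()
    implicit val clientEc: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val client = new UnsafeClient
      // Submitted from the caller's thread, executed strictly one at a time.
      val calls: List[Future[Int]] = List.fill(n)(Future(client.poll()))
      // Shift the follow-up work back to the general-purpose pool.
      val highest = Future.sequence(calls).map(_.max)(ExecutionContext.global)
      Await.result(highest, 10.seconds)
    } finally pool.shutdown()
  }
}
```

Because the executor has exactly one thread, `runPolls(1000)` should return 1000 even though the client’s counter is unsynchronized; on a multi-threaded pool, concurrent increments could be lost.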

And here’s our consumer, which is using Avro4s to deserialize the records:

val consumer = ConsumerApi.Avro4s.resource[IO, CustomerId, Customer](
  BootstrapServers(kafkaBootstrapServers),
  SchemaRegistryUrl(schemaRegistryUri),
  ClientId("consumer-example"),
  GroupId("consumer-example-group")
)
// consumer: Resource[IO, ConsumerApi[IO, CustomerId, Customer]] = Bind(
//   source = Bind(
//     source = Allocate(
//       resource = cats.effect.kernel.Resource$$$Lambda$7182/1167825769@7aee132f
//     ),
//     fs = com.banno.kafka.consumer.ConsumerApi$Avro$$$Lambda$7272/109375717@402cad23
//   ),
//   fs = cats.effect.kernel.Resource$$Lambda$7269/760167558@20abbd9a
// )

With our Kafka consumer in hand, we’ll assign our topic’s partition to the consumer with no initial offsets, so that it starts reading from the first record, and then read a stream of records from our Kafka topic:

import org.apache.kafka.common.TopicPartition
val initialOffsets = Map.empty[TopicPartition, Long] // Start from beginning
// initialOffsets: Map[TopicPartition, Long] = Map()
import cats.effect.unsafe.implicits.global
val messages = consumer.use(c =>
  c.assign(topicName, initialOffsets) *> c.recordStream(1.second).take(5).compile.toVector
).unsafeRunSync()
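The snippet above can be modeled without a broker under two assumptions: partitions missing from the offsets map start at offset 0, and `recordStream` keeps polling and emits each batch’s records in order, with `take(5)` ending the stream after five records. The poll timeout (`1.second`) is not modeled. `RecordStreamModel`, `PartitionLog` and `maxPerPoll` are hypothetical names:

```scala
import scala.collection.mutable

object RecordStreamModel {
  // Hypothetical stand-in for one partition's log of records.
  final case class PartitionLog(records: Vector[String])

  def recordStream(
      logs: Map[Int, PartitionLog],
      initialOffsets: Map[Int, Long],
      maxPerPoll: Int
  ): Iterator[String] = {
    // Partitions without an explicit offset start from the beginning (0).
    val positions: mutable.Map[Int, Long] =
      mutable.Map.from(logs.keys.map(p => p -> initialOffsets.getOrElse(p, 0L)))

    // One poll: up to maxPerPoll records from each partition, advancing positions.
    def pollOnce(): Vector[String] =
      logs.toVector.sortBy(_._1).flatMap { case (p, log) =>
        val from = positions(p).toInt
        val batch = log.records.slice(from, from + maxPerPoll)
        positions(p) = (from + batch.size).toLong
        batch
      }

    // Poll forever, flattening batches; callers bound it with take(n).
    Iterator.continually(pollOnce()).flatMap(_.iterator)
  }
}
```

As with a real consumer, asking this model for more records than the topic holds would keep polling forever, which is why the stream is bounded with `take`.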

Because the producer and consumer above were created as Resources, each was closed and shut down properly when its use block completed.
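The same guarantee can be seen with the standard library’s `scala.util.Using.Manager`, used here only as an analogy to cats-effect’s `Resource`: resources are released in reverse order of acquisition, even when the body throws. `ScopedCleanup` and `Client` are hypothetical stand-ins for the producer and consumer:

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Try, Using}

object ScopedCleanup {
  // Log of lifecycle events, so the release order can be inspected.
  val events: ListBuffer[String] = ListBuffer.empty[String]

  // Hypothetical stand-in for a producer or consumer client.
  final class Client(name: String) extends AutoCloseable {
    events += s"open $name"
    override def close(): Unit = events += s"close $name"
  }

  // Acquire two clients, optionally fail, and let Using.Manager release them.
  def run(failInBody: Boolean): Try[Unit] =
    Using.Manager { use =>
      use(new Client("producer"))
      use(new Client("consumer"))
      if (failInBody) throw new RuntimeException("boom")
    }
}
```

After `ScopedCleanup.run(failInBody = true)`, the result is a `Failure`, yet both clients have been closed, consumer first.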

Now that we’ve seen a quick overview, we can take a look at more in-depth documentation of Kafka4s utilities.