kafka4s - Functional programming with Kafka and Scala

kafka4s provides pure, referentially transparent functions for working with Kafka, and integrates with FP libraries such as cats-effect and fs2.

Quick Start

To use kafka4s in an existing SBT project with Scala 2.12 or later, add the following dependency to your build.sbt:

libraryDependencies ++= Seq(
  "com.banno" %% "kafka4s" % "<version>"
)

Sending records to Kafka is an effect. If we wanted to periodically write random integers to a Kafka topic, we could do:

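import cats.effect.Sync
import cats.implicits._
import fs2.Stream
import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.duration._
import scala.util.Random

// Assumes an effect type F, a `producer`, and a `topic` are already in scope.
Stream
  .awakeDelay[F](1.second) // emit once per second
  .evalMap(
    _ =>
      Sync[F]
        .delay(Random.nextInt()) // suspend the side-effecting random call
        .flatMap(i => producer.sendAndForget(new ProducerRecord(topic, i, i)))
  )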
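The stream above is only a description of work: nothing is sent until it is compiled and run. The following is a minimal sketch of that idea, assuming cats-effect 3 and fs2 3 are on the classpath. The `send` function is a hypothetical stand-in for `producer.sendAndForget`, so the sketch needs no Kafka broker and makes no claim about the kafka4s API.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream
import scala.concurrent.duration._
import scala.util.Random

object PeriodicWrites extends IOApp.Simple {
  // Hypothetical stand-in for producer.sendAndForget: it only prints the value.
  def send(i: Int): IO[Unit] = IO.println(s"sending $i")

  // Once per second, generate a random integer and "send" it.
  val writes: Stream[IO, Unit] =
    Stream
      .awakeDelay[IO](1.second)
      .evalMap(_ => IO(Random.nextInt()).flatMap(send))

  // Defining `writes` performs no effects; they run only here.
  // take(3) bounds the otherwise infinite stream for demonstration.
  def run: IO[Unit] = writes.take(3).compile.drain
}
```

Removing `take(3)` would keep the program writing until it is cancelled, which is closer to how a long-running producer would likely be used.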

Polling Kafka for records is also an effect, and we can obtain a stream of records from a topic. We can print the even random integers from the above topic using:

consumer
  .recordStream(
    initialize = consumer.subscribe(topic),
    pollTimeout = 1.second
  )
  .map(_.value)
  .filter(_ % 2 == 0)
  .evalMap(i => Sync[F].delay(println(i)))
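The filter-and-print part of this pipeline is ordinary fs2 and can be tried without a broker. The following is a minimal sketch, again assuming cats-effect 3 and fs2 3. The `values` stream is a hypothetical stand-in for `consumer.recordStream(...).map(_.value)`, filled with fixed sample integers rather than records read from Kafka.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream

object PrintEvens extends IOApp.Simple {
  // Hypothetical stand-in for the record values read from the topic.
  val values: Stream[IO, Int] =
    Stream.emits(List(3, 8, -4, 7, 10)).covary[IO]

  // Keep only the even integers; this works for negative values too,
  // since an even number always has remainder 0.
  val evens: Stream[IO, Int] = values.filter(_ % 2 == 0)

  // Print each remaining value as an effect, then run the stream.
  def run: IO[Unit] = evens.evalMap(i => IO.println(i)).compile.drain
}
```

Running this prints 8, -4, and 10, one per line. With a real consumer the stream would not terminate on its own, so it would keep printing as records arrive.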

Learning more

To learn more about kafka4s, start with our Getting Started Guide, play with some example apps, and check out the kafka4s Scaladoc for more info.