---
layout: docs
title: Getting Started
---
Getting the dependency
To use kafka4s in an existing sbt project with Scala 2.13 or later, add the following dependency to your build.sbt:
libraryDependencies ++= Seq(
"com.banno" %% "kafka4s" % "<version>"
)
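kafka4s pulls in the Confluent Avro serializers, which are not published to Maven Central. If dependency resolution fails, you may also need to add the Confluent repository; this is an assumption about your build setup, not a required step for every project:
// Confluent's Maven repository, where the Avro serializer artifacts are hosted
resolvers += "confluent" at "https://packages.confluent.io/maven/"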
Some quick examples
First, some initial imports:
import cats._, cats.effect._, cats.implicits._, scala.concurrent.duration._
Define our data
We'll define a toy message type for data we want to store in our Kafka topic.
case class Customer(name: String, address: String)
case class CustomerId(id: String)
Create our Kafka topic
Now we'll tell Kafka to create a topic that we'll write our Kafka records to.
First, let's bring some types and implicits into scope:
import com.banno.kafka._, com.banno.kafka.admin._
import org.apache.kafka.clients.admin.NewTopic
We'll use Avro4s for serialization.
import com.banno.kafka.avro4s._
Now we can create a topic named customers.v1
with 1 partition and 1 replica:
val topic = new NewTopic("customers.v1", 1, 1.toShort)
// topic: NewTopic = (name=customers.v1, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs=null)
val kafkaBootstrapServers = "localhost:9092" // Change as needed
// kafkaBootstrapServers: String = "localhost:9092"
import cats.effect.unsafe.implicits.global
AdminApi.createTopicsIdempotent[IO](kafkaBootstrapServers, topic :: Nil).unsafeRunSync()
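If you'd like to double-check that the topic now exists, a quick sanity check with the plain Java AdminClient (not part of kafka4s; shown here only as an illustration) could look like this:
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

val adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
// Open a client, list topic names (a blocking call, so it is wrapped in IO.blocking), then close it.
val topicNames = IO.blocking {
  val client = AdminClient.create(adminProps)
  try client.listTopics().names().get()
  finally client.close()
}.unsafeRunSync()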
Register our topic schema
Let's register a schema for our topic with the schema registry!
First, we bring types and implicits into scope:
import com.banno.kafka.schemaregistry._
We'll use the name of the topic we created above:
val topicName = topic.name
// topicName: String = "customers.v1"
Now we can register our topic key and topic value schemas:
val schemaRegistryUri = "http://localhost:8091" // Change as needed
// schemaRegistryUri: String = "http://localhost:8091"
import cats.effect.unsafe.implicits.global
SchemaRegistryApi.avro4s.register[IO, CustomerId, Customer](
schemaRegistryUri, topicName
).unsafeRunSync()
Write our records to Kafka
Now let's create a producer and send some records to our Kafka topic!
We first bring our Kafka producer utils into scope:
import com.banno.kafka.producer._
As of kafka4s-6.x, producers are traced with Natchez, so an implicit Trace[IO] is required. For this example we use the no-op instance; see the Natchez backends for production-ready alternatives.
import natchez.Trace.Implicits.noop
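A minimal sketch of what a real backend might look like with natchez-log (this assumes the natchez-log and log4cats-slf4j modules are on the classpath; the service and span names here are arbitrary):
import natchez.Trace
import natchez.log.Log
import org.typelevel.log4cats.slf4j.Slf4jLogger

implicit val logger: org.typelevel.log4cats.SelfAwareStructuredLogger[IO] =
  Slf4jLogger.getLogger[IO]
// An EntryPoint that logs completed spans via SLF4J
val logEntryPoint = Log.entryPoint[IO]("kafka4s-example")

// Build a Trace[IO] from a root span and run a traced program with it
def runTraced[A](program: Trace[IO] => IO[A]): IO[A] =
  logEntryPoint.root("getting-started").use(span => Trace.ioTrace(span).flatMap(program))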
Now we can create our producer instance:
val producer = ProducerApi.Avro.Generic.resource[IO](
BootstrapServers(kafkaBootstrapServers),
SchemaRegistryUrl(schemaRegistryUri),
ClientId("producer-example")
)
// producer: Resource[IO, ProducerApi[IO, org.apache.avro.generic.GenericRecord, org.apache.avro.generic.GenericRecord]] = Allocate(
// resource = cats.effect.kernel.Resource$$$Lambda$10690/0x0000000102d9b840@dfdb7b3
// )
And we'll define some customer records to be written:
import org.apache.kafka.clients.producer.ProducerRecord
val recordsToBeWritten = (1 to 10).map(a => new ProducerRecord(topicName, CustomerId(a.toString), Customer(s"name-${a}", s"address-${a}"))).toVector
// recordsToBeWritten: Vector[ProducerRecord[CustomerId, Customer]] = Vector(
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(1), value=Customer(name-1,address-1), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(2), value=Customer(name-2,address-2), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(3), value=Customer(name-3,address-3), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(4), value=Customer(name-4,address-4), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(5), value=Customer(name-5,address-5), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(6), value=Customer(name-6,address-6), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(7), value=Customer(name-7,address-7), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(8), value=Customer(name-8,address-8), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(9), value=Customer(name-9,address-9), timestamp=null),
// ProducerRecord(topic=customers.v1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=CustomerId(10), value=Customer(name-10,address-10), timestamp=null)
// )
And now we can (attempt to) write our records to Kafka:
producer.use(p => recordsToBeWritten.traverse_(p.sendSync))
// error: value sendSync is not a member of com.banno.kafka.producer.ProducerApi[cats.effect.IO,org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord]
// did you mean sendAsync?
// producer.use(p => recordsToBeWritten.traverse_(p.sendSync))
// ^^^^^^^^^^
The above fails to compile, however! Our producer writes generic ProducerRecords, but we'd like to send typed records, to ensure that our CustomerId key and our Customer value are compatible with our topic. For this, we can use kafka4s's avro4s integration!
Writing typed records with an Avro4s producer
Turning a generic producer into a typed producer is simple. We first ensure that com.sksamuel.avro4s.RecordFormat
instances for our data are in scope:
implicit val CustomerRecordFormat = com.sksamuel.avro4s.RecordFormat[Customer]
// CustomerRecordFormat: com.sksamuel.avro4s.RecordFormat[Customer] = com.sksamuel.avro4s.RecordFormat$$anon$1@3a1f9309
implicit val CustomerIdRecordFormat = com.sksamuel.avro4s.RecordFormat[CustomerId]
// CustomerIdRecordFormat: com.sksamuel.avro4s.RecordFormat[CustomerId] = com.sksamuel.avro4s.RecordFormat$$anon$1@21f53169
And with those implicits in scope, we can create our producer:
val avro4sProducer = producer.map(_.toAvro4s[CustomerId, Customer])
// avro4sProducer: Resource[IO, ProducerApi[[A]IO[A], CustomerId, Customer]] = Bind(
// source = Allocate(
// resource = cats.effect.kernel.Resource$$$Lambda$10690/0x0000000102d9b840@dfdb7b3
// ),
// fs = cats.effect.kernel.Resource$$Lambda$10818/0x0000000102ea7840@70294830
// )
We can now write our typed customer records successfully!
import cats.effect.unsafe.implicits.global
avro4sProducer.use(p =>
recordsToBeWritten.traverse_(r => p.sendAsync(r).flatMap(rmd => IO(println(s"Wrote record to ${rmd}"))))
).unsafeRunSync()
Read our records from Kafka
Now that we've stored some records in Kafka, let's read them as an fs2.Stream!
We first import our Kafka consumer utilities:
import com.banno.kafka.consumer._
Now we can create our consumer instance.
By default, kafka4s
consumers shift blocking calls to a dedicated
ExecutionContext
backed by a singleton thread pool, to avoid blocking the main
work pool's (typically ExecutionContext.global
) threads, and as a simple
synchronization mechanism because the underlying Java client KafkaConsumer
is
not thread-safe. After receiving records, work is then shifted back to the work
pool.
And here's our consumer, which uses Avro4s to deserialize the records:
val consumer = Avro4sConsumer.resource[IO, CustomerId, Customer](
BootstrapServers(kafkaBootstrapServers),
SchemaRegistryUrl(schemaRegistryUri),
ClientId("consumer-example"),
GroupId("consumer-example-group")
)
// consumer: Resource[IO, ConsumerApi[IO, CustomerId, Customer]] = Bind(
// source = Bind(
// source = Allocate(
// resource = cats.effect.kernel.Resource$$$Lambda$10690/0x0000000102d9b840@631e5ba4
// ),
// fs = com.banno.kafka.consumer.ConsumerApi$Avro$$$Lambda$10821/0x0000000102ea9040@35504ab5
// ),
// fs = cats.effect.kernel.Resource$$Lambda$10818/0x0000000102ea7840@1e154b56
// )
With our Kafka consumer in hand, we'll assign it our topic's partitions with no offsets, so that it starts reading from the first record, and then read a stream of records from our Kafka topic:
import org.apache.kafka.common.TopicPartition
val initialOffsets = Map.empty[TopicPartition, Long] // Start from beginning
// initialOffsets: Map[TopicPartition, Long] = Map()
import cats.effect.unsafe.implicits.global
val messages = consumer.use(c =>
c.assign(topicName, initialOffsets) *> c.recordStream(1.second).take(5).compile.toVector
).unsafeRunSync()
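For example, we can print the keys and values we just read back (messages is the Vector of records returned above):
// Each element is an org.apache.kafka.clients.consumer.ConsumerRecord[CustomerId, Customer]
messages.foreach(r => println(s"${r.key} => ${r.value}"))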
Because the producer and consumer above were created within a Resource
context, everything was closed and shut down properly.
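If a single program needs both clients, the two Resources compose, so both are acquired before use and released together afterwards. A minimal sketch:
val producerAndConsumer: IO[Unit] =
  (avro4sProducer, consumer).tupled.use { case (p, c) =>
    // produce with p and consume with c here; both are released when this
    // block completes, even if it fails
    IO.unit
  }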
Now that we've seen a quick overview, we can take a look at more in-depth documentation of Kafka4s utilities.