
Using Kafka with Rust

Apache Kafka is a distributed log that's a great fit for streaming applications, microservice architectures, and more. In this post, we will learn how to use Kafka with applications written in Rust.

Micah Wylde

CEO of Arroyo

Kafka + Rust

Apache Kafka is a distributed log store and streaming platform. It was originally built as an internal system at LinkedIn, and subsequently open-sourced as an Apache project in 2011. Since then, it's become a component of nearly every large company's infrastructure, used for data pipelines, microservice communication, and more.

It's also a great complement to stream processing engines like Arroyo, which can read and write from Kafka to provide exactly-once processing.

Kafka and its standard client library are written in Java. But what if you want to use it from Rust, the best language for writing data infrastructure?

In this post, we'll learn how to use Kafka from Rust, building a simple chat app that will allow multiple users to communicate over Kafka. We'll cover the basics of producing and consuming messages, and then look at some more advanced topics like schemas, offset management, and high-throughput patterns.

What is Kafka?

Kafka is an event-streaming platform. It enables producers to write streams of data that are read by consumers. Data in Kafka can be stored indefinitely, and can be consumed in many different ways.

Under the hood, it's implemented as a distributed, persistent log. That may not sound particularly useful on its own, but the Kafka designers had the insight that many different patterns of stateful communication can be built on top of a log data structure.

For example, Kafka can be used for pub-sub (where all messages are read by all consumers), for queueing (where each message is read by a single consumer), and for various patterns in between. Unlike traditional message queues that drop their data once it's been consumed, Kafka can durably store messages to support patterns like backfill.

This also turns out to be very useful for implementing state recovery in a system like Arroyo, as we can checkpoint our current position in the log and resume from that point later.

There are a few key concepts to be aware of:

  • Topic: A named, durable, partitioned log of data; for example 'mobile-events'
  • Partition: A single log for a topic; messages within each partition are ordered
  • Record: An individual item in a Kafka partition, containing a timestamp, key, and value
  • Offset: The position of a record in its partition
  • Consumer group: A name given to a set of consumers that would like to spread out messages amongst them; only one consumer in a group will receive each message
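
To make these concepts concrete, here's a purely illustrative model of how they relate. This is not rdkafka's API, just a sketch of the data model described above:

// Illustrative only: a topic is a set of partitions, each partition is an
// ordered log of records, and an offset is a record's position in that log.
struct Record {
    timestamp_ms: i64,
    key: Option<Vec<u8>>,
    value: Vec<u8>,
}

struct Partition {
    // a record's offset is its index in this log
    records: Vec<Record>,
}

struct Topic {
    name: String,
    partitions: Vec<Partition>,
}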

Getting started

Setting up Kafka

First we'll need a Kafka cluster to talk to. There are a number of ways to get access to a Kafka cluster:

  1. Run Kafka in Docker, for example using the Bitnami Kafka image
  2. Use a hosted Kafka provider, like Confluent Cloud or Upstash
  3. Run Kafka locally on your computer

For this tutorial, I'll assume that you're using the local option, but I'll call out how to adjust for other Kafka clusters. Running Kafka locally requires Java 11 or newer.

Traditionally, Kafka has required ZooKeeper, a distributed coordination service, to run, but newer versions can instead use the built-in KRaft mode. We'll use that to avoid needing to run another service.

Run these commands to download and run the Kafka server:

$ curl -O https://downloads.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz
$ tar xvfz kafka_2.12-3.6.0.tgz
$ cd kafka_2.12-3.6.0
$ export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c \
    config/kraft/server.properties
$ bin/kafka-server-start.sh config/kraft/server.properties

You should now be able to create a topic. In another terminal, cd to the kafka_2.12-3.6.0 directory and run

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 \
    --topic chat --create

If this doesn't work, ensure that the server is still running without errors in the other tab.

Rust

For this tutorial, you will need a Rust installation if you don't already have one. The easiest way to install Rust is with the rustup script:

$ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

After that completes, you should be able to create a new Rust project using Cargo:

$ cargo new kafka-tutorial

and run it

$ cd kafka-tutorial
$ cargo run
   Compiling kafka-tutorial v0.1.0 (/Users/mwylde/kafka-tutorial)
    Finished dev [unoptimized + debuginfo] target(s) in 0.93s
     Running `target/debug/kafka-tutorial`
Hello, world!

Rust Kafka clients

There are two Rust Kafka libraries available:

  • kafka-rust is a pure-Rust implementation of the Kafka protocol
  • rust-rdkafka is a wrapper around librdkafka, the officially supported C client library

Most Rust users prefer pure-Rust crates, as they are likely to be safer than C libraries and are easier to build within Cargo. However, in this case I recommend using rust-rdkafka. It is much more actively developed, and the underlying librdkafka library is fast and high-quality. The Rust wrapper around it adds great ergonomics and async support.

To add rust-rdkafka to our new application, we can use cargo:

$ cargo add rdkafka

By default, rust-rdkafka is built with a statically-linked version of the underlying librdkafka C library; this behavior can be configured via feature flags. See the rust-rdkafka docs for the details.

We will also be using the Tokio runtime for async and the uuid crate, so we'll add those:

$ cargo add tokio --features full
$ cargo add uuid --features v4

Building a chat app

Now that we have a Kafka cluster and a Rust project, let's build a simple chat app, with clients that will communicate over Kafka.

We'll start with some scaffolding code in our main.rs file:

use tokio::io::{AsyncWriteExt};
 
#[tokio::main] // starts the Tokio runtime for async
async fn main() {
    let mut stdout = tokio::io::stdout();
 
    stdout.write(b"Welcome to Kafka chat!\n").await.unwrap();
}

We're using the tokio::io::stdout function to get a handle to stdout, and then using the tokio::io::AsyncWriteExt trait to write to it asynchronously. This is necessary because we'll be using Tokio's async runtime, and it's not advisable to mix sync and async code.

Sending messages

First we'll work on sending messages to Kafka. We'll create a FutureProducer, which is a struct provided by rdkafka to asynchronously produce messages.

fn create_producer(bootstrap_server: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        .set("queue.buffering.max.ms", "0")
        .create().expect("Failed to create client")
}

Next we'll extend the main function to listen to stdin and send lines read from it to Kafka:

use std::env::args;
use rdkafka::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
 
#[tokio::main]
async fn main() {
    let mut stdout = tokio::io::stdout();
    let mut input_lines = BufReader::new(tokio::io::stdin()).lines();
 
    stdout.write(b"Welcome to Kafka chat!\n").await.unwrap();
 
 
    // Creates a producer, reading the bootstrap server from the first command-line argument
    // or defaulting to localhost:9092
    let producer = create_producer(&args().skip(1).next()
        .unwrap_or("localhost:9092".to_string()));
 
    loop {
        // Write a prompt to stdout
        stdout.write(b"> ").await.unwrap();
        stdout.flush().await.unwrap();
 
        // Read a line from stdin
        match input_lines.next_line().await.unwrap() {
            Some(line) => {
                // Send the line to Kafka on the 'chat' topic
                producer.send(FutureRecord::<(), _>::to("chat")
                  .payload(&line), Timeout::Never)
                    .await
                    .expect("Failed to produce");
            }
            None => break,
        }
    }
}

We can try it out, using the Kafka command-line tools to read the messages. In the kafka directory, run

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic chat

then in your Rust project

$ cargo run
Welcome to Kafka chat!
> Hello world!

and you should see the message appear in the Kafka console consumer.

If you're using a hosted Kafka provider, you may need to pass in a different bootstrap server address when running the program, for example:

$ cargo run -- my-hosted-kafka:9092

Some hosted Kafka providers also require authentication, which can be configured via the sasl.username and sasl.password options when constructing the client.
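
If you need this, the options are set on the ClientConfig just like bootstrap.servers. Here's a minimal sketch, assuming SASL/PLAIN over TLS; the exact mechanism and settings vary by provider, so check your provider's documentation:

use rdkafka::ClientConfig;
use rdkafka::producer::FutureProducer;
 
fn create_authenticated_producer(
    bootstrap_server: &str,
    username: &str,
    password: &str,
) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        // these are standard librdkafka settings for SASL over TLS
        .set("security.protocol", "SASL_SSL")
        .set("sasl.mechanisms", "PLAIN")
        .set("sasl.username", username)
        .set("sasl.password", password)
        .create()
        .expect("Failed to create client")
}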

Receiving messages

Now we'll work on receiving messages. We'll start by creating a StreamConsumer which will allow us to asynchronously consume messages from Kafka.

fn create_consumer(bootstrap_server: &str) -> StreamConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        .set("enable.partition.eof", "false")
        // We'll give each session its own (unique) consumer group id,
        // so that each session will receive all messages
        .set("group.id", format!("chat-{}", Uuid::new_v4()))
        .create()
        .expect("Failed to create client")
}

Next we need to integrate the consumer into our main function. We'll begin with some setup:

use uuid::{Uuid};
use rdkafka::consumer::{Consumer, StreamConsumer};
 
async fn main() {
  ...
  // create the consumer
  let consumer = create_consumer(&args().skip(1).next()
        .unwrap_or("localhost:9092".to_string()));
 
  // subscribe to our topic
  consumer.subscribe(&["chat"]).unwrap();

But then we have a problem. We'd like to be able to both read from stdin and from Kafka, such that when we get a new input we write it to Kafka, and when we get a new message from Kafka we output it. But we can only .await one future at a time.

Fortunately, Tokio provides a way to combine multiple futures, such that we can .await them all at once: the select! macro.

Adding in the consumer, our loop looks like:

loop {
    stdout.write(b"> ").await.unwrap();
    stdout.flush().await.unwrap();
 
    tokio::select! {
        message = consumer.recv() => {
            let message  = message.expect("Failed to read message").detach();
            let payload = message.payload().unwrap();
            stdout.write(payload).await.unwrap();
            stdout.write(b"\n").await.unwrap();
        }
        line = input_lines.next_line() => {
            match line {
                Ok(Some(line)) => {
                    producer.send(FutureRecord::<(), _>::to("chat")
                      .payload(&line), Timeout::Never)
                        .await
                        .expect("Failed to produce");
                }
                _ => break,
            }
        }
    }
}

In the select block, we set up two futures: one that will read from Kafka, and one that will read from stdin. Whichever one completes first, its corresponding handler will be run. On the next loop iteration we'll re-create the futures, so that we can continue to read from both sources.

Now we can run two instances of our program, and see that they can communicate with each other:

$ cargo run

Welcome to Kafka chat!
> Hey, what's new?
> Hey, what's new?
$ cargo run
Welcome to Kafka chat!
> Hey, what's new?
>

Adding usernames

Playing around with the app, you might notice some areas for improvement:

  • When a user sends a message, it's echoed back to them because we don't know who sent it
  • When a user sends a message, it's not clear who it's from

We can fix both of these by adding a username to each message. We'll ask the user for their username when they start the program, and then use it as the key for each message.

Kafka messages can have a key, which is used to determine which partition the message is written to. It can also be used to de-duplicate messages, and provide other context like (in our case) who sent the message.
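
Conceptually, a keyed message is assigned to a partition by hashing its key, so all messages with the same key land in the same partition (and are therefore ordered relative to each other). The sketch below illustrates the idea only; it is not the partitioner rdkafka actually uses:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
 
// Illustrative only: hash the key and map it onto one of the partitions
fn partition_for_key(key: &[u8], num_partitions: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    hasher.finish() % num_partitions
}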

We'll start by adding a username prompt to the main function:

stdout.write(b"Please enter your name: ").await?;
stdout.flush().await?;
let name = input_lines.next_line().await.unwrap().unwrap();

Then we'll update our producer to send the username as the key:

producer.send(FutureRecord::to("chat")
    .key(&name)
    .payload(&line), Timeout::Never)
    .await

And finally we'll update our consumer to print the username along with the message and skip messages that are from the current user:

let message  = message.expect("Failed to read message").detach();
let key = message.key().ok_or_else(|| "no key for message")?;
 
if key == name.as_bytes() {
    continue;
}
 
let payload = message.payload().ok_or_else(|| "no payload for message")?;
stdout.write(b"\t").await?;
stdout.write(key).await?;
stdout.write(b": ").await?;
stdout.write(payload).await?;
stdout.write(b"\n").await?;

Now we can run two instances of our program, and see that they can communicate with each other:

$ cargo run

Welcome to Kafka chat!
Please enter your name: Alice
> Hey, what's new?
$ cargo run
Welcome to Kafka chat!
Please enter your name: Bob
> Alice: Hey, what's new?

Final code

The final code for our chat app is below. I've added some extra features, including better error handling, and a welcome message when a user joins the chat.

use std::env::args;
use rdkafka::{ClientConfig, Message};
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use uuid::{Uuid};
 
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut stdout = tokio::io::stdout();
    let mut input_lines = BufReader::new(tokio::io::stdin()).lines();
 
    stdout.write(b"Welcome to Kafka chat!\n").await?;
 
    let name;
    loop {
        stdout.write(b"Please enter your name: ").await?;
        stdout.flush().await?;
 
        if let Some(s) = input_lines.next_line().await? {
            if s.is_empty() {
                continue;
            }
 
            name = s;
            break;
        }
    };
 
    let producer = create_producer(&args().skip(1).next()
        .unwrap_or("localhost:9092".to_string()))?;
 
    stdout.write("-------------\n\n".as_bytes()).await.unwrap();
 
    producer.send(FutureRecord::to("chat")
        .key(&name)
        .payload(b"has joined the chat"), Timeout::Never)
        .await
        .expect("Failed to produce");
 
    let consumer = create_consumer(&args().skip(1).next()
        .unwrap_or("localhost:9092".to_string()))?;
 
    consumer.subscribe(&["chat"])?;
 
    loop {
        tokio::select! {
            message = consumer.recv() => {
                let message  = message.expect("Failed to read message").detach();
                let key = message.key().ok_or_else(|| "no key for message")?;
 
                if key == name.as_bytes() {
                    continue;
                }
 
                let payload = message.payload().ok_or_else(|| "no payload for message")?;
                stdout.write(b"\t").await?;
                stdout.write(key).await?;
                stdout.write(b": ").await?;
                stdout.write(payload).await?;
                stdout.write(b"\n").await?;
            }
            line = input_lines.next_line() => {
                match line {
                    Ok(Some(line)) => {
                        producer.send(FutureRecord::to("chat")
                            .key(&name)
                            .payload(&line), Timeout::Never)
                            .await
                        .map_err(|(e, _)| format!("Failed to produce: {:?}", e))?;
                    }
                    _ => break,
                }
            }
        }
    }
 
    Ok(())
}
 
 
fn create_producer(bootstrap_server: &str) -> Result<FutureProducer, Box<dyn std::error::Error>> {
    Ok(ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        .set("queue.buffering.max.ms", "0")
        .create()?)
}
 
fn create_consumer(bootstrap_server: &str) -> Result<StreamConsumer, Box<dyn std::error::Error>> {
    Ok(ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        .set("enable.partition.eof", "false")
        .set("group.id", format!("chat-{}", Uuid::new_v4()))
        .create()
        .expect("Failed to create client"))
}

Advanced topics

This tutorial has covered the basics of using Kafka from Rust, but for a real application there are many more things to consider.

Formats and Schemas

Kafka does not define any semantics for the data that is written to it; as far as Kafka is concerned, it's just a stream of bytes. While in this example we just wrote UTF-8 strings, in a real application you'll likely want to use a more structured format, like JSON, Avro, or Protobuf. The Rust ecosystem has great support for all of these formats, for example via the serde_json, apache-avro, and prost crates.

Another consideration is schema management. If you're using a structured format like Avro or Protobuf, you'll need to provide the schema to consumers. This is particularly challenging for Avro, as decoding an Avro message requires the exact schema that was used to write it. For protobuf, you just need to ensure that a compatible schema is used.

Even for JSON, which is unstructured, you'll likely want to provide some guidance to consumers about what fields to expect. This can be done by using a schema language like JSON Schema.
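
As a concrete example, here's a sketch of what a structured chat message might look like with JSON. It assumes the serde (with the derive feature) and serde_json crates and a hypothetical ChatMessage type; the encoded bytes can be passed to FutureRecord::payload just like the strings we sent earlier:

use serde::{Deserialize, Serialize};
 
// A hypothetical schema for chat messages
#[derive(Serialize, Deserialize)]
struct ChatMessage {
    from: String,
    text: String,
    sent_at_ms: u64,
}
 
fn encode(msg: &ChatMessage) -> Vec<u8> {
    // produces a UTF-8 JSON payload suitable for FutureRecord::payload
    serde_json::to_vec(msg).expect("serializing to JSON should not fail")
}
 
fn decode(payload: &[u8]) -> serde_json::Result<ChatMessage> {
    serde_json::from_slice(payload)
}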

Confluent Schema Registry

Confluent provides a Schema Registry that can be used to store and distribute schemas; this is widely used within the Kafka ecosystem, and is particularly handy if you're using Kafka Connect.

However, consuming and producing data that's compatible with the Schema Registry requires some extra work. Messages are encoded using a special format that includes a schema ID:

+------------+-----------------+---------+
| Magic Byte | Schema ID (INT) | Payload |
|  (1 byte)  |    (4 bytes)    |         |
+------------+-----------------+---------+

So even JSON-encoded messages will not be straightforward UTF-8 text; they have an extra 5 bytes prepended that must be stripped off before decoding.
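
Here's a minimal sketch of splitting off that header (the helper name is hypothetical); the magic byte is 0 and the schema ID is a 4-byte big-endian integer:

// Splits a Schema Registry-framed message into its schema ID and payload
fn split_confluent_header(raw: &[u8]) -> Result<(i32, &[u8]), String> {
    if raw.len() < 5 || raw[0] != 0 {
        return Err("not a Schema Registry-framed message".to_string());
    }
    let schema_id = i32::from_be_bytes([raw[1], raw[2], raw[3], raw[4]]);
    Ok((schema_id, &raw[5..]))
}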

Avro messages are written with the same header, followed by an individual Avro datum. Readers will need to fetch the referenced schema id from the Schema Registry, and configure their Avro decoder to use that schema and read just a single datum rather than a full Avro file. In the Rust apache_avro library, this can be done with the from_avro_datum function.

The Rust crate schema_registry_converter handles these complexities for you, and can be used to produce and consume messages that are compatible with the Schema Registry.

Offset management

As mentioned in the introduction, Kafka stores messages in partitions, and each message has an offset within its partition. Consumers can read from any offset, and can commit their current offset to Kafka so that they can resume from that point later.

This tutorial relied on the default offset management strategy, which is to have the consumer commit read offsets automatically every 5 seconds.

For consumers that are doing some processing on the messages, it's usually important to not miss a message even on failure. The typical strategy for this is to keep track of which messages have been fully processed, and only commit the offset once processing for all messages up to that point has completed.

In Rust, this can be done by using the commit method on the consumer:

use rdkafka::consumer::CommitMode;
use rdkafka::{Offset, TopicPartitionList};
 
// commit the offset of the *next* message to read, so the consumer resumes here
let mut topic_partitions = TopicPartitionList::new();
topic_partitions.add_partition_offset(
    "chat", 0, Offset::Offset(message.offset() + 1))?;
consumer.commit(&topic_partitions, CommitMode::Async)?;
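
For manual commits to fully control when offsets are stored, you'll also typically want to turn off automatic commits when building the consumer. A minimal sketch (enable.auto.commit is a standard librdkafka option):

use rdkafka::ClientConfig;
use rdkafka::consumer::StreamConsumer;
 
fn create_manual_commit_consumer(bootstrap_server: &str, group_id: &str) -> StreamConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        .set("group.id", group_id)
        // disable the default 5-second automatic commit; offsets are only
        // stored when we explicitly call commit
        .set("enable.auto.commit", "false")
        .create()
        .expect("Failed to create client")
}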

Consumer groups

In this tutorial, we used a unique consumer group for each session, so that each session would receive all messages. This is a good strategy for a chat app, but for other applications you may want to have a single consumer group across multiple workers. With that setup, the messages in the topic will be partitioned across the workers, and each message will be processed by only one worker.

Under the hood, each consumer in a group is assigned a set of partitions. When a new consumer joins the group, there is a rebalancing process where the partitions are redistributed across the consumers in the group such that each partition is only consumed by one consumer.

In Rust, this just requires setting the group.id config option to the same value for each worker in the group when creating its consumer.
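
Here's a sketch of what such a worker might look like; the group id and processing logic are illustrative. Every worker started with this same group.id will be assigned a share of the topic's partitions:

use rdkafka::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
 
async fn run_worker(bootstrap_server: &str) {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        // every worker uses the same group id, so each message is delivered
        // to exactly one of them
        .set("group.id", "chat-processors")
        .create()
        .expect("Failed to create client");
 
    consumer.subscribe(&["chat"]).expect("Failed to subscribe");
 
    loop {
        let message = consumer.recv().await.expect("Failed to read message");
        if let Some(payload) = message.payload() {
            // process the message here
            println!("received {} bytes", payload.len());
        }
    }
}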

Producing asynchronously

In this tutorial, we used producer.send and called .await on the result. This is a simple way to reliably produce messages, but it means only one message can be in flight at a time. If you're producing a lot of messages, this can become a bottleneck.

We also set queue.buffering.max.ms to 0, which means that messages are sent immediately. This provides the lowest latency but hurts throughput significantly as each message requires a round-trip to the Kafka server.

For high-throughput applications, you will want to use a higher buffer time and send multiple messages in parallel.
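
As a sketch, a throughput-oriented producer might allow a few milliseconds of buffering so that many messages share a round trip to the broker; the exact value here is illustrative, not a recommendation:

use rdkafka::ClientConfig;
use rdkafka::producer::FutureProducer;
 
fn create_batching_producer(bootstrap_server: &str) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_server)
        // allow up to 5ms of buffering so messages are sent in batches
        .set("queue.buffering.max.ms", "5")
        .create()
        .expect("Failed to create client")
}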

One option is to just drop the future, allowing the message to be sent in the background. However, doing this means you have no way to know whether the message was successfully sent.

A better option is to spawn a new Tokio task to send the message, which can then have its own error-handling logic, for example:

let result = producer.send_result(FutureRecord::to("chat")
        .key(&name)
        .payload(&line)).map_err(|(e, _)|
            format!("Failed to enqueue message: {:?}", e))?;
 
tokio::spawn(async move {
    if let Err(e) = result.await {
        eprintln!("Failed to produce: {}", e);
    }
});

Note that we're using send_result instead of send. The difference has to do with the internal buffering that the Kafka producer performs. send returns a future that, when awaited, first tries to enqueue the message on the producer's internal buffer, retrying until the timeout given in the second argument expires, and then completes once the message has been delivered (or with an error if it couldn't be enqueued or delivered in time).
 
By contrast, send_result tries to enqueue the message immediately, returning an error if the buffer is full; on success it returns a future that completes once the message has been delivered to Kafka. That future owns its data, which makes it easier to use from a tokio task, as we don't need to worry about the lifetime of the message.

Conclusion

At this point you should have a good understanding of how to use Kafka from Rust. Kafka is a powerful tool for building reliable distributed systems, but it can be very complex to use correctly with more sophisticated applications that involve aggregations, joins, and exactly-once requirements. A streaming engine like Arroyo can simplify this significantly by providing a higher-level abstraction for reading and writing from Kafka.

If you have any questions about Kafka or Rust, feel free to reach out to me on our Discord or Twitter.