Arroyo is a modern stream processing engine, designed to perform complex SQL operations on millions of records per second. After deciding that we needed to build a new streaming engine, the next question was: what language do we use?
It's probably not a spoiler that we ended up writing Arroyo in Rust. This has turned out to be an excellent decision, enabling our small team to build a state-of-the-art streaming system in under a year.
So why did we decide to use Rust, and how has it worked out?
A (short) history of languages for data infra
C++
The dawn of modern data infrastructure can be traced to the publication of three monumental Google papers: Google File System (2003) and MapReduce (2004), followed by Bigtable (2006). These papers described how Google was able to store, process, and serve the petabytes of data that made up their index of the early web.
But even more interesting was how they were doing it. At a time when "big data" was handled by supercomputers and mainframes, the most important startup in Silicon Valley was using networks of thousands of cheap Linux machines. This laid out the template for how data processing would be done for the next twenty years.
These systems were written in C++, then the standard for systems software. C++ offered precise control over memory and allowed Google to get the most out of its hardware, which was essential when your servers were 1.4 GHz Pentium IIIs with 2 GB of RAM!
Java
Inspired by the Google papers, an engineer named Doug Cutting decided to build his own implementations to power a new search engine. Later hired by Yahoo, this work led to the open-source Hadoop Distributed File System (HDFS) and Hadoop MapReduce projects.
Cutting had previously built Lucene, the search index, in Java and stuck with the language for Hadoop. This had a huge impact. The next wave of big data systems emerged in the following years as more companies found they needed distributed data processing, including HBase, Cassandra, Storm, Spark, Hive, and Presto. Nearly all were written in Java or other JVM languages.
And Java had many advantages over C++: its garbage collector (GC) freed programmers from tedious and error-prone manual memory management, and its VM meant Java could be run on many different architectures and OSes with minimal changes. The language was also much simpler than C++, allowing new engineers to ramp up quickly.
Go
Meanwhile, Google was continuing to write its infrastructure in C++ and application-level software in Python and Java. In 2009, they released a new language designed for this application-level niche: Go. Designed in part by Rob Pike and Ken Thompson, legends in the field, and featuring high-level concurrency primitives and web libraries, it quickly found adoption in the distributed systems space.
This was particularly catalyzed by the open-source release of Kubernetes in 2015. Kubernetes is a cluster scheduler, able to turn a fleet of servers or VMs into a common substrate for deploying applications. It arrived at a time when the industry was moving from static on-prem or VM deployment models to more flexible containerized deployments on the cloud.
It was initially written in Java, but at some point in its development migrated to Go. Its success led to a huge wave of Go data systems, including CockroachDB, InfluxDB, and Pachyderm.
C++ makes a comeback, and Rust enters the scene
The tech world is endlessly cyclical, and the past few years have seen another turn of the wheel. Projects like ScyllaDB and Redpanda have found success rewriting Java systems (Cassandra and Kafka, respectively) in C++ for higher performance and more predictable operations. New databases and query engines like DuckDB and ClickHouse are being written from scratch in C++.
Rust 1.0 was released in 2015 as a modern systems language, aiming for the same niche as C++. Rust has no GC, focuses on zero-cost abstractions, and provides low-level control over execution. Unlike C++, its compiler can check for safety violations (like using uninitialized memory or double frees) and prevent race conditions in multithreaded code. It's become a common choice both for rewriting components of existing Go and Java systems (TiKV, InfluxDB IOx) and for implementing new ones (Materialize, ReadySet).
Why Rust
With that history covered, why did we decide to use Rust[^1]? In short, Rust is extremely well suited to writing data infrastructure, combining the performance and control of C++ with the safety and ease of development of Java.
Excellent performance by default
It's possible to write fast code in Java or Go, but in Rust fast is the default. You have to go out of your way to opt out of efficiency, and Rust makes sure you always know the tradeoffs you're making.
The fastest Java code, for example, looks nothing like idiomatic Java. It relies heavily on primitives and arrays instead of objects and standard-library containers. It's written to minimize object creation and destruction to reduce the load on the GC. And for data-centric systems, the actual data storage is often handled by a C++ library like RocksDB, requiring slow communication through the Java Native Interface (JNI).
Rust was designed around zero-cost abstractions. This is a somewhat confusing concept, but the essence is: you pay for what you use. One part of this is that operations that, for example, allocate memory or context-switch are clearly called out. But more important is that Rust provides many powerful abstractions that do not have any runtime performance cost, and are generally as efficient as hand-written code.
For example, iterators and lambdas are powerful, high-level features, but code like this (taken from the Arroyo watermark system, determining the oldest watermark while taking into account idleness and potentially missing watermarks):
```rust
self.cur_watermark = self
    .watermarks
    .iter()
    .fold(Some(Idle), |current, next| {
        match (current?, (*next)?) {
            (EventTime(cur), EventTime(next)) => Some(EventTime(cur.min(next))),
            (Idle, EventTime(t)) | (EventTime(t), Idle) => Some(EventTime(t)),
            (Idle, Idle) => Some(Idle),
        }
    });
```
will desugar and optimize to something at least as fast as a hand-written for loop. And because iterator code like this is more legible to the optimizer, it can often be more effectively vectorized than the hand-written loop.
No GC / Yes safety
There's a persistent myth that languages with garbage collection can't be fast or are unsuitable for low-latency environments. That isn't true (see all of the high-frequency trading done in Java), but it does require deep knowledge of the behavioral and performance characteristics of your garbage collector.
Stream processing systems in particular tend to be difficult cases for GCs. Event processing produces a lot of objects very quickly, while storing some of them for unpredictable amounts of time. Operating JVM stream processors like Apache Flink at scale means becoming an expert in GC configuration and tuning, and still expecting occasional blow-ups[^2]. Many users run with dramatically more memory than needed to avoid this.
Historically there was a tradeoff here. You could either deal with the challenges of scaling a GC or the bugs, crashes, and security issues in a manually-managed language like C++. But Rust found a way to square that: its borrow checker and lifetime system enable compile-time memory management.
This removes the need for a GC (dynamic memory management) while allowing the language to guarantee at compile time that code is free of memory-safety errors like use-after-frees and double frees[^3].
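Here's a minimal sketch (mine, not Arroyo code) of the kind of bug this catches; the compiler rejects this program outright:

```rust
fn main() {
    let reference;
    {
        let data = String::from("watermark");
        reference = &data; // error[E0597]: `data` does not live long enough
    } // `data` is dropped here, while `reference` still points at it

    // In C++ this would be a use-after-free; in Rust it never compiles.
    println!("{}", reference);
}
```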
Some will claim that it's possible to write memory-safe C/C++, but the long history of memory-related exploits belies that. As I write this, Google and Apple are urgently patching a buffer overflow in libwebp that is being actively exploited across the internet[^4].
Safety also begets performance. For example, Rust's safety guarantees make it much easier to correctly use pointers to avoid copies. Arroyo's state backend (used for storing data needed for computations like windows) includes a cache that stores deserialized structs in a variety of purpose-specific data structures. When operators query their state, they may receive items from the cache if they are available.
To avoid copies, the query methods have signatures like this:
```rust
pub async fn get_time_range(
    &mut self,
    key: &mut K,
    range: Range<SystemTime>,
) -> Vec<&V>
```
The return type is `Vec<&V>`, a vector (growable array) of immutable references. These references are bound by the relevant lifetimes, preventing us from accidentally use-after-freeing. And because they're immutable, the caller can't modify them or free them. Returning references allows us to avoid potentially expensive copies of the data, and Rust prevents us from violating our data structure's correctness and safety properties. In C++, code like this would be very dangerous and require a high degree of care from users.
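To see why, here's a small self-contained sketch of the same pattern, using a hypothetical `Cache` type rather than Arroyo's actual state API:

```rust
struct Cache<V> {
    items: Vec<V>,
}

impl<V> Cache<V> {
    // The elided lifetime ties each &V to the borrow of &self.
    fn get_range(&self, start: usize, end: usize) -> Vec<&V> {
        self.items[start..end].iter().collect()
    }
}

fn main() {
    let mut cache = Cache { items: vec![10, 20, 30, 40] };
    let views = cache.get_range(1, 3); // borrows from `cache`

    // cache.items.clear(); // error[E0502]: can't mutate `cache`
    //                      // while `views` still borrows from it

    println!("{:?}", views); // prints [20, 30]

    cache.items.clear(); // fine: the borrow has ended by now
}
```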
If it compiles, it's probably correct
The Rust compiler is pedantic. It is the most obsessive code reviewer you will ever work with[^5]. If you pass a 32-bit integer to a function that expects a 64-bit integer, it will not let you. If you try to share a non-threadsafe data structure across threads, your compile will fail. Ignore the fact that filesystem paths may be arbitrary bytes and try to use them as UTF-8 strings? Straight to compiler jail.
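As a small illustration (not from Arroyo), here's the thread-safety check in action; this program is rejected at compile time:

```rust
use std::rc::Rc;
use std::thread;

fn main() {
    let counter = Rc::new(0); // Rc's reference count is not atomic

    // error[E0277]: `Rc<i32>` cannot be sent between threads safely
    thread::spawn(move || {
        println!("{}", counter);
    });
}
```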
Some people will love this about Rust. Others, who just want to get something working dammit, will hate it.
Put me in the first camp. I've spent enough time in my career debugging hard-to-reproduce bugs in production. Rust's approach involves more upfront design work and some frustration fighting with the compiler, but once you've satisfied it, the code ends up being correct an astonishingly high fraction of the time.
As one anecdote, early in Arroyo's history I spent a couple days writing a network stack to allow pipelines to be distributed across multiple nodes. After writing several thousand lines of complex code and getting it to compile, I ran it for the first time. It just worked. It has continued working with a few minor changes ever since.
Idiomatic Rust also encourages patterns that promote correctness, like heavy use of algebraic data types (ADTs). ADTs come in two forms: product types (`struct`) and sum types (`enum`). By combining these, you can concisely define your application's data while preventing bugs. ADTs are pervasive in Rust, for example in error handling (`Option` and `Result`).
But they can also make many optimizations easier and safer. For example, `Cow` (clone-on-write) in the standard library is an enum defined like this:
```rust
pub enum Cow<'a, B>
where
    B: 'a + ToOwned + ?Sized,
{
    Borrowed(&'a B),
    Owned(<B as ToOwned>::Owned),
}
```
If you're not familiar with Rust definitions, this might be a bit confusing, but essentially this is a type that can take one of two forms: either an owned copy of the data or a borrowed reference.
Why is this useful? As a provider of a piece of data, there may be cases where I am able to avoid a copy and return a reference to some other data I own[^6]. By returning a `Cow`, I allow my consumers to handle both cases transparently, depending on whether they need their own copy of the underlying data or can just operate on a reference. This pattern can save many unnecessary copies throughout your codebase.
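Here's a minimal sketch of the pattern with a hypothetical `normalize` function (not from Arroyo):

```rust
use std::borrow::Cow;

// Return a borrow when the input is already usable as-is, and an
// owned copy only when we actually have to modify it.
fn normalize(input: &str) -> Cow<'_, str> {
    if input.contains(' ') {
        Cow::Owned(input.replace(' ', "_")) // copy only when needed
    } else {
        Cow::Borrowed(input) // zero-copy fast path
    }
}

fn main() {
    assert!(matches!(normalize("already_fine"), Cow::Borrowed(_)));
    assert!(matches!(normalize("needs fixing"), Cow::Owned(_)));
}
```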
Or to take an example from Arroyo, here's an enum from our engine:
```rust
pub enum QueueItem {
    Data(Arc<dyn Any + Send>),
    Bytes(Vec<u8>),
}
```
Arroyo programs are graphs of dataflow, where the operators perform some potentially stateful computation (maps, filters, windows, etc.) and the edges transmit data between the operators. Because Arroyo is distributed, some of those operators will be running in the same process, and some will be on other machines. When sending data across the network we need to serialize it to bytes, but we want to avoid that when sending it on a queue to a local operator.
This enum allows us to provide data that is either a `Vec<u8>` (serialized) or an `Arc<dyn Any>` (an in-memory object), avoiding unnecessary serialization/deserialization while still preventing type confusion in the consumers.
This also combines nicely with Rust's trait system. For example, we provide this implementation of the `TryFrom` trait, which allows consumers to directly convert a `QueueItem` into a `Message` (the core type of the Arroyo dataflow):
```rust
impl<K: Key, T: Data> TryFrom<QueueItem> for Message<K, T> {
    fn try_from(value: QueueItem) -> Result<Self> {
        Ok(match value {
            QueueItem::Data(datum) => *datum.downcast()?,
            QueueItem::Bytes(bs) => {
                bincode::decode_from_slice(&bs, config::standard())?.0
            }
        })
    }
}

// used like this
let message: Message<K, T> = item.try_into()?;
```
It is of course still possible to write logic bugs in Rust. But the compiler, along with the equally pedantic standard library and correctness-focused programming patterns like ADTs, makes it easier to write correct programs than in any other language I've used.
Cargo and the crate ecosystem
Building a distributed SQL streaming engine is a lot of work. Just parsing SQL is a huge and involved project[^7]. We've been able to make so much progress as a small team because we stand on the shoulders of giants.
Cargo is the standard Rust build tool and package manager, and crates.io is the third-party library repository. Unlike the fragmented C/C++ world with CMake, Bazel, Buck, and many others, Rust has a single way to build packages, share code, and integrate third-party libraries.
Is it perfect? No. But having one way to do it has allowed an incredible package ecosystem to develop in a relatively short time. On crates.io you can find state-of-the-art data structures, powerful concurrency primitives, and an incredible serialization framework, among tens of thousands of other useful libraries.
For Arroyo, perhaps the most valuable of these is DataFusion, part of the Apache Arrow project. As a product, it's a very fast SQL query engine. But for us (and many others in the Rust data space) it's also a library that can power other SQL engines.
Arroyo uses DataFusion to parse (turning textual SQL into an abstract syntax tree), plan (turning that AST into a graph of computation), and optimize (rewriting that graph according to optimization rules) queries. Each of these is a complex, challenging problem that is somewhat orthogonal to the needs of a particular query engine. It is not an exaggeration to say that DataFusion has saved us thousands of hours of development.
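To give a flavor of what DataFusion provides, here's a minimal sketch using its high-level API (Arroyo drives the lower-level parse/plan/optimize stages directly, so treat this as illustrative):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register a CSV file as a table, then let DataFusion parse, plan,
    // optimize, and execute the query.
    ctx.register_csv("events", "events.csv", CsvReadOptions::new())
        .await?;
    let df = ctx.sql("SELECT count(*) FROM events").await?;
    df.show().await?;

    Ok(())
}
```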
The hard parts
While we are very satisfied with our choice of Rust, the experience hasn't all been successful compiles and benchmark-beating performance. After writing 2000 words on why we love the language, I think it's worth covering some of the challenges we've had as well.
It takes time to pick up
Rust has a reputation as a hard language, and that's clearly somewhat deserved; it's much easier to come from a Java or Python background and get something working in Go than in Rust[^8]. Learning to work with the borrow checker and lifetime system takes time, and traits and macros can be more complex than their equivalents in other languages.
We've seen that contributors new to Rust take several months to really get up to speed, and for an early-stage startup those months are a real cost. If we hadn't had one team member (me) who was already a pretty experienced Rust programmer, it would have been much harder.
Async is still pretty rough
Much has been written about async Rust. Async/await is a large set of features, first released in 2019, that make it easier to write non-blocking code. While many languages have adopted async/await, the Rust implementation is very different: Rust does not require a runtime and is designed to be usable in settings ranging from microcontrollers to supercomputers, which limits the design space for high-level features like async/await. The solution Rust came up with is unusual for programmers coming from backgrounds like Go or JavaScript, in ways that lead to a lot of confusion[^9].
And if you're writing networked services in Rust, async is pretty much unavoidable at this point. Nearly all of the network and service-oriented frameworks rely on async. For Arroyo, these include tonic (gRPC), hyper (HTTP), object_store (reading/writing to S3), kube-rs, and many more.
It doesn't matter if you think your problem might be well solved without async. In fact, the original prototype of Arroyo used threads; since each operator subtask processes messages serially, this actually worked pretty well. But we quickly migrated to async for the reason above: the library ecosystem. The migration took some time (and many questions for the very helpful Tokio Discord), but in the end it turned out to be great for performance: off the bat, we got about a 30% throughput boost.
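For a flavor of that style, here's a minimal sketch of a queue-driven async pipeline on Tokio (hypothetical message type, not Arroyo's actual operator API):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded queue between two operators in the same process.
    let (tx, mut rx) = mpsc::channel::<String>(128);

    // Producer task: awaiting a full queue yields the task
    // instead of blocking an OS thread.
    tokio::spawn(async move {
        for i in 0..3 {
            tx.send(format!("event-{i}")).await.unwrap();
        }
    });

    // Consumer: each operator subtask processes messages serially.
    while let Some(msg) = rx.recv().await {
        println!("processing {msg}");
    }
}
```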
After a few months of writing async Rust I felt pretty comfortable and productive. But there are still a number of gaps and missing features. The biggest of these is async traits, which will be partially stabilized soon. There are a number of other sharp edges that make async feel like a work in progress four years on, like the difficulty of constructing and naming async closures, or of passing references across await points.
No stable ABI
Rust lacks a stable application binary interface (ABI). What does that mean? If I compile a binary with Rust version X and try to link it against a shared library compiled with Rust version Y, there is no guarantee that the two will be compatible.
This is a problem for any system that wishes to allow users to dynamically load code, for example user-defined functions or custom sources/sinks.
There are several workarounds available, like using the C ABI or WASM, but they come with compromises in performance and expressiveness.
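For example, the C ABI route looks roughly like this sketch (a hypothetical UDF, not Arroyo's actual plugin interface):

```rust
// Compiled as a `cdylib` and loaded at runtime (e.g. with the
// `libloading` crate). `extern "C"` and `#[no_mangle]` pin the symbol
// name and calling convention across compiler versions, at the cost of
// restricting the interface to C-compatible types.
#[no_mangle]
pub extern "C" fn double_it(x: i64) -> i64 {
    x * 2
}
```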
Wrapping up
We're very excited to continue building Arroyo in Rust, and for the future of the Rust data infrastructure. While there are some warts and missing features, the combination of speed, safety, and library ecosystem makes it the obvious choice for building high-scale systems going forward.
Interested in getting involved? Find an issue to work on and come chat with the team in Discord.
Footnotes
[^1]: One of the reasons is pretty selfish: I've been a Rust fan since encountering the language in 2014 and had been using it for personal projects for years. But I hadn't been able to introduce it (outside of small tools) at the companies I worked at, and starting a company was the first time no one could tell me no.

[^2]: After four years of running production Flink workloads at Lyft and Splunk, I could practically see GC logs in my sleep.
[^3]: These guarantees hold only in safe Rust code, which is the default. The `unsafe` keyword allows users to manipulate raw memory in ways that could potentially violate them, which is necessary for some optimizations and for interacting with C libraries and the OS. However, in practice `unsafe` is rare in Rust codebases; Arroyo has only a few unsafe blocks in tens of thousands of lines of code.

[^4]: See the great write-up here: https://blog.isosceles.com/the-webp-0day/
[^5]: I say this as someone who used to have my C++ PRs reviewed by one of the authors of Google's C++ style guide…
[^6]: The most common example is `Cow<str>`, a clone-on-write string. When deserializing a string from a buffer, in some cases it may be possible to directly use the underlying bytes in the buffer to back the string (if it's valid UTF-8), and in other cases you may need to copy and modify them. By returning `Cow<str>` instead of `String` (the owned string type) or `&str` (a string reference) we're able to support both cases without extra copies. In Arroyo's dataflow, a node may have a single downstream node, or it may fan out to multiple downstreams. This is a great case for `Cow`, because in the first case we can give the node an owned object (since it's the only consumer), while in the latter we can give each a reference (since we'd otherwise have to copy the data for every consumer).
[^7]: And one I sincerely hope never to have to undertake.

[^8]: Although folks with C++ backgrounds tend to pick up Rust much more easily.

[^9]: If that describes you, this is a great overview of how async works in Rust: http://cliffle.com/blog/async-inversion/