The Arroyo team is very excited to announce the release of Arroyo 0.11, which includes a number new features and improvements on top of 0.10, including pipeline clusters, a new, lightweight way to run streaming pipelines on serverless infrastructure, sqlite support for the control plane, a new configuration system, refinement of the create pipeline UX, and more.
In this release, we are thrilled to welcome two new contributors to the project:
- @benjamin-awd made their first contribution in #609
- @chokosabe made their first contribution in #652
Thanks to all of our contributors for this release:
Excited to try things out? Download a binary for your platform or run in Docker with
$ docker run -p 5115:5115 ghcr.io/arroyosystems/arroyo:0.11
Table of Contents
Pipeline clusters
Today Arroyo operates as a distributed cluster, with a controller node that manages a collection of pipelines, which themselves are scheduled on Arroyo nodes or Kubernetes.
Arroyo 0.11 introduces a new, lighter-weight way to run individual jobs, which we're calling pipeline clusters.
It works like this:
CREATE TABLE coinbase (
type TEXT,
price TEXT
) WITH (
connector = 'websocket',
endpoint = 'wss://ws-feed.exchange.coinbase.com',
subscription_message = '{
"type": "subscribe",
"product_ids": [
"BTC-USD"
],
"channels": ["ticker"]
}',
format = 'json'
);
select avg(cast(price as FLOAT)) from coinbase
group by hop(interval '1 second', interval '10 seconds');
Here we've created a SQL file containing an Arroyo query and run it directly with the
arroyo binary with the new arroyo run
command. This starts up a completely self-contained
minicluster that starts and manages the provided query. When the process is signalled to exit
(via ctrl-c or SIGINT) the pipeline is stopped with a final checkpoint so it can be resumed
later. If a sink isn't specified (as an INSERT INTO statement), the default STDOUT sink is used,
which means you can consume pipeline outputs with UNIX pipes:
$ arroyo run query.sql > outputs.json
Pipeline clusters are great for running pipelines locally or in CI. But they also make it possible to run streaming pipelines in light-weight container runtimes like Amazon Fargate, Google Cloud Run, and Fly.io.
When running on a container runtime, state can be snapshotted and restored from
an object store like S3 or GCS with the --state-dir
flag. This means stateful,
consistent, and reliable processing even on transient resources.
See the pipeline cluster docs for more details, and watch out for more tutorials and guides to running Arroyo on various serverless runtimes.
Sqlite control plane
In 0.10, we slimmed down the architecture of Arroyo, from a half-a-dozen services to three— one Arroyo service plus Postgres for configuration and metadata, and Prometheus to power the metrics features of the Web UI. In 0.11 we've finished this work:
- Sqlite is now supported as the storage engine for the control plane, powering the pipeline configuration and API
- We've replaced prometheus with a small, purpose-built metrics system to power the built-in metrics features (with prometheus-compatible metrics still available for integration with external observability systems)
The entire Arroyo system can now run off a single, statically-linked, 150MB binary.
This greatly reduces the complexity and resource requirements of a minimal Arroyo deployment, which can now run comfortably in lightweight container hosting.
- Add Sqlite support for the control plane by @mwylde in #626
- Support metrics for web UI natively instead of using prometheus by @mwylde in #630
- Migrate arroyo-bin cmd to arroyo and consolidate docker images by @mwylde in #659
Configuration
Arroyo 0.11 ships with a completely new configuration system, replacing the ad-hoc environment variables used previously. This makes the system easier and more consistent to customize. We've also added comprehensive documentation on the various configuration options.
The new system is hierarchical; at the base is a
default config
shipped with the engine. This can be overridden by Toml or Yaml files placed in
the Users or system config directory (on Linux for example
~/.config/arroyo/config.toml
), an arroyo.toml
file in the current directory, then a config file passed on the
command line with the new --config
option. Finally, everything can be overridden by environment variables.
For example, on my Mac I have the following local config in my ~/Library/Application Support/arroyo/config.toml
file
checkpoint-url = "/Users/mwylde/arroyo/checkpoints"
artifact-url = "/Users/mwylde/arroyo/artifacts"
[database]
type = "sqlite"
On a per-run basis I can override this with environment variables, for example, to use Postgres instead
$ ARROYO__DATABASE__TYPE=postgres arroyo cluster
See the configuration docs for all of the options and full details on how the configuration system works.
Existing configuration environment variables (like SCHEDULER
, DATABASE_HOST
, etc.) will continue to be
supported with a warning in 0.11, but will be removed in 0.12.
Create pipeline and preview improvements
We've reworked the UX for creating a pipeline and previewing it in the Web UI, focusing on performance and increased information density. It looks like this:
We've made the sidebars collapsible and integrated other UI elements to increase the amount of vertical and horizontal space for your queries and results.
The result table is now a high-performance virtual renderer, allowing us to increase the number of results shown to 10,000 (from 20 today), and the columns are now resizable, re-orderable, and sortable.
For power users there are few new options, including a checkbox to enable sinks in preview (by default they are disabled to avoid polluting external systems with test data) and set the parallelism on pipeline start.
- Redesign create pipeline UX and pipeline outputs for greater density and performance by @mwylde in #663
Raw bytes
Arroyo supports a number of formats (including JSON, Avro, and Parquet) but there will always be a long-tail of formats (potentially company-internal) that aren't built in.
Now it's possible to build your own decoders using UDFs and the new raw_bytes format.
When I started in streaming, the classic demo was analyzing Twitter data to find popular hashtags. Unfortunately, today Twitter API access costs thousands of dollars a month and is locked down. Fortunately there are now more open Twitter alternatives, like Bluesky which publishes all of its messages using the open AT protocol. And a website called Firesky has already done the hard work of aggregating these events into a websocket feed.
That would be great for Arroyo, except that its data is encoded in a binary format called cbor, a kind of binary JSON.
With raw_bytes and the serde-json crate this is no longer an issue!
We'll start with a simple UDF that converts cbor to JSON:
/*
[dependencies]
serde_cbor = "0.11"
serde_json = "1"
serde = {version = "1", features = ["derive"]}
serde-transcode = "1"
*/
use arroyo_udf_plugin::udf;
#[udf]
fn cbor_to_json(data: &[u8]) -> Option<String> {
let mut deserializer = serde_cbor::Deserializer::from_slice(data);
let mut buf = std::io::BufWriter::new(Vec::new());
let mut serializer = serde_json::Serializer::new(&mut buf);
serde_transcode::transcode(&mut deserializer, &mut serializer).ok()?;
let bytes = buf.into_inner().unwrap();
Some(String::from_utf8(bytes).ok()?)
}
(we could have also processed directly on the cbor data, but the tools for dealing with JSON are a bit more convenient).
Then, we can use this in a query to count the hashtags:
create table firesky (
value BYTEA
) with (
connector = 'websocket',
endpoint = 'wss://firesky.tv/ws/app',
format = 'raw_bytes'
);
create view tags as (
select unnest(extract_json(cbor_to_json(value),
'$.info.post.facets[*].features[*].tag')) as tag
from firesky);
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY window
ORDER BY count DESC) as row_num
FROM (SELECT count(*) as count,
tag,
hop(interval '5 seconds', interval '15 minutes') as window
FROM tags
group by tag, window)) WHERE row_num <= 5;
- Add raw_binary format by @mwylde in #625
- Support UDFs on raw_bytes by @mwylde in #665
- Bump and release version 0.1.1 of UDF lib by @mwylde in #666
SQL
Array columns
It's now possible to define array columns in CREATE TABLE statements:
CREATE TABLE transactions (
time DATETIME,
user_id TEXT,
orders TEXT[]
) with (
connector = 'kafka',
bootstrap_servers = 'localhost:9092',
type = 'source',
topic = 'transactions',
format = 'json'
);
Joining on struct fields
Joining on struct fields is now supported. This is most often useful for performing joins on windows,
which are defined as a struct with a start
and end
field. For example, this query finds the auctions
with the most bids over a sliding window and relies on joining on a window:
SELECT AuctionBids.auction, AuctionBids.num
FROM (
SELECT
auction,
count(*) AS num,
hop(interval '2 second', interval '10 seconds') as window
FROM bid
GROUP BY auction, window
) AS AuctionBids
JOIN (
SELECT
max(CountBids.num) AS maxn,
CountBids.window
FROM (
SELECT
count(*) AS num,
hop(interval '2 second', interval '10 seconds') as window
FROM bid
GROUP BY auction, window
) AS CountBids
GROUP BY CountBids.window
) AS MaxBids
ON AuctionBids.window = MaxBids.window AND AuctionBids.num >= MaxBids.maxn;
Upgrade notes
There are several breaking changes in this release for users to be aware of when upgrading.
UDF String types
In UDFs, TEXT columns were previously passed in as owned strings (the Rust String
type); in 0.11 they are instead
string references (&str
). This helps avoid unnecessary copies, improving performance, and aligns the handling
of strings with bytes. Values are still returned as owned strings, however.
For example:
#[udf]
fn to_uppercase(s: &str) -> String {
s.to_uppercase()
}
Service ports
Default ports for services have been changed to reduce the chance of overlap with other services a user may have running. In particular, the default HTTP port for the API and WebUI is now 5115. Other service defaults are now
- Controller gRPC: 5116
- Compiler gRPC: 5117
- Node gRPC: 5118
- Admin HTTP: 5114
Helm image overrides
The configuration for image overrides has changed to be more consistent with Kubernetes standards and to unify the configuration between the controller and the worker. It now looks like this:
image:
repository: ghcr.io/arroyosystems/arroyo
tag: "tip"
imagePullPolicy: IfNotPresent
imagePullSecrets:
- name: my-secret
arroyo-single docker image
The arroyo-single
docker image is being phased out, in favor of a single
arroyo
image. Previously, the arroyo-single image differed by including
Postgres and Prometheus in order to provide a complete, out-of-the-box
experience. With sqlite support and the removal of prometheus in 0.11, we
no longer need external dependencies for this. We are tagging
arroyo
as arroyo-single
as well for the immediate future, but suggest that users
switch over.
Improvements
- Add integration tests for most API endpoints by @mwylde in #620
- Better error messages when a service fails to bind by @mwylde in #624
- Update to arrow 51 and Datafusion 37.1 by @mwylde in #623
- Allow env var substitution in UDF dependencies by @benjamin-awd in #608
- Add ability to set group_id prefix for kafka by @mwylde in #615
- Report error message in helm when required configs are not set by @FourSpaces in #617
- Add object store cache for GCS by @benjamin-awd in #634
- Support dynamically updating the checkpoint interval by @mwylde in #636
- Unify sql error handling on datafusion errors to improve error wrapping by @mwylde in #643
- Clean up old preview pipelines and filter out non-running pipelines on controller start by @mwylde in #637
- Deploy pods directly for k8s instead of replica sets and k8s robustness improvements by @mwylde in #646
- Support per-slot k8s resource management by @mwylde in #649
- Introduce jitter into initial checkpoints to reduce checkpoint alignment across pipelines on controller restart by @mwylde in #661
- Submit programs via RPC instead of env var or files by @mwylde in #667
- Setting imagePullSecrets for workers by @chokosabe in #652
- Unify worker and controller images in helm chart by @mwylde in #669
- Change default ports to avoid clashes by @mwylde in #670
Fixes
- Support writing non-updating queries to debezium sinks. by @jacksonrnewhouse in #612
- Flush kafka on close by @jacksonrnewhouse in #619
- Cascade deletes for sqlite by @mwylde in #635
- Fix broken metric graphs when subtask has no data by @mwylde in #648
- Fix regression in DATABASE_PORT config by @mwylde in #650
Full Changelog: https://github.com/ArroyoSystems/arroyo/compare/v0.10.3...v0.11.0