The Arroyo team is pleased to announce the release of Arroyo 0.8.0. This release includes a number of new connectors, including a FileSystem source, a Delta Lake sink, and a Redis sink, along with other new features like Avro support, global UDFs, and more.
Arroyo is an open-source stream processing engine that allows anyone to build correct, reliable, and scalable real-time data pipelines using SQL.
Read on for more, and check out our docs for full details on existing and new features.
Thanks to all our contributors for this release:
Want to try out the new features? You can easily start a local instance of Arroyo 0.8 in Docker with:
$ docker run -p 8000:8000 ghcr.io/arroyosystems/arroyo-single:0.8.0
What's next
With 0.8 out the door, we're already working on Arroyo 0.9, to be released in early January. Planned features include asynchronous UDFs that are able to call out to external services and perform expensive tasks like ML inference. We are also working on streaming reads for filesystems and Delta Lake, the ability to bootstrap from batch data, support for joins with updating data, better handling of invalid data, and more.
Anything you'd like to see? Let us know on Discord!
Now on to the details.
FileSystem source
Arroyo 0.5 added a high-performance FileSystem sink, capable of writing JSON and Parquet files to local or remote filesystems and object stores like S3. Now in 0.8, we're adding a corresponding FileSystem source.
The FileSystem source can be configured with a directory (on a local filesystem, or a remote object store like S3). It will read all files in that directory and emit them as records for processing, with support for JSON and Parquet files and a variety of compression codecs.
Files are read in parallel, enabling a very high rate of ingestion.
A FileSystem source can be created in SQL like this:
CREATE TABLE logs (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'filesystem',
    type = 'source',
    path = 's3://my-bucket/inputs/events',
    compression_format = 'gzip',
    format = 'json'
);

select * from logs;
In the next release, we will be improving our support for bootstrapping using the FileSystem source, which is the process of creating the initial state of the pipeline from historical data.
For example, you may have a window over 30 days of data. If you start reading today from Kafka, that will take 30 days to fully fill the window. But if you have the historical data available on S3, as is common in many modern data architectures, you can read from both the FileSystem and Kafka sources in parallel with a SQL union.
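As a rough sketch of what that can look like in SQL, assuming two connection tables with matching columns named historical_events (a FileSystem source over S3) and live_events (a Kafka source), both hypothetical names, a union merges the two streams before any windowing:

SELECT id, time, host, status FROM historical_events  -- backfill from files on S3
UNION ALL
SELECT id, time, host, status FROM live_events;       -- live data from Kafka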
Full details on the FileSystem source are available in the docs.
Thanks to @rcjmurillo for this major contribution!
- S3 source connector + Parquet by @rcjmurillo and @jacksonrnewhouse in #403
Delta Lake Sink
Delta Lake is a popular open-source storage format, with support for ACID transactions, schema enforcement, time travel, and more. It's a great choice for modern data lakes and is supported by many popular query engines.
In Arroyo 0.8 we've enhanced our existing FileSystem sink to support Delta Lake, allowing you to write data transactionally to Delta tables.
With Arroyo, a high-performance Kafka to Delta Lake pipeline can be created in SQL as easily as:
CREATE TABLE events (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'kafka',
    type = 'source',
    topic = 'events',
    format = 'json'
);

CREATE TABLE deltatable (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'delta',
    path = 's3://arroyo-deltalake/delta_uuid_remote_checkpoints',
    format = 'parquet',
    'filename.strategy' = 'uuid'
);

INSERT INTO deltatable
SELECT * FROM events;
See the Delta Lake docs for more details.
- Implement Delta Lake Sink by @jacksonrnewhouse in #385
Redis Sink
For applications that rely on real-time data, Redis is a popular choice for a fast, in-memory state store. Now Arroyo can sink its results directly to Redis, providing a great option for serving state.
Redis supports a variety of data structures. In the initial release of the Redis sink, we support writing to String tables, Lists, and Hashes. Keys can be constructed dynamically from columns in the record.
For example, to write results to a Redis String table, you can use a SQL statement like:
CREATE TABLE redis (
    user_id TEXT,
    count INT
) WITH (
    connector = 'redis',
    type = 'sink',
    format = 'json',
    'address' = 'redis://localhost:6379',
    target = 'string',
    'target.key_prefix' = 'counts.',
    'target.key_column' = 'user_id'
);

INSERT INTO redis
SELECT user_id, count(*)
FROM events
GROUP BY user_id, hop(interval '5 seconds', interval '1 hour');
This will write JSON-encoded values to keys like counts.fred for the user fred.
Writes are performed efficiently using Redis pipelines and can achieve throughputs of millions of writes per second on a single Redis node.
Both standalone and cluster modes are supported.
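Lists and Hashes follow the same pattern, with the target option selecting the data structure. Here is a rough sketch of a Hash sink; the 'target.field_column' option name is an assumption, so check the connector docs for the exact settings:

CREATE TABLE redis_hash (
    user_id TEXT,
    count INT
) WITH (
    connector = 'redis',
    type = 'sink',
    format = 'json',
    'address' = 'redis://localhost:6379',
    target = 'hash',
    -- assumed options: the key of the hash and the column used as the hash field
    'target.key_prefix' = 'counts_by_user',
    'target.field_column' = 'user_id'
);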
See the Redis connector docs for more details.
Avro
Arroyo today supports JSON, Parquet, and custom string formats. New in 0.8 is support for Avro, including deep integration with Confluent Schema Registry.
Avro-encoded Kafka sources can be easily created from the Confluent Schema Registry via the Arroyo UI or API:
From SQL, those sources can be queried like any other:
select * from pizza_orders;
When reading from schema registry sources, Arroyo will use the schema registry to decode the data with full support for schema evolution.
It's also possible to specify your schema directly, although without a schema registry you will not be able to evolve your schemas.
Avro sinks are now supported as well, allowing you to write Avro-encoded data to Kafka and other Avro-compatible systems. When using the Schema Registry, Arroyo will automatically register new schemas as they are encountered.
create table output with (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    'schema_registry.endpoint' = 'http://localhost:8081',
    format = 'avro',
    'avro.confluent_schema_registry' = 'true',
    'topic' = 'outputs'
);
insert into output
select * from source;
See the Avro format docs for more details.
- Initial support for Avro formats by @mwylde in #386
- Add support for writing Avro data by @mwylde in #422
Schema Registry improvements
Arroyo supports Confluent Schema Registry for loading and storing schemas of data read and written to Kafka.
We've made several improvements to our schema registry support, in addition to adding Avro support (see above).
- It's now possible to write JSON data to Kafka using a pre-existing schema that has been registered with the schema registry, as sketched below. This provides a more efficient way to write data to Kafka for use with Kafka Connect and other systems that rely on having a schema (previously Arroyo supported an option to embed the schema in the message, which is less efficient).
- We now support using schema registries that require authentication, for example Confluent Cloud.
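As a rough sketch of the first of these, assuming the JSON analog of the Avro option is named 'json.confluent_schema_registry' (an assumption; see the Kafka connector docs for the exact option), a schema-registry-backed JSON sink might look like:

CREATE TABLE output WITH (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    'schema_registry.endpoint' = 'http://localhost:8081',
    format = 'json',
    -- assumed option name, mirroring 'avro.confluent_schema_registry' above
    'json.confluent_schema_registry' = 'true',
    'topic' = 'outputs'
);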
See the Kafka connector docs for more details.
- Implement json sink for schema registry, use connection types for sinks, add auth for schema registry by @jacksonrnewhouse in #416
UDFs
UDFs (user-defined functions) and UDAFs (user-defined aggregate functions) have become a widely used feature of Arroyo, allowing users to extend Arroyo with custom logic written in Rust.
Arroyo 0.8 includes a number of improvements to UDFs, making them easier to use and more powerful.
Global UDFs
In earlier versions of Arroyo, UDFs could be defined within a single pipeline, but could not be easily shared. It's common to have a set of UDFs that are useful across an organization, for example to parse custom formats or implement business logic.
Now in 0.8, UDFs can be defined globally and used across multiple pipelines. Along with this, we've also completely reworked the UDF editing experience:
Custom UDF dependencies
UDFs can now depend on a custom set of external Rust crates (libraries) by specifying them as a special comment in the UDF definition. Previously, UDFs had a few built-in dependencies (including serde_json and regex), but now any Rust crate can be used.
For example, the access_log_parser crate provides a comprehensive suite of web server log parsers. It's now possible to write a UDF that uses this crate to parse access logs and extract fields from them.
/*
[dependencies]
access_log_parser = "0.8"
serde_json = "1"
*/

use access_log_parser::{parse, AccessLogError, LogType, LogEntry};
use serde_json::json;
use std::time::{UNIX_EPOCH, Duration};

pub fn parse_log(input: String) -> Option<String> {
    let LogEntry::CommonLog(entry) = parse(LogType::CommonLog, &input).ok()? else {
        return None;
    };

    Some(json!({
        "ip": entry.ip,
        "user": entry.user,
        "timestamp": entry.timestamp.to_rfc3339(),
        "request": format!("{:?}", entry.request),
        "status_code": entry.status_code.as_u16()
    }).to_string())
}
Then this UDF can be used in SQL; for example, this query counts the number of 500 errors in 5-minute sliding windows:
create table logs (
    value TEXT NOT NULL,
    parsed TEXT GENERATED ALWAYS AS (parse_log(value)),
    event_time TIMESTAMP GENERATED ALWAYS AS
        (CAST(extract_json_string(parse_log(value), '$.timestamp') as TIMESTAMP))
) with (
    connector = 'kafka',
    type = 'source',
    format = 'raw_string',
    bootstrap_servers = 'localhost:9092',
    topic = 'apache_logs',
    'source.offset' = 'earliest'
);

SELECT count(*)
FROM logs
WHERE extract_json(parsed, '$.status_code')[1] = '500'
GROUP BY hop(interval '5 seconds', interval '5 minutes');
As part of this change, the previously built-in dependencies have been removed, so going forward all crates will need to be specified explicitly.
SQL enhancements
Schema inference
Arroyo is now able to infer schemas for connection tables in some cases, making it easier to define sources and sinks without needing to specify the schema manually.
This is particularly useful for sinks, which previously needed to have the schema specified manually. Now, if the schema is not specified, Arroyo can infer it from how the table is used.
For example, a sink table can now be created like this:
CREATE TABLE output WITH (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    format = 'json',
    'topic' = 'outputs'
);
without needing to specify the fields.
It can be written to like this:
INSERT INTO output
SELECT id as id, count(*) as count
FROM events
GROUP BY id, hop(interval '5 seconds', interval '1 hour');
Note that when using schema inference, the field names will be determined by the select statement, so it's important to use aliases to ensure the correct names are used.
Connection profiles in SQL
Some connectors, like Kafka or Redis, have a concept of a "connection profile" which contains common configuration that will be used across many tables. For example, for Kafka this contains the bootstrap servers, auth information, and schema registry.
These connection profiles could be used when creating connection tables via the Web UI and API, but not when creating them via SQL. Now in 0.8, connection profiles can be created in the Web UI and then used in SQL as well.
For example, given a connection profile called "local_kafka", a Kafka source can be created like this:
CREATE TABLE events WITH (
    connector = 'kafka',
    type = 'source',
    format = 'json',
    'connection_profile' = 'local_kafka'
);
- Connection profiles sql by @jacksonrnewhouse in #421
IN operator
Arroyo now supports the IN operator in SQL, allowing you to filter records based on a list of values.
For example, to filter records where the status field is either 400 or 401, you can use a query like:
SELECT * FROM events
WHERE status IN (400, 401);
- Add support for InList expressions. by @jacksonrnewhouse in #418
Improvements
- Refactor api types into modules by @jbeisen in #377
- Add headers and multiple subscription messages to Websocket source by @jbeisen in #379
- Bump datafusion to 31.0, parquet to 46.0 by @jacksonrnewhouse in #381
- bump object_store dependency to 0.7.1 by @jacksonrnewhouse in #380
- Use udfs crate for compilation in addition to checking by @mwylde in #382
- Reduce the size of cars.json so tests run more quickly by @jacksonrnewhouse in #383
- Filesystem write configurations by @harshit2283 in #392
- Refactor FilenamingStrategy behavior to share code by @jacksonrnewhouse in #409
- Make UDFs public when added to schema provider by @jbeisen in #407
- Pin deltalake to not bump arrow version. by @jacksonrnewhouse in #413
- Test code-gen for JSON Schema and Avro by @jacksonrnewhouse in #408
- Update dependencies by @mwylde in #415
- Docker pin pnpm by @jacksonrnewhouse in #419
Fixes
- Fix deeply-nested generated forms by @mwylde in #388
- Reset nexmark sources if parallelism is changed by @jacksonrnewhouse in #389
- Make sure a final checkpoint is taken before rescaling by @jacksonrnewhouse in #400
- Clear pipeline before checking by @jbeisen in #401
- Fix schema editor by @mwylde in #404
- Make the StorageProvider's get() and put() calls relative to the full path by @jacksonrnewhouse in #410
- Make sure delta lake timestamps are microsecond grain by @jacksonrnewhouse in #411
- RawString schema's value now submitted as non-nullable by @jacksonrnewhouse in #414
Full Changelog: https://github.com/ArroyoSystems/arroyo/compare/v0.7.0...v0.8.0