Announcing Arroyo 0.8.0

Arroyo 0.8 is now available, with a new FileSystem source, Delta Lake sink, Redis sink, Avro support, global UDFs, and more.

Micah Wylde

CEO of Arroyo

The Arroyo team is pleased to announce the release of Arroyo 0.8.0. This release adds several new connectors, including a FileSystem source, a Delta Lake sink, and a Redis sink, along with other new features like Avro support, global UDFs, and more.

Arroyo is an open-source stream processing engine that allows anyone to build correct, reliable, and scalable real-time data pipelines using SQL.

Read on for more, and check out our docs for full details on existing and new features.

Thanks to all our contributors for this release:

Want to try out the new features? You can easily start a local instance of Arroyo 0.8 in Docker with:

$ docker run -p 8000:8000 ghcr.io/arroyosystems/arroyo-single:0.8.0

What's next

With 0.8 out the door, we're already working on Arroyo 0.9, to be released in early January. Planned features include asynchronous UDFs that are able to call out to external services and perform expensive tasks like ML inference. We are also working on streaming reads for filesystems and Delta Lake, the ability to bootstrap from batch data, support for joins with updating data, better handling of invalid data, and more.

Anything you'd like to see? Let us know on Discord!

Now on to the details.

FileSystem source

Arroyo 0.5 added a high-performance FileSystem sink, capable of writing JSON and Parquet files to local or remote filesystems and object stores like S3. Now in 0.8, we're adding a corresponding FileSystem source.

The FileSystem source can be configured with a directory on a local filesystem or a remote object store like S3. It reads all files in that directory and emits their contents as records for processing, with support for JSON and Parquet files and a variety of compression codecs.

Files are read in parallel, enabling a very high rate of ingestion.
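To make the idea of parallel reads concrete, here is a minimal Rust sketch of one way a source could spread a directory's files across several readers. The round-robin assignment, the `assign_files` and `read_in_parallel` helpers, and the one-JSON-record-per-line assumption are all hypothetical illustrations, not Arroyo's actual implementation (which also handles Parquet and compressed files).

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::thread;

/// Hypothetical scheme: assign files to `n` readers round-robin.
fn assign_files(files: &[PathBuf], n: usize) -> Vec<Vec<PathBuf>> {
    assert!(n > 0, "need at least one reader");
    let mut buckets: Vec<Vec<PathBuf>> = vec![Vec::new(); n];
    for (i, file) in files.iter().enumerate() {
        buckets[i % n].push(file.clone());
    }
    buckets
}

/// Reads each bucket on its own thread and returns the number of non-empty
/// lines (assumed to be one JSON record per line) seen by each reader.
fn read_in_parallel(files: &[PathBuf], n: usize) -> Vec<usize> {
    let buckets = assign_files(files, n);
    thread::scope(|s| {
        let handles: Vec<_> = buckets
            .iter()
            .map(|bucket| {
                s.spawn(move || {
                    bucket
                        .iter()
                        .map(|path| {
                            let reader = BufReader::new(File::open(path).expect("open file"));
                            reader
                                .lines()
                                .filter(|line| line.as_ref().map(|l| !l.is_empty()).unwrap_or(false))
                                .count()
                        })
                        .sum::<usize>()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().expect("reader panicked")).collect()
    })
}
```

Round-robin is the simplest possible assignment; a real reader might instead balance work by file size.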

A FileSystem source can be created in SQL like this:

CREATE TABLE logs (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'filesystem',
    type = 'source',
    path = 's3://my-bucket/inputs/events',
    compression_format = 'gzip',
    format = 'json'
);
 
select * from logs;

In the next release, we will be improving our support for bootstrapping with the FileSystem source. Bootstrapping is the process of creating the initial state of the pipeline from historical data.

For example, you may have a window over 30 days of data. If you start reading today from Kafka, that will take 30 days to fully fill the window. But if you have the historical data available on S3, as is common in many modern data architectures, you can read from both the FileSystem and Kafka sources in parallel with a SQL union.

Full details on the FileSystem source are available in the docs.

Thanks to @rcjmurillo for this major contribution!

Delta Lake Sink

Delta Lake is a popular open-source storage format, with support for ACID transactions, schema enforcement, time travel, and more. It's a great choice for modern data lakes and is supported by many popular query engines.

In Arroyo 0.8 we've enhanced our existing FileSystem sink to support Delta Lake, allowing you to write data transactionally to Delta tables.

With Arroyo, a high-performance Kafka to Delta Lake pipeline can be created in SQL as easily as this:

CREATE TABLE events (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'kafka',
    type = 'source',
    topic = 'events',
    format = 'json'
);
 
CREATE TABLE deltatable (
    id BIGINT,
    time TIMESTAMP,
    host TEXT,
    status INT
) WITH (
    connector = 'delta',
    path = 's3://arroyo-deltalake/delta_uuid_remote_checkpoints',
    format = 'parquet',
    'filename.strategy' = 'uuid'
);
 
INSERT INTO deltatable
SELECT * FROM events;
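Delta Lake makes writes transactional by recording each commit as a new, sequentially numbered JSON file in the table's `_delta_log` directory, and a commit only succeeds if no other writer has already created that version. The Rust sketch below illustrates that optimistic-concurrency idea on a local filesystem using `create_new`. It is a simplified illustration of the Delta protocol, not Arroyo's sink code: `try_commit` is a hypothetical helper, and writers on object stores like S3 need an atomic put-if-absent (or an external lock) instead.

```rust
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Delta log entries are named by their version, zero-padded to 20 digits.
fn log_entry_path(table: &Path, version: u64) -> PathBuf {
    table.join("_delta_log").join(format!("{:020}.json", version))
}

/// Hypothetical helper: try to commit `actions` (one JSON action per line)
/// as `version`. Returns Ok(false) if another writer already claimed it,
/// in which case the caller would re-read the log and retry.
fn try_commit(table: &Path, version: u64, actions: &[String]) -> io::Result<bool> {
    let path = log_entry_path(table, version);
    fs::create_dir_all(path.parent().expect("log dir"))?;
    // create_new fails with AlreadyExists if the file is present, so only one
    // writer can claim a given version. (Simplification: a real writer would
    // stage the contents first so readers never observe a partial file.)
    match OpenOptions::new().write(true).create_new(true).open(&path) {
        Ok(mut file) => {
            for action in actions {
                writeln!(file, "{action}")?;
            }
            Ok(true)
        }
        Err(e) if e.kind() == io::ErrorKind::AlreadyExists => Ok(false),
        Err(e) => Err(e),
    }
}
```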

See the Delta Lake docs for more details.

Redis Sink

For applications that rely on real-time data, Redis is a popular choice for a fast, in-memory state store. Now Arroyo can sink its results directly to Redis, providing a great option for serving state.

Redis supports a variety of data structures. In the initial release of the Redis sink, we support writing to String tables, Lists, and Hashes. Keys can be constructed dynamically from columns in the record.

For example, to write results to a Redis String table, you can use a SQL statement like this:

CREATE TABLE redis (
    user_id TEXT,
    count INT
) WITH (
    connector = 'redis',
    type = 'sink',
    format = 'json',
    'address' = 'redis://localhost:6379',
    target = 'string',
    'target.key_prefix' = 'counts.',
    'target.key_column' = 'user_id'
);
 
INSERT INTO redis
SELECT user_id, count(*)
FROM events
GROUP BY user_id, hop(interval '5 seconds', interval '1 hour');

This will write JSON-encoded values to keys like counts.fred for the user fred.

Writes are performed efficiently using Redis pipelines and can achieve throughputs of millions of writes per second on a single Redis node.
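Pipelining means sending many commands in a single write instead of waiting for each reply. Here is a small Rust sketch that builds `counts.`-prefixed keys like the ones in the SQL example and encodes a batch of `SET` commands in the Redis serialization protocol (RESP). The helper names and the JSON value shape (the whole row) are assumptions for illustration; this is not Arroyo's sink code.

```rust
/// Encodes one command as a RESP array of bulk strings.
fn encode_command(args: &[&str]) -> Vec<u8> {
    let mut out = format!("*{}\r\n", args.len()).into_bytes();
    for arg in args {
        // Bulk string: "$<byte length>\r\n<bytes>\r\n"
        out.extend_from_slice(format!("${}\r\n", arg.len()).as_bytes());
        out.extend_from_slice(arg.as_bytes());
        out.extend_from_slice(b"\r\n");
    }
    out
}

/// Hypothetical helper: builds one buffer holding a SET per row, keyed by
/// prefix + user_id, so the whole batch can go out in a single write.
fn pipeline_sets(prefix: &str, rows: &[(&str, i64)]) -> Vec<u8> {
    let mut buf = Vec::new();
    for (user_id, count) in rows {
        let key = format!("{prefix}{user_id}");
        // Assumed value shape: the row as JSON (ids assumed to need no escaping).
        let value = format!(r#"{{"user_id":"{user_id}","count":{count}}}"#);
        buf.extend(encode_command(&["SET", key.as_str(), value.as_str()]));
    }
    buf
}
```

Writing this buffer to the connection and then reading one reply per command means the network round trip is paid once per batch rather than once per key.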

Both standalone and cluster modes are supported.

See the Redis connector docs for more details.

Avro

Arroyo today supports JSON, Parquet, and custom string formats. New in 0.8 is support for Avro, including deep integration with Confluent Schema Registry.

Avro-encoded Kafka sources can be created easily from the Confluent Schema Registry using the Arroyo UI or API.

From SQL, those sources can be queried like any other:

select * from pizza_orders;

When reading from schema registry sources, Arroyo will use the schema registry to decode the data with full support for schema evolution.
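Messages produced with Confluent's serializers carry a small header: a zero "magic" byte, then the 4-byte big-endian schema ID, then the Avro-encoded body. A reader looks up that ID in the registry to fetch the writer's schema, which is what makes schema evolution possible. The Rust sketch below shows only this framing; the helper names are hypothetical and the Avro decoding itself is left out.

```rust
/// Splits a Confluent-framed message into (schema_id, avro_payload).
fn parse_confluent_frame(msg: &[u8]) -> Result<(u32, &[u8]), String> {
    match msg {
        [0, a, b, c, d, payload @ ..] => Ok((u32::from_be_bytes([*a, *b, *c, *d]), payload)),
        [magic, ..] if *magic != 0 => Err(format!("unknown magic byte {magic}")),
        _ => Err("message too short".to_string()),
    }
}

/// Builds a framed message: magic byte 0, schema id (big-endian), payload.
fn frame(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + payload.len());
    out.push(0);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}
```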

It's also possible to specify your schema directly, although without a schema registry you will not be able to evolve your schemas.

Avro sinks are now supported as well, allowing you to write Avro-encoded data to Kafka and other Avro-compatible systems. When using the Schema Registry, Arroyo will automatically register new schemas as they are encountered.

create table output with (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    'schema_registry.endpoint' =
      'http://localhost:8081',
    format = 'avro',
    'avro.confluent_schema_registry' = 'true',
    'topic' = 'outputs'
);
 
insert into output
select * from source;

See the Avro format docs for more details.

Schema Registry improvements

Arroyo supports Confluent Schema Registry for loading and storing the schemas of data read from and written to Kafka.

We've made several improvements to our schema registry support in addition to adding Avro support (see above).

  • It's now possible to write JSON data to Kafka using a pre-existing schema that has been registered with the schema registry. This provides a more efficient way to write data to Kafka for use with Kafka Connect and other systems that rely on having a schema (previously Arroyo supported an option to embed the schema in the message, which is less efficient).
  • We now support using schema registries that require authentication, for example Confluent Cloud.
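Here is a rough Rust sketch of why referencing a registered schema is cheaper than embedding it in every message. It assumes the embedded option uses a Kafka Connect style `{"schema": ..., "payload": ...}` envelope and that registry-framed JSON uses Confluent's 5-byte header (magic byte plus schema ID); the helpers and the sample schema are illustrative, not Arroyo's serializer.

```rust
/// Assumed previous approach: every message carries the full schema.
fn embedded(schema_json: &str, payload_json: &str) -> Vec<u8> {
    format!(r#"{{"schema":{schema_json},"payload":{payload_json}}}"#).into_bytes()
}

/// Registry approach: a 5-byte header (magic byte 0 + big-endian schema id)
/// followed by the bare JSON payload.
fn registry_framed(schema_id: u32, payload_json: &str) -> Vec<u8> {
    let mut out = vec![0u8];
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload_json.as_bytes());
    out
}
```

The per-message overhead of the registry approach is a fixed 5 bytes, while the embedded approach grows with the size of the schema.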

See the Kafka connector docs for more details.

  • Implement json sink for schema registry, use connection types for sinks, add auth for schema registry by @jacksonrnewhouse in #416

UDFs

UDFs (user-defined functions) and UDAFs (user-defined aggregate functions) have become a widely used feature of Arroyo, allowing users to extend Arroyo with custom logic written in Rust.

Arroyo 0.8 includes a number of improvements to UDFs, making them easier to use and more powerful.

Global UDFs

In earlier versions of Arroyo, UDFs could be defined within a single pipeline but could not be easily shared. It's common to have a set of UDFs that are useful across an organization, for example to parse custom formats or implement business logic.

Now in 0.8, UDFs can be defined globally and used across multiple pipelines. Along with this, we've also completely reworked the UDF editing experience.

Custom UDF dependencies

UDFs can now depend on a custom set of external Rust crates (libraries) by specifying them as a special comment in the UDF definition. Previously, UDFs had a few built-in dependencies (including serde_json and regex) but now any Rust crate can be used.

For example, the access_log_parser crate provides a comprehensive suite of web server log parsers. It's now possible to write a UDF that uses this crate to parse access logs and extract fields from them.

/*
[dependencies]
access_log_parser = "0.8"
serde_json = "1"
*/
 
use access_log_parser::{parse, LogEntry, LogType};
use serde_json::json;

/// Parses a Common Log Format line and returns its fields as a JSON string,
/// or None if the line can't be parsed.
pub fn parse_log(input: String) -> Option<String> {
    let LogEntry::CommonLog(entry) =
        parse(LogType::CommonLog, &input).ok()? else {
            return None
    };
 
    Some(json!({
        "ip": entry.ip,
        "user": entry.user,
        "timestamp": entry.timestamp.to_rfc3339(),
        "request": format!("{:?}", entry.request),
        "status_code": entry.status_code.as_u16()
    }).to_string())
}
 

This UDF can then be used in SQL. For example, this query counts the number of 500 errors in 5-minute sliding windows:

create table logs (
    value TEXT NOT NULL,
    parsed TEXT GENERATED ALWAYS AS (parse_log(value)),
    event_time TIMESTAMP GENERATED ALWAYS AS
        (CAST(extract_json_string(parse_log(value), '$.timestamp') as TIMESTAMP))
) with (
    connector = 'kafka',
    type = 'source',
    format = 'raw_string',
    bootstrap_servers = 'localhost:9092',
    topic = 'apache_logs',
    'source.offset' = 'earliest'
);
 
SELECT count(*)
FROM logs
WHERE extract_json(parsed, '$.status_code')[1] = '500'
GROUP BY hop(interval '5 seconds', interval '5 minutes');
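A hop (sliding) window 5 minutes wide that advances every 5 seconds means each event falls into 300 / 5 = 60 overlapping windows. This Rust sketch lists the windows a timestamp belongs to. It assumes millisecond timestamps and windows aligned to the Unix epoch, and the `hop_windows` helper is hypothetical rather than Arroyo's window operator.

```rust
/// Start times (ms) of every hop window of length `width`, advancing by
/// `slide`, that contains `ts`. Windows are [start, start + width).
fn hop_windows(ts: i64, slide: i64, width: i64) -> Vec<i64> {
    assert!(slide > 0 && width % slide == 0, "width must be a multiple of slide");
    // Latest window start at or before ts, aligned to multiples of `slide`.
    let mut start = ts - ts.rem_euclid(slide);
    let mut starts = Vec::new();
    // A window contains ts while start > ts - width.
    while start > ts - width {
        starts.push(start);
        start -= slide;
    }
    starts
}
```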

As part of this change, the previously built-in dependencies have been removed, so going forward all crates will need to be specified explicitly.
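The dependency comment looks like a Cargo.toml `[dependencies]` section inside a block comment. As an illustration only, here is a minimal Rust sketch that pulls simple `name = "version"` entries out of such a comment. The `parse_udf_dependencies` helper is hypothetical and is not how Arroyo parses UDFs; a real implementation would use a TOML parser to handle entries like `{ version = "1", features = [...] }`.

```rust
/// Hypothetical helper: extracts (crate, version) pairs from the
/// [dependencies] section of the first /* ... */ comment in a UDF.
fn parse_udf_dependencies(source: &str) -> Vec<(String, String)> {
    let Some(open) = source.find("/*") else { return Vec::new() };
    let body_start = open + 2;
    let Some(len) = source[body_start..].find("*/") else { return Vec::new() };
    let comment = &source[body_start..body_start + len];

    let mut deps = Vec::new();
    let mut in_dependencies = false;
    for line in comment.lines().map(str::trim) {
        if line.starts_with('[') {
            // Entering a new TOML section.
            in_dependencies = line == "[dependencies]";
        } else if in_dependencies {
            // Only handles the simple `name = "version"` form.
            if let Some((name, version)) = line.split_once('=') {
                deps.push((name.trim().to_string(), version.trim().trim_matches('"').to_string()));
            }
        }
    }
    deps
}
```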

SQL enhancements

Schema inference

Arroyo is now able to infer schemas for connection tables in some cases, making it easier to define sources and sinks without needing to specify the schema manually.

This is particularly useful for sinks, which previously needed to have the schema specified manually. Now, if the schema is not specified, Arroyo can infer it from how the table is used.

For example, a sink table can now be created like this:

CREATE TABLE output WITH (
    connector = 'kafka',
    type = 'sink',
    bootstrap_servers = 'localhost:9092',
    format = 'json',
    'topic' = 'outputs'
);

without needing to specify the fields.

It can be written to like this:

INSERT INTO output
SELECT id as id, count(*) as count
FROM events
GROUP BY id, hop(interval '5 seconds', interval '1 hour');

Note that when using schema inference, the field names will be determined by the select statement, so it's important to use aliases to ensure the correct names are used.

Connection profiles in SQL

Some connectors, like Kafka or Redis, have a concept of a "connection profile" which contains common configuration that will be used across many tables. For example, for Kafka this contains the bootstrap servers, auth information, and schema registry.

Configuring a Redis connection in the Arroyo UI

These connection profiles could be used when creating connection tables via the Web UI and API, but not when creating them via SQL. Now in 0.8, connection profiles can be created in the Web UI and then used in SQL as well.

For example, given a connection profile called "local_kafka", a Kafka source can be created like this:

CREATE TABLE events WITH (
    connector = 'kafka',
    type = 'source',
    format = 'json',
    'connection_profile' = 'local_kafka'
);
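Conceptually, a table that names a connection profile combines the profile's shared settings with its own options. The Rust sketch below shows one way such options could be resolved. The `resolve_options` helper is hypothetical, and the rule that table-level options override profile values is an assumption, not documented Arroyo behavior.

```rust
use std::collections::HashMap;

/// Hypothetical helper: merges a connection profile's shared settings with a
/// table's own options. Assumption: table options win on conflict.
fn resolve_options(
    profile: &HashMap<&str, &str>,
    table: &HashMap<&str, &str>,
) -> HashMap<String, String> {
    let mut merged: HashMap<String, String> = profile
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    for (k, v) in table {
        // The profile reference itself is not a connector option.
        if *k != "connection_profile" {
            merged.insert(k.to_string(), v.to_string());
        }
    }
    merged
}
```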

SQL IN operator

Arroyo now supports the IN operator in SQL, allowing you to filter records based on a list of values.

For example, to filter records where the status field is either 400 or 401, you can use a query like:

SELECT * FROM events
WHERE status IN (400, 401);
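Semantically, `status IN (400, 401)` is shorthand for `status = 400 OR status = 401`. For longer lists, a set lookup is the natural way to evaluate it; here is a minimal Rust sketch. The `filter_in` helper is hypothetical, and SQL's NULL handling is ignored.

```rust
use std::collections::HashSet;

/// Hypothetical helper: keeps the statuses that appear in `list`,
/// mirroring `WHERE status IN (...)` (SQL NULL semantics ignored).
fn filter_in(statuses: &[i32], list: &[i32]) -> Vec<i32> {
    let wanted: HashSet<i32> = list.iter().copied().collect();
    statuses.iter().copied().filter(|s| wanted.contains(s)).collect()
}
```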

Improvements

Fixes

Full Changelog: https://github.com/ArroyoSystems/arroyo/compare/v0.7.0...v0.8.0