Announcing Arroyo 0.5.0

Release 0.5 of Arroyo is all about connectors. We've added a high-performance transactional FileSystem sink, exactly-once Kafka support, a Kinesis connector, and more.

Micah Wylde

CEO of Arroyo

I'm excited to announce the release of Arroyo 0.5, the latest version of our open-source stream processing engine. This release is all about connectors: we've added a high-performance transactional FileSystem sink, exactly-once Kafka support, a Kinesis connector, and more.

Overview

Arroyo 0.5 brings a number of new features and improvements to the Arroyo platform. The biggest of these is the new FileSystem connector, which is a high-performance, transactional sink for writing data to filesystems and object stores like S3. This allows Arroyo to write into data lakes and data warehouses. We've also added exactly-once support for Kafka sinks, a new Kinesis connector, expanded our SQL support, and made a number of improvements to the Web UI and REST API.

Read on for more, and check out our docs for full details on both existing and new features.

Thanks to all our contributors for this release:

What's next

With 0.5 out the door we're already hard at work on the next release, targeted for early September. The headline feature for 0.6 will be what we call event-history joins, which allow you to join new events against the entire event history for a given key—for example, when a user transaction comes in, you can join it against all of that user's previous activity.

Building on the FileSystem work in this release, we'll also be adding integration with Delta Lake. Along with a new, built-in Postgres sink we'll be expanding Arroyo's capabilities for ETL workloads. Finally, we're planning on introducing our first support for UDAFs (user-defined aggregate functions), to allow more powerful extensions to Arroyo's SQL support.

Anything else you'd like to see? Let us know on Discord!

Now on to the details.


Features

FileSystem connector

Columnar files (like Parquet) on S3 have become the de-facto standard for storing data at rest, combining low cost of storage with decent query performance. Modern query engines like Trino, ClickHouse, and DuckDB can operate directly on these files, as can many data warehouses like Snowflake and Redshift.

And with the new FileSystem connector, Arroyo can efficiently perform real-time ETL into these S3-backed systems.

The FileSystem connector is a high-performance, transactional sink for writing data (as Parquet or JSON files) to file systems and object stores like S3.

It's deeply integrated with Arroyo's checkpoint system for exactly-once processing: even if a machine is lost or a job is restarted, the data written to S3 will be consistent and correct. Unlike other systems such as Flink, Arroyo can even take a consistent checkpoint while in the middle of writing a single Parquet file, which means you can write larger files for better query performance while still checkpointing frequently.

Look out for a blog post in the near future with more details on how all of this works.

FileSystem sinks can be created in SQL via a CREATE TABLE statement like this:

CREATE TABLE bids (
  time timestamp,
  auction bigint,
  bidder bigint,
  price bigint
) WITH (
  connector = 'filesystem',
  path = 'https://s3.us-west-2.amazonaws.com/demo/s3-uri',
  format = 'parquet',
  parquet_compression = 'zstd',
  rollover_seconds = '60'
);

See the docs for all of the details and available options.

Exactly-once Kafka sink

Arroyo has always supported exactly-once processing when reading from Kafka by integrating offset-tracking with its checkpoint system. In 0.5 we're adding exactly-once support for writing to Kafka as well. This enables end-to-end exactly-once processing when integrating with other systems via Kafka.

Exactly-once processing is achieved by leveraging Kafka's transactional API. When processing starts, Arroyo will begin a transaction which is used for all writes.

Once a checkpoint is completed successfully, the transaction is committed, allowing consumers to read the records. This ensures that records are only read once, even if a failure occurs.

If a failure does occur, the transaction will be rolled back and processing will restart from the last checkpoint.

Exactly-once Kafka sinks can be created in SQL via a CREATE TABLE statement by configuring the new 'sink.commit_mode' = 'exactly_once' option, for example:

CREATE TABLE sink (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector = 'kafka',
  topic = 'results',
  bootstrap_servers = 'localhost:9092',
  type = 'sink',
  format = 'json',
  'sink.commit_mode' = 'exactly_once'
);

There is also now a corresponding source.read_mode option for Kafka sources, which can be set to read_committed to read only committed records produced by a transactional producer.
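
For instance, a read-committed Kafka source might look like the following. This is a sketch modeled on the sink example above; the table name, topic, and bootstrap servers are illustrative.

CREATE TABLE source (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector = 'kafka',
  topic = 'results',                      -- illustrative topic
  bootstrap_servers = 'localhost:9092',   -- illustrative brokers
  type = 'source',
  format = 'json',
  'source.read_mode' = 'read_committed'   -- only read records from committed transactions
);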

See the Kafka connector docs for more details.

  • implement exactly-once commits to Kafka sinks and read_committed reads to Kafka sources by @jacksonrnewhouse in #218

Kinesis connector

Arroyo now supports reading from and writing to AWS Kinesis data streams via the new Kinesis connector. Like the existing Kafka connector, the Kinesis connector supports exactly-once processing of records.

Kinesis sources and sinks can be created in the Web UI or via SQL, for example:

CREATE TABLE kinesis_source (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector = 'kinesis',
  stream_name = 'my-source',
  type = 'source',
  format = 'json'
);
 
CREATE TABLE kinesis_sink (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector = 'kinesis',
  stream_name = 'my-sink',
  type = 'sink',
  format = 'json'
);
 
INSERT INTO kinesis_sink
SELECT * from kinesis_source;

See the Kinesis connector docs for all the available options.

Postgres sink via Debezium

Arroyo now supports writing to relational databases (including Postgres and MySQL) via Debezium.

As part of this work, we've added support for embedding JSON schemas in outputs in Kafka Connect format. This allows integration with Kafka Connect connectors that, like Debezium, require a schema.
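
As a rough sketch of how this fits together, a Debezium-formatted sink might be declared like the following, with a Kafka Connect sink connector then applying the records to Postgres. The topic name and the 'debezium_json' format value here are assumptions rather than something taken from the release notes; see the Postgres connector docs for the actual configuration.

CREATE TABLE postgres_sink (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector = 'kafka',
  -- illustrative topic and brokers; a Kafka Connect sink would consume this topic
  topic = 'updates',
  bootstrap_servers = 'localhost:9092',
  type = 'sink',
  -- format name assumed; check the Postgres connector docs
  format = 'debezium_json'
);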

See the Postgres connector docs for the details.

We've also improved our format system to give more control over how data is serialized and deserialized, for example supporting custom date and timestamp formats. Refer to the new format docs.

  • Support sinking to relational databases via Debezium by @mwylde in #235

Session windows

Arroyo 0.5 adds support for session windows.

Visual demonstration of session windows

An example session window with a gap of 3 seconds; the window closes when no records for a particular user are received for 3 seconds.

Unlike sliding and tumbling windows, which divide time into fixed intervals, session windows are defined by a gap in time between records. This is often useful for determining when some period of activity has finished and can be analyzed.

For example, let's take a query over user events on an ecommerce site. A user may arrive on the site, browse around, add some items to their cart, then disappear. A day later they may return and complete their purchase. With session windows we can independently (and efficiently) analyze each of these sessions.

We can create a session window using the session function, which takes the gap time as an argument:

SELECT
  session(INTERVAL '1 hour') as window,
  user_id,
  count(*)
FROM clickstream
GROUP BY window, user_id;

Idle watermarks

Partitioned sources (like Kafka or Kinesis) may experience periods when some partitions are active but others are idle due to the way that they are keyed. This can cause delayed processing due to how watermarks are calculated: as the minimum of the watermarks of all partitions.

If some partitions are idle, the watermark will not advance, and queries that depend on it will not make progress. To address this, sources now support a concept of idleness, which allows them to mark partitions as idle after a period of inactivity. Idle partitions are then ignored for the purpose of calculating watermarks, allowing queries to advance.

Idleness is now enabled by default for all sources with a period of 5 minutes. It can be configured when creating a source in SQL by setting the idle_micros option, or disabled by setting it to -1.
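
For example, a Kafka source might set a 60-second idle timeout like this. This is only a sketch: the table definition and the specific value are illustrative, and '-1' would disable idleness entirely.

CREATE TABLE events (
  time TIMESTAMP,
  user_id TEXT,
  count INT
) WITH (
  connector = 'kafka',
  topic = 'events',
  bootstrap_servers = 'localhost:9092',
  type = 'source',
  format = 'json',
  idle_micros = '60000000'   -- mark partitions idle after 60 seconds; '-1' disables
);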

A special case of idleness occurs when there are more Arroyo source tasks than partitions (for example, a Kafka topic with 4 partitions read by 8 Arroyo tasks). Some tasks will never receive data and so will never advance their watermarks. The same can happen for non-partitioned sources like WebSocket, where only a single task is able to read data. Sources now immediately mark such tasks as idle.

REST API

Continuing the work started in 0.4, we are migrating our API from gRPC to REST. This release includes a number of new endpoints, and the REST API can now be used to fully manage pipelines and jobs.

For example, let's walk through creating a new pipeline:

curl http://localhost:8000/api/v1/pipelines \
  -X POST -H "Content-Type: application/json" \
  --data @- << EOF
{
  "name": "my_pipeline",
  "parallelism": 1,
  "query": "
    CREATE TABLE impulse (
      counter BIGINT UNSIGNED NOT NULL,
      subtask_index BIGINT UNSIGNED NOT NULL
     )
     WITH (
      connector = 'impulse',
      event_rate = '100'
   );
   SELECT * from impulse;",
  "udfs": []
}
EOF
 
{
  "id": "pl_W2UjDI6Iud",
  "name": "my_pipeline",
  "stop": "none",
  "createdAt": 1692054789252281,
  ...
}

Each pipeline has one or more jobs, which are the running instances of the pipeline. We can list the jobs for a pipeline:

curl http://localhost:8000/api/v1/pipelines/pl_W2UjDI6Iud/jobs | jq '.'
{
  "data": [
    {
      "id": "job_Tqkow4Uc4x",
      "runningDesired": true,
      "state": "Running",
      "runId": 1,
      "startTime": 1692054798235781,
      "finishTime": null,
      "tasks": 4,
      "failureMessage": null,
      "createdAt": 1692054789252281
    }
  ],
  "hasMore": false
}

Once we're done, we can stop the pipeline:

curl http://localhost:8000/api/v1/pipelines/pl_W2UjDI6Iud \
  -X PATCH \
  -H "Content-Type: application/json" \
  -d '{"stop": "checkpoint"}'

See all of the API docs here.

In this release, most of the Web UI has been migrated from the gRPC API to the new REST API, along with a number of other improvements to the UI.

Improvements

Fixes

The full changelog is available here.