Blog

Updates from the Arroyo team

Announcing Arroyo 0.12.0

Arroyo 0.12 is now available! This release introduces Python UDFs, Protobuf ingestion, JSON syntax, custom state TTLs, and many other features, improvements, and fixes.

Micah Wylde

Micah Wylde

CEO of Arroyo

The Arroyo team is thrilled to announce that Arroyo 0.12.0 is now available! This release introduces Python UDFs, which allow Python developers to extend the engine with custom functions, callable from SQL. We've also added support for Protobuf as an ingestion format, new JSON syntax, custom state TTLs for updating SQL queries, among many other features, improvements, and fixes.

Excited to try things out? Getting started is easier than ever with new native packages for Linux and MacOS, complementing our existing Docker images and Helm chart.

$ curl -LsSf https://arroyo.dev/install.sh | sh
$ arroyo cluster

Release 0.13.1

|

January 7, 2024

Apache 2.0 License

Arroyo is a community project, and we're very grateful to all of our contributors. We are particularly excited to welcome four new contributors to the project in this release:

Thanks to all of our contributors for this release:

And now, all of the details on what's new in Arroyo 0.12!

Table of contents

Features

Python UDFs

Arroyo has long supported user-defined functions (UDFs), allowing users to extend the engine by writing new scalar, aggregate, and async functions. We've been amazed by the diversity of UDFs that our users have come up with, including

  • Parsers for custom formats
  • Ad-hoc joins with other databases
  • Calling AI inference APIs
  • Sinks to other data systems
  • Integrating specialized financial libraries

Among many other use cases. But until now, Arroyo only supported UDFs written in Rust. We love Rust, but we know it's not the most popular (or second, or third, or...) language for data users.

So in 0.12, we're thrilled to support UDFs written in Python.

It looks like this

from arroyo_udf import udf
 
@udf
def levenshtein(s1: str, s2: str) -> int:
    if len(s1) < len(s2):
        return levenshtein(s2, s1)
 
    if len(s2) == 0:
        return len(s1)
 
    previous_row = range(len(s2) + 1)
 
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row
 
    return previous_row[-1]

which can then be used in SQL

SELECT levenshtein(username, email) as distance
from events;

Python UDFs take a series of arguments, each of which can be called with a SQL column or literal. The argument types and return type are determined by the function signature and type hints, including support for Optional to indicate how nullability should interact with the UDF.

We've also updated the Web UI to add a Python UDF editor:

What we're releasing in 0.12 is just the start. In our next release, we will add support for Python UDAFs, as well as direct PyArrow support for high-performance Python integrations without deserialization or copying overhead.

For more details on Python UDFs, see the documentation.

We're excited to see what you build with Python UDFs!

Protobuf

Protocol buffers—better known as protobuf—is a fast, space-efficient binary data format that's commonly used in event pipelines. And as of Arroyo 0.12, it's now natively supported as an ingestion format, along with support for reading protobuf schemas from Confluent Schema Registry.

This expands on our existing set of formats, including JSON, Avro, and raw string and bytes.

All protobuf features are supported, including optionals, lists, structs, enums, and imports.

Here's how it works when providing a schema directly:

See the full format documentation here.

  • Protobuf deserialization support by @mwylde in #715
  • Add confluent schema registry support for protobuf by @mwylde in #724
  • Add protoc to docker image for protobuf support by @mwylde in #725

JSON syntax

Sometimes you don't have a nice, proper schema for the JSON flowing through your data pipelines (it's ok, we've all been there). Arroyo still has you covered, with unstructured JSON fields (type JSON). And now the experience is even better, thanks to a suite of new JSON functions and integration of Postgres-style JSON syntax.

It looks like this:

CREATE TABLE events (
  value JSON
) WITH (
  connector = 'kafka',
  bootstrap_servers = 'kafka:9092',
  topic = 'events',
  format = 'json',
  type = 'source',
  'json.unstructured' = 'true'
);
 
SELECT
  -- using the json_get function
  json_get(value, 'user', 'name')::TEXT as name,
  -- or using the -> operator
  value->'user'->'email' as email,
  -- field presence check can be done with the ? operator
  value ? 'id' as has_id
FROM events;

There are several ways to access JSON fields:

  • json_get(json: str, *keys: str | int) takes a JSON-encoded string and a series of keys to traverse, returning a partially-parsed JSON value that can be further processed without needing to be re-parsed
  • The Postgres-style -> operator is a synonym for json_get, and can be efficiently chained
  • json_get_{str|int|bool|float|json}(json: str, *keys: str | int) is a set of convenience functions that return the JSON value as a specific type
  • The --> operator is a synonym for json_get_str
  • SQL type casts can also be used with json_get to get an output of the desired type, like json_get(value, 'a')::INT

We've also added a couple other handy functions for working with JSON:

  • json_contains(json: str, *keys: str | int) (aliased to the ? operator)
  • json_length(json: str, *keys: str | int) -> int

Under the hood, these new functions use the ultra-fast JSON parser jiter and deserialize data into an efficient parsed format, avoiding the need the repeatedly re-parse data to access multiple fields.

See the json function docs for more detail.

  • Add new json functions from datafusion-functions-json by @mwylde in #640

Custom State TTLs

Arroyo has two intersecting streaming SQL semantics, which we call dataflow SQL and updating SQL. Dataflow SQL is based around time-oriented windows, which encode a notion of completeness via watermark-based processing. In other words, for a particular window, the watermark tells us that we can process it and then drop the data for that window from our state.

But updating semantics have no in-built notion of completeness. These are queries like

SELECT user, count(*)
FROM events
GROUP BY user;

The semantics of the query are that, for every user, we should be able to output the complete count of their events going back to the beginning of time. But it's generally intractable in a streaming system to actually keep all of the data for all time without blowing up our state.

To make these queries tractable, Arroyo supports a TTL (time-to-live) for updating state, which controls how long we will keep data around after seeing a particular key (like the user/count pair in that example). In previous Arroyo releases this was configurable only at the cluster level, but now it can be modified at a per-query level with SET updating_ttl.

So if we want to keep the state around for longer, we can write:

SET updating_ttl = '7 days';
 
SELECT user, count(*)
FROM events
GROUP BY user;

IRSA support

AWS has a powerful (and achingly complex) system for managing permissions across services called IAM. Kubernetes has a completely different access-control system based on roles and service accounts.

So...how do you manage permissions when running a Kubernetes cluster on AWS? For example if you wanted to run a certain stream processing engine that, perhaps, needs to access an S3 bucket?

The answer is IAM Roles for Service Accounts (IRSA), a predictable mouthful of an acronym from the marketing folks who brought you the streaming service called Kinesis Data Analytics for Apache Flink.

But naming aside, IRSA lets you attach an IAM role to a Kubernetes service account. And in 0.12, it's now fully supported in Arroyo. This provides a secure, granular way to control Arroyo's access to your AWS resources.

Setting this up is relatively simple on the AWS side. From the Arroyo side, you will just need to set a custom service account in your Helm configuration, for example

serviceAccount:
  create: false
  name: arroyo-pod-sa
 
role:
  create: false

See the docs for a full walk-through.

  • Add support for IRSA authentication for S3 by @mwylde in #694

Kafka producer metadata

Messages produced to Kafka include data—a sequence of bytes typically encoded in a format like JSON, Avro, or Protobuf—as well a metadata, including a timestamp and key. The Arroyo Kafka connector now supports setting metadata fields to support more powerful integrations. For example:

create table sink (
    timestamp TIMESTAMP NOT NULL,
    user TEXT,
    event TEXT
) with (
    connector = 'kafka',
    bootstrap_servers = 'localhost:9092',
    format = 'json',
    'sink.timestamp_field' = 'timestamp',
    'sink.key_field' = 'user',
    type = 'sink',
    topic = 'events'
);
 
 
insert into sink
select timestamp, user, event
from source;
  • Add support for setting the key and timestamp for messages written to kafka by @mwylde in #698

SQL improvements

This release brings various SQL improvements, including:

  • Support for escaped-string literals, like line1\nline2
  • Adding the array_resize function
  • Supporting stride in array_slice, and changing indices to be 1-based
  • Adding ends_with and instr string functions
  • Adding the array_reverse function
  • Adding the position function for strings
  • Adding nvl2 function, which returns the second argument if the first is not null

Other improvements

  • Allow cluster-domain for services to be configured in values.yaml by @jr200 in #677
  • Statically link libsasl for kafka by @mwylde in #680
  • Add option to set raw_datums in avro schema editor by @mwylde in #685
  • UI: open docs in a new tab by @haoxins in #683
  • Upgrade to DF 40 and Arrow 52 by @mwylde in #702
  • Improve robustness of SSE source to server EOFs by @mwylde in #711
  • Sync logging by @mwylde in #718
  • Switch to jemalloc by @mwylde in #719
  • Allow setting worker environment variables in kubernetes by @mwylde in #721
  • Validate the regex pattern field of file system source by @haoxins in #722
  • Enhance the field description of the file system connector by @haoxins in #726
  • Add validation when json format has unstructured flag enabled by @tiagogcampos in #735
  • Add line number and file info in log message by @zhuliquan in #709
  • Upgrade kinesis dependency to avoid multiple AWS client versions by @mwylde in #747
  • Show the finished jobs in the dashboard by @haoxins in #727

Fixes

  • Fix helm chart deploys when pullsecrets is empty by @mwylde in #676
  • Use an inner join to prevent controller from loading partially-constructed jobs by @mwylde in #681
  • Fix pending requests in preview that could stall out the webui by @mwylde in #684
  • Fix regression in updating equi-join optimization by @mwylde in #686
  • Fix checkpoint cleanup failure (#688) by @mwylde in #689
  • Use named ports in Helm configuration (#682) by @mwylde in #690
  • Use correct (relative) delta paths when writing to object stores by @mwylde in #693
  • Clean up local backup files for pipeline clusters by @mwylde in #706
  • Fix race condition that could stall scheduling by @mwylde in #712
  • Enable crypto_expressions feature for datafusion-functions by @mwylde in #713
  • Update DF fork to f2792f6d for metrics memory leak fix by @mwylde in #717
  • Fix recursion limit reached decode error by @zhuliquan in #716
  • Fix state cleanup when checkpoint location is a non-root directory in object store by @mwylde in #731
  • Upgrade parquet version to fix checkpoint failure regression by @mwylde in #739

Project infra and tooling

  • Update pnpm action to fix CI failure by @mwylde in #675
  • Add experimental_allow_proto3_optional compile option by @zhuliquan in #704
  • GitHub actions binaries by @MarcoLugo in #720
  • Move binary builds to buildjet and add arm build by @mwylde in #732
  • Cargo clippy and allow lint clippy:to_string_trait_impl by @zhuliquan in #737
  • Run clippy after build to speed up CI by @mwylde in #740

Full Changelog: https://github.com/ArroyoSystems/arroyo/compare/v0.11.0...v0.12.0