At Arroyo, we're building a new stream processing engine to replace legacy Java systems like Flink and KSQL, so I was excited to see a project that's doing the same thing for Kafka.
It's called WarpStream, and they're building a replacement for Kafka that's backed directly by S3. It has a Kafka-compatible API, so naturally I wanted to see how hard it would be to use as a source and sink in Arroyo.
Spoiler: it just works!
Here's the full query we ended up writing:
create table warpstream (
    timestamp BIGINT,
    user_id TEXT,
    page_id TEXT,
    action TEXT,
    event_time TIMESTAMP GENERATED ALWAYS AS (CAST(from_unixtime(timestamp * 1000000000) as TIMESTAMP)),
    watermark TIMESTAMP GENERATED ALWAYS AS (CAST(from_unixtime(timestamp * 1000000000) as TIMESTAMP) - INTERVAL '5' SECOND)
) WITH (
    connector = 'kafka',
    bootstrap_servers = 'api-d131463b-49e7-42b5-9112-0777af9617fe.discovery.prod-z.us-east-1.warpstream.com:9092',
    topic = 'demo-stream',
    format = 'json',
    type = 'source',
    event_time_field = 'event_time',
    watermark_field = 'watermark'
);
select
    session('30 seconds') as window,
    user_id,
    sum(case when action = 'click' then 1 else 0 end) as clicks,
    sum(case when action = 'scroll' then 1 else 0 end) as scrolls,
    sum(case when action = 'hover' then 1 else 0 end) as hovers
from warpstream
group by window, user_id;
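
If session windows are new to you, it may help to see roughly what that query computes. Here's a minimal sketch in plain Python, run over a finite list of events rather than a stream. It is not how Arroyo implements sessions. It assumes `timestamp` is in seconds since the epoch, and the boundary choice (an event exactly 30 seconds after the previous one starts a new session) is my assumption, not something taken from Arroyo's docs. The `sessionize` function and its output fields are hypothetical names made up for illustration.

```python
from collections import defaultdict

SESSION_GAP_SECONDS = 30  # mirrors session('30 seconds') in the query


def sessionize(events, gap=SESSION_GAP_SECONDS):
    """Group events into per-user sessions and count actions in each.

    `events` is an iterable of dicts with the table's fields:
    timestamp (assumed to be seconds since the epoch), user_id, action.
    """
    # Equivalent of "group by user_id": bucket events per user.
    by_user = defaultdict(list)
    for event in events:
        by_user[event["user_id"]].append(event)

    sessions = []
    for user_id, user_events in by_user.items():
        user_events.sort(key=lambda e: e["timestamp"])
        current = None
        for event in user_events:
            ts = event["timestamp"]
            # Open a new session on the first event, or when the user
            # has been quiet for at least `gap` seconds (assumed boundary).
            if current is None or ts - current["end"] >= gap:
                current = {
                    "user_id": user_id,
                    "start": ts,
                    "end": ts,
                    "clicks": 0,
                    "scrolls": 0,
                    "hovers": 0,
                }
                sessions.append(current)
            current["end"] = ts
            # Equivalent of the three sum(case when ...) columns.
            counter = event["action"] + "s"
            if counter in ("clicks", "scrolls", "hovers"):
                current[counter] += 1
    return sessions


if __name__ == "__main__":
    demo = [
        {"timestamp": 0, "user_id": "u1", "action": "click"},
        {"timestamp": 10, "user_id": "u1", "action": "scroll"},
        {"timestamp": 100, "user_id": "u1", "action": "click"},
        {"timestamp": 5, "user_id": "u2", "action": "hover"},
    ]
    for session in sessionize(demo):
        print(session)
```

The big difference in the real thing is that the stream never ends, so the engine can't just sort everything and walk through it. That's what the watermark column is for: by trailing event time by five seconds, it tells the engine when it's safe to assume a user's session has gone quiet and the result can be emitted.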