
Fast columnar JSON decoding with arrow-rs

JSON is the most common serialization format used in streaming pipelines, so it pays to be able to deserialize it fast. This post covers in detail how the arrow-json library works to perform very efficient columnar JSON decoding, and the additions we've made for streaming use cases.

Micah Wylde

CEO of Arroyo


My day job is working on the Arroyo stream processing engine, which executes complex, stateful queries over high-scale streams of events. Computing things like windowed aggregates, stream joins, and incrementally-computed SQL involves, as you might imagine, a lot of sophisticated algorithms and systems. Doing it much faster than existing systems like Apache Flink took careful performance engineering at every level of the stack.

But…I'm going to let you in on a little secret. The sad truth of this industry, which no one will tell you, is that so, so many data pipelines spend the bulk of their CPU time…deserializing JSON1.

That's right. Everybody's favorite human-readable and sorta-human-writable data interchange format has been responsible for >50% of CPU usage on several high-scale Flink pipelines I've worked on. And so if you want to build a fast stream processing engine, you need to start with a fast JSON deserialization strategy2.

Arroyo's internal data representation is built on top of Arrow, a columnar in-memory format designed for fast analytical processing3. So our challenge is to take row-oriented JSON, parse it, and build out columns from the corresponding fields in each document. And do this as fast as possible.

Fortunately, we don't have to start from scratch: the arrow-rs project includes a very fast JSON decoder, largely written by Raphael Taylor-Davies. I found the approach quite interesting and thought it was worth writing up. So we'll dig into how arrow-json works and then talk through Arroyo's extensions to support raw JSON—leaving parts of the JSON serialized—and the ability to skip records in a batch that don't match the schema. Along the way you should pick up some intuition for how to work with columnar data, which many engineers find challenging and unfamiliar in our typically row-oriented world.


But is this fast?

Before we get into the details, we might want to answer the question…is this actually fast? And the answer is: yes! Benchmarking against the Jackson-based deserializer used for Flink SQL, arrow-json is up to 2.3x faster:

Benchmark    Java (µs/record)    Rust (µs/record)    Speedup
Nexmark      0.578               0.586               -0.14%
Bids         0.517               0.396               30.6%
Logs         0.609               0.504               20.8%
Tweets       11.730              5.108               229%

(See https://github.com/ArroyoSystems/json-benchmarks for the benchmark source code and test documents.)

Arrow-json does particularly well with large, deeply nested records, and poorly with enum-like structures where there are many null fields.

Batched decoding

Arrow, like other columnar formats, operates in batches. We're almost never dealing with just one record (or “row”) of data, because columnar formats achieve their high performance by storing many values together in a single array. If you just have one row, the columnar overhead eats any performance gains.

So our goal is to take a bunch of JSON documents (generally with a top-level object) and turn them into a set of arrays representing each column of the schema.

To make this concrete, let's take data representing HTTP server logs:

Field Name     Data Type
ip             Utf8
identity       Utf8
user_id        Utf8
timestamp      Timestamp
request        Utf8
status_code    UInt32
size           UInt64
referer        Utf8
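
For reference, this schema might be written out with the arrow-schema crate roughly as follows (the nullability and the timestamp unit here are illustrative assumptions, not taken from Arroyo):

use arrow_schema::{DataType, Field, Schema, TimeUnit};

// The log schema above, expressed with arrow-schema types.
let schema = Schema::new(vec![
    Field::new("ip", DataType::Utf8, true),
    Field::new("identity", DataType::Utf8, true),
    Field::new("user_id", DataType::Utf8, true),
    Field::new("timestamp", DataType::Timestamp(TimeUnit::Microsecond, None), true),
    Field::new("request", DataType::Utf8, true),
    Field::new("status_code", DataType::UInt32, true),
    Field::new("size", DataType::UInt64, true),
    Field::new("referer", DataType::Utf8, true),
]);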

An example document looks like this:

{
  "ip": "131.62.139.111",
  "identity": "-",
  "user_id": "jimmy_voluptas",
  "timestamp": "2025-02-19T09:15:21.884163-08:00",
  "request": "POST /tmp/high/own.zip",
  "status_code": 404,
  "size": 4980,
  "referer": "-",
  "user_agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
}

And we'll be decoding each column into arrays like this:

ip:           137.100.60.165, 3.249.253.103, 118.64.108.216, 113.243.167.149, 133.131.152.130, 121.87.92.182
identity:     -, -, -, -, -, -
user_id:      deven_qui, kiara_nihil, eladio_cupidita, lucile_tempora, rae_et, enrico_dolorum
timestamp:    2025-02-19T09:15:22.281400-08:00, 2025-02-19T09:15:22.303590-08:00, 2025-02-19T09:15:22.325585-08:00, 2025-02-19T09:15:22.347106-08:00, 2025-02-19T09:15:22.369079-08:00, 2025-02-19T09:15:22.391311-08:00
request:      GET /sbin.tar, GET /var/problem/first.zip, GET /part/public.ppt, GET /etc/harper/man/few.xls, GET /usr/aria/elijah/man.rar, GET /var/week/life.xls
status_code:  401, 500, 403, 200, 500, 403
size:         9512, 6965, 9109, 2886, 4592, 706
referer:      -, -, -, -, -, -

In general, operations are much more efficient if we can perform them along the grain of columns; in other words we want to decode all of the ips, then all of the identities, then all of the user_ids, as this allows us to avoid repeatedly downcasting (interpreting as a concrete type) our generically-typed arrays.

Stealing from the best

To do so, arrow-json uses an idea from simdjson, one of the fastest JSON decoders out there4. While most JSON libraries parse (understanding the structure of the document) and deserialize (converting to a tree representation) as a single step, simdjson uses two passes. In the first pass, it identifies all of the structural tokens ({, }, [, ], :, and ,) and stores their locations in a separate array5. In the second pass, each previously-identified node is parsed into the tree. For simdjson, this strategy enables SIMD (single-instruction, multiple-data) processing to quickly identify the token locations, and it allows the tree construction to be parallelized, because each field can be handled without waiting for the fields before it.

It turns out this structure is quite helpful when doing columnar parsing as well: since the first pass tells us up front where all of the fields corresponding to each column sit in the buffer, we can use its output to deserialize entire columns at once.

We do need to make some changes to the simdjson approach to support efficiently operating over many JSON documents. The arrow-json interface looks like this:

impl Decoder {
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;
 
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
}

decode is a method on the Decoder struct (which stores the state of the in-progress batch); it takes a byte slice—an immutable view of a sequence of bytes—and returns either the number of bytes processed or an error if the JSON was malformed.

The API supports streaming. We can just keep piping in bytes representing potentially partial JSON documents without having to frame them (figure out where one ends and the next begins). When we're ready to consume the outputs, we call flush which returns an Arrow RecordBatch (a series of columns with a schema) with all of the data we'd sent to decode.
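
As a quick illustration, here is a minimal sketch of driving this API (the two-field schema and the chunk boundaries are invented for the example; note that the second chunk begins mid-document):

use std::sync::Arc;

use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn decode_chunks() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("ip", DataType::Utf8, true),
        Field::new("size", DataType::UInt64, true),
    ]));
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;

    // Bytes can arrive in arbitrary chunks; documents may span chunk boundaries.
    let chunks: [&[u8]; 2] = [
        br#"{"ip": "10.0.0.1", "si"#,
        br#"ze": 1024}{"ip": "10.0.0.2", "size": 2048}"#,
    ];
    for chunk in chunks {
        let mut consumed = 0;
        while consumed < chunk.len() {
            consumed += decoder.decode(&chunk[consumed..])?;
        }
    }

    // flush() builds a RecordBatch from everything decoded so far.
    if let Some(batch) = decoder.flush()? {
        println!("{} rows, {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}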

As we're just given slices (which are references to data we don't own) and we're deferring the actual deserialization to the flush call, we need to store the underlying data ourselves in order to reference it in the second pass.

Run the tape

With that background, we can describe the arrow-json deserialization algorithm. We'll start with the first pass, where we parse the JSON into a flattened “tape” data structure containing an array of structural components. While we process bytes, we build up three data structures (a code sketch follows this list):

  • tape: Vec<TapeElement>, a list of the structural elements in the data, like {, }, and [, as well as strings, numbers, and literals. Structural elements store an int identifying their opposite element (so a { points to the index of the corresponding }), while strings and numbers are stored as indices into the offsets array
  • bytes: Vec<u8>, a buffer containing raw string data and numeric literals (as we don't have access to the original inputs when it comes time to build the RecordBatch, we have to store the raw data ourselves)
  • offsets: Vec<usize>, a list of offsets into the bytes buffer, pointing to where individual elements begin (and, by looking at the next offset, where they end).
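
Put together, a simplified sketch of these structures looks something like this (the real arrow-json types have more variants and details):

// A simplified sketch of the tape structures described above.
enum TapeElement {
    StartObject(u32), // index of the matching EndObject
    EndObject(u32),   // index of the matching StartObject
    StartList(u32),   // index of the matching EndList
    EndList(u32),     // index of the matching StartList
    String(u32),      // index into `offsets` where this string starts
    Number(u32),      // index into `offsets` where this number starts
    True,
    False,
    Null,
}

struct Tape {
    elements: Vec<TapeElement>,
    bytes: Vec<u8>,      // raw string and number data copied from the input
    offsets: Vec<usize>, // element i's data lives at bytes[offsets[i]..offsets[i + 1]]
}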

We also keep a stack of our parse state, as in typical recursive-descent parsing, and use a state machine to drive the parsing process.

To start decoding, we push a Value token onto the parse stack, which indicates we're looking for a valid JSON Value—one of a null, true, or false literal, a string, a number, an array, or an object; these are all distinguishable by looking at a single character. Whichever we find, the corresponding state is pushed onto the stack and on the next iteration of the decode loop we will begin to decode that value.

To help understand how this proceeds, we'll walk through a few value parsing strategies.

Strings

A JSON string starts with a double-quote ", then continues through any valid unicode codepoints until an unescaped closing ". It may also contain special characters and arbitrary unicode via escapes (\b, \f, \n, \r, \t, \\, \", \uXXXX). We search for the next double-quote or backslash (using a SIMD-optimized memchr function), and push those bytes onto our buffer.

We then look at which byte we matched. If it's a backslash, we move into the escape state, decoding the escaped value and pushing it into our buffer. If it's a double quote, we're done. We push buffer.len() into the offsets list to indicate where this string ends, then a String element onto the tape, along with the second-to-last index of the offset array, which marks the start.
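
A rough sketch of this scanning loop, assuming the memchr crate (illustrative only; \uXXXX escapes and error handling are omitted):

// Scan a JSON string starting just past the opening quote, copying its bytes
// into `buffer`. Returns the position just past the closing quote.
fn scan_string(input: &[u8], mut pos: usize, buffer: &mut Vec<u8>) -> Option<usize> {
    loop {
        // SIMD-accelerated search for the next '"' or '\'.
        let idx = pos + memchr::memchr2(b'"', b'\\', &input[pos..])?;
        buffer.extend_from_slice(&input[pos..idx]);
        if input[idx] == b'"' {
            return Some(idx + 1); // unescaped closing quote: the string is done
        }
        // Otherwise it's an escape: decode it and keep scanning.
        buffer.push(match *input.get(idx + 1)? {
            b'n' => b'\n',
            b't' => b'\t',
            b'r' => b'\r',
            other => other, // \" \\ \/ pass through; \b, \f, \uXXXX omitted here
        });
        pos = idx + 2;
    }
}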

Lists

List decoding starts when we encounter a [ token. We push StartList to the tape, then we skip ahead to the next non-whitespace, non-comma token, and check if it's a ]. If so, we're done with the list. We push an EndList to the tape, and update the corresponding StartList to point to the index of the EndList. Otherwise, we begin value parsing for the list element.
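
The backpatching step at the end might look like this, reusing the TapeElement sketch from earlier (illustrative, not the actual arrow-json code):

// When we hit the closing ']', push EndList and backpatch the StartList
// placeholder (pushed when we saw '[') so it points at its matching EndList.
fn close_list(tape: &mut Vec<TapeElement>, start_idx: usize) {
    let end_idx = tape.len() as u32;
    tape.push(TapeElement::EndList(start_idx as u32));
    tape[start_idx] = TapeElement::StartList(end_idx);
}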

An example

To see how this works, we can walk through the example of decoding this log line:

{"ip":"34.127.44.91","identity":"-","user_id":"carmela_enim",
"timestamp":"2025-02-19T09:15:21.839430-08:00",
"request":"GET /sbin/early.csv","status_code":401,"size":3833,"referer":"-"}

We start, as always, in the Value state. We find the first token is a {, moving us to the Object state and causing us to push StartObject(MAX_INT) to the tape. Then we encounter the ", moving to the String state. We search for the next " or \, finding a ". We push the bytes between the quotes (ip) to the buffer, push 2 onto the offsets array, and String(0) to the tape.

From there we continue through the various parse rules until we reach the }; we push EndObject(1) and update the initial StartObject from the placeholder MAX_INT to point to the end index (18).

When fully parsed, we end up with the following data structures:

[Figure: a worked-through example showing the tape data structure]
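
Rendered as text, the tape for this document looks roughly like the following (bytes and offsets truncated):

tape:
  [1]  StartObject(18)
  [2]  String(0)    "ip"
  [3]  String(1)    "34.127.44.91"
  [4]  String(2)    "identity"
  [5]  String(3)    "-"
  [6]  String(4)    "user_id"
  [7]  String(5)    "carmela_enim"
  [8]  String(6)    "timestamp"
  [9]  String(7)    "2025-02-19T09:15:21.839430-08:00"
  [10] String(8)    "request"
  [11] String(9)    "GET /sbin/early.csv"
  [12] String(10)   "status_code"
  [13] Number(11)   "401"
  [14] String(12)   "size"
  [15] Number(13)   "3833"
  [16] String(14)   "referer"
  [17] String(15)   "-"
  [18] EndObject(1)

bytes:   ip34.127.44.91identity-user_idcarmela_enim…
offsets: [0, 2, 14, 22, 23, 30, 42, …]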

Column building

Once we have the tape built we can perform the second pass, which builds the final columns of Arrow arrays.

We start by constructing a tree of decoders according to our schema. Each decoder is responsible for implementing a decode function:

fn decode(&mut self, tape: &Tape<'_>, pos: &[u32])
   -> Result<ArrayData, ArrowError>

(If you're not familiar with Rust definitions: this is a method, taking a mutable reference to the struct—self in Rust terms, equivalent to Java's this—an immutable reference to the tape data structure we built previously, and a slice of ints; it returns either ArrayData or an error.)

This might look a bit funny—why are we getting an array of positions? This is where the columnar part comes in, and much of the performance justification for building up the tape before deserializing. Because we've identified all of the components of the document and we know the schema, we can determine up-front the tape indices of all of the data that will go into a particular array—meaning we can process them all at once, in a tight, efficient loop, needing to downcast the array only a single time.

What does this look like? In our example, the top-level decoder is a StructArrayDecoder, containing as fields all of the fields of our top-level object:

StructArrayDecoder {
  decoders: [
    StringArrayDecoder { .. },    // ip
    StringArrayDecoder { .. },    // identity
    StringArrayDecoder { .. },    // user_id
    TimestampArrayDecoder { .. }, // timestamp
    StringArrayDecoder { .. },    // request
    PrimitiveArrayDecoder { .. }, // status_code
    PrimitiveArrayDecoder { .. }, // size
    StringArrayDecoder { .. },    // referer
  ]
}

Struct decoding

Decoding starts by finding all of the tape positions of our top level value. If this is a primitive value, like a string or a number, this is trivial—every element in our tape will be our primitive.

In the more common case that this is a struct, we make use of the metadata we stored along with the StartObject tape element—the tape position of the corresponding EndObject element. So to find all of the StartObjects, we can use this simple algorithm:

let mut cur_idx = 1; // The first tape element is a null placeholder
let mut pos = vec![];

for _ in 0..tape.num_rows() {
    pos.push(cur_idx);

    match tape.elements[cur_idx] {
        TapeElement::StartObject(end_idx) => {
            cur_idx = end_idx + 1;
        }
        TapeElement::Null => {
            cur_idx += 1;
        }
        // ... other top-level values advance similarly
        _ => {}
    }
}

With our top-level positions found, we can call the decode method for our StructArrayDecoder, which must then find the positions array for each struct field. There are three potential complications here6:

  • Expected fields may be missing
  • Additional fields may be present
  • We might have a null literal instead of an object

For each struct position, then, we first determine whether it's an object or Null by querying the tape. If it's Null, we just append that to a null buffer7 and continue on. If it's an object, we go field-by-field by iterating through every tape element between the object start and object end. Starting at the next element, we read it as a String (if it's not, this is not a valid JSON document—since Object keys must be strings—and we can exit).

We look up the key in our schema to find the corresponding field, add the next tape element (the value) to that field's position array, and then advance past the value to the next key.

Once we've found the position arrays for all our fields, we simply pass them into the decode method on each of our child decoders, producing arrays for each, which we can construct into our StructArray according to the Arrow Struct layout.
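
A sketch of that field-matching loop is below. The names are illustrative rather than the actual arrow-json code, skip_value is a hypothetical helper that returns the index just past a (possibly nested) value, and the handling of missing fields, nulls, and strictness settings is omitted:

// Gather, for each struct field, the tape positions of its values.
fn field_positions(
    tape: &Tape<'_>,
    fields: &[Field],
    pos: &[u32],
) -> Result<Vec<Vec<u32>>, ArrowError> {
    let mut child_pos: Vec<Vec<u32>> = vec![vec![]; fields.len()];

    for &p in pos {
        let end = match tape.get(p) {
            TapeElement::StartObject(end) => end,
            TapeElement::Null => continue, // recorded in the struct's null buffer
            _ => return Err(ArrowError::JsonError("expected object".to_string())),
        };

        let mut cur = p + 1;
        while cur < end {
            // Object keys must be strings in valid JSON.
            let TapeElement::String(key_idx) = tape.get(cur) else {
                return Err(ArrowError::JsonError("expected field name".to_string()));
            };
            let key = tape.get_string(key_idx);

            // The value immediately follows its key on the tape.
            if let Some(idx) = fields.iter().position(|f| f.name() == key) {
                child_pos[idx].push(cur + 1);
            }
            cur = skip_value(tape, cur + 1); // hypothetical helper
        }
    }

    Ok(child_pos)
}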

Primitive and String decoding

For non-nested types like primitives (nulls, ints, and floats) and strings, decoding is much simpler. We simply iterate through the positions array, look up each element in the tape, parse it as the corresponding Arrow type, and push it into our array (which we can construct and size up front, since we know how many elements there will be). A bit of complexity creeps in here, as JSON typing can be a bit…loose…in practice. So if we're expecting a number and find a string, we'll attempt to parse it as a number anyways8.

If you remember the tape encoding from earlier, there isn't much distinction between strings and numbers—they're both stored as bytes in our buffer, copied directly from the input. The tape element points to an index in our offset array, which then indexes into our buffer. So numeric decoding in either case looks like this:

match tape.get(pos) {
    TapeElement::String(idx) => {
        let s = tape.get_string(idx);
        let value = P::parse(s).ok_or_else(|| {
            ArrowError::JsonError(format!("failed to parse \"{s}\" as {d}",))
        })?;
 
        builder.append_value(value)
    }
    TapeElement::Number(idx) => {
        let s = tape.get_string(idx);
        let value = ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| {
            ArrowError::JsonError(format!("failed to parse {s} as {d}",))
        })?;
 
        builder.append_value(value)
    }
    ...
}

with Tape::get_string implemented something like

fn get_string(&self, idx: u32) -> &str {
    let start = self.offsets[idx as usize];
    let end = self.offsets[idx as usize + 1];
    // The tape decoder has already validated that the buffer is valid UTF-8
    std::str::from_utf8(&self.bytes[start..end]).unwrap()
}

Once we've parsed each of our positions, we build and return the final array.

Streaming extensions

So that's how the standard arrow-json decoder works. We parse the input text into a flattened tape data structure, find the positions of each row, then build up each Arrow array in one go.

But for Arroyo, we found we needed a couple of additional features to support streaming data ingestion, as event data is often messier than properly schematized and cleaned batch data.

Raw JSON

Decoding JSON into Arrow can involve some impedance mismatch: JSON is, by itself, untyped, unschematized, and a pretty much anything-goes sort of data serialization format. Attempts to add structure to JSON—like JSON Schema—end up with incredible complexity, with features like conditionals, pattern properties, and self-modifying schemas. We're never going to be able to support all of that in a relatively rigid type system like Arrow's. In other cases, the data fundamentally does not follow a schema, like an unstructured “properties” field.

So sometimes you just need an escape hatch, the ability to throw up your hands and say, sure, just give us whatever you got. In Arroyo, we call that raw JSON—a special Arrow extension type that will accept any valid JSON value and store it as an encoded string.

How do we efficiently decode arbitrary JSON into a string? The tape decoding approach actually makes this quite simple. Whatever we find, we are building up a string; there are a few cases to handle depending on what kind of TapeElement we encounter:

  • Literals (true/false/null): we write that literal to the string
  • Strings: we push ", followed by the contents of the buffer corresponding to the string, followed by "
  • Numbers: we push the contents of the buffer corresponding to that number
  • Lists: we push [, then go element by element, recursively calling our decode method and pushing , after each element (except the last), then ]
  • Objects: we push {, then go two elements at a time, decoding the key, pushing :, decoding the value, and pushing , after each pair (except the last), then }

Once we've reached the end of the element we end up with a compact representation of the original JSON (notably missing the original whitespace, which is lost during the tape building process).
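
A recursive sketch of that re-serialization, reusing the tape API from earlier (string re-escaping and the extra numeric variants the real tape has are glossed over):

// Re-serialize the tape value at `pos` into `out`, returning the index just
// past that value so callers can continue from there.
fn write_value(tape: &Tape<'_>, pos: u32, out: &mut String) -> u32 {
    match tape.get(pos) {
        TapeElement::True => { out.push_str("true"); pos + 1 }
        TapeElement::False => { out.push_str("false"); pos + 1 }
        TapeElement::Null => { out.push_str("null"); pos + 1 }
        TapeElement::Number(idx) => {
            out.push_str(tape.get_string(idx));
            pos + 1
        }
        TapeElement::String(idx) => {
            out.push('"');
            out.push_str(tape.get_string(idx)); // real code re-escapes special chars
            out.push('"');
            pos + 1
        }
        TapeElement::StartList(end) => {
            out.push('[');
            let mut cur = pos + 1;
            while cur < end {
                if cur > pos + 1 { out.push(','); }
                cur = write_value(tape, cur, out);
            }
            out.push(']');
            end + 1
        }
        TapeElement::StartObject(end) => {
            out.push('{');
            let mut cur = pos + 1;
            while cur < end {
                if cur > pos + 1 { out.push(','); }
                cur = write_value(tape, cur, out); // key (a string)
                out.push(':');
                cur = write_value(tape, cur, out); // value
            }
            out.push('}');
            end + 1
        }
        _ => pos + 1,
    }
}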

Invalid data handling

A reality of JSON data pipelines is that the JSON in them is not always exactly as it's supposed to be. Some application adds a field, emits a new type of event, or drops a required field, and now there's bad data in our Kafka topic. So Arroyo, like other streaming engines, offers a “bad data” handling setting that allows users to ignore incorrect data rather than failing the pipeline.

For row-based decoding this is trivial to implement—you try to decode a row, and if that fails you just move on. As an application, you don't need any special support from the decoder. But for columnar decoding, we process an entire batch of records at once, and a single bad document would cause us to lose the entire batch. Adding the ability to handle fine-grained errors requires modifying the decoder itself.

We'll consider two ways our input JSON might be invalid, corresponding to our two passes. The first is if the JSON fails to parse—for example, there's a { without a corresponding }, an unterminated string, or an illegal escape sequence. These errors we handle when passing bytes to the TapeDecoder. For example, if we start parsing an object we expect to find the next character is a "—if it's not we'll return an error.

{
  "ip": "131.62.139.111,
  "identity": "-"
}

Once an error has occurred, we roll back the state of the tape to where it was when we started parsing the current record. This requires storing the indices of our three arrays when we start each new record, and unwinding the stack.
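
Conceptually, the checkpoint-and-rollback looks something like this (the type and field names are illustrative, not the actual Arroyo code):

// Snapshot the lengths of the growable buffers at each record boundary; on a
// parse error, truncate back to the snapshot and discard the parse stack.
enum State { Value, Object, List, String /* … */ }

struct TapeBuffers {
    elements: Vec<TapeElement>,
    bytes: Vec<u8>,
    offsets: Vec<usize>,
    stack: Vec<State>, // parse-state stack
}

struct Checkpoint {
    elements_len: usize,
    bytes_len: usize,
    offsets_len: usize,
}

impl TapeBuffers {
    fn checkpoint(&self) -> Checkpoint {
        Checkpoint {
            elements_len: self.elements.len(),
            bytes_len: self.bytes.len(),
            offsets_len: self.offsets.len(),
        }
    }

    fn rollback(&mut self, cp: &Checkpoint) {
        self.elements.truncate(cp.elements_len);
        self.bytes.truncate(cp.bytes_len);
        self.offsets.truncate(cp.offsets_len);
        self.stack.clear(); // drop the partially-parsed record's state
    }
}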

The second type of error occurs when we receive valid JSON that doesn't match the schema. How strict we are about validation is configurable (users can choose to allow extra fields, to coerce primitives, and to mark fields as nullable), but there are some errors that will always fail decoding, like incorrect and un-coercible types:

{
  "ip": "131.62.139.111",
  ...
  "size": "pretty big"
}

The columnar decoding strategy makes handling schema errors harder. In this example, we don't know the record is bad until we get to the final column, at which point we've already built all of the other columns. We can't merely skip it at this point, because that will lead to different columns being misaligned (we could mark it null, assuming the column is nullable, but that has different and undesirable semantics).

We really need to know which rows are invalid before we start constructing any of our columns. Our solution was to add an additional method to each decoder:

fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool

This method should follow the validation logic in the actual decoder, returning true for valid rows and false for invalid ones.

Before decoding a batch, we then call this for our root decoder with each position, determining which rows are good and which are invalid. We filter out any bad rows from our positions array, and proceed to decode with the good rows. If desired we can also return the bad JSON (possibly for logging or reporting to the user) by using the raw JSON decoder described above.
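
In sketch form, the pre-decode step looks something like this (names are illustrative):

// Partition the top-level positions into rows that pass validation and rows
// that don't; only the valid rows are handed to the column decoders.
let (good, bad): (Vec<u32>, Vec<u32>) = pos
    .iter()
    .copied()
    .partition(|&p| root_decoder.validate_row(&tape, p));

// Optionally re-serialize the bad rows with the raw JSON writer for error
// reporting, then decode the good rows as usual.
let arrays = root_decoder.decode(&tape, &good)?;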

Wrapping up

So that is how Arroyo deserializes JSON into Arrow arrays. As discussed at the beginning, this is quite fast, but there are still many possible performance improvements. While writing this post, I spent some time profiling and optimizing the tape decoder and found 30-40% worth of performance improvements.

That PR added SIMD optimizations to a couple of operations (finding string ends and UTF-8 validation), but most of the input processing work is still done in a scalar manner. There's also room to eliminate the CPU spent building the string buffer by using the input data directly as in simdjson (albeit at the cost of extra memory usage).

The array-building pass is already quite efficient so long as our data is dense. In cases where we have many fields that are null (possibly to express an enum) we pay the cost of filling in all of those fields (and subfields, if they are objects) with null values.


Footnotes

  1. https://dl.acm.org/doi/10.14778/3236187.3236207

  2. Some may interject at this point that the real solution is to stop using JSON for your data pipelines. Other formats like Protobuf or Avro can be deserialized much faster, and are schematized to prevent bad data. But you try telling the 1,000 developers at your company that they should spend a quarter on data format migrations.

  3. Columnar formats like Arrow store data as arrays of columns (for example, every user_id field will be stored together) rather than rows (storing the user_id, created_at, name, etc. fields for a single user together). They have many advantages for analytical processing.

  4. simdjson has a lot of other tricks up its sleeves to get its outrageous performance, as detailed in this highly-readable VLDB paper

  5. This is significantly more complex than it sounds, because structural tokens may be contained within quotes—in which case they're no longer structural—but quotes may also be escaped.

  6. If all fields are non-nullable and we're operating in “strict” mode where additional fields are not allowed, we could just compute the fixed offsets, like every field congruent to 1 mod 8 is identity

  7. Arrow uses a dedicated bitmap, the “null” or “validity” buffer, to indicate whether the corresponding element in the data buffer is null; this allows us to store, for example, primitive ints without needing to reserve a special null value.

  8. In fact, because many JSON encoders use JavaScript numeric types and lack support for full 64-bit integers, string encoding is the only reliable way to pass large numbers around