My day job is working on the Arroyo stream processing engine, which executes complex, stateful queries over high-scale streams of events. Computing things like windowed aggregates, stream joins, and incrementally-computed SQL involves, as you might imagine, a lot of sophisticated algorithms and systems. Doing it much faster than existing systems like Apache Flink took careful performance engineering at every level of the stack.
But…I'm going to let you in on a little secret. The sad truth of this industry, which no one will tell you, is that so, so many data pipelines spend the bulk of their CPU time…deserializing JSON1.
That's right. Everybody's favorite human-readable and sorta-human-writable data interchange format has been responsible for >50% of CPU usage on several high-scale Flink pipelines I've worked on. And so if you want to build a fast stream processing engine, you need to start with a fast JSON deserialization strategy2.
Arroyo's internal data representation is built on top of Arrow, a columnar in-memory format designed for fast analytical processing3. So our challenge is to take row-oriented JSON; parse it, and build out columns from the corresponding field in each document. And do this as fast as possible.
Fortunately, we don't have to start from scratch: the arrow-rs project includes a very fast JSON decoder, largely written by Raphael Taylor-Davies. I found the approach quite interesting and thought it was worth writing up. So we'll dig into how arrow-json works and then talk through Arroyo's extensions to support raw JSON—leaving parts of the JSON serialized—and the ability to skip records in a batch that don't match the schema. Along the way you should pick up some intuition for how to work with columnar data, which many engineers find challenging and unfamiliar in our typically row-oriented world.
Table of contents
- But is this fast?
- Batched decoding
- Stealing from the best
- Run the tape
- Column building
- Streaming extensions
- Wrapping up
But is this fast?
Before we get into the details, we might want to answer the question…is this actually fast? And the answer is: yes! Benchmarking against the Jackson-based deserializer used for Flink SQL, arrow-json is up to 2.3x faster:
Benchmark | Java (µs/record) | Rust (µs/record) | Speedup |
---|---|---|---|
Nexmark | 0.578 | 0.586 | -0.14% |
Bids | 0.517 | 0.396 | 30.6% |
Logs | 0.609 | 0.504 | 20.8% |
Tweets | 11.730 | 5.108 | 229% |
(See https://github.com/ArroyoSystems/json-benchmarks for the benchmark source code and test documents.)
Arrow-json does particularly well with large, deeply nested records, and poorly with enum-like structures where there are many null fields.
Batched decoding
Arrow, like other columnar formats, operates in batches. We're almost never dealing with just one record (or “row”) of data, because columnar formats achieve their high performance by storing many values together in a single array. If you just have one row, the columnar overhead eats any performance gains.
So our goal is to take a bunch of JSON documents (generally with a top-level object) and turn them into a set of arrays representing each column of the schema.
To make this concrete, let's take data representing HTTP server logs:
Field Name | Data Type |
---|---|
ip | Utf8 |
identity | Utf8 |
user_id | Utf8 |
timestamp | Timestamp |
date | Utf8 |
request | Utf8 |
status_code | UInt32 |
size | UInt64 |
An example document looks like this:
{
"ip": "131.62.139.111",
"identity": "-",
"user_id": "jimmy_voluptas",
"timestamp": "2025-02-19T09:15:21.884163-08:00",
"request": "POST /tmp/high/own.zip",
"status_code": 404,
"size": 4980,
"referer": "-",
"user_agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
}
And we'll be decoding each column into arrays like this:
ip | 137.100.60.16 | 53.249.253.103 | 118.64.108.216 | 113.243.167.149 | 133.131.152.130 | 121.87.92.182 |
---|---|---|---|---|---|---|
identity | - | - | - | - | - | - |
user_id | deven_qui | kiara_nihil | eladio_cupidita | lucile_tempora | rae_et | enrico_dolorum |
timestamp | 2025-02-19T09:15:22.281400-08:00 | 2025-02-19T09:15:22.303590-08:00 | 2025-02-19T09:15:22.325585-08:00 | 2025-02-19T09:15:22.347106-08:00 | 2025-02-19T09:15:22.369079-08:00 | 2025-02-19T09:15:22.391311-08:00 |
request | GET /sbin.tar | GET /var/problem/first.zip | GET /part/public.ppt | GET /etc/harper/man/few.xls | GET /usr/aria/elijah/man.rar | GET /var/week/life.xls |
status_code | 401 | 500 | 403 | 200 | 500 | 403 |
size | 9512 | 6965 | 9109 | 2886 | 4592 | 706 |
referer | - | - | - | - | - | - |
In general, operations are much more efficient if we can perform them along the grain of columns; in other words we want to decode all of the ips, then all of the identities, then all of the user_ids, as this allows us to avoid repeatedly downcasting (interpreting as a concrete type) our generically-typed arrays.
Stealing from the best
To do so, arrow-json uses an idea from simdjson, one of the fastest JSON decoders out there4. While most JSON libraries parse (understanding the structure of the document) and deserialize (converting to a tree representation) as a single step, simdjson uses two passes. In the first pass, it identifies all of the structural tokens (`[`, `{`, `]`, `}`, `:`, `,`) and stores their locations in a separate array5. In the second pass, each previously-identified node is parsed into the tree. For simdjson this strategy enables SIMD (single-instruction, multiple-data) processing to quickly identify the token locations, then allows parallelizing the tree construction because fields can be handled without needing to finish previous fields.
It turns out this structure is quite helpful when doing columnar parsing as well, as we can use the output of the first pass to deserialize entire columns at once, since by the time we start we know where all of the fields corresponding to that column are in the buffer.
We do need to make some changes to the simdjson approach to support efficiently operating over many JSON documents. The arrow-json interface looks like this:
impl Decoder {
pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
}
The decode method, on the Decoder struct (which stores the state of the in-progress batch), takes a byte slice—an immutable view of a sequence of bytes—and returns either the number of bytes processed or an error if the JSON was malformed.
The API supports streaming. We can just keep piping in bytes representing potentially partial JSON documents without having to frame them (figure out where one ends and the next begins). When we're ready to consume the outputs, we call `flush`, which returns an Arrow RecordBatch (a series of columns with a schema) with all of the data we'd sent to `decode`.
As we're just given slices (which are references to data we don't own) and we're deferring the actual deserialization to the flush call, we need to store the underlying data ourselves in order to reference it in the second pass.
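To make that concrete, here is a minimal sketch of driving the decoder through arrow-json's ReaderBuilder. The two-field schema and the input chunks are invented for the example; `decode` and `flush` are the calls described above.

```rust
use std::sync::Arc;

use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // A two-column schema for brevity; the full log schema works the same way.
    let schema = Arc::new(Schema::new(vec![
        Field::new("ip", DataType::Utf8, false),
        Field::new("size", DataType::UInt64, false),
    ]));

    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;

    // Feed bytes as they arrive; note that a chunk may end mid-document.
    let chunks: [&[u8]; 2] = [
        br#"{"ip": "131.62.139.111", "si"#,
        br#"ze": 4980}"#,
    ];
    for chunk in chunks {
        let consumed = decoder.decode(chunk)?;
        // Fine here since the batch isn't full; a real caller would loop.
        assert_eq!(consumed, chunk.len());
    }

    // Turn everything decoded so far into a RecordBatch of columns.
    if let Some(batch) = decoder.flush()? {
        println!("{} rows, {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}
```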
Run the tape
With that background, we can describe the arrow-json deserialization algorithm. We'll start with the first pass, where we parse the JSON into a flattened “tape” data structure, containing an array of structural components. While we process bytes, we're building up three data structures (sketched in code after this list):

- `tape: Vec<TapeElement>`, a list of structural elements in the data, like `{`, `}`, and `[`, as well as strings, numbers, literals, etc. Structural elements store an int identifying their opposite element (so a `{` points to the index of the corresponding `}`), while strings and numbers are stored as indices into the `offsets` array
- `bytes: Vec<u8>`, a buffer containing raw string data and numeric literals (as we don't have access to the original inputs when it comes time to build the RecordBatch, we have to store the raw data ourselves)
- `offsets: Vec<usize>`, a list of offsets into the `bytes` buffer, pointing to where individual elements begin (and, by looking at the next offset, where they end)
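A rough Rust sketch of those structures; the actual TapeDecoder in arrow-json has more element variants and extra bookkeeping, so treat this as illustrative.

```rust
// Illustrative only: the real TapeDecoder carries more variants and state,
// but this is the shape of the first-pass output.
enum TapeElement {
    StartObject(u32), // index of the matching EndObject
    EndObject(u32),   // index of the matching StartObject
    StartList(u32),   // index of the matching EndList
    EndList(u32),     // index of the matching StartList
    String(u32),      // index into `offsets` where this string's bytes start
    Number(u32),      // index into `offsets` where this number's bytes start
    True,
    False,
    Null,
}

struct TapeDecoder {
    tape: Vec<TapeElement>,
    bytes: Vec<u8>,      // raw string/number data, copied out of the input
    offsets: Vec<usize>, // element i spans bytes[offsets[i]..offsets[i + 1]]
    // ...plus the parse stack and other decoding state
}
```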
We also keep a stack of our parse state, as in typical recursive-descent parsing, and use a state machine to drive the parsing process.
To start decoding, we push a `Value` token onto the parse stack, which indicates we're looking for a valid JSON Value—one of a `null`, `true`, or `false` literal, a string, a number, an array, or an object; these are all distinguishable by looking at a single character. Whichever we find, the corresponding state is pushed onto the stack, and on the next iteration of the decode loop we will begin to decode that value.
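In sketch form, that single-character dispatch looks something like the following; `DecoderState` and `value_state` are illustrative names, not arrow-json's actual ones.

```rust
// Illustrative dispatch on the first byte of a JSON value. The real state
// machine in arrow-json has more states and carries extra data.
enum DecoderState {
    Object,
    List,
    String,
    Literal,
    Number,
}

fn value_state(byte: u8) -> Option<DecoderState> {
    match byte {
        b'{' => Some(DecoderState::Object),
        b'[' => Some(DecoderState::List),
        b'"' => Some(DecoderState::String),
        b't' | b'f' | b'n' => Some(DecoderState::Literal), // true / false / null
        b'-' | b'0'..=b'9' => Some(DecoderState::Number),
        _ => None, // not a valid start of a JSON value
    }
}
```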
To help understand how this proceeds, we'll walk through a few value parsing strategies.
Strings
A JSON string starts with a double-quote `"`, then continues through any valid unicode codepoints until an unescaped closing `"`. It may also contain special characters and arbitrary unicode via escapes (`\b`, `\f`, `\n`, `\r`, `\t`, `\\`, `\"`, `\uXXXX`). We search for the next double-quote or backslash (using a SIMD-optimized memchr function), and push those bytes onto our buffer.

We then look at which byte we matched. If it's a backslash, we move into the escape state, decoding the escaped value and pushing it into our buffer. If it's a double quote, we're done. We push `buffer.len()` into the offsets list to indicate where this string ends, then a String element onto the tape, along with the second-to-last index of the offset array, which marks the start.
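A sketch of that scanning loop, using the memchr crate's two-needle search; escape decoding (and the handling of multi-byte escapes like `\uXXXX`) is elided, and the real implementation also validates UTF-8 along the way.

```rust
/// Copy string bytes into `buffer` until the closing quote, returning the
/// input offset just past it, or None if the string isn't terminated in this
/// chunk. Assumes the `memchr` crate; escape decoding is elided.
fn scan_string(input: &[u8], mut pos: usize, buffer: &mut Vec<u8>) -> Option<usize> {
    loop {
        // SIMD-accelerated search for the next '"' or '\'
        let found = memchr::memchr2(b'"', b'\\', &input[pos..])?;
        buffer.extend_from_slice(&input[pos..pos + found]);
        match input[pos + found] {
            // end of string: return the position after the closing quote
            b'"' => return Some(pos + found + 1),
            _ => {
                // backslash: decode the escape and push it to the buffer
                // (elided); simple escapes are two bytes long
                pos = pos + found + 2;
            }
        }
    }
}
```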
Lists
List decoding starts when we encounter a `[` token. We push `StartList` to the tape, then we skip ahead to the next non-whitespace, non-comma token and check if it's a `]`.

If so, we're done with the list. We push an `EndList` to the tape, and update the corresponding `StartList` to point to the index of the `EndList`. Otherwise, we begin value parsing for the list element.
An example
To see how this works, we can walk through the example of decoding this log line:
{"ip":"34.127.44.91","identity":"-","user_id":"carmela_enim",
"timestamp":"2025-02-19T09:15:21.839430-08:00",
"request":"GET /sbin/early.csv","status_code":401,"size":3833,"referer":"-"}
We start, as always, in the `Value` state. We find the first token is a `{`, moving us to the `Object` state, causing us to push `StartObject(MAX_INT)` to the tape. Then we encounter the `"`, moving to the `String` state. We search for the next `"` or `\`, finding a `"`. We push the bytes between the quotes (`ip`) to the buffer, push `2` onto the offsets array, and `String(0)` to the tape.

From there we continue through the various parse rules until we reach the `}`; we push `EndObject(1)` and update the initial `StartObject` from the placeholder MAX_INT to point to the end index (18).
When fully parsed, we end up with the following data structures:
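Abridged, and with illustrative indices (only the first few fields shown), the result looks something like:

```
tape:    [ Null, StartObject(18), String(0), String(1), String(2), String(3), ..., EndObject(1) ]
         (String(0) = "ip", String(1) = "34.127.44.91", String(2) = "identity", String(3) = "-", ...)
bytes:   "ip34.127.44.91identity-user_idcarmela_enim..."
offsets: [ 0, 2, 14, 22, 23, 30, 42, ... ]
```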
Column building
Once we have the tape built we can perform the second pass, which builds the final columns of Arrow arrays.
We start by constructing a tree of decoders according to our schema. Each decoder is responsible for implementing a decode function:
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32])
-> Result<ArrayData, ArrowError>
(if you're not familiar with Rust definitions, this is a method, taking a mutable reference to the struct—self in Rust terms, equivalent to Java's this—an immutable reference to the tape datastructure we built previously, and an immutable slice of an array of ints, and returning either an error or ArrayData).
This might look a bit funny—why are we getting an array of positions? This is where the columnar part comes in, and much of the performance justification for building up the tape before deserializing. Because we've identified all of the components of the document and we know the schema, we can determine up-front the tape indices of all of the data that will go into a particular array—meaning we can process them all at once, in a tight, efficient loop, and only need to downcast the array a single time.
What does this look like? In our example, the top-level decoder is a StructArrayDecoder, containing as fields all of the fields of our top-level object:
StructArrayDecoder {
    decoders: [
        StringArrayDecoder { .. },    // ip
        StringArrayDecoder { .. },    // identity
        StringArrayDecoder { .. },    // user_id
        TimestampArrayDecoder { .. }, // timestamp
        StringArrayDecoder { .. },    // request
        PrimitiveArrayDecoder { .. }, // status_code
        PrimitiveArrayDecoder { .. }, // size
        StringArrayDecoder { .. },    // referer
    ]
}
Struct decoding
Decoding starts by finding all of the tape positions of our top level value. If this is a primitive value, like a string or a number, this is trivial—every element in our tape will be our primitive.
In the more common case that this is a struct, we make use of the metadata we stored along with the StartObject tape element—the tape position of the corresponding EndObject element. So to find all of the StartObjects, we can use this simple algorithm:
let mut cur_idx = 1; // tape element 0 is a placeholder null
let mut pos = vec![];
for _ in 0..tape.num_rows() {
    pos.push(cur_idx);
    match tape.elements[cur_idx] {
        TapeElement::StartObject(end_idx) => {
            // jump to just past the matching EndObject
            cur_idx = end_idx + 1;
        }
        TapeElement::Null => {
            cur_idx += 1;
        }
        // ... other top-level values (strings, numbers, lists)
    }
}
With our top-level positions found, we can call the `decode` method for our StructArrayDecoder, which must then find the positions array for each struct field. There are three potential complications here6:
- Expected fields may be missing
- Additional fields may be present
- We might have a `null` literal instead of an object
For each struct position, then, we first determine whether it's an object or Null by querying the tape. If it's Null, we just append that to a null buffer7 and continue on. If it's an object, we go field-by-field by iterating through every tape element between the object start and object end. Starting at the next element, we read it as a String (if it's not, this is not a valid JSON document—since Object keys must be strings—and we can exit).
We look up the key in our schema and find the corresponding field, and add the next tape element to its position array, then advance to the next field.
Once we've found the position arrays for all our fields, we simply pass them into the `decode` method on each of our child decoders, producing arrays for each, which we can construct into our StructArray according to the Arrow Struct layout.
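As a sketch, the field-position gathering might look like this, building on the Tape types sketched earlier. The `ArrayDecoder` trait object, the `field_lookup` map, and the null and missing-field handling are all simplified relative to the real StructArrayDecoder.

```rust
use std::collections::HashMap;

// Tape, TapeElement, ArrayDecoder, ArrayData, and ArrowError as in the
// surrounding text; TapeElement is assumed to be Copy. Rows whose fields are
// missing, and struct-level nulls, are glossed over here.
fn decode_struct(
    decoders: &mut [Box<dyn ArrayDecoder>],
    field_lookup: &HashMap<String, usize>, // field name -> index into `decoders`
    tape: &Tape<'_>,
    pos: &[u32],
) -> Result<Vec<ArrayData>, ArrowError> {
    let mut child_pos: Vec<Vec<u32>> = vec![Vec::new(); decoders.len()];

    for &p in pos {
        // a null row: append to the null buffer instead (elided)
        let TapeElement::StartObject(end) = tape.get(p) else { continue };
        let mut cur = p + 1;
        while cur < end {
            // keys are always strings in valid JSON
            let TapeElement::String(key_idx) = tape.get(cur) else { unreachable!() };
            let value = cur + 1;
            if let Some(&field) = field_lookup.get(tape.get_string(key_idx)) {
                child_pos[field].push(value);
            }
            // skip past the value, which may itself be nested
            cur = match tape.get(value) {
                TapeElement::StartObject(e) | TapeElement::StartList(e) => e + 1,
                _ => value + 1,
            };
        }
    }

    // hand each field's positions to its child decoder
    decoders
        .iter_mut()
        .zip(child_pos)
        .map(|(d, p)| d.decode(tape, &p))
        .collect()
}
```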
Primitive and String decoding
For non-nested types like primitives (nulls, ints, and floats) and strings, decoding is much simpler. We iterate through the positions array, look up each element in the tape, parse it as the corresponding Arrow type, and push it into our array (which we are able to construct and size up front, since we know how many elements there will be). A bit of complexity creeps in here, as JSON typing can be a bit…loose…in practice. So if we're expecting a number and find a string, we'll attempt to parse it as a number anyways8.
If you remember the tape encoding from earlier, there isn't much distinction between strings and numbers—they're both stored as bytes in our buffer, copied directly from the input. The tape element points to an index in our offset array, which then indexes into our buffer. So numeric decoding in either case looks like this:
// P is the Arrow primitive type being decoded and d its DataType; both come
// from the surrounding PrimitiveArrayDecoder (elided in this excerpt)
match tape.get(pos) {
TapeElement::String(idx) => {
let s = tape.get_string(idx);
let value = P::parse(s).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse \"{s}\" as {d}",))
})?;
builder.append_value(value)
}
TapeElement::Number(idx) => {
let s = tape.get_string(idx);
let value = ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {s} as {d}",))
})?;
builder.append_value(value)
}
...
}
with `Tape::get_string` implemented something like

fn get_string(&self, idx: u32) -> &str {
    let (start, end) = (self.offsets[idx as usize], self.offsets[idx as usize + 1]);
    // the bytes were validated as UTF-8 while building the tape
    std::str::from_utf8(&self.bytes[start..end]).unwrap()
}
Once we've parsed each of our positions, we build and return the final array.
Streaming extensions
So that's how the standard arrow-json decoder works. We parse the input text into a flattened tape data structure, find the positions of each row, then build up each Arrow array in one go.
But for Arroyo, we found we needed a couple of additional features to support streaming data ingestion, as event data is often messier than properly schematized and cleaned batch data.
Raw JSON
Decoding JSON into Arrow involves some impedance mismatch: JSON is, by itself, untyped, unschematized, and a pretty-much-anything-goes sort of data serialization format. Attempts to add structure to JSON—like JSON Schema—end up with incredible complexity, with features like conditionals, pattern properties, and self-modifying schemas. We're never going to be able to support all of that in a relatively rigid type system like Arrow's. In other cases, the data fundamentally does not follow a schema, like an unstructured “properties” field.
So sometimes you just need an escape hatch, the ability to throw up your hands and say, sure, just give us whatever you got. In Arroyo, we call that raw JSON—a special Arrow extension type that will accept any valid JSON value and store it as an encoded string.
How do we efficiently decode arbitrary JSON into a string? The tape decoding approach actually makes this quite simple. In each case, we are building up a string. We have a few cases to handle depending on what kind of TapeElement we find:
- Literals (true/false/null): we write that literal to the string
- Strings: we push `"`, followed by the contents of the buffer corresponding to the string, followed by `"`
- Numbers: we push the contents of the buffer corresponding to that number
- Lists: we push `[`, then go element by element, recursively calling our decode method, pushing `,` (except for the last one), then `]`
- Objects: we first push `{`, go two elements at a time, first decoding the key, pushing `:`, then the value, pushing `,` (except for the last one), then `}`
Once we've reached the end of the element we end up with a compact representation of the original JSON (notably missing the original whitespace, which is lost during the tape building process).
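A sketch of how that recursion might look on top of the tape. The `Tape` helpers, variant names, and the `skip` helper here are illustrative rather than Arroyo's actual implementation, and re-escaping of string contents is glossed over.

```rust
// Illustrative raw-JSON re-serialization over the tape sketched earlier.
// Writing string contents back verbatim assumes they need no re-escaping; a
// production version would re-escape quotes, backslashes, and control
// characters that were decoded while building the tape.
fn write_value(tape: &Tape, pos: u32, out: &mut String) {
    match tape.get(pos) {
        TapeElement::True => out.push_str("true"),
        TapeElement::False => out.push_str("false"),
        TapeElement::Null => out.push_str("null"),
        TapeElement::Number(idx) => out.push_str(tape.get_string(idx)),
        TapeElement::String(idx) => {
            out.push('"');
            out.push_str(tape.get_string(idx));
            out.push('"');
        }
        TapeElement::StartList(end) => {
            out.push('[');
            let mut cur = pos + 1;
            while cur < end {
                write_value(tape, cur, out);
                cur = skip(tape, cur);
                if cur < end {
                    out.push(',');
                }
            }
            out.push(']');
        }
        TapeElement::StartObject(end) => {
            out.push('{');
            let mut cur = pos + 1;
            while cur < end {
                write_value(tape, cur, out); // the key, always a string
                out.push(':');
                write_value(tape, cur + 1, out); // the value
                cur = skip(tape, cur + 1);
                if cur < end {
                    out.push(',');
                }
            }
            out.push('}');
        }
        _ => unreachable!("EndList/EndObject are never visited directly"),
    }
}

/// Index of the element following the (possibly nested) element at `pos`.
fn skip(tape: &Tape, pos: u32) -> u32 {
    match tape.get(pos) {
        TapeElement::StartList(end) | TapeElement::StartObject(end) => end + 1,
        _ => pos + 1,
    }
}
```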
Invalid data handling
A reality of JSON data pipelines is that the JSON in them is not always exactly as it's supposed to be. Some application adds a field, emits a new type of event, or drops a required field and now there's bad data in our Kafka topic. So Arroyo, like other streaming engines, offers a “bad data” handling setting, that allows users to ignore incorrect data rather than failing the pipeline.
For row-based decoding this is trivial to implement—you try to decode a row, and if that fails you just move on. As an application, you don't need any special support from the decoder. But for columnar decoding, we process an entire batch of records at once, and a single bad document will cause us to lose the entire batch. Adding the ability to handle fine-grained errors requires modifying the decoder itself.
We'll consider two ways our input JSON might be invalid, corresponding to our two passes. The first is if the JSON fails to parse—for example, there's a `{` without a corresponding `}`, an unterminated string, or an illegal escape sequence. These errors we handle when passing bytes to the TapeDecoder. For example, if we start parsing an object we expect to find the next character is a `"`—if it's not we'll return an error:
{
"ip": "131.62.139.111,
"identity": "-"
}
Once an error has occurred, we roll back the state of the tape to where it was when we started parsing the current record. This requires storing the indices of our three arrays when we start each new record, and unwinding the stack.
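In sketch form, that rollback amounts to snapshotting and truncating; the names here are hypothetical, and the real TapeDecoder tracks somewhat more state.

```rust
// Hypothetical checkpoint/rollback for the tape decoder: record the lengths
// of the three arrays (and the parse stack) at the start of each record, and
// truncate back to them if that record fails to parse.
struct Checkpoint {
    tape_len: usize,
    bytes_len: usize,
    offsets_len: usize,
    stack_len: usize,
}

impl TapeDecoder {
    fn checkpoint(&self) -> Checkpoint {
        Checkpoint {
            tape_len: self.tape.len(),
            bytes_len: self.bytes.len(),
            offsets_len: self.offsets.len(),
            stack_len: self.stack.len(),
        }
    }

    fn rollback(&mut self, cp: &Checkpoint) {
        self.tape.truncate(cp.tape_len);
        self.bytes.truncate(cp.bytes_len);
        self.offsets.truncate(cp.offsets_len);
        self.stack.truncate(cp.stack_len);
    }
}
```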
The second type of error occurs when we receive valid JSON which mismatches the schema. How strict we are about validation is configurable (users can choose to allow extra fields, to coerce primitives, and to mark fields as nullable), but there are some errors that will always fail decoding, like incorrect and uncoercible types:
{
"ip": "131.62.139.111",
...
"size": "pretty big"
}
The columnar decoding strategy makes handling schema errors harder. In this example, we don't know the record is bad until we get to the final column, at which point we've already built all of the other columns. We can't merely skip it at this point, because that will lead to different columns being misaligned (we could mark it null, assuming the column is nullable, but that has different and undesirable semantics).
We really need to know which rows are invalid before we start constructing any of our columns. Our solution was to add an additional method to each decoder:
fn validate_row(&self, tape: &Tape<'_>, pos: u32) -> bool
This method should follow the validation logic in the actual decoder, returning true for valid rows and false for invalid ones.
Before decoding a batch, we then call this for our root decoder with each position, determining which rows are good and which are invalid. We filter out any bad rows from our positions array, and proceed to decode with the good rows. If desired we can also return the bad JSON (possibly for logging or reporting to the user) by using the raw JSON decoder described above.
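A sketch of that flow; `validate_row` is the method above, and everything else (variable names, the shape of `positions`) is illustrative.

```rust
// Validate every top-level position before decoding, so bad rows can be
// dropped without misaligning columns. Error reporting and the raw-JSON
// capture of bad rows are elided.
let mut good = Vec::with_capacity(positions.len());
let mut bad = Vec::new();
for &p in &positions {
    if root_decoder.validate_row(&tape, p) {
        good.push(p);
    } else {
        bad.push(p); // optionally re-serialized with the raw JSON decoder
    }
}
let data = root_decoder.decode(&tape, &good)?;
```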
Wrapping up
So that is how Arroyo deserializes JSON into Arrow arrays. As discussed at the beginning, this is quite fast, but there are still many possible performance improvements. While writing this post, I spent some time profiling and optimizing the tape decoder and found performance improvements of 30-40%.
That PR added SIMD optimizations to a couple of operations (finding string ends and UTF-8 validation), but most of the input processing work is still done in a scalar manner. There's also room to eliminate the CPU spent building the string buffer by using the input data directly as in simdjson (albeit at the cost of extra memory usage).
The array-building pass is already quite efficient so long as our data is dense. In cases where we have many fields that are null (possibly to express an enum) we pay the cost of filling in all of those fields (and subfields, if they are objects) with null values.
Footnotes
- Some may interject at this point that the real solution is to stop using JSON for your data pipelines. Other formats like Protobuf or Avro can be deserialized much faster, and are schematized to prevent bad data. But just try telling the 1000 developers at your company that they should spend a quarter on data format migrations. ↩
- Columnar formats like Arrow store data as arrays of columns (for example, every user_id field will be stored together) rather than rows (storing the user_id, created_at, name, etc. fields for a single user together). They have many advantages for analytical processing. ↩
- simdjson has a lot of other tricks up its sleeves to get its outrageous performance, as detailed in this highly-readable VLDB paper. ↩
- This is significantly more complex than it sounds, because structural tokens may be contained within quotes—in which case they're no longer structural—but quotes may also be escaped. ↩
- If all fields are non-nullable and we're operating in "strict" mode where additional fields are not allowed, we could just compute the fixed offsets, like every field congruent to 1 mod 8 is `identity`. ↩
- Arrow uses a dedicated bitmap, the “null” or “validity” buffer, to indicate whether the corresponding element in the data buffer is null; this allows us to store, for example, primitive ints without needing to reserve a special null value. ↩
- In fact, because many JSON encoders use JavaScript numeric types and lack support for full 64-bit integers, string encoding is the only reliable way to pass large numbers around. ↩