JSON Kotlin streaming library
The JSON parser in the kjson-core library takes its input from a string of
JSON text in memory.
This pattern of operation does not suit all applications; there is often a requirement to parse incoming data on the fly
– that is, to accept a stream of characters and assemble them into JSON values (objects, arrays and primitive
elements).
The kjson-stream library provides two classes that operate in this manner:
- JSONStreamer accepts characters which it uses to build a single JSON value, most often a JSON object.
- JSONPipeline accepts characters which must take the form of a JSON array, and emits each member of the array, as it is completed, to a downstream receiving function.
The kjson-stream library makes use of the pipelines library.
This library provides a mechanism for working with streams of data – characters or more complex elements –
with Acceptor classes accepting a stream of data, and Pipeline classes both accepting the data and emitting a
transformed form of the data to a downstream Acceptor.
The Pipeline transformations may be one-to-one, one-to-many or many-to-one, and UTF-8 encoding or decoding provides a
good example.
UTF-8 encoding is both a one-to-one and a one-to-many transform (only the characters outside the ASCII range will
transform into more than one output byte); JSON parsing is clearly a many-to-one transform.
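
To make the one-to-many case concrete, here is a minimal sketch of a UTF-8 encoding stage that accepts Unicode code points and emits bytes to a downstream receiver. The names `IntSink`, `Utf8EncoderSketch` and `encodeToUtf8` are hypothetical and exist only for this illustration; the pipelines library has its own Acceptor and Pipeline classes, and this sketch makes no claim to match their API.

```kotlin
// Hypothetical minimal interface for illustration only; it is NOT the
// Acceptor interface of the pipelines library.
fun interface IntSink {
    fun accept(value: Int)
}

// One-to-many transform: each code point in becomes 1 to 4 UTF-8 bytes out.
// ASCII code points pass through one-to-one. Surrogates and out-of-range
// values are not validated in this sketch.
class Utf8EncoderSketch(private val downstream: IntSink) : IntSink {
    override fun accept(value: Int) {
        when {
            value < 0x80 -> downstream.accept(value)
            value < 0x800 -> {
                downstream.accept(0xC0 or (value shr 6))
                downstream.accept(0x80 or (value and 0x3F))
            }
            value < 0x10000 -> {
                downstream.accept(0xE0 or (value shr 12))
                downstream.accept(0x80 or ((value shr 6) and 0x3F))
                downstream.accept(0x80 or (value and 0x3F))
            }
            else -> {
                downstream.accept(0xF0 or (value shr 18))
                downstream.accept(0x80 or ((value shr 12) and 0x3F))
                downstream.accept(0x80 or ((value shr 6) and 0x3F))
                downstream.accept(0x80 or (value and 0x3F))
            }
        }
    }
}

// Feeds every code point of the text through the encoder and collects the bytes.
fun encodeToUtf8(text: String): List<Int> {
    val bytes = mutableListOf<Int>()
    val encoder = Utf8EncoderSketch { bytes.add(it) }
    var i = 0
    while (i < text.length) {
        val codePoint = text.codePointAt(i)
        encoder.accept(codePoint)
        i += Character.charCount(codePoint)
    }
    return bytes
}

fun main() {
    println(encodeToUtf8("A")) // [65]
    println(encodeToUtf8("é")) // [195, 169]
}
```

The encoder is itself a receiver of values, so stages of this kind can be chained, which is the idea behind passing a downstream Acceptor to a Pipeline.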
The JSONStreamer class assembles input characters into a single value.
For example:

    val streamer = JSONStreamer()
    while (true) {
        val ch = reader.read() // reader is a Reader (FileReader, StringReader etc.)
        if (ch < 0)
            break
        streamer.accept(ch)
    }
    val json = streamer.result // json will be of type JSONValue?

One point to note is that the input to the accept function is an Int, not a Char.
This allows the JSONStreamer class to be used in a decoding pipeline with a character set decoder, as follows:

    val streamer = UTF8_CodePoint(JSONStreamer())
    streamer.accept(inputStream) // an InputStream (FileInputStream, ByteArrayInputStream etc.)
    val json = streamer.result

For those cases where the JSONStreamer is required to read the entire contents of a Reader:

    val json = JSONStreamer.parse(reader)

The result is a JSONValue?, the same as would be obtained by calling JSON.parse(reader.readText()), but this
approach does not require the allocation of a String in memory to hold the entire JSON text.
To allow lenient parsing as described in the
kjson-core library, the JSONStreamer constructor and the parse()
function both take an optional ParseOptions parameter.
The JSONPipeline class expects its input to be in the form of a JSON array, and it emits each array item in turn to
the downstream Acceptor.
Like other Pipeline classes it may be constructed with the downstream Acceptor as a parameter, but a more convenient
approach is to use the pipeTo function:

    val pipeline = JSONPipeline.pipeTo { processitem(it) }

The lambda will be called with each array item in turn.
As with JSONStreamer, a ParseOptions object may be passed as a parameter to the constructor or the pipeTo function
if required.
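
The idea of emitting each array member as soon as it is complete can be illustrated with a much simplified sketch. The class below is hypothetical and is not how JSONPipeline is implemented: it does not parse or validate JSON, it assumes the input is a well-formed array, and it emits the raw text of each top-level member rather than a JSONValue. It only shows how nesting depth and string state determine where one member ends and the next begins.

```kotlin
// Hypothetical, simplified illustration; NOT the JSONPipeline implementation.
// Assumes the input is a well-formed JSON array and emits the raw text of each
// top-level member as soon as that member is complete.
class ArrayMemberSplitterSketch(private val emit: (String) -> Unit) {
    private val current = StringBuilder()
    private var depth = 0         // nesting depth of [ ] and { }
    private var inString = false  // true while inside a JSON string
    private var escaped = false   // true immediately after a backslash in a string

    fun accept(ch: Char) {
        if (inString) {
            // Inside a string, brackets and commas are ordinary characters.
            current.append(ch)
            when {
                escaped -> escaped = false
                ch == '\\' -> escaped = true
                ch == '"' -> inString = false
            }
            return
        }
        when (ch) {
            '"' -> { inString = true; current.append(ch) }
            '[', '{' -> { if (depth > 0) current.append(ch); depth++ }
            ']', '}' -> {
                depth--
                if (depth > 0) current.append(ch) else flush() // outer ] ends the last member
            }
            ',' -> if (depth == 1) flush() else current.append(ch)
            else -> if (depth > 0 && !ch.isWhitespace()) current.append(ch)
        }
    }

    private fun flush() {
        if (current.isNotEmpty()) {
            emit(current.toString())
            current.setLength(0)
        }
    }
}

// Convenience wrapper that collects the emitted members into a list.
fun splitArrayMembers(json: String): List<String> {
    val members = mutableListOf<String>()
    val splitter = ArrayMemberSplitterSketch { members.add(it) }
    json.forEach { splitter.accept(it) }
    return members
}

fun main() {
    splitArrayMembers("""[1, {"a":[2,3]}, "x,]"]""").forEach(::println)
}
```

Because each member is handed downstream when its closing character arrives, the receiver can start work on early members before the rest of the array has been read.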
Non-blocking versions of these classes are available, using the
co-pipelines library.
The JSONCoStreamer class operates in the same manner as JSONStreamer, except that the accept function is a suspend
function.
This is likely to be of little utility since the accept function does not invoke any non-blocking functions; it is
provided mainly to act as the terminal CoAcceptor in a pipeline.
The JSONCoPipeline class is much more interesting.
The downstream function which receives completed array items is called as a suspend function, meaning that each item in
the array may be processed as it arrives, in a non-blocking manner.
For example:

    val pipeline = JSONCoPipeline.pipeTo {
        invokeSuspendFunction(it)
    }

Code in a different coroutine may now send data to pipeline, and the suspend function will be invoked with each
completed array item.
The JSON Lines specification allows multiple JSON values to be specified in a single stream of
data, separated by newline (\u000a) characters.
For example, events may be logged to a file as a sequence of objects on separate lines; the alternative would be to
output a JSON array, but this would require a "]" terminator, complicating the shutdown of the process (particularly
abnormal shutdown).

    {"time":"2023-06-24T12:24:10.321+10:00","eventType":"ACCOUNT_OPEN","accountNumber": "123456789"}
    {"time":"2023-06-24T12:24:10.321+10:00","eventType":"DEPOSIT","accountNumber": "123456789","amount":"1000.00"}

The individual items are usually objects (or sometimes arrays) formatted similarly, but that is not a requirement – the items may be of any JSON type.
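
The newline-delimited framing can be sketched in a few lines of Kotlin. The class `LineSplitterSketch` below is hypothetical and is not part of kjson-stream; it only separates the stream into the text of each line and leaves the JSON parsing of that text to some other component. Skipping blank lines and trimming a trailing carriage return are assumptions made for this sketch.

```kotlin
// Hypothetical illustration of JSON Lines framing; NOT a kjson-stream class.
// Accepts characters one at a time and emits the text of each completed line.
class LineSplitterSketch(private val emit: (String) -> Unit) {
    private val line = StringBuilder()

    fun accept(ch: Char) {
        if (ch == '\n') flush() else line.append(ch)
    }

    // Call at end of input to emit a final line that has no trailing newline.
    fun close() = flush()

    private fun flush() {
        val text = line.toString().trim() // also drops a trailing '\r'
        line.setLength(0)
        if (text.isNotEmpty()) emit(text) // assumption: blank lines are skipped
    }
}

// Convenience wrapper that collects the emitted lines into a list.
fun splitJsonLines(input: String): List<String> {
    val items = mutableListOf<String>()
    val splitter = LineSplitterSketch { items.add(it) }
    input.forEach { splitter.accept(it) }
    splitter.close()
    return items
}

fun main() {
    splitJsonLines("{\"a\":1}\n\nnull\n[2]").forEach(::println)
}
```

Every line already emitted is a complete value, so a writer that stops unexpectedly loses at most the line it was in the middle of writing.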
The kjson-stream library includes classes to process JSON Lines input in a streaming manner.
The JSONLinesPipeline is similar to JSONPipeline, except that it expects its input to take the form of a JSON Lines
data stream rather than a JSON array.
Like JSONPipeline, it can be instantiated using a constructor with an Acceptor parameter, or by the pipeTo
function:

    val pipeline = JSONLinesPipeline.pipeTo { processitem(it) }

The lambda will be invoked with each individual JSON value (note that a JSON Lines item may be the keyword "null", in
which case the parameter to the lambda will be null).
JSONLinesCoPipeline is the non-blocking equivalent of JSONLinesPipeline.
It can be instantiated using a constructor with a CoAcceptor parameter, or by the pipeTo function, which in this
case takes a suspend lambda:

    val pipeline = JSONLinesCoPipeline.pipeTo {
        invokeSuspendFunction(it)
    }

The latest version of the library is 3.6, and it may be obtained from the Maven Central repository.

    <dependency>
      <groupId>io.kjson</groupId>
      <artifactId>kjson-stream</artifactId>
      <version>3.6</version>
    </dependency>

    implementation "io.kjson:kjson-stream:3.6"

    implementation("io.kjson:kjson-stream:3.6")

Peter Wall
2025-11-13