使用Circe和Akka Streams进行流式JSON解码

3

我的用例类似于这个条目,希望从JSON对象中读取一个内部的巨大数组(作为文本的多个千兆字节)。

{ "a": "...",   // root level fields to be read, separately
  ...
  "bs": [       // the huge array, most of the payload (can be multiple GB's)
    {...},
    ...
  ]
}

输入作为一个Source[ByteString,_](Akka流)可用,而我在其他地方使用Circe进行JSON解码。
我可以看到两个挑战:
1.以流式方式读取bs数组(获取一个Source[B,_]来消耗它)。 2.将原始流分成两部分,以便在数组开始之前读取和分析根级字段。
你有解决这种用例的指针吗?我迄今为止已经检查了akka-stream-jsoncirce-iterateeakka-stream-json看起来像是合适的工具,但是没有得到很好的维护。 circe-iteratee似乎没有与Akka Streams集成。
2个回答

1

Jawn有一个异步解析器:https://github.com/non/jawn/blob/master/parser/src/main/scala/jawn/AsyncParser.scala

但由于JSON的顺序来源,编写高效的异步解析器很难。

如果您可以切换到同步解析,则可以使用jsoniter-scala-core并编写一个简单的自定义编解码器,该编解码器将跳过除“bs”之外的所有不需要的键/值对,然后快速解析所需数据,而无需在内存中保留或数组内容。


1
akka-stream-json 基于 Jawn 构建。我面临的问题是读取一个大于 5GB 的 JSON 文件,其中大部分是单一数组。 - akauppi

0

我可以看出这需要一个全新的库,用于流式JSON解码。

类似这样的:

case class A(a: Int, bs: Source[B,_])

val src: Source[ByteString,_] = ???
src.as[A]

我的临时解决方案是通过使用 jqsed 来“调整” JSON,以便每个 B 都在自己的行上。这样,我就可以逐行消耗源代码并单独解码每个 B

以下是 Bash 脚本(不保证可用):

#!/bin/bash

arrKey=$1
input=$2

head -n 1 $input | sed s/.$//
jq -M -c ".$arrKey|.[]" $input | sed s/$/,/
echo "]}"

它确实依赖于某些事情,例如非数组内容始终位于第一行(它们就是这样)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接