感谢 Abhijit Sarkar 的回答指引。
我需要下载一个庞大的 JSON 流并将其分成易于处理的小数据流。该 JSON 由具有大属性的对象组成:这些大属性可以序列化到文件中,从而从未解组的 JSON 对象中删除。
另一个用例是逐个对象下载 JSON 流,像 map/reduce 算法一样对其进行处理并生成单个输出,而无需加载整个流到内存中。
还有另一个用例是读取一个大型 JSON 文件,并根据条件仅选择少量对象,同时解组为普通的 Java 对象。
下面是一个示例:我们想要流式传输一个非常巨大的 JSON 文件,它是一个数组,并且我们只想检索数组中的第一个对象。
给定位于服务器上的此大文件,可在
http://example.org/testings.json获取:
[
{ "property1": "value1", "property2": "value2", "property3": "value3" },
{ "property1": "value1", "property2": "value2", "property3": "value3" },
... 1446481 objects => a file of 104 MB => take quite long to download...
]
这个JSON数组的每一行都可以被解析为这个对象:
@lombok.Data
public class Testing {
String property1;
String property2;
String property3;
}
你需要这个类来使解析代码可重用:
import com.fasterxml.jackson.core.JsonParser;
import java.io.IOException;
@FunctionalInterface
public interface JsonStreamer<R> {
R stream(JsonParser jsonParser) throws IOException;
}
并且这个类用于解析:
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import lombok.AllArgsConstructor;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.HttpMessageConverter;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@AllArgsConstructor
public class StreamingHttpMessageConverter<R> implements HttpMessageConverter<R> {
private final JsonFactory factory;
private final JsonStreamer<R> jsonStreamer;
@Override
public boolean canRead(Class<?> clazz, MediaType mediaType) {
return MediaType.APPLICATION_JSON.isCompatibleWith(mediaType);
}
@Override
public boolean canWrite(Class<?> clazz, MediaType mediaType) {
return false;
}
@Override
public List<MediaType> getSupportedMediaTypes() {
return Collections.singletonList(MediaType.APPLICATION_JSON);
}
@Override
public R read(Class<? extends R> clazz, HttpInputMessage inputMessage) throws IOException {
try (InputStream inputStream = inputMessage.getBody();
JsonParser parser = factory.createParser(inputStream)) {
return jsonStreamer.stream(parser);
}
}
@Override
public void write(R result, MediaType contentType, HttpOutputMessage outputMessage) {
throw new UnsupportedOperationException();
}
}
接下来是用于流式HTTP响应、解析JSON数组并返回仅第一个未解组对象的代码:
JsonFactory jsonFactory = new JsonFactory();
ObjectMapper objectMapper = new ObjectMapper();
RestTemplateBuilder restTemplateBuilder = new RestTemplateBuilder();
RestTemplate restTemplate = restTemplateBuilder.detectRequestFactory(false).messageConverters(
new StreamingHttpMessageConverter<>(jsonFactory, jsonParser -> {
if (!jsonParser.isClosed() && jsonParser.nextToken() == JsonToken.START_ARRAY) {
if (!jsonParser.isClosed() && jsonParser.nextToken() == JsonToken.START_OBJECT) {
return objectMapper.readValue(jsonParser, Testing.class);
}
}
return null;
})
).build();
final Testing firstTesting = restTemplate.getForObject("http://example.org/testings.json", Testing.class);
log.debug("First testing object: {}", firstTesting);