I think the OP is right: that approach serializes only the Avro record itself, not the schema you would get if this were an Avro data file.
Here are a couple of tools from Avro itself that are useful if you are working with files (a short command-line sketch follows the list):
• JSON to Avro: DataFileWriteTool
• Avro to JSON: DataFileReadTool
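These two classes back the fromjson and tojson subcommands of the avro-tools jar. As a rough sketch of how they are typically invoked (the person.avsc / person.json file names are placeholders, and the jar name should be checked against your Avro version):
java -jar avro-tools-1.7.7.jar fromjson --schema-file person.avsc person.json > person.avro   # DataFileWriteTool
java -jar avro-tools-1.7.7.jar tojson person.avro                                             # DataFileReadTool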
Here is a complete example going both ways.
@Grapes([
@Grab(group='org.apache.avro', module='avro', version='1.7.7')
])
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
String schema = '''{
"type":"record",
"namespace":"foo",
"name":"Person",
"fields":[
{
"name":"name",
"type":"string"
},
{
"name":"age",
"type":"int"
}
]
}'''
String json = "{" +
"\"name\":\"Frank\"," +
"\"age\":47" +
"}"
assert avroToJson(jsonToAvro(json, schema), schema) == json
public static byte[] jsonToAvro(String json, String schemaStr) throws IOException {
InputStream input = null;
GenericDatumWriter<GenericRecord> writer = null;
Encoder encoder = null;
ByteArrayOutputStream output = null;
try {
Schema schema = new Schema.Parser().parse(schemaStr);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
input = new ByteArrayInputStream(json.getBytes("UTF-8"));
output = new ByteArrayOutputStream();
DataInputStream din = new DataInputStream(input);
writer = new GenericDatumWriter<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
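// binaryEncoder writes only the serialized datum bytes: no schema and no container header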
encoder = EncoderFactory.get().binaryEncoder(output, null);
GenericRecord datum;
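// the JSON decoder throws EOFException once its input is exhausted, so read until that happens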
while (true) {
try {
datum = reader.read(null, decoder);
} catch (EOFException eofe) {
break;
}
writer.write(datum, encoder);
}
encoder.flush();
return output.toByteArray();
} finally {
try { if (input != null) input.close(); } catch (Exception e) { }
}
}
public static String avroToJson(byte[] avro, String schemaStr) throws IOException {
boolean pretty = false;
GenericDatumReader<GenericRecord> reader = null;
JsonEncoder encoder = null;
ByteArrayOutputStream output = null;
try {
Schema schema = new Schema.Parser().parse(schemaStr);
reader = new GenericDatumReader<GenericRecord>(schema);
InputStream input = new ByteArrayInputStream(avro);
output = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
Decoder decoder = DecoderFactory.get().binaryDecoder(input, null);
GenericRecord datum;
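// decode each bare Avro datum with the supplied schema and re-render it as a line of JSON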
while (true) {
try {
datum = reader.read(null, decoder);
} catch (EOFException eofe) {
break;
}
writer.write(datum, encoder);
}
encoder.flush();
output.flush();
return new String(output.toByteArray(), "UTF-8");
} finally {
try { if (output != null) output.close(); } catch (Exception e) { }
}
}
For completeness, here is an example for the case where you are working with streams (Avro calls these container files) rather than bare records. Note that when you go from Avro back to JSON you no longer need to pass the schema, because it is embedded in the stream itself.
@Grapes([
@Grab(group='org.apache.avro', module='avro', version='1.7.7')
])
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
String schema = '''{
"type":"record",
"namespace":"foo",
"name":"Person",
"fields":[
{
"name":"name",
"type":"string"
},
{
"name":"age",
"type":"int"
}
]
}'''
String json = "{" +
"\"name\":\"Frank\"," +
"\"age\":47" +
"}"
assert avroToJson(jsonToAvro(json, schema)) == json
public static byte[] jsonToAvro(String json, String schemaStr) throws IOException {
InputStream input = null;
DataFileWriter<GenericRecord> writer = null;
ByteArrayOutputStream output = null;
try {
Schema schema = new Schema.Parser().parse(schemaStr);
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
input = new ByteArrayInputStream(json.getBytes("UTF-8"));
output = new ByteArrayOutputStream();
DataInputStream din = new DataInputStream(input);
writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
writer.create(schema, output);
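// the container header written by create() embeds the schema, so readers get it for free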
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
GenericRecord datum;
while (true) {
try {
datum = reader.read(null, decoder);
} catch (EOFException eofe) {
break;
}
writer.append(datum);
}
writer.flush();
return output.toByteArray();
} finally {
try { if (writer != null) writer.close(); } catch (Exception e) { }
try { if (input != null) input.close(); } catch (Exception e) { }
}
}
public static String avroToJson(byte[] avro) throws IOException {
boolean pretty = false;
GenericDatumReader<GenericRecord> reader = null;
JsonEncoder encoder = null;
ByteArrayOutputStream output = null;
try {
reader = new GenericDatumReader<GenericRecord>();
InputStream input = new ByteArrayInputStream(avro);
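// DataFileStream reads the schema out of the container itself, so none is passed in here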
DataFileStream<GenericRecord> streamReader = new DataFileStream<GenericRecord>(input, reader);
output = new ByteArrayOutputStream();
Schema schema = streamReader.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
for (GenericRecord datum : streamReader) {
writer.write(datum, encoder);
}
encoder.flush();
output.flush();
return new String(output.toByteArray(), "UTF-8");
} finally {
try { if (output != null) output.close(); } catch (Exception e) { }
}
}
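Both snippets are self-contained Groovy scripts (the @Grab annotation fetches the Avro jar from Maven Central), so assuming you save one of them as, say, roundtrip.groovy, it runs straight from the command line:
groovy roundtrip.groovy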