How to Avro binary-encode a JSON string using Apache Avro?


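I am trying to Avro binary-encode my JSON string. Below is my JSON string. I have written a simple method to do the conversion, but I am not sure whether I am doing it the right way.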

public static void main(String args[]) throws Exception{
try{
    Schema schema = new Schema.Parser().parse(TestExample.class.getResourceAsStream("/3233.avsc"));
    String json="{"+
        "  \"location\" : {"+
        "    \"devices\":["+
        "      {"+
        "        \"did\":\"9abd09-439bcd-629a8f\","+
        "        \"dt\":\"browser\","+
        "        \"usl\":{"+
        "          \"pos\":{"+
        "            \"source\":\"GPS\","+
        "            \"lat\":90.0,"+
        "            \"long\":101.0,"+
        "            \"acc\":100"+
        "          },"+
        "          \"addSource\":\"LL\","+
        "          \"add\":["+
        "            {"+
        "              \"val\":\"2123\","+
        "              \"type\" : \"NUM\""+
        "            },"+
        "            {"+
        "              \"val\":\"Harris ST\","+
        "              \"type\" : \"ST\""+
        "            }"+
        "          ],"+
        "          \"ei\":{"+
        "            \"ibm\":true,"+
        "            \"sr\":10,"+
        "            \"ienz\":true,"+
        "            \"enz\":100,"+
        "            \"enr\":10"+
        "          },"+
        "          \"lm\":1390598086120"+
        "        }"+
        "      }"+
        "    ],"+
        "    \"ver\" : \"1.0\""+
        "  }"+
        "}";

    byte[] avroByteArray = fromJsonToAvro(json, schema);

} catch (Exception ex) {
    // log an exception
}
}

The method below converts my JSON string to Avro binary encoding:
private static byte[] fromJsonToAvro(String json, Schema schema) throws Exception {

    InputStream input = new ByteArrayInputStream(json.getBytes("UTF-8"));
    DataInputStream din = new DataInputStream(input);   

    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

    DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
    Object datum = reader.read(null, decoder);


    GenericDatumWriter<Object>  w = new GenericDatumWriter<Object>(schema);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

    Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

    w.write(datum, e);
    e.flush();

    return outputStream.toByteArray();
}

Can someone take a look and tell me whether the way I am converting my JSON string to Avro binary is correct?


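For that matter, the Apache Avro specification - Hot Licks
It's not clear what "converting JSON to Avro" means, since according to the specification Avro notation is just a particular set of constraints imposed on the JSON string format. - Hot Licks
In any case, Apache appears to supply a set of utilities, so it's not clear why you need to write your own. - Hot Licks
Hmm... I'm not sure I understand the question correctly. I have a JSON string that I need to encode into Avro binary format. How should I do that? Is my current approach wrong? - AKIWEB
@HotLicks, I find myself with the same problem. Could you point me to where those Apache utilities are, so I can find the equivalent function/method? - diegoruizbarbero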
3 Answers


I think the OP is correct. This writes the Avro record itself, without the schema that would be present if this were an Avro data file.
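To make "the record itself, without the schema" concrete, here is a minimal stdlib-only sketch that hand-encodes the Person record used in the example below ({"name":"Frank","age":47}) following the binary encoding rules in the Avro specification: int and long values as zig-zag variable-length integers, a string as its UTF-8 byte length followed by the bytes, and a record as its field values concatenated in schema order. The class and method names are hypothetical and this is not Avro's API; in the real library this is the job of GenericDatumWriter together with EncoderFactory.get().binaryEncoder(...). For this input the result is just 7 bytes with no field names, types, or schema, which is why whoever reads it must already have the schema.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of Avro's binary encoding rules (not Avro's API).
public class PersonBinarySketch {

    // Avro writes int and long as zig-zag encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);            // zig-zag: small magnitudes become small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits with the continuation bit set
            z >>>= 7;
        }
        out.write((int) z);                       // final byte, continuation bit clear
    }

    // Avro writes a string as its UTF-8 byte length (a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // A record is simply its fields in schema order: "name" (string), then "age" (int).
    static byte[] encodePerson(String name, int age) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        writeLong(out, age);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodePerson("Frank", 47)) {
            hex.append(String.format("%02X ", b));
        }
        System.out.println(hex.toString().trim()); // 0A 46 72 61 6E 6B 5E
    }
}
```

Here 0x0A is the zig-zag encoding of the length 5, followed by the bytes of "Frank", and 0x5E is the zig-zag encoding of 47.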

Here are some examples from Avro itself (useful if you are working with files):
    • From JSON to Avro: DataFileWriteTool
    • From Avro to JSON: DataFileReadTool

Here is a complete example that goes in both directions.

@Grapes([
    @Grab(group='org.apache.avro', module='avro', version='1.7.7')
])

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

String schema = '''{
  "type":"record",
  "namespace":"foo",
  "name":"Person",
  "fields":[
    {
      "name":"name",
      "type":"string"
    },
    {
      "name":"age",
      "type":"int"
    }
  ]
}'''
String json = "{" +
  "\"name\":\"Frank\"," +
  "\"age\":47" +
"}"

assert avroToJson(jsonToAvro(json, schema), schema) == json


public static byte[] jsonToAvro(String json, String schemaStr) throws IOException {
    InputStream input = null;
    GenericDatumWriter<GenericRecord> writer = null;
    Encoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        Schema schema = new Schema.Parser().parse(schemaStr);
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        input = new ByteArrayInputStream(json.getBytes("UTF-8"));
        output = new ByteArrayOutputStream();
        DataInputStream din = new DataInputStream(input);
        writer = new GenericDatumWriter<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
        encoder = EncoderFactory.get().binaryEncoder(output, null);
        GenericRecord datum;
        while (true) {
            try {
                datum = reader.read(null, decoder);
            } catch (EOFException eofe) {
                break;
            }
            writer.write(datum, encoder);
        }
        encoder.flush();
        return output.toByteArray();
    } finally {
        try { input.close(); } catch (Exception e) { }
    }
}

public static String avroToJson(byte[] avro, String schemaStr) throws IOException {
    boolean pretty = false;
    GenericDatumReader<GenericRecord> reader = null;
    JsonEncoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        Schema schema = new Schema.Parser().parse(schemaStr);
        reader = new GenericDatumReader<GenericRecord>(schema);
        InputStream input = new ByteArrayInputStream(avro);
        output = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
        Decoder decoder = DecoderFactory.get().binaryDecoder(input, null);
        GenericRecord datum;
        while (true) {
            try {
                datum = reader.read(null, decoder);
            } catch (EOFException eofe) {
                break;
            }
            writer.write(datum, encoder);
        }
        encoder.flush();
        output.flush();
        return new String(output.toByteArray(), "UTF-8");
    } finally {
        try { if (output != null) output.close(); } catch (Exception e) { }
    }
}
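Going the other way, avroToJson only works because it is handed the same schema: the raw record bytes carry no field names or types. The following is a minimal stdlib-only sketch (hypothetical names, not Avro's API) of what reading one such Person record involves, assuming the bytes were written with the Person schema above. The schema alone dictates that a length-prefixed string comes first and a zig-zag int second.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of reading one Avro-binary Person record by hand (not Avro's API).
public class PersonDecodeSketch {

    // Reads a variable-length integer and undoes the zig-zag mapping (Avro int/long).
    static long readLong(ByteArrayInputStream in) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new IllegalArgumentException("truncated varint");
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    // Reads a length-prefixed UTF-8 string.
    static String readString(ByteArrayInputStream in) {
        int len = (int) readLong(in);
        if (len < 0) throw new IllegalArgumentException("negative length");
        byte[] buf = new byte[len];
        if (len > 0 && in.read(buf, 0, len) != len) throw new IllegalArgumentException("truncated string");
        return new String(buf, StandardCharsets.UTF_8);
    }

    // Only the schema says what comes next: "name" (string), then "age" (int).
    static String decodePersonToJson(byte[] avro) {
        ByteArrayInputStream in = new ByteArrayInputStream(avro);
        String name = readString(in);
        long age = readLong(in);
        // Naive JSON output: assumes the name contains no characters that need escaping.
        return "{\"name\":\"" + name + "\",\"age\":" + age + "}";
    }

    public static void main(String[] args) {
        byte[] frank = {0x0A, 0x46, 0x72, 0x61, 0x6E, 0x6B, 0x5E};
        System.out.println(decodePersonToJson(frank)); // {"name":"Frank","age":47}
    }
}
```

The output has the same compact form as the json string that the script's assert compares against.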

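For completeness, here is an example for when you work with streams (Avro calls these container files) rather than individual records. Note that when you go from Avro back to JSON you don't need to pass the schema, because it is stored in the stream.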
@Grapes([
    @Grab(group='org.apache.avro', module='avro', version='1.7.7')
])

// writes Avro as a http://avro.apache.org/docs/current/spec.html#Object+Container+Files rather than a sequence of records

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;


String schema = '''{
  "type":"record",
  "namespace":"foo",
  "name":"Person",
  "fields":[
    {
      "name":"name",
      "type":"string"
    },
    {
      "name":"age",
      "type":"int"
    }
  ]
}'''
String json = "{" +
  "\"name\":\"Frank\"," +
  "\"age\":47" +
"}"

assert avroToJson(jsonToAvro(json, schema)) == json


public static byte[] jsonToAvro(String json, String schemaStr) throws IOException {
    InputStream input = null;
    DataFileWriter<GenericRecord> writer = null;
    Encoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        Schema schema = new Schema.Parser().parse(schemaStr);
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        input = new ByteArrayInputStream(json.getBytes("UTF-8"));
        output = new ByteArrayOutputStream();
        DataInputStream din = new DataInputStream(input);
        writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
        writer.create(schema, output);
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
        GenericRecord datum;
        while (true) {
            try {
                datum = reader.read(null, decoder);
            } catch (EOFException eofe) {
                break;
            }
            writer.append(datum);
        }
        writer.flush();
        return output.toByteArray();
    } finally {
        try { input.close(); } catch (Exception e) { }
    }
}

public static String avroToJson(byte[] avro) throws IOException {
    boolean pretty = false;
    GenericDatumReader<GenericRecord> reader = null;
    JsonEncoder encoder = null;
    ByteArrayOutputStream output = null;
    try {
        reader = new GenericDatumReader<GenericRecord>();
        InputStream input = new ByteArrayInputStream(avro);
        DataFileStream<GenericRecord> streamReader = new DataFileStream<GenericRecord>(input, reader);
        output = new ByteArrayOutputStream();
        Schema schema = streamReader.getSchema();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
        for (GenericRecord datum : streamReader) {
            writer.write(datum, encoder);
        }
        encoder.flush();
        output.flush();
        return new String(output.toByteArray(), "UTF-8");
    } finally {
        try { if (output != null) output.close(); } catch (Exception e) { }
    }
}
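The reason this avroToJson needs no schema argument is the container file header. Per the Object Container Files section of the Avro specification, a file starts with the four bytes 'O', 'b', 'j' and the byte value 1. These are followed by a metadata map (values of type bytes) that includes avro.schema, the writer's schema as JSON, and then a 16-byte sync marker. The raw record bytes produced by the first jsonToAvro have no such header. Below is a minimal stdlib-only sketch (hypothetical class name, not Avro's API) that checks for the magic bytes and reads the metadata map, following the spec's map layout: blocks of key/value pairs, each preceded by a count, where a negative count is followed by a block byte size and a zero count ends the map.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: read the metadata from an Avro object container file header.
public class ContainerHeaderSketch {

    static final byte[] MAGIC = {'O', 'b', 'j', 1};

    // Zig-zag variable-length long, as used throughout Avro's binary encoding.
    static long readLong(ByteArrayInputStream in) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new IllegalArgumentException("truncated varint");
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    // Length-prefixed bytes (strings use the same layout).
    static byte[] readBytes(ByteArrayInputStream in) {
        int len = (int) readLong(in);
        if (len < 0) throw new IllegalArgumentException("negative length");
        byte[] buf = new byte[len];
        if (len > 0 && in.read(buf, 0, len) != len) throw new IllegalArgumentException("truncated bytes");
        return buf;
    }

    // Returns the header metadata, or null if the data does not start with the magic bytes.
    static Map<String, String> readMetadata(byte[] file) {
        ByteArrayInputStream in = new ByteArrayInputStream(file);
        byte[] magic = new byte[4];
        if (in.read(magic, 0, 4) != 4 || !Arrays.equals(magic, MAGIC)) return null;
        Map<String, String> meta = new LinkedHashMap<>();
        while (true) {
            long count = readLong(in);
            if (count == 0) break;          // a zero count ends the map
            if (count < 0) {                // negative count: a block byte size follows
                count = -count;
                readLong(in);
            }
            for (long i = 0; i < count; i++) {
                String key = new String(readBytes(in), StandardCharsets.UTF_8);
                meta.put(key, new String(readBytes(in), StandardCharsets.UTF_8));
            }
        }
        return meta;                        // the 16-byte sync marker and data blocks come next
    }

    public static void main(String[] args) {
        // Hand-built header prefix: magic, a one-entry map {"avro.schema": "\"int\""}, end of map.
        byte[] header = {'O', 'b', 'j', 1, 0x02, 0x16,
            'a', 'v', 'r', 'o', '.', 's', 'c', 'h', 'e', 'm', 'a',
            0x0A, '"', 'i', 'n', 't', '"', 0x00};
        System.out.println(readMetadata(header));                 // {avro.schema="int"}
        System.out.println(readMetadata(new byte[]{0x0A, 0x46})); // null: a raw record, no header
    }
}
```

In practice DataFileStream does all of this for you and exposes the result through getSchema(). The sketch only shows where that schema lives.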


If you know the schema ({schema_file}.avsc) of a JSON file ({input_file}.json), you can use avro-tools to convert it into an Avro file ({output_file}.avro), like this:

java -jar the/path/of/avro-tools-1.8.1.jar fromjson {input_file}.json   --schema-file {schema_file}.avsc > {output_file}.avro

For reference, the {schema_file}.avsc file contains the following:
{"type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
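One detail that often trips people up with this schema: fromjson, like the jsonDecoder used in the question, expects Avro's JSON encoding rather than arbitrary JSON. Per the Avro specification, a union value is written as JSON null when it is null, and otherwise as a single-entry object keyed by the name of the branch's type, for example {"int": 256}. A record in the input file for the User schema above would therefore look like {"name":"Alyssa","favorite_number":{"int":256},"favorite_color":null}. Below is a minimal stdlib-only sketch (hypothetical names, deliberately naive escaping) that builds such records.

```java
// Hypothetical sketch: build Avro-JSON-encoded records for the User schema above.
public class UserJsonSketch {

    // Minimal JSON string quoting: escapes only backslash and double quote
    // (an assumption for illustration; control characters are not handled).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Union value: null stays JSON null, otherwise it is wrapped as {"<typeName>": value}.
    static String union(String typeName, String encodedValue) {
        return encodedValue == null ? "null" : "{\"" + typeName + "\":" + encodedValue + "}";
    }

    // Fields in schema order; favorite_number is ["int","null"], favorite_color is ["string","null"].
    static String userRecord(String name, Integer favoriteNumber, String favoriteColor) {
        return "{\"name\":" + quote(name)
            + ",\"favorite_number\":" + union("int", favoriteNumber == null ? null : favoriteNumber.toString())
            + ",\"favorite_color\":" + union("string", favoriteColor == null ? null : quote(favoriteColor))
            + "}";
    }

    public static void main(String[] args) {
        System.out.println(userRecord("Alyssa", 256, null));
        // {"name":"Alyssa","favorite_number":{"int":256},"favorite_color":null}
        System.out.println(userRecord("Ben", 7, "red"));
        // {"name":"Ben","favorite_number":{"int":7},"favorite_color":{"string":"red"}}
    }
}
```

Writing "favorite_number": 256 without the {"int": ...} wrapper is the kind of input the JSON decoder rejects for a union field.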

Download avro-tools-1.8.1

Download other versions of avro-tools


Note: these links point to where the Apache Avro tools can be downloaded.


Page content provided by Stack Overflow (see the original English post).