在Golang中使用BigQuery写入API

4
我正在尝试使用新的Bigquery Storage API从Golang进行流式插入。根据this page,我了解到该API替换了旧的流式插入bigquery API。
然而,文档中示例都没有展示如何实际插入行。为了创建一个AppendRowsRequest,我得到了以下内容:
&storagepb.AppendRowsRequest{
    WriteStream: resp.Name,
    Rows: &storagepb.AppendRowsRequest_ProtoRows{
        ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
            WriterSchema: nil, // protobuf schema??
            Rows: &storagepb.ProtoRows{
                SerializedRows: [][]byte{}, // serialized protocol buffer data??
            },
        },
    },
}

我应该将什么数据放入上面的SerializedRows字段中?

上面的storagepb.ProtoRows结构在这里有文档记录。不幸的是,只给出了一个链接到协议缓冲区的主要概述页面。

有人能给我一个使用新的Bigquery存储API从Golang流式传输行到bigquery的示例吗?

2个回答

5
在得到上面回答的许多帮助后,我已经得到了一个可行的示例,可以在GitHub上获得: https://github.com/alexflint/bigquery-storage-api-example 主要代码如下:
const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable"
    trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
var rows = []*Row{
    {Name: "John Doe", Age: 104},
    {Name: "Jane Doe", Age: 69},
    {Name: "Adam Smith", Age: 33},
}

func main() {
    ctx := context.Background()

    // create the bigquery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into bigquery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream{
            Type: storagepb.WriteStream_COMMITTED,
        },
    })
    if err != nil {
        log.Fatal("CreateWriteStream: ", err)
    }

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil {
        log.Fatal("AppendRows: ", err)
    }

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil {
        log.Fatal("NormalizeDescriptor: ", err)
    }

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows {
        buf, err := opts.Marshal(row)
        if err != nil {
            log.Fatal("protobuf.Marshal: ", err)
        }
        data = append(data, buf)
    }

    // send the rows to bigquery
    err = stream.Send(&storagepb.AppendRowsRequest{
        WriteStream: resp.Name,
        TraceId:     trace, // identifies this client
        Rows: &storagepb.AppendRowsRequest_ProtoRows{
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema{
                    ProtoDescriptor: descriptor,
                },
                // protocol buffer data
                Rows: &storagepb.ProtoRows{
                    SerializedRows: data, // serialized protocol buffer data
                },
            },
        },
    })
    if err != nil {
        log.Fatal("AppendRows.Send: ", err)
    }

    // get the response, which will tell us whether it worked
    _, err = stream.Recv()
    if err != nil {
        log.Fatal("AppendRows.Recv: ", err)
    }

    log.Println("done")
}

"Row" 结构体的协议缓冲区定义如下:

syntax = "proto3";

package tutorial;

option go_package = ".;main";

message Row {
    string Name = 1;
    int32 Age = 2;
}

首先需要创建一个符合协议缓冲区的架构的BigQuery数据集和表格。请参考上面链接仓库中的readme文件了解如何操作。

运行以上代码后,数据会在BigQuery中呈现为以下形式:

$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE   
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+

感谢大家的帮助!

我尝试了上述代码(使用 storagepb.WriteStream_COMMITTED),没有出现任何错误,但是记录在 BigQuery 中不会立即可用。数据需要一段时间才能进行查询吗? - EyalP
@EyalP 数据应该立即可用 - 这是流式插入 API 的关键目标之一。在较新版本的 bigquery API 中可能发生了变化(我在撰写此文时使用的是 v1beta2)。也许可以尝试使用我在这里存储库中提供的确切代码和依赖版本:https://github.com/alexflint/bigquery-storage-api-example。如果这样还不起作用,请确保在查询表时选择最近的条目。 - Alex Flint
也许是因为我的表格配置了多个地区。不管怎样,感谢您的回复。 - EyalP

1
我找到了一些关于将流写入表格的文档[1][2],但我不确定这是否是您要寻找的内容。请注意,存储/apiv1beta2目前处于测试阶段,因此可能尚未实施或缺乏相关文档。如果我提供的文档无法帮助您,我们可以打开一个公共问题跟踪器来正确记录或实现行流。

是的,这些是我在代码库和文档中找到的示例。无论如何,感谢您提供的指引。 - Alex Flint
1
你是否在寻找类似于 https://github.com/googleapis/python-bigquery-storage/blob/HEAD/samples/snippets/append_rows_pending.py 的 Golang 版本? - Carlos CB
是的,看起来很有帮助。我猜在Golang中还没有写过这样的例子。Python的例子仍然有用。非常感谢。 - Alex Flint
1
嗨,Alex!抱歉回复晚了,我刚刚开了一个公共功能请求。请随意添加您认为相关的任何信息。 - Carlos CB
1
此外,当前的“managedwriter”客户端在这里,也许你会发现它很有用。 - Carlos CB

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接