如何使用数据库/sql包批量执行SQL语句

62

如何使用Go的database/sql包批量执行SQL语句?

在Java中,我会这样做:

// Create a prepared statement
String sql = "INSERT INTO my_table VALUES(?)";
PreparedStatement pstmt = connection.prepareStatement(sql);

// Insert 10 rows of data
for (int i=0; i<10; i++) {
    pstmt.setString(1, ""+i);
    pstmt.addBatch();
}

// Execute the batch
int [] updateCounts = pstmt.executeBatch();

如何在Go中实现相同的功能?


我不相信在SQL包中存在这样的东西。 - Stephen Weinberg
13个回答

96

由于 db.Exec 函数是可变参数的,因此一个选项(实际上只需要进行一次网络往返)是自己构造语句并展开参数并传递它们。

示例代码:

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows) * 3)
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, "(?, ?, ?)")
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", 
                        strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}
在我进行的简单测试中,相对于其他答案中提出的Begin、Prepare、Commit方法,这种解决方案在插入10,000行时快了约4倍——尽管实际改进将在很大程度上取决于您个人的设置、网络延迟等。

1
公平起见,您应该使用预处理语句和事务,然后再尝试一次,而不是每次都使用begin/prepare/commit。这将是更准确的比较。 - TrippyD
值得注意的是,这是X个单独的查询 - 它不是一个事务,因此任何(或多个)这些重复可能会失败,而其余部分则会继续执行。它也不能防止SQL注入。 - Xeoncross
6
在这里我写的方式是正确的,你也对它们进行了不同的事务处理。但我认为你可以轻松地使用 db.Begin(); <上面的代码片段,但是要加上 tx.Exec>; db.Commit() 来将它们作为一个事务来运行。此外,由于它使用了 ? 占位符,因此它安全的,可防止 SQL 注入。 - Andrew C
5
我使用这个答案来构建东西,并发现截至撰写本文时,MySQL 的占位符似乎有一个限制,即最多为 2^16-1(65,535)。为了保险起见,我最终在循环内运行了多个插入操作(大约每次插入10,000行数据)。 - j boschiero
7
如果有人看到这个答案并且像我一样没有意识到 (?, ?, ?) 的语法是 MySQL 特定的,需要改为 ($1, $2, $3), ... ($n, $n+1 $n+2) 才能在 Postgresql 中使用。请注意这一点。 - amwill04
显示剩余4条评论

19

17

根据安德鲁的解决方案修改为适用于不支持?占位符的PostgreSQL,如下所示:

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows) * 3)
    i := 0
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", i*3+1, i*3+2, i*3+3))
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
        i++
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}

1
好代码片段!这不会存在SQL注入漏洞吗? - Jona Rodrigues
4
我看不到风险,字符串插值只使用了递增的i参数(valueStrings将变成($1, $2, $3),($4, $5, $6),...)。 ExampleRowStructs仅传递给db.Exec,数据库驱动程序会负责替换占位符。 - MasterCarl

7

扩展Avi Flax的答案,我需要在我的INSERT中加入ON CONFLICT DO UPDATE子句。

解决方案是将数据复制到临时表(在事务结束时删除),然后从临时表插入到永久表中。

这是我选择的代码:

func (fdata *FDataStore) saveToDBBulk(items map[fdataKey][]byte) (err error) {
    tx, err := fdata.db.Begin()
    if err != nil {
        return errors.Wrap(err, "begin transaction")
    }
    txOK := false
    defer func() {
        if !txOK {
            tx.Rollback()
        }
    }()

    // The ON COMMIT DROP clause at the end makes sure that the table
    // is cleaned up at the end of the transaction.
    // While the "for{..} state machine" goroutine in charge of delayed
    // saving ensures this function is not running twice at any given time.
    _, err = tx.Exec(sqlFDataMakeTempTable)
    // CREATE TEMPORARY TABLE fstore_data_load
    // (map text NOT NULL, key text NOT NULL, data json)
    // ON COMMIT DROP
    if err != nil {
        return errors.Wrap(err, "create temporary table")
    }

    stmt, err := tx.Prepare(pq.CopyIn(_sqlFDataTempTableName, "map", "key", "data"))
    for key, val := range items {
        _, err = stmt.Exec(string(key.Map), string(key.Key), string(val))
        if err != nil {
            return errors.Wrap(err, "loading COPY data")
        }
    }

    _, err = stmt.Exec()
    if err != nil {
        return errors.Wrap(err, "flush COPY data")
    }
    err = stmt.Close()
    if err != nil {
        return errors.Wrap(err, "close COPY stmt")
    }

    _, err = tx.Exec(sqlFDataSetFromTemp)
    // INSERT INTO fstore_data (map, key, data)
    // SELECT map, key, data FROM fstore_data_load
    // ON CONFLICT DO UPDATE SET data = EXCLUDED.data
    if err != nil {
        return errors.Wrap(err, "move from temporary to real table")
    }

    err = tx.Commit()
    if err != nil {
        return errors.Wrap(err, "commit transaction")
    }
    txOK = true
    return nil
}

3

3

如果您正在使用Postgres,这是对@Debasish Mitra解决方案的一个看法。

功能示例:https://play.golang.org/p/dFFD2MrEy3J

备用示例:https://play.golang.org/p/vUtW0K4jVMd

data := []Person{{"John", "Doe", 27}, {"Leeroy", "Jenkins", 19}}

vals := []interface{}{}
for _, row := range data {
    vals = append(vals, row.FirstName, row.LastName, row.Age)
}

sqlStr := `INSERT INTO test(column1, column2, column3) VALUES %s`
sqlStr = ReplaceSQL(sqlStr, "(?, ?, ?)", len(data))

//Prepare and execute the statement
stmt, _ := db.Prepare(sqlStr)
res, _ := stmt.Exec(vals...)

替换SQL函数

func ReplaceSQL(stmt, pattern string, len int) string {
    pattern += ","
    stmt = fmt.Sprintf(stmt, strings.Repeat(pattern, len))
    n := 0
    for strings.IndexByte(stmt, '?') != -1 {
        n++
        param := "$" + strconv.Itoa(n)
        stmt = strings.Replace(stmt, "?", param, 1)
    }
    return strings.TrimSuffix(stmt, ",")
}

使用sqlx,您可以重新绑定给定方言的所有占位符。https://github.com/jmoiron/sqlx/blob/master/bind.go#L44 我想它经过了实战考验。 - user4466350

2

将Andrew C的想法应用于我的工作需求,并使用sql标量变量进行调整。它完美地适用于我工作中特定的要求。也许对某些人很有用,因为它在golang中模拟sql批处理事务。

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows) * 3)
    i := 0
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, fmt.Sprintf("(@p%d, @p%d, @p%d)", i*3+1, i*3+2, i*3+3))
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
        i++
    }
    sqlQuery := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))

    var params []interface{}

    for i := 0; i < len(valueArgs); i++ {
        var param sql.NamedArg
        param.Name = fmt.Sprintf("p%v", i+1)
        param.Value = valueArgs[i]
        params = append(params, param)
    }

    _, err := db.Exec(sqlQuery, params...)
    return err
}

2

我已经成功实现了pq.CopyIn,并且它的速度比字符串值/参数方法快2.4倍(这个方法非常有帮助,而且是一种优雅的解决方案,所以谢谢你!)

我将1000万个int、varchar测试值插入了一个结构体,并使用以下函数加载了它。我对GoLang还比较新,所以请多包涵...

func copyData(client *client.DbClient, dataModels []*dataModel) error{
    db := *client.DB
    txn, err := db.Begin()
    if err != nil {
        return err
    }
    defer txn.Commit()

    stmt, err := txn.Prepare(pq.CopyIn("_temp", "a", "b"))
    if err != nil {
        return(err)
    }

    for _, model := range dataModels{
        _, err := stmt.Exec(model.a, model.b)
        if err != nil {
            txn.Rollback()
            return err
        }
    }

    _, err = stmt.Exec()
    if err != nil {
        return err
    }

    err = stmt.Close()
    if err != nil {
        return err
    }

    return nil
    }

已用时间(stringValues/args):1分30.60秒。

已用时间(copyIn):37.57秒。


卡在 _, err = stmt.Exec() - Igor

2
对于Postgres,lib pq支持批量插入:https://godoc.org/github.com/lib/pq#hdr-Bulk_imports。但是,以下代码也可以实现相同的功能,当尝试执行批量条件更新时(根据查询进行更改),它真正有用。要执行类似的批量插入,请使用以下函数。
// ReplaceSQL replaces the instance occurrence of any string pattern with an increasing $n based sequence
func ReplaceSQL(old, searchPattern string) string {
   tmpCount := strings.Count(old, searchPattern)
   for m := 1; m <= tmpCount; m++ {
      old = strings.Replace(old, searchPattern, "$"+strconv.Itoa(m), 1)
   }
   return old
}

那么上面的示例变成了

sqlStr := "INSERT INTO test(n1, n2, n3) VALUES "
vals := []interface{}{}

for _, row := range data {
   sqlStr += "(?, ?, ?)," // Put "?" symbol equal to number of columns
   vals = append(vals, row["v1"], row["v2"], row["v3"]) // Put row["v{n}"] blocks equal to number of columns
}

//trim the last ,
sqlStr = strings.TrimSuffix(sqlStr, ",")

//Replacing ? with $n for postgres
sqlStr = ReplaceSQL(sqlStr, "?")

//prepare the statement
stmt, _ := db.Prepare(sqlStr)

//format all vals at once
res, _ := stmt.Exec(vals...)

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接