从GCS迁移CSV到PostgreSQL

6
我正在尝试使用Python脚本将从BigQuery导出的CSV文件从Google Cloud Storage(GCS)迁移到PostgreSQL Google云sql实例。我希望使用Google API,但在文档中发现了这个问题:文档中指出:不支持使用Cloud SQL Admin API导入CSV数据到PostgreSQL实例。作为替代方案,我可以使用psycopg2库并将csv文件的行流式传输到SQL实例。我有三种方法可以做到这一点:
逐行处理:读取每行,然后提交插入命令,最后提交。 批量流:读取每行,然后提交插入命令,每10行或100行后提交一次。 整个csv:读取每行并提交插入命令,然后只在文档末尾提交。
我的担忧是这些csv文件可能包含数百万行,运行上述任何一个选项的过程似乎都不是明智之举。我有哪些替代方案呢?基本上,我们在BigQuery中有一些原始数据,在导出到GCS之前进行预处理,以准备将其导入到PostgreSQL实例。我需要将这些预处理数据从BigQuery导出到PostgreSQL实例。这不是此问题的重复,因为我更喜欢从BigQuery导出数据到PostgreSQL实例的解决方案,无论是通过GCS还是直接导出。

1
为什么不使用Cloud Dataflow呢?听起来这是一个不错的选择。 - Graham Polley
我没有不这样做的好理由。这本应该是项目另一部分的快速测试的一部分。希望能够在没有设置数据流管道的情况下完成它。我以前从未使用过Dataflow。 - DJ319
听起来这将是一个非常简单的管道。好的一点是它会为您自动进行扩展,并具有与BigQuery和CloudSQL的本地数据源/汇。 - Graham Polley
作为替代方案,您可以将数据集加载到pandas中,并使用其自己的方法将它们发送到SQL连接,例如psycopg2。 - Eir Nym
可能是重复的问题:如何将CSV文件数据导入到PostgreSQL表中? - Eir Nym
另外顺便问一下,有人知道为什么在 PostgreSQL 实例中不支持导入 CSV 文件吗? - DJ319
4个回答

3

在开始之前,您应该确保:

您将要导入的数据库和表已经存在于Cloud SQL实例中。

CSV文件格式要求:CSV文件必须为每行数据提供一行,并具有逗号分隔的字段。

然后,您可以按照以下步骤使用位于GCS存储桶中的CSV文件将数据导入Cloud SQL实例

  1. 描述您正在从中导出的实例:

gcloud sql instances describe [INSTANCE_NAME]

  1. 复制serviceAccountEmailAddress字段。

  2. 将服务帐户作为编写器添加到存储桶ACL中:

gsutil acl ch -u [SERVICE_ACCOUNT_ADDRESS]:W gs://[BUCKET_NAME]

  1. 将服务帐户添加为读者到导入文件中:

gsutil acl ch -u [SERVICE_ACCOUNT_ADDRESS]:R gs://[BUCKET_NAME]/[IMPORT_FILE_NAME]

  1. 导入文件

gcloud sql import csv [INSTANCE_NAME] gs://[BUCKET_NAME]/[FILE_NAME] \ --database=[DATABASE_NAME] --table=[TABLE_NAME]

  1. 如果您不需要保留之前设置的ACL提供的权限,请删除ACL:

gsutil acl ch -d [SERVICE_ACCOUNT_ADDRESS] gs://[BUCKET_NAME]


3
您可以像@GrahamPolley建议的那样使用Cloud Dataflow进行导入过程。虽然这种解决方案需要一些额外的工作(熟悉Dataflow,设置所有内容等),但即使有了这些额外的工作,这仍将是您情况下首选的解决方案。但是,还有其他解决方案可用,我将在下面解释其中之一。
要使用Dataflow设置迁移过程,此关于将BigQuery导出到Google Datastore的教程是一个很好的例子。

Cloud Dataflow的替代解决方案

Cloud SQL for PostgreSQL不支持从.CSV文件导入数据,但它支持.SQL文件。

指定uri的文件类型。
SQL: 文件包含SQL语句。
CSV: 文件包含CSV数据。 不支持使用Cloud SQL Admin API导入CSV数据到PostgreSQL实例。

一种直接的解决方法是使用某些工具(Google没有提供我所知道的工具,但有许多在线工具)将.CSV文件转换为.SQL文件,然后再导入到PostgreSQL中。

如果您想以更“编程化”的方式实现此解决方案,我建议使用Cloud Functions,这里是我尝试如何实现它的一个示例:

  1. 设置一个云函数,当文件上传到云存储桶时触发(触发器)
  2. 编写函数以获取上传的文件并检查其是否为.CSV文件。如果是,则使用csv-to-sql API(API示例)将文件转换为.SQL文件
  3. 将新文件存储在云存储中
  4. 导入到PostgreSQL

谢谢您的答案。我同意建立数据流管道是解决此问题最正确的方法。这就是为什么我会标记您的回答为已接受。 我发现了另一种方法,在一个答案中我将详细介绍它,这使我能够使用我已经编写的其余代码。 - DJ319

0
我发现pyscopg2模块有copy_from()函数,它允许加载整个csv文件而不是逐行流式传输。使用此方法的缺点是仍然需要从GCS下载csv文件并在本地存储。
以下是使用pyscopg2 'copy_from()'的详细信息(来自这里)。
import psycopg2

conn = psycopg2.connect("host=localhost dbname=postgres user=postgres")
cur = conn.cursor()
with open('user_accounts.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f)  # Skip the header row.
    cur.copy_from(f, 'users', sep=',')

conn.commit()

0
你可以使用一个类来使从互联网上获取的文本表现得像一个文件。我已经多次使用过这个方法。
import io
import sys


class IteratorFile(io.TextIOBase):
    """ given an iterator which yields strings,
    return a file like object for reading those strings """

    def __init__(self, obj):
        elements = "{}|" * len(obj[0])
        elements = (unicode(elements[:-1]).format(*x) for x in obj)
        self._it = elements
        self._f = io.cStringIO()

    def read(self, length=sys.maxsize):

        try:
            while self._f.tell() < length:
                self._f.write(next(self._it) + "\n")

        except StopIteration as e:
            # soak up StopIteration. this block is not necessary because
            # of finally, but just to be explicit
            pass

        except Exception as e:
            print("uncaught exception: {}".format(e))

        finally:
            self._f.seek(0)
            data = self._f.read(length)

            # save the remainder for next read
            remainder = self._f.read()
            self._f.seek(0)
            self._f.truncate(0)
            self._f.write(remainder)
            return data

    def readline(self):
        return next(self._it)

这是为了避免需要本地下载文件吗? - DJ319
是的,它将内存中的二进制数据视为文件处理。 - eatmeimadanish
本地和在Google Cloud上的内存是一样的。没有本地存储磁盘。 - Courvoisier

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接