分隔符未找到错误 - 使用Kinesis Firehose从s3加载到AWS Redshift

4
我将使用Kinesis Firehose通过S3将数据传输到Redshift。我有一个非常简单的csv文件,看起来像这样。Firehose将其放入S3中,但Redshift出现了分隔符未找到的错误。我查看了与此错误相关的所有帖子,但我确保包含了分隔符。 文件
GOOG,2017-03-16T16:00:01Z,2017-03-17 06:23:56.986397,848.78
GOOG,2017-03-16T16:00:01Z,2017-03-17 06:24:02.061263,848.78
GOOG,2017-03-16T16:00:01Z,2017-03-17 06:24:07.143044,848.78
GOOG,2017-03-16T16:00:01Z,2017-03-17 06:24:12.217930,848.78

或者

"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:48:59.993260","852.12"
"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:49:07.034945","852.12"
"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:49:12.306484","852.12"
"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:49:18.020833","852.12"
"GOOG","2017-03-17T16:00:02Z","2017-03-18 05:49:24.203464","852.12"

Redshift表

CREATE TABLE stockvalue
( symbol                   VARCHAR(4),
  streamdate               VARCHAR(20),
  writedate                VARCHAR(26),
  stockprice               VARCHAR(6)
);
  • 错误 错误

  • 以防万一,这是我的 Kinesis 流的样子 Firehose

有人能指出文件可能存在什么问题吗? 我在字段之间添加了逗号。 目标表中的所有列都是 varchar 类型,因此不应该出现数据类型错误。 此外,文件和 Redshift 表之间的列长度完全相同。 我尝试过用双引号嵌入列,也尝试过不用。


你能发一下你正在使用的完整COPY命令吗? - Rahul Gupta
你修好了吗?我在数据管道方面遇到了同样的问题。 - RSHAP
3个回答

6

您能发布完整的COPY命令吗?截图中被截断了。

我猜测您的COPY命令缺少DELIMITER ','。尝试将其添加到COPY命令中。


请你也能看一下我的问题吗?https://stackoverflow.com/questions/59017418/change-delimiter-to-pipe-in-aws-data-pipeline - Muhammad Hashir Anwaar

4

我卡了好几个小时,感谢 Shahid的答案,它帮助我解决了问题。

列名的文本大小写非常重要

Redshift始终将表的列视为小写,因此在将JSON键映射到列时,请确保JSON键是小写的,例如:

您的JSON文件将如下所示:

{'id': 'val1', 'name': 'val2'}{'id': 'val1', 'name': 'val2'}{'id': 'val1', 'name': 'val2'}{'id': 'val1', 'name': 'val2'}

并且COPY语句将会像这样

COPY latency(id,name) FROM 's3://<bucket-name>/<manifest>' CREDENTIALS 'aws_iam_role=arn:aws:iam::<aws-account-id>:role/<role-name>' MANIFEST json 'auto';

在Firehose中,设置必须指定列名称(再次使用小写字母)。此外,将以下内容添加到Firehose复制选项中:

json 'auto' TRUNCATECOLUMNS blanksasnull emptyasnull

如何从Python调用put_records:

以下是展示如何在Python中使用put_records函数与Kinesis交互的代码片段:

传入'put_to_stream'函数的'objects'参数是一个字典数组:

def put_to_stream(objects):
    records = []

    for metric in metrics:
        record = {
            'Data': json.dumps(metric),
            'PartitionKey': 'swat_report'
        };

        records.append(record)

    print(records)

    put_response = kinesis_client.put_records(StreamName=kinesis_stream_name, Records=records)

flush
``

json 'auto' TRUNCATECOLUMNS blanksasnull emptyasnull 对我很重要,非常感谢! - Phillip Fleischer

1

1- 您需要添加FORMAT AS JSON 's3://yourbucketname/aJsonPathFile.txt'。AWS尚未提到此事。请注意,仅当您的数据以JSON形式存储时才起作用,例如:

{'attr1': 'val1', 'attr2': 'val2'} {'attr1': 'val1', 'attr2': 'val2'} {'attr1': 'val1', 'attr2': 'val2'} {'attr1': 'val1', 'attr2': 'val2'}

2- 您还需要验证Kinesis Firehouse中的列顺序和CSV文件中的列顺序,并尝试添加

TRUNCATECOLUMNS blanksasnull emptyasnull

3- 一个例子

COPY testrbl3 (eventId, serverTime, pageName, action, ip, userAgent, location, plateform, language, campaign, content, source, medium, productID, colorCode, scrolltoppercentage) FROM 's3://bucketname/' CREDENTIALS 'aws_iam_role=arn:aws:iam:::role/' MANIFEST json 'auto' TRUNCATECOLUMNS blanksasnull emptyasnull;


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接