Apache Flink的AWS S3 Sink在本地测试时需要Hadoop吗?

4

我对Apache Flink比较新,正在尝试创建一个简单的项目,将文件生成到AWS S3存储桶中。根据文档,看起来我需要安装Hadoop才能实现这一点。

如何设置本地环境以允许我测试此功能?我已经在本地安装了Apache Flink和Hadoop。我已经对Hadoop的core-site.xml配置进行了必要的更改,并将我的HADOOP_CONF路径添加到了flink.yaml配置中。当我尝试通过Flink UI在本地提交作业时,总是会出现错误。

2016-12-29 16:03:49,861 INFO  org.apache.flink.util.NetUtils                                - Unable to allocate on port 6123, due to error: Address already in use
2016-12-29 16:03:49,862 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to run JobManager.
java.lang.RuntimeException: Unable to do further retries starting the actor system
    at org.apache.flink.runtime.jobmanager.JobManager$.retryOnBindException(JobManager.scala:2203)
    at org.apache.flink.runtime.jobmanager.JobManager$.runJobManager(JobManager.scala:2143)
    at org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:2040)
    at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)

我猜测我的环境设置有些问题。是否可以在本地完成这个操作?非常感谢您的帮助。


检查端口6123是否正在使用。如果没有,则禁用您的防火墙/iptables。 - Ani Menon
4个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
4

当您需要使用Hadoop库时,不必安装Hadoop即可在本地运行并写入S3。我刚好试过使用基于Avro模式和生成的SpecificRecord向S3写入Parquet输出。我通过SBT和Intellij Idea本地运行以下代码的版本。所需部分:

1)有以下文件指定所需的Hadoop属性(注意:定义AWS访问密钥/秘密密钥不是推荐做法。最好在具有适当IAM角色以读取/写入您的S3存储桶的EC2实例上运行。但是在本地进行测试需要)

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) 导入: import com.uebercomputing.eventrecord.EventOnlyRecord

import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job

import org.apache.parquet.avro.AvroParquetOutputFormat

3) Flink代码使用具有上述配置的HadoopOutputFormat:

    val events: DataSet[(Void, EventOnlyRecord)] = ...

    val hadoopConfig = getHadoopConfiguration(hadoopConfigFile)

    val outputFormat = new AvroParquetOutputFormat[EventOnlyRecord]
    val outputJob = Job.getInstance

    //Note: AvroParquetOutputFormat extends FileOutputFormat[Void,T]
    //so key is Void, value of type T - EventOnlyRecord in this case
    val hadoopOutputFormat = new HadoopOutputFormat[Void, EventOnlyRecord](
      outputFormat,
      outputJob
    )

    val outputConfig = outputJob.getConfiguration
    outputConfig.addResource(hadoopConfig)
    val outputPath = new Path("s3://<bucket>/<dir-prefix>")
    FileOutputFormat.setOutputPath(outputJob, outputPath)
    AvroParquetOutputFormat.setSchema(outputJob, EventOnlyRecord.getClassSchema)

    events.output(hadoopOutputFormat)

    env.execute

    ...

    def getHadoopConfiguration(hadoodConfigPath: String): HadoopConfiguration = {
      val hadoopConfig = new HadoopConfiguration()
      hadoopConfig.addResource(new Path(hadoodConfigPath))
      hadoopConfig
    }

4) 构建依赖项及其版本:

    val awsSdkVersion = "1.7.4"
    val hadoopVersion = "2.7.3"
    val flinkVersion = "1.1.4"

    val flinkDependencies = Seq(
      ("org.apache.flink" %% "flink-scala" % flinkVersion),
      ("org.apache.flink" %% "flink-hadoop-compatibility" % flinkVersion)
    )

    val providedFlinkDependencies = flinkDependencies.map(_ % "provided")

    val serializationDependencies = Seq(
      ("org.apache.avro" % "avro" % "1.7.7"),
      ("org.apache.avro" % "avro-mapred" % "1.7.7").classifier("hadoop2"),
      ("org.apache.parquet" % "parquet-avro" % "1.8.1")
    )

    val s3Dependencies = Seq(
      ("com.amazonaws" % "aws-java-sdk" % awsSdkVersion),
      ("org.apache.hadoop" % "hadoop-aws" % hadoopVersion)
    )

使用writeAsText到S3的编辑:

1) 创建一个Hadoop配置目录(将其称为hadoop-conf-dir),其中包含一个core-site.xml文件。

例如:

mkdir /home/<user>/hadoop-config
cd /home/<user>/hadoop-config
vi core-site.xml

#content of core-site.xml 
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>

    <!-- set your AWS access key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

2) 创建一个目录(将其引用为 flink-conf-dir),其中包含一个名为 flink-conf.yaml 的文件。

例如:

mkdir /home/<user>/flink-config
cd /home/<user>/flink-config
vi flink-conf.yaml

//content of flink-conf.yaml - continuing earlier example
fs.hdfs.hadoopconf: /home/<user>/hadoop-config

3) 编辑您用于运行S3 Flink作业的IntelliJ运行配置 - 运行 - 编辑配置 - 并添加以下环境变量:

FLINK_CONF_DIR and set it to your flink-conf-dir

Continuing the example above:
FLINK_CONF_DIR=/home/<user>/flink-config

4) 设置该环境变量并运行代码:

events.writeAsText("s3://<bucket>/<prefix-dir>")

env.execute

感谢您的回复。有没有办法可以只将本地Java执行指向Hadoop配置文件,而不定义输出路径?根据文档,似乎我应该能够像这样做:messageStream.writeAsText("s3://..."); 但是当我通过IntelliJ运行本地执行时,它不知道那个文件在哪里。我也找不到任何Flink操作,可以让我在运行时设置它。 - medium
问题在于当您调用writeAsText时,使用的默认HadoopFileSystem不“知道”s3文件系统。请参见我上面原始答案的编辑。 - medale
所以我认为我已经把一切都搞定了,但是我在访问我的 S3 存储桶时遇到了访问问题,并出现以下错误: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: **********, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID:我不确定为什么会出现访问错误,因为应用程序中使用的密钥是创建 S3 存储桶的同一帐户。现在似乎 flink 的一切都正常工作。如果您有关于我为什么会遇到此错误的任何提示,请告诉我。再次感谢! - medium
@medium 创建存储桶和将对象放入存储桶可能需要不同的IAM权限?请参阅http://docs.aws.amazon.com/AmazonS3/latest/dev/using-with-s3-actions.html。您能否使用这些密钥从AWS CLI(https://aws.amazon.com/cli/)编写?“aws s3 cp <localfile> s3://<your-bucket>/”听起来Flink部分已经设置好了。 - medale
请帮助我了解如何在 flink 集群环境中运行的 flink 应用程序中完成这个任务。 - undefined

1
我必须执行以下步骤才能在本地运行我的 Flink 作业,并将其汇入 S3: 1- 将 flink-s3-fs-hadoop-1.9.1.jar 添加到我的 flink/plugins/flink-s3-fs-hadoop 目录中。 2- 修改 flink/conf/flink-conf.yaml,包括以下内容: s3.access-key: AWS_ACCESS_KEY s3.secret-key: AWS_SECRET_KEY fs.hdfs.hadoopconf: /etc/hadoop-config 我有 core-site.xml 文件位于 hadoop-config 文件夹中,但它不包含任何配置,因此可能不需要 fs.hdfs.hadoopconf。

0
在sbt中,我只需要添加S3库依赖项即可像使用本地文件系统一样使用它。 SBT文件: ```"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion.value``` 读取示例:
    public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> text = env.readTextFile("s3://etl-data-ia/test/fileStreamTest.csv");
    text.print();
    env.execute("test");}

0

根据这个链接https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins

要使用 flink-s3-fs-hadoop 插件,您应该在启动 Flink 之前将相应的 JAR 文件从 opt 目录复制到 Flink 分发的 plugins 目录中。

我知道的另一种方法是通过环境变量 ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-[flink-version].jar" 启用它

例如:flink-s3-fs-hadoop-1.12.2.jar

对于这两种方式,我们都必须在 flink-conf.yaml 文件中定义 S3 配置

Flink 将内部将其转换回 fs.s3a.connection.maximum。无需使用 Hadoop 的 XML 配置文件传递配置参数。

s3.endpoint: <end-point>
s3.path.style.access : true
关于AWS凭证,它们必须在环境变量中提供或在flink-conf.yaml中进行配置。
s3.endpoint: <end-point>
s3.path.style.access : true
s3.access-key: <key>
s3.secret-key: <value>
s3.region: <region>

一旦所有设置完成,您可以像@EyalP提到的那样从S3读取,或者写入S3(即使用数据集)

dataset.map(new MapToJsonString())
                .writeAsText("s3://....",
                        FileSystem.WriteMode.OVERWRITE);

如果您想在本地测试它(没有真实的AWS帐户),我建议您检查localstack。它完全支持各种AWS服务(包括S3)。如果您选择使用它,则AWS凭据不是必需的(可以提供为空),终端点将是本地堆栈本身。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,