在Flink 1.11.1中找不到可执行应用程序的ExecutorFactory。

8
首先,我阅读了此帖子,关于同样的问题(链接),并尝试按照相同的解决方案进行操作(使用mvn创建一个新的快速入门项目并将代码迁移到其中),但是即使在IntelliJ的开箱即用情况下也无法正常工作。
这是我的pom.xml与另一个pom.xml中的依赖项混合在一起。我做错了什么?
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test</groupId>
    <artifactId>job</artifactId>
    <version>1</version>
    <packaging>jar</packaging>

    <name>funnel-cep-analytics</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-fs</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-metrics-prometheus -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-prometheus_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.4-1201-jdbc41</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1-SNAPSHOT</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-test-utils -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
            <classifier>tests</classifier>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
            <classifier>tests</classifier>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils-junit</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>9</source>
                    <target>9</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>path.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>

同时在 IntelliJ 运行时,出现以下错误:

java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
        at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84) ~[cep-1.5.0.jar:?]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803) ~[cep-1.5.0.jar:?]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713) ~[cep-1.5.0.jar:?]
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74) ~[cep-1.5.0.jar:?]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699) ~[job.jar:?]
        at com.test.job.StreamingJob.runCEP(StreamingJob.java:122) ~[job.jar:?]
        at com.test.job.StreamingJob.init(StreamingJob.java:107) ~[job.jar:?]
        at com.test.job.StreamingJob.main(StreamingJob.java:64) [job.jar:?]

我正在使用本地环境配置。 - Alter
11个回答

13

对我来说,以下依赖项非常有效:

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.12.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.12.2</version>
        </dependency>

    </dependencies>

这里是有效的。对于我的情况(使用Gradle),我只需添加 flink-clients 依赖: 'implementation("org.apache.flink:flink-clients:1.16.1")' - Luis Camargo
很高兴能帮到你。 - whatsinthename

9

你在下面的代码中使用了<scope>provided</scope>

 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

所以移除<scope>provided</scope>就可以了,我在本地测试过了,能够正常工作。


感谢@Thangavel Loganathan的回答,我想我已经尝试过了,但是无济于事,不管怎样我会再试一次。但是当您使用Flink存储库中的mvn创建快速入门时,那是自定义范围。再次感谢。 - Alter
有时候你需要修改它们才能正常工作。你尝试过移除作用域了吗? - prostý člověk
我已经删除了作用域并尝试使用compile,但问题仍然存在。已经再次尝试过,但仍然没有结果,我已经没有更多的想法了。感谢您的帮助。 - Alter

6
在我的情况下,问题得到解决是因为删除了这两个依赖项,因为客户端依赖已经包含了它们。
        /*removed*/
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        /*removed*/
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        /*Keep just this one*/
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

3

flink-clients 不在类路径中时,会出现此错误。请检查实际类路径以确认配置文件是否按预期工作。另外,在 IntelliJ 中,您根本不需要使用配置文件。只需在 Run/Debug 对话框中选中包含 provided 依赖项的选项即可。


2

针对 Flink 版本 1.15.1 的更新,特别是为了使来自 Flink 文档的 Table API 示例正常工作。

  • 需要将 flink-table-planner 添加为提供的依赖项。
  • 该规划器包含一个名为 org.apache.flink.table.planner.delegation.DefaultExecutorFactoryExecutorFactory 实现。

添加这个依赖项就解决了问题:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

非常感谢@adamo,当我到达Flink的那个版本时,我会自己尝试。 - Alter

1
如果使用的是gradle而不是Maven,遇到相同的问题,那么Flink-clients也会遇到同样的问题。你只需要添加以下代码:
compile "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"

如果您正在跟踪依赖项,可以查看Flink-training仓库中第129行的内容:https://github.com/apache/flink-training/blob/master/build.gradle#L129


1
通过阅读Flink演示, 您可以看到问题得到了解决。
在您的pom.xml下添加此配置文件。
<project>

.......

  <!-- This profile helps to make things run out of the box in IntelliJ -->
  <!-- Its adds Flink's core classes to the runtime class path. -->
  <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
  <profiles>
    <profile>
      <id>add-dependencies-for-IDEA</id>

      <activation>
        <property>
          <name>idea.version</name>
        </property>
      </activation>
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
      </dependencies>
    </profile>
  </profiles>

</project>

非常感谢,但是当您使用计划程序时,您将遇到客户端和flink-streaming-java依赖项的问题,然后,streamer依赖项已经包含在客户端依赖项中,因此您可以摆脱flink-streaming-java依赖项。即使不使用计划程序,flink-streaming-java依赖项也包含在客户端中。 - Alter
是的,@Alter,你说得对。在 Flink 1.11 之后,flink-clients 已经从 flink-streaming-java 依赖中移除了。因此,当我在 IDEA 中开发和调试 Flink 作业时,我需要添加所有这些包。而当部署到线上环境时,所有这些包将被设置为“provided”范围,以消除客户端和服务器版本之间的冲突。 - yanss
1
即使对于IDEA,你真的不需要它们。你只需在IDEA的配置文件中放置客户端依赖项,它就应该可以工作了。 - Alter

1

你好 @Thangavel Loganathan

我会删除依赖中的 <scope>provided</scope>,在执行 mvn clean package 后,我能够看到库已经被打包到 jar 文件中,如下所示:[INFO] Including org.apache.flink:flink-clients_2.11:jar:1.11.0 in the shaded jar.,但是即使这样,在 IntelliJ 的默认设置下,我仍然遇到了这个问题:

java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
        at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
        at com.teavaro.cep.StreamingJob.runCEP(StreamingJob.java:129)
        at com.teavaro.cep.StreamingJob.prepareJob(StreamingJob.java:115)
        at com.teavaro.cep.StreamingJob.main(StreamingJob.java:69)

这是我在pom.xml中配置依赖项的方式

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>quickstart</groupId>
    <artifactId>cep</artifactId>
    <version>1</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <pgbulkinsert.version>3.3</pgbulkinsert.version>
        <kda.version>1.0.1</kda.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
<dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
       <dependency>more dependencies here</dependency>
</dependencies>
<build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>9</source>
                    <target>9</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>path.to.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- Its adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

如果我浏览创建的jar文件,我可以在META-INF\maven\org.apache.flink下找到依赖项: 在此输入图像描述

这是我的作业图:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, conf);
SingleOutputStreamOperator<Event> streamFiltered = RabbitMQConnector.eventStreamObject(env)
                .flatMap(new RMQConsumer())
                .name("Event Mapper")
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(PropertyFileReader.getLatencyAllowed()))withTimestampAssigner((event, timestamp) -> event.timestamp.getTime()))
                .name("Watermarks Added")
                .filter(new NullIdEventsFilterFunction())
                .name("Event Filter");
/*more transformations here*/
env.execute("job name");

亲切的问候!


你能否把你的代码放到gist/github上?这样我就可以克隆它并检查一下。我刚刚复制了你的pom文件,删除了所有的构件并运行了它。对于一个简单的任务来说,它对我来说工作得很好。 - prostý člověk
同时,在“文件”菜单下,使缓存失效/重启您的IntelliJ? - prostý člověk

0

对于 Flink 版本 1.15.1,我们可以使用以下依赖项来解决问题。

<properties>
        <flink.version>1.15.1</flink.version>
</properties>
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

0
问题解决了!通过探索 https://github.com/apache/flink/tree/master/flink-clients 中的 flink-clients (pom.xml),我意识到 <artifactId>flink-java</artifactId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId> 已经包含在 flink-clients (pom.xml) 中,所以我从我的 pom.xml 中删除了这些依赖项,然后它在 IntelliJ 和开箱即用的地方都可以工作。
我希望这对于其他遇到同样问题的人也有用!

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接