Flink:DataStream上没有外连接?

3
我很惊讶地发现,在Flink的DataStream中没有外连接(DataStream文档)。
对于DataSet,您有所有选项:除了常规的joinDataSet文档)之外,还有leftOuterJoinrightOuterJoinfullOuterJoin。但是对于DataStream,您只有普通的join。
这是由于DataStream的某些基本属性使得无法进行外连接吗?或者我们可以在(不远的)将来期待这个功能?
我真的需要在DataStream上进行外连接来解决我的问题...是否有任何方法实现类似的行为?
3个回答

1
您可以使用DataStream.coGroup()转换来实现外部连接。一个CoGroupFunction接收两个迭代器(每个输入一个),它们为某个键的所有元素提供服务,并且如果找不到匹配元素,则可能为空。这允许实现外部连接功能。
在未来的Flink版本中,DataStream API可能会添加对外部连接的一流支持。目前我不知道任何这样的努力。但是,在Apache Flink JIRA中创建问题可能会有所帮助。

1
好的,我已经创建了这个问题,如果有任何帮助:[jira](https://issues.apache.org/jira/browse/FLINK-4188)。我会尝试`coGroup` :) - houcros
嗨@Fabian,使用connect实现这个是否可能? - MJeremy
@houcros,你创建了两个工单:https://issues.apache.org/jira/browse/FLINK-4187 和 https://issues.apache.org/jira/browse/FLINK-4188,但由于它们是相互重复的,所以两个工单都已被关闭。 - kev

0
一种方法是通过流 -> 表 -> 流的方式,使用以下API:FLINK TABLE API - OUTER JOIN 这里是一个Java示例:
    DataStream<String> data = env.readTextFile( ... );
    DataStream<String> data2Merge = env.readTextFile( ... );

    ...

    tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
    tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");

    String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
    String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";

    Table tableLeft = tableEnv.sqlQuery(queryLeft);
    Table tableRight = tableEnv.sqlQuery(queryRight);

    Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);

你如何处理retractStream?它将包含多行true/false。 - Marco

0
这是使用Flink Table API在DataStreams上执行FULL OUTER JOIN的完整工作示例。有关Table API下的DataStream API集成的更多详细信息,请参阅Flink官方页面
步骤
1. Convert both dataStreams into tables.
2. Registers tables as views to execute SQL query.
3. Execute full outer join SQL query on registered views.
4. Result of the SQL query will be a table.
5. Convert the result table to dataStream using toDataStream.

/**
 * Two input data streams
 *
 * <p>@<code>
 * 1. "Alice", "Bob", "John"
 * 2. "Mike", "Sam", "Adam", "Alice"
 * <p>
 * The expected full outer join result is
 * (John), (null)
 * (Bob), (null)
 * (Alice), (Alice)
 * (null), (Mike)
 * (null), (Sam)
 * (null), (Adam)
 *  </code>
 */
public class DataStreamFullOuterJoinUsingTable {

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment streamEnv =
        StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);

    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

    // Convert stream into a table and register it as a view object to execute SQL query.
    DataStream<String> nameStream = streamEnv.fromElements("Alice", "Bob", "John");
    Table nameTable = tableEnv.fromDataStream(nameStream).as("name");
    tableEnv.createTemporaryView("nameTable", nameTable);

    // Convert stream into a table and register it as a view object to execute SQL query.
    DataStream<String> detailStream = streamEnv.fromElements("Mike", "Sam", "Adam", "Alice");
    Table detailTable = tableEnv.fromDataStream(detailStream).as("detail");
    tableEnv.createTemporaryView("detailTable", detailTable);

    // Execute SQL full outer join SQL query and the result will be a table
    Table result =
        tableEnv.sqlQuery(
            "SELECT * FROM "
                + "nameTable FULL OUTER JOIN detailTable "
                + "ON nameTable.name = detailTable.detail");

    // Convert the result table to a dataStream and map the Row objects to String using map
    DataStream<String> resultStream =
        tableEnv
            .toDataStream(result)
            .map(
                (MapFunction<Row, String>)
                    row -> "(" + row.getField(0) + "), (" + row.getField(1) + ")");
    // print as a sink
    resultStream.print();
    /*
    (John), (null)
    (Bob), (null)
    (Alice), (Alice)
    (null), (Mike)
    (null), (Sam)
    (null), (Adam)
     */
    streamEnv.execute();
  }
}

除了 flink-streaming-java 之外,您还需要以下 3 个依赖项(提供的范围)来使 DataStream 和 Table 能够一起工作。

1. flink-table-api-java
2. flink-table-api-java-bridge
3. flink-table-planner_2.12

一旦你知道如何将数据流转换为表格,执行一个SQL查询,然后再将其转换回数据流,你就可以解决许多类似的问题。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接