Is there an efficient way to outer join multiple (more than 2) Kafka topics?

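I would like to outer join several Kafka topics (typically 2-10) by key using the Streams API. All topics have the same key and the same partitioning. One way to implement this join is to create a KStream for each topic and chain calls to KStream.outerJoin: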

stream1
    .outerJoin(stream2, ...)
    .outerJoin(stream3, ...)
    .outerJoin(stream4, ...)

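However, the KStream.outerJoin documentation indicates that each call to outerJoin materializes both of its input streams. The example above would therefore materialize not only streams 1 to 4, but also stream1.outerJoin(stream2, ...) and stream1.outerJoin(stream2, ...).outerJoin(stream3, ...). Compared with joining the 4 streams directly, this causes a lot of unnecessary serialization, deserialization and I/O.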
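To make the cost concrete, here is a small plain-Java tally (no Kafka dependency; the class and method names are hypothetical) of what a left-to-right chain of binary joins would buffer, assuming, as the documentation note suggests, that each binary stream-stream join keeps a window store for both of its inputs. For n streams that gives 2(n - 1) stores, of which n - 2 hold intermediate join results rather than source data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: which inputs get buffered (materialized) when n streams
// are combined with a left-to-right chain of binary stream-stream joins.
class ChainedJoinStores {

    // Returns one entry per window store, assuming each binary join buffers both inputs.
    static List<String> storesForChain(int n) {
        List<String> stores = new ArrayList<>();
        String left = "stream1";
        for (int i = 2; i <= n; i++) {
            String right = "stream" + i;
            stores.add(left);              // left side of join number i - 1
            stores.add(right);             // right side of join number i - 1
            left = "join(1.." + i + ")";   // this join's output feeds the next join
        }
        return stores;
    }

    public static void main(String[] args) {
        List<String> stores = storesForChain(4);
        long intermediate = stores.stream().filter(s -> s.startsWith("join")).count();
        System.out.println("window stores: " + stores.size());                 // 6 = 2 * (4 - 1)
        System.out.println("intermediate results buffered: " + intermediate);  // 2 = 4 - 2
        System.out.println(stores);
    }
}
```

Only the n source streams would need buffering in a direct n-way join, so the n - 2 intermediate stores are the overhead the question objects to.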
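Another problem with this approach is that the JoinWindow is not consistent across the 4 input streams: one JoinWindow is used to join streams 1 and 2, but a separate join window is then needed to join that result with stream 3, and so on. For example, if I specify a 10-second join window for each join, and entries for a particular key appear in stream 1 at 0 s, in stream 2 at 6 s, in stream 3 at 12 s and in stream 4 at 18 s, the joined entry is output only after 18 seconds, even though each join window is just 10 seconds, resulting in excessive latency. The result also depends on the order of the joins, which seems unnatural.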
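The timing example can be checked with a small simulation. This is a hypothetical plain-Java sketch, not Kafka Streams code: it assumes each join compares the incoming record with the previous join result (timestamped with the later of its two inputs) using a symmetric window, and it ignores grace periods and the partial results an outer join would also emit.

```java
// Hypothetical illustration (not Kafka Streams code) of chained binary join windows
// versus a single window spanning all inputs for one key.
class WindowChainDemo {

    // Chained joins: each step compares the next record with the previous join result,
    // whose timestamp is assumed to be the max of its inputs.
    static boolean chainedJoinMatches(long[] timestampsMs, long windowMs) {
        long resultTs = timestampsMs[0];
        for (int i = 1; i < timestampsMs.length; i++) {
            if (Math.abs(timestampsMs[i] - resultTs) > windowMs) {
                return false;
            }
            resultTs = Math.max(resultTs, timestampsMs[i]);
        }
        return true;
    }

    // Single window: every record must lie within windowMs of every other one,
    // which is equivalent to max - min <= windowMs.
    static boolean singleWindowMatches(long[] timestampsMs, long windowMs) {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (long t : timestampsMs) {
            min = Math.min(min, t);
            max = Math.max(max, t);
        }
        return max - min <= windowMs;
    }

    public static void main(String[] args) {
        long[] ts = {0, 6_000, 12_000, 18_000}; // stream1..stream4, in ms
        long window = 10_000;
        System.out.println("chained joins match: " + chainedJoinMatches(ts, window));   // true
        System.out.println("single window matches: " + singleWindowMatches(ts, window)); // false
    }
}
```

With the records in stream order, the chain accepts a key whose records span 18 seconds, which a single 10-second window would reject. Joining the same records in a different stream order, so that the timestamps arrive as {0, 18 000, 6 000, 12 000}, makes the chain reject the key, which is the order dependence the question describes.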
Is there a better way to do multi-way joins with Kafka?
3 Answers

If you merge all the streams, you will get the result you want. See this tutorial for how to do it.

Use the merge function to combine the input streams. It creates a new stream that represents all of the events from its inputs.
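The merge approach amounts to interleaving all inputs into one stream and then collecting the values per key. The sketch below is plain Java with hypothetical names and no Kafka dependency; in Kafka Streams the collecting step would presumably be a grouping and aggregation after `merge`, an assumption not spelled out in the answer. Keys that are missing from some inputs still produce a result, which is what makes this resemble an outer join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of "merge, then group by key" for streams sharing one value type.
class MergeThenAggregate {

    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Merge: concatenate the inputs (a real merge interleaves them by arrival time).
    static List<Rec> merge(List<List<Rec>> streams) {
        List<Rec> merged = new ArrayList<>();
        for (List<Rec> s : streams) merged.addAll(s);
        return merged;
    }

    // Aggregate: collect every value seen for a key, whichever stream it came from.
    static Map<String, List<String>> aggregateByKey(List<Rec> merged) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (Rec r : merged) byKey.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.value);
        return byKey;
    }

    static Map<String, List<String>> demo() {
        List<Rec> s1 = Arrays.asList(new Rec("a", "a1"), new Rec("b", "b1"));
        List<Rec> s2 = Arrays.asList(new Rec("a", "a2"));
        List<Rec> s3 = Arrays.asList(new Rec("a", "a3"), new Rec("c", "c3"));
        return aggregateByKey(merge(Arrays.asList(s1, s2, s3)));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {a=[a1, a2, a3], b=[b1], c=[c3]}
    }
}
```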


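This works if all the streams have the same value type, but that is a special case of an outer join. In the more general case, the streams being outer joined can have different value types. - Kkkev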
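One common workaround for differing value types (an assumption, not something proposed in this thread) is to wrap every value in a shared envelope type that records its source stream, so that all inputs have the same type and can be merged. The plain-Java sketch below uses hypothetical names; in Kafka the envelope would also need its own serializer and deserializer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical envelope approach: tag each value with its source stream index so that
// streams of different value types share one type; a per-key row of slots then acts
// as the outer-join result (a null slot means no value from that stream yet).
class TaggedMerge {

    static final class Tagged {
        final String key;
        final int source;   // 0-based index of the originating stream
        final Object value; // stream-specific type, interpretable via the index
        Tagged(String key, int source, Object value) {
            this.key = key;
            this.source = source;
            this.value = value;
        }
    }

    static Map<String, Object[]> joinRows(List<Tagged> merged, int streamCount) {
        Map<String, Object[]> rows = new LinkedHashMap<>();
        for (Tagged t : merged) {
            rows.computeIfAbsent(t.key, k -> new Object[streamCount])[t.source] = t.value;
        }
        return rows;
    }

    static Map<String, Object[]> demo() {
        List<Tagged> merged = new ArrayList<>();
        merged.add(new Tagged("a", 0, 42));      // stream1 carries Integer values
        merged.add(new Tagged("a", 1, "hello")); // stream2 carries String values
        merged.add(new Tagged("b", 2, 3.5));     // stream3 carries Double values
        return joinRows(merged, 3);
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Object[]> e : demo().entrySet()) {
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
        }
        // a -> [42, hello, null]
        // b -> [null, null, 3.5]
    }
}
```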

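In the end, I decided to write a custom lightweight joiner that avoids materialization and strictly honors the expiry time. On average it should be O(1) per update. It fits the Consumer API better than the Streams API: for each consumer, poll repeatedly and update the joiner with any data received; if the joiner returns a complete set of values, forward it. Here is the code: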
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Inner joins multiple streams of data by key into one stream. It is assumed
 * that a key will appear in a stream exactly once. The values associated with
 * each key are collected and if all values are received within a certain
 * maximum wait time, the joiner returns all values corresponding to that key.
 * If not all values are received in time, the joiner never returns any values
 * corresponding to that key.
 * <p>
 * This class is not thread safe: all calls to
 * {@link #update(Object, Object, long)} must be synchronized.
 * @param <K> The type of key.
 * @param <V> The type of value.
 */
class StreamInnerJoiner<K, V> {

    private final Map<K, Vals<V>> idToVals = new LinkedHashMap<>();
    private final int joinCount;
    private final long maxWait;

    /**
     * Creates a stream inner joiner.
     * @param joinCount The number of streams being joined.
     * @param maxWait The maximum amount of time after an item has been seen in
     * one stream to wait for it to be seen in the remaining streams.
     */
    StreamInnerJoiner(final int joinCount, final long maxWait) {
        this.joinCount = joinCount;
        this.maxWait = maxWait;
    }

    private static class Vals<A> {
        final long firstSeen;
        final Collection<A> vals = new ArrayList<>();
        private Vals(final long firstSeen) {
            this.firstSeen = firstSeen;
        }
    }

    /**
     * Updates this joiner with a value corresponding to a key.
     * @param key The key.
     * @param val The value.
     * @param now The current time.
     * @return If all values for the specified key have been received, the
     * complete collection of values for that key; otherwise
     * {@link Optional#empty()}.
     */
    Optional<Collection<V>> update(final K key, final V val, final long now) {
        expireOld(now - maxWait);
        final Vals<V> curVals = getOrCreate(key, now);
        curVals.vals.add(val);
        return expireAndGetIffFull(key, curVals);
    }

    private Vals<V> getOrCreate(final K key, final long now) {
        final Vals<V> existingVals = idToVals.get(key);
        if (existingVals != null)
            return existingVals;
        else {
            /*
            Note: we assume that the item with the specified ID has not already
            been seen and timed out, and therefore that its first seen time is
            now. If the item has in fact already timed out, it is doomed and
            will time out again with no ill effect.
             */
            final Vals<V> curVals = new Vals<>(now);
            idToVals.put(key, curVals);
            return curVals;
        }
    }

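    private void expireOld(final long expireBefore) {
        // LinkedHashMap iterates in insertion order, which equals firstSeen order as
        // long as `now` never decreases, so iteration can stop at the first entry
        // that has not expired yet.
        final Iterator<Vals<V>> i = idToVals.values().iterator();
        while (i.hasNext() && i.next().firstSeen < expireBefore)
            i.remove();
    }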

    private Optional<Collection<V>> expireAndGetIffFull(final K key, final Vals<V> vals) {
        if (vals.vals.size() == joinCount) {
            // as all expired entries were already removed, this entry is valid
            idToVals.remove(key);
            return Optional.of(vals.vals);
        } else
            return Optional.empty();
    }
}
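The joiner above implements an inner join and drops keys that do not complete in time, while the question asks for an outer join. The following is a hypothetical outer-join variant (not from the answer) that keeps one slot per stream index and, instead of discarding an expired key, emits it with null for the streams that never delivered a value. The `demo` method stands in for the consumer poll loop the answer describes; its records and timestamps are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Hypothetical outer-join variant of StreamInnerJoiner. Each key gets one slot per
 * stream; a row is emitted as soon as all slots are filled, or with null slots once
 * maxWait has elapsed. Assumes non-null values and non-decreasing `now`; not thread safe.
 */
class StreamOuterJoiner<K> {

    private static final class Row {
        final long firstSeen;
        final Object[] slots;
        int filled;
        Row(long firstSeen, int joinCount) {
            this.firstSeen = firstSeen;
            this.slots = new Object[joinCount];
        }
    }

    private final Map<K, Row> rows = new LinkedHashMap<>(); // insertion order == firstSeen order
    private final int joinCount;
    private final long maxWait;
    private final BiConsumer<K, Object[]> sink; // receives complete or expired rows

    StreamOuterJoiner(int joinCount, long maxWait, BiConsumer<K, Object[]> sink) {
        this.joinCount = joinCount;
        this.maxWait = maxWait;
        this.sink = sink;
    }

    /** Records the value that stream number `source` (0-based) delivered for `key`. */
    void update(K key, int source, Object val, long now) {
        expireOld(now);
        Row row = rows.get(key);
        if (row == null) {
            row = new Row(now, joinCount);
            rows.put(key, row);
        }
        if (row.slots[source] == null) row.filled++; // a repeat from the same stream overwrites
        row.slots[source] = val;
        if (row.filled == joinCount) {
            rows.remove(key);
            sink.accept(key, row.slots);
        }
    }

    /** Emits, rather than discards, rows first seen more than maxWait before `now`. */
    void expireOld(long now) {
        Iterator<Map.Entry<K, Row>> it = rows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<K, Row> e = it.next();
            if (e.getValue().firstSeen >= now - maxWait) break;
            K key = e.getKey();
            Row row = e.getValue();
            it.remove();
            sink.accept(key, row.slots);
        }
    }

    /** Stand-in for a consumer poll loop: (key, stream index, value, time in ms). */
    static List<String> demo() {
        List<String> out = new ArrayList<>();
        StreamOuterJoiner<String> joiner = new StreamOuterJoiner<String>(3, 10_000,
                (k, slots) -> out.add(k + "=" + Arrays.toString(slots)));
        joiner.update("a", 0, 1, 0);
        joiner.update("b", 0, 2, 1_000);
        joiner.update("a", 1, "x", 2_000);
        joiner.update("a", 2, 3.0, 4_000);  // all three slots filled: "a" is emitted
        joiner.update("c", 1, "y", 15_000); // "b" is older than 10 s: emitted with nulls
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a=[1, x, 3.0], b=[2, null, null]]
    }
}
```

Because each slot is tied to a stream index, a repeated value from the same stream overwrites the earlier one instead of being counted twice, and the caller can tell which stream each value came from. A key still pending when input stops (here "c") is only emitted by a later call, so a real poll loop would probably also call `expireOld` on polls that return no records.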

Page content provided by Stack Overflow; the original English version is available at the source link.