如何交错流(带有反压)

11

假设我有两个可能是无限的流:

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...

我希望将流合并,然后使用稍微慢一些的异步操作来映射已合并的流(例如,在Bacon中使用fromPromiseflatMapConcat)。

我可以使用merge将它们组合在一起:

me = a12b3.c45d6.7e...

接着进行映射

s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..

正如您所看到的,贪心s2流在长期内得到了优势。 这种行为是不希望的


合并行为是不可取的,因为我想要一些反压机制来实现更多交替、"公平"、"轮询"的合并。以下是一些期望的行为示例:

s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...

s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...

一种思考方式是,s1s2将任务发送给只能处理一项任务的工作线程。使用 mergeflatMapConcat,我将得到一个贪婪的任务管理器,但我想要更加公平的解决方案。


我希望找到一个简单而优雅的解决方案。如果可以轻松地为任意数量的流进行推广,那就太好了。

// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);

使用RxJS或其他Rx库的解决方案也可以。


澄清

不是zipAsArray

我不想要:

function roundRobinPromiseMap(streams, f) {
  return Bacon.zipAsArray.apply(null, streams)
    .flatMap(Bacon.fromArray)
    .flatMapConcat(function (x) {
      return Bacon.fromPromise(f(x));
    });
}

比较示例弹珠图:

s1  = a.....b..............c.......
s2  = ..1.2.3......................
mm  = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based

是的,我会遇到缓冲问题

...但使用简单的不公平的方式也会遇到同样的问题:

function greedyPromiseMap(streams, f) {
  Bacon.mergeAll(streams).flatMapConcat(function (x) {
    return Bacon.fromPromise(f(x));
  });
}

大理石图

s1    = a.........b..........c...
s2    = ..1.2.3..................
mm    = a...1...2...b...3....c...
merge = a...1...2...3...b....c...

4
据我记得,RxJS有像bufferWithTimeOrCount这样的东西。Bacon建议在您的情况下,从推式(push)转向拉式(pull),使用可迭代对象(iterable)而不是流(streams) - 相关的Bacon问题引用了Eric Meijer的话,他说你不应该使用可观察对象(observables)来处理快速产生数据源和慢速消费者的情况 - 我倾向于同意这个看法。我可以编写一个生成合并流的答案,但老实说,我不太理解您的交错逻辑 - 如果您能澄清一下,我可以概括一下要点。 - Benjamin Gruenbaum
1
我也不明白您所需的交错方式。在第一个示例中,我假设您在memm之间使用concatMap。在这种情况下,是的,s2会发挥作用。但是,您可以使用flatMap来允许并发。我认为在Bacon中,这是flatMapWithConcurrencyLimit()。另一种解决方案是分别在s1flatMap承诺,并在s2flatMap承诺,而不合并,因为合并在任何情况下都具有"或"语义。取决于您想要实现什么。 - André Staltz
第一个例子是一种不希望的行为。我在问题中已经明确说明了。 - phadej
我一开始就理解了。那么我的上面的评论不是解决了这个问题吗? - André Staltz
@OlegGrenrus 我仍然不理解你的公平标准 - 你想要一个合并,允许从s1最多Y个事件,对于s2的每X个事件,反之亦然? - Benjamin Gruenbaum
2个回答

2

这是一段疯狂的代码,可能会有所帮助。

它将输入流转换为单个“值”事件流,然后与“发送”事件(和“结束”事件用于记录)合并。然后,使用状态机,它从“值”事件中构建队列,并在“发送”事件上分派值。

最初我写了一个roundRobinThrottle,但现在我已经将其移动到了gist中。

这里有一个非常相似的roundRobinPromiseMap。虽然gist中的代码经过测试,但此代码尚未经过测试。

# roundRobinPromiseMap :: (a -> Promise b) -> [EventStream] -> EventStream
roundRobinPromiseMap = (promiser, streams) ->
    # A bus to trigger new sends based on promise fulfillment
    promiseFulfilled = new Bacon.Bus()

    # Merge the input streams into a single, keyed stream
    theStream = Bacon.mergeAll(streams.map((s, idx) ->
        s.map((val) -> {
            type: 'value'
            index: idx
            value: val
        })
    ))
    # Merge in 'end' events
    .merge(Bacon.mergeAll(streams.map((s) ->
        s.mapEnd(-> {
            type: 'end'
        })
    )))
    # Merge in 'send' events that fire when the promise is fulfilled.
    .merge(promiseFulfilled.map({ type: 'send' }))
    # Feed into a state machine that keeps queues and only creates
    # output events on 'send' input events.
    .withStateMachine(
        {
            queues: streams.map(-> [])
            toPush: 0
            ended: 0
        }
        handleState

    )
    # Feed this output to the promiser
    theStream.onValue((value) ->
        Bacon.fromPromise(promiser(value)).onValue(->
            promiseFulfilled.push()
    ))

handleState = (state, baconEvent) ->
    outEvents = []

    if baconEvent.hasValue()
        # Handle a round robin event of 'value', 'send', or 'end'
        outEvents = handleRoundRobinEvent(state, baconEvent.value())
    else
        outEvents = [baconEvent]

    [state, outEvents]

handleRoundRobinEvent = (state, rrEvent) ->
    outEvents = []

    # 'value' : push onto queue
    if rrEvent.type == 'value'
        state.queues[rrEvent.index].push(rrEvent.value)
    # 'send' : send the next value by round-robin selection
    else if rrEvent.type == 'send'
        # Here's a sentinel for empty queues
        noValue = {}
        nextValue = noValue
        triedQueues = 0

        while nextValue == noValue && triedQueues < state.queues.length
            if state.queues[state.toPush].length > 0
                nextValue = state.queues[state.toPush].shift()
            state.toPush = (state.toPush + 1) % state.queues.length
            triedQueues++
        if nextValue != noValue
            outEvents.push(new Bacon.Next(nextValue))
    # 'end': Keep track of ended streams
    else if rrEvent.type == 'end'
        state.ended++

    # End the round-robin stream if all inputs have ended
    if roundRobinEnded(state)
        outEvents.push(new Bacon.End())

    outEvents

roundRobinEnded = (state) ->
    emptyQueues = allEmpty(state.queues)
    emptyQueues && state.ended == state.queues.length

allEmpty = (arrays) ->
    for a in arrays
        return false if a.length > 0
    return true

2
这里的核心挑战是要理解如何形式化“公平性”。在问题中,我已经提到了工人类比。结果证明,显然的公平标准是选择生成事件比其他流少的流,或者更进一步:等待时间更短的流。
之后,使用指示语义来形式化所需的输出相当简单: 代码在GitHub上 我没有时间开发指示组合器以包括Bacon.js中的withStateMachine,因此下一步是直接使用JavaScript在Bacon.js中重新实现它。整个可运行解决方案可以在gist上找到。
这个想法是用:
  • 每个流的成本和队列作为状态
  • 流和附加反馈流作为输入
来制作一个状态机。
由于整个系统的输出被反馈,所以我们可以在前一个flatMapped流结束时出队下一个事件。
为此,我必须编写一个有点丑陋的rec组合器。
function rec(f) {
  var bus = new Bacon.Bus();
  var result = f(bus);
  bus.plug(result);
  return result;
}

它的类型是(EventStream a -> EventStream a) -> EventStream a,该类型类似于其他递归组合器,例如fix

可以通过改进系统范围行为来实现更好的效果,因为Bus会破坏取消订阅传播。我们需要解决这个问题。

第二个辅助函数是stateMachine,它接受流数组并将它们转换为单个状态机。本质上,它是.withStateMachine ∘ mergeAll ∘ zipWithIndex

function stateMachine(inputs, initState, f) {
  var mapped = inputs.map(function (input, i) {
    return input.map(function (x) {
      return [i, x];
    })
  });
  return Bacon.mergeAll(mapped).withStateMachine(initState, function (state, p) {
    if (p.hasValue()) {
      p = p.value();
      return f(state, p[0], p[1]);
    } else {
      return [state, p];
    }
  });
}

使用这两个帮助程序,我们可以编写一个不太复杂的公平调度器:
function fairScheduler(streams, fn) {
  var streamsCount = streams.length;
  return rec(function (res) {
    return stateMachine(append(streams, res), initialFairState(streamsCount), function (state, i, x) {
      // console.log("FAIR: " + JSON.stringify(state), i, x);

      // END event
      if (i == streamsCount && x.end) {
        var additionalCost = new Date().getTime() - x.started;

        // add cost to input stream cost center
        var updatedState = _.extend({}, state, {
          costs: updateArray(
            state.costs,
            x.idx, function (cost) { return cost + additionalCost; }),
        });

        if (state.queues.every(function (q) { return q.length === 0; })) {
          // if queues are empty, set running: false and don't emit any events
          return [_.extend({}, updatedState, { running: false }), []];
        } else {
          // otherwise pick a stream with
          // - non-empty queue
          // - minimal cost
          var minQueueIdx = _.chain(state.queues)
            .map(function (q, i) {
              return [q, i];
            })
            .filter(function (p) {
              return p[0].length !== 0;
            })
            .sortBy(function (p) {
              return state.costs[p[1]];
            })
            .value()[0][1];

          // emit an event from that stream
          return [
            _.extend({}, updatedState, {
              queues: updateArray(state.queues, minQueueIdx, function (q) { return q.slice(1); }),
              running: true,
            }),
            [new Bacon.Next({
              value: state.queues[minQueueIdx][0],
              idx: minQueueIdx,
            })],
          ];
        }
      } else if (i < streamsCount) {
        // event from input stream
        if (state.running) {
          // if worker is running, just enquee the event
          return [
            _.extend({}, state, {
              queues: updateArray(state.queues, i, function (q) { return q .concat([x]); }),
            }),
            [],
          ];
        } else {
          // if worker isn't running, start it right away
          return [
            _.extend({}, state, {
              running: true,
            }),
            [new Bacon.Next({ value: x, idx: i})],
          ]
        }
      } else {
        return [state, []];
      }

    })
    .flatMapConcat(function (x) {
      // map passed thru events,
      // and append special "end" event
      return fn(x).concat(Bacon.once({
        end: true,
        idx: x.idx,
        started: new Date().getTime(),
      }));
    });
  })
  .filter(function (x) {
    // filter out END events
    return !x.end;
  })
  .map(".value"); // and return only value field
}

代码片段中其余部分相当直接。


你是用Haskell编写并转换成JS的吗?要理解所有这些需要什么,抱歉我是新手,正在尝试学习FP、Haskell和单子等。 - user3995789
我使用Haskell作为思考工具。我按类型思考,而Haskell帮助我正确地理解它们。事实上,在制作JavaScript版本时,我不得不经常使用console.log来确保不会弄乱数据。 - phadej

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接