如何在Dart中创建一个StreamTransformer?

26
尝试构建自定义StreamTransformer类,但是很多现有的示例似乎已经过时了,在文档中找到的一个也不像一些类型化语言可能认为的那样作为一个类(在这里找到:https://api.dartlang.org/apidocs/channels/stable/dartdoc-viewer/dart:async.StreamTransformer)。这似乎不像Dart风格的方法,而更像JavaScript风格的方法(我使用Dart来避免这种情况)。
许多在线资源都说这就是创建StreamTransformer的方法,但是在扩展它时会出现错误。
class exampleStreamTransformer extends StreamTransformer
{
  //... (This won't work)
}

'实现'似乎是正确的方式,需要实现所需的绑定函数:

class exampleStreamTransformer implements StreamTransformer
{
  Stream bind(Stream stream)
  {
    //... (Go on to return new stream, etc)
  }
}

我似乎找不到这种方法的任何示例,但我已经自己创建了一些(它可以在我的IDE中通过,但在运行时会出现空对象错误,当尝试使用暂停getter时):

class exampleStreamTransformer implements StreamTransformer
{
  StreamController<String> _controller;
  StreamSubscription<String> _subscription;

  Stream bind(Stream stream)
  {
    _controller = new StreamController<String>(
        onListen: ()
        {
          _subscription = stream.listen((data)
          {
            // Transform the data.
            _controller.add(data);
          },
          onError: _controller.addError,
          onDone: _controller.close,
          cancelOnError: true); // Unsure how I'd pass this in?????
        },
        onPause: _subscription.pause,
        onResume: _subscription.resume,
        onCancel: _subscription.cancel,
        sync: true
    );

    return _controller.stream;
  }
}

希望能以“打字”的方式来实现这个类,非常感谢您的帮助,谢谢。

2
我同意 StreamTransformer 是一个不必要的类 - 它只包含一个函数,因此您可以在传递对象的任何地方直接传递该函数。如果现在将其添加到库中,它只会成为一个函数类型。 - lrn
5个回答

22

为什么不使用StreamTransformer.fromHandler()

import 'dart:async';

void handleData(data, EventSink sink) {
  sink.add(data*2);
}

void main() {
  StreamTransformer doubleTransformer = new StreamTransformer.fromHandlers(handleData: handleData);

  StreamController controller = new StreamController();
  controller.stream.transform(doubleTransformer).listen((data) {
    print('data: $data');
  });

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

输出:

data: 2
data: 4
data: 6

1
因为我需要一个类。内容需要缓冲等,这很复杂。无论如何,谢谢。 - Will Squire
1
然后创建一个全局缓冲变量,缓冲数据并根据需要将数据写入到目标位置? - Robert
3
不,那是本质上的坏主意。如果我想要多次使用这个,怎么办?拥有全局缓冲区意味着两个流可能会向相同的位置缓存,否则每次都必须在函数范围之外创建一个单独的变量。缺乏封装性和代码可维护性。只需调用.transform(new exampleStreamTransformer()); +即可缩短,但我对语言约定感到非常困扰。这是一个旧的(已弃用的)示例:http://victorsavkin.com/post/51233496661/using-streams-for-composing-complex-event-based - Will Squire
“全局变量”本质上是一个不好的想法。 - DarkNeuron

12
好的。这里有一个可行的例子:
import 'dart:async';

class DuplicateTransformer<S, T> implements StreamTransformer<S, T> {
  StreamController _controller;

  StreamSubscription _subscription;

  bool cancelOnError;

  // Original Stream
  Stream<S> _stream;

  DuplicateTransformer({bool sync: false, this.cancelOnError}) {
    _controller = new StreamController<T>(onListen: _onListen, onCancel: _onCancel, onPause: () {
      _subscription.pause();
    }, onResume: () {
      _subscription.resume();
    }, sync: sync);
  }

  DuplicateTransformer.broadcast({bool sync: false, bool this.cancelOnError}) {
    _controller = new StreamController<T>.broadcast(onListen: _onListen, onCancel: _onCancel, sync: sync);
  }

  void _onListen() {
    _subscription = _stream.listen(onData,
      onError: _controller.addError,
      onDone: _controller.close,
      cancelOnError: cancelOnError);
  }

  void _onCancel() {
    _subscription.cancel();
    _subscription = null;
  }

  /**
   * Transformation
   */

  void onData(S data) {
    _controller.add(data);
    _controller.add(data); /* DUPLICATE EXAMPLE!! REMOVE FOR YOUR OWN IMPLEMENTATION!! */
  }

  /**
   * Bind
   */

  Stream<T> bind(Stream<S> stream) {
    this._stream = stream;
    return _controller.stream;
  }
}

void main() {
  // Create StreamController
  StreamController controller = new StreamController.broadcast();
  // Transform
  Stream s = controller.stream.transform(new DuplicateTransformer.broadcast());

  s.listen((data) {
    print('data: $data');
  }).cancel();

  s.listen((data) {
    print('data2: $data');
  }).cancel();

  s.listen((data) {
    print('data3: $data');
  });

  // Simulate data

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

让我添加一些注释:

  • 当查看其他Dart内部转换器的源代码时,使用implements似乎是正确的方法。
  • 我为常规流和广播流实现了两个版本。
  • 对于常规流,您可以直接在新的流控制器上调用cancel/pause/resumt,因为我们只能监听一次。
  • 如果您使用广播流,则发现只有在没有人监听该流时才会调用listen()。 onCancel的行为相同。如果最后一个订阅者取消其订阅,则会调用onCancel。这就是为什么在此处使用相同函数是安全的原因。

是的,这看起来是一个更好的实现!谢谢,我稍后会试一下。 - Will Squire
感谢您的实现,这非常有帮助 :) - Daniel Leiszen
我意识到这已经是古老的问题了,也许我很蠢,但是关于cancelOnError是否存在抽象问题不是吗?它似乎需要提供实现,但在创建转换器时可能无法访问该变量:它可能会在更下游的地方提供,在那里调用listen的责任就在那里。 - DarkNeuron

10
map不同,transformer更加强大,可以让你维护内部状态,并在任何时候发出值。它可以实现map无法完成的功能,例如延迟、复制值、有选择地省略某些值等。
基本上,这个实现需要一个提供基于传入的旧流的新流的bind方法,以及一个在运行时帮助进行类型检查的cast方法。
下面是一个过度简化的示例,演示如何实现将整数值流转换为求和流的“TallyTransformer”。例如,如果输入流到目前为止已经有了1、1、1、-2、0、...,输出流将是1、2、3、1、1、...,即对到目前为止的所有输入求和。
示例用法:stream.transform(TallyTransformer())
class TallyTransformer implements StreamTransformer {
  StreamController _controller = StreamController();
  int _sum = 0; // sum of all values so far

  @override
  Stream bind(Stream stream) {
    // start listening on input stream
    stream.listen((value) {
      _sum += value; // add the new value to sum
      _controller.add(_sum); // emit current sum to our listener
    });
    // return an output stream for our listener
    return _controller.stream;
  }

  @override
  StreamTransformer<RS, RT> cast<RS, RT>() {
    return StreamTransformer.castFrom(this);
  }
}

这个例子过于简化(但仍然有效),没有涵盖流暂停、恢复或取消等情况。如果遇到 "Stream has already been listened" 错误,请确保流正在广播。


7

https://github.com/dart-lang/sdk/issues/27740#issuecomment-258073139

You can use StreamTransformer.fromHandlers to easily create transformers that just convert input events to output events.

Example:

new StreamTransformer.fromHandlers(handleData: (String event, EventSink output) {
  if (event.startsWith('data:')) {
    output.add(JSON.decode(event.substring('data:'.length)));
  } else if (event.isNotEmpty) {
    output.addError('Unexpected data from CloudBit stream: "$event"');
  }
});

1
如果您想使用类似于这样的函数简单地转换值
int handleData(int data) {
  return data * 2;
}

使用 Stream 的 map 方法

stream
  .map(handleData)
  .listen((data) {
    print('data: $data');
  });

完整的例子:
import 'dart:async';

int handleData(int data) {
  return data * 2;
}

void main() {
  final controller = StreamController<int>();

  controller.stream
    .map(handleData)
    .listen((data) {
      print('data: $data');
    });

  controller.add(1);
  controller.add(2);
  controller.add(3);
}

在 dart.dev 上查看 更多示例


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接