Java Streams API 的 Javascript 等价物

18

我喜欢Java 8的流API,它有许多有用的中间方法和终端方法可以转换和收集流。我说的是像distinct()这样的中间方法或者像collect()这样的终端方法。我发现Collector API特别有用,可以将流减少到深度分组映射。

JavaScript中有没有类似于Java流API的等效物? 我知道有基本的函数,如mapfilterreduce,但没有找到JavaScript本机提供的更通用的接口来查询或分组集合中的数据。是否有一些生产就绪的库来匹配Java流API?


4
在Stack Overflow上,询问我们推荐或寻找书籍、工具、软件库、教程或其他外部资源的问题是不符合主题的,因为它们往往会吸引持有个人意见或垃圾信息的回答。 - T.J. Crowder
这是关于两年前的 stream.js 的一些信息:https://www.reddit.com/r/programming/comments/2tp90e/streamjs_java_8_streams_api_ported_to_javascript/ - mplungjan
2
你可以根据需求使用 RxJS - Olivier Boissé
1
在JS中没有Java流API的等效物。 - ZhekaKozlov
Lodash将涵盖大多数情况:https://lodash.com/docs/ - dashton
3个回答

15

5

从API层面来看,lodash/RxJS/stream.js可能能够满足要求,但是Java Stream的强大之处在于它可以利用现代CPU多核架构来并行处理任务。然而,这些纯JavaScript库都没有解决这个问题,最终这些JavaScript代码仍在单线程运行时中运行,并且同时只使用1个核。

我猜测JavaScript引擎需要提供支持才能实现性能目标。


1
JVM具有运行时优化功能,可以在运行时重写或重构代码。有时这意味着多个函数被挤在一起,因为它们总是一起执行。这使得Java代码独立于平台。- 这一直是Java的关键思想和设计原则。- 然而,现在情况变得棘手了,因为开发人员实际上必须考虑代码将在哪个平台上运行。他必须知道系统将有多少CPU核心。这很重要,因为如果在单核系统上使用“parallel”,实际上会降低性能。 - bvdb
此外,在Java中,线程并不像看起来那样经常并行运行。它们经常被阻塞(网络和磁盘I/O或锁定)。在Node.js中,几乎不可能编写会阻塞的代码。每当有磁盘I/O时,您必须创建一个新任务来处理结果。这意味着任务永远不会在中途被打断。没有等待线程,只有预定的任务,这些任务成本更低。- 无论如何,在此期间,Java(例如Spring WebFlux)也支持此概念。但是,它从未像Node.js那样强制执行它,并且因为它不是语言特性,所以需要更多的代码。 - bvdb

3

JavaScript没有并行处理,因此流始终是顺序的,收集器不需要组合器。

我在这里试图模仿JavaScript中的Stream API,但去掉了几个功能。尽管如此,我认为它具有关键特性。

由于你专注于Collectors,所以我添加了一个Collector类,它具有构造函数和静态方法,大致对应于Java的Collectors(复数)接口。 Stream类具有一个构造函数,它接受一个可迭代对象。它具有与Java中大多数静态方法相同的方法,但在涉及reduceiterator时有一些变化,以使其更符合JavaScript的实践。

还包括一个Stream.Builder类。

最后,此代码片段运行了几个示例。如果您熟悉Java API,则会觉得非常熟悉:

class Collector {
    constructor(supplier, accumulator, finisher=a => a) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.finisher = finisher;
    }
    static toArray() {
        return self._toArray ??= new Collector(Array, (acc, item) => (acc.push(item), acc));
    }
    static toSet() {
        return self._toSet ??= new Collector(() => new Set, (acc, item) => acc.add(item));
    }
    static toMap(keyMapper, valueMapper) {
        return new Collector(() => new Map, (acc, item) => acc.add(keyMapper(item), valueMapper(item)));
    }
    static toObject(keyMapper, valueMapper) {
        return new Collector(Object, (acc, item) => acc[keyMapper(item)] = valueMapper(item));
    }
    static averaging(mapper=a => a) {
        return new Collector(
            () => [0, 0], 
            ([sum, count], value) => [sum + mapper.call(value, value), count + 1],
            ([sum, count]) => sum / count
        );
    }
    static collectingAndThen({supplier, accumulator, finisher}, nextFinisher) {
        return new Collector(
            supplier, 
            accumulator,
            value => (prev => nextFinisher.call(prev, prev))(finisher.call(value, value))
        );
    }
    static counting() {
        return this._counting ??= new Collector(Number, (count, value) => count + 1);
    }
    static summing(mapper=Number) {
        return new Collector(Number, (sum, value) => sum + mapper.call(value, value));
    }
    static joining(delimiter=",", prefix="", postfix="") {
        return this.collectingAndThen(Collector.toArray(), arr => prefix + arr.join(delimiter) + postfix);
    }
    // No implementation of partitioningBy, as it is the same as groupingBy with a boolean classifier
    static groupingBy(classifier, {supplier, accumulator, finisher} = Collector.toArray()) {
        return new Collector(
            () => new Map,
            (map, value) => {
                const key = classifier.call(value, value);
                let result = map.get(key) ?? supplier();
                return map.set(key, accumulator(result, value));
            },
            map => {
                map.forEach((value, key) => map.set(key, finisher.call(value, value)));
                return map;
            }
        );
    }
    static mapping(mapper, {supplier, accumulator, finisher}) {
        return new Collector(
            supplier,
            (acc, value) => accumulator(acc, mapper.call(value, value)),
            finisher
        );
    }
    static maxBy(comparator) {
        return new Collector(
            () => undefined,
            (acc, value) => acc === undefined || comparator(acc, value) < 0 ? value : acc
        );
    }
    static minBy(comparator) {
        return new Collector(
            () => undefined,
            (acc, value) => acc === undefined || comparator(acc, value) > 0 ? value : acc
        );
    }
    static reducing(binaryOperator, identity, mapper=a => a) {
        return new Collector(
            () => identity,
            (acc, value) => acc === undefined ? mapper.call(value, value) : binaryOperator(acc, mapper.call(value, value))
        );
    }
}

class Stream {
    static of(...args) {
        return new Stream(args);
    }
    static fromGenerator(generator, ...args) {
        return new Stream(generator.call(null, ...args));
    }
    static empty() {
        return this.of();
    }
    static Builder = class Builder {
        _items = [];
        // Chainable
        add(item) {
            this.accept(item);
            return this;
        }
        // Other
        accept(item) {
            if (!this._items) throw new ValueError("The builder has already transitioned to the built state");
            this._items.push(item);
        }
        build() {
            if (!this._items) throw new ValueError("The builder has already transitioned to the built state");
            let {_items} = this;
            this._items = null;
            return new Stream(_items);
        }
    }
    static builder() {
        return new this.Builder();
    }
    static iterate(value, produceNextFromLast) {
        return this.fromGenerator(function* () {
            yield value;
            while (true) yield value = produceNextFromLast.call(value, value);
        });
    }
    static generate(produceNext) {
        return this.fromGenerator(function* () {
            while (true) yield produceNext();
        });
    }
    static concat(a, b) {
        return this.fromGenerator(function* () {
            yield* a;
            yield* b;
        });
    }
    static range(start, end) {
        return this.fromGenerator(function* () {
            while (start < end) yield start++;
        });
    }
    static rangeClosed(start, last) {
        return this.range(start, last + 1);
    }

    constructor(iterable) {
        this._iterable = iterable;
    }
    // Intermediate (Chainable, pipe) methods
    _chain(generator) { // Private helper method
        return Stream.fromGenerator(generator, this);
    }
    filter(predicate) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                if (predicate.call(item, item)) yield item;
            }
        });
    }
    distinct() {
        const set = new Set;
        return this.filter(item => !set.has(item) && set.add(item));
    }
    map(mapper) {
        return this._chain(function* (previous) {
            for (const item of previous) yield mapper.call(item, item);
        });
    }
    flatMap(mapper) {
        return this._chain(function* (previous) {
            for (const item of previous) yield* mapper.call(item, item);
        });
    }
    peek(action) { // Only for debugging
        return this._chain(function* (previous) {
            for (const item of previous) {
                action.call(item, item);
                yield item;
            }
        });
    }
    sorted(comparator=(a, b) => (a > b) - (a < b)) {
        return this._chain(function* (previous) {
            yield* [...previous].sort(comparator);
        });
    }
    dropWhile(predicate) {
        let active = false;
        return this.filter(item => active ||= !predicate.call(item, item));
    }
    skip(n) {
        return this.dropWhile(() => n > 0 && n--);
    }
    takeWhile(predicate) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                if (!predicate.call(item, item)) break;
                yield item;
            }
        });
    }
    limit(maxSize) {
        return this.takeWhile(() => maxSize > 0 && maxSize--);
    }
    // Terminal operations below: these do not return a Stream
    *[Symbol.iterator]() {  // Use JS symbol convention instead of "iterator" method
        const iterable = this._iterable;
        this.close();
        yield* iterable;
    }
    close() {
        if (!this._iterable) throw TypeError("stream has already been operated on or closed");
        this._iterable = null;
    }
    forEach(callback) {
        for (const item of this) callback.call(item, item);
    }
    toArray() {
        return [...this];
    }
    findFirst() {
        for (const item of this) return item;
    }
    allMatch(predicate) {
        for (const item of this) {
            if (!predicate.call(item, item)) return false;
        }
        return true;
    }
    anyMatch(predicate) {
        for (const item of this) {
            if (predicate.call(item, item)) return true;
        }
        return false;
    }
    noneMatch(predicate) {
        return !this.anyMatch(predicate);
    }
    collect(supplier, accumulator, finisher=a => a) {
        // Can be called with Collector instance as first argument
        if (arguments.length === 1) {
            ({supplier, accumulator, finisher} = supplier);
        }
        const reduced = this.reduce(accumulator, supplier());
        return finisher.call(reduced, reduced);
    }
    reduce(accumulator, initialValue) {  // interface like Array#reduce
        let done, result = initialValue;
        const iterator = this[Symbol.iterator]();
        if (arguments.length == 1) {
            ({done, value: result} = iterator.next());
            if (done) throw new TypeError("reduce of empty stream without initial value");
        }
        for (const item of iterator) {
            result = accumulator(result, item);
        }
        return result;
    }
    count() {
        return this.reduce(count => count + 1, 0);
    }
    max(comparator=(a, b) => (a > b) - (a < b)) {
        return this.reduce((a, b) => comparator(a, b) < 0 ? b : a);
    }
    min(comparator=(a, b) => (a > b) - (a < b)) {
        return this.reduce((a, b) => comparator(a, b) < 0 ? a : b);
    }
    sum() { // Will sum numbers or concatenate strings
        return this.reduce((a, b) => a + b, 0);
    }
    average() {
        return this.reduce(([count, sum], b) => [count + 1, sum + b], [0, 0])
                   .reduce((count, sum) => sum / count);
    }
}

// Some example uses....

const first = Stream.iterate(1, i => i + 1)
                 .flatMap(i => Stream.iterate(i, j => j + 100).limit(2))
                 .limit(4);

const second  = Stream.builder().add(9).add(8).add(7).build().peek(console.log);

console.log("concat", Stream.concat(first, second).toArray());
console.log("average", Stream.range(1, 10).average());
console.log("sum", Stream.range(1, 10).sum());
console.log("random", Stream.generate(Math.random).limit(10).toArray());
console.log("skip & limit", Stream.range(1, 10).skip(4).limit(4).toArray());
console.log("first", Stream.range(1, 10).findFirst());
console.log("min, max", Stream.of(..."fabulous").min(), Stream.of(..."fabulous").max());
console.log("count", Stream.range(1, 10).count());
console.log("distinct and sorted", Stream.of(1, 5, 1, 2, 4, 2).distinct().sorted().toArray());

class Person {
    constructor(name, department, salary) {
        this.name = name;
        this.department = department;
        this.salary = salary;
    }
    getName() { return this.name }
    getDepartment() { return this.department }
    getSalary() { return this.salary }
    toString() { return `Hi ${this.name}!` }
}

let people = [
    new Person("John", "reception", 1000), 
    new Person("Mary", "stocks", 1500),
    new Person("Bob", "stocks", 1400),
];
console.log(Stream.of(...people)
                  .map(Person.prototype.getName)
                  .collect(Collector.toArray()));
console.log(Stream.of(...people)
                  .map(Person.prototype.getName)
                  .collect(Collector.toSet()));
console.log(Stream.of(...people)
                  .collect(Collector.joining(", ")));
console.log(Stream.of(...people)
                  .collect(Collector.summing(Person.prototype.getSalary)));
console.log(...Stream.of(...people)
                     .collect(Collector.groupingBy(Person.prototype.getDepartment)));
console.log(...Stream.of(...people)
                     .collect(Collector.groupingBy(Person.prototype.getDepartment,
                                                   Collector.averaging(Person.prototype.getSalary))));
console.log(...Stream.of(...people)
                     .collect(Collector.groupingBy(person => person.getSalary() >= 1300)));

// Fibonnacci
console.log(Stream.iterate([0, 1], ([a, b]) => [b, a+b]) 
                  .map(([a]) => a)
                  .takeWhile(a => a < 30)
                  .dropWhile(a => a < 2)
                  .toArray());

// Accumulate object keys
console.log(Stream.range(0, 10).collect(Object, (acc, a) => Object.assign(acc, {[a]: 1}))); 

// Build complete binary tree in breadth-first order
console.log(Stream.iterate(0, i => i + 1)
    .limit(10)
    .collect(
        () => (root => [[root], root])({ left: "x" }), 
        (acc, value) => {
            let [level] = acc;
            level.push(level[0].left ? (level.shift().right = {value}) : (level[0].left = {value}))
            return acc;
        },
        acc => acc.pop().right
    )
);


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接