在Golang中实现递归函数生成器(yield)的惯用方式

48
[ 注意:我已阅读 Go语言中的Python风格生成器,本文内容与其不重复。]
在 Python/Ruby/JavaScript/ECMAScript 6 中,可以使用语言提供的 yield 关键字编写生成器函数。在 Go 中,可以使用 goroutine 和 channel 模拟。
以下代码展示了如何实现一个排列函数(abcd、abdc、acbd、acdb、...、dcba):
// $src/lib/lib.go

package lib

// private, starts with lowercase "p"
func permutateWithChannel(channel chan<- []string, strings, prefix []string) {
    length := len(strings)
    if length == 0 {
        // Base case
        channel <- prefix
        return
    }
    // Recursive case
    newStrings := make([]string, 0, length-1)
    for i, s := range strings {
        // Remove strings[i] and assign the result to newStringI
        // Append strings[i] to newPrefixI
        // Call the recursive case
        newStringsI := append(newStrings, strings[:i]...)
        newStringsI = append(newStringsI, strings[i+1:]...)
        newPrefixI := append(prefix, s)
        permutateWithChannel(channel, newStringsI, newPrefixI)
    }
}

// public, starts with uppercase "P"
func PermutateWithChannel(strings []string) chan []string {
    channel := make(chan []string)
    prefix := make([]string, 0, len(strings))
    go func() {
        permutateWithChannel(channel, strings, prefix)
        close(channel)
    }()
    return channel
}

这是如何使用的:

// $src/main.go

package main

import (
    "./lib"
    "fmt"
)

var (
    fruits  = []string{"apple", "banana", "cherry", "durian"}
    banned = "durian"
)

func main() {
    channel := lib.PermutateWithChannel(fruits)
    for myFruits := range channel {
        fmt.Println(myFruits)
        if myFruits[0] == banned {
            close(channel)
            //break
        }
    }
}

注意:

break 语句(如上所示)并不需要,因为 close(channel) 导致 range 在下一次迭代时返回 false,循环将终止。

问题

如果调用者不需要所有排列,它需要显式地 close() 通道,否则通道将在程序终止之前不会关闭(资源泄漏发生)。另一方面,如果调用者需要所有排列(即 range 循环到末尾),调用者必须不要 close() 通道。这是因为 close() 已经关闭的通道会导致运行时恐慌(请参阅规范中此处)。然而,如果确定是否应该停止的逻辑不像上面显示的那样简单,则最好使用 defer close(channel)

  1. 如何惯用地实现像这样的生成器?
  2. 惯用地,谁应该负责关闭通道 - 库函数还是调用者?
  3. 将我的代码修改为下面这样是否是一个好主意,以便调用者无论如何都要 defer close() 通道?

在库中,修改这个:

    go func() {
        permutateWithChannel(channel, strings, prefix)
        close(channel)
    }()

转换为:

    go permutateWithChannel(channel, strings, prefix)

在调用者中,修改这个:
func main() {
    channel := lib.PermutateWithChannel(fruits)
    for myFruits := range channel {
        fmt.Println(myFruits)
        if myFruits[0] == banned {
            close(channel)
        }
    }
}

转换为:

func main() {
    channel := lib.PermutateWithChannel(fruits)
    defer close(channel)    // <- Added
    for myFruits := range channel {
        fmt.Println(myFruits)
        if myFruits[0] == banned {
            break           // <- Changed
        }
    }
}

4. 尽管在执行上述代码时无法观察到,且算法的正确性不受影响,在调用者使用close()关闭通道后,运行库代码的goroutine应该在下一次迭代中尝试向已关闭的通道发送数据时引发panic(正如规范文档中所述),从而终止它。这会造成任何负面影响吗?
5. 库函数的签名是func(strings []string) chan []string。理想情况下,返回类型应为<-chan []string以将其限制为只读取。然而,如果是调用者负责close()通道,那么它就不能被标记为“只读取”,因为close()内置函数不适用于只读取通道。有什么惯用的处理方法?

1
我不确定通道和 goroutine 在这个目的上真正的习惯用语是什么;我发现尤其是那些刚接触 Go 的人往往对它们过于着迷,并在它们并不是特别有用的地方使用它们。对于生成器,我通常使用简单、无麻烦的闭包。 - LemurFromTheId
2
@Aedolon 注意,简单的闭包和生成器的工作方式不同。生成器只执行工作并每次调用时产生一个以上的值,同时保持状态。我不知道有什么方法可以获得一个闭包(不使用通道和goroutine)来阻塞直到再次调用。 - Adam Smith
1
我同意@AdamSmith的观点。此外,生成器的一个重要用例是使用阻塞I/O进行异步处理。对于这种情况,闭包是不够的;你需要一个goroutine。所以我非常喜欢你的模式。当调用者愿意运行到完成时,你的模式甚至更好,因为调用者可以简单地迭代通道。 - cdunn2001
请注意,通道不需要关闭即可清理并避免泄漏。因此,您可以生成所有值并将它们存储在缓冲通道中。当没有更多引用时,它将被清理。 (注意:您还可以在插入所有值后关闭通道,这将允许读取器端使用范围) - Corey Scott
@CoreyScott Goroutines会泄漏。 - Siu Ching Pong -Asuka Kenji-
@SiuChingPong-AsukaKenji- 我们绝对应该花费足够的时间来避免这种情况。我的评论只涉及通道,而不需要关闭它们。 - Corey Scott
3个回答

41

I. 替代方案

前言:我将使用一个更简单的生成器,因为问题并不涉及生成器的复杂性,而是生成器和使用者之间的信号以及使用者本身的调用。这个简单的生成器只能生成从 09 的整数。

1. 使用函数返回值

使用传递一个简单的使用者函数的生成-使用者模式更加清晰,它还具有优势,即可以返回一个值,表示是否需要中止或执行其他操作。

考虑到在这个例子中只需发出一个事件信号("abort"),使用者函数将具有bool返回类型,用于表示是否需要中止。

因此,请看这个简单的例子,其中传递了一个使用者函数值给生成器:

func generate(process func(x int) bool) {
    for i := 0; i < 10; i++ {
        if process(i) {
            break
        }
    }
}

func main() {
    process := func(x int) bool {
        fmt.Println("Processing", x)
        return x == 3 // Terminate if x == 3
    }
    generate(process)
}

输出(在Go Playground上尝试):

Processing 0
Processing 1
Processing 2
Processing 3

请注意,消费者(process)不需要是一个“本地”函数,它可以在main()之外声明,例如,它可以是全局函数或来自另一个包的函数。

这种解决方案的潜在缺点是它只使用1个goroutine来生成和消费值。

2. 使用通道

如果您仍然想使用通道,那么可以。请注意,由于通道是由生成器创建的,并且由于消费者循环遍历从通道接收到的值(理想情况下使用for ... range构造),因此关闭通道是生成器的责任。这样做还允许你返回一个只读通道。

是的,在生成器中通过延迟语句关闭返回的通道最好了,这样即使生成器发生恐慌,消费者也不会被阻塞。但是请注意,这个延迟关闭不在generate()函数中,而是在从generate()开始并作为新的goroutine执行的匿名函数中;否则,通道将在从generate()返回之前关闭——根本没有用处...

如果您想要向生成器发出信号(例如中止并停止生成更多的值),则可以使用另一个传递给生成器的通道。由于生成器只会“监听”此通道,因此它也可以声明为可读取的通道。如果您只需要发出一个事件的信号(在我们的情况下是中止),则不需要在此通道上发送任何值,仅需简单地关闭即可。如果您需要发出多个事件的信号,则可以通过实际在此通道上发送值来完成,以执行事件/操作(其中中止可能是多个事件之一)。

您可以使用选择语句作为处理向返回的通道发送值和观察传递给生成器的通道的惯用方式。

这里是一个带有abort通道的解决方案:

func generate(abort <-chan struct{}) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < 10; i++ {
            select {
            case ch <- i:
                fmt.Println("Sent", i)
            case <-abort: // receive on closed channel can proceed immediately
                fmt.Println("Aborting")
                return
            }
        }
    }()
    return ch
}

func main() {
    abort := make(chan struct{})
    ch := generate(abort)
    for v := range ch {
        fmt.Println("Processing", v)
        if v == 3 { // Terminate if v == 3
            close(abort)
            break
        }
    }
    // Sleep to prevent termination so we see if other goroutine panics
    time.Sleep(time.Second)
}

输出结果(在Go Playground上尝试):

Sent 0
Processing 0
Processing 1
Sent 1
Sent 2
Processing 2
Processing 3
Sent 3
Aborting

这种解决方案的明显优势在于它已经使用了2个goroutine(一个用于生成值,另一个用于消耗/处理它们),并且非常容易将其扩展为使用任意数量的goroutine处理生成的值,因为由生成器返回的通道可以同时从多个goroutine中使用 - 通过设计,从通道接收是安全的,数据竞争不会发生;有关更多信息,请参阅:如果我正确使用通道,是否需要使用互斥锁?

II. 未解答问题的答案

goroutine上的“未捕获”panic将结束goroutine的执行,但不会造成资源泄漏问题。但是,如果作为单独的goroutine执行的函数在非panic情况下会释放其分配的资源(在非延迟语句中),那么该代码显然不会运行,并且会导致资源泄漏。

您之所以没有观察到这一点,是因为程序在主goroutine终止时终止(它不等待其他非主goroutine完成 - 因此您的其他goroutine没有机会出现panic)。请参阅规范:程序执行

但要知道, panic() recover()是用于特殊情况的,它们不适用于类似Java中异常和块等常见用例。例如,应通过返回错误(并处理它们!)避免panic,并且绝对不应使panic离开包的“边界”(例如,在包实现中使用panic()recover()可能是合理的,但是应该在包内“捕获”panic状态,而不要让其出现)。


在基于通道的解决方案中,我认为即使“abort”已关闭,仍然可以处理4、5、6等项目...,尽管这需要非常不幸的时机。 - Paul Hankin
@PaulHankin 你说得对,谢谢。在消费者中添加了一个break来修复它,在close()之后(由于在generate()中使用了无缓冲通道,因此当前的goroutine调度中并不明显)。使用break可以确保不会处理更多的元素,并且close(abort)也将确保其他goroutine也能终止。 - icza
我的问题与https://dev59.com/rGgu5IYBdhLWcg3wWl0q不同。关键点在于递归函数,在你的答案中没有处理。我稍后会发另一篇帖子来解决这些问题。 - Siu Ching Pong -Asuka Kenji-

9
在我看来,通常生成器只是内部封装闭包的包装器。类似于这样:
package main

import "fmt"

// This function `generator` returns another function, which
// we define anonymously in the body of `generator`. The
// returned function _closes over_ the variable `data` to
// form a closure.
func generator(data int, permutation func(int) int, bound int) func() (int, bool) {
    return func() (int, bool) {
        data = permutation(data)
        return data, data < bound
    }
}

// permutation function
func increment(j int) int {
    j += 1
    return j
}

func main() {
    // We call `generator`, assigning the result (a function)
    // to `next`. This function value captures its
    // own `data` value, which will be updated each time
    // we call `next`.
    next := generator(1, increment, 7)
    // See the effect of the closure by calling `next`
    // a few times.
    fmt.Println(next())
    fmt.Println(next())
    fmt.Println(next())
    // To confirm that the state is unique to that
    // particular function, create and test a new one.
    for next, generation, ok := generator(11, increment, 17), 0, true; ok; {
        generation, ok = next()
        fmt.Println(generation)
    }
}

对我来说,它看起来并不像“for range”那样优雅,但在语义和语法上非常清晰。它在这个链接中可以正常工作。


我的问题与https://dev59.com/rGgu5IYBdhLWcg3wWl0q不同。关键点在于递归函数,在你的答案中没有处理。我稍后会发另一篇帖子来解决这些问题。在你的答案中,你假设函数“permutation”可以给定一个“int”返回一个“int”。为了证明你的假设,这里有几个运行:`permutation(1)`返回,比如说,`20`,然后你将`20`传递给下一个调用; permutation(20)返回,比如说,15,然后你将15传递给下一个调用。不幸的是,“递归函数”并不是这样工作的。 - Siu Ching Pong -Asuka Kenji-
递归函数有基本情况和递归情况。在递归情况下,它使用不同于提供给它的参数来调用自身。在大多数情况下,参数比前面的“更小”,以便“减少”问题。在基本情况下,解决方案/答案已知且无需进一步缩小,应用已知解决方案/返回已知答案。 - Siu Ching Pong -Asuka Kenji-
1
不使用通道的问题在于,一旦在基本情况下返回答案,调用堆栈就会被销毁,如果没有备份(例如在闭包中),信息就会丢失。备份它们没有意义,因为这本质上与使算法迭代(非递归)的工作相同。如果我还没有讲清楚,请尝试编写一个函数,当它被调用为permutation([]string{"a", "b", "c", "d"})时,生成abcdabdcacbdacdb、...、dcba。谢谢! - Siu Ching Pong -Asuka Kenji-
这是最佳的结果... - Albi

3

我同意icza的回答。总结一下,这里有两个替代方案:

  1. 映射函数: 使用回调函数来遍历集合。 func myIterationFn(yieldfunc (myType)) (stopIterating bool)。它的缺点是把控制流程交给了 myGenerator 函数。 myIterationFn 不是一个Pythonic生成器,因为它没有返回一个可迭代序列。
  2. 通道: 使用通道并注意泄漏goroutine。可以将 myIterationFn 转换为返回可迭代序列的函数。下面的代码提供了这种转换的示例。
myMapper := func(yield func(int) bool) {
    for i := 0; i < 5; i++ {
        if done := yield(i); done {
            return
        }
    }
}
iter, cancel := mapperToIterator(myMapper)
defer cancel() // This line is very important - it prevents goroutine leaks.
for value, ok := iter(); ok; value, ok = iter() {
    fmt.Printf("value: %d\n", value)
}

这是一个完整的程序示例。 mapperToIterator映射函数转换为生成器。由于Go语言缺乏泛型,需要将interface{}强制转换为int。请注意,HTML标签已被保留。
package main

import "fmt"

// yieldFn reports true if an iteration should continue. It is called on values
// of a collection.
type yieldFn func(interface{}) (stopIterating bool)

// mapperFn calls yieldFn for each member of a collection.
type mapperFn func(yieldFn)

// iteratorFn returns the next item in an iteration or the zero value. The
// second return value is true when iteration is complete.
type iteratorFn func() (value interface{}, done bool)

// cancelFn should be called to clean up the goroutine that would otherwise leak.
type cancelFn func()

// mapperToIterator returns an iteratorFn version of a mappingFn. The second
// return value must be called at the end of iteration, or the underlying
// goroutine will leak.
func mapperToIterator(m mapperFn) (iteratorFn, cancelFn) {
    generatedValues := make(chan interface{}, 1)
    stopCh := make(chan interface{}, 1)
    go func() {
        m(func(obj interface{}) bool {
            select {
            case <-stopCh:
                return false
            case generatedValues <- obj:
                return true
            }
        })
        close(generatedValues)
    }()
    iter := func() (value interface{}, notDone bool) {
        value, notDone = <-generatedValues
        return
    }
    return iter, func() {
        stopCh <- nil
    }
}

func main() {
    myMapper := func(yield yieldFn) {
        for i := 0; i < 5; i++ {
            if keepGoing := yield(i); !keepGoing {
                return
            }
        }
    }
    iter, cancel := mapperToIterator(myMapper)
    defer cancel()
    for value, notDone := iter(); notDone; value, notDone = iter() {
        fmt.Printf("value: %d\n", value.(int))
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接