golang:如何使select语句变得确定性?

4

让我们仔细看一下Go语言时间包中Ticker示例代码

package main

import (
    "fmt"
    "time"
)

func main() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    done := make(chan bool)
    go func() {
        time.Sleep(10 * time.Second)
        done <- true
    }()
    for {
        select {
        case <-done:
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            fmt.Println("Current time: ", t)
        }
    }
}

本示例中将时间间隔调整为1秒,方便起见。在运行足够次数后,我们发现存在这样一种情况,即当前时间未被打印(或者仅打印了9次而不是10次):

Current time:  2020-06-10 12:23:51.189421219 -0700 PDT m=+1.000350341
Done!
Current time:  2020-06-10 12:23:52.193636682 -0700 PDT m=+1.000473686
Done!
Current time:  2020-06-10 12:23:53.199688564 -0700 PDT m=+1.000322824
Done!
Current time:  2020-06-10 12:23:54.204380186 -0700 PDT m=+1.000420293
Done!
Current time:  2020-06-10 12:23:55.21085129 -0700 PDT m=+1.000266810
Done!
Done!
Current time:  2020-06-10 12:23:57.220120615 -0700 PDT m=+1.000479431
Done!
Current time:  2020-06-10 12:23:58.226167159 -0700 PDT m=+1.000443199
Done!
Current time:  2020-06-10 12:23:59.231721969 -0700 PDT m=+1.000316117
Done!

当done和ticker.C通道同时就绪时,我们进入了Go非确定性行为的领域: 我理解Go之所以选择select是非确定性的设计原理。这在很大程度上归结于该语言不致力于解决问题,因为这样做通常很困难,并且可能会导致用户编写并发问题代码,因此优先考虑使用select来留给读者练习。
假设出于任何原因,我想确保在关闭程序并打印“完成!”之前消耗所有未处理的ticks。是否存在一般性的转换可以应用于此简单示例使其具有确定性?
我尝试添加另一个信号通道:
func main() {
    ticker := time.NewTicker(time.Second)
    stop := make(chan bool)
    done := make(chan bool)
    tick := make(chan time.Time)
    go func() {
        time.Sleep(1 * time.Second)
        stop <- true
    }()
    go func() {
        for t := range tick {
            fmt.Println("Current time: ", t)
        }
        done <- true
    }()
    for {
        select {
        case <-stop:
            ticker.Stop()
            close(tick)
        case t := <-ticker.C:
            tick <- t
            break
        case <-done:
            fmt.Println("Done!")
            return
        }
    }
}

但似乎表现更差了...
Current time:  2020-06-10 13:23:20.489040642 -0700 PDT m=+1.000425216
Done!
Current time:  2020-06-10 13:23:21.495263288 -0700 PDT m=+1.000338902
Done!
Current time:  2020-06-10 13:23:22.501474055 -0700 PDT m=+1.000327127
Done!
Current time:  2020-06-10 13:23:23.503531868 -0700 PDT m=+1.000244398
Done!
Current time:  2020-06-10 13:23:24.510210786 -0700 PDT m=+1.000420955
Done!
Current time:  2020-06-10 13:23:25.516500359 -0700 PDT m=+1.000460986
Done!
Done!
Current time:  2020-06-10 13:23:27.527077433 -0700 PDT m=+1.000375330
Done!
Current time:  2020-06-10 13:23:28.533401667 -0700 PDT m=+1.000470273
Done!
panic: send on closed channel

goroutine 1 [running]:
main.main()
    /home/dcow/Desktop/ticker-go/main2.go:29 +0x22f
Current time:  2020-06-10 13:23:30.547554719 -0700 PDT m=+1.000399602
Done!
Current time:  2020-06-10 13:23:31.55416725 -0700 PDT m=+1.000443683
Done!
Current time:  2020-06-10 13:23:32.56041176 -0700 PDT m=+1.000436364
Done!
Done!
Current time:  2020-06-10 13:23:34.572550584 -0700 PDT m=+1.000445593
Done!
Current time:  2020-06-10 13:23:35.578672712 -0700 PDT m=+1.000357330
Done!
Done!
Current time:  2020-06-10 13:23:37.590984117 -0700 PDT m=+1.000447504
Done!

我们不能保证在接收到最终的tick时不会同时收到停止信号,因此我们只是将问题转化为了一些在行为“不正确”时会出现panic的情况(这比默默地出现要好得多)。如果我们将tick通道置为空,则会退化到原始情况。而且我们仍然有一些情况下根本不会打印任何tick,可能是因为在计时器有机会触发之前就关闭了它。
那么,准备好的通道怎么样?
func main() {
    ticker := time.NewTicker(time.Second)
    tick := make(chan time.Time)
    ready := make(chan bool, 1)
    stop := make(chan bool)
    done := make(chan bool)
    go func() {
        time.Sleep(1 * time.Second)
        <-ready
        stop <- true
    }()
    go func() {
        for t := range tick {
            fmt.Println("Current time: ", t)
        }
        done <- true
    }()
    for {
        select {
        case <-stop:
            ticker.Stop()
            close(tick)
        case t := <-ticker.C:
            select {
            case ready<-true:
                break
            default:
            }
            tick <- t
            break
        case <-done:
            fmt.Println("Done!")
            return
        }
    }
}

这似乎有效。增加了3个新通道和一个额外的协程,但到目前为止还没有失败。这种模式在 Go 中是否惯用?在想要优先考虑选择案例的情况下,有哪些一般的形式策略可以应用此类型的转换? 我遇到的大多数建议都涉及顺序和嵌套选择,这并不能真正解决问题。

或者,有没有一种方法可以说“给我准备就绪的通道列表,以便我选择处理它们的顺序”?

编辑:

添加一些澄清的话:我不感兴趣保留并发操作的顺序。我同意这是一个愚蠢的尝试。 我只想知道是否有一组可选的通道已准备好被处理,并提供自己的逻辑来决定在多个通道同时准备好时该怎么做。我基本上对 Go 的 POSIX select 感兴趣。和/或我对描述或围绕通用“将非确定性选择转换为确定性选择的模式”的文献感兴趣。

例如,人们是否使用 heap 包并将数据存入优先级队列,最终从中读取? 是否有一个 x/reflect 风格的包,使用不安全的方法实现了优先选择? 是否有一些简单的模式,比如“将所有应该优先考虑的单通道选择转换为双通道样式,并将“完成”请求转发给生产者,后者应该终止并关闭他们的通道,然后在通道范围循环上阻塞(有点像我的工作解决方案)? 实际上,由于原因 x、y 等等,在共享条件变量上锁定。


你能提供更多关于你正在解决的问题的背景信息吗?只是为了让我理解整个情况。谢谢。 - Oleg Kovalov
我想我正在寻找一种通用模式,以处理您希望按特定顺序处理多个准备好的通道的情况。 - dcow
1
我目前唯一看到的解决方案是在某个切片中“注册”准备好的通道(由互斥锁保护),然后根据它们的计数器(每个通道已使用多少次)进行均匀分布或选择。我认为,关于select的每个解决方案仍将存在边缘情况,但这只是我的猜测,抱歉。 - Oleg Kovalov
“Done!”似乎只能在所有示例中打印一次,但在输出中出现了多次。代码列表/输出是否正确? - Mark
@Mark出现两次表示done和ticker通道同时准备好了,它随机选择了done而不是ticker。你可以通过查看时间戳来判断——它跳过了一秒钟。 - dcow
@dcow 对,抱歉,我被原始的10秒持续时间搞混了。 - Mark
2个回答

2
除非应用程序在标记和完成通道的准备状态之间具有某些已知的排序,否则无法确保应用程序按发送值的顺序处理通道中的值。
应用程序可以使用嵌套的选择语句确保在ticker.C中排队的值在done的值之前接收。
for {
    select {
    case t := <-ticker.C:
        fmt.Println("Current time: ", t)
    default:
        // ticker.C is not ready for commination, wait for both 
        // channels.
        select {
        case <-done:
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            fmt.Println("Current time: ", t)
        }
    }
}

如果在内部选择中的准备<-ticker.C通信之前执行了done通信,则这两个通道几乎同时进入就绪状态。除非问题中有未说明的要求,否则这对应用程序不应该有影响。
应用程序可以嵌套第三个选择以使接收ticker.C在函数返回之前有最后一次执行的机会。当两个通道几乎同时进入准备状态时,此方法将优先考虑定时器。我提到这一点是为了完整性,而不是因为我推荐它。正如我在上一段中所说,本答案中的第一个代码片段应该已经足够好了。
for {
    select {
    case t := <-ticker.C:
        fmt.Println("Current time: ", t)
    default:
        // ticker.C is not ready for commination, wait for both
        // channels.
        select {
        case <-done:
            // Give communication on <-ticker.C one last
            // opportunity before exiting.
            select {
            case t := <-ticker.C:
                // Note that the ticker may have entered
                // the ready state just after the done channel
                // entered the state. 
                fmt.Println("Current time: ", t)
            default:
            }
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            fmt.Println("Current time: ", t)
        }
    }
}

1
正如我在答案中提到的那样,嵌套的选择器将窗口关闭到两个通道几乎同时进入就绪状态的位置。如果这已经满足应用需求,那么就没有问题了。如果您的应用程序中有其他已知的事件顺序必须遵循,则应在问题中说明。 - thwd
1
这更多是一个原则性的问题:我很好奇在golang中是否有一种实际的处理方式。例如,使用POSIX select,您会收到一组准备好进行处理的文件描述符,并且可以选择如何处理它们。在golang中真的没有类似的东西吗? - dcow
3
Go语言没有提供获取准备好进行通信的通道列表的方法。您想通过获取该列表解决什么问题?也许有一个更高级别的问题可以找到一个好的解决方案。 - thwd
1
我发现自己陷入了相当复杂的代码中,试图编写确定性程序,我担心我可能会使事情比必要的更加复杂。如果我可以优先处理“取消”消息,那么我的代码将会简单得多。但在最高层面上,我有点烦恼Go的时间包提供了一个表面上简单的例子,实际上却是不确定的。我看到很多Go示例代码看起来非常简单和正确,但实际上它的执行是不确定的。我不喜欢不确定性程序... - dcow
@MuffinTop,我对妥协并不感兴趣。最终我重写了一堆代码来使用sync包。很遗憾,对于纯通道实现我打了个-1。 - dcow
显示剩余6条评论

1

如果您需要在启用两个通道时选择一个通道而不是另一个通道,则可以进行嵌套选择。 如果在选择开始时两个通道都启用,则此操作将优先选择优先级较高的通道而不是优先级较低的通道:

select {
  case <-highPriority:
     // Deal with it
  default:
     select {
       case <-lowPriority:
         // low priority channel
       default:
     }
}

如果您有N个带优先级排名的通道,则可以尝试在循环中进行选择:
for _,channel:=range channels {
   select {
     case <-channel:
      //
     default:
   }
}

当然,这只是一个近似值,因为它会错过在循环期间发生的通道状态更改。但它将根据for循环开始时通道的状态对通道进行优先排序。
然后有reflect.Select,但它不会优先考虑通道。

4
值得一提的是,如果所有通道都没有物品可消耗,那么这将成为一个繁忙的循环。 - zerkms
@zerkms 你说得对。这不是一个好的解决方案。我在想这是否有可能。 - Burak Serdar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接