在查看一些Go代码时,我发现以下内容:
ch := make(chan int)
我在一个在线教程中查阅了有关 Go Channels 的工作原理的内容:
https://tour.golang.org/concurrency/2
但是我发现这个例子不太清楚。
有人能给我一个简单的解释和使用通道的示例吗?
在查看一些Go代码时,我发现以下内容:
ch := make(chan int)
我在一个在线教程中查阅了有关 Go Channels 的工作原理的内容:
https://tour.golang.org/concurrency/2
但是我发现这个例子不太清楚。
有人能给我一个简单的解释和使用通道的示例吗?
chan是Golang中的通道。简单来说,你可以把它想象成一个盒子,在其中一端放入物品,然后从另一端取出。
非缓冲通道
缓冲通道
这是我为您编写的小代码,以便了解通道。现在改变goroutine的顺序并查看输出。每次输出可能会有所不同。
package main
import (
"fmt"
"time"
)
func main() {
messages := make(chan int)
go func() {
time.Sleep(time.Second * 3)
messages <- 1
}()
go func() {
time.Sleep(time.Second * 2)
messages <- 2
}()
go func() {
time.Sleep(time.Second * 1)
messages <- 3
}()
go func() {
for i := range messages {
fmt.Println(i)
}
}()
go func() {
time.Sleep(time.Second * 1)
messages <- 4
}()
go func() {
time.Sleep(time.Second * 1)
messages <- 5
}()
time.Sleep(time.Second * 5)
}
为了更好地理解,建议访问这个博客,在其中通过图形用户界面描述Go语言的go routines和channels。
当你有多个同时执行的goroutine时,通道提供了最简单的方法让它们相互通信。
通信的一种方式是通过一个“共享”变量,该变量对两个goroutine都可见,但这需要适当的锁定/同步访问。
相反,Go更喜欢使用通道。引用自 Effective Go: 通过通信共享 :
不要通过共享内存进行通信,而是通过通信共享内存。
因此,例如不必将消息放入共享的切片中,您可以创建一个通道(对两个goroutine都可见),而不需要任何外部同步/锁定,一个goroutine可以通过通道发送消息(值),另一个goroutine可以接收它们。
在任何给定时间只有一个goroutine可以访问该值。按设计,不会发生数据竞争。
因此,实际上,任意数量的goroutine都可以在同一通道上发送值,任意数量的goroutine都可以从中接收值,仍然不需要进一步的同步。有关更多详细信息,请参见相关问题:如果我正确使用通道,是否需要使用互斥锁?
让我们看一个示例,其中我们为并发计算目的启动了2个额外的goroutine。我们向第一个传递一个数字,它会将其加1,并将结果传递到第二个通道。第二个goroutine将接收数字,将其乘以10,并将其传递到结果通道:
func AddOne(ch chan<- int, i int) {
i++
ch <- i
}
func MulBy10(ch <-chan int, resch chan<- int) {
i := <-ch
i *= 10
resch <- i
}
这是如何调用/使用它的:
func main() {
ch := make(chan int)
resch := make(chan int)
go AddOne(ch, 9)
go MulBy10(ch, resch)
result := <-resch
fmt.Println("Result:", result)
}
MulBy10()
将等待直到 AddOne()
提供增加后的数字,而 main()
将在打印结果前等待 MulBy10()
。输出结果与预期相符(在 Go Playground 上尝试)。Result: 100
有许多语言结构是为了更方便地使用通道而设计的,例如:
for ... range
循环可用于从通道接收值,直到通道被关闭为止。select
语句可用于列出多个通道操作,如在通道上发送和从通道接收,并会选择可以继续进行而不阻塞的操作(如果有多个可以继续进行的操作,则随机选择一个;如果没有准备好的操作,则会阻塞)。v, ok := <-ch
len()
可告诉您排队的元素数量(未读取);内置函数 cap()
返回通道缓冲区容量。一个更实际的例子,可以看看如何使用通道来实现工作池。类似的用途是将值从生产者分发到消费者。
另一个实际的例子是使用缓冲通道来实现内存池。
还有另一个实际的例子是优雅地实现代理。
通常使用通道来设置某些阻塞操作的超时时间,利用time.After()
返回的通道,在指定的延迟/持续时间后"触发"(即发送一个值)。请参见此示例进行演示(在Go Playground上尝试):
ch := make(chan int)
select {
case i := <-ch:
fmt.Println("Received:", i)
case <-time.After(time.Second):
fmt.Println("Timeout, no value received")
}
chan int
,并在其上发送任何值,例如0
。但由于发送的值不包含信息,因此您可以像chan struct{}
这样声明它。或者更好的是,如果您只需要一次信号,则可以关闭通道,可以使用for ... range
拦截它,或从中接收(由于从关闭的通道接收立即进行,产生元素类型的零值)。还要知道,即使可以使用通道进行此类信号传递,但对于此类信号传递,有一个更好的替代方案:sync.WaitGroup
。
值得了解有关通道公理以避免出现意外行为:未初始化的通道会如何行动?
如果你想让goroutine相互通信,请使用通道。有多种原因可能需要这种信号。
如果您想要goroutine进行通信,则请使用通道。这可以是带数据或不带数据的通信。
package main
/*
Simulation for sending messages from threads for processing,
and getting a response (processing result) to the thread
*/
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
type (
TMsg struct { // message
name string // thread name, owner name
backId int // index of response Back channel, or -1
msg string // message
note string // comment
}
TTh struct { // for thread
name string // thread name
jobId int // index of central Job chanel
backId int // index of response Back channel, or -1
}
TChans map[int]chan TMsg
)
var gChans TChans //global variable, all channels map
func main() {
gChans = make(TChans) // all channels map
jobIndex, job := NewChan() // chanel for send mesage to central Job (from threads)
_, worker := NewChan() // channel for send message to Worker (from Job receiver)
for i := 1; i <= 5; i++ { // 5 threads
backIndex, _ := NewChan() // channel index for response back the thread
go ping(TTh{name: fmt.Sprint(i) + "th", jobId: jobIndex, backId: backIndex}) //start threads
}
//go Job(job, worker) // central receiver and start workers
Job(job, worker) // central receiver and start workers
for LenChan() > 2 { // 2 = job and worker channels
SleepM(5000)
}
}
func Job(job, worker chan TMsg) { //central receiver
var v TMsg
ctx := context.Background()
ctx, cancelWorkers := context.WithCancel(ctx)
for i := 1; i <= 3; i++ { // start workers , sql simulation
go Worker(i, worker, ctx)
}
for {
select {
case v = <-job: // receive message
if v.note == "sql" { // sql simulation
worker <- v
} else {
if v.note == "end" {
FreeChan(v.backId)
fmt.Println(v.name, "FREE")
} else {
ch := GetChan(v.backId)
if ch != nil {
ch <- TMsg{name: v.name, backId: v.backId, msg: v.msg, note: "receiver"}
}
}
}
default:
if LenChan() <= 2 {
cancelWorkers() // cancel workers
return
} else {
SleepM(2)
}
}
}
}
func Worker(id int, worker <-chan TMsg, ctx context.Context) { // simulate sql or auther process
var v TMsg
for {
select {
case v = <-worker:
{
SleepM(rand.Intn(50))
v.note = "worker:" + fmt.Sprint(id)
ch := GetChan(v.backId)
if ch != nil {
ch <- v
}
}
case <-ctx.Done(): //return
default:
{
//fmt.Println("worker", id)
SleepM(2)
}
}
}
}
func waitResponse(d chan TMsg, pTimeout int) (bool, TMsg) {
var v TMsg
for {
select {
case v = <-d:
return true, v
case <-time.After(time.Duration(pTimeout) * time.Second):
return false, v
}
}
}
func ping(pTh TTh) {
SleepM(10)
var v TMsg
ok := true
i := 0
job := GetChan(pTh.jobId) // central Job receiver chanel
back := GetChan(pTh.backId) // response Back channel
for i < 50 {
if ok {
ok = false
job <- TMsg{name: pTh.name, backId: pTh.backId, msg: fmt.Sprint(i), note: "sql"}
i++
}
if back != nil {
if !ok {
ok, v = waitResponse(back, 10) //with timeout 10 sec
if ok {
fmt.Println(v.name, "msg:", v.msg, v.note)
SleepM(1)
} else {
fmt.Println(pTh.name, "response timeout")
}
}
} else {
SleepM(1)
}
}
fmt.Println(v.name, "---- end ----")
v.note = "end"
job <- v
}
func NewChan() (int, chan TMsg) {
mux := &sync.RWMutex{}
mux.Lock()
defer mux.Unlock()
index := len(gChans)
gChans[index] = make(chan TMsg)
return index, gChans[index]
}
func GetChan(pIndex int) chan TMsg {
mux := &sync.RWMutex{}
mux.Lock()
defer mux.Unlock()
ch, ok := gChans[pIndex]
if ok {
return ch
} else {
return nil
}
}
func LenChan() int {
return len(gChans)
}
func FreeChan(pIndex int) bool {
ch := GetChan(pIndex)
if ch != nil {
mux := &sync.RWMutex{}
mux.Lock()
defer mux.Unlock()
close(gChans[pIndex]) //close channel
gChans[pIndex] = nil
delete(gChans, pIndex)
return true
} else {
return false
}
}
func SleepM(pMilliSec int) { // sleep millisecounds
time.Sleep(time.Duration(pMilliSec) * time.Millisecond)
}