通道是用于什么的?

37

在查看一些Go代码时,我发现以下内容:

  ch := make(chan int)

我在一个在线教程中查阅了有关 Go Channels 的工作原理的内容:

https://tour.golang.org/concurrency/2

但是我发现这个例子不太清楚。

有人能给我一个简单的解释和使用通道的示例吗?


请查看此文章:Golang通道的工作原理 - Jerry An
5个回答

73

chan是Golang中的通道。简单来说,你可以把它想象成一个盒子,在其中一端放入物品,然后从另一端取出。

非缓冲通道

enter image description here

缓冲通道

enter image description here

这是我为您编写的小代码,以便了解通道。现在改变goroutine的顺序并查看输出。每次输出可能会有所不同。

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        messages := make(chan int)
        go func() {
            time.Sleep(time.Second * 3)
            messages <- 1
        }()
        go func() {
            time.Sleep(time.Second * 2)
            messages <- 2
        }() 
        go func() {
            time.Sleep(time.Second * 1)
            messages <- 3
        }()
        go func() {
            for i := range messages {
                fmt.Println(i)
            }
        }()
        go func() {
            time.Sleep(time.Second * 1)
            messages <- 4
        }()
        go func() {
            time.Sleep(time.Second * 1)
            messages <- 5
        }()
        time.Sleep(time.Second * 5)
    }

为了更好地理解,建议访问这个博客,在其中通过图形用户界面描述Go语言的go routines和channels。

请访问http://divan.github.io/posts/go_concurrency_visualize/


12
我喜欢这些插图! - Mister Verleg
4
这对我很有帮助!非常感谢。 - workdreamer
3
“顺序不重要”这句话是不正确的。Go语言规范指明:“通道(channel) 行为类似先进先出(FIFO) 队列。” - icza

55
我认为规范对此非常清楚。 规范:通道类型:

通道提供了一种机制,允许并发执行的函数通过发送接收指定元素类型的值来进行通信。

当你有多个同时执行的goroutine时,通道提供了最简单的方法让它们相互通信。

通信的一种方式是通过一个“共享”变量,该变量对两个goroutine都可见,但这需要适当的锁定/同步访问。

相反,Go更喜欢使用通道。引用自 Effective Go: 通过通信共享

不要通过共享内存进行通信,而是通过通信共享内存。

因此,例如不必将消息放入共享的切片中,您可以创建一个通道(对两个goroutine都可见),而不需要任何外部同步/锁定,一个goroutine可以通过通道发送消息(值),另一个goroutine可以接收它们。

在任何给定时间只有一个goroutine可以访问该值。按设计,不会发生数据竞争。

因此,实际上,任意数量的goroutine都可以在同一通道上发送值,任意数量的goroutine都可以从中接收值,仍然不需要进一步的同步。有关更多详细信息,请参见相关问题:如果我正确使用通道,是否需要使用互斥锁?

通道示例

让我们看一个示例,其中我们为并发计算目的启动了2个额外的goroutine。我们向第一个传递一个数字,它会将其加1,并将结果传递到第二个通道。第二个goroutine将接收数字,将其乘以10,并将其传递到结果通道:

func AddOne(ch chan<- int, i int) {
    i++
    ch <- i
}

func MulBy10(ch <-chan int, resch chan<- int) {
    i := <-ch
    i *= 10
    resch <- i
}

这是如何调用/使用它的:

func main() {
    ch := make(chan int)
    resch := make(chan int)

    go AddOne(ch, 9)
    go MulBy10(ch, resch)

    result := <-resch
    fmt.Println("Result:", result)
}

通过通道进行通信也可以解决 goroutine 之间的等待问题。在这个例子中,这意味着 MulBy10() 将等待直到 AddOne() 提供增加后的数字,而 main() 将在打印结果前等待 MulBy10()。输出结果与预期相符(在 Go Playground 上尝试)。
Result: 100

语言支持

有许多语言结构是为了更方便地使用通道而设计的,例如:

  • for ... range 循环可用于从通道接收值,直到通道被关闭为止。
  • select 语句可用于列出多个通道操作,如在通道上发送和从通道接收,并会选择可以继续进行而不阻塞的操作(如果有多个可以继续进行的操作,则随机选择一个;如果没有准备好的操作,则会阻塞)。
  • 有一种特殊形式的接收运算符,允许您检查通道是否已关闭(除了接收值之外):v, ok := <-ch
  • 内置函数 len() 可告诉您排队的元素数量(未读取);内置函数 cap() 返回通道缓冲区容量。

其他用途

一个更实际的例子,可以看看如何使用通道来实现工作池。类似的用途是将值从生产者分发到消费者

另一个实际的例子是使用缓冲通道来实现内存池

还有另一个实际的例子是优雅地实现代理

通常使用通道来设置某些阻塞操作的超时时间,利用time.After()返回的通道,在指定的延迟/持续时间后"触发"(即发送一个值)。请参见此示例进行演示(在Go Playground上尝试):

ch := make(chan int)

select {
case i := <-ch:
    fmt.Println("Received:", i)
case <-time.After(time.Second):
    fmt.Println("Timeout, no value received")
}

它可以用于等待某个值的最长时间,但如果其他goroutine在那个时间内无法提供该值,我们可能会决定做其他事情。另外,一种特殊形式的通信可能只是为了标记某些操作的完成(而不实际发送任何“有用”的数据)。这种情况可以通过具有任何元素类型的通道来实现,例如chan int,并在其上发送任何值,例如0。但由于发送的值不包含信息,因此您可以像chan struct{}这样声明它。或者更好的是,如果您只需要一次信号,则可以关闭通道,可以使用for ... range拦截它,或从中接收(由于从关闭的通道接收立即进行,产生元素类型的零值)。还要知道,即使可以使用通道进行此类信号传递,但对于此类信号传递,有一个更好的替代方案:sync.WaitGroup

进一步阅读

值得了解有关通道公理以避免出现意外行为:未初始化的通道会如何行动?

Go博客:通过通信共享内存

Go博客:Go并发模式:管道和取消

Go博客:高级Go并发模式

Ardan实验室:Go中通道的本质


4
我喜欢这个答案,特别是关于使用 time.After(time.Second) 的部分。我不知道那样做是可行的。 - reticentroot

1
这个概念与Unix/Linux自始就存在的“管道”非常相似。这是一种可靠的进程/线程间通信机制,内置于语言中。非常方便。

0

如果你想让goroutine相互通信,请使用通道。有多种原因可能需要这种信号。

  1. 向另一个goroutine发出信号以开始其任务。
  2. 等待另一个goroutine结束其任务。
  3. 通过关闭通道来发出信号,使其他goroutine停止工作。 您可以在此处找到其他类似的情况。 https://www.ardanlabs.com/blog/2017/10/the-behavior-of-channels.html

如果您想要goroutine进行通信,则请使用通道。这可以是带数据或不带数据的通信。


0
package main

/*
  Simulation for sending messages from threads for processing,
  and getting a response (processing result) to the thread
*/
import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type (
    TMsg struct { // message
        name   string // thread name, owner name
        backId int    // index of response Back channel, or -1
        msg    string // message
        note   string // comment
    }

    TTh struct { // for thread
        name   string // thread name
        jobId  int    // index of central Job chanel
        backId int    // index of response Back channel, or -1
    }

    TChans map[int]chan TMsg
)

var gChans TChans //global variable, all channels map

func main() {
    gChans = make(TChans)      // all channels map
    jobIndex, job := NewChan() // chanel for send mesage to central Job (from threads)
    _, worker := NewChan()     // channel for send message to Worker (from Job receiver)

    for i := 1; i <= 5; i++ { // 5 threads
        backIndex, _ := NewChan()                                                    // channel index for response back the thread
        go ping(TTh{name: fmt.Sprint(i) + "th", jobId: jobIndex, backId: backIndex}) //start threads
    }

    //go Job(job, worker) // central receiver and start workers
    Job(job, worker) // central receiver and start workers

    for LenChan() > 2 { // 2 = job and worker channels
        SleepM(5000)
    }
}

func Job(job, worker chan TMsg) { //central receiver
    var v TMsg
    ctx := context.Background()
    ctx, cancelWorkers := context.WithCancel(ctx)
    for i := 1; i <= 3; i++ { // start workers , sql simulation
        go Worker(i, worker, ctx)
    }
    for {
        select {
        case v = <-job: // receive message
            if v.note == "sql" { // sql simulation
                worker <- v
            } else {
                if v.note == "end" {
                    FreeChan(v.backId)
                    fmt.Println(v.name, "FREE")
                } else {
                    ch := GetChan(v.backId)
                    if ch != nil {
                        ch <- TMsg{name: v.name, backId: v.backId, msg: v.msg, note: "receiver"}
                    }
                }
            }
        default:
            if LenChan() <= 2 {
                cancelWorkers() // cancel workers
                return
            } else {
                SleepM(2)
            }
        }
    }
}

func Worker(id int, worker <-chan TMsg, ctx context.Context) { // simulate sql or auther process
    var v TMsg
    for {
        select {
        case v = <-worker:
            {
                SleepM(rand.Intn(50))
                v.note = "worker:" + fmt.Sprint(id)
                ch := GetChan(v.backId)
                if ch != nil {
                    ch <- v
                }
            }
        case <-ctx.Done(): //return
        default:
            {
                //fmt.Println("worker", id)
                SleepM(2)
            }
        }
    }
}

func waitResponse(d chan TMsg, pTimeout int) (bool, TMsg) {
    var v TMsg
    for {
        select {
        case v = <-d:
            return true, v
        case <-time.After(time.Duration(pTimeout) * time.Second):
            return false, v
        }
    }
}

func ping(pTh TTh) {
    SleepM(10)
    var v TMsg
    ok := true
    i := 0
    job := GetChan(pTh.jobId)   // central Job receiver chanel
    back := GetChan(pTh.backId) // response Back channel
    for i < 50 {
        if ok {
            ok = false
            job <- TMsg{name: pTh.name, backId: pTh.backId, msg: fmt.Sprint(i), note: "sql"}
            i++
        }
        if back != nil {
            if !ok {
                ok, v = waitResponse(back, 10) //with timeout 10 sec
                if ok {
                    fmt.Println(v.name, "msg:", v.msg, v.note)
                    SleepM(1)
                } else {
                    fmt.Println(pTh.name, "response timeout")
                }

            }
        } else {
            SleepM(1)
        }
    }
    fmt.Println(v.name, "---- end ----")
    v.note = "end"
    job <- v
}

func NewChan() (int, chan TMsg) {
    mux := &sync.RWMutex{}
    mux.Lock()
    defer mux.Unlock()
    index := len(gChans)
    gChans[index] = make(chan TMsg)
    return index, gChans[index]
}

func GetChan(pIndex int) chan TMsg {
    mux := &sync.RWMutex{}
    mux.Lock()
    defer mux.Unlock()
    ch, ok := gChans[pIndex]
    if ok {
        return ch
    } else {
        return nil
    }
}

func LenChan() int {
    return len(gChans)
}

func FreeChan(pIndex int) bool {
    ch := GetChan(pIndex)
    if ch != nil {
        mux := &sync.RWMutex{}
        mux.Lock()
        defer mux.Unlock()
        close(gChans[pIndex]) //close channel
        gChans[pIndex] = nil
        delete(gChans, pIndex)
        return true
    } else {
        return false
    }
}

func SleepM(pMilliSec int) { // sleep millisecounds
    time.Sleep(time.Duration(pMilliSec) * time.Millisecond)
}

目前你的答案表述不清楚。请编辑并添加更多细节,以帮助其他人理解它是如何回答所问的问题的。在帮助中心中可以找到有关如何编写好的答案的更多信息。 - Community

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接