Go语言中符合惯用法的可变大小工作池

11
我正试图在Go语言中实现一个工作池。 go-wiki(和Channels部分的Effective Go)提供了完美的限制资源使用的示例。只需创建一个缓冲区大小等于工作池的通道,然后用工作人员填充该通道,在完成工作后将它们发送回该通道。从通道接收会阻塞直到有可用的工作人员。所以通道和循环就是整个实现 - 非常酷!

另外,也可以在向通道发送时进行阻塞,但是思路相同。

我的问题是如何在运行时更改工作池的大小。我不认为有一种方法可以改变通道的大小。我有一些想法,但其中大多数似乎过于复杂。 这个页面实际上使用了一个通道和空结构体来实现信号量,方法类似,但它也有同样的问题(在Google搜索“golang semaphore”时经常遇到这种情况)。


为什么您想要一个可变大小的工作池? - fabrizioM
@fabrizioM 工人们自己做的事情很少,他们实际上只是控制几个外部进程。工人数量应该取决于外部进程负载和其他因素(例如,一种类型的工作优先级与另一种类型的工作相比)。 - Hut8
在这种情况下,您可能最好使用不同的机制来调整此问题,而不是可调大小池。例如,您可以启动一个goroutine来检查目标机器的RAM、负载等,并在机器准备好时启动任务,如果没有,则安排另一个检查。显然,我不知道什么有效,因为我真的不知道您正在构建什么。 - twotwotwo
2个回答

25

我会采用另一种方式。不是生成许多 goroutine(它们仍需要相当数量的内存)并使用通道阻止它们,而是将这些工作线程建模为 goroutine 并使用通道来分配工作。像这样:

package main

import (
    "fmt"
    "sync"
)

type Task string

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                return
            }
            fmt.Println("processing task", task)
        case <-quit:
            return
        }
    }
}

func main() {
    tasks := make(chan Task, 128)
    quit := make(chan bool)
    var wg sync.WaitGroup

    // spawn 5 workers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute some tasks
    tasks <- Task("foo")
    tasks <- Task("bar")

    // remove two workers
    quit <- true
    quit <- true

    // add three more workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute more tasks
    for i := 0; i < 20; i++ {
        tasks <- Task(fmt.Sprintf("additional_%d", i+1))
    }

    // end of tasks. the workers should quit afterwards
    close(tasks)
    // use "close(quit)", if you do not want to wait for the remaining tasks

    // wait for all workers to shut down properly
    wg.Wait()
}

创建一个独立的WorkerPool类型并添加一些方便的方法可能是个好主意。此外,通常使用一个包含"done"信道的结构体来代替type Task string,该信道用于发出任务已成功执行的信号。

编辑:我进行了更多实验,并想出了以下内容:http://play.golang.org/p/VlEirPRk8V。这基本上是相同的示例,但具有更好的API。


1
谢谢!看起来很棒。 - Pablo Lalloni
1
在这种情况下,所有的工作人员在示例之前都有可能接收到退出信号:http://play.golang.org/p/19DKaA4Q6G - Oleg

3
一个简单的修改思路是增加一个控制信号量大小的通道。其中关键部分在于select语句。如果队列中有更多的任务,可以使用当前的信号量来处理它们;如果有请求改变信号量大小的需求,那么就可以改变信号量大小并继续使用新的信号量处理请求队列。请注意,旧信号量将被垃圾回收。
package main

import "time"
import "fmt"

type Request struct{ num int }
var quit chan struct{} = make(chan struct{})

func Serve(queue chan *Request, resize chan int, semsize int) {
    for {
        sem := make(chan struct{}, semsize)
        var req *Request
        select {
        case semsize = <-resize:
            {
                sem = make(chan struct{}, semsize)
                fmt.Println("changing semaphore size to ", semsize)
            }
        case req = <-queue:
            {
                sem <- struct{}{}   // Block until there's capacity to process a request.
                go handle(req, sem) // Don't wait for handle to finish.
            }
                case <-quit:
                     return
        }

    }
}

func process(r *Request) {
  fmt.Println("Handled Request", r.num)
}

func handle(r *Request, sem chan struct{}) {
    process(r) // May take a long time & use a lot of memory or CPU
    <-sem      // Done; enable next request to run.
}

func main() {
    workq := make(chan *Request, 1)
    ctrlq := make(chan int)
    go func() {
        for i := 0; i < 20; i += 1 {
            <-time.After(100 * time.Millisecond)
            workq <- &Request{i}
        }
        <-time.After(500 * time.Millisecond)
            quit <- struct{}{}
    }()
    go func() {
        <-time.After(500 * time.Millisecond)
        ctrlq <- 10
    }()
    Serve(workq, ctrlq, 1)
}

http://play.golang.org/p/AHOLlAv2LH


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接