多个Docker容器日志

3

我想要同时从多个docker容器中获取日志(顺序无关紧要)。如果将types.ContainerLogsOption.Follow设置为false,则可以按预期工作。

如果将types.ContainerLogsOption.Follow设置为true,有时日志输出会在几条日志之后停滞不前,并且没有后续日志打印到stdout。

如果输出没有停滞,则按预期工作。

此外,如果我重启一个或全部容器,则该命令不像docker logs -f containerName那样退出。

func (w *Whatever) Logs(options LogOptions) {
    readers := []io.Reader{}

    for _, container := range options.Containers {
        responseBody, err := w.Docker.Client.ContainerLogs(context.Background(), container, types.ContainerLogsOptions{
            ShowStdout: true,
            ShowStderr: true,
            Follow:     options.Follow,
        })
        defer responseBody.Close()

        if err != nil {
            log.Fatal(err)
        }
        readers = append(readers, responseBody)
    }

    // concatenate all readers to one
    multiReader := io.MultiReader(readers...)

    _, err := stdcopy.StdCopy(os.Stdout, os.Stderr, multiReader)
    if err != nil && err != io.EOF {
        log.Fatal(err)
    }
}

基本上,我的实现与docker logshttps://github.com/docker/docker/blob/master/cli/command/container/logs.go没有太大的区别,因此我想知道是什么原因导致了这些问题。


从io.MultiReader文档中可以得知:MultiReader返回一个Reader,它是提供的输入读取器的逻辑连接。它们按顺序读取。。您正在尝试在转到下一个日志之前读取每个日志直到EOF。 - JimB
1个回答

1
正如JimB所评论的那样,由于io.MultiReader操作的原因,该方法无法工作。你需要做的是从每个响应中单独读取并组合输出。由于你正在处理日志,将读取内容分割成新行是有意义的。bufio.Scanner可以为单个io.Reader执行此操作。因此,一种选择是创建一种新类型,可以同时扫描多个读取器。
您可以像这样使用它:
scanner := NewConcurrentScanner(readers...)
for scanner.Scan() {
    fmt.Println(scanner.Text())
}
if err := scanner.Err(); err != nil {
    log.Fatalln(err)
}

并发扫描器的示例实现:

// ConcurrentScanner works like io.Scanner, but with multiple io.Readers
type ConcurrentScanner struct {
    scans  chan []byte   // Scanned data from readers
    errors chan error    // Errors from readers
    done   chan struct{} // Signal that all readers have completed
    cancel func()        // Cancel all readers (stop on first error)

    data []byte // Last scanned value
    err  error
}

// NewConcurrentScanner starts scanning each reader in a separate goroutine
// and returns a *ConcurrentScanner.
func NewConcurrentScanner(readers ...io.Reader) *ConcurrentScanner {
    ctx, cancel := context.WithCancel(context.Background())
    s := &ConcurrentScanner{
        scans:  make(chan []byte),
        errors: make(chan error),
        done:   make(chan struct{}),
        cancel: cancel,
    }

    var wg sync.WaitGroup
    wg.Add(len(readers))

    for _, reader := range readers {
        // Start a scanner for each reader in it's own goroutine.
        go func(reader io.Reader) {
            defer wg.Done()
            scanner := bufio.NewScanner(reader)

            for scanner.Scan() {
                select {
                case s.scans <- scanner.Bytes():
                    // While there is data, send it to s.scans,
                    // this will block until Scan() is called.
                case <-ctx.Done():
                    // This fires when context is cancelled,
                    // indicating that we should exit now.
                    return
                }
            }
            if err := scanner.Err(); err != nil {
                select {
                case s.errors <- err:
                    // Reprort we got an error
                case <-ctx.Done():
                    // Exit now if context was cancelled, otherwise sending
                    // the error and this goroutine will never exit.
                    return
                }
            }
        }(reader)
    }

    go func() {
        // Signal that all scanners have completed
        wg.Wait()
        close(s.done)
    }()

    return s
}

func (s *ConcurrentScanner) Scan() bool {
    select {
    case s.data = <-s.scans:
        // Got data from a scanner
        return true
    case <-s.done:
        // All scanners are done, nothing to do.
    case s.err = <-s.errors:
        // One of the scanners error'd, were done.
    }
    s.cancel() // Cancel context regardless of how we exited.
    return false
}

func (s *ConcurrentScanner) Bytes() []byte {
    return s.data
}

func (s *ConcurrentScanner) Text() string {
    return string(s.data)
}

func (s *ConcurrentScanner) Err() error {
    return s.err
}

这是它在Go Playground中运行的示例:https://play.golang.org/p/EUB0K2V7iT 您可以看到并发扫描器的输出交错。与io.MultiReader不同,它不是先读取一个读取器的所有内容,然后再转向下一个读取器。

感谢您详细的回答! - drlogout

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接