使用Golang实现多节点并发SSH连接

5
我有一组服务器,需要建立SSH连接。我会为每个新的SSH连接生成一个新的goroutine。然后将该连接的结果(包括错误(如果有))发送到通道中,并从通道中读取。这个程序可以工作,但是即使我关闭了通道,它最终也会冻结。
以下是目前的代码:
package main

import (
    "fmt"
    "net"
    "sync"

    "github.com/awslabs/aws-sdk-go/aws"
    "github.com/awslabs/aws-sdk-go/service/ec2"
)

// ConnectionResult container
type ConnectionResult struct {
    host    string
    message string
}

func main() {
    cnres := make(chan ConnectionResult)
    ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
    wg := sync.WaitGroup{}

    params := &ec2.DescribeInstancesInput{
        Filters: []*ec2.Filter{
            &ec2.Filter{
                Name: aws.String("instance-state-name"),
                Values: []*string{
                    aws.String("running"),
                },
            },
        },
    }

    resp, err := ec2svc.DescribeInstances(params)
    if err != nil {
        panic(err)
    }

    for _, res := range resp.Reservations {
        for _, inst := range res.Instances {
            for _, tag := range inst.Tags {
                if *tag.Key == "Name" {
                    host := *tag.Value
                    wg.Add(1)
                    go func(hostname string, cr chan ConnectionResult) {
                        defer wg.Done()
                        _, err := net.Dial("tcp", host+":22")
                        if err != nil {
                            cr <- ConnectionResult{host, "failed"}
                        } else {
                            cr <- ConnectionResult{host, "succeeded"}
                        }
                    }(host, cnres)
                }
            }
        }
    }

    for cr := range cnres {
        fmt.Println("Connection to " + cr.host + " " + cr.message)
    }

    close(cnres)

    defer wg.Wait()
}

我做错了什么?有没有更好的方法在Go中进行并发SSH连接?

3个回答

3
上面的代码陷入了range cnres循环中。正如Go by Example所指出的,只有在关闭通道时,range才会结束。
解决这个问题的一种方法是在另一个goroutine中运行range cnres迭代。然后您可以使用wg.Wait(),然后像这样close()通道:
...
go func() {
        for cr := range cnres {
                fmt.Println("Connection to " + cr.host + " " + cr.message)
        }   
}() 
wg.Wait()
close(cnres)

顺便提一下(与代码卡住无关),我认为意图是在Dial()函数和随后的通道写入中使用hostname,而不是host


太好了,非常感谢!运行得很顺利。我会发布更新后的答案。 - George K.

1
感谢Frederik,我成功地将其运行起来了:
package main

import (
    "fmt"
    "net"
    "sync"

    "github.com/awslabs/aws-sdk-go/aws"
    "github.com/awslabs/aws-sdk-go/service/ec2"
)

// ConnectionResult container
type ConnectionResult struct {
    host    string
    message string
}

func main() {
    cnres := make(chan ConnectionResult)
    ec2svc := ec2.New(&aws.Config{Region: "us-east-1"})
    wg := sync.WaitGroup{}

    params := &ec2.DescribeInstancesInput{
        Filters: []*ec2.Filter{
            &ec2.Filter{
                Name: aws.String("instance-state-name"),
                Values: []*string{
                    aws.String("running"),
                },
            },
        },
    }

    resp, err := ec2svc.DescribeInstances(params)
    if err != nil {
        panic(err)
    }

    for _, res := range resp.Reservations {
        for _, inst := range res.Instances {
            for _, tag := range inst.Tags {
                if *tag.Key == "Name" {
                    host := *tag.Value
                    publicdnsname := *inst.PublicDNSName
                    wg.Add(1)
                    go func(ec2name, cbname string, cr chan ConnectionResult) {
                        defer wg.Done()
                        _, err := net.Dial("tcp", ec2name+":22")
                        if err != nil {
                            cr <- ConnectionResult{cbname, "failed"}
                        } else {
                            cr <- ConnectionResult{cbname, "succeeded"}
                        }
                    }(publicdnsname, host, cnres)
                }
            }
        }
    }

    go func() {
        for cr := range cnres {
            fmt.Println("Connection to " + cr.host + " " + cr.message)
        }
    }()

    wg.Wait()
}

0

Frederik的解决方案很好,但有一些例外情况。如果命令组例程(从写入通道的循环中)执行响应时间稍长的命令,则处理例程(Frederik的提示)将在最后一个命令例程完成之前处理并关闭通道,因此可能会发生一些数据丢失。

在我的情况下,我使用它来执行对多台服务器的远程SSH命令并打印响应。对我而言,工作解决方案是使用2个分离的WaitGroups,一个用于命令组例程,另一个用于处理例程。这样,处理例程将等待所有命令例程完成,然后处理响应并关闭通道以退出循环:

// Create waitgroup, channel and execute command with concurrency (goroutine)
outchan := make(chan CommandResult)
var wg_command sync.WaitGroup
var wg_processing sync.WaitGroup
for _, t := range validNodes {
    wg_command.Add(1)
    target := t + " (" + user + "@" + nodes[t] + ")"
    go func(dst, user, ip, command string, out chan CommandResult) {
        defer wg_command.Done()
        result := remoteExec(user, ip, cmdCommand)
        out <- CommandResult{dst, result}
    }(target, user, nodes[t], cmdCommand, outchan)
}

wg_processing.Add(1)
go func() {
    defer wg_processing.Done()
    for o := range outchan {
        bBlue.Println(o.target, "=>", cmdCommand)
        fmt.Println(o.cmdout)
    }
}()

// wait untill all goroutines to finish and close the channel
wg_command.Wait()
close(outchan)
wg_processing.Wait()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接