在Python的paramiko模块中运行长时间的ssh命令(以及如何结束它们)

25

我想使用Python的paramiko模块在远程机器上运行tail -f logfile命令。到目前为止,我尝试了以下方式:

interface = paramiko.SSHClient()
#snip the connection setup portion
stdin, stdout, stderr = interface.exec_command("tail -f logfile")
#snip into threaded loop
print stdout.readline()

我想让命令一直运行,但有两个问题:

  1. 如何优雅地停止它? 我考虑创建一个Channel,然后在完成后使用shutdown()命令关闭该通道- 但这似乎很麻烦。 是否可能向通道的标准输入发送Ctrl-C之类的内容来停止它?
  2. readline()会阻塞,如果我有一种非阻塞获取输出的方法,就可以避免线程- 有什么想法吗?

很抱歉要传递这个坏消息,但SSHClient()已经在内部使用线程了。 - joeforker
6个回答

23

不要在客户端上调用exec_command,而是获取传输并生成自己的通道。可以使用通道来执行命令,并且您可以在选择语句中使用它来查找何时可以读取数据:

#!/usr/bin/env python
import paramiko
import select
client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect('host.example.com')
transport = client.get_transport()
channel = transport.open_session()
channel.exec_command("tail -f /var/log/everything/current")
while True:
  rl, wl, xl = select.select([channel],[],[],0.0)
  if len(rl) > 0:
      # Must be stdout
      print channel.recv(1024)

通过连接到远程命令的标准输出和标准输入,可以读取和写入通道对象。调用channel.makefile_stderr(...)可获取标准错误。

我将超时设置为0.0秒,因为要求使用非阻塞解决方案。根据您的需求,您可能需要使用非零超时来阻塞。


我已经修改和扩展了这个例子,并测试过确保它可以正常工作 :)。 - Andrew Aylett
1
@Vivek:你仍然需要查看rl,这是可以读取的套接字列表。请查看channel.recv_stderr()(和channel.recv_stderr_ready())的文档,以了解如何读取远程stderr。 - Andrew Aylett
我明白了,谢谢。我之前尝试使用xl,但结果很奇怪。 - abc def foo bar
这个脚本执行一个远程的 sleep 10,但仍然需要占用我100%的CPU。 - azmeuk
嗨,@azmeuk,我记得为什么超时时间为零了——因为OP想要一个非阻塞的解决方案。所以你可能想要阻塞或本地等待——如果前一次迭代没有产生任何工作,添加超时可能是明智的选择。 - Andrew Aylett
显示剩余2条评论

15

1)如果您希望,可以直接关闭客户端。另一端的服务器将会杀死尾部进程。

2)如果您需要以非阻塞的方式进行操作,您需要直接使用通道对象。您可以使用channel.recv_ready()和channel.recv_stderr_ready()同时监视stdout和stderr或者使用select.select。


2
我来晚了,但是 exec_command 本身不是非阻塞的吗? - zengr
2
在一些较新的服务器上,即使您终止客户端,您的进程也不会被终止。您需要在exec_command()中设置get_pty=True,以便在退出客户端后清理这些进程。 - nlsun
get_pty=True使您能够正确执行Ctrl+C,但它会导致所有命令在大约10分钟后超时。因此,您无法执行长时间运行的命令。 - Łukasz Strugała

9

对于Andrew Aylett的解决方案,这里有一个小更新。以下代码实际上在外部进程完成后会中断循环并退出:

import paramiko
import select

client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect('host.example.com')
channel = client.get_transport().open_session()
channel.exec_command("tail -f /var/log/everything/current")
while True:
    if channel.exit_status_ready():
        break
    rl, wl, xl = select.select([channel], [], [], 0.0)
    if len(rl) > 0:
        print channel.recv(1024)

另外,请参见https://dev59.com/KnRA5IYBdhLWcg3w_C8w#Ef0YoYgBc1ULPQZFyIj5。 - azmeuk
1
@azmeuk 两种解决方案都略有不妥,因为您不希望在退出状态准备好时立即停止接收输出。您希望在没有输出可接收且退出状态已准备好时停止。否则,您可能会在接收所有输出之前退出。 - user7610
@ Jiri,你说得对,我也遇到了你提到的类似问题。请问你知道有没有什么解决方法?我的一些输出被从tail -f <file>中跳过了。 - user5154816

0
我解决这个问题的方法是使用上下文管理器。这将确保我的长时间运行的命令被中止。关键逻辑是包装以模仿SSHClient.exec_command,捕获创建的通道并使用Timer,如果命令运行时间过长,则关闭该通道。
import paramiko
import threading


class TimeoutChannel:

    def __init__(self, client: paramiko.SSHClient, timeout):
        self.expired = False
        self._channel: paramiko.channel = None
        self.client = client
        self.timeout = timeout

    def __enter__(self):
        self.timer = threading.Timer(self.timeout, self.kill_client)
        self.timer.start()

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("Exited Timeout. Timed out:", self.expired)
        self.timer.cancel()

        if exc_val:
            return False  # Make sure the exceptions are re-raised

        if self.expired:
            raise TimeoutError("Command timed out")

    def kill_client(self):
        self.expired = True
        print("Should kill client")
        if self._channel:
            print("We have a channel")
            self._channel.close()

    def exec(self, command, bufsize=-1, timeout=None, get_pty=False, environment=None):
        self._channel = self.client.get_transport().open_session(timeout=timeout)
        if get_pty:
            self._channel.get_pty()
        self._channel.settimeout(timeout)
        if environment:
            self._channel.update_environment(environment)
        self._channel.exec_command(command)
        stdin = self._channel.makefile_stdin("wb", bufsize)
        stdout = self._channel.makefile("r", bufsize)
        stderr = self._channel.makefile_stderr("r", bufsize)
        return stdin, stdout, stderr

现在使用这段代码非常简单,第一个示例将会抛出一个TimeoutError

ssh = paramiko.SSHClient()
ssh.connect('hostname', username='user', password='pass')

with TimeoutChannel(ssh, 3) as c:
    ssh_stdin, ssh_stdout, ssh_stderr = c.exec("cat")    # non-blocking
    exit_status = ssh_stdout.channel.recv_exit_status()  # block til done, will never complete because cat wants input

这段代码应该可以正常工作(除非主机负载过高!)

ssh = paramiko.SSHClient()
ssh.connect('hostname', username='user', password='pass')

with TimeoutChannel(ssh, 3) as c:
    ssh_stdin, ssh_stdout, ssh_stderr = c.exec("uptime")    # non-blocking
    exit_status = ssh_stdout.channel.recv_exit_status()     # block til done, will complete quickly
    print(ssh_stdout.read().decode("utf8"))                 # Show results

0

要关闭进程,只需运行:

interface.close()

就非阻塞而言,你无法进行非阻塞读取。你能做的最好的事情就是一次一个“块”地解析它,“stdout.read(1)”只有在缓冲区中没有字符时才会阻塞。


0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接