我将使用Paramiko来监控测试运行期间远程机器上的日志。
监视程序在守护线程中执行,大致执行以下操作:
在Paramiko的transport.py文件中,我认为这就是错误所在。请查找下面的 #<<<<<<<<<<<<<<<<<<<<<<<<<<<。
当运行卡住时,我可以运行 >>>>> sudo lsof -i -n | egrep '\' 来查看是否有ssh连接被卡住了(无限期地卡住)。我的主要测试进程的PID是15010。
监视程序在守护线程中执行,大致执行以下操作:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
transport = ssh.get_transport()
channel = transport.open_session()
channel.exec_command('sudo tail -f ' + self.logfile)
last_partial = ''
while not self.stopped.isSet():
try:
if None == select or None == channel:
break
rl, wl, xl = select.select([channel], [], [], 1.0)
if None == rl:
break
if len(rl) > 0:
# Must be stdout, how can I check?
line = channel.recv(1024)
else:
time.sleep(1.0)
continue
except:
break
if line:
#handle saving the line... lines are 'merged' so that one log is made from all the sources
ssh.close()
我曾经遇到阻塞读取的问题,所以我开始采用这种方式处理,大部分时间都可以很好地工作。但当网络速度变慢时,我会遇到问题。
有时在运行结束时(在上面的self.stopped被设置后),我会看到这个错误。我已经尝试过在设置stopped并加入所有监视器线程后睡眠,但仍然可能出现停顿。
Exception in thread Thread-9 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
File "/usr/lib/python2.6/site-packages/paramiko/transport.py", line 1470, in run
<type 'exceptions.AttributeError'>: 'NoneType' object has no attribute 'error'
在Paramiko的transport.py文件中,我认为这就是错误所在。请查找下面的 #<<<<<<<<<<<<<<<<<<<<<<<<<<<。
self._channel_handler_table[ptype](chan, m)
elif chanid in self.channels_seen:
self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
else:
self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
self.active = False
self.packetizer.close()
elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
self.auth_handler._handler_table[ptype](self.auth_handler, m)
else:
self._log(WARNING, 'Oops, unhandled type %d' % ptype)
msg = Message()
msg.add_byte(cMSG_UNIMPLEMENTED)
msg.add_int(m.seqno)
self._send_message(msg)
except SSHException as e:
self._log(ERROR, 'Exception: ' + str(e))
self._log(ERROR, util.tb_strings()) #<<<<<<<<<<<<<<<<<<<<<<<<<<< line 1470
self.saved_exception = e
except EOFError as e:
self._log(DEBUG, 'EOF in transport thread')
#self._log(DEBUG, util.tb_strings())
self.saved_exception = e
except socket.error as e:
if type(e.args) is tuple:
if e.args:
emsg = '%s (%d)' % (e.args[1], e.args[0])
else: # empty tuple, e.g. socket.timeout
emsg = str(e) or repr(e)
else:
emsg = e.args
self._log(ERROR, 'Socket exception: ' + emsg)
self.saved_exception = e
except Exception as e:
self._log(ERROR, 'Unknown exception: ' + str(e))
self._log(ERROR, util.tb_strings())
当运行卡住时,我可以运行 >>>>> sudo lsof -i -n | egrep '\' 来查看是否有ssh连接被卡住了(无限期地卡住)。我的主要测试进程的PID是15010。
sshd 6478 root 3u IPv4 46405 0t0 TCP *:ssh (LISTEN)
sshd 6478 root 4u IPv6 46407 0t0 TCP *:ssh (LISTEN)
sshd 14559 root 3r IPv4 3287615 0t0 TCP 172.16.0.171:ssh- >10.42.80.100:59913 (ESTABLISHED)
sshd 14563 cmead 3u IPv4 3287615 0t0 TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
python 15010 root 12u IPv4 3291525 0t0 TCP 172.16.0.171:43227->172.16.0.142:ssh (ESTABLISHED)
python 15010 root 15u IPv4 3291542 0t0 TCP 172.16.0.171:41928->172.16.0.227:ssh (ESTABLISHED)
python 15010 root 16u IPv4 3291784 0t0 TCP 172.16.0.171:57682->172.16.0.48:ssh (ESTABLISHED)
python 15010 root 17u IPv4 3291779 0t0 TCP 172.16.0.171:43246->172.16.0.142:ssh (ESTABLISHED)
python 15010 root 20u IPv4 3291789 0t0 TCP 172.16.0.171:41949->172.16.0.227:ssh (ESTABLISHED)
python 15010 root 65u IPv4 3292014 0t0 TCP 172.16.0.171:51886->172.16.0.226:ssh (ESTABLISHED)
sshd 15106 root 3r IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
sshd 15110 cmead 3u IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
所以,我只是希望我的进程不会挂起。哦,还有,如果更新Paramiko需要更新Python到2.6.6以上,我不想更新,因为我使用的是CentOS,根据我所了解的,超过2.6.6可能会很“复杂”。
感谢任何想法。
对shavenwarthog的评论太长了:
嗨,谢谢你的回答。我有几个快速问题。1)如果我需要在未知时间停止线程怎么办?换句话说,在这三分钟内,tail -f blah.log线程可能会运行三分钟,我想在这三分钟内检查累积数据10次?2)差不多一样,我猜,当我尝试在一些实际的远程机器上运行时,它不会退出(因为tail -f永远不会退出)。我已经忘记了这一点,但我认为非阻塞读取是为了解决这个问题的。你认为你在其他评论中提到的线程加上这个足以使其工作吗?基本上使用我的非阻塞读取来收集每个运行线程的本地数据。然后,当主线程需要每个运行程序的数据时,我只需要锁定,这似乎会将我的一个锁分配给10个锁,这将有所帮助。这有意义吗?