RabbitMQ Pika connection reset: (-1, ConnectionResetError(104, 'Connection reset by peer'))


I searched on Stack Overflow and am posting this question because none of the existing solutions worked for me, and my problem may be different from the others.

I am writing a script that consumes articles from a RabbitMQ queue, processes each article to count words and extract keywords, and then dumps the results into a database. My script runs fine, but after executing for a while I get this exception:
(-1, "ConnectionResetError(104, 'Connection reset by peer')")

I have no idea why this happens. I tried many of the available solutions, but none worked for me. I wrote two different versions of the script and ran both; each works fine at first, but after a while both hit the same exception.

Here is my first script:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    # Edit 4
    def pika_connect():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
        channel = connection.channel()
        print ("In pika connect")
        Logger.log_message('Setting up input queue consumer')
        channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
        channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

        Logger.log_message('Starting loop')
        channel.start_consuming()

    #########

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()


    # Edit 5 starting 10 threads to listen to pika 

    for th in range(qthreads):
        Logger.log_message('Starting thread: '+str(th))
        try:
            t = Thread(target=pika_connect, args=())
            t.start()
        except Exception as e:
            Logger.error_message("Exception in starting threads " + str(e))



try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))

Here is my second script:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    connection = pika.BlockingConnection(pika.ConnectionParameters(
             host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()
    print ("In app main")
    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
    channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

    Logger.log_message('Starting loop')

    try:
        channel.start_consuming()
    except Exception as e:
        Logger.error_message("Exception in start_consuming in main " + str(e))
        raise e


try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))


In my first script I used threads because I wanted to speed up article processing.
This is my callback function:

def on_message(ch, method, properties, message):
    Logger.log_message("Starting parsing new msg ")
    handle_message(message)

Edit: full code

import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
    vars = {}
    lock = None

    def __init__(self):
        self.lock = threading.Lock()

    def inc(self, var):

        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] += 1
            else:
                self.vars[var] = 1
        finally:
            self.lock.release()


    def dec(self, var):

        self.lock.acquire()
        try:
            if var in self.vars:
                self.vars[var] -= 1
            else:
                Logger.error_message('Cannot decrement ' + var + ', not tracked')
        finally:
            self.lock.release()

    def get(self, var):

        out = None
        self.lock.acquire()
        try:
            if var in self.vars:
                out = self.vars[var]
            else:
                Logger.error_message('Cannot get ' + var + ', not tracked')
        finally:
            self.lock.release()


        return out

    def get_all(self):

        out = None
        self.lock.acquire()
        try:
            out = self.vars.copy()
        finally:
            self.lock.release()


        return out


class SpeedTracker(threading.Thread):
    speedvars = None
    start_ts = None
    last_vars = {}

    def __init__(self, speedvars):
        super(SpeedTracker, self).__init__()
        self.start_ts = time.time()
        self.speedvars = speedvars
        Logger.log_message('Setting up speed tracker')

    def run(self):
        while True:
            time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
            prev = self.last_vars
            cur = self.speedvars.get_all()
            now = time.time()
            if len(prev) > 0:
                q = {}
                for key in cur:
                    qty = cur[key] - prev[key]
                    avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
                    overall_avg = cur[key] / (now - self.start_ts)
                    Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
                                       + ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec, '
                                       + ', overall speed ' + '%0.2f' % overall_avg + '/sec')
                pending = cur[SPD_RECEIVED] - cur[SPD_DISCARDED] - cur[SPD_SENT]
                pending_avg = pending / (now - self.start_ts)
                Logger.log_message('Speed-tracking (pending): total ' + str(pending)
                                   + ', overall speed ' + '%0.2f' % pending_avg + '/sec')
            self.last_vars = cur


class ResultsSender(threading.Thread):
    channel = None
    results = None
    speedvars = None

    def __init__(self, results, speedvars):
        super(ResultsSender, self).__init__()
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=Config.AMQ_DAEMONS['base']['amq-host']))
        self.channel = connection.channel()
        Logger.log_message('Setting up output exchange')
        self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
        self.results = results
        self.speedvars = speedvars

    def run(self):
        while True:
            item = self.results.get()
            self.channel.basic_publish(
                exchange=Config.AMQ_DAEMONS['consumer']['output'],
                routing_key='',
                body=item)
            self.speedvars.inc(SPD_SENT)

def parse_message(message):
    try:
        bodytxt = message.decode('UTF-8')
        body = json.loads(bodytxt)
        return body
    except Exception as e:
        Logger.error_message("Cannot parse message - " + str(e))
        raise e

def get_body_elements(body):
    try:
        artid = str(body.get('article_id'))
        article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
        date = article_dt.strftime(Config.DATE_FORMAT)
        article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
        return (artid, date, article)
    except Exception as e:
        Logger.error_message("Cannot retrieve article attributes " + str(e))
        raise e

def process_article(id, date, text):
    global results, speedvars
    try:
        Logger.log_message('Processing article ' + id)
        keywords = Pipeline.extract_keywords(text)
        send_data = {"id": id, "date": date, "keywords": keywords}
        results.put(pickle.dumps(send_data))
        # print('Queue Size:',results.qsize())
    except Exception as e:
        Logger.error_message("Problem processing article " + str(e))
        raise e

def ack_message(ch, delivery_tag):
    """Note that `channel` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if ch.is_open:
        ch.basic_ack(delivery_tag)
    else:
        Logger.error_message("Channel is already closed, so we can't ACK this message")
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        #pass

def handle_message(connection, ch, delivery_tag, message):
    global speedvars
    start = time.time()
    thread_id = threading.get_ident()

    try:
        speedvars.inc(SPD_RECEIVED)
        body = parse_message(message)
        (id, date, text) = get_body_elements(body)
        words = len(text.split())
        if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
            process_article(id, date, text)
        else:
            Logger.log_message('Ignoring article, over word count limit')
            speedvars.inc(SPD_DISCARDED)

    except Exception as e:
        Logger.error_message("Could not process message - " + str(e))

    cb = functools.partial(ack_message, ch, delivery_tag)
    connection.add_callback_threadsafe(cb)

    Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag)) 
    Logger.log_message("Total time taken to handle message : "+ str(time.time()-start))

# CALL BACK    
## def on_message(ch, method, properties, message):
##    global executor
##    executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
    (connection, threads) = args
    delivery_tag = method.delivery_tag
    t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
    t.start()
    threads.append(t)


####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
    global channel, results, speedvars

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()


    # Pika Connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()

    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

    #channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
    channel.basic_qos(prefetch_count=1)
    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

    Logger.log_message('Starting loop')
    ## channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()

    connection.close()


app_main()  

Pika takes hardly any time to handle a message, yet I still run into the connection-reset problem.
**Total time taken to handle message: 0.0005991458892822266**


What do the RabbitMQ logs contain? - Luke Bakken
@LukeBakken This is what my log file shows:

=ERROR REPORT==== 1-Jan-2019::12:45:17 ===
closing AMQP connection <0.13654.58> ([::1]:44022 -> [::1]:5672):
{writer,send_failed,{error,timeout}}

=ERROR REPORT==== 1-Jan-2019::12:48:19 ===
closing AMQP connection <0.13560.58> ([::1]:44006 -> [::1]:5672):
missed heartbeats from client, timeout: 60s

But I don't understand why heartbeats are being missed, since my script pulls an article from the RabbitMQ queue roughly every 2 to 3 seconds. - irum zahra
Does this answer your question? Handling long running tasks in pika / RabbitMQ - xanjay
3 Answers

Your handle_message method is blocking heartbeats, because all of your code, including the Pika I/O loop, runs on the same thread. See this example of how to run your work (handle_message) on a separate thread and then acknowledge the message correctly.
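A minimal, broker-free sketch of that pattern (the `FakeConnection` class here is a stand-in so the snippet runs without RabbitMQ; with real Pika, the queued callback would be `functools.partial(channel.basic_ack, delivery_tag)` and the I/O loop itself would drain it during `start_consuming()`):

```python
import functools
import queue
import threading

class FakeConnection:
    """Stand-in for pika.BlockingConnection: callbacks registered here are
    executed later on the 'I/O loop' thread, mimicking
    connection.add_callback_threadsafe()."""
    def __init__(self):
        self._callbacks = queue.Queue()

    def add_callback_threadsafe(self, cb):
        self._callbacks.put(cb)

    def process_data_events(self):
        # In real Pika this draining happens inside start_consuming();
        # here we drain the queued callbacks on the calling thread.
        while not self._callbacks.empty():
            self._callbacks.get()()

acked = []

def handle_message(connection, delivery_tag, body):
    # Heavy work happens here, OFF the I/O loop thread, so heartbeats
    # keep flowing. The ack itself must be marshalled back to the loop:
    cb = functools.partial(acked.append, delivery_tag)  # stands in for basic_ack
    connection.add_callback_threadsafe(cb)

conn = FakeConnection()
workers = []
for tag in (1, 2, 3):
    t = threading.Thread(target=handle_message, args=(conn, tag, b'article'))
    t.start()
    workers.append(t)
for t in workers:
    t.join()
conn.process_data_events()  # acks are delivered on the I/O loop thread
```

The key point is that worker threads never touch the channel directly; they only hand a callback back to the connection's own thread.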

Using that example, the connection error is resolved. But now memory starts to increase slowly and keeps increasing. I don't know what is causing it. - irum zahra
The threaded example code is just that, an example, and should not be considered "production code". What do you think happens in your application as you keep appending to the threads list? - Luke Bakken
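One way to address what that comment hints at (a hypothetical sketch, not part of the linked example): prune threads that have already finished instead of letting the list grow forever.

```python
import threading
import time

def handle_message(msg):
    time.sleep(0.01)  # placeholder for the real article processing

threads = []
for i in range(50):
    # Prune threads that have already finished before tracking a new one,
    # so the list does not grow without bound over the process lifetime.
    threads = [t for t in threads if t.is_alive()]
    t = threading.Thread(target=handle_message, args=(i,))
    t.start()
    threads.append(t)

# On shutdown, wait for the remainder and drop the references.
for t in threads:
    t.join()
threads = [t for t in threads if t.is_alive()]
```

A bounded `concurrent.futures.ThreadPoolExecutor` would achieve the same effect with less bookkeeping.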
I tried my code without threads, but the connection still drops after 5 minutes. handle_message() does not take long to process an article, yet Pika still misses heartbeats. "Total time taken to handle message: 0.0005991458892822266" is the time it takes to handle one message. Any suggestions? - irum zahra
Given that many people use Pika successfully in production without hitting this, I can only conclude that something is still wrong in your code. If you can provide a working set of code that reproduces what you see, please open an issue here (https://github.com/pika/pika/issues) with the code and the versions of Pika, Python, RabbitMQ, and Erlang you are using. By "working set of code" I mean something I can download and run. Thanks. - Luke Bakken
Thanks @LukeBakken for the solution; I had been struggling for three days before reading it. :-) Should have gone to Stack Overflow on the first try. - shahjapan


I faced the same issue. Increasing the heartbeat and connection-timeout configuration did not solve it for me. I finally figured out that if you have created a channel and do not publish anything on it for several minutes (20 minutes in my case), this error appears.
Solutions that worked for me:

  1. Create the channel immediately, just before publishing any message. OR

  2. Use try-except, and if you get an exception, create another channel and republish. i.e.

     try:
         channel.basic_publish(exchange='', routing_key='abcd', body=data)
     except Exception as e1:
         connection=pika.BlockingConnection(pika.ConnectionParameters(host='1.128.0.3',credentials=credentials))
         channel = connection.channel()
         channel.basic_publish(exchange='', routing_key='abcd', body=data)
    

This at least keeps things running and prevents losing any data. I am not an expert at this, but hope it helps someone!
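Option 1 can be sketched like this (the stub classes stand in for a live Pika connection so the snippet runs stand-alone; with Pika, `connection` would be a `BlockingConnection` and the channel calls are the same):

```python
class StubChannel:
    """Stand-in for a pika channel; records what was published."""
    sent = []

    def basic_publish(self, exchange, routing_key, body):
        StubChannel.sent.append((routing_key, body))

    def close(self):
        pass

class StubConnection:
    """Stand-in for pika.BlockingConnection."""
    def channel(self):
        return StubChannel()

def publish(connection, queue_name, data):
    # Open a short-lived channel right before publishing, so an idle
    # channel can never go stale between publishes (option 1 above).
    ch = connection.channel()
    try:
        ch.basic_publish(exchange='', routing_key=queue_name, body=data)
    finally:
        ch.close()

publish(StubConnection(), 'abcd', b'payload')
```

The trade-off is a little channel-open overhead per message in exchange for never holding a channel across a long idle gap.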



I was facing the same issue too, and resolved it by increasing the duration of the heartbeat and connection-timeout configuration.

A big thank you to @LukeBakken, who actually identified the root cause.

Here is how you can configure the timeouts:

import pika


def main():

    # NOTE: These parameters work with all Pika connection types
    params = pika.ConnectionParameters(heartbeat=600, blocked_connection_timeout=300)

    conn = pika.BlockingConnection(params)

    chan = conn.channel()

    chan.basic_publish('', 'my-alphabet-queue', "abc")

    # If publish causes the connection to become blocked, then this conn.close()
    # would hang until the connection is unblocked, if ever. However, the
    # blocked_connection_timeout connection parameter would interrupt the wait,
    # resulting in ConnectionClosed exception from BlockingConnection (or the
    # on_connection_closed callback call in an asynchronous adapter)
    conn.close()


if __name__ == '__main__':
    main()

Reference: https://pika.readthedocs.io/en/stable/examples/heartbeat_and_blocked_timeouts.html

