celery,flask sqlalchemy:DatabaseError:(DatabaseError)SSL错误:解密失败或不良记录mac。

14

你好,我有一个使用Celery Flask SqlAlchemy的设置,但是我不时地遇到这个错误:

 (psycopg2.DatabaseError) SSL error: decryption failed or bad record mac

我按照这篇文章的内容进行了操作:Celery + SQLAlchemy : DatabaseError: (DatabaseError) SSL error: decryption failed or bad record mac。另外,我还添加了一些预运行和后运行的方法:
@task_postrun.connect
def close_session(*args, **kwargs):
    # Flask SQLAlchemy will automatically create new sessions for you from 
    # a scoped session factory, given that we are maintaining the same app
    # context, this ensures tasks have a fresh session (e.g. session errors 
    # won't propagate across tasks)
    d.session.remove()

@task_prerun.connect
def on_task_init(*args, **kwargs):
    d.engine.dispose()

但我还是看到这个错误。有人解决了吗?
请注意,我正在AWS上运行(两个服务器访问同一个数据库)。数据库本身托管在它自己的服务器上(不是RDS)。我相信运行的celery后台任务总数为6(2+4)。Flask前端使用gunicorn运行。
相关主题: https://github.com/celery/celery/issues/3238#issuecomment-225975220

你能找到解决方案吗? - Ritesh Kadmawala
没有,还没有找到一个。 - Ankit
我在AWS上使用Celery、SQLAlchemy和PostgreSQL,没有出现这样的问题。我能想到的唯一区别是我的数据库在RDS上。我认为你可以尝试暂时切换到RDS,只是为了测试问题是否仍然存在。如果使用RDS后问题消失了,那么你就需要查看PostgreSQL的设置。 - Borys Serebrov
嗨Boris,你知道Postgres配置中是否启用了SSL吗?我相信只有在启用SSL时才会出现这个问题。 - Ankit
@BorisSerebrov,你能分享一下celery初始化的代码吗?根据我目前的了解,这取决于如何初始化其中的celery和sqlalchemy。 - Ankit
@Ankit,我已经在回答中发布了我的代码,如果在评论区进行展示将是过多的文本。 - Borys Serebrov
1个回答

1

这是我的评论和额外信息:

我在AWS上使用Celery、SQLAlchemy和PostgreSQL,并没有出现这样的问题。唯一能想到的区别就是我把数据库放在了RDS上。我认为你可以尝试暂时切换到RDS,只是为了测试问题是否仍然存在。如果使用RDS后问题消失了,那么你就需要查看PostgreSQL的设置。

根据RDS参数,我启用了SSL:

ssl = 1, Enables SSL connections.
ssl_ca_file = /rdsdbdata/rds-metadata/ca-cert.pem
ssl_cert_file = /rdsdbdata/rds-metadata/server-cert.pem
ssl_ciphers = false, Sets the list of allowed SSL ciphers.
ssl_key_file = /rdsdbdata/rds-metadata/server-key.pem
ssl_renegotiation_limit = 0, integer, (kB) Set the amount of traffic to send and receive before renegotiating the encryption keys.

关于Celery的初始化代码,大致如下。
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

import sqldb

engine = sqldb.get_engine()
cached_data = None

def do_the_work():
    global engine, ruckus_data
    if cached_data is not None:
        return cached_data
    db_session = None
    try:
        db_session = scoped_session(sessionmaker(
            autocommit=False, autoflush=False, bind=engine))
        data = sqldb.get_session().query(
            sqldb.system.MyModel).filter_by(
                my_type = sqldb.system.MyModel.TYPEA).all()
        cached_data = {}
        for row in data:
            ... # put row into cached_data
    finally:
        if db_session is not None:
            db_session.remove()
    return cached_data

这个do_the_work函数随后从celery任务中被调用。 sqldb.get_engine看起来像这样:
from sqlalchemy import create_engine

_engine = None

def get_engine():
    global _engine
    if _engine:
        return _engine
    _engine = create_engine(config.SQL_DB_URL, echo=config.SQL_DB_ECHO)
    return _engine

最后,config模块中的SQL_DB_URI和SQL_DB_ECHO分别是这样的:

SQL_DB_URL = 'postgresql+psycopg2://%s:%s@%s/%s' % (
    POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_DB_NAME)
SQL_DB_ECHO = False

谢谢,所以您在每个任务上都创建一个新的引擎和会话吗?另外想知道为什么要这样做:sqldb.get_session().query 而不是 db_session.query - Ankit
@Ankit engine变量在模块级别上,它充当一个单例。引擎只会被创建一次并在后续任务执行中重复使用(至少这是我从测试中记得的)。sqldb.get_session() 是一个复制粘贴错误,它应该只是 db_session.query - Borys Serebrov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接