Decryption failed or bad record mac with multiprocessing


I am trying to get all of my PC's cores working simultaneously while populating a PostgreSQL database. I have trimmed the code down to a reproducible example of the error.

Traceback (most recent call last):
  File "test2.py", line 50, in <module>
    download_all_sites(sites)
  File "test2.py", line 36, in download_all_sites
    pool.map(download_site, sites)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
psycopg2.OperationalError: SSL error: decryption failed or bad record mac

The full code that produces the error:
import requests
import multiprocessing
import time
import os
import psycopg2
session = None
conn = psycopg2.connect(user="user",
                        password="pass123",
                        host="127.0.0.1",
                        port="5432",
                        database="my_db")
cursor = conn.cursor()


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(domain):
    url = "http://" + domain
    with session.get(url) as response:
        temp = response.text.lower()
        found = [i for i in keywords if i in temp]
        query = """INSERT INTO test (domain, keyword) VALUES (%s, %s)"""
        cursor.execute(query, (domain, found))


def download_all_sites(sites):
    with multiprocessing.Pool(processes=os.cpu_count(), initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = ['google.com'] * 10
    keywords = ['google', 'success']
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    conn.commit()
    print(f"Finished {len(sites)} in {duration} seconds")

Don't share the same database connection between multiple processes; open the database connection inside the download_site function. - Maurice Meyer
@MauriceMeyer Do you mean open the connection inside the function, execute the query, and then close the connection? - wishmaster
1 Answer

Create a new Postgres connection in each process. A libpq connection must not be reused across forked processes (which is what multiprocessing creates); this is noted in the second warning box of the Postgres documentation. See the Postgres docs.
import requests
import multiprocessing
import time
import os
import psycopg2
session = None    

def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(domain):
    url = "http://" + domain
    with session.get(url) as response:
        #temp = response.text.lower()
        #found = [i for i in keywords if i in temp]
        #query = """INSERT INTO test (domain, keyword) VALUES (%s, %s)"""
        conn = psycopg2.connect(
            "dbname=mf port=5959 host=localhost user=mf_usr"
        )
        cursor = conn.cursor()
        query = """INSERT INTO mytable (name) VALUES (%s)"""
        cursor.execute(query, (domain, ))
        conn.commit()
        conn.close()


def download_all_sites(sites):
    with multiprocessing.Pool(
        processes=os.cpu_count(), initializer=set_global_session
    ) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = ['google.com'] * 10
    keywords = ['google', 'success']
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Finished {len(sites)} in {duration} seconds")

    # make sure it worked!
    conn = psycopg2.connect("dbname=mf port=5959 host=localhost user=mf_usr")
    cursor = conn.cursor()
    cursor.execute('select count(name) from mytable')
    print(cursor.fetchall())  # verify 10 downloads == 10 records in database

Out:

Finished 10 in 0.9922008514404297 seconds
[(10,)]
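The answer above opens and closes a connection for every URL. A lighter variant (an assumption on my part, not part of the original answer) is to open one connection per worker process inside the pool initializer, next to the requests session, so each worker reuses its own connection instead of reconnecting per task. The sketch below is a minimal illustration of that pattern; `FakeConnection` is a hypothetical stand-in for `psycopg2.connect` so the example runs without a database:

```python
import multiprocessing
import os

# Hypothetical stand-in for psycopg2.connect: it only records which
# process created it. In real code, replace with psycopg2.connect(...).
class FakeConnection:
    def __init__(self):
        self.pid = os.getpid()

conn = None

def init_worker():
    # Runs once in each worker process, so every worker gets its own
    # connection instead of inheriting the parent's forked one.
    global conn
    conn = FakeConnection()

def work(item):
    # All tasks executed by this worker reuse the worker's connection.
    return (item, conn.pid, os.getpid())

def run(items):
    with multiprocessing.Pool(processes=2, initializer=init_worker) as pool:
        return pool.map(work, items)

if __name__ == "__main__":
    results = run(range(8))
    # Each connection was created in the same process that uses it.
    assert all(conn_pid == worker_pid for _, conn_pid, worker_pid in results)
    print("each worker used its own connection")
```

With a real psycopg2 connection, `init_worker` would also register an `atexit` handler or rely on process teardown to close it; the key point is that the connection is created after the fork, never before.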

Page content provided by Stack Overflow; the original English text is available at the original link.