Celery WorkerLostError: Worker exited prematurely: signal 6 (SIGABRT)

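I want to turn a synchronous job into an asynchronous one. I use an id from the Couchbase data as the key in Redis, then upsert the document back into Couchbase with the Redis value added to it. It works fine when run synchronously, but fails when run asynchronously.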
from couchbase.cluster import Cluster
from couchbase.cluster import PasswordAuthenticator
from couchbase.n1ql import N1QLQuery
from couchbase.n1ql import N1QLRequest
from elasticsearch import Elasticsearch
from time import sleep
from urllib import request
from wikidata.client import Client
from openpyxl import Workbook
import base64
import json
import osmapi
import re
import redis
import urllib
import hashlib
import time
import requests
import csv
from functools import reduce

wikid_client = Client()
es_client = Elasticsearch("--")
cluster = Cluster('couchbase://--')
authenticator = PasswordAuthenticator('ATLAS-DEV','ATLAS-DEV')
cluster.authenticate(authenticator)
bucket_test = cluster.open_bucket('ATLAS-DEV')
redis_db = redis.StrictRedis(host='--', port=6379, db=10, decode_responses=True)
map_api = osmapi.OsmApi()
N1QLQuery.timeout = 3600


def deep_get(dictionary, keys, default=None):
    return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys.split("."), dictionary)

query = N1QLQuery("SELECT meta().id, * FROM `ATLAS-DEV` WHERE class_type = 'REGION' AND (codes.osm IS NOT NULL OR codes.osm != '') LIMIT 20")
for k in bucket_test.n1ql_query(query):
    # print("k : ")
    # print(k)
    document_id = k.get("id")
    # print("document_id : " + document_id)
    osm_id = deep_get(k, "ATLAS-DEV.codes.osm")
    polygon = redis_db.hget(name="osm:polygons", key=osm_id)
    if polygon is not None:
        k['ATLAS-DEV'].update({'boundaries': polygon})
        bucket_test.upsert(document_id, k['ATLAS-DEV'])

This is my synchronous job. It works well and can insert/update documents in Couchbase.

But here is my Celery job code.


from app.jobs.task import each_k
query = N1QLQuery("SELECT meta().id, * FROM `ATLAS-DEV` WHERE class_type = 'REGION' AND (codes.osm IS NOT NULL OR codes.osm != '') LIMIT 5")
for k in bucket_test.n1ql_query(query):
    # print("k : ")
    # print(k)
    each_k.delay(k)




# /app/jobs/task.py

def deep_get(dictionary, keys, default=None):
    return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys.split("."), dictionary)

@app.task
@provide_redis
def each_k(redis_db, k):
    print("Redis DB : ")
    print(redis_db)
    document_id = k.get("id")
    print("document_id : " + document_id)
    osm_id = deep_get(k, "ATLAS-DEV.codes.osm")
    polygon = redis_db.hget(name="osm:polygons", key=osm_id)
    print("Polygon : ")
    print(polygon)
    if polygon is not None:
        k['ATLAS-DEV'].update({'boundaries': polygon})
        bucket_test.upsert(document_id, k['ATLAS-DEV'])



It returns:

[warn] kevent: Bad file descriptor
python-couchbase: self->nremaining == 0 at src/oputil.c:67. Abort[2019-06-26 15:13:03,675: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 6 (SIGABRT).')
Traceback (most recent call last):
  File "/Users/n18016/Library/Python/3.7/lib/python/site-packages/billiard/pool.py", line 1226, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 6 (SIGABRT).
[2019-06-26 15:13:03,701: ERROR/MainProcess] Process 'ForkPoolWorker-11' pid:7697 exited with 'signal 6 (SIGABRT)'
[2019-06-26 15:13:03,702: ERROR/MainProcess] Process 'ForkPoolWorker-10' pid:7696 exited with 'signal 6 (SIGABRT)'
[2019-06-26 15:13:03,703: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 6 (SIGABRT).')
Traceback (most recent call last):
  File "/Users/n18016/Library/Python/3.7/lib/python/site-packages/billiard/pool.py", line 1226, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 6 (SIGABRT).
[2019-06-26 15:13:03,704: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 6 (SIGABRT).')
Traceback (most recent call last):
  File "/Users/n18016/Library/Python/3.7/lib/python/site-packages/billiard/pool.py", line 1226, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 6 (SIGABRT).

1 Answer


https://github.com/celery/celery/issues/4113

celery worker --pool solo ...

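This CLI option saved my life. It works well. With it, the tasks still run asynchronously from the caller, but the worker executes them one at a time in a single thread.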

It may not be the answer for every case. If single-threaded execution is acceptable for you, then this is the answer.


Agreed, this is not really a solution. The issue is still open.
