Flask Celery task locking

I am using Flask with Celery and I am trying to lock a specific task so that only one instance of it can run at a time. The Celery docs give an example of this: Celery docs, Ensuring a task is only executed one at a time. The example is for Django, but I have done my best to convert it for Flask. Even so, I still see myTask1, which has the lock, run more than once.
One thing that is not clear to me is whether I am using the cache correctly. I have never used it before, so all of this is new to me. One thing the docs mention but do not explain is this:

The docs state:

For this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

I am not quite sure what that means. Should I be using the cache together with my database, and if so, how would I do that? I am using MongoDB. In my code I simply set up the cache with cache = Cache(app, config={'CACHE_TYPE': 'simple'}), because that is what is shown in the Flask-Cache docs (Flask-Cache Docs).

Another thing that is unclear to me is whether I need to do anything differently, since I am calling myTask1 from within my Flask route task1.

Here is an example of the code I am using.

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time


app = Flask(__name__)

cache = Cache(app, config={'CACHE_TYPE': 'simple'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################


app.config['CELERY_BROKER_URL'] = 'amqp://localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')

    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html') 


@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))

Final working code

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
import redis
from flask_redis import FlaskRedis


app = Flask(__name__)

# ADDING REDIS
redis_store = FlaskRedis(app)

# POINTING CACHE_TYPE TO REDIS
cache = Cache(app, config={'CACHE_TYPE': 'redis'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################

# CELERY USING REDIS
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    print('in memcache_lock and timeout_at is {}'.format(timeout_at))
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
        print('memcache_lock and status is {}'.format(status))
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')
    print('dir is {} '.format(dir(self)))

    lock_id = self.name
    print('lock_id is {}'.format(lock_id))

    with memcache_lock(lock_id, self.app.oid) as acquired:
        print('in memcache_lock and lock_id is {} self.app.oid is {} and acquired is {}'.format(lock_id, self.app.oid, acquired))
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'myTask1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html')

@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))


if __name__ == '__main__':
    app.secret_key = 'super secret key for me123456789987654321'
    app.run(port=1234, host='localhost')

Here is also a screenshot where you can see that I ran myTask1 twice and myTask2 once. I now have the desired behavior for myTask1: it is run by a single worker, and if another worker tries to pick it up, it simply retries according to the retry I defined.

(Screenshot: Flower dashboard)


self is a string, and self.cache doesn't exist. Just a guess, but maybe Cache.add needs to be called on an instance, i.e. something like Cache().add? Because when the add function is called, the first argument is probably self, e.g. def add(self, lock_id, oid, lock_expire):, so with your current code self ends up being lock_id? - jmunsch
Thanks for the suggestion. I tried status = Cache().add(lock_id, oid, LOCK_EXPIRE), but that gives me a new traceback. - Max Powers
with memcache_lock(lock_id, self.app.oid) as acquired: File "/auto/pysw/cel63/python/3.4.1/lib/python3.4/contextlib.py", line 59, in __enter__ return next(self.gen) File "app.py", line 63, in memcache_lock status = Cache().add(lock_id, oid, LOCK_EXPIRE) File "/pyats2/lib/python3.4/site-packages/flask_cache/__init__.py", line 204, in add self.cache.add(*args, **kwargs) File "/ws/mastarke-sjc/pyats2/lib/python3.4/site-packages/flask_cache/__init__.py", line 192, in cache return app.extensions['cache'][self] KeyError: 'cache' - Max Powers
What behavior do you want when the task is invoked multiple times? Do you want the task to be queued, or ignored entirely? - sytech
Ideally, when myTask1 is called it would just be queued, and it would not run until the lock is released. - Max Powers
Let me clarify a bit more in case my first comment wasn't clear. If myTask1 is not in use, I want it to run. If another worker is already running myTask1, I would ideally like the task to be queued, but not to run until the lock is removed. Hopefully that's clearer. - Max Powers
3 Answers

In your question, you point out this warning from the Celery example you used:
For this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.
You mention that you don't really understand what it means. Indeed, the code you show demonstrates that you did not heed the warning, because your code uses an unsuitable backend.
Consider this code:
with memcache_lock(lock_id, self.app.oid) as acquired:
    if acquired:
        # do some work

What you want is for acquired to be true in only one thread at a time. If two threads enter the with block at the same time, only one should "win" and have acquired be true. The thread with acquired true can then proceed with its work, and the other thread has to skip doing the work and try again later to acquire the lock. In order to ensure that only one thread can have acquired true, .add must be atomic. Here is some pseudocode of what .add(key, value) does:
1. if <key> is already in the cache:
2.   return False    
3. else:
4.   set the cache so that <key> has the value <value>
5.   return True

If the execution of .add is not atomic, then when two threads A and B both execute .add("foo", "bar") against an initially empty cache, the following can happen:
  1. Thread A executes 1. if "foo" is already in the cache, finds that "foo" is not in the cache, and moves on to line 3, but the thread scheduler switches control to thread B.
  2. Thread B also executes 1. if "foo" is already in the cache, and also finds that "foo" is not in the cache. So it moves on to line 3 and then executes lines 4 and 5, setting the key "foo" to the value "bar" and returning True.
  3. Eventually the scheduler gives control back to thread A, which continues with lines 3, 4 and 5, also sets the key "foo" to the value "bar", and also returns True.
At this point you have two .add calls that both returned True. If those .add calls happen inside memcache_lock, this means two threads can both have acquired be true, so two threads can do the work at the same time, and your memcache_lock is not doing what it is supposed to do: allow only one thread to work at a time. You are not using a cache that guarantees that .add is atomic. You initialize the cache like this:
cache = Cache(app, config={'CACHE_TYPE': 'simple'})

The simple cache backend is scoped to a single process, has no thread-safety guarantees, and has an .add operation which is not atomic. (This has nothing to do with Mongo, by the way. If you wanted your cache to be backed by Mongo, you would have to pick a backend specifically made to send data to a Mongo database.)

So you have to switch to another backend, one that guarantees that .add is atomic. You could follow the lead of the Celery example and use the memcached backend, which does have an atomic .add operation. I don't use Flask, but I have done essentially what you are doing here with Django and Celery, and used the Redis backend successfully to provide this kind of locking.
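
A rough sketch of what that configuration change could look like with Flask-Caching and a Redis backend (the host and port values below are assumptions for a local setup, not something taken from the question):

from flask import Flask
from flask_caching import Cache

app = Flask(__name__)

# Point Flask-Caching at a shared Redis server instead of the per-process
# 'simple' backend, so that cache.add() is an atomic operation visible to
# every worker process.
cache = Cache(app, config={
    'CACHE_TYPE': 'redis',            # or 'memcached', as in the Celery example
    'CACHE_REDIS_HOST': 'localhost',  # assumed local Redis instance
    'CACHE_REDIS_PORT': 6379,
})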


OK, so instead of using the simple cache like this, cache = Cache(app, config={'CACHE_TYPE': 'simple'}), I should switch to a Redis cache, e.g. cache = Cache(app, config={'CACHE_TYPE': 'redis'}). I'm currently using RabbitMQ, but if that's what it takes to make this work I can switch to Redis. - Max Powers
Yes, if you want to use Redis you can use cache = Cache(app, config={'CACHE_TYPE': 'redis'}). I know RabbitMQ by name, but I have never really used it, so I don't know whether it is suitable as a backend for this kind of locking. - Louis
OK, thank you. I don't see RabbitMQ mentioned among the built-in cache backends in the official Flask-Cache docs, but I do see Redis. Let me set up Redis in my environment and see how it goes. If it works I'll mark your answer as accepted. Thanks again for the detailed explanation. - Max Powers
Thank you, I finally got it working. Basically I needed to use Redis: I changed it to cache = Cache(app, config={'CACHE_TYPE': 'redis'}) and now it works. - Max Powers

With this setup, you should still expect to see workers receiving the task, because the lock is checked inside the task itself. The only difference is that the work won't be performed if the lock has already been acquired by another worker.
In the example given in the docs, this is the desired behavior; if the lock already exists, the task simply does nothing and finishes successfully. What you want is slightly different; you want the work to be queued up instead of ignored.
In order to get the desired effect, you need to make sure the task will be picked up and performed by a worker some time in the future. One way to accomplish this is with retrying.
@task(bind=True, name='my-task')
def my_task(self):
    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later
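
One caveat worth noting: Celery tasks give up after max_retries attempts (3 by default), so with countdown=60 the task would stop re-queueing itself after a few minutes. Below is a minimal sketch of relaxing that limit, reusing the memcache_lock helper and celery app from the question; max_retries=None (retry indefinitely) is an assumption and may not suit every deployment:

@celery.task(bind=True, name='app.myTask1', max_retries=None)
def myTask1(self):
    with memcache_lock(self.name, self.app.oid) as acquired:
        if acquired:
            self.update_state(state='DOING WORK')
            time.sleep(90)  # placeholder for the real work
            return 'result'

    # Lock already held by another worker: put the message back on the queue
    # and try again in 60 seconds. With max_retries=None this repeats until
    # the lock becomes free instead of giving up after the default 3 retries.
    raise self.retry(countdown=60)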

Thanks, I see your point and appreciate the help. However, I can still call it multiple times. When I look at the worker logs, you can see that workers 3 and 4 picked up myTask1 at about the same time. 140> tail -f worker-3.log [2018-12-28 12:51:55,849: WARNING/ForkPoolWorker-3] acquired is True [2018-12-28 12:53:25,943: INFO/ForkPoolWorker-3] Task app.myTask1[5f492c3f-9684-493e-8873-1190f71527a5] succeeded in 90.10438371099985s: 'result' - Max Powers
Here is the log from worker 4: 141> tail -f worker-4.log [2018-12-28 12:51:59,381: WARNING/ForkPoolWorker-4] acquired is True [2018-12-28 12:53:29,476: INFO/ForkPoolWorker-4] Task app.myTask1[be05682f-1ff4-452b-9dff-c4593bd3c452] succeeded in 90.11584289799998s: 'result' - Max Powers
You can see they both picked up myTask1 around '12:53'. - Max Powers
I updated my post to show myTask1 using your solution. - Max Powers
That would indicate to me that memcache_lock is not behaving as expected. I'm not familiar with flask-cache, but I wonder what value is returned when you .add an existing key. Also, it looks like flask-cache is bound to the request context? According to the docs, the 'simple' cache is just a Python dictionary. Maybe try configuring an actual memcached cache. - sytech

I found this to be a surprisingly hard problem. Inspired mainly by Sebastian's work on implementing a distributed locking algorithm in Redis, I wrote up a decorator function.
A key point to bear in mind is that we lock tasks at the level of the task's argument space: for example, we allow multiple game update / process-order tasks to run concurrently, but only one per game. That is what argument_signature accomplishes in the code below. You can see documentation on how we use this in our stack at this gist:
import base64
from contextlib import contextmanager
import json
import pickle as pkl
import uuid

from backend.config import Config
from redis import StrictRedis
from redis_cache import RedisCache
from redlock import Redlock

rds = StrictRedis(Config.REDIS_HOST, decode_responses=True, charset="utf-8")
rds_cache = StrictRedis(Config.REDIS_HOST, decode_responses=False, charset="utf-8")
redis_cache = RedisCache(redis_client=rds_cache, prefix="rc", serializer=pkl.dumps, deserializer=pkl.loads)
dlm = Redlock([{"host": Config.REDIS_HOST}])

TASK_LOCK_MSG = "Task execution skipped -- another task already has the lock"
DEFAULT_ASSET_EXPIRATION = 8 * 24 * 60 * 60  # by default keep cached values around for 8 days
DEFAULT_CACHE_EXPIRATION = 1 * 24 * 60 * 60  # we can keep cached values around for a shorter period of time

REMOVE_ONLY_IF_OWNER_SCRIPT = """
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
"""


@contextmanager
def redis_lock(lock_name, expires=60):
    # https://breadcrumbscollector.tech/what-is-celery-beat-and-how-to-use-it-part-2-patterns-and-caveats/
    random_value = str(uuid.uuid4())
    lock_acquired = bool(
        rds.set(lock_name, random_value, ex=expires, nx=True)
    )
    yield lock_acquired
    if lock_acquired:
        rds.eval(REMOVE_ONLY_IF_OWNER_SCRIPT, 1, lock_name, random_value)


def argument_signature(*args, **kwargs):
    arg_list = [str(x) for x in args]
    kwarg_list = [f"{str(k)}:{str(v)}" for k, v in kwargs.items()]
    return base64.b64encode(f"{'_'.join(arg_list)}-{'_'.join(kwarg_list)}".encode()).decode()


def task_lock(func=None, main_key="", timeout=None):
    def _dec(run_func):
        def _caller(*args, **kwargs):
            with redis_lock(f"{main_key}_{argument_signature(*args, **kwargs)}", timeout) as acquired:
                if not acquired:
                    return TASK_LOCK_MSG
                return run_func(*args, **kwargs)
        return _caller
    return _dec(func) if func is not None else _dec

The implementation in our task definitions file:

@celery.task(name="async_test_task_lock")
@task_lock(main_key="async_test_task_lock", timeout=UPDATE_GAME_DATA_TIMEOUT)
def async_test_task_lock(game_id):
    print(f"processing game_id {game_id}")
    time.sleep(TASK_LOCK_TEST_SLEEP)

How we test it against a local celery cluster:
from backend.tasks.definitions import async_test_task_lock, TASK_LOCK_TEST_SLEEP
from backend.tasks.redis_handlers import rds, TASK_LOCK_MSG
class TestTaskLocking(TestCase):
    def test_task_locking(self):
        rds.flushall()
        res1 = async_test_task_lock.delay(3)
        res2 = async_test_task_lock.delay(5)
        self.assertFalse(res1.ready())
        self.assertFalse(res2.ready())
        res3 = async_test_task_lock.delay(5)
        res4 = async_test_task_lock.delay(5)
        self.assertEqual(res3.get(), TASK_LOCK_MSG)
        self.assertEqual(res4.get(), TASK_LOCK_MSG)
        time.sleep(TASK_LOCK_TEST_SLEEP)
        res5 = async_test_task_lock.delay(3)
        self.assertFalse(res5.ready())

As a bonus, here is also a quick example of setting up redis_cache.
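
A minimal usage sketch of that redis_cache object, assuming the python-redis-cache package that provides the RedisCache class above (and its cache() decorator with a ttl argument); the function name and return value are made up for illustration:

@redis_cache.cache(ttl=DEFAULT_CACHE_EXPIRATION)
def get_game_summary(game_id):
    # Stand-in for an expensive computation. The result is serialized with
    # pickle (per the RedisCache setup above) and stored in Redis, so repeat
    # calls with the same game_id are served from the cache until the TTL expires.
    return {"game_id": game_id, "status": "computed"}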
