Django多数据库(写入数据库、读取副本)同步问题

4

所以......当我收到 API 调用时,我会执行以下操作:

i = CertainObject(paramA=1, paramB=2)
i.save()

现在我的作者数据库有一条新记录。

处理可能需要一些时间,我不希望拖延API调用者的响应,因此接下来的代码行,我使用Celery将对象ID传递给异步任务:

run_async_job.delay(i.id)

根据队列,立即执行或几秒钟后执行run_async_job尝试从数据库中加载提供的ID的记录。这是一种赌博。有时它能起作用,有时不行,这取决于读副本是否进行了更新。

是否有模式可以保证成功而无需在读取之前“休眠”几秒钟或者希望好运呢?

谢谢。

4个回答

2

最简单的方法似乎是使用Greg和Elrond在他们的答案中提到的重试。如果你正在使用shared_task或@app.task装饰器,你可以使用以下代码片段。

@shared_task(bind=True)
def your_task(self, certain_object_id):
    try:
        certain_obj = CertainObject.objects.get(id=certain_object_id)
        # Do your stuff
    except CertainObject.DoesNotExist as e:
        self.retry(exc=e, countdown=2 ** self.request.retries, max_retries=20)

我在每次重试之间使用了指数倒计时。您可以根据需要进行修改。
您可以在此处找到自定义重试延迟的文档。还有另一篇文章解释指数回退,可以在此链接中找到。
当您调用retry时,它会发送一个新的消息,使用相同的任务ID,并确保将消息传递到与原始任务相同的队列。您可以在此处的文档中阅读更多信息。

self.retry 会将此任务返回到队列中,以便尝试其他任务吗?还是会占用工作人员的“插槽”,直到成功或超过20次尝试? - JasonGenX
@JasonGenX 我已经更新了我的答案,以回答你关于重试的问题。 - Sai Chander

2
如果写入并立即加载是高优先级的话,为什么不将其存储在基于内存的数据库中,例如Memcache或Redis。这样,在一段时间后,您可以使用定期任务在celery中将其写入数据库,比如每分钟运行一次。当完成向数据库写入时,它将从Redis / Memcache中删除密钥。
您可以将数据保留在基于内存的数据库中一定时间,比如1小时,在需要数据最多的时候。此外,您可以创建一个服务方法,用于检查数据是否在内存中。
如果您正在使用Celery中的Redis作为代理,那么Django Redis是一个很好的连接到Redis的包。
以下是基于Django缓存的一些示例:
# service method

from django.core.cache import cache

def get_object(obj_id, model_cls):
    obj_dict = cache.get(obj_id, None)  # checks if obj id is in cache, O(1) complexity
    if obj_dict:
       return model_cls(**obj_dict)
    else:
       return model_cls.objects.get(id=obj_id)


# celery job

@app.task
def store_objects():
    logger.info("-"*25)
    # you can use .bulk_create() to reduce DB hits and faster DB entries
    for obj_id in cache.keys("foo_*"):
        CertainObject.objects.create(**cache.get(obj_id))
        cache.delete(obj_id)
    logger.info("-"*25)
       

1
最简单的解决方案是在任务开始时捕获任何抛出的DoesNotExist错误,然后安排重试。这可以通过将run_async_job转换为一个Bound Task来实现:
@app.task(bind=True)
def run_async_job(self, object_id):
    try:
        instance = CertainObject.objects.get(id=object_id)
    except CertainObject.DoesNotExist:
        return self.retry(object_id)

0

本文深入探讨了如何处理复制数据库中的读写不一致问题:https://medium.com/box-tech-blog/how-we-learned-to-stop-worrying-and-read-from-replicas-58cc43973638

和作者一样,我也知道没有万无一失的方法来处理读写不一致问题。

我之前使用的主要策略是拥有某种expect_and_get(pk, max_attempts=10, delay_seconds=5)方法,该方法尝试获取记录,并尝试max_attempts次,在尝试之间延迟delay_seconds秒。其想法是“期望”记录存在,因此将某些失败视为瞬态DB问题。它比仅睡眠一段时间更可靠,因为它会更快地获取记录,并希望尽可能少地延迟作业执行。

另一种策略是延迟从特殊的save_to_read方法返回,直到读副本具有该值,可以通过同步将新值推送到读副本或仅轮询它们直到它们返回记录。在我看来,这种方式似乎有点不正规。

对于大多数读取操作,您可能不必担心写入后一致性:

如果我们正在呈现用户所属企业的名称,则如果管理员更改名称需要一分钟才能将更改传播到企业的用户,那么这并不是什么大问题。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接