如何将二进制文件序列化以便与Celery任务一起使用

12

我最近在我的一个应用程序中集成了 celery(更具体地说是django-celery)。我在应用程序中有一个如下所示的模型。

class UserUploadedFile(models.Model)
    original_file = models.FileField(upload_to='/uploads/')    
    txt = models.FileField(upload_to='/uploads/')
    pdf = models.FileField(upload_to='/uploads/')
    doc = models.FileField(upload_to='/uploads/')
    
    def convert_to_others(self):
        # Code to convert the original file to other formats

现在,一旦用户上传文件,我想将原始文件转换为txt、pdf和doc格式。调用convert_to_others方法是一个比较昂贵的过程,因此我计划使用celery异步处理它。因此,我编写了一个简单的celery任务,如下所示。

@celery.task(default_retry_delay=bdev.settings.TASK_RETRY_DELAY)
def convert_ufile(file, request):
    """ 
    This task method would call a UserUploadedFile object's convert_to_others
    method to do the file conversions.

    The best way to call this task would be doing it asynchronously
    using apply_async method.
    """
    try:
        file.convert_to_others()
    except Exception, err:
        # If the task fails log the exception and retry in 30 secs
        log.LoggingMiddleware.log_exception(request, err)
        convert_ufile.retry(exc=err)
    return True

并且接下来按以下方式调用任务:
ufile = get_object_or_404(models.UserUploadedFiles, pk=id)
tasks.convert_ufile.apply_async(args=[ufile, request])

现在当调用apply_async方法时,会引发以下异常:

PicklingError: Can't pickle <type 'cStringIO.StringO'>: attribute lookup cStringIO.StringO failed

我认为这是因为 celery(默认情况下)使用 pickle 库来序列化数据,而 pickle 无法序列化二进制文件。

问题

是否有其他的序列化器可以自行序列化二进制文件?如果没有,如何使用默认的 pickle 序列化器来序列化二进制文件?

1个回答

7
您说得对,celery尝试对不支持pickle的数据进行pickle。即使您找到了序列化要发送到celery任务的数据的方法,我也不会这样做。
发送尽可能少的数据到celery任务是一种好习惯,所以在您的情况下,我只会传递一个实例的id。有了这个ID,您可以在celery任务中按ID获取对象并执行。
请注意,在任务执行之前,对象状态可能会更改(甚至可能被删除)。因此,最好在您的celery任务中获取对象,而不是发送其完整副本。
总之,仅发送实例ID并在任务中重新获取它可以为您提供以下几点优势:
  • 发送较少的数据到队列。
  • 您无需处理数据不一致性问题。
  • 在您的情况下,这是完全可能的。 :)
唯一的“缺点”是您需要执行额外且廉价的SELECT查询来重新获取数据,但与上述问题相比,它看起来像是一个很好的交易,不是吗?

3
如果我正确理解你的回答,这种方法不能适用于扩展。如果你的Django应用程序和Celery运行在同一台机器上,那么你将能够访问UserUploadFile实例和实际的磁盘文件。如果你有单独的Celery节点,它们将无法访问磁盘文件。Django不会将文件存储在数据库中。虽然可能是一个可怕的解决方案,但我使用uuencode对二进制文件进行编码,并将编码后的数据保存到TextField中。 - Aaron C. de Bruyn
1
在这种情况下,您可以尝试从实际托管图像的机器上下载文件。当然,您需要编写访问图像主机的逻辑。您还可以尝试使用其他类型的文件存储(请参见django-storages)。 - dzida
我的解决方案: data = StringIO.StringIO() / uu.encode(open(file), data) / data.seek(0) / o = MyDjangoObject(uuencoded_data=data.read()) / o.save()然后,您可以解码文件,在Celery节点上保存它并进行任何您想要的操作。 - Aaron C. de Bruyn
如果考虑到规模问题,我会考虑如何避免将文件存储在数据库中。您是否考虑过像rackspace/amazon这样的外部存储? - dzida
1
是的。太贵了。我们只需要在一小组机器之间传输数据进行处理。另一个选择是使用NAS通过NFS共享文件夹到所有工作节点。在我们的情况下,我们需要在非公开网站上快速解决问题。 - Aaron C. de Bruyn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接