在Django中将开发/暂存/生产媒体桶分离在S3上

3

我们目前在一个Django 1.11项目中使用AWS S3存储桶作为媒体文件的存储方式(使用django-storages库中的S3BotoStorage)。相关代码如下:

# storage.py

from storages.backends.s3boto import S3BotoStorage


class MediaRootS3BotoStorage(S3BotoStorage):
    """Storage for uploaded media files."""
    bucket_name = settings.AWS_MEDIA_STORAGE_BUCKET_NAME
    custom_domain = domain(settings.MEDIA_URL)

# common_settings.py

DEFAULT_FILE_STORAGE = 'storage.MediaRootS3BotoStorage'
AWS_MEDIA_STORAGE_BUCKET_NAME = 'xxxxxxxxxxxxxxxx'
MEDIA_URL = "//media.example.com/"

# models.py
import os
import uuid

from django.db import models
from django.utils import timezone
from django.utils.module_loading import import_string


def upload_to_unique_filename(instance, filename):
    try:
        extension = os.path.splitext(filename)[1]
    except Exception:
        extension = ""
    now = timezone.now()

    return f'resume/{now.year}/{now.month}/{uuid.uuid4()}{extension}'


class Candidate(models.Model):
    [...]
    resume = models.FileField(
        storage=import_string(settings.DEFAULT_PRIVATE_FILE_STORAGE)(),
        upload_to=upload_to_unique_filename,
    )
    [...]

问题在于存储桶键在设置文件中是硬编码的,由于有多个开发人员和一个演示环境,所有为测试/QA目的上传的垃圾文件最终都会与真正的生产数据一起上传到同一个S3存储桶中。
一个明显的解决方案是在“staging_settings.py”和“development_settings.py”文件中覆盖“AWS_MEDIA_STORAGE_BUCKET_NAME”,但这将使得生产数据在演示和测试实例上不可用。为了使其工作,我们需要以某种方式将生产存储桶与开发/演示存储桶同步,但我不确定如何高效且无缝地完成此操作。
另一个选择是在开发和演示环境中使用本地文件系统进行媒体存储。这也需要下载大量媒体文件,并将栈的一部分(django-storages和S3 API)从测试/QA过程中排除。
如何处理这个问题?在同一个存储桶中混合测试和生产媒体文件是一个问题吗(在开始考虑如何处理它之前,我很确定它是一个问题)?一般来说,有关分离开发/演示/生产云存储的最佳实践是什么?
2个回答

1
我们最近通过自定义S3Storage类解决了这个问题,该类支持两个存储桶而不是一个。每个环境都写入自己的存储桶,这意味着生产存储桶不会被来自临时环境(开发、暂存、QA等)的文件污染。但是,如果给定的环境需要在自己的存储桶中找不到的资源,则会自动尝试从生产存储桶中获取它。因此,我们不需要复制大量已经在生产存储桶中可用的静态资源。

在settings.py中,我们添加了两个新变量并指定了自定义存储类。

# The alternate bucket (typically the production bucket) is used as a fallback when the primary one doesn't contain the resource requested.
AWS_STORAGE_ALTERNATE_BUCKET_NAME = os.getenv('AWS_STORAGE_ALTERNATE_BUCKET_NAME')
AWS_S3_ALTERNATE_CUSTOM_DOMAIN = f'{AWS_STORAGE_ALTERNATE_BUCKET_NAME}.s3.amazonaws.com'

# Custom storage class
STATICFILES_STORAGE = 'hello_django.storage_backends.StaticStorage'

然后在自定义存储类中,我们将url()方法重写如下。
from datetime import datetime, timedelta
from urllib.parse import urlencode

from django.utils.encoding import filepath_to_uri

from storages.backends.s3boto3 import S3Boto3Storage
from storages.utils import setting


class StaticStorage(S3Boto3Storage):
    location = 'static'
    default_acl = 'public-read'

    def __init__(self, **settings):
        super().__init__(**settings)

    def get_default_settings(self):
        settings_dict = super().get_default_settings()
        settings_dict.update({
            "alternate_bucket_name": setting("AWS_STORAGE_ALTERNATE_BUCKET_NAME"),
            "alternate_custom_domain": setting("AWS_S3_ALTERNATE_CUSTOM_DOMAIN")
        })
        return settings_dict

    def url(self, name, parameters=None, expire=None, http_method=None):
        params = parameters.copy() if parameters else {}
        if self.exists(name):
            r = self._url(name, parameters=params, expire=expire, http_method=http_method)
        else:
            if self.alternate_bucket_name:
                params['Bucket'] = self.alternate_bucket_name
                r = self._url(name, parameters=params, expire=expire, http_method=http_method)
        return r

    def _url(self, name, parameters=None, expire=None, http_method=None):
        """
        Similar to super().url() except that it allows the caller to provide
        an alternate bucket name in parameters['Bucket']
        """
        # Preserve the trailing slash after normalizing the path.
        name = self._normalize_name(self._clean_name(name))
        params = parameters.copy() if parameters else {}
        if expire is None:
            expire = self.querystring_expire

        if self.custom_domain:
            bucket_name = params.pop('Bucket', None)
            if bucket_name is None or self.alternate_custom_domain is None:
                custom_domain = self.custom_domain
            else:
                custom_domain = self.alternate_custom_domain
            url = '{}//{}/{}{}'.format(
                self.url_protocol,
                custom_domain,
                filepath_to_uri(name),
                '?{}'.format(urlencode(params)) if params else '',
            )

            if self.querystring_auth and self.cloudfront_signer:
                expiration = datetime.utcnow() + timedelta(seconds=expire)
                return self.cloudfront_signer.generate_presigned_url(url, date_less_than=expiration)

            return url

        if params.get('Bucket') is None:
            params['Bucket'] = self.bucket.name
        params['Key'] = name
        url = self.bucket.meta.client.generate_presigned_url('get_object', Params=params,
                                                             ExpiresIn=expire, HttpMethod=http_method)
        if self.querystring_auth:
            return url
        return self._strip_signing_parameters(url)

这个示例项目演示了这种方法。


1
在这种情况下,我们的团队会为所有环境使用一个存储桶,但我们会为上传的静态和媒体文件添加一些元数据。通过这种方式,为了删除某些非生产S3对象,您只需使用AWS API进行过滤,并将它们删除。
可以通过在settings.py中添加以下内容实现:
ENVIRONMENT = "development/production/qa"
AWS_S3_OBJECT_PARAMETERS = {
   'CacheControl': 'max-age=86400',
   'Metadata': {
      'environment': ENVIRONMENT
   }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接