Django + PostgreSQL如何最佳地提高慢速汇总聚合的性能?

7

背景

我有一个使用PostgreSQL数据库的Django REST API,其中包含数百万个项目。这些项目由多个系统进行处理,处理详细信息被发送回并存储在记录表中。简化模型如下:

class Item(models.Model):
    details = models.JSONField()


class Record(models.Model):
    items = models.ManyToManyField(Item)
    created = models.DateTimeField(auto_created=True)
    system = models.CharField(max_length=100)
    status = models.CharField(max_length=100)
    details = models.JSONField()

目标

我想对“Items”表进行任意过滤,并获取各种处理系统的汇总信息。此摘要为所选每个系统的每个选择的“Item”获取最新状态,并显示每个状态的计数。例如,如果我筛选了1055个项,则返回示例如下:

{
System_1: [running: 5, completed: 1000, error: 50],
System_2: [halted: 55, completed: 1000],
System_3: [submitted: 1055]
}

我目前可以通过如下的查询方式来工作,它返回 System_1 处理状态的计数,并重复执行其他系统的操作,并将其打包为 JSON 返回。

Item.objects.filter(....).annotate(
    system_1_status=Subquery(
        Record.objects.filter(
            system='System_1',
            items__id=OuterRef('pk')
        ).order_by('-created').values('status')[:1]
    )
).values('system_1_status').annotate(count=Count('system_1_status'))

这将转换为SQL查询:

SELECT 
    "api_item"."id", 
    "api_item"."details", 
    (
        SELECT 
            U0."status" 
        FROM 
            "api_record" U0 
        INNER JOIN 
            "api_record_items" U1 
        ON 
            (U0."id" = U1."record_id") 
        WHERE 
            (U1."item_id" = ("api_item"."id") AND U0."system" = system_1) 
        ORDER BY 
            U0."created" DESC LIMIT 1
    ) AS "system_1_status" 
FROM "api_item"

我们有数百万个项目和记录,如果选择的项目少于一千个,则性能表现良好。但是如果选择的项目数量超过一千个,则需要花费数分钟时间。当试图处理数十万个项目时,情况变得非常糟糕。

问题

我该如何提高此查询的性能?除了调整索引,我看不到还有其他方法?

或者,是否将 JSONField 添加到 Item 模型中并存储每个系统的最新状态缓存是一个不好的主意?虽然我不喜欢复制数据的想法,但在模型上已经存在的字段进行聚合应该可以在查询时非常快速。我有 DjangoQ,可以使用计划函数来保持这些字段的最新状态。


2
你能否包含实际发送到数据库的查询语句?同时,表结构和索引也会很有帮助。 - Kadet
我目前无法访问系统以获取这些信息,但我将在几天内更新。Record 上唯一的索引只是主键,而 Items 在各个字段上有许多索引,因此初始过滤速度适当快。只有在获取 Record 摘要时才会变慢。 - User
原则上,如果我能够将Records左连接到items,按Record创建时间排序,并按Item分组,那么我就可以在不使用成千上万个子查询的情况下实现我想要的结果。 - User
这是我的一些建议,首先在“system”上添加一个索引,其次使用queryset.defer("details")(JSONField 检索速度较慢,如果你不需要它在结果中出现,就延迟处理它,速度会更快)。 - Gabriel Muj
4个回答

2
以下解决方案可将查询时间从我的样本中的5分钟减少到20秒。
from collections import Counter

items = Item.objects.filter(...)
{
    "System_1": Counter(
        items.
            filter(record__system='System_1').
            order_by('id', '-record__created').
            values_list('record__status', flat=True).
            distinct('id')),
    "System_2": Counter(
        items.
            filter(record__system='System_2').
            order_by('id', '-record__created').
            values_list('record__status', flat=True).
            distinct('id'))
}

下面是生成的PostgreSQL计划,请告诉我是否可以改进:
SELECT DISTINCT ON ("api_item"."id") "api_record"."status" 
FROM "api_item" INNER JOIN "api_record_items" ON ("api_item"."id" = "api_record_items"."item_id") 
INNER JOIN "api_record" ON ("api_record_items"."record_id" = "api_record"."id") 
WHERE "api_record"."system" = System_1 ORDER BY "api_item"."id" ASC, "api_record"."created" DESC

我不喜欢需要从数据库中获取所有值来进行计数,但是我一直无法让聚合函数与所需的去重配合工作,以确保每个项目只计算一次。
如果有人能够改进此查询计划,请看以下内容:
Unique  (cost=563327.33..1064477.67 rows=1010100 width=21) (actual time=16194.180..22301.422 rows=1010100 loops=1)
Planning Time: 0.492 ms
Execution Time: 22401.646 ms
  ->  Gather Merge  (cost=563327.33..1053946.33 rows=4212534 width=21) (actual time=16194.179..21165.116 rows=22200000 loops=1)
        Workers Planned: 2
        Workers Launched: 2
        ->  Sort  (cost=562327.31..566715.36 rows=1755222 width=21) (actual time=16140.068..17729.869 rows=7400000 loops=3)
              Worker 1:  Sort Method: external merge  Disk: 244080kB
              Worker 0:  Sort Method: external merge  Disk: 246880kB
              Sort Method: external merge  Disk: 247800kB
"              Sort Key: api_item.id, api_record.created DESC"
              ->  Parallel Hash Join  (cost=22117.30..308287.51 rows=1755222 width=21) (actual time=5719.348..8826.655 rows=7400000 loops=3)
                    Hash Cond: (api_record_items.item_id = api_item.id)
                    ->  Parallel Hash Join  (cost=2584.61..261932.34 rows=1755222 width=21) (actual time=17.042..3748.984 rows=7400000 loops=3)
                    ->  Parallel Hash  (cost=12626.75..12626.75 rows=420875 width=8) (actual time=180.939..180.940 rows=336700 loops=3)
                          Hash Cond: (api_record_items.record_id = api_record.id)
                          Buckets: 131072  Batches: 16  Memory Usage: 3520kB
                          ->  Parallel Seq Scan on api_record_items  (cost=0.00..234956.18 rows=9291718 width=16) (actual time=0.472..1557.711 rows=7433333 loops=3)
                          ->  Parallel Seq Scan on api_item  (cost=0.00..12626.75 rows=420875 width=8) (actual time=0.022..95.567 rows=336700 loops=3)
                          ->  Parallel Hash  (cost=2402.11..2402.11 rows=14600 width=21) (actual time=16.479..16.480 rows=10012 loops=3)
                                Buckets: 32768  Batches: 1  Memory Usage: 1952kB
                                ->  Parallel Seq Scan on api_record  (cost=0.00..2402.11 rows=14600 width=21) (actual time=6.063..13.094 rows=10012 loops=3)
                                      Rows Removed by Filter: 33340
                                      Filter: ((system)::text = 'system_1'::text)

2
您正在为每个项目执行一个子查询,尝试像下面这样做,并使用聚合。
from django.db.models import Case, When, Sum

items = Item.objects.filter(# your condition)
results = Record.objects.values("system").annotate(
    running=Sum(Case(When(status="running", then=1), default=0, 
        output_field=IntegerField())),
    completed=Sum(Case(When(status="completed", then=1), default=0, 
        output_field=IntegerField())),
    # add more status annotations
).order_by("system").filter(items__in=items)

谢谢回答,我正在努力确定它是否适用于我。目前出现了ValueError:使用切片必须将QuerySet值限制为一个精确查找结果。 - User
@用户 对不起,我猜这可能是由于“items”超过一个的原因。我已经更改了上面的代码以适应这种情况。根据您的操作,您可能需要调整项目的过滤方式。但首先,可能要检查一下没有“filter()”时查询的效果如何。 - Bernhard Vallant

1
我认为使用简单的分组可能有助于获取状态;
from django.db.models import Count
Item.objects.filter(record__system='System_1').values('record__status').annotate(c=Count('record__status')).values('record__status', 'c')

谢谢您的回复。不幸的是,这并没有给出我需要的结果。如果一个项目有两个记录,比如不同的状态,那么每个状态的总数都会增加。因此,我们最终得到的状态总和将超过应计算的项目总数,因为它们被多次计算了。 - User

1

看起来这是一个经典的top-n-per-group问题。您想要获取每个项目的最新状态。

如果您无法创建合适的索引,并且api_item表中有大量行,则最有效的方法很可能是使用ROW_NUMBER窗口函数,就像这样。我重新编写了您在问题中提出的查询,以产生相同的结果,但希望更高效。

WITH
CTE_rn
AS
(
    SELECT
        U1."item_id"
        ,U0."status" AS "system_1_status"
        ,ROW_NUMBER() OVER (PARTITION BY U1."item_id" ORDER BY U0."created" DESC) AS rn
    FROM
        "api_record" U0 
        INNER JOIN "api_record_items" U1 ON U0."id" = U1."record_id"
    WHERE
        U0."system" = 'system_1'
)
,CTE_item_status
AS
(
    SELECT
        "item_id"
        ,"system_1_status"
    FROM
        CTE_rn
    WHERE
        rn = 1
)
SELECT
    "api_item"."id"
    ,"api_item"."details"
    ,CTE_item_status."system_1_status"
FROM
    "api_item"
    LEFT JOIN CTE_item_status ON CTE_item_status."item_id" = "api_item"."id"
;

如果您知道永远不会有没有状态的api_items,或者不想在结果集中看到这样的项目,则在主查询中使用INNER JOIN CTE_item_status而不是LEFT JOIN


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接