如何查看Django正在运行的原始SQL查询?

455
有没有办法在执行查询时显示Django正在运行的SQL语句?

https://dev59.com/Gqrns4cB2Jgan1zngCsO#23000580 - Luke Dupin
26个回答

11
只是补充一下,在Django中,如果你有一个查询像这样:
MyModel.objects.all()

做:
MyModel.objects.all().query.sql_with_params()

或者:

str(MyModel.objects.all().query)

获取SQL字符串。

9
Django SQL Sniffer是另一种查看(以及查看)使用Django ORM的任何进程产生的原始执行查询的统计数据的选择。我构建它来满足我自己的特定用例,我没有看到任何地方涵盖它,即:
  • 不需要更改目标进程正在执行的源代码(无需在django设置中注册新应用程序,导入装饰器等)
  • 不需要更改日志配置(例如,因为我对一个特定进程感兴趣,而不是适用于整个进程群的配置)
  • 无需重新启动目标进程(例如,因为它是重要组件,重新启动可能会导致一些停机时间)
因此,Django SQL Sniffer可以临时使用,并附加到已经运行的进程上。然后,该工具“嗅探”执行的查询并将它们打印到控制台中。当停止该工具时,将显示基于某些可能的指标(计数,最大持续时间和总组合持续时间)的异常查询的统计摘要。
以下是一个示例的屏幕截图,其中我附加到了Python shell enter image description here 你可以在GitHub页面上查看实时演示和更多细节。

1
简单明了,同时捕获插入和更新。 - caram
如果你使用的是Ubuntu系统,你需要运行apt-get install gdb命令,否则会失败。此外,在shell中执行其他操作之前,我需要通过PID附加嗅探器,否则它将无法工作。除了这两个注意事项,它表现得非常出色。 - Tyler Morgan

4
我把这个函数放在项目中一个应用的 util 文件里:
import logging
import re

from django.db import connection

logger = logging.getLogger(__name__)

def sql_logger():
    logger.debug('TOTAL QUERIES: ' + str(len(connection.queries)))
    logger.debug('TOTAL TIME: ' + str(sum([float(q['time']) for q in connection.queries])))

    logger.debug('INDIVIDUAL QUERIES:')
    for i, query in enumerate(connection.queries):
        sql = re.split(r'(SELECT|FROM|WHERE|GROUP BY|ORDER BY|INNER JOIN|LIMIT)', query['sql'])
        if not sql[0]: sql = sql[1:]
        sql = [(' ' if i % 2 else '') + x for i, x in enumerate(sql)]
        logger.debug('\n### {} ({} seconds)\n\n{};\n'.format(i, query['time'], '\n'.join(sql)))

然后,需要时我只需导入它并从任何上下文(通常是视图)中调用它,例如:

# ... other imports
from .utils import sql_logger

class IngredientListApiView(generics.ListAPIView):
    # ... class variables and such

    # Main function that gets called when view is accessed
    def list(self, request, *args, **kwargs):
        response = super(IngredientListApiView, self).list(request, *args, **kwargs)

        # Call our function
        sql_logger()

        return response

在模板之外执行这个操作很好,因为如果你有API视图(通常是Django Rest Framework),它也适用于那里。


3
以下内容根据https://code.djangoproject.com/ticket/17741,将查询结果返回为有效的SQL语句:
def str_query(qs):
    """
    qs.query returns something that isn't valid SQL, this returns the actual
    valid SQL that's executed: https://code.djangoproject.com/ticket/17741
    """
    cursor = connections[qs.db].cursor()
    query, params = qs.query.sql_with_params()
    cursor.execute('EXPLAIN ' + query, params)
    res = str(cursor.db.ops.last_executed_query(cursor, query, params))
    assert res.startswith('EXPLAIN ')
    return res[len('EXPLAIN '):]

3

如果您正在使用PostgreSQL,我相信这应该可以解决问题:

from django.db import connections
from app_name import models
from django.utils import timezone

# Generate a queryset, use your favorite filter, QS objects, and whatnot.
qs=models.ThisDataModel.objects.filter(user='bob',date__lte=timezone.now())

# Get a cursor tied to the default database
cursor=connections['default'].cursor()

# Get the query SQL and parameters to be passed into psycopg2, then pass
# those into mogrify to get the query that would have been sent to the backend
# and print it out. Note F-strings require python 3.6 or later.
print(f'{cursor.mogrify(*qs.query.sql_with_params())}')

这在Python 2中也可以工作。只需要像print(cursor.mogrify(*qs.query.sql_with_params()))这样重构即可。 - iChux
如果我没记错,Cursor.mogrify返回一个字符串,所以我认为使用f字符串进行格式化是多余的。 - chander

3

如果您需要重复使用查询以进行某些自定义SQL,还有另一种非常有用的方法。我已在一个分析应用程序中使用了此方法,该应用程序远远超出了Django ORM可以轻松处理的范围,因此我将ORM生成的SQL作为子查询包含。

from django.db import connection
from myapp.models import SomeModel

queryset = SomeModel.objects.filter(foo='bar')

sql_query, params = queryset.query.as_sql(None, connection)

这将为您提供带有占位符的SQL查询及其参数元组,您可以直接将它们传递给数据库:

with connection.connection.cursor(cursor_factory=DictCursor) as cursor:
    cursor.execute(sql_query, params)
    data = cursor.fetchall()

2
我已经写了一个小片段,你可以使用它:
from django.conf import settings
from django.db import connection


def sql_echo(method, *args, **kwargs):
    settings.DEBUG = True
    result = method(*args, **kwargs)
    for query in connection.queries:
        print(query)
    return result


# HOW TO USE EXAMPLE:
# 
# result = sql_echo(my_method, 'whatever', show=True)

它需要作为参数一个包含SQL查询的函数以及调用该函数所需的args和kwargs。结果它返回函数的返回值并在控制台中打印SQL查询。


2

要从Django查询数据库并获得正确的参数替换结果,您可以使用以下函数:

from django.db import connection

def print_database_query_formatted(query):
    sql, params = query.sql_with_params()
    cursor = connection.cursor()
    cursor.execute('EXPLAIN ' + sql, params)
    db_query = cursor.db.ops.last_executed_query(cursor, sql, params).replace('EXPLAIN ', '')

    parts = '{}'.format(db_query).split('FROM')
    print(parts[0])
    if len(parts) > 1:
        parts = parts[1].split('WHERE')
        print('FROM{}'.format(parts[0]))
        if len(parts) > 1:
            parts = parts[1].split('ORDER BY')
            print('WHERE{}'.format(parts[0]))
            if len(parts) > 1:
                print('ORDER BY{}'.format(parts[1]))

# USAGE
users = User.objects.filter(email='admin@admin.com').order_by('-id')
print_database_query_formatted(users.query)

输出示例

SELECT "users_user"."password", "users_user"."last_login", "users_user"."is_superuser", "users_user"."deleted", "users_user"."id", "users_user"."phone", "users_user"."username", "users_user"."userlastname", "users_user"."email", "users_user"."is_staff", "users_user"."is_active", "users_user"."date_joined", "users_user"."latitude", "users_user"."longitude", "users_user"."point"::bytea, "users_user"."default_search_radius", "users_user"."notifications", "users_user"."admin_theme", "users_user"."address", "users_user"."is_notify_when_buildings_in_radius", "users_user"."active_campaign_id", "users_user"."is_unsubscribed", "users_user"."sf_contact_id", "users_user"."is_agree_terms_of_service", "users_user"."is_facebook_signup", "users_user"."type_signup" 
FROM "users_user" 
WHERE "users_user"."email" = 'admin@admin.com' 
ORDER BY "users_user"."id" DESC

它基于这个票务评论:https://code.djangoproject.com/ticket/17741#comment:4


2
为了在Django中生成立即执行的CREATE / UPDATE / DELETE命令的SQL,请使用以下内容:
from django.db.models import sql

def generate_update_sql(queryset, update_kwargs):
    """Converts queryset with update_kwargs
    like : queryset.update(**update_kwargs) to UPDATE SQL"""

    query = queryset.query.clone(sql.UpdateQuery)
    query.add_update_values(update_kwargs)
    compiler = query.get_compiler(queryset.db)
    sql, params = compiler.as_sql()
    return sql % params

from django.db.models import sql

def generate_delete_sql(queryset):
    """Converts select queryset to DELETE SQL """
    query = queryset.query.chain(sql.DeleteQuery)
    compiler = query.get_compiler(queryset.db)
    sql, params = compiler.as_sql()
    return sql % params

from django.db.models import sql

def generate_create_sql(model, model_data):
    """Converts queryset with create_kwargs
    like if was: queryset.create(**create_kwargs) to SQL CREATE"""
    
    not_saved_instance = model(**model_data)
    not_saved_instance._for_write = True

    query = sql.InsertQuery(model)

    fields = [f for f in model._meta.local_concrete_fields if not isinstance(f, AutoField)]
    query.insert_values(fields, [not_saved_instance], raw=False)

    compiler = query.get_compiler(model.objects.db)
    sql, params = compiler.as_sql()[0]
    return sql % params

测试和使用

    def test_generate_update_sql_with_F(self):
        qs = Event.objects.all()
        update_kwargs = dict(description=F('slug'))
        result = generate_update_sql(qs, update_kwargs)
        sql = "UPDATE `api_event` SET `description` = `api_event`.`slug`"
        self.assertEqual(sql, result)

    def test_generate_create_sql(self):
        result = generate_create_sql(Event, dict(slug='a', app='b', model='c', action='e'))
        sql = "INSERT INTO `api_event` (`slug`, `app`, `model`, `action`, `action_type`, `description`) VALUES (a, b, c, e, , )"
        self.assertEqual(sql, result)

1
from django.db import reset_queries, connection
class ShowSQL(object):
    def __enter__(self):
        reset_queries()
        return self

    def __exit__(self, *args):
        for sql in connection.queries:
            print('Time: %s\nSQL: %s' % (sql['time'], sql['sql']))

那么您可以使用:

with ShowSQL() as t:
    some queries <select>|<annotate>|<update> or other 

它会打印

  • 时间:%s
  • SQL:%s

这个回答值得更多的赞。 - Serhii Kushchenko

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接