将pyodbc游标的结果输出为Python字典。

111
我该如何将pyodbc游标的输出(来自.fetchone.fetchmany.fetchall)序列化为Python字典?
我正在使用bottlepy,并且需要返回字典以便将其作为JSON返回。

是的,我注意到这在PEPE249 FAQ中提到了,但这并不影响我的要求。 - Foo Stack
10个回答

222

如果您事先不知道列名,请使用Cursor.description来构建列名列表,并使用zip将每行与之合并以生成字典列表。示例假定已建立连接和查询:

>>> cursor = connection.cursor().execute(sql)
>>> columns = [column[0] for column in cursor.description]
>>> print(columns)
['name', 'create_date']
>>> results = []
>>> for row in cursor.fetchall():
...     results.append(dict(zip(columns, row)))
...
>>> print(results)
[{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'master'},   
 {'create_date': datetime.datetime(2013, 1, 30, 12, 31, 40, 340000), 'name': u'tempdb'},
 {'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'model'},     
 {'create_date': datetime.datetime(2010, 4, 2, 17, 35, 8, 970000), 'name': u'msdb'}]

9
我之前不知道cursor.description,现在知道了,这让我省了很多时间。 - TehTris
@BenLutgens 看起来是一个很好的提问机会。一定要包含能够重现错误的代码。 - Bryan
3
@LJT 只有在Python3中才能使用...但由于print()函数在Python2中也可以使用,因此最好养成使用它的习惯。 - Auspex
1
@BenLutgens 因为该示例生成的是字典列表,而不是字典。 - Auspex
1
更新:默认情况下,pypyodbc设置lowercase = True。您可以像这样覆盖它:import pypyodbc; pypyodbc.lowercase = False。参考:链接 - Weihui Guo
显示剩余5条评论

17

使用 @Beargle 在 bottlepy 中的结果,我能够创建这个非常简洁的查询暴露端点:

@route('/api/query/<query_str>')
def query(query_str):
    cursor.execute(query_str)
    return {'results':
            [dict(zip([column[0] for column in cursor.description], row))
             for row in cursor.fetchall()]}

4
这是否容易受到 SQL 注入攻击? - Ben
@Ben 是的!除非你确信请求始终来自可信客户端,否则永远不要使用它。 - Bora M. Alper

9
如果光标不可用 - 例如,当行已通过某些函数调用或内部方法返回时,您仍可以使用row.cursor_description创建字典表示。
def row_to_dict(row):
    return dict(zip([t[0] for t in row.cursor_description], row))

7
这里是一个简短的版本,您可能能够使用。
>>> cursor.select("<your SQL here>")
>>> single_row = dict(zip(zip(*cursor.description)[0], cursor.fetchone()))
>>> multiple_rows = [dict(zip(zip(*cursor.description)[0], row)) for row in cursor.fetchall()]

您可能知道,当您在列表中添加*时,实际上会剥离列表,将单独的列表条目作为调用函数的参数。通过使用zip,我们将选择从第一到第n个条目并将它们像裤子拉链一样合并在一起。

因此,通过使用

zip(*[(a,1,2),(b,1,2)])
# interpreted by python as zip((a,1,2),(b,1,2))

您获得

[('a', 'b'), (1, 1), (2, 2)]

由于描述是一个包含元组的元组,每个元组描述每个列的标题和数据类型,您可以使用以下方法提取每个元组的第一个元素

>>> columns = zip(*cursor.description)[0]

相当于
>>> columns = [column[0] for column in cursor.description]

使用Python3.4时,我遇到了“TypeError: 'zip' object is not subscriptable”错误,因此无法使用“zip(*description)[0]”技巧。 - malat
1
在Python 3.4中,zip是一个迭代器。您可以将zip包装在列表中list(zip(*description))[0] @malat - Tommy Strand
你通过使用columns变量节省了一行代码,但是为每一行单独计算列名来乘以函数的复杂度。 - Sergey Nudnov

3

主要根据@Torxed的回复,我创建了一组完整的通用函数,用于将模式和数据查找到字典中:

def schema_dict(cursor):
    cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
    schema = {}

    for it in cursor.fetchall():
        if it[0] not in schema:
            schema[it[0]]={'scheme':[]}
        else:
            schema[it[0]]['scheme'].append(it[1])

    return schema


def populate_dict(cursor, schema):
    for i in schema.keys():
        cursor.execute("select * from {table};".format(table=i))

        for row in cursor.fetchall():
            colindex = 0

            for col in schema[i]['scheme']:
                if not 'data' in schema[i]:
                    schema[i]['data']=[]

                schema[i]['data'].append(row[colindex])
                colindex += 1

    return schema

def database_to_dict():
    cursor = connect()
    schema = populate_dict(cursor, schema_dict(cursor))

欢迎您在这方面尽情发挥,以减少代码行数;但与此同时,它是有效的!

;)


2

我喜欢@bryan和@foo-stack的答案。如果你正在使用postgresql并且使用psycopg2,你可以使用一些 psycopg2的好东西来通过在从连接创建游标时指定cursorfactory为DictCursor来实现相同的功能,像这样:

cur = conn.cursor( cursor_factory=psycopg2.extras.DictCursor )

现在,您可以执行您的sql查询,并获得一个字典来获取您的结果,而无需手动映射它们。

cur.execute( sql_query )
results = cur.fetchall()

for row in results:
    print row['row_no']

请注意,您需要 import psycopg2.extras 才能使其工作。

1
我需要的是与OP所问略有不同的内容:如果您想完全概括执行SQL Select查询的例程,但需要通过索引号而不是名称引用结果,则可以使用列表嵌套的方式实现,而不是使用字典。
返回数据的每一行在返回的列表中表示为字段(列)值的列表。
列名可以作为返回列表的第一个条目提供,因此在调用例程中解析返回的列表可以非常容易和灵活。
通过这种方式,执行数据库调用的例程不需要知道它处理的数据的任何信息。下面是这样一个例程:
    def read_DB_Records(self, tablename, fieldlist, wherefield, wherevalue) -> list:

        DBfile = 'C:/DATA/MyDatabase.accdb'
        # this connection string is for Access 2007, 2010 or later .accdb files
        conn = pyodbc.connect(r'Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ='+DBfile)
        cursor = conn.cursor()

        # Build the SQL Query string using the passed-in field list:
        SQL = "SELECT "
        for i in range(0, len(fieldlist)):
            SQL = SQL + "[" + fieldlist[i] + "]"
            if i < (len(fieldlist)-1):
                SQL = SQL + ", "
        SQL = SQL + " FROM " + tablename

        # Support an optional WHERE clause:
        if wherefield != "" and wherevalue != "" :
            SQL = SQL + " WHERE [" + wherefield + "] = " + "'" + wherevalue + "';"

        results = []    # Create the results list object

        cursor.execute(SQL) # Execute the Query

        # (Optional) Get a list of the column names returned from the query:
        columns = [column[0] for column in cursor.description]
        results.append(columns) # append the column names to the return list

        # Now add each row as a list of column data to the results list
        for row in cursor.fetchall():   # iterate over the cursor
            results.append(list(row))   # add the row as a list to the list of lists

        cursor.close()  # close the cursor
        conn.close()    # close the DB connection

        return results  # return the list of lists

1
假设您知道列名!此外,这里有三种不同的解决方案,您可能会想看最后一个!
colnames = ['city', 'area', 'street']
data = {}

counter = 0
for row in x.fetchall():
    if not counter in data:
        data[counter] = {}

    colcounter = 0
    for colname in colnames:
        data[counter][colname] = row[colcounter]
        colcounter += 1

    counter += 1

这是一个索引版本,虽然不是最美观的解决方案,但它可以工作。另一种方法是将列名作为字典键进行索引,每个键中包含一个列表,其中按行号顺序包含数据。方法如下:
colnames = ['city', 'area', 'street']
data = {}

for row in x.fetchall():
    colindex = 0
    for col in colnames:
        if not col in data:
            data[col] = []
        data[col].append(row[colindex])
        colindex += 1

写这篇文章时,我意识到可以用 for colindex in range(0, len()) 替代 for col in colnames,但你应该能理解。然而,后一种示例在仅获取一行数据时非常有用,例如:

使用字典存储每行数据

def fetchone_dict(stuff):
    colnames = ['city', 'area', 'street']
    data = {}

    for colindex in range(0, colnames):
        data[colnames[colindex]] = stuff[colindex]
    return data

row = x.fetchone()
print fetchone_dict(row)['city']

获取表名(我想是这样的..感谢Foo Stack):
下面是beargle提供的更直接的解决方案

cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
schema = {}
for it in cursor.fetchall():
    if it[0] in schema:
       schema[it[0]].append(it[1])
    else:
        schema[it[0]] = [it[1]]

谢谢,但是当我不知道列名时,是否有一般化的解决方案? - Foo Stack
是的,它被称为SQL语法。您可以查询数据库以获取正在查询的表的名称。https://dev59.com/wG445IYBdhLWcg3w5OII - Torxed
我写了一个不错的通用模式收集器: - Foo Stack
1
cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';") schema = {} for it in cursor.fetchall(): if it[0] in schema: schema[it[0]].append(it[1]) else: schema[it[0]] = [it[1]] - Foo Stack
@FooStack 列名已经在 cursor.description 中返回。不需要单独查询。 - Bryan

1
步骤如下:

1. 导入库:

  1. Import Libs:
    from pandas import DataFrame
    import pyodbc
    import sqlalchemy

从本地数据库获取您的结果:
db_file = r'xxx.accdb'
odbc_conn_str = 'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=%s' % (db_file)

conn = pyodbc.connect(odbc_conn_str)
cur = conn.cursor() 
qry = cur.execute("SELECT * FROM tbl")
columns = [column[0] for column in cur.description]

results = []
for row in cur.fetchall():
 
   results.append(dict(zip(columns, row)))
df = DataFrame(results) 
df

0

我知道这已经是老话题了,我只是在回顾别人已经说过的内容。但我发现这种方法很好,因为它也可以防止注入攻击。

def to_dict(row):
    return dict(zip([t[0] for t in row.cursor_description], row))

def query(cursor, query, params=[], cursor_func=to_dict):
    cursor.execute(query, params) 
    results = [cursor_func(row) for row in cursor.fetchall()]
    return results

quotes = query(cursor, "select * from currency where abbreviation like ?", ["USD"])

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接