我有两个不同的SQLite数据库XXX和YYY。 XXX包含表A,而YYY分别包含B。 A和B具有相同的结构(列)。 如何使用Python - SQLite API将B的行附加到A中。 附加后,A包含A和B的行。
您首先需要使用 sqlite3.connect
连接到数据库,然后创建游标以便可以执行SQL语句。有了游标之后,您就可以执行任意的SQL命令。
示例:
import sqlite3
# Get connections to the databases
db_a = sqlite3.connect('database_a.db')
db_b = sqlite3.connect('database_b.db')
# Get the contents of a table
b_cursor = db_b.cursor()
b_cursor.execute('SELECT * FROM mytable')
output = b_cursor.fetchall() # Returns the results as a list.
# Insert those contents into another table.
a_cursor = db_a.cursor()
for row in output:
a_cursor.execute('INSERT INTO myothertable VALUES (?, ?, ...etc..., ?, ?)', row)
# Cleanup
db_a.commit()
a_cursor.close()
b_cursor.close()
<注意:我并没有实际测试过这个,所以它可能存在一些缺陷,但基本思路是正确的,我认为。>
这是一个通用函数,应根据您特定的环境进行自定义。为此,您可以使用静态 SQL 参数(而不是 PRAGMA table_info
)来构造“动态确定 SQL 表达式要求”部分。这应该会提高性能。
import sqlite3
def merge_tables(cursor_new: sqlite3.Cursor, cursor_old: sqlite3.Cursor, table_name: str, del_old_table: bool = False) -> None:
'''
This function merges the content of a specific table from an old cursor into a new cursor.
:param cursor_new: [sqlite3.Cursor] the primary cursor
:param cursor_old: [sqlite3.Cursor] the secondary cursor
:param table_name: [str] the name of the table
:return: None
'''
# dynamically determine SQL expression requirements
column_names = cursor_new.execute(f"PRAGMA table_info({table_name})").fetchall()
column_names = tuple([x[1] for x in column_names][1:]) # remove the primary keyword
values_placeholders = ', '.join(['?' for x in column_names]) # format appropriately
# SQL select columns from table
data = cursor_old.execute(f"SELECT {', '.join(column_names)} FROM {table_name}").fetchall()
# insert the data into the primary cursor
cursor_new.executemany(f"INSERT INTO {table_name} {column_names} VALUES ({values_placeholders})", data)
if (cursor_new.connection.commit() == None):
# With Ephemeral RAM connections & testing, deleting the table may be ill-advised
if del_old_table:
cursor_old.execute(f"DELETE FROM {table_name}") # cursor_old.execute(f'DROP TABLE {table_name}')
cursor_old.connection.commit()
print(f"Table {table_name} merged from {cursor_old.connection} to {cursor_new.connection}") # Consider logging.info()
return None