如何使用SQLAlchemy将pandas DataFrame upsert到MySQL

3

我正在将数据从数据框推送到MySQL,目前只有在数据不存在(追加)时才向表中添加新数据。这很完美,但我也希望我的代码检查记录是否已经存在,如果是,则需要更新。所以我需要追加+更新。我真的不知道该如何开始解决这个问题,因为我被卡住了...有人之前尝试过吗?

这是我的代码:

engine = create_engine("mysql+pymysql://{user}:{pw}@localhost/{db}"
                        .format(user="root",
                                pw="*****",
                                db="my_db"))
my_df.to_sql('my_table', con = engine, if_exists = 'append')

@SlavaRozhnev 对不起,我的 if_exists 实际上是 append,我编辑了我的代码。 - NorthAfrican
根据pandas.DataFrame,你无法通过pandas来实现这个功能,但是你可以通过MySQL的触发器来实现。 - Slava Rozhnev
如何?并且是否可以自动化它……? - NorthAfrican
您所描述的是一种“upsert”操作。在网上搜索“MySQL upsert”应该会给您一些如何进行操作的想法。 - Gord Thompson
@GordThompson 好的,但是我没有看到与Pandas结合使用的任何东西...? - NorthAfrican
这里有一个PostgreSQL的例子(https://dev59.com/-1IH5IYBdhLWcg3wFInR)。你可以为MySQL做类似的事情。 - Gord Thompson
2个回答

2

与在PostgreSQL中使用的方法类似,您可以在MySQL中使用INSERT ... ON DUPLICATE KEY

with engine.begin() as conn:
    # step 0.0 - create test environment
    conn.execute(sa.text("DROP TABLE IF EXISTS main_table"))
    conn.execute(
        sa.text(
            "CREATE TABLE main_table (id int primary key, txt varchar(50))"
        )
    )
    conn.execute(
        sa.text(
            "INSERT INTO main_table (id, txt) VALUES (1, 'row 1 old text')"
        )
    )
    # step 0.1 - create DataFrame to UPSERT
    df = pd.DataFrame(
        [(2, "new row 2 text"), (1, "row 1 new text")], columns=["id", "txt"]
    )

    # step 1 - create temporary table and upload DataFrame
    conn.execute(
        sa.text(
            "CREATE TEMPORARY TABLE temp_table (id int primary key, txt varchar(50))"
        )
    )
    df.to_sql("temp_table", conn, index=False, if_exists="append")

    # step 2 - merge temp_table into main_table
    conn.execute(
        sa.text(
            """\
            INSERT INTO main_table (id, txt) 
            SELECT id, txt FROM temp_table
            ON DUPLICATE KEY UPDATE txt = VALUES(txt)
            """
        )
    )

    # step 3 - confirm results
    result = conn.execute(
        sa.text("SELECT * FROM main_table ORDER BY id")
    ).fetchall()
    print(result)  # [(1, 'row 1 new text'), (2, 'new row 2 text')]

2
您可以在数据库端使用以下解决方案: 第一步:创建一个表,用于从Pandas中插入数据(我们称之为test):
CREATE TABLE `test` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(100) NOT NULL,
  `capacity` INT(11) NOT NULL,
  PRIMARY KEY (`id`)
);

第二步:创建结果数据的表格(我们称之为cumulative_test),其结构与test完全相同:

CREATE TABLE `cumulative_test` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(100) NOT NULL,
  `capacity` INT(11) NOT NULL,
  PRIMARY KEY (`id`)
);

第三步:为测试表中的每个插入设置触发器,将会在第二个表中插入或更新记录,如下所示:

DELIMITER $$

CREATE
    /*!50017 DEFINER = 'root'@'localhost' */
    TRIGGER `before_test_insert` BEFORE INSERT ON `test` 
    FOR EACH ROW BEGIN
    DECLARE _id INT;
    
    SELECT id INTO _id
    FROM `cumulative_test` WHERE `cumulative_test`.`name` = new.name;
    
    IF _id IS NOT NULL THEN
        UPDATE cumulative_test
        SET `cumulative_test`.`capacity` = `cumulative_test`.`capacity` + new.capacity;
     ELSE 
        INSERT INTO `cumulative_test` (`name`, `capacity`) 
        VALUES (NEW.name, NEW.capacity);
    END IF; 
END;
$$

DELIMITER ;

所以你已经将值插入测试表中,并在第二个表中获得计算结果。触发器内部的逻辑可以根据您的需求进行匹配。


我想要更新和追加的表已经有将近100万条记录了... - NorthAfrican

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接