无论如何,我都无法在Hibernate中批量处理MySQL的INSERT语句

34

我目前面临着广为人知的并且常见的Hibernate插入批处理问题。

我需要保存500万行的批次。我首先尝试较轻的负载。由于我只需要插入两种类型的实体(首先是所有类型A的记录,然后是所有类型B的记录,都指向共同的类型C ManyToOne 父级),因此我想充分利用JDBC批量插入。

我已经阅读了很多文档,但是我尝试过的没有一个有效。

  • 我知道为了使用批量插入,我不能使用实体生成器。因此,我删除了AUTO_INCREMENT ID,并使用一个技巧设置ID:SELECT MAX(ID) FROM ENTITIES并每次递增。
  • 我知道我必须定期刷新会话。我将在下面发布代码,但是无论如何,我每500个元素执行一次事务。
  • 我知道我必须将hibernate.jdbc.batch_size设置与我的应用程序的批量大小一致,因此我在LocalSessionFactoryBean中设置它(Spring ORM集成)
  • 我知道我必须在连接URL中启用重写批处理语句。

这是我的实体

共同的父实体。这个在单个事务中首先插入。我不关心自动递增列在这里。每个批处理作业只有一个记录

@Entity
@Table(...)
@SequenceGenerator(...)
public class Deal
{

    @Id
    @Column(
            name = "DEAL_ID",
            nullable = false)
    @GeneratedValue(
            strategy = GenerationType.AUTO)
    protected Long id;

    ................
}

其中一个孩子(假设每批次2.5M个记录)

@Entity
@Table(
        name = "TA_LOANS")
public class Loan
{

    @Id
    @Column(
            name = "LOAN_ID",
            nullable = false)
    protected Long id;

    @ManyToOne(
            optional = false,
            targetEntity = Deal.class,
            fetch = FetchType.LAZY)
    @JoinColumn(
            name = "DEAL_ID",
            nullable = false)
    protected Deal deal;


    .............
}

其他孩子正在输入。假设还有另外250万条记录。
@Entity
@Table(
        name = "TA_BONDS")
public class Bond
{

    @Id
    @Column(
            name = "BOND_ID")

    @ManyToOne(
            fetch = FetchType.LAZY,
            optional = false,
            targetEntity = Deal.class)
    @JoinColumn(
            name = "DEAL_ID",
            nullable = false,
            updatable = false)
    protected Deal deal;

}

插入记录的简化代码

    long loanIdCounter = loanDao.getMaxId(), bondIdCounter = bondDao.getMaxId(); //Perform SELECT MAX(ID)

    Deal deal = null;

    List<Bond> bondList = new ArrayList<Bond>(COMMIT_BATCH_SIZE); //500 constant value
    List<Loan> loanList = new ArrayList<Loan>(COMMIT_BATCH_SIZE);

    for (String msg: inputStreamReader)
    {
        log.debug(msg.toString());

        if (this is a deal)
        {
            Deal deal = parseDeal(msg.getMessage());

            deal = dealManager.persist(holder.deal); //Called in a separate transaction using Spring annotation @Transaction(REQUIRES_NEW)

        }
        else if (this is a loan)
        {

            Loan loan = parseLoan(msg.getMessage());
            loan.setId(++loanIdCounter);
            loan.setDeal(deal);

            loanList.add(loan);

            if (loanList.size() == COMMIT_BATCH_SIZE)
            {
                loanManager.bulkInsert(loanList); //Perform a bulk insert in a single transaction, not annotated but handled manually this time
                loanList.clear();
            }
        }
        else if (this is a bond)
        {
            Bond bond = parseBond(msg.getMessage());
            bond.setId(++bondIdCounter);
            bond.setDeal(deal);

            bondList.add(bond);



            if (bondList.size() == COMMIT_BATCH_SIZE) //As above
            {
                bondManager.bulkInsert(bondList);
                bondList.clear();

            }
        }
    }

    if (!bondList.isEmpty())
        bondManager.bulkInsert(bondList);
    if (!loanList.isEmpty())
        loanManager.bulkInsert(loanList);
    //Flush remaining items, not important

bulkInsert 的实现:

@Override
public void bulkInsert(Collection<Bond> bonds)
{
    // StatelessSession session = sessionFactory.openStatelessSession();
    Session session = sessionFactory.openSession();
    try
    {
        Transaction t = session.beginTransaction();
        try
        {
            for (Bond bond : bonds)
                // session.persist(bond);
                // session.insert(bond);
                session.save(bond);
        }
        catch (RuntimeException ex)
        {
            t.rollback();
        }
        finally
        {
            t.commit();
        }
    }
    finally
    {
        session.close();
    }

}

从评论中可以看出,我尝试了多种有状态/无状态会话的组合。但都没有成功。

我的dataSource是一个ComboPooledDataSource,其URL如下:

<b:property name="jdbcUrl" value="jdbc:mysql://server:3306/db?autoReconnect=true&amp;rewriteBatchedStatements=true" />

我的SessionFactory

<b:bean id="sessionFactory" class="class.that.extends.org.springframework.orm.hibernate3.LocalSessionFactoryBean" lazy-init="false" depends-on="dataSource">
        <b:property name="dataSource" ref="phoenixDataSource" />
        <b:property name="hibernateProperties">
            <b:props>
                <b:prop key="hibernate.dialect">${hibernate.dialect}</b:prop> <!-- MySQL5InnoDb-->
                <b:prop key="hibernate.show_sql">${hibernate.showSQL}</b:prop>
                <b:prop key="hibernate.jdbc.batch_size">500</b:prop>
                <b:prop key="hibernate.jdbc.use_scrollable_resultset">false</b:prop>
                <b:prop key="hibernate.cache.use_second_level_cache">false</b:prop>
                <b:prop key="hibernate.cache.provider_class">org.hibernate.cache.EhCacheProvider</b:prop>
                <b:prop key="hibernate.cache.use_query_cache">false</b:prop>
                <b:prop key="hibernate.validator.apply_to_ddl">false</b:prop>
                <b:prop key="hibernate.validator.autoregister_listeners">false</b:prop>
                <b:prop key="hibernate.order_inserts">true</b:prop>
                <b:prop key="hibernate.order_updates">true</b:prop>
            </b:props>
        </b:property>
</b:bean>

即使我的项目范围内的类继承自LocalSessionFactoryBean,它并没有覆盖其方法(只是添加了几个项目范围内的方法)。
我已经疯了好几天了。我阅读了一些文章,但没有帮助我实现批量插入。我从使用Spring上下文的JUnit测试中运行了所有代码(所以我可以通过@Autowire注入我的类)。我所有的尝试都只产生了许多单独的INSERT语句。
以下是一些相关链接: 我错过了什么?

我非常绝望,我需要mysql + hibernate批量插入,你能让它工作吗? - Sahbaz
1
唯一的方法是向底层驱动程序发出直接查询。或者转换到C#,在那里您可以使用Entity Framework进行本地批量插入。 - usr-local-ΕΨΗΕΛΩΝ
你能否请看一下这个问题,我会非常感激。 - Sahbaz
1个回答

28

很可能您的查询正在被重写,但通过查看Hibernate SQL日志是不会知道的。Hibernate不会重写插入语句-MySQL驱动程序会重写它们。换句话说,Hibernate将向驱动程序发送多个插入语句,然后驱动程序将对它们进行重写。因此,Hibernate日志仅显示Hibernate发送到驱动程序的SQL,而不是驱动程序发送到数据库的SQL。

您可以通过在连接URL中启用MySQL的profileSQL参数来验证这一点:

<b:property name="jdbcUrl" value="jdbc:mysql://server:3306/db?autoReconnect=true&amp;rewriteBatchedStatements=true&amp;profileSQL=true" />

使用类似于您的示例,这是我的输出结果:

insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
insert into Person (firstName, lastName, id) values (?, ?, ?)
Wed Feb 05 13:29:52 MST 2014 INFO: Profiler Event: [QUERY]  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) duration: 1 ms, connection-id: 81, statement-id: 33, resultset-id: 0, message: insert into Person (firstName, lastName, id) values ('person1', 'Name', 1),('person2', 'Name', 2),('person3', 'Name', 3),('person4', 'Name', 4),('person5', 'Name', 5),('person6', 'Name', 6),('person7', 'Name', 7),('person8', 'Name', 8),('person9', 'Name', 9),('person10', 'Name', 10)

虽然这不是实际发送到MySQL数据库的内容,但Hibernate记录了前10行。最后一行来自MySQL驱动程序,清楚地显示了一个具有多个值的单批次插入,这才是实际发送到MySQL数据库的内容。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接