Spring Data JPA - 并发批量插入/更新

7

目前我正在开发一个Spring Boot应用程序,主要从消息队列(~5个并发使用者)中提取产品评论数据,并将它们存储到MySQL数据库中。每个评论可以通过其评论标识符(字符串)进行唯一标识,该标识符是主键,可以属于一个或多个产品(例如,颜色不同的产品)。以下是数据模型的摘录:

public class ProductPlacement implements Serializable{

   private static final long serialVersionUID = 1L;

   @Id
   @GeneratedValue(strategy = GenerationType.AUTO)
   @Column(name = "product_placement_id")
   private long id;

   @ManyToMany(fetch = FetchType.LAZY, cascade = CascadeType.ALL, mappedBy="productPlacements")
   private Set<CustomerReview> customerReviews;
}

public class CustomerReview implements Serializable{

   private static final long serialVersionUID = 1L;

   @Id
   @Column(name = "customer_review_id")
   private String reviewIdentifier;

   @ManyToMany(fetch = FetchType.LAZY, cascade = CascadeType.ALL)
   @JoinTable(
        name = "tb_miner_review_to_product",
           joinColumns = @JoinColumn(name = "customer_review_id"),
           inverseJoinColumns = @JoinColumn(name = "product_placement_id")
        )
   private Set<ProductPlacement> productPlacements;
}

队列中的一条消息包含1-15个评论和产品放置ID。现在我想要一种高效的方法来保存该产品的评论。每个传入的评论需要考虑以下两种情况:
  1. 该评论不在数据库中 -> 插入评论并引用消息中包含的产品。
  2. 该评论已经在数据库中 -> 只需将产品引用添加到现有评论的 Set productPlacements 中。
目前,我用如下方式来持久化评论(使用 Spring Data JpaRespoitories),但它不是最优的:
@Override
@Transactional
public void saveAllReviews(List<CustomerReview> customerReviews, long productPlacementId) {
    ProductPlacement placement = productPlacementRepository.findOne(productPlacementId);
    for(CustomerReview review: customerReviews){
        CustomerReview cr = customerReviewRepository.findOne(review.getReviewIdentifier());
        if (cr!=null){
            cr.getProductPlacements().add(placement);
            customerReviewRepository.saveAndFlush(cr);
        }   
        else{
            Set<ProductPlacement> productPlacements = new HashSet<>();
            productPlacements.add(placement);
            review.setProductPlacements(productPlacements);
            cr = review;
            customerReviewRepository.saveAndFlush(cr);
        }

    }
}

问题:

  1. 有时我会因为违反“reviewIndentifier”的唯一约束而收到“constraintViolationExceptions”。这显然是因为我(并发地)查看审核是否已存在,然后再插入或更新它。我该如何避免这种情况?
  2. 在我的情况下,使用save()还是saveAndFlush()更好?我每秒大约得到50-80个reviews。如果只使用save(),Hibernate会自动刷新,还是会导致内存使用大幅增加?

问题1的更新:在我的Review Repository上简单使用@Lock能否防止唯一约束异常?

@Lock(LockModeType.PESSIMISTIC_WRITE)
CustomerReview findByReviewIdentifier(String reviewIdentifier);

当findByReviewIdentifier返回null时会发生什么?即使该方法返回null,Hibernate是否可以锁定reviewIdentifier以进行可能的插入操作?谢谢!

1
为了消除竞态条件,要么将saveAllReviews()方法设为同步方法,要么根据评论的键(受限制的属性)实现显式锁定。在我们的组织中,我们也需要处理这种情况。经过三年多的尝试和测试,我们无法找到比按键进行锁定更好的方法...也许还有其他做法,我也想学习一下。 - Alex Salauyou
谢谢您的回复。您认为在性能方面,使方法同步和锁定关键字之间有什么区别吗? - JuHarm89
1
当然,使用键锁定会更加有效,因为您可以安全地允许不同键的并发写入。但是,这种方法需要实施工作。您可以先尝试synchronized,如果性能不满意,则考虑更高级的技术。 - Alex Salauyou
1个回答

4
从性能角度考虑,我将考虑通过以下更改来评估解决方案。
1. 从双向ManyToMany改为双向OneToMany
我曾经有过同样的问题,即哪个在执行DML语句时更有效。引用自典型的ManyToMany映射与两个OneToMany
选项一可能在配置方面更简单,但它产生的DML语句效率较低。
请使用第二个选项,因为每当关联由@ManyToOne关联控制时,DML语句总是最有效的。
  1. 启用DML语句的批处理

启用批处理支持可以减少向数据库插入/更新相同数量记录时需要的往返次数。

引用自批量插入和更新语句

hibernate.jdbc.batch_size = 50
hibernate.order_inserts = true
hibernate.order_updates = true
hibernate.jdbc.batch_versioned_data = true


  1. 减少saveAndFlush调用次数

当前代码获取ProductPlacement,并针对每个review进行saveAndFlush,导致DML语句无法批处理。

相反,我建议加载ProductPlacement实体,并将List<CustomerReview> customerReviews添加到ProductPlacement实体的Set<CustomerReview> customerReviews字段中,最后在结束时调用merge方法,做出这两个更改:

  • 使ProductPlacement实体成为关联的拥有者,即通过将mappedBy属性移动到CustomerReview实体的Set<ProductPlacement> productPlacements字段上。
  • 使CustomerReview实体使用equalshashCode方法,使用这些方法中的reviewIdentifier字段。我认为reviewIdentifier是唯一且由用户分配的。
最后,在进行这些更改的性能优化时,请使用当前代码作为基准来评估性能。然后进行更改,并比较更改是否真正为您的解决方案带来了任何显着的性能提升。

虽然这一切无疑都会提高性能,但它如何帮助避免并发查找-插入周期中的竞争条件呢? - Alex Salauyou
1
@SashaSalauyou 这是正确的。这主要解决了性能问题方面的问题。对于竞态条件,我更倾向于使用同步方法,但我想知道是否有更好的方法,但现在还不确定。 - Madhusudana Reddy Sunnapu
@MadhusudanaReddySunnapu 感谢您的建议。我也考虑过将productPlacement作为关系的所有者,但是假设一个产品有2.5k条评论。这样做会导致获取2.5k条评论来添加10个到集合中吗?是否可以向延迟加载的集合中添加项目? - JuHarm89
@JuHarm89 嗯,那样会导致获取所有评论。那么,对于新的Customerreview,由于reviewIdentifier是在代码中手动分配的,我们可以在Customerreview中添加一个boolean isNew临时字段,根据是否为新评论来设置为true/false。然后,在上述代码保存reviews时,我们可以使用HQL来执行插入new评论并创建映射行。这种方式看起来更有效,也许可以解决constraintViolationException的问题。不过,缺点是使用HQL会使SLC无效化。 - Madhusudana Reddy Sunnapu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接