使用多个连接进行单个事务(MYSQL/JDBC)

7
我正在开发的应用程序是基于Java的ETL过程,用于将数据加载到多个表中。DBMS是Infobright(一种面向数据仓库的基于MYSQL的DBMS)。
数据加载应该是原子的;但是出于性能考虑,我希望同时将数据加载到多个表中(使用LOAD DATA INFILE命令)。这意味着我需要打开多个连接。
是否有任何解决方案可以让我在原子和并行地进行加载? (我猜答案可能取决于我要加载的表的引擎;它们中的大多数都是Brighthouse,允许事务,但没有XA和Savepoints)。
进一步说明,我想避免以下情况:
  • 我将数据加载到5个表中
  • 我提交了前4个表的加载
  • 第5个表的提交失败
在这种情况下,我无法回滚前4个加载,因为它们已经提交。

我不熟悉InfoBright,但是你锁定的是表,而不是数据库,所以只要它们是分开的表,你应该能够生成一些线程并且让每个线程锁定并填充不同的表。 - Kevin
“单个事务使用多个连接”是什么意思?如果您想使用多个Connection对象向单个表中发出LOAD DATA命令以从文件中加载数据,则不可能。 LOAD DATA文档指出:“如果您指定CONCURRENT[...],则其他线程可以在LOAD DATA执行时从表中检索数据。” 如果在LOAD DATA执行时从数据库中读取内容可能会有问题,那么同时写入内容就是我的大忌了。 - Kohányi Róbert
我认为我的问题被误解了。我想要使用每个连接加载到单独的表中。但是我希望所有的加载都是单个事务的一部分。这意味着,我要么提交所有表的数据,要么回滚所有表的数据。 - Filip
非常有趣,如果我有时间,我会尝试编写一个答案,但基本上您需要为每个连接禁用自动提交,创建保存点,将内容加载到您的表中并进行提交。如果您的事务失败,则可以回滚所有操作。 - Kohányi Róbert
@MasterF 我撒了个谎,因为我没有使用保存点... 哦好吧。 - Kohányi Róbert
@MasterF 看起来你这里有一个特殊的使用案例,但我认为你的方法过于复杂。如果你可以告诉我们你使用的数据类型和你想要将这些数据加载到的表格,也许我们可以给你一些见解,更好的选择或其他任何建议。 - Kohányi Róbert
2个回答

5

介绍

如我所承诺的那样,我已经编写了一个完整的示例。我使用了MySQL并创建了三个表,如下所示:

CREATE TABLE `test{1,2,3}` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `data` varchar(255) NOT NULL UNIQUE,
  PRIMARY KEY (`id`)
);

test2 最初包含一行数据。

INSERT INTO `test2` (`data`) VALUES ('a');

(我已将完整代码发布到http://pastebin.com。)
以下示例执行了几个操作:
  1. threads设置为3,确定要并行运行多少个作业。
  2. 创建threads个连接。
  3. 默认情况下,对每个表输出一些示例数据(默认数据为每个表的a)。
  4. 创建要运行的threads个作业,并加载它们的数据。
  5. threads线程中运行这些作业,并等待它们完成(无论成功或失败)。
  6. 如果没有发生异常,则提交每个连接;否则,回滚每个连接。
  7. 关闭连接(但这些连接可以重复使用)。
(请注意,在SQLTask.call()中我使用了Java 7的自动资源管理功能。)

逻辑

public static void main(String[] args) throws SQLException, InterruptedException {
  int threads = 3;
  List<Connection> connections = getConnections(threads);
  Map<String, String> tableData = getTableData(threads);
  List<SQLTask> tasks = getTasks(threads, connections);
  setData(tableData, tasks);
  try {
    runTasks(tasks);
    commitConnections(connections);
  } catch (ExecutionException ex) {
    rollbackConnections(connections);
  } finally {
    closeConnections(connections);
  }
}

数据

private static Map<String, String> getTableData(int threads) {
  Map<String, String> tableData = new HashMap<>();
  for (int i = 1; i <= threads; i++)
    tableData.put("test" + i, "a");
  return tableData;
}

任务

private static final class SQLTask implements Callable<Void> {

  private final Connection connection;

  private String data;
  private String table;

  public SQLTask(Connection connection) {
    this.connection = connection;
  }

  public void setTable(String table) {
    this.table = table;
  }

  public void setData(String data) {
    this.data = data;
  }

  @Override
  public Void call() throws SQLException {
    try (Statement statement = connection.createStatement()) {
      statement.executeUpdate(String.format(
        "INSERT INTO `%s` (data) VALUES  ('%s');", table, data));
    }
    return null;
  }
}

private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
  List<SQLTask> tasks = new ArrayList<>();
  for (int i = 0; i < threads; i++)
    tasks.add(new SQLTask(connections.get(i)));
  return tasks;
}

private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
  Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
  Iterator<SQLTask> j = tasks.iterator();
  while (i.hasNext()) {
    Entry<String, String> entry = i.next();
    SQLTask task = j.next();
    task.setTable(entry.getKey());
    task.setData(entry.getValue());
  }
}

运行

private static void runTasks(List<SQLTask> tasks) 
    throws ExecutionException, InterruptedException {
  ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
  List<Future<Void>> futures = executorService.invokeAll(tasks);
  executorService.shutdown();
  for (Future<Void> future : futures)
    future.get();
}

结果

给定由getTableData(...)返回的默认数据

test1 -> `a`
test2 -> `a`
test3 -> `a`

由于test2已经包含了a(且data列是唯一的),所以第二个任务将失败并抛出异常,因此每个连接都将被回滚。

如果您返回b而不是a,那么连接将被安全提交。

可以通过类似LOAD DATA的方式来完成此操作。


在我的答案中得到OP的回复后,我意识到她/他想要做的事情无法简单明了地完成。

基本上问题在于,在成功提交之后,已经提交的数据无法回滚,因为该操作是原子性的。在给定的情况下需要多次提交,除非跟踪所有数据(在所有事务中)并在发生某些事件时删除成功提交的所有内容,否则无法回滚所有内容。

有一个很好的答案涉及提交和回滚的问题。


抱歉,直到现在我才有机会阅读这篇文章。这与我目前正在做的非常相似,并且它很好地涵盖了像SQL错误这样的情况。 - Filip
1
但是,它并没有涵盖实际提交失败的情况(不太可能,但有可能)。请尝试在commitConnections方法中的“for”循环中添加System.in.read()(在执行实际提交之前)。对于前两个提交,请按Enter键,然后停止mysql服务器并按Enter键进行第三个提交。第三次提交将失败,但前两次提交已经执行。这就是我想避免的情况。 - Filip
@MasterF 在提交(commit)之后,已经提交的内容是无法回滚的。除非你追踪(commit)了哪些内容并在提交(commit)之后删除这些行。如果你尝试提交(commit)到完全不同的表格,则永远不会收到诸如“重复ID”等错误,因此这不是问题。如果你连接到远程数据库,则可能会在第三个commit()执行时断开你和数据库之间的链接,从而防止数据从该批次传播到数据库。基本上,你想在事务中打破ACID规则。这不会顺利进行。 - Kohányi Róbert

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接