介绍
如我所承诺的那样,我已经编写了一个完整的示例。我使用了MySQL并创建了三个表,如下所示:
CREATE TABLE `test{1,2,3}` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`data` varchar(255) NOT NULL UNIQUE,
PRIMARY KEY (`id`)
);
test2
最初包含一行数据。
INSERT INTO `test2` (`data`) VALUES ('a');
(
我已将完整代码发布到http://pastebin.com。)
以下示例执行了几个操作:
- 将
threads
设置为3
,确定要并行运行多少个作业。
- 创建
threads
个连接。
- 默认情况下,对每个表输出一些示例数据(默认数据为每个表的
a
)。
- 创建要运行的
threads
个作业,并加载它们的数据。
- 在
threads
个线程中运行这些作业,并等待它们完成(无论成功或失败)。
- 如果没有发生异常,则提交每个连接;否则,回滚每个连接。
- 关闭连接(但这些连接可以重复使用)。
(请注意,在SQLTask.call()
中我使用了Java 7的自动资源管理功能。)
逻辑
public static void main(String[] args) throws SQLException, InterruptedException {
int threads = 3;
List<Connection> connections = getConnections(threads);
Map<String, String> tableData = getTableData(threads);
List<SQLTask> tasks = getTasks(threads, connections);
setData(tableData, tasks);
try {
runTasks(tasks);
commitConnections(connections);
} catch (ExecutionException ex) {
rollbackConnections(connections);
} finally {
closeConnections(connections);
}
}
数据
private static Map<String, String> getTableData(int threads) {
Map<String, String> tableData = new HashMap<>();
for (int i = 1; i <= threads; i++)
tableData.put("test" + i, "a");
return tableData;
}
任务
private static final class SQLTask implements Callable<Void> {
private final Connection connection;
private String data;
private String table;
public SQLTask(Connection connection) {
this.connection = connection;
}
public void setTable(String table) {
this.table = table;
}
public void setData(String data) {
this.data = data;
}
@Override
public Void call() throws SQLException {
try (Statement statement = connection.createStatement()) {
statement.executeUpdate(String.format(
"INSERT INTO `%s` (data) VALUES ('%s');", table, data));
}
return null;
}
}
private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
List<SQLTask> tasks = new ArrayList<>();
for (int i = 0; i < threads; i++)
tasks.add(new SQLTask(connections.get(i)));
return tasks;
}
private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
Iterator<SQLTask> j = tasks.iterator();
while (i.hasNext()) {
Entry<String, String> entry = i.next();
SQLTask task = j.next();
task.setTable(entry.getKey());
task.setData(entry.getValue());
}
}
运行
private static void runTasks(List<SQLTask> tasks)
throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
List<Future<Void>> futures = executorService.invokeAll(tasks);
executorService.shutdown();
for (Future<Void> future : futures)
future.get();
}
结果
给定由getTableData(...)
返回的默认数据
test1 -> `a`
test2 -> `a`
test3 -> `a`
由于test2
已经包含了a
(且data
列是唯一的),所以第二个任务将失败并抛出异常,因此每个连接都将被回滚。
如果您返回b
而不是a
,那么连接将被安全提交。
可以通过类似LOAD DATA
的方式来完成此操作。
在我的答案中得到OP的回复后,我意识到她/他想要做的事情无法简单明了地完成。
基本上问题在于,在成功提交之后,已经提交的数据无法回滚,因为该操作是原子性的。在给定的情况下需要多次提交,除非跟踪所有数据(在所有事务中)并在发生某些事件时删除成功提交的所有内容,否则无法回滚所有内容。
有一个很好的答案涉及提交和回滚的问题。
Connection
对象向单个表中发出LOAD DATA
命令以从文件中加载数据,则不可能。LOAD DATA
的文档指出:“如果您指定CONCURRENT[...],则其他线程可以在LOAD DATA执行时从表中检索数据。” 如果在LOAD DATA
执行时从数据库中读取内容可能会有问题,那么同时写入内容就是我的大忌了。 - Kohányi Róbert