Spring Batch中如何实现跳过处理?

10

我想知道如何在我的ItemWriter中确定Spring Batch当前是处于分块处理模式还是回退的单个项处理模式。首先,我找不到有关这种回退机制的实现方式的信息。

即使我还没有找到解决我的实际问题的方法,我还是想与你分享我对这种回退机制的了解。

如果我漏掉了什么,请随意添加附加信息的答案;-)


你能解释一下现实世界中的问题,是什么让你产生了“作者如何知道当前处理模式”的疑问吗? - Michael Pralow
当然 :-) 我正在存储业务日志(除了我的技术日志)。在这个日志中,每个项目的消息只应该出现一次。如果在处理过程中出现异常,我也会在业务日志中为此项编写错误日志。如果一个项目已经被处理但被回滚,我对它的错误日志不感兴趣。我只想在单个项目处理时记录这些错误。否则,如果我处于块模式,则可能为良好的项目记录错误,只是因为它们在坏块中。 - Peter Wippermann
2个回答

16
实现跳过机制的方法可以在FaultTolerantChunkProcessorRetryTemplate中找到。
假设您已经配置了可跳过的异常,但没有可重试的异常。当前块中有一个失败的项目导致异常。
现在,首先应该写入整个块。在处理器的write()方法中,您可以看到调用了RetryTemplate。它还获取了对RetryCallbackRecoveryCallback的两个引用。
切换到RetryTemplate。找到以下方法:
protected <T> T doExecute(RetryCallback<T> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState state)

您可以看到,RetryTemplate 会在未耗尽的情况下(即在我们的配置中仅一次)进行重试。这样的重试将由可重试异常引起。不可重试异常将立即中止重试机制。

在重试用完或被中止后,将调用 RecoveryCallback

e = handleRetryExhausted(recoveryCallback, context, state);

这就是单个项目处理模式的运作方式!

在处理器的write()方法中定义的RecoveryCallback将锁定输入块(inputs.setBusy(true))并运行其scan()方法。在这里,您可以看到从块中取出了一个单独的项目:

List<O> items = Collections.singletonList(outputIterator.next());

如果这个单独的项目能够被ItemWriter正确处理,那么块将完成,并且ChunkOrientedTasklet将运行另一个块(用于下一个单独的项目)。这将导致对RetryCallback的常规调用,但由于块已被RecoveryTemplate锁定,因此scan()方法将立即被调用:

if (!inputs.isBusy()) {
    // ...
}
else {
    scan(contribution, inputs, outputs, chunkMonitor);
}

因此,另一个单独的项目将被处理并重复此过程,直到原始块逐个项目地被处理:

if (outputs.isEmpty()) {
    inputs.setBusy(false);

就是这样,希望这篇内容对你有所帮助。我更希望你能通过搜索引擎轻松找到它,而不是浪费太多时间自己寻找。;-)


1

我原来的问题是ItemWriter想知道它是在块模式还是单个项模式下,可能的方法之一是以下几种替代方案之一:


  • 只有当传递的块大小为1时,才需要进行进一步检查。
  • 当传递的块是java.util.Collections.SingletonList时,我们会相当确定,因为FaultTolerantChunkProcessor执行以下操作:

    List items = Collections.singletonList(outputIterator.next());

    不幸的是,这个类是私有的,所以我们无法使用instanceOf进行检查。

  • 相反,如果块是ArrayList,我们也可以相当确定,因为Spring Batch的Chunk类使用它:

    private List items = new ArrayList();

  • 剩下的一个模糊点就是从执行上下文中读取的缓冲项。但我希望那些也是ArrayLists。

无论如何,我仍然认为这种方法太模糊了。我更希望框架提供这些信息。


另一种选择是将我的ItemWriter钩入框架执行中。也许ItemWriteListener.onWriteError()是合适的。

更新:如果您处于单项目模式并在ItemWriter中抛出异常,则不会调用onWriteError()方法。我认为这是一个错误,我已经提出了https://jira.springsource.org/browse/BATCH-2027

所以这个替代方案被排除了。


这是一个不使用任何框架,直接在编写器中完成相同功能的片段。
    private int writeErrorCount = 0;

@Override
public void write(final List<? extends Long> items) throws Exception {
    try {
        writeWhatever(items);
    } catch (final Exception e) {
        if (this.writeErrorCount == 0) {
            this.writeErrorCount = items.size();
        } else {
            this.writeErrorCount--;
        }

        throw e;
    }
    this.writeErrorCount--;
}

public boolean isWriterInSingleItemMode() {
    return writeErrorCount != 0;
}

注意:在此处应该检查可跳过的异常,而不是一般的异常。

我们不能在单例列表上调用instanceof,但是这个方法可以:`Class<?> singletonListClazz = Class.forName( "java.util.Collections$SingletonList" ); boolean retrying = false; if( items.getClass().equals( singletonListClazz ) ){ retrying = true; }` - slh777
@slh777 感谢您的建议。确实可以这样做。但是,我对这种方法并不满意,因为我们严重依赖框架的实现细节(即在SingleItemMode下返回SingletonList)。我更希望看到框架封装这个功能。 :-( - Peter Wippermann

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接