从Spring Batch ItemProcessor返回多个项目

21

我正在编写一个Spring Batch作业,在其中一个步骤中,我的处理器代码如下:

@Component
public class SubscriberProcessor implements ItemProcessor<NewsletterSubscriber, Account>, InitializingBean {

    @Autowired
    private AccountService service;

    @Override public Account process(NewsletterSubscriber item) throws Exception {
        if (!Strings.isNullOrEmpty(item.getId())) {
            return service.getAccount(item.getId());
        }
        // search with email address
        List<Account> accounts = service.findByEmail(item.getEmail());
        checkState(accounts.size() <= 1, "Found more than one account with email %s", item.getEmail());
        return accounts.isEmpty() ? null : accounts.get(0);
    }

    @Override public void afterPropertiesSet() throws Exception {
        Assert.notNull(service, "account service must be set");
    }
}

以上代码是可行的,但我发现有一些边缘情况允许每个NewsletterSubscriber拥有多个Account。因此,我需要删除状态检查,并将多个Account传递给项目编写器。

我找到的一个解决方案是更改ItemProcessorItemWriter来处理List<Account>类型,而不是Account,但这有两个缺点:

  • 由于编写器中有嵌套列表,代码和测试变得丑陋且难以编写和维护
  • 最重要的是,因为提供给编写器的列表可能包含多个账户,所以同一事务中可能写入多个Account对象,我想避免这种情况。

是否有任何方法,例如使用监听器或替换Spring Batch使用的某些内部组件,以避免在处理器中使用列表?

更新

我已经在Spring Jira上提交了一个问题来解决这个问题。

我正在研究 FaultTolerantChunkProcessor 中标记为 SimpleChunkProcessor 的扩展点的 isCompletegetAdjustedOutputs 方法,以查看是否能够在某种程度上使用它们来实现我的目标。

欢迎任何提示。

4个回答

22

物品处理器接受一个物品,并返回一个列表

MyItemProcessor implements ItemProcessor<SingleThing,List<ExtractedThingFromSingleThing>> {
    public List<ExtractedThingFromSingleThing> process(SingleThing thing) {
    //parse and convert to list
    }
}

将下游的编写器包装起来以解决问题。这样,从该编写器下游的内容就不必使用列表来工作。

@StepScope
public class ItemListWriter<T> implements ItemWriter<List<T>> {
    private ItemWriter<T> wrapped;

    public ItemListWriter(ItemWriter<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void write(List<? extends List<T>> items) throws Exception {
        for (List<T> subList : items) {
            wrapped.write(subList);
        }
    }
}

1
我们是否仍然能够使用FlatFileItemWriter来完成这个任务? - isJulian00

5
没有办法在Spring Batch中调用一个ItemProcessor并返回多个项目,除非你愿意深入了解。如果你真的想了解ItemProcessorItemWriter之间的关系(不推荐),可以查看ChunkProcessor接口的实现。虽然简单情况下(SimpleChunkProcessor)还好,但是如果使用任何容错逻辑(通过FaultTolerantChunkProcessor跳过/重试),它会变得非常难以处理。
一个更简单的选择是将这个逻辑移动到ItemReader中,在返回项目之前执行此操作。将您正在使用的任何ItemReader包装在自定义的ItemReader实现中,在返回项目之前进行服务查找。在这种情况下,您将不再从读取器中返回NewsletterSubscriber,而是基于以前的信息返回一个Account

你的回答是一个非常好的选择。然而,我觉得类型转换应该属于处理器范畴,虽然你的建议是可行的,但我会失去使用 SkipListener 的机会,以获得关于坏输入元素的通知,因为 onSkipInRead 只提供了有关发生异常的信息,显然没有关于导致异常的输入元素的信息。 - Fabio
1
我喜欢包装读取器的建议。我能看到的一个问题是,读取器接口只返回单个项目,因此如果 OP 使用内部读取并读取一个 NewsletterSubscriber,但需要外部读取器返回多个 Account,他不能只返回所有这些内容。他必须管理积压并逐个返回它们。或者我有什么遗漏吗? - Tom Saleeba
@TomSaleeba 是的。目前正在开发一个扩展ListItemReader的CustomReader,因为我有一个一对多的输入输出关系。我正在转换和管理内部的“队列”。 - ionutab

1

不要返回一个账户,而是返回一个AccountWrapper或Collection。写作者显然必须考虑到这一点 :)


-2
你可以使用转换器将你的Pojo(从文件中读取的普通Java对象)转换为实体对象,代码如下:
public class Intializer {

public static LGInfo initializeEntity() throws Exception {
    Constructor<LGInfo> constr1 = LGInfo.class.getConstructor();
    LGInfo info = constr1.newInstance();
    return info;
}
}

在你的项目处理器中

public class LgItemProcessor<LgBulkLine, LGInfo> implements ItemProcessor<LgBulkLine, LGInfo> {

private static final Log log = LogFactory.getLog(LgItemProcessor.class);

@SuppressWarnings("unchecked")
@Override
public LGInfo process(LgBulkLine item) throws Exception {
    log.info(item);
    return (LGInfo) Intializer.initializeEntity();
}

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接