Michal,
我想确保我正确理解了你的问题,因为我认为有更好的解决方案。
问题:
你有一个1GB的TSV文件加载到NiFi中,你想要删除第一行。
解决方案:
如果你的文件比较小,最好的解决方案是使用ReplaceText
处理器并设置以下处理器属性:
这将在不必将1GB内容发送出NiFi到命令行,然后重新摄取结果的情况下删除第一行。不幸的是,要使用正则表达式,您需要设置最大缓冲区大小,这意味着整个内容需要读入堆内存才能执行此操作。
如果您知道第一行的确切值,并且有一个1GB的文件,您应该尝试使用ModifyBytes
,它允许您从流文件内容的开头和/或结尾修剪字节数。然后,您可以简单地指示处理器删除内容的前n个字节。由于NiFi的写时复制内容存储库,您仍将拥有约2GB的数据,但它是使用8192B缓冲区大小以流方式完成的。
我最好的建议是使用ExecuteScript
处理器。此处理器允许您使用多种语言(Groovy、Python、Ruby、Lua、JS)编写自定义代码,并在流文件上执行。使用下面的Groovy脚本,您可以删除第一行并以流式传输方式复制其余部分,以便不必要地增加堆的负担。
我用1MB的文件进行了测试,每个flowfile花费大约1.06秒(MacBook Pro 2015,16 GB RAM,OS X 10.11.6)。在更好的设备上,您显然会获得更好的吞吐量,并且可以将其扩展到更大的文件。
def flowfile = session.get()
if (!flowfile) return
try {
flowfile = session.write(flowfile, { inputStream, outputStream ->
def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))
def ignoredFirstLine = bufferedReader.readLine()
def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
def line
int i = 0
while ((line = bufferedReader.readLine()) != null) {
bufferedWriter.write(line)
bufferedWriter.newLine()
i++
}
log.warn("Wrote ${i} lines to output")
bufferedReader.close()
bufferedWriter.close()
} as StreamCallback)
session.transfer(flowfile, REL_SUCCESS)
} catch (Exception e) {
log.error(e)
session.transfer(flowfile, REL_FAILURE)
}
顺便提一下,对于NiFi来说,一般的好做法是将巨大的文本文件拆分成更小的组件流文件(使用类似SplitText
的工具),以便获得并行处理的好处。如果1GB的输入是视频,则不适用此方法,但由于您提到了TSV,我认为将初始流文件拆分成较小的部分并在并行处理它们(甚至将它们发送到集群中的其他节点进行负载平衡)可能有助于提高性能。
编辑:
我意识到我没有回答你最初的问题——如何将流文件的内容传递给
ExecuteStreamCommand
处理器的命令行调用。如果您想要操作属性值,可以在
Arguments字段中使用
表达式语言语法
${attribute_name}
引用属性值。然而,由于内容无法从EL中引用,并且您不想通过将1GB内容移动到属性中来破坏堆,因此最好的解决方案是使用
PutFile
将内容写入文件,对提供的文件名运行
sed
命令并将其写入另一个文件,然后使用
GetFile
将这些内容读回NiFi中的流文件。
这里有一个模板,演示了如何使用ExecuteStreamCommand
同时对流文件内容使用rev
和sed
,并将输出放入新流文件的内容中。您可以运行该流并监视logs/nifi-app.log
以查看输出,或使用数据可靠性查询来检查每个处理器执行的修改。
sed
将 flowfile 内容放到命令行上,并将输出获取到下一个 flowfile 的内容中。 - Andy