NiFi - 如何在ExecuteStreamCommand中引用flowFile?

7
我需要执行类似以下命令的操作: sed '1d' simple.tsv > noHeader.tsv
这将从我的大型流文件(> 1 GB)中删除第一行。
问题是 - 我需要在我的流文件上执行它,所以应该是: sed '1d' myFlowFile > myFlowFile 问题是:我应该如何配置ExecuteStreamCommand处理器以便它在我的流文件上运行该命令并将其返回到我的流文件? 如果sed不是最佳选择,我可以考虑其他方式(例如tail)。

ExecuteStreamCommand processor

谢谢,Michal

编辑2(解决方案):

下面是最终的ExecuteStreamCommand配置,可以实现我需要的功能(从流文件中删除第一行)。 @Andy - 非常感谢您提供的所有宝贵提示。 ExecuteStreamCommand - remove 1st line from the flow

3个回答

8

Michal,

我想确保我正确理解了你的问题,因为我认为有更好的解决方案。

问题:

你有一个1GB的TSV文件加载到NiFi中,你想要删除第一行。

解决方案:

如果你的文件比较小,最好的解决方案是使用ReplaceText处理器并设置以下处理器属性:

  • 搜索值^.*\n
  • 替换值 <- 空字符串

这将在不必将1GB内容发送出NiFi到命令行,然后重新摄取结果的情况下删除第一行。不幸的是,要使用正则表达式,您需要设置最大缓冲区大小,这意味着整个内容需要读入堆内存才能执行此操作。

如果您知道第一行的确切值,并且有一个1GB的文件,您应该尝试使用ModifyBytes,它允许您从流文件内容的开头和/或结尾修剪字节数。然后,您可以简单地指示处理器删除内容的前n个字节。由于NiFi的写时复制内容存储库,您仍将拥有约2GB的数据,但它是使用8192B缓冲区大小以流方式完成的。

我最好的建议是使用ExecuteScript处理器。此处理器允许您使用多种语言(Groovy、Python、Ruby、Lua、JS)编写自定义代码,并在流文件上执行。使用下面的Groovy脚本,您可以删除第一行并以流式传输方式复制其余部分,以便不必要地增加堆的负担。

我用1MB的文件进行了测试,每个flowfile花费大约1.06秒(MacBook Pro 2015,16 GB RAM,OS X 10.11.6)。在更好的设备上,您显然会获得更好的吞吐量,并且可以将其扩展到更大的文件。

def flowfile = session.get()
if (!flowfile) return

try {
    // Here we are reading from the current flowfile content and writing to the new content
    flowfile = session.write(flowfile, { inputStream, outputStream ->
        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))

        // Ignoring the first line
        def ignoredFirstLine = bufferedReader.readLine()

        def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
        def line

        int i = 0

        // While the incoming line is not empty, write it to the outputStream
        while ((line = bufferedReader.readLine()) != null) {
            bufferedWriter.write(line)
            bufferedWriter.newLine()
            i++
        }

        // By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
        log.warn("Wrote ${i} lines to output")

        bufferedReader.close()
        bufferedWriter.close()
    } as StreamCallback)

    session.transfer(flowfile, REL_SUCCESS)
} catch (Exception e) {
    log.error(e)
    session.transfer(flowfile, REL_FAILURE)
}

顺便提一下,对于NiFi来说,一般的好做法是将巨大的文本文件拆分成更小的组件流文件(使用类似SplitText的工具),以便获得并行处理的好处。如果1GB的输入是视频,则不适用此方法,但由于您提到了TSV,我认为将初始流文件拆分成较小的部分并在并行处理它们(甚至将它们发送到集群中的其他节点进行负载平衡)可能有助于提高性能。

编辑:

我意识到我没有回答你最初的问题——如何将流文件的内容传递给ExecuteStreamCommand处理器的命令行调用。如果您想要操作属性值,可以在Arguments字段中使用表达式语言语法${attribute_name}引用属性值。然而,由于内容无法从EL中引用,并且您不想通过将1GB内容移动到属性中来破坏堆,因此最好的解决方案是使用PutFile将内容写入文件,对提供的文件名运行sed命令并将其写入另一个文件,然后使用GetFile将这些内容读回NiFi中的流文件。

这里有一个模板,演示了如何使用ExecuteStreamCommand同时对流文件内容使用revsed,并将输出放入新流文件的内容中。您可以运行该流并监视logs/nifi-app.log以查看输出,或使用数据可靠性查询来检查每个处理器执行的修改。

ExecuteStreamCommand Example

ExecuteStreamCommand Configuration


谢谢你提供如此详尽和详细的答案 - 非常感谢!让我参考一下: 1)替换文本 - 我已经尝试了较小的文件,它可以工作,但是当在整个文本模式下运行此处理器时,我会得到内存越界异常。所以,正如你注意到的那样,在这里它不是最好的选择。 2)修改字节 - 不,第一行的内容对于不同的文件将是不同的。我需要删除该行,而不管其内容 - 换句话说:用“空字符串”替换任何与(^.*)\n正则表达式匹配的内容。 - michalrudko
  1. PutFile + GetFile - 这是一种选择,但我更喜欢不中断流程,我仍在寻找更“优雅”的方法来解决这个挑战。
  2. SplitText - 我实际上正在为SplitText进行预处理,整个并行化将在我的流程中接下来进行。如果您有更多建议,我将非常感激听到它们,再次感谢!
- michalrudko
安迪,我仍然希望更好地理解为什么ExecuteStreamCommand在这里不是一个好的解决方案,从我读到的内容来看,它不会将flow file保存为属性。它所做的是“对流文件的内容执行外部命令,并创建一个带有命令结果的新流文件”。我只想知道如何使用我的sed命令(可以在shell脚本中)来实现它。如果这仍然不是一个选项,我会选择自定义代码解决方案。谢谢! - michalrudko
1
我更新了我的答案,并提供了一个示例,该示例使用 sed 将 flowfile 内容放到命令行上,并将输出获取到下一个 flowfile 的内容中。 - Andy
你最近的帖子给了我一个很好的提示,告诉我如何使用ExecuteStreamCommand处理器。它比我想象的要简单,但是在网上我找不到任何好的例子。非常感谢您的支持和时间 - 将其标记为最终解决方案,并更新我的帖子以包含最终配置。 - michalrudko
显示剩余3条评论

2

由于您想从文件中删除标题,因此我认为使用StripHeader处理器将是更好的选择。

Ankit


安基特,你在哪里看到那个处理器?它没有列在NiFi文档中。 - Andy
@Ankit,我正想问和Andy一样的问题,我也看不到这样的处理器。你能再详细解释一下你的提示吗?谢谢。 - michalrudko
2
刚刚注意到这不是他们的,但你可以构建自己的stripheader处理器。关于逻辑,你可以参考下面的链接: https://github.com/KyloIO/kylo/blob/f66903fe61e5f968856f8e159b50e190de4aa5ca/integrations/nifi/nifi-nar-bundles/nifi-core-bundle/nifi-core-processors/src/main/java/com/thinkbiganalytics/nifi/v2/ingest/StripHeader.java - Ankit Tripathi

2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接