使用Nifi ExecuteScript处理器生成多个flowfiles

3

我正在处理一个Nifi流程,其中我获取了一个带有多个键值对的JSON文档。我正在使用具有pythonExecuteScript处理器。

我的目标是基于JSON keys创建各种URLS。这些键是数字,并且长这样:

keys = [10200, 10201, 10202, ...]

我需要的URL有三种类型,它们应该长成这样:

http://google.com/10200
http://bing.com/10200
http://yahoo.com/10200

我想循环遍历我的keys[],并为其中包含的每个数字键创建3个特定的URL。我有以下代码,我正在尝试:

读取列表中的数字键 --> 创建3个URL --> 输出一个flow file。

...... 然后读取列表中的下一个数字键并继续循环.....

我有以下代码,但当我将JSON flowfile传递给它时,它目前什么也不做。请问我做错了什么?

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class ModJSON(StreamCallback):

  def __init__(self):
        self.parentFlowFile = None
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    flowfiles_list = [] 

    outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8')))


    for numerical_key in obj.keys():
      # create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes 
      flowFile = session.create(self.parentFlowFile)
      if (flowFile != None):
        flowFile = session.write(flowFile, "Does not matter")
        flowFile = session.putAttribute(flowFile, "google", "http://google.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://bing.com/"+ numerical_key)

        flowFile = session.putAttribute(flowFile, "google", "http://yahoo.com/"+ numerical_key)
        flowfiles_list.append(flowFile)

    for flow in flowfiles_list:
      session.transfer(flow, REL_SUCCESS)
1个回答

6

很好的问题,这是回调方法处理流文件API的一个细节。您已经创建了StreamCallback的子类,但是您尚未检索输入流文件或使用它通过您的类的实例覆盖内容。

在定义ModJSON类之后,请尝试执行以下操作:

originalFlowFile = session.get()
if(originalFlowFile != None):
    originalFlowFile = session.write(flowFile, ModJSON())
    session.remove(originalFlowFile)

这将获取一个输入流文件(或等待一个出现),然后调用您的StreamCallback来覆盖您的流文件的内容。在我的示例中,您将丢弃输入流文件,因此如果这是您的用例的正确行为,则可以只扩展InputStreamCallback而不是StreamCallback并删除outputStream.write(),如果您没有使用outputStream。要做到这一点,请将StreamCallback替换为InputStreamCallback,并从process()方法中删除“outputStream”参数。
在您的示例中,一旦添加了上面的代码片段,您就会使用json.dumps()命令覆盖输入内容,并创建和传输新文件,所有文件都在同一个关系(成功)中,所以如果它们不是相同的格式,可能会导致问题(这就是为什么我添加了session.remove())。如果您需要原始流文件通过不同的关系发送而不是其余部分,请考虑使用InvokeScriptedProcessor而不是ExecuteScript。如果您在处理后不关心输入流文件(已完成URL属性的添加),则请按照我的建议进行操作。如果它们都可以通过相同的关系(成功)发送,那么请用我的session.remove()替换。
session.transfer(originalFlowFile, REL_SUCCESS)

查看我的ExecuteScript烹饪书文章(第2部分共3部分)以获取更多Jython(和其他语言)中使用这些用例的示例:)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接