Python脚本使用ExecuteStreamCommand

13

在尽最大努力寻找相关问题和示例之后,仍然没有找到我所需要的答案,所以我决定提交一个问题。

由于以下原因,ExecuteStreamCommand 对我来说似乎是完美的处理器:

  • 我可以执行任何 Python 脚本并避免使用 Jython(类似于 ExecuteScript)。对我来说,Jython 不是一个选项。
  • 我可以接收 FlowFiles。这是必要的,因为我的脚本是用来消费先前处理器的输出的。此外,我喜欢将数据保留在“NiFi 管理”下的想法。
  • 它会写入“执行状态”,这将有助于路由。

简而言之,我想要使用 ExecuteStreamCommand 做的事情是:

  • 摄取先前处理器的输出(确切地说是 Scrapy 爬虫输出的带有 JSON 行的文本文件)
  • 调用一个 Python 脚本(例如 python3 my_script.py
  • 在我的 Python 脚本中加载被摄取的 FlowFile。
  • 选择 FlowFile 的内容。
  • 在 Python 中操作 FlowFile 的内容。
  • 输出原始 FlowFile 的更新版本或创建一个新的 FlowFile。
  • 使用更新/新的 FlowFile 继续我的 NiFi 流程。

为了清晰起见,我目前不理解以下问题:

  • 如何从 ExecuteStreamCommand 处理器中调用 Python 脚本
  • 如何从 Python 中加载 FlowFile
  • 如何在 Python 中更新或创建新的 FlowFile
  • 如何将更新后的 FlowFile 从 Python 输出回 NiFi。

我已经看到过各种 ExecuteScript 的示例,但不幸的是这些示例并不能完全翻译成使用 ExecuteStreamCommand 的方式。

提前感谢您的帮助。任何建议都会受到赞赏。


3
ExecuteStreamCommand 将 flowfile 的内容发送到新的 Python 进程的标准输入流,并将标准输出传输回 flowfile 的内容。因此,您的 Python 代码应该从 stdin 中读取并写入 stdout... - daggett
1个回答

19

根据您的问题,您说您需要在不使用InvokeScriptedProcessorExecuteScript处理器的情况下调用Python脚本,因为您不能使用Jython。鉴于这一要求,您仍然应该能够实现您的目标。虽然需要对框架有一定的熟悉度,但所有这些信息都来自ExecuteStreamCommand文档

您的“我目前不理解的”部分:

  • 如何从ExecuteStreamCommand Processor中调用python脚本

    • 在您的ExecuteStreamCommand处理器中,使用以下配置命令参数(Command Arguments)命令路径(Command Path)属性:

      • 命令参数(Command Arguments): 任意标志或参数,由;分隔 (例如:/path/to/my_script.py)
      • 命令路径(Command Path): /path/to/python3
  • 如何在Python中加载FlowFile

    • 流文件的内容将通过STDIN传递,因此在您的Python脚本中,以与通常处理STDIN相同的方式处理数据。
  • 如何从Python中更新或创建新的FlowFile
NiFi负责在框架中处理flowfile的创建。你的Python脚本传递给STDOUT的任何数据将被填充到传递给ExecuteStreamCommand处理器的"output stream"关系的结果flowfile的内容中。在这种情况下,你的脚本不需要意识到"flowfiles"。如果您使用ISP或ES处理器,则可以使用NiFi脚本API注入到脚本中自动创建或更新flowfile对象。
如何从Python将更新后的FlowFile输出回NiFi。 只需从脚本中将所需的flowfile内容写入STDOUT,然后(假设返回状态代码为0),NiFi将生成一个具有该内容的新flowfile。如果将ESC的"Output Destination Attribute"属性设置为非空值,则NiFi将更新具有相同名称的新属性的现有flowfile,其中包含脚本的输出。

谢谢!我一定会尝试这个。无论如何,这回答了我所有的问题! - vcovo
是的,此时的 ExecuteScript 只能使用带有 Python 2.7 的 Jython。 - Andy
@Andy - 你能解释一下如何在通过ExecuteStreamCommand执行的Python脚本中访问会话吗? - Chetan Hirapara
使用“ExecuteStreamCommand”执行的Python代码将无法访问NiFi会话,因为它在NiFi上下文之外执行。 - Andy
谢谢@Andy,还有一个问题,如何将Python脚本中发生的异常导航到非零状态关系? - Chetan Hirapara
@ChetanHirapara,您可以在脚本中使用try-except,并在except块中使用print语句将错误写回,并附上sys.exit(1)。此外,您可以通过在处理器的设置选项卡中将公告级别标记为None来忽略公告错误。 - Ashok Thakur

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接