如何在Nifi中向Python脚本传递参数

3
也许这是一个愚蠢的问题,但我还是要问一下。
我在Nifi中有一个Collect_data处理器,它将消息流式传输到另一个进程中,该进程使用Python脚本解析并创建JSON文件。问题是我不知道Python脚本中的函数输入是什么。如何将那些消息(16位数字)从Collect_data处理器传递到包含Python脚本的下一个处理器中。是否有任何关于此的好的基本示例?
我已经在网上寻找了一些示例,但并没有真正理解。
import datetime
import hashlib
from urlparse import urlparse, parse_qs
import sys
from urlparse import urlparse, parse_qs
from datetime import *
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import time


def parse_zap(inputStream, outputStream):
    data = inputStream
    buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
    buf = int(buf, 16)
    buf_check = str(buf)
    if buf_check[17] == 2:
        pass
    datetime_now = datetime.now()
    log_date = datetime_now.isoformat()
    try:
        mac = buf_check[7:14].upper()
        ams_id = buf_check[8:]
        action = buf_check[3:4]
        time_a = int(time())
        dict_test = {
        "user": {
            "guruq" : 'false'
        },
        "device" : {
            "type" : "siolbox",
            "mac": mac
        },
        "event" : {
            "origin" : "iptv",
            "timestamp": time_a,
            "type": "zap",
            "product-type" : "tv-channel",
            "channel": {
                "id" : 'channel_id',
                "ams-id": ams_id
            },
            "content": {
                "action": action
            }
        }
        }
        return dict_test
    except Exception as e:
        print('%s nod PARSE 500 \"%s\"' % (log_date, e))

我想我读得没错,但是现在无法生成输出。

提前感谢。

3个回答

3

我想我理解了你的问题,但是关于你的流程有些含糊不清。我会针对几种可能的情况进行回答。

  1. 您有一个处理器从源(即FetchFTP)获取数据,并连接到包含Python脚本以转换这些值的ExecuteScript处理器。在这种情况下,Python脚本可以使用标准API直接操作flowfile属性和内容。请参阅Matt Burgess的博客,了解编写自定义脚本以操作数据的许多示例。
  2. 您有一个处理器从源获取数据,并连接到一个ExecuteStreamCommand处理器,该处理器使用类似python my_external_script.py arg1 arg2 ...的命令调用外部Python脚本。在这种情况下,flowfile内容通过ExecuteStreamCommand处理器传递到STDIN,因此您的脚本应以这种方式消耗它。 此答案更多地解释了如何使用Python脚本与ExecuteStreamCommand
  3. 您有一个自定义处理器,内部调用单独的Python进程。这是一个不好的想法,应重构为其他模型之一。这会破坏关注点分离,失去处理器生命周期协助,遮盖线程处理和时间,缺乏可靠性可见性,违反NiFi的开发模型。

如果您的Python脚本非常简单,可以将其放入ScriptedRecordWriter中,并使用它来同时处理多个“记录”以获得性能优势。根据您的流和传入数据的外观,这可能对您的用例来说过于高级。

更新2018-10-03 10:50

尝试在ExecuteScript主体中使用此脚本:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        result = parse_zap(text)

        outputStream.write(bytearray(result.encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile,PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "parsed_zap", "true")
    session.transfer(flowFile, REL_SUCCESS)

// Your parse_zap() method here, with the signature changed to just accept a single string
...

首先,非常感谢您的帮助。但我还是不明白。我需要为此创建类吗?我已经在Nifi处理器窗口中更新了我的Python脚本,请给我一些直接的提示吗? - jovicbg

3

看看这个脚本:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    for line in text[1:]:
        outputStream.write(line + "\n") 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)

它从属性中获取需要从flowfile中删除的行数,然后再将flowfile重新写入,而没有这些行,这很容易,也是如何使用属性和如何使用flowfile的良好示例。

根据您更新的代码,您的代码应该像这样:

import datetime
import hashlib
from urlparse import urlparse, parse_qs
import sys
from urlparse import urlparse, parse_qs
from datetime import *
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import time


class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    data = inputStream
    buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
    buf = int(buf, 16)
    buf_check = str(buf)
    if buf_check[17] == 2:
        pass
    datetime_now = datetime.now()
    log_date = datetime_now.isoformat()
    try:
        mac = buf_check[7:14].upper()
        ams_id = buf_check[8:]
        action = buf_check[3:4]
        time_a = int(time())
        dict_test = {
        "user": {
            "guruq" : 'false'
        },
        "device" : {
            "type" : "siolbox",
            "mac": mac
        },
        "event" : {
            "origin" : "iptv",
            "timestamp": time_a,
            "type": "zap",
            "product-type" : "tv-channel",
            "channel": {
                "id" : 'channel_id',
                "ams-id": ams_id
            },
            "content": {
                "action": action
            }
        }
        }
        return dict_test
    except Exception as e:
        print('%s nod PARSE 500 \"%s\"' % (log_date, e))

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)        

我已经更新了我的问题,并附上了在Nifi窗口中使用的Python脚本代码。不确定是否需要创建类并放置一个函数在其中?基于我的代码,任何直接的提示都将非常有帮助。顺便说一句,谢谢 :) - jovicbg

0

我能够使用这里描述的方法从Python脚本中访问参数。

基本上,你需要做的是:

  1. 停止执行Python脚本的进程
  2. 配置该进程
  3. 向进程添加一个属性(例如,myProperty
  4. 像这样从脚本中访问该属性:myProperty.evaluateAttributeExpressions().getValue()
  5. 重新启动该进程

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接