Python ExecuteScript in NiFi: transforming flowfile attributes and content

3

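I am trying to write a Python script in NiFi that does the following:

  1. Reads some attributes from the incoming flowfile
  2. Reads the flowfile's JSON content and extracts specific fields
  3. Writes attributes to the outgoing flowfile
  4. Overwrites the incoming flowfile with new content created in the script (e.g. an API call that returns new JSON) and sends it to the success relationship, or removes the old flowfile and creates a new one with the desired content

Here is what I have so far: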

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback,InputStreamCallback, OutputStreamCallback

class OutputWrite(OutputStreamCallback, obj):

    def __init__(self):
        self.obj = obj

    def process(self, outputStream):

        outputStream.write(bytearray(json.dumps(self.obj).encode('utf')))

###end class###

flowfile = session.get()

if flowfile != None:

    # 1) Get flowfile attributes

    headers = {
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept': 'application/json, text/plain, */*',
        'Cache-Control': 'no-cache',
        'Ocp-Apim-Trace': 'true',
        'Authorization': flowfile.getAttribute('Authorization')
    }

    collection = flowfile.getAttribute('collection')
    dataset = flowfile.getAttribute('dataset')

    # 2) Get flowfile content

    stream_content = session.read(flowfile)
    text_content = IOUtils.toString(stream_content, StandardCharsets.UTF_8)
    json_content = json.loads(text_content)

    records = json_content['result']['count']
    pages = records/10000

    # 3) Write flowfile attributes

    flowfile = session.putAttribute(flowfile, 'collection', collection)
    flowfile = session.putAttribute(flowfile, 'dataset', dataset)

    # API operations: output_json with desired data

    output_json = {some data}

    # 4) Write final JSON data to output flowfile

    flowfile = session.write(flowfile, OutputWrite(output_json))

    session.transfer(flowfile, REL_SUCCESS)
    session.commit()

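My problem is that I cannot find a way to pass a reference to the desired output_json object as a parameter to the OutputStreamCallback class. Is there a workaround or a better approach?
Perhaps in this case it would be easier to perform all the API operations inside the class's process function, but then how do I get access to the incoming flowfile's attributes inside process (which requires the session or flowfile object)?
Any help is much appreciated!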

How about using Groovy instead of Python? - daggett
Take a look at this link: https://stackoverflow.com/questions/44630514/how-to-update-line-with-modified-data-in-jython/44631896#44631896 - daggett
2 Answers

3
You can try something like this:
import json
import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil

class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            # Read input FlowFile content
            input_text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            input_obj = json.loads(input_text)
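            # Transform content
            output_obj = input_obj   # your input content

            # Perform data transformation on output_obj

            # Write output content
            # (outputJson is a placeholder for the final JSON value; it is not
            # defined in this snippet, so assign it first or use output_obj)
            output_text = json.dumps(outputJson)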
            outputStream.write(StringUtil.toBytes(output_text))
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)
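
To try the read-transform-write shape of this callback outside NiFi, here is a minimal sketch. It is not the answerer's code: the class is a plain-Python stand-in rather than a real StreamCallback subclass, io.BytesIO replaces the Java input and output streams, and the sample payload and the `processed` flag are made up for illustration.

```python
import io
import json


class TransformCallback(object):
    """Plain-Python stand-in; in ExecuteScript this would subclass StreamCallback."""

    def process(self, input_stream, output_stream):
        # Read and parse the incoming content
        input_obj = json.loads(input_stream.read().decode("utf-8"))
        # Hypothetical transformation: copy the record and add a flag
        output_obj = dict(input_obj, processed=True)
        # Serialize the transformed object and write it out
        output_stream.write(json.dumps(output_obj).encode("utf-8"))


# Hypothetical sample content standing in for the flowfile body
source = io.BytesIO(b'{"result": {"count": 25000}}')
target = io.BytesIO()
TransformCallback().process(source, target)
transformed = json.loads(target.getvalue().decode("utf-8"))
```

The point of the sketch is only that the object passed to json.dumps is the one built inside process, so nothing has to be defined outside the method.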

outputJson looks like a typo. - Tim
outputJson is a variable that you can assign the final JSON value to. I didn't write out the complete code. - talkdatatome

1

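Below I have included example Python code that uses a custom PyStreamCallback class to implement logic that transforms JSON in the flowfile content. You can refer to Matt Burgess's blog posts on the topic, but I would encourage you to consider using the native processors UpdateAttribute and EvaluateJsonPath for this kind of work, and to fall back to custom code only when you specifically need to do something NiFi cannot handle out of the box.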

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    obj = json.loads(text)
    newObj = {
          "Range": 5,
          "Rating": obj['rating']['primary']['value'],
          "SecondaryRatings": {}
        }
    for key, value in obj['rating'].iteritems():
      if key != "primary":
        newObj['SecondaryRatings'][key] = {"Id": key, "Range": 5, "Value": value['value']}

    outputStream.write(bytearray(json.dumps(newObj, indent=4).encode('utf-8'))) 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)
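
If it helps to check the JSON reshaping without a running NiFi instance, the same logic can be pulled into a plain function, roughly as sketched here. The function name `translate_ratings`, the `rating_range` parameter, and the sample input are assumptions for illustration; the input shape is only inferred from the keys the callback reads.

```python
import json


def translate_ratings(obj, rating_range=5):
    """Build the reshaped rating document from the parsed input JSON."""
    new_obj = {
        "Range": rating_range,
        "Rating": obj["rating"]["primary"]["value"],
        "SecondaryRatings": {},
    }
    # Every rating other than "primary" becomes a secondary rating entry
    for key, value in obj["rating"].items():
        if key != "primary":
            new_obj["SecondaryRatings"][key] = {
                "Id": key,
                "Range": rating_range,
                "Value": value["value"],
            }
    return new_obj


# Hypothetical sample input; the real flowfile content is not shown in the thread
sample = {"rating": {"primary": {"value": 4}, "quality": {"value": 3}}}
result = translate_ratings(sample)
print(json.dumps(result, indent=4, sort_keys=True))
```

Inside the callback, process would then only need to parse the input, call the function, and write the serialized result.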

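Update:

To access the flowfile's attributes inside the callback, simply pass the flowfile to the constructor as an argument, store it as a field, and reference it in the process method. Here is a very simple example that appends the value of the attribute my_attr to the incoming flowfile content and writes it back: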

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self, flowfile):
        self.ff = flowfile
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        text += self.ff.getAttribute('my_attr')
        outputStream.write(bytearray(text.encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile,PyStreamCallback(flowFile))
    session.transfer(flowFile, REL_SUCCESS)

Incoming flowfile:

--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
    Value: '30'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '1690494181462176'
Key: 'my_attr'
    Value: 'This is an attribute value.'
Key: 'path'
    Value: './'
Key: 'uuid'
    Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.

Outgoing flowfile:
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'lineageStartDate'
    Value: 'Tue Mar 13 13:10:48 PDT 2018'
Key: 'fileSize'
    Value: '57'
FlowFile Attribute Map Content
Key: 'filename'
    Value: '1690494181462176'
Key: 'my_attr'
    Value: 'This is an attribute value.'
Key: 'path'
    Value: './'
Key: 'uuid'
    Value: 'dc93b715-50a0-43ce-a4db-716bd9ec3205'
--------------------------------------------------
This is some flowfile content.This is an attribute value.
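
The same constructor pattern should also cover the original question of handing an already-built object to an output-only callback. The following is a minimal sketch under stated assumptions: the class is a plain-Python stand-in (in ExecuteScript it would subclass OutputStreamCallback and be passed to session.write as in the question), io.BytesIO replaces the Java output stream, and the sample output_json values are invented.

```python
import io
import json


class OutputWrite(object):
    """Plain-Python stand-in; in ExecuteScript this would subclass OutputStreamCallback."""

    def __init__(self, obj):
        # Keep a reference to the object that process() should serialize
        self.obj = obj

    def process(self, outputStream):
        outputStream.write(json.dumps(self.obj).encode("utf-8"))


# Hypothetical result of the API operations
output_json = {"collection": "demo", "pages": 3}
buffer = io.BytesIO()
OutputWrite(output_json).process(buffer)
written = buffer.getvalue().decode("utf-8")
```

The key difference from the question's version is that obj is a parameter of __init__ rather than an entry in the class's base list.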

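Thanks, but in your script I still don't see a way to work with flowfile attributes inside the process function. My incoming flowfile has attributes that I want to use when building the outgoing flowfile content. - balalaika
I updated the answer with code that accesses flowfile attributes in the process method. - Andy
Yes, it is much simpler than I thought. - balalaika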

Page content provided by Stack Overflow.