我正在处理一个Nifi流程,其中我获取了一个带有多个键值对的JSON文档。我正在使用具有python
的ExecuteScript
处理器。
我的目标是基于JSON keys
创建各种URLS。这些键是数字,并且长这样:
keys = [10200, 10201, 10202, ...]
我需要的URL有三种类型,它们应该长成这样:
http://google.com/10200
http://bing.com/10200
http://yahoo.com/10200
我想循环遍历我的keys[]
,并为其中包含的每个数字键创建3个特定的URL。我有以下代码,我正在尝试:
读取列表中的数字键 -->
创建3个URL -->
输出一个flow file。
...... 然后读取列表中的下一个数字键并继续循环.....
我有以下代码,但当我将JSON flowfile传递给它时,它目前什么也不做。请问我做错了什么?
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class ModJSON(StreamCallback):
def __init__(self):
self.parentFlowFile = None
pass
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
obj = json.loads(text)
flowfiles_list = []
outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8')))
for numerical_key in obj.keys():
# create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes
flowFile = session.create(self.parentFlowFile)
if (flowFile != None):
flowFile = session.write(flowFile, "Does not matter")
flowFile = session.putAttribute(flowFile, "google", "http://google.com/"+ numerical_key)
flowFile = session.putAttribute(flowFile, "google", "http://bing.com/"+ numerical_key)
flowFile = session.putAttribute(flowFile, "google", "http://yahoo.com/"+ numerical_key)
flowfiles_list.append(flowFile)
for flow in flowfiles_list:
session.transfer(flow, REL_SUCCESS)