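Your script isn't doing what you expect. There are a couple of ways to approach this:
- Operate on the whole flowfile at once, with a script that iterates over the entire CSV content
- Treat the lines of the CSV content as "records" and operate on each record with a script that handles a single line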
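To make the second approach concrete, here is a minimal plain-Python sketch of record-at-a-time processing. It is only an analogy and does not use NiFi's record API: Python's `csv.DictReader` turns each row into a dict, a per-record function (the hypothetical `add_total`) returns that row with a new `total` field, and `csv.DictWriter` writes the rows back out. The sample input is an assumption based on the output shown at the end of this answer.

```python
import csv
import io


def add_total(record):
    """Hypothetical per-record function: takes one row as a dict of strings
    and returns a copy with a 'total' field holding the sum of its values."""
    out = dict(record)
    out["total"] = str(sum(int(v) for v in record.values()))
    return out


def transform_records(csv_text):
    """Read CSV text row by row, apply add_total to each row, write CSV back."""
    reader = csv.DictReader(io.StringIO(csv_text))
    fieldnames = reader.fieldnames + ["total"]  # reading fieldnames consumes the header
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    for record in reader:
        writer.writerow(add_total(record))
    return buf.getvalue()


# Assumed sample input (three integer columns plus a header row)
sample = "first,second,third\n1,4,9\n7,5,2\n3,8,7\n"
print(transform_records(sample), end="")
```

Because `add_total` never sees more than one row, it doesn't care how large the file is; only the reading and writing code knows about the CSV format.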
I'll show how to change your script so that it processes the entire flowfile content at once. You can read more about the Record* processors here, here, and here.
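Below is a script that does what you expect. Compare it with yours to see what I changed (of course, this script could be more efficient and concise; it's verbose to demonstrate what's going on, and I'm not a Python expert).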
import sys
from java.io import BufferedReader, InputStreamReader
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the entire flowfile content line by line, decoded as UTF-8
        reader = BufferedReader(InputStreamReader(inputStream, "UTF-8"))
        try:
            lines = []
            isHeader = True
            while True:
                line = reader.readLine()
                if line is None:
                    break  # end of content
                if isHeader:
                    # First line: add the name of the new column
                    lines.append(line + ",total")
                    isHeader = False
                elif line.strip():
                    # Non-blank data line: append the sum of its values as a new column
                    elements = self.extract_elements(line)
                    total = self.summation(elements)
                    lines.append(",".join([line, str(total)]))
            # Write all transformed lines back as the new flowfile content
            output = "\n".join(lines)
            outputStream.write(bytearray(output.encode('utf-8')))
        finally:
            reader.close()

    def extract_elements(self, line):
        # Split a CSV line and convert each field to an integer
        return [int(x) for x in line.split(',')]

    def summation(self, values):
        return sum(values)

flowFile = session.get()
if flowFile is not None:
    try:
        flowFile = session.write(flowFile, PyStreamCallback())
        session.transfer(flowFile, REL_SUCCESS)
    except:
        # A bare except catches both Python errors (e.g. a ValueError from a
        # non-numeric field) and Java exceptions raised by session.write
        log.error("Could not add total column: " + str(sys.exc_info()[1]))
        session.transfer(flowFile, REL_FAILURE)
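Since the script above is deliberately verbose, here is a condensed sketch of just the transformation it performs, written as plain Python with no NiFi or Java imports so it can be tested on its own. The function name `add_total_column` is hypothetical, and the sample input is assumed to be the logged output below minus its `total` column.

```python
def add_total_column(content):
    """Append a 'total' column to CSV text: the header gets ',total' and
    every non-blank data row gets the sum of its integer fields."""
    lines = content.splitlines()
    if not lines:
        return ""
    out = [lines[0] + ",total"]
    for line in lines[1:]:
        if not line.strip():
            continue  # ignore blank lines instead of failing on int('')
        total = sum(int(x) for x in line.split(","))
        out.append("%s,%d" % (line, total))
    return "\n".join(out)


# Assumed input: the logged output below without its total column
sample = "first,second,third\n1,4,9\n7,5,2\n3,8,7"
print(add_total_column(sample))
```

It sticks to `%` formatting and generator expressions rather than f-strings, so the same function should also run under Jython 2.7 if you prefer to call it from `PyStreamCallback.process` on the decoded content.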
Results from my flow (using the input provided in the GenerateFlowFile processor):
2018-07-20 13:54:06,772 INFO [Timer-Driven Process Thread-5] o.a.n.processors.standard.LogAttribute LogAttribute[id=b87f0c01-0164-1000-920e-799647cb9b48] logging for flow file StandardFlowFileRecord[uuid=de888571-2947-4ae1-b646-09e61c85538b,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1532106928567-1, container=default, section=1], offset=2499, length=51],offset=0,name=470063203212609,size=51]
--------------------------------------------------
Standard FlowFile Attributes
Key: 'entryDate'
Value: 'Fri Jul 20 13:54:06 EDT 2018'
Key: 'lineageStartDate'
Value: 'Fri Jul 20 13:54:06 EDT 2018'
Key: 'fileSize'
Value: '51'
FlowFile Attribute Map Content
Key: 'filename'
Value: '470063203212609'
Key: 'path'
Value: './'
Key: 'uuid'
Value: 'de888571-2947-4ae1-b646-09e61c85538b'
--------------------------------------------------
first,second,third,total
1,4,9,14
7,5,2,14
3,8,7,18
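To double-check the output above, here is a small plain-Python sketch (the helper `check_totals` is hypothetical) that confirms each row's last column equals the sum of the other columns, for example 1 + 4 + 9 = 14.

```python
def check_totals(csv_text):
    """Return True if the header ends with 'total' and the last column of
    every data row equals the sum of the preceding integer columns."""
    rows = [line.split(",") for line in csv_text.strip().splitlines()]
    header, data = rows[0], rows[1:]
    if header[-1] != "total":
        return False
    return all(int(r[-1]) == sum(int(v) for v in r[:-1]) for r in data)


# The content logged above
output = """first,second,third,total
1,4,9,14
7,5,2,14
3,8,7,18"""
print(check_totals(output))
```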