If you need to do a lot of data customization, you can use the ExecuteScript processor for the data manipulation. The pipeline should look something like:
ListFile -> FetchFile -> ExecuteScript -> PutDatabaseRecord
Configure your ExecuteScript processor as below:
Script Engine: python
Script Body:
from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
import sys
import os


class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the incoming flow file content line by line
        with wrap(inputStream) as f:
            lines = f.readlines()
        updated_lines = []
        # Prepend a header row for the record reader used downstream
        updated_lines.append('fname|lname|list_index|raw_text' + '\n')
        # Append the raw line as an extra quoted column,
        # e.g. John|Smith|0 becomes John|Smith|0|"John|Smith|0"
        for line in lines:
            updated_line = line.strip() + '|"' + line.strip() + '"' + '\n'
            updated_lines.append(updated_line)
        # Write the rewritten content back to the flow file
        with wrap(outputStream) as filehandle:
            filehandle.writelines("%s" % line for line in updated_lines)


flow_file = session.get()
if flow_file is not None:
    try:
        # session.write returns the updated flow file reference; keep it
        # so the transfer below uses the latest version
        flow_file = session.write(flow_file, PyStreamCallback())
        session.transfer(flow_file, ExecuteScript.REL_SUCCESS)
    except Exception:
        # Capture where the failure happened, attach it as an attribute,
        # and route the flow file to the failure relationship
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        excp = str(exc_type) + str(fname) + str(exc_tb.tb_lineno)
        flow_file = session.putAllAttributes(flow_file, {'exception': str(excp)})
        session.transfer(flow_file, ExecuteScript.REL_FAILURE)
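If you want to sanity-check the line-rewriting logic outside NiFi first, the core transformation is plain Python and can be run on its own. A minimal sketch (the sample input lines are made up and assume pipe-delimited fname|lname|list_index data):

# Standalone version of the same rewrite, runnable with any Python interpreter
def rewrite(lines):
    out = ['fname|lname|list_index|raw_text\n']
    for line in lines:
        out.append(line.strip() + '|"' + line.strip() + '"\n')
    return out

print(''.join(rewrite(['John|Smith|0\n', 'Jane|Doe|1\n'])))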
Configure PutDatabaseRecord accordingly, so that its record reader matches the pipe-delimited output produced above; a possible configuration is sketched below.
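A possible starting point for PutDatabaseRecord and its record reader (property names as in recent NiFi releases; the connection pool, table name, and reader service are placeholders for your environment):

Record Reader: CSVReader (Value Separator: |, Treat First Line as Header: true)
Statement Type: INSERT
Database Connection Pooling Service: your DBCPConnectionPool service
Table Name: your target table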