
See example scenario:

CSV file content (pipe-delimited, no header line):

john|doe|1
stacy|doe|2

Database fields:

fname | lname | list_index | raw_text

My objective is to ingest the CSV file content and save it to the database using NiFi processors. The sample output below shows the rows as they should be inserted, including the original line stored in the raw_text column.

 fname | lname | list_index | raw_text
  john | doe   |     1      | "john|doe|1"
 stacy | doe   |     2      | "stacy|doe|2"
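The mapping in the expected output can be sketched in Python: each line is split on | into the three columns, and the stripped line itself becomes raw_text. The helper name line_to_row is hypothetical, the sketch assumes every line has exactly three fields, and it treats the quotes around raw_text in the table as display only (an assumption).

```python
def line_to_row(line):
    """Map one pipe-delimited line to the target columns (hypothetical helper)."""
    raw = line.strip()                          # drop the newline and surrounding spaces
    fname, lname, list_index = raw.split("|")   # assumes exactly three fields per line
    return {
        "fname": fname,
        "lname": lname,
        "list_index": int(list_index),
        "raw_text": raw,                        # the original line, unchanged
    }


for line in ["john|doe|1\n", "stacy|doe|2\n"]:
    print(line_to_row(line))
```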
  • What processors have you already checked? Commented Jul 13, 2021 at 15:28
  • You'll want ListFile -> FetchFile -> PutDatabaseRecord Commented Jul 13, 2021 at 15:42
  • Without a header on the file though, you'll likely want ReplaceText before PutDatabaseRecord to insert the header line so the CSVReader you create to use with PutDatabaseRecord will parse the fields correctly Commented Jul 13, 2021 at 22:07
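The point about the missing header can be illustrated with Python's standard csv module standing in for NiFi's CSVReader (an analogy only, not NiFi itself): when field names are taken from the first line, a headerless file loses its first record to the header.

```python
import csv
import io

data = "john|doe|1\nstacy|doe|2\n"
header = "fname|lname|list_index\n"

# Without a header, the first data line is consumed as the field names.
no_header = list(csv.DictReader(io.StringIO(data), delimiter="|"))

# Prepending a header line (what ReplaceText would do in the flow) restores the mapping.
with_header = list(csv.DictReader(io.StringIO(header + data), delimiter="|"))

print(no_header)    # one record, keyed by 'john', 'doe', '1'
print(with_header)  # two records keyed by fname, lname, list_index
```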

1 Answer

If you need extensive data customization, you can use the ExecuteScript processor to manipulate the data. The pipeline would look something like this:

ListFile -> FetchFile -> ExecuteScript -> PutDatabaseRecord

Configure ExecuteScript as follows:

Script Engine: python (Jython, so the script uses Python 2.7 syntax)

Script Body:

from org.apache.nifi.processor.io import StreamCallback
from org.apache.nifi.processors.script import ExecuteScript
from org.python.core.util.FileUtil import wrap
import sys
import os


class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the pipe-delimited input (the file has no header line)
        with wrap(inputStream) as f:
            lines = f.readlines()

        # Prepend a header so the CSVReader used by PutDatabaseRecord can map fields to columns
        updated_lines = ['fname|lname|list_index|raw_text\n']
        for line in lines:
            raw = line.strip()
            if not raw:
                continue  # skip blank lines, e.g. a trailing empty line
            # Append the original line as a quoted raw_text field; double any embedded quotes
            updated_lines.append(raw + '|"' + raw.replace('"', '""') + '"\n')

        with wrap(outputStream, 'w') as filehandle:
            filehandle.writelines(updated_lines)


flow_file = session.get()

if flow_file:
    try:
        # session.write returns a new FlowFile reference; it must be used for the transfer
        flow_file = session.write(flow_file, PyStreamCallback())
        session.transfer(flow_file, ExecuteScript.REL_SUCCESS)

    except Exception as e:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        # Record the error type, script name and line number as an attribute for troubleshooting
        excp = '%s in %s at line %d' % (exc_type, fname, exc_tb.tb_lineno)
        flow_file = session.putAllAttributes(flow_file, {'exception': excp})
        session.transfer(flow_file, ExecuteScript.REL_FAILURE)
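Because the script only runs inside NiFi's Jython engine, one way to check the line rewriting on its own is to copy it into a plain function with no NiFi dependencies. The function name rewrite_lines is hypothetical; it builds the same header and quoted raw_text column as the script, while skipping blank lines and doubling embedded quotes are choices made in this sketch.

```python
def rewrite_lines(lines):
    """Prepend the header and append the quoted raw line as raw_text (hypothetical helper)."""
    out = ["fname|lname|list_index|raw_text\n"]
    for line in lines:
        raw = line.strip()
        if not raw:
            continue  # skip blank lines, e.g. a trailing empty line
        quoted = '"' + raw.replace('"', '""') + '"'  # CSV-style quoting and quote escaping
        out.append(raw + "|" + quoted + "\n")
    return out


print("".join(rewrite_lines(["john|doe|1\n", "stacy|doe|2\n", "\n"])))
```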

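Then configure PutDatabaseRecord to match the script's output: for example, use a CSVReader as the Record Reader with Value Separator set to "|", Treat First Line as Header set to true and Schema Access Strategy set to Use String Fields From Header, and set Table Name to the target table.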
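PutDatabaseRecord itself is configured in the NiFi UI, but the effect of reading the script's output and inserting it can be approximated in plain Python: parse the content with | as the separator and " as the quote character, then insert each record. This sketch uses an in-memory SQLite table named people (a hypothetical name) purely for illustration; it is not how NiFi executes the insert.

```python
import csv
import io
import sqlite3

# Sample of what the ExecuteScript step emits for the example file
script_output = (
    "fname|lname|list_index|raw_text\n"
    'john|doe|1|"john|doe|1"\n'
    'stacy|doe|2|"stacy|doe|2"\n'
)

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE people (fname TEXT, lname TEXT, list_index INTEGER, raw_text TEXT)"
)

# The quote character keeps the '|' characters inside raw_text from splitting the field
reader = csv.DictReader(io.StringIO(script_output), delimiter="|", quotechar='"')
conn.executemany(
    "INSERT INTO people VALUES (:fname, :lname, :list_index, :raw_text)",
    reader,
)

# SQLite's INTEGER affinity stores the '1' and '2' strings as integers
rows = conn.execute("SELECT * FROM people ORDER BY list_index").fetchall()
print(rows)
```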
