Python Script using ExecuteStreamCommand

I did my best to find previous questions and examples relevant to this one, but I still could not find the answers I'm looking for, so I am submitting a question myself.

ExecuteStreamCommand seems like the perfect processor for me due to the following reasons:

  • I am able to execute any Python script and avoid Jython (in a similar fashion as ExecuteScript). Jython is not an option for me.
  • I can take in FlowFiles. This is necessary as my script is made to consume the output of a previous processor. Furthermore I like the idea of keeping the data under "NiFi management".
  • It writes an "execution status" which will be useful for routing.

In a nutshell, what I'm trying to do with ExecuteStreamCommand is:

  • Ingest the output of a previous processor (a Scrapy spider that outputs a text file with JSON lines to be exact)
  • Call a python script (e.g. python3 my_script.py)
  • Load the FlowFile that was ingested in my python script.
  • Select the content of the FlowFile.
  • Operate on the content of the FlowFile within python.
  • Output either an updated version of the original FlowFile or create a new one.
  • Continue with my NiFi flow with the updated/new FlowFile.

For clarity's sake I currently don't understand:

  • How to call the python script (from the ExecuteStreamCommand Processor)
  • How to load up the FlowFile from within Python
  • How to update or create a new FlowFile from within Python
  • How to output the updated FlowFile from Python back to NiFi.

I have come across various examples for ExecuteScript, but unfortunately these don't exactly translate to the use of the ExecuteStreamCommand.

Thank you in advance. Any advice is appreciated.

Tracee answered 24/3, 2018 at 17:54 Comment(1)
ExecuteStreamCommand sends the flowfile content to the standard input stream of a new Python process and transfers the standard output back into the flowfile content. So your Python code should read from stdin and write to stdout... - Vladimar
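The stdin/stdout contract described in this comment can be tried outside NiFi. The sketch below is only a local approximation of it, not how NiFi is implemented: it pipes some bytes into a child Python process and captures what the child prints. The inline `CHILD_SCRIPT` and the function name `run_like_execute_stream_command` are made up for illustration and stand in for a real `my_script.py`.

```python
import subprocess
import sys
from typing import Tuple

# Hypothetical stand-in for my_script.py: upper-cases whatever arrives on stdin.
CHILD_SCRIPT = "import sys; sys.stdout.write(sys.stdin.read().upper())"


def run_like_execute_stream_command(content: bytes) -> Tuple[int, bytes]:
    """Pipe content to a child Python process and capture its stdout.

    Returns (exit status, captured stdout), roughly the two things the
    processor is described as acting on.
    """
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT],
        input=content,            # plays the role of the flowfile content
        stdout=subprocess.PIPE,   # plays the role of the new flowfile content
        check=False,              # inspect the status ourselves instead of raising
    )
    return result.returncode, result.stdout


if __name__ == "__main__":
    status, output = run_like_execute_stream_command(b"hello flowfile\n")
    print(status, output)
```

Running a script this way before wiring it into the processor is a quick check that it really does read everything from stdin and write only the intended content to stdout.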

You say in your question that you need to invoke the Python script without using the InvokeScriptedProcessor or ExecuteScript processors because you can't use Jython. Given that requirement, you can still accomplish your goal. It requires some familiarity with the framework, but all of this information comes from the ExecuteStreamCommand documentation.

Your "I currently don't understand" section:

  • How to call the python script (from the ExecuteStreamCommand Processor)

    • In your ExecuteStreamCommand processor, configure the Command Arguments and Command Path properties with the following:

      • Command Arguments: any flags or args, delimited by ; (e.g. /path/to/my_script.py)
      • Command Path: /path/to/python3
  • How to load up the FlowFile from within Python

    • The flowfile content will be passed via STDIN, so in your Python script, process that data the same way you would normally process STDIN.
  • How to update or create a new FlowFile from within Python
    • NiFi handles the flowfile creation in the framework. Any data your Python script writes to STDOUT will be populated into the content of the resulting flowfile, which is passed to the output stream relationship of the ExecuteStreamCommand processor. Your script does not need to have any awareness of "flowfiles" in this instance. If you were instead using the InvokeScriptedProcessor or ExecuteScript processors, you could use the NiFi scripting API, which is automatically injected into the scripts, to create or update the flowfile object.
  • How to output the updated FlowFile from Python back to NiFi.
    • Again, simply write the desired flowfile content to STDOUT from your script, and (given a return status code of 0) NiFi will generate a new flowfile with that content. If you set the Output Destination Attribute property of ExecuteStreamCommand to a non-null value, NiFi will instead update the existing flowfile with a new attribute, named by that value, containing the output of the script.
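Putting the points above together, a script for the JSON-lines case in the question could look roughly like the following. This is a minimal sketch, not a definitive implementation: the `transform` function and the `processed` field are invented placeholders for whatever the real per-record logic is, and the script knows nothing about flowfiles, only about stdin and stdout.

```python
import json
import sys


def transform(record: dict) -> dict:
    """Hypothetical per-record change: mark each item as processed."""
    updated = dict(record)
    updated["processed"] = True
    return updated


def process_stream(in_stream, out_stream) -> int:
    """Read JSON lines from in_stream, write transformed JSON lines to out_stream.

    Returns the number of records written. Blank lines are skipped.
    """
    count = 0
    for line in in_stream:
        line = line.strip()
        if not line:
            continue
        out_stream.write(json.dumps(transform(json.loads(line))) + "\n")
        count += 1
    return count


# Only read when input is piped in (as the processor does); this avoids
# blocking forever if the script is started from an interactive terminal.
if __name__ == "__main__" and not sys.stdin.isatty():
    process_stream(sys.stdin, sys.stdout)
```

With Command Path set to the Python interpreter and Command Arguments set to the script path, as described above, whatever this script writes to stdout becomes the content of the outgoing flowfile. Keeping the logic in a function that takes streams also makes it easy to test without NiFi.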
Leigh answered 27/3, 2018 at 23:46 Comment(6)
Thank you! I will make sure to try this out. Regardless, this answers all of my questions! - Tracee
Yes, ExecuteScript can only use Jython with Python 2.7 at this time. - Leigh
@Leigh - Can you explain how to access the session inside my Python script that is executed by ExecuteStreamCommand? - Merilynmeringue
Python code executed with ExecuteStreamCommand will not have access to the NiFi session because it is executing outside of the NiFi context. - Leigh
Thanks @Andy, one more question: how do I route an exception that occurred in the Python script to the nonzero status relationship? - Merilynmeringue
@ChetanHirapara you can use try-except in the script and, in the except block, write the error back with a print statement and call sys.exit(1). Also, you can suppress bulletin errors by setting the bulletin level to None in the Settings tab of the processor. - Bonin
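The try-except suggestion in the last comment might be sketched as follows. The function name `run` and the message text are illustrative only. The error is printed to stderr here so that it does not mix with the content on stdout; that is an assumption of this sketch, since the comment only says to print the error and exit with status 1.

```python
import json
import sys


def run(in_stream, out_stream, err_stream) -> int:
    """Echo valid JSON lines; return 1 and report the error if anything fails."""
    try:
        for line in in_stream:
            if line.strip():
                record = json.loads(line)  # raises on malformed input
                out_stream.write(json.dumps(record) + "\n")
    except Exception as exc:
        # Report the problem, then signal failure through the exit status.
        print(f"processing failed: {exc}", file=err_stream)
        return 1
    return 0


# Only read when input is piped in, to avoid blocking on an interactive terminal.
if __name__ == "__main__" and not sys.stdin.isatty():
    status = run(sys.stdin, sys.stdout, sys.stderr)
    if status != 0:
        sys.exit(status)  # a non-zero exit status is what marks the run as failed
```

Returning the status from a function and calling `sys.exit` only at the top level keeps the failure path easy to exercise in a unit test.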
