Integration and unit testing NiFi process groups
I have a few NiFi process groups which I want to run integration tests on before promoting to production. The issue is that I can't seem to find any documentation on how to do so.

Data Provenance seems like a promising tool to accomplish what I want; however, over the course of a FlowFile's lifecycle, data is published to and consumed from Kafka or the file system. As a result, the FlowFile UUID changes, so I cannot query for it using the nifi-api.
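For context, a provenance lookup of this kind goes through NiFi's REST API. A minimal sketch of the request body involved (assuming an unsecured local NiFi 1.x instance; the exact payload schema varies between NiFi versions, so treat the field names as an approximation):

```python
# Sketch of a provenance query body for POST /nifi-api/provenance.
# Host, port, and payload field names are assumptions based on the
# NiFi 1.x REST API and may differ in your version.
import json

NIFI_API = "http://localhost:8080/nifi-api"  # assumed local unsecured NiFi

def provenance_query(flowfile_uuid, max_results=100):
    """Build the JSON body for a provenance search by FlowFile UUID."""
    return {
        "provenance": {
            "request": {
                "maxResults": max_results,
                "searchTerms": {"FlowFileUUID": flowfile_uuid},
            }
        }
    }

body = json.dumps(provenance_query("5f1c7a60-0000-1000-8000-000000000000"))
```

Posting this body creates an asynchronous provenance query that is then polled for results, which is exactly where the UUID change across the Kafka hop breaks the lookup.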

Additionally, I know that NiFi offers a TestRunner library to run tests; however, this seems to cover only processors and process groups constructed in code, not ones built in the UI.

Does anyone know of a tool, framework, or pattern for integration and unit testing NiFi process groups? Ideally this would be a solution where you can programmatically compare the input and output of a processor or process group without modifying the existing workflow.

System answered 13/8, 2018 at 22:42 Comment(0)

With the introduction of the Apache NiFi Registry, we have seen users promote flows from a development/sandbox environment to a test/QE environment in which existing "test harness" flows surround the "flow under test". This lets them send repeatable, deterministic data (or an anonymized sample of real production data) through the flow and compare the results to expected values.

As you point out, there is a TestRunner class and a whole testing framework provided for unit tests. While it can be tedious to manually translate a UI-constructed flow into its programmatic equivalent, you could also build something like a translator that accepts a flow template or flow.xml.gz file and converts it into something the test framework can process.
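As a rough illustration of that translator idea, here is a sketch that parses an exported template and lists the processor classes a TestRunner would need to instantiate. The element names follow NiFi's template XML; a real translator would also have to handle properties, relationships, and connections:

```python
# Hedged sketch: extract processor types from an exported NiFi template.
# Each <processors> element in the template's snippet carries a <type>
# child naming the processor class; a real translator would read much
# more than this.
import xml.etree.ElementTree as ET

def processors_in_template(template_xml):
    root = ET.fromstring(template_xml)
    return [p.findtext("type") for p in root.iter("processors")]

sample = """
<template>
  <snippet>
    <processors>
      <type>org.apache.nifi.processors.standard.GenerateFlowFile</type>
    </processors>
    <processors>
      <type>org.apache.nifi.processors.standard.PutFile</type>
    </processors>
  </snippet>
</template>
"""
print(processors_in_template(sample))
```

From a list like this you could emit boilerplate `TestRunners.newTestRunner(...)` calls, though wiring up connections and controller services is where most of the translation work would actually be.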

Unexperienced answered 13/8, 2018 at 23:24 Comment(4)
In the case of a "test harness", wouldn't that require some kind of input/output ports added to the process group? If so, that undermines the idea of an automated workflow test suite where passing tests gate promotion to QA, prod, etc., since each environment would need to modify the workflow to meet its needs. I've seen NiFi Registry and NiPyApi or the NiFi CLI used to promote between environments, but I have not seen anything on testing. – System
If you want to import a PG without modifying it at all, this would be non-trivial, as you would need some way to ingest/output data. What I was describing is an existing PG that contains an input and output port, where the "flow under test" comprises the n-2 components between those ports. This makes the PG itself a reusable component: dev/test/production would send different data into the input port, but the connection that gets modified is outside the PG itself. If you have PGs that do not consume/publish data outside of themselves, I'm not sure how you would measure them in the first place. – Unexperienced
This all depends on the design of the flows under test - I couldn't specify a generic harness that works for every permutation of data engineering. The simplest solution I can think of is nesting the flow inside another PG where key variables for data locations live in the PG variable registry, then launching a test script at the end of processing which uses those variables to validate input/output characteristics. That at least I could automate using NiPyApi, but I agree with Andy that a standard PG design pattern with input/output ports would be more robust. – Bahr
Does anybody have an example of a 'test harness' flow as mentioned above? – Factious
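The post-run test script described in the comments above could start as small as this sketch. In a real setup the variables would come from the process group's variable registry (fetched via NiPyApi) and the reads would hit the actual data locations; here both are stubbed, and every name and path is hypothetical:

```python
# Hedged sketch of a post-run validation script: read data locations
# from the PG's variable registry (stubbed as a dict here) and compare
# input vs. output record counts. All names and paths are hypothetical.

def validate(variables, read_lines):
    """True when the output location holds as many records as the input."""
    input_rows = read_lines(variables["input.location"])
    output_rows = read_lines(variables["output.location"])
    return len(output_rows) == len(input_rows)

# Stand-in for real file or Kafka reads during a local dry run:
fake_store = {
    "/data/in/test.csv": ["a,1", "b,2"],
    "/data/out/test.csv": ["a,1", "b,2"],
}
ok = validate(
    {"input.location": "/data/in/test.csv",
     "output.location": "/data/out/test.csv"},
    fake_store.get,
)
print(ok)  # True
```

Richer checks (schemas, checksums, per-record diffs) slot into the same shape: the harness only needs the variable registry to tell it where to look.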

Maybe plumber will help you with flow testing.

We also wanted to test whole NiFi flows, not just single processors, so we created this library and decided to open-source it. A simple example in Scala:

    // read flow previously exported from NiFi
    val template = TemplateDeserializer.deserialize(this.getClass.getClassLoader.getResourceAsStream("exported-flow.xml"))
    val flow = NifiTemplateFlowFactory(template).create()
    // enqueue some data to any processor
    flow.enqueueByName("csv row,12,another value,true", "CsvParserProcessor")

    // run entire flow once
    flow.run(1)

    // get the results from any processor
    val records = flow.resultsFromProcessorRelation("LastProcessorInFlow", "successRelation")
    records should have size 1

This library is still under development, so improvements and ideas are welcome! :)

Totality answered 10/1, 2019 at 16:28 Comment(4)
How would we test flows that have processors which don't accept input but connect to external systems? Can we mock them? – Lundeen
I am not able to pass a processor name to the flow.enqueueByName method; it expects an input port name. I don't have any input port, as I am not using a process group. How can I execute the test in my scenario? – Airsickness
@Lundeen if you don't want to or can't connect to these external systems, then you need to mock (replace) them or add another processor which mocks the external system. GenerateFlowFile might be a good choice. – Totality
@SajinSurendran you can add an input port in your test, connect it to your flow, and enqueue files to this port. – Totality

© 2022 - 2024 — McMap. All rights reserved.