Introduce time delay before moving flow files to next processor in NiFi
B

2

8

In NiFi, I have a data flow that consumes from MQTT (ConsumeMQTT) and publishes to an HDFS path (PutHDFS). I have a requirement to introduce a 60-minute delay before pushing the consumed data to the HDFS path. I found the ControlRate and MergeContent processors as possible solutions, but I am not sure they fit.

What is the ideal solution to introduce time delay?

Example: A flow file consumed at 9:00 AM should be published into HDFS at 10:00 AM


Brannon answered 18/5, 2020 at 17:49 Comment(3)
Why not just schedule PutHDFS to run every 60 sec? - Weaver
@daggett, we have several process groups, and the output of each one is directed to a common PutHDFS processor. Changing the configuration of PutHDFS would affect the other flows as well, so that is not possible in my case. - Brannon
Put a dummy UpdateAttribute before PutHDFS and specify the required schedule for it. - Weaver
C
13

You can use an ExecuteScript processor to run a sleep(60*60*1000) loop, but this would unnecessarily use system resources.

I would instead introduce a RouteOnAttribute processor which has an output relationship of one_hour_elapsed going to PutHDFS, and unmatched looped back to itself. The RouteOnAttribute processor should have Routing Strategy set to Route to Property Name and a dynamic property (click the + button on the top right of the Properties tab) named one_hour_elapsed. The Expression Language value should be ${now():toNumber():gt(${entryDate:toNumber():plus(3600000)})}.

This expression:

  1. Gets the current time and converts it to milliseconds since the epoch (now():toNumber())
  2. Gets the entryDate attribute of the flowfile (when it entered NiFi) and converts it to milliseconds and adds one hour (entryDate:toNumber():plus(3600000) [3600000 == 60*60*1000])
  3. Compares the two numbers (a:gt(${b}))
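
The three steps above can be mirrored outside NiFi to check the arithmetic. This is only a sketch in Python, not NiFi code: the function name and the sample timestamps are hypothetical, and it assumes both values are already in milliseconds since the epoch, as the expression produces them.

```python
ONE_HOUR_MS = 60 * 60 * 1000  # 3600000, the constant used in the expression


def one_hour_elapsed(entry_date_ms: int, now_ms: int, delay_ms: int = ONE_HOUR_MS) -> bool:
    """Mirror of now():toNumber():gt(entryDate:toNumber():plus(3600000))."""
    # Step 2: entry time plus the delay gives the earliest release time.
    release_at_ms = entry_date_ms + delay_ms
    # Step 3: strict "greater than", matching gt() in the expression.
    return now_ms > release_at_ms


# Hypothetical entry timestamp, in milliseconds since the epoch.
entry = 1_589_792_400_000
print(one_hour_elapsed(entry, entry + 59 * 60 * 1000))  # 59 minutes later: False
print(one_hour_elapsed(entry, entry + 61 * 60 * 1000))  # 61 minutes later: True
```

Because the comparison is strict, a flowfile checked at exactly the one-hour mark would still be routed to unmatched and released on a later pass.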

If this is not actually the start of your flow, you can use an UpdateAttribute processor to insert an arbitrary timestamp at any point of your flow and calculate from there.

I would also recommend setting the Yield Duration and Run Schedule of the RouteOnAttribute processor substantially higher than usual. You do not want this processor running constantly, because most runs will find nothing ready to release. I'd suggest starting with 1 or 5 minutes, as you are already introducing a one-hour delay.
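
The cost of a longer Run Schedule is that a flowfile can be released somewhat later than the one-hour mark. The sketch below estimates that overshoot under a deliberately simplified model: it assumes the processor evaluates the flowfile on every scheduled run at a fixed interval, and it ignores queue depth, run duration, yielding and penalization, all of which affect a real flow. The function name and the sample times are hypothetical.

```python
MIN_MS = 60 * 1000  # one minute in milliseconds


def first_check_after(entry_ms: int, delay_ms: int, first_run_ms: int, run_schedule_ms: int) -> int:
    """Time of the first scheduled run strictly after entry_ms + delay_ms.

    Assumes runs happen at first_run_ms, first_run_ms + run_schedule_ms, and so on.
    """
    release_at_ms = entry_ms + delay_ms
    # Smallest k >= 0 such that first_run_ms + k * run_schedule_ms > release_at_ms.
    runs_needed = max(0, (release_at_ms - first_run_ms) // run_schedule_ms + 1)
    return first_run_ms + runs_needed * run_schedule_ms


# Flowfile enters at t = 0; the processor first runs 2 minutes later, then every 5 minutes.
released = first_check_after(0, 60 * MIN_MS, 2 * MIN_MS, 5 * MIN_MS)
print(released // MIN_MS)               # 62
print((released - 60 * MIN_MS) // MIN_MS)  # 2 minutes past the one-hour mark
```

In this model the overshoot never exceeds one Run Schedule interval, so a 1 or 5 minute schedule adds at most that much on top of the hour.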

[Image: flow showing timing loop]

Cirone answered 18/5, 2020 at 19:28 Comment(2)
If you want a delay at the flow level (e.g. measured from a certain timestamp), this is probably the only straightforward way. However, now that the RetryFlowFile processor is available, it is likely the more elegant choice for a pure delay at a certain point in the flow. I see that someone has already written an answer with this. - Geibel
I'm not a fan of busy waiting. - Brought
T
11

Starting with NiFi 1.10, this is even easier with the RetryFlowFile processor. Use the processor's Penalty Duration to set the delay time.

Tarmac answered 16/7, 2020 at 14:22 Comment(4)
I like this approach. It is much simpler. - Somali
Where is the penalty duration property? - Spent
Not in the "Scheduling" tab, but in the "Settings" tab. - Tarmac
You'll want to configure this processor with "Maximum Retries" set to 1 and "failure" selected under Automatically Terminate Relationships. The delayed flowfile will then come out on the "retries_exceeded" relationship. - Naughty
