What is the fastest way to remove NiFi flowfile content?

I have a workflow in which I receive JSON files as responses from a REST API. A session produces approximately 100k files, about 15 GB in total. I have to save each file to the file system, which I already do. At the end of the process, I have to wait until all the files are present before I send a success message.

Once a file is saved to the file system, I call Notify + Wait, but I no longer need the 15 GB of data in the flowfiles. To free some space, I thought of using either ReplaceText or ModifyBytes to clear the content so that Notify + Wait runs smoothly. The total wait for this process is 3 hours.

But the process takes too long in both cases (ReplaceText and ModifyBytes).

Can you suggest the fastest way to clear flowfile data? I do not need any of the attributes either. Is there a way to abandon the old flowfile midway and generate a new flowfile of only a few KB?

What I want is something like GenerateFlowFile, but in the middle of the flow: for each existing flowfile, I could drop the old one and generate a blank flowfile for Notify and Wait.

Thanks

Saltire answered 15/11, 2018 at 3:37 Comment(0)

NiFi's Content Repository and FlowFile Repository are based on a copy-on-write mechanism, so if you don't change the contents or metadata, then you are not necessarily "keeping" the 15GB across those processors.

Having said that, if all you need is the existence of such flow files on disk (but not contents or metadata), try ExecuteScript with the following Groovy script:

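// Pull up to 1000 queued flow files in one call
def flowFiles = session.get(1000)
flowFiles.each {
   // Emit a new, empty flow file (no content, no parent) for each incoming one
   session.transfer(session.create(), REL_SUCCESS)
}
// Drop the original flow files
session.remove(flowFiles)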

This script will grab up to 1000 flow files at a time, and for each one, send an empty flow file downstream. It then removes all the original incoming flow files.

Note that this (i.e. your use case) will "break" the provenance/lineage chain, so if something goes wrong in your flow, you won't be able to tell which flow files came from which parent flow files, etc. This limitation is one reason why you don't see a full processor that performs this kind of function.

Kamerun answered 15/11, 2018 at 3:56 Comment(2)
Thanks. After comparing the pros and cons, I think I can use the slower ReplaceText processor and not lose lineage. - Saltire
You can also do session.create(it), and it won't lose lineage (it also keeps the same attributes as the incoming flow files). - Kamerun

In case you need to keep the attributes, lineage, and metadata, you can use the following code (it grabs only one flowfile at a time). The only thing that changes is the UUID; everything else is kept, except the content of course.

def f = session.get()
// session.get() returns null when the incoming queue is empty
if (f == null) return
session.transfer(session.create(f), REL_SUCCESS)
session.remove(f)
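
If one flowfile per invocation turns out to be too slow for a backlog of around 100k files, the same parent-aware call can probably be combined with the batching used in the earlier answer. This is a minimal sketch, not a tested configuration: it assumes the script runs in ExecuteScript (which binds `session` and `REL_SUCCESS`), and the batch size of 1000 is an arbitrary value carried over from that answer.

```groovy
// Sketch for ExecuteScript (Groovy); `session` and REL_SUCCESS are bound by the processor.
// The batch size of 1000 is an assumption, tune it for your flow.
def flowFiles = session.get(1000)
flowFiles.each { parent ->
    // create(parent) yields an empty child that inherits the parent's
    // attributes (except the UUID) and stays linked to it in provenance
    session.transfer(session.create(parent), REL_SUCCESS)
}
// Remove the originals so their content claims can be released
session.remove(flowFiles)
```

The trade-off is the same as above: the children carry the attributes and lineage, but each gets a new UUID.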
Mcafee answered 23/8, 2021 at 7:22 Comment(0)
