Merge two JSON flowfiles together in NiFi
M

3

5

I want to merge two flowfiles that contain JSON objects, matching them on the same specified attribute...

flow1:

attribute:    
xuuid = 123456

content:
{
"sname":"jack",
"id":"00001",
"state":"NY"
}

flow2:

attribute:    
xuuid = 123456

content:
{
"country":"US",
"date":"1983"
}

and I expect this form of data in a single output flowfile:

desired_flow:

attribute:    
xuuid = 123456

content:
{
"sname":"jack",
"id":"00001",
"state":"NY",
"country":"US",
"date":"1983"
}

How do I go about this? The MergeContent processor or MergeRecord? I think MergeRecord can handle it, but I am confused about how to use it.

Misbecome answered 2/9, 2018 at 13:31 Comment(0)
T
4

What you are asking for is a streaming join, and it is not something that NiFi really does. There is a similar question and answer here:

https://mcmap.net/q/1872079/-how-to-join-two-csvs-with-apache-nifi

The merge processors are made to merge pieces of data one after another, not to perform a streaming join. For example, if you have many small JSON messages, you would want to use MergeContent or MergeRecord to merge thousands of them together into a single flow file before writing to HDFS.

Trula answered 4/9, 2018 at 13:13 Comment(0)
M
5

Yes, MergeContent can do this for you.

I use EvaluateJsonPath --> MergeContent --> AttributesToJSON

I have posted a template here you can use to play around. Apache NiFi Merge Json Template

MergeContent must have these settings: Attribute Strategy "Keep All Unique Attributes", Minimum Number of Entries 2, and Delimiter Strategy "Text".
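To make the idea behind this flow concrete, here is a rough Python sketch (not NiFi code) of what the three steps do conceptually: copy the JSON fields into attributes (the EvalJson step, presumably EvaluateJsonPath), merge while keeping all non-conflicting attributes, then write the chosen attributes back out as JSON. The function names `extract_attributes` and `keep_all_unique` are made up for illustration, and the conflict handling is my reading of the "keep all unique attributes" behaviour, so treat it as an assumption.

```python
import json

def extract_attributes(attributes, content):
    """Hypothetical stand-in for the EvalJson step: copy top-level JSON fields into attributes."""
    merged = dict(attributes)
    merged.update({key: str(value) for key, value in json.loads(content).items()})
    return merged

def keep_all_unique(attribute_maps):
    """Union the attribute maps; assumption: a key with conflicting values is dropped."""
    result, conflicts = {}, set()
    for attrs in attribute_maps:
        for key, value in attrs.items():
            if key in conflicts:
                continue
            if key in result and result[key] != value:
                conflicts.add(key)
                del result[key]
            else:
                result[key] = value
    return result

flow1 = extract_attributes({"xuuid": "123456"},
                           '{"sname":"jack","id":"00001","state":"NY"}')
flow2 = extract_attributes({"xuuid": "123456"},
                           '{"country":"US","date":"1983"}')

merged_attributes = keep_all_unique([flow1, flow2])

# Stand-in for the AttributesToJson step: serialise only the wanted attributes.
fields = ["sname", "id", "state", "country", "date"]
merged_json = json.dumps({name: merged_attributes[name] for name in fields})
print(merged_json)
```

Note that in this sketch every field value travels as an attribute rather than as content, which is the memory concern raised in the comments below.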

Matteroffact answered 2/9, 2018 at 23:36 Comment(6)
How about the attribute (xuuid)? This merge must be correlated on that attribute... - Misbecome
Using EvalJson consumes more system RAM than other methods (because attributes are stored in RAM), so I do not prefer this solution. The two JSON objects I presented above are only samples; in my real work every flowfile (multiple text fields) is close to 10 MB, so using that processor is not practical. - Misbecome
I have used MergeRecord in another project and it is a very useful and powerful processor... I think a JSON tree reader and a JSON writer can handle this, but its complexity confuses me and I need someone with expertise in it. - Misbecome
I am using EvalJson with 1 GB payloads and have no issues; the JSON reader and writer will use the same library to break down the JSON flowfile anyway. - Matteroffact
It works with just two flowfiles, but in my case the flowfiles do not arrive in order, and each one must be merged with a flowfile from another stream that has the same attribute value... I tested your flow, changed the timing of the record generator, and then the result came out wrong! - Misbecome
Well, that was not in the scope of your initial question. - Matteroffact
A
1

An answer to another question shows how this can be done with MergeContent followed by a JoltTransformJSON.

Like the OP here, I wanted to merge on a particular attribute (filename, in my case) so my MergeContent config was slightly different:

Merge Strategy: Bin-Packing Algorithm
Merge Format: Binary Concatenation
Correlation Attribute Name: filename  # or xuuid, or whatever you want
Minimum Number of Entries: 2
Delimiter Strategy: Text
Header: [
Footer: ]
Demarcator: ,
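As a rough illustration of what the Correlation Attribute Name and Minimum Number of Entries settings are doing, here is a small Python sketch (not NiFi code). It groups incoming flowfiles by one attribute and releases a group once it holds two entries. The name `bin_by_correlation` and the dict layout of a flowfile are made up for this example, and releasing a bin the moment it reaches the minimum is a simplification of how the real processor decides a bin is ready.

```python
from collections import defaultdict

def bin_by_correlation(flowfiles, attribute, min_entries=2):
    """Group flowfiles by one attribute; release a bin once it has min_entries items.

    Simplifying assumption: a bin is emitted as soon as it reaches min_entries.
    """
    bins = defaultdict(list)
    ready = []
    for flowfile in flowfiles:
        key = flowfile["attributes"][attribute]
        bins[key].append(flowfile)
        if len(bins[key]) == min_entries:
            ready.append(bins.pop(key))
    return ready, dict(bins)

# Flowfiles arriving interleaved from two streams (hypothetical sample data).
incoming = [
    {"attributes": {"xuuid": "123456"}, "content": '{"sname":"jack","id":"00001","state":"NY"}'},
    {"attributes": {"xuuid": "999"}, "content": '{"sname":"jill"}'},
    {"attributes": {"xuuid": "123456"}, "content": '{"country":"US","date":"1983"}'},
]

ready, pending = bin_by_correlation(incoming, "xuuid")
print(len(ready), "bin ready;", list(pending), "still waiting for a partner")
```

The flowfile with xuuid 999 stays pending because its partner has not arrived, which is the out-of-order situation the OP describes.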

After that, the second part of the solution is the same:

Then transfer to JoltTransformJSON and set Jolt Transformation DSL to Shift and Jolt Specification to:

{
  "*": {
    "*": "&"
  }
}

This should do the job :)

Smashing solution, all kudos to @Ben Yaakobi.

The only thing I can add, by way of explanation, is that @Bryan Bende's answer is technically true, i.e. NiFi isn't designed for this kind of thing. Accordingly, the answer above is a bit of a hack:

  • In the first part, MergeContent actually ignores the fact we're working with JSON altogether (its Binary Concatenation means it's just dealing with the content as raw bytes). It just "fakes" merging the two records into a JSON array by using the Header, Footer and Demarcator settings as shown, which happen to be JSON syntax.
  • Then in the second part, Jolt is able to parse that munged text as valid JSON, and apply its transformational magic.
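The two steps described above can be mimicked in plain Python to see why the trick works. This is only a sketch: `binary_concatenate` imitates the Header / Demarcator / Footer concatenation on raw bytes, and `shift_flatten` is my approximation of what the shift spec does to an array of objects (including collecting repeated keys into a list). Both function names are hypothetical, and neither is the NiFi or Jolt implementation.

```python
import json

def binary_concatenate(contents, header=b"[", footer=b"]", demarcator=b","):
    """Join raw byte payloads with a header, demarcator and footer, without parsing them."""
    return header + demarcator.join(contents) + footer

def shift_flatten(records):
    """Approximate the shift spec: lift every key of every array element to the top level."""
    merged = {}
    for record in records:
        for key, value in record.items():
            if key not in merged:
                merged[key] = value
            elif isinstance(merged[key], list):
                merged[key] = merged[key] + [value]
            else:
                # Approximation: a key seen twice has its values collected into a list.
                merged[key] = [merged[key], value]
    return merged

contents = [
    b'{"sname":"jack","id":"00001","state":"NY"}',
    b'{"country":"US","date":"1983"}',
]

merged_bytes = binary_concatenate(contents)   # raw bytes that happen to be a JSON array
records = json.loads(merged_bytes)            # only now is the text treated as JSON
result = shift_flatten(records)
print(json.dumps(result))
```

The point to notice is that nothing checks the payloads are JSON until the second step, so a malformed flowfile would only fail at the Jolt stage.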

For a better understanding of the Jolt syntax used, here are some helpful resources on the topic:

See also some alternative approaches mentioned here. In particular, I think the approach of using MergeRecord / MergeContent with a correlation attribute or Defragment mode, followed by QueryRecord with COALESCE and GROUP BY in order to join together the columns from both datasets, would be most relevant to this question (although I haven't tried this myself).
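For what it's worth, the general SQL idea behind that last approach can be tried out in Python with the standard-library sqlite3 module. This is only a sketch of the grouping technique, not a tested QueryRecord query: QueryRecord uses a different SQL engine, the table name `merged` is made up, and I am assuming xuuid has been added to each record as a field and that fields missing from a record show up as NULL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE merged "
    "(xuuid TEXT, sname TEXT, id TEXT, state TEXT, country TEXT, date TEXT)"
)
# Two partial records with the same xuuid, as they might look after merging
# (assumption: absent fields are NULL).
conn.executemany(
    "INSERT INTO merged VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("123456", "jack", "00001", "NY", None, None),
        ("123456", None, None, None, "US", "1983"),
    ],
)

# MAX ignores NULLs, so each group keeps the one non-NULL value per column;
# COALESCE supplies a default if no record in the group had the field.
row = conn.execute(
    """
    SELECT xuuid,
           COALESCE(MAX(sname), '')   AS sname,
           COALESCE(MAX(id), '')      AS id,
           COALESCE(MAX(state), '')   AS state,
           COALESCE(MAX(country), '') AS country,
           COALESCE(MAX(date), '')    AS date
    FROM merged
    GROUP BY xuuid
    """
).fetchone()
print(row)
```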

Aqueous answered 28/7, 2021 at 1:21 Comment(0)
