Apache Storm: Track tuples by unique ID from Source Spout to Final Bolt

I want a method of uniquely identifying tuples throughout a whole Storm topology, so that each tuple can be tracked from Spout to the final Bolt.

As I understand it, a unique message ID can be passed along with an emit from a spout, for example:

String msgID = UUID.randomUUID().toString();
// emit a line from user tasks, with the message ID attached
outputCollector.emit(new Values(task), msgID);

This ID is returned to the spout when the tuple is acked. (Can this be simulated earlier, to get back the passed-in ID at any point?) But calling getMessageId() on a tuple, for example:

inputTuple.getMessageId()

returns a new message ID generated for the tuple, not the one passed in at the spout. Reference: https://groups.google.com/forum/#!topic/storm-user/xBEqMDa-RZs

Questions

1) Is there a way to get the tuple.getMessageId() value when the collector emits the tuple?

2) Alternatively, can the message ID passed in at the spout be retrieved from the tuple at any spout or bolt in the topology?

End solution: I want to be able to set an ID on a tuple when it is emitted, and then be able to identify that tuple again at any point in the Storm topology.

Or will the unique message ID that my system tracks have to be passed as a field/value on each output of each spout and bolt?

Thanks

Dhoti answered 19/7, 2015 at 15:48 Comment(0)

It is not possible to access the system-generated IDs at the producer (only at the consumer, via tuple.getMessageId()). To track tuples the way you want, you need to (following your own idea) add the ID as a regular field value to the tuple and copy it in each bolt to the corresponding output tuple(s).
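To make the "copy it in each bolt" part concrete, here is a minimal sketch of the pattern. It deliberately uses plain Java stand-ins instead of Storm classes so that it runs on its own; TrackingIdSketch, TrackedTuple, spoutEmit, splitBolt and upperCaseBolt are all hypothetical names, and the field name "trackingId" is an assumption. The comments note the Storm calls that would roughly take their place in a real topology.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class TrackingIdSketch {
    // Stand-in for a Storm tuple declared with the fields ("trackingId", "payload").
    static final class TrackedTuple {
        final String trackingId;
        final String payload;

        TrackedTuple(String trackingId, String payload) {
            this.trackingId = trackingId;
            this.payload = payload;
        }
    }

    // Spout side: create the ID once and emit it as an ordinary field.
    // Rough Storm equivalent: collector.emit(new Values(trackingId, task), trackingId);
    static TrackedTuple spoutEmit(String task) {
        return new TrackedTuple(UUID.randomUUID().toString(), task);
    }

    // Bolt side: read the ID from the input and copy it to every output tuple.
    // Rough Storm equivalent: input.getStringByField("trackingId"), then
    // collector.emit(input, new Values(trackingId, word));
    static List<TrackedTuple> splitBolt(TrackedTuple input) {
        List<TrackedTuple> out = new ArrayList<>();
        for (String word : input.payload.split(" ")) {
            out.add(new TrackedTuple(input.trackingId, word));
        }
        return out;
    }

    // A bolt further downstream does the same copy.
    static TrackedTuple upperCaseBolt(TrackedTuple input) {
        return new TrackedTuple(input.trackingId, input.payload.toUpperCase());
    }

    public static void main(String[] args) {
        TrackedTuple fromSpout = spoutEmit("write report");
        for (TrackedTuple word : splitBolt(fromSpout)) {
            TrackedTuple result = upperCaseBolt(word);
            // Every line printed carries the ID created at the spout.
            System.out.println(result.trackingId + " " + result.payload);
        }
    }
}
```

The point of the sketch is only that the ID travels as data: any bolt that forgets to copy the field breaks the chain for everything downstream of it.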

Zasuwa answered 19/7, 2015 at 16:58 Comment(0)

Several parts to this answer. First, as you correctly point out, it's up to you to come up with a unique ID in your spout for each tuple you emit. Second, if you want to access that ID anywhere in your topology then add that ID to the composite Tuple emitted by the Spout. Third (just for completeness), if there's anything in your emitted tuple that you'll need to know when handling an ack or a fail in your Spout then add that information as part of a composite value that makes up your message ID.

Just as an example, I usually use the Tuple itself as the message ID, too, when emitting a tuple from a spout:

outputCollector.emit(myTuple, myTuple);

This might be overkill, but at least I have access to all of the information in the tuple everywhere.
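As a rough illustration of the third point, here is a sketch of a composite message ID that carries what the spout needs when a tuple fails. It is a plain-Java simulation rather than a real spout, so it runs without Storm; ReplaySpoutSketch, TaskMessageId and offer are hypothetical names. The ack and fail methods only mirror the parameter shape of ISpout.ack(Object msgId) and ISpout.fail(Object msgId).

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;

class ReplaySpoutSketch {
    // Hypothetical composite message ID: everything ack/fail will need later.
    static final class TaskMessageId {
        final String trackingId;
        final String task;

        TaskMessageId(String trackingId, String task) {
            this.trackingId = trackingId;
            this.task = task;
        }
    }

    // Work waiting to be emitted (first attempts and replays).
    private final Queue<TaskMessageId> pending = new ArrayDeque<>();
    int acked = 0;

    // Accept a new task and assign its tracking ID exactly once.
    void offer(String task) {
        pending.add(new TaskMessageId(UUID.randomUUID().toString(), task));
    }

    // Stand-in for nextTuple(). In Storm this would be roughly
    // collector.emit(new Values(id.trackingId, id.task), id);
    // Returns null when there is nothing left to emit.
    TaskMessageId nextTuple() {
        return pending.poll();
    }

    // Called with the same object that was passed as the message ID at emit time.
    void ack(Object msgId) {
        acked++;
    }

    // The message ID already holds the task, so a replay needs no extra lookup.
    void fail(Object msgId) {
        pending.add((TaskMessageId) msgId);
    }

    public static void main(String[] args) {
        ReplaySpoutSketch spout = new ReplaySpoutSketch();
        spout.offer("write report");
        TaskMessageId first = spout.nextTuple();
        spout.fail(first);                       // simulate a failed tuple tree
        TaskMessageId retry = spout.nextTuple(); // replayed with the same tracking ID
        System.out.println(first.trackingId.equals(retry.trackingId));
        spout.ack(retry);
    }
}
```

Because the replayed tuple reuses the original tracking ID, anything downstream that logs that field sees both attempts under the same ID.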

Mauer answered 20/7, 2015 at 12:59 Comment(2)
Could you please expand on "then add that ID to the composite Tuple emitted by the Spout"? I didn't think you could add info to a tuple, especially at this stage where it is just your collector emitting values, unless you mean passing it as a regular field value between each Spout and Bolt as stated at the bottom of my question. Thanks. - Dhoti
I meant as a regular field, as stated at the bottom of your question. - Mauer
