Oozie coordinator with Asynchronous Data Set

We want to schedule a workflow based on data availability but there is no particular frequency of data arrival. Also there could be multiple re-runs of data and hence multiple versions of the data for the day arriving at any time.

As I understand from the specification, it is currently mandatory to specify the frequency parameter in a coordinator.

However, we would like to trigger our workflow based on some event (data arrival or partition creation) only without depending on the frequency.

This seems to qualify as an Asynchronous Data Set. Does Oozie support Asynchronous Data Sets yet?

Ax asked 25/1, 2015 at 13:22. Comments (2):
Did you solve this? I also have a similar requirement. Thanks. – Norry
Do we know how we can do it? One approach mentioned is triggering the workflow directly via shell and using shell commands to sense the data. – Fregger

The frequency parameter is mandatory, but you can specify an input event, something like this:

<datasets>
    <dataset name="mydata" frequency="${coord:days(1)}" initial-instance="${initial_instance}" timezone="UTC">
        <uri-template>${hcat_uri}/${hcatDatabase}/${hcatTable}/dt=${YEAR}${MONTH}${DAY}</uri-template>
    </dataset>
</datasets>
<input-events>
    <data-in name="MYDATA_IN" dataset="mydata">
        <instance>${coord:current(0)}</instance>
    </data-in>
</input-events>

https://oozie.apache.org/docs/3.1.3-incubating/CoordinatorFunctionalSpec.html#a6.1.4._Input_Events

So define a relatively low frequency with a meaningful unit, and the coordinator action will wait for the data to become available. It probably also makes sense to specify a timeout for the coordinator that is shorter than the frequency:

<timeout>[TIME_PERIOD]</timeout>
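
For context, here is a minimal sketch of where that timeout lives: it goes inside the coordinator's <controls> element, its value is given in minutes, and -1 means no timeout. Property names such as ${start_time}, ${end_time} and ${workflow_path} are placeholders, not part of the original answer:

<coordinator-app name="my-coord" frequency="${coord:days(1)}"
                 start="${start_time}" end="${end_time}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
    <controls>
        <!-- stop waiting for the input instance after 12 hours (value in minutes, -1 = never) -->
        <timeout>720</timeout>
    </controls>
    <!-- the <datasets> and <input-events> blocks shown above go here -->
    <action>
        <workflow>
            <app-path>${workflow_path}</app-path>
        </workflow>
    </action>
</coordinator-app>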

Alternatively, you can start your workflow directly (without a coordinator), e.g. from a cron job, but that is not a nice solution at all.

Dupre answered 28/4, 2015 at 18:40. Comments (4):
Thanks kecso for your reply. We have configured our coordinator with input events in a similar fashion to what you mentioned. There is an HCat table with the partition schema (value_date, id); "id" is an identifier of the data that has been submitted by the data producers. So the coordinator can wait on data with id=xyz as follows: value_date=2015-01-01;id=xyz. Now this works fine, and also, since the frequency is 1 day, the next coordinator action is now waiting for the next day's data arrival (2015-01-02). However, the partition for 2015-01-01 may be re-created at any time by the data producers. – Ax
Ideally we would want to add another column, "version", to the partition schema, which would identify which version of the same data was submitted by the data producers: value_date=2015-01-01;id=xyz;version=?. However, the problem is that the coordinator does not know which version to wait for. Data producers might submit data for a day in the following manner: submit version=v1 of id=xyz at 7 PM, submit version=v2 of id=xyz at 9 PM, submit version=v3 of id=xyz at 9:30 PM. How do we make the coordinator process all of these in an event-driven implementation? – Ax
So what if you handle the workflow trigger on your own, without a coordinator? Or create a separate action (FS action) that checks data availability and, based on a marker, either calls the processing or jumps to the end of the workflow? (See the sketch after these comments.) – Dupre
The coordinator provides us with the ability to add a time dependency, e.g. don't run things until 10 PM. By FS action, do you mean polling for some HDFS file? If yes, then we want to avoid polling, since it would burden the NameNode, as polling would happen every minute. – Ax
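
To make the FS-check idea from the comments concrete, below is a rough sketch (not from the original thread) of a workflow that uses a decision node with the fs:exists EL function to either run the processing or skip straight to the end. The dataPath and processingWorkflowPath properties and the node names are hypothetical:

<workflow-app name="versioned-data-wf" xmlns="uri:oozie:workflow:0.4">
    <start to="check-data"/>

    <!-- branch on whether the expected partition/marker path exists in HDFS -->
    <decision name="check-data">
        <switch>
            <case to="process-data">${fs:exists(dataPath)}</case>
            <default to="end"/>
        </switch>
    </decision>

    <!-- hand off to the actual processing workflow only when the data is there -->
    <action name="process-data">
        <sub-workflow>
            <app-path>${processingWorkflowPath}</app-path>
            <propagate-configuration/>
        </sub-workflow>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Processing failed</message>
    </kill>
    <end name="end"/>
</workflow-app>

This checks only once per run, so it avoids a per-minute polling loop against the NameNode; whatever triggers the workflow (a coordinator action or an external submission) still decides when the check happens.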
