Implementing luigi dynamic graph configuration

I am new to luigi and came across it while designing a pipeline for our ML efforts. Although it did not fit my particular use case out of the box, it had so many extra features that I decided to make it fit.

What I was looking for was a way to persist a custom-built pipeline, so that its results are repeatable and it is easier to deploy. After reading most of the online tutorials, I tried to implement my serialization using the existing luigi.cfg configuration and command-line mechanisms. That might have sufficed for the tasks' parameters, but it provided no way of serializing the DAG connectivity of my pipeline. So I decided to have a WrapperTask that receives a JSON config file, creates all the task instances, and connects all the input and output channels of the luigi tasks (does all the plumbing).

I hereby enclose a small test program for your scrutiny:

import random
import luigi
import time
import os


class TaskNode(luigi.Task):
    i = luigi.IntParameter()  # node ID

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.required = []

    def set_required(self, required=None):
        # set the dependencies; fall back to an empty list so requires() never returns None
        self.required = required if required is not None else []
        return self

    def requires(self):
        return self.required

    def output(self):
        return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
        self.process()

    def process(self):
        raise NotImplementedError(self.__class__.__name__ + " must implement this method")


class FastNode(TaskNode):

    def process(self):
        time.sleep(1)


class SlowNode(TaskNode):

    def process(self):
        time.sleep(2)


# This WrapperTask builds all the nodes 
class All(luigi.WrapperTask):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        num_nodes = 513

        classes = TaskNode.__subclasses__()
        self.nodes = []
        for i in reversed(range(num_nodes)):
            cls = random.choice(classes)

            dependencies = random.sample(self.nodes, (num_nodes - i) // 35)

            obj = cls(i=i)
            if dependencies:
                obj.set_required(required=dependencies)
            else:
                obj.set_required(required=None)

            # delete existing output causing a build all
            if obj.output().exists():
                obj.output().remove()  

            self.nodes.append(obj)

    def requires(self):
        return self.nodes


if __name__ == '__main__':
    luigi.run()

So, as the question's title states, this focuses on dynamic dependencies. It generates a 513-node dependency DAG with a connectivity probability of p=1/35, and defines the All class (as in "make all") as a WrapperTask that requires every node to be built before it is considered done. (I have a version that connects it only to the heads of the connected DAG components, but I didn't want to overcomplicate things.)

Is there a more standard (Luigic) way of implementing this? Note in particular the not-so-pretty complication with TaskNode's __init__ and set_required methods. I only did it this way because receiving parameters in __init__ somehow clashes with the way luigi registers parameters. I also tried several other approaches, but this was the most decent one that worked.

If there isn't a standard way, I'd still love to hear any insights on the approach I plan to take before I finish implementing the framework.
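
For illustration, the JSON-driven wiring described in the question could start from something like the sketch below. It only covers reading the connectivity and checking it; the config layout (a "nodes" object with "class" and "requires" keys) and the helper names load_graph and build_order are assumptions made for this example, not anything luigi defines. It relies on graphlib from the standard library, which needs Python 3.9 or later.

```python
import json
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical config layout: each node names a task class and its upstream nodes.
EXAMPLE_CONFIG = """
{
  "nodes": {
    "load":  {"class": "FastNode", "requires": []},
    "clean": {"class": "SlowNode", "requires": ["load"]},
    "train": {"class": "SlowNode", "requires": ["clean", "load"]}
  }
}
"""


def load_graph(text):
    """Parse the config and return {node_name: [names of the nodes it requires]}."""
    nodes = json.loads(text)["nodes"]
    graph = {name: list(spec.get("requires", [])) for name, spec in nodes.items()}
    # Reject references to nodes that the config never defines.
    unknown = {dep for deps in graph.values() for dep in deps} - graph.keys()
    if unknown:
        raise ValueError("undefined dependencies: " + ", ".join(sorted(unknown)))
    return graph


def build_order(graph):
    """Return node names so that every dependency precedes its dependents.

    graphlib raises CycleError (a ValueError subclass) if the config is not a DAG.
    """
    return list(TopologicalSorter(graph).static_order())
```

Walking build_order(load_graph(...)) from first to last means every upstream task instance already exists by the time a node that requires it is created, which is the same property the reversed loop in the test program provides.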

Woollen answered 26/6, 2018 at 15:51 Comment(5)
related unanswered question #42563675 - Woollen
Did you ever consider airflow over luigi? I use it for doing more or less exactly what you said in your question: airflow.apache.org - Scarito
I took a short look at it, but it seemed more centered around the command line and not Python. Is that just a lazy first impression? - Woollen
I wouldn't necessarily say that. It comes with a web UI and is centered around scheduled or triggered execution of your DAGs. You can run DAGs from the command line, manually, if you like. It is a very large package, though, with a few dependencies that might be overkill for your situation. It requires at least a sqlite db, for instance. - Scarito
Hmmm, sounds interesting, and rumor has it that it is better maintained than luigi now. I'll probably look at it next round (or next job I have), but I'm kind of full steam ahead with luigi by now :/ - Woollen

I answered a similar question yesterday with a demo. I based that almost entirely on the example in the docs. In the docs, assigning dynamic dependencies by yielding tasks seems to be the way they prefer.

Together, luigi.Config and dynamic dependencies can probably give you a pipeline of almost unlimited flexibility. The docs also describe a dummy Task that calls multiple dependency chains, which could give you even more control.

Semifinalist answered 2/11, 2018 at 20:47 Comment(0)