I am new to Luigi and came across it while designing a pipeline for our ML efforts. Although it wasn't a natural fit for my particular use case, it had so many extra features that I decided to make it fit.
Basically, I was looking for a way to persist a custom-built pipeline so that its results are repeatable and it is easier to deploy. After reading most of the online tutorials, I tried to implement my serialization using the existing luigi.cfg configuration and command-line mechanisms.
That might have sufficed for the tasks' parameters, but it provided no way of serializing the DAG connectivity of my pipeline.
So I decided to have a WrapperTask that receives a JSON config file, creates all the task instances, and connects the inputs and outputs of the Luigi tasks (that is, does all the plumbing).
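To make the idea concrete, here is a minimal sketch of the kind of config I have in mind. The JSON layout (`nodes`, `id`, `class`, `params`, `requires`) and the `load_dag` helper are hypothetical names made up for illustration, not anything Luigi defines. The sketch only parses the file and checks that the connectivity really is a DAG, using the standard-library `graphlib` module (Python 3.9+); it does not touch Luigi at all.

```python
import json
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical config layout: each entry names a task class, its
# parameters, and the ids of the nodes it depends on.
EXAMPLE_CONFIG = """
{
  "nodes": [
    {"id": "load",  "class": "FastNode", "params": {"i": 0}, "requires": []},
    {"id": "clean", "class": "SlowNode", "params": {"i": 1}, "requires": ["load"]},
    {"id": "train", "class": "SlowNode", "params": {"i": 2}, "requires": ["clean", "load"]}
  ]
}
"""


def load_dag(config_text):
    """Parse the config and return (specs, order).

    specs maps node id -> node entry; order lists the node ids so that
    every node appears after all of its dependencies.
    """
    specs = {node["id"]: node for node in json.loads(config_text)["nodes"]}
    for node in specs.values():
        for dep in node["requires"]:
            if dep not in specs:
                raise KeyError(f"{node['id']} requires unknown node {dep}")
    sorter = TopologicalSorter({nid: spec["requires"] for nid, spec in specs.items()})
    # static_order() raises graphlib.CycleError if the config is not a DAG.
    return specs, list(sorter.static_order())


specs, order = load_dag(EXAMPLE_CONFIG)
```

The wrapper would then walk `order`, instantiate each class with its `params`, and hand it the already-built instances named in `requires`.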
Here is a small test program for your scrutiny:
import random
import luigi
import time
import os


class TaskNode(luigi.Task):
    i = luigi.IntParameter()  # node ID

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.required = []

    def set_required(self, required=None):
        self.required = required  # set the dependencies
        return self

    def requires(self):
        return self.required

    def output(self):
        return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
        self.process()

    def process(self):
        raise NotImplementedError(self.__class__.__name__ + " must implement this method")


class FastNode(TaskNode):
    def process(self):
        time.sleep(1)


class SlowNode(TaskNode):
    def process(self):
        time.sleep(2)


# This WrapperTask builds all the nodes
class All(luigi.WrapperTask):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        num_nodes = 513
        classes = TaskNode.__subclasses__()
        self.nodes = []
        for i in reversed(range(num_nodes)):
            cls = random.choice(classes)
            dependencies = random.sample(self.nodes, (num_nodes - i) // 35)
            obj = cls(i=i)
            if dependencies:
                obj.set_required(required=dependencies)
            else:
                obj.set_required(required=None)
            # delete existing output causing a build all
            if obj.output().exists():
                obj.output().remove()
            self.nodes.append(obj)

    def requires(self):
        return self.nodes


if __name__ == '__main__':
    luigi.run()
As the question's title says, this focuses on the dynamic dependencies: it generates a 513-node dependency DAG with a connectivity probability of roughly p = 1/35 (each node depends on a random sample of about 1/35 of the nodes created before it). It also defines the All class (as in make all) as a WrapperTask that requires every node to be built before it is considered done. (I have a version that only connects it to the heads of the connected DAG components, but I didn't want to overcomplicate the example.)
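For anyone who wants to inspect the graph shape without running Luigi, the edge-generation logic can be reproduced on plain integers. This is only a sketch: `random_dag`, `divisor`, and `seed` are names I made up for illustration, and the function mirrors the loop in All.__init__ rather than calling it.

```python
import random


def random_dag(num_nodes, divisor=35, seed=None):
    """Return {node_id: [dependency ids]}, built like the loop in All.__init__."""
    rng = random.Random(seed)
    created = []  # ids created so far; all are larger than the current id
    edges = {}
    for i in reversed(range(num_nodes)):
        # Same rule as the test program: (num_nodes - i) // divisor dependencies,
        # sampled without replacement from the nodes that already exist.
        k = (num_nodes - i) // divisor
        edges[i] = rng.sample(created, k)
        created.append(i)
    return edges


edges = random_dag(513, seed=0)
total_edges = sum(len(deps) for deps in edges.values())
```

Because a node can only depend on nodes created earlier (which always have a larger ID), the result cannot contain a cycle. The number of edges does not depend on the seed: for 513 nodes it is 3521 out of 131,328 possible pairs, about 2.7%, which is slightly below 1/35.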
Is there a more standard (Luigic) way of implementing this? Note in particular the not-so-pretty workaround with the TaskNode __init__ and set_required methods. I only did it this way because receiving extra arguments in __init__ somehow clashes with the way Luigi registers parameters. I also tried several other approaches, but this was the most decent one that actually worked.
If there isn't a standard way, I'd still love to hear any insights on the approach I plan to take before I finish implementing the framework.