First of all, I agree that when deciding to use luigi, you should stick to its mindset that every task has an input and output.
Still, if (like me) you end up needing a task with no such output, you can build on GarfieldCat's answer, using a decorator for the sake of conciseness.
import luigi
from luigi.mock import MockTarget
from functools import wraps
from luigi.mock import MockTarget
def nooutput(TaskClass):
    """Class decorator giving a luigi task a dummy output and completion marker.

    The decorated class gets an ``output`` method returning a ``MockTarget``
    named after ``str(self)``, and its ``run`` is wrapped so that a short
    marker is written to that target once the original ``run`` has finished.

    Args:
        TaskClass: The task class to patch; it must define ``run``.

    Returns:
        The same class object, modified in place.
    """
    import inspect  # local import: only needed while decorating

    # Define some dummy in-memory output
    def output(self):
        """Return the dummy target used only to record completion."""
        return MockTarget(str(self))

    TaskClass.output = output

    old_run = TaskClass.run

    def mark_finished(task):
        """Write something to the dummy output so the target exists."""
        with task.output().open("w") as f:
            f.write("Finished task")

    if inspect.isgeneratorfunction(old_run):
        # A generator-based run (one that yields dynamic dependencies) must be
        # delegated to with ``yield from``; merely calling it would create a
        # generator that never executes, while the marker is still written.
        @wraps(old_run)
        def decorated_run(self, *args, **kwargs):
            result = yield from old_run(self, *args, **kwargs)
            mark_finished(self)
            return result
    else:
        @wraps(old_run)
        def decorated_run(self, *args, **kwargs):
            result = old_run(self, *args, **kwargs)
            mark_finished(self)
            return result

    TaskClass.run = decorated_run
    return TaskClass
#-------------------------
class A(luigi.Task):
    """Upstream task that writes a short message to ``./A.temp``."""

    def output(self):
        """Return the local file target this task produces."""
        target_path = "./A.temp"
        return luigi.LocalTarget(target_path)

    def run(self):
        """Write the message to the output target, printing before and after."""
        print("Saving A")
        message = "A secret message from mr. A"
        with self.output().open("w") as sink:
            sink.write(message)
        print("Saved A")
@nooutput
class B(luigi.Task):
    """Task with no natural output: it prints what ``A`` wrote.

    ``@nooutput`` supplies the output target and the completion marker.
    """

    # time = luigi.DateSecondParameter() # uncomment for multiple runs

    def requires(self):
        """Depend on ``A``, whose output is read in ``run``."""
        return A()

    def run(self):
        """Print the contents of the input target between two log lines."""
        print("Starting B..")
        with self.input().open("r") as f:
            # read() keeps the file's own line breaks; joining readlines()
            # with "\n" would double every newline.
            print(f.read())
        print("Finished B")
class C(luigi.WrapperTask):
    """Wrapper task whose only requirement is ``B``."""

    def requires(self):
        """Return the single upstream task."""
        # return B(datetime.now())
        upstream = B()
        return upstream
class D(luigi.WrapperTask):
    """Second wrapper task that also requires only ``B``."""

    def requires(self):
        """Return the single upstream task."""
        # return B(datetime.now()) # uncomment for multiple runs
        shared_dependency = B()
        return shared_dependency
# Schedule both wrapper tasks, using three workers.
luigi.build(
    [C(), D()],
    workers=3,
)
The commented solution will run `B()` only once for `C()` and `D()`. If instead you want it to run each time, add a time-dependent parameter (but at that stage you might as well consider using another tool).
Hope this helps!
Unfulfilled dependencies at run time
). It's IMO much safer to follow luigi way of doing things by getting info about the execution from somewhere else (e.g. storage, API call, ...) – Necropsy