Regarding the Mixin suggestion from Shilad Sen posted below, consider this example:
# Filename: run_luigi.py
import luigi
from MTimeMixin import MTimeMixin
class PrintNumbers(luigi.Task):
    """Luigi task that writes the integers 1 through 10, one per line."""

    def requires(self):
        """Return the upstream tasks; this task has none."""
        return []

    def output(self):
        """Return the local file target that run() writes to."""
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        """Write the numbers 1..10 to the output target, one per line."""
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))
class SquaredNumbers(MTimeMixin, luigi.Task):
    """Luigi task that squares every number produced by PrintNumbers.

    Each input line holding an integer ``n`` becomes an output line
    ``n:n*n``. MTimeMixin is listed before luigi.Task in the bases, so a
    ``complete()`` defined on the mixin is found first.
    """

    def requires(self):
        """Return the single upstream task, wrapped in a list."""
        upstream = PrintNumbers()
        return [upstream]

    def output(self):
        """Return the local file target that receives the squares."""
        return luigi.LocalTarget("squares.txt")

    def run(self):
        """Read the first input target line by line and write the squares."""
        with self.input()[0].open() as numbers, self.output().open('w') as squares:
            for raw_line in numbers:
                value = int(raw_line.strip())
                squares.write("{}:{}\n".format(value, value * value))
if __name__ == "__main__":
    # Executed as a script: hand control to Luigi's runner.
    luigi.run()
where MTimeMixin is as in the post above (it is reproduced at the end of this post). I run the task once using:
luigi --module run_luigi SquaredNumbers
Then I touch file numbers_up_to_10.txt and run the task again. Then Luigi gives the following complaint:
File "c:\winpython-64bit-3.4.4.6qt5\python-3.4.4.amd64\lib\site-packages\luigi-2.7.1-py3.4.egg\luigi\local_target.py", line 40, in move_to_final_destination
os.rename(self.tmp_path, self.path)
FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'squares.txt-luigi-tmp-5391104487' -> 'squares.txt'
This appears to be Windows-specific: on Windows os.rename raises FileExistsError when the destination already exists, whereas on Linux it (like "mv a b") silently replaces the old b if it already exists and is not write-protected. We can work around this with the following patch to luigi/local_target.py:
def move_to_final_destination(self):
    """Move the temporary file onto the final path, backing up any old file.

    If a file already exists at ``self.path`` it is first moved aside to
    ``self.path`` plus a ``_YYYYMMDDHHMMSS.txt`` timestamp suffix, then
    ``self.tmp_path`` is moved to ``self.path``.

    Raises:
        OSError: if either move fails (e.g. a file is locked or missing).
    """
    # Imported locally so the patch does not depend on the patched module
    # already importing ``time``.
    import time

    if os.path.exists(self.path):
        backup_path = self.path + time.strftime("_%Y%m%d%H%M%S.txt")
        # os.replace (Python 3.3+) overwrites an existing destination on
        # every platform; os.rename raises FileExistsError on Windows if a
        # backup with the same timestamp already exists.
        os.replace(self.path, backup_path)
    # Also os.replace, in case another process re-created self.path between
    # the backup step and this move.
    os.replace(self.tmp_path, self.path)
Also, for completeness, here is the Mixin again as a separate file (MTimeMixin.py, matching the import in run_luigi.py), from the other post:
import os
class MTimeMixin:
    """
    Mixin that flags a task as incomplete if any requirement
    is incomplete or has been updated more recently than this task
    This is based on http://stackoverflow.com/a/29304506, but extends
    it to support multiple input / output dependencies.

    The host class must provide ``output()`` and ``requires()``. Every
    output (of this task and of its requirements) must expose a ``path``
    attribute naming a file on the local filesystem, and every requirement
    must provide ``complete()`` and ``output()``.
    """

    def complete(self):
        """Return True only if all outputs exist and none is stale.

        The task is reported incomplete (False) when:
          * it declares no outputs, or any output file is missing;
          * any requirement reports itself incomplete;
          * any requirement output has a modification time newer than the
            oldest output of this task.
        """
        def to_list(obj):
            # Flatten dicts (by value) and nested lists/tuples/sets into a
            # flat list; anything else is treated as a single item.
            if isinstance(obj, dict):
                obj = list(obj.values())
            if isinstance(obj, (list, tuple, set, frozenset)):
                flat = []
                for item in obj:
                    flat.extend(to_list(item))
                return flat
            return [obj]

        def mtime(path):
            return os.path.getmtime(path)

        outputs = to_list(self.output())
        # Without outputs there is nothing to compare against, and min()
        # below would raise ValueError on an empty sequence.
        if not outputs:
            return False
        if not all(os.path.exists(out.path) for out in outputs):
            return False

        # The oldest own output is the one most likely to be stale.
        self_mtime = min(mtime(out.path) for out in outputs)

        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False
        return True