Can luigi rerun tasks when the task dependencies become out of date?

4

18

As far as I know, a luigi.Target can either exist or not. Therefore, if a task's luigi.Target already exists, the task won't be run again.

I'm looking for a way to force recomputation of the task, if one of its dependencies is modified, or if the code of one of the tasks changes.

Pinnatiped answered 1/3, 2015 at 13:1 Comment(0)
21

One way you could accomplish your goal is by overriding the complete(...) method.

The documentation for complete is straightforward.

Simply implement a function that checks your constraint, and returns False if you want to recompute the task.

For example, to force recomputation when a dependency has been updated, you could do:

def complete(self):
    """Flag this task as incomplete if any requirement is incomplete or has been updated more recently than this task"""
    import os

    def mtime(path):
        # Return the raw float timestamp; time.ctime() strings would compare alphabetically, not chronologically
        return os.path.getmtime(path)

    # assuming 1 output
    if not os.path.exists(self.output().path):
        return False

    self_mtime = mtime(self.output().path) 

    # the below assumes a list of requirements, each with a list of outputs. YMMV
    for el in self.requires():
        if not el.complete():
            return False
        for output in el.output():
            if mtime(output.path) > self_mtime:
                return False

    return True

This returns False when the output of the current task does not exist, when any requirement is incomplete, or when any requirement's output has been modified more recently than the current task's output.

Detecting when code has changed is harder. You could use a similar scheme (checking mtime), but it'd be hit-or-miss unless every task has its own file.
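
As a rough illustration of the mtime idea applied to code, here is a minimal sketch that compares the modification time of the file defining a function or class with the modification time of an output file. The helper names source_mtime and code_is_newer are made up for this example and are not part of Luigi. It shares the limitation mentioned above: editing any task in a shared file makes every task in that file look changed.

```python
import inspect
import os


def source_mtime(obj):
    """Return the modification time (float seconds) of the file that defines obj."""
    path = inspect.getsourcefile(obj)
    if path is None:
        raise ValueError("cannot locate a source file for {!r}".format(obj))
    return os.path.getmtime(path)


def code_is_newer(obj, output_path):
    """Return True if the source file defining obj was modified after output_path."""
    return source_mtime(obj) > os.path.getmtime(output_path)
```

A custom complete() could return False when code_is_newer(type(self), self.output().path) is True, assuming a single local output.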

Because of the ability to override complete, any logic you want for recomputation can be implemented. If you want a particular complete method for many tasks, I'd recommend sub-classing luigi.Task, implementing your custom complete there, and then inheriting your tasks from the sub-class.
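
One way to keep such a shared complete() small is to factor the timestamp comparison into a plain function that knows nothing about Luigi. The sketch below does that; is_stale is a hypothetical name, and it assumes all inputs and outputs are local files and that every input path exists.

```python
import os


def is_stale(output_paths, input_paths):
    """Return True if any output is missing or older than any input.

    Assumes every path in input_paths exists; a missing input raises
    FileNotFoundError from os.path.getmtime.
    """
    if not all(os.path.exists(p) for p in output_paths):
        return True
    # Compare against the oldest output so that one fresh output cannot hide a stale one.
    oldest_output = min(os.path.getmtime(p) for p in output_paths)
    return any(os.path.getmtime(p) > oldest_output for p in input_paths)
```

A base class's complete() could then check that each requirement is complete and return not is_stale(...) on the collected output and input paths.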

Freemanfreemartin answered 27/3, 2015 at 15:34 Comment(3)
After the first execution of the task, complete() will run and the result would be True. In case of a change in the dependency, how will luigi know to rerun complete()? - Sleep
@Sleep complete() is called every time a task is called. That's why the documentation recommends that no real work be done in it and that it be deterministic. To make sure a change in any of a task's dependencies is picked up, each task's complete() must be modified to check the dependency tree. - Freemanfreemartin
I think you should use something like output.exists() instead of manually checking the path, because the target class might come with a customized definition of exists. - Solemnity
4

I'm late to the game, but here's a mixin that improves the accepted answer to support multiple input / output files.

import os

class MTimeMixin:
    """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task
        This is based on http://stackoverflow.com/a/29304506, but extends
        it to support multiple input / output dependencies.
    """

    def complete(self):
        def to_list(obj):
            if type(obj) in (type(()), type([])):
                return obj
            else:
                return [obj]

        def mtime(path):
            # Raw float timestamp, so the comparisons below are chronological
            return os.path.getmtime(path)

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False

        return True

To use it, you would just declare your class with the mixin listed first, for example class MyTask(MTimeMixin, luigi.Task).
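
The order of the base classes matters because Python looks up complete() from left to right. The following sketch shows this with two stand-in classes; BaseTask and Mixin are placeholders for luigi.Task and MTimeMixin, not the real classes.

```python
class BaseTask:
    """Hypothetical stand-in for luigi.Task."""

    def complete(self):
        return "BaseTask.complete"


class Mixin:
    """Hypothetical stand-in for MTimeMixin."""

    def complete(self):
        return "Mixin.complete"


class MixinFirst(Mixin, BaseTask):
    pass


class MixinLast(BaseTask, Mixin):
    pass


print(MixinFirst().complete())  # Mixin.complete: the mixin's override is used
print(MixinLast().complete())   # BaseTask.complete: the mixin is shadowed
```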

Ictinus answered 1/3, 2015 at 13:1 Comment(1)
I like your mixin, but Luigi complains that the target file already exists. Comments don't allow me enough space to post the example, so I will put it in as a non-answer answer below. - Shed
3

The above code works well for me, with one caveat: for a proper timestamp comparison, mtime(path) must return a float rather than a string, because time.ctime() strings compare alphabetically ("Sat ..." > "Mon ..."). Thus simply,

def mtime(path):
    return os.path.getmtime(path)

instead of:

def mtime(path):
    return time.ctime(os.path.getmtime(path))
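
The difference can be checked directly. The sketch below uses time.asctime(time.gmtime(t)), which produces the same string format as time.ctime(t) but in UTC, so the result does not depend on the local timezone. The two timestamps are arbitrary values chosen for illustration.

```python
import time


def as_string(timestamp):
    """Format a timestamp the way time.ctime() does, but in UTC."""
    return time.asctime(time.gmtime(timestamp))


older = 2 * 86400  # Saturday, 3 January 1970 (UTC)
newer = 4 * 86400  # Monday, 5 January 1970 (UTC)

print(older < newer)                        # True: floats compare chronologically
print(as_string(older) > as_string(newer))  # True: "Sat..." sorts after "Mon..."
```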
Motorman answered 30/8, 2016 at 8:42 Comment(0)
0

Regarding the Mixin suggestion from Shilad Sen posted above, consider this example:

# Filename: run_luigi.py
import luigi
from MTimeMixin import MTimeMixin

class PrintNumbers(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))

class SquaredNumbers(MTimeMixin, luigi.Task):

    def requires(self):
        return [PrintNumbers()]

    def output(self):
        return luigi.LocalTarget("squares.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

if __name__ == '__main__':
    luigi.run()

where MTimeMixin is as in the post above. I run the task once using

luigi --module run_luigi SquaredNumbers

Then I touch file numbers_up_to_10.txt and run the task again. Then Luigi gives the following complaint:

  File "c:\winpython-64bit-3.4.4.6qt5\python-3.4.4.amd64\lib\site-packages\luigi-2.7.1-py3.4.egg\luigi\local_target.py", line 40, in move_to_final_destination
    os.rename(self.tmp_path, self.path)
FileExistsError: [WinError 183] Cannot create a file when that file already exists: 'squares.txt-luigi-tmp-5391104487' -> 'squares.txt'

This may just be a Windows problem: on Linux, os.rename (like "mv a b") silently replaces an existing b as long as the user has permission, whereas on Windows it raises FileExistsError when the destination exists. We can work around this with the following patch to luigi/local_target.py:

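def move_to_final_destination(self):
    # If the final file already exists, rename it out of the way with a timestamp suffix first.
    # Note: this uses the time module, so local_target.py must import it.
    if os.path.exists(self.path):
        os.rename(self.path, self.path + time.strftime("_%Y%m%d%H%M%S.txt"))
    os.rename(self.tmp_path, self.path)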
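
If keeping a timestamped copy of the old output is not needed, an alternative is os.replace (Python 3.3+), which overwrites an existing destination on both Windows and POSIX. The sketch below is a standalone function for illustration, not Luigi's own code; replace_output is a hypothetical name.

```python
import os


def replace_output(tmp_path, final_path):
    """Move tmp_path to final_path, overwriting final_path if it already exists."""
    os.replace(tmp_path, final_path)
```

Unlike the patch above, this discards the previous output instead of renaming it.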

Also, for completeness, here is the Mixin again as a separate file (saved as MTimeMixin.py to match the import above), taken from the other post:

import os

class MTimeMixin:
    """
        Mixin that flags a task as incomplete if any requirement
        is incomplete or has been updated more recently than this task
        This is based on http://stackoverflow.com/a/29304506, but extends
        it to support multiple input / output dependencies.
    """

    def complete(self):
        def to_list(obj):
            if type(obj) in (type(()), type([])):
                return obj
            else:
                return [obj]

        def mtime(path):
            return os.path.getmtime(path)

        if not all(os.path.exists(out.path) for out in to_list(self.output())):
            return False

        self_mtime = min(mtime(out.path) for out in to_list(self.output()))

        # the below assumes a list of requirements, each with a list of outputs. YMMV
        for el in to_list(self.requires()):
            if not el.complete():
                return False
            for output in to_list(el.output()):
                if mtime(output.path) > self_mtime:
                    return False

        return True
Shed answered 15/12, 2017 at 21:50 Comment(1)
Thanks for this suggestion! I think you are right about the Windows problem. I've used this pretty regularly without file clobbering issues. For what it's worth, this overall approach makes me a little uneasy. I always feel like I'm twisting Luigi in ways it's not intended to be used. - Ictinus
