How to reset luigi task status?
B

6

15

Currently, I have a bunch of luigi tasks queued together in a simple dependency chain (a -> b -> c -> d). d gets executed first and a last; a is the task that gets triggered.

Every task except a returns a luigi.LocalTarget() object and takes a single generic luigi.Parameter(), a string containing a date and a time. Everything runs against a luigi central server (which has history enabled).

The problem is that when I rerun task a, luigi checks the history to see whether that particular task has been run before. If it has a status of DONE, luigi doesn't run the tasks (d in this case), and I can't have that. Changing the string isn't helping (I added a random microsecond to it). How do I force a task to run?

Bronny answered 5/1, 2016 at 13:50 Comment(5)
can you show a little more detail about each task please? how are you building the LocalTarget filename for each one?Thesda
@Thesda The LocalTarget directory name and path are based on the date-time parameter. Each task simply reads from a file (csv, via pandas), does operation X and writes the result back; the last task pushes to a database (not using luigi, but pandas).Bronny
Does luigi use the output folder(ifexists) to schedule the task ?Bronny
So should task a be running every time that task b runs? Should they perhaps be contained in the same task? Is task a outputting to a luigi.LocalTarget? If not, you can write to database and then write a blank file called something like task_a_ran_2016-01-06-12:12.log as your LocalTarget.Club
@CharlieHaley Doing exactly that now, thanksBronny
T
16

First a comment: Luigi tasks are meant to be idempotent. If you run a task with the same parameter values, no matter how many times you run it, it must always return the same outputs, so it doesn't make sense to run it more than once. This is what makes Luigi powerful: if you have one big task that does a lot of things and takes a lot of time, and it fails somewhere, you'll have to run it again from the beginning. If you split it into smaller tasks and one of them fails, you'll only have to run the remaining tasks in the pipeline.

When you run a task, Luigi checks whether the outputs of that task exist. If they do, the task is considered done and nothing runs. If they don't, Luigi checks the outputs of the tasks it depends on. If those exist, it only runs the current task and generates its output Target. If the dependencies' outputs don't exist, it runs those tasks first.
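The check described above can be modelled in a few lines of plain Python. This is only a sketch of the decision, not Luigi's scheduler code: `tasks_that_would_run` and `_as_list` are hypothetical names, and the function accepts any object that exposes `complete()` and `requires()` the way a Luigi task does. Nested dependency structures are not handled here.

```python
def _as_list(deps):
    """Normalize requires() output (None, one task, a list/tuple or a dict) to a list."""
    if deps is None:
        return []
    if isinstance(deps, dict):
        return list(deps.values())
    if isinstance(deps, (list, tuple)):
        return list(deps)
    return [deps]


def tasks_that_would_run(task, _seen=None):
    """Return the tasks a run would execute, dependencies first."""
    seen = set() if _seen is None else _seen
    if id(task) in seen or task.complete():
        # A complete task is skipped and its dependencies are never inspected.
        return []
    seen.add(id(task))
    plan = []
    for dep in _as_list(task.requires()):
        plan.extend(tasks_that_would_run(dep, seen))
    plan.append(task)
    return plan
```

Note the consequence for the chain in the question: if b already has its output, triggering a never reaches c or d, even when their outputs are missing.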

So, if you want to rerun a task, you must delete its output Targets. And if you want to rerun the whole pipeline, you must delete, in cascade, the outputs of every task that it depends on.
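A minimal sketch of such a cascade delete follows. It is not one of the scripts from the Luigi issue tracker: `remove_outputs` and `_flatten` are hypothetical helpers, written against the usual task interface (`output()`, `requires()`) and target interface (`exists()`, `remove()`). Some target types may not implement `remove()`, in which case this would fail.

```python
def _flatten(struct):
    """Flatten None, a single object, a list/tuple or a dict into a flat list."""
    if struct is None:
        return []
    if isinstance(struct, dict):
        return [x for value in struct.values() for x in _flatten(value)]
    if isinstance(struct, (list, tuple)):
        return [x for value in struct for x in _flatten(value)]
    return [struct]


def remove_outputs(task, _seen=None):
    """Remove the existing outputs of `task` and of everything it requires.

    Returns the list of targets that were removed.
    """
    seen = set() if _seen is None else _seen
    if id(task) in seen:
        # Shared dependencies are only visited once.
        return []
    seen.add(id(task))
    removed = []
    for target in _flatten(task.output()):
        if target.exists():
            target.remove()
            removed.append(target)
    for dep in _flatten(task.requires()):
        removed.extend(remove_outputs(dep, seen))
    return removed
```

Calling `remove_outputs(a)` on the top-level task before triggering it would make every task in the chain incomplete again, so the whole pipeline reruns.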

There's an ongoing discussion about this in an issue in the Luigi repository. Take a look at this comment, since it points to some scripts for getting the output targets of a given task and removing them.

Thesda answered 6/1, 2016 at 17:21 Comment(3)
Aha, it reads up the chain on the Target, so I'll have to remove the outputs along the chain to make it schedule it properly.Bronny
And that issue was exactly what I was looking for, thanks :)Bronny
@Thesda I have a requirement to keep updating a single "latest" data file. I don't want to generate the file with a timestamp in its name, so it is written with a latest suffix, like 'foo_latest.txt'. Could the luigi developers help with this requirement? I think luigi.Task needs an init parameter, something like a 'dont_filter=False' flag, so that a task can be rerun multiple times.Outgrowth
M
6

I typically do this by overriding complete():

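import luigi


class BaseTask(luigi.Task):
    # Set force=True to delete existing outputs so the task runs again.
    force = luigi.BoolParameter()

    def complete(self):
        outputs = luigi.task.flatten(self.output())
        for output in outputs:
            if self.force and output.exists():
                output.remove()
        # With force set, any existing outputs were just removed, so this is False.
        return all(output.exists() for output in outputs)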


class MyTask(BaseTask):
    def output(self):
        return luigi.LocalTarget("path/to/done/file.txt")

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write('Complete')

When you run the task, the output file is created as expected. Upon instantiating the class with force=True, the output file will still exist until complete() is called.

task = MyTask()
task.run()
task.complete()
# True

new_task = MyTask(force=True)
new_task.output().exists()
# True
new_task.complete()
# False
new_task.output().exists()
# False
Motorize answered 14/2, 2019 at 18:15 Comment(0)
I
2

An improvement on @cangers' BaseTask that raises an error if the target can't be removed.

class BaseTask(luigi.Task):
    force = luigi.BoolParameter(significant=False, default=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.force is True:
            outputs = luigi.task.flatten(self.output())
            for out in outputs:
                if out.exists():
                    try:
                        out.remove()
                    except AttributeError:
                        # The target type does not implement remove().
                        raise NotImplementedError

Intramuscular answered 27/3, 2021 at 6:0 Comment(0)
E
0

d6tflow allows you to reset and force rerun of tasks, see details at https://d6tflow.readthedocs.io/en/latest/control.html#manually-forcing-task-reset-and-rerun.

# force execution including downstream tasks
d6tflow.run([TaskTrain()],force=[TaskGetData()])

# reset single task
TaskGetData().invalidate()

# reset all downstream task output
d6tflow.invalidate_downstream(TaskGetData(), TaskTrain())

# reset all upstream task input
d6tflow.invalidate_upstream(TaskTrain())

Caveat: this only works for d6tflow tasks and targets, which are modified local targets, not for all luigi targets. It should take you a long way and is optimized for data science workflows. It works well with a local worker; I haven't tested it on a central server.

Exocrine answered 20/1, 2019 at 5:0 Comment(0)
T
0

I use this to forcibly regenerate output without needing to remove it first; it also lets you select which task families to regenerate. In our use case, we want the old generated files to continue to exist until they are overwritten with fresh versions.

# generation.py
import pathlib

import luigi


class ForcibleTask(luigi.Task):
    force_task_families = luigi.ListParameter(
        positional=False, significant=False, default=[]
    )

    def complete(self):
        print("{}: check {}".format(self.get_task_family(), self.output().path))
        if not self.output().exists():
            self.oldinode = 0  # so any new file is considered complete
            return False
        curino = pathlib.Path(self.output().path).stat().st_ino
        try:
            x = self.oldinode
        except AttributeError:
            self.oldinode = curino

        if self.get_task_family() in self.force_task_families:
            # only done when file has been overwritten with new file
            return self.oldinode != curino

        return self.output().exists()

Example usage

class Generate(ForcibleTask):
    date = luigi.DateParameter()
    def output(self):
        return luigi.LocalTarget(
            self.date.strftime("generated-%Y-%m-%d")
        )

Invocation

luigi --module generation Generate '--Generate-force-task-families=["Generate"]'
Teenyweeny answered 12/9, 2019 at 22:8 Comment(0)
B
0

You can use in-memory output storage, which gets purged every time. Here is a solution along those lines; I don't know whether it suits your needs.

import uuid

import luigi
import luigi.mock


class taskname(luigi.Task):
    # The default is evaluated once, when the class is defined, so each new
    # Python process gets a fresh random id.
    id = luigi.Parameter(default=str(uuid.uuid4()), positional=True)

    def output(self):
        # This is just to ensure the task is complete
        return luigi.mock.MockTarget(f'taskname-{self.id}')

    def run(self):
        # do your process here
        # if your process is successful then run this
        # persists the object in memory so the scheduler knows the task is complete
        self.output().open('w').close()

We use the id created in the class to name the mock target in the output. As a result, the central scheduler cannot find this output even if you run the same DAG twice at the same time. Only the current batch can access it in memory.

Bangup answered 22/2, 2022 at 7:51 Comment(0)
