What to do when I don't want Luigi to output a file but show the task as complete?
Asked Answered
D

3

9

When looping over files with Luigi I do not what to be forced to save empty files just to show that the task was complete, and let the next task check if there are any rows in the txt, etc.

How can I have a task showing it succeeded (i.e. the run method worked as expected) without outputting a file? Am I missing something here?

Defunct answered 11/5, 2018 at 5:18 Comment(0)
N
9

You can override the complete function.

class LuigiTaskB(luigi.Task):
    def run(self):
        print "running task b"
        with self.output().open('w') as out_file:
            print >> out_file, "some text"

    def output(self):
        return luigi.LocalTarget("somefile")


class LuigiTaskA(luigi.Task):
    task_complete = False
    def requires(self):
        return LuigiTaskB()

    def run(self):
        print "running task a"
        self.task_complete = true

    def complete(self):
        # Make sure you return false when you want the task to run.
        # And true when complete

        return  self.task_complete
        # This will out put :
        #   running task b
        #   running task a
        # And this on the second time you'll run:
        #   running task a

The complete() function is looking at the output() function, by overriding complete() you can pass on any output and write your on complete condition.

Notice that if your complete function depends on the run function it may not be skipped..

Nusku answered 27/5, 2018 at 13:4 Comment(2)
It may not get skipped and what's worse, it breaks multiprocessing execution (spawned subprocess are going to fail because of Unfulfilled dependencies at run time). It's IMO much safer to follow luigi way of doing things by getting info about the execution from somewhere else (e.g. storage, API call, ...)Necropsy
Agree with @Necropsy : the mentionned error does occur when you start using multiple workers.Dday
S
6

Also you can use MockTarget like this:

class MyTask(luigi.Task):
    def requires(self):
        ...
    def run(self):
        ...
    def output(self):
        return MockFile("MyTask", mirror_on_stderr=True) # check that MyTask in quotes
Spatterdash answered 26/10, 2018 at 18:36 Comment(1)
I think it should be MockTarget instead of MockFileInspirational
D
1

First of all, I agree that when deciding to use luigi, you should stick to its mindset that every task has an input and output.

Still, if (like me) you end up needing some task with no such output, we can build up on GarfieldCat 's answer, using decorator for sake of conciseness.

import luigi
from luigi.mock import MockTarget
from functools import wraps
from luigi.mock import MockTarget
 
def nooutput(TaskClass):
    # Define some dummy in-memory output
    def output(self):
        return MockTarget(str(self))
    TaskClass.output = output
    
    # Make sure to write sth to the dummmy output
    old_run = TaskClass.run
    @wraps(TaskClass.run)
    def decorated_run(self, *args, **kwargs):
        old_run(self, *args, **kwargs)
        with self.output().open("w") as f:
            f.write(f"Finished task")
            
    TaskClass.run = decorated_run
    return TaskClass
#-------------------------
class A(luigi.Task):
    def output(self):
        return luigi.LocalTarget("./A.temp")
        
    def run(self):
        print("Saving A")
        with self.output().open("w") as f:
            f.write("A secret message from mr. A")
        print("Saved A")

@nooutput
class B(luigi.Task):
    # time = luigi.DateSecondParameter() # uncomment for multiple runs
    def requires(self):
        return A()
        
    def run(self):
        print("Starging B..")
        with self.input().open("r") as f:
            print("\n".join(f.readlines()))
        print("Finished B")
        
class C(luigi.WrapperTask):
    def requires(self):
        # return B(datetime.now())
        return B()

class D(luigi.WrapperTask):
    def requires(self):
        # return B(datetime.now())  # uncomment for multiple runs
        return B()

luigi.build([C(),D()],workers=3)

The commented solution will only run B() once for C() and D(). If instead, you would have wanted it to run each time, add a time-dependant parameter (but at that stage you might as well consider using another tool)

Hope this helps!

Dday answered 13/4, 2022 at 14:3 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.