Make failure of a dynamic Luigi task non critical

I have a Luigi workflow that downloads a bunch of large files via FTP and deposits them on S3.

I have one task that reads a list of files to download, then creates a bunch of tasks that actually do the downloads.

The idea is that the result of this workflow is a single file containing a list of downloads that have succeeded, with any failed downloads being reattempted on the next run the following day.

The problem is that if any of the download tasks fails, the successful download list is never created.

This is because the dynamically created tasks become requirements of the main task that creates them and compiles a list from their outputs.

Is there a way to make failures of these download tasks insignificant, so that the list is compiled minus the output of the failed tasks?

Example code below; GetFiles is the task that we are calling from the command line.

import json

import luigi
from luigi.contrib.s3 import S3Client, S3Target
from luigi.util import requires


class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            # sourceUrl is a task parameter, so it is read from self
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):

    def run(self):
        with self.input().open('r') as fileList:
            files = json.load(fileList)

        tasks = []
        taskOutputs = []

        for file in files:
            task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
            tasks.append(task)
            taskOutputs.append(task.output())

        # Yielding the tasks makes them dynamic requirements of GetFiles
        yield tasks

        successfulDownloads = MakeSuccessfulOutputList(taskOutputs)

        with self.output().open('w') as out:
            json.dump(successfulDownloads, out)

    def output(self):
        client = S3Client()
        return S3Target(path='successfulDownloads.json', client=client)
Spirituality answered 8/1, 2018 at 12:34 Comment(0)

THIS ANSWER IS PROBABLY INCORRECT - CHECK THE COMMENTS

I have read the documentation a few times and found no mention of non-critical failures. That said, this behavior could be achieved by overriding the Task.complete method in DownloadFileFromFtp, while still being able to use DownloadFileFromFtp.output in GetFiles.run.

By overriding it to return True, the DownloadFileFromFtp task will be reported as complete regardless of whether the download succeeded.

class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            # sourceUrl is a task parameter, so it is read from self
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

    def complete(self):
        # Always report the task as complete
        return True

Note, however, that you could also use more complex logic in that complete method, such as failing only if the task hit a specific network failure at runtime.

Vintner answered 23/1, 2018 at 15:47 Comment(5)
This will prevent the task from running even if it has never run. When a task is scheduled, Luigi first checks whether its dependencies are complete by calling complete on each dependency specified in requires. If complete returns True for a dependency, that dependency is not run. - Chak
The only way I see to implement such optional dependencies would be to make sure the optional task never fails, by catching all exceptions in the run method. You probably still want to log those exceptions, or signal the failure in some way in the target, and maybe use a target that is aware of failures and returns False from its exists method if the task failed. - Chak
I didn't know about that; is this behavior documented somewhere? Anyway, this could probably be fixed by having the run method always set an instance boolean to True, and having the complete method return that value, indicating whether the run method has already been called. I'll try this later and edit my answer if that is the case. - Vintner
Setting an in-memory variable to keep track of whether or not a task was run will work until you try to use multiple Luigi workers. Since workers do not share memory, they will each try to run the task. Luigi expects multiple workers to coordinate by checking a Target they can all access, like a file or database. - Siding
@Siding Of course, that makes sense. I edited the answer to add a disclaimer. - Vintner
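
The approach from Chak's comment (never let the optional step raise, and record the outcome in a target that every worker can read) can be sketched without Luigi at all. This is only an illustration under stated assumptions: download_with_status, successful_urls, flaky_fetch and the marker file layout are hypothetical names, and fetch stands in for WriteFileFromFtp. In a real workflow the try/except would presumably live in DownloadFileFromFtp.run and the marker would be the task's output.

```python
import json
import tempfile
from pathlib import Path


def download_with_status(source_url, marker_path, fetch):
    """Call fetch(source_url) and always write a JSON status marker.

    `fetch` is a hypothetical stand-in for WriteFileFromFtp. Exceptions are
    recorded in the marker instead of propagating to the caller.
    """
    try:
        fetch(source_url)
        status = {"url": source_url, "ok": True, "error": None}
    except Exception as exc:  # deliberately broad: any failure is non-critical
        status = {"url": source_url, "ok": False, "error": repr(exc)}
    Path(marker_path).write_text(json.dumps(status))
    return status


def successful_urls(marker_paths):
    """Return the URLs whose status markers report a successful download."""
    statuses = [json.loads(Path(p).read_text()) for p in marker_paths]
    return [s["url"] for s in statuses if s["ok"]]


def flaky_fetch(url):
    """Hypothetical downloader that fails for one URL."""
    if "bad" in url:
        raise IOError("connection reset")


workdir = Path(tempfile.mkdtemp())
urls = ["ftp://example.invalid/good.bin", "ftp://example.invalid/bad.bin"]
markers = []
for i, url in enumerate(urls):
    marker = workdir / f"status_{i}.json"
    download_with_status(url, marker, flaky_fetch)
    markers.append(marker)

result = successful_urls(markers)
print(result)
```

Because a marker is written even on failure, the compiling step always finds an output for every download and can filter on the recorded status. One design consequence to keep in mind: if the marker path is the same on every run, a failed download would look complete the next day too, so the marker would probably need to be keyed per run (for example by date) for the retry to happen.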

Several years later, you must have found the answer, but here is something that can help.

class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(self.sourceUrl, output)

    def on_failure(self, exception):
        # If the task fails because of a missing file,
        # then just indicate the task as completed.
        # exception is the exception object raised in run(),
        # so match on its repr rather than treating it as a string.
        if "FileNotFound" in repr(exception):
            return self.complete(ignore=True)
        return self.complete(ignore=False)

    def complete(self, ignore=False):
        return ignore

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)
Monarchism answered 25/3, 2021 at 7:27 Comment(0)
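
As far as I can tell from the Luigi documentation, on_failure is passed the exception object raised in run(), and its return value is sent to the scheduler as the error explanation. Under that assumption, checking the exception's type is more robust than searching for a substring. A minimal sketch, where is_ignorable_failure and failure_message are hypothetical helper names and nothing here depends on Luigi itself:

```python
def is_ignorable_failure(exception):
    """Return True for failures treated as non-critical (here: missing files)."""
    return isinstance(exception, FileNotFoundError)


def failure_message(exception):
    """Build a short explanation string that an on_failure override could return."""
    prefix = "ignored" if is_ignorable_failure(exception) else "failed"
    return f"{prefix}: {exception!r}"


missing = FileNotFoundError("no such file on the FTP server")
network = ConnectionError("connection reset")

print(failure_message(missing))
print(failure_message(network))
```

An on_failure override could call failure_message(exception) and return the result. Whether returning a value from on_failure can change the task's completion status is a separate question that this sketch does not settle.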