I have a Luigi workflow that downloads a bunch of large files via FTP and deposits them on S3.
I have one task that reads a list of files to download, then creates a bunch of tasks that actually do the downloads.
The idea is that the result of this workflow is a single file containing a list of downloads that have succeeded, with any failed downloads being reattempted on the next run the following day.
The problem is that if any of the download tasks fails then the successful download list is never created.
This is because the dynamically created tasks become requirements of the main task that creates them and compiles a list from their outputs.
Is there a way to make failures of these download tasks insignificant, so that the list is compiled minus the outputs of the failed tasks?
Example code below; GetFiles is the task that we call from the command line.
import json

import luigi
from luigi.contrib.s3 import S3Client, S3Target
from luigi.util import requires


class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):
    def run(self):
        with self.input().open('r') as fileList:
            files = json.load(fileList)

        tasks = []
        taskOutputs = []
        for file in files:
            task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
            tasks.append(task)
            taskOutputs.append(task.output())
        yield tasks

        successfulDownloads = MakeSuccessfulOutputList(taskOutputs)
        with self.output().open('w') as out:
            json.dump(successfulDownloads, out)

    def output(self):
        client = S3Client()
        return S3Target(path='successfulDownloads.json', client=client)
Luigi calls complete on each dependency specified in requires. If complete returns True for a dependency, that dependency is not run. – Chak