How do you pass multiple arguments to a Luigi subtask?

I have a Luigi task that requires a subtask. The subtask depends on parameters passed through by the parent task (i.e. the one that is doing the requiring). I know you can specify a parameter that the subtask can use by setting...

def requires(self):
    return subTask(some_parameter)

...then on the subtask, receiving the parameter by setting...

x = luigi.Parameter()

That only appears to let you pass through one parameter though. What is the best way to send through an arbitrary number of parameters, of whatever types I want? Really I want something like this:

class parentTask(luigi.Task):

    def requires(self):
        return subTask({'file_postfix': 'foo',
                        'file_content': 'bar'
        })

    def run(self):
        return


class subTask(luigi.Task):
    params = luigi.DictParameter()

    def output(self):
        return luigi.LocalTarget("file_{}.csv".format(self.params['file_postfix']))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.params['file_content'])

As you can see, I tried using luigi.DictParameter instead of a plain luigi.Parameter, but I get TypeError: unhashable type: 'dict' from somewhere deep inside Luigi when I run the above.
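For reference, the message itself is the standard Python error raised whenever a plain dict is hashed. The sketch below uses no Luigi code and does not reproduce Luigi's internals; it is only an assumption that the parameter value gets hashed somewhere along the way. The helper name `freeze` is hypothetical. It shows the error in isolation and one way to turn a flat dict into a hashable value:

```python
def freeze(mapping):
    """Return a hashable, order-independent snapshot of a flat dict.

    Hypothetical helper for illustration; values must themselves be hashable.
    """
    return tuple(sorted(mapping.items()))


params = {'file_postfix': 'foo', 'file_content': 'bar'}

# Hashing a plain dict fails with the same message seen in the question.
try:
    hash(params)
    dict_is_hashable = True
    error_message = ''
except TypeError as exc:
    dict_is_hashable = False
    error_message = str(exc)

# A tuple of sorted (key, value) pairs is hashable and can be
# converted back into a dict when the values are needed.
frozen = freeze(params)
restored = dict(frozen)
```

The frozen form could be passed around as a single value and rebuilt with `dict(...)` on the receiving side, at the cost of losing attribute-style access to the individual settings.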

Running Python 2.7.11, Luigi 2.1.1

Dementia answered 24/8, 2016 at 12:13 Comment(0)
M
5

What is the best way to send through an arbitrary number of parameters, of whatever types I want?

The best way is to use named parameters, e.g.,

# in the parent task's requires():
return MySampleSubTask(x=local_x, y=local_y)

class MySampleSubTask(luigi.Task):
    x = luigi.Parameter()
    y = luigi.Parameter()
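
If the values already live in a dict, as in the question, Python's `**` unpacking turns them into named arguments at the call site. The sketch below uses a plain-Python stand-in class (`SubTaskSketch`, a hypothetical name, not a Luigi class) so that it runs without Luigi installed; the same call shape applies to a task that declares one parameter per key.

```python
class SubTaskSketch(object):
    """Plain-Python stand-in for a task with two named parameters.

    This is NOT a luigi.Task; it only mimics the keyword-argument call shape.
    """

    def __init__(self, file_postfix, file_content):
        self.file_postfix = file_postfix
        self.file_content = file_content

    def output_name(self):
        # Mirrors the file naming used in the question's output() method.
        return "file_{}.csv".format(self.file_postfix)


settings = {'file_postfix': 'foo', 'file_content': 'bar'}

# Equivalent to SubTaskSketch(file_postfix='foo', file_content='bar').
task = SubTaskSketch(**settings)
output_name = task.output_name()
```

Each key in the dict must match a declared parameter name; an unexpected key raises a TypeError in this stand-in.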
Misname answered 15/9, 2016 at 13:34 Comment(0)
T
4

What is the best way to send through an arbitrary number of parameters, of whatever types I want?

You could follow this example. It defines all the parameters that need to be passed in one place, a mixin class (ParameterCollector). This avoids redefining the parameters on every single task when you need to pass them on to many subtasks.

import luigi


class ParameterCollector(object):
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()

    def collect_params(self):
        return {'param1': self.param1, 'param2': self.param2}


class TaskB(ParameterCollector, luigi.Task):
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('/tmp/task1_success')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.param1)


class TaskA(ParameterCollector, luigi.Task):
    def requires(self):
        a = TaskB(**self.collect_params())
        print(a)
        return a

    def output(self):
        return luigi.LocalTarget('/tmp/task2_success')

    def run(self):
        with self.output().open('w') as f:
            f.write(str([self.param1, self.param2]))


if __name__ == '__main__':
    luigi.run()
Tupler answered 1/11, 2017 at 12:13 Comment(0)
D
0

OK, so I found that this works as expected in Python 3.5 (the problem is still there in 3.4).

Don't have the time to get to the bottom of this today so no further details.

Dementia answered 24/8, 2016 at 14:27 Comment(0)
H
0

You can use the following.

import time
import uuid

import luigi
from luigi.mock import MockTarget


class DownloadFile(luigi.Task):
    """File download task used to get the file from the file parser."""
    # Note: this default is evaluated once, when the class is defined.
    id = luigi.Parameter(default=str(uuid.uuid4()), positional=True)
    # master_task_id = luigi.Parameter(positional=True)
    pipeline_data = luigi.DictParameter(
        default={}, significant=False,
        visibility=luigi.parameter.ParameterVisibility.PRIVATE,
        positional=True)

    def output(self):
        # add the file output here
        return MockTarget("FileDownload", mirror_on_stderr=True)

    def run(self):
        time.sleep(3)
        # index into the dict parameter like a regular dict
        parsed_data = self.pipeline_data['pipeline_data']
Huntingdon answered 18/11, 2021 at 13:59 Comment(0)
