Workers dying early due to uneven work distribution in Luigi (2.6.1)
We are trying to run a simple pipeline distributed over a Docker Swarm cluster. The Luigi workers are deployed as replicated Docker services. They start successfully, but after a few seconds of requesting work from the luigi server they begin to die because no work is assigned to them: all tasks end up assigned to a single worker.

We had to set keep_alive=True in the luigi.cfg of our workers to force them not to die, but keeping workers around after the pipeline is done seems like a bad idea. Is there a way to control the work distribution?
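For reference, the workaround in the workers' luigi.cfg looks roughly like this (a minimal sketch; keep_alive under [worker] is the only setting we changed):

[worker]
# keep polling the scheduler for work instead of exiting when none is offered
keep_alive = True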

Our test pipeline:

import logging
import subprocess
import time

import luigi
from luigi import LocalTarget

logger = logging.getLogger('luigi-interface')


class RunAllTasks(luigi.Task):

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i, self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    @property
    def cmd(self):
        return """
               docker run --rm --name example_{number} hello-world
           """.format(number=self.number)

    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))

    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    luigi.run()
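Each replica invokes the pipeline with a command along these lines (the module name and parameter values here are placeholders, not our exact setup):

luigi --module pipeline RunAllTasks --tasks 10 --sleep-time 5 --scheduler-host luigi-server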
Caithness answered 8/5, 2017 at 15:7 Comment(6)
Doesn't seem to be a docker question? – Recruit
Are you starting RunAllTasks on each node? – Marienthal
@Marienthal Yes, we encapsulated the pipeline as a docker service, so the swarm starts a replica on each node (round robin). – Caithness
@Recruit We think the workers are dying because the luigi server is not giving them work: we can see them sitting idle in the workers tab of the GUI. Nothing suspicious on Docker's side. – Caithness
According to luigi.readthedocs.io/en/stable/central_scheduler.html: "Note that the central scheduler does not execute anything for you or help you with job parallelization. For running tasks periodically, the easiest thing to do is to trigger a Python script from cron or from a continuously running process. There is no central process that automatically triggers jobs." It sounds like running the workers is not part of Luigi, and you would need something else to actually schedule the containers to run periodically. – Clinician
Unrelated: you know, I came to this post from the newsletter thinking it was some statistical analysis of the lifetime of a programmer based on workload ... I was so wrong (and relieved!), lol. – Enthrall
Your issue is the result of yielding a single requirement at a time; instead, you want to yield all of them at once, as follows:

def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i, self.sleep_time))
    yield reqs
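With the whole batch yielded at once, the scheduler sees every pending RunExampleTask up front and can hand them out to whichever workers ask for work, rather than discovering them one at a time. An equivalent form you will often see in Luigi code is to return the list directly (a sketch of the same idea, not taken from the original answer):

def requires(self):
    # hand the scheduler the complete list of requirements in one go
    return [RunExampleTask(i, self.sleep_time) for i in range(self.tasks)]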
Amery answered 13/6, 2017 at 3:33 Comment(0)
