Using click.progressbar with multiprocessing in Python

I have a huge list that I need to process, which takes some time, so I divide it into 4 pieces and multiprocess each piece with some function. It still takes a bit of time to run with 4 cores, so I figured I would add some progress bar to the function, so that it could tell me where each processor is at in processing the list.

My dream was to have something like this:

erasing close atoms, cpu0  [######..............................]  13%
erasing close atoms, cpu1  [#######.............................]  15%
erasing close atoms, cpu2  [######..............................]  13%
erasing close atoms, cpu3  [######..............................]  14%

with each bar moving as the loop in the function progresses. But instead, I get a continuous flow:

[screenshot of the terminal output; image not preserved]

etc, filling my terminal window.

Here is the main python script that calls the function:

from eraseCloseAtoms import *
from readPDB import *
import multiprocessing as mp
import numpy as np  # used below for np.array / np.sum
from vectorCalc import *

prot, cell = readPDB('file')
atoms = vectorCalc(cell)

output = mp.Queue()

# setup mp to erase grid atoms that are too close to the protein (dmin = 2.5A)
cpuNum = 4
tasks = len(atoms)
rangeSet = [tasks / cpuNum for i in range(cpuNum)]
for i in range(tasks % cpuNum):
    rangeSet[i] += 1

rangeSet = np.array(rangeSet)

processes = []
for c in range(cpuNum):
    # chunk c covers atoms[na:nb]; na is the combined size of the preceding chunks
    na, nb = (int(np.sum(rangeSet[:c])), int(np.sum(rangeSet[:c + 1])))
    processes.append(mp.Process(target=eraseCloseAtoms, args=(prot, atoms[na:nb], cell, 2.7, 2.5, output)))

for p in processes:
    p.start()

results = [output.get() for p in processes]

for p in processes:
    p.join()

atomsNew = results[0] + results[1] + results[2] + results[3]

Below is the function eraseCloseAtoms():

import numpy as np
import click


def eraseCloseAtoms(protein, atoms, cell, spacing=2, dmin=1.4, output=None):
    print 'just need to erase close atoms'

    if dmin > spacing:
        print 'the spacing needs to be larger than dmin'
        return

    grid = [int(cell[0] / spacing), int(cell[1] / spacing), int(cell[2] / spacing)]

    selected = list(atoms)
    with click.progressbar(length=len(atoms), label='erasing close atoms') as bar:
        for i, atom in enumerate(atoms):
            bar.update(i)
            erased = False
            coord = np.array(atom[6])

            for ix in [-1, 0, 1]:
                if erased:
                    break
                for iy in [-1, 0, 1]:
                    if erased:
                        break
                    for iz in [-1, 0, 1]:
                        if erased:
                            break
                        for j in protein:
                            protCoord = np.array(protein[int(j)][6])
                            trueDist = getMinDist(protCoord, coord, cell, vectors)
                            if trueDist <= dmin:
                                selected.remove(atom)
                                erased = True
                                break
    if output is None:
        return selected
    else:
        output.put(selected)
Trillium answered 18/8, 2015 at 18:46 Comment(1)
There is an example repo on this you might find interesting: github.com/aaren/multi_progress (comment by Iddo)
G
6

I see two issues in your code.

The first one explains why your progress bars are often showing 100% rather than their real progress. You're calling bar.update(i) which advances the bar's progress by i steps, when I think you want to be updating by one step. A better approach would be to pass the iterable to the progressbar function and let it do the updating automatically:

with click.progressbar(atoms, label='erasing close atoms') as bar:
    for atom in bar:
        erased = False
        coord = np.array(atom[6])

        # ...
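
To see why the bars jump to 100% so early, it helps to tally what the original loop actually requests. The sketch below is plain Python with no click involved; `steps_until_full` is a hypothetical helper that only mimics a bar adding each increment to its position, the way repeated `bar.update(i)` calls would.

```python
def steps_until_full(length, increments):
    """Return how many update calls it takes for the running total to reach length.

    Hypothetical helper: it mimics a bar that adds each increment to its
    position. It does not use click.
    """
    position = 0
    for calls, step in enumerate(increments, start=1):
        position += step
        if position >= length:
            return calls
    return None  # the bar never filled


n = 1000
# Original loop: bar.update(i) with i = 0, 1, 2, ... adds i on every pass.
print(steps_until_full(n, range(n)))
# Intended behaviour: one step per processed item.
print(steps_until_full(n, [1] * n))
```

With 1000 items, the first pattern fills the bar after only 46 iterations, while the second needs all 1000.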

However, this still won't work with multiple processes iterating at once, each with its own progress bar due to the second issue with your code. The click.progressbar documentation states the following limitation:

No printing must happen or the progress bar will be unintentionally destroyed.

This means that whenever one of your progress bars updates itself, it will break all of the other active progress bars.

I don't think there is an easy fix for this. It's very hard to interactively update multi-line console output (you basically need curses or a similar "console GUI" library with support from your OS). The click module does not have that capability; it can only update the current line. Your best hope would probably be to extend the click.progressbar design to output multiple bars in columns, like:

CPU1: [######      ] 52%   CPU2: [###        ] 30%    CPU3: [########  ] 84%

This would require a non-trivial amount of code to make it work (especially when the updates are coming from multiple processes), but it's not completely impractical.
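
As a rough illustration of the column idea, here is a minimal sketch that does not use click at all. It assumes the workers report each finished item through a multiprocessing.Queue and that only the parent process writes to the terminal, redrawing a single line with a carriage return. The names `render_line` and `worker`, the chunk sizes, and the sleep are placeholders, not part of the original code.

```python
import multiprocessing as mp
import sys
import time


def render_line(done, totals, width=12):
    """Format every worker's progress as one line of side-by-side bars."""
    parts = []
    for cpu, (d, t) in enumerate(zip(done, totals)):
        frac = d / t if t else 1.0
        filled = int(width * frac)
        bar = "#" * filled + " " * (width - filled)
        parts.append("CPU%d: [%s] %3d%%" % (cpu, bar, int(100 * frac)))
    return "   ".join(parts)


def worker(cpu, items, progress):
    """Stand-in for the real work; reports one finished item at a time."""
    for _ in items:
        time.sleep(0.01)   # placeholder for the actual computation
        progress.put(cpu)  # tell the parent which worker advanced


def main():
    chunks = [range(20), range(30), range(25), range(20)]
    totals = [len(c) for c in chunks]
    done = [0] * len(chunks)
    progress = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, c, progress))
             for i, c in enumerate(chunks)]
    for p in procs:
        p.start()
    # Only the parent prints, so the bars cannot overwrite each other.
    for _ in range(sum(totals)):
        done[progress.get()] += 1
        sys.stdout.write("\r" + render_line(done, totals))
        sys.stdout.flush()
    for p in procs:
        p.join()
    sys.stdout.write("\n")


if __name__ == "__main__":
    main()
```

The sketch has no error handling: if a worker dies before reporting all of its items, the parent will wait on the queue forever.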

Gabelle answered 19/8, 2015 at 1:40 Comment(0)
H
7

The accepted answer says this is impossible with click and that it would require a "non-trivial amount of code to make it work".

While that's true, another module offers this functionality out of the box: tqdm (https://github.com/tqdm/tqdm), which does exactly what you need.

Nested progress bars are covered in the docs: https://github.com/tqdm/tqdm#nested-progress-bars
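
A minimal sketch of one bar per worker process, modelled on the multiprocessing example in the tqdm README. It assumes tqdm is installed; `process_chunk`, `split`, the item count, and the sleep are hypothetical stand-ins for the real work.

```python
from multiprocessing import Pool, RLock
from time import sleep

from tqdm import tqdm


def process_chunk(job):
    """Work through one chunk while drawing a bar on its own terminal line."""
    cpu, chunk = job
    kept = []
    # position=cpu asks tqdm to draw this bar on line number `cpu`.
    for item in tqdm(chunk, desc="erasing close atoms, cpu%d" % cpu, position=cpu):
        sleep(0.001)  # placeholder for the real per-item work
        kept.append(item)
    return kept


def split(items, n):
    """Split items into n nearly equal consecutive chunks (hypothetical helper)."""
    size, extra = divmod(len(items), n)
    chunks, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


if __name__ == "__main__":
    items = list(range(400))
    tqdm.set_lock(RLock())  # one shared lock so the bars do not write over each other
    with Pool(4, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as pool:
        results = pool.map(process_chunk, list(enumerate(split(items, 4))))
    merged = [x for part in results for x in part]
    print("kept %d items" % len(merged))
```

Because pool.map returns results in input order, the merged list keeps the original ordering of the items.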

Hypertensive answered 28/5, 2016 at 19:3 Comment(1)
Yes, tqdm has recently added support for parallel progress bars, as asked by the OP (the original comment linked two demonstrations), without using curses or a GUI, only standard control characters. (comment by Vouge)
Q
1

For anybody coming to this later: I created the following, which seems to work okay. It overrides click.ProgressBar fairly minimally, although I had to copy an entire method to change only a few lines at the bottom. It uses the escape sequence \x1b[1A\x1b[2K (cursor up one line, then erase that line) to clear the progress bars before rewriting them, so it may be environment dependent.

#!/usr/bin/env python
import time
from typing import Dict

import click
from click._termui_impl import ProgressBar as ClickProgressBar, BEFORE_BAR
from click._compat import term_len


class ProgressBar(ClickProgressBar):
    def render_progress(self, in_collection=False):
        # This is basically a copy of the default render_progress with the addition of in_collection
        # param which is only used at the very bottom to determine how to echo the bar
        from click.termui import get_terminal_size

        if self.is_hidden:
            return

        buf = []
        # Update width in case the terminal has been resized
        if self.autowidth:
            old_width = self.width
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, get_terminal_size()[0] - clutter_length)
            if new_width < old_width:
                buf.append(BEFORE_BAR)
                buf.append(" " * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)
        # Render the line only if it changed.

        if line != self._last_line and not self.is_fast():
            self._last_line = line
            click.echo(line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()
        elif in_collection:
            click.echo(self._last_line, file=self.file, color=self.color, nl=in_collection)
            self.file.flush()


class ProgressBarCollection(object):
    def __init__(self, bars: Dict[str, ProgressBar], bar_template=None, width=None):
        self.bars = bars
        if bar_template or width:
            for bar in self.bars.values():
                if bar_template:
                    bar.bar_template = bar_template
                if width:
                    bar.width = width

    def __enter__(self):
        self.render_progress()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.render_finish()

    def render_progress(self, clear=False):
        if clear:
            self._clear_bars()
        for bar in self.bars.values():
            bar.render_progress(in_collection=True)

    def render_finish(self):
        for bar in self.bars.values():
            bar.render_finish()

    def update(self, bar_name: str, n_steps: int):
        self.bars[bar_name].make_step(n_steps)
        self.render_progress(clear=True)

    def _clear_bars(self):
        for _ in range(0, len(self.bars)):
            click.echo('\x1b[1A\x1b[2K', nl=False)


def progressbar_collection(bars: Dict[str, ProgressBar]):
    return ProgressBarCollection(bars, bar_template="%(label)s  [%(bar)s]  %(info)s", width=36)


@click.command()
def cli():
    with click.progressbar(length=10, label='bar 0') as bar:
        for i in range(0, 10):
            time.sleep(1)
            bar.update(1)
    click.echo('------')
    with ProgressBar(iterable=None, length=10, label='bar 1', bar_template="%(label)s  [%(bar)s]  %(info)s") as bar:
        for i in range(0, 10):
            time.sleep(1)
            bar.update(1)
    click.echo('------')
    bar2 = ProgressBar(iterable=None, length=10, label='bar 2')
    bar3 = ProgressBar(iterable=None, length=10, label='bar 3')
    with progressbar_collection({'bar2': bar2, 'bar3': bar3}) as bar_collection:
        for i in range(0, 10):
            time.sleep(1)
            bar_collection.update('bar2', 1)
        for i in range(0, 10):
            time.sleep(1)
            bar_collection.update('bar3', 1)


if __name__ == "__main__":
    cli()
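
The clearing trick can also be seen in isolation. The sketch below uses only the standard library, not click; `redraw` is a hypothetical helper that moves the cursor up and erases one line per bar before writing the new block, so it assumes a terminal that honours ANSI escape sequences.

```python
import io
import sys

CURSOR_UP = "\x1b[1A"   # move the cursor up one line
ERASE_LINE = "\x1b[2K"  # erase the line the cursor is on


def redraw(lines, out=sys.stdout, first=False):
    """Write lines as a block, replacing the block written by the previous call."""
    if not first:
        # Walk back up over the old block, blanking each line on the way.
        out.write((CURSOR_UP + ERASE_LINE) * len(lines))
    for line in lines:
        out.write(line + "\n")
    out.flush()


# Capture the output to show exactly what would be sent to the terminal.
buf = io.StringIO()
redraw(["bar 2  10%", "bar 3   0%"], out=buf, first=True)
redraw(["bar 2  20%", "bar 3   0%"], out=buf)
print(repr(buf.getvalue()))
```

This only works while every redraw has the same number of lines and nothing else prints in between, which matches the limitation quoted from the click documentation.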
Quarantine answered 13/7, 2020 at 15:7 Comment(0)
W
0

It may not be the same as your dream, but you can use imap_unordered with click.progressbar to integrate with multiprocessing.

import multiprocessing as mp
import click
import time


def proc(arg):
    time.sleep(arg)
    return True

def main():
    p = mp.Pool(4)
    args = range(4)
    results = p.imap_unordered(proc, args)
    with click.progressbar(results, length=len(args)) as bar:
        for result in bar:
            pass

if __name__ == '__main__':
    main()
Wasteful answered 12/5, 2019 at 15:12 Comment(0)
D
0

Something like this will work if you are okay with having one progress bar:

import click
import threading
import numpy as np

reallybiglist = []
numthreads = 4

def myfunc(listportion, bar):
    for item in listportion:
        # do a thing
        bar.update(1)

with click.progressbar(length=len(reallybiglist), show_pos=True) as bar:
    threads = []
    # array_split also works when the list length is not a multiple of numthreads
    for listportion in np.array_split(reallybiglist, numthreads):
        thread = threading.Thread(target=myfunc, args=(listportion, bar))
        thread.start()
        threads.append(thread)

    for thread in threads:
        thread.join()
Delphiadelphic answered 4/7, 2019 at 23:3 Comment(0)
