How to parallelize a training loop over the samples of a batch when only CPUs are available in PyTorch?

I want to parallelize over single examples or over a batch of examples (in my situation I only have CPUs, up to 112 of them). I tried it, but I get an error saying that the losses cannot carry their gradients across separate processes (which entirely ruins my attempt). I still want to do it, and it is essential that after the multiprocessing happens I can do an optimizer step. How do I get around it? I made a totally self-contained example:


import torch
import torch.nn as nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import Dataset, DataLoader

from torch.multiprocessing import Pool

class SimpleDataSet(Dataset):

    def __init__(self, Din, num_examples=23):
        self.x_dataset = [torch.randn(Din) for _ in range(num_examples)]
        # target function is x*x
        self.y_dataset = [x**2 for x in self.x_dataset]

    def __len__(self):
        return len(self.x_dataset)

    def __getitem__(self, idx):
        return self.x_dataset[idx], self.y_dataset[idx]

def get_loss(args):
    x, y, model = args
    y_pred = model(x)
    criterion = nn.MSELoss()
    loss = criterion(y_pred, y)
    return loss

def get_dataloader(D, num_workers, batch_size):
    ds = SimpleDataSet(D)
    dl = DataLoader(ds, batch_size=batch_size, num_workers=num_workers)
    return dl

def train_fake_data():
    num_workers = 2
    Din, Dout = 3, 1
    model = nn.Linear(Din, Dout).share_memory()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.1)

    batch_size = 2
    num_epochs = 10
    # num_batches = 5
    num_procs = 5
    dataloader = get_dataloader(Din, num_workers, batch_size)
    scheduler = StepLR(optimizer, step_size=1, gamma=0.7)
    for epoch in range(num_epochs):
        for _, batch in enumerate(dataloader):
            batch = [(torch.randn(Din), torch.randn(Dout), model) for _ in batch]
            with Pool(num_procs) as pool:
                optimizer.zero_grad()

                losses = pool.map(get_loss, batch)
                loss = torch.mean(losses)
                loss.backward()

                optimizer.step()
            # scheduler
            scheduler.step()


if __name__ == '__main__':
    # start = time.time()
    # train()
    train_fake_data()
    # print(f'execution time: {time.time() - start}')

Error:

Traceback (most recent call last):
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3427, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-2-ea57e03ba088>", line 1, in <module>
    runfile('/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py', wdir='/Users/brando/ML4Coq/playground/multiprocessing_playground')
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 95, in <module>
    train_fake_data()
  File "/Users/brando/ML4Coq/playground/multiprocessing_playground/multiprocessing_cpu_pytorch.py", line 83, in train_fake_data
    losses = pool.map(get_loss, batch)
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 290, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/brando/anaconda3/envs/coq_gym/lib/python3.7/multiprocessing/pool.py", line 683, in get
    raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '[tensor(0.5237, grad_fn=<MseLossBackward>)]'. Reason: 'RuntimeError('Cowardly refusing to serialize non-leaf tensor which requires_grad, since autograd does not support crossing process boundaries.  If you just want to transfer the data, call detach() on the tensor before serializing (e.g., putting it on the queue).')'

I am sure I want to do this. How should I be doing this?


New attempt using DDP

"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

import os

num_epochs = 5
batch_size = 8
Din, Dout = 10, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_epochs)]

class OneDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across GPUs
    (only processes with their own GPU or hardware).
    """
    def __init__(self):
        super().__init__()
        self.net1 = nn.Linear(Din, Din)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment (for each process).

    gloo is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for processes to communicate/coordinate with each other and with the master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    if torch.cuda.is_available():
        backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    The function that is actually run in each distributed process.

    Note: as DDP broadcasts the model state from the rank 0 process to all other processes in the DDP constructor,
    you don’t need to worry about different DDP processes starting from different initial model parameter values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)

    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)

    for batch_idx, batch in enumerate(data):
        x, y = batch
        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

        optimizer.zero_grad()
        outputs = ddp_model(x)
        labels = y.to(rank) if torch.cuda.is_available() else y
        # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
        loss_fn(outputs, labels).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
        optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # args
    world_size = mp.cpu_count()
    mp.spawn(run_parallel_training_loop, args=(world_size,), nprocs=world_size)

if __name__ == "__main__":
    print('starting __main__')
    main()
    print('Done!\a\n')

It seems to work, but my question is: on the line where the model is created (line 74 in my script), do I need to do this

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()

or

    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel()

for it to work properly on multiple CPUs?


Serial is faster than parallel even though I have 112 CPU cores?

My current issue is that the CPU-parallel job is slower than the serially running one when only CPUs are available.


I want to know how to set up Python and parallel CPUs, e.g. if I have X CPUs, how many processes should I be running? X? Or what? How do I choose this number, even if it's only a rough heuristic?


related links from research:

Sizable answered 16/2, 2021 at 14:18 Comment(8)
I'm not sure how "share_memory" would make a difference in terms of a properly working solution. Shared memory just enables IPC, hence fewer copies and a relatively faster runtime. If you have tried both (which I think you've done), you would have noticed that already. – Destructionist
@AndrewNaguib I guess what I was worried about is that the parameters were NOT synchronized across processes. I am unsure what the point of share_memory is if they are always synchronized... – Sizable
As far as I can tell, DDP will sync regardless of sharing the object through shared memory. – Destructionist
@AndrewNaguib Yes, but my current issue is that the CPU-parallel job is slower than the serially running one when only CPUs are available. – Sizable
Am I missing something? I added detach as the error suggested, loss = criterion(y_pred, y).detach(), which removed that error message and gave a new one complaining that with loss = torch.mean(losses), losses must be a tensor rather than a list (pool.map is returning a list of tensors from the batch). – Plus
Regarding DDP, use your second option (no reason to tell it to share memory). I would just take a template from somewhere (like here) and keep the skeleton. – Polston
I want to know how to set up Python and parallel CPUs, e.g. if I have X CPUs, how many processes should I be running? X? Or what? How do I choose this number, even if it's only a rough heuristic? – Sizable
To give further context, I am not only interested in DDP but also in the torchrun launching utility. Details here: github.com/learnables/learn2learn/issues/… – Sizable

Torch will use multiple CPUs to parallelize operations, so your serial version may already be using multi-core vectorization.

Take this simple example:

import torch

c = 0
for i in range(10000):
    A = torch.randn(1000, 1000, device='cpu')
    B = torch.randn(1000, 1000, device='cpu')
    c += torch.sum(A @ B)

No code is used to parallelize explicitly, yet with the default configuration about 80% of the 12 CPUs are kept busy.
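
For reference, a quick way to check the default intra-op settings on a given machine (this snippet is an illustration, not part of the original experiment):

import torch

# number of threads PyTorch uses for intra-op parallelism in this process
print(torch.get_num_threads())
# details of the underlying parallel backend (ATen/OpenMP/MKL settings)
print(torch.__config__.parallel_info())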

[screenshot: CPU utilization across the 12 cores]

You can use torch.set_num_threads to set the intra-op parallelism on CPU. In particular, if you are running multiple processes and you want each process to use a single CPU, you may want to set the intra-op parallelism to 1 in each process.
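
For example, a minimal sketch (illustrative, not part of the original answer) of pinning the current process to a single intra-op thread:

import torch

torch.set_num_threads(1)  # this process now uses a single thread for intra-op work

# subsequent tensor operations in this process run single-threaded
A = torch.randn(1000, 1000)
B = torch.randn(1000, 1000)
C = A @ B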

However, parallelizing the operations has a cost. I won't go into the implementation details, but we can run a quick experiment that shows the overhead of using multiple threads.

import time

import matplotlib.pyplot as plt
import numpy as np
import torch

A = torch.randn(1000, 1000, device='cpu')
B = torch.randn(1000, 1000, device='cpu')
funcs = {
    'sin': lambda a, b: torch.sin(A),
    'tanh': lambda a, b: torch.tanh(A),
    'log': lambda a, b: torch.log(A),
    'matmul': lambda a, b: A @ B.T,
}
t = np.zeros(20)
for k, f in funcs.items():
    # time each operation with 1..20 intra-op threads
    for i in range(1, len(t) + 1):
        torch.set_num_threads(i)
        t0 = time.time()
        for _ in range(100):
            f(A, B)
        tf = time.time()
        # core time = wall time multiplied by the number of threads used
        t[i - 1] = (tf - t0) * i
    plt.plot(np.arange(1, len(t) + 1), t, '-o', label=k)
plt.xlabel('Number of threads')
plt.legend()
plt.ylabel('Core x time')

The operations tend to run faster with parallelism. [plot: run time vs. number of threads for each operation]

But if we look at the total CPU time, obtained by multiplying the wall time by the number of threads, we see that the single-thread version is more efficient.

[plot: core x time (wall time x threads) vs. number of threads for each operation]

If you are able to parallelize your experiment at a higher level, by running independent processes, you should try that with a single core for each process; otherwise each process will try to use all the CPUs, and all of them will run very slowly because your system is overloaded.
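
As an illustration of that idea, here is a minimal sketch (my own, not code from the question or the answer) where each worker process caps itself to a single intra-op thread via the pool initializer before doing any tensor work; the worker function and pool size are illustrative assumptions:

import torch
import torch.multiprocessing as mp

def limit_threads():
    # runs once in every worker process, before any task is executed
    torch.set_num_threads(1)

def independent_work(seed):
    # stand-in for one independent unit of work (hypothetical workload)
    torch.manual_seed(seed)
    A = torch.randn(500, 500)
    B = torch.randn(500, 500)
    return float(torch.sum(A @ B))

if __name__ == '__main__':
    num_procs = 8  # e.g. one process per core you want to dedicate
    with mp.Pool(num_procs, initializer=limit_threads) as pool:
        results = pool.map(independent_work, range(32))
    print(len(results))

With the workers each limited to one thread, the processes do not compete for the same cores, which is exactly the "single core for each process" setup described above.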

Tweaking the DDP example

I intentionally modified the hyperparameters of your example script in a way that favors the multi-process setup:

  • comparatively less initialization overhead
  • comparatively less communication between processes
"""
Based on: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

Note: as opposed to the multiprocessing (torch.multiprocessing) package, processes can use
different communication backends and are not restricted to being executed on the same machine.
"""
import torch
from torch import nn, optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import argparse
import os

# More than one epoch so that the initialization is less significant
# than compared to the model processing time
num_epochs = 10
# for the experiment select a number that has a lot of divisors
# as I want to test with equal number of batches
num_batches = 16*9*5
# Uses a larger batch so that more work is done in each process
# between two gradient synchronizations
# apparently the intraop optimization is not helping 
# (at least not too much) in the batch dimension
batch_size = 10000
# Use smaller dimensions, so that the intraop parallelization becomes less 
# helpful
Din, Dout = 3, 5
data_x = torch.randn(batch_size, Din)
data_y = torch.randn(batch_size, Dout)
data = [(i*data_x, i*data_y) for i in range(num_batches)]

class OneDeviceModel(nn.Module):
    """
    Toy example for a model run in parallel but not distributed across GPUs
    (only processes with their own GPU or hardware).
    """
    def __init__(self):
        super().__init__()
        # -- Use more layers (wrapped in nn.ModuleList so the layers are registered
        #    as submodules and their parameters are visible to the optimizer and DDP)
        self.net = nn.ModuleList([nn.Linear(Din, Din) for _ in range(10)])
        # -- use more complex activations
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(Din, Dout)

    def forward(self, x):
      # apply the 10 layers sequentially
      for i in range(10):
        x = self.net[i](x)
        x = self.sigmoid(x)
        x = self.tanh(x)
        x = self.relu(x)
      return self.net2(x)

def setup_process(rank, world_size, backend='gloo'):
    """
    Initialize the distributed environment (for each process).

    gloo is a collective communications library (https://github.com/facebookincubator/gloo). My understanding is that
    it's a library/API for processes to communicate/coordinate with each other and with the master. It's a backend library.
    """
    # set up the master's ip address so this child process can coordinate
    # os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # - use NCCL if you are using gpus: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends
    if torch.cuda.is_available():
        backend = 'nccl'
    # Initializes the default distributed process group, and this will also initialize the distributed package.
    dist.init_process_group(backend, rank=rank, world_size=world_size)

def cleanup():
    """ Destroy a given process group, and deinitialize the distributed package """
    dist.destroy_process_group()

def run_parallel_training_loop(rank, world_size):
    """
    The function that is actually run in each distributed process.

    Note: as DDP broadcasts the model state from the rank 0 process to all other processes in the DDP constructor,
    you don’t need to worry about different DDP processes starting from different initial model parameter values.
    """
    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    setup_process(rank, world_size)
    torch.set_num_threads(mp.cpu_count() // world_size)
    # create model and move it to GPU with id rank
    model = OneDeviceModel().to(rank) if torch.cuda.is_available() else OneDeviceModel().share_memory()
    # ddp_model = DDP(model, device_ids=[rank])
    ddp_model = DDP(model)
    for _ in range(num_epochs):
      for batch_idx, batch in enumerate(data[rank::world_size]):
          x, y = batch
          loss_fn = nn.MSELoss()
          optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

          optimizer.zero_grad()
          outputs = ddp_model(x)
          labels = y.to(rank) if torch.cuda.is_available() else y
          # Gradient synchronization communications take place during the backward pass and overlap with the backward computation.
          loss_fn(outputs, labels).backward()  # When the backward() returns, param.grad already contains the synchronized gradient tensor.
          optimizer.step()  # TODO how does the optimizer know to do the gradient step only once?

    print()
    print(f"Start running DDP with model parallel example on rank: {rank}.")
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    # Destroy a given process group, and deinitialize the distributed package
    cleanup()

def main():
    print()
    print('running main()')
    print(f'current process: {mp.current_process()}')
    print(f'pid: {os.getpid()}')
    parser = argparse.ArgumentParser()
    parser.add_argument('--world-size', default=1, type=int)
    args = parser.parse_args()
    assert num_batches % args.world_size == 0
    mp.spawn(run_parallel_training_loop, args=(args.world_size,), nprocs=args.world_size)

if __name__ == "__main__":
    print('starting __main__')
    main()
    print('Done!\a\n')
$ time python3 ddp.py --world-size 1 > /dev/null

real    0m59.092s
user    8m46.589s
sys     0m7.320s

$ time python3 ddp.py --world-size 1 > /dev/null

real    1m11.124s
user    10m54.209s
sys     0m9.595s

$ time python3 ddp.py --world-size 6 > /dev/null

real    0m18.348s
user    2m28.799s
sys     0m18.068s
$ time python3 ddp.py --world-size 12 > /dev/null

real    0m26.352s
user    4m3.074s
sys     0m39.179s
$ time python3 ddp.py --world-size 3 > /dev/null

real    0m23.047s
user    3m51.172s
sys     0m11.483s
$ time python3 ddp.py --world-size 4 > /dev/null

real    0m18.195s
user    2m55.241s
sys     0m12.841s
$ time python3 ddp.py --world-size 2 > /dev/null

real    0m26.955s
user    4m15.837s
sys     0m7.127s

If I remove the line

torch.set_num_threads(mp.cpu_count() // world_size)
$ time python3 ddp.py --world-size 4 > /dev/null

real    0m40.574s
user    6m39.176s
sys     0m19.025s

$ time python3 ddp.py --world-size 2 > /dev/null

real    0m28.066s
user    3m17.775s
sys     0m8.410s

$ time python3 ddp.py --world-size 1 > /dev/null

real    0m37.114s
user    2m19.743s
sys     0m4.866s

Using

torch.set_num_threads(mp.cpu_count() // world_size // 2)
$ time python3 ddp.py --world-size 6 > /dev/null

real    0m16.399s
user    1m38.915s
sys     0m20.780s

$ time python3 ddp.py --world-size 4 > /dev/null

real    0m15.649s
user    1m1.821s
sys     0m13.589s

$ time python3 ddp.py --world-size 3 > /dev/null

real    0m16.947s
user    1m29.696s
sys     0m10.069s

$ time python3 ddp.py --world-size 2 > /dev/null

real    0m21.851s
user    2m4.564s
sys     0m7.486s

My Opinion

DDP on a single node does not seem particularly advantageous, unless you have a model that does a lot of work that is not handled well by PyTorch's intra-op parallelism, uses large batches, and preferably has fewer parameters and more operations, meaning fewer gradients to synchronize, e.g. a convolutional model on a very large input.

Another scenario where DDP might help is if you are using too much Python in your model, instead of vectorized operations.

Meehan answered 8/2, 2022 at 8:16 Comment(14)
Does "Torch will use multiple CPU to parallelize operations" mean that an pytorch operation like your += and torch.sum( .) and A @ B might be parallelized on their own and thus might not be parallelizing as expected? What I'd like to do is break the for loop with DDP parallelism over say 8 processes and then the remaining CPUs to be distributed amongst those for parallel torch ops. I have a machine with 40 or 112 CPUs and I want to run a bunch of processes and the rest to be parallelized in the torch ops. Let me know if that doesn't make sense.Sizable
what is "to set intraop parallelism"? and how does one use that?Sizable
The rule of thumb is if you can run p processes, and you have c cpus, you should use a number of threads t * p < c, or only slightly more taking into account that some operations in your code will not be parallelized. If you want the best possible tunning you will have to run experiments with different work load distributions.Meehan
Added an example experiment to the answer.Meehan
ah cool! Say I have 40 cpus and I want to run 8 processes and want to run distributed pytorch (e.g. DDP). How do I choose the t parameter such that t * p < c == t * 8 < 40? Is there a pytorch flag or a environment variable I need to set?Sizable
I would start with torch.set_num_threads(5) in each process. Then I would check what is the number of physical threads, I guess it would be 20, in that case I would try to run with torch.set_num_threads(3) and torch.set_num_threads(2), or s mix of 2 and 3.Meehan
t=torch.set_num_threads(4) I think makes most sense as a first guess since t * p < c == 4 * 8 = 32 <= 40. Will try that. Thanks for the response.Sizable
I suggested 5, because 5*8==40, in theory that could run one thread in each CPU. Also if you want to make sure that a process will not compete with other, you can specify which CPUs each process will run on using taskset -p (assuming Linux OS)Meehan
hmmm that seems to complicate things...I wouldn't know how to choose that. If you have a suggestion feel free to share it! But overall I hope to have the answer we build for this be as simple for the user as possible. Nearly as DDP where you have 4 gpus you tell that to DDP and it works - as little teaking as needed (no need for the best solution, just that it's quick enough and fast enough...is my ideal answer).Sizable
To keep it simple you could use torch.set_num_threads(multiprocessing.cpu_count()//num_workers). Also, be aware that the number of data loader processes should respect the same limits. If the data loading is IO bounded there is no point on adding more workers, if it is bounded by the CPU adding more process will only increase the CPU congestion.Meehan
hmmm so if I am using DDP and I have num_workers=1 and I set num_procs=8, do I still ahve to set torch.set_num_threads(X)? I guess what I am confused is, I'd ideally want a DDP code that if it's running in CPU parallel then it sets torch.set_num_threads(X) automatically. It is ok to make an assumption or enforce the data loaders and their number of workers to have a value. What combination of values do we need to have it work (better than serial, even if it's not much better)? The simplest soln is what I'm hoping exists.Sizable
If you use world_size = mp.cpu_count() aren't you running 40 processes? In that case shouldn't you be using a ` torch.set_num_threads(1)` ? Could you please, compare this two situations? I am unable to run your example, but, could tell me why in your run_parallel_training_loop you simply iterate over enumerate(data), shouldn't you be selecting different slices of the data for each process? for instance in setup_process. Are you sure you are distributing and not repeating the work?Meehan
I put some energy in running your code, modifying it to make it not so well vectorizable, and checking the run time in a few different cases. Tweaking the number of processes, and the number of threads per process for intraop praralelization. I hope this makes the answer acceptable.Meehan
to give further context, I am not only interested in DDP but with the torchrun launching utility. Details here: github.com/learnables/learn2learn/issues/… The reason I mention this is because in normal training there is a loop through the data loader. And that is it. In my case there is an additional loop per batch (or meta-batch) because of meta-learning. Each batch is a batch of tasks so we have to loop again -- no way to parallelize that. So the need to loop through this meta-batch is what makes it really helpful to use distributed training.Sizable
