Run distributed PyTorch with OpenMPI across LAN and virtual LAN nodes

I have two Ubuntu nodes with distributed PyTorch and heterogeneous-enabled OpenMPI installed from source. Both can reach each other over passwordless SSH and share an NFS directory (/home/nvidia/shared), which contains a simple PyTorch script (distmpi.py) to be run over OpenMPI.

Node-1 (xps): a desktop PC with IP 192.168.201.23 on LAN interface enp4s0 (per ip addr).
Node-2 (hetero): an OpenStack VM with virtual IP 11.11.11.21 on vLAN interface ens3 and floating IP 192.168.200.151 (per ip addr, ifconfig).

The following error occurs when mpirun is launched from xps to run 2 processes (one on 192.168.201.23 and the other on 192.168.200.151):

(torch) nvidia@xps:~$ mpirun -v -np 2 -H 192.168.201.23:1,192.168.200.151 torch/bin/python shared/distmpi.py
--------------------------------------------------------------------------
Open MPI detected an inbound MPI TCP connection request from a peer
that appears to be part of this MPI job (i.e., it identified itself as
part of this Open MPI job), but it is from an IP address that is
unexpected.  This is highly unusual.

The inbound connection has been dropped, and the peer should simply
try again with a different IP interface (i.e., the job should
hopefully be able to continue).

  Local host:          xps
  Local PID:           7113
  Peer hostname:       192.168.200.151 ([[55343,1],1])
  Source IP of socket: 192.168.200.151
  Known IPs of peer:   
    11.11.11.21
--------------------------------------------------------------------------
[xps][[55343,1],0][btl_tcp_endpoint.c:796:mca_btl_tcp_endpoint_complete_connect] connect() to 11.11.11.21 failed: Connection timed out (110)
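
For context, my reading of this message is that rank 0 on xps only knows the peer by its vLAN address (11.11.11.21), which is not routable from the LAN, while the actual connection arrives from the floating IP (192.168.200.151), which OpenStack typically implements via NAT; so the inbound connection is dropped and the outgoing connect() to 11.11.11.21 times out. As far as I understand, Open MPI's TCP components can be restricted to specific interfaces or subnets via MCA parameters; the following is only a sketch, and the subnet values are my assumptions based on the addresses above, not a verified fix:

# sketch: pin the OOB and TCP BTL traffic to subnets reachable from both nodes
mpirun -np 2 -H 192.168.201.23:1,192.168.200.151 \
    --mca oob_tcp_if_include 192.168.200.0/24,192.168.201.0/24 \
    --mca btl_tcp_if_include 192.168.200.0/24,192.168.201.0/24 \
    torch/bin/python shared/distmpi.py

(My understanding is that the if_include and if_exclude parameters of the same component are mutually exclusive, so only one of them should be set at a time.)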

Please have a look at the Python script (distmpi.py) for reference:

#!/usr/bin/env python
import os
import socket
import torch
import torch.distributed as dist
from torch.multiprocessing import Process


def run(rank, size):
    tensor = torch.zeros(size)
    print(f"I am {rank} of {size} with tensor {tensor}")

    # incrementing the old tensor
    tensor += 1

    # sending tensor to next rank
    if rank == size - 1:
        dist.send(tensor=tensor, dst=0)
    else:
        dist.send(tensor=tensor, dst=rank + 1)

    # receiving tensor from previous rank
    if rank == 0:
        dist.recv(tensor=tensor, src=size-1)
    else:
        dist.recv(tensor=tensor, src=rank-1)

    print('Rank ', rank, ' has data ', tensor[0])


def init_processes(rank, size, hostname, fn, backend='mpi'):
    """ Initialize the distributed environment. """
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    world_size = int(os.environ['OMPI_COMM_WORLD_SIZE'])
    world_rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
    hostname = socket.gethostname()
    init_processes(world_rank, world_size, hostname, run, backend='mpi')
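
To see which addresses and interfaces each rank actually advertises and tries to use, it may also help to re-run with higher verbosity on Open MPI's TCP components; this is only a sketch reusing the same command line with standard MCA verbosity parameters:

# sketch: same launch, with verbose output from the OOB and TCP BTL components
mpirun -np 2 -H 192.168.201.23:1,192.168.200.151 \
    --mca oob_base_verbose 10 --mca btl_base_verbose 30 \
    torch/bin/python shared/distmpi.py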

Regards.

Shakeel Anjum
