Elastic Allreduce using Horovod

HT edited this page Aug 4, 2020 · 2 revisions

We can implement elastic Allreduce on top of Gloo, a collective communications library written in C++. Gloo does not crash the process when a communication failure occurs, and it supports re-initialization with a different process group. Horovod supports Gloo as one of its backends and also provides Python APIs, so we choose Horovod to implement elastic Allreduce in ElasticDL.

Master Code

The master creates a RendezvousServer. Each time the master detects a change in the number of workers, it assigns new ranks to the workers and re-creates the RendezvousServer. The master assigns rank 0 to the oldest worker, because rank 0 is used as the broadcast source to synchronize models among workers.

import horovod

# There will be name and API changes in 0.20. But the released version is 0.19 for now.
def if_use_runner():
    version = horovod.__version__
    split_version = version.split(".")
    if len(split_version) >= 2 and int(split_version[0]) == 0 and int(split_version[1]) < 20:
        return False
    return True

use_runner = if_use_runner()

if use_runner:
    # From 0.20 on, these modules live under horovod.runner.
    from horovod.runner.http.http_server import RendezvousServer
    from horovod.runner.common.util.hosts import get_host_assignments, parse_hosts
else:
    from horovod.run.http.http_server import RendezvousServer
    # Need to make a copy of these two functions from horovod.runner.common.util.hosts (0.20 version)
    from hosts import get_host_assignments, parse_hosts

verbose = True
rendezvous = RendezvousServer(verbose)

# Host string: comma-separated worker domain names or IPs, each followed by ":<slots>".
hosts_str = "localhost:2"
# Set the number of workers
num_proc = 2

hosts = parse_hosts(hosts_str)

host_alloc_plan = get_host_assignments(hosts, num_proc)

if use_runner:
    global_rendezv_port = rendezvous.start()
    rendezvous.init(host_alloc_plan)
else:
    global_rendezv_port = rendezvous.start_server(host_alloc_plan)
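The rule that the oldest worker gets rank 0 can be sketched in plain Python. This is only an illustration, not ElasticDL's or Horovod's code: the `worker_start_times` bookkeeping and the helper names `assign_ranks` and `build_hosts_str` are hypothetical, and the sketch assumes that ranks follow the order in which hosts appear in the host string passed to `parse_hosts`.

```python
def assign_ranks(worker_start_times):
    """Map each worker host to a rank, oldest worker first.

    worker_start_times: dict of host -> start timestamp. This is
    hypothetical bookkeeping kept by the master, not a Horovod API.
    """
    # Sort by start time; break ties by host name so the result is stable.
    ordered = sorted(
        worker_start_times, key=lambda host: (worker_start_times[host], host)
    )
    return {host: rank for rank, host in enumerate(ordered)}


def build_hosts_str(ranks, slots_per_host=1):
    """Build a "host:slots,host:slots" string with hosts in rank order."""
    ordered = sorted(ranks, key=ranks.get)
    return ",".join("{}:{}".format(host, slots_per_host) for host in ordered)


# Three workers, started one after another.
workers = {"worker-0": 100.0, "worker-1": 105.0, "worker-2": 110.0}
ranks = assign_ranks(workers)

# worker-0 fails and worker-3 joins: the oldest survivor becomes rank 0.
del workers["worker-0"]
workers["worker-3"] = 120.0
new_ranks = assign_ranks(workers)
new_hosts_str = build_hosts_str(new_ranks)
```

With this ordering, a surviving worker that already holds the latest model always becomes the broadcast source after a membership change, and a newly started worker never does while an older one is alive.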

Worker Code

Each worker periodically queries the master to check whether the Allreduce process group needs to be re-initialized. If so, the master sends each worker the new global_rendezv_port, rank, size, local_rank, local_size, cross_rank, and cross_size. The worker then resets the Allreduce process group.
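To make the rank-related values concrete, here is a simplified, pure-Python sketch of how they relate to one another for a given list of hosts. It is an illustration written for this page, not Horovod's `get_host_assignments`; the function name `slot_infos` and the dictionary layout are hypothetical. It assumes slots are filled host by host, that `local_rank` counts processes on the same host, and that `cross_rank` counts processes with the same `local_rank` across hosts.

```python
def slot_infos(hosts, num_proc):
    """Compute per-process rank info.

    hosts: list of (hostname, slots) tuples in rank order.
    num_proc: total number of worker processes to place.
    """
    slots = []
    for hostname, num_slots in hosts:
        for local_rank in range(num_slots):
            if len(slots) == num_proc:
                break
            slots.append(
                {"hostname": hostname, "rank": len(slots), "local_rank": local_rank}
            )

    for slot in slots:
        # Processes that share this slot's host.
        same_host = [s for s in slots if s["hostname"] == slot["hostname"]]
        # Processes on any host that share this slot's local_rank.
        same_local = [s for s in slots if s["local_rank"] == slot["local_rank"]]
        slot["size"] = len(slots)
        slot["local_size"] = len(same_host)
        slot["cross_rank"] = sum(1 for s in same_local if s["rank"] < slot["rank"])
        slot["cross_size"] = len(same_local)
    return slots


# Two hosts with two slots each, but only three processes in total.
infos = slot_infos([("host-a", 2), ("host-b", 2)], num_proc=3)
```

In this example the third process is alone on `host-b`, so its `local_size` is 1, while its `cross_rank` is 1 because `host-a` also runs a process with `local_rank` 0.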

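import os

# Assumption: `hvd` is the Horovod module for the framework in use,
# e.g. `import horovod.tensorflow as hvd`.
os.environ["HOROVOD_GLOO_RENDEZVOUS_ADDR"] = master_addr
# Environment variable values must be strings, so convert the port.
os.environ["HOROVOD_GLOO_RENDEZVOUS_PORT"] = str(global_rendezv_port)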
os.environ["HOROVOD_RANK"] = str(rank)
os.environ["HOROVOD_SIZE"] = str(size)
os.environ["HOROVOD_LOCAL_RANK"] = str(local_rank)
os.environ["HOROVOD_LOCAL_SIZE"] = str(local_size)
os.environ["HOROVOD_CROSS_RANK"] = str(cross_rank)
os.environ["HOROVOD_CROSS_SIZE"] = str(cross_size)
os.environ["HOROVOD_CONTROLLER"] = "gloo"
os.environ["HOROVOD_CPU_OPERATIONS"] = "gloo"
os.environ["HOROVOD_HOSTNAME"] = worker_domain_name

# Leave the old process group, then join the new one using the variables set above.
hvd.shutdown()
hvd.init()
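The periodic check described at the start of this section could be organized as below. This is a minimal sketch under stated assumptions: the `rendezvous_id` field, the `query_master` callable, and the `RendezvousWatcher` class are all hypothetical and stand in for whatever RPC the worker uses to reach the master. The `reinit` callable is where a real worker would call `hvd.shutdown()` followed by `hvd.init()`; it is injected here so the sketch runs without Horovod installed.

```python
import os


def apply_rendezvous(info, environ=os.environ):
    """Copy the master's rendezvous info into the Horovod environment variables."""
    for key in ("rank", "size", "local_rank", "local_size",
                "cross_rank", "cross_size"):
        environ["HOROVOD_" + key.upper()] = str(info[key])
    environ["HOROVOD_GLOO_RENDEZVOUS_ADDR"] = info["master_addr"]
    environ["HOROVOD_GLOO_RENDEZVOUS_PORT"] = str(info["port"])


class RendezvousWatcher:
    """Re-initialize the process group when the master reports a new rendezvous."""

    def __init__(self, query_master, reinit, environ=os.environ):
        self._query_master = query_master  # hypothetical RPC to the master
        self._reinit = reinit              # e.g. hvd.shutdown(); hvd.init()
        self._environ = environ
        self._current_id = None

    def check(self):
        """Return True if the process group was re-initialized."""
        info = self._query_master()
        if info["rendezvous_id"] == self._current_id:
            return False
        apply_rendezvous(info, self._environ)
        self._reinit()
        self._current_id = info["rendezvous_id"]
        return True


def make_info(rendezvous_id, port, size):
    # Fake master reply for a worker that keeps rank 0 on its own host.
    return {"rendezvous_id": rendezvous_id, "master_addr": "master",
            "port": port, "rank": 0, "size": size, "local_rank": 0,
            "local_size": 1, "cross_rank": 0, "cross_size": size}


# Simulate three polls: initial rendezvous, no change, then a third worker joins.
replies = iter([make_info(1, 12345, 2), make_info(1, 12345, 2),
                make_info(2, 12346, 3)])
env = {}
reinit_calls = []
watcher = RendezvousWatcher(lambda: next(replies),
                            lambda: reinit_calls.append(1), environ=env)
results = [watcher.check() for _ in range(3)]
```

A worker would call `check()` between training steps, for example once per mini-batch or every few seconds, so that the process group is only rebuilt at a point where no Allreduce is in flight.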