0x00 Abstract
Horovod is an easy-to-use, high-performance distributed training framework released by Uber in 2017 that has been widely adopted in industry.
This series walks through the Horovod source code. This is the eleventh article in the series, and it looks at how Horovod runs on Spark with the GLOO backend.
Horovod on Spark has two underlying implementations: MPI and GLOO. Due to space constraints, this article covers the GLOO implementation. To keep the article self-contained, it repeats some material from the previous ones; I hope you will bear with the overlap.
The other articles in this series are as follows:
Horovod (1) — Basics
Horovod (2) — A distributed training framework for deep learning — from the user’s perspective
Horovod (3) — What’s behind Horovodrun
Horovod (4) — Network Basics & Driver
Horovod (5) — fusion framework
Horovod (6) — Background thread architecture
Horovod (7) — DistributedOptimizer
Horovod (8) — on Spark
Horovod (9) — Start on Spark
Horovod (10) — Run on Spark
0x01 Review
1.1 Overall sequence diagram
Let's first review the overall sequence diagram of Horovod on Spark. Note that the sequence diagram starts from mpi_run and therefore reflects the MPI implementation; this article covers Gloo, so the flow diverges from mpi_run onward.
1.2 Overall Logic
Overall, the logic of Horovod on Spark is divided into the following phases:
- Start the SparkDriverService, and use _make_spark_thread to start the Spark tasks; Horovod then waits for the startup to complete.
- Multiple threads start Spark tasks inside the Spark Executors; each task runs a SparkTaskService. Each SparkTaskService registers itself with the SparkDriverService in the Horovod main process and waits for the command to start.
- Once Horovod has received the registration of all tasks, it notifies each task to proceed to the next stage.
- Horovod calls mpi_run (which in turn uses mpirun_rsh.py) to start orted (in this case the SparkTaskService) on each Spark Executor, thereby launching the MPI cluster.
- orted runs the training code on top of each Spark Executor.
Having already analyzed the first three stages, this article continues the analysis of the next two.
0x02 Stage 4: Start the Job
Let’s look at the fourth stage, which is how to run a training job.
2.1 GLOO VS the MPI
The question for this article is how Gloo differs from an MPI implementation.
2.1.1 Trouble with MPI
MPI runs into trouble here because:
- MPI normally connects to its hosts over SSH, but SSH cannot be used to launch the Python function inside the Spark Executor.
- orted needs to run inside the Spark Executor, but mpirun does not know the Spark Executor's IP:PORT combination when it starts, so it cannot launch orted directly.
- Therefore MPI uses RPC to start the user code:
  - Start a SparkTaskService in each Spark Executor, and register each SparkTaskService's IP:PORT with the SparkDriverService in the Horovod main process;
  - Use horovod.spark.driver.mpirun_rsh to connect to each Executor, and then "remote shell" into the Executors;
  - Finally, the SparkTaskService starts orted directly.
2.1.2 Gloo Key points
Let’s look at the key points of Gloo. In normal mode, the Gloo scheme would:
- Create a RendezvousServer with a KVStore, in which the driver stores information such as the IP of the workers involved in the communication into the KVStore. This information is used to help the worker call Gloo to construct an AllReduce communication ring.
- The worker can then call gloo to access the RendezvousServer to construct the communication ring.
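To make the rendezvous idea above concrete, here is a deliberately simplified, hypothetical sketch of such a key-value rendezvous. It is not Horovod's actual RendezvousServer (which is used by the underlying Gloo C++ layer); all names and addresses are made up:

# A toy key-value rendezvous, for illustration only.
class ToyKVStore:
    def __init__(self):
        self._store = {}

    def put(self, scope, key, value):
        self._store[(scope, key)] = value

    def get(self, scope, key):
        return self._store[(scope, key)]

# Driver side: record each worker's address under its rank.
kv = ToyKVStore()
workers = {0: ('10.0.0.1', 9000), 1: ('10.0.0.2', 9000), 2: ('10.0.0.3', 9000)}
for rank, addr in workers.items():
    kv.put('global', str(rank), addr)

# Worker side: look up the neighbours needed to form an AllReduce ring.
def ring_neighbours(kv, rank, size):
    left = kv.get('global', str((rank - 1) % size))
    right = kv.get('global', str((rank + 1) % size))
    return left, right

print(ring_neighbours(kv, 1, len(workers)))  # (('10.0.0.1', 9000), ('10.0.0.3', 9000))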
In Horovod on Spark, the key points are:
- How is the RendezvousServer constructed, and how does the RendezvousServer learn the IP:port of each Executor (more precisely, of each SparkTaskService)?
- How does SparkTaskService on Executor communicate with RendezvousServer to know its own network information and that of its neighbors?
Let’s look at the code for an answer.
2.2 Review the startup process
Let’s start by reviewing how we started.
The logic of horovod.spark.run is:
- Handle various configurations, such as timeout, nice, etc.;
- Obtain Spark information, for example the SparkContext from PySpark;
- Build the SparkDriverService (the Horovod driver service);
- Use _make_spark_thread to start the Spark Executors (one SparkTaskService per Spark Executor), thereby building the cluster;
- Each SparkTaskService registers itself with the Horovod driver via driver_service.SparkDriverClient.register_task; this is how the RendezvousServer later learns each SparkTaskService's IP:PORT;
- Wait for all Spark tasks to finish starting, via _notify_and_register_task_addresses;
- Start training with _launch_job;
- Use spark_thread.join to wait for the Spark thread and collect the training results.
The key point is that each SparkTaskService has its own internal HTTP server and registers its IP:PORT with the Driver.
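As a reminder of the user-facing entry point that triggers all of the above, a minimal usage sketch of horovod.spark.run might look as follows; the master URL, the choice of framework, and the body of train() are illustrative, not taken from this article:

from pyspark.sql import SparkSession
import horovod.spark
import horovod.torch as hvd    # any supported framework works here

spark = SparkSession.builder.master('local[2]').getOrCreate()

def train():
    hvd.init()                   # each Spark task becomes one Horovod rank
    return hvd.rank(), hvd.size()

# run() blocks until every task has finished and returns the per-rank results
results = horovod.spark.run(train, num_proc=2)
print(results)                   # e.g. [(0, 2), (1, 2)]
spark.stop()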
2.3 _launch_job
Let’s start with _launch_job.
The _launch_job is simple:
- First, driver.get_common_interfaces obtains the network interface (routing) information; this is recorded by the RendezvousServer and eventually used by the SparkTaskService on each Executor;
- Then, run_controller is called to start the job.
def _launch_job(use_mpi, use_gloo, settings, driver, env, stdout=None, stderr=None):
    nics = driver.get_common_interfaces()
    # The network interface information is passed along when gloo_run is called
    run_controller(use_gloo, lambda: gloo_run(settings, nics, driver, env, stdout, stderr),
                   use_mpi, lambda: mpi_run(settings, nics, driver, env, stdout, stderr),
                   False, lambda: None,
                   settings.verbose)
2.3.1 Obtaining Routing Information
The Driver's get_common_interfaces differs from get_common_interfaces in normal mode: by this point, the SparkTaskService information from the Spark Executors has already been saved in the Driver, so the interfaces can be read from it directly.
def get_common_interfaces(self):
    if self._nics is not None:
        return self._nics

    nics = None
    if len(self._task_addresses_for_tasks) > 0:
        # in Elastic Horovod on Spark with auto-scaling
        # keys in task_addresses are in range(max_np or proc_num)
        # but not all keys may exist, so we don't do for index in range(proc_num)
        indices = list(self._task_addresses_for_tasks.keys())
        nics = set(self._task_addresses_for_tasks[indices[0]].keys())
        for index in indices[1:]:
            nics.intersection_update(self._task_addresses_for_tasks[index].keys())

    return nics
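The intersection logic above is easy to illustrate with plain dictionaries: the driver keeps, per task index, a mapping of interface name to addresses, and the common interfaces are those present on every task. A standalone sketch with made-up data:

# Hypothetical data in the shape of self._task_addresses_for_tasks:
# {task_index: {interface_name: [(ip, port), ...]}}
task_addresses_for_tasks = {
    0: {'eth0': [('10.0.0.1', 9000)], 'lo': [('127.0.0.1', 9000)]},
    1: {'eth0': [('10.0.0.2', 9001)], 'lo': [('127.0.0.1', 9001)]},
    2: {'eth0': [('10.0.0.3', 9002)], 'docker0': [('172.17.0.1', 9002)]},
}

indices = list(task_addresses_for_tasks.keys())
nics = set(task_addresses_for_tasks[indices[0]].keys())
for index in indices[1:]:
    nics.intersection_update(task_addresses_for_tasks[index].keys())

print(nics)  # {'eth0'} -- the only interface common to all three tasks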
2.4 run_controller
run_controller chooses gloo, jsrun, or MPI based on the configuration and on which backends were built.
def run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity):
    if use_gloo:
        gloo_run()
    elif use_mpi:
        mpi_run()
    elif use_jsrun:
        js_run()
    else:
        if mpi_built(verbose=verbose):
            if lsf.LSFUtils.using_lsf() and is_jsrun_installed():
                js_run()
            else:
                mpi_run()
        elif gloo_built(verbose=verbose):
            gloo_run()
Now let's see how the job is started under Gloo.
0x03 Gloo Implementation
Compared with MPI, the Gloo part is more straightforward.
3.1 gloo_run
Go back to run_controller from section 2.4, which chooses gloo, jsrun, or MPI based on the configuration and on which backends were built.
def run_controller(use_gloo, gloo_run, use_mpi, mpi_run, use_jsrun, js_run, verbosity):
    if use_gloo:
        gloo_run()       # <--- this article follows this path
    elif use_mpi:
        mpi_run()        # the MPI implementation goes here
    elif use_jsrun:
        js_run()
    else:
        if mpi_built(verbose=verbose):
            if lsf.LSFUtils.using_lsf() and is_jsrun_installed():
                js_run()
            else:
                mpi_run()    # the MPI implementation goes here
        elif gloo_built(verbose=verbose):
            gloo_run()       # <--- this article follows this path
If gloo is configured, we use gloo_run:
def gloo_run(settings, nics, driver, env, stdout=None, stderr=None):
    """
    Run distributed gloo jobs.

    :param settings: Settings for running the distributed jobs.
                     Note: settings.num_proc and settings.hosts must not be None.
    :param nics: Interfaces to use by gloo.
    :param driver: The Spark driver service that tasks are connected to.
    :param env: Environment dictionary to use for running gloo jobs.  Can be None.
    :param stdout: Horovod stdout is redirected to this stream.
    :param stderr: Horovod stderr is redirected to this stream.
    """
    if env is None:
        env = {}

    # we don't want the key to be serialized along with settings from here on
    key = settings.key
    settings.key = None

    # Each thread will use SparkTaskClient to launch the job on each remote host. If an
    # error occurs in one thread, entire process will be terminated. Otherwise,
    # threads will keep running and ssh session.
    iface = list(nics)[0]
    server_ip = driver.addresses()[iface][0][0]

    # Build the command that needs to be executed
    command = (sys.executable,
               '-m', 'horovod.spark.task.gloo_exec_fn',  # this is the code that runs in the task
               codec.dumps_base64(driver.addresses()),
               codec.dumps_base64(settings))

    # Think of _exec_command_fn as "the ability to execute a command"
    exec_command = _exec_command_fn(driver, key, settings, env,
                                    stdout, stderr, settings.prefix_output_with_timestamp)

    # The network interface information is passed in here
    launch_gloo(command, exec_command, settings, nics, {}, server_ip)
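Note how driver.addresses() and settings are serialized with codec.dumps_base64 so that they can be passed on the command line and decoded again inside gloo_exec_fn (see section 3.4). Below is a rough standalone approximation of that round trip; the real helper lives in horovod.runner.common.util.codec, and plain pickle with made-up data is used here only for illustration:

import base64
import pickle
import sys

def dumps_base64(obj):
    # Approximation of codec.dumps_base64: serialize, then base64-encode
    return base64.b64encode(pickle.dumps(obj)).decode('ascii')

def loads_base64(encoded):
    # Approximation of codec.loads_base64: base64-decode, then deserialize
    return pickle.loads(base64.b64decode(encoded))

driver_addresses = {'lo': [('127.0.0.1', 12345)]}   # made-up driver addresses
settings = {'verbose': 2}                           # made-up stand-in for the Settings object

argv = [sys.executable, '-m', 'horovod.spark.task.gloo_exec_fn',
        dumps_base64(driver_addresses), dumps_base64(settings)]

# On the task side, the command-line arguments are decoded back into objects
print(loads_base64(argv[3]))    # {'lo': [('127.0.0.1', 12345)]}
print(loads_base64(argv[4]))    # {'verbose': 2}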
It is important to note that _exec_command_fn is as follows. We can think of _exec_command_fn as the ability to execute a command:
def _exec_command_fn(driver, key, settings, env, stdout, stderr, prefix_output_with_timestamp):
    def _exec_command(command, slot_info, events):
        host = slot_info.hostname          # the host name
        local_rank = slot_info.local_rank  # the local rank
        verbose = settings.verbose
        # The run capability is encapsulated in rsh
        result = rsh(driver.addresses(), key, host, command, env, local_rank, verbose,
                     stdout, stderr, prefix_output_with_timestamp, False, events)
        return result, time.time()
    return _exec_command
The rsh called here is imported via from horovod.spark.driver.rsh import rsh. This is the key piece.
3.2 launch_gloo
The main points here are:
- First, note the arguments:
  - command is roughly 'python', '-m', 'horovod.spark.task.gloo_exec_fn';
  - exec_command is roughly rsh xxxx, i.e. it remotely executes python -m horovod.spark.task.gloo_exec_fn;
- A RendezvousServer is established;
- slot_info_to_command is built, which specifies the command to run on each slot;
- execute_function_multithreaded is called to run the commands in multiple threads.
def launch_gloo(command, exec_command, settings, nics, env, server_ip):
    """
    Launches the given command multiple times using gloo.
    Each command is launched via exec_command.

    :param command: command to launch
    :param exec_command: means to execute a single command
    :param settings: settings for the distribution
    :param nics: common interfaces
    :param env: environment to use
    :param server_ip: ip to use for rendezvous server
    """
    # start global rendezvous server and get port that it is listening on
    # Set up the RendezvousServer, which will be used by the underlying Gloo C++ environment
    rendezvous = RendezvousServer(settings.verbose)

    # allocate processes into slots
    # Parse the hosts and assign each process to a slot
    hosts = parse_hosts(settings.hosts)
    host_alloc_plan = get_host_assignments(hosts, settings.num_proc)

    # start global rendezvous server and get port that it is listening on
    global_rendezv_port = rendezvous.start()
    rendezvous.init(host_alloc_plan)

    # Obtain the runnable command
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port)

    # Get a command that can be executed on a given slot
    slot_info_to_command = _slot_info_to_command_fn(run_command, env)
    event = register_shutdown_event()

    # Build args_list from slot_info_to_command; each arg is a slot command,
    # carrying information such as HOROVOD_GLOO_RENDEZVOUS_ADDR
    args_list = [[slot_info_to_command(slot_info), slot_info, [event]]
                 for slot_info in host_alloc_plan]

    # If an error occurs in one thread, entire process will be terminated.
    # Otherwise, threads will keep running.
    # Multithreaded execution: each arg (slot command) is executed via exec_command
    res = threads.execute_function_multithreaded(exec_command,
                                                 args_list,
                                                 block_until_all_done=True)
    ...
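threads.execute_function_multithreaded then fans each slot command out to exec_command on its own thread and collects the results. The following is only a conceptual stand-in for that pattern (Horovod's real implementation also handles abort events and output streaming), with made-up slot data:

from concurrent.futures import ThreadPoolExecutor

def execute_function_multithreaded(fn, args_list):
    # Conceptual stand-in: run fn(*args) once per entry of args_list,
    # each on its own thread, and block until all of them are done.
    with ThreadPoolExecutor(max_workers=len(args_list)) as pool:
        futures = [pool.submit(fn, *args) for args in args_list]
        return {i: f.result() for i, f in enumerate(futures)}

def fake_exec_command(command, slot_info, events):
    # Pretend to run the slot command and report an exit code
    return 0, 'ran {!r} for slot {}'.format(command, slot_info)

args_list = [['cmd-{}'.format(rank), {'rank': rank}, []] for rank in range(2)]
print(execute_function_multithreaded(fake_exec_command, args_list))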
The details are shown in the figure below:
launch_gloo( command = 'python', '-m', 'horovod.spark.task.gloo_exec_fn',
             exec_command = rsh xxxx)
        |
        |
        v
 RendezvousServer
        |
        |  get_run_command
        |
        v
run_command = HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222
              HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo ......
              python -m horovod.spark.task.gloo_exec_fn
exec_command = rsh xxxx
        |
        |  _slot_info_to_command_fn
        |
        v
slot_info_to_command = rank=0,local_rank=0,socket-ifname=eth0,cpu_operations=gloo......
                       HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222
                       HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo ......
                       python -m horovod.spark.task.gloo_exec_fn
exec_command = rsh xxxx
        |
        |
        v
threads.execute_function_multithreaded
3.2.1 get_run_command
The key function used in launch_gloo is get_run_command, which calls create_run_env_vars to collect the information Gloo needs and builds run_command from it. The resulting format is as follows:
HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python train.py
The code is as follows:
def get_run_command(command, server_ip, nics, port, elastic=False):
    env_vars = create_run_env_vars(server_ip, nics, port, elastic)
    env_string = " ".join(
        [f"{k}={str(v)}" for k, v in env_vars.items()])
    run_command = (
        '{env_string} '
        '{command}'  # expect a lot of environment variables
        .format(env_string=env_string,
                command=' '.join(quote(par) for par in command)))
    return run_command
3.2.2 create_run_env_vars
The create_run_env_vars function builds the information that Gloo needs at run time; this is then passed on to the Spark Executors.
def create_run_env_vars(server_ip, nics, port, elastic=False):
    run_envs = {
        'HOROVOD_GLOO_RENDEZVOUS_ADDR': server_ip,
        'HOROVOD_GLOO_RENDEZVOUS_PORT': port,
        'HOROVOD_CONTROLLER': "gloo",
        'HOROVOD_CPU_OPERATIONS': "gloo",
        'HOROVOD_GLOO_IFACE': list(nics)[0],  # TODO: add multiple ifaces in future
        'NCCL_SOCKET_IFNAME': ','.join(nics),  # the network interface information needed to build the ring
    }
    if elastic:
        run_envs["HOROVOD_ELASTIC"] = "1"
    return run_envs
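Putting 3.2.1 and 3.2.2 together: the env dict is flattened into a KEY=VALUE prefix that is prepended to the command string. A standalone sketch with made-up values shows the shape of the resulting run_command:

from shlex import quote

# Made-up values, matching the format shown in 3.2.1
run_envs = {
    'HOROVOD_GLOO_RENDEZVOUS_ADDR': '1.1.1.1',
    'HOROVOD_GLOO_RENDEZVOUS_PORT': 2222,
    'HOROVOD_CONTROLLER': 'gloo',
    'HOROVOD_CPU_OPERATIONS': 'gloo',
    'HOROVOD_GLOO_IFACE': 'lo',
    'NCCL_SOCKET_IFNAME': 'lo',
}
command = ('python', '-m', 'horovod.spark.task.gloo_exec_fn')

env_string = ' '.join('{}={}'.format(k, v) for k, v in run_envs.items())
run_command = '{} {}'.format(env_string, ' '.join(quote(par) for par in command))

print(run_command)
# HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 ... python -m horovod.spark.task.gloo_exec_fn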
3.3 rsh
Inside execute_function_multithreaded, rsh is invoked, and it ultimately interacts with the Spark Executor.
Specifically, it will:
- Obtain the driver handle;
- Use the driver handle to call SparkDriverClient and obtain the task information;
- Obtain the task handle;
- Call SparkTaskClient's run_command to send the command to the Spark Executor; the command parameter here is roughly python -m horovod.spark.task.gloo_exec_fn;
- Wait for the result of the run.
When rsh is called, the command carries environment variables such as HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo, so the SparkTaskService in the Spark Executor knows how to find the RendezvousServer and therefore how to construct the ring.
def rsh(driver_addresses, key, host_hash, command, env, local_rank, verbose,
        stdout=None, stderr=None, prefix_output_with_timestamp=False,
        background=True, events=None):
    """
    Method to run a command remotely given a host hash, local rank and driver addresses.

    This method connects to the SparkDriverService running on the Spark driver,
    retrieves all information required to connect to the task with given local rank
    of that host hash and invoke the command there.

    The method returns immediately after launching the command if background is True (default).
    When background is set to False, this method waits for command termination and returns
    command's result. If there is an exception while waiting for the result (i.e. connection
    reset) it returns -1.

    :param driver_addresses: driver's addresses
    :param key: used for encryption of parameters passed across the hosts
    :param host_hash: host hash to connect to
    :param command: command and arguments to invoke
    :param env: environment to use
    :param local_rank: local rank on the host of task to run the command in
    :param verbose: verbosity level
    :param stdout: Task stdout is redirected to this stream.
    :param stderr: Task stderr is redirected to this stream.
    :param prefix_output_with_timestamp: shows timestamp in stdout/stderr forwarding on the driver if True
    :param background: run command in background if True, returns command result otherwise
    :param events: events to abort the command, only if background is True
    :return exit code if background is False
    """
    if ':' in host_hash:
        raise Exception('Illegal host hash provided. Are you using Open MPI 4.0.0+?')

    # Obtain the driver handle
    driver_client = driver_service.SparkDriverClient(driver_addresses, key, verbose=verbose)

    # Use the configuration to determine which task to run
    task_indices = driver_client.task_host_hash_indices(host_hash)
    task_index = task_indices[local_rank]
    task_addresses = driver_client.all_task_addresses(task_index)

    # Obtain the task handle
    task_client = task_service.SparkTaskClient(task_index, task_addresses, key, verbose=verbose)
    task_client.stream_command_output(stdout, stderr)

    # Send the command, i.e. python -m horovod.spark.task.gloo_exec_fn
    task_client.run_command(command, env,
                            capture_stdout=stdout is not None,
                            capture_stderr=stderr is not None,
                            prefix_output_with_timestamp=prefix_output_with_timestamp)

    if not background:
        events = events or []
        stop = threading.Event()
        for event in events:
            on_event(event, task_client.abort_command, stop=stop)

        try:
            exit_code = task_client.wait_for_command_exit_code()
            return exit_code
        except:
            traceback.print_exc()
            return -1
        finally:
            stop.set()
The logic is therefore as follows; in the end, python -m horovod.spark.task.gloo_exec_fn is run on the Spark Executor:
 Horovod Job                                                                                  +    Spark Host
                                                                                              |
 SparkDriverService              horovod.spark.run                                            |    SparkTaskService
        +                                +                                                    |          +
        |                                |                                                    |          |
        |                                v                                                    |          |
        |    launch_gloo( command = 'python', '-m', 'horovod.spark.task.gloo_exec_fn',        |          |
        |                 exec_command = rsh xxxx)                                            |          |
        |                                |                                                    |          |
        |                                v                                                    |          |
        |                        RendezvousServer                                             |          |
        |                                |                                                    |          |
        |                                |  get_run_command                                   |          |
        |                                v                                                    |          |
        |    run_command = HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222        |
        |                  HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo ......           |          |
        |                  python -m horovod.spark.task.gloo_exec_fn                          |          |
        |    exec_command = rsh xxxx                                                          |          |
        |                                |                                                    |          |
        |                                |  _slot_info_to_command_fn                          |          |
        |                                v                                                    |          |
        |    slot_info_to_command = rank=0,local_rank=0,socket-ifname=eth0,cpu_operations=gloo......     |
        |                  HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222        |
        |                  HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo ......           |          |
        |                  python -m horovod.spark.task.gloo_exec_fn                          |          |
        |    exec_command = rsh xxxx                                                          |          |
        |                                |                                                    |          |
        |                                v                                                    |          |
        |              threads.execute_function_multithreaded                                 |          |
        |                                |                                                    |          |
        |                                v                                                    |          |
        |                               rsh                                                   |          |
        | <----------------------------- +                                                    |          |
        |     task_host_hash_indices     |                                                    |          |
        | <----------------------------- +                                                    |          |
        |     all_task_addresses         |      run_command(command, env)                     |          |
        |                                |      RunCommandRequest                             |          |
        |                                + ------------------------------------------------------------> |
        |                                |                                                    |     run command
        v                                v                                                    |          v
3.4 gloo_exec_fn
Note that at this point we are already inside the Executor on the Spark host.
gloo_exec_fn corresponds to mpirun_exec_fn in the MPI version described earlier.
The Spark Executor runs horovod.spark.task.gloo_exec_fn.
The contents of horovod.spark.task.gloo_exec_fn are as follows:
import sys

from horovod.spark.task import task_exec
from horovod.runner.common.util import codec


def main(driver_addresses, settings):
    task_exec(driver_addresses, settings, 'HOROVOD_RANK', 'HOROVOD_LOCAL_RANK')


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print('Usage: %s <driver addresses> <settings>' % sys.argv[0])
        sys.exit(1)
    main(codec.loads_base64(sys.argv[1]), codec.loads_base64(sys.argv[2]))
0x04 Stage 5: Run user code
The task_exec function runs user code to train.
Task_exec is located in: horovod/spark/task/__init__.py
Specifically, it will:
- Call SparkDriverClient to obtain the task information;
- Call SparkDriverClient to obtain the user code;
- Execute the user code, and so on.
def task_exec(driver_addresses, settings, rank_env, local_rank_env):
    # Die if parent process terminates
    in_thread(target=_parent_process_monitor, args=(os.getppid(),))

    key = codec.loads_base64(os.environ[secret.HOROVOD_SECRET_KEY])
    rank = int(os.environ[rank_env])
    local_rank = int(os.environ[local_rank_env])
    driver_client = driver_service.SparkDriverClient(driver_addresses, key,
                                                     verbose=settings.verbose)

    # tell driver about local rank and rank
    # in elastic mode the driver already knows this mapping
    # for simplicity we keep code paths the same for elastic and static mode
    host_hash = os.environ['HOROVOD_HOSTNAME']
    task_index = driver_client.set_local_rank_to_rank(host_hash, local_rank, rank)

    # gather available resources from task service
    task_addresses = driver_client.all_task_addresses(task_index)
    task_client = task_service.SparkTaskClient(task_index, task_addresses, key,
                                               verbose=settings.verbose)
    task_info.set_resources(task_client.resources())

    # Fetch the user code from the driver, run it, and report the result
    fn, args, kwargs = driver_client.code()
    result = fn(*args, **kwargs)
    task_client.register_code_result(result)
The final overall logic is as follows:
 Horovod Job                                                                                  +    Spark Host
                                                                                              |
 SparkDriverService              horovod.spark.run                                            |    SparkTaskService
        +                                +                                                    |          +
        |                                |                                                    |          |
        |                                v                                                    |          |
        |    launch_gloo( command = 'python', '-m', 'horovod.spark.task.gloo_exec_fn',        |          |
        |                 exec_command = rsh xxxx)                                            |          |
        |                                |                                                    |          |
        |                                v                                                    |          |
        |                        RendezvousServer                                             |          |
        |                                |                                                    |          |
        |                                |  get_run_command                                   |          |
        |                                v                                                    |          |
        |    run_command = rendezvous_addr, rendezvous_port python -m horovod.spark.task.gloo_exec_fn    |
        |    exec_command = rsh xxxx                                                          |          |
        |                                |                                                    |          |
        |                                |  _slot_info_to_command_fn                          |          |
        |                                v                                                    |          |
        |    slot_info_to_command = rank=0,local_rank=0,socket-ifname=eth0,cpu_operations=gloo......     |
        |                  rendezvous_addr, rendezvous_port python -m horovod.spark.task.gloo_exec_fn    |
        |    exec_command = rsh xxxx                                                          |          |
        |                                |                                                    |          |
        |                                v                                                    |          |
        |              threads.execute_function_multithreaded                                 |          |
        |                                |                                                    |          |
        |                                v                                                    |          |
        |                               rsh                                                   |          |
        | <----------------------------- +                                                    |          |
        |     task_host_hash_indices     |                                                    |          |
        | <----------------------------- +                                                    |          |
        |     all_task_addresses         |      run_command(command, env)                     |          |
        |                                |      RunCommandRequest                             |          |
        |                                + ------------------------------------------------------------> |
        |                                |                                                    |     run command
        |                                |                                                    |          |
        |                                |                                                    |    gloo_exec_fn
        |                                |                                                    |          |
        |                                |                                                    |     task_exec
        |                                |       code()                                       |          |
        | <---------------------------------------------------------------------------------------------+
        |                                |                                                    |          |
        + -----------------------------------------------------------------------------------------------> 
        |                                |       code of the user function                    |          |
        v                                v                                                    |          v
0x05 Summary
In normal mode, the Gloo scheme will:
- Create a RendezvousServer with a KVStore, in which the driver stores information such as the IP of the workers involved in the communication into the KVStore.
- The worker can then call gloo to access the RendezvousServer to construct the communication ring.
In Horovod on Spark via GLOO, the key points are:
- How is the RendezvousServer constructed? How does the RendezvousServer know each Executor's (SparkTaskService's) IP:port?
- The answer is:
  - The RendezvousServer is created in the Horovod driver;
  - Before the initialization process, each SparkTaskService registers itself with the Horovod Driver via driver_service.SparkDriverClient.register_task; this is how the RendezvousServer learns each SparkTaskService's IP:PORT.
- How does the SparkTaskService on each Executor communicate with the RendezvousServer to learn its own network information and that of its neighbors?
- The answer is:
  - Inside execute_function_multithreaded, rsh is invoked, which ultimately interacts with the Spark Executor;
  - When rsh is called, information similar to HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo is passed along, including the RendezvousServer's address, so the SparkTaskService in the Spark Executor knows how to find the RendezvousServer, and Gloo in turn knows how to build the ring.
At this point, the analysis of Horovod on Spark is complete.
0xEE Personal Information
Thoughts on life and technology.
WeChat public account: Rosie's Thinking
If you want timely updates on my articles, or to see the technical material I recommend, please follow this account.