process_block.py
import time
import numpy as np
import os
import hashlib
import mpi_learn.mpi.manager as mm
import mpi_learn.train.model as model
from tag_lookup import tag_lookup


class ProcessBlock(object):
    """
    This class represents a block of processes that run model training together.

    Attributes:
        comm_world: MPI communicator with all processes.
            Used to communicate with process 0, the coordinator.
        comm_block: MPI communicator with the processes in this block.
            Rank 0 is the master, the other ranks are workers.
        algo: MPI Algo object
        data: MPI Data object
        device: string indicating which device (cpu or gpu) should be used
        model_provider: object whose builder() turns a received parameter set
            into a model builder to train
        epochs: number of training epochs
        train_list: list of training data files
        val_list: list of validation data files
        folds: number of folds for k-fold cross validation
        num_masters, num_process: process counts forwarded to MPIKFoldManager
        verbose: print detailed output from underlying mpi_learn machinery
        early_stopping, target_metric, monitor: training options forwarded
            to MPIKFoldManager
    """
def __init__(self, comm_world, comm_block, algo, data, device, model_provider,
epochs, train_list, val_list, folds=1,
num_masters=1,
num_process=1,
verbose=False,
early_stopping=None,
target_metric=None,
monitor=False):
print("Initializing ProcessBlock")
self.comm_world = comm_world
self.comm_block = comm_block
self.folds = folds
self.num_masters = num_masters
self.num_process = num_process
self.algo = algo
self.data = data
self.device = device
self.model_provider = model_provider
self.epochs = epochs
self.train_list = train_list
self.val_list = val_list
self.verbose = verbose
self.last_params = None
        self.early_stopping = early_stopping
        self.target_metric = target_metric
self.monitor = monitor
self.label = None
self.current_builder = None

    def ranks(self):
        """Return a string identifying this process in comm_world and comm_block."""
        return "Process {}, sub-process {}".format(self.comm_world.Get_rank(), self.comm_block.Get_rank())

    def wait_for_model(self):
        """
        Block until the coordinator sends a parameter set describing
        the model that should be trained.

        Returns True if parameters were received, False if the coordinator
        sent None (the signal to exit).
        """
print("{} waiting for model params".format(self.ranks()))
self.last_params = self.comm_world.recv(source=0, tag=tag_lookup('params'))
params = self.last_params
if params is not None:
print ("{} received parameters {}".format( self.ranks(), params))
model_builder = self.model_provider.builder(*params)
if model_builder:
model_builder.comm = self.comm_block
model_builder.device = model_builder.get_device_name(self.device)
self.current_builder = model_builder
else:
self.current_builder = None
return True
return False
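
    # For orientation: the coordinator-side counterpart of this receive lives
    # outside this file. Assuming the coordinator uses plain mpi4py
    # point-to-point sends with the same tag table, it would look roughly like
    # the sketch below ('params' and 'block_rank' are illustrative
    # placeholders, not names taken from this repository):
    #
    #     comm_world.send(params, dest=block_rank, tag=tag_lookup('params'))
    #     # ... later, to tell the block to shut down:
    #     comm_world.send(None, dest=block_rank, tag=tag_lookup('params'))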

    def train_model(self):
        """Train the current model and return its figure of merit (NaN if invalid)."""
        if self.current_builder is None:
            # Invalid model, return a nonsense figure of merit
            return np.nan
        # Debugging switch: when True, skip real training and return a random result
        fake_train = False
if fake_train:
if self.comm_block.Get_rank() == 0:
time.sleep(abs(np.random.randn()*30))
result = np.random.randn()
print("{} finished training with result {}".format(self.ranks(), result))
return result
else:
print("{} creating MPIManager".format(self.ranks()))
            ## reset the algorithm so no state is cached from a previous model
self.algo.reset()
manager = mm.MPIKFoldManager( self.folds,
self.comm_block, self.data, self.algo, self.current_builder,
self.epochs, self.train_list, self.val_list,
num_masters=self.num_masters,
num_process=self.num_process,
verbose=self.verbose,
early_stopping=self.early_stopping,
target_metric=self.target_metric,
monitor=self.monitor)
manager.train()
fom = manager.figure_of_merit()
            manager.manager.process.record_details(
                json_name='block-{}-{}-{}-{}-history.json'.format(
                    self.label if self.label else "",
                    hashlib.md5(str(self.last_params).encode('utf-8')).hexdigest(),
                    self.comm_world.Get_rank(), os.getpid()),
                meta={'parameters': list(map(float, self.last_params)),
                      'fold': manager.fold_num})
manager.free_comms()
return fom

    def send_result(self, result):
        """Send the figure of merit back to the coordinator (block master only)."""
        if self.comm_block.Get_rank() == 0:
            ## only rank 0 in the block sends its figure of merit back
            print("{} sending result {} to coordinator".format(self.ranks(), result))
            self.comm_world.isend(result, dest=0, tag=tag_lookup('result'))

    def run(self):
        """
        Await instructions from the coordinator to train a model,
        then train it and send the resulting figure of merit back.
        Exits when the coordinator sends None instead of a parameter set.
        """
while True:
self.comm_block.Barrier()
print("{} waiting for model".format(self.ranks()))
have_builder = self.wait_for_model()
if not have_builder:
print("{} received exit signal from coordinator".format(self.ranks()))
break
print("{} will train model".format(self.ranks()))
fom = self.train_model()
print("{} will send result if needed".format(self.ranks()))
self.send_result(fom)
self.comm_world.Barrier()
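

# A minimal usage sketch, assuming mpi4py and that the coordinator occupies
# rank 0 of MPI.COMM_WORLD while the remaining ranks are grouped into blocks.
# The block index and the algo/data/model_provider/file-list objects below are
# hypothetical placeholders, not the actual driver code of this repository.
#
#     from mpi4py import MPI
#
#     comm_world = MPI.COMM_WORLD
#     # my_block_index: hypothetical mapping of this rank to a block
#     comm_block = comm_world.Split(color=my_block_index, key=comm_world.Get_rank())
#
#     block = ProcessBlock(comm_world, comm_block, algo, data, device='gpu',
#                          model_provider=model_provider, epochs=10,
#                          train_list=train_files, val_list=val_files)
#     block.run()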