-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathtoy_paxos.py
581 lines (507 loc) · 23.5 KB
/
toy_paxos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
# Naive implementation of the Paxos protocol.
# Henry Robinson, 2009
# Licensed under GPL v2
# TODO:
# 1. Protocols log their state to persistent storage, and recover, rather than the fakery that's there at the moment.
# 2. No way for clients to know who the primary is
# 3. A leader that has failed and then wakes up will have no idea what the highest committed instance is. This can be helped, but the code is
# complex enough and it isn't so much an error condition as a pain.
# 4. Exercise for the reader: notify the client about the result of its request.
# 5. Garbage collect unneeded proposals, and re-propose those that seem to have stalled. Easy to do this from recvMessage.
# The idea of this Paxos implementation is to come to agreement on a
# history of values (which may represent commands in a state machine).
# We have two main players: PaxosLeader and PaxosAcceptor. PaxosLeader listens for proposals from external clients
# and runs the protocol with whichever PaxosAcceptors are currently correct.
# If a leader fails, another leader will take over once it realises. If acceptors fail (the demo at the bottom
# calls them 'clients'), the protocol will still run until more than half have failed.
# This is designed to run all on one machine: each actor has a port number, but all are bound to localhost. Would be reasonably
# trivial to generalise this.
# See bottom of file for demonstration of use.
import threading, socket, pickle, Queue
class Message( object ):
    """A protocol message, pickled and sent as one UDP datagram.

    Only ``command`` is set by the constructor. Routing fields (``to``,
    ``source``) and payload fields (``proposalID``, ``instanceID``,
    ``value``, ...) are attached as plain attributes by whoever builds
    the message.
    """
    # Replies produced by acceptors and handled by leaders
    MSG_ACCEPTOR_AGREE = 0
    MSG_ACCEPTOR_ACCEPT = 1
    MSG_ACCEPTOR_REJECT = 2
    MSG_ACCEPTOR_UNACCEPT = 3
    # Requests sent by a leader to the acceptors
    MSG_ACCEPT = 4
    MSG_PROPOSE = 5
    # Client -> leader request, and leader <-> leader liveness signal
    MSG_EXT_PROPOSE = 6
    MSG_HEARTBEAT = 7

    def __init__( self, command = None ):
        self.command = command

    def copyAsReply( self, message ):
        """Turn this message into a reply to *message*.

        Proposal ID, instance ID and value are copied verbatim; ``to`` and
        ``source`` are swapped so the reply travels back to the sender.
        """
        # Read every field first, then write, so the swap is safe.
        pid, iid = message.proposalID, message.instanceID
        replyTo, replyFrom = message.source, message.to
        self.proposalID = pid
        self.instanceID = iid
        self.to = replyTo
        self.source = replyFrom
        self.value = message.value
class MessagePump( threading.Thread ):
    """The MessagePump encapsulates the socket connection, and is responsible for feeding messages to its owner.

    The owner must provide ``recvMessage(message)``. It is called from this
    thread with each received message, or with None when nothing arrived
    within 3 seconds.
    """
    class MPHelper( threading.Thread ):
        """The reason for this helper class is to pull things off the socket as fast as we can, to avoid
        filling the buffer. It might have been easier to use TCP, in retrospect :)"""
        def __init__( self, owner ):
            self.owner = owner
            threading.Thread.__init__( self )

        def run( self ):
            while not self.owner.abort:
                try:
                    (data, addr) = self.owner.socket.recvfrom( 2048 )
                    # NOTE(review): pickle.loads on received datagrams is only tolerable
                    # because every peer is bound to localhost; never expose this port.
                    msg = pickle.loads( data )
                    # Stamp the sender's port: replies are addressed by port number.
                    msg.source = addr[1]
                except socket.timeout:
                    # Expected: the socket timeout lets us re-check the abort flag.
                    continue
                except Exception:
                    # Best effort: drop datagrams that fail to arrive or to unpickle
                    # (e.g. one longer than the 2048-byte read).
                    continue
                self.owner.queue.put( msg )

    def __init__( self, owner, port, timeout=2 ):
        self.owner = owner
        threading.Thread.__init__( self )
        self.abort = False
        # Socket read timeout in seconds; bounds how long MPHelper takes to notice abort.
        self.timeout = timeout
        self.port = port
        self.socket = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
        # Large receive buffer so bursts of datagrams are less likely to be dropped.
        self.socket.setsockopt( socket.SOL_SOCKET, socket.SO_RCVBUF, 200000 )
        self.socket.bind( ("localhost", port) )
        self.socket.settimeout( timeout )
        self.queue = Queue.Queue( )
        self.helper = MessagePump.MPHelper( self )

    def run( self ):
        self.helper.start( )
        while not self.abort:
            message = self.waitForMessage( )
            # This needs to be blocking, otherwise there's a world
            # of multi-threaded pain awaiting us
            self.owner.recvMessage( message )

    def waitForMessage( self ):
        """Return the next queued message, or None if none arrives within 3 seconds."""
        try:
            return self.queue.get( True, 3 )
        except Queue.Empty:
            return None

    def sendMessage( self, message ):
        """Pickle *message* and send it to localhost:message.to. Always returns True."""
        data = pickle.dumps( message )
        self.socket.sendto( data, ("localhost", message.to) )
        return True

    def doAbort( self ):
        """Ask both pump threads to exit at their next timeout."""
        self.abort = True
import random
class AdversarialMessagePump( MessagePump ):
    """The adversarial message pump randomly delays messages and delivers them in arbitrary orders"""
    def __init__( self, owner, port, timeout=2 ):
        MessagePump.__init__( self, owner, port, timeout )
        # Messages taken off the queue but not yet handed to the owner.
        self.messages = set( )

    def waitForMessage( self ):
        """Buffer at most one newly queued message, then deliver a random buffered one.

        Returns None when nothing is buffered, and also with probability 0.05
        when something is, so buffered messages are delayed and reordered.
        """
        try:
            self.messages.add( self.queue.get( True, 0.1 ) )
        except Queue.Empty:
            pass
        if self.messages and random.random( ) < 0.95: # Arbitrary!
            msg = random.choice( list( self.messages ) )
            self.messages.remove( msg )
            return msg
        return None
class InstanceRecord( object ):
    """This is a bookkeeping class, which keeps a record of all proposals we've seen or undertaken for a given instance,
    both on the acceptor and the leader"""
    def __init__( self ):
        # proposalID -> protocol object (leader- or acceptor-side)
        self.protocols = {}
        # Highest proposal ID added so far, ordered by count first, then port.
        self.highestID = (-1,-1)
        # Value recorded for this instance once it has been accepted.
        self.value = None
        # Set to True by the leader when a quorum accepts; initialised here so
        # every record carries the attribute.
        self.accepted = False

    def addProtocol( self, protocol ):
        """Register *protocol* under its (port, count) proposal ID and track the highest ID."""
        self.protocols[ protocol.proposalID ] = protocol
        pid, high = protocol.proposalID, self.highestID
        # Proposal IDs are (port, count); compare count first and break ties on port.
        if (pid[1], pid[0]) > (high[1], high[0]):
            self.highestID = protocol.proposalID

    def getProtocol( self, protocolID ):
        """Return the protocol for *protocolID*; raises KeyError if unknown."""
        return self.protocols[ protocolID ]

    def cleanProtocols( self ):
        """Forget leader protocols that have reached the ACCEPTED state."""
        # Iterate over a snapshot: entries are deleted inside the loop, which is
        # only safe on a live dict view under Python 2's list-returning keys().
        for k, protocol in list( self.protocols.items( ) ):
            if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
                print( "Deleting protocol" )
                del self.protocols[k]
class PaxosLeader( object ):
    """Proposer and learner for a sequence of Paxos instances.

    Only the primary leader turns external proposals (MSG_EXT_PROPOSE) into
    new instances. Every leader listens in on MSG_ACCEPTOR_ACCEPT replies, so
    it learns the committed history as well. Instance IDs are allocated from
    0 upwards.
    """
    def __init__(self, port, leaders=None, acceptors=None):
        self.port = port
        # Ports of the other leaders (heartbeat peers) and of the acceptors.
        self.leaders = [] if leaders is None else leaders
        self.acceptors = [] if acceptors is None else acceptors
        self.group = self.leaders + self.acceptors
        self.isPrimary = False
        # Second component of every (port, count) proposal ID this leader issues.
        self.proposalCount = 0
        self.msgPump = MessagePump( self, port )
        # instanceID -> InstanceRecord
        self.instances = {}
        self.hbListener = PaxosLeader.HeartbeatListener( self )
        self.hbSender = PaxosLeader.HeartbeatSender( self )
        # Highest instance proposed or learnt; -1 means none yet.
        self.highestInstance = -1
        # recvMessage ignores everything until start() is called.
        self.stopped = True
        # The last time we tried to fix up any gaps
        self.lasttime = time.time( )

    #------------------------------------------------------
    # These two classes listen for heartbeats from other leaders
    # and, if none appear, tell this leader that it should
    # be the primary
    class HeartbeatListener( threading.Thread ):
        """Decides primacy from the heartbeats handed over via newHB()."""
        def __init__( self, leader ):
            self.leader = leader
            self.queue = Queue.Queue( )
            self.abort = False
            threading.Thread.__init__( self )

        def newHB( self, message ):
            self.queue.put( message )

        def doAbort( self ): self.abort = True

        def run( self ):
            while not self.abort:
                try:
                    hb = self.queue.get( True, 2 )
                except Queue.Empty:
                    # No heartbeat for 2s: assume there is no live primary and take over.
                    self.leader.setPrimary( True )
                    continue
                # Easy way to settle conflicts - if your port number is bigger than mine,
                # you get to be the leader
                if hb.source > self.leader.port:
                    self.leader.setPrimary( False )

    class HeartbeatSender( threading.Thread ):
        """Once a second, while the leader is primary, sends MSG_HEARTBEAT to every other leader."""
        def __init__( self, leader ):
            self.leader = leader
            self.abort = False
            threading.Thread.__init__( self )

        def doAbort( self ): self.abort = True

        def run( self ):
            while not self.abort:
                time.sleep( 1 )
                if self.leader.isPrimary:
                    msg = Message( Message.MSG_HEARTBEAT )
                    msg.source = self.leader.port
                    for l in self.leader.leaders:
                        msg.to = l
                        self.leader.sendMessage( msg )
    #------------------------------------------------------

    def sendMessage( self, message ):
        self.msgPump.sendMessage( message )

    def start( self ):
        self.hbSender.start( )
        self.hbListener.start( )
        self.msgPump.start( )
        self.stopped = False

    def stop( self ):
        self.hbSender.doAbort( )
        self.hbListener.doAbort( )
        self.msgPump.doAbort( )
        self.stopped = True

    def setPrimary( self, primary ):
        """Record whether this leader is the primary, printing only on a change."""
        if self.isPrimary != primary:
            # Only print if something's changed
            if primary:
                print( "I (%s) am the leader" % self.port )
            else:
                print( "I (%s) am NOT the leader" % self.port )
        self.isPrimary = primary

    #------------------------------------------------------
    def getGroup( self ):
        return self.group

    def getLeaders( self ):
        return self.leaders

    def getAcceptors( self ):
        return self.acceptors

    def getQuorumSize( self ):
        """Smallest strict majority of the acceptors (integer division on Python 2 and 3)."""
        return len( self.getAcceptors( ) ) // 2 + 1

    def getInstanceValue( self, instanceID ):
        """Committed value of *instanceID*, or None if unknown or not yet committed."""
        if instanceID in self.instances:
            return self.instances[ instanceID ].value
        return None

    def getHistory( self ):
        """Values of instances 0..highestInstance, with None for gaps."""
        # Instance IDs start at 0 (see newProposal), so instance 0 is included.
        return [ self.getInstanceValue( i ) for i in range( 0, self.highestInstance + 1 ) ]

    def getNumAccepted( self ):
        """Number of instances in the history that have a committed value."""
        return len( [ v for v in self.getHistory( ) if v is not None ] )

    #------------------------------------------------------
    def findAndFillGaps( self ):
        """Re-propose every uncommitted instance below highestInstance."""
        # if no message is received, we take the chance to do a little cleanup
        for i in range( 0, self.highestInstance ):
            if self.getInstanceValue( i ) is None:
                print( "Filling in gap %s" % i )
                self.newProposal( 0, i ) # This will either eventually commit an already accepted value, or fill in the gap with 0 or no-op
        self.lasttime = time.time( )

    def garbageCollect( self ):
        for i in self.instances:
            self.instances[i].cleanProtocols( )

    def recvMessage( self, message ):
        """Message pump will call this periodically, even if there's no message available"""
        if self.stopped: return
        if message is None:
            # Only run every 15s otherwise you run the risk of cutting good protocols off in their prime :(
            if self.isPrimary and time.time( ) - self.lasttime > 15.0:
                self.findAndFillGaps( )
                self.garbageCollect( )
            return
        if message.command == Message.MSG_HEARTBEAT:
            self.hbListener.newHB( message )
            return True
        if message.command == Message.MSG_EXT_PROPOSE:
            print( "External proposal received at %s %s" % (self.port, self.highestInstance) )
            if self.isPrimary:
                self.newProposal( message.value )
            # else ignore - we're getting proposals when we're not the primary
            # what we should do, if we were being kind, is reply with a message saying 'leader has changed'
            # and giving the address of the new one. However, we might just as well have failed.
            return True
        if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:
            record = self.instances.get( message.instanceID )
            # A reply can outlive its protocol (e.g. after garbageCollect). Ignore it:
            # raising KeyError here would propagate out of recvMessage.
            if record is not None and message.proposalID in record.protocols:
                record.getProtocol( message.proposalID ).doTransition( message )
        # It's possible that, while we still think we're the primary, we'll get a
        # accept message that we're only listening in on.
        # We are interested in hearing all accepts, so we play along by pretending we've got the protocol
        # that's getting accepted and listening for a quorum as per usual
        if message.command == Message.MSG_ACCEPTOR_ACCEPT:
            record = self.instances.get( message.instanceID )
            if record is None:
                record = InstanceRecord( )
                self.instances[ message.instanceID ] = record
            if message.proposalID not in record.protocols:
                protocol = PaxosLeaderProtocol( self )
                # We just massage this protocol to be in the waiting-for-accept state
                protocol.state = PaxosLeaderProtocol.STATE_AGREED
                protocol.proposalID = message.proposalID
                protocol.instanceID = message.instanceID
                protocol.value = message.value
                record.addProtocol( protocol )
            else:
                protocol = record.getProtocol( message.proposalID )
            # Should just fall through to here if we initiated this protocol instance
            protocol.doTransition( message )
        return True

    def newProposal( self, value, instance = None ):
        """Start a new proposal of *value*.

        The proposal goes to *instance* when given, otherwise to a freshly
        allocated instance. Its ID is (port, proposalCount).
        """
        protocol = PaxosLeaderProtocol( self )
        if instance is None:
            self.highestInstance += 1
            instanceID = self.highestInstance
        else:
            instanceID = instance
        self.proposalCount += 1
        proposalID = (self.port, self.proposalCount)
        record = self.instances.get( instanceID )
        if record is None:
            record = InstanceRecord( )
            self.instances[ instanceID ] = record
        # propose() sets protocol.proposalID, which addProtocol keys on.
        protocol.propose( value, proposalID, instanceID )
        record.addProtocol( protocol )

    def notifyLeader( self, protocol, message ):
        # Protocols call this when they're done
        if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
            print( "Protocol instance %s accepted with value %s" % (message.instanceID, message.value) )
            self.instances[ message.instanceID ].accepted = True
            self.instances[ message.instanceID ].value = message.value
            self.highestInstance = max( message.instanceID, self.highestInstance )
            return
        if protocol.state == PaxosLeaderProtocol.STATE_REJECTED:
            # Look at the message to find the value, and then retry
            # Eventually, assuming that the acceptors will accept some value for
            # this instance, the protocol will complete.
            # Bump our counter past the acceptor's highest promised ID so the retry can win.
            self.proposalCount = max( self.proposalCount, message.highestPID[1] )
            self.newProposal( message.value )
            return True
        if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:
            pass
class PaxosLeaderProtocol( object ):
    """A leader's view of a single proposal: one proposal ID for one instance.

    Acceptor replies drive this state machine:
    - PROPOSED -> AGREED once a quorum agrees; MSG_ACCEPT is then sent to
      every acceptor.
    - PROPOSED -> REJECTED once a quorum rejects.
    - AGREED -> ACCEPTED or UNACCEPTED once a quorum answers the accept.
    Each of these transitions is reported via leader.notifyLeader.
    """
    # State variables
    STATE_UNDEFINED = -1
    STATE_PROPOSED = 0
    STATE_AGREED = 1
    STATE_REJECTED = 2
    STATE_ACCEPTED = 3
    STATE_UNACCEPTED = 4

    def __init__( self, leader ):
        self.leader = leader
        self.state = PaxosLeaderProtocol.STATE_UNDEFINED
        # Proposal IDs are (port, count) pairs.
        self.proposalID = (-1,-1)
        self.agreecount, self.acceptcount = (0,0)
        self.rejectcount, self.unacceptcount = (0,0)
        self.instanceID = -1
        # Sequence attached to the agree reply whose value we adopted, if any.
        self.highestseen = (0,0)

    def propose( self, value, pID, instanceID ):
        """Send MSG_PROPOSE(pID, instanceID, value) to every acceptor and return pID."""
        self.proposalID = pID
        self.value = value
        self.instanceID = instanceID
        message = Message( Message.MSG_PROPOSE )
        message.proposalID = pID
        message.instanceID = instanceID
        message.value = value
        for server in self.leader.getAcceptors( ):
            message.to = server
            self.leader.sendMessage( message )
        self.state = PaxosLeaderProtocol.STATE_PROPOSED
        return self.proposalID

    def _adoptReportedValue( self, message ):
        """Adopt an agree reply's value if its sequence is the highest seen so far."""
        if message.value is None:
            # This acceptor holds no value for the instance, so it does not constrain us.
            return
        seq, seen = message.sequence, self.highestseen
        # (port, count) pairs: order by count first, then port, as InstanceRecord does.
        if (seq[1], seq[0]) > (seen[1], seen[0]):
            self.value = message.value
            self.highestseen = message.sequence

    def doTransition( self, message ):
        """We run the protocol like a simple state machine. It's not always
        okay to error on unexpected inputs, however, due to message delays, so we silently
        ignore inputs that we're not expecting."""
        if self.state == PaxosLeaderProtocol.STATE_PROPOSED:
            if message.command == Message.MSG_ACCEPTOR_AGREE:
                self.agreecount += 1
                # Paxos requires considering the value in *every* agree reply of the
                # quorum, not just the one that completes it.
                self._adoptReportedValue( message )
                if self.agreecount >= self.leader.getQuorumSize( ):
                    self.state = PaxosLeaderProtocol.STATE_AGREED
                    # Send 'accept' message to group
                    msg = Message( Message.MSG_ACCEPT )
                    msg.copyAsReply( message )
                    msg.value = self.value
                    # copyAsReply set source to the agree reply's recipient, i.e. this
                    # leader (informational; nothing in this file reads leaderID).
                    msg.leaderID = msg.source
                    for s in self.leader.getAcceptors( ):
                        msg.to = s
                        self.leader.sendMessage( msg )
                    self.leader.notifyLeader( self, message )
                return True
            if message.command == Message.MSG_ACCEPTOR_REJECT:
                self.rejectcount += 1
                if self.rejectcount >= self.leader.getQuorumSize( ):
                    self.state = PaxosLeaderProtocol.STATE_REJECTED
                    self.leader.notifyLeader( self, message )
                return True
        if self.state == PaxosLeaderProtocol.STATE_AGREED:
            if message.command == Message.MSG_ACCEPTOR_ACCEPT:
                self.acceptcount += 1
                if self.acceptcount >= self.leader.getQuorumSize( ):
                    self.state = PaxosLeaderProtocol.STATE_ACCEPTED
                    self.leader.notifyLeader( self, message )
            if message.command == Message.MSG_ACCEPTOR_UNACCEPT:
                self.unacceptcount += 1
                if self.unacceptcount >= self.leader.getQuorumSize( ):
                    self.state = PaxosLeaderProtocol.STATE_UNACCEPTED
                    self.leader.notifyLeader( self, message )
class PaxosAcceptor( object ):
    """An acceptor: answers leaders' proposals and keeps, per instance, the
    proposals it has seen and the value it has accepted."""
    def __init__(self, port,leaders ):
        self.port = port
        # Leader ports that are told about every accept.
        self.leaders = leaders
        # instanceID -> InstanceRecord
        self.instances = {}
        self.msgPump = MessagePump( self, self.port )
        # While failed, every incoming message is dropped (simulated crash).
        self.failed = False

    def start( self ):
        self.msgPump.start( )

    def stop( self ):
        self.msgPump.doAbort( )

    def fail( self ):
        self.failed = True

    def recover( self ):
        self.failed = False

    def sendMessage( self, message ):
        self.msgPump.sendMessage( message )

    def recvMessage( self, message ):
        """Handle one message from the pump; None means the pump timed out idle."""
        if message is None or self.failed:
            return # Failure means ignored and lost messages
        if message.command == Message.MSG_PROPOSE:
            record = self.instances.get( message.instanceID )
            if record is None:
                record = InstanceRecord( )
                self.instances[ message.instanceID ] = record
            protocol = PaxosAcceptorProtocol( self )
            # recvProposal reads the record's highest ID, so register afterwards.
            protocol.recvProposal( message )
            record.addProtocol( protocol )
            return
        # Any other message must belong to a proposal we saw. If the PROPOSE was
        # lost or arrived while failed, ignore it: a KeyError here would propagate
        # into the message pump thread.
        record = self.instances.get( message.instanceID )
        if record is None or message.proposalID not in record.protocols:
            return
        record.getProtocol( message.proposalID ).doTransition( message )

    def notifyClient( self, protocol, message ):
        """Record the accepted value once a protocol reaches PROPOSAL_ACCEPTED."""
        if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED:
            self.instances[ protocol.instanceID ].value = message.value
            # print "Proposal accepted at client: ", message.value

    def getHighestAgreedProposal( self, instance ):
        return self.instances[ instance ].highestID

    def getInstanceValue( self, instance ):
        return self.instances[ instance ].value
class PaxosAcceptorProtocol( object ):
    """An acceptor's view of a single proposal.

    The acceptor either agrees to the proposal or rejects it, and later
    accepts it when the leader sends MSG_ACCEPT.
    """
    # State variables
    STATE_UNDEFINED = -1
    STATE_PROPOSAL_RECEIVED = 0
    STATE_PROPOSAL_REJECTED = 1
    STATE_PROPOSAL_AGREED = 2
    STATE_PROPOSAL_ACCEPTED = 3
    STATE_PROPOSAL_UNACCEPTED = 4

    def __init__( self, client ):
        # The owning PaxosAcceptor.
        self.client = client
        self.state = PaxosAcceptorProtocol.STATE_UNDEFINED

    def recvProposal( self, message ):
        """Agree to or reject a MSG_PROPOSE, replying to the proposer.

        Returns the proposal ID, or None for a non-proposal message.
        """
        if message.command != Message.MSG_PROPOSE:
            # error, trying to receive a non-proposal?
            return None
        self.proposalID = message.proposalID
        self.instanceID = message.instanceID
        # What's the highest already agreed proposal for this instance?
        (port, count) = self.client.getHighestAgreedProposal( message.instanceID )
        (newPort, newCount) = self.proposalID
        # Check if this proposal is numbered higher: IDs are (port, count) and are
        # ordered by count first, then port, exactly as InstanceRecord orders them.
        if count < newCount or (count == newCount and port < newPort):
            # Send agreed message back, with highest accepted value (if it exists)
            self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED
            msg = Message( Message.MSG_ACCEPTOR_AGREE )
            msg.copyAsReply( message )
            msg.value = self.client.getInstanceValue( message.instanceID )
            msg.sequence = (port, count)
            self.client.sendMessage( msg )
        else:
            # Too late, we already told someone else we'd do it.
            # Reply with a reject carrying our highest proposal ID, so the leader can
            # raise its counter. The value is the rejected proposal's own (from
            # copyAsReply), which the leader re-proposes.
            self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED
            msg = Message( Message.MSG_ACCEPTOR_REJECT )
            msg.copyAsReply( message )
            msg.highestPID = (port, count)
            self.client.sendMessage( msg )
        return self.proposalID

    def doTransition( self, message ):
        """Handle a follow-up message for this proposal.

        Returns True if a MSG_ACCEPT was honoured, False if the message was
        ignored. Unexpected inputs are ignored rather than raised, because
        an exception here would propagate into the acceptor's message pump.
        """
        if self.state != PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED:
            # e.g. MSG_ACCEPT for a proposal this acceptor rejected.
            return False
        if message.command != Message.MSG_ACCEPT:
            return False
        if self.client.getHighestAgreedProposal( self.instanceID ) != self.proposalID:
            # We have since promised a higher-numbered proposal for this instance;
            # Paxos forbids accepting this lower one.
            return False
        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED
        # Could check on the value here, if we don't trust leaders to honour what we tell them
        # send reply to leader acknowledging
        msg = Message( Message.MSG_ACCEPTOR_ACCEPT )
        msg.copyAsReply( message )
        for l in self.client.leaders:
            msg.to = l
            self.client.sendMessage( msg )
        self.notifyClient( message )
        return True

    def notifyClient( self, message ):
        self.client.notifyClient( self, message )
import time
if __name__ == '__main__':
numclients = 5
clients = [ PaxosAcceptor( port, [54321,54322] ) for port in xrange( 64320, 64320+numclients ) ]
leader = PaxosLeader( 54321, [54322], [c.port for c in clients] )
leader2 = PaxosLeader( 54322, [54321], [c.port for c in clients] )
leader.start( )
leader.setPrimary( True )
leader2.setPrimary( True )
leader2.start( )
for c in clients:
c.start( )
clients[0].fail( )
clients[1].fail( )
# clients[2].fail( )
# Send some proposals through to test
s = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
start = time.time( )
for i in xrange(1000):
m = Message( Message.MSG_EXT_PROPOSE )
m.value = 0 + i
m.to = 54322
bytes = pickle.dumps( m )
s.sendto( bytes, ("localhost", m.to) )
while leader2.getNumAccepted( ) < 999:
print "Sleeping for 1s -- accepted:", leader2.getNumAccepted( )
time.sleep( 1 )
end = time.time( )
print "Sleeping for 10s"
time.sleep( 10 )
print "Stopping leaders"
leader.stop( )
leader2.stop( )
print "Stopping clients"
for c in clients:
c.stop( )
print "Leader 1 history: ", leader.getHistory( )
print "Leader 2 history: ", leader2.getHistory( )
print end - start