Skip to content

Commit

Permalink
feat(spec): add timeout to Blocksync (#568)
Browse files Browse the repository at this point in the history
* start with timeout

* spec/quint: additional fix post merge

* spec/quint: re-tabbing client.qnt

* added timeoutactions and a test

* commented timeout test

* Update specs/quint/specs/blocksync/bsyncWithConsensus.qnt

* spec/quint: re-tabbing client.qnt

* spec/quint: re-tabbing syncStatemachine.qnt

* spec/quint: re-tabbing bsyncWithConsensus.qnt

---------

Co-authored-by: Daniel Cason <[email protected]>
  • Loading branch information
josef-widder and cason authored Nov 20, 2024
1 parent 6a744fb commit efa36c7
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 28 deletions.
25 changes: 24 additions & 1 deletion specs/quint/specs/blocksync/bsyncWithConsensus.qnt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ module bsyncWithConsensus {
syncUnchangedAll
}



/// main step function: either a consensus state-machine step or a block sync protocol step
action stepWithBlockSync = any {
// consensus takes a step
Expand Down Expand Up @@ -134,7 +136,7 @@ module bsyncWithConsensus {
.then(3.reps(_ => syncValStep("v4")))
.expect(system.get("v4").es.chain.length() > h)

run retreat =
run parisRetreat =
nondet heightToReach = 1.to(4).oneOf()
initHeight( q::debug ("Height:", heightToReach))
// FIXME: I had to put it here, instead of syncCycle(h)
Expand All @@ -145,4 +147,25 @@ module bsyncWithConsensus {
.then(newHeightActionAll("v4", validatorSet, heightToReach))
.expect(system.get("v4").es.cs.height == system.get("v2").es.cs.height)
// and now v4 has synced !

run LausanneRetreat =
initHeight(2)
// FIXME: I had to put it here, instead of syncCycle(h)
// This is not ideal, but it works.
.then(syncUpdateServer("v2"))
.then(newHeightActionAll("v4", validatorSet, 0))
.then(all{unchangedAll, syncStatusStep("v2")})
.then(all{unchangedAll, syncDeliverStatus("v4")})
.then(syncStepClient("v4", putSyncOutputIntoNode)) // ask for certificate
// request for certificate is sent to v2
.expect(requestsBuffer.get("v2").contains({ client: "v4", height: 0, rtype: SyncCertificate, server: "v2" }))
// v3 wakes up and sends it status to v4
.then(syncUpdateServer("v3"))
.then(all{unchangedAll, syncStatusStep("v3")})
.then(all{unchangedAll, syncDeliverStatus("v4")})
// v4's request to v2 times out...
.then(all{unchangedAll, syncClientTimeout("v4")})
// after handling the timeout a request for certificate is sent to v3
.expect(requestsBuffer.get("v3").contains({ client: "v4", height: 0, rtype: SyncCertificate, server: "v3" }))

}
58 changes: 54 additions & 4 deletions specs/quint/specs/blocksync/client.qnt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module bsync_client {
//

/// Initialize the synchronizer
pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient =
pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient =
{
id: id,
peerStatus: peers.mapBy(x => {base:-1, top:-1}),
Expand Down Expand Up @@ -65,8 +65,6 @@ module bsync_client {
req: Option[RequestMsg]
}



/// We have received a certificate. now we need to issue the
/// corresponding block request and generate a certificate output.
pure def syncHandleCertificate (s: BsyncClient, cert: Set[Vote], peer: str ) : ClientResult =
Expand All @@ -88,7 +86,7 @@ module bsync_client {
/// step of a client:
/// 1. update peer statuses, 2. if there is no open request, request something
/// 3. otherwise check whether we have a response and act accordingly
pure def bsyncClient (s: BsyncClient) : ClientResult =
pure def bsyncClientLogic (s: BsyncClient) : ClientResult =
val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs)
val newS = { ...s, peerStatus: newPeerStates}
if (s.lastSyncedHeight >= s.height)
Expand Down Expand Up @@ -130,6 +128,29 @@ module bsync_client {
so: SONoOutput,
req: None}

/// step of a client upon timeout: We send the same request to a different server
// FIXME: what to do if goodPeers is empty?
pure def bsyncClientTimeout(s: BsyncClient, toMsg: RequestMsg) : ClientResult =
val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs)

val goodPeers = s.peerStatus.keys().filter(p => newPeerStates.get(p).base <= s.height
and s.height <= newPeerStates.get(p).top ).
exclude(Set(toMsg.server))
if (goodPeers.size() > 0)
val req = {... toMsg, server: goodPeers.fold("", (acc, i) => i)}
val newS = { ...s, peerStatus: newPeerStates,
openRequests: s.openRequests.exclude(Set(toMsg))
.union(Set(req))}
{ sync: newS,
so: SONoOutput,
req: Some(req)}
else
{ sync: { ...s, peerStatus: newPeerStates,
openRequests: s.openRequests.exclude(Set(toMsg))},
so: SONoOutput,
req: None}


// State machine

var bsyncClients: Address -> BsyncClient
Expand Down Expand Up @@ -160,6 +181,7 @@ module bsync_client {
}
}


// deliver a response message, from responsesBuffer, to node
action deliverResponse(node) = all {
responsesBuffer.get(node).size() > 0,
Expand All @@ -184,4 +206,32 @@ module bsync_client {
responsesBuffer' = responsesBuffer,
}

// Client v takes a step
action clientLogic(v, outputAction) = all {
val result = bsyncClientLogic(bsyncClients.get(v))
all {
// updates the client's state
applyClientUpdate(v, result),
// This is the part that interacts with the consensus.
// It can be the driver or the mocked consensus logic.
outputAction(v, result.so),
}
}


action clientTimeout(node) = all {
bsyncClients.get(node).openRequests.size() > 0,
nondet toMsg = bsyncClients.get(node).openRequests.oneOf()
val result = bsyncClientTimeout(bsyncClients.get(node), toMsg)
all {
bsyncClients' = bsyncClients.put(node, result.sync),
requestsBuffer' = match result.req {
| Some(m) => requestsBuffer.put(m.server, requestsBuffer.get(m.server).union(Set(m)))
| None => requestsBuffer
},
},
statusBuffer' = statusBuffer,
responsesBuffer' = responsesBuffer,
}

}
41 changes: 18 additions & 23 deletions specs/quint/specs/blocksync/syncStatemachine.qnt
Original file line number Diff line number Diff line change
Expand Up @@ -66,53 +66,48 @@ module syncStatemachine {
// Implemented by the blocksync server or client.
//

// deliver a status message to client v
/// Deliver a status message to client v
action syncDeliverStatus(v) = all {
deliverStatus(v),
unchangedServer,
}

// deliver a response to client v
/// Deliver a response to client v
action syncDeliverResp(v) = all {
deliverResponse(v),
unchangedServer,
}

// deliver a request to server v
/// Deliver a request to server v
action syncDeliverReq(v) = all {
deliverRequest(v),
unchangedClient,
}

/// any blocksync step independent of consensus. unchange is an action
/// that can be used for composition
/// Client v runs a protocol step
action syncStepClient(v, out) = all {
clientLogic(v, out),
unchangedServer,
}

/// A client v's request times out
action syncClientTimeout(v) = all {
clientTimeout(v),
unchangedServer,
}

/// any blocksync step independent of consensus.
/// unchange is an action that can be used for composition
action pureSyncStep (v, unchange) = all {
any {
syncDeliverReq(v),
syncDeliverResp(v),
syncDeliverStatus(v),
syncStepServer(v),
syncStatusStep(v),
syncClientTimeout(v),
},
unchange
}


// Client v takes a step
action syncStepClient(v, outputAction) = all {
val result = bsyncClient(bsyncClients.get(v))
all {
// updates the client's state
applyClientUpdate(v, result),
// This is the part that interacts with the consensus.
// It can be the driver or the mocked consensus logic.
outputAction(v, result.so),
unchangedServer,
}
}





}

0 comments on commit efa36c7

Please sign in to comment.