Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spec): add timeout to Blocksync #568

Merged
merged 11 commits into from
Nov 20, 2024
27 changes: 26 additions & 1 deletion specs/quint/specs/blocksync/bsyncWithConsensus.qnt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
syncUnchangedAll
}



/// main step function: either a consensus state-machine step or a block sync protocol step
action stepWithBlockSync = any {
// consensus takes a step
Expand Down Expand Up @@ -134,7 +136,7 @@
.then(3.reps(_ => syncValStep("v4")))
.expect(system.get("v4").es.chain.length() > h)

run retreat =
run parisRetreat =
nondet heightToReach = 1.to(4).oneOf()
initHeight( q::debug ("Height:", heightToReach))
// FIXME: I had to put it here, instead of syncCycle(h)
Expand All @@ -145,4 +147,27 @@
.then(newHeightActionAll("v4", validatorSet, heightToReach))
.expect(system.get("v4").es.cs.height == system.get("v2").es.cs.height)
// and now v4 has synced !

run LausanneRetreat =
initHeight(2)
// FIXME: I had to put it here, instead of syncCycle(h)
// This is not ideal, but it works.
.then(syncUpdateServer("v2"))
.then(newHeightActionAll("v4", validatorSet, 0))
.then(all{unchangedAll, syncStatusStep("v2")})
.then(all{unchangedAll, syncDeliverStatus("v4")})
.then(syncStepClient("v4", putSyncOutputIntoNode)) // ask for certificate
// request for certificate is sent to v2
.expect(requestsBuffer.get("v2").contains({ client: "v4", height: 0, rtype: SyncCertificate, server: "v2" }))
// v3 wakes up and sends it startus to v4

Check warning on line 162 in specs/quint/specs/blocksync/bsyncWithConsensus.qnt

View workflow job for this annotation

GitHub Actions / Check spelling

"startus" should be "startups".
josef-widder marked this conversation as resolved.
Show resolved Hide resolved
.then(syncUpdateServer("v3"))
.then(all{unchangedAll, syncStatusStep("v3")})
.then(all{unchangedAll, syncDeliverStatus("v4")})
// v4's request to v2 times out...
.then(all{unchangedAll, syncClientTimeOut("v4")})
// after handling the timeout a request for certificate is sent to v3
.expect(requestsBuffer.get("v3").contains({ client: "v4", height: 0, rtype: SyncCertificate, server: "v3" }))



}
59 changes: 55 additions & 4 deletions specs/quint/specs/blocksync/client.qnt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module bsync_client {
//

/// Initialize the synchronizer
pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient =
pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient =
{
id: id,
peerStatus: peers.mapBy(x => {base:-1, top:-1}),
Expand Down Expand Up @@ -65,8 +65,6 @@ module bsync_client {
req: Option[RequestMsg]
}



/// We have received a certificate. now we need to issue the
/// corresponding block request and generate a certificate output.
pure def syncHandleCertificate (s: BsyncClient, cert: Set[Vote], peer: str ) : ClientResult =
Expand All @@ -88,7 +86,7 @@ module bsync_client {
/// step of a client:
/// 1. update peer statuses, 2. if there is no open request, request something
/// 3. otherwise check whether we have a response and act accordingly
pure def bsyncClient (s: BsyncClient) : ClientResult =
pure def bsyncClientLogic (s: BsyncClient) : ClientResult =
val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs)
val newS = { ...s, peerStatus: newPeerStates}
if (s.lastSyncedHeight >= s.height)
Expand Down Expand Up @@ -130,6 +128,29 @@ module bsync_client {
so: SONoOutput,
req: None}

/// step of a client upon timeout: We send the same request to a diffenerent server
// TODO: what to do if goodPeers is empty?
pure def bsyncClientTimeout(s: BsyncClient, toMsg: RequestMsg) : ClientResult =
val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs)

val goodPeers = s.peerStatus.keys().filter(p => newPeerStates.get(p).base <= s.height
and s.height <= newPeerStates.get(p).top ).
exclude(Set(toMsg.server))
if (goodPeers.size() > 0)
val req = {... toMsg, server: goodPeers.fold("", (acc, i) => i)}
val newS = { ...s, peerStatus: newPeerStates,
openRequests: s.openRequests.exclude(Set(toMsg))
.union(Set(req))}
{ sync: newS,
so: SONoOutput,
req: Some(req)}
else
{ sync: { ...s, peerStatus: newPeerStates,
openRequests: s.openRequests.exclude(Set(toMsg))},
so: SONoOutput,
req: None}


// State machine

var bsyncClients: Address -> BsyncClient
Expand Down Expand Up @@ -160,6 +181,7 @@ module bsync_client {
}
}


// deliver a response message, from responsesBuffer, to node
action deliverResponse(node) = all {
responsesBuffer.get(node).size() > 0,
Expand All @@ -184,4 +206,33 @@ module bsync_client {
responsesBuffer' = responsesBuffer,
}

// Client v takes a step
action clientLogic(v, outputAction) = all {
val result = bsyncClientLogic(bsyncClients.get(v))
all {
// updates the client's state
applyClientUpdate(v, result),
// This is the part that interacts with the consensus.
// It can be the driver or the mocked consensus logic.
outputAction(v, result.so),
}
}


action clientTimeout(node) = all {
bsyncClients.get(node).openRequests.size() > 0,
nondet toMsg = bsyncClients.get(node).openRequests.oneOf()
val result = bsyncClientTimeout(bsyncClients.get(node), toMsg)
all {
bsyncClients' = bsyncClients.put(node, result.sync),
requestsBuffer' = match result.req {
| Some(m) => requestsBuffer.put(m.server, requestsBuffer.get(m.server).union(Set(m)))
| None => requestsBuffer
},
},
statusBuffer' = statusBuffer,
responsesBuffer' = responsesBuffer,
}


}
23 changes: 11 additions & 12 deletions specs/quint/specs/blocksync/syncStatemachine.qnt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ module syncStatemachine {
unchangedClient,
}

action syncStepClient(v, out) = all {
clientLogic(v, out),
unchangedServer,
}

action syncClientTimeOut(v) = all {
clientTimeout(v),
unchangedServer,
}

/// any blocksync step independent of consensus. unchange is an action
/// that can be used for composition
action pureSyncStep (v, unchange) = all {
Expand All @@ -93,23 +103,12 @@ module syncStatemachine {
syncDeliverStatus(v),
syncStepServer(v),
syncStatusStep(v),
syncClientTimeOut(v),
},
unchange
}


// Client v takes a step
action syncStepClient(v, outputAction) = all {
val result = bsyncClient(bsyncClients.get(v))
all {
// updates the client's state
applyClientUpdate(v, result),
// This is the part that interacts with the consensus.
// It can be the driver or the mocked consensus logic.
outputAction(v, result.so),
unchangedServer,
}
}



Expand Down
Loading