diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index fa20e4007..2f024d638 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -43,28 +43,41 @@ jobs: with: toolchain: nightly components: llvm-tools-preview + - name: Install cargo-nextest + uses: taiki-e/install-action@cargo-nextest - name: Install cargo-llvm-cov uses: taiki-e/install-action@cargo-llvm-cov - name: Generate code coverage run: | - cargo llvm-cov test \ + cargo llvm-cov nextest \ --workspace \ --exclude malachite-test-mbt \ --ignore-filename-regex crates/cli \ --all-features \ - --jobs 1 \ + --no-capture \ --ignore-run-fail \ --lcov \ --output-path lcov.info - name: Generate text report run: cargo llvm-cov report - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 + if: ${{ !cancelled() }} with: token: ${{ secrets.CODECOV_TOKEN }} files: code/lcov.info flags: integration fail_ci_if_error: false + url: https://codecov.informal.systems + - name: Upload test results to Codecov + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 + with: + token: ${{ secrets.CODECOV_TOKEN }} + file: code/target/nextest/default/junit.xml + flags: integration + fail_ci_if_error: false + url: https://codecov.informal.systems mbt: name: MBT @@ -101,9 +114,10 @@ jobs: - name: Generate text report run: cargo llvm-cov report - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} files: code/lcov.info flags: mbt fail_ci_if_error: false + url: https://codecov.informal.systems diff --git a/README.md b/README.md index 7b22a6b33..d47adacd2 100644 --- a/README.md +++ b/README.md @@ -41,8 +41,8 @@ Unless required by applicable law or agreed to in writing, software distributed [quint-link]: https://github.com/informalsystems/malachite/actions/workflows/quint.yml [mbt-test-image]: https://github.com/informalsystems/malachite/actions/workflows/mbt.yml/badge.svg [mbt-test-link]: https://github.com/informalsystems/malachite/actions/workflows/mbt.yml -[coverage-image]: https://codecov.io/gh/informalsystems/malachite/graph/badge.svg?token=B9KY7B6DJF -[coverage-link]: https://codecov.io/gh/informalsystems/malachite +[coverage-image]: https://codecov.informal.systems/gh/informalsystems/malachite/graph/badge.svg?token=LO0NSEJ9FC +[coverage-link]: https://codecov.informal.systems/gh/informalsystems/malachite [license-image]: https://img.shields.io/badge/license-Apache_2.0-blue.svg [license-link]: https://github.com/informalsystems/hermes/blob/master/LICENSE [rustc-image]: https://img.shields.io/badge/Rust-stable-orange.svg diff --git a/code/.config/nextest.toml b/code/.config/nextest.toml new file mode 100644 index 000000000..a23314383 --- /dev/null +++ b/code/.config/nextest.toml @@ -0,0 +1,6 @@ +[profile.default] +retries = 1 +fail-fast = false + +[profile.default.junit] +path = "junit.xml" diff --git a/code/Cargo.lock b/code/Cargo.lock index 831a6535e..9f54df8ee 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -278,9 +278,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "axum" -version = "0.7.7" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", "axum-core", @@ -557,9 +557,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +checksum = "fb3b4b9e5a7c7514dfa52869339ee98b3156b0bfb4e8a77c4ff4babb64b1604f" dependencies = [ "clap_builder", "clap_derive", @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.20" +version = "4.5.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +checksum = "b17a95aa67cc7b5ebd32aa5370189aa0d79069ef1c64ce893bd30fb24bff20ec" dependencies = [ "anstream", "anstyle", @@ -3857,18 +3857,18 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", @@ -3877,9 +3877,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", diff --git a/code/crates/starknet/host/src/block_store.rs b/code/crates/starknet/host/src/block_store.rs index f33f5b0c2..cba2fbe82 100644 --- a/code/crates/starknet/host/src/block_store.rs +++ b/code/crates/starknet/host/src/block_store.rs @@ -6,12 +6,11 @@ use prost::Message; use redb::ReadableTable; use thiserror::Error; -use malachite_blocksync::SyncedBlock; use malachite_common::CommitCertificate; use malachite_consensus::ProposedValue; use malachite_proto::Protobuf; -use crate::codec::{decode_sync_block, encode_proposed_value, encode_synced_block}; +use crate::codec; use crate::proto::{self as proto, Error as ProtoError}; use crate::types::MockContext; use crate::types::{Block, Height, Transaction, Transactions}; @@ -25,27 +24,14 @@ pub struct DecidedBlock { pub certificate: CommitCertificate, } -impl DecidedBlock { - fn into_bytes(self) -> Result, ProtoError> { - let synced_block = SyncedBlock { - certificate: self.certificate.clone(), - block_bytes: self.block.to_bytes().unwrap(), - }; - - let proto = encode_synced_block(synced_block)?; - Ok(proto.encode_to_vec()) - } - - fn from_bytes(bytes: &[u8]) -> Option { - let synced_block = proto::sync::SyncedBlock::decode(bytes).ok()?; - let synced_block = decode_sync_block(synced_block).ok()?; - let block = Block::from_bytes(synced_block.block_bytes.as_ref()).ok()?; +fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { + let proto = proto::sync::CommitCertificate::decode(bytes)?; + codec::decode_certificate(proto) +} - Some(Self { - block, - certificate: synced_block.certificate, - }) - } +fn encode_certificate(certificate: CommitCertificate) -> Result, ProtoError> { + let proto = codec::encode_certificate(certificate)?; + Ok(proto.encode_to_vec()) } #[derive(Debug, Error)] @@ -73,6 +59,8 @@ pub enum StoreError { } const BLOCK_TABLE: redb::TableDefinition> = redb::TableDefinition::new("blocks"); +const CERTIFICATE_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("certificates"); const PROPOSED_VALUE_TABLE: redb::TableDefinition> = redb::TableDefinition::new("proposed_values"); @@ -90,10 +78,22 @@ impl Db { fn get(&self, height: Height) -> Result, StoreError> { let tx = self.db.begin_read()?; - let table = tx.open_table(BLOCK_TABLE)?; - let value = table.get(&height)?; - let block = value.and_then(|value| DecidedBlock::from_bytes(&value.value())); - Ok(block) + let block = { + let table = tx.open_table(BLOCK_TABLE)?; + let value = table.get(&height)?; + value.and_then(|value| Block::from_bytes(&value.value()).ok()) + }; + let certificate = { + let table = tx.open_table(CERTIFICATE_TABLE)?; + let value = table.get(&height)?; + value.and_then(|value| decode_certificate(&value.value()).ok()) + }; + + let decided_block = block + .zip(certificate) + .map(|(block, certificate)| DecidedBlock { block, certificate }); + + Ok(decided_block) } fn insert_decided_block(&self, decided_block: DecidedBlock) -> Result<(), StoreError> { @@ -101,16 +101,21 @@ impl Db { let tx = self.db.begin_write()?; { - let mut table = tx.open_table(BLOCK_TABLE)?; - table.insert(height, decided_block.into_bytes()?)?; + let mut blocks = tx.open_table(BLOCK_TABLE)?; + blocks.insert(height, decided_block.block.to_bytes()?.to_vec())?; + } + { + let mut certificates = tx.open_table(CERTIFICATE_TABLE)?; + certificates.insert(height, encode_certificate(decided_block.certificate)?)?; } tx.commit()?; + Ok(()) } fn insert_proposed_value(&self, value: ProposedValue) -> Result<(), StoreError> { let key = (value.height, value.round, value.value); - let value = encode_proposed_value(&value)?; + let value = codec::encode_proposed_value(&value)?; let tx = self.db.begin_write()?; { let mut table = tx.open_table(PROPOSED_VALUE_TABLE)?; @@ -138,10 +143,12 @@ impl Db { fn prune(&self, retain_height: Height) -> Result, StoreError> { let tx = self.db.begin_write().unwrap(); let pruned = { - let mut table = tx.open_table(BLOCK_TABLE)?; - let keys = self.range(&table, ..retain_height)?; + let mut blocks = tx.open_table(BLOCK_TABLE)?; + let mut certificates = tx.open_table(CERTIFICATE_TABLE)?; + let keys = self.range(&blocks, ..retain_height)?; for key in &keys { - table.remove(key)?; + blocks.remove(key)?; + certificates.remove(key)?; } keys }; @@ -166,8 +173,9 @@ impl Db { fn create_tables(&self) -> Result<(), StoreError> { let tx = self.db.begin_write()?; - // `open_table` implicitly creates the tables if needed + // Implicitly creates the tables if they do not exist yet let _ = tx.open_table(BLOCK_TABLE)?; + let _ = tx.open_table(CERTIFICATE_TABLE)?; let _ = tx.open_table(PROPOSED_VALUE_TABLE)?; tx.commit()?; Ok(()) @@ -205,12 +213,10 @@ impl BlockStore { certificate: &CommitCertificate, txes: &[Transaction], ) -> Result<(), StoreError> { - let block_id = certificate.value_id; - let decided_block = DecidedBlock { block: Block { height: certificate.height, - block_hash: block_id, + block_hash: certificate.value_id, transactions: Transactions::new(txes.to_vec()), }, certificate: certificate.clone(), diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index d24b0b099..6c64347e6 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -94,7 +94,7 @@ impl NetworkCodec> for ProtobufCodec { Ok(blocksync::Response { height: Height::new(response.block_number, response.fork_id), - block: response.block.map(decode_sync_block).transpose()?, + block: response.block.map(decode_synced_block).transpose()?, }) } @@ -184,6 +184,30 @@ where } } +pub(crate) fn encode_synced_block( + synced_block: blocksync::SyncedBlock, +) -> Result { + Ok(proto::sync::SyncedBlock { + block_bytes: synced_block.block_bytes, + certificate: Some(encode_certificate(synced_block.certificate)?), + }) +} + +pub(crate) fn decode_synced_block( + proto: proto::sync::SyncedBlock, +) -> Result, ProtoError> { + let Some(certificate) = proto.certificate else { + return Err(ProtoError::missing_field::( + "certificate", + )); + }; + + Ok(blocksync::SyncedBlock { + block_bytes: proto.block_bytes, + certificate: decode_certificate(certificate)?, + }) +} + pub(crate) fn encode_aggregate_signature( aggregated_signature: AggregatedSignature, ) -> Result { @@ -205,29 +229,6 @@ pub(crate) fn encode_aggregate_signature( Ok(proto::sync::AggregatedSignature { signatures }) } -pub(crate) fn encode_certificate( - certificate: CommitCertificate, -) -> Result { - Ok(proto::sync::CommitCertificate { - fork_id: certificate.height.fork_id, - block_number: certificate.height.block_number, - round: certificate.round.as_u32().expect("round should not be nil"), - block_hash: Some(certificate.value_id.to_proto()?), - aggregated_signature: Some(encode_aggregate_signature( - certificate.aggregated_signature, - )?), - }) -} - -pub(crate) fn encode_synced_block( - synced_block: blocksync::SyncedBlock, -) -> Result { - Ok(proto::sync::SyncedBlock { - block_bytes: synced_block.block_bytes, - certificate: Some(encode_certificate(synced_block.certificate)?), - }) -} - pub(crate) fn decode_aggregated_signature( signature: proto::sync::AggregatedSignature, ) -> Result, ProtoError> { @@ -283,6 +284,20 @@ pub(crate) fn decode_extension( Ok(SignedExtension::new(extension, signature)) } +pub(crate) fn encode_certificate( + certificate: CommitCertificate, +) -> Result { + Ok(proto::sync::CommitCertificate { + fork_id: certificate.height.fork_id, + block_number: certificate.height.block_number, + round: certificate.round.as_u32().expect("round should not be nil"), + block_hash: Some(certificate.value_id.to_proto()?), + aggregated_signature: Some(encode_aggregate_signature( + certificate.aggregated_signature, + )?), + }) +} + pub(crate) fn decode_certificate( certificate: proto::sync::CommitCertificate, ) -> Result, ProtoError> { @@ -312,23 +327,6 @@ pub(crate) fn decode_certificate( Ok(certificate) } -pub(crate) fn decode_sync_block( - synced_block: proto::sync::SyncedBlock, -) -> Result, ProtoError> { - let certificate = if let Some(certificate) = synced_block.certificate { - certificate - } else { - return Err(ProtoError::missing_field::( - "certificate", - )); - }; - - Ok(blocksync::SyncedBlock { - block_bytes: synced_block.block_bytes, - certificate: decode_certificate(certificate)?, - }) -} - pub(crate) fn encode_proposed_value( value: &ProposedValue, ) -> Result, ProtoError> { diff --git a/code/crates/starknet/p2p-proto/src/lib.rs b/code/crates/starknet/p2p-proto/src/lib.rs index 0f7cb520f..ea999161a 100644 --- a/code/crates/starknet/p2p-proto/src/lib.rs +++ b/code/crates/starknet/p2p-proto/src/lib.rs @@ -3,5 +3,5 @@ include!(concat!(env!("OUT_DIR"), "/p2p.rs")); pub mod sync { - include!(concat!(env!("OUT_DIR"), "/blocksync.rs")); + include!(concat!(env!("OUT_DIR"), "/sync.rs")); } diff --git a/specs/quint/specs/blocksync/blocksync.qnt b/specs/quint/specs/blocksync/blocksync.qnt index 0f4bae6ab..9b236e296 100644 --- a/specs/quint/specs/blocksync/blocksync.qnt +++ b/specs/quint/specs/blocksync/blocksync.qnt @@ -1,4 +1,8 @@ // -*- mode: Bluespec; -*- +// +// General definitions for blocksync protocol and state machine for +// the client-server communication, namely, the network. +// module blocksync { @@ -23,6 +27,7 @@ module blocksync { type ReqType = | SyncCertificate | SyncBlock + | SyncBlockStoreEntry type RequestMsg = { client: Address, @@ -41,6 +46,7 @@ module blocksync { type Response = | RespBlock(Proposal) | RespCertificate(Set[Vote]) + | RespBlockStoreEntry(BlockStoreEntry) type ResponseMsg = { client: Address, diff --git a/specs/quint/specs/blocksync/blocksyncClient.qnt b/specs/quint/specs/blocksync/blocksyncClient.qnt new file mode 100644 index 000000000..5485aba3f --- /dev/null +++ b/specs/quint/specs/blocksync/blocksyncClient.qnt @@ -0,0 +1,253 @@ +// -*- mode: Bluespec; -*- +// +// Blocksync protocol: client side that requests and downloads decisions. +// + +module blocksyncClient { + + import blocksync.* from "./blocksync" + + /// The state of the synchronizer + type BsyncClient = { + id: Address, + + peerStatus: Address -> BlockRange, + openRequests: Set[RequestMsg], + + height: Height, + lastSyncedHeight: Height, // "done" if greater than or equal to height + // TODO: we could add buffers for certificates and values + // inbuffers + statusMsgs: Set[StatusMsg], + responseMsgs: Set[ResponseMsg], + } + + type BsyncClientOutput = + | SOCertificate(Set[Vote]) + | SOBlock(Proposal) + | SOBlockStoreEntry(BlockStoreEntry) + | SONoOutput + + // + // BsyncClient functions + // + + /// Initialize the synchronizer + pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient = + { + id: id, + peerStatus: peers.mapBy(x => {base:-1, top:-1}), + openRequests: Set(), + height: -1, + lastSyncedHeight: -1, + statusMsgs: Set(), + responseMsgs: Set(), + } + + /// Auxiliary function to iterate over the received status messages + pure def updatePeerStatus (ps: Address -> BlockRange, msgs: Set[StatusMsg]) : Address -> BlockRange = + msgs.fold(ps, (newStatus , msg) => + if (newStatus.get(msg.peer).top < msg.top) // TODO: think about base? + newStatus.put(msg.peer, {base: msg.base, top: msg.top}) + else + newStatus + ) + + /// inform the synchronizer that consensus has entered height h + pure def syncNewHeight (s: BsyncClient, h: Height) : BsyncClient = + if (h <= s.height) + s + else + s.with("height", h) + + /// returned by the synchronizer: sync is the new state, so is the output towards + /// the consensus driver, req are messages sent towards peers/servers + type ClientResult = { + sync: BsyncClient, + so: BsyncClientOutput, + req: Option[RequestMsg] + } + + /// We have received a certificate. now we need to issue the + /// corresponding block request and generate a certificate output. + pure def syncHandleCertificate (s: BsyncClient, cert: Set[Vote], peer: str ) : ClientResult = + val blockReq = { client: s.id, + server: peer, + rtype: SyncBlock, + height: s.height} + { sync: {...s, openRequests: Set(blockReq)}, // If we have parallelization we need to be more precise here + so: SOCertificate(cert), + req: Some(blockReq)} + + /// we have received a block. now we need to generate a block output + pure def syncHandleBlock (s: BsyncClient, p: Proposal) : ClientResult = + { sync: {...s, openRequests: Set(), // If we have parallelization we need to remove one element, + lastSyncedHeight: s.height }, // blockheight, + so: SOBlock(p), + req: None} + + /// we have received a blockstore entry (block and certificate). + /// now we need to generate a block output + pure def syncHandleBlockStoreEntry (s: BsyncClient, b: BlockStoreEntry) : ClientResult = + { sync: {...s, openRequests: Set(), // If we have parallelization we need to remove one element, + lastSyncedHeight: s.height }, // blockheight, + so: SOBlockStoreEntry(b), + req: None} + + /// step of a client: + /// 1. update peer statuses, 2. if there is no open request, request something + /// 3. otherwise check whether we have a response and act accordingly + pure def bsyncClientLogic (s: BsyncClient, fullEntries: bool) : ClientResult = + val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs) + val newS = { ...s, peerStatus: newPeerStates} + if (s.lastSyncedHeight >= s.height) + // nothing to do + { sync: newS, + so: SONoOutput, + req: None} + else + val goodPeers = s.peerStatus.keys().filter(p => newPeerStates.get(p).base <= s.height + and s.height <= newPeerStates.get(p).top ) + if (goodPeers.size() > 0) + if (s.openRequests.size() == 0) + // we start the sync "round" by asking for a certificate + val req = { client: s.id, + server: goodPeers.fold("", (acc, i) => i), //chooseSome(), + rtype: if (fullEntries) + SyncBlockStoreEntry + else + SyncCertificate, + height: s.height} + { sync: {... newS, openRequests: s.openRequests.union(Set(req))}, + so: SONoOutput, + req: Some(req) + } + else + // we issued a request before, let's see whether there is a response + if (s.responseMsgs.size()> 0) + val resp = s.responseMsgs.fold(emptyResponseMsg, (acc, i) => i) //chooseSome() // in the future there might be parallelization + val ns = {... newS, responseMsgs: newS.responseMsgs.exclude(Set(resp))} + match resp.response { + | RespBlock(prop) => syncHandleBlock(newS, prop) + | RespCertificate(cert) => syncHandleCertificate(newS, cert, goodPeers.fold("", (s,x) => x)) + | RespBlockStoreEntry(b) => syncHandleBlockStoreEntry(ns, b) + } + else + // I don't have response + // this might be timeout logic + { sync: newS, + so: SONoOutput, + req: None} + else + // no peers + { sync: newS, + so: SONoOutput, + req: None} + + /// step of a client upon timeout: We send the same request to a different server + // FIXME: what to do if goodPeers is empty? + pure def bsyncClientTimeout(s: BsyncClient, toMsg: RequestMsg) : ClientResult = + val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs) + + val goodPeers = s.peerStatus.keys().filter(p => newPeerStates.get(p).base <= s.height + and s.height <= newPeerStates.get(p).top ). + exclude(Set(toMsg.server)) + if (goodPeers.size() > 0) + val req = {... toMsg, server: goodPeers.fold("", (acc, i) => i)} + val newS = { ...s, peerStatus: newPeerStates, + openRequests: s.openRequests.exclude(Set(toMsg)) + .union(Set(req))} + { sync: newS, + so: SONoOutput, + req: Some(req)} + else + { sync: { ...s, peerStatus: newPeerStates, + openRequests: s.openRequests.exclude(Set(toMsg))}, + so: SONoOutput, + req: None} + + + // State machine + + var bsyncClients: Address -> BsyncClient + + action initClient(nodes) = all { + bsyncClients' = nodes.mapBy(v => initBsyncClient(v, nodes.exclude(Set(v)))), + } + + action unchangedClient = all { + bsyncClients' = bsyncClients, + } + + action newHeightClient(node, h) = all { + bsyncClients' = bsyncClients.put(node, syncNewHeight(bsyncClients.get(node), h)), + } + + // deliver a status message, from the statusBuffer, to node + action deliverStatus(node) = all { + statusBuffer.get(node).size() > 0, + val client = bsyncClients.get(node) + nondet msg = statusBuffer.get(node).oneOf() + all { + bsyncClients' = bsyncClients.put(node, {... client, + statusMsgs: client.statusMsgs.union(Set(msg))}), + statusBuffer' = statusBuffer.put(node, statusBuffer.get(node).exclude(Set(msg))), + requestsBuffer' = requestsBuffer, + responsesBuffer' = responsesBuffer, + } + } + + + // deliver a response message, from responsesBuffer, to node + action deliverResponse(node) = all { + responsesBuffer.get(node).size() > 0, + val client = bsyncClients.get(node) + nondet msg = responsesBuffer.get(node).oneOf() + all { + bsyncClients' = bsyncClients.put(node, {... client, + responseMsgs: client.responseMsgs.union(Set(msg))}), + requestsBuffer' = requestsBuffer, + responsesBuffer' = responsesBuffer.put(node, responsesBuffer.get(node).exclude(Set(msg))), + statusBuffer' = statusBuffer, + } + } + + action applyClientUpdate(v, result) = all { + bsyncClients' = bsyncClients.put(v, result.sync), + statusBuffer' = statusBuffer, + requestsBuffer' = match result.req { + | Some(m) => requestsBuffer.put(m.server, requestsBuffer.get(m.server).union(Set(m))) + | None => requestsBuffer + }, + responsesBuffer' = responsesBuffer, + } + + // Client v takes a step + action clientLogic(v, outputAction, full) = all { + val result = bsyncClientLogic(bsyncClients.get(v), full) + all { + // updates the client's state + applyClientUpdate(v, result), + // This is the part that interacts with the consensus. + // It can be the driver or the mocked consensus logic. + outputAction(v, result.so), + } + } + + + action clientTimeout(node) = all { + bsyncClients.get(node).openRequests.size() > 0, + nondet toMsg = bsyncClients.get(node).openRequests.oneOf() + val result = bsyncClientTimeout(bsyncClients.get(node), toMsg) + all { + bsyncClients' = bsyncClients.put(node, result.sync), + requestsBuffer' = match result.req { + | Some(m) => requestsBuffer.put(m.server, requestsBuffer.get(m.server).union(Set(m))) + | None => requestsBuffer + }, + }, + statusBuffer' = statusBuffer, + responsesBuffer' = responsesBuffer, + } + +} diff --git a/specs/quint/specs/blocksync/blocksyncServer.qnt b/specs/quint/specs/blocksync/blocksyncServer.qnt new file mode 100644 index 000000000..d82756cf3 --- /dev/null +++ b/specs/quint/specs/blocksync/blocksyncServer.qnt @@ -0,0 +1,133 @@ +// -*- mode: Bluespec; -*- +// +// Blocksync protocol: server side, replies to client requests. +// + +module blocksyncServer { + + import blocksync.* from "./blocksync" + + type Server = { + id: Address, + + chain: List[BlockStoreEntry], + + // Incoming requests + requestMsgs: Set[RequestMsg] + } + + pure def newServer(addr: Address) : Server = { + id: addr, + chain: List(), + requestMsgs: Set() + } + + // generate a status message + pure def syncStatus (server: Server) : StatusMsg = + // TODO: perhaps we should add to height to the chain entries to capture non-zero bases + { peer: server.id , base: 0, top: server.chain.length() - 1 } + + /// new server state and response messages to be sent + type ServerOutput = { + sync: Server, + msg: Option[ResponseMsg], + } + + // main method: respond to incoming request, if any + pure def syncServer (s: Server) : ServerOutput = + if (s.requestMsgs.size() > 0) + val m = s.requestMsgs.fold(emptyReqMsg, (acc, i) => i) // chooseSome() // TODO: fix + val result = + if (m.height < s.chain.length()) + match m.rtype { + | SyncCertificate => + val cm = { client: m.client, + server: s.id, + height: m.height, + response: RespCertificate(s.chain[m.height].commit)} + Some(cm) + | SyncBlock => + val bl = { client: m.client, + server: s.id, + height: m.height, + response: RespBlock(s.chain[m.height].decision)} + Some(bl) + | SyncBlockStoreEntry => + val bs = { client: m.client, + server: s.id, + height: m.height, + response: RespBlockStoreEntry(s.chain[m.height])} + Some(bs) + } + else None + { sync: { ...s, requestMsgs: s.requestMsgs.exclude(Set(m))}, + msg: result} + else { + sync: s, + msg: None} + + // State machine + + var bsyncServers: Address -> Server + + action initServer(nodes) = all { + bsyncServers' = nodes.mapBy(v => newServer(v)), + } + + action unchangedServer = all { + bsyncServers' = bsyncServers, + } + + // Deliver a request message, from requestsBuffer, to node + action deliverRequest(node) = all { + requestsBuffer.get(node).size() > 0, + val server = bsyncServers.get(node) + nondet msg = requestsBuffer.get(node).oneOf() + all { + bsyncServers' = bsyncServers.put(node, {... server, + requestMsgs: server.requestMsgs.union(Set(msg))}), + statusBuffer' = statusBuffer, + requestsBuffer' = requestsBuffer.put(node, requestsBuffer.get(node).exclude(Set(msg))), + responsesBuffer' = responsesBuffer, + } + } + + // Server at node broadcasts its blockchain status + action broadcastStatus(node) = all { + val server = bsyncServers.get(node) + val msg = server.syncStatus() + all { + bsyncServers' = bsyncServers.put(node, server), + statusBuffer' = broadcastStatusMsg(statusBuffer, msg), + requestsBuffer' = requestsBuffer, + responsesBuffer' = responsesBuffer, + } + } + + // Server at node takes a step (checking for requests and responding) + action stepServer(node) = all { + val server = bsyncServers.get(node) + val result = server.syncServer() + all { + bsyncServers' = bsyncServers.put(node, server), + statusBuffer' = statusBuffer, + requestsBuffer' = requestsBuffer, + responsesBuffer' = match result.msg { + | Some(m) => responsesBuffer.sendResponse(m) + | None => responsesBuffer + }, + } + } + + // Updates the server status, the latest available blockchain content. + // This action must be called by the component that knows the chain. + action updateServer(node, chain) = all { + val server = bsyncServers.get(node) + all { + chain != server.chain, + bsyncServers' = bsyncServers.put(node, { ...server, chain: chain }), + unchangedBsync, + } + } + +} diff --git a/specs/quint/specs/blocksync/bsyncStatemachine.qnt b/specs/quint/specs/blocksync/bsyncStatemachine.qnt new file mode 100644 index 000000000..8f3fe284f --- /dev/null +++ b/specs/quint/specs/blocksync/bsyncStatemachine.qnt @@ -0,0 +1,118 @@ +// -*- mode: Bluespec; -*- +// +// State machine for the block sync protocol. +// + +module bsyncStatemachine { + + import blocksync.* from "./blocksync" + import blocksyncClient.* from "./blocksyncClient" + import blocksyncServer.* from "./blocksyncServer" + + export blocksync.* + export blocksyncClient.* + export blocksyncServer.* + + // **************************************************************************** + // State machine + // **************************************************************************** + // + // The statemachine is put on top of statemachineAsync, that is, we use its + // initialization and steps, and add the updates to the variables defined below + // + + /// initializing the variables of the sync part of the state machine + action syncInit(validators) = all { + initClient(validators), + initServer(validators), + initBsync(validators), + } + + action syncUnchangedAll = all { + unchangedServer, + unchangedClient, + unchangedBsync, + } + + // + // Actions for the Environment to send a node to a new height + // + + /// environment sends the node to the next height. + /// initializes client + action newHeightActionSync(v, valset, h) = all { + newHeightClient(v, h), + unchangedBsync, + unchangedServer, + } + + // + // Actions for process steps in the sync protocol + // + + // Server v announces its status + action syncStatusStep(v) = all { + all { + broadcastStatus(v), + unchangedClient, + } + } + + // Server v takes a step (checking for requests and responding) + action syncStepServer(v) = all { + all { + stepServer(v), + unchangedClient, + } + } + + // + // Actions for the environment to deliver messages in the sync protocol + // Implemented by the blocksync server or client. + // + + /// Deliver a status message to client v + action syncDeliverStatus(v) = all { + deliverStatus(v), + unchangedServer, + } + + /// Deliver a response to client v + action syncDeliverResp(v) = all { + deliverResponse(v), + unchangedServer, + } + + /// Deliver a request to server v + action syncDeliverReq(v) = all { + deliverRequest(v), + unchangedClient, + } + + /// Client v runs a protocol step + action syncStepClient(v, out, full) = all { + clientLogic(v, out, full), + unchangedServer, + } + + /// A client v's request times out + action syncClientTimeout(v) = all { + clientTimeout(v), + unchangedServer, + } + + /// any blocksync step independent of consensus. + /// unchange is an action that can be used for composition + action pureSyncStep (v, unchange) = all { + any { + syncDeliverReq(v), + syncDeliverResp(v), + syncDeliverStatus(v), + syncStepServer(v), + syncStatusStep(v), + syncClientTimeout(v), + }, + unchange + } + +} diff --git a/specs/quint/specs/blocksync/bsyncWithConsensus.qnt b/specs/quint/specs/blocksync/bsyncWithConsensus.qnt new file mode 100644 index 000000000..1ff79e3c7 --- /dev/null +++ b/specs/quint/specs/blocksync/bsyncWithConsensus.qnt @@ -0,0 +1,172 @@ +// -*- mode: Bluespec; -*- +// +// Blocksync executable model running together with the consensus logic. +// + +module bsyncWithConsensus { + + import bsyncStatemachine.* from "./bsyncStatemachine" + + /// Consensus state machine setup + import statemachineAsync( + validators = Set("v1", "v2", "v3", "v4"), + validatorSet = Set("v1", "v2", "v3", "v4").mapBy(x => 1), + Faulty = Set("v1"), + Values = Set("red", "blue"), + Rounds = Set(0, 1, 2, 3), + Heights = Set(0) // , 1, 2, 3) + ).* from "../statemachineAsync" + + /// initialize consensus and block sync + action initAll = all { + init, + syncInit(validators) + } + + /// environment sends the node to the next height. + action newHeightActionAll(v, valset, h) = all { + system.get(v).es.chain.length() == h, // precondition for calling this + newHeightActionSync(v, valset, h), + newHeightAction(v, valset, h), + } + + // Update server v from the consensus' blockchain + action syncUpdateServer(v) = all { + val chain = system.get(v).es.chain + all { + updateServer(v, chain), + unchangedClient, + unchangedAll, + } + } + + // validator step in the consensus protocol, no changes to block sync + action syncValStep(v) = all { + valStep(v), + syncUnchangedAll + } + + + + /// main step function: either a consensus state-machine step or a block sync protocol step + action stepWithBlockSync = any { + // consensus takes a step + all { + step, + syncUnchangedAll + }, + // block sync takes a step + nondet v = oneOf(Correct) + any { + pureSyncStep(v, unchangedAll), + // Block sync client check if there are responses to apply to consensus + syncStepClient(v, putSyncOutputIntoNode, false), + syncStepClient(v, putSyncOutputIntoNode, true), + syncUpdateServer(v), + }, + // a node goes to next height + nondet v = oneOf(Correct) + newHeightActionAll(v, validatorSet, system.get(v).es.chain.length()) + } + + // + // Interesting scenarios + // + + /// auxiliary function for initHeight + /// sets the chain + pure def setChain(s: DriverState, c: List[{decision: Proposal, commit: Set[Vote]}]): DriverState = + {... s, chain: c} + + /// auxiliary function for initHeight + /// constructs a commit certificate for a height and value + pure def commitSet (h: Height, v: Value) : Set[Vote] = + Set("v1", "v2", "v3").map(n => mkVote(Precommit, n, h, 0, v)) + + /// An action to set up an initial state with some nodes already decided up to height h + /// this sets up an initial state where v4 starts late, and v2 and v3 have reached + /// height h + action initHeight(h) = all { + val special = "v4" // TODO proper selection from correct set + val initsystem = Correct.mapBy(v => + // hack + if (v == special) initNode(v, validatorSet, 0) + else initNode(v, validatorSet, h)) + nondet decisionValMap = 0.to(h-1).setOfMaps(Values).oneOf() + val propMap = decisionValMap.keys().mapBy(i => + mkProposal( proposer(validatorSet, i,0), + i, + 0, + decisionValMap.get(i), + 0)) + val list = range(0, h).foldl(List(), (acc, i) => acc.append(propMap.get(i))) + val chain = list.foldl(List(), (acc, i) => acc.append({decision: i, commit: commitSet(i.height, Val(i.proposal))})) + all { + system' = initsystem.keys().mapBy(x => + // hack + if (x == special) initsystem.get(x) + else {... initsystem.get(x), es: setChain(initsystem.get(x).es, chain) }), + propBuffer' = Correct.mapBy(v => Set()), + voteBuffer' = Correct.mapBy(v => Set()), + certBuffer' = Correct.mapBy(v => Set()), + _hist' = { validator: "INIT", input: NoDInput, output: NoConsensusOutput }, + syncInit(validators) + } + } + + action syncStep = + nondet v = oneOf(Correct) + pureSyncStep(v, unchangedAll) + + /// a simple scenario where v4 starts height h + run syncCycle(h) = + newHeightActionAll("v4", validatorSet, h) + .then(all{unchangedAll, syncStatusStep("v2")}) + .then(all{unchangedAll, syncDeliverStatus("v4")}) + .then(syncStepClient("v4", putSyncOutputIntoNode, false)) // ask for certificate + .then(all{unchangedAll, syncDeliverReq("v2")}) + .then(all{unchangedAll, syncStepServer("v2")}) + .then(all{unchangedAll, syncDeliverResp("v4")}) + .then(syncStepClient("v4", putSyncOutputIntoNode, false)) // ask for block and give certificate to node + .expect(system.get("v4").incomingSyncCertificates.size() > 0) + .then(all{unchangedAll, syncDeliverReq("v2")}) + .then(all{unchangedAll, syncStepServer("v2")}) + .then(all{unchangedAll, syncDeliverResp("v4")}) + .then(syncStepClient("v4", putSyncOutputIntoNode, false)) + .expect(system.get("v4").incomingSyncProposals.size() > 0) + .then(3.reps(_ => syncValStep("v4"))) + .expect(system.get("v4").es.chain.length() > h) + + run parisRetreat = + nondet heightToReach = 1.to(4).oneOf() + initHeight( q::debug ("Height:", heightToReach)) + // FIXME: I had to put it here, instead of syncCycle(h) + // This is not ideal, but it works. + .then(syncUpdateServer("v2")) + .then(heightToReach.reps(i => syncCycle(i))) + .expect(system.get("v4").es.chain == system.get("v2").es.chain) + .then(newHeightActionAll("v4", validatorSet, heightToReach)) + .expect(system.get("v4").es.cs.height == system.get("v2").es.cs.height) + // and now v4 has synced ! + + run lausanneRetreat = + initHeight(2) + // FIXME: I had to put it here, instead of syncCycle(h) + // This is not ideal, but it works. + .then(syncUpdateServer("v2")) + .then(newHeightActionAll("v4", validatorSet, 0)) + .then(all{unchangedAll, syncStatusStep("v2")}) + .then(all{unchangedAll, syncDeliverStatus("v4")}) + .then(syncStepClient("v4", putSyncOutputIntoNode, false)) // ask for certificate + // request for certificate is sent to v2 + .expect(requestsBuffer.get("v2").contains({ client: "v4", height: 0, rtype: SyncCertificate, server: "v2" })) + // v3 wakes up and sends it status to v4 + .then(syncUpdateServer("v3")) + .then(all{unchangedAll, syncStatusStep("v3")}) + .then(all{unchangedAll, syncDeliverStatus("v4")}) + // v4's request to v2 times out... + .then(all{unchangedAll, syncClientTimeout("v4")}) + // after handling the timeout a request for certificate is sent to v3 + .expect(requestsBuffer.get("v3").contains({ client: "v4", height: 0, rtype: SyncCertificate, server: "v3" })) + +} diff --git a/specs/quint/specs/blocksync/bsyncWithMockedConsensus.qnt b/specs/quint/specs/blocksync/bsyncWithMockedConsensus.qnt new file mode 100644 index 000000000..ce6cb8f74 --- /dev/null +++ b/specs/quint/specs/blocksync/bsyncWithMockedConsensus.qnt @@ -0,0 +1,127 @@ +// -*- mode: Bluespec; -*- +// +// Blocksync executable model running with a mocked consensus logic. +// + +module bsyncWithMockedConsensus { + + import bsyncStatemachine.* from "./bsyncStatemachine" + + val validators = Set("v1", "v2", "v3", "v4") + val Correct = Set("v2", "v3", "v4") + val validatorSet = Set("v1", "v2", "v3", "v4").mapBy(x => 1) + + // State machine + + var outputs: Address -> List[BsyncClientOutput] + var chains: Address -> List[BlockStoreEntry] + + /// initialize consensus and synchronizer + action initMockedConsensus = all { + outputs' = validators.mapBy(_ => []), + chains' = Correct.mapBy(_ => []), + syncInit(validators) + } + + action unchangedMock = all { + outputs' = outputs, + chains' = chains, + } + + /// Consensus mocked logic decides the latest height. + /// The blockchain increases in size. + /// The server becomes aware of that once `updateServer` is invoked. + /// The client becomes aware of that once `newHeightAction` is invoked. + /// This is an abstract version. We just add a (mocked) block with the right + /// height on top of the chain. Because of this, implicitly all nodes will + /// decide on the same block + action decideMock(v) = all { + bsyncClients.get(v).height == chains.get(v).length(), + outputs' = outputs, + chains' = chains.put(v, chains.get(v).append({ + decision: mkProposal("", chains.get(v).length(), 0, "", -1), + commit: Set(), + })), + syncUnchangedAll, + } + + /// Environment sends the node to the next height. + /// This implicitly requires `decideMock(v)` to be previous executed. + action newHeightActionAll(v, valset, h) = all { + //chains.get(v).length() == h, // precondition for calling this + newHeightActionSync(v, valset, h), + unchangedMock, + } + + /// Update server v from the consensus' blockchain + /// This abstracts as pull-based mechanism: the server consults the chain state. + action syncUpdateServer(v) = all { + all { + updateServer(v, chains.get(v)), + unchangedClient, + unchangedMock, + } + } + + action writeAction(v, so) = all { + val updateOutput = outputs.get(v).append(so) + outputs' = outputs.put(v, updateOutput), + chains' = chains, + } + + action mockStep(v, act) = all { + act(v), + unchangedMock + } + + /// For simple repl evaluation + action bsyncStep (v, act) = all { + act(v), + unchangedMock + } + + /// main step function: either a consensus state-machine step or a sync protocol step + action stepMockedConsensus = + nondet v = oneOf(Correct) + any { + pureSyncStep(v, unchangedMock), + // consensus-specific steps + syncStepClient(v, writeAction, false), + syncUpdateServer(v), + newHeightActionAll(v, validators, chains.get(v).length()), + decideMock(v), + } + + + /// An action to set up an initial state with some nodes already decided up to height h + /// this sets up an initial state where v4 starts late, and v2 and v3 have reached + /// height h + action initHeight(h) = all { + val special = "v4" // TODO proper selection from correct set + chains' = Correct.mapBy(v => range(0, h).foldl(List(), (acc, i) => acc.append( + { decision: mkProposal( "", i, 0, "", 0), + commit: Set() } )) + ), + outputs' = validators.mapBy(_ => []), + syncInit(validators) + } + + run lausanneRetreat = + initHeight(2) + .then(syncUpdateServer("v2")) + .then(newHeightActionAll("v4", validatorSet, 0)) + .then(all{unchangedMock, syncStatusStep("v2")}) + .then(all{unchangedMock, syncDeliverStatus("v4")}) + .then(syncStepClient("v4", writeAction, false)) // ask for certificate + // request for certificate is sent to v2 + .expect(requestsBuffer.get("v2").contains({ client: "v4", height: 0, rtype: SyncCertificate, server: "v2" })) + // v3 wakes up and sends it status to v4 + .then(syncUpdateServer("v3")) + .then(all{unchangedMock, syncStatusStep("v3")}) + .then(all{unchangedMock, syncDeliverStatus("v4")}) + // v4's request to v2 times out... + .then(all{unchangedMock, syncClientTimeout("v4")}) + // after handling the timeout a request for certificate is sent to v3 + .expect(requestsBuffer.get("v3").contains({ client: "v4", height: 0, rtype: SyncCertificate, server: "v3" })) + +} diff --git a/specs/quint/specs/blocksync/client.qnt b/specs/quint/specs/blocksync/client.qnt deleted file mode 100644 index 84215d511..000000000 --- a/specs/quint/specs/blocksync/client.qnt +++ /dev/null @@ -1,175 +0,0 @@ -// -*- mode: Bluespec; -*- -// -// Blocksync protocol: client side. - -module bsync_client { - import blocksync.* from "./blocksync" - - /// The state of the synchronizer - type BsyncClient = { - id: Address, - - peerStatus: Address -> BlockRange, - openRequests: Set[RequestMsg], - - height: Height, - lastSyncedHeight: Height, // "done" if greater than or equal to height - // TODO: we could add buffers for certificates and values - // inbuffers - statusMsgs: Set[StatusMsg], - responseMsgs: Set[ResponseMsg], - } - - type BsyncClientOutput = - | SOCertificate(Set[Vote]) - | SOBlock(Proposal) - | SONoOutput - - // - // BsyncClient functions - // - - /// Initialize the synchronizer - pure def initBsyncClient(id: Address, peers: Set[Address]) : BsyncClient = - { - id: id, - peerStatus: peers.mapBy(x => {base:-1, top:-1}), - openRequests: Set(), - height: -1, - lastSyncedHeight: -1, - statusMsgs: Set(), - responseMsgs: Set(), - } - - /// Auxiliary function to iterate over the received status messages - pure def updatePeerStatus (ps: Address -> BlockRange, msgs: Set[StatusMsg]) : Address -> BlockRange = - msgs.fold(ps, (newStatus , msg) => - if (newStatus.get(msg.peer).top < msg.top) // TODO: think about base? - newStatus.put(msg.peer, {base: msg.base, top: msg.top}) - else - newStatus - ) - - /// inform the synchronizer that consensus has entered height h - pure def syncNewHeight (s: BsyncClient, h: Height) : BsyncClient = - if (h <= s.height) - s - else - s.with("height", h) - - /// returned by the synchronizer: sync is the new state, so is the output towards - /// the consensus driver, req are messages sent towards peers/servers - type ClientResult = { - sync: BsyncClient, - so: BsyncClientOutput, - req: Option[RequestMsg] - } - - /// We have received a certificate. now we need to issue the - /// corresponding block request and generate a certificate output. - pure def syncHandleCertificate (s: BsyncClient, cert: Set[Vote], peer: str ) : ClientResult = - val blockReq = { client: s.id, - server: peer, - rtype: SyncBlock, - height: s.height} - { sync: {...s, openRequests: Set(blockReq)}, // If we have parallelization we need to be more precise here - so: SOCertificate(cert), - req: Some(blockReq)} - - /// we have received a block. now we need to generate a block output - pure def syncHandleBlock (s: BsyncClient, p: Proposal) : ClientResult = - { sync: {...s, openRequests: Set(), // If we have parallelization we need to remove one element, - lastSyncedHeight: s.height }, // blockheight, - so: SOBlock(p), - req: None} - - /// step of a client: - /// 1. update peer statuses, 2. if there is no open request, request something - /// 3. otherwise check whether we have a response and act accordingly - pure def bsyncClient (s: BsyncClient) : ClientResult = - val newPeerStates = updatePeerStatus(s.peerStatus, s.statusMsgs) - val newS = { ...s, peerStatus: newPeerStates} - if (s.lastSyncedHeight >= s.height) - // nothing to do - { sync: newS, - so: SONoOutput, - req: None} - else - val goodPeers = s.peerStatus.keys().filter(p => newPeerStates.get(p).base <= s.height - and s.height <= newPeerStates.get(p).top ) - if (goodPeers.size() > 0) - if (s.openRequests.size() == 0) - // we start the sync "round" by asking for a certificate - val req = { client: s.id, - server: goodPeers.fold("", (acc, i) => i), //chooseSome(), - rtype: SyncCertificate, - height: s.height} - { sync: {... newS, openRequests: s.openRequests.union(Set(req))}, - so: SONoOutput, - req: Some(req) - } - else - // we issued a request before, let's see whether there is a response - if (s.responseMsgs.size()> 0) - val resp = s.responseMsgs.fold(emptyResponseMsg, (acc, i) => i) //chooseSome() // in the future there might be parallelization - match resp.response { - | RespBlock(prop) => syncHandleBlock(newS, prop) - | RespCertificate(cert) => syncHandleCertificate(newS, cert, goodPeers.fold("", (s,x) => x)) - } - else - // I don't have response - // this might be timeout logic - { sync: newS, - so: SONoOutput, - req: None} - else - // no peers - { sync: newS, - so: SONoOutput, - req: None} - - // State machine - - var bsyncClients: Address -> BsyncClient - - action initClient(nodes) = all { - bsyncClients' = nodes.mapBy(v => initBsyncClient(v, nodes.exclude(Set(v)))), - } - - action unchangedClient = all { - bsyncClients' = bsyncClients, - } - - action newHeightClient(node, h) = all { - bsyncClients' = bsyncClients.put(node, syncNewHeight(bsyncClients.get(node), h)), - } - - // deliver a status message, from the statusBuffer, to node - action deliverStatus(node) = all { - statusBuffer.get(node).size() > 0, - val client = bsyncClients.get(node) - nondet msg = statusBuffer.get(node).oneOf() - all { - bsyncClients' = bsyncClients.put(node, {... client, - statusMsgs: client.statusMsgs.union(Set(msg))}), - statusBuffer' = statusBuffer.put(node, statusBuffer.get(node).exclude(Set(msg))), - requestsBuffer' = requestsBuffer, - responsesBuffer' = responsesBuffer, - } - } - - // deliver a response message, from responsesBuffer, to node - action deliverResponse(node) = all { - responsesBuffer.get(node).size() > 0, - val client = bsyncClients.get(node) - nondet msg = responsesBuffer.get(node).oneOf() - all { - bsyncClients' = bsyncClients.put(node, {... client, - responseMsgs: client.responseMsgs.union(Set(msg))}), - requestsBuffer' = requestsBuffer, - responsesBuffer' = responsesBuffer.put(node, responsesBuffer.get(node).exclude(Set(msg))), - statusBuffer' = statusBuffer, - } - } - -} diff --git a/specs/quint/specs/blocksync/server.qnt b/specs/quint/specs/blocksync/server.qnt deleted file mode 100644 index 5d6c359da..000000000 --- a/specs/quint/specs/blocksync/server.qnt +++ /dev/null @@ -1,125 +0,0 @@ -// -*- mode: Bluespec; -*- -// -// Blocksync protocol: server side. - -module bsync_server { - import blocksync.* from "./blocksync" - - type Server = { - id: Address, - - chain: List[BlockStoreEntry], - - // Incoming requests - requestMsgs: Set[RequestMsg] - } - - pure def newServer(addr: Address) : Server = { - id: addr, - chain: List(), - requestMsgs: Set() - } - - // generate a status message - pure def syncStatus (server: Server) : StatusMsg = - // TODO: perhaps we should add to height to the chain entries to capture non-zero bases - { peer: server.id , base: 0, top: server.chain.length() - 1 } - - /// new server state and response messages to be sent - type ServerOutput = { - sync: Server, - msg: Option[ResponseMsg], - } - - // main method: respond to incoming request, if any - pure def syncServer (s: Server) : ServerOutput = - if (s.requestMsgs.size() > 0) - val m = s.requestMsgs.fold(emptyReqMsg, (acc, i) => i) // chooseSome() // TODO: fix - val result = - if (m.height < s.chain.length()) - match m.rtype { - | SyncCertificate => - val cm = { client: m.client, - server: s.id, - height: m.height, - response: RespCertificate(s.chain[m.height].commit)} - Some(cm) - | SyncBlock => - val bl = { client: m.client, - server: s.id, - height: m.height, - response: RespBlock(s.chain[m.height].decision)} - Some(bl) - } - else None - { sync: { ...s, requestMsgs: s.requestMsgs.exclude(Set(m))}, - msg: result} - else { - sync: s, - msg: None} - - // State machine - - var bsyncServers: Address -> Server - - action initServer(nodes) = all { - bsyncServers' = nodes.mapBy(v => newServer(v)), - } - - action unchangedServer = all { - bsyncServers' = bsyncServers, - } - - // Deliver a request message, from requestsBuffer, to node - action deliverRequest(node) = all { - requestsBuffer.get(node).size() > 0, - val server = bsyncServers.get(node) - nondet msg = requestsBuffer.get(node).oneOf() - all { - bsyncServers' = bsyncServers.put(node, {... server, - requestMsgs: server.requestMsgs.union(Set(msg))}), - statusBuffer' = statusBuffer, - requestsBuffer' = requestsBuffer.put(node, requestsBuffer.get(node).exclude(Set(msg))), - responsesBuffer' = responsesBuffer, - } - } - - // Server at node broadcasts its blockchain status - action broadcastStatus(node) = all { - val server = bsyncServers.get(node) - val msg = server.syncStatus() - all { - bsyncServers' = bsyncServers.put(node, server), - statusBuffer' = broadcastStatusMsg(statusBuffer, msg), - requestsBuffer' = requestsBuffer, - responsesBuffer' = responsesBuffer, - } - } - - // Server at node takes a step (checking for requests and responding) - action stepServer(node) = all { - val server = bsyncServers.get(node) - val result = server.syncServer() - all { - bsyncServers' = bsyncServers.put(node, server), - statusBuffer' = statusBuffer, - requestsBuffer' = requestsBuffer, - responsesBuffer' = match result.msg { - | Some(m) => responsesBuffer.sendResponse(m) - | None => responsesBuffer - }, - } - } - - // Updates the server status, the latest available blockchain content. - // This action must be called by the component that knows the chain. - action updateServer(node, chain) = all { - val server = bsyncServers.get(node) - all { - chain != server.chain, - bsyncServers' = bsyncServers.put(node, { ...server, chain: chain }), - unchangedBsync, - } - } - -} diff --git a/specs/quint/specs/blocksync/sync.qnt b/specs/quint/specs/blocksync/sync.qnt deleted file mode 100644 index 3c64d9786..000000000 --- a/specs/quint/specs/blocksync/sync.qnt +++ /dev/null @@ -1,254 +0,0 @@ -// -*- mode: Bluespec; -*- - -module sync { - - // General definitions - import blocksync.* from "./blocksync" - - import statemachineAsync( - validators = Set("v1", "v2", "v3", "v4"), - validatorSet = Set("v1", "v2", "v3", "v4").mapBy(x => 1), - Faulty = Set("v1"), - Values = Set("red", "blue"), - Rounds = Set(0, 1, 2, 3), - Heights = Set(0) // , 1, 2, 3) - ).* from "../statemachineAsync" - - import bsync_client.* from "./client" - import bsync_server.* from "./server" - - // **************************************************************************** - // State machine - // **************************************************************************** - // - // The statemachine is put on top of statemachineAsync, that is, we use its - // initialization and steps, and add the updates to the variables defined below - // - - /// initializing the variables of the sync part of the state machine - action syncInit = all { - initClient(validators), - initServer(validators), - initBsync(validators), - } - - action syncUnchangedAll = all { - unchangedServer, - unchangedClient, - unchangedBsync, - } - - /// initialize consensus and synchronizer - action initAll = all { - init, - syncInit - } - - // - // Actions for the Environment to send a node to a new height - // - - /// environment sends the node to the next height. - /// initializes synchronizer - action newHeightActionSync(v, valset, h) = all { - newHeightClient(v, h), - unchangedBsync, - unchangedServer, - } - - /// environment sends the node to the next height. - action newHeightActionAll(v, valset, h) = all { - newHeightActionSync(v, valset, h), - newHeightAction(v, valset, h), - } - - // - // Actions for process steps in the sync protocol - // - - // Update server v from the consensus' blockchain - action syncUpdateServer(v) = all { - val chain = system.get(v).es.chain - all { - updateServer(v, chain), - unchangedClient, - unchangedAll, - } - } - - // Server v announces its status - action syncStatusStep(v) = all { - all { - broadcastStatus(v), - unchangedClient, - unchangedAll, - } - } - - // Server v takes a step (checking for requests and responding) - action syncStepServer(v) = all { - all { - stepServer(v), - unchangedClient, - unchangedAll, - } - } - - // Client v takes a step - // - // FIXME: here we need to apply the client output to the consensus - // state machine. Refactoring this method is much more complex. - // - action syncStepClient(v) = all { - val result = bsyncClient(bsyncClients.get(v)) - all { - // TODO: this block should be implemented in client.qnt - bsyncClients' = bsyncClients.put(v, result.sync), - statusBuffer' = statusBuffer, - requestsBuffer' = match result.req { - | Some(m) => requestsBuffer.put(m.server, requestsBuffer.get(m.server).union(Set(m))) - | None => requestsBuffer - }, - responsesBuffer' = responsesBuffer, - - unchangedServer, - - // This is the part that interacts with the consensus state - // machine. Moreover, the logic's code should be here, but due - // to Quint issues we need to keep it in stateMachineAsync. - putSyncOutputIntoNode(v, result.so), - } - } - - // - // Actions for the environment to deliver messages in the sync protocol - // Implemented by the blocksync server or client. - // - - // deliver a status message to client v - action syncDeliverStatus(v) = all { - deliverStatus(v), - unchangedServer, - unchangedAll - } - - // deliver a response to client v - action syncDeliverResp(v) = all { - deliverResponse(v), - unchangedServer, - unchangedAll - } - - // deliver a request to server v - action syncDeliverReq(v) = all { - deliverRequest(v), - unchangedClient, - unchangedAll - } - - // - // Complex actions for a validator: consensus and blocksync - // - - // validator step in the system with sync protocol - action syncValStep(v) = all { - valStep(v), - syncUnchangedAll - } - - /// main step function: either a consensus state-machine step or a sync protocol step - action stepWithBlockSync = any { - all { - step, - syncUnchangedAll - }, - nondet v = oneOf(Correct) - any { - syncDeliverReq(v), - syncDeliverResp(v), - syncDeliverStatus(v), - syncUpdateServer(v), // update the server with the latest state of the chain - syncStepServer(v), // looking for a request and sends responses - syncStepClient(v), // is checking if there are responses and check whether it need to requ - syncStatusStep(v), - //syncTimeout(v) // TODO: - } - } - - // - // Interesting scenarios - // - - /// auxiliary function for initHeight - /// sets the chain - pure def setChain(s: DriverState, c: List[{decision: Proposal, commit: Set[Vote]}]): DriverState = - {... s, chain: c} - - /// auxiliary function for initHeight - /// constructs a commit certificate for a height and value - pure def commitSet (h: Height, v: Value) : Set[Vote] = - Set("v1", "v2", "v3").map(n => mkVote(Precommit, n, h, 0, v)) - - /// An action to set up an initial state with some nodes already decided up to height h - /// this sets up an initial state where v4 starts late, and v2 and v3 have reached - /// height h - action initHeight(h) = all { - val special = "v4" // TODO proper selection from correct set - val initsystem = Correct.mapBy(v => - // hack - if (v == special) initNode(v, validatorSet, 0) - else initNode(v, validatorSet, h)) - nondet decisionValMap = 0.to(h-1).setOfMaps(Values).oneOf() - val propMap = decisionValMap.keys().mapBy(i => - mkProposal( proposer(validatorSet, i,0), - i, - 0, - decisionValMap.get(i), - 0)) - val list = range(0, h).foldl(List(), (acc, i) => acc.append(propMap.get(i))) - val chain = list.foldl(List(), (acc, i) => acc.append({decision: i, commit: commitSet(i.height, Val(i.proposal))})) - all { - system' = initsystem.keys().mapBy(x => - // hack - if (x == special) initsystem.get(x) - else {... initsystem.get(x), es: setChain(initsystem.get(x).es, chain) }), - propBuffer' = Correct.mapBy(v => Set()), - voteBuffer' = Correct.mapBy(v => Set()), - certBuffer' = Correct.mapBy(v => Set()), - _hist' = { validator: "INIT", input: NoDInput, output: NoConsensusOutput }, - syncInit - } - } - - /// a simple scenario where v4 starts height h - run syncCycle(h) = - newHeightActionAll("v4", validatorSet, h) - .then(syncStatusStep("v2")) - .then(syncDeliverStatus("v4")) - .then(syncStepClient("v4")) // ask for certificate - .then(syncDeliverReq("v2")) - .then(syncStepServer("v2")) - .then(syncDeliverResp("v4")) - .then(syncStepClient("v4")) // ask for block and give certificate to node - .expect(system.get("v4").incomingSyncCertificates.size() > 0) - .then(syncDeliverReq("v2")) - .then(syncStepServer("v2")) - .then(syncDeliverResp("v4")) - .then(syncStepClient("v4")) - .expect(system.get("v4").incomingSyncProposals.size() > 0) - .then(3.reps(_ => syncValStep("v4"))) - .expect(system.get("v4").es.chain.length() > h) - - run retreat = - nondet heightToReach = 1.to(4).oneOf() - initHeight( q::debug ("Height:", heightToReach)) - // FIXME: I had to put it here, instead of syncCycle(h) - // This is not ideal, but it works. - .then(syncUpdateServer("v2")) - .then(heightToReach.reps(i => syncCycle(i))) - .expect(system.get("v4").es.chain == system.get("v2").es.chain) - .then(newHeightActionAll("v4", validatorSet, heightToReach)) - .expect(system.get("v4").es.cs.height == system.get("v2").es.cs.height) - // and now v4 has synced ! - -} diff --git a/specs/quint/specs/reset/README.md b/specs/quint/specs/reset/README.md index 0edb1ab84..04b98de30 100644 --- a/specs/quint/specs/reset/README.md +++ b/specs/quint/specs/reset/README.md @@ -31,9 +31,9 @@ The following actions add faulty behaviors to the standard `step` action: - `L1ForkIDMonotonic`: L2 forkID in L1 blocks is non-decreasing - Proofs validation invariants - - `InvalidRegistrationProofRejectedInv`: If the latest block in L1 does not include a (valid) proof or the proof contains an invalid registration, then the proof should be rejected. We check that by attesting that L1's provenHeight remains unchanged (checked also for `--step "stepWithInvalidRegs"`) - - `OldProofRejectedInv`: L1 blocks should not accept proofs with non monotonically increasing proven L2 heights. As a consequence, the latest L2 proven height in L1 should remain unchanged with such a proof is submitted (checked also with `--step stepWithPotentiallyOldProofs`) - - `FutureProofRejectedInv`: If the proof starts from a block with height greater than provenHeight + 1 it is rejected. (checked also with `--step stepWithPotentiallyFutureProofs`) + - `InvalidRegistrationProofRejectedInv`: If the latest block in L1 does not include a (valid) proof or the proof contains an invalid registration, then the proof should be rejected. We check that by attesting that L1's provenHeight remains unchanged (checked also for `--step "stepWithInvalidRegs"`) + - `OldProofRejectedInv`: L1 blocks should not accept proofs with non monotonically increasing proven L2 heights. As a consequence, the latest L2 proven height in L1 should remain unchanged with such a proof is submitted (checked also with `--step stepWithPotentiallyOldProofs`) + - `FutureProofRejectedInv`: If the proof starts from a block with height greater than provenHeight + 1 it is rejected. (checked also with `--step stepWithPotentiallyFutureProofs`) - Local L2 invariants - `monotonicForkIDInv`: ForkID on L2 is non-decreasing diff --git a/specs/quint/specs/statemachineAsync.qnt b/specs/quint/specs/statemachineAsync.qnt index 29397e851..9fde7e2e9 100644 --- a/specs/quint/specs/statemachineAsync.qnt +++ b/specs/quint/specs/statemachineAsync.qnt @@ -269,6 +269,11 @@ action putSyncOutputIntoNode(v, syncOut) = all{ | SOBlock(prop) => system.put(v, {... system.get(v), incomingSyncProposals: system.get(v).incomingSyncProposals.union(Set(prop))}) + | SOBlockStoreEntry(b) => system.put(v, {... system.get(v), + incomingSyncProposals: + system.get(v).incomingSyncProposals.union(Set(b.decision)), + incomingSyncCertificates: + system.get(v).incomingSyncCertificates.union(Set(b.commit))}) | SONoOutput => system }, propBuffer' = propBuffer,