Skip to content

Commit

Permalink
[refactor] #4387: Send blocks to observing peers
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <[email protected]>
  • Loading branch information
Erigara committed Mar 29, 2024
1 parent 44bf0d5 commit 8f2481c
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 58 deletions.
53 changes: 27 additions & 26 deletions core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,33 @@ mod valid {
expected_chain_id: &ChainId,
state_block: &mut StateBlock<'_>,
) -> WithEvents<Result<ValidBlock, (SignedBlock, BlockValidationError)>> {
let expected_block_height = state_block.height() + 1;
let actual_height = block.header().height;

if expected_block_height != actual_height {
return WithEvents::new(Err((
block,
BlockValidationError::LatestBlockHeightMismatch {
expected: expected_block_height,
actual: actual_height,
},
)));
}

let expected_prev_block_hash = state_block.latest_block_hash();
let actual_prev_block_hash = block.header().previous_block_hash;

if expected_prev_block_hash != actual_prev_block_hash {
return WithEvents::new(Err((
block,
BlockValidationError::LatestBlockHashMismatch {
expected: expected_prev_block_hash,
actual: actual_prev_block_hash,
},
)));
}

// NOTE: should be checked AFTER height and hash, both this issues lead to topology mismatch
if !block.header().is_genesis() {
let actual_commit_topology = block.commit_topology();
let expected_commit_topology = &topology.ordered_peers;
Expand Down Expand Up @@ -304,32 +331,6 @@ mod valid {
}
}

let expected_block_height = state_block.height() + 1;
let actual_height = block.header().height;

if expected_block_height != actual_height {
return WithEvents::new(Err((
block,
BlockValidationError::LatestBlockHeightMismatch {
expected: expected_block_height,
actual: actual_height,
},
)));
}

let expected_prev_block_hash = state_block.latest_block_hash();
let actual_prev_block_hash = block.header().previous_block_hash;

if expected_prev_block_hash != actual_prev_block_hash {
return WithEvents::new(Err((
block,
BlockValidationError::LatestBlockHashMismatch {
expected: expected_prev_block_hash,
actual: actual_prev_block_hash,
},
)));
}

if block
.transactions()
.any(|tx| state_block.has_transaction(tx.as_ref().hash()))
Expand Down
6 changes: 6 additions & 0 deletions core/src/gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ impl TransactionGossiper {
}) => {
iroha_logger::debug!(tx_payload_hash = %tx.as_ref().hash(), "Transaction already in blockchain, ignoring...")
}
Err(crate::queue::Failure {
tx,
err: crate::queue::Error::IsInQueue,
}) => {
iroha_logger::debug!(tx_payload_hash = %tx.as_ref().hash(), "Transaction already in the queue, ignoring...")
}
Err(crate::queue::Failure { tx, err }) => {
iroha_logger::error!(?err, tx_payload_hash = %tx.as_ref().hash(), "Failed to enqueue transaction.")
}
Expand Down
73 changes: 41 additions & 32 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,27 +532,55 @@ impl Sumeragi {
*voting_block = Some(v_block);
}
}
(BlockMessage::BlockCreated(block_created), Role::ObservingPeer) => {
(BlockMessage::BlockCreated(BlockCreated { block }), Role::ObservingPeer) => {
let current_topology = current_topology.is_consensus_required().expect(
"Peer has `ObservingPeer` role, which mean that current topology require consensus"
);

// Release block writer before creating new one
let _ = voting_block.take();
if let Some(v_block) = self.vote_for_block(state, &current_topology, block_created)
{
if current_view_change_index >= 1 {
let block_hash = v_block.block.as_ref().hash();

let v_block = {
let block_hash = block.hash_of_payload();
let role = self.current_topology.role(&self.peer_id);
trace!(%addr, %role, %block_hash, "Block received, voting...");

let mut state_block = state.block();
match ValidBlock::validate(
block,
&current_topology,
&self.chain_id,
&mut state_block,
)
.unpack(|e| self.send_event(e))
{
Ok(block) => {
let block = if current_view_change_index >= 1 {
block.sign(&self.key_pair)
} else {
block
};

Some(VotingBlock::new(block, state_block))
}
Err((_, error)) => {
warn!(%addr, %role, ?error, "Block validation failed");
None
}
}
};

if let Some(v_block) = v_block {
let block_hash = v_block.block.as_ref().hash();
info!(%addr, %block_hash, "Block validated");
if current_view_change_index >= 1 {
self.broadcast_packet_to(
BlockSigned::from(&v_block.block),
[current_topology.proxy_tail()],
);
info!(%addr, block=%block_hash, "Block validated, signed and forwarded");
*voting_block = Some(v_block);
} else {
error!(%addr, %role, "Received BlockCreated message, but shouldn't");
info!(%addr, block=%block_hash, "Block signed and forwarded");
}
*voting_block = Some(v_block);
}
}
(BlockMessage::BlockCreated(block_created), Role::ProxyTail) => {
Expand Down Expand Up @@ -636,7 +664,7 @@ impl Sumeragi {
.unpack(|e| self.send_event(e));

let created_in = create_block_start_time.elapsed();
if let Some(current_topology) = current_topology.is_consensus_required() {
if current_topology.is_consensus_required().is_some() {
info!(%addr, created_in_ms=%created_in.as_millis(), block=%new_block.as_ref().hash(), "Block created");

if created_in > self.pipeline_time() / 2 {
Expand All @@ -645,11 +673,7 @@ impl Sumeragi {
*voting_block = Some(VotingBlock::new(new_block.clone(), state_block));

let msg = BlockCreated::from(new_block);
if current_view_change_index >= 1 {
self.broadcast_packet(msg);
} else {
self.broadcast_packet_to(msg, current_topology.voting_peers());
}
self.broadcast_packet(msg);
} else {
match new_block
.commit(current_topology)
Expand Down Expand Up @@ -679,32 +703,17 @@ impl Sumeragi {
info!(block=%committed_block.as_ref().hash(), "Block reached required number of votes");

let msg = BlockCommitted::from(&committed_block);
let current_topology = current_topology
.is_consensus_required()
.expect("Peer has `ProxyTail` role, which mean that current topology require consensus");

#[cfg(debug_assertions)]
if is_genesis_peer && self.debug_force_soft_fork {
std::thread::sleep(self.pipeline_time() * 2);
} else if current_view_change_index >= 1 {
self.broadcast_packet(msg);
} else {
self.broadcast_packet_to(msg, current_topology.voting_peers());
self.broadcast_packet(msg);
}

#[cfg(not(debug_assertions))]
{
if current_view_change_index >= 1 {
self.broadcast_packet(msg);
} else {
self.broadcast_packet_to(
msg,
current_topology
.ordered_peers
.iter()
.take(current_topology.min_votes_for_commit()),
);
}
self.broadcast_packet(msg);
}
self.commit_block(committed_block, state_block);
}
Expand Down

0 comments on commit 8f2481c

Please sign in to comment.