Skip to content

Commit

Permalink
Fix for DA L2 sync race condition (#2512)
Browse files Browse the repository at this point in the history
## Linked Issues/PRs
<!-- List of related issues/PRs -->

## Description
<!-- List of detailed changes -->
In the case we are syncing from an existing network, it is possible for
the L2 blocks to sync slower than the DA costs, in which case you will
try to apply DA costs to an algorithm that is missing the corresponding
unrecorded L2 blocks.

This PR
- introduces a `latest_l2_height` to the `DaSourceService` which will
filter out any DA bundles that include L2 blocks after the current
height
- moves the recorded height concept into the `DaSourceService` where it
probably should have been to begin with


## Checklist
- [ ] Breaking changes are clearly marked as such in the PR description
and changelog
- [ ] New behavior is reflected in tests
- [ ] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [ ] I have reviewed the code myself
- [ ] I have created follow-up issues caused by this PR and linked them
here

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?

---------

Co-authored-by: Aaryamann Challani <[email protected]>
  • Loading branch information
MitchTurner and rymnc authored Jan 7, 2025
1 parent a7eaa32 commit 31fd51f
Show file tree
Hide file tree
Showing 10 changed files with 401 additions and 81 deletions.
2 changes: 1 addition & 1 deletion crates/services/gas_price_service/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum Error {
pub type Result<T, E = Error> = core::result::Result<T, E>;

// Info required about the l2 block for the gas price algorithm
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BlockInfo {
// The genesis block of the L2 chain
GenesisBlock,
Expand Down
144 changes: 139 additions & 5 deletions crates/services/gas_price_service/src/v1/da_source_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,30 @@ mod tests {
use super::*;
use crate::v1::da_source_service::{
dummy_costs::DummyDaBlockCosts,
service::new_service,
service::{
new_da_service,
DaSourceService,
DA_BLOCK_COSTS_CHANNEL_SIZE,
},
};
use fuel_core_services::Service;
use fuel_core_services::{
RunnableTask,
Service,
StateWatcher,
};
use fuel_core_types::fuel_types::BlockHeight;
use std::{
sync::Arc,
sync::{
Arc,
Mutex,
},
time::Duration,
};

fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> {
Arc::new(Mutex::new(BlockHeight::new(height)))
}

#[tokio::test]
async fn run__when_da_block_cost_source_gives_value_shared_state_is_updated() {
// given
Expand All @@ -43,7 +59,12 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone());
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let latest_l2_height = Arc::new(Mutex::new(BlockHeight::new(10u32)));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
);
let mut shared_state = &mut service.shared.subscribe();

// when
Expand All @@ -62,7 +83,12 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone());
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let latest_l2_height = latest_l2_height(0);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
);
let mut shared_state = &mut service.shared.subscribe();

// when
Expand All @@ -74,4 +100,112 @@ mod tests {
assert!(da_block_costs_res.is_err());
service.stop_and_await().await.unwrap();
}

#[tokio::test]
async fn run__will_not_return_cost_bundles_for_bundles_that_are_greater_than_l2_height(
) {
// given
let l2_height = 4;
let unexpected_costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 0..=9,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
assert!(unexpected_costs.l2_blocks.end() > &l2_height);
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
);
let mut shared_state = &mut service.shared.subscribe();

// when
service.start_and_await().await.unwrap();
notifier.notified().await;

// then
let err = shared_state.try_recv();
tracing::info!("err: {:?}", err);
assert!(err.is_err());
}

#[tokio::test]
async fn run__filtered_da_block_costs_do_not_update_latest_recorded_block() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.try_init();

// given
let l2_height = 4;
let unexpected_costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 2..=9,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
assert!(unexpected_costs.l2_blocks.end() > &l2_height);
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let mut service = DaSourceService::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
);
let mut watcher = StateWatcher::started();

// when
let _ = service.run(&mut watcher).await;

// then
let recorded_height = service.recorded_height();
let expected = 1;
assert!(recorded_height.is_none())
}

#[tokio::test]
async fn run__recorded_height_updated_by_da_costs() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.try_init();

// given
let l2_height = 10;
let recorded_height = 9;
let unexpected_costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 2..=recorded_height,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let (sender, mut receiver) =
tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
let mut service = DaSourceService::new_with_sender(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
sender,
);
let mut watcher = StateWatcher::started();

// when
let next = service.run(&mut watcher).await;

// then
let actual = service.recorded_height().unwrap();
let expected = BlockHeight::from(recorded_height);
assert_eq!(expected, actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub trait BlockCommitterApi: Send + Sync {
/// which receives data from the block committer (only http api for now)
pub struct BlockCommitterDaBlockCosts<BlockCommitter> {
client: BlockCommitter,
last_recorded_height: Option<BlockHeight>,
}

#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)]
Expand Down Expand Up @@ -80,11 +79,7 @@ impl From<RawDaBlockCosts> for DaBlockCosts {
impl<BlockCommitter> BlockCommitterDaBlockCosts<BlockCommitter> {
/// Create a new instance of the block committer da block costs source
pub fn new(client: BlockCommitter) -> Self {
let last_recorded_height = None;
Self {
client,
last_recorded_height,
}
Self { client }
}
}

Expand All @@ -93,30 +88,25 @@ impl<BlockCommitter> DaBlockCostsSource for BlockCommitterDaBlockCosts<BlockComm
where
BlockCommitter: BlockCommitterApi,
{
async fn request_da_block_costs(&mut self) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
let raw_da_block_costs: Vec<_> =
match self.last_recorded_height.and_then(|x| x.succ()) {
Some(ref next_height) => {
self.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?
}
None => self.client.get_latest_costs().await?.into_iter().collect(),
};
async fn request_da_block_costs(
&mut self,
last_recorded_height: &Option<BlockHeight>,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
let raw_da_block_costs: Vec<_> = match last_recorded_height.and_then(|x| x.succ())
{
Some(ref next_height) => {
self.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?
}
None => self.client.get_latest_costs().await?.into_iter().collect(),
};

let da_block_costs: Vec<_> =
raw_da_block_costs.iter().map(DaBlockCosts::from).collect();
if let Some(cost) = raw_da_block_costs.last() {
self.last_recorded_height = Some(BlockHeight::from(cost.end_height));
}

Ok(da_block_costs)
}

async fn set_last_value(&mut self, height: BlockHeight) -> DaBlockCostsResult<()> {
self.last_recorded_height = Some(height);
Ok(())
}
}

pub struct BlockCommitterHttpApi {
Expand Down Expand Up @@ -502,7 +492,7 @@ mod tests {
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api);

// when
let actual = block_committer.request_da_block_costs().await.unwrap();
let actual = block_committer.request_da_block_costs(&None).await.unwrap();

// then
assert_eq!(actual, expected);
Expand All @@ -517,10 +507,12 @@ mod tests {
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs.clone()));
let latest_height = BlockHeight::new(da_block_costs.end_height);
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api);
block_committer.set_last_value(latest_height).await.unwrap();

// when
let actual = block_committer.request_da_block_costs().await.unwrap();
let actual = block_committer
.request_da_block_costs(&Some(latest_height))
.await
.unwrap();

// then
let l2_blocks = actual.first().unwrap().l2_blocks.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ impl DummyDaBlockCosts {

#[async_trait::async_trait]
impl DaBlockCostsSource for DummyDaBlockCosts {
async fn request_da_block_costs(&mut self) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
async fn request_da_block_costs(
&mut self,
_latest_recorded_height: &Option<BlockHeight>,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
match &self.value {
Ok(da_block_costs) => {
self.notifier.notify_waiters();
Expand All @@ -35,8 +38,4 @@ impl DaBlockCostsSource for DummyDaBlockCosts {
}
}
}

async fn set_last_value(&mut self, _height: BlockHeight) -> DaBlockCostsResult<()> {
unimplemented!("This is a dummy implementation");
}
}
Loading

0 comments on commit 31fd51f

Please sign in to comment.