Skip to content

Commit

Permalink
Don't run program in runtime if its' queue is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
ByteNacked committed Jan 23, 2025
1 parent e5e2d2a commit a34d3ec
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethexe/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ parity-scale-codec = { workspace = true, features = ["std", "derive"] }
sp-allocator = { workspace = true, features = ["std"] }
sp-wasm-interface = { workspace = true, features = ["std", "wasmtime"] }
tokio = { workspace = true, features = ["full"] }
itertools.workspace = true

[dev-dependencies]
wabt.workspace = true
Expand Down
41 changes: 28 additions & 13 deletions ethexe/processor/src/handling/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ use crate::{
};
use core_processor::common::JournalNote;
use ethexe_db::{CodesStorage, Database};
use ethexe_runtime_common::{InBlockTransitions, JournalHandler, TransitionController};
use ethexe_runtime_common::{
state::Storage, InBlockTransitions, JournalHandler, TransitionController,
};
use gear_core::ids::ProgramId;
use gprimitives::H256;
use itertools::Itertools;
use std::collections::BTreeMap;
use tokio::sync::{mpsc, oneshot};

Expand Down Expand Up @@ -89,12 +92,28 @@ async fn run_in_async(
handles.push(handle);
}

// Send tasks to process programs in workers, until all queues are empty.
loop {
// Send tasks to process programs in workers, until all queues are empty.
// Filter out programs with empty queue
let filtered_states: Vec<_> = in_block_transitions
.states_iter()
.filter(|(_, state)| {
!db.read_state(**state)
.expect("failed to read state from storage")
.queue
.is_queue_empty()
})
.map(|(program_id, hash)| (*program_id, *hash))
.collect();

let mut no_more_to_do = true;
for index in (0..in_block_transitions.states_amount()).step_by(virtual_threads) {
let result_receivers = one_batch(index, &task_senders, in_block_transitions).await;

for states_chunk in filtered_states
.into_iter()
.chunks(virtual_threads)
.into_iter()
{
let result_receivers = one_batch(states_chunk, &task_senders).await;

let mut super_journal = vec![];
for (program_id, receiver) in result_receivers.into_iter() {
Expand Down Expand Up @@ -165,27 +184,23 @@ async fn worker(
}

async fn one_batch(
from_index: usize,
state_chunks: impl IntoIterator<Item = (ProgramId, H256)>,
task_senders: &[mpsc::Sender<Task>],
in_block_transitions: &mut InBlockTransitions,
) -> BTreeMap<ProgramId, oneshot::Receiver<Vec<JournalNote>>> {
let mut result_receivers = BTreeMap::new();

for (sender, (program_id, state_hash)) in task_senders
.iter()
.zip(in_block_transitions.states_iter().skip(from_index))
{
for (sender, (program_id, state_hash)) in task_senders.iter().zip(state_chunks) {
let (result_sender, result_receiver) = oneshot::channel();

let task = Task::Run {
program_id: *program_id,
state_hash: *state_hash,
program_id,
state_hash,
result_sender,
};

sender.send(task).await.unwrap();

result_receivers.insert(*program_id, result_receiver);
result_receivers.insert(program_id, result_receiver);
}

result_receivers
Expand Down

0 comments on commit a34d3ec

Please sign in to comment.