Skip to content

Commit

Permalink
feat(flow): refill flow task def(Part 2) (GreptimeTeam#5317)
Browse files Browse the repository at this point in the history
* feat: refill task def

* chore: per review

* chore: after rebase
  • Loading branch information
discord9 authored Jan 15, 2025
1 parent f0d30a0 commit 0185a65
Show file tree
Hide file tree
Showing 2 changed files with 438 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast, watch, Mutex, RwLock};

pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
Expand All @@ -57,6 +58,7 @@ use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};

mod flownode_impl;
mod parse_expr;
pub(crate) mod refill;
mod stat;
#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -154,6 +156,8 @@ pub struct FlowWorkerManager {
frontend_invoker: RwLock<Option<FrontendInvoker>>,
/// contains mapping from table name to global id, and table schema
node_context: RwLock<FlownodeContext>,
/// Contains all refill tasks
refill_tasks: RwLock<BTreeMap<FlowId, RefillTask>>,
flow_err_collectors: RwLock<BTreeMap<FlowId, ErrCollector>>,
src_send_buf_lens: RwLock<BTreeMap<TableId, watch::Receiver<usize>>>,
tick_manager: FlowTickManager,
Expand Down Expand Up @@ -193,6 +197,7 @@ impl FlowWorkerManager {
table_info_source: srv_map,
frontend_invoker: RwLock::new(None),
node_context: RwLock::new(node_context),
refill_tasks: Default::default(),
flow_err_collectors: Default::default(),
src_send_buf_lens: Default::default(),
tick_manager,
Expand Down
Loading

0 comments on commit 0185a65

Please sign in to comment.