From f5eac3528c87740be3635345942d3c2e0d229bef Mon Sep 17 00:00:00 2001
From: discord9
Date: Tue, 10 Jun 2025 22:20:50 +0800
Subject: [PATCH] feat: mark dirty time windows

Group incoming dirty time ranges by source table, align them to each
affected flow's time windows, and record the aligned windows in the
tasks' dirty-window state so the next refresh re-computes them. The
gRPC handler now forwards the whole batch to the batching engine in a
single call instead of looping over individual requests.

---
 src/flow/src/batching_mode/engine.rs | 128 +++++++++++++++++++++++++++-
 src/flow/src/server.rs               |  22 +++--
 2 files changed, 134 insertions(+), 16 deletions(-)

diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index 06bd4a4d3c..4cdf68e389 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -17,6 +17,7 @@
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 
+use api::v1::flow::{DirtyWindowRequests, FlowResponse};
 use catalog::CatalogManagerRef;
 use common_error::ext::BoxedError;
 use common_meta::ddl::create_flow::FlowType;
@@ -29,9 +30,8 @@
-use common_telemetry::{debug, info};
+use common_telemetry::{debug, info, warn};
 use common_time::TimeToLive;
 use query::QueryEngineRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use store_api::storage::RegionId;
-use table::metadata::TableId;
+use store_api::storage::{RegionId, TableId};
 use tokio::sync::{oneshot, RwLock};
 
 use crate::batching_mode::frontend_client::FrontendClient;
@@ -77,6 +77,128 @@ impl BatchingEngine {
         }
     }
 
+    /// Marks the given dirty time ranges on every batching-mode flow whose
+    /// source tables are affected, so their next refresh re-computes them.
+    pub async fn handle_mark_dirty_time_window(
+        &self,
+        reqs: DirtyWindowRequests,
+    ) -> Result<FlowResponse> {
+        let table_info_mgr = self.table_meta.table_info_manager();
+
+        // Group the incoming dirty ranges by table id.
+        let mut group_by_table_id: HashMap<TableId, Vec<_>> = HashMap::new();
+        for r in reqs.requests {
+            let tid = TableId::from(r.table_id);
+            let entry = group_by_table_id.entry(tid).or_default();
+            entry.extend(r.dirty_time_ranges);
+        }
+        let tids = group_by_table_id.keys().cloned().collect::<Vec<_>>();
+        let table_infos =
+            table_info_mgr
+                .batch_get(&tids)
+                .await
+                .with_context(|_| TableNotFoundMetaSnafu {
+                    msg: format!("Failed to get table info for table ids: {:?}", tids),
+                })?;
+
+        // Re-key the ranges by fully qualified table name, and remember each
+        // table's time-index unit so raw timestamps can be interpreted.
+        let group_by_table_name = group_by_table_id
+            .into_iter()
+            .filter_map(|(id, ranges)| {
+                let table_name = table_infos.get(&id).map(|info| info.table_name());
+                let Some(table_name) = table_name else {
+                    warn!("Failed to get table info for table id: {:?}", id);
+                    return None;
+                };
+                let table_name = [
+                    table_name.catalog_name,
+                    table_name.schema_name,
+                    table_name.table_name,
+                ];
+                let schema = &table_infos.get(&id).unwrap().table_info.meta.schema;
+                let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()]
+                    .data_type
+                    .as_timestamp()
+                    .unwrap()
+                    .unit();
+                Some((table_name, (ranges, time_index_unit)))
+            })
+            .collect::<HashMap<_, _>>();
+
+        let group_by_table_name = Arc::new(group_by_table_name);
+
+        let mut handles = Vec::new();
+        let tasks = self.tasks.read().await;
+
+        for (_flow_id, task) in tasks.iter() {
+            let src_table_names = &task.config.source_table_names;
+
+            // Skip flows whose source tables received no dirty ranges.
+            if src_table_names
+                .iter()
+                .all(|name| !group_by_table_name.contains_key(name))
+            {
+                continue;
+            }
+
+            let group_by_table_name = group_by_table_name.clone();
+            let task = task.clone();
+
+            let handle: JoinHandle<Result<()>> = tokio::spawn(async move {
+                let src_table_names = &task.config.source_table_names;
+                let mut all_dirty_windows = vec![];
+                for src_table_name in src_table_names {
+                    if let Some((window_ranges, unit)) = group_by_table_name.get(src_table_name) {
+                        let Some(expr) = &task.config.time_window_expr else {
+                            continue;
+                        };
+                        for window in window_ranges {
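+                            // Widen the raw range to whole flow time windows:
+                            // `eval` aligns a timestamp to its containing
+                            // window, so the range start keeps the aligned
+                            // window start and the range end its window end.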
+                            let align_start = expr
+                                .eval(common_time::Timestamp::new(window.start_value, *unit))?
+                                .0
+                                .context(UnexpectedSnafu {
+                                    reason: "Failed to eval start value",
+                                })?;
+
+                            let align_end = expr
+                                .eval(common_time::Timestamp::new(window.end_value, *unit))?
+                                .1
+                                .context(UnexpectedSnafu {
+                                    reason: "Failed to eval end value",
+                                })?;
+                            all_dirty_windows.push((align_start, align_end));
+                        }
+                    }
+                }
+                let mut state = task.state.write().unwrap();
+                for (s, e) in all_dirty_windows {
+                    state.dirty_time_windows.add_window(s, Some(e));
+                }
+                Ok(())
+            });
+            handles.push(handle);
+        }
+        drop(tasks);
+        for handle in handles {
+            match handle.await {
+                Err(e) => {
+                    warn!("Failed to mark dirty time windows: {e}");
+                }
+                Ok(Ok(())) => (),
+                Ok(Err(e)) => {
+                    warn!("Failed to mark dirty time windows: {e}");
+                }
+            }
+        }
+
+        Ok(Default::default())
+    }
+
     pub async fn handle_inserts_inner(
         &self,
         request: api::v1::region::InsertRequests,
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index a1cfc73aaa..c440affa1c 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -17,6 +17,7 @@
 use std::net::SocketAddr;
 use std::sync::Arc;
 
+use api::v1::flow::DirtyWindowRequests;
 use api::v1::{RowDeleteRequests, RowInsertRequests};
 use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME};
 use catalog::CatalogManagerRef;
@@ -32,9 +33,7 @@ use common_query::Output;
 use common_runtime::JoinHandle;
 use common_telemetry::tracing::info;
 use futures::{FutureExt, TryStreamExt};
-use greptime_proto::v1::flow::{
-    flow_server, DirtyWindowRequests, FlowRequest, FlowResponse, InsertRequests,
-};
+use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests};
 use itertools::Itertools;
 use operator::delete::Deleter;
 use operator::insert::Inserter;
@@ -141,17 +140,14 @@ impl flow_server::Flow for FlowService {
     }
 
     async fn handle_mark_dirty_time_window(
         &self,
-        request: Request<DirtyWindowRequests>,
+        reqs: Request<DirtyWindowRequests>,
     ) -> Result<Response<FlowResponse>, Status> {
-        let req = request.into_inner();
-        for req in req.requests {
-            self.dual_engine
-                .handle_mark_window_dirty(req)
-                .await
-                .map(Response::new)
-                .map_err(to_status_with_last_err)?;
-        }
-        Ok(Response::new(FlowResponse::default()))
+        self.dual_engine
+            .batching_engine()
+            .handle_mark_dirty_time_window(reqs.into_inner())
+            .await
+            .map(Response::new)
+            .map_err(to_status_with_last_err)
     }
 }
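-- 
Usage sketch (illustration, not part of the patch): one way a caller might
drive the new RPC, assuming the tonic-generated client in
`greptime_proto::v1::flow::flow_client`, and with `DirtyWindowRequest` /
`DirtyTimeRange` as hypothetical names for the per-table message and its
range type (only `DirtyWindowRequests` and the fields `table_id`,
`dirty_time_ranges`, `start_value`, and `end_value` are visible in this
diff):

    use greptime_proto::v1::flow::flow_client::FlowClient;
    use greptime_proto::v1::flow::{DirtyTimeRange, DirtyWindowRequest, DirtyWindowRequests};

    async fn mark_dirty(addr: String) -> Result<(), Box<dyn std::error::Error>> {
        // Hypothetical client setup; `FlowClient` is the tonic-generated stub.
        let mut client = FlowClient::connect(addr).await?;
        let reqs = DirtyWindowRequests {
            requests: vec![DirtyWindowRequest {
                table_id: 1024,
                // Raw, unaligned timestamps in the table's time-index unit;
                // the flownode widens them to whole flow time windows.
                dirty_time_ranges: vec![DirtyTimeRange {
                    start_value: 1_749_564_000_000,
                    end_value: 1_749_564_060_000,
                }],
            }],
        };
        // A single round trip marks every affected batching-mode flow dirty.
        client.handle_mark_dirty_time_window(reqs).await?;
        Ok(())
    }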