From 9d997d593c8180531cde62e5d481a42a24bff79c Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 16 Jun 2025 16:19:14 +0800 Subject: [PATCH] feat: bulk support flow batch (#6291) * feat/bulk-support-flow-batch: ### Refactor and Enhance Timestamp Handling in gRPC and Bulk Insert - **Refactor Table Handling**: - Updated `put_record_batch` method to use `TableRef` instead of `TableId` in `grpc.rs`, `greptime_handler.rs`, and `grpc.rs`. - Modified `handle_bulk_insert` to accept `TableRef` and extract `TableId` internally in `bulk_insert.rs`. - **Enhance Timestamp Processing**: - Added `compute_timestamp_range` function to calculate timestamp range in `bulk_insert.rs`. - Introduced error handling for invalid time index types in `error.rs`. - **Test Adjustments**: - Updated `DummyInstance` implementation in `tests/mod.rs` to align with new method signatures. Signed-off-by: Lei, HUANG * feat/bulk-support-flow-batch: ### Add Dirty Window Handling in Flow Module - **Updated `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`. - **Flow Module Enhancements**: - Added `DirtyWindowRequest` handling in `flow.rs`, `node_manager.rs`, `test_util.rs`, `flownode_impl.rs`, and `server.rs`. - Implemented `handle_mark_window_dirty` function to manage dirty time windows. - **Bulk Insert Enhancements**: - Modified `bulk_insert.rs` to notify flownodes about dirty time windows using `update_flow_dirty_window`. - **Removed Unused Imports**: Cleaned up unused imports in `greptime_handler.rs`, `grpc.rs`, and `mod.rs`. Signed-off-by: Lei, HUANG * feat: mark dirty time window * feat: metrics * metrics: more useful metrics batching mode * feat/bulk-support-flow-batch: **Refactor Timestamp Handling and Update Dependencies** - **Dependency Update**: Updated `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision. - **Batching Engine Refactor**: Modified `src/flow/src/batching_mode/engine.rs` to replace `dirty_time_ranges` with `timestamps` for improved timestamp handling. - **Bulk Insert Refactor**: Updated `src/operator/src/bulk_insert.rs` to refactor timestamp extraction and handling. Replaced `compute_timestamp_range` with `extract_timestamps` and adjusted related logic to handle timestamps directly. Signed-off-by: Lei, HUANG * feat/bulk-support-flow-batch: ### Update Metrics in Batching Mode Engine - **Modified Metrics**: Replaced `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE` with `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW` to track the count of time windows instead of their range. - Files affected: `engine.rs`, `metrics.rs` - **New Method**: Added `len()` method to `DirtyTimeWindows` to return the number of dirty windows. - File affected: `state.rs` Signed-off-by: Lei, HUANG * feat/bulk-support-flow-batch: **Refactor and Enhance Timestamp Handling in `bulk_insert.rs`** - **Refactored Timestamp Extraction**: Moved timestamp extraction logic to a new method `maybe_update_flow_dirty_window` to improve code readability and maintainability. - **Enhanced Flow Update Logic**: Updated the flow dirty window update mechanism to conditionally notify flownodes only if they are configured, using `table_info` and `record_batch`. - **Imports Adjusted**: Updated imports to reflect changes in table metadata handling, replacing `TableId` with `TableInfoRef`. Files affected: - `src/operator/src/bulk_insert.rs` Signed-off-by: Lei, HUANG * feat/bulk-support-flow-batch: ## Update `handle_mark_window_dirty` Method in `flownode_impl.rs` - Replaced `unimplemented!()` with `unreachable!()` in the `handle_mark_window_dirty` method for both `FlowDualEngine` and `StreamingEngine` implementations in `flownode_impl.rs`. Signed-off-by: Lei, HUANG * feat/bulk-support-flow-batch: Update `greptime-proto` Dependency - Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`. - `Cargo.lock`: Changed the source revision from `f0913f179ee1d2ce428f8b85a9ea12b5f69ad636` to `17971523673f4fbc982510d3c9d6647ff642e16f`. - `Cargo.toml`: Updated the `greptime-proto` git revision to `17971523673f4fbc982510d3c9d6647ff642e16f`. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG Co-authored-by: discord9 --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/client/src/flow.rs | 28 +++++- src/common/meta/src/node_manager.rs | 5 +- src/common/meta/src/test_util.rs | 14 ++- src/flow/src/adapter.rs | 2 +- src/flow/src/adapter/flownode_impl.rs | 24 ++++- src/flow/src/batching_mode/engine.rs | 115 ++++++++++++++++++++- src/flow/src/batching_mode/state.rs | 5 + src/flow/src/batching_mode/task.rs | 14 ++- src/flow/src/metrics.rs | 23 ++++- src/flow/src/server.rs | 13 +++ src/frontend/src/instance/grpc.rs | 25 +++-- src/operator/src/bulk_insert.rs | 122 ++++++++++++++++++++++- src/operator/src/error.rs | 8 ++ src/operator/src/insert.rs | 2 +- src/servers/src/grpc/greptime_handler.rs | 8 +- src/servers/src/query_handler/grpc.rs | 12 +-- src/servers/tests/mod.rs | 13 +-- 19 files changed, 390 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3fbb42e9c1..5999d195ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5143,7 +5143,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f0913f179ee1d2ce428f8b85a9ea12b5f69ad636#f0913f179ee1d2ce428f8b85a9ea12b5f69ad636" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=17971523673f4fbc982510d3c9d6647ff642e16f#17971523673f4fbc982510d3c9d6647ff642e16f" dependencies = [ "prost 0.13.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index 409b797685..702d9fdd0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -134,7 +134,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f0913f179ee1d2ce428f8b85a9ea12b5f69ad636" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "17971523673f4fbc982510d3c9d6647ff642e16f" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/client/src/flow.rs b/src/client/src/flow.rs index 0d5963f848..7c3479d412 100644 --- a/src/client/src/flow.rs +++ b/src/client/src/flow.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::flow::{FlowRequest, FlowResponse}; +use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse}; use api::v1::region::InsertRequests; use common_error::ext::BoxedError; use common_meta::node_manager::Flownode; @@ -44,6 +44,16 @@ impl Flownode for FlowRequester { .map_err(BoxedError::new) .context(common_meta::error::ExternalSnafu) } + + async fn handle_mark_window_dirty( + &self, + req: DirtyWindowRequest, + ) -> common_meta::error::Result { + self.handle_mark_window_dirty(req) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } } impl FlowRequester { @@ -91,4 +101,20 @@ impl FlowRequester { .into_inner(); Ok(response) } + + async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result { + let (addr, mut client) = self.client.raw_flow_client()?; + let response = client + .handle_mark_dirty_time_window(DirtyWindowRequests { + requests: vec![req], + }) + .await + .or_else(|e| { + let code = e.code(); + let err: crate::error::Error = e.into(); + Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code }) + })? + .into_inner(); + Ok(response) + } } diff --git a/src/common/meta/src/node_manager.rs b/src/common/meta/src/node_manager.rs index bda4e02d1e..b640cf307c 100644 --- a/src/common/meta/src/node_manager.rs +++ b/src/common/meta/src/node_manager.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::flow::{FlowRequest, FlowResponse}; +use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse}; use api::v1::region::{InsertRequests, RegionRequest}; pub use common_base::AffectedRows; use common_query::request::QueryRequest; @@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync { async fn handle(&self, request: FlowRequest) -> Result; async fn handle_inserts(&self, request: InsertRequests) -> Result; + + /// Handles requests to mark time window as dirty. + async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result; } pub type FlownodeRef = Arc; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index f790960690..8dc4a38643 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::flow::{FlowRequest, FlowResponse}; +use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse}; use api::v1::region::{InsertRequests, RegionRequest}; pub use common_base::AffectedRows; use common_query::request::QueryRequest; @@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone { ) -> Result { unimplemented!() } + + async fn handle_mark_window_dirty( + &self, + _peer: &Peer, + _req: DirtyWindowRequest, + ) -> Result { + unimplemented!() + } } /// A mock struct implements [NodeManager] only implement the `datanode` method. @@ -134,6 +142,10 @@ impl Flownode for MockNode { async fn handle_inserts(&self, requests: InsertRequests) -> Result { self.handler.handle_inserts(&self.peer, requests).await } + + async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result { + self.handler.handle_mark_window_dirty(&self.peer, req).await + } } #[async_trait::async_trait] diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index f7cf78ca70..6a1697a389 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -316,7 +316,7 @@ impl StreamingEngine { ); METRIC_FLOW_ROWS - .with_label_values(&["out"]) + .with_label_values(&["out-streaming"]) .inc_by(total_rows as u64); let now = self.tick_manager.tick(); diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index b42ae2c29e..00e5c8157d 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -31,6 +31,7 @@ use common_runtime::JoinHandle; use common_telemetry::{error, info, trace, warn}; use datatypes::value::Value; use futures::TryStreamExt; +use greptime_proto::v1::flow::DirtyWindowRequest; use itertools::Itertools; use session::context::QueryContextBuilder; use snafu::{ensure, IntoError, OptionExt, ResultExt}; @@ -46,7 +47,7 @@ use crate::error::{ IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, }; -use crate::metrics::METRIC_FLOW_TASK_COUNT; +use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT}; use crate::repr::{self, DiffRow}; use crate::{Error, FlowId}; @@ -689,6 +690,9 @@ impl FlowEngine for FlowDualEngine { let mut to_stream_engine = Vec::with_capacity(request.requests.len()); let mut to_batch_engine = request.requests; + let mut batching_row_cnt = 0; + let mut streaming_row_cnt = 0; + { // not locking this, or recover flows will be starved when also handling flow inserts let src_table2flow = self.src_table2flow.read().await; @@ -698,9 +702,11 @@ impl FlowEngine for FlowDualEngine { let is_in_stream = src_table2flow.in_stream(table_id); let is_in_batch = src_table2flow.in_batch(table_id); if is_in_stream { + streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0); to_stream_engine.push(req.clone()); } if is_in_batch { + batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0); return true; } if !is_in_batch && !is_in_stream { @@ -713,6 +719,14 @@ impl FlowEngine for FlowDualEngine { // can't use drop due to https://github.com/rust-lang/rust/pull/128846 } + METRIC_FLOW_ROWS + .with_label_values(&["in-streaming"]) + .inc_by(streaming_row_cnt as u64); + + METRIC_FLOW_ROWS + .with_label_values(&["in-batching"]) + .inc_by(batching_row_cnt as u64); + let streaming_engine = self.streaming_engine.clone(); let stream_handler: JoinHandle> = common_runtime::spawn_global(async move { @@ -819,6 +833,10 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { .map(|_| Default::default()) .map_err(to_meta_err(snafu::location!())) } + + async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult { + unreachable!() + } } /// return a function to convert `crate::error::Error` to `common_meta::error::Error` @@ -926,6 +944,10 @@ impl common_meta::node_manager::Flownode for StreamingEngine { .map(|_| Default::default()) .map_err(to_meta_err(snafu::location!())) } + + async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult { + unreachable!() + } } impl FlowEngine for StreamingEngine { diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 06bd4a4d3c..e18e466867 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use api::v1::flow::{DirtyWindowRequests, FlowResponse}; use catalog::CatalogManagerRef; use common_error::ext::BoxedError; use common_meta::ddl::create_flow::FlowType; @@ -29,8 +30,7 @@ use common_telemetry::{debug, info}; use common_time::TimeToLive; use query::QueryEngineRef; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::storage::RegionId; -use table::metadata::TableId; +use store_api::storage::{RegionId, TableId}; use tokio::sync::{oneshot, RwLock}; use crate::batching_mode::frontend_client::FrontendClient; @@ -42,6 +42,7 @@ use crate::error::{ ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, }; +use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW; use crate::{CreateFlowArgs, Error, FlowId, TableName}; /// Batching mode Engine, responsible for driving all the batching mode tasks @@ -77,6 +78,116 @@ impl BatchingEngine { } } + pub async fn handle_mark_dirty_time_window( + &self, + reqs: DirtyWindowRequests, + ) -> Result { + let table_info_mgr = self.table_meta.table_info_manager(); + + let mut group_by_table_id: HashMap> = HashMap::new(); + for r in reqs.requests { + let tid = TableId::from(r.table_id); + let entry = group_by_table_id.entry(tid).or_default(); + entry.extend(r.timestamps); + } + let tids = group_by_table_id.keys().cloned().collect::>(); + let table_infos = + table_info_mgr + .batch_get(&tids) + .await + .with_context(|_| TableNotFoundMetaSnafu { + msg: format!("Failed to get table info for table ids: {:?}", tids), + })?; + + let group_by_table_name = group_by_table_id + .into_iter() + .filter_map(|(id, timestamps)| { + let table_name = table_infos.get(&id).map(|info| info.table_name()); + let Some(table_name) = table_name else { + warn!("Failed to get table infos for table id: {:?}", id); + return None; + }; + let table_name = [ + table_name.catalog_name, + table_name.schema_name, + table_name.table_name, + ]; + let schema = &table_infos.get(&id).unwrap().table_info.meta.schema; + let time_index_unit = schema.column_schemas[schema.timestamp_index.unwrap()] + .data_type + .as_timestamp() + .unwrap() + .unit(); + Some((table_name, (timestamps, time_index_unit))) + }) + .collect::>(); + + let group_by_table_name = Arc::new(group_by_table_name); + + let mut handles = Vec::new(); + let tasks = self.tasks.read().await; + + for (_flow_id, task) in tasks.iter() { + let src_table_names = &task.config.source_table_names; + + if src_table_names + .iter() + .all(|name| !group_by_table_name.contains_key(name)) + { + continue; + } + + let group_by_table_name = group_by_table_name.clone(); + let task = task.clone(); + + let handle: JoinHandle> = tokio::spawn(async move { + let src_table_names = &task.config.source_table_names; + let mut all_dirty_windows = vec![]; + for src_table_name in src_table_names { + if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) { + let Some(expr) = &task.config.time_window_expr else { + continue; + }; + for timestamp in timestamps { + let align_start = expr + .eval(common_time::Timestamp::new(*timestamp, *unit))? + .0 + .context(UnexpectedSnafu { + reason: "Failed to eval start value", + })?; + all_dirty_windows.push(align_start); + } + } + } + let mut state = task.state.write().unwrap(); + let flow_id_label = task.config.flow_id.to_string(); + for timestamp in all_dirty_windows { + state.dirty_time_windows.add_window(timestamp, None); + } + + METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW + .with_label_values(&[&flow_id_label]) + .set(state.dirty_time_windows.len() as f64); + Ok(()) + }); + handles.push(handle); + } + drop(tasks); + for handle in handles { + match handle.await { + Err(e) => { + warn!("Failed to handle inserts: {e}"); + } + Ok(Ok(())) => (), + Ok(Err(e)) => { + warn!("Failed to handle inserts: {e}"); + } + } + } + + Ok(Default::default()) + } + pub async fn handle_inserts_inner( &self, request: api::v1::region::InsertRequests, diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index f2e101a533..0ecf68b488 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -156,6 +156,11 @@ impl DirtyTimeWindows { self.windows.clear(); } + /// Number of dirty windows. + pub fn len(&self) -> usize { + self.windows.len() + } + /// Generate all filter expressions consuming all time windows /// /// there is two limits: diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index f93755d4f8..aef9eea5e8 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -61,7 +61,9 @@ use crate::error::{ SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, }; use crate::metrics::{ - METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, + METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, + METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, + METRIC_FLOW_ROWS, }; use crate::{Error, FlowId}; @@ -371,6 +373,9 @@ impl BatchingTask { "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}", elapsed ); + METRIC_FLOW_ROWS + .with_label_values(&[format!("{}-out-batching", flow_id).as_str()]) + .inc_by(*affected_rows as _); } else if let Err(err) = &res { warn!( "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}", @@ -410,6 +415,7 @@ impl BatchingTask { engine: QueryEngineRef, frontend_client: Arc, ) { + let flow_id_str = self.config.flow_id.to_string(); loop { // first check if shutdown signal is received // if so, break the loop @@ -427,6 +433,9 @@ impl BatchingTask { Err(TryRecvError::Empty) => (), } } + METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT + .with_label_values(&[&flow_id_str]) + .inc(); let new_query = match self.gen_insert_plan(&engine).await { Ok(new_query) => new_query, @@ -473,6 +482,9 @@ impl BatchingTask { } // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed Err(err) => { + METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT + .with_label_values(&[&flow_id_str]) + .inc(); match new_query { Some(query) => { common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id) diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 2b93a4a0a0..66ec7423e0 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -58,11 +58,32 @@ lazy_static! { vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec = + register_gauge_vec!( + "greptime_flow_batching_engine_bulk_mark_time_window", + "flow batching engine query time window count marked by bulk inserts", + &["flow_id"], + ) + .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec = + register_int_counter_vec!( + "greptime_flow_batching_start_query_count", + "flow batching engine started query count", + &["flow_id"], + ) + .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec = + register_int_counter_vec!( + "greptime_flow_batching_error_count", + "flow batching engine error count per flow id", + &["flow_id"], + ) + .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( "greptime_flow_processed_rows", - "Count of rows flowing through the system", + "Count of rows flowing through the system.", &["direction"] ) .unwrap(); diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index e41168f5d3..5711f4ada6 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -17,6 +17,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use api::v1::flow::DirtyWindowRequests; use api::v1::{RowDeleteRequests, RowInsertRequests}; use cache::{TABLE_FLOWNODE_SET_CACHE_NAME, TABLE_ROUTE_CACHE_NAME}; use catalog::CatalogManagerRef; @@ -136,6 +137,18 @@ impl flow_server::Flow for FlowService { .map(Response::new) .map_err(to_status_with_last_err) } + + async fn handle_mark_dirty_time_window( + &self, + reqs: Request, + ) -> Result, Status> { + self.dual_engine + .batching_engine() + .handle_mark_dirty_time_window(reqs.into_inner()) + .await + .map(Response::new) + .map_err(to_status_with_last_err) + } } #[derive(Clone)] diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 555330e76a..5383bd931a 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -35,8 +35,8 @@ use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; -use table::metadata::TableId; use table::table_name::TableName; +use table::TableRef; use crate::error::{ CatalogSnafu, DataFusionSnafu, Error, InFlightWriteBytesExceededSnafu, @@ -235,34 +235,33 @@ impl GrpcQueryHandler for Instance { async fn put_record_batch( &self, - table: &TableName, - table_id: &mut Option, + table_name: &TableName, + table_ref: &mut Option, decoder: &mut FlightDecoder, data: FlightData, ) -> Result { - let table_id = if let Some(table_id) = table_id { - *table_id + let table = if let Some(table) = table_ref { + table.clone() } else { let table = self .catalog_manager() .table( - &table.catalog_name, - &table.schema_name, - &table.table_name, + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, None, ) .await .context(CatalogSnafu)? .with_context(|| TableNotFoundSnafu { - table_name: table.to_string(), + table_name: table_name.to_string(), })?; - let id = table.table_info().table_id(); - *table_id = Some(id); - id + *table_ref = Some(table.clone()); + table }; self.inserter - .handle_bulk_insert(table_id, decoder, data) + .handle_bulk_insert(table, decoder, data) .await .context(TableOperationSnafu) } diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs index ead97d9ce7..4aa90b6cf8 100644 --- a/src/operator/src/bulk_insert.rs +++ b/src/operator/src/bulk_insert.rs @@ -12,18 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use ahash::{HashMap, HashMapExt}; +use api::v1::flow::DirtyWindowRequest; use api::v1::region::{ bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader, }; use api::v1::ArrowIpc; +use arrow::array::{ + Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, +}; +use arrow::datatypes::{DataType, Int64Type, TimeUnit}; +use arrow::record_batch::RecordBatch; use common_base::AffectedRows; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_grpc::FlightData; +use common_telemetry::error; use common_telemetry::tracing_context::TracingContext; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; -use table::metadata::TableId; +use table::metadata::TableInfoRef; +use table::TableRef; use crate::insert::Inserter; use crate::{error, metrics}; @@ -32,10 +43,12 @@ impl Inserter { /// Handle bulk insert request. pub async fn handle_bulk_insert( &self, - table_id: TableId, + table: TableRef, decoder: &mut FlightDecoder, data: FlightData, ) -> error::Result { + let table_info = table.table_info(); + let table_id = table_info.table_id(); let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED .with_label_values(&["decode_request"]) .start_timer(); @@ -48,6 +61,10 @@ impl Inserter { return Ok(0); }; decode_timer.observe_duration(); + + // notify flownode to update dirty timestamps if flow is configured. + self.maybe_update_flow_dirty_window(table_info, record_batch.clone()); + metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); metrics::BULK_REQUEST_ROWS .with_label_values(&["raw"]) @@ -216,4 +233,103 @@ impl Inserter { crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64); Ok(rows_inserted) } + + fn maybe_update_flow_dirty_window(&self, table_info: TableInfoRef, record_batch: RecordBatch) { + let table_id = table_info.table_id(); + let table_flownode_set_cache = self.table_flownode_set_cache.clone(); + let node_manager = self.node_manager.clone(); + common_runtime::spawn_global(async move { + let result = table_flownode_set_cache + .get(table_id) + .await + .context(error::RequestInsertsSnafu); + let flownodes = match result { + Ok(flownodes) => flownodes.unwrap_or_default(), + Err(e) => { + error!(e; "Failed to get flownodes for table id: {}", table_id); + return; + } + }; + + let peers: HashSet<_> = flownodes.values().cloned().collect(); + if peers.is_empty() { + return; + } + + let Ok(timestamps) = extract_timestamps( + &record_batch, + &table_info + .meta + .schema + .timestamp_column() + .as_ref() + .unwrap() + .name, + ) + .inspect_err(|e| { + error!(e; "Failed to extract timestamps from record batch"); + }) else { + return; + }; + + for peer in peers { + let node_manager = node_manager.clone(); + let timestamps = timestamps.clone(); + common_runtime::spawn_global(async move { + if let Err(e) = node_manager + .flownode(&peer) + .await + .handle_mark_window_dirty(DirtyWindowRequest { + table_id, + timestamps, + }) + .await + .context(error::RequestInsertsSnafu) + { + error!(e; "Failed to mark timestamps as dirty, table: {}", table_id); + } + }); + } + }); + } +} + +/// Calculate the timestamp range of record batch. Return `None` if record batch is empty. +fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Result> { + let ts_col = rb + .column_by_name(timestamp_index_name) + .context(error::ColumnNotFoundSnafu { + msg: timestamp_index_name, + })?; + if rb.num_rows() == 0 { + return Ok(vec![]); + } + let primitive = match ts_col.data_type() { + DataType::Timestamp(unit, _) => match unit { + TimeUnit::Second => ts_col + .as_any() + .downcast_ref::() + .unwrap() + .reinterpret_cast::(), + TimeUnit::Millisecond => ts_col + .as_any() + .downcast_ref::() + .unwrap() + .reinterpret_cast::(), + TimeUnit::Microsecond => ts_col + .as_any() + .downcast_ref::() + .unwrap() + .reinterpret_cast::(), + TimeUnit::Nanosecond => ts_col + .as_any() + .downcast_ref::() + .unwrap() + .reinterpret_cast::(), + }, + t => { + return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail(); + } + }; + Ok(primitive.iter().flatten().collect()) } diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 900d12b272..edb21a7df4 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -838,6 +838,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid time index type: {}", ty))] + InvalidTimeIndexType { + ty: arrow::datatypes::DataType, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid process id: {}", id))] InvalidProcessId { id: String }, @@ -973,6 +980,7 @@ impl ErrorExt for Error { Error::ColumnOptions { source, .. } => source.status_code(), Error::DecodeFlightData { source, .. } => source.status_code(), Error::ComputeArrow { .. } => StatusCode::Internal, + Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments, Error::InvalidProcessId { .. } => StatusCode::InvalidArguments, Error::ProcessManagerMissing { .. } => StatusCode::Unexpected, } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index efc9812554..590eca914f 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -78,7 +78,7 @@ pub struct Inserter { catalog_manager: CatalogManagerRef, pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) node_manager: NodeManagerRef, - table_flownode_set_cache: TableFlownodeSetCacheRef, + pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef, } pub type InserterRef = Arc; diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 14bf55bf6b..91457e5452 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -40,7 +40,7 @@ use futures_util::StreamExt; use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; use session::hints::READ_PREFERENCE_HINT; use snafu::{OptionExt, ResultExt}; -use table::metadata::TableId; +use table::TableRef; use tokio::sync::mpsc; use crate::error::Error::UnsupportedAuthScheme; @@ -149,8 +149,8 @@ impl GreptimeRequestHandler { .clone() .unwrap_or_else(common_runtime::global_runtime); runtime.spawn(async move { - // Cached table id - let mut table_id: Option = None; + // Cached table ref + let mut table_ref: Option = None; let mut decoder = FlightDecoder::default(); while let Some(request) = stream.next().await { @@ -169,7 +169,7 @@ impl GreptimeRequestHandler { let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer(); let result = handler - .put_record_batch(&table_name, &mut table_id, &mut decoder, data) + .put_record_batch(&table_name, &mut table_ref, &mut decoder, data) .await .inspect_err(|e| error!(e; "Failed to handle flight record batches")); timer.observe_duration(); diff --git a/src/servers/src/query_handler/grpc.rs b/src/servers/src/query_handler/grpc.rs index c7055092f6..15f18aaaed 100644 --- a/src/servers/src/query_handler/grpc.rs +++ b/src/servers/src/query_handler/grpc.rs @@ -23,8 +23,8 @@ use common_grpc::flight::FlightDecoder; use common_query::Output; use session::context::QueryContextRef; use snafu::ResultExt; -use table::metadata::TableId; use table::table_name::TableName; +use table::TableRef; use crate::error::{self, Result}; @@ -45,8 +45,8 @@ pub trait GrpcQueryHandler { async fn put_record_batch( &self, - table: &TableName, - table_id: &mut Option, + table_name: &TableName, + table_ref: &mut Option, decoder: &mut FlightDecoder, flight_data: FlightData, ) -> std::result::Result; @@ -77,13 +77,13 @@ where async fn put_record_batch( &self, - table: &TableName, - table_id: &mut Option, + table_name: &TableName, + table_ref: &mut Option, decoder: &mut FlightDecoder, data: FlightData, ) -> Result { self.0 - .put_record_batch(table, table_id, decoder, data) + .put_record_batch(table_name, table_ref, decoder, data) .await .map_err(BoxedError::new) .context(error::ExecuteGrpcRequestSnafu) diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 1c5049e3ba..41454c8ddc 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -34,7 +34,6 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler}; use session::context::QueryContextRef; use snafu::ensure; use sql::statements::statement::Statement; -use table::metadata::TableId; use table::table_name::TableName; use table::TableRef; @@ -160,15 +159,11 @@ impl GrpcQueryHandler for DummyInstance { async fn put_record_batch( &self, - table: &TableName, - table_id: &mut Option, - decoder: &mut FlightDecoder, - data: FlightData, + _table_name: &TableName, + _table_ref: &mut Option, + _decoder: &mut FlightDecoder, + _data: FlightData, ) -> std::result::Result { - let _ = table; - let _ = data; - let _ = table_id; - let _ = decoder; unimplemented!() } }