From 2f08bee08f2272c08e7709562be13218ec895687 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 10 Jun 2025 14:13:01 +0000 Subject: [PATCH] feat/bulk-support-flow-batch: ### Add Dirty Window Handling in Flow Module - **Updated `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`. - **Flow Module Enhancements**: - Added `DirtyWindowRequest` handling in `flow.rs`, `node_manager.rs`, `test_util.rs`, `flownode_impl.rs`, and `server.rs`. - Implemented `handle_mark_window_dirty` function to manage dirty time windows. - **Bulk Insert Enhancements**: - Modified `bulk_insert.rs` to notify flownodes about dirty time windows using `update_flow_dirty_window`. - **Removed Unused Imports**: Cleaned up unused imports in `greptime_handler.rs`, `grpc.rs`, and `mod.rs`. Signed-off-by: Lei, HUANG --- src/client/src/flow.rs | 28 ++++++++++++- src/common/meta/src/node_manager.rs | 5 ++- src/common/meta/src/test_util.rs | 14 ++++++- src/flow/src/adapter/flownode_impl.rs | 11 +++++ src/flow/src/server.rs | 19 ++++++++- src/operator/src/bulk_insert.rs | 52 ++++++++++++++++++++++-- src/operator/src/insert.rs | 2 +- src/servers/src/grpc/greptime_handler.rs | 1 - src/servers/src/query_handler/grpc.rs | 1 - src/servers/tests/mod.rs | 1 - 10 files changed, 122 insertions(+), 12 deletions(-) diff --git a/src/client/src/flow.rs b/src/client/src/flow.rs index 0d5963f848..7c3479d412 100644 --- a/src/client/src/flow.rs +++ b/src/client/src/flow.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::flow::{FlowRequest, FlowResponse}; +use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse}; use api::v1::region::InsertRequests; use common_error::ext::BoxedError; use common_meta::node_manager::Flownode; @@ -44,6 +44,16 @@ impl Flownode for FlowRequester { .map_err(BoxedError::new) .context(common_meta::error::ExternalSnafu) } + + async fn handle_mark_window_dirty( + &self, + req: DirtyWindowRequest, + ) -> common_meta::error::Result { + self.handle_mark_window_dirty(req) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } } impl FlowRequester { @@ -91,4 +101,20 @@ impl FlowRequester { .into_inner(); Ok(response) } + + async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result { + let (addr, mut client) = self.client.raw_flow_client()?; + let response = client + .handle_mark_dirty_time_window(DirtyWindowRequests { + requests: vec![req], + }) + .await + .or_else(|e| { + let code = e.code(); + let err: crate::error::Error = e.into(); + Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code }) + })? + .into_inner(); + Ok(response) + } } diff --git a/src/common/meta/src/node_manager.rs b/src/common/meta/src/node_manager.rs index bda4e02d1e..b640cf307c 100644 --- a/src/common/meta/src/node_manager.rs +++ b/src/common/meta/src/node_manager.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::flow::{FlowRequest, FlowResponse}; +use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse}; use api::v1::region::{InsertRequests, RegionRequest}; pub use common_base::AffectedRows; use common_query::request::QueryRequest; @@ -42,6 +42,9 @@ pub trait Flownode: Send + Sync { async fn handle(&self, request: FlowRequest) -> Result; async fn handle_inserts(&self, request: InsertRequests) -> Result; + + /// Handles requests to mark time window as dirty. + async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result; } pub type FlownodeRef = Arc; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index f790960690..8dc4a38643 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::region::RegionResponse; -use api::v1::flow::{FlowRequest, FlowResponse}; +use api::v1::flow::{DirtyWindowRequest, FlowRequest, FlowResponse}; use api::v1::region::{InsertRequests, RegionRequest}; pub use common_base::AffectedRows; use common_query::request::QueryRequest; @@ -67,6 +67,14 @@ pub trait MockFlownodeHandler: Sync + Send + Clone { ) -> Result { unimplemented!() } + + async fn handle_mark_window_dirty( + &self, + _peer: &Peer, + _req: DirtyWindowRequest, + ) -> Result { + unimplemented!() + } } /// A mock struct implements [NodeManager] only implement the `datanode` method. @@ -134,6 +142,10 @@ impl Flownode for MockNode { async fn handle_inserts(&self, requests: InsertRequests) -> Result { self.handler.handle_inserts(&self.peer, requests).await } + + async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result { + self.handler.handle_mark_window_dirty(&self.peer, req).await + } } #[async_trait::async_trait] diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index b42ae2c29e..25bbc1e5e0 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -31,6 +31,7 @@ use common_runtime::JoinHandle; use common_telemetry::{error, info, trace, warn}; use datatypes::value::Value; use futures::TryStreamExt; +use greptime_proto::v1::flow::DirtyWindowRequest; use itertools::Itertools; use session::context::QueryContextBuilder; use snafu::{ensure, IntoError, OptionExt, ResultExt}; @@ -819,6 +820,11 @@ impl common_meta::node_manager::Flownode for FlowDualEngine { .map(|_| Default::default()) .map_err(to_meta_err(snafu::location!())) } + + async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult { + // todo: implement + unimplemented!() + } } /// return a function to convert `crate::error::Error` to `common_meta::error::Error` @@ -926,6 +932,11 @@ impl common_meta::node_manager::Flownode for StreamingEngine { .map(|_| Default::default()) .map_err(to_meta_err(snafu::location!())) } + + async fn handle_mark_window_dirty(&self, _req: DirtyWindowRequest) -> MetaResult { + // todo: implement + unimplemented!() + } } impl FlowEngine for StreamingEngine { diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index e41168f5d3..76d1e01b48 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -32,7 +32,9 @@ use common_query::Output; use common_runtime::JoinHandle; use common_telemetry::tracing::info; use futures::{FutureExt, TryStreamExt}; -use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertRequests}; +use greptime_proto::v1::flow::{ + flow_server, DirtyWindowRequests, FlowRequest, FlowResponse, InsertRequests, +}; use itertools::Itertools; use operator::delete::Deleter; use operator::insert::Inserter; @@ -136,6 +138,21 @@ impl flow_server::Flow for FlowService { .map(Response::new) .map_err(to_status_with_last_err) } + + async fn handle_mark_dirty_time_window( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + for req in req.requests { + self.dual_engine + .handle_mark_window_dirty(req) + .await + .map(Response::new) + .map_err(to_status_with_last_err)?; + } + Ok(Response::new(FlowResponse::default())) + } } #[derive(Clone)] diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs index 99fc9e327b..fdf31899d1 100644 --- a/src/operator/src/bulk_insert.rs +++ b/src/operator/src/bulk_insert.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use ahash::{HashMap, HashMapExt}; +use api::v1::flow::{DirtyWindowRequest, WindowRange}; use api::v1::region::{ bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader, }; @@ -26,10 +29,10 @@ use arrow::record_batch::RecordBatch; use common_base::AffectedRows; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_grpc::FlightData; -use common_telemetry::tracing::Instrument; +use common_telemetry::error; use common_telemetry::tracing_context::TracingContext; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, TableId}; use table::TableRef; use crate::insert::Inserter; @@ -67,7 +70,8 @@ impl Inserter { .unwrap() .name, )? { - // notify flownode. + // notify flownode to update dirty time windows. + self.update_flow_dirty_window(table_id, min, max); } metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); metrics::BULK_REQUEST_ROWS @@ -237,6 +241,46 @@ impl Inserter { crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64); Ok(rows_inserted) } + + fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) { + let table_flownode_set_cache = self.table_flownode_set_cache.clone(); + let node_manager = self.node_manager.clone(); + common_runtime::spawn_global(async move { + let result = table_flownode_set_cache + .get(table_id) + .await + .context(error::RequestInsertsSnafu); + let flownodes = match result { + Ok(flownodes) => flownodes.unwrap_or_default(), + Err(e) => { + error!(e; "Failed to get flownodes for table id: {}", table_id); + return; + } + }; + + let peers: HashSet<_> = flownodes.values().cloned().collect(); + for peer in peers { + let node_manager = node_manager.clone(); + common_runtime::spawn_global(async move { + if let Err(e) = node_manager + .flownode(&peer) + .await + .handle_mark_window_dirty(DirtyWindowRequest { + table_id, + dirty_time_ranges: vec![WindowRange { + start_value: min, + end_value: max, + }], + }) + .await + .context(error::RequestInsertsSnafu) + { + error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max); + } + }); + } + }); + } } /// Calculate the timestamp range of record batch. Return `None` if record batch is empty. @@ -275,7 +319,7 @@ fn compute_timestamp_range( .unwrap() .reinterpret_cast::(), }, - t @ _ => { + t => { return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail(); } }; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index efc9812554..590eca914f 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -78,7 +78,7 @@ pub struct Inserter { catalog_manager: CatalogManagerRef, pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) node_manager: NodeManagerRef, - table_flownode_set_cache: TableFlownodeSetCacheRef, + pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef, } pub type InserterRef = Arc; diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index b0f26fa0ee..91457e5452 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -40,7 +40,6 @@ use futures_util::StreamExt; use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; use session::hints::READ_PREFERENCE_HINT; use snafu::{OptionExt, ResultExt}; -use table::metadata::TableId; use table::TableRef; use tokio::sync::mpsc; diff --git a/src/servers/src/query_handler/grpc.rs b/src/servers/src/query_handler/grpc.rs index f2a941da39..15f18aaaed 100644 --- a/src/servers/src/query_handler/grpc.rs +++ b/src/servers/src/query_handler/grpc.rs @@ -23,7 +23,6 @@ use common_grpc::flight::FlightDecoder; use common_query::Output; use session::context::QueryContextRef; use snafu::ResultExt; -use table::metadata::TableId; use table::table_name::TableName; use table::TableRef; diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 63f2ec4a53..41454c8ddc 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -34,7 +34,6 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler}; use session::context::QueryContextRef; use snafu::ensure; use sql::statements::statement::Statement; -use table::metadata::TableId; use table::table_name::TableName; use table::TableRef;