From 77f20ede7afc3b6b6ed4302c1af23a0e5b451989 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG"
Date: Wed, 11 Jun 2025 07:57:22 +0000
Subject: [PATCH] feat/bulk-support-flow-batch: **Refactor and Enhance Timestamp Handling in `bulk_insert.rs`**

- **Refactored Timestamp Extraction**: Moved the timestamp-extraction logic
  into a new method, `maybe_update_flow_dirty_window`, keeping
  `handle_bulk_insert` readable and maintainable.
- **Enhanced Flow Update Logic**: The dirty-window update now notifies
  flownodes only when at least one is configured for the table; the new
  method takes `table_info` and `record_batch` and defers timestamp
  extraction until after the flownode lookup, skipping it entirely when no
  flownodes are found.
- **Imports Adjusted**: Replaced the now-unused `TableId` import with
  `table::metadata::TableInfoRef`.

Files affected:

- `src/operator/src/bulk_insert.rs`

Signed-off-by: Lei, HUANG
---
 src/operator/src/bulk_insert.rs | 47 +++++++++++++++++++--------------
 1 file changed, 27 insertions(+), 20 deletions(-)

diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs
index 88160afabc..4aa90b6cf8 100644
--- a/src/operator/src/bulk_insert.rs
+++ b/src/operator/src/bulk_insert.rs
@@ -32,7 +32,8 @@ use common_grpc::FlightData;
 use common_telemetry::error;
 use common_telemetry::tracing_context::TracingContext;
 use snafu::{OptionExt, ResultExt};
-use store_api::storage::{RegionId, TableId};
+use store_api::storage::RegionId;
+use table::metadata::TableInfoRef;
 use table::TableRef;
 
 use crate::insert::Inserter;
@@ -46,7 +47,8 @@ impl Inserter {
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> error::Result<AffectedRows> {
-        let table_id = table.table_info().table_id();
+        let table_info = table.table_info();
+        let table_id = table_info.table_id();
         let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["decode_request"])
             .start_timer();
@@ -60,23 +62,8 @@ impl Inserter {
         };
         decode_timer.observe_duration();
 
-        if let Ok(t) = extract_timestamps(
-            &record_batch,
-            &table
-                .table_info()
-                .meta
-                .schema
-                .timestamp_column()
-                .as_ref()
-                .unwrap()
-                .name,
-        )
-        .inspect_err(|e| {
-            error!(e; "Failed to extract timestamps from record batch");
-        }) {
-            // notify flownode to update dirty timestamps.
-            self.update_flow_dirty_window(table_id, t);
-        }
+        // notify flownode to update dirty timestamps if flow is configured.
+        self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
 
         metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
         metrics::BULK_REQUEST_ROWS
@@ -247,7 +234,8 @@ impl Inserter {
         Ok(rows_inserted)
     }
 
-    fn update_flow_dirty_window(&self, table_id: TableId, timestamps: Vec<i64>) {
+    fn maybe_update_flow_dirty_window(&self, table_info: TableInfoRef, record_batch: RecordBatch) {
+        let table_id = table_info.table_id();
         let table_flownode_set_cache = self.table_flownode_set_cache.clone();
         let node_manager = self.node_manager.clone();
         common_runtime::spawn_global(async move {
@@ -264,6 +252,25 @@ impl Inserter {
             };
             let peers: HashSet<_> = flownodes.values().cloned().collect();
 
+            if peers.is_empty() {
+                return;
+            }
+
+            let Ok(timestamps) = extract_timestamps(
+                &record_batch,
+                &table_info
+                    .meta
+                    .schema
+                    .timestamp_column()
+                    .as_ref()
+                    .unwrap()
+                    .name,
+            )
+            .inspect_err(|e| {
+                error!(e; "Failed to extract timestamps from record batch");
+            }) else {
+                return;
+            };
 
             for peer in peers {
                 let node_manager = node_manager.clone();
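
Note on the pattern this patch applies: `maybe_update_flow_dirty_window` clones the record batch into a spawned task and returns early when no flownodes are configured, so the cost of timestamp extraction is only paid for tables that actually feed a flow. Cloning an Arrow `RecordBatch` is cheap because its columns are reference-counted. The `extract_timestamps` helper the patch calls is not part of this diff; below is a minimal sketch of what such a helper could look like, written against the `arrow` crate and assuming a millisecond-precision timestamp column and a plain `String` error type (the real GreptimeDB helper presumably handles all timestamp units and the crate's own error types):

```rust
use arrow::array::TimestampMillisecondArray;
use arrow::record_batch::RecordBatch;

/// Hypothetical stand-in for the `extract_timestamps` helper referenced in
/// the patch. Assumes millisecond precision and a `String` error for brevity.
fn extract_timestamps(batch: &RecordBatch, ts_col: &str) -> Result<Vec<i64>, String> {
    // Locate the timestamp column by name; the patch reads this name from
    // `table_info.meta.schema.timestamp_column()`.
    let idx = batch
        .schema()
        .index_of(ts_col)
        .map_err(|e| e.to_string())?;
    // Downcast the dynamically typed Arrow array to a concrete timestamp array.
    let array = batch
        .column(idx)
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .ok_or_else(|| format!("column `{ts_col}` is not a millisecond timestamp"))?;
    // Null slots carry no dirty-window information, so skip them.
    Ok(array.iter().flatten().collect())
}
```

The `peers.is_empty()` early return is what makes this laziness pay off: column decoding happens only when a flownode will actually consume the resulting dirty-window timestamps.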