From 41dacff283ad6ac433c25eda2d1e7aa902208a4b Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 11 Jun 2025 07:33:31 +0000 Subject: [PATCH] feat/bulk-support-flow-batch: **Refactor Timestamp Handling and Update Dependencies** - **Dependency Update**: Updated `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision. - **Batching Engine Refactor**: Modified `src/flow/src/batching_mode/engine.rs` to replace `dirty_time_ranges` with `timestamps` for improved timestamp handling. - **Bulk Insert Refactor**: Updated `src/operator/src/bulk_insert.rs` to refactor timestamp extraction and handling. Replaced `compute_timestamp_range` with `extract_timestamps` and adjusted related logic to handle timestamps directly. Signed-off-by: Lei, HUANG --- src/flow/src/batching_mode/engine.rs | 25 +++++++------------ src/operator/src/bulk_insert.rs | 36 ++++++++++++++-------------- 2 files changed, 27 insertions(+), 34 deletions(-) diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 61d8633886..842ae100b4 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -88,7 +88,7 @@ impl BatchingEngine { for r in reqs.requests { let tid = TableId::from(r.table_id); let entry = group_by_table_id.entry(tid).or_default(); - entry.extend(r.dirty_time_ranges); + entry.extend(r.timestamps); } let tids = group_by_table_id.keys().cloned().collect::>(); let table_infos = @@ -101,7 +101,7 @@ impl BatchingEngine { let group_by_table_name = group_by_table_id .into_iter() - .filter_map(|(id, rows)| { + .filter_map(|(id, timestamps)| { let table_name = table_infos.get(&id).map(|info| info.table_name()); let Some(table_name) = table_name else { warn!("Failed to get table infos for table id: {:?}", id); @@ -118,7 +118,7 @@ impl BatchingEngine { .as_timestamp() .unwrap() .unit(); - Some((table_name, (rows, time_index_unit))) + Some((table_name, (timestamps, time_index_unit))) }) .collect::>(); @@ -144,35 +144,28 @@ impl BatchingEngine { let src_table_names = &task.config.source_table_names; let mut all_dirty_windows = vec![]; for src_table_name in src_table_names { - if let Some((window_ranges, unit)) = group_by_table_name.get(src_table_name) { + if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) { let Some(expr) = &task.config.time_window_expr else { continue; }; - for window in window_ranges { + for timestamp in timestamps { let align_start = expr - .eval(common_time::Timestamp::new(window.start_value, *unit))? + .eval(common_time::Timestamp::new(*timestamp, *unit))? .0 .context(UnexpectedSnafu { reason: "Failed to eval start value", })?; - - let align_end = expr - .eval(common_time::Timestamp::new(window.end_value, *unit))? - .1 - .context(UnexpectedSnafu { - reason: "Failed to eval end value", - })?; - all_dirty_windows.push((align_start, align_end)); + all_dirty_windows.push(align_start); } } } let mut state = task.state.write().unwrap(); let flow_id_label = task.config.flow_id.to_string(); - for (s, e) in all_dirty_windows { + for timestamp in all_dirty_windows { + state.dirty_time_windows.add_window(timestamp, None); METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE .with_label_values(&[&flow_id_label]) .observe(e.sub(&s).unwrap_or_default().num_seconds() as f64); - state.dirty_time_windows.add_window(s, Some(e)); } Ok(()) }); diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs index fdf31899d1..88160afabc 100644 --- a/src/operator/src/bulk_insert.rs +++ b/src/operator/src/bulk_insert.rs @@ -15,7 +15,7 @@ use std::collections::HashSet; use ahash::{HashMap, HashMapExt}; -use api::v1::flow::{DirtyWindowRequest, WindowRange}; +use api::v1::flow::DirtyWindowRequest; use api::v1::region::{ bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader, }; @@ -59,7 +59,8 @@ impl Inserter { return Ok(0); }; decode_timer.observe_duration(); - if let Some((min, max)) = compute_timestamp_range( + + if let Ok(t) = extract_timestamps( &record_batch, &table .table_info() @@ -69,10 +70,14 @@ impl Inserter { .as_ref() .unwrap() .name, - )? { - // notify flownode to update dirty time windows. - self.update_flow_dirty_window(table_id, min, max); + ) + .inspect_err(|e| { + error!(e; "Failed to extract timestamps from record batch"); + }) { + // notify flownode to update dirty timestamps. + self.update_flow_dirty_window(table_id, t); } + metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); metrics::BULK_REQUEST_ROWS .with_label_values(&["raw"]) @@ -242,7 +247,7 @@ impl Inserter { Ok(rows_inserted) } - fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) { + fn update_flow_dirty_window(&self, table_id: TableId, timestamps: Vec) { let table_flownode_set_cache = self.table_flownode_set_cache.clone(); let node_manager = self.node_manager.clone(); common_runtime::spawn_global(async move { @@ -259,23 +264,22 @@ impl Inserter { }; let peers: HashSet<_> = flownodes.values().cloned().collect(); + for peer in peers { let node_manager = node_manager.clone(); + let timestamps = timestamps.clone(); common_runtime::spawn_global(async move { if let Err(e) = node_manager .flownode(&peer) .await .handle_mark_window_dirty(DirtyWindowRequest { table_id, - dirty_time_ranges: vec![WindowRange { - start_value: min, - end_value: max, - }], + timestamps, }) .await .context(error::RequestInsertsSnafu) { - error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max); + error!(e; "Failed to mark timestamps as dirty, table: {}", table_id); } }); } @@ -284,17 +288,14 @@ impl Inserter { } /// Calculate the timestamp range of record batch. Return `None` if record batch is empty. -fn compute_timestamp_range( - rb: &RecordBatch, - timestamp_index_name: &str, -) -> error::Result> { +fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Result> { let ts_col = rb .column_by_name(timestamp_index_name) .context(error::ColumnNotFoundSnafu { msg: timestamp_index_name, })?; if rb.num_rows() == 0 { - return Ok(None); + return Ok(vec![]); } let primitive = match ts_col.data_type() { DataType::Timestamp(unit, _) => match unit { @@ -323,6 +324,5 @@ fn compute_timestamp_range( return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail(); } }; - - Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive))) + Ok(primitive.iter().flatten().collect()) }