feat/bulk-support-flow-batch:

**Refactor and Enhance Timestamp Handling in `bulk_insert.rs`**

 - **Refactored Timestamp Extraction**: Moved timestamp extraction logic to a new method `maybe_update_flow_dirty_window` to improve code readability and maintainability.
 - **Enhanced Flow Update Logic**: The flow dirty window update now notifies flownodes only when a flow is actually configured for the table, working from `table_info` and `record_batch` (a simplified sketch of this flow appears before the diff below).
 - **Imports Adjusted**: Updated imports to reflect changes in table metadata handling, replacing `TableId` with `TableInfoRef`.

 Files affected:
 - `src/operator/src/bulk_insert.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Author: Lei, HUANG
Date: 2025-06-11 07:57:22 +00:00
Parent: ced018fce0
Commit: 77f20ede7a

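Taken together, the change amounts to the control flow sketched below. This is a simplified, self-contained illustration rather than the actual GreptimeDB code: the stand-in `TableInfo`/`RecordBatch` types, `lookup_flownodes`, and the use of plain `tokio::spawn` (instead of `common_runtime::spawn_global`) are assumptions made for the example; only the overall shape of `maybe_update_flow_dirty_window` mirrors the diff.

```rust
use std::sync::Arc;

// Stand-ins for the real types; only the control flow mirrors the commit.
type TableId = u32;

struct TableInfo {
    table_id: TableId,
    timestamp_column: String,
}
type TableInfoRef = Arc<TableInfo>;

#[derive(Clone)]
struct RecordBatch {
    timestamps: Vec<i64>,
}

// Pretend flownode lookup: returns the peers interested in this table, if any.
async fn lookup_flownodes(_table_id: TableId) -> Vec<String> {
    vec!["flownode-0".to_string()]
}

// Pretend timestamp extraction keyed by the table's timestamp column name.
fn extract_timestamps(batch: &RecordBatch, _ts_col: &str) -> Result<Vec<i64>, String> {
    Ok(batch.timestamps.clone())
}

fn maybe_update_flow_dirty_window(table_info: TableInfoRef, record_batch: RecordBatch) {
    // Fire-and-forget: the bulk-insert path does not wait for flownode notification.
    tokio::spawn(async move {
        // 1. Only tables with a configured flow have flownodes to notify.
        let peers = lookup_flownodes(table_info.table_id).await;
        if peers.is_empty() {
            return;
        }
        // 2. Extract timestamps only after we know someone cares; bail on error.
        let Ok(timestamps) = extract_timestamps(&record_batch, &table_info.timestamp_column)
        else {
            return;
        };
        // 3. Tell every flownode which time window is now dirty.
        for peer in peers {
            println!("{peer}: dirty window covers {} timestamps", timestamps.len());
        }
    });
}

#[tokio::main]
async fn main() {
    let table_info = Arc::new(TableInfo {
        table_id: 1,
        timestamp_column: "ts".to_string(),
    });
    let batch = RecordBatch {
        timestamps: vec![1_000, 2_000, 3_000],
    };
    maybe_update_flow_dirty_window(table_info, batch.clone());
    // Give the detached task a moment to finish in this toy example.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
```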

```diff
@@ -32,7 +32,8 @@ use common_grpc::FlightData;
 use common_telemetry::error;
 use common_telemetry::tracing_context::TracingContext;
 use snafu::{OptionExt, ResultExt};
-use store_api::storage::{RegionId, TableId};
+use store_api::storage::RegionId;
+use table::metadata::TableInfoRef;
 use table::TableRef;
 use crate::insert::Inserter;
```

```diff
@@ -46,7 +47,8 @@ impl Inserter {
         decoder: &mut FlightDecoder,
         data: FlightData,
     ) -> error::Result<AffectedRows> {
-        let table_id = table.table_info().table_id();
+        let table_info = table.table_info();
+        let table_id = table_info.table_id();
         let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["decode_request"])
             .start_timer();
```

```diff
@@ -60,23 +62,8 @@ impl Inserter {
         };
         decode_timer.observe_duration();
-        if let Ok(t) = extract_timestamps(
-            &record_batch,
-            &table
-                .table_info()
-                .meta
-                .schema
-                .timestamp_column()
-                .as_ref()
-                .unwrap()
-                .name,
-        )
-        .inspect_err(|e| {
-            error!(e; "Failed to extract timestamps from record batch");
-        }) {
-            // notify flownode to update dirty timestamps.
-            self.update_flow_dirty_window(table_id, t);
-        }
+        // notify flownode to update dirty timestamps if flow is configured.
+        self.maybe_update_flow_dirty_window(table_info, record_batch.clone());
         metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
         metrics::BULK_REQUEST_ROWS
```

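The call site hands the helper `record_batch.clone()`. Assuming `record_batch` here is an Arrow `RecordBatch` (it is decoded from `FlightData` by the `FlightDecoder`), that clone is shallow: the batch's columns are `Arc`-backed, so only reference counts are bumped, not the row data. A minimal sketch using the `arrow` crate directly:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // A tiny batch with a single millisecond-timestamp column.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ts",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        false,
    )]));
    let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1_000i64, 2_000, 3_000]));
    let batch = RecordBatch::try_new(schema, vec![ts])?;

    // Cloning the batch clones Arc handles to its columns, not the buffers,
    // so handing a clone to a background task is cheap.
    let cloned = batch.clone();
    assert!(Arc::ptr_eq(batch.column(0), cloned.column(0)));
    Ok(())
}
```

Deferring the timestamp extraction to the background task (see the later hunks) also means tables with no flow configured never pay that cost on the ingest path.
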
```diff
@@ -247,7 +234,8 @@ impl Inserter {
         Ok(rows_inserted)
     }
-    fn update_flow_dirty_window(&self, table_id: TableId, timestamps: Vec<i64>) {
+    fn maybe_update_flow_dirty_window(&self, table_info: TableInfoRef, record_batch: RecordBatch) {
+        let table_id = table_info.table_id();
         let table_flownode_set_cache = self.table_flownode_set_cache.clone();
         let node_manager = self.node_manager.clone();
         common_runtime::spawn_global(async move {
```

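The new signature takes both arguments by value so the `async move` block owns everything the detached task needs. Assuming `TableInfoRef` follows the usual `*Ref = Arc<...>` convention in this codebase, moving it is just a reference-count bump while keeping the table metadata (including the timestamp column name) alive for the task. A small illustration of that ownership pattern, with a hypothetical `TableInfo`:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in: the point is that an Arc'd metadata handle can be
// moved into background work cheaply while the caller keeps its own handle.
#[derive(Debug)]
struct TableInfo {
    table_id: u32,
    timestamp_column: String,
}
type TableInfoRef = Arc<TableInfo>;

fn spawn_background(table_info: TableInfoRef) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // The task reads schema metadata without borrowing from the caller.
        println!(
            "table {} uses timestamp column `{}`",
            table_info.table_id, table_info.timestamp_column
        );
    })
}

fn main() {
    let info: TableInfoRef = Arc::new(TableInfo {
        table_id: 42,
        timestamp_column: "ts".to_string(),
    });
    // Cloning the Arc only bumps a reference count.
    spawn_background(Arc::clone(&info)).join().unwrap();
    println!("caller still holds: {info:?}");
}
```
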
```diff
@@ -264,6 +252,25 @@ impl Inserter {
             };
             let peers: HashSet<_> = flownodes.values().cloned().collect();
             if peers.is_empty() {
                 return;
             }
+            let Ok(timestamps) = extract_timestamps(
+                &record_batch,
+                &table_info
+                    .meta
+                    .schema
+                    .timestamp_column()
+                    .as_ref()
+                    .unwrap()
+                    .name,
+            )
+            .inspect_err(|e| {
+                error!(e; "Failed to extract timestamps from record batch");
+            }) else {
+                return;
+            };
             for peer in peers {
                 let node_manager = node_manager.clone();
```

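The extraction now combines `let ... else` with `Result::inspect_err`: the failure is logged as a side effect and the task simply returns instead of panicking (the `.unwrap()` on `timestamp_column()` still assumes the table has a time index column). A minimal, self-contained sketch of that error-handling pattern:

```rust
// Log the failure via `inspect_err`, then bail out with `let ... else`
// instead of unwrapping.
fn parse_all(raw: &[&str]) -> Option<Vec<i64>> {
    let Ok(values) = raw
        .iter()
        .map(|s| s.parse::<i64>())
        .collect::<Result<Vec<_>, _>>()
        .inspect_err(|e| eprintln!("failed to parse input: {e}"))
    else {
        return None;
    };
    Some(values)
}

fn main() {
    assert_eq!(parse_all(&["1", "2", "3"]), Some(vec![1, 2, 3]));
    assert_eq!(parse_all(&["1", "x"]), None);
}
```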