feat/bulk-support-flow-batch:

**Refactor Timestamp Handling and Update Dependencies**

 - **Dependency Update**: Updated `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml` to a new revision.
 - **Batching Engine Refactor**: Modified `src/flow/src/batching_mode/engine.rs` to replace `dirty_time_ranges` with `timestamps` for improved timestamp handling.
 - **Bulk Insert Refactor**: Updated `src/operator/src/bulk_insert.rs` to refactor timestamp extraction and handling. Replaced `compute_timestamp_range` with `extract_timestamps` and adjusted related logic to handle timestamps directly.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-11 07:33:31 +00:00
parent 94a14b6da7
commit 41dacff283
2 changed files with 27 additions and 34 deletions

View File

@@ -88,7 +88,7 @@ impl BatchingEngine {
for r in reqs.requests {
let tid = TableId::from(r.table_id);
let entry = group_by_table_id.entry(tid).or_default();
entry.extend(r.dirty_time_ranges);
entry.extend(r.timestamps);
}
let tids = group_by_table_id.keys().cloned().collect::<Vec<TableId>>();
let table_infos =
@@ -101,7 +101,7 @@ impl BatchingEngine {
let group_by_table_name = group_by_table_id
.into_iter()
.filter_map(|(id, rows)| {
.filter_map(|(id, timestamps)| {
let table_name = table_infos.get(&id).map(|info| info.table_name());
let Some(table_name) = table_name else {
warn!("Failed to get table infos for table id: {:?}", id);
@@ -118,7 +118,7 @@ impl BatchingEngine {
.as_timestamp()
.unwrap()
.unit();
Some((table_name, (rows, time_index_unit)))
Some((table_name, (timestamps, time_index_unit)))
})
.collect::<HashMap<_, _>>();
@@ -144,35 +144,28 @@ impl BatchingEngine {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = vec![];
for src_table_name in src_table_names {
if let Some((window_ranges, unit)) = group_by_table_name.get(src_table_name) {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
for window in window_ranges {
for timestamp in timestamps {
let align_start = expr
.eval(common_time::Timestamp::new(window.start_value, *unit))?
.eval(common_time::Timestamp::new(*timestamp, *unit))?
.0
.context(UnexpectedSnafu {
reason: "Failed to eval start value",
})?;
let align_end = expr
.eval(common_time::Timestamp::new(window.end_value, *unit))?
.1
.context(UnexpectedSnafu {
reason: "Failed to eval end value",
})?;
all_dirty_windows.push((align_start, align_end));
all_dirty_windows.push(align_start);
}
}
}
let mut state = task.state.write().unwrap();
let flow_id_label = task.config.flow_id.to_string();
for (s, e) in all_dirty_windows {
for timestamp in all_dirty_windows {
state.dirty_time_windows.add_window(timestamp, None);
METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE
.with_label_values(&[&flow_id_label])
.observe(e.sub(&s).unwrap_or_default().num_seconds() as f64);
state.dirty_time_windows.add_window(s, Some(e));
}
Ok(())
});

View File

@@ -15,7 +15,7 @@
use std::collections::HashSet;
use ahash::{HashMap, HashMapExt};
use api::v1::flow::{DirtyWindowRequest, WindowRange};
use api::v1::flow::DirtyWindowRequest;
use api::v1::region::{
bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
};
@@ -59,7 +59,8 @@ impl Inserter {
return Ok(0);
};
decode_timer.observe_duration();
if let Some((min, max)) = compute_timestamp_range(
if let Ok(t) = extract_timestamps(
&record_batch,
&table
.table_info()
@@ -69,10 +70,14 @@ impl Inserter {
.as_ref()
.unwrap()
.name,
)? {
// notify flownode to update dirty time windows.
self.update_flow_dirty_window(table_id, min, max);
)
.inspect_err(|e| {
error!(e; "Failed to extract timestamps from record batch");
}) {
// notify flownode to update dirty timestamps.
self.update_flow_dirty_window(table_id, t);
}
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
metrics::BULK_REQUEST_ROWS
.with_label_values(&["raw"])
@@ -242,7 +247,7 @@ impl Inserter {
Ok(rows_inserted)
}
fn update_flow_dirty_window(&self, table_id: TableId, min: i64, max: i64) {
fn update_flow_dirty_window(&self, table_id: TableId, timestamps: Vec<i64>) {
let table_flownode_set_cache = self.table_flownode_set_cache.clone();
let node_manager = self.node_manager.clone();
common_runtime::spawn_global(async move {
@@ -259,23 +264,22 @@ impl Inserter {
};
let peers: HashSet<_> = flownodes.values().cloned().collect();
for peer in peers {
let node_manager = node_manager.clone();
let timestamps = timestamps.clone();
common_runtime::spawn_global(async move {
if let Err(e) = node_manager
.flownode(&peer)
.await
.handle_mark_window_dirty(DirtyWindowRequest {
table_id,
dirty_time_ranges: vec![WindowRange {
start_value: min,
end_value: max,
}],
timestamps,
})
.await
.context(error::RequestInsertsSnafu)
{
error!(e; "Failed to mark time window as dirty, table: {}, min: {}, max: {}", table_id, min, max);
error!(e; "Failed to mark timestamps as dirty, table: {}", table_id);
}
});
}
@@ -284,17 +288,14 @@ impl Inserter {
}
/// Calculate the timestamp range of record batch. Return `None` if record batch is empty.
fn compute_timestamp_range(
rb: &RecordBatch,
timestamp_index_name: &str,
) -> error::Result<Option<(i64, i64)>> {
fn extract_timestamps(rb: &RecordBatch, timestamp_index_name: &str) -> error::Result<Vec<i64>> {
let ts_col = rb
.column_by_name(timestamp_index_name)
.context(error::ColumnNotFoundSnafu {
msg: timestamp_index_name,
})?;
if rb.num_rows() == 0 {
return Ok(None);
return Ok(vec![]);
}
let primitive = match ts_col.data_type() {
DataType::Timestamp(unit, _) => match unit {
@@ -323,6 +324,5 @@ fn compute_timestamp_range(
return error::InvalidTimeIndexTypeSnafu { ty: t.clone() }.fail();
}
};
Ok(arrow::compute::min(&primitive).zip(arrow::compute::max(&primitive)))
Ok(primitive.iter().flatten().collect())
}