fix: batched prometheus ingest row metric (#8054)

* fix: count batched prometheus ingest rows

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: align batched ingest metrics

Use actual affected rows when updating `DIST_INGEST_ROW_COUNT` and cache the flush database label to avoid repeated `get_db_string` allocation.

Files: `src/servers/src/pending_rows_batcher.rs`
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-04-30 12:17:38 +08:00
committed by GitHub
parent 266734d7ef
commit f0b3ee4830

View File

@@ -228,6 +228,7 @@ struct PendingBatch {
tables: HashMap<String, TableBatch>,
created_at: Instant,
total_row_count: usize,
db_string: String,
ctx: QueryContextRef,
waiters: Vec<FlushWaiter>,
}
@@ -240,6 +241,7 @@ struct FlushWaiter {
/// A batch handed to the flush path: produced by `drain_batch` from a `PendingBatch` and
/// destructured by `flush_batch`.
struct FlushBatch {
/// Table batches carried by this flush.
table_batches: Vec<TableBatch>,
/// Row count of the batch; `flush_batch` reports it to `FLUSH_ROWS` on success and to
/// `FLUSH_DROPPED_ROWS` on failure.
total_row_count: usize,
/// Database label taken over from the pending batch, where it is computed once from
/// `ctx.get_db_string()`; used as the `DIST_INGEST_ROW_COUNT` label value.
db_string: String,
/// Query context taken over from the pending batch.
ctx: QueryContextRef,
/// Waiters that are notified with the flush result.
waiters: Vec<FlushWaiter>,
}
@@ -821,10 +823,12 @@ impl Drop for PendingRowsBatcher {
impl PendingBatch {
fn new(ctx: QueryContextRef) -> Self {
let db_string = ctx.get_db_string();
Self {
tables: HashMap::new(),
created_at: Instant::now(),
total_row_count: 0,
db_string,
ctx,
waiters: Vec::new(),
}
@@ -995,6 +999,7 @@ fn drain_batch(batch: &mut Option<PendingBatch>) -> Option<FlushBatch> {
Some(FlushBatch {
table_batches,
total_row_count,
db_string: batch.db_string,
ctx: batch.ctx,
waiters,
})
@@ -1169,15 +1174,19 @@ fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result<RecordBatch>
async fn flush_region_writes_concurrently(
node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
writes: Vec<FlushRegionWrite>,
) -> Result<()> {
) -> Result<usize> {
let mut affected_rows = 0;
if !should_dispatch_concurrently(writes.len()) {
for write in writes {
let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
.with_label_values(&["flush_write_region"])
.start_timer();
node_manager.handle(&write.datanode, write.request).await?;
affected_rows += node_manager
.handle(&write.datanode, write.request)
.await?
.affected_rows;
}
return Ok(());
return Ok(affected_rows);
}
let write_futures = writes.into_iter().map(|write| async move {
@@ -1185,13 +1194,16 @@ async fn flush_region_writes_concurrently(
.with_label_values(&["flush_write_region"])
.start_timer();
node_manager.handle(&write.datanode, write.request).await?;
Ok::<_, Error>(())
let response = node_manager.handle(&write.datanode, write.request).await?;
Ok::<_, Error>(response.affected_rows)
});
// todo(hl): the number of concurrent region writes should be bounded.
futures::future::try_join_all(write_futures).await?;
Ok(())
let affected_rows = futures::future::try_join_all(write_futures)
.await?
.into_iter()
.sum();
Ok(affected_rows)
}
async fn flush_batch(
@@ -1203,6 +1215,7 @@ async fn flush_batch(
let FlushBatch {
table_batches,
total_row_count,
db_string,
ctx,
waiters,
} = flush;
@@ -1233,9 +1246,12 @@ async fn flush_batch(
if result.is_err() {
FLUSH_FAILURES.inc();
FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64);
} else {
} else if let Ok(affected_rows) = &result {
FLUSH_TOTAL.inc();
FLUSH_ROWS.observe(total_row_count as f64);
operator::metrics::DIST_INGEST_ROW_COUNT
.with_label_values(&[db_string.as_str()])
.inc_by(*affected_rows as u64);
}
debug!(
@@ -1243,7 +1259,7 @@ async fn flush_batch(
total_row_count, elapsed
);
notify_waiters(waiters, result);
notify_waiters(waiters, result.map(|_| ()));
}
/// Flushes a batch of logical table rows by transforming them into the physical table format
@@ -1263,7 +1279,7 @@ pub async fn flush_batch_physical(
partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized),
node_manager: &(impl PhysicalFlushNodeRequester + ?Sized),
catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized),
) -> Result<()> {
) -> Result<usize> {
// 1. Resolve the physical table and get column ID mapping
let physical_table = {
let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED
@@ -1856,6 +1872,7 @@ mod tests {
)]),
created_at: Instant::now(),
total_row_count: 1,
db_string: ctx.get_db_string(),
ctx: ctx.clone(),
waiters: vec![FlushWaiter {
response_tx,
@@ -1868,6 +1885,7 @@ mod tests {
assert!(batch.is_none());
assert_eq!(1, flush.total_row_count);
assert_eq!(1, flush.table_batches.len());
assert_eq!(ctx.get_db_string(), flush.db_string);
assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog());
}
@@ -2480,6 +2498,51 @@ mod tests {
assert_eq!(1, node.writes.load(Ordering::SeqCst));
}
/// Test double for `PhysicalFlushNodeRequester` that reports a fixed affected-row count.
#[derive(Default)]
struct AffectedRowsFlushNodeRequester {
/// Value placed in every `RegionResponse` returned by `handle`.
affected_rows: usize,
}
#[async_trait]
impl PhysicalFlushNodeRequester for AffectedRowsFlushNodeRequester {
    /// Ignores both the peer and the request and always succeeds with a response that
    /// carries the configured `affected_rows` value.
    async fn handle(
        &self,
        _peer: &Peer,
        _request: RegionRequest,
    ) -> error::Result<RegionResponse> {
        let response = RegionResponse::new(self.affected_rows);
        Ok(response)
    }
}
/// `flush_batch_physical` must return the affected-row count reported by the node
/// requester rather than the row count of the submitted table batches.
#[tokio::test]
async fn test_flush_batch_physical_returns_actual_affected_rows() {
    // A single logical table batch holding exactly one row.
    let batches = vec![TableBatch {
        table_name: "t1".to_string(),
        table_id: 11,
        batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)],
        row_count: 1,
    }];
    let query_ctx = session::context::QueryContext::arc();

    let partition_provider = MockFlushPartitionProvider {
        partition_rule_calls: Arc::new(AtomicUsize::new(0)),
        region_leader_calls: Arc::new(AtomicUsize::new(0)),
    };
    // Reports 7 affected rows even though only one row is submitted, so the assertion
    // below can tell the reported count apart from the submitted row count.
    let node_requester = AffectedRowsFlushNodeRequester { affected_rows: 7 };
    let catalog_provider = MockFlushCatalogProvider {
        table: Some(mock_physical_table_metadata(1024)),
    };

    let reported_rows = flush_batch_physical(
        &batches,
        "phy",
        &query_ctx,
        &partition_provider,
        &node_requester,
        &catalog_provider,
    )
    .await
    .unwrap();

    assert_eq!(7, reported_rows);
}
#[tokio::test]
async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() {
let table_batches = vec![TableBatch {