diff --git a/src/servers/src/pending_rows_batcher.rs b/src/servers/src/pending_rows_batcher.rs index 4cd8331636..307311a4e2 100644 --- a/src/servers/src/pending_rows_batcher.rs +++ b/src/servers/src/pending_rows_batcher.rs @@ -228,6 +228,7 @@ struct PendingBatch { tables: HashMap, created_at: Instant, total_row_count: usize, + db_string: String, ctx: QueryContextRef, waiters: Vec, } @@ -240,6 +241,7 @@ struct FlushWaiter { struct FlushBatch { table_batches: Vec, total_row_count: usize, + db_string: String, ctx: QueryContextRef, waiters: Vec, } @@ -821,10 +823,12 @@ impl Drop for PendingRowsBatcher { impl PendingBatch { fn new(ctx: QueryContextRef) -> Self { + let db_string = ctx.get_db_string(); Self { tables: HashMap::new(), created_at: Instant::now(), total_row_count: 0, + db_string, ctx, waiters: Vec::new(), } @@ -995,6 +999,7 @@ fn drain_batch(batch: &mut Option) -> Option { Some(FlushBatch { table_batches, total_row_count, + db_string: batch.db_string, ctx: batch.ctx, waiters, }) @@ -1169,15 +1174,19 @@ fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result async fn flush_region_writes_concurrently( node_manager: &(impl PhysicalFlushNodeRequester + ?Sized), writes: Vec, -) -> Result<()> { +) -> Result { + let mut affected_rows = 0; if !should_dispatch_concurrently(writes.len()) { for write in writes { let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED .with_label_values(&["flush_write_region"]) .start_timer(); - node_manager.handle(&write.datanode, write.request).await?; + affected_rows += node_manager + .handle(&write.datanode, write.request) + .await? + .affected_rows; } - return Ok(()); + return Ok(affected_rows); } let write_futures = writes.into_iter().map(|write| async move { @@ -1185,13 +1194,16 @@ async fn flush_region_writes_concurrently( .with_label_values(&["flush_write_region"]) .start_timer(); - node_manager.handle(&write.datanode, write.request).await?; - Ok::<_, Error>(()) + let response = node_manager.handle(&write.datanode, write.request).await?; + Ok::<_, Error>(response.affected_rows) }); // todo(hl): should be bounded. - futures::future::try_join_all(write_futures).await?; - Ok(()) + let affected_rows = futures::future::try_join_all(write_futures) + .await? + .into_iter() + .sum(); + Ok(affected_rows) } async fn flush_batch( @@ -1203,6 +1215,7 @@ async fn flush_batch( let FlushBatch { table_batches, total_row_count, + db_string, ctx, waiters, } = flush; @@ -1233,9 +1246,12 @@ async fn flush_batch( if result.is_err() { FLUSH_FAILURES.inc(); FLUSH_DROPPED_ROWS.inc_by(total_row_count as u64); - } else { + } else if let Ok(affected_rows) = &result { FLUSH_TOTAL.inc(); FLUSH_ROWS.observe(total_row_count as f64); + operator::metrics::DIST_INGEST_ROW_COUNT + .with_label_values(&[db_string.as_str()]) + .inc_by(*affected_rows as u64); } debug!( @@ -1243,7 +1259,7 @@ async fn flush_batch( total_row_count, elapsed ); - notify_waiters(waiters, result); + notify_waiters(waiters, result.map(|_| ())); } /// Flushes a batch of logical table rows by transforming them into the physical table format @@ -1263,7 +1279,7 @@ pub async fn flush_batch_physical( partition_manager: &(impl PhysicalFlushPartitionProvider + ?Sized), node_manager: &(impl PhysicalFlushNodeRequester + ?Sized), catalog_manager: &(impl PhysicalFlushCatalogProvider + ?Sized), -) -> Result<()> { +) -> Result { // 1. Resolve the physical table and get column ID mapping let physical_table = { let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED @@ -1856,6 +1872,7 @@ mod tests { )]), created_at: Instant::now(), total_row_count: 1, + db_string: ctx.get_db_string(), ctx: ctx.clone(), waiters: vec![FlushWaiter { response_tx, @@ -1868,6 +1885,7 @@ mod tests { assert!(batch.is_none()); assert_eq!(1, flush.total_row_count); assert_eq!(1, flush.table_batches.len()); + assert_eq!(ctx.get_db_string(), flush.db_string); assert_eq!(ctx.current_catalog(), flush.ctx.current_catalog()); } @@ -2480,6 +2498,51 @@ mod tests { assert_eq!(1, node.writes.load(Ordering::SeqCst)); } + #[derive(Default)] + struct AffectedRowsFlushNodeRequester { + affected_rows: usize, + } + + #[async_trait] + impl PhysicalFlushNodeRequester for AffectedRowsFlushNodeRequester { + async fn handle( + &self, + _peer: &Peer, + _request: RegionRequest, + ) -> error::Result { + Ok(RegionResponse::new(self.affected_rows)) + } + } + + #[tokio::test] + async fn test_flush_batch_physical_returns_actual_affected_rows() { + let table_batches = vec![TableBatch { + table_name: "t1".to_string(), + table_id: 11, + batches: vec![mock_tag_batch("tag1", "host-1", 1000, 1.0)], + row_count: 1, + }]; + let ctx = session::context::QueryContext::arc(); + + let affected_rows = flush_batch_physical( + &table_batches, + "phy", + &ctx, + &MockFlushPartitionProvider { + partition_rule_calls: Arc::new(AtomicUsize::new(0)), + region_leader_calls: Arc::new(AtomicUsize::new(0)), + }, + &AffectedRowsFlushNodeRequester { affected_rows: 7 }, + &MockFlushCatalogProvider { + table: Some(mock_physical_table_metadata(1024)), + }, + ) + .await + .unwrap(); + + assert_eq!(7, affected_rows); + } + #[tokio::test] async fn test_flush_batch_physical_stops_before_partition_and_node_when_table_missing() { let table_batches = vec![TableBatch {