feat: add fetch cost metrics

This commit is contained in:
evenyag
2023-11-30 15:37:47 +08:00
parent fb96e3f68d
commit 1d73ad2a2f

View File

@@ -375,6 +375,8 @@ struct Metrics {
num_output_rows: usize,
/// Number of deleted rows.
num_deleted_rows: usize,
/// Cost to fetch batches from sources.
fetch_cost: Duration,
}
/// Helper to collect and merge small batches for same primary key.
@@ -476,7 +478,9 @@ impl Node {
/// It tries to fetch one batch from the `source`.
async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
let current_batch = source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
Ok(Node {
@@ -513,8 +517,10 @@ impl Node {
/// Panics if the node has reached EOF.
async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result<Batch> {
let current = self.current_batch.take().unwrap();
let start = Instant::now();
// Ensures batch is not empty.
self.current_batch = self.source.next_batch().await?.map(CompareFirst);
metrics.fetch_cost += start.elapsed();
metrics.num_input_rows += self
.current_batch
.as_ref()