From 483daeebe96a60fccbf606afa332beef46046372 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 13 Mar 2026 08:04:11 +0800 Subject: [PATCH] shrink the diff size Signed-off-by: Ruihang Xia --- src/common/recordbatch/src/adapter.rs | 11 +++-- src/mito2/src/read/stream.rs | 60 +++++++++++++-------------- 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 8f9045d3ad..fc12d87dcf 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use common_base::readable_size::ReadableSize; -use common_telemetry::tracing::{Span, debug_span}; +use common_telemetry::tracing::{Span, info_span}; use common_time::util::format_nanoseconds_human_readable; use datafusion::arrow::compute::cast; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; @@ -247,7 +247,7 @@ impl RecordBatchStreamAdapter { pub fn try_new_with_span(stream: DfSendableRecordBatchStream, span: Span) -> Result { let schema = Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?); - let subspan = debug_span!(parent: &span, "RecordBatchStreamAdapter"); + let subspan = info_span!(parent: &span, "RecordBatchStreamAdapter"); Ok(Self { schema, stream, @@ -301,13 +301,15 @@ impl Stream for RecordBatchStreamAdapter { .map(|m| m.elapsed_compute().clone()) .unwrap_or_default(); let _guard = timer.timer(); - let poll_span = debug_span!(parent: &self.span, "poll_next"); + let poll_span = info_span!(parent: &self.span, "poll_next"); let _entered = poll_span.enter(); match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => Poll::Pending, Poll::Ready(Some(df_record_batch)) => { let df_record_batch = df_record_batch?; - if let Metrics::Unresolved(df_plan) = &self.metrics_2 { + if let Metrics::Unresolved(df_plan) | Metrics::PartialResolved(df_plan, _) = + &self.metrics_2 + { let mut metric_collector = MetricCollector::new(self.explain_verbose); accept(df_plan.as_ref(), &mut metric_collector).unwrap(); self.metrics_2 = Metrics::PartialResolved( @@ -460,6 +462,7 @@ fn format_bytes_human_readable(bytes: usize) -> String { format!("{}", ReadableSize(bytes as u64)) } +/// Only display `plan_metrics` with indent ` ` (2 spaces). impl Display for RecordBatchMetrics { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for metric in &self.plan_metrics { diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index 7b74da2eab..80002147ea 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -66,20 +66,24 @@ impl ConvertBatchStream { } } - fn convert_into_pending(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<()> { + fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result { match batch { ScanBatch::Normal(batch) => { // Safety: Only primary key format returns this batch. let mapper = self.projection_mapper.as_primary_key().unwrap(); if batch.is_empty() { - self.pending.push_back(mapper.empty_record_batch()); + Ok(mapper.empty_record_batch()) } else { - self.pending - .push_back(mapper.convert(&batch, &self.cache_strategy)?); + mapper.convert(&batch, &self.cache_strategy) } } ScanBatch::Series(series) => { + debug_assert!( + self.pending.is_empty(), + "ConvertBatchStream should not convert a new SeriesBatch when pending batches exist" + ); + match series { SeriesBatch::PrimaryKey(primary_key_batch) => { // Safety: Only primary key format returns this batch. @@ -100,17 +104,20 @@ impl ConvertBatchStream { } } } + + let output_schema = self.projection_mapper.output_schema(); + Ok(self + .pending + .pop_front() + .unwrap_or_else(|| RecordBatch::new_empty(output_schema))) } ScanBatch::RecordBatch(df_record_batch) => { // Safety: Only flat format returns this batch. let mapper = self.projection_mapper.as_flat().unwrap(); - self.pending - .push_back(mapper.convert(&df_record_batch, &self.cache_strategy)?); + mapper.convert(&df_record_batch, &self.cache_strategy) } } - - Ok(()) } } @@ -122,30 +129,21 @@ impl Stream for ConvertBatchStream { return Poll::Ready(Some(Ok(batch))); } - loop { - let batch = futures::ready!(self.inner.poll_next_unpin(cx)); - let Some(batch) = batch else { - return Poll::Ready(None); - }; + let batch = futures::ready!(self.inner.poll_next_unpin(cx)); + let Some(batch) = batch else { + return Poll::Ready(None); + }; - let result = match batch { - Ok(batch) => { - let start = Instant::now(); - let result = self.convert_into_pending(batch); - self.partition_metrics - .inc_convert_batch_cost(start.elapsed()); - result - } - Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu), - }; - - if let Err(e) = result { - return Poll::Ready(Some(Err(e))); + let record_batch = match batch { + Ok(batch) => { + let start = Instant::now(); + let record_batch = self.convert(batch); + self.partition_metrics + .inc_convert_batch_cost(start.elapsed()); + record_batch } - - if let Some(batch) = self.pending.pop_front() { - return Poll::Ready(Some(Ok(batch))); - } - } + Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu), + }; + Poll::Ready(Some(record_batch)) } }