From 007a2b3dfe63070012ff820d430c50ef656de73d Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 24 Apr 2025 05:55:38 +0800 Subject: [PATCH] refactor: use ChainedRecordBatchStream to simplify codes --- src/mito2/src/read/series_scan.rs | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 40d42b2695..d26473698c 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -18,15 +18,15 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use async_stream::{stream, try_stream}; +use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::util::ChainedRecordBatchStream; use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::compute::concat_batches; use datatypes::schema::SchemaRef; -use futures::StreamExt; use smallvec::{smallvec, SmallVec}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -191,7 +191,6 @@ impl SeriesScan { *rx_list = receivers; } - // TODO(yingwen): Reuse codes. /// Scans the region and returns a stream. pub(crate) async fn build_stream(&self) -> Result { let part_num = self.properties.num_partitions(); @@ -199,18 +198,8 @@ impl SeriesScan { let streams = (0..part_num) .map(|i| self.scan_partition(&metrics_set, i)) .collect::, BoxedError>>()?; - let stream = stream! { - for mut stream in streams { - while let Some(rb) = stream.next().await { - yield rb; - } - } - }; - let stream = Box::pin(RecordBatchStreamWrapper::new( - self.schema(), - Box::pin(stream), - )); - Ok(stream) + let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?; + Ok(Box::pin(chained_stream)) } }