refactor: use ChainedRecordBatchStream to simplify codes

This commit is contained in:
evenyag
2025-04-24 05:55:38 +08:00
parent f35e957ddd
commit 007a2b3dfe

View File

@@ -18,15 +18,15 @@ use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_stream::{stream, try_stream};
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::compute::concat_batches;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -191,7 +191,6 @@ impl SeriesScan {
*rx_list = receivers;
}
// TODO(yingwen): Reuse codes.
/// Scans the region and returns a stream.
pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let part_num = self.properties.num_partitions();
@@ -199,18 +198,8 @@ impl SeriesScan {
let streams = (0..part_num)
.map(|i| self.scan_partition(&metrics_set, i))
.collect::<Result<Vec<_>, BoxedError>>()?;
let stream = stream! {
for mut stream in streams {
while let Some(rb) = stream.next().await {
yield rb;
}
}
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.schema(),
Box::pin(stream),
));
Ok(stream)
let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
Ok(Box::pin(chained_stream))
}
}