fix: address compiler errors

This commit is contained in:
evenyag
2025-04-08 20:51:55 +08:00
parent 96ba00d175
commit 916e1c2d9e

View File

@@ -22,6 +22,7 @@ use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::compute::concat_batches;
use datatypes::schema::SchemaRef;
@@ -81,6 +82,7 @@ impl SeriesScan {
fn scan_partition_impl(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
if partition >= self.properties.num_partitions() {
@@ -93,9 +95,9 @@ impl SeriesScan {
));
}
self.maybe_start_distributor();
self.maybe_start_distributor(metrics_set);
let part_metrics = new_partition_metrics(&self.stream_ctx, partition);
let part_metrics = new_partition_metrics(&self.stream_ctx, metrics_set, partition);
let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
let stream_ctx = self.stream_ctx.clone();
@@ -158,7 +160,7 @@ impl SeriesScan {
}
/// Starts the distributor if the receiver list is empty.
fn maybe_start_distributor(&self) {
fn maybe_start_distributor(&self, metrics_set: &ExecutionPlanMetricsSet) {
let mut rx_list = self.receivers.lock().unwrap();
if !rx_list.is_empty() {
return;
@@ -170,6 +172,7 @@ impl SeriesScan {
semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
partitions: self.properties.partitions.clone(),
senders,
metrics_set: metrics_set.clone(),
};
common_runtime::spawn_global(async move {
distributor.execute().await;
@@ -182,8 +185,9 @@ impl SeriesScan {
/// Scans the region and returns a stream.
pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
let part_num = self.properties.num_partitions();
let metrics_set = ExecutionPlanMetricsSet::default();
let streams = (0..part_num)
.map(|i| self.scan_partition(i))
.map(|i| self.scan_partition(&metrics_set, i))
.collect::<Result<Vec<_>, BoxedError>>()?;
let stream = stream! {
for mut stream in streams {
@@ -223,8 +227,12 @@ impl RegionScanner for SeriesScan {
self.stream_ctx.input.mapper.metadata().clone()
}
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(partition)
fn scan_partition(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
@@ -282,6 +290,8 @@ struct SeriesDistributor {
partitions: Vec<Vec<PartitionRange>>,
/// Senders of all partitions.
senders: SenderList,
/// Metrics set to report.
metrics_set: ExecutionPlanMetricsSet,
}
impl SeriesDistributor {
@@ -294,7 +304,8 @@ impl SeriesDistributor {
/// Scans all parts.
async fn scan_partitions(&mut self) -> Result<()> {
let part_metrics = new_partition_metrics(&self.stream_ctx, self.partitions.len());
let part_metrics =
new_partition_metrics(&self.stream_ctx, &self.metrics_set, self.partitions.len());
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
@@ -466,15 +477,16 @@ impl SenderList {
}
}
fn new_partition_metrics(stream_ctx: &StreamContext, partition: usize) -> PartitionMetrics {
fn new_partition_metrics(
stream_ctx: &StreamContext,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> PartitionMetrics {
PartitionMetrics::new(
stream_ctx.input.mapper.metadata().region_id,
partition,
"SeriesScan",
stream_ctx.query_start,
ScannerMetrics {
prepare_scan_cost: stream_ctx.query_start.elapsed(),
..Default::default()
},
metrics_set,
)
}