feat: simplify merge scan stream (#7269)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-11-21 11:50:21 +08:00
committed by GitHub
parent 29d23e0ba1
commit c13febe35d

View File

@@ -20,40 +20,35 @@ use ahash::{HashMap, HashSet};
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
use common_plugins::GREPTIME_EXEC_READ_COST;
use common_query::request::QueryRequest;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_telemetry::tracing_context::TracingContext;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::{SessionState, TaskContext};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream,
};
use datafusion_common::{Column as ColumnExpr, Result};
use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;
use crate::dist_plan::analyzer::AliasMapping;
use crate::error::ConvertSchemaSnafu;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::region_query::RegionQueryHandlerRef;
@@ -140,7 +135,6 @@ pub struct MergeScanExec {
table: TableName,
regions: Vec<RegionId>,
plan: LogicalPlan,
schema: SchemaRef,
arrow_schema: ArrowSchemaRef,
region_query_handler: RegionQueryHandlerRef,
metric: ExecutionPlanMetricsSet,
@@ -159,7 +153,6 @@ impl std::fmt::Debug for MergeScanExec {
f.debug_struct("MergeScanExec")
.field("table", &self.table)
.field("regions", &self.regions)
.field("schema", &self.schema)
.field("plan", &self.plan)
.finish()
}
@@ -238,12 +231,10 @@ impl MergeScanExec {
EmissionType::Incremental,
Boundedness::Bounded,
);
let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
Ok(Self {
table,
regions,
plan,
schema,
arrow_schema,
region_query_handler,
metric: ExecutionPlanMetricsSet::new(),
@@ -265,7 +256,7 @@ impl MergeScanExec {
let regions = self.regions.clone();
let region_query_handler = self.region_query_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
let schema = self.schema.clone();
let arrow_schema = self.arrow_schema.clone();
let query_ctx = self.query_ctx.clone();
let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
let partition_metrics_moved = self.partition_metrics.clone();
@@ -318,9 +309,8 @@ impl MergeScanExec {
.await
.map_err(|e| {
MERGE_SCAN_ERRORS_TOTAL.inc();
BoxedError::new(e)
})
.context(ExternalSnafu)?;
DataFusionError::External(Box::new(e))
})?;
let do_get_cost = do_get_start.elapsed();
ready_timer.stop();
@@ -331,10 +321,11 @@ impl MergeScanExec {
let poll_elapsed = poll_timer.elapsed();
poll_duration += poll_elapsed;
let batch = batch?;
// reconstruct batch using `self.schema`
// to remove metadata and correct column name
let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
let batch = RecordBatch::try_new(
arrow_schema.clone(),
batch.into_df_record_batch().columns().to_vec(),
)?;
metric.record_output_batch_rows(batch.num_rows());
if let Some(mut first_consume_timer) = first_consume_timer.take() {
first_consume_timer.stop();
@@ -410,12 +401,10 @@ impl MergeScanExec {
}
}));
Ok(Box::pin(RecordBatchStreamWrapper {
schema: self.schema.clone(),
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.arrow_schema.clone(),
stream,
output_ordering: None,
metrics: Default::default(),
}))
)))
}
pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
@@ -453,7 +442,6 @@ impl MergeScanExec {
table: self.table.clone(),
regions: self.regions.clone(),
plan: self.plan.clone(),
schema: self.schema.clone(),
arrow_schema: self.arrow_schema.clone(),
region_query_handler: self.region_query_handler.clone(),
metric: self.metric.clone(),
@@ -471,11 +459,6 @@ impl MergeScanExec {
})
}
/// Converts an Arrow schema reference into a shared [`Schema`] handle.
///
/// # Errors
///
/// Returns an error when `Schema::try_from` rejects the Arrow schema; the
/// failure is annotated with `ConvertSchemaSnafu` before being propagated.
fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
    let shared = Schema::try_from(arrow_schema)
        .context(ConvertSchemaSnafu)
        .map(Arc::new)?;
    Ok(shared)
}
pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
self.sub_stage_metrics
.lock()
@@ -614,10 +597,8 @@ impl ExecutionPlan for MergeScanExec {
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<DfSendableRecordBatchStream> {
Ok(Box::pin(DfRecordBatchStreamAdapter::new(
self.to_stream(context, partition)?,
)))
) -> Result<SendableRecordBatchStream> {
self.to_stream(context, partition)
}
fn metrics(&self) -> Option<MetricsSet> {