diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs
index aebf9a457d..a4dd5243a7 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -20,40 +20,35 @@
 use ahash::{HashMap, HashSet};
 use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
 use async_stream::stream;
 use common_catalog::parse_catalog_and_schema_from_db_string;
-use common_error::ext::BoxedError;
 use common_plugins::GREPTIME_EXEC_READ_COST;
 use common_query::request::QueryRequest;
-use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
-use common_recordbatch::error::ExternalSnafu;
-use common_recordbatch::{
-    DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
-};
+use common_recordbatch::adapter::RecordBatchMetrics;
 use common_telemetry::tracing_context::TracingContext;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::execution::{SessionState, TaskContext};
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::metrics::{
     Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
 };
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream,
 };
-use datafusion_common::{Column as ColumnExpr, Result};
+use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
 use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
-use datatypes::schema::{Schema, SchemaRef};
 use futures_util::StreamExt;
 use greptime_proto::v1::region::RegionRequestHeader;
 use meter_core::data::ReadItem;
 use meter_macros::read_meter;
 use session::context::QueryContextRef;
-use snafu::ResultExt;
 use store_api::storage::RegionId;
 use table::table_name::TableName;
 use tokio::time::Instant;
 
 use crate::dist_plan::analyzer::AliasMapping;
-use crate::error::ConvertSchemaSnafu;
 use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
 use crate::region_query::RegionQueryHandlerRef;
@@ -140,7 +135,6 @@ pub struct MergeScanExec {
     table: TableName,
     regions: Vec<RegionId>,
     plan: LogicalPlan,
-    schema: SchemaRef,
     arrow_schema: ArrowSchemaRef,
     region_query_handler: RegionQueryHandlerRef,
     metric: ExecutionPlanMetricsSet,
@@ -159,7 +153,6 @@ impl std::fmt::Debug for MergeScanExec {
         f.debug_struct("MergeScanExec")
            .field("table", &self.table)
            .field("regions", &self.regions)
-           .field("schema", &self.schema)
            .field("plan", &self.plan)
            .finish()
     }
@@ -238,12 +231,10 @@ impl MergeScanExec {
             EmissionType::Incremental,
             Boundedness::Bounded,
         );
-        let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
         Ok(Self {
             table,
             regions,
             plan,
-            schema,
             arrow_schema,
             region_query_handler,
             metric: ExecutionPlanMetricsSet::new(),
@@ -265,7 +256,7 @@ impl MergeScanExec {
         let regions = self.regions.clone();
         let region_query_handler = self.region_query_handler.clone();
         let metric = MergeScanMetric::new(&self.metric);
-        let schema = self.schema.clone();
+        let arrow_schema = self.arrow_schema.clone();
         let query_ctx = self.query_ctx.clone();
         let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
         let partition_metrics_moved = self.partition_metrics.clone();
@@ -318,9 +309,8 @@ impl MergeScanExec {
                     .await
                     .map_err(|e| {
                         MERGE_SCAN_ERRORS_TOTAL.inc();
-                        BoxedError::new(e)
-                    })
-                    .context(ExternalSnafu)?;
+                        DataFusionError::External(Box::new(e))
+                    })?;
                 let do_get_cost = do_get_start.elapsed();
 
                 ready_timer.stop();
@@ -331,10 +321,11 @@ impl MergeScanExec {
                     let poll_elapsed = poll_timer.elapsed();
                     poll_duration += poll_elapsed;
 
-                    let batch = batch?;
-                    // reconstruct batch using `self.schema`
-                    // to remove metadata and correct column name
-                    let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
+                    let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
+                    let batch = RecordBatch::try_new(
+                        arrow_schema.clone(),
+                        batch.into_df_record_batch().columns().to_vec(),
+                    )?;
                     metric.record_output_batch_rows(batch.num_rows());
                     if let Some(mut first_consume_timer) = first_consume_timer.take() {
                         first_consume_timer.stop();
@@ -410,12 +401,10 @@ impl MergeScanExec {
             }
         }));
 
-        Ok(Box::pin(RecordBatchStreamWrapper {
-            schema: self.schema.clone(),
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.arrow_schema.clone(),
             stream,
-            output_ordering: None,
-            metrics: Default::default(),
-        }))
+        )))
     }
 
     pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
@@ -453,7 +442,6 @@ impl MergeScanExec {
             table: self.table.clone(),
             regions: self.regions.clone(),
             plan: self.plan.clone(),
-            schema: self.schema.clone(),
             arrow_schema: self.arrow_schema.clone(),
             region_query_handler: self.region_query_handler.clone(),
             metric: self.metric.clone(),
@@ -471,11 +459,6 @@ impl MergeScanExec {
         })
     }
 
-    fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
-        let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
-        Ok(Arc::new(schema))
-    }
-
     pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
         self.sub_stage_metrics
             .lock()
@@ -614,10 +597,8 @@ impl ExecutionPlan for MergeScanExec {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
-    ) -> Result<DfSendableRecordBatchStream> {
-        Ok(Box::pin(DfRecordBatchStreamAdapter::new(
-            self.to_stream(context, partition)?,
-        )))
+    ) -> Result<SendableRecordBatchStream> {
+        self.to_stream(context, partition)
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
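
For context, here is a minimal, self-contained sketch (not the PR's code) of the two patterns this diff adopts: surfacing external errors as `DataFusionError::External` instead of `BoxedError` + snafu context, and emitting Arrow batches through DataFusion's `RecordBatchStreamAdapter` after rebuilding each batch against the target schema. The function name `adapt`, the crates assumed (`datafusion`, `futures-util`, `tokio`), and the use of `std::io::Error` as a stand-in for the region error type are illustrative assumptions.

```rust
// Sketch only: mirrors the PR's stream plumbing under assumed names/types.
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures_util::{stream, StreamExt};

/// Adapts fallible batches into DataFusion's native stream type.
/// `std::io::Error` is a hypothetical stand-in for the remote region error.
fn adapt(
    schema: SchemaRef,
    batches: Vec<Result<RecordBatch, std::io::Error>>,
) -> SendableRecordBatchStream {
    let stream_schema = schema.clone();
    let mapped =
        stream::iter(batches).map(move |res| -> Result<RecordBatch, DataFusionError> {
            // External errors become `DataFusionError::External`, so no
            // crate-local error type or snafu context is needed.
            let batch = res.map_err(|e| DataFusionError::External(Box::new(e)))?;
            // Rebuild against the target schema so remote-side field metadata
            // does not leak into the output; the columns are reused as-is.
            Ok(RecordBatch::try_new(schema.clone(), batch.columns().to_vec())?)
        });
    // `RecordBatchStreamAdapter` pairs a schema with any compatible stream,
    // standing in for the removed custom `RecordBatchStreamWrapper`.
    Box::pin(RecordBatchStreamAdapter::new(stream_schema, mapped))
}

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    let mut stream = adapt(schema, vec![Ok(batch)]);
    while let Some(batch) = stream.next().await {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```

Because the output is already in Arrow terms end to end, `execute` can return the result of `to_stream` directly rather than re-wrapping it in `DfRecordBatchStreamAdapter`, which is what lets the diff drop the `schema: SchemaRef` field and the `arrow_schema_to_schema` conversion entirely.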