refactor: Remove PhysicalPlan trait and use ExecutionPlan directly (#3894)

* refactor: remove PhysicalPlan

* refactor: remove physical_plan mod

* refactor: import

* fix merge error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Yingwen
2024-05-11 15:38:03 +08:00
committed by GitHub
parent fa6c371380
commit d0820bb26d
28 changed files with 174 additions and 567 deletions

View File

@@ -21,11 +21,11 @@ use common_config::Mode;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo, NodeStatus};
use common_meta::peer::Peer;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::warn;
use common_time::timestamp::Timestamp;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -20,9 +20,9 @@ use common_catalog::consts::{
SEMANTIC_TYPE_TIME_INDEX,
};
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -17,9 +17,9 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_KEY_COLUMN_USAGE_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -17,9 +17,9 @@ use std::sync::Arc;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -18,10 +18,10 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::datetime::DateTime;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -19,9 +19,9 @@ use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID;
use common_error::ext::BoxedError;
use common_meta::rpc::router::RegionRoute;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -17,10 +17,10 @@ use std::sync::Arc;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_time::util::current_time_millis;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -17,9 +17,9 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_SCHEMATA_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -17,9 +17,9 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -17,9 +17,9 @@ use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLES_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;

View File

@@ -16,7 +16,6 @@ pub mod columnar_value;
pub mod error;
mod function;
pub mod logical_plan;
pub mod physical_plan;
pub mod prelude;
mod signature;
@@ -26,7 +25,7 @@ use std::sync::Arc;
use api::greptime_proto::v1::add_column_location::LocationType;
use api::greptime_proto::v1::AddColumnLocation as Location;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use physical_plan::PhysicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use serde::{Deserialize, Serialize};
use sqlparser_derive::{Visit, VisitMut};
@@ -49,7 +48,7 @@ pub enum OutputData {
#[derive(Debug, Default)]
pub struct OutputMeta {
/// May exist for query output. One can retrieve execution metrics from this plan.
pub plan: Option<Arc<dyn PhysicalPlan>>,
pub plan: Option<Arc<dyn ExecutionPlan>>,
pub cost: OutputCost,
}
@@ -102,11 +101,11 @@ impl Debug for OutputData {
}
impl OutputMeta {
pub fn new(plan: Option<Arc<dyn PhysicalPlan>>, cost: usize) -> Self {
pub fn new(plan: Option<Arc<dyn ExecutionPlan>>, cost: usize) -> Self {
Self { plan, cost }
}
pub fn new_with_plan(plan: Arc<dyn PhysicalPlan>) -> Self {
pub fn new_with_plan(plan: Arc<dyn ExecutionPlan>) -> Self {
Self {
plan: Some(plan),
cost: 0,
@@ -118,9 +117,6 @@ impl OutputMeta {
}
}
pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan;
pub type DfPhysicalPlanRef = Arc<dyn DfPhysicalPlan>;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
pub enum AddColumnLocation {
First,

View File

@@ -1,384 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::Arc;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
pub use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
pub use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, PlanProperties};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::DfPhysicalPlan;
pub type PhysicalPlanRef = Arc<dyn PhysicalPlan>;
/// [`PhysicalPlan`] represent nodes in the Physical Plan.
///
/// Each [`PhysicalPlan`] is partition-aware and is responsible for
/// creating the actual "async" [`SendableRecordBatchStream`]s
/// of [`RecordBatch`](common_recordbatch::RecordBatch) that incrementally
/// compute the operator's output from its input partition.
pub trait PhysicalPlan: Debug + Send + Sync {
/// Returns the physical plan as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Get the schema for this physical plan
fn schema(&self) -> SchemaRef;
/// Return properties of the output of the [PhysicalPlan], such as output
/// ordering(s), partitioning information etc.
fn properties(&self) -> &PlanProperties;
/// Get a list of child physical plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
fn children(&self) -> Vec<PhysicalPlanRef>;
/// Returns a new plan where all children were replaced by new plans.
/// The size of `children` must be equal to the size of [`PhysicalPlan::children()`].
fn with_new_children(&self, children: Vec<PhysicalPlanRef>) -> Result<PhysicalPlanRef>;
/// Creates an RecordBatch stream.
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
fn metrics(&self) -> Option<MetricsSet> {
None
}
}
/// Adapt DataFusion's [`ExecutionPlan`](DfPhysicalPlan) to GreptimeDB's [`PhysicalPlan`].
#[derive(Debug)]
pub struct PhysicalPlanAdapter {
schema: SchemaRef,
df_plan: Arc<dyn DfPhysicalPlan>,
metric: ExecutionPlanMetricsSet,
}
impl PhysicalPlanAdapter {
pub fn new(schema: SchemaRef, df_plan: Arc<dyn DfPhysicalPlan>) -> Self {
Self {
schema,
df_plan,
metric: ExecutionPlanMetricsSet::new(),
}
}
pub fn df_plan(&self) -> Arc<dyn DfPhysicalPlan> {
self.df_plan.clone()
}
}
impl PhysicalPlan for PhysicalPlanAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn properties(&self) -> &PlanProperties {
self.df_plan.properties()
}
fn children(&self) -> Vec<PhysicalPlanRef> {
self.df_plan
.children()
.into_iter()
.map(|x| Arc::new(PhysicalPlanAdapter::new(self.schema(), x)) as _)
.collect()
}
fn with_new_children(&self, children: Vec<PhysicalPlanRef>) -> Result<PhysicalPlanRef> {
let children = children
.into_iter()
.map(|x| Arc::new(DfPhysicalPlanAdapter(x)) as _)
.collect();
let plan = self
.df_plan
.clone()
.with_new_children(children)
.context(error::GeneralDataFusionSnafu)?;
Ok(Arc::new(PhysicalPlanAdapter::new(self.schema(), plan)))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let df_plan = self.df_plan.clone();
let stream = df_plan
.execute(partition, context)
.context(error::GeneralDataFusionSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new_with_metrics_and_df_plan(
stream,
baseline_metric,
df_plan,
)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
Ok(Box::pin(adapter))
}
fn metrics(&self) -> Option<MetricsSet> {
self.df_plan.metrics()
}
}
#[derive(Debug)]
pub struct DfPhysicalPlanAdapter(pub PhysicalPlanRef);
impl DfPhysicalPlan for DfPhysicalPlanAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> DfSchemaRef {
self.0.schema().arrow_schema().clone()
}
fn children(&self) -> Vec<Arc<dyn DfPhysicalPlan>> {
self.0
.children()
.into_iter()
.map(|x| Arc::new(DfPhysicalPlanAdapter(x)) as _)
.collect()
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn DfPhysicalPlan>>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {
let df_schema = self.schema();
let schema: SchemaRef = Arc::new(
df_schema
.try_into()
.context(error::ConvertArrowSchemaSnafu)?,
);
let children = children
.into_iter()
.map(|x| Arc::new(PhysicalPlanAdapter::new(schema.clone(), x)) as _)
.collect();
let plan = self.0.with_new_children(children)?;
Ok(Arc::new(DfPhysicalPlanAdapter(plan)))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DfResult<DfSendableRecordBatchStream> {
let stream = self.0.execute(partition, context)?;
Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
}
fn metrics(&self) -> Option<MetricsSet> {
self.0.metrics()
}
fn properties(&self) -> &PlanProperties {
self.0.properties()
}
}
impl DisplayAs for DfPhysicalPlanAdapter {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}
#[cfg(test)]
mod test {
use async_trait::async_trait;
use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::datasource::{DefaultTableSource, TableProvider as DfTableProvider, TableType};
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{collect, ExecutionMode};
use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
use datafusion_expr::{Expr, TableSource};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::arrow::util::pretty;
use datatypes::schema::Schema;
use datatypes::vectors::Int32Vector;
use super::*;
struct MyDfTableProvider;
#[async_trait]
impl DfTableProvider for MyDfTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> DfSchemaRef {
Arc::new(ArrowSchema::new(vec![Field::new(
"a",
DataType::Int32,
false,
)]))
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
_ctx: &SessionState,
_projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {
let schema = Arc::new(Schema::try_from(self.schema()).unwrap());
let properties = PlanProperties::new(
EquivalenceProperties::new(schema.arrow_schema().clone()),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
);
let my_plan = Arc::new(MyExecutionPlan { schema, properties });
let df_plan = DfPhysicalPlanAdapter(my_plan);
Ok(Arc::new(df_plan))
}
}
impl MyDfTableProvider {
fn table_source() -> Arc<dyn TableSource> {
Arc::new(DefaultTableSource {
table_provider: Arc::new(Self),
})
}
}
#[derive(Debug, Clone)]
struct MyExecutionPlan {
schema: SchemaRef,
properties: PlanProperties,
}
impl PhysicalPlan for MyExecutionPlan {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<PhysicalPlanRef> {
vec![]
}
fn with_new_children(&self, _children: Vec<PhysicalPlanRef>) -> Result<PhysicalPlanRef> {
Ok(Arc::new(self.clone()))
}
fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = self.schema();
let recordbatches = RecordBatches::try_new(
schema.clone(),
vec![
RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice(vec![1])) as _],
)
.unwrap(),
RecordBatch::new(
schema,
vec![Arc::new(Int32Vector::from_slice(vec![2, 3])) as _],
)
.unwrap(),
],
)
.unwrap();
Ok(recordbatches.as_stream())
}
}
// Test our physical plan can be executed by DataFusion, through adapters.
#[tokio::test]
async fn test_execute_physical_plan() {
let ctx = SessionContext::new();
let logical_plan =
LogicalPlanBuilder::scan("test", MyDfTableProvider::table_source(), None)
.unwrap()
.build()
.unwrap();
let physical_plan = ctx
.state()
.create_physical_plan(&logical_plan)
.await
.unwrap();
let df_recordbatches = collect(physical_plan, Arc::new(TaskContext::from(&ctx)))
.await
.unwrap();
let pretty_print = pretty::pretty_format_batches(&df_recordbatches).unwrap();
assert_eq!(
pretty_print.to_string(),
r#"+---+
| a |
+---+
| 1 |
| 2 |
| 3 |
+---+"#
);
}
#[test]
fn test_physical_plan_adapter() {
let df_schema = Arc::new(ArrowSchema::new(vec![Field::new(
"name",
DataType::Utf8,
true,
)]));
let plan = PhysicalPlanAdapter::new(
Arc::new(Schema::try_from(df_schema.clone()).unwrap()),
Arc::new(EmptyExec::new(df_schema.clone())),
);
let _ = plan.df_plan.as_any().downcast_ref::<EmptyExec>().unwrap();
let df_plan = DfPhysicalPlanAdapter(Arc::new(plan));
assert_eq!(df_schema, df_plan.schema());
}
}

View File

@@ -20,7 +20,6 @@ use common_datasource::file_format::orc::{OrcFormat, OrcOpener};
use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
use common_datasource::file_format::Format;
use common_query::prelude::Expr;
use common_query::DfPhysicalPlan;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::{Statistics, ToDFSchema};
@@ -31,6 +30,7 @@ use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStre
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_expr::utils::conjunction;
use datatypes::arrow::datatypes::Schema as ArrowSchema;

View File

@@ -21,7 +21,6 @@ use std::sync::Arc;
use arrow::array::{StringBuilder, UInt32Builder};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_query::{DfPhysicalPlan, DfPhysicalPlanRef};
use common_recordbatch::adapter::{MetricCollector, RecordBatchMetrics};
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use datafusion::error::Result as DfResult;
@@ -29,7 +28,7 @@ use datafusion::execution::TaskContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
accept, DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties,
accept, DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{internal_err, DataFusionError};
@@ -44,14 +43,14 @@ const PLAN: &str = "plan";
#[derive(Debug)]
pub struct DistAnalyzeExec {
input: DfPhysicalPlanRef,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
properties: PlanProperties,
}
impl DistAnalyzeExec {
/// Create a new DistAnalyzeExec
pub fn new(input: DfPhysicalPlanRef) -> Self {
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
let schema = SchemaRef::new(Schema::new(vec![
Field::new(STAGE, DataType::UInt32, true),
Field::new(NODE, DataType::UInt32, true),
@@ -66,7 +65,7 @@ impl DistAnalyzeExec {
}
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(input: &DfPhysicalPlanRef, schema: SchemaRef) -> PlanProperties {
fn compute_properties(input: &Arc<dyn ExecutionPlan>, schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
let output_partitioning = Partitioning::UnknownPartitioning(1);
let exec_mode = input.execution_mode();
@@ -84,7 +83,7 @@ impl DisplayAs for DistAnalyzeExec {
}
}
impl DfPhysicalPlan for DistAnalyzeExec {
impl ExecutionPlan for DistAnalyzeExec {
fn name(&self) -> &'static str {
"DistAnalyzeExec"
}
@@ -98,7 +97,7 @@ impl DfPhysicalPlan for DistAnalyzeExec {
&self.properties
}
fn children(&self) -> Vec<DfPhysicalPlanRef> {
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
@@ -109,8 +108,8 @@ impl DfPhysicalPlan for DistAnalyzeExec {
fn with_new_children(
self: Arc<Self>,
mut children: Vec<DfPhysicalPlanRef>,
) -> DfResult<DfPhysicalPlanRef> {
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(Self::new(children.pop().unwrap())))
}
@@ -196,7 +195,7 @@ impl AnalyzeOutputBuilder {
/// Creates the output of AnalyzeExec as a RecordBatch
fn create_output_batch(
total_rows: usize,
input: DfPhysicalPlanRef,
input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
) -> DfResult<DfRecordBatch> {
let mut builder = AnalyzeOutputBuilder::new(schema);

View File

@@ -26,7 +26,6 @@ use common_base::Plugins;
use common_error::ext::BoxedError;
use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlan, PhysicalPlanAdapter};
use common_query::prelude::ScalarUdf;
use common_query::{Output, OutputData, OutputMeta};
use common_recordbatch::adapter::RecordBatchStreamAdapter;
@@ -38,6 +37,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::ResolvedTableReference;
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
use datatypes::prelude::VectorRef;
use datatypes::schema::Schema;
use futures_util::StreamExt;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
@@ -352,7 +352,7 @@ impl PhysicalPlanner for DatafusionQueryEngine {
&self,
ctx: &mut QueryEngineContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn PhysicalPlan>> {
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer();
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
@@ -364,17 +364,7 @@ impl PhysicalPlanner for DatafusionQueryEngine {
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(Arc::new(PhysicalPlanAdapter::new(
Arc::new(
physical_plan
.schema()
.try_into()
.context(error::ConvertSchemaSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?,
),
physical_plan,
)))
Ok(physical_plan)
}
}
}
@@ -385,44 +375,33 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
plan: Arc<dyn PhysicalPlan>,
) -> Result<Arc<dyn PhysicalPlan>> {
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer();
let state = ctx.state();
let config = state.config_options();
let df_plan = plan
.as_any()
.downcast_ref::<PhysicalPlanAdapter>()
.context(error::PhysicalPlanDowncastSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
.df_plan();
// skip optimize AnalyzeExec plan
let optimized_plan =
if let Some(analyze_plan) = df_plan.as_any().downcast_ref::<AnalyzeExec>() {
let mut new_plan = analyze_plan.input().clone();
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
Arc::new(DistAnalyzeExec::new(new_plan))
} else {
let mut new_plan = df_plan;
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
new_plan
};
let optimized_plan = if let Some(analyze_plan) = plan.as_any().downcast_ref::<AnalyzeExec>()
{
let mut new_plan = analyze_plan.input().clone();
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
Arc::new(DistAnalyzeExec::new(new_plan))
} else {
let mut new_plan = plan;
for optimizer in state.physical_optimizers() {
new_plan = optimizer
.optimize(new_plan, config)
.context(DataFusionSnafu)?;
}
new_plan
};
Ok(Arc::new(PhysicalPlanAdapter::new(
plan.schema(),
optimized_plan,
)))
Ok(optimized_plan)
}
}
@@ -431,30 +410,21 @@ impl QueryExecutor for DatafusionQueryEngine {
fn execute_stream(
&self,
ctx: &QueryEngineContext,
plan: &Arc<dyn PhysicalPlan>,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream> {
let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer();
let task_ctx = ctx.build_task_ctx();
match plan.properties().output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => {
let stream = plan
.execute(0, task_ctx)
.context(error::ExecutePhysicalPlanSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let stream = OnDone::new(stream, move || {
exec_timer.observe_duration();
});
Ok(Box::pin(stream))
0 => {
let schema = Arc::new(
Schema::try_from(plan.schema())
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?,
);
Ok(Box::pin(EmptyRecordBatchStream::new(schema)))
}
_ => {
let df_plan = Arc::new(DfPhysicalPlanAdapter(plan.clone()));
// merge into a single partition
let plan = CoalescePartitionsExec::new(df_plan.clone());
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.properties().output_partitioning().partition_count());
1 => {
let df_stream = plan
.execute(0, task_ctx)
.context(error::DatafusionSnafu)
@@ -464,7 +434,33 @@ impl QueryExecutor for DatafusionQueryEngine {
.context(error::ConvertDfRecordBatchStreamSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
stream.set_metrics2(df_plan);
stream.set_metrics2(plan.clone());
let stream = OnDone::new(Box::pin(stream), move || {
exec_timer.observe_duration();
});
Ok(Box::pin(stream))
}
_ => {
// merge into a single partition
let merged_plan = CoalescePartitionsExec::new(plan.clone());
// CoalescePartitionsExec must produce a single partition
assert_eq!(
1,
merged_plan
.properties()
.output_partitioning()
.partition_count()
);
let df_stream = merged_plan
.execute(0, task_ctx)
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
.context(error::ConvertDfRecordBatchStreamSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
stream.set_metrics2(plan.clone());
let stream = OnDone::new(Box::pin(stream), move || {
exec_timer.observe_duration();
});

View File

@@ -52,13 +52,6 @@ pub enum InnerError {
location: Location,
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to execute physical plan"))]
ExecutePhysicalPlan {
#[snafu(implicit)]
location: Location,
source: common_query::error::Error,
},
}
impl ErrorExt for InnerError {
@@ -70,7 +63,6 @@ impl ErrorExt for InnerError {
Datafusion { .. } => StatusCode::EngineExecuteQuery,
PhysicalPlanDowncast { .. } | ConvertSchema { .. } => StatusCode::Unexpected,
ConvertDfRecordBatchStream { source, .. } => source.status_code(),
ExecutePhysicalPlan { source, .. } => source.status_code(),
}
}

View File

@@ -23,13 +23,13 @@ use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
use common_plugins::GREPTIME_EXEC_READ_COST;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};

View File

@@ -18,13 +18,12 @@ use std::any::Any;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::DfPhysicalPlan;
use common_recordbatch::OrderOption;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datatypes::arrow::datatypes::SchemaRef;
@@ -157,7 +156,7 @@ impl TableProvider for DummyTableProvider {
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn DfPhysicalPlan>> {
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
let mut request = self.scan_request.lock().unwrap().clone();
request.projection = match projection {
Some(x) if !x.is_empty() => Some(x.clone()),
@@ -174,9 +173,7 @@ impl TableProvider for DummyTableProvider {
.handle_query(self.region_id, request)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new(
StreamScanAdapter::new(stream),
))))
Ok(Arc::new(StreamScanAdapter::new(stream)))
}
fn supports_filters_pushdown(

View File

@@ -14,8 +14,8 @@
use std::sync::Arc;
use common_query::physical_plan::PhysicalPlan;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::physical_plan::ExecutionPlan;
use crate::error::Result;
use crate::query_engine::QueryEngineContext;
@@ -25,6 +25,6 @@ pub trait QueryExecutor {
fn execute_stream(
&self,
ctx: &QueryEngineContext,
plan: &Arc<dyn PhysicalPlan>,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<SendableRecordBatchStream>;
}

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::DfPhysicalPlanRef;
use std::sync::Arc;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result as DfResult;
@@ -30,9 +32,9 @@ pub struct RemoveDuplicate;
impl PhysicalOptimizerRule for RemoveDuplicate {
fn optimize(
&self,
plan: DfPhysicalPlanRef,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> DfResult<DfPhysicalPlanRef> {
) -> DfResult<Arc<dyn ExecutionPlan>> {
Self::do_optimize(plan)
}
@@ -46,7 +48,7 @@ impl PhysicalOptimizerRule for RemoveDuplicate {
}
impl RemoveDuplicate {
fn do_optimize(plan: DfPhysicalPlanRef) -> DfResult<DfPhysicalPlanRef> {
fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
let result = plan
.transform_down_mut(&mut |plan| {
if plan.as_any().is::<CoalesceBatchesExec>()

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use common_query::physical_plan::PhysicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use crate::error::Result;
use crate::query_engine::QueryEngineContext;
@@ -23,6 +23,6 @@ pub trait PhysicalOptimizer {
fn optimize_physical_plan(
&self,
ctx: &mut QueryEngineContext,
plan: Arc<dyn PhysicalPlan>,
) -> Result<Arc<dyn PhysicalPlan>>;
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>>;
}

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use common_query::physical_plan::PhysicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use crate::error::Result;
use crate::plan::LogicalPlan;
@@ -29,5 +29,5 @@ pub trait PhysicalPlanner {
&self,
ctx: &mut QueryEngineContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn PhysicalPlan>>;
) -> Result<Arc<dyn ExecutionPlan>>;
}

View File

@@ -14,13 +14,13 @@
use std::sync::Arc;
use common_query::physical_plan::PhysicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use session::context::QueryContextRef;
/// wrap physical plan with additional layer
/// e.g: metrics retrieving layer upon physical plan
pub trait PhysicalPlanWrapper: Send + Sync + 'static {
fn wrap(&self, origin: Arc<dyn PhysicalPlan>, ctx: QueryContextRef) -> Arc<dyn PhysicalPlan>;
fn wrap(&self, origin: Arc<dyn ExecutionPlan>, ctx: QueryContextRef) -> Arc<dyn ExecutionPlan>;
}
pub type PhysicalPlanWrapperRef = Arc<dyn PhysicalPlanWrapper>;

View File

@@ -23,12 +23,11 @@ use common_function::function::FunctionRef;
use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::state::FunctionState;
use common_query::physical_plan::SessionContext;
use common_query::prelude::ScalarUdf;
use common_telemetry::warn;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionContext, SessionState};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_plan::ExecutionPlan;

View File

@@ -25,11 +25,11 @@ use std::time::Duration;
use ahash::RandomState;
use arrow::compute::{self, cast_with_options, CastOptions, SortColumn};
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions, TimeUnit};
use common_query::DfPhysicalPlan;
use common_recordbatch::DfSendableRecordBatchStream;
use datafusion::common::{Result as DataFusionResult, Statistics};
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::udaf::create_aggregate_expr as create_aggr_udf_expr;
use datafusion::physical_plan::{
@@ -930,14 +930,14 @@ impl ExecutionPlan for RangeSelectExec {
&self.cache
}
fn children(&self) -> Vec<Arc<dyn DfPhysicalPlan>> {
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.input.clone()]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn DfPhysicalPlan>>,
) -> datafusion_common::Result<Arc<dyn DfPhysicalPlan>> {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
assert!(!children.is_empty());
Ok(Arc::new(Self {
input: children[0].clone(),
@@ -958,7 +958,7 @@ impl ExecutionPlan for RangeSelectExec {
fn execute(
&self,
partition: usize,
context: Arc<common_query::physical_plan::TaskContext>,
context: Arc<TaskContext>,
) -> DfResult<DfSendableRecordBatchStream> {
let baseline_metric = BaselineMetrics::new(&self.metric, partition);
let input = self.input.execute(partition, context)?;

View File

@@ -16,8 +16,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_plugins::GREPTIME_EXEC_PREFIX;
use common_query::physical_plan::PhysicalPlan;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;
use headers::{Header, HeaderName, HeaderValue};
use hyper::HeaderMap;
use serde_json::Value;
@@ -126,7 +126,7 @@ fn collect_into_maps(name: &str, value: u64, maps: &mut [&mut HashMap<String, u6
}
}
pub fn collect_plan_metrics(plan: Arc<dyn PhysicalPlan>, maps: &mut [&mut HashMap<String, u64>]) {
pub fn collect_plan_metrics(plan: Arc<dyn ExecutionPlan>, maps: &mut [&mut HashMap<String, u64>]) {
if let Some(m) = plan.metrics() {
m.iter().for_each(|m| match m.value() {
MetricValue::Count { name, count } => {

View File

@@ -16,20 +16,19 @@ use std::any::Any;
use std::sync::{Arc, Mutex};
use common_query::logical_plan::Expr;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::DfPhysicalPlan;
use common_recordbatch::OrderOption;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::datasource::{TableProvider, TableType as DfTableType};
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_expr::expr::Expr as DfExpr;
use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
use store_api::storage::ScanRequest;
use super::scan::StreamScanAdapter;
use crate::table::scan::StreamScanAdapter;
use crate::table::{TableRef, TableType};
/// Adapt greptime's [TableRef] to DataFusion's [TableProvider].
@@ -84,7 +83,7 @@ impl TableProvider for DfTableProviderAdapter {
projection: Option<&Vec<usize>>,
filters: &[DfExpr],
limit: Option<usize>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {
) -> DfResult<Arc<dyn ExecutionPlan>> {
let filters: Vec<Expr> = filters.iter().map(Clone::clone).map(Into::into).collect();
let request = {
let mut request = self.scan_req.lock().unwrap();
@@ -115,7 +114,7 @@ impl TableProvider for DfTableProviderAdapter {
if let Some(sort_expr) = sort_expr {
stream_adapter = stream_adapter.with_output_ordering(sort_expr);
}
Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new(stream_adapter))))
Ok(Arc::new(stream_adapter))
}
fn supports_filters_pushdown(

View File

@@ -13,30 +13,32 @@
// limitations under the License.
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::fmt::{self, Debug, Formatter};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_query::error::ExecuteRepeatedlySnafu;
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing::Span;
use common_telemetry::tracing_context::TracingContext;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{ExecutionMode, PlanProperties};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortExpr};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
RecordBatchStream as DfRecordBatchStream,
};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::OptionExt;
use crate::table::metrics::MemoryUsageMetrics;
/// Adapt greptime's [SendableRecordBatchStream] to GreptimeDB's [PhysicalPlan].
/// Adapt greptime's [SendableRecordBatchStream] to [ExecutionPlan].
pub struct StreamScanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
@@ -76,39 +78,40 @@ impl StreamScanAdapter {
}
}
impl PhysicalPlan for StreamScanAdapter {
impl ExecutionPlan for StreamScanAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
fn schema(&self) -> ArrowSchemaRef {
self.schema.arrow_schema().clone()
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn children(&self) -> Vec<PhysicalPlanRef> {
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(&self, _children: Vec<PhysicalPlanRef>) -> QueryResult<PhysicalPlanRef> {
Ok(Arc::new(Self::new(
self.stream.lock().unwrap().take().unwrap(),
)))
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> QueryResult<SendableRecordBatchStream> {
) -> DfResult<DfSendableRecordBatchStream> {
let tracing_context = TracingContext::from_json(context.session_id().as_str());
let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter"));
let mut stream = self.stream.lock().unwrap();
let stream = stream.take().context(query_error::ExecuteRepeatedlySnafu)?;
let stream = stream.take().context(ExecuteRepeatedlySnafu)?;
let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
Ok(Box::pin(StreamWithMetricWrapper {
stream,
@@ -122,6 +125,12 @@ impl PhysicalPlan for StreamScanAdapter {
}
}
impl DisplayAs for StreamScanAdapter {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self)
}
}
pub struct StreamWithMetricWrapper {
stream: SendableRecordBatchStream,
metric: MemoryUsageMetrics,
@@ -129,49 +138,51 @@ pub struct StreamWithMetricWrapper {
}
impl Stream for StreamWithMetricWrapper {
type Item = RecordBatchResult<RecordBatch>;
type Item = DfResult<DfRecordBatch>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let _enter = this.span.enter();
let poll = this.stream.poll_next_unpin(cx);
if let Poll::Ready(Some(Ok(record_batch))) = &poll {
let batch_mem_size = record_batch
.columns()
.iter()
.map(|vec_ref| vec_ref.memory_size())
.sum::<usize>();
// we don't record elapsed time here
// since it's calling storage api involving I/O ops
this.metric.record_mem_usage(batch_mem_size);
this.metric.record_output(record_batch.num_rows());
match this.stream.poll_next_unpin(cx) {
Poll::Ready(Some(result)) => match result {
Ok(record_batch) => {
let batch_mem_size = record_batch
.columns()
.iter()
.map(|vec_ref| vec_ref.memory_size())
.sum::<usize>();
// we don't record elapsed time here
// since it's calling storage api involving I/O ops
this.metric.record_mem_usage(batch_mem_size);
this.metric.record_output(record_batch.num_rows());
Poll::Ready(Some(Ok(record_batch.into_df_record_batch())))
}
Err(e) => Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))),
},
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
poll
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl RecordBatchStream for StreamWithMetricWrapper {
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.stream.metrics()
}
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.stream.output_ordering()
impl DfRecordBatchStream for StreamWithMetricWrapper {
fn schema(&self) -> ArrowSchemaRef {
self.stream.schema().arrow_schema().clone()
}
}
#[cfg(test)]
mod test {
use common_recordbatch::{util, RecordBatch, RecordBatches};
use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::prelude::SessionContext;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures::TryStreamExt;
use super::*;
@@ -211,9 +222,9 @@ mod test {
assert_eq!(actual, schema);
let stream = scan.execute(0, ctx.task_ctx()).unwrap();
let recordbatches = util::collect(stream).await.unwrap();
assert_eq!(recordbatches[0], batch1);
assert_eq!(recordbatches[1], batch2);
let recordbatches = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(batch1.df_record_batch(), &recordbatches[0]);
assert_eq!(batch2.df_record_batch(), &recordbatches[1]);
let result = scan.execute(0, ctx.task_ctx());
assert!(result.is_err());