feat: add RegionScanner trait (#3948)

* feat: define region scanner

* feat: single partition scanner

* feat: use single partition scanner

* feat: implement ExecutionPlan wip

* feat: mito engine returns single partition scanner

* feat: implement DisplayAs for region server

* feat: dummy table provider use handle_partitioned_query()

* test: update sqlness test

* feat: table provider use ReadFromRegion

* refactor: remove StreamScanAdapter

* chore: update lock

* style: fix clippy

* refactor: remove handle_query from the RegionEngine trait

* chore: address CR comments

* refactor: rename methods

* refactor: rename ReadFromRegion to RegionScanExec
Author:       Yingwen
Date:         2024-05-20 19:52:00 +08:00
Committed by: GitHub
Parent:       19543f9819
Commit:       179c8c716c

32 changed files with 371 additions and 209 deletions
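The RegionScanner trait introduced by this commit is not itself visible in the hunks below, which only cover the table provider and the scan plan. As a rough sketch, the shape implied by its call sites (scanner.schema(), scanner.properties(), and scanner.scan_partition(partition) in RegionScanExec) would be something like the following; ScannerProperties, ScannerPartitioning, and the BoxedError error type are assumptions for illustration, not the actual definitions in store_api::region_engine:

// Sketch only: reconstructed from call sites in this commit, not the
// actual definitions in store_api::region_engine.
use std::fmt::Debug;
use std::sync::Arc;

use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;

/// Output partitioning of a scanner (assumed shape).
pub enum ScannerPartitioning {
    /// Known number of partitions, unknown row distribution.
    Unknown(usize),
}

impl ScannerPartitioning {
    /// Number of partitions the scanner can serve in parallel.
    pub fn num_partitions(&self) -> usize {
        match self {
            ScannerPartitioning::Unknown(num) => *num,
        }
    }
}

/// Properties the scanner reports to the plan (assumed shape).
pub struct ScannerProperties {
    partitioning: ScannerPartitioning,
}

impl ScannerProperties {
    pub fn partitioning(&self) -> &ScannerPartitioning {
        &self.partitioning
    }
}

/// Scans a region and yields one record batch stream per partition.
pub trait RegionScanner: Debug + Send + Sync {
    /// Properties of the scanner, including its output partitioning.
    fn properties(&self) -> &ScannerProperties;

    /// Schema of the record batches the scanner produces.
    fn schema(&self) -> SchemaRef;

    /// Scans the given partition and returns a stream of record batches.
    fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError>;
}

pub type RegionScannerRef = Arc<dyn RegionScanner>;

RegionScanExec maps num_partitions() directly into DataFusion's Partitioning::UnknownPartitioning and forwards scan_partition() errors as DataFusionError::External, as the second file's hunks below show.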


@@ -26,9 +26,10 @@ use datafusion_expr::expr::Expr as DfExpr;
 use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
+use store_api::region_engine::SinglePartitionScanner;
 use store_api::storage::ScanRequest;
 
-use crate::table::scan::StreamScanAdapter;
+use crate::table::scan::RegionScanExec;
 use crate::table::{TableRef, TableType};
 
 /// Adapt greptime's [TableRef] to DataFusion's [TableProvider].
@@ -110,11 +111,12 @@ impl TableProvider for DfTableProviderAdapter {
                 .collect::<Vec<_>>()
         });
 
-        let mut stream_adapter = StreamScanAdapter::new(stream);
+        let scanner = Arc::new(SinglePartitionScanner::new(stream));
+        let mut plan = RegionScanExec::new(scanner);
         if let Some(sort_expr) = sort_expr {
-            stream_adapter = stream_adapter.with_output_ordering(sort_expr);
+            plan = plan.with_output_ordering(sort_expr);
         }
-        Ok(Arc::new(stream_adapter))
+        Ok(Arc::new(plan))
     }
 
     fn supports_filters_pushdown(
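SinglePartitionScanner bridges engines that still return a single stream for a whole region. Its definition is also outside these hunks; below is a minimal sketch, assuming it keeps the take-once semantics of the removed StreamScanAdapter (the updated test at the end still expects a second execute() call to fail). The use of common_query::error here mirrors the old plan code and is illustrative, not necessarily the real error type:

// Minimal sketch, assuming SinglePartitionScanner keeps the old
// StreamScanAdapter behavior: the stream can be taken exactly once.
use std::fmt::{Debug, Formatter};
use std::sync::Mutex;

use common_query::error::ExecuteRepeatedlySnafu;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use snafu::OptionExt;

/// Wraps a single stream and hands it out exactly once.
pub struct SinglePartitionScanner {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    schema: SchemaRef,
}

impl SinglePartitionScanner {
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema,
        }
    }

    /// Schema of the batches produced by the wrapped stream.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Returns the wrapped stream, ignoring the partition index since
    /// there is only one partition. Every call after the first fails,
    /// mirroring the ExecuteRepeatedlySnafu check the plan used to do.
    pub fn scan_partition(
        &self,
        _partition: usize,
    ) -> Result<SendableRecordBatchStream, common_query::error::Error> {
        let mut stream = self.stream.lock().unwrap();
        stream.take().context(ExecuteRepeatedlySnafu)
    }
}

impl Debug for SinglePartitionScanner {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SinglePartitionScanner")
            .field("stream", &"<SendableRecordBatchStream>")
            .finish()
    }
}

With something like this in place, RegionScanExec no longer needs the Mutex or the repeated-execution check itself: stream ownership and partition handling both live behind the scanner, as the next file shows.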


@@ -13,12 +13,10 @@
 // limitations under the License.
 
 use std::any::Any;
-use std::fmt::{self, Debug, Formatter};
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use common_query::error::ExecuteRepeatedlySnafu;
 use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream};
 use common_telemetry::tracing::Span;
 use common_telemetry::tracing_context::TracingContext;
@@ -32,59 +30,54 @@ use datafusion::physical_plan::{
+use datafusion_common::DataFusionError;
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
 use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
-use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
-use snafu::OptionExt;
+use store_api::region_engine::RegionScannerRef;
 
 use crate::table::metrics::MemoryUsageMetrics;
 
-/// Adapt greptime's [SendableRecordBatchStream] to [ExecutionPlan].
-pub struct StreamScanAdapter {
-    stream: Mutex<Option<SendableRecordBatchStream>>,
-    schema: SchemaRef,
+/// A plan to read multiple partitions from a region of a table.
+#[derive(Debug)]
+pub struct RegionScanExec {
+    scanner: RegionScannerRef,
+    arrow_schema: ArrowSchemaRef,
     /// The expected output ordering for the plan.
     output_ordering: Option<Vec<PhysicalSortExpr>>,
     metric: ExecutionPlanMetricsSet,
     properties: PlanProperties,
 }
 
-impl Debug for StreamScanAdapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("StreamScanAdapter")
-            .field("stream", &"<SendableRecordBatchStream>")
-            .finish()
-    }
-}
-
-impl StreamScanAdapter {
-    pub fn new(stream: SendableRecordBatchStream) -> Self {
-        let schema = stream.schema();
+impl RegionScanExec {
+    pub fn new(scanner: RegionScannerRef) -> Self {
+        let arrow_schema = scanner.schema().arrow_schema().clone();
+        let scanner_props = scanner.properties();
         let properties = PlanProperties::new(
-            EquivalenceProperties::new(schema.arrow_schema().clone()),
-            Partitioning::UnknownPartitioning(1),
+            EquivalenceProperties::new(arrow_schema.clone()),
+            Partitioning::UnknownPartitioning(scanner_props.partitioning().num_partitions()),
             ExecutionMode::Bounded,
         );
         Self {
-            stream: Mutex::new(Some(stream)),
-            schema,
+            scanner,
+            arrow_schema,
             output_ordering: None,
             metric: ExecutionPlanMetricsSet::new(),
             properties,
         }
     }
 
     /// Set the expected output ordering for the plan.
     pub fn with_output_ordering(mut self, output_ordering: Vec<PhysicalSortExpr>) -> Self {
         self.output_ordering = Some(output_ordering);
         self
     }
 }
 
-impl ExecutionPlan for StreamScanAdapter {
+impl ExecutionPlan for RegionScanExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
 
     fn schema(&self) -> ArrowSchemaRef {
-        self.schema.arrow_schema().clone()
+        self.arrow_schema.clone()
     }
 
     fn properties(&self) -> &PlanProperties {
@@ -98,7 +91,7 @@ impl ExecutionPlan for StreamScanAdapter {
     fn with_new_children(
         self: Arc<Self>,
         _children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
         Ok(self)
     }
 
@@ -106,12 +99,15 @@ impl ExecutionPlan for StreamScanAdapter {
         &self,
         partition: usize,
         context: Arc<TaskContext>,
-    ) -> DfResult<DfSendableRecordBatchStream> {
+    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
         let tracing_context = TracingContext::from_json(context.session_id().as_str());
-        let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter"));
+        let span =
+            tracing_context.attach(common_telemetry::tracing::info_span!("read_from_region"));
 
-        let mut stream = self.stream.lock().unwrap();
-        let stream = stream.take().context(ExecuteRepeatedlySnafu)?;
+        let stream = self
+            .scanner
+            .scan_partition(partition)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
         let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition);
         Ok(Box::pin(StreamWithMetricWrapper {
             stream,
@@ -125,9 +121,10 @@ impl ExecutionPlan for StreamScanAdapter {
     }
 }
 
-impl DisplayAs for StreamScanAdapter {
-    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{:?}", self)
+impl DisplayAs for RegionScanExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        // The scanner contains all information needed to display the plan.
+        write!(f, "{:?}", self.scanner)
     }
 }
 
@@ -177,12 +174,15 @@ impl DfRecordBatchStream for StreamWithMetricWrapper {
 #[cfg(test)]
 mod test {
     use std::sync::Arc;
 
     use common_recordbatch::{RecordBatch, RecordBatches};
     use datafusion::prelude::SessionContext;
     use datatypes::data_type::ConcreteDataType;
-    use datatypes::schema::{ColumnSchema, Schema};
+    use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
     use datatypes::vectors::Int32Vector;
     use futures::TryStreamExt;
+    use store_api::region_engine::SinglePartitionScanner;
 
     use super::*;
@@ -210,9 +210,10 @@ mod test {
             RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
         let stream = recordbatches.as_stream();
 
-        let scan = StreamScanAdapter::new(stream);
+        let scanner = Arc::new(SinglePartitionScanner::new(stream));
+        let plan = RegionScanExec::new(scanner);
         let actual: SchemaRef = Arc::new(
-            scan.properties
+            plan.properties
                 .eq_properties
                 .schema()
                 .clone()
@@ -221,12 +222,12 @@ mod test {
         );
         assert_eq!(actual, schema);
 
-        let stream = scan.execute(0, ctx.task_ctx()).unwrap();
+        let stream = plan.execute(0, ctx.task_ctx()).unwrap();
         let recordbatches = stream.try_collect::<Vec<_>>().await.unwrap();
         assert_eq!(batch1.df_record_batch(), &recordbatches[0]);
         assert_eq!(batch2.df_record_batch(), &recordbatches[1]);
 
-        let result = scan.execute(0, ctx.task_ctx());
+        let result = plan.execute(0, ctx.task_ctx());
         assert!(result.is_err());
         match result {
             Err(e) => assert!(e