diff --git a/Cargo.lock b/Cargo.lock index f23abf83b2..2ab1c5274c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1233,6 +1233,7 @@ dependencies = [ "session", "snafu", "storage", + "store-api", "table", "tokio", ] @@ -2359,7 +2360,7 @@ dependencies = [ [[package]] name = "datafusion" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "ahash 0.8.3", "arrow", @@ -2408,7 +2409,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "arrow", "arrow-array", @@ -2422,7 +2423,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "dashmap", "datafusion-common", @@ -2439,7 +2440,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "ahash 0.8.3", "arrow", @@ -2450,7 +2451,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "arrow", "async-trait", @@ -2467,7 +2468,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "ahash 0.8.3", "arrow", @@ -2498,7 +2499,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "arrow", "datafusion-common", @@ -2509,7 +2510,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" 
+source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "arrow", "arrow-schema", @@ -2522,7 +2523,7 @@ dependencies = [ [[package]] name = "datafusion-substrait" version = "22.0.0" -source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793#b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" +source = "git+https://github.com/waynexia/arrow-datafusion.git?rev=5337c86120de8193406b59be7612484796a46294#5337c86120de8193406b59be7612484796a46294" dependencies = [ "async-recursion", "chrono", @@ -5220,7 +5221,7 @@ dependencies = [ "num_cpus", "once_cell", "parking_lot", - "quanta 0.11.0", + "quanta 0.11.1", "rustc_version 0.4.0", "scheduled-thread-pool", "skeptic", @@ -6700,9 +6701,9 @@ dependencies = [ [[package]] name = "quanta" -version = "0.11.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cc73c42f9314c4bdce450c77e6f09ecbddefbeddb1b5979ded332a3913ded33" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" dependencies = [ "crossbeam-utils", "libc", @@ -6763,6 +6764,7 @@ dependencies = [ "sql", "statrs", "stats-cli", + "store-api", "streaming-stats", "table", "tokio", @@ -8654,6 +8656,7 @@ dependencies = [ "common-base", "common-error", "common-query", + "common-recordbatch", "common-time", "datatypes", "derive_builder 0.11.2", diff --git a/Cargo.toml b/Cargo.toml index 1cd431c9c5..021e641380 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,13 +62,13 @@ async-stream = "0.3" async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } # TODO(ruihang): use arrow-datafusion when it contains https://github.com/apache/arrow-datafusion/pull/6032 -datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } -datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "b14f7a9ffe91257fc3d2a5d654f2a1a14a8fc793" } +datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } +datafusion-substrait = { git = 
"https://github.com/waynexia/arrow-datafusion.git", rev = "5337c86120de8193406b59be7612484796a46294" } futures = "0.3" futures-util = "0.3" parquet = "37.0" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 115b4f8c18..d6dbf30f59 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -37,6 +37,7 @@ serde_json = "1.0" session = { path = "../session" } snafu = { version = "0.7", features = ["backtraces"] } storage = { path = "../storage" } +store-api = { path = "../store-api" } table = { path = "../table" } tokio.workspace = true diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 9e5fa135fa..95f253da1d 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -16,16 +16,21 @@ mod columns; mod tables; use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; -use datafusion::datasource::streaming::{PartitionStream, StreamingTable}; -use snafu::ResultExt; -use table::table::adapter::TableAdapter; -use table::TableRef; +use common_query::physical_plan::PhysicalPlanRef; +use common_query::prelude::Expr; +use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; +use datatypes::schema::SchemaRef; +use futures_util::StreamExt; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::ScanRequest; +use table::error::{DuplicatedExecuteCallSnafu, SchemaConversionSnafu}; +use table::{Result as TableResult, Table, TableRef}; use self::columns::InformationSchemaColumns; -use crate::error::{DatafusionSnafu, Result, TableSchemaMismatchSnafu}; +use crate::error::Result; use crate::information_schema::tables::InformationSchemaTables; use crate::{CatalogProviderRef, SchemaProvider}; @@ -59,40 +64,23 @@ impl SchemaProvider for InformationSchemaProvider { } async fn table(&self, name: &str) -> Result> { - let table = match name.to_ascii_lowercase().as_ref() { - TABLES => { - let inner = Arc::new(InformationSchemaTables::new( - self.catalog_name.clone(), - self.catalog_provider.clone(), - )); - Arc::new( - StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context( - |_| DatafusionSnafu { - msg: format!("Failed to get InformationSchema table '{name}'"), - }, - )?, - ) - } - COLUMNS => { - let inner = Arc::new(InformationSchemaColumns::new( - self.catalog_name.clone(), - self.catalog_provider.clone(), - )); - Arc::new( - StreamingTable::try_new(inner.schema().clone(), vec![inner]).with_context( - |_| DatafusionSnafu { - msg: format!("Failed to get InformationSchema table '{name}'"), - }, - )?, - ) - } + let stream = match name.to_ascii_lowercase().as_ref() { + TABLES => InformationSchemaTables::new( + self.catalog_name.clone(), + self.catalog_provider.clone(), + ) + .to_stream()?, + COLUMNS => InformationSchemaColumns::new( + self.catalog_name.clone(), + self.catalog_provider.clone(), + ) + .to_stream()?, _ => { return Ok(None); } }; - let table = TableAdapter::new(table).context(TableSchemaMismatchSnafu)?; - Ok(Some(Arc::new(table))) + Ok(Some(Arc::new(InformationTable::new(stream)))) } async fn table_exist(&self, name: &str) -> Result { @@ -100,3 +88,82 @@ impl SchemaProvider for InformationSchemaProvider { Ok(self.tables.contains(&normalized_name)) } } + +pub struct InformationTable { + schema: SchemaRef, + stream: Arc>>, +} + +impl InformationTable { + pub fn new(stream: SendableRecordBatchStream) -> Self { + let schema = stream.schema(); + Self { + schema, + stream: Arc::new(Mutex::new(Some(stream))), + } + } +} + 
+#[async_trait] +impl Table for InformationTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_info(&self) -> table::metadata::TableInfoRef { + unreachable!("Should not call table_info() of InformationTable directly") + } + + /// Scan the table and returns a SendableRecordBatchStream. + async fn scan( + &self, + _projection: Option<&Vec>, + _filters: &[Expr], + // limit can be used to reduce the amount scanned + // from the datasource as a performance optimization. + // If set, it contains the amount of rows needed by the `LogicalPlan`, + // The datasource should return *at least* this number of rows if available. + _limit: Option, + ) -> TableResult { + unimplemented!() + } + + async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + let projection = request.projection; + let projected_schema = if let Some(projection) = &projection { + Arc::new( + self.schema() + .try_project(projection) + .context(SchemaConversionSnafu)?, + ) + } else { + self.schema() + }; + let stream = self + .stream + .lock() + .unwrap() + .take() + .with_context(|| DuplicatedExecuteCallSnafu { + table: self.table_info().name.clone(), + })? + .map(move |batch| { + batch.and_then(|batch| { + if let Some(projection) = &projection { + batch.try_project(projection) + } else { + Ok(batch) + } + }) + }); + let stream = RecordBatchStreamAdaptor { + schema: projected_schema, + stream: Box::pin(stream), + }; + Ok(Box::pin(stream)) + } +} diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index 760dd75fb2..56c9e71058 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -18,8 +18,10 @@ use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::{ SEMANTIC_TYPE_FIELD, SEMANTIC_TYPE_PRIMARY_KEY, SEMANTIC_TYPE_TIME_INDEX, }; +use common_error::prelude::BoxedError; use common_query::physical_plan::TaskContext; -use common_recordbatch::RecordBatch; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion::datasource::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; @@ -29,7 +31,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{StringVectorBuilder, VectorRef}; use snafu::ResultExt; -use crate::error::{CreateRecordBatchSnafu, Result}; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; use crate::CatalogProviderRef; pub(super) struct InformationSchemaColumns { @@ -69,6 +71,26 @@ impl InformationSchemaColumns { self.catalog_provider.clone(), ) } + + pub fn to_stream(&self) -> Result { + let schema = self.schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_tables() + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } } struct InformationSchemaColumnsBuilder { diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 1a5afa9d3e..a7fb51d37b 100644 --- 
a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -16,8 +16,10 @@ use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; use common_catalog::consts::INFORMATION_SCHEMA_NAME; +use common_error::prelude::BoxedError; use common_query::physical_plan::TaskContext; -use common_recordbatch::RecordBatch; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion::datasource::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; @@ -27,7 +29,7 @@ use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; use snafu::ResultExt; use table::metadata::TableType; -use crate::error::{CreateRecordBatchSnafu, Result}; +use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result}; use crate::CatalogProviderRef; pub(super) struct InformationSchemaTables { @@ -60,6 +62,26 @@ impl InformationSchemaTables { self.catalog_provider.clone(), ) } + + pub fn to_stream(&self) -> Result { + let schema = self.schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_tables() + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } } /// Builds the `information_schema.TABLE` table row by row diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 1d8c964935..854602b54f 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -30,12 +30,13 @@ use datatypes::schema::{ColumnSchema, RawSchema, SchemaRef}; use datatypes::vectors::{BinaryVector, TimestampMillisecondVector, UInt8Vector}; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::ScanRequest; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::{TableId, TableInfoRef}; use table::requests::{ CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest, TableOptions, }; -use table::{Table, TableRef}; +use table::{Result as TableResult, Table, TableRef}; use crate::error::{ self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, @@ -68,8 +69,12 @@ impl Table for SystemCatalogTable { self.0.scan(projection, filters, limit).await } + async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + self.0.scan_to_stream(request).await + } + /// Insert values into table. 
- async fn insert(&self, request: InsertRequest) -> table::error::Result { + async fn insert(&self, request: InsertRequest) -> TableResult { self.0.insert(request).await } @@ -77,7 +82,7 @@ impl Table for SystemCatalogTable { self.0.table_info() } - async fn delete(&self, request: DeleteRequest) -> table::Result { + async fn delete(&self, request: DeleteRequest) -> TableResult { self.0.delete(request).await } diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index dc5b8fec2d..42e381517b 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -459,26 +459,20 @@ fn is_null(null_mask: &BitVec, idx: usize) -> Option { #[cfg(test)] mod tests { - use std::any::Any; use std::sync::Arc; - use std::{assert_eq, unimplemented, vec}; + use std::{assert_eq, vec}; use api::helper::ColumnDataTypeWrapper; use api::v1::column::{self, SemanticType, Values}; use api::v1::{Column, ColumnDataType}; use common_base::BitVec; use common_catalog::consts::MITO_ENGINE; - use common_query::physical_plan::PhysicalPlanRef; - use common_query::prelude::Expr; use common_time::timestamp::Timestamp; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; + use datatypes::schema::{ColumnSchema, SchemaBuilder}; use datatypes::types::{TimestampMillisecondType, TimestampSecondType, TimestampType}; use datatypes::value::Value; use snafu::ResultExt; - use table::error::Result as TableResult; - use table::metadata::TableInfoRef; - use table::Table; use super::*; use crate::error; @@ -733,49 +727,6 @@ mod tests { assert_eq!(None, is_null(&null_mask, 99)); } - struct DemoTable; - - #[async_trait::async_trait] - impl Table for DemoTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true), - ]; - - Arc::new( - SchemaBuilder::try_from(column_schemas) - .unwrap() - .build() - .unwrap(), - ) - } - - fn table_info(&self) -> TableInfoRef { - unimplemented!() - } - - async fn scan( - &self, - _projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> TableResult { - unimplemented!(); - } - } - fn mock_insert_batch() -> (Vec, u32) { let row_count = 2; diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index c949cdf414..83affe52e7 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -71,6 +71,7 @@ pub trait PhysicalPlan: Debug + Send + Sync { ) -> Result; } +/// Adapt DataFusion's [`ExecutionPlan`](DfPhysicalPlan) to GreptimeDB's [`PhysicalPlan`]. 
#[derive(Debug)] pub struct PhysicalPlanAdapter { schema: SchemaRef, diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 4be42c1175..4e7c90fcb5 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -111,7 +111,7 @@ impl Stream for DfRecordBatchStreamAdapter { } } -/// DataFusion SendableRecordBatchStream -> Greptime RecordBatchStream +/// DataFusion [SendableRecordBatchStream](DfSendableRecordBatchStream) -> Greptime [RecordBatchStream] pub struct RecordBatchStreamAdapter { schema: SchemaRef, stream: DfSendableRecordBatchStream, diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 53a63ded23..10fee35e54 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -70,6 +70,19 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Failed to project Arrow RecordBatch with schema {:?} and projection {:?}, source: {}", + schema, + projection, + source + ))] + ProjectArrowRecordBatch { + source: datatypes::arrow::error::ArrowError, + location: Location, + schema: datatypes::schema::SchemaRef, + projection: Vec, + }, + #[snafu(display("Column {} not exists in table {}", column_name, table_name))] ColumnNotExists { column_name: String, @@ -101,7 +114,8 @@ impl ErrorExt for Error { | Error::PollStream { .. } | Error::Format { .. } | Error::InitRecordbatchStream { .. } - | Error::ColumnNotExists { .. } => StatusCode::Internal, + | Error::ColumnNotExists { .. } + | Error::ProjectArrowRecordBatch { .. } => StatusCode::Internal, Error::External { source } => source.status_code(), diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 477b8527bd..632a88267e 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use datafusion::physical_plan::memory::MemoryStream; pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +use datatypes::arrow::compute::SortOptions; pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::util::pretty; use datatypes::prelude::VectorRef; @@ -34,10 +35,20 @@ use snafu::{ensure, ResultExt}; pub trait RecordBatchStream: Stream> { fn schema(&self) -> SchemaRef; + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } } pub type SendableRecordBatchStream = Pin>; +#[derive(Debug, Clone, Copy)] +pub struct OrderOption { + pub index: usize, + pub options: SortOptions, +} + /// EmptyRecordBatchStream can be used to create a RecordBatchStream /// that will produce no results pub struct EmptyRecordBatchStream { @@ -181,6 +192,26 @@ impl Stream for SimpleRecordBatchStream { } } +/// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream]. 
+pub struct RecordBatchStreamAdaptor { + pub schema: SchemaRef, + pub stream: Pin> + Send>>, +} + +impl RecordBatchStream for RecordBatchStreamAdaptor { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for RecordBatchStreamAdaptor { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_next(ctx) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 721accaf10..c524840ff5 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use datatypes::schema::SchemaRef; use datatypes::value::Value; @@ -21,7 +22,10 @@ use serde::ser::{Error, SerializeStruct}; use serde::{Serialize, Serializer}; use snafu::{OptionExt, ResultExt}; -use crate::error::{self, CastVectorSnafu, ColumnNotExistsSnafu, Result}; +use crate::error::{ + self, CastVectorSnafu, ColumnNotExistsSnafu, DataTypesSnafu, ProjectArrowRecordBatchSnafu, + Result, +}; use crate::DfRecordBatch; /// A two-dimensional batch of column-oriented data with a defined schema. @@ -51,6 +55,26 @@ impl RecordBatch { }) } + pub fn try_project(&self, indices: &[usize]) -> Result { + let schema = Arc::new(self.schema.try_project(indices).context(DataTypesSnafu)?); + let mut columns = Vec::with_capacity(indices.len()); + for index in indices { + columns.push(self.columns[*index].clone()); + } + let df_record_batch = self.df_record_batch.project(indices).with_context(|_| { + ProjectArrowRecordBatchSnafu { + schema: self.schema.clone(), + projection: indices.to_vec(), + } + })?; + + Ok(Self { + schema, + columns, + df_record_batch, + }) + } + /// Create a new [`RecordBatch`] from `schema` and `df_record_batch`. /// /// This method doesn't check the schema. diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs index 0caec0fed5..9fad402dd1 100644 --- a/src/datatypes/src/error.rs +++ b/src/datatypes/src/error.rs @@ -89,6 +89,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to project arrow schema, source: {}", source))] + ProjectArrowSchema { + source: arrow::error::ArrowError, + location: Location, + }, + #[snafu(display("Unsupported column default constraint expression: {}", expr))] UnsupportedDefaultExpr { expr: String, location: Location }, diff --git a/src/datatypes/src/lib.rs b/src/datatypes/src/lib.rs index 60f3b85315..1721ee8b29 100644 --- a/src/datatypes/src/lib.rs +++ b/src/datatypes/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#![feature(let_chains)] + pub mod arrow_array; pub mod data_type; pub mod error; diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index dde4f8e9a0..6372a9f164 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -24,7 +24,7 @@ use datafusion_common::DFSchemaRef; use snafu::{ensure, ResultExt}; use crate::data_type::DataType; -use crate::error::{self, Error, Result}; +use crate::error::{self, Error, ProjectArrowSchemaSnafu, Result}; pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY}; pub use crate::schema::constraint::ColumnDefaultConstraint; pub use crate::schema::raw::RawSchema; @@ -70,12 +70,10 @@ impl Schema { SchemaBuilder::try_from(column_schemas)?.build() } - #[inline] pub fn arrow_schema(&self) -> &Arc { &self.arrow_schema } - #[inline] pub fn column_schemas(&self) -> &[ColumnSchema] { &self.column_schemas } @@ -89,51 +87,75 @@ impl Schema { /// Retrieve the column's name by index /// # Panics /// This method **may** panic if the index is out of range of column schemas. - #[inline] pub fn column_name_by_index(&self, idx: usize) -> &str { &self.column_schemas[idx].name } - #[inline] pub fn column_index_by_name(&self, name: &str) -> Option { self.name_to_index.get(name).copied() } - #[inline] pub fn contains_column(&self, name: &str) -> bool { self.name_to_index.contains_key(name) } - #[inline] pub fn num_columns(&self) -> usize { self.column_schemas.len() } - #[inline] pub fn is_empty(&self) -> bool { self.column_schemas.is_empty() } /// Returns index of the timestamp key column. - #[inline] pub fn timestamp_index(&self) -> Option { self.timestamp_index } - #[inline] pub fn timestamp_column(&self) -> Option<&ColumnSchema> { self.timestamp_index.map(|idx| &self.column_schemas[idx]) } - #[inline] pub fn version(&self) -> u32 { self.version } - #[inline] pub fn metadata(&self) -> &HashMap { &self.arrow_schema.metadata } + + /// Generate a new projected schema + /// + /// # Panics + /// + /// Panics if any index is out of bounds. + pub fn try_project(&self, indices: &[usize]) -> Result { + let mut column_schemas = Vec::with_capacity(indices.len()); + let mut timestamp_index = None; + for index in indices { + if let Some(ts_index) = self.timestamp_index && ts_index == *index { + timestamp_index = Some(column_schemas.len()); + } + column_schemas.push(self.column_schemas[*index].clone()); + } + let arrow_schema = self + .arrow_schema + .project(indices) + .context(ProjectArrowSchemaSnafu)?; + let name_to_index = column_schemas + .iter() + .enumerate() + .map(|(pos, column_schema)| (column_schema.name.clone(), pos)) + .collect(); + + Ok(Self { + column_schemas, + name_to_index, + arrow_schema: Arc::new(arrow_schema), + timestamp_index, + version: self.version, + }) + } } #[derive(Default)] diff --git a/src/file-table-engine/src/error.rs b/src/file-table-engine/src/error.rs index 7fc4c91d22..ec45857fe2 100644 --- a/src/file-table-engine/src/error.rs +++ b/src/file-table-engine/src/error.rs @@ -142,7 +142,7 @@ pub enum Error { #[snafu(display("Failed to build stream: {}", source))] BuildStream { - source: datafusion::error::DataFusionError, + source: DataFusionError, location: Location, }, diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs index b089fe5ef9..47d58ee69c 100644 --- a/src/file-table-engine/src/table/format.rs +++ b/src/file-table-engine/src/table/format.rs @@ -21,21 +21,24 @@ use common_datasource::file_format::Format; use 
common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef}; use common_query::prelude::Expr; use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::SendableRecordBatchStream; use datafusion::common::ToDFSchema; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::optimizer::utils::conjunction; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_expr::execution_props::ExecutionProps; -use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream, ParquetExec}; +use datafusion::physical_plan::file_format::{ + FileOpener, FileScanConfig, FileStream, ParquetExec, ParquetOpener, +}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datatypes::arrow::datatypes::Schema as ArrowSchema; use datatypes::schema::{Schema, SchemaRef}; use object_store::ObjectStore; use snafu::ResultExt; -use table::table::scan::SimpleTableScan; +use table::table::scan::StreamScanAdapter; -use crate::error::{self, Result}; +use crate::error::{self, BuildStreamSnafu, Result}; const DEFAULT_BATCH_SIZE: usize = 8192; @@ -113,7 +116,39 @@ fn build_scan_plan( .context(error::BuildStreamSnafu)?; let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream)) .context(error::BuildStreamAdapterSnafu)?; - Ok(Arc::new(SimpleTableScan::new(Box::pin(adapter)))) + Ok(Arc::new(StreamScanAdapter::new(Box::pin(adapter)))) +} + +fn build_record_batch_stream( + opener: T, + file_schema: Arc, + files: &[String], + projection: Option<&Vec>, + limit: Option, +) -> Result { + let stream = FileStream::new( + &FileScanConfig { + object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used + file_schema, + file_groups: vec![files + .iter() + .map(|filename| PartitionedFile::new(filename.to_string(), 0)) + .collect::>()], + statistics: Default::default(), + projection: projection.cloned(), + limit, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }, + 0, // partition: hard-code + opener, + &ExecutionPlanMetricsSet::new(), + ) + .context(error::BuildStreamSnafu)?; + let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream)) + .context(error::BuildStreamAdapterSnafu)?; + Ok(Box::pin(adapter)) } fn new_csv_scan_plan( @@ -132,6 +167,22 @@ fn new_csv_scan_plan( ) } +fn new_csv_stream( + _ctx: &CreateScanPlanContext, + config: &ScanPlanConfig, + format: &CsvFormat, +) -> Result { + let file_schema = config.file_schema.arrow_schema().clone(); + let opener = build_csv_opener(file_schema.clone(), config, format)?; + build_record_batch_stream( + opener, + file_schema, + config.files, + config.projection, + config.limit, + ) +} + fn new_json_scan_plan( _ctx: &CreateScanPlanContext, config: &ScanPlanConfig, @@ -148,6 +199,22 @@ fn new_json_scan_plan( ) } +fn new_json_stream( + _ctx: &CreateScanPlanContext, + config: &ScanPlanConfig, + format: &JsonFormat, +) -> Result { + let file_schema = config.file_schema.arrow_schema().clone(); + let opener = build_json_opener(file_schema.clone(), config, format)?; + build_record_batch_stream( + opener, + file_schema, + config.files, + config.projection, + config.limit, + ) +} + fn new_parquet_scan_plan( _ctx: &CreateScanPlanContext, config: &ScanPlanConfig, @@ -218,6 +285,84 @@ fn new_parquet_scan_plan( ))) } +fn new_parquet_stream( + _ctx: &CreateScanPlanContext, + config: &ScanPlanConfig, + _format: &ParquetFormat, +) -> Result { + let file_schema = 
config.file_schema.arrow_schema().clone(); + let ScanPlanConfig { + files, + projection, + limit, + filters, + store, + .. + } = config; + + let scan_config = FileScanConfig { + object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used + file_schema: file_schema.clone(), + file_groups: vec![files + .iter() + .map(|filename| PartitionedFile::new(filename.to_string(), 0)) + .collect::>()], + statistics: Default::default(), + projection: projection.cloned(), + limit: *limit, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }; + + let filters = filters + .iter() + .map(|f| f.df_expr().clone()) + .collect::>(); + + let filters = if let Some(expr) = conjunction(filters) { + let df_schema = file_schema + .clone() + .to_dfschema_ref() + .context(error::ParquetScanPlanSnafu)?; + + let filters = create_physical_expr(&expr, &df_schema, &file_schema, &ExecutionProps::new()) + .context(error::ParquetScanPlanSnafu)?; + Some(filters) + } else { + None + }; + + let parquet_opener = ParquetOpener { + partition_index: 0, // partition: hard-code. This is only for statistics purpose + projection: Arc::from(projection.cloned().unwrap_or_default()), + batch_size: DEFAULT_BATCH_SIZE, + limit: *limit, + predicate: filters, + pruning_predicate: None, + page_pruning_predicate: None, + table_schema: file_schema.clone(), + metadata_size_hint: None, + metrics: ExecutionPlanMetricsSet::new(), + parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(store.clone())), + pushdown_filters: true, + reorder_filters: true, + enable_page_index: true, + }; + + let stream = FileStream::new( + &scan_config, + 0, + parquet_opener, + &ExecutionPlanMetricsSet::new(), + ) + .context(BuildStreamSnafu)?; + + let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream)) + .context(error::BuildStreamAdapterSnafu)?; + Ok(Box::pin(adapter)) +} + #[derive(Debug, Clone)] pub struct ScanPlanConfig<'a> { pub file_schema: SchemaRef, @@ -239,3 +384,15 @@ pub fn create_physical_plan( Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format), } } + +pub fn create_stream( + format: &Format, + ctx: &CreateScanPlanContext, + config: &ScanPlanConfig, +) -> Result { + match format { + Format::Csv(format) => new_csv_stream(ctx, config, format), + Format::Json(format) => new_json_stream(ctx, config, format), + Format::Parquet(format) => new_parquet_stream(ctx, config, format), + } +} diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs index 980c4d9b02..9171f3dd13 100644 --- a/src/file-table-engine/src/table/immutable.rs +++ b/src/file-table-engine/src/table/immutable.rs @@ -21,15 +21,17 @@ use common_datasource::object_store::build_backend; use common_error::prelude::BoxedError; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; +use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; use object_store::ObjectStore; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, ScanRequest}; use table::error::{self as table_error, Result as TableResult}; use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType}; use table::{requests, Table}; +use super::format::create_stream; use crate::error::{self, ConvertRawSnafu, Result}; use crate::manifest::immutable::{ read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION, @@ -96,6 
+98,23 @@ impl Table for ImmutableFileTable { .context(table_error::TableOperationSnafu) } + async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + create_stream( + &self.format, + &CreateScanPlanContext::default(), + &ScanPlanConfig { + file_schema: self.schema(), + files: &self.files, + projection: request.projection.as_ref(), + filters: &request.filters, + limit: request.limit, + store: self.object_store.clone(), + }, + ) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) + } + async fn flush( &self, _region_number: Option, diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 2b9660a16e..fe9c4b91c2 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::iter; +use std::pin::Pin; use std::sync::Arc; use api::v1::AlterExpr; @@ -28,7 +29,12 @@ use common_query::logical_plan::Expr; use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef}; use common_query::Output; use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter; -use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; +use common_recordbatch::error::{ + InitRecordbatchStreamSnafu, PollStreamSnafu, Result as RecordBatchResult, +}; +use common_recordbatch::{ + RecordBatch, RecordBatchStreamAdaptor, RecordBatches, SendableRecordBatchStream, +}; use common_telemetry::debug; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::{ @@ -36,10 +42,11 @@ use datafusion::physical_plan::{ }; use datafusion_common::DataFusionError; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use futures_util::{Stream, StreamExt}; use partition::manager::PartitionRuleManagerRef; use partition::splitter::WriteSplitter; use snafu::prelude::*; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, ScanRequest}; use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest}; @@ -155,6 +162,70 @@ impl Table for DistTable { Ok(Arc::new(dist_scan)) } + // TODO(ruihang): DistTable should not call this method directly + async fn scan_to_stream( + &self, + request: ScanRequest, + ) -> table::Result { + let partition_rule = self + .partition_manager + .find_table_partition_rule(&self.table_name) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + + let regions = self + .partition_manager + .find_regions_by_filters(partition_rule, &request.filters) + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + let datanodes = self + .partition_manager + .find_region_datanodes(&self.table_name, regions) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; + + let table_name = &self.table_name; + let mut partition_execs = Vec::with_capacity(datanodes.len()); + for (datanode, _regions) in datanodes.iter() { + let client = self.datanode_clients.get_client(datanode).await; + let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); + let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); + + partition_execs.push(Arc::new(PartitionExec { + table_name: table_name.clone(), + datanode_instance, + projection: request.projection.clone(), + filters: request.filters.clone(), + limit: request.limit, + batches: Arc::new(RwLock::new(None)), + })); + } + + let schema = project_schema(self.schema(), request.projection.as_ref()); + let schema_to_move = schema.clone(); + let 
stream: Pin> + Send>> = Box::pin( + async_stream::try_stream! { + for partition_exec in partition_execs { + partition_exec + .maybe_init() + .await + .map_err(|e| DataFusionError::External(Box::new(e))) + .context(InitRecordbatchStreamSnafu)?; + let mut stream = partition_exec.as_stream().await.context(InitRecordbatchStreamSnafu)?; + + while let Some(batch) = stream.next().await{ + yield RecordBatch::try_from_df_record_batch(schema_to_move.clone(),batch.context(PollStreamSnafu)?)? + } + } + }, + ); + let record_batch_stream = RecordBatchStreamAdaptor { schema, stream }; + + Ok(Box::pin(record_batch_stream)) + } + fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index ed3301a1e2..8be42c4c86 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -17,7 +17,6 @@ pub mod test_util; use std::any::Any; use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use arc_swap::ArcSwap; @@ -26,12 +25,10 @@ use common_datasource::compression::CompressionType; use common_error::ext::BoxedError; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; -use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult}; -use common_recordbatch::{RecordBatch, RecordBatchStream}; +use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream}; use common_telemetry::{info, logging}; use datatypes::schema::Schema; -use futures::task::{Context, Poll}; -use futures::Stream; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; @@ -48,7 +45,7 @@ use table::metadata::{ use table::requests::{ AddColumnRequest, AlterKind, AlterTableRequest, DeleteRequest, InsertRequest, }; -use table::table::scan::SimpleTableScan; +use table::table::scan::StreamScanAdapter; use table::table::{AlterContext, Table}; use table::{error as table_error, RegionStat}; use tokio::sync::Mutex; @@ -210,8 +207,83 @@ impl Table for MitoTable { } }); - let stream = Box::pin(ChunkStream { schema, stream }); - Ok(Arc::new(SimpleTableScan::new(stream))) + let stream = Box::pin(RecordBatchStreamAdaptor { schema, stream }); + Ok(Arc::new(StreamScanAdapter::new(stream))) + } + + async fn scan_to_stream(&self, request: ScanRequest) -> TableResult { + let read_ctx = ReadContext::default(); + let regions = self.regions.load(); + let mut readers = Vec::with_capacity(regions.len()); + let mut first_schema: Option> = None; + + let table_info = self.table_info.load(); + // TODO(hl): Currently the API between frontend and datanode is under refactoring in + // https://github.com/GreptimeTeam/greptimedb/issues/597 . Once it's finished, query plan + // can carry filtered region info to avoid scanning all regions on datanode. + for region in regions.values() { + let snapshot = region + .snapshot(&read_ctx) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + + let projection = self + .transform_projection(region, request.projection.clone()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; + let filters = request.filters.clone(); + + let scan_request = ScanRequest { + projection, + filters, + ..Default::default() + }; + + let reader = snapshot + .scan(&read_ctx, scan_request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)? 
+ .reader; + + let schema = reader.user_schema().clone(); + if let Some(first_schema) = &first_schema { + // TODO(hl): we assume all regions' schemas are the same, but undergoing table altering + // may make these schemas inconsistent. + ensure!( + first_schema.version() == schema.version(), + RegionSchemaMismatchSnafu { + table: common_catalog::format_full_table_name( + &table_info.catalog_name, + &table_info.schema_name, + &table_info.name + ) + } + ); + } else { + first_schema = Some(schema); + } + readers.push(reader); + } + + // TODO(hl): we assume table contains at least one region, but with region migration this + // assumption may become invalid. + let stream_schema = first_schema.context(InvalidTableSnafu { + table_id: table_info.ident.table_id, + })?; + + let schema = stream_schema.clone(); + + let stream = Box::pin(async_stream::try_stream! { + for mut reader in readers { + while let Some(chunk) = reader.next_chunk().await.map_err(BoxedError::new).context(ExternalSnafu)? { + let chunk = reader.project_chunk(chunk); + yield RecordBatch::new(stream_schema.clone(), chunk.columns)? + } + } + }); + + Ok(Box::pin(RecordBatchStreamAdaptor { schema, stream })) } fn supports_filters_pushdown(&self, filters: &[&Expr]) -> TableResult> { @@ -341,25 +413,6 @@ impl Table for MitoTable { } } -struct ChunkStream { - schema: SchemaRef, - stream: Pin> + Send>>, -} - -impl RecordBatchStream for ChunkStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -impl Stream for ChunkStream { - type Item = RecordBatchResult; - - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_next(ctx) - } -} - #[inline] fn column_qualified_name(table_name: &str, region_name: &str, column_name: &str) -> String { format!("{table_name}.{region_name}.{column_name}") diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 1a9195d16c..14339b0394 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -54,5 +54,6 @@ paste = "1.0" rand.workspace = true statrs = "0.16" stats-cli = "3.0" +store-api = { path = "../store-api" } streaming-stats = "0.2" tokio-stream = "0.1" diff --git a/src/query/src/error.rs b/src/query/src/error.rs index b45045a4ed..3f658e20a3 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -34,8 +34,8 @@ pub enum Error { #[snafu(display("General catalog error: {}", source))] Catalog { - #[snafu(backtrace)] source: catalog::error::Error, + location: Location, }, #[snafu(display("Catalog not found: {}", catalog))] @@ -49,35 +49,49 @@ pub enum Error { #[snafu(display("Failed to do vector computation, source: {}", source))] VectorComputation { - #[snafu(backtrace)] source: datatypes::error::Error, + location: Location, }, #[snafu(display("Failed to create RecordBatch, source: {}", source))] CreateRecordBatch { - #[snafu(backtrace)] source: common_recordbatch::error::Error, + location: Location, }, #[snafu(display("Failure during query execution, source: {}", source))] - QueryExecution { source: BoxedError }, + QueryExecution { + source: BoxedError, + location: Location, + }, #[snafu(display("Failure during query planning, source: {}", source))] - QueryPlan { source: BoxedError }, + QueryPlan { + source: BoxedError, + location: Location, + }, #[snafu(display("Failure during query parsing, query: {}, source: {}", query, source))] - QueryParse { query: String, source: BoxedError }, + QueryParse { + query: String, + source: BoxedError, + location: Location, + }, #[snafu(display("Illegal access to catalog: {} and 
schema: {}", catalog, schema))] - QueryAccessDenied { catalog: String, schema: String }, + QueryAccessDenied { + catalog: String, + schema: String, + location: Location, + }, #[snafu(display("The SQL string has multiple statements, query: {}", query))] MultipleStatements { query: String, location: Location }, #[snafu(display("Failed to convert Datafusion schema: {}", source))] ConvertDatafusionSchema { - #[snafu(backtrace)] source: datatypes::error::Error, + location: Location, }, #[snafu(display("Failed to parse timestamp `{}`: {}", raw, source))] @@ -102,7 +116,7 @@ pub enum Error { #[snafu(display("General SQL error: {}", source))] Sql { - #[snafu(backtrace)] + location: Location, source: sql::error::Error, }, @@ -122,21 +136,21 @@ pub enum Error { #[snafu(display("Failed to convert value to sql value: {}", value))] ConvertSqlValue { value: Value, - #[snafu(backtrace)] source: sql::error::Error, + location: Location, }, #[snafu(display("Failed to convert concrete type to sql type: {:?}", datatype))] ConvertSqlType { datatype: ConcreteDataType, - #[snafu(backtrace)] source: sql::error::Error, + location: Location, }, #[snafu(display("Failed to parse SQL, source: {}", source))] ParseSql { - #[snafu(backtrace)] source: sql::error::Error, + location: Location, }, #[snafu(display("Missing required field: {}", name))] @@ -150,32 +164,32 @@ pub enum Error { #[snafu(display("Failed to build data source backend, source: {}", source))] BuildBackend { - #[snafu(backtrace)] source: common_datasource::error::Error, + location: Location, }, #[snafu(display("Failed to list objects, source: {}", source))] ListObjects { - #[snafu(backtrace)] source: common_datasource::error::Error, + location: Location, }, #[snafu(display("Failed to parse file format: {}", source))] ParseFileFormat { - #[snafu(backtrace)] source: common_datasource::error::Error, + location: Location, }, #[snafu(display("Failed to infer schema: {}", source))] InferSchema { - #[snafu(backtrace)] source: common_datasource::error::Error, + location: Location, }, #[snafu(display("Failed to convert datafusion schema, source: {}", source))] ConvertSchema { - #[snafu(backtrace)] source: datatypes::error::Error, + location: Location, }, } @@ -201,15 +215,15 @@ impl ErrorExt for Error { ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(), QueryAccessDenied { .. } => StatusCode::AccessDenied, - Catalog { source } => source.status_code(), - VectorComputation { source } | ConvertDatafusionSchema { source } => { + Catalog { source, .. } => source.status_code(), + VectorComputation { source, .. } | ConvertDatafusionSchema { source, .. } => { source.status_code() } - ParseSql { source } => source.status_code(), - CreateRecordBatch { source } => source.status_code(), - QueryExecution { source } | QueryPlan { source } => source.status_code(), + ParseSql { source, .. } => source.status_code(), + CreateRecordBatch { source, .. } => source.status_code(), + QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(), DataFusion { .. } | MissingTimestampColumn { .. } => StatusCode::Internal, - Sql { source } => source.status_code(), + Sql { source, .. } => source.status_code(), PlanSql { .. } => StatusCode::PlanQuery, ConvertSqlType { source, .. } | ConvertSqlValue { source, .. 
} => source.status_code(), } diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs index e133ce4032..73431ed248 100644 --- a/src/query/src/tests/time_range_filter_test.rs +++ b/src/query/src/tests/time_range_filter_test.rs @@ -18,13 +18,14 @@ use std::sync::Arc; use catalog::local::{new_memory_catalog_list, MemoryCatalogProvider, MemorySchemaProvider}; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; -use common_recordbatch::RecordBatch; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; +use store_api::storage::ScanRequest; use table::metadata::{FilterPushDownType, TableInfoRef}; use table::predicate::TimeRangePredicateBuilder; use table::test_util::MemTable; @@ -69,6 +70,14 @@ impl Table for MemTableWrapper { self.inner.scan(projection, filters, limit).await } + async fn scan_to_stream( + &self, + request: ScanRequest, + ) -> table::Result { + *self.filter.write().await = request.filters.clone(); + self.inner.scan_to_stream(request).await + } + fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 03bd025810..4834af37bb 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -10,6 +10,7 @@ bytes = "1.1" common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } +common-recordbatch = { path = "../common/recordbatch" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } derive_builder = "0.11" diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index 9338ff88d8..159f7b983a 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet}; use common_error::ext::ErrorExt; use common_query::logical_plan::Expr; +use common_recordbatch::OrderOption; use datatypes::vectors::VectorRef; use crate::storage::{ColumnDescriptor, RegionDescriptor, SequenceNumber}; @@ -38,7 +39,7 @@ pub trait WriteRequest: Send { fn delete(&mut self, keys: HashMap) -> Result<(), Self::Error>; } -#[derive(Default)] +#[derive(Default, Clone, Debug)] pub struct ScanRequest { /// Max sequence number to read, None for latest sequence. /// @@ -49,6 +50,13 @@ pub struct ScanRequest { pub projection: Option>, /// Filters pushed down pub filters: Vec, + /// Expected output ordering. This is only a hint and isn't guaranteed. + pub output_ordering: Option>, + /// limit can be used to reduce the amount scanned + /// from the datasource as a performance optimization. + /// If set, it contains the amount of rows needed by the caller, + /// The data source should return *at least* this number of rows if available. + pub limit: Option, } #[derive(Debug)] diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 31e411a4cd..e377099c00 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -79,6 +79,9 @@ pub enum Error { location: Location, }, + #[snafu(display("Duplicated call to plan execute method. 
table: {}", table))] + DuplicatedExecuteCall { location: Location, table: String }, + #[snafu(display( "Not allowed to remove index column {} from table {}", column_name, @@ -141,7 +144,9 @@ impl ErrorExt for Error { Error::RemoveColumnInIndex { .. } | Error::BuildColumnDescriptor { .. } => { StatusCode::InvalidArguments } - Error::TablesRecordBatch { .. } => StatusCode::Unexpected, + Error::TablesRecordBatch { .. } | Error::DuplicatedExecuteCall { .. } => { + StatusCode::Unexpected + } Error::ColumnExists { .. } => StatusCode::TableColumnExists, Error::SchemaBuild { source, .. } => source.status_code(), Error::TableOperation { source } => source.status_code(), diff --git a/src/table/src/table.rs b/src/table/src/table.rs index b2bd036f57..3dd38bfe08 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -22,8 +22,9 @@ use std::sync::Arc; use async_trait::async_trait; use common_query::logical_plan::Expr; use common_query::physical_plan::PhysicalPlanRef; +use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; -use store_api::storage::RegionNumber; +use store_api::storage::{RegionNumber, ScanRequest}; use crate::error::{Result, UnsupportedSnafu}; use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType}; @@ -73,6 +74,8 @@ pub trait Table: Send + Sync { limit: Option, ) -> Result; + async fn scan_to_stream(&self, request: ScanRequest) -> Result; + /// Tests whether the table provider can make use of any or all filter expressions /// to optimise data retrieval. fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 15c915fe9c..a9e1b5a4c5 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -13,39 +13,46 @@ // limitations under the License. use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use common_query::logical_plan::Expr; -use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlanAdapter, PhysicalPlanRef}; +use common_query::physical_plan::DfPhysicalPlanAdapter; use common_query::DfPhysicalPlan; -use common_telemetry::debug; +use common_recordbatch::OrderOption; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::datasource::datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown; use datafusion::datasource::{TableProvider, TableType as DfTableType}; use datafusion::error::Result as DfResult; use datafusion::execution::context::SessionState; -use datafusion::prelude::SessionContext; use datafusion_expr::expr::Expr as DfExpr; -use datatypes::schema::{SchemaRef as TableSchemaRef, SchemaRef}; -use snafu::prelude::*; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::PhysicalSortExpr; +use store_api::storage::ScanRequest; -use crate::error::{self, Result}; -use crate::metadata::TableInfoRef; -use crate::table::{FilterPushDownType, Table, TableRef, TableType}; +use super::scan::StreamScanAdapter; +use crate::table::{TableRef, TableType}; -/// Greptime Table -> datafusion TableProvider +/// Adapt greptime's [TableRef] to DataFusion's [TableProvider]. 
pub struct DfTableProviderAdapter { table: TableRef, + scan_req: Arc>, } impl DfTableProviderAdapter { pub fn new(table: TableRef) -> Self { - Self { table } + Self { + table, + scan_req: Arc::default(), + } } pub fn table(&self) -> TableRef { self.table.clone() } + + pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) { + self.scan_req.lock().unwrap().output_ordering = Some(order_opts.to_vec()); + } } #[async_trait::async_trait] @@ -74,8 +81,36 @@ impl TableProvider for DfTableProviderAdapter { limit: Option, ) -> DfResult> { let filters: Vec = filters.iter().map(Clone::clone).map(Into::into).collect(); - let inner = self.table.scan(projection, &filters, limit).await?; - Ok(Arc::new(DfPhysicalPlanAdapter(inner))) + let request = { + let mut request = self.scan_req.lock().unwrap(); + request.filters = filters; + request.projection = projection.cloned(); + request.limit = limit; + request.clone() + }; + let stream = self.table.scan_to_stream(request).await?; + + // build sort physical expr + let schema = stream.schema(); + let sort_expr = stream.output_ordering().map(|order_opts| { + order_opts + .iter() + .map(|order_opt| { + let col_name = schema.column_name_by_index(order_opt.index); + let col_expr = Arc::new(Column::new(col_name, order_opt.index)); + PhysicalSortExpr { + expr: col_expr, + options: order_opt.options, + } + }) + .collect::>() + }); + + let mut stream_adapter = StreamScanAdapter::new(stream); + if let Some(sort_expr) = sort_expr { + stream_adapter = stream_adapter.with_output_ordering(sort_expr); + } + Ok(Arc::new(DfPhysicalPlanAdapter(Arc::new(stream_adapter)))) } fn supports_filters_pushdown( @@ -92,100 +127,3 @@ impl TableProvider for DfTableProviderAdapter { .map(|v| v.into_iter().map(Into::into).collect::>())?) } } - -/// Datafusion TableProvider -> greptime Table -pub struct TableAdapter { - schema: TableSchemaRef, - table_provider: Arc, -} - -impl TableAdapter { - pub fn new(table_provider: Arc) -> Result { - Ok(Self { - schema: Arc::new( - table_provider - .schema() - .try_into() - .context(error::SchemaConversionSnafu)?, - ), - table_provider, - }) - } -} - -#[async_trait::async_trait] -impl Table for TableAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> TableSchemaRef { - self.schema.clone() - } - - fn table_info(&self) -> TableInfoRef { - unreachable!("Should not call table_info of TableAdaptor directly") - } - - fn table_type(&self) -> TableType { - match self.table_provider.table_type() { - DfTableType::Base => TableType::Base, - DfTableType::View => TableType::View, - DfTableType::Temporary => TableType::Temporary, - } - } - - async fn scan( - &self, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> Result { - let ctx = SessionContext::new(); - let filters: Vec = filters.iter().map(|e| e.df_expr().clone()).collect(); - debug!("TableScan filter size: {}", filters.len()); - let execution_plan = self - .table_provider - .scan(&ctx.state(), projection, &filters, limit) - .await - .context(error::DatafusionSnafu)?; - let schema: SchemaRef = Arc::new( - execution_plan - .schema() - .try_into() - .context(error::SchemaConversionSnafu)?, - ); - Ok(Arc::new(PhysicalPlanAdapter::new(schema, execution_plan))) - } - - fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result> { - self.table_provider - .supports_filters_pushdown(&filters.iter().map(|x| x.df_expr()).collect::>()) - .context(error::DatafusionSnafu) - .map(|v| v.into_iter().map(Into::into).collect::>()) - } -} - -#[cfg(test)] -mod tests { - 
diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs
index 3047346409..4ee32e40c8 100644
--- a/src/table/src/table/numbers.rs
+++ b/src/table/src/table/numbers.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 
 use common_query::physical_plan::PhysicalPlanRef;
 use common_recordbatch::error::Result as RecordBatchResult;
-use common_recordbatch::{RecordBatch, RecordBatchStream};
+use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datafusion::arrow::compute::SortOptions;
 use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
 use datafusion_common::from_slice::FromSlice;
@@ -29,11 +29,11 @@ use datatypes::data_type::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
 use futures::task::{Context, Poll};
 use futures::Stream;
-use store_api::storage::RegionNumber;
+use store_api::storage::{RegionNumber, ScanRequest};
 
 use crate::error::Result;
 use crate::metadata::{TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType};
-use crate::table::scan::SimpleTableScan;
+use crate::table::scan::StreamScanAdapter;
 use crate::table::{Expr, Table};
 
 const NUMBER_COLUMN: &str = "number";
@@ -132,10 +132,18 @@ impl Table for NumbersTable {
             )
             .into()];
         Ok(Arc::new(
-            SimpleTableScan::new(stream).with_output_ordering(output_ordering),
+            StreamScanAdapter::new(stream).with_output_ordering(output_ordering),
         ))
     }
 
+    async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(NumbersStream {
+            limit: request.limit.unwrap_or(100) as u32,
+            schema: self.schema.clone(),
+            already_run: false,
+        }))
+    }
+
     async fn flush(&self, _region_number: Option<RegionNumber>, _wait: Option<bool>) -> Result<()> {
         Ok(())
     }
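As the NumbersTable change above suggests, a table that implements `scan_to_stream` can keep serving the older plan-based `scan` by wrapping the stream. A hedged bridge sketch, with module paths assumed for illustration:

    use std::sync::Arc;
    use common_query::physical_plan::PhysicalPlanRef;
    use store_api::storage::ScanRequest;
    use table::table::scan::StreamScanAdapter;
    use table::Table;

    // Hypothetical bridge: serve the PhysicalPlanRef-based API on top of the
    // new stream-based one, mirroring what NumbersTable does above.
    async fn scan_via_stream(
        table: &dyn Table,
        request: ScanRequest,
    ) -> table::Result<PhysicalPlanRef> {
        let stream = table.scan_to_stream(request).await?;
        Ok(Arc::new(StreamScanAdapter::new(stream)))
    }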
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 01acd5854a..c510dbdde8 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -25,22 +25,23 @@ use datafusion_physical_expr::PhysicalSortExpr;
 use datatypes::schema::SchemaRef;
 use snafu::OptionExt;
 
-pub struct SimpleTableScan {
+/// Adapt greptime's [SendableRecordBatchStream] to DataFusion's [PhysicalPlan].
+pub struct StreamScanAdapter {
     stream: Mutex<Option<SendableRecordBatchStream>>,
     schema: SchemaRef,
     output_ordering: Option<Vec<PhysicalSortExpr>>,
 }
 
-impl Debug for SimpleTableScan {
+impl Debug for StreamScanAdapter {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("SimpleTableScan")
+        f.debug_struct("StreamScanAdapter")
            .field("stream", &"<SendableRecordBatchStream>")
            .field("schema", &self.schema)
            .finish()
     }
 }
 
-impl SimpleTableScan {
+impl StreamScanAdapter {
     pub fn new(stream: SendableRecordBatchStream) -> Self {
         let schema = stream.schema();
 
@@ -57,7 +58,7 @@ impl SimpleTableScan {
     }
 }
 
-impl PhysicalPlan for SimpleTableScan {
+impl PhysicalPlan for StreamScanAdapter {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -126,7 +127,7 @@ mod test {
             RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
 
         let stream = recordbatches.as_stream();
-        let scan = SimpleTableScan::new(stream);
+        let scan = StreamScanAdapter::new(stream);
 
         assert_eq!(scan.schema(), schema);
 
diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs
index 7fd8168235..c2388dc292 100644
--- a/src/table/src/test_util/empty_table.rs
+++ b/src/table/src/test_util/empty_table.rs
@@ -16,11 +16,12 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use common_query::physical_plan::PhysicalPlanRef;
-use common_recordbatch::EmptyRecordBatchStream;
+use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
+use store_api::storage::ScanRequest;
 
 use crate::metadata::{TableInfo, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType};
 use crate::requests::{CreateTableRequest, InsertRequest};
-use crate::table::scan::SimpleTableScan;
+use crate::table::scan::StreamScanAdapter;
 use crate::{Result, Table};
 
 pub struct EmptyTable {
@@ -82,7 +83,11 @@ impl Table for EmptyTable {
         _filters: &[common_query::prelude::Expr],
         _limit: Option<usize>,
     ) -> Result<PhysicalPlanRef> {
-        let scan = SimpleTableScan::new(Box::pin(EmptyRecordBatchStream::new(self.schema())));
+        let scan = StreamScanAdapter::new(Box::pin(EmptyRecordBatchStream::new(self.schema())));
         Ok(Arc::new(scan))
     }
+
+    async fn scan_to_stream(&self, _: ScanRequest) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(EmptyRecordBatchStream::new(self.schema())))
+    }
 }
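A hedged sketch of declaring a known sort order on the renamed adapter, mirroring how numbers.rs and adapter.rs build `PhysicalSortExpr`; the column name, its index, and the module path are illustrative assumptions:

    use std::sync::Arc;
    use common_recordbatch::SendableRecordBatchStream;
    use datafusion::arrow::compute::SortOptions;
    use datafusion_physical_expr::expressions::Column;
    use datafusion_physical_expr::PhysicalSortExpr;
    use table::table::scan::StreamScanAdapter;

    // Hypothetical constructor: a table that knows its stream is already sorted
    // on column 0 can advertise that, so DataFusion may avoid a redundant sort.
    fn sorted_scan(stream: SendableRecordBatchStream) -> StreamScanAdapter {
        let ordering = vec![PhysicalSortExpr {
            expr: Arc::new(Column::new("number", 0)), // illustrative column name/index
            options: SortOptions {
                descending: false,
                nulls_first: false,
            },
        }];
        StreamScanAdapter::new(stream).with_output_ordering(ordering)
    }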
diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs
index 8aa5f3dd2d..ecb888f8d0 100644
--- a/src/table/src/test_util/memtable.rs
+++ b/src/table/src/test_util/memtable.rs
@@ -21,20 +21,20 @@ use common_error::prelude::BoxedError;
 use common_query::physical_plan::PhysicalPlanRef;
 use common_query::prelude::Expr;
 use common_recordbatch::error::Result as RecordBatchResult;
-use common_recordbatch::{RecordBatch, RecordBatchStream};
+use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datatypes::prelude::*;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
 use datatypes::vectors::UInt32Vector;
 use futures::task::{Context, Poll};
 use futures::Stream;
 use snafu::prelude::*;
-use store_api::storage::RegionNumber;
+use store_api::storage::{RegionNumber, ScanRequest};
 
 use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu};
 use crate::metadata::{
     TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, TableVersion,
 };
-use crate::table::scan::SimpleTableScan;
+use crate::table::scan::StreamScanAdapter;
 use crate::{ColumnStatistics, Table, TableStatistics};
 
 #[derive(Debug, Clone)]
@@ -167,12 +167,43 @@ impl Table for MemTable {
         )
         .map_err(BoxedError::new)
         .context(TablesRecordBatchSnafu)?;
-        Ok(Arc::new(SimpleTableScan::new(Box::pin(MemtableStream {
+        Ok(Arc::new(StreamScanAdapter::new(Box::pin(MemtableStream {
             schema: recordbatch.schema.clone(),
             recordbatch: Some(recordbatch),
         }))))
     }
 
+    async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
+        let df_recordbatch = if let Some(indices) = request.projection {
+            self.recordbatch
+                .df_record_batch()
+                .project(&indices)
+                .context(TableProjectionSnafu)?
+        } else {
+            self.recordbatch.df_record_batch().clone()
+        };
+
+        let rows = df_recordbatch.num_rows();
+        let limit = if let Some(limit) = request.limit {
+            limit.min(rows)
+        } else {
+            rows
+        };
+        let df_recordbatch = df_recordbatch.slice(0, limit);
+
+        let recordbatch = RecordBatch::try_from_df_record_batch(
+            Arc::new(Schema::try_from(df_recordbatch.schema()).context(SchemaConversionSnafu)?),
+            df_recordbatch,
+        )
+        .map_err(BoxedError::new)
+        .context(TablesRecordBatchSnafu)?;
+
+        Ok(Box::pin(MemtableStream {
+            schema: recordbatch.schema.clone(),
+            recordbatch: Some(recordbatch),
+        }))
+    }
+
     fn statistics(&self) -> Option<TableStatistics> {
         let df_recordbatch = self.recordbatch.df_record_batch();
         let num_rows = df_recordbatch.num_rows();
diff --git a/tests/cases/standalone/common/optimizer/order_by.result b/tests/cases/standalone/common/optimizer/order_by.result
index d661c83068..87f57f31e7 100644
--- a/tests/cases/standalone/common/optimizer/order_by.result
+++ b/tests/cases/standalone/common/optimizer/order_by.result
@@ -27,7 +27,8 @@ explain select * from numbers order by number asc;
 +---------------+------------------------------------------+
 | logical_plan  | Sort: numbers.number ASC NULLS LAST      |
 |               |   TableScan: numbers projection=[number] |
-| physical_plan | ExecutionPlan(PlaceHolder)               |
+| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
+|               |   ExecutionPlan(PlaceHolder)             |
 |               |                                          |
 +---------------+------------------------------------------+
 
@@ -47,14 +48,15 @@ explain select * from numbers order by number desc limit 10;
 
 explain select * from numbers order by number asc limit 10;
 
-+---------------+-------------------------------------------------+
-| plan_type     | plan                                            |
-+---------------+-------------------------------------------------+
-| logical_plan  | Limit: skip=0, fetch=10                         |
-|               |   Sort: numbers.number ASC NULLS LAST, fetch=10 |
-|               |     TableScan: numbers projection=[number]      |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10               |
-|               |   ExecutionPlan(PlaceHolder)                    |
-|               |                                                 |
-+---------------+-------------------------------------------------+
++---------------+------------------------------------------------------+
+| plan_type     | plan                                                 |
++---------------+------------------------------------------------------+
+| logical_plan  | Limit: skip=0, fetch=10                              |
+|               |   Sort: numbers.number ASC NULLS LAST, fetch=10      |
+|               |     TableScan: numbers projection=[number]           |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                    |
+|               |   SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
+|               |   ExecutionPlan(PlaceHolder)                         |
+|               |                                                      |
++---------------+------------------------------------------------------+
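Finally, a hedged end-to-end sketch that matches the updated sqlness expectations: register a table through `DfTableProviderAdapter` and explain an ORDER BY ... LIMIT query, with the physical plan expected to show a `SortExec` above the placeholder scan. Table construction and module paths are assumptions for illustration:

    use std::sync::Arc;
    use datafusion::error::Result as DfResult;
    use datafusion::prelude::SessionContext;
    use table::table::adapter::DfTableProviderAdapter;
    use table::TableRef;

    // Hypothetical check: plan an ORDER BY ... LIMIT over an adapted table and
    // print the plan; it should contain `SortExec: fetch=10, ...` as in the
    // .result diff above.
    async fn explain_order_by(table: TableRef) -> DfResult<()> {
        let ctx = SessionContext::new();
        ctx.register_table("numbers", Arc::new(DfTableProviderAdapter::new(table)))?;
        let df = ctx
            .sql("EXPLAIN SELECT * FROM numbers ORDER BY number ASC LIMIT 10")
            .await?;
        df.show().await?;
        Ok(())
    }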