diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs
index 15f8df668a..13d8fd9f6f 100644
--- a/src/file-table-engine/src/table/format.rs
+++ b/src/file-table-engine/src/table/format.rs
@@ -18,7 +18,6 @@ use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener
 use common_datasource::file_format::json::{JsonFormat, JsonOpener};
 use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
 use common_datasource::file_format::Format;
-use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef};
 use common_query::prelude::Expr;
 use common_query::DfPhysicalPlan;
 use common_recordbatch::adapter::RecordBatchStreamAdapter;
@@ -33,10 +32,9 @@ use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStr
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::prelude::SessionContext;
 use datatypes::arrow::datatypes::Schema as ArrowSchema;
-use datatypes::schema::{Schema, SchemaRef};
+use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
 use snafu::ResultExt;
-use table::table::scan::StreamScanAdapter;
 
 use crate::error::{self, Result};
 
@@ -87,17 +85,6 @@ fn build_json_opener(
     ))
 }
 
-fn build_scan_plan<T: FileOpener + Send + 'static>(
-    opener: T,
-    file_schema: Arc<ArrowSchema>,
-    files: &[String],
-    projection: Option<&Vec<usize>>,
-    limit: Option<usize>,
-) -> Result<PhysicalPlanRef> {
-    let adapter = build_record_batch_stream(opener, file_schema, files, projection, limit)?;
-    Ok(Arc::new(StreamScanAdapter::new(adapter)))
-}
-
 fn build_record_batch_stream<T: FileOpener + Send + 'static>(
     opener: T,
     file_schema: Arc<ArrowSchema>,
@@ -130,22 +117,6 @@ fn build_record_batch_stream<T: FileOpener + Send + 'static>(
     Ok(Box::pin(adapter))
 }
 
-fn new_csv_scan_plan(
-    _ctx: &CreateScanPlanContext,
-    config: &ScanPlanConfig,
-    format: &CsvFormat,
-) -> Result<PhysicalPlanRef> {
-    let file_schema = config.file_schema.arrow_schema().clone();
-    let opener = build_csv_opener(file_schema.clone(), config, format)?;
-    build_scan_plan(
-        opener,
-        file_schema,
-        config.files,
-        config.projection,
-        config.limit,
-    )
-}
-
 fn new_csv_stream(
     _ctx: &CreateScanPlanContext,
     config: &ScanPlanConfig,
@@ -162,22 +133,6 @@ fn new_csv_stream(
     )
 }
 
-fn new_json_scan_plan(
-    _ctx: &CreateScanPlanContext,
-    config: &ScanPlanConfig,
-    format: &JsonFormat,
-) -> Result<PhysicalPlanRef> {
-    let file_schema = config.file_schema.arrow_schema().clone();
-    let opener = build_json_opener(file_schema.clone(), config, format)?;
-    build_scan_plan(
-        opener,
-        file_schema,
-        config.files,
-        config.projection,
-        config.limit,
-    )
-}
-
 fn new_json_stream(
     _ctx: &CreateScanPlanContext,
     config: &ScanPlanConfig,
@@ -194,76 +149,6 @@ fn new_json_stream(
     )
 }
 
-fn new_parquet_scan_plan(
-    _ctx: &CreateScanPlanContext,
-    config: &ScanPlanConfig,
-    _format: &ParquetFormat,
-) -> Result<PhysicalPlanRef> {
-    let file_schema = config.file_schema.arrow_schema().clone();
-    let ScanPlanConfig {
-        files,
-        projection,
-        limit,
-        filters,
-        store,
-        ..
-    } = config;
-
-    let scan_config = FileScanConfig {
-        object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
-        file_schema: file_schema.clone(),
-        file_groups: vec![files
-            .iter()
-            .map(|filename| PartitionedFile::new(filename.to_string(), 0))
-            .collect::<Vec<_>>()],
-        statistics: Default::default(),
-        projection: projection.cloned(),
-        limit: *limit,
-        table_partition_cols: vec![],
-        output_ordering: None,
-        infinite_source: false,
-    };
-
-    let filters = filters
-        .iter()
-        .map(|f| f.df_expr().clone())
-        .collect::<Vec<_>>();
-
-    let filters = if let Some(expr) = conjunction(filters) {
-        let df_schema = file_schema
-            .clone()
-            .to_dfschema_ref()
-            .context(error::ParquetScanPlanSnafu)?;
-
-        let filters = create_physical_expr(&expr, &df_schema, &file_schema, &ExecutionProps::new())
-            .context(error::ParquetScanPlanSnafu)?;
-        Some(filters)
-    } else {
-        None
-    };
-
-    let exec = ParquetExec::new(scan_config, filters, None).with_parquet_file_reader_factory(
-        Arc::new(DefaultParquetFileReaderFactory::new(store.clone())),
-    );
-
-    let projected_schema = if let Some(projection) = config.projection {
-        Arc::new(
-            file_schema
-                .project(projection)
-                .context(error::ProjectSchemaSnafu)?,
-        )
-    } else {
-        file_schema
-    };
-
-    let schema = Schema::try_from(projected_schema).context(error::ConvertSchemaSnafu)?;
-
-    Ok(Arc::new(PhysicalPlanAdapter::new(
-        Arc::new(schema),
-        Arc::new(exec),
-    )))
-}
-
 fn new_parquet_stream_with_exec_plan(
     _ctx: &CreateScanPlanContext,
     config: &ScanPlanConfig,
@@ -338,19 +223,6 @@ pub struct ScanPlanConfig<'a> {
     pub store: ObjectStore,
 }
 
-pub fn create_physical_plan(
-    format: &Format,
-    ctx: &CreateScanPlanContext,
-    config: &ScanPlanConfig,
-) -> Result<PhysicalPlanRef> {
-    match format {
-        Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
-        Format::Json(format) => new_json_scan_plan(ctx, config, format),
-        Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format),
-        _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
-    }
-}
-
 pub fn create_stream(
     format: &Format,
     ctx: &CreateScanPlanContext,
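
With this change, `format.rs` exposes only `create_stream`; the physical-plan builders (`build_scan_plan`, `new_*_scan_plan`, `create_physical_plan`) are removed together with their `PhysicalPlanAdapter`/`StreamScanAdapter` imports. A caller that still needs an execution plan would now adapt the stream itself. The sketch below is illustrative only, not part of this diff: `scan_to_plan` is a hypothetical helper, and it assumes `StreamScanAdapter` (no longer imported here) is still available at the table layer, mirroring what the deleted `build_scan_plan` did.

```rust
use std::sync::Arc;

use common_datasource::file_format::Format;
use common_query::physical_plan::PhysicalPlanRef;
use table::table::scan::StreamScanAdapter;

use crate::error::Result;
use crate::table::format::{create_stream, CreateScanPlanContext, ScanPlanConfig};

/// Hypothetical caller-side helper: build the record-batch stream through the
/// surviving `create_stream` entry point, then wrap it as a physical plan,
/// exactly as the removed `build_scan_plan` used to do.
fn scan_to_plan(
    format: &Format,
    ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
) -> Result<PhysicalPlanRef> {
    // Streams are format-dispatched (CSV/JSON/Parquet) inside create_stream.
    let stream = create_stream(format, ctx, config)?;
    // Wrap the stream so layers that still expect a plan can consume it.
    Ok(Arc::new(StreamScanAdapter::new(stream)))
}
```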