From a709a5c842c5e214f429e7e53b01e8bedbd8e88d Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Wed, 26 Apr 2023 17:45:37 +0900
Subject: [PATCH] feat: support to create parquet format external table (#1463)

* feat: support parquet format external table

* Update src/file-table-engine/src/error.rs

Co-authored-by: Ruihang Xia

---------

Co-authored-by: Ruihang Xia
---
 src/file-table-engine/src/error.rs        | 19 ++++-
 src/file-table-engine/src/table/format.rs | 91 ++++++++++++++++++++--
 src/frontend/src/tests/instance_test.rs   | 70 +++++++++++++++++
 src/query/src/sql.rs                      | 34 +++-----
 tests/data/parquet/README.md              | 29 +++++++
 tests/data/parquet/various_type.parquet   | Bin 0 -> 2541 bytes
 6 files changed, 208 insertions(+), 35 deletions(-)
 create mode 100644 tests/data/parquet/README.md
 create mode 100644 tests/data/parquet/various_type.parquet

diff --git a/src/file-table-engine/src/error.rs b/src/file-table-engine/src/error.rs
index db172524e6..3e617c0ab6 100644
--- a/src/file-table-engine/src/error.rs
+++ b/src/file-table-engine/src/error.rs
@@ -16,6 +16,7 @@ use std::any::Any;
 
 use common_error::prelude::*;
 use datafusion::arrow::error::ArrowError;
+use datafusion::error::DataFusionError;
 use serde_json::error::Error as JsonError;
 use snafu::Location;
 use table::metadata::{TableInfoBuilderError, TableMetaBuilderError};
@@ -165,6 +166,18 @@ pub enum Error {
         #[snafu(backtrace)]
         source: common_datasource::error::Error,
     },
+
+    #[snafu(display("Failed to generate parquet scan plan: {}", source))]
+    ParquetScanPlan {
+        source: DataFusionError,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to convert schema: {}", source))]
+    ConvertSchema {
+        source: datatypes::error::Error,
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -181,7 +194,8 @@ impl ErrorExt for Error {
             | UnsupportedFileFormat { .. }
             | BuildCsvConfig { .. }
             | ProjectSchema { .. }
-            | MissingRequiredField { .. } => StatusCode::InvalidArguments,
+            | MissingRequiredField { .. }
+            | ConvertSchema { .. } => StatusCode::InvalidArguments,
 
             BuildBackend { source, .. } => source.status_code(),
             BuildStreamAdapter { source, .. } => source.status_code(),
@@ -197,7 +211,8 @@ impl ErrorExt for Error {
             | ConvertRaw { .. }
             | DropTable { .. }
             | WriteImmutableManifest { .. }
-            | BuildStream { .. } => StatusCode::Unexpected,
+            | BuildStream { .. }
+            | ParquetScanPlan { .. } => StatusCode::Unexpected,
         }
     }

diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs
index d99cb5ca2b..b089fe5ef9 100644
--- a/src/file-table-engine/src/table/format.rs
+++ b/src/file-table-engine/src/table/format.rs
@@ -16,16 +16,21 @@ use std::sync::Arc;
 
 use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
 use common_datasource::file_format::json::{JsonFormat, JsonOpener};
+use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
 use common_datasource::file_format::Format;
-use common_query::physical_plan::PhysicalPlanRef;
+use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef};
 use common_query::prelude::Expr;
 use common_recordbatch::adapter::RecordBatchStreamAdapter;
-use datafusion::arrow::datatypes::Schema;
+use datafusion::common::ToDFSchema;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
+use datafusion::optimizer::utils::conjunction;
+use datafusion::physical_expr::create_physical_expr;
+use datafusion::physical_expr::execution_props::ExecutionProps;
+use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream, ParquetExec};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
-use datatypes::schema::SchemaRef;
+use datatypes::arrow::datatypes::Schema as ArrowSchema;
+use datatypes::schema::{Schema, SchemaRef};
 use object_store::ObjectStore;
 use snafu::ResultExt;
 use table::table::scan::SimpleTableScan;
@@ -38,7 +43,7 @@ const DEFAULT_BATCH_SIZE: usize = 8192;
 pub struct CreateScanPlanContext {}
 
 fn build_csv_opener(
-    file_schema: Arc<Schema>,
+    file_schema: Arc<ArrowSchema>,
     config: &ScanPlanConfig,
     format: &CsvFormat,
 ) -> Result<CsvOpener> {
@@ -58,7 +63,7 @@ fn build_csv_opener(
 }
 
 fn build_json_opener(
-    file_schema: Arc<Schema>,
+    file_schema: Arc<ArrowSchema>,
     config: &ScanPlanConfig,
     format: &JsonFormat,
 ) -> Result<JsonOpener> {
@@ -81,7 +86,7 @@ fn build_scan_plan<T: FileOpener + Send + 'static>(
     opener: T,
-    file_schema: Arc<Schema>,
+    file_schema: Arc<ArrowSchema>,
     files: &[String],
     projection: Option<&Vec<usize>>,
     limit: Option<usize>,
 ) -> Result<PhysicalPlanRef> {
@@ -143,6 +148,76 @@ fn new_json_scan_plan(
     )
 }
 
+fn new_parquet_scan_plan(
+    _ctx: &CreateScanPlanContext,
+    config: &ScanPlanConfig,
+    _format: &ParquetFormat,
+) -> Result<PhysicalPlanRef> {
+    let file_schema = config.file_schema.arrow_schema().clone();
+    let ScanPlanConfig {
+        files,
+        projection,
+        limit,
+        filters,
+        store,
+        ..
+    } = config;
+
+    let scan_config = FileScanConfig {
+        object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
+        file_schema: file_schema.clone(),
+        file_groups: vec![files
+            .iter()
+            .map(|filename| PartitionedFile::new(filename.to_string(), 0))
+            .collect::<Vec<_>>()],
+        statistics: Default::default(),
+        projection: projection.cloned(),
+        limit: *limit,
+        table_partition_cols: vec![],
+        output_ordering: None,
+        infinite_source: false,
+    };
+
+    let filters = filters
+        .iter()
+        .map(|f| f.df_expr().clone())
+        .collect::<Vec<_>>();
+
+    let filters = if let Some(expr) = conjunction(filters) {
+        let df_schema = file_schema
+            .clone()
+            .to_dfschema_ref()
+            .context(error::ParquetScanPlanSnafu)?;
+
+        let filters = create_physical_expr(&expr, &df_schema, &file_schema, &ExecutionProps::new())
+            .context(error::ParquetScanPlanSnafu)?;
+        Some(filters)
+    } else {
+        None
+    };
+
+    let exec = ParquetExec::new(scan_config, filters, None).with_parquet_file_reader_factory(
+        Arc::new(DefaultParquetFileReaderFactory::new(store.clone())),
+    );
+
+    let projected_schema = if let Some(projection) = config.projection {
+        Arc::new(
+            file_schema
+                .project(projection)
+                .context(error::ProjectSchemaSnafu)?,
+        )
+    } else {
+        file_schema
+    };
+
+    let schema = Schema::try_from(projected_schema).context(error::ConvertSchemaSnafu)?;
+
+    Ok(Arc::new(PhysicalPlanAdapter::new(
+        Arc::new(schema),
+        Arc::new(exec),
+    )))
+}
+
 #[derive(Debug, Clone)]
 pub struct ScanPlanConfig<'a> {
     pub file_schema: SchemaRef,
@@ -161,6 +236,6 @@ pub fn create_physical_plan(
     match format {
         Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
         Format::Json(format) => new_json_scan_plan(ctx, config, format),
-        Format::Parquet(_) => error::UnsupportedFileFormatSnafu { format: "parquet" }.fail(),
+        Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format),
     }
 }
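Note on the filter handling in `new_parquet_scan_plan` above: the table's `Vec<Expr>` filters are folded into a single predicate with `conjunction`, lowered to a physical expression against the file schema, and handed to `ParquetExec`, so row groups that cannot match are pruned at scan time. A minimal standalone sketch of that folding step (hand-rolling the fold with `Expr::and` rather than importing `conjunction` from `datafusion::optimizer::utils`, so it also runs against a recent `datafusion`):

```rust
use datafusion::prelude::{col, lit, Expr};

/// What `conjunction` does: `None` for an empty filter list, otherwise
/// one expression `f1 AND f2 AND ... AND fn`.
fn conjunction_sketch(filters: Vec<Expr>) -> Option<Expr> {
    filters.into_iter().reduce(|acc, next| acc.and(next))
}

fn main() {
    let filters = vec![col("c_int").gt(lit(1i64)), col("c_bool").eq(lit(true))];
    // Prints something like: c_int > Int64(1) AND c_bool = Boolean(true)
    println!("{}", conjunction_sketch(filters).unwrap());
}
```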
diff --git a/src/frontend/src/tests/instance_test.rs b/src/frontend/src/tests/instance_test.rs
index e498655f98..a57242d74b 100644
--- a/src/frontend/src/tests/instance_test.rs
+++ b/src/frontend/src/tests/instance_test.rs
@@ -512,6 +512,76 @@ async fn test_execute_external_create_without_ts_type(instance: Arc<dyn MockInstance>) {
 }
 
+#[apply(both_instances_cases)]
+async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstance>) {
+    let instance = instance.frontend();
+    let format = "parquet";
+    let location = get_data_dir("../../tests/data/parquet/various_type.parquet")
+        .canonicalize()
+        .unwrap()
+        .display()
+        .to_string();
+
+    let table_name = "various_type_parquet";
+
+    let output = execute_sql(
+        &instance,
+        &format!(
+            r#"create external table {table_name} with (location='{location}', format='{format}');"#,
+        ),
+    )
+    .await;
+    assert!(matches!(output, Output::AffectedRows(0)));
+
+    let output = execute_sql(&instance, &format!("desc table {table_name};")).await;
+    let expect = "\
++------------+-----------------+------+---------+---------------+
+| Field      | Type            | Null | Default | Semantic Type |
++------------+-----------------+------+---------+---------------+
+| c_int      | Int64           | YES  |         | FIELD         |
+| c_float    | Float64         | YES  |         | FIELD         |
+| c_string   | Float64         | YES  |         | FIELD         |
+| c_bool     | Boolean         | YES  |         | FIELD         |
+| c_date     | Date            | YES  |         | FIELD         |
+| c_datetime | TimestampSecond | YES  |         | FIELD         |
++------------+-----------------+------+---------+---------------+";
+    check_output_stream(output, expect).await;
+
+    let output = execute_sql(&instance, &format!("select * from {table_name};")).await;
+    let expect = "\
++-------+-----------+----------+--------+------------+---------------------+
+| c_int | c_float   | c_string | c_bool | c_date     | c_datetime          |
++-------+-----------+----------+--------+------------+---------------------+
+| 1     | 1.1       | 1.11     | true   | 1970-01-01 | 1970-01-01T00:00:00 |
+| 2     | 2.2       | 2.22     | true   | 2020-11-08 | 2020-11-08T01:00:00 |
+| 3     |           | 3.33     | true   | 1969-12-31 | 1969-11-08T02:00:00 |
+| 4     | 4.4       |          | false  |            |                     |
+| 5     | 6.6       |          | false  | 1990-01-01 | 1990-01-01T03:00:00 |
+| 4     | 4000000.0 |          | false  |            |                     |
+| 4     | 4.0e-6    |          | false  |            |                     |
++-------+-----------+----------+--------+------------+---------------------+";
+    check_output_stream(output, expect).await;
+
+    let output = execute_sql(
+        &instance,
+        &format!("select c_bool,c_int,c_bool as b from {table_name};"),
+    )
+    .await;
+    let expect = "\
++--------+-------+-------+
+| c_bool | c_int | b     |
++--------+-------+-------+
+| true   | 1     | true  |
+| true   | 2     | true  |
+| true   | 3     | true  |
+| false  | 4     | false |
+| false  | 5     | false |
+| false  | 4     | false |
+| false  | 4     | false |
++--------+-------+-------+";
+    check_output_stream(output, expect).await;
+}
+
 #[apply(both_instances_cases)]
 async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();
++-------+-----------+----------+--------+------------+---------------------+ +| 1 | 1.1 | 1.11 | true | 1970-01-01 | 1970-01-01T00:00:00 | +| 2 | 2.2 | 2.22 | true | 2020-11-08 | 2020-11-08T01:00:00 | +| 3 | | 3.33 | true | 1969-12-31 | 1969-11-08T02:00:00 | +| 4 | 4.4 | | false | | | +| 5 | 6.6 | | false | 1990-01-01 | 1990-01-01T03:00:00 | +| 4 | 4000000.0 | | false | | | +| 4 | 4.0e-6 | | false | | | ++-------+-----------+----------+--------+------------+---------------------+"; + check_output_stream(output, expect).await; + + let output = execute_sql( + &instance, + &format!("select c_bool,c_int,c_bool as b from {table_name};"), + ) + .await; + let expect = "\ ++--------+-------+-------+ +| c_bool | c_int | b | ++--------+-------+-------+ +| true | 1 | true | +| true | 2 | true | +| true | 3 | true | +| false | 4 | false | +| false | 5 | false | +| false | 4 | false | +| false | 4 | false | ++--------+-------+-------+"; + check_output_stream(output, expect).await; +} + #[apply(both_instances_cases)] async fn test_execute_query_external_table_csv(instance: Arc) { let instance = instance.frontend(); diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 858ccc915d..1705a37202 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -19,10 +19,7 @@ use std::sync::Arc; use catalog::CatalogManagerRef; use common_catalog::consts::DEFAULT_CATALOG_NAME; -use common_datasource::file_format::csv::CsvFormat; -use common_datasource::file_format::json::JsonFormat; -use common_datasource::file_format::parquet::ParquetFormat; -use common_datasource::file_format::{infer_schemas, FileFormat}; +use common_datasource::file_format::{infer_schemas, FileFormat, Format}; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::build_backend; use common_datasource::util::find_dir_and_filename; @@ -40,9 +37,7 @@ use sql::ast::ColumnDef; use sql::statements::column_def_to_schema; use sql::statements::create::Partitions; use sql::statements::show::{ShowDatabases, ShowKind, ShowTables}; -use table::requests::{ - IMMUTABLE_TABLE_FORMAT_KEY, IMMUTABLE_TABLE_LOCATION_KEY, IMMUTABLE_TABLE_PATTERN_KEY, -}; +use table::requests::{IMMUTABLE_TABLE_LOCATION_KEY, IMMUTABLE_TABLE_PATTERN_KEY}; use table::TableRef; use crate::error::{self, Result}; @@ -336,24 +331,13 @@ async fn prepare_immutable_file_table( fn parse_immutable_file_table_format( options: &HashMap, ) -> Result> { - let format = options - .get(IMMUTABLE_TABLE_FORMAT_KEY) - .cloned() - .unwrap_or_default() - .to_uppercase(); - - match format.as_str() { - "CSV" => { - let file_format = CsvFormat::try_from(options).context(error::ParseFileFormatSnafu)?; - Ok(Box::new(file_format)) - } - "JSON" => { - let file_format = JsonFormat::try_from(options).context(error::ParseFileFormatSnafu)?; - Ok(Box::new(file_format)) - } - "PARQUET" => Ok(Box::new(ParquetFormat {})), - format => error::UnsupportedFileFormatSnafu { format }.fail(), - } + Ok( + match Format::try_from(options).context(error::ParseFileFormatSnafu)? 
diff --git a/tests/data/parquet/README.md b/tests/data/parquet/README.md
new file mode 100644
index 0000000000..c1abae4e6c
--- /dev/null
+++ b/tests/data/parquet/README.md
@@ -0,0 +1,29 @@
+### various_type.parquet
+Schema:
+```
++-------------+-------------------------+-------------+
+| column_name | data_type               | is_nullable |
++-------------+-------------------------+-------------+
+| c_int       | Int64                   | YES         |
+| c_float     | Float64                 | YES         |
+| c_string    | Float64                 | YES         |
+| c_bool      | Boolean                 | YES         |
+| c_date      | Date32                  | YES         |
+| c_datetime  | Timestamp(Second, None) | YES         |
++-------------+-------------------------+-------------+
+```
+
+Data:
+```
++-------+----------+----------+--------+------------+---------------------+
+| c_int | c_float  | c_string | c_bool | c_date     | c_datetime          |
++-------+----------+----------+--------+------------+---------------------+
+| 1     | 1.1      | 1.11     | true   | 1970-01-01 | 1970-01-01T00:00:00 |
+| 2     | 2.2      | 2.22     | true   | 2020-11-08 | 2020-11-08T01:00:00 |
+| 3     |          | 3.33     | true   | 1969-12-31 | 1969-11-08T02:00:00 |
+| 4     | 4.4      |          | false  |            |                     |
+| 5     | 6.6      |          | false  | 1990-01-01 | 1990-01-01T03:00:00 |
+| 4     | 4000000  |          | false  |            |                     |
+| 4     | 0.000004 |          | false  |            |                     |
++-------+----------+----------+--------+------------+---------------------+
+```
\ No newline at end of file
diff --git a/tests/data/parquet/various_type.parquet b/tests/data/parquet/various_type.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..b92184c28baf33cf98df30b3b9d6920ab63f2c08
GIT binary patch
literal 2541
zcmb_eUuauZ7(eGGOKxKt+q!dhuiS?q_7YILw$9mz+j5f4v@W(yP}>e6=1>2PG|AH1
z&gsK8L?-AI-|T}36MP#(nDoU5r%doc$`FO&W09c~oFEQS=J%a@PLj19!sdaS^PTVe
z&hPxbKj)BfksDxsHqH_m7Gk7_kl4lD|B&I&2{}oLL43qdb`Wp)pNBPNgmD&oltow<
z3$TDQX==M=QsoJg2Qx(ggOCXI`OaMKWPbYz66mt69i{wyb%}*}gs)S6g}PIXP(9PnnE@aFEsj2sUpkle{PM#Ve7Ui|3K%j^3$N3z$5
zncBxPwHIh^i^caE#$vFLiX~Xe&s1s!FPaU?&lv7apZl``@oWX-cQ^tf>SxIeB(zn6
zF9`YakJwA^Ub=C>n`z?X`o7Q)=Ryf!zLUAF8tgS=rnXt8-qmED^{U_t8oq}~lek&R
zFD^CatFy8WpHlukb*~59vjJef6^!5E2%r*SQKUk=cA(UfZQvy>t{Wi8h}}y9wiPJl
zi?v!s9-81e_jJe8lWGrSA<70b%xufTDNG_EXNL*71>-FT%D*}KK+KPJ%a0!5>>aJ6
z-eveU2s2c<;P`T3X<8otKINZL_gctZ4LMdD1m6S*yb>&TJ1^Sm^{&(>dE^-VWeo}3
zy72Xfy|~_nr>T7{Q+tEsc6{|!33+^%<`+E9mni>8H({vLRyg3kjg}C*+pt>~
z6hwV%p)OpTb4gfn?80ziPr<{xDroyu$&UFh1i@D*E&51sPgN>;XA^Nux{skaD*9j7onOTr
ZZ+EkYh`WBDb5B>4|AtuSyICJ`FGO1PXe*bD?ryISfAayr|FIi#$NviQ2z00w7P;Km
z#8Y0QS%3R?2&>GG>bCX@YziGj*k^+=^u9jzA!_n?nRB6MmNjy;HysNzpr;t!#S
z#K8tZ480&FxrU!?O|Iqgu(WFn%r*FuM<)k!bLI4l@?22#V=A7JUJ@UqLnD%()FqFh
zU5a7~CcPhyNlScGs`x<2PBoK;NdcY`O2(5OotavQFBie>=s18L$DKm+O$xbAJ)+Ko
z6v`pj6AqmP6x9SHNE=
rsH#ENji(9$AlPCkh5(;ctr(-q?l{f8>m3vV5vaT!^?zBE#TLSRjJrVEW2K49D8MY
f>9NLQba}e5IA5zqAAhp1zptO{g13Gg{^
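For reference, a fixture like `various_type.parquet` can be regenerated with the `arrow` and `parquet` crates. The sketch below is illustrative only, not the script that produced the checked-in file; the values follow the data table in the README above, and `c_string` is deliberately written as `Float64` despite its name:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{
    ArrayRef, BooleanArray, Date32Array, Float64Array, Int64Array, TimestampSecondArray,
};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c_int", DataType::Int64, true),
        Field::new("c_float", DataType::Float64, true),
        // Matches the README schema: a Float64 column that merely has a
        // string-ish name.
        Field::new("c_string", DataType::Float64, true),
        Field::new("c_bool", DataType::Boolean, true),
        Field::new("c_date", DataType::Date32, true),
        Field::new("c_datetime", DataType::Timestamp(TimeUnit::Second, None), true),
    ]));

    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![
            Some(1), Some(2), Some(3), Some(4), Some(5), Some(4), Some(4),
        ])),
        Arc::new(Float64Array::from(vec![
            Some(1.1), Some(2.2), None, Some(4.4), Some(6.6), Some(4_000_000.0), Some(0.000004),
        ])),
        Arc::new(Float64Array::from(vec![
            Some(1.11), Some(2.22), Some(3.33), None, None, None, None,
        ])),
        Arc::new(BooleanArray::from(vec![
            Some(true), Some(true), Some(true), Some(false), Some(false), Some(false), Some(false),
        ])),
        // Date32 is days since 1970-01-01: 1970-01-01, 2020-11-08,
        // 1969-12-31, then 1990-01-01.
        Arc::new(Date32Array::from(vec![
            Some(0), Some(18574), Some(-1), None, Some(7305), None, None,
        ])),
        // Timestamp(Second) is seconds since the epoch, matching the
        // datetimes in the README data table.
        Arc::new(TimestampSecondArray::from(vec![
            Some(0), Some(1_604_797_200), Some(-4_658_400), None, Some(631_162_800), None, None,
        ])),
    ];

    let batch = RecordBatch::try_new(schema.clone(), columns)?;
    let mut writer = ArrowWriter::try_new(File::create("various_type.parquet")?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
```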