feat: support to create parquet format external table (#1463)

* feat: support parquet format external table

* Update src/file-table-engine/src/error.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Weny Xu
2023-04-26 17:45:37 +09:00
committed by GitHub
parent fb9978e95d
commit a709a5c842
6 changed files with 208 additions and 35 deletions

View File

@@ -16,6 +16,7 @@ use std::any::Any;
use common_error::prelude::*;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;
use serde_json::error::Error as JsonError;
use snafu::Location;
use table::metadata::{TableInfoBuilderError, TableMetaBuilderError};
@@ -165,6 +166,18 @@ pub enum Error {
#[snafu(backtrace)]
source: common_datasource::error::Error,
},
#[snafu(display("Failed to generate parquet scan plan: {}", source))]
ParquetScanPlan {
source: DataFusionError,
location: Location,
},
#[snafu(display("Failed to convert schema: {}", source))]
ConvertSchema {
source: datatypes::error::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -181,7 +194,8 @@ impl ErrorExt for Error {
| UnsupportedFileFormat { .. }
| BuildCsvConfig { .. }
| ProjectSchema { .. }
| MissingRequiredField { .. } => StatusCode::InvalidArguments,
| MissingRequiredField { .. }
| ConvertSchema { .. } => StatusCode::InvalidArguments,
BuildBackend { source, .. } => source.status_code(),
BuildStreamAdapter { source, .. } => source.status_code(),
@@ -197,7 +211,8 @@ impl ErrorExt for Error {
| ConvertRaw { .. }
| DropTable { .. }
| WriteImmutableManifest { .. }
| BuildStream { .. } => StatusCode::Unexpected,
| BuildStream { .. }
| ParquetScanPlan { .. } => StatusCode::Unexpected,
}
}

View File

@@ -16,16 +16,21 @@ use std::sync::Arc;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
use common_datasource::file_format::json::{JsonFormat, JsonOpener};
use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
use common_datasource::file_format::Format;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef};
use common_query::prelude::Expr;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::arrow::datatypes::Schema;
use datafusion::common::ToDFSchema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::schema::SchemaRef;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::{Schema, SchemaRef};
use object_store::ObjectStore;
use snafu::ResultExt;
use table::table::scan::SimpleTableScan;
@@ -38,7 +43,7 @@ const DEFAULT_BATCH_SIZE: usize = 8192;
pub struct CreateScanPlanContext {}
fn build_csv_opener(
file_schema: Arc<Schema>,
file_schema: Arc<ArrowSchema>,
config: &ScanPlanConfig,
format: &CsvFormat,
) -> Result<CsvOpener> {
@@ -58,7 +63,7 @@ fn build_csv_opener(
}
fn build_json_opener(
file_schema: Arc<Schema>,
file_schema: Arc<ArrowSchema>,
config: &ScanPlanConfig,
format: &JsonFormat,
) -> Result<JsonOpener> {
@@ -81,7 +86,7 @@ fn build_json_opener(
fn build_scan_plan<T: FileOpener + Send + 'static>(
opener: T,
file_schema: Arc<Schema>,
file_schema: Arc<ArrowSchema>,
files: &[String],
projection: Option<&Vec<usize>>,
limit: Option<usize>,
@@ -143,6 +148,76 @@ fn new_json_scan_plan(
)
}
fn new_parquet_scan_plan(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
_format: &ParquetFormat,
) -> Result<PhysicalPlanRef> {
let file_schema = config.file_schema.arrow_schema().clone();
let ScanPlanConfig {
files,
projection,
limit,
filters,
store,
..
} = config;
let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema: file_schema.clone(),
file_groups: vec![files
.iter()
.map(|filename| PartitionedFile::new(filename.to_string(), 0))
.collect::<Vec<_>>()],
statistics: Default::default(),
projection: projection.cloned(),
limit: *limit,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
};
let filters = filters
.iter()
.map(|f| f.df_expr().clone())
.collect::<Vec<_>>();
let filters = if let Some(expr) = conjunction(filters) {
let df_schema = file_schema
.clone()
.to_dfschema_ref()
.context(error::ParquetScanPlanSnafu)?;
let filters = create_physical_expr(&expr, &df_schema, &file_schema, &ExecutionProps::new())
.context(error::ParquetScanPlanSnafu)?;
Some(filters)
} else {
None
};
let exec = ParquetExec::new(scan_config, filters, None).with_parquet_file_reader_factory(
Arc::new(DefaultParquetFileReaderFactory::new(store.clone())),
);
let projected_schema = if let Some(projection) = config.projection {
Arc::new(
file_schema
.project(projection)
.context(error::ProjectSchemaSnafu)?,
)
} else {
file_schema
};
let schema = Schema::try_from(projected_schema).context(error::ConvertSchemaSnafu)?;
Ok(Arc::new(PhysicalPlanAdapter::new(
Arc::new(schema),
Arc::new(exec),
)))
}
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
pub file_schema: SchemaRef,
@@ -161,6 +236,6 @@ pub fn create_physical_plan(
match format {
Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
Format::Json(format) => new_json_scan_plan(ctx, config, format),
Format::Parquet(_) => error::UnsupportedFileFormatSnafu { format: "parquet" }.fail(),
Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format),
}
}

View File

@@ -512,6 +512,76 @@ async fn test_execute_external_create_without_ts_type(instance: Arc<dyn MockInst
assert!(matches!(output, Output::AffectedRows(0)));
}
// End-to-end check for parquet-backed external tables: creates the table from
// a parquet file, then verifies `desc table`, a full `select *`, and a
// projection that reorders columns and repeats `c_bool` under an alias.
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let format = "parquet";
// Canonicalize the data file path before embedding it in the SQL as the
// table location.
let location = get_data_dir("../../tests/data/parquet/various_type.parquet")
.canonicalize()
.unwrap()
.display()
.to_string();
let table_name = "various_type_parquet";
// The statement declares no columns, only the location and format options;
// the schema checked below presumably comes from the file itself.
let output = execute_sql(
&instance,
&format!(
r#"create external table {table_name} with (location='{location}', format='{format}');"#,
),
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
// The description is expected to list six columns, each nullable and with
// semantic type FIELD.
let output = execute_sql(&instance, &format!("desc table {table_name};")).await;
let expect = "\
+------------+-----------------+------+---------+---------------+
| Field | Type | Null | Default | Semantic Type |
+------------+-----------------+------+---------+---------------+
| c_int | Int64 | YES | | FIELD |
| c_float | Float64 | YES | | FIELD |
| c_string | Float64 | YES | | FIELD |
| c_bool | Boolean | YES | | FIELD |
| c_date | Date | YES | | FIELD |
| c_datetime | TimestampSecond | YES | | FIELD |
+------------+-----------------+------+---------+---------------+";
check_output_stream(output, expect).await;
// A full scan is expected to return all seven rows; some cells are empty
// (presumably NULL values).
let output = execute_sql(&instance, &format!("select * from {table_name};")).await;
let expect = "\
+-------+-----------+----------+--------+------------+---------------------+
| c_int | c_float | c_string | c_bool | c_date | c_datetime |
+-------+-----------+----------+--------+------------+---------------------+
| 1 | 1.1 | 1.11 | true | 1970-01-01 | 1970-01-01T00:00:00 |
| 2 | 2.2 | 2.22 | true | 2020-11-08 | 2020-11-08T01:00:00 |
| 3 | | 3.33 | true | 1969-12-31 | 1969-11-08T02:00:00 |
| 4 | 4.4 | | false | | |
| 5 | 6.6 | | false | 1990-01-01 | 1990-01-01T03:00:00 |
| 4 | 4000000.0 | | false | | |
| 4 | 4.0e-6 | | false | | |
+-------+-----------+----------+--------+------------+---------------------+";
check_output_stream(output, expect).await;
// Projection out of file order, with `c_bool` selected twice (the second
// time aliased as `b`).
let output = execute_sql(
&instance,
&format!("select c_bool,c_int,c_bool as b from {table_name};"),
)
.await;
let expect = "\
+--------+-------+-------+
| c_bool | c_int | b |
+--------+-------+-------+
| true | 1 | true |
| true | 2 | true |
| true | 3 | true |
| false | 4 | false |
| false | 5 | false |
| false | 4 | false |
| false | 4 | false |
+--------+-------+-------+";
check_output_stream(output, expect).await;
}
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();

View File

@@ -19,10 +19,7 @@ use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::json::JsonFormat;
use common_datasource::file_format::parquet::ParquetFormat;
use common_datasource::file_format::{infer_schemas, FileFormat};
use common_datasource::file_format::{infer_schemas, FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::build_backend;
use common_datasource::util::find_dir_and_filename;
@@ -40,9 +37,7 @@ use sql::ast::ColumnDef;
use sql::statements::column_def_to_schema;
use sql::statements::create::Partitions;
use sql::statements::show::{ShowDatabases, ShowKind, ShowTables};
use table::requests::{
IMMUTABLE_TABLE_FORMAT_KEY, IMMUTABLE_TABLE_LOCATION_KEY, IMMUTABLE_TABLE_PATTERN_KEY,
};
use table::requests::{IMMUTABLE_TABLE_LOCATION_KEY, IMMUTABLE_TABLE_PATTERN_KEY};
use table::TableRef;
use crate::error::{self, Result};
@@ -336,24 +331,13 @@ async fn prepare_immutable_file_table(
fn parse_immutable_file_table_format(
options: &HashMap<String, String>,
) -> Result<Box<dyn FileFormat>> {
let format = options
.get(IMMUTABLE_TABLE_FORMAT_KEY)
.cloned()
.unwrap_or_default()
.to_uppercase();
match format.as_str() {
"CSV" => {
let file_format = CsvFormat::try_from(options).context(error::ParseFileFormatSnafu)?;
Ok(Box::new(file_format))
}
"JSON" => {
let file_format = JsonFormat::try_from(options).context(error::ParseFileFormatSnafu)?;
Ok(Box::new(file_format))
}
"PARQUET" => Ok(Box::new(ParquetFormat {})),
format => error::UnsupportedFileFormatSnafu { format }.fail(),
}
Ok(
match Format::try_from(options).context(error::ParseFileFormatSnafu)? {
Format::Csv(format) => Box::new(format),
Format::Json(format) => Box::new(format),
Format::Parquet(format) => Box::new(format),
},
)
}
async fn infer_immutable_file_table_schema(

View File

@@ -0,0 +1,29 @@
### various_type.parquet
Schema:
```
+-------------+-------------------------+-------------+
| column_name | data_type               | is_nullable |
+-------------+-------------------------+-------------+
| c_int       | Int64                   | YES         |
| c_float     | Float64                 | YES         |
| c_string    | Float64                 | YES         |
| c_bool      | Boolean                 | YES         |
| c_date      | Date32                  | YES         |
| c_datetime  | Timestamp(Second, None) | YES         |
+-------------+-------------------------+-------------+
```
Data:
```
+-------+----------+----------+--------+------------+---------------------+
| c_int | c_float  | c_string | c_bool | c_date     | c_datetime          |
+-------+----------+----------+--------+------------+---------------------+
| 1     | 1.1      | 1.11     | true   | 1970-01-01 | 1970-01-01T00:00:00 |
| 2     | 2.2      | 2.22     | true   | 2020-11-08 | 2020-11-08T01:00:00 |
| 3     |          | 3.33     | true   | 1969-12-31 | 1969-11-08T02:00:00 |
| 4     | 4.4      |          | false  |            |                     |
| 5     | 6.6      |          | false  | 1990-01-01 | 1990-01-01T03:00:00 |
| 4     | 4000000  |          | false  |            |                     |
| 4     | 0.000004 |          | false  |            |                     |
+-------+----------+----------+--------+------------+---------------------+
```

Binary file not shown.