From cc1b297831c0929ae393c2b90aeea516c06fa815 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 18 Apr 2025 11:36:35 +0800 Subject: [PATCH] fix: avoid double schema projection in file format readers (#5918) --- src/operator/src/statement/copy_table_from.rs | 32 +++++++++----- .../common/copy/copy_from_fs_csv.result | 44 +++++++++++++++++++ .../common/copy/copy_from_fs_csv.sql | 16 +++++++ .../common/copy/copy_from_fs_json.result | 44 +++++++++++++++++++ .../common/copy/copy_from_fs_json.sql | 16 +++++++ .../common/copy/copy_from_fs_parquet.result | 44 +++++++++++++++++++ .../common/copy/copy_from_fs_parquet.sql | 16 +++++++ 7 files changed, 200 insertions(+), 12 deletions(-) diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 2d9fbef6b9..505d74c3d7 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -237,7 +237,7 @@ impl StatementExecutor { path, schema, } => { - let projected_schema = Arc::new( + let output_schema = Arc::new( compat_schema .project(&projection) .context(error::ProjectSchemaSnafu)?, @@ -255,17 +255,23 @@ impl StatementExecutor { )), None, )); - + let projected_file_schema = Arc::new( + schema + .project(&projection) + .context(error::ProjectSchemaSnafu)?, + ); let stream = self .build_file_stream( CsvOpener::new(csv_config, format.compression_type.into()), path, - schema.clone(), + projected_file_schema, ) .await?; Ok(Box::pin( - RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection)) + // The projection is already applied in the CSV reader when we created the stream, + // so we pass None here to avoid double projection which would cause schema mismatch errors. + RecordBatchStreamTypeAdapter::new(output_schema, stream, None) .with_filter(filters) .context(error::PhysicalExprSnafu)?, )) @@ -280,7 +286,7 @@ impl StatementExecutor { .project(&projection) .context(error::ProjectSchemaSnafu)?, ); - let projected_schema = Arc::new( + let output_schema = Arc::new( compat_schema .project(&projection) .context(error::ProjectSchemaSnafu)?, @@ -290,17 +296,19 @@ impl StatementExecutor { .build_file_stream( JsonOpener::new( DEFAULT_BATCH_SIZE, - projected_file_schema, + projected_file_schema.clone(), format.compression_type.into(), Arc::new(store), ), path, - schema.clone(), + projected_file_schema, ) .await?; Ok(Box::pin( - RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection)) + // The projection is already applied in the JSON reader when we created the stream, + // so we pass None here to avoid double projection which would cause schema mismatch errors. + RecordBatchStreamTypeAdapter::new(output_schema, stream, None) .with_filter(filters) .context(error::PhysicalExprSnafu)?, )) @@ -325,13 +333,13 @@ impl StatementExecutor { .build() .context(error::BuildParquetRecordBatchStreamSnafu)?; - let projected_schema = Arc::new( + let output_schema = Arc::new( compat_schema .project(&projection) .context(error::ProjectSchemaSnafu)?, ); Ok(Box::pin( - RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection)) + RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection)) .with_filter(filters) .context(error::PhysicalExprSnafu)?, )) @@ -352,14 +360,14 @@ impl StatementExecutor { .await .context(error::ReadOrcSnafu)?; - let projected_schema = Arc::new( + let output_schema = Arc::new( compat_schema .project(&projection) .context(error::ProjectSchemaSnafu)?, ); Ok(Box::pin( - RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection)) + RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection)) .with_filter(filters) .context(error::PhysicalExprSnafu)?, )) diff --git a/tests/cases/standalone/common/copy/copy_from_fs_csv.result b/tests/cases/standalone/common/copy/copy_from_fs_csv.result index f412677a32..d1838a5079 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_csv.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_csv.result @@ -66,6 +66,42 @@ select * from with_pattern order by ts; | host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 | +-------+------+--------+---------------------+ +CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value'); + +Affected Rows: 0 + +Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv'); + +Affected Rows: 3 + +select * from demo_with_external_column order by ts; + ++-------+------+--------+---------------------+-----------------+ +| host | cpu | memory | ts | external_column | ++-------+------+--------+---------------------+-----------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value | +| host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 | default_value | ++-------+------+--------+---------------------+-----------------+ + +CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv'); + +Affected Rows: 3 + +select * from demo_with_less_columns order by ts; + ++-------+--------+---------------------+ +| host | memory | ts | ++-------+--------+---------------------+ +| host1 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 333.3 | 2022-06-15T07:02:38 | +| host3 | 444.4 | 2024-07-27T10:47:43 | ++-------+--------+---------------------+ + drop table demo; Affected Rows: 0 @@ -82,3 +118,11 @@ drop table with_pattern; Affected Rows: 0 +drop table demo_with_external_column; + +Affected Rows: 0 + +drop table demo_with_less_columns; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_from_fs_csv.sql b/tests/cases/standalone/common/copy/copy_from_fs_csv.sql index f2c6ccf5bd..8c4e9b3aa9 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_csv.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_csv.sql @@ -27,6 +27,18 @@ Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/csv/' WITH (pattern = 'demo. select * from with_pattern order by ts; +CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value'); + +Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv'); + +select * from demo_with_external_column order by ts; + +CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index); + +Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv'); + +select * from demo_with_less_columns order by ts; + drop table demo; drop table with_filename; @@ -34,3 +46,7 @@ drop table with_filename; drop table with_path; drop table with_pattern; + +drop table demo_with_external_column; + +drop table demo_with_less_columns; diff --git a/tests/cases/standalone/common/copy/copy_from_fs_json.result b/tests/cases/standalone/common/copy/copy_from_fs_json.result index e1e3a810d5..153fac1e87 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_json.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_json.result @@ -66,6 +66,42 @@ select * from with_pattern order by ts; | host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | +-------+------+--------+---------------------+ +CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value'); + +Affected Rows: 0 + +Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json'); + +Affected Rows: 3 + +select * from demo_with_external_column order by ts; + ++-------+------+--------+---------------------+-----------------+ +| host | cpu | memory | ts | external_column | ++-------+------+--------+---------------------+-----------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value | +| host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 | default_value | ++-------+------+--------+---------------------+-----------------+ + +CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json'); + +Affected Rows: 3 + +select * from demo_with_less_columns order by ts; + ++-------+--------+---------------------+ +| host | memory | ts | ++-------+--------+---------------------+ +| host1 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 333.3 | 2022-06-15T07:02:38 | +| host3 | 444.4 | 2024-07-27T10:47:43 | ++-------+--------+---------------------+ + drop table demo; Affected Rows: 0 @@ -82,3 +118,11 @@ drop table with_pattern; Affected Rows: 0 +drop table demo_with_external_column; + +Affected Rows: 0 + +drop table demo_with_less_columns; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_from_fs_json.sql b/tests/cases/standalone/common/copy/copy_from_fs_json.sql index 55e8a55ebf..b032332262 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_json.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_json.sql @@ -27,6 +27,18 @@ Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/json/' WITH (pattern = 'demo select * from with_pattern order by ts; +CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value'); + +Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json'); + +select * from demo_with_external_column order by ts; + +CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index); + +Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json'); + +select * from demo_with_less_columns order by ts; + drop table demo; drop table with_filename; @@ -34,3 +46,7 @@ drop table with_filename; drop table with_path; drop table with_pattern; + +drop table demo_with_external_column; + +drop table demo_with_less_columns; diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result index f31aebd7b1..ac7fbcddba 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result @@ -123,6 +123,42 @@ Copy with_limit_rows_segment FROM '${SQLNESS_HOME}/demo/export/parquet_files/' L Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement, expected: 'the number of maximum rows', found: ;: sql parser error: Expected: literal int, found: hello at Line: 1, Column: 86 +CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value'); + +Affected Rows: 0 + +Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet'; + +Affected Rows: 3 + +select * from demo_with_external_column order by ts; + ++-------+-------+--------+---------------------+-----------------+ +| host | cpu | memory | ts | external_column | ++-------+-------+--------+---------------------+-----------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value | +| host3 | 111.1 | 444.4 | 2024-07-27T10:47:43 | default_value | ++-------+-------+--------+---------------------+-----------------+ + +CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet'; + +Affected Rows: 3 + +select * from demo_with_less_columns order by ts; + ++-------+--------+---------------------+ +| host | memory | ts | ++-------+--------+---------------------+ +| host1 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 333.3 | 2022-06-15T07:02:38 | +| host3 | 444.4 | 2024-07-27T10:47:43 | ++-------+--------+---------------------+ + drop table demo; Affected Rows: 0 @@ -151,3 +187,11 @@ drop table with_limit_rows_segment; Affected Rows: 0 +drop table demo_with_external_column; + +Affected Rows: 0 + +drop table demo_with_less_columns; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql index 0db5e119d8..13e15bd91e 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql @@ -52,6 +52,18 @@ select count(*) from with_limit_rows_segment; Copy with_limit_rows_segment FROM '${SQLNESS_HOME}/demo/export/parquet_files/' LIMIT hello; +CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value'); + +Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet'; + +select * from demo_with_external_column order by ts; + +CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index); + +Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet'; + +select * from demo_with_less_columns order by ts; + drop table demo; drop table demo_2; @@ -65,3 +77,7 @@ drop table with_pattern; drop table without_limit_rows; drop table with_limit_rows_segment; + +drop table demo_with_external_column; + +drop table demo_with_less_columns;