fix: avoid double schema projection in file format readers (#5918)

This commit is contained in:
Weny Xu
2025-04-18 11:36:35 +08:00
committed by GitHub
parent e4556ce12b
commit cc1b297831
7 changed files with 200 additions and 12 deletions

View File

@@ -237,7 +237,7 @@ impl StatementExecutor {
path,
schema,
} => {
let projected_schema = Arc::new(
let output_schema = Arc::new(
compat_schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
@@ -255,17 +255,23 @@ impl StatementExecutor {
)),
None,
));
let projected_file_schema = Arc::new(
schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);
let stream = self
.build_file_stream(
CsvOpener::new(csv_config, format.compression_type.into()),
path,
schema.clone(),
projected_file_schema,
)
.await?;
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
// The projection is already applied by the CSV reader when the stream is created,
// so we pass `None` here to avoid a double projection, which would cause schema mismatch errors.
RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
@@ -280,7 +286,7 @@ impl StatementExecutor {
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);
let projected_schema = Arc::new(
let output_schema = Arc::new(
compat_schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
@@ -290,17 +296,19 @@ impl StatementExecutor {
.build_file_stream(
JsonOpener::new(
DEFAULT_BATCH_SIZE,
projected_file_schema,
projected_file_schema.clone(),
format.compression_type.into(),
Arc::new(store),
),
path,
schema.clone(),
projected_file_schema,
)
.await?;
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
// The projection is already applied by the JSON reader when the stream is created,
// so we pass `None` here to avoid a double projection, which would cause schema mismatch errors.
RecordBatchStreamTypeAdapter::new(output_schema, stream, None)
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
@@ -325,13 +333,13 @@ impl StatementExecutor {
.build()
.context(error::BuildParquetRecordBatchStreamSnafu)?;
let projected_schema = Arc::new(
let output_schema = Arc::new(
compat_schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))
@@ -352,14 +360,14 @@ impl StatementExecutor {
.await
.context(error::ReadOrcSnafu)?;
let projected_schema = Arc::new(
let output_schema = Arc::new(
compat_schema
.project(&projection)
.context(error::ProjectSchemaSnafu)?,
);
Ok(Box::pin(
RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
RecordBatchStreamTypeAdapter::new(output_schema, stream, Some(projection))
.with_filter(filters)
.context(error::PhysicalExprSnafu)?,
))

View File

@@ -66,6 +66,42 @@ select * from with_pattern order by ts;
| host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 |
+-------+------+--------+---------------------+
CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');
Affected Rows: 0
Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv');
Affected Rows: 3
select * from demo_with_external_column order by ts;
+-------+------+--------+---------------------+-----------------+
| host | cpu | memory | ts | external_column |
+-------+------+--------+---------------------+-----------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value |
| host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 | default_value |
+-------+------+--------+---------------------+-----------------+
CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);
Affected Rows: 0
Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv');
Affected Rows: 3
select * from demo_with_less_columns order by ts;
+-------+--------+---------------------+
| host | memory | ts |
+-------+--------+---------------------+
| host1 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 333.3 | 2022-06-15T07:02:38 |
| host3 | 444.4 | 2024-07-27T10:47:43 |
+-------+--------+---------------------+
drop table demo;
Affected Rows: 0
@@ -82,3 +118,11 @@ drop table with_pattern;
Affected Rows: 0
drop table demo_with_external_column;
Affected Rows: 0
drop table demo_with_less_columns;
Affected Rows: 0

View File

@@ -27,6 +27,18 @@ Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/csv/' WITH (pattern = 'demo.
select * from with_pattern order by ts;
CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');
Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv');
select * from demo_with_external_column order by ts;
CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);
Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' WITH (format='csv');
select * from demo_with_less_columns order by ts;
drop table demo;
drop table with_filename;
@@ -34,3 +46,7 @@ drop table with_filename;
drop table with_path;
drop table with_pattern;
drop table demo_with_external_column;
drop table demo_with_less_columns;

View File

@@ -66,6 +66,42 @@ select * from with_pattern order by ts;
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+
CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');
Affected Rows: 0
Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json');
Affected Rows: 3
select * from demo_with_external_column order by ts;
+-------+------+--------+---------------------+-----------------+
| host | cpu | memory | ts | external_column |
+-------+------+--------+---------------------+-----------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value |
| host3 | 99.9 | 444.4 | 2024-07-27T10:47:43 | default_value |
+-------+------+--------+---------------------+-----------------+
CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);
Affected Rows: 0
Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json');
Affected Rows: 3
select * from demo_with_less_columns order by ts;
+-------+--------+---------------------+
| host | memory | ts |
+-------+--------+---------------------+
| host1 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 333.3 | 2022-06-15T07:02:38 |
| host3 | 444.4 | 2024-07-27T10:47:43 |
+-------+--------+---------------------+
drop table demo;
Affected Rows: 0
@@ -82,3 +118,11 @@ drop table with_pattern;
Affected Rows: 0
drop table demo_with_external_column;
Affected Rows: 0
drop table demo_with_less_columns;
Affected Rows: 0

View File

@@ -27,6 +27,18 @@ Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/json/' WITH (pattern = 'demo
select * from with_pattern order by ts;
CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');
Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json');
select * from demo_with_external_column order by ts;
CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);
Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/json/demo.json' WITH (format='json');
select * from demo_with_less_columns order by ts;
drop table demo;
drop table with_filename;
@@ -34,3 +46,7 @@ drop table with_filename;
drop table with_path;
drop table with_pattern;
drop table demo_with_external_column;
drop table demo_with_less_columns;

View File

@@ -123,6 +123,42 @@ Copy with_limit_rows_segment FROM '${SQLNESS_HOME}/demo/export/parquet_files/' L
Error: 2000(InvalidSyntax), Unexpected token while parsing SQL statement, expected: 'the number of maximum rows', found: ;: sql parser error: Expected: literal int, found: hello at Line: 1, Column: 86
CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');
Affected Rows: 0
Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
Affected Rows: 3
select * from demo_with_external_column order by ts;
+-------+-------+--------+---------------------+-----------------+
| host | cpu | memory | ts | external_column |
+-------+-------+--------+---------------------+-----------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | default_value |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | default_value |
| host3 | 111.1 | 444.4 | 2024-07-27T10:47:43 | default_value |
+-------+-------+--------+---------------------+-----------------+
CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);
Affected Rows: 0
Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
Affected Rows: 3
select * from demo_with_less_columns order by ts;
+-------+--------+---------------------+
| host | memory | ts |
+-------+--------+---------------------+
| host1 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 333.3 | 2022-06-15T07:02:38 |
| host3 | 444.4 | 2024-07-27T10:47:43 |
+-------+--------+---------------------+
drop table demo;
Affected Rows: 0
@@ -151,3 +187,11 @@ drop table with_limit_rows_segment;
Affected Rows: 0
drop table demo_with_external_column;
Affected Rows: 0
drop table demo_with_less_columns;
Affected Rows: 0

View File

@@ -52,6 +52,18 @@ select count(*) from with_limit_rows_segment;
Copy with_limit_rows_segment FROM '${SQLNESS_HOME}/demo/export/parquet_files/' LIMIT hello;
CREATE TABLE demo_with_external_column(host string, cpu double, memory double, ts timestamp time index, external_column string default 'default_value');
Copy demo_with_external_column FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
select * from demo_with_external_column order by ts;
CREATE TABLE demo_with_less_columns(host string, memory double, ts timestamp time index);
Copy demo_with_less_columns FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
select * from demo_with_less_columns order by ts;
drop table demo;
drop table demo_2;
@@ -65,3 +77,7 @@ drop table with_pattern;
drop table without_limit_rows;
drop table with_limit_rows_segment;
drop table demo_with_external_column;
drop table demo_with_less_columns;