From 6ed67167bb93d3219e741bb346766d876b21f513 Mon Sep 17 00:00:00 2001 From: QuakeWang <45645138+QuakeWang@users.noreply.github.com> Date: Thu, 4 Jun 2026 20:04:15 +0800 Subject: [PATCH] feat: support headerless CSV copy from (#8233) * feat: support headerless CSV copy from Signed-off-by: QuakeWang * fix: update csv copy sqlness result Signed-off-by: QuakeWang * test: cover headerless CSV copy from Signed-off-by: QuakeWang * test: cover headerless CSV column count mismatch Signed-off-by: QuakeWang --------- Signed-off-by: QuakeWang --- src/common/datasource/src/file_format.rs | 1 + src/common/datasource/src/file_format/csv.rs | 64 ++++- src/operator/src/statement/copy_table_from.rs | 243 ++++++++++++++++-- src/sql/src/parsers/copy_parser.rs | 4 +- tests-integration/src/tests/instance_test.rs | 125 +++++++++ tests/data/csv/headerless.csv | 2 + tests/data/csv/headerless_extra_columns.csv | 2 + tests/data/csv/headerless_fewer_columns.csv | 2 + .../data/csv/headerless_skip_bad_records.csv | 3 + 9 files changed, 421 insertions(+), 25 deletions(-) create mode 100644 tests/data/csv/headerless.csv create mode 100644 tests/data/csv/headerless_extra_columns.csv create mode 100644 tests/data/csv/headerless_fewer_columns.csv create mode 100644 tests/data/csv/headerless_skip_bad_records.csv diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index d9d7b8b648..028aeef14d 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -60,6 +60,7 @@ use crate::share_buffer::SharedBuffer; pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type"; pub const FORMAT_DELIMITER: &str = "delimiter"; pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record"; +pub const FORMAT_HEADERS: &str = "headers"; pub const FORMAT_HAS_HEADER: &str = "has_header"; pub const FORMAT_SKIP_BAD_RECORDS: &str = "skip_bad_records"; pub const FORMAT_TYPE: &str = "format"; diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index 2b39051b48..24b61340f5 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -87,9 +87,26 @@ impl TryFrom<&HashMap> for CsvFormat { .build() })?); }; - if let Some(has_header) = value.get(file_format::FORMAT_HAS_HEADER) { - format.has_header = parse_bool(file_format::FORMAT_HAS_HEADER, has_header)?; - }; + let headers = value + .get(file_format::FORMAT_HEADERS) + .map(|headers| parse_bool(file_format::FORMAT_HEADERS, headers)) + .transpose()?; + let has_header = value + .get(file_format::FORMAT_HAS_HEADER) + .map(|has_header| parse_bool(file_format::FORMAT_HAS_HEADER, has_header)) + .transpose()?; + match (headers, has_header) { + (Some(headers), Some(has_header)) if headers != has_header => { + return error::ParseFormatSnafu { + key: file_format::FORMAT_HEADERS, + value: format!("headers={headers}, has_header={has_header}"), + } + .fail(); + } + (Some(headers), _) => format.has_header = headers, + (_, Some(has_header)) => format.has_header = has_header, + _ => {} + } if let Some(skip_bad_records) = value.get(file_format::FORMAT_SKIP_BAD_RECORDS) { format.skip_bad_records = parse_bool(file_format::FORMAT_SKIP_BAD_RECORDS, skip_bad_records)?; @@ -347,7 +364,7 @@ mod tests { use super::*; use crate::file_format::{ - FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER, + FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER, FORMAT_HEADERS, FORMAT_SCHEMA_INFER_MAX_RECORD, FORMAT_SKIP_BAD_RECORDS, FileFormat, file_to_stream, }; use crate::test_util::{format_schema, test_store}; @@ -491,12 +508,51 @@ mod tests { ..CsvFormat::default() } ); + + let map = HashMap::from([(FORMAT_HEADERS.to_string(), "true".to_string())]); + let format = CsvFormat::try_from(&map).unwrap(); + assert_eq!(format, CsvFormat::default()); + + let map = HashMap::from([(FORMAT_HEADERS.to_string(), "false".to_string())]); + let format = CsvFormat::try_from(&map).unwrap(); + assert_eq!( + format, + CsvFormat { + has_header: false, + ..CsvFormat::default() + } + ); + + let map = HashMap::from([ + (FORMAT_HEADERS.to_string(), "false".to_string()), + (FORMAT_HAS_HEADER.to_string(), "false".to_string()), + ]); + let format = CsvFormat::try_from(&map).unwrap(); + assert_eq!( + format, + CsvFormat { + has_header: false, + ..CsvFormat::default() + } + ); } #[test] fn test_try_from_rejects_invalid_bool_options() { let map = HashMap::from([(FORMAT_SKIP_BAD_RECORDS.to_string(), "yes".to_string())]); assert!(CsvFormat::try_from(&map).is_err()); + + let map = HashMap::from([(FORMAT_HEADERS.to_string(), "yes".to_string())]); + assert!(CsvFormat::try_from(&map).is_err()); + } + + #[test] + fn test_try_from_rejects_conflicting_header_options() { + let map = HashMap::from([ + (FORMAT_HEADERS.to_string(), "false".to_string()), + (FORMAT_HAS_HEADER.to_string(), "true".to_string()), + ]); + assert!(CsvFormat::try_from(&map).is_err()); } #[tokio::test] diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index cae2835242..b64c654923 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -392,24 +392,23 @@ impl StatementExecutor { .collect_metadata(&object_store, format.clone(), path.to_string()) .await?; - let file_schema = file_metadata.schema(); - let (file_schema_projection, table_schema_projection, compat_schema) = - generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema); + let schema_mapping = copy_from_schema_mapping(&file_metadata, &table_schema); let projected_file_schema = Arc::new( - file_schema - .project(&file_schema_projection) + file_metadata + .schema() + .project(&schema_mapping.file_projection) .context(error::ProjectSchemaSnafu)?, ); let projected_table_schema = Arc::new( table_schema - .project(&table_schema_projection) + .project(&schema_mapping.table_projection) .context(error::ProjectSchemaSnafu)?, ); ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?; files.push(( - Arc::new(compat_schema), - file_schema_projection, + Arc::new(schema_mapping.compat_file_schema), + schema_mapping.file_projection, projected_table_schema, file_metadata, )) @@ -581,18 +580,11 @@ fn csv_reader_schema_for_skip_bad_records(file: &SchemaRef, compat: &SchemaRef) .fields() .iter() .enumerate() - .map(|(idx, file_field)| { - let compat_field = compat - .fields() - .find(file_field.name()) - .map(|(_, field)| field); - - match compat_field { - Some(compat_field) if can_csv_reader_parse_type(compat_field.data_type()) => { - compat_field.clone() - } - _ => file.fields()[idx].clone(), + .map(|(idx, file_field)| match compat.fields().get(idx) { + Some(compat_field) if can_csv_reader_parse_type(compat_field.data_type()) => { + compat_field.clone() } + _ => file_field.clone(), }) .collect::>(); @@ -678,6 +670,62 @@ fn generated_schema_projection_and_compatible_file_schema( ) } +struct CopyFromSchemaMapping { + file_projection: Vec, + table_projection: Vec, + compat_file_schema: Schema, +} + +fn copy_from_schema_mapping( + file_metadata: &FileMetadata, + table: &SchemaRef, +) -> CopyFromSchemaMapping { + match file_metadata { + FileMetadata::Csv { schema, format, .. } if !format.has_header => { + generated_positional_schema_projection_and_compatible_file_schema(schema, table) + } + _ => { + let (file_projection, table_projection, compat_file_schema) = + generated_schema_projection_and_compatible_file_schema( + file_metadata.schema(), + table, + ); + CopyFromSchemaMapping { + file_projection, + table_projection, + compat_file_schema, + } + } + } +} + +fn generated_positional_schema_projection_and_compatible_file_schema( + file: &SchemaRef, + table: &SchemaRef, +) -> CopyFromSchemaMapping { + let len = file.fields.len().min(table.fields.len()); + let file_projection = (0..len).collect::>(); + let table_projection = (0..len).collect::>(); + let compatible_fields = file + .fields + .iter() + .enumerate() + .map(|(idx, file_field)| { + if idx < len { + table.fields[idx].clone() + } else { + file_field.clone() + } + }) + .collect::>(); + + CopyFromSchemaMapping { + file_projection, + table_projection, + compat_file_schema: Schema::new(compatible_fields), + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -938,4 +986,161 @@ mod tests { compat_schema.field(2).data_type() ); } + + fn make_csv_metadata(schema: Arc, has_header: bool) -> FileMetadata { + FileMetadata::Csv { + schema, + format: CsvFormat { + has_header, + ..CsvFormat::default() + }, + path: "test.csv".to_string(), + } + } + + fn assert_field(schema: &Schema, idx: usize, name: &str, data_type: &DataType) { + let field = schema.field(idx); + assert_eq!(field.name(), name); + assert_eq!(field.data_type(), data_type); + } + + #[test] + fn test_headerless_csv_schema_projection_is_positional() { + let file_schema = make_test_schema(&[ + Field::new("column_1", DataType::UInt8, true), + Field::new("column_2", DataType::Float64, true), + Field::new("column_3", DataType::Utf8, true), + ]); + let table_schema = make_test_schema(&[ + Field::new("host_id", DataType::UInt32, true), + Field::new("reading_value", DataType::Float64, true), + Field::new( + "ts", + DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None), + true, + ), + ]); + + let mapping = + copy_from_schema_mapping(&make_csv_metadata(file_schema, false), &table_schema); + + assert_eq!(mapping.file_projection, vec![0, 1, 2]); + assert_eq!(mapping.table_projection, vec![0, 1, 2]); + assert_field(&mapping.compat_file_schema, 0, "host_id", &DataType::UInt32); + assert_field( + &mapping.compat_file_schema, + 1, + "reading_value", + &DataType::Float64, + ); + assert_field( + &mapping.compat_file_schema, + 2, + "ts", + table_schema.field(2).data_type(), + ); + assert_eq!( + mapping + .compat_file_schema + .project(&mapping.file_projection) + .unwrap(), + table_schema.project(&mapping.table_projection).unwrap() + ); + } + + #[test] + fn test_headerless_csv_schema_projection_ignores_extra_file_columns() { + let file_schema = make_test_schema(&[ + Field::new("column_1", DataType::UInt8, true), + Field::new("column_2", DataType::Float64, true), + Field::new("column_3", DataType::Utf8, true), + Field::new("column_4", DataType::Utf8, true), + ]); + let table_schema = make_test_schema(&[ + Field::new("host_id", DataType::UInt32, true), + Field::new("reading_value", DataType::Float64, true), + Field::new("ts", DataType::Utf8, true), + ]); + + let mapping = + copy_from_schema_mapping(&make_csv_metadata(file_schema, false), &table_schema); + + assert_eq!(mapping.file_projection, vec![0, 1, 2]); + assert_eq!(mapping.table_projection, vec![0, 1, 2]); + assert_eq!(mapping.compat_file_schema.fields().len(), 4); + assert_field(&mapping.compat_file_schema, 0, "host_id", &DataType::UInt32); + assert_field( + &mapping.compat_file_schema, + 1, + "reading_value", + &DataType::Float64, + ); + assert_field(&mapping.compat_file_schema, 2, "ts", &DataType::Utf8); + assert_field(&mapping.compat_file_schema, 3, "column_4", &DataType::Utf8); + } + + #[test] + fn test_headerless_csv_schema_projection_supports_prefix_import() { + let file_schema = make_test_schema(&[ + Field::new("column_1", DataType::UInt8, true), + Field::new("column_2", DataType::Float64, true), + ]); + let table_schema = make_test_schema(&[ + Field::new("host_id", DataType::UInt32, true), + Field::new("reading_value", DataType::Float64, true), + Field::new("ts", DataType::Utf8, true), + ]); + + let mapping = + copy_from_schema_mapping(&make_csv_metadata(file_schema, false), &table_schema); + + assert_eq!(mapping.file_projection, vec![0, 1]); + assert_eq!(mapping.table_projection, vec![0, 1]); + assert_field(&mapping.compat_file_schema, 0, "host_id", &DataType::UInt32); + assert_field( + &mapping.compat_file_schema, + 1, + "reading_value", + &DataType::Float64, + ); + assert_eq!( + mapping + .compat_file_schema + .project(&mapping.file_projection) + .unwrap(), + table_schema.project(&mapping.table_projection).unwrap() + ); + } + + #[test] + fn test_csv_reader_schema_for_skip_bad_records_uses_positional_mapping() { + let file_schema = make_test_schema(&[ + Field::new("column_1", DataType::Utf8, true), + Field::new("column_2", DataType::Utf8, true), + Field::new("column_3", DataType::Utf8, true), + ]); + let table_schema = make_test_schema(&[ + Field::new("host_id", DataType::UInt32, true), + Field::new("jsons", DataType::Binary, true), + Field::new( + "ts", + DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None), + true, + ), + ]); + let mapping = copy_from_schema_mapping( + &make_csv_metadata(file_schema.clone(), false), + &table_schema, + ); + let compat_schema = Arc::new(mapping.compat_file_schema); + + let reader_schema = csv_reader_schema_for_skip_bad_records(&file_schema, &compat_schema); + + assert_eq!(reader_schema.field(0).data_type(), &DataType::UInt32); + assert_eq!(reader_schema.field(1).data_type(), &DataType::Utf8); + assert_eq!( + reader_schema.field(2).data_type(), + table_schema.field(2).data_type() + ); + } } diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs index 491912c82e..dbd207406e 100644 --- a/src/sql/src/parsers/copy_parser.rs +++ b/src/sql/src/parsers/copy_parser.rs @@ -403,8 +403,7 @@ mod tests { #[test] fn test_parse_copy_table_from_csv_options() { - let sql = - "COPY my_table FROM '/tmp/test.csv' WITH (FORMAT = 'CSV', SKIP_BAD_RECORDS = 'false')"; + let sql = "COPY my_table FROM '/tmp/test.csv' WITH (FORMAT = 'CSV', SKIP_BAD_RECORDS = 'false', HEADERS = 'false')"; let mut result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) .unwrap(); @@ -418,6 +417,7 @@ mod tests { ))) => { assert_eq!(copy_table.with.get("format"), Some("CSV")); assert_eq!(copy_table.with.get("skip_bad_records"), Some("false")); + assert_eq!(copy_table.with.get("headers"), Some("false")); } _ => unreachable!(), } diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index 67f9975ba9..59f6a626c7 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -965,6 +965,131 @@ async fn test_execute_query_external_table_csv(instance: Arc) check_output_stream(output, expect).await; } +#[apply(both_instances_cases)] +async fn test_execute_copy_from_headerless_csv(instance: Arc) { + let instance = instance.frontend(); + let csv_path = find_testing_resource("/tests/data/csv/headerless.csv"); + let skip_bad_records_csv_path = + find_testing_resource("/tests/data/csv/headerless_skip_bad_records.csv"); + let extra_columns_csv_path = + find_testing_resource("/tests/data/csv/headerless_extra_columns.csv"); + let fewer_columns_csv_path = + find_testing_resource("/tests/data/csv/headerless_fewer_columns.csv"); + + let output = execute_sql( + &instance, + "CREATE TABLE csv_headerless(host_id INT, host_name STRING, reading_value DOUBLE, ts TIMESTAMP TIME INDEX);", + ) + .await + .data; + assert!(matches!(output, OutputData::AffectedRows(0))); + + let output = execute_sql( + &instance, + &format!("COPY csv_headerless FROM '{csv_path}' WITH (FORMAT='csv', HEADERS='false');"), + ) + .await + .data; + assert!(matches!(output, OutputData::AffectedRows(2))); + + let output = execute_sql(&instance, "SELECT * FROM csv_headerless ORDER BY ts;") + .await + .data; + let expect = "\ ++---------+-----------+---------------+---------------------+ +| host_id | host_name | reading_value | ts | ++---------+-----------+---------------+---------------------+ +| 1 | Alice | 10.5 | 2024-01-01T00:00:00 | +| 2 | Bob | 30.5 | 2024-01-01T00:00:02 | ++---------+-----------+---------------+---------------------+"; + check_output_stream(output, expect).await; + + let output = execute_sql( + &instance, + "CREATE TABLE csv_headerless_skip_bad_records(host_id INT, host_name STRING, reading_value DOUBLE, ts TIMESTAMP TIME INDEX);", + ) + .await + .data; + assert!(matches!(output, OutputData::AffectedRows(0))); + + let output = execute_sql( + &instance, + &format!( + "COPY csv_headerless_skip_bad_records FROM '{skip_bad_records_csv_path}' WITH (FORMAT='csv', HEADERS='false', SKIP_BAD_RECORDS='true');" + ), + ) + .await + .data; + assert!(matches!(output, OutputData::AffectedRows(2))); + + let output = execute_sql( + &instance, + "SELECT * FROM csv_headerless_skip_bad_records ORDER BY ts;", + ) + .await + .data; + check_output_stream(output, expect).await; + + let output = execute_sql( + &instance, + "CREATE TABLE csv_headerless_extra_columns(host_id INT, host_name STRING, reading_value DOUBLE, ts TIMESTAMP TIME INDEX);", + ) + .await + .data; + assert!(matches!(output, OutputData::AffectedRows(0))); + + let output = execute_sql( + &instance, + &format!( + "COPY csv_headerless_extra_columns FROM '{extra_columns_csv_path}' WITH (FORMAT='csv', HEADERS='false');" + ), + ) + .await + .data; + assert!(matches!(output, OutputData::AffectedRows(2))); + + let output = execute_sql( + &instance, + "SELECT * FROM csv_headerless_extra_columns ORDER BY ts;", + ) + .await + .data; + check_output_stream(output, expect).await; + + let output = execute_sql( + &instance, + "CREATE TABLE csv_headerless_fewer_columns(host_id INT, host_name STRING, ts TIMESTAMP TIME INDEX, reading_value DOUBLE DEFAULT 42.0);", + ) + .await + .data; + assert!(matches!(output, OutputData::AffectedRows(0))); + + let output = execute_sql( + &instance, + &format!( + "COPY csv_headerless_fewer_columns FROM '{fewer_columns_csv_path}' WITH (FORMAT='csv', HEADERS='false');" + ), + ) + .await + .data; + assert!(matches!(output, OutputData::AffectedRows(2))); + + let output = execute_sql( + &instance, + "SELECT * FROM csv_headerless_fewer_columns ORDER BY ts;", + ) + .await + .data; + let expect = "\ ++---------+-----------+---------------------+---------------+ +| host_id | host_name | ts | reading_value | ++---------+-----------+---------------------+---------------+ +| 1 | Alice | 2024-01-01T00:00:00 | 42.0 | +| 2 | Bob | 2024-01-01T00:00:02 | 42.0 | ++---------+-----------+---------------------+---------------+"; + check_output_stream(output, expect).await; +} + #[apply(both_instances_cases)] async fn test_execute_query_external_table_json(instance: Arc) { unsafe { diff --git a/tests/data/csv/headerless.csv b/tests/data/csv/headerless.csv new file mode 100644 index 0000000000..ebba81e3f6 --- /dev/null +++ b/tests/data/csv/headerless.csv @@ -0,0 +1,2 @@ +1,Alice,10.5,2024-01-01T00:00:00 +2,Bob,30.5,2024-01-01T00:00:02 diff --git a/tests/data/csv/headerless_extra_columns.csv b/tests/data/csv/headerless_extra_columns.csv new file mode 100644 index 0000000000..5be5db83ec --- /dev/null +++ b/tests/data/csv/headerless_extra_columns.csv @@ -0,0 +1,2 @@ +1,Alice,10.5,2024-01-01T00:00:00,ignored +2,Bob,30.5,2024-01-01T00:00:02,ignored diff --git a/tests/data/csv/headerless_fewer_columns.csv b/tests/data/csv/headerless_fewer_columns.csv new file mode 100644 index 0000000000..8485b6a046 --- /dev/null +++ b/tests/data/csv/headerless_fewer_columns.csv @@ -0,0 +1,2 @@ +1,Alice,2024-01-01T00:00:00 +2,Bob,2024-01-01T00:00:02 diff --git a/tests/data/csv/headerless_skip_bad_records.csv b/tests/data/csv/headerless_skip_bad_records.csv new file mode 100644 index 0000000000..e64eb15232 --- /dev/null +++ b/tests/data/csv/headerless_skip_bad_records.csv @@ -0,0 +1,3 @@ +1,Alice,10.5,2024-01-01T00:00:00 +bad,Bad,20.0,2024-01-01T00:00:01 +2,Bob,30.5,2024-01-01T00:00:02