feat: support headerless CSV copy from (#8233)

* feat: support headerless CSV copy from

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* fix: update csv copy sqlness result

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* test: cover headerless CSV copy from

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

* test: cover headerless CSV column count mismatch

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>

---------

Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
This commit is contained in:
QuakeWang
2026-06-04 20:04:15 +08:00
committed by GitHub
parent 52b51c31fc
commit 6ed67167bb
9 changed files with 421 additions and 25 deletions

View File

@@ -60,6 +60,7 @@ use crate::share_buffer::SharedBuffer;
pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
pub const FORMAT_HEADERS: &str = "headers";
pub const FORMAT_HAS_HEADER: &str = "has_header";
pub const FORMAT_SKIP_BAD_RECORDS: &str = "skip_bad_records";
pub const FORMAT_TYPE: &str = "format";

View File

@@ -87,9 +87,26 @@ impl TryFrom<&HashMap<String, String>> for CsvFormat {
.build()
})?);
};
if let Some(has_header) = value.get(file_format::FORMAT_HAS_HEADER) {
format.has_header = parse_bool(file_format::FORMAT_HAS_HEADER, has_header)?;
};
let headers = value
.get(file_format::FORMAT_HEADERS)
.map(|headers| parse_bool(file_format::FORMAT_HEADERS, headers))
.transpose()?;
let has_header = value
.get(file_format::FORMAT_HAS_HEADER)
.map(|has_header| parse_bool(file_format::FORMAT_HAS_HEADER, has_header))
.transpose()?;
match (headers, has_header) {
(Some(headers), Some(has_header)) if headers != has_header => {
return error::ParseFormatSnafu {
key: file_format::FORMAT_HEADERS,
value: format!("headers={headers}, has_header={has_header}"),
}
.fail();
}
(Some(headers), _) => format.has_header = headers,
(_, Some(has_header)) => format.has_header = has_header,
_ => {}
}
if let Some(skip_bad_records) = value.get(file_format::FORMAT_SKIP_BAD_RECORDS) {
format.skip_bad_records =
parse_bool(file_format::FORMAT_SKIP_BAD_RECORDS, skip_bad_records)?;
@@ -347,7 +364,7 @@ mod tests {
use super::*;
use crate::file_format::{
FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER,
FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER, FORMAT_HEADERS,
FORMAT_SCHEMA_INFER_MAX_RECORD, FORMAT_SKIP_BAD_RECORDS, FileFormat, file_to_stream,
};
use crate::test_util::{format_schema, test_store};
@@ -491,12 +508,51 @@ mod tests {
..CsvFormat::default()
}
);
let map = HashMap::from([(FORMAT_HEADERS.to_string(), "true".to_string())]);
let format = CsvFormat::try_from(&map).unwrap();
assert_eq!(format, CsvFormat::default());
let map = HashMap::from([(FORMAT_HEADERS.to_string(), "false".to_string())]);
let format = CsvFormat::try_from(&map).unwrap();
assert_eq!(
format,
CsvFormat {
has_header: false,
..CsvFormat::default()
}
);
let map = HashMap::from([
(FORMAT_HEADERS.to_string(), "false".to_string()),
(FORMAT_HAS_HEADER.to_string(), "false".to_string()),
]);
let format = CsvFormat::try_from(&map).unwrap();
assert_eq!(
format,
CsvFormat {
has_header: false,
..CsvFormat::default()
}
);
}
#[test]
fn test_try_from_rejects_invalid_bool_options() {
let map = HashMap::from([(FORMAT_SKIP_BAD_RECORDS.to_string(), "yes".to_string())]);
assert!(CsvFormat::try_from(&map).is_err());
let map = HashMap::from([(FORMAT_HEADERS.to_string(), "yes".to_string())]);
assert!(CsvFormat::try_from(&map).is_err());
}
#[test]
fn test_try_from_rejects_conflicting_header_options() {
let map = HashMap::from([
(FORMAT_HEADERS.to_string(), "false".to_string()),
(FORMAT_HAS_HEADER.to_string(), "true".to_string()),
]);
assert!(CsvFormat::try_from(&map).is_err());
}
#[tokio::test]

View File

@@ -392,24 +392,23 @@ impl StatementExecutor {
.collect_metadata(&object_store, format.clone(), path.to_string())
.await?;
let file_schema = file_metadata.schema();
let (file_schema_projection, table_schema_projection, compat_schema) =
generated_schema_projection_and_compatible_file_schema(file_schema, &table_schema);
let schema_mapping = copy_from_schema_mapping(&file_metadata, &table_schema);
let projected_file_schema = Arc::new(
file_schema
.project(&file_schema_projection)
file_metadata
.schema()
.project(&schema_mapping.file_projection)
.context(error::ProjectSchemaSnafu)?,
);
let projected_table_schema = Arc::new(
table_schema
.project(&table_schema_projection)
.project(&schema_mapping.table_projection)
.context(error::ProjectSchemaSnafu)?,
);
ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;
files.push((
Arc::new(compat_schema),
file_schema_projection,
Arc::new(schema_mapping.compat_file_schema),
schema_mapping.file_projection,
projected_table_schema,
file_metadata,
))
@@ -581,18 +580,11 @@ fn csv_reader_schema_for_skip_bad_records(file: &SchemaRef, compat: &SchemaRef)
.fields()
.iter()
.enumerate()
.map(|(idx, file_field)| {
let compat_field = compat
.fields()
.find(file_field.name())
.map(|(_, field)| field);
match compat_field {
Some(compat_field) if can_csv_reader_parse_type(compat_field.data_type()) => {
compat_field.clone()
}
_ => file.fields()[idx].clone(),
.map(|(idx, file_field)| match compat.fields().get(idx) {
Some(compat_field) if can_csv_reader_parse_type(compat_field.data_type()) => {
compat_field.clone()
}
_ => file_field.clone(),
})
.collect::<Vec<_>>();
@@ -678,6 +670,62 @@ fn generated_schema_projection_and_compatible_file_schema(
)
}
struct CopyFromSchemaMapping {
file_projection: Vec<usize>,
table_projection: Vec<usize>,
compat_file_schema: Schema,
}
fn copy_from_schema_mapping(
file_metadata: &FileMetadata,
table: &SchemaRef,
) -> CopyFromSchemaMapping {
match file_metadata {
FileMetadata::Csv { schema, format, .. } if !format.has_header => {
generated_positional_schema_projection_and_compatible_file_schema(schema, table)
}
_ => {
let (file_projection, table_projection, compat_file_schema) =
generated_schema_projection_and_compatible_file_schema(
file_metadata.schema(),
table,
);
CopyFromSchemaMapping {
file_projection,
table_projection,
compat_file_schema,
}
}
}
}
fn generated_positional_schema_projection_and_compatible_file_schema(
file: &SchemaRef,
table: &SchemaRef,
) -> CopyFromSchemaMapping {
let len = file.fields.len().min(table.fields.len());
let file_projection = (0..len).collect::<Vec<_>>();
let table_projection = (0..len).collect::<Vec<_>>();
let compatible_fields = file
.fields
.iter()
.enumerate()
.map(|(idx, file_field)| {
if idx < len {
table.fields[idx].clone()
} else {
file_field.clone()
}
})
.collect::<Vec<_>>();
CopyFromSchemaMapping {
file_projection,
table_projection,
compat_file_schema: Schema::new(compatible_fields),
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -938,4 +986,161 @@ mod tests {
compat_schema.field(2).data_type()
);
}
fn make_csv_metadata(schema: Arc<Schema>, has_header: bool) -> FileMetadata {
FileMetadata::Csv {
schema,
format: CsvFormat {
has_header,
..CsvFormat::default()
},
path: "test.csv".to_string(),
}
}
fn assert_field(schema: &Schema, idx: usize, name: &str, data_type: &DataType) {
let field = schema.field(idx);
assert_eq!(field.name(), name);
assert_eq!(field.data_type(), data_type);
}
#[test]
fn test_headerless_csv_schema_projection_is_positional() {
let file_schema = make_test_schema(&[
Field::new("column_1", DataType::UInt8, true),
Field::new("column_2", DataType::Float64, true),
Field::new("column_3", DataType::Utf8, true),
]);
let table_schema = make_test_schema(&[
Field::new("host_id", DataType::UInt32, true),
Field::new("reading_value", DataType::Float64, true),
Field::new(
"ts",
DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
true,
),
]);
let mapping =
copy_from_schema_mapping(&make_csv_metadata(file_schema, false), &table_schema);
assert_eq!(mapping.file_projection, vec![0, 1, 2]);
assert_eq!(mapping.table_projection, vec![0, 1, 2]);
assert_field(&mapping.compat_file_schema, 0, "host_id", &DataType::UInt32);
assert_field(
&mapping.compat_file_schema,
1,
"reading_value",
&DataType::Float64,
);
assert_field(
&mapping.compat_file_schema,
2,
"ts",
table_schema.field(2).data_type(),
);
assert_eq!(
mapping
.compat_file_schema
.project(&mapping.file_projection)
.unwrap(),
table_schema.project(&mapping.table_projection).unwrap()
);
}
#[test]
fn test_headerless_csv_schema_projection_ignores_extra_file_columns() {
let file_schema = make_test_schema(&[
Field::new("column_1", DataType::UInt8, true),
Field::new("column_2", DataType::Float64, true),
Field::new("column_3", DataType::Utf8, true),
Field::new("column_4", DataType::Utf8, true),
]);
let table_schema = make_test_schema(&[
Field::new("host_id", DataType::UInt32, true),
Field::new("reading_value", DataType::Float64, true),
Field::new("ts", DataType::Utf8, true),
]);
let mapping =
copy_from_schema_mapping(&make_csv_metadata(file_schema, false), &table_schema);
assert_eq!(mapping.file_projection, vec![0, 1, 2]);
assert_eq!(mapping.table_projection, vec![0, 1, 2]);
assert_eq!(mapping.compat_file_schema.fields().len(), 4);
assert_field(&mapping.compat_file_schema, 0, "host_id", &DataType::UInt32);
assert_field(
&mapping.compat_file_schema,
1,
"reading_value",
&DataType::Float64,
);
assert_field(&mapping.compat_file_schema, 2, "ts", &DataType::Utf8);
assert_field(&mapping.compat_file_schema, 3, "column_4", &DataType::Utf8);
}
#[test]
fn test_headerless_csv_schema_projection_supports_prefix_import() {
let file_schema = make_test_schema(&[
Field::new("column_1", DataType::UInt8, true),
Field::new("column_2", DataType::Float64, true),
]);
let table_schema = make_test_schema(&[
Field::new("host_id", DataType::UInt32, true),
Field::new("reading_value", DataType::Float64, true),
Field::new("ts", DataType::Utf8, true),
]);
let mapping =
copy_from_schema_mapping(&make_csv_metadata(file_schema, false), &table_schema);
assert_eq!(mapping.file_projection, vec![0, 1]);
assert_eq!(mapping.table_projection, vec![0, 1]);
assert_field(&mapping.compat_file_schema, 0, "host_id", &DataType::UInt32);
assert_field(
&mapping.compat_file_schema,
1,
"reading_value",
&DataType::Float64,
);
assert_eq!(
mapping
.compat_file_schema
.project(&mapping.file_projection)
.unwrap(),
table_schema.project(&mapping.table_projection).unwrap()
);
}
#[test]
fn test_csv_reader_schema_for_skip_bad_records_uses_positional_mapping() {
let file_schema = make_test_schema(&[
Field::new("column_1", DataType::Utf8, true),
Field::new("column_2", DataType::Utf8, true),
Field::new("column_3", DataType::Utf8, true),
]);
let table_schema = make_test_schema(&[
Field::new("host_id", DataType::UInt32, true),
Field::new("jsons", DataType::Binary, true),
Field::new(
"ts",
DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
true,
),
]);
let mapping = copy_from_schema_mapping(
&make_csv_metadata(file_schema.clone(), false),
&table_schema,
);
let compat_schema = Arc::new(mapping.compat_file_schema);
let reader_schema = csv_reader_schema_for_skip_bad_records(&file_schema, &compat_schema);
assert_eq!(reader_schema.field(0).data_type(), &DataType::UInt32);
assert_eq!(reader_schema.field(1).data_type(), &DataType::Utf8);
assert_eq!(
reader_schema.field(2).data_type(),
table_schema.field(2).data_type()
);
}
}

View File

@@ -403,8 +403,7 @@ mod tests {
#[test]
fn test_parse_copy_table_from_csv_options() {
let sql =
"COPY my_table FROM '/tmp/test.csv' WITH (FORMAT = 'CSV', SKIP_BAD_RECORDS = 'false')";
let sql = "COPY my_table FROM '/tmp/test.csv' WITH (FORMAT = 'CSV', SKIP_BAD_RECORDS = 'false', HEADERS = 'false')";
let mut result =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap();
@@ -418,6 +417,7 @@ mod tests {
))) => {
assert_eq!(copy_table.with.get("format"), Some("CSV"));
assert_eq!(copy_table.with.get("skip_bad_records"), Some("false"));
assert_eq!(copy_table.with.get("headers"), Some("false"));
}
_ => unreachable!(),
}

View File

@@ -965,6 +965,131 @@ async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>)
check_output_stream(output, expect).await;
}
#[apply(both_instances_cases)]
async fn test_execute_copy_from_headerless_csv(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let csv_path = find_testing_resource("/tests/data/csv/headerless.csv");
let skip_bad_records_csv_path =
find_testing_resource("/tests/data/csv/headerless_skip_bad_records.csv");
let extra_columns_csv_path =
find_testing_resource("/tests/data/csv/headerless_extra_columns.csv");
let fewer_columns_csv_path =
find_testing_resource("/tests/data/csv/headerless_fewer_columns.csv");
let output = execute_sql(
&instance,
"CREATE TABLE csv_headerless(host_id INT, host_name STRING, reading_value DOUBLE, ts TIMESTAMP TIME INDEX);",
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
let output = execute_sql(
&instance,
&format!("COPY csv_headerless FROM '{csv_path}' WITH (FORMAT='csv', HEADERS='false');"),
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let output = execute_sql(&instance, "SELECT * FROM csv_headerless ORDER BY ts;")
.await
.data;
let expect = "\
+---------+-----------+---------------+---------------------+
| host_id | host_name | reading_value | ts |
+---------+-----------+---------------+---------------------+
| 1 | Alice | 10.5 | 2024-01-01T00:00:00 |
| 2 | Bob | 30.5 | 2024-01-01T00:00:02 |
+---------+-----------+---------------+---------------------+";
check_output_stream(output, expect).await;
let output = execute_sql(
&instance,
"CREATE TABLE csv_headerless_skip_bad_records(host_id INT, host_name STRING, reading_value DOUBLE, ts TIMESTAMP TIME INDEX);",
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
let output = execute_sql(
&instance,
&format!(
"COPY csv_headerless_skip_bad_records FROM '{skip_bad_records_csv_path}' WITH (FORMAT='csv', HEADERS='false', SKIP_BAD_RECORDS='true');"
),
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let output = execute_sql(
&instance,
"SELECT * FROM csv_headerless_skip_bad_records ORDER BY ts;",
)
.await
.data;
check_output_stream(output, expect).await;
let output = execute_sql(
&instance,
"CREATE TABLE csv_headerless_extra_columns(host_id INT, host_name STRING, reading_value DOUBLE, ts TIMESTAMP TIME INDEX);",
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
let output = execute_sql(
&instance,
&format!(
"COPY csv_headerless_extra_columns FROM '{extra_columns_csv_path}' WITH (FORMAT='csv', HEADERS='false');"
),
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let output = execute_sql(
&instance,
"SELECT * FROM csv_headerless_extra_columns ORDER BY ts;",
)
.await
.data;
check_output_stream(output, expect).await;
let output = execute_sql(
&instance,
"CREATE TABLE csv_headerless_fewer_columns(host_id INT, host_name STRING, ts TIMESTAMP TIME INDEX, reading_value DOUBLE DEFAULT 42.0);",
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
let output = execute_sql(
&instance,
&format!(
"COPY csv_headerless_fewer_columns FROM '{fewer_columns_csv_path}' WITH (FORMAT='csv', HEADERS='false');"
),
)
.await
.data;
assert!(matches!(output, OutputData::AffectedRows(2)));
let output = execute_sql(
&instance,
"SELECT * FROM csv_headerless_fewer_columns ORDER BY ts;",
)
.await
.data;
let expect = "\
+---------+-----------+---------------------+---------------+
| host_id | host_name | ts | reading_value |
+---------+-----------+---------------------+---------------+
| 1 | Alice | 2024-01-01T00:00:00 | 42.0 |
| 2 | Bob | 2024-01-01T00:00:02 | 42.0 |
+---------+-----------+---------------------+---------------+";
check_output_stream(output, expect).await;
}
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_json(instance: Arc<dyn MockInstance>) {
unsafe {

View File

@@ -0,0 +1,2 @@
1,Alice,10.5,2024-01-01T00:00:00
2,Bob,30.5,2024-01-01T00:00:02
1 1 Alice 10.5 2024-01-01T00:00:00
2 2 Bob 30.5 2024-01-01T00:00:02

View File

@@ -0,0 +1,2 @@
1,Alice,10.5,2024-01-01T00:00:00,ignored
2,Bob,30.5,2024-01-01T00:00:02,ignored
1 1 Alice 10.5 2024-01-01T00:00:00 ignored
2 2 Bob 30.5 2024-01-01T00:00:02 ignored

View File

@@ -0,0 +1,2 @@
1,Alice,2024-01-01T00:00:00
2,Bob,2024-01-01T00:00:02
1 1 Alice 2024-01-01T00:00:00
2 2 Bob 2024-01-01T00:00:02

View File

@@ -0,0 +1,3 @@
1,Alice,10.5,2024-01-01T00:00:00
bad,Bad,20.0,2024-01-01T00:00:01
2,Bob,30.5,2024-01-01T00:00:02
1 1 Alice 10.5 2024-01-01T00:00:00
2 bad Bad 20.0 2024-01-01T00:00:01
3 2 Bob 30.5 2024-01-01T00:00:02