diff --git a/src/common/datasource/tests/csv/type_cast.csv b/src/common/datasource/tests/csv/type_cast.csv new file mode 100644 index 0000000000..7bdf73c766 --- /dev/null +++ b/src/common/datasource/tests/csv/type_cast.csv @@ -0,0 +1,6 @@ +hostname,environment,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice,ts +host_0,test,32,58,36,72,61,21,53,12,59,72,2023-04-01T00:00:00+00:00 +host_1,staging,12,32,50,84,19,73,38,37,72,2,2023-04-01T00:00:00+00:00 +host_2,test,98,5,40,95,64,39,21,63,53,94,2023-04-01T00:00:00+00:00 +host_3,test,98,95,7,48,99,67,14,86,36,23,2023-04-01T00:00:00+00:00 +host_4,test,32,44,11,53,64,9,17,39,20,7,2023-04-01T00:00:00+00:00 diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index 9ad4b7b06e..8f590b5109 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -32,7 +32,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder; use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream}; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; -use datatypes::arrow::datatypes::{DataType, Schema, SchemaRef}; +use datatypes::arrow::compute::can_cast_types; +use datatypes::arrow::datatypes::{Schema, SchemaRef}; use datatypes::vectors::Helper; use futures_util::StreamExt; use object_store::{Entry, EntryMode, Metakey, ObjectStore}; @@ -245,11 +246,7 @@ impl StatementExecutor { .context(error::ProjectSchemaSnafu)?, ); - ensure_schema_matches_ignore_timezone( - &projected_file_schema, - &projected_table_schema, - true, - )?; + ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?; files.push(( Arc::new(compat_schema), @@ -336,24 +333,20 @@ async fn batch_insert( Ok(res) } -fn ensure_schema_matches_ignore_timezone( - left: &SchemaRef, - right: &SchemaRef, - ts_cast: bool, -) -> Result<()> { - let not_match = left +fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> { + let not_match = from .fields .iter() - .zip(right.fields.iter()) + .zip(to.fields.iter()) .map(|(l, r)| (l.data_type(), r.data_type())) .enumerate() - .find(|(_, (l, r))| !data_type_equals_ignore_timezone_with_options(l, r, ts_cast)); + .find(|(_, (l, r))| !can_cast_types(l, r)); if let Some((index, _)) = not_match { error::InvalidSchemaSnafu { index, - table_schema: left.to_string(), - file_schema: right.to_string(), + table_schema: to.to_string(), + file_schema: from.to_string(), } .fail() } else { @@ -361,53 +354,6 @@ fn ensure_schema_matches_ignore_timezone( } } -fn data_type_equals_ignore_timezone_with_options( - l: &DataType, - r: &DataType, - ts_cast: bool, -) -> bool { - match (l, r) { - (DataType::List(a), DataType::List(b)) - | (DataType::LargeList(a), DataType::LargeList(b)) => { - a.is_nullable() == b.is_nullable() - && data_type_equals_ignore_timezone_with_options( - a.data_type(), - b.data_type(), - ts_cast, - ) - } - (DataType::FixedSizeList(a, a_size), DataType::FixedSizeList(b, b_size)) => { - a_size == b_size - && a.is_nullable() == b.is_nullable() - && data_type_equals_ignore_timezone_with_options( - a.data_type(), - b.data_type(), - ts_cast, - ) - } - (DataType::Struct(a), DataType::Struct(b)) => { - a.len() == b.len() - && a.iter().zip(b).all(|(a, b)| { - a.is_nullable() == b.is_nullable() - && data_type_equals_ignore_timezone_with_options( - a.data_type(), - b.data_type(), - ts_cast, - ) - }) - } - (DataType::Map(a_field, a_is_sorted), DataType::Map(b_field, b_is_sorted)) => { - a_field == b_field && a_is_sorted == b_is_sorted - } - (DataType::Timestamp(l_unit, _), DataType::Timestamp(r_unit, _)) => { - l_unit == r_unit || ts_cast - } - (&DataType::Utf8, DataType::Timestamp(_, _)) - | (DataType::Timestamp(_, _), &DataType::Utf8) => ts_cast, - _ => l == r, - } -} - /// Allows the file schema is a subset of table fn generated_schema_projection_and_compatible_file_schema( file: &SchemaRef, @@ -437,24 +383,23 @@ fn generated_schema_projection_and_compatible_file_schema( mod tests { use std::sync::Arc; - use datatypes::arrow::datatypes::{Field, Schema}; + use datatypes::arrow::datatypes::{DataType, Field, Schema}; use super::*; - fn test_schema_matches(l: (DataType, bool), r: (DataType, bool), matches: bool) { - test_schema_matches_with_options(l, r, false, matches) - } - - fn test_schema_matches_with_options( - l: (DataType, bool), - r: (DataType, bool), - ts_cast: bool, - matches: bool, - ) { - let s1 = Arc::new(Schema::new(vec![Field::new("col", l.0, l.1)])); - let s2 = Arc::new(Schema::new(vec![Field::new("col", r.0, r.1)])); - let res = ensure_schema_matches_ignore_timezone(&s1, &s2, ts_cast); - assert_eq!(matches, res.is_ok()) + fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) { + let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)])); + let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)])); + let res = ensure_schema_compatible(&s1, &s2); + assert_eq!( + matches, + res.is_ok(), + "from data type: {}, to data type: {}, expected: {}, but got: {}", + from.0, + to.0, + matches, + res.is_ok() + ) } #[test] @@ -519,17 +464,17 @@ mod tests { ), true, ), - false, + true, ); test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true); - test_schema_matches((DataType::Int8, true), (DataType::Int16, true), false); + test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true); } #[test] fn test_data_type_equals_ignore_timezone_with_options() { - test_schema_matches_with_options( + test_schema_matches( ( DataType::Timestamp( datatypes::arrow::datatypes::TimeUnit::Microsecond, @@ -545,10 +490,9 @@ mod tests { true, ), true, - true, ); - test_schema_matches_with_options( + test_schema_matches( (DataType::Utf8, true), ( DataType::Timestamp( @@ -558,10 +502,9 @@ mod tests { true, ), true, - true, ); - test_schema_matches_with_options( + test_schema_matches( ( DataType::Timestamp( datatypes::arrow::datatypes::TimeUnit::Millisecond, @@ -571,7 +514,6 @@ mod tests { ), (DataType::Utf8, true), true, - true, ); } diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index b208a3d4fb..e514c6fa53 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -1233,6 +1233,44 @@ async fn test_execute_copy_from_s3(instance: Arc) { } } +#[apply(both_instances_cases)] +async fn test_cast_type_issue_1594(instance: Arc) { + let instance = instance.frontend(); + + // setups + execute_sql( + &instance, + "create table tsbs_cpu(hostname STRING, environment STRING, usage_user DOUBLE, usage_system DOUBLE, usage_idle DOUBLE, usage_nice DOUBLE, usage_iowait DOUBLE, usage_irq DOUBLE, usage_softirq DOUBLE, usage_steal DOUBLE, usage_guest DOUBLE, usage_guest_nice DOUBLE, ts TIMESTAMP TIME INDEX, PRIMARY KEY(hostname));", + ) + .await; + let filepath = get_data_dir("../src/common/datasource/tests/csv/type_cast.csv") + .canonicalize() + .unwrap() + .display() + .to_string(); + + let output = execute_sql( + &instance, + &format!("copy tsbs_cpu from '{}' WITH(FORMAT='csv');", &filepath), + ) + .await; + + assert!(matches!(output, Output::AffectedRows(5))); + + let output = execute_sql(&instance, "select * from tsbs_cpu order by hostname;").await; + let expected = "\ ++----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+ +| hostname | environment | usage_user | usage_system | usage_idle | usage_nice | usage_iowait | usage_irq | usage_softirq | usage_steal | usage_guest | usage_guest_nice | ts | ++----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+ +| host_0 | test | 32.0 | 58.0 | 36.0 | 72.0 | 61.0 | 21.0 | 53.0 | 12.0 | 59.0 | 72.0 | 2023-04-01T00:00:00 | +| host_1 | staging | 12.0 | 32.0 | 50.0 | 84.0 | 19.0 | 73.0 | 38.0 | 37.0 | 72.0 | 2.0 | 2023-04-01T00:00:00 | +| host_2 | test | 98.0 | 5.0 | 40.0 | 95.0 | 64.0 | 39.0 | 21.0 | 63.0 | 53.0 | 94.0 | 2023-04-01T00:00:00 | +| host_3 | test | 98.0 | 95.0 | 7.0 | 48.0 | 99.0 | 67.0 | 14.0 | 86.0 | 36.0 | 23.0 | 2023-04-01T00:00:00 | +| host_4 | test | 32.0 | 44.0 | 11.0 | 53.0 | 64.0 | 9.0 | 17.0 | 39.0 | 20.0 | 7.0 | 2023-04-01T00:00:00 | ++----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+"; + check_output_stream(output, expected).await; +} + #[apply(both_instances_cases)] async fn test_information_schema_dot_tables(instance: Arc) { let is_distributed_mode = instance.is_distributed_mode();