fix: fix type casting issue (#1652)

* fix: fix type casting issue

* chore: apply suggestion from CR
This commit is contained in:
Weny Xu
2023-05-27 01:17:56 +09:00
committed by GitHub
parent 0da94930d5
commit 4094907c09
3 changed files with 72 additions and 86 deletions

View File

@@ -0,0 +1,6 @@
hostname,environment,usage_user,usage_system,usage_idle,usage_nice,usage_iowait,usage_irq,usage_softirq,usage_steal,usage_guest,usage_guest_nice,ts
host_0,test,32,58,36,72,61,21,53,12,59,72,2023-04-01T00:00:00+00:00
host_1,staging,12,32,50,84,19,73,38,37,72,2,2023-04-01T00:00:00+00:00
host_2,test,98,5,40,95,64,39,21,63,53,94,2023-04-01T00:00:00+00:00
host_3,test,98,95,7,48,99,67,14,86,36,23,2023-04-01T00:00:00+00:00
host_4,test,32,44,11,53,64,9,17,39,20,7,2023-04-01T00:00:00+00:00
1 hostname environment usage_user usage_system usage_idle usage_nice usage_iowait usage_irq usage_softirq usage_steal usage_guest usage_guest_nice ts
2 host_0 test 32 58 36 72 61 21 53 12 59 72 2023-04-01T00:00:00+00:00
3 host_1 staging 12 32 50 84 19 73 38 37 72 2 2023-04-01T00:00:00+00:00
4 host_2 test 98 5 40 95 64 39 21 63 53 94 2023-04-01T00:00:00+00:00
5 host_3 test 98 95 7 48 99 67 14 86 36 23 2023-04-01T00:00:00+00:00
6 host_4 test 32 44 11 53 64 9 17 39 20 7 2023-04-01T00:00:00+00:00

View File

@@ -32,7 +32,8 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::datatypes::{DataType, Schema, SchemaRef};
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, Metakey, ObjectStore};
@@ -245,11 +246,7 @@ impl StatementExecutor {
.context(error::ProjectSchemaSnafu)?,
);
ensure_schema_matches_ignore_timezone(
&projected_file_schema,
&projected_table_schema,
true,
)?;
ensure_schema_compatible(&projected_file_schema, &projected_table_schema)?;
files.push((
Arc::new(compat_schema),
@@ -336,24 +333,20 @@ async fn batch_insert(
Ok(res)
}
fn ensure_schema_matches_ignore_timezone(
left: &SchemaRef,
right: &SchemaRef,
ts_cast: bool,
) -> Result<()> {
let not_match = left
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
let not_match = from
.fields
.iter()
.zip(right.fields.iter())
.zip(to.fields.iter())
.map(|(l, r)| (l.data_type(), r.data_type()))
.enumerate()
.find(|(_, (l, r))| !data_type_equals_ignore_timezone_with_options(l, r, ts_cast));
.find(|(_, (l, r))| !can_cast_types(l, r));
if let Some((index, _)) = not_match {
error::InvalidSchemaSnafu {
index,
table_schema: left.to_string(),
file_schema: right.to_string(),
table_schema: to.to_string(),
file_schema: from.to_string(),
}
.fail()
} else {
@@ -361,53 +354,6 @@ fn ensure_schema_matches_ignore_timezone(
}
}
fn data_type_equals_ignore_timezone_with_options(
l: &DataType,
r: &DataType,
ts_cast: bool,
) -> bool {
match (l, r) {
(DataType::List(a), DataType::List(b))
| (DataType::LargeList(a), DataType::LargeList(b)) => {
a.is_nullable() == b.is_nullable()
&& data_type_equals_ignore_timezone_with_options(
a.data_type(),
b.data_type(),
ts_cast,
)
}
(DataType::FixedSizeList(a, a_size), DataType::FixedSizeList(b, b_size)) => {
a_size == b_size
&& a.is_nullable() == b.is_nullable()
&& data_type_equals_ignore_timezone_with_options(
a.data_type(),
b.data_type(),
ts_cast,
)
}
(DataType::Struct(a), DataType::Struct(b)) => {
a.len() == b.len()
&& a.iter().zip(b).all(|(a, b)| {
a.is_nullable() == b.is_nullable()
&& data_type_equals_ignore_timezone_with_options(
a.data_type(),
b.data_type(),
ts_cast,
)
})
}
(DataType::Map(a_field, a_is_sorted), DataType::Map(b_field, b_is_sorted)) => {
a_field == b_field && a_is_sorted == b_is_sorted
}
(DataType::Timestamp(l_unit, _), DataType::Timestamp(r_unit, _)) => {
l_unit == r_unit || ts_cast
}
(&DataType::Utf8, DataType::Timestamp(_, _))
| (DataType::Timestamp(_, _), &DataType::Utf8) => ts_cast,
_ => l == r,
}
}
/// Allows the file schema to be a subset of the table schema.
fn generated_schema_projection_and_compatible_file_schema(
file: &SchemaRef,
@@ -437,24 +383,23 @@ fn generated_schema_projection_and_compatible_file_schema(
mod tests {
use std::sync::Arc;
use datatypes::arrow::datatypes::{Field, Schema};
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use super::*;
fn test_schema_matches(l: (DataType, bool), r: (DataType, bool), matches: bool) {
test_schema_matches_with_options(l, r, false, matches)
}
fn test_schema_matches_with_options(
l: (DataType, bool),
r: (DataType, bool),
ts_cast: bool,
matches: bool,
) {
let s1 = Arc::new(Schema::new(vec![Field::new("col", l.0, l.1)]));
let s2 = Arc::new(Schema::new(vec![Field::new("col", r.0, r.1)]));
let res = ensure_schema_matches_ignore_timezone(&s1, &s2, ts_cast);
assert_eq!(matches, res.is_ok())
fn test_schema_matches(from: (DataType, bool), to: (DataType, bool), matches: bool) {
let s1 = Arc::new(Schema::new(vec![Field::new("col", from.0.clone(), from.1)]));
let s2 = Arc::new(Schema::new(vec![Field::new("col", to.0.clone(), to.1)]));
let res = ensure_schema_compatible(&s1, &s2);
assert_eq!(
matches,
res.is_ok(),
"from data type: {}, to data type: {}, expected: {}, but got: {}",
from.0,
to.0,
matches,
res.is_ok()
)
}
#[test]
@@ -519,17 +464,17 @@ mod tests {
),
true,
),
false,
true,
);
test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);
test_schema_matches((DataType::Int8, true), (DataType::Int16, true), false);
test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
}
#[test]
fn test_data_type_equals_ignore_timezone_with_options() {
test_schema_matches_with_options(
test_schema_matches(
(
DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Microsecond,
@@ -545,10 +490,9 @@ mod tests {
true,
),
true,
true,
);
test_schema_matches_with_options(
test_schema_matches(
(DataType::Utf8, true),
(
DataType::Timestamp(
@@ -558,10 +502,9 @@ mod tests {
true,
),
true,
true,
);
test_schema_matches_with_options(
test_schema_matches(
(
DataType::Timestamp(
datatypes::arrow::datatypes::TimeUnit::Millisecond,
@@ -571,7 +514,6 @@ mod tests {
),
(DataType::Utf8, true),
true,
true,
);
}

View File

@@ -1233,6 +1233,44 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
}
}
/// Test (named after issue 1594) for `COPY ... FROM` a CSV file into a table
/// whose ten `usage_*` columns are declared as DOUBLE.
///
/// The test creates the table, copies the `type_cast.csv` fixture into it,
/// asserts that five rows were affected, and then checks that the selected
/// rows print the `usage_*` values as floats (e.g. `32.0`).
///
/// NOTE(review): the fixture presumably stores those columns as integers, which
/// is what makes this a type-casting test — confirm against `type_cast.csv`.
/// NOTE(review): `#[apply(both_instances_cases)]` presumably runs this body once
/// per instance kind in that template — confirm against the template definition.
#[apply(both_instances_cases)]
async fn test_cast_type_issue_1594(instance: Arc<dyn MockInstance>) {
// All SQL below is issued through the frontend of the mock instance.
let instance = instance.frontend();
// Setup: create the target table with DOUBLE metric columns and `ts` as the
// time index; the result of this statement is not inspected.
execute_sql(
&instance,
"create table tsbs_cpu(hostname STRING, environment STRING, usage_user DOUBLE, usage_system DOUBLE, usage_idle DOUBLE, usage_nice DOUBLE, usage_iowait DOUBLE, usage_irq DOUBLE, usage_softirq DOUBLE, usage_steal DOUBLE, usage_guest DOUBLE, usage_guest_nice DOUBLE, ts TIMESTAMP TIME INDEX, PRIMARY KEY(hostname));",
)
.await;
// Canonicalize the fixture path before embedding it in the COPY statement.
// `unwrap` panics (failing the test) if canonicalization fails.
let filepath = get_data_dir("../src/common/datasource/tests/csv/type_cast.csv")
.canonicalize()
.unwrap()
.display()
.to_string();
// Load the CSV file into the table.
let output = execute_sql(
&instance,
&format!("copy tsbs_cpu from '{}' WITH(FORMAT='csv');", &filepath),
)
.await;
// The copy must report exactly five affected rows.
assert!(matches!(output, Output::AffectedRows(5)));
// Read the rows back ordered by hostname so the comparison below is
// deterministic.
let output = execute_sql(&instance, "select * from tsbs_cpu order by hostname;").await;
// Expected pretty-printed result: every `usage_*` value appears as a float.
let expected = "\
+----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+
| hostname | environment | usage_user | usage_system | usage_idle | usage_nice | usage_iowait | usage_irq | usage_softirq | usage_steal | usage_guest | usage_guest_nice | ts |
+----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+
| host_0 | test | 32.0 | 58.0 | 36.0 | 72.0 | 61.0 | 21.0 | 53.0 | 12.0 | 59.0 | 72.0 | 2023-04-01T00:00:00 |
| host_1 | staging | 12.0 | 32.0 | 50.0 | 84.0 | 19.0 | 73.0 | 38.0 | 37.0 | 72.0 | 2.0 | 2023-04-01T00:00:00 |
| host_2 | test | 98.0 | 5.0 | 40.0 | 95.0 | 64.0 | 39.0 | 21.0 | 63.0 | 53.0 | 94.0 | 2023-04-01T00:00:00 |
| host_3 | test | 98.0 | 95.0 | 7.0 | 48.0 | 99.0 | 67.0 | 14.0 | 86.0 | 36.0 | 23.0 | 2023-04-01T00:00:00 |
| host_4 | test | 32.0 | 44.0 | 11.0 | 53.0 | 64.0 | 9.0 | 17.0 | 39.0 | 20.0 | 7.0 | 2023-04-01T00:00:00 |
+----------+-------------+------------+--------------+------------+------------+--------------+-----------+---------------+-------------+-------------+------------------+---------------------+";
check_output_stream(output, expected).await;
}
#[apply(both_instances_cases)]
async fn test_information_schema_dot_tables(instance: Arc<dyn MockInstance>) {
let is_distributed_mode = instance.is_distributed_mode();