mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 22:02:56 +00:00
feat: ignore time zone info when importing from external files (#1341)
* feat: ignore timezone info when copying from external files * chore: rebase onto develop
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2441,6 +2441,7 @@ name = "datatypes"
|
||||
version = "0.1.1"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
"common-base",
|
||||
"common-error",
|
||||
|
||||
@@ -423,11 +423,13 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"File Schema mismatch, expected table schema: {} but found :{}",
|
||||
"File schema mismatch at index {}, expected table schema: {} but found: {}",
|
||||
index,
|
||||
table_schema,
|
||||
file_schema
|
||||
))]
|
||||
InvalidSchema {
|
||||
index: usize,
|
||||
table_schema: String,
|
||||
file_schema: String,
|
||||
},
|
||||
|
||||
@@ -23,10 +23,11 @@ use common_datasource::util::find_dir_and_filename;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::error::DataTypesSnafu;
|
||||
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
|
||||
use datatypes::arrow::datatypes::{DataType, SchemaRef};
|
||||
use datatypes::vectors::Helper;
|
||||
use futures_util::StreamExt;
|
||||
use regex::Regex;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use snafu::ResultExt;
|
||||
use table::engine::TableReference;
|
||||
use table::requests::{CopyTableRequest, InsertRequest};
|
||||
use tokio::io::BufReader;
|
||||
@@ -88,13 +89,7 @@ impl SqlHandler {
|
||||
.await
|
||||
.context(error::ReadParquetSnafu)?;
|
||||
|
||||
ensure!(
|
||||
builder.schema() == table.schema().arrow_schema(),
|
||||
error::InvalidSchemaSnafu {
|
||||
table_schema: table.schema().arrow_schema().to_string(),
|
||||
file_schema: (*(builder.schema())).to_string()
|
||||
}
|
||||
);
|
||||
ensure_schema_matches_ignore_timezone(builder.schema(), table.schema().arrow_schema())?;
|
||||
|
||||
let mut stream = builder
|
||||
.build()
|
||||
@@ -159,3 +154,137 @@ async fn batch_insert(
|
||||
*pending_bytes = 0;
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn ensure_schema_matches_ignore_timezone(left: &SchemaRef, right: &SchemaRef) -> Result<()> {
|
||||
let not_match = left
|
||||
.fields
|
||||
.iter()
|
||||
.zip(right.fields.iter())
|
||||
.map(|(l, r)| (l.data_type(), r.data_type()))
|
||||
.enumerate()
|
||||
.find(|(_, (l, r))| !data_type_equals_ignore_timezone(l, r));
|
||||
|
||||
if let Some((index, _)) = not_match {
|
||||
error::InvalidSchemaSnafu {
|
||||
index,
|
||||
table_schema: left.to_string(),
|
||||
file_schema: right.to_string(),
|
||||
}
|
||||
.fail()
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn data_type_equals_ignore_timezone(l: &DataType, r: &DataType) -> bool {
|
||||
match (l, r) {
|
||||
(DataType::List(a), DataType::List(b))
|
||||
| (DataType::LargeList(a), DataType::LargeList(b)) => {
|
||||
a.is_nullable() == b.is_nullable()
|
||||
&& data_type_equals_ignore_timezone(a.data_type(), b.data_type())
|
||||
}
|
||||
(DataType::FixedSizeList(a, a_size), DataType::FixedSizeList(b, b_size)) => {
|
||||
a_size == b_size
|
||||
&& a.is_nullable() == b.is_nullable()
|
||||
&& data_type_equals_ignore_timezone(a.data_type(), b.data_type())
|
||||
}
|
||||
(DataType::Struct(a), DataType::Struct(b)) => {
|
||||
a.len() == b.len()
|
||||
&& a.iter().zip(b).all(|(a, b)| {
|
||||
a.is_nullable() == b.is_nullable()
|
||||
&& data_type_equals_ignore_timezone(a.data_type(), b.data_type())
|
||||
})
|
||||
}
|
||||
(DataType::Map(a_field, a_is_sorted), DataType::Map(b_field, b_is_sorted)) => {
|
||||
a_field == b_field && a_is_sorted == b_is_sorted
|
||||
}
|
||||
(DataType::Timestamp(l_unit, _), DataType::Timestamp(r_unit, _)) => l_unit == r_unit,
|
||||
_ => l == r,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::datatypes::{Field, Schema};

    use super::*;

    // Builds two single-column schemas from (data type, nullability) pairs
    // and asserts whether `ensure_schema_matches_ignore_timezone` accepts
    // them (`matches` = expected `is_ok()`).
    fn test_schema_matches(l: (DataType, bool), r: (DataType, bool), matches: bool) {
        let s1 = Arc::new(Schema::new(vec![Field::new("col", l.0, l.1)]));
        let s2 = Arc::new(Schema::new(vec![Field::new("col", r.0, r.1)]));
        let res = ensure_schema_matches_ignore_timezone(&s1, &s2);
        assert_eq!(matches, res.is_ok())
    }

    #[test]
    fn test_ensure_datatype_matches_ignore_timezone() {
        // Identical zone-less timestamps match.
        test_schema_matches(
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        // A zoned timestamp matches a zone-less one: the zone is ignored.
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".to_string()),
                ),
                true,
            ),
            (
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
                true,
            ),
            true,
        );

        // Two different zones still match: only the unit is compared.
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".to_string()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("PDT".to_string()),
                ),
                true,
            ),
            true,
        );

        // Different time units do NOT match, even with the same zone.
        test_schema_matches(
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Second,
                    Some("UTC".to_string()),
                ),
                true,
            ),
            (
                DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    Some("UTC".to_string()),
                ),
                true,
            ),
            false,
        );

        // Non-timestamp types fall back to plain equality.
        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);

        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), false);
    }
}
|
||||
|
||||
@@ -10,6 +10,7 @@ test = []
|
||||
|
||||
[dependencies]
|
||||
arrow.workspace = true
|
||||
arrow-array = "36"
|
||||
arrow-schema.workspace = true
|
||||
common-base = { path = "../common/base" }
|
||||
common-error = { path = "../common/error" }
|
||||
|
||||
@@ -238,16 +238,18 @@ impl Helper {
|
||||
ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
|
||||
ArrowDataType::List(_) => Arc::new(ListVector::try_from_arrow_array(array)?),
|
||||
ArrowDataType::Timestamp(unit, _) => match unit {
|
||||
TimeUnit::Second => Arc::new(TimestampSecondVector::try_from_arrow_array(array)?),
|
||||
TimeUnit::Millisecond => {
|
||||
Arc::new(TimestampMillisecondVector::try_from_arrow_array(array)?)
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
Arc::new(TimestampMicrosecondVector::try_from_arrow_array(array)?)
|
||||
}
|
||||
TimeUnit::Nanosecond => {
|
||||
Arc::new(TimestampNanosecondVector::try_from_arrow_array(array)?)
|
||||
}
|
||||
TimeUnit::Second => Arc::new(
|
||||
TimestampSecondVector::try_from_arrow_timestamp_array(array)?,
|
||||
),
|
||||
TimeUnit::Millisecond => Arc::new(
|
||||
TimestampMillisecondVector::try_from_arrow_timestamp_array(array)?,
|
||||
),
|
||||
TimeUnit::Microsecond => Arc::new(
|
||||
TimestampMicrosecondVector::try_from_arrow_timestamp_array(array)?,
|
||||
),
|
||||
TimeUnit::Nanosecond => Arc::new(
|
||||
TimestampNanosecondVector::try_from_arrow_timestamp_array(array)?,
|
||||
),
|
||||
},
|
||||
ArrowDataType::Float16
|
||||
| ArrowDataType::Time32(_)
|
||||
|
||||
@@ -18,7 +18,10 @@ use std::sync::Arc;
|
||||
|
||||
use arrow::array::{
|
||||
Array, ArrayBuilder, ArrayData, ArrayIter, ArrayRef, PrimitiveArray, PrimitiveBuilder,
|
||||
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
|
||||
TimestampSecondArray,
|
||||
};
|
||||
use arrow_schema::DataType;
|
||||
use serde_json::Value as JsonValue;
|
||||
use snafu::OptionExt;
|
||||
|
||||
@@ -70,6 +73,48 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
|
||||
Ok(Self::new(concrete_array))
|
||||
}
|
||||
|
||||
/// Converts arrow timestamp array to vectors, ignoring time zone info.
|
||||
pub fn try_from_arrow_timestamp_array(array: impl AsRef<dyn Array>) -> Result<Self> {
|
||||
let array = array.as_ref();
|
||||
let array_data = match array.data_type() {
|
||||
DataType::Timestamp(unit, _) => match unit {
|
||||
arrow_schema::TimeUnit::Second => array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampSecondArray>()
|
||||
.unwrap()
|
||||
.with_timezone_opt(None)
|
||||
.data()
|
||||
.clone(),
|
||||
arrow_schema::TimeUnit::Millisecond => array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMillisecondArray>()
|
||||
.unwrap()
|
||||
.with_timezone_opt(None)
|
||||
.data()
|
||||
.clone(),
|
||||
arrow_schema::TimeUnit::Microsecond => array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampMicrosecondArray>()
|
||||
.unwrap()
|
||||
.with_timezone_opt(None)
|
||||
.data()
|
||||
.clone(),
|
||||
arrow_schema::TimeUnit::Nanosecond => array
|
||||
.as_any()
|
||||
.downcast_ref::<TimestampNanosecondArray>()
|
||||
.unwrap()
|
||||
.with_timezone_opt(None)
|
||||
.data()
|
||||
.clone(),
|
||||
},
|
||||
_ => {
|
||||
unreachable!()
|
||||
}
|
||||
};
|
||||
let concrete_array = PrimitiveArray::<T::ArrowPrimitive>::from(array_data);
|
||||
Ok(Self::new(concrete_array))
|
||||
}
|
||||
|
||||
pub fn from_slice<P: AsRef<[T::Native]>>(slice: P) -> Self {
|
||||
let iter = slice.as_ref().iter().copied();
|
||||
Self {
|
||||
|
||||
Reference in New Issue
Block a user