diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 3b305d90c5..d2d5963491 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -22,6 +22,7 @@ use common_time::timestamp::TimeUnit; use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp}; use datatypes::json::value::{JsonNumber, JsonValue, JsonValueRef, JsonVariant}; use datatypes::prelude::{ConcreteDataType, ValueRef}; +use datatypes::types::json_type::JsonNativeType; use datatypes::types::{ IntervalType, JsonFormat, JsonType, StructField, StructType, TimeType, TimestampType, }; @@ -127,7 +128,9 @@ impl From for ConcreteDataType { datatype: type_ext.datatype(), datatype_ext: type_ext.datatype_extension.clone().map(|d| *d), }; - ConcreteDataType::json_native_datatype(inner_type.into()) + ConcreteDataType::json2(JsonNativeType::from(&ConcreteDataType::from( + inner_type, + ))) } None => ConcreteDataType::Json(JsonType::null()), _ => { @@ -448,7 +451,8 @@ impl TryFrom for ColumnDataTypeWrapper { if native_type.is_null() { None } else { - let native_type = ConcreteDataType::from(native_type.as_ref()); + let native_type = + ConcreteDataType::from_arrow_type(&native_type.as_arrow_type()); let (datatype, datatype_extension) = ColumnDataTypeWrapper::try_from(native_type)?.into_parts(); Some(ColumnDataTypeExtension { @@ -1157,6 +1161,7 @@ mod tests { use common_time::interval::IntervalUnit; use datatypes::scalars::ScalarVector; + use datatypes::types::json_type::JsonObjectType; use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type}; use datatypes::value::{ListValue, StructValue}; use datatypes::vectors::{ @@ -1393,9 +1398,12 @@ mod tests { .into() ); assert_eq!( - ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype( - struct_type.clone() - )), + ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([ + ("address".to_string(), JsonNativeType::String), + ("age".to_string(), JsonNativeType::i64()), + ("id".to_string(), JsonNativeType::i64()), + ("name".to_string(), JsonNativeType::String), + ]))), ColumnDataTypeWrapper::new( ColumnDataType::Json, Some(ColumnDataTypeExtension { @@ -1579,20 +1587,6 @@ mod tests { ]))).try_into().expect("Failed to create column datatype from Struct(StructType { fields: [StructField { name: \"a\", data_type: Int64(Int64Type) }, StructField { name: \"a.a\", data_type: List(ListType { item_type: String(StringType) }) }] })") ); - let struct_type = StructType::new(Arc::new(vec![ - StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true), - StructField::new( - "name".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - StructField::new("age".to_string(), ConcreteDataType::int32_datatype(), true), - StructField::new( - "address".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - ])); assert_eq!( ColumnDataTypeWrapper::new( ColumnDataType::Json, @@ -1636,9 +1630,12 @@ mod tests { }))) }) ), - ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype( - struct_type.clone() - )) + ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([ + ("address".to_string(), JsonNativeType::String), + ("age".to_string(), JsonNativeType::i64()), + ("id".to_string(), JsonNativeType::i64()), + ("name".to_string(), JsonNativeType::String), + ]))) .try_into() .expect("failed to convert json type") ); diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 4289714afd..413d89fb78 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -355,7 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul Ok(RecordBatch::from_df_record_batch(schema, record_batch)) } -fn maybe_align_json_array_with_schema( +pub fn maybe_align_json_array_with_schema( schema: &ArrowSchemaRef, arrays: Vec, ) -> Result> { diff --git a/src/datatypes/src/data_type.rs b/src/datatypes/src/data_type.rs index 56bc0ad249..0d098f3a78 100644 --- a/src/datatypes/src/data_type.rs +++ b/src/datatypes/src/data_type.rs @@ -28,6 +28,7 @@ use serde::{Deserialize, Serialize}; use crate::error::{self, Error, Result}; use crate::type_id::LogicalTypeId; +use crate::types::json_type::JsonNativeType; use crate::types::{ BinaryType, BooleanType, DateType, Decimal128Type, DictionaryType, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type, @@ -475,7 +476,7 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType { ArrowDataType::Decimal128(precision, scale) => { ConcreteDataType::decimal128_datatype(*precision, *scale) } - ArrowDataType::Struct(fields) => ConcreteDataType::Struct(fields.try_into()?), + ArrowDataType::Struct(fields) => ConcreteDataType::Struct(StructType::from(fields)), ArrowDataType::Float16 | ArrowDataType::Date64 | ArrowDataType::FixedSizeBinary(_) @@ -683,8 +684,8 @@ impl ConcreteDataType { Self::vector_datatype(0) } - pub fn json_native_datatype(inner_type: ConcreteDataType) -> ConcreteDataType { - ConcreteDataType::Json(JsonType::new_json2((&inner_type).into())) + pub fn json2(native_type: JsonNativeType) -> ConcreteDataType { + ConcreteDataType::Json(JsonType::new_json2(native_type)) } } diff --git a/src/datatypes/src/json/value.rs b/src/datatypes/src/json/value.rs index e2422ca78d..f3b652a549 100644 --- a/src/datatypes/src/json/value.rs +++ b/src/datatypes/src/json/value.rs @@ -394,22 +394,25 @@ impl JsonValue { }, JsonVariant::String(x) => Value::String(x.into()), JsonVariant::Array(array) => { - let item_type = if let Some(first) = array.first() { - first.native_type() - } else { - JsonNativeType::Null - }; - Value::List(ListValue::new( - array.into_iter().map(helper).collect(), - Arc::new((&item_type).into()), - )) + let values = array.into_iter().map(helper).collect::>(); + debug_assert!( + values + .windows(2) + .all(|w| w[0].data_type() == w[1].data_type()) + ); + let item_type = values + .first() + .map(|x| x.data_type()) + .unwrap_or_else(ConcreteDataType::null_datatype); + Value::List(ListValue::new(values, Arc::new(item_type))) } JsonVariant::Object(object) => { let mut fields = Vec::with_capacity(object.len()); let mut items = Vec::with_capacity(object.len()); for (k, v) in object { - fields.push(StructField::new(k, (&v.native_type()).into(), true)); - items.push(helper(v)); + let v = helper(v); + fields.push(StructField::new(k, v.data_type(), true)); + items.push(v); } Value::Struct(StructValue::new(items, StructType::new(Arc::new(fields)))) } diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index cd479ecd37..362357c5e6 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -18,6 +18,7 @@ use std::str::FromStr; use std::sync::{Arc, LazyLock}; use arrow::datatypes::DataType as ArrowDataType; +use arrow_schema::{Field, Fields}; use common_base::bytes::Bytes; use regex::{Captures, Regex}; use serde::{Deserialize, Serialize}; @@ -32,7 +33,7 @@ use crate::error::{ use crate::prelude::ConcreteDataType; use crate::scalars::ScalarVectorBuilder; use crate::type_id::LogicalTypeId; -use crate::types::{ListType, StructField, StructType}; +use crate::types::{StructField, StructType}; use crate::value::Value; use crate::vectors::json::builder::JsonVectorBuilder; use crate::vectors::{BinaryVectorBuilder, MutableVector}; @@ -117,30 +118,28 @@ impl JsonNativeType { _ => JsonNativeType::Variant, }; } -} -impl From<&JsonNativeType> for ConcreteDataType { - fn from(value: &JsonNativeType) -> Self { - match value { - JsonNativeType::Null => ConcreteDataType::null_datatype(), - JsonNativeType::Bool => ConcreteDataType::boolean_datatype(), - JsonNativeType::Number(JsonNumberType::U64) => ConcreteDataType::uint64_datatype(), - JsonNativeType::Number(JsonNumberType::I64) => ConcreteDataType::int64_datatype(), - JsonNativeType::Number(JsonNumberType::F64) => ConcreteDataType::float64_datatype(), - JsonNativeType::String => ConcreteDataType::string_datatype(), - JsonNativeType::Array(item_type) => { - ConcreteDataType::List(ListType::new(Arc::new(item_type.as_ref().into()))) + pub fn as_arrow_type(&self) -> ArrowDataType { + match self { + JsonNativeType::Null => ArrowDataType::Null, + JsonNativeType::Bool => ArrowDataType::Boolean, + JsonNativeType::Number(n) => match n { + JsonNumberType::U64 => ArrowDataType::UInt64, + JsonNumberType::I64 => ArrowDataType::Int64, + JsonNumberType::F64 => ArrowDataType::Float64, + }, + JsonNativeType::String => ArrowDataType::Utf8, + JsonNativeType::Array(array) => { + ArrowDataType::List(Arc::new(Field::new("item", array.as_arrow_type(), true))) } JsonNativeType::Object(object) => { let fields = object .iter() - .map(|(type_name, field_type)| { - StructField::new(type_name.clone(), field_type.into(), true) - }) - .collect(); - ConcreteDataType::Struct(StructType::new(Arc::new(fields))) + .map(|(k, v)| Arc::new(Field::new(k, v.as_arrow_type(), true))) + .collect::>(); + ArrowDataType::Struct(Fields::from(fields)) } - JsonNativeType::Variant => ConcreteDataType::binary_datatype(), + JsonNativeType::Variant => ArrowDataType::Binary, } } } @@ -304,9 +303,10 @@ impl JsonType { pub(crate) fn as_struct_type(&self) -> StructType { match &self.format { JsonFormat::Jsonb => StructType::default(), - JsonFormat::Json2(inner) => match ConcreteDataType::from(inner.as_ref()) { - ConcreteDataType::Struct(t) => t.clone(), - x => plain_json_struct_type(x), + JsonFormat::Json2(native_type) => match native_type.as_arrow_type() { + // TODO(LFC): Direct use Arrow's Struct datatype here. + ArrowDataType::Struct(fields) => StructType::from(&fields), + data_type => plain_json_struct_type(&data_type), }, } } @@ -370,8 +370,12 @@ fn is_include(this: &JsonNativeType, that: &JsonNativeType) -> bool { /// A special struct type for denoting "plain"(not object) json value. It has only one field, with /// fixed name [JSON_PLAIN_FIELD_NAME] and with metadata [JSON_PLAIN_FIELD_METADATA_KEY] = `"true"`. -pub(crate) fn plain_json_struct_type(item_type: ConcreteDataType) -> StructType { - let mut field = StructField::new(JSON_PLAIN_FIELD_NAME.to_string(), item_type, true); +fn plain_json_struct_type(data_type: &ArrowDataType) -> StructType { + let mut field = StructField::new( + JSON_PLAIN_FIELD_NAME.to_string(), + ConcreteDataType::from_arrow_type(data_type), + true, + ); field.insert_metadata(JSON_PLAIN_FIELD_METADATA_KEY, true); StructType::new(Arc::new(vec![field])) } diff --git a/src/datatypes/src/types/struct_type.rs b/src/datatypes/src/types/struct_type.rs index 3da18bb6cc..2fa7a9d91d 100644 --- a/src/datatypes/src/types/struct_type.rs +++ b/src/datatypes/src/types/struct_type.rs @@ -28,22 +28,21 @@ pub struct StructType { fields: Arc>, } -impl TryFrom<&Fields> for StructType { - type Error = crate::error::Error; - fn try_from(value: &Fields) -> Result { +impl From<&Fields> for StructType { + fn from(value: &Fields) -> Self { let fields = value .iter() .map(|field| { - Ok(StructField::new( + StructField::new( field.name().clone(), - ConcreteDataType::try_from(field.data_type())?, + ConcreteDataType::from_arrow_type(field.data_type()), field.is_nullable(), - )) + ) }) - .collect::, Self::Error>>()?; - Ok(StructType { + .collect::>(); + StructType { fields: Arc::new(fields), - }) + } } } diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index 13193fe0b0..5d703480d3 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -1205,7 +1205,7 @@ impl TryFrom for Value { .map(|v| Value::Decimal128(Decimal128::new(v, p, s))) .unwrap_or(Value::Null), ScalarValue::Struct(struct_array) => { - let struct_type: StructType = (struct_array.fields()).try_into()?; + let struct_type = StructType::from(struct_array.fields()); let items = struct_array .columns() .iter() @@ -1741,6 +1741,7 @@ pub(crate) mod tests { use super::*; use crate::json::value::{JsonVariant, JsonVariantRef}; use crate::types::StructField; + use crate::types::json_type::{JsonNativeType, JsonObjectType}; use crate::vectors::ListVectorBuilder; pub(crate) fn build_struct_type() -> StructType { @@ -1791,10 +1792,6 @@ pub(crate) mod tests { ScalarValue::Struct(Arc::new(struct_arrow_array)) } - pub fn build_list_type() -> ConcreteDataType { - ConcreteDataType::list_datatype(Arc::new(ConcreteDataType::boolean_datatype())) - } - pub(crate) fn build_list_value() -> ListValue { let items = vec![Value::Boolean(true), Value::Boolean(false)]; ListValue::new(items, Arc::new(ConcreteDataType::boolean_datatype())) @@ -2274,39 +2271,26 @@ pub(crate) mod tests { ); check_type_and_value( - &ConcreteDataType::json_native_datatype(ConcreteDataType::boolean_datatype()), + &ConcreteDataType::json2(JsonNativeType::Bool), &Value::Json(Box::new(true.into())), ); check_type_and_value( - &ConcreteDataType::json_native_datatype(build_list_type()), + &ConcreteDataType::json2(JsonNativeType::Array(Box::new(JsonNativeType::Bool))), &Value::Json(Box::new([true].into())), ); check_type_and_value( - &ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype( - StructType::new(Arc::new(vec![ - StructField::new( - "address".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - StructField::new("age".to_string(), ConcreteDataType::uint64_datatype(), true), - StructField::new( - "awards".to_string(), - ConcreteDataType::list_datatype(Arc::new( - ConcreteDataType::boolean_datatype(), - )), - true, - ), - StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true), - StructField::new( - "name".to_string(), - ConcreteDataType::string_datatype(), - true, - ), - ])), - )), + &ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([ + ("address".to_string(), JsonNativeType::String), + ("age".to_string(), JsonNativeType::u64()), + ( + "awards".to_string(), + JsonNativeType::Array(Box::new(JsonNativeType::Bool)), + ), + ("id".to_string(), JsonNativeType::i64()), + ("name".to_string(), JsonNativeType::String), + ]))), &Value::Json(Box::new( [ ("id", JsonVariant::from(1i64)), diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs index 21bb371617..b2689eee2c 100644 --- a/src/datatypes/src/vectors/helper.rs +++ b/src/datatypes/src/vectors/helper.rs @@ -240,7 +240,7 @@ impl Helper { ConstantVector::new(Arc::new(vector), length) } ScalarValue::Struct(v) => { - let struct_type = StructType::try_from(v.fields())?; + let struct_type = StructType::from(v.fields()); ConstantVector::new( Arc::new(StructVector::try_new(struct_type, (*v).clone())?), length, @@ -401,7 +401,7 @@ impl Helper { .downcast_ref::() .unwrap(); Arc::new(StructVector::try_new( - StructType::try_from(fields)?, + StructType::from(fields), array.clone(), )?) } diff --git a/src/datatypes/src/vectors/struct_vector.rs b/src/datatypes/src/vectors/struct_vector.rs index d3c44840aa..23d8b95042 100644 --- a/src/datatypes/src/vectors/struct_vector.rs +++ b/src/datatypes/src/vectors/struct_vector.rs @@ -220,7 +220,7 @@ impl TryFrom for StructVector { fn try_from(array: StructArray) -> Result { let fields = match array.data_type() { - ArrowDataType::Struct(fields) => StructType::try_from(fields)?, + ArrowDataType::Struct(fields) => StructType::from(fields), other => ConversionSnafu { from: other.to_string(), } diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index 23ba1411f5..880ae0f685 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -182,6 +182,7 @@ fn bulk_part_converter(c: &mut Criterion) { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: false, + ..Default::default() }, ); let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false); @@ -208,6 +209,7 @@ fn bulk_part_converter(c: &mut Criterion) { &FlatSchemaOptions { raw_pk_columns: true, string_pk_use_dict: true, + ..Default::default() }, ); let mut converter = diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 4b8ef71a56..5cf3c444a8 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -29,6 +29,7 @@ use std::time::Instant; use api::v1::region::compact_request; use api::v1::region::compact_request::Options; +use arrow_schema::Schema; use common_base::Plugins; use common_base::cancellation::CancellationHandle; use common_memory_manager::OnExhaustedPolicy; @@ -39,6 +40,11 @@ use common_time::timestamp::TimeUnit; use common_time::{TimeToLive, Timestamp}; use datafusion_common::ScalarValue; use datafusion_expr::Expr; +use datatypes::extension::json::is_json_extension_type; +use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::types::json_type::JsonNativeType; +use parquet::arrow::parquet_to_arrow_schema; +use parquet::file::metadata::PageIndexPolicy; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -54,13 +60,15 @@ use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker}; use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; use crate::error::{ - CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu, - ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, - RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu, + CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error, + GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, ParquetToArrowSchemaSnafu, + RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result, + TimeRangePredicateOverflowSnafu, TimeoutSnafu, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; use crate::read::FlatSource; use crate::read::flat_projection::FlatProjectionMapper; +use crate::read::read_columns::ReadColumns; use crate::read::scan_region::{PredicateGroup, ScanInput}; use crate::read::seq_scan::SeqScan; use crate::region::options::{MergeMode, RegionOptions}; @@ -72,6 +80,7 @@ use crate::schedule::remote_job_scheduler::{ }; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file::{FileHandle, FileMeta, Level}; +use crate::sst::parquet::reader::MetadataCacheMetrics; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; @@ -996,19 +1005,56 @@ impl CompactionSstReaderBuilder<'_> { /// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files, /// for compaction. The schema of the [FlatSource] is unified. async fn build_flat_sst_reader(self) -> Result { - let scan_input = self.build_scan_input()?.with_compaction(true); + let scan_input = self.build_scan_input().await?.with_compaction(true); let schema = scan_input.mapper.output_schema(); let schema = schema.arrow_schema(); - SeqScan::new(scan_input) + let stream = SeqScan::new(scan_input) .build_flat_reader_for_compaction() - .await - .map(|stream| FlatSource::new_stream(schema.clone(), stream)) + .await?; + Ok(FlatSource::new_stream(schema.clone(), stream)) } - fn build_scan_input(self) -> Result { - let mapper = FlatProjectionMapper::all(&self.metadata)?; + async fn build_scan_input(self) -> Result { + let schema = self.metadata.schema.arrow_schema(); + let json_type_hint = if schema.has_json_extension_field() { + let mut json_type_hint = schema + .fields() + .iter() + .filter(|&field| is_json_extension_type(field)) + .map(|field| (field.name().clone(), JsonNativeType::Null)) + .collect::>(); + + let schemas = self.collect_arrow_schemas_from_parquet().await?; + for schema in schemas { + for field in schema.fields() { + let Some(merged) = json_type_hint.get_mut(field.name()) else { + continue; + }; + + let json_type = JsonNativeType::try_from(field.data_type()) + .context(DataTypeMismatchSnafu)?; + merged.merge(&json_type); + } + } + + Some(json_type_hint) + } else { + None + }; + + let projection = (0..self.metadata.column_metadatas.len()).collect(); + let read_columns = ReadColumns::from_deduped_column_ids( + self.metadata.column_metadatas.iter().map(|x| x.column_id), + ); + let mapper = FlatProjectionMapper::new_with_read_columns( + &self.metadata, + projection, + read_columns, + json_type_hint.as_ref(), + )?; + let mut scan_input = ScanInput::new(self.sst_layer, mapper) .with_files(self.inputs.to_vec()) .with_append_mode(self.append_mode) @@ -1028,6 +1074,43 @@ impl CompactionSstReaderBuilder<'_> { Ok(scan_input) } + + async fn collect_arrow_schemas_from_parquet(&self) -> Result> { + let mut schemas = Vec::with_capacity(self.inputs.len()); + + for file_handle in self.inputs { + let file_path = + file_handle.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type()); + let file_size = file_handle.meta_ref().file_size; + let parquet_metadata = match self + .sst_layer + .read_sst(file_handle.clone()) + .cache(CacheStrategy::Compaction(self.cache.clone())) + .read_parquet_metadata( + &file_path, + file_size, + &mut MetadataCacheMetrics::default(), + PageIndexPolicy::default(), + ) + .await + .map(|x| x.0.parquet_metadata()) + { + Ok(x) => x, + Err(e) if e.is_object_not_found() => continue, + Err(e) => return Err(e), + }; + let file_metadata = parquet_metadata.file_metadata(); + + let schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + ) + .context(ParquetToArrowSchemaSnafu { file: file_path })?; + + schemas.push(schema); + } + Ok(schemas) + } } /// Converts time range to predicates so that rows outside the range will be filtered. diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 9ae748205e..d61cf8470f 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1241,6 +1241,15 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to generate Arrow schema from Parquet file: {}", file))] + ParquetToArrowSchema { + file: String, + #[snafu(source)] + error: parquet::errors::ParquetError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1331,7 +1340,8 @@ impl ErrorExt for Error { | BuildEntry { .. } | Metadata { .. } | CastColumn { .. } - | MitoManifestInfo { .. } => StatusCode::Internal, + | MitoManifestInfo { .. } + | ParquetToArrowSchema { .. } => StatusCode::Internal, FetchManifests { source, .. } => source.status_code(), diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 126e4fbeb2..e094b662f9 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -2115,6 +2115,7 @@ mod tests { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: true, + ..Default::default() }, ); @@ -2552,6 +2553,7 @@ mod tests { &FlatSchemaOptions { raw_pk_columns: false, string_pk_use_dict: true, + ..Default::default() }, ); diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index d0159fe3d6..567754f939 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -1163,8 +1163,7 @@ impl FlatSource { } } - #[expect(unused)] - fn schema(&self) -> &SchemaRef { + pub(crate) fn schema(&self) -> &SchemaRef { &self.schema } diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index 6d6f535068..c00aa3189c 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -25,7 +25,9 @@ use datatypes::arrow::compute::{TakeOptions, take}; use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; +use datatypes::extension::json::is_json_extension_type; use datatypes::prelude::DataType; +use datatypes::schema::ext::ArrowSchemaExt; use datatypes::value::Value; use datatypes::vectors::VectorRef; use datatypes::vectors::json::array::JsonArray; @@ -43,15 +45,19 @@ use crate::error::{ EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu, }; use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns}; -use crate::sst::parquet::flat_format::primary_key_column_index; -use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray}; +use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index}; +use crate::sst::parquet::format::{INTERNAL_COLUMN_NUM, PrimaryKeyArray}; use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id}; -/// Returns true if `left` and `right` have same columns and primary key encoding. +/// Returns true if the columns in the `projection_mapper` and `read_format` have same data types +/// and primary key encodings. pub(crate) fn has_same_columns_and_pk_encoding( - left: &RegionMetadata, - right: &RegionMetadata, + projection_mapper: &FlatProjectionMapper, + read_format: &FlatReadFormat, + compaction: bool, ) -> bool { + let left = projection_mapper.metadata(); + let right = read_format.metadata(); if left.primary_key_encoding != right.primary_key_encoding { return false; } @@ -61,17 +67,13 @@ pub(crate) fn has_same_columns_and_pk_encoding( } for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) { - if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) { + if left_col.column_id != right_col.column_id { return false; } - debug_assert_eq!( - left_col.column_schema.data_type, - right_col.column_schema.data_type - ); debug_assert_eq!(left_col.semantic_type, right_col.semantic_type); } - true + &projection_mapper.input_arrow_schema(compaction) == read_format.arrow_schema() } /// A helper struct to adapt schema of the batch to an expected schema. @@ -88,16 +90,28 @@ impl FlatCompatBatch { /// Creates a [FlatCompatBatch]. /// /// - `mapper` is built from the metadata users expect to see. - /// - `actual` is the [RegionMetadata] of the input parquet. - /// - `format_projection` is the projection of the read format for the input parquet. + /// - `read_format` is the [FlatReadFormat] of the input parquet. /// - `compaction` indicates whether the reader is for compaction. pub(crate) fn try_new( mapper: &FlatProjectionMapper, - actual: &RegionMetadataRef, - format_projection: &FormatProjection, + read_format: &FlatReadFormat, compaction: bool, ) -> Result> { - let actual_schema = flat_projected_columns(actual, format_projection); + let actual = read_format.metadata(); + let format_projection = read_format.format_projection(); + let mut actual_schema = flat_projected_columns(actual, format_projection); + if read_format.arrow_schema().has_json_extension_field() { + for field in read_format.arrow_schema().fields() { + if is_json_extension_type(field) + && let Some(column_id) = + actual.column_by_name(field.name()).map(|x| x.column_id) + && let Some(i) = actual_schema.iter().position(|x| x.0 == column_id) + { + actual_schema[i].1 = ConcreteDataType::from_arrow_type(field.data_type()); + } + } + } + let expect_schema = mapper.batch_schema(); if expect_schema == actual_schema { // Although the SST has a different schema, but the schema after projection is the same @@ -152,10 +166,12 @@ impl FlatCompatBatch { expect_column.column_id, ))); } else { - fields.push(Arc::new(with_field_id( - (**column_field).clone(), + let field = with_field_id( + Arc::unwrap_or_clone(column_field.clone()), expect_column.column_id, - ))); + ) + .with_data_type(expect_data_type.as_arrow_type()); + fields.push(Arc::new(field)) }; if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) { @@ -275,6 +291,9 @@ impl FlatCompatBatch { ) .collect::>>()?; + common_telemetry::debug!("index_or_defaults: {:?}", self.index_or_defaults); + common_telemetry::debug!("arrow_schema: {:?}", self.arrow_schema); + common_telemetry::debug!("columns: {:?}", columns); let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns) .context(NewRecordBatchSnafu)?; @@ -720,12 +739,10 @@ mod tests { false, ) .unwrap(); - let format_projection = read_format.format_projection(); - let compat_batch = - FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false) - .unwrap() - .unwrap(); + let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false) + .unwrap() + .unwrap(); let mut tag_builder = StringDictionaryBuilder::::new(); tag_builder.append_value("tag1"); @@ -804,6 +821,7 @@ mod tests { &expected_metadata, vec![1, 2], ReadColumns::from_deduped_column_ids([1, 2, 3]), + None, ) .unwrap(); let read_format = FlatReadFormat::new( @@ -814,12 +832,10 @@ mod tests { false, ) .unwrap(); - let format_projection = read_format.format_projection(); - let compat_batch = - FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false) - .unwrap() - .unwrap(); + let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false) + .unwrap() + .unwrap(); let mut tag_builder = StringDictionaryBuilder::::new(); tag_builder.append_value("tag1"); @@ -905,12 +921,10 @@ mod tests { false, ) .unwrap(); - let format_projection = read_format.format_projection(); - let compat_batch = - FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false) - .unwrap() - .unwrap(); + let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false) + .unwrap() + .unwrap(); // Tag array. let mut tag1_builder = StringDictionaryBuilder::::new(); @@ -999,12 +1013,10 @@ mod tests { true, ) .unwrap(); - let format_projection = read_format.format_projection(); - let compat_batch = - FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true) - .unwrap() - .unwrap(); + let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, true) + .unwrap() + .unwrap(); let sparse_k1 = encode_sparse_key(&[]); let input_columns: Vec = vec![ diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index 523ee6e684..583370a202 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -14,6 +14,7 @@ //! Utilities for projection on flat format. +use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; @@ -27,6 +28,7 @@ use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field}; use datatypes::extension::json::is_json_extension_type; use datatypes::prelude::{ConcreteDataType, DataType}; use datatypes::schema::{Schema, SchemaRef}; +use datatypes::types::json_type::JsonNativeType; use datatypes::value::Value; use datatypes::vectors::Helper; use datatypes::vectors::json::array::JsonArray; @@ -84,7 +86,7 @@ impl FlatProjectionMapper { let projection: Vec<_> = projection.into_iter().collect(); let read_column_ids = read_column_ids_from_projection(metadata, &projection)?; let read_cols = ReadColumns::from_deduped_column_ids(read_column_ids); - Self::new_with_read_columns(metadata, projection, read_cols) + Self::new_with_read_columns(metadata, projection, read_cols, None) } /// Returns a new mapper with output projection and explicit read columns. @@ -92,6 +94,7 @@ impl FlatProjectionMapper { metadata: &RegionMetadataRef, projection: Vec, read_cols: ReadColumns, + json_type_hint: Option<&HashMap>, ) -> Result { // If the original projection is empty. let is_empty_projection = projection.is_empty(); @@ -109,7 +112,16 @@ impl FlatProjectionMapper { reason: format!("projection index {} is out of bound", idx), })?; output_col_ids.push(col.column_id); - col_schemas.push(col.column_schema.clone()); + + let mut schema = col.column_schema.clone(); + if let Some(concretized) = json_type_hint + .and_then(|x| x.get(&schema.name)) + .cloned() + .map(ConcreteDataType::json2) + { + schema.data_type = concretized; + } + col_schemas.push(schema); } // Creates a map to lookup index. @@ -123,7 +135,21 @@ impl FlatProjectionMapper { read_cols.clone(), ); - let batch_schema = flat_projected_columns(metadata, &format_projection); + let mut batch_schema = flat_projected_columns(metadata, &format_projection); + + if let Some(json_type_hint) = json_type_hint + && !json_type_hint.is_empty() + { + for (column_id, data_type) in batch_schema.iter_mut() { + if let Some(concretized) = metadata + .column_by_id(*column_id) + .and_then(|x| json_type_hint.get(&x.column_schema.name).cloned()) + .map(ConcreteDataType::json2) + { + *data_type = concretized; + } + } + } // Safety: We get the column id from the metadata. let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema); @@ -228,10 +254,15 @@ impl FlatProjectionMapper { self.input_arrow_schema.clone() } else { // For compaction, we need to build a different schema from encoding. - to_flat_sst_arrow_schema( - &self.metadata, - &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding), - ) + let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding); + options.concretized_json_types = self + .input_arrow_schema + .fields() + .iter() + .filter(|&field| is_json_extension_type(field)) + .map(|field| (field.name().clone(), field.data_type().clone())) + .collect(); + to_flat_sst_arrow_schema(&self.metadata, &options) } } @@ -242,10 +273,6 @@ impl FlatProjectionMapper { self.output_schema.clone() } - pub(crate) fn with_output_schema(&mut self, schema: SchemaRef) { - self.output_schema = schema; - } - /// Converts a flat format [RecordBatch] to a normal [RecordBatch]. /// /// The batch must match the `projection` using to build the mapper. @@ -407,13 +434,14 @@ pub(crate) fn compute_input_arrow_schema( batch_schema: &[(ColumnId, ConcreteDataType)], ) -> datatypes::arrow::datatypes::SchemaRef { let mut new_fields = Vec::with_capacity(batch_schema.len() + 3); - for (column_id, _) in batch_schema { + for (column_id, data_type) in batch_schema { let column_metadata = metadata.column_by_id(*column_id).unwrap(); let field = Field::new( &column_metadata.column_schema.name, - column_metadata.column_schema.data_type.as_arrow_type(), + data_type.as_arrow_type(), column_metadata.column_schema.is_nullable(), - ); + ) + .with_metadata(column_metadata.column_schema.metadata().clone()); let field = with_field_id(field, *column_id); if column_metadata.semantic_type == SemanticType::Tag { new_fields.push(tag_maybe_to_dictionary_field( @@ -454,7 +482,8 @@ impl CompactionProjectionMapper { let read_col_ids = metadata.column_metadatas.iter().map(|col| col.column_id); let read_cols = ReadColumns::from_deduped_column_ids(read_col_ids); - let mapper = FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols)?; + let mapper = + FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols, None)?; let assembler = DfBatchAssembler::new(mapper.output_schema()); Ok(Self { mapper, assembler }) diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index c3d8e4323d..27fe76345b 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -296,6 +296,7 @@ mod tests { &metadata, vec![4, 1], ReadColumns::from_deduped_column_ids([4, 1, 3]), + None, ) .unwrap(); assert_eq!(&[4, 1, 3], mapper.read_columns().column_ids().as_slice()); diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index fb30913534..ecd7e23417 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,7 +14,7 @@ //! Scans a region according to the scan request. -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::fmt; use std::num::NonZeroU64; use std::sync::Arc; @@ -31,11 +31,7 @@ use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr; use datafusion_common::Column; use datafusion_expr::Expr; use datafusion_expr::utils::expr_to_columns; -use datatypes::data_type::ConcreteDataType; -use datatypes::extension::json::is_json_extension_type; -use datatypes::schema::Schema; use datatypes::schema::ext::ArrowSchemaExt; -use datatypes::types::json_type::JsonNativeType; use futures::StreamExt; use partition::expr::PartitionExpr; use smallvec::SmallVec; @@ -429,15 +425,24 @@ impl ScanRegion { let read_col_ids = read_cols.column_ids(); // The mapper always computes projected column ids as the schema of SSTs may change. - let mut mapper = match self.request.projection_indices() { - Some(p) => FlatProjectionMapper::new_with_read_columns( - &self.version.metadata, - p.to_vec(), - read_cols, - )?, - None => FlatProjectionMapper::all(&self.version.metadata)?, - }; - concretize_json_types(&mut mapper, &self.request.json_type_hint); + let projection = self + .request + .projection_indices() + .map(|x| x.to_vec()) + .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect()); + let json_type_hint = self + .version + .metadata + .schema + .arrow_schema() + .has_json_extension_field() + .then_some(&self.request.json_type_hint); + let mapper = FlatProjectionMapper::new_with_read_columns( + &self.version.metadata, + projection, + read_cols, + json_type_hint, + )?; let ssts = &self.version.ssts; let mut files = Vec::new(); @@ -733,34 +738,6 @@ impl ScanRegion { } } -fn concretize_json_types( - mapper: &mut FlatProjectionMapper, - json_type_hint: &HashMap, -) { - let output_schema = mapper.output_schema(); - let output_arrow_schema = output_schema.arrow_schema(); - if !output_arrow_schema.has_json_extension_field() { - return; - } - - let mut column_schemas = output_schema.column_schemas().to_vec(); - for (idx, column_schema) in column_schemas.iter_mut().enumerate() { - if !is_json_extension_type(&output_arrow_schema.fields()[idx]) { - continue; - } - let Some(json_type) = json_type_hint.get(&column_schema.name) else { - continue; - }; - column_schema.data_type = ConcreteDataType::from(json_type); - } - - let output_schema = Arc::new(Schema::new_with_version( - column_schemas, - output_schema.version(), - )); - mapper.with_output_schema(output_schema); -} - /// Returns true if the time range of a SST `file` matches the `predicate`. fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { if predicate == &TimestampRange::min_to_max() { @@ -1126,16 +1103,16 @@ impl ScanInput { }; let need_compat = !compat::has_same_columns_and_pk_encoding( - self.mapper.metadata(), - file_range_ctx.read_format().metadata(), + &self.mapper, + file_range_ctx.read_format(), + self.compaction, ); if need_compat { // They have different schema. We need to adapt the batch first so the // mapper can convert it. let compat = FlatCompatBatch::try_new( &self.mapper, - file_range_ctx.read_format().metadata(), - file_range_ctx.read_format().format_projection(), + file_range_ctx.read_format(), self.compaction, )?; file_range_ctx.set_compat_batch(compat); @@ -1774,19 +1751,15 @@ impl PredicateGroup { #[cfg(test)] mod tests { - use std::collections::HashMap; use std::sync::Arc; use datafusion::physical_plan::expressions::lit as physical_lit; use datafusion_common::ScalarValue; use datafusion_expr::{col, lit}; - use datatypes::extension::json::{JsonExtensionType, JsonMetadata}; - use datatypes::schema::ColumnSchema; - use datatypes::types::json_type::JsonObjectType; use datatypes::value::Value; use partition::expr::col as partition_col; - use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; - use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector}; + use store_api::metadata::RegionMetadataBuilder; + use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector}; use super::*; use crate::cache::CacheManager; @@ -1818,113 +1791,6 @@ mod tests { lit(ScalarValue::TimestampMillisecond(Some(val), None)) } - fn metadata_with_json_field() -> RegionMetadataRef { - let mut json_column = ColumnSchema::new( - "payload", - ConcreteDataType::from(&JsonNativeType::Object(JsonObjectType::new())), - true, - ); - json_column - .with_extension_type(&JsonExtensionType::new(Arc::new(JsonMetadata::default()))) - .unwrap(); - - let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1)); - builder - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "host", - ConcreteDataType::string_datatype(), - false, - ), - semantic_type: SemanticType::Tag, - column_id: 1, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - semantic_type: SemanticType::Timestamp, - column_id: 2, - }) - .push_column_metadata(ColumnMetadata { - column_schema: json_column, - semantic_type: SemanticType::Field, - column_id: 3, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true), - semantic_type: SemanticType::Field, - column_id: 4, - }) - .primary_key(vec![1]); - - Arc::new(builder.build().unwrap()) - } - - fn concrete_json_type_hint() -> JsonNativeType { - JsonNativeType::Object(JsonObjectType::from([ - ("active".to_string(), JsonNativeType::Bool), - ("name".to_string(), JsonNativeType::String), - ])) - } - - #[test] - fn test_concretize_json_types_rewrites_json_output_schema() -> Result<()> { - let metadata = metadata_with_json_field(); - let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3])?; - let output_schema = mapper.output_schema(); - let original_arrow_schema = output_schema.arrow_schema(); - assert!(original_arrow_schema.has_json_extension_field()); - assert!(is_json_extension_type(&original_arrow_schema.fields()[1])); - - let expected_type = concrete_json_type_hint(); - concretize_json_types( - &mut mapper, - &HashMap::from([("payload".to_string(), expected_type.clone())]), - ); - - let output_schema = mapper.output_schema(); - let output_arrow_schema = output_schema.arrow_schema(); - assert!(output_arrow_schema.has_json_extension_field()); - assert_eq!( - ConcreteDataType::string_datatype(), - output_schema.column_schemas()[0].data_type - ); - assert_eq!( - ConcreteDataType::from(&expected_type), - output_schema.column_schemas()[1].data_type - ); - assert_eq!( - ConcreteDataType::int64_datatype(), - output_schema.column_schemas()[2].data_type - ); - Ok(()) - } - - #[test] - fn test_concretize_json_types_keeps_json_without_hint() { - let metadata = metadata_with_json_field(); - let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(); - - concretize_json_types( - &mut mapper, - &HashMap::from([("missing".to_string(), JsonNativeType::String)]), - ); - - let output_schema = mapper.output_schema(); - let output_arrow_schema = output_schema.arrow_schema(); - assert!(output_arrow_schema.has_json_extension_field()); - assert!(is_json_extension_type(&output_arrow_schema.fields()[1])); - - // Assert that the expected JSON type stays un-concretized: empty object. - assert_eq!( - ConcreteDataType::from(&JsonNativeType::Object(JsonObjectType::new())), - output_schema.column_schemas()[1].data_type - ); - } - #[tokio::test] async fn test_build_scan_fingerprint_for_eligible_scan() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 432099dbcf..6bc796c85c 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -130,7 +130,7 @@ impl SeqScan { /// /// # Panics /// Panics if the compaction flag is not set. - pub async fn build_flat_reader_for_compaction(&self) -> Result { + pub(crate) async fn build_flat_reader_for_compaction(&self) -> Result { assert!(self.stream_ctx.input.compaction); let metrics_set = ExecutionPlanMetricsSet::new(); diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 1007b57668..cd6eb627a6 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -14,9 +14,11 @@ //! Sorted strings tables. +use std::collections::HashMap; use std::sync::Arc; use api::v1::SemanticType; +use arrow_schema::DataType; use common_base::readable_size::ReadableSize; use datatypes::arrow::datatypes::{ DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, @@ -108,6 +110,9 @@ pub struct FlatSchemaOptions { /// when storing primary key columns. /// Only takes effect when `raw_pk_columns` is true. pub string_pk_use_dict: bool, + /// The column's concretized JSON types, to be set into Arrow schema. + /// Otherwise it's empty struct in the Arrow schema. + pub concretized_json_types: HashMap, } impl Default for FlatSchemaOptions { @@ -115,6 +120,7 @@ impl Default for FlatSchemaOptions { Self { raw_pk_columns: true, string_pk_use_dict: true, + concretized_json_types: HashMap::new(), } } } @@ -128,6 +134,7 @@ impl FlatSchemaOptions { Self { raw_pk_columns: false, string_pk_use_dict: false, + concretized_json_types: HashMap::new(), } } } @@ -159,6 +166,7 @@ pub fn to_flat_sst_arrow_schema( &metadata.column_metadatas[pk_index].column_schema.data_type, old_field, ); + let new_field = concretize_json_type(new_field, options); fields.push(Arc::new(with_field_id((*new_field).clone(), column_id))); } } @@ -169,8 +177,9 @@ pub fn to_flat_sst_arrow_schema( .zip(&metadata.column_metadatas) .filter_map(|(field, column_meta)| { if column_meta.semantic_type == SemanticType::Field { + let field = concretize_json_type(field.clone(), options); Some(Arc::new(with_field_id( - (**field).clone(), + Arc::unwrap_or_clone(field), column_meta.column_id, ))) } else { @@ -189,6 +198,16 @@ pub fn to_flat_sst_arrow_schema( Arc::new(Schema::new(fields)) } +fn concretize_json_type(field: Arc, options: &FlatSchemaOptions) -> Arc { + if let Some(data_type) = options.concretized_json_types.get(field.name()) { + let mut field = Arc::unwrap_or_clone(field); + field.set_data_type(data_type.clone()); + Arc::new(field) + } else { + field + } +} + /// Returns the number of columns in the flat format. pub fn flat_sst_arrow_schema_column_num( metadata: &RegionMetadata, diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index d8111dbe62..8929866ded 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -41,7 +41,8 @@ use datatypes::arrow::datatypes::{Schema, SchemaRef}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::{ConcreteDataType, DataType}; use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec}; -use parquet::file::metadata::RowGroupMetaData; +use parquet::arrow::parquet_to_arrow_schema; +use parquet::file::metadata::{FileMetaData, RowGroupMetaData}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{RegionMetadata, RegionMetadataRef}; @@ -49,7 +50,7 @@ use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu, - NewRecordBatchSnafu, Result, + NewRecordBatchSnafu, ParquetToArrowSchemaSnafu, Result, }; use crate::read::read_columns::ReadColumns; use crate::sst::parquet::format::{ @@ -165,10 +166,11 @@ impl FlatReadFormat { pub fn new( metadata: RegionMetadataRef, read_cols: ReadColumns, - num_columns: Option, + file_metadata: Option<&FileMetaData>, file_path: &str, skip_auto_convert: bool, ) -> Result { + let num_columns = file_metadata.map(|x| x.schema_descr().num_columns()); let is_legacy = match num_columns { Some(num) => Self::is_legacy_format(&metadata, num, file_path)?, None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse, @@ -189,7 +191,13 @@ impl FlatReadFormat { )) } } else { - ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols)) + let arrow_schema = file_metadata + .map(|x| parquet_to_arrow_schema(x.schema_descr(), x.key_value_metadata())) + .transpose() + .context(ParquetToArrowSchemaSnafu { file: file_path })? + .map(Arc::new) + .unwrap_or_else(|| to_flat_sst_arrow_schema(&metadata, &Default::default())); + ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols, arrow_schema)) }; Ok(FlatReadFormat { @@ -248,9 +256,6 @@ impl FlatReadFormat { } /// Gets the arrow schema of the SST file. - /// - /// This schema is computed from the region metadata but should be the same - /// as the arrow schema decoded from the file metadata. pub(crate) fn arrow_schema(&self) -> &SchemaRef { match &self.parquet_adapter { ParquetAdapter::Flat(p) => &p.arrow_schema, @@ -483,10 +488,13 @@ struct ParquetFlat { impl ParquetFlat { /// Creates a helper with existing `metadata` and `column_ids` to read. - fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> ParquetFlat { + fn new( + metadata: RegionMetadataRef, + read_cols: ReadColumns, + arrow_schema: SchemaRef, + ) -> ParquetFlat { // Creates a map to lookup index. let id_to_index = sst_column_id_indices(&metadata); - let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); let sst_column_num = flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default()); let format_projection = diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index b9afdfaac7..43cfe8fd38 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -832,6 +832,9 @@ mod tests { use mito_codec::row_converter::{ DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec, }; + use parquet::basic::Type as PhysicalType; + use parquet::file::metadata::FileMetaData; + use parquet::schema::types::{SchemaDescriptor, Type}; use store_api::codec::PrimaryKeyEncoding; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use store_api::storage::RegionId; @@ -890,6 +893,33 @@ mod tests { Arc::new(builder.build().unwrap()) } + fn build_test_file_metadata(fields: Vec<&str>) -> FileMetaData { + FileMetaData::new( + 0, + 0, + None, + None, + Arc::new(SchemaDescriptor::new(Arc::new( + Type::group_type_builder("x") + .with_fields( + fields + .into_iter() + .map(|field| { + Arc::new( + Type::primitive_type_builder(field, PhysicalType::INT64) + .build() + .unwrap(), + ) + }) + .collect(), + ) + .build() + .unwrap(), + ))), + None, + ) + } + fn build_test_arrow_schema() -> SchemaRef { let fields = vec![ make_field("field1", ArrowDataType::Int64, true, Some(4)), @@ -1323,7 +1353,9 @@ mod tests { let mut format = FlatReadFormat::new( metadata, ReadColumns::from_deduped_column_ids(std::iter::once(1)), // Just read tag0 - Some(8), + Some(&build_test_file_metadata(vec![ + "tag0", "field0", "tag1", "field1", "ts", "tag2", "field2", "tag3", + ])), "test", false, ) @@ -1540,7 +1572,9 @@ mod tests { let format = FlatReadFormat::new( metadata.clone(), ReadColumns::from_deduped_column_ids(column_ids), - Some(6), + Some(&build_test_file_metadata(vec![ + "tag0", "field0", "tag1", "field1", "ts", "tag2", + ])), "test", false, ) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 26d5cb7b61..396fdfcdb6 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -393,7 +393,7 @@ impl ParquetReaderBuilder { let mut read_format = FlatReadFormat::new( region_meta.clone(), read_cols, - Some(parquet_meta.file_metadata().schema_descr().num_columns()), + Some(parquet_meta.file_metadata()), &file_path, skip_auto_convert, )?; @@ -585,7 +585,7 @@ impl ParquetReaderBuilder { /// Reads parquet metadata of specific file. /// Returns (fused metadata, cache_miss_flag). - async fn read_parquet_metadata( + pub(crate) async fn read_parquet_metadata( &self, file_path: &str, file_size: u64, diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 373f488f04..5b7b1c310e 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -14,6 +14,7 @@ //! Parquet writer. +use std::collections::HashMap; use std::future::Future; use std::mem; use std::pin::Pin; @@ -31,6 +32,8 @@ use datatypes::arrow::array::{ use datatypes::arrow::compute::{max, min}; use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::extension::json::is_json_extension_type; +use datatypes::schema::ext::ArrowSchemaExt; use object_store::{FuturesAsyncWriter, ObjectStore}; use parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; @@ -271,12 +274,21 @@ where override_sequence: Option, opts: &WriteOptions, ) -> Result { + let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding); + + if source.schema().has_json_extension_field() { + options.concretized_json_types = source + .schema() + .fields() + .iter() + .filter(|&field| is_json_extension_type(field)) + .map(|field| (field.name().clone(), field.data_type().clone())) + .collect::>(); + } + let converter = FlatBatchConverter::Flat( - FlatWriteFormat::new( - self.metadata.clone(), - &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding), - ) - .with_override_sequence(override_sequence), + FlatWriteFormat::new(self.metadata.clone(), &options) + .with_override_sequence(override_sequence), ); let res = self.write_all_flat_inner(source, &converter, opts).await; if res.is_err() { diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result index 5bdc641433..fdae802f3b 100644 --- a/tests/cases/standalone/common/types/json/json2.result +++ b/tests/cases/standalone/common/types/json/json2.result @@ -42,6 +42,14 @@ admin flush_table('json2_table'); | 0 | +----------------------------------+ +admin compact_table('json2_table', 'swcs', '86400'); + ++-----------------------------------------------------+ +| ADMIN compact_table('json2_table', 'swcs', '86400') | ++-----------------------------------------------------+ +| 0 | ++-----------------------------------------------------+ + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'), (8, '{"a": {"b": 8}, "c": "s8"}'); @@ -131,10 +139,10 @@ select j.d from json2_table order by ts; +-----------------------------------+ | json_get(json2_table.j,Utf8("d")) | +-----------------------------------+ -| [{e: {f: 0.1}}] | -| [{e: {f: 0.2}}] | +| [{e: {f: 0.1, g: }}] | +| [{e: {f: 0.2, g: }}] | | | -| [{e: {g: -0.4}}] | +| [{e: {f: , g: -0.4}}] | | | | | | [{e: {g: -0.7}}] | diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql index 75c7b46b41..57e113f8be 100644 --- a/tests/cases/standalone/common/types/json/json2.sql +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -22,6 +22,8 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'), admin flush_table('json2_table'); +admin compact_table('json2_table', 'swcs', '86400'); + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'), (8, '{"a": {"b": 8}, "c": "s8"}');