diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 9070e2babe..65607ddf29 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -14,6 +14,7 @@ mod column_schema; pub mod constraint; +pub mod ext; use std::collections::HashMap; use std::fmt; diff --git a/src/datatypes/src/schema/ext.rs b/src/datatypes/src/schema/ext.rs new file mode 100644 index 0000000000..d36e6f13d8 --- /dev/null +++ b/src/datatypes/src/schema/ext.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::extension::json; + +pub trait ArrowSchemaExt { + fn has_json_extension_field(&self) -> bool; +} + +impl ArrowSchemaExt for arrow_schema::Schema { + fn has_json_extension_field(&self) -> bool { + self.fields().iter().any(json::is_json_extension_type) + } +} diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index dd575ac687..be0702010f 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -22,6 +22,7 @@ use std::time::Instant; use common_telemetry::{debug, error, info}; use datatypes::arrow::datatypes::SchemaRef; +use datatypes::schema::ext::ArrowSchemaExt; use either::Either; use partition::expr::PartitionExpr; use smallvec::{SmallVec, smallvec}; @@ -40,6 +41,7 @@ use crate::error::{ RegionTruncatedSnafu, Result, }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::memtable; use crate::memtable::bulk::ENCODE_ROW_THRESHOLD; use crate::memtable::{ BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions, @@ -587,6 +589,7 @@ impl RegionFlushTask { &version.metadata, &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding), ); + let batch_schema = maybe_merge_json_fields(batch_schema, &mem_ranges); let flat_sources = memtable_flat_sources( batch_schema, mem_ranges, @@ -762,6 +765,16 @@ struct FlatSources { encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>, } +fn maybe_merge_json_fields(base: SchemaRef, mem_ranges: &MemtableRanges) -> SchemaRef { + if !base.has_json_extension_field() { + return base; + } + let Some(schema) = mem_ranges.schema() else { + return base; + }; + memtable::merge_json_extension_fields(&base, &[schema]) +} + /// Returns the max sequence and [FlatSource] for the given memtable. fn memtable_flat_sources( schema: SchemaRef, diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 5fc91e66a8..63db9dc2bb 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -14,6 +14,7 @@ //! 
Memtables are write buffers for regions. +use std::borrow::Cow; use std::collections::BTreeMap; use std::fmt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -59,7 +60,10 @@ pub use bulk::part::{ BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size, sort_primary_key_record_batch, }; -use datatypes::arrow::datatypes::SchemaRef; +use datatypes::arrow::datatypes::{Schema, SchemaRef}; +use datatypes::extension::json; +use datatypes::schema::ext::ArrowSchemaExt; +use datatypes::types::json_type; #[cfg(any(test, feature = "test"))] pub use time_partition::filter_record_batch; @@ -226,6 +230,55 @@ impl MemtableRanges { .max() .unwrap_or(0) } + + pub(crate) fn schema(&self) -> Option { + let mut schemas = self + .ranges + .values() + .filter_map(|x| x.record_batch_schema()) + .collect::>(); + + if schemas.iter().all(|x| !x.has_json_extension_field()) { + // If none of the schemas has a JSON extension field, the invariant must hold + // that all schemas are the same (they are all derived from the same region metadata). + // So it's ok to return the first one as the schema of the whole memtable ranges. + return (!schemas.is_empty()).then(|| schemas.swap_remove(0)); + } + + // If there are JSON extension fields, by convention, only their concrete data types + // (Arrow's Struct) may differ. Other things like the metadata or the field count are the same. + // So to produce the final schema, we only need to merge the data types. 
+ schemas + .split_first() + .map(|(first, rest)| merge_json_extension_fields(first, rest)) + } +} + +pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef { + let mut fields = base.fields().iter().cloned().collect::>(); + for (i, field) in fields.iter_mut().enumerate() { + if !json::is_json_extension_type(field) { + continue; + } + + let merged = others + .iter() + .map(|x| Cow::Borrowed(x.field(i).data_type())) + .reduce(|acc, e| { + Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned()) + }); + if let Some(merged) = merged + && field.data_type() != merged.as_ref() + { + let merged = + json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned(); + + let mut new = field.as_ref().clone(); + new.set_data_type(merged); + *field = Arc::new(new); + } + } + Arc::new(Schema::new_with_metadata(fields, base.metadata().clone())) } impl IterBuilder for MemtableRanges { diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index d6b061e468..6c7adfa388 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -87,6 +87,7 @@ impl FlatWriteFormat { } /// Gets the arrow schema to store in parquet. 
+ #[cfg(test)] pub(crate) fn arrow_schema(&self) -> &SchemaRef { &self.arrow_schema } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 460b18f3a3..595d21fe68 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -31,6 +31,7 @@ use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::DataType; +use datatypes::schema::ext::ArrowSchemaExt; use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; @@ -412,7 +413,11 @@ impl ParquetReaderBuilder { let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied()); // Computes the field levels. - let hint = Some(read_format.arrow_schema().fields()); + let hint = if read_format.arrow_schema().has_json_extension_field() { + None + } else { + Some(read_format.arrow_schema().fields()) + }; let field_levels = parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadDataPartSnafu)?; diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 568b54415c..4e9d9a53c0 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -446,7 +446,7 @@ where let arrow_batch = flat_format.convert_batch(&record_batch)?; let start = Instant::now(); - self.maybe_init_writer(flat_format.arrow_schema(), opts) + self.maybe_init_writer(arrow_batch.schema_ref(), opts) .await? 
.write(&arrow_batch) .await diff --git a/tests/cases/standalone/common/types/json/json2.result b/tests/cases/standalone/common/types/json/json2.result index dd4e64ed2e..f0644cbcab 100644 --- a/tests/cases/standalone/common/types/json/json2.result +++ b/tests/cases/standalone/common/types/json/json2.result @@ -11,10 +11,22 @@ Affected Rows: 0 insert into json2_table (ts, j) values (1, '{"a": {"b": 1}, "c": "s1"}'), - (2, '{"a": {"b": 2}, "c": "s2"}'), - (3, '{"a": {"b": 3}, "c": "s3"}'); + (2, '{"a": {"b": 2}, "c": "s2"}'); -Affected Rows: 3 +Affected Rows: 2 + +admin flush_table('json2_table'); + ++----------------------------------+ +| ADMIN flush_table('json2_table') | ++----------------------------------+ +| 0 | ++----------------------------------+ + +insert into json2_table (ts, j) +values (3, '{"a": {"b": 3}, "c": "s3"}'); + +Affected Rows: 1 insert into json2_table values (4, '{"a": {"b": 4}}'), @@ -23,6 +35,14 @@ values (4, '{"a": {"b": 4}}'), Affected Rows: 3 +admin flush_table('json2_table'); + ++----------------------------------+ +| ADMIN flush_table('json2_table') | ++----------------------------------+ +| 0 | ++----------------------------------+ + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1]}'), (8, '{"a": {"b": 8}, "c": "s8"}'); diff --git a/tests/cases/standalone/common/types/json/json2.sql b/tests/cases/standalone/common/types/json/json2.sql index 32db0311b3..a6ea4eb32d 100644 --- a/tests/cases/standalone/common/types/json/json2.sql +++ b/tests/cases/standalone/common/types/json/json2.sql @@ -9,14 +9,20 @@ create table json2_table insert into json2_table (ts, j) values (1, '{"a": {"b": 1}, "c": "s1"}'), - (2, '{"a": {"b": 2}, "c": "s2"}'), - (3, '{"a": {"b": 3}, "c": "s3"}'); + (2, '{"a": {"b": 2}, "c": "s2"}'); + +admin flush_table('json2_table'); + +insert into json2_table (ts, j) +values (3, '{"a": {"b": 3}, "c": "s3"}'); insert into json2_table values (4, '{"a": {"b": 4}}'), (5, '{"a": {}, "c": "s5"}'), (6, '{"c": "s6"}'); 
+admin flush_table('json2_table'); + insert into json2_table values (7, '{"a": {"b": "s7"}, "c": [1]}'), (8, '{"a": {"b": 8}, "c": "s8"}');