Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-03-06 17:54:19 +08:00
parent 95e567810f
commit d30521668e
9 changed files with 132 additions and 8 deletions

View File

@@ -14,6 +14,7 @@
mod column_schema;
pub mod constraint;
pub mod ext;
use std::collections::HashMap;
use std::fmt;

View File

@@ -0,0 +1,25 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::extension::json;
pub trait ArrowSchemaExt {
fn has_json_extension_field(&self) -> bool;
}
impl ArrowSchemaExt for arrow_schema::Schema {
fn has_json_extension_field(&self) -> bool {
self.fields().iter().any(json::is_json_extension_type)
}
}

View File

@@ -22,6 +22,7 @@ use std::time::Instant;
use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::schema::ext::ArrowSchemaExt;
use either::Either;
use partition::expr::PartitionExpr;
use smallvec::{SmallVec, smallvec};
@@ -40,6 +41,7 @@ use crate::error::{
RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::memtable;
use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
use crate::memtable::{
BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
@@ -587,6 +589,7 @@ impl RegionFlushTask {
&version.metadata,
&FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
);
let batch_schema = maybe_merge_json_fields(batch_schema, &mem_ranges);
let flat_sources = memtable_flat_sources(
batch_schema,
mem_ranges,
@@ -762,6 +765,16 @@ struct FlatSources {
encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
}
fn maybe_merge_json_fields(base: SchemaRef, mem_ranges: &MemtableRanges) -> SchemaRef {
if !base.has_json_extension_field() {
return base;
}
let Some(schema) = mem_ranges.schema() else {
return base;
};
memtable::merge_json_extension_fields(&base, &[schema])
}
/// Returns the max sequence and [FlatSource] for the given memtable.
fn memtable_flat_sources(
schema: SchemaRef,

View File

@@ -14,6 +14,7 @@
//! Memtables are write buffers for regions.
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -59,7 +60,10 @@ pub use bulk::part::{
BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
sort_primary_key_record_batch,
};
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::extension::json;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type;
#[cfg(any(test, feature = "test"))]
pub use time_partition::filter_record_batch;
@@ -226,6 +230,55 @@ impl MemtableRanges {
.max()
.unwrap_or(0)
}
pub(crate) fn schema(&self) -> Option<SchemaRef> {
let mut schemas = self
.ranges
.values()
.filter_map(|x| x.record_batch_schema())
.collect::<Vec<_>>();
if schemas.iter().all(|x| !x.has_json_extension_field()) {
// If there are no JSON extension fields in any schemas, the invariant must be hold,
// that all schemas are same (they are all derived from same region metadata).
// So it's ok to return the first one as the schema of the whole memtable ranges.
return (!schemas.is_empty()).then(|| schemas.swap_remove(0));
}
// If there are JSON extension fields, by convention, only their concrete data types
// (Arrow's Struct) may differ. Other things like the metadata or the fields count are same.
// So to produce the final schema, we can solely merge the data types.
schemas
.split_first()
.map(|(first, rest)| merge_json_extension_fields(first, rest))
}
}
pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef {
let mut fields = base.fields().iter().cloned().collect::<Vec<_>>();
for (i, field) in fields.iter_mut().enumerate() {
if !json::is_json_extension_type(field) {
continue;
}
let merged = others
.iter()
.map(|x| Cow::Borrowed(x.field(i).data_type()))
.reduce(|acc, e| {
Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned())
});
if let Some(merged) = merged
&& field.data_type() != merged.as_ref()
{
let merged =
json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned();
let mut new = field.as_ref().clone();
new.set_data_type(merged);
*field = Arc::new(new);
}
}
Arc::new(Schema::new_with_metadata(fields, base.metadata().clone()))
}
impl IterBuilder for MemtableRanges {

View File

@@ -87,6 +87,7 @@ impl FlatWriteFormat {
}
/// Gets the arrow schema to store in parquet.
#[cfg(test)]
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
&self.arrow_schema
}

View File

@@ -31,6 +31,7 @@ use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::schema::ext::ArrowSchemaExt;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
@@ -412,7 +413,11 @@ impl ParquetReaderBuilder {
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
// Computes the field levels.
let hint = Some(read_format.arrow_schema().fields());
let hint = if read_format.arrow_schema().has_json_extension_field() {
None
} else {
Some(read_format.arrow_schema().fields())
};
let field_levels =
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadDataPartSnafu)?;

View File

@@ -446,7 +446,7 @@ where
let arrow_batch = flat_format.convert_batch(&record_batch)?;
let start = Instant::now();
self.maybe_init_writer(flat_format.arrow_schema(), opts)
self.maybe_init_writer(arrow_batch.schema_ref(), opts)
.await?
.write(&arrow_batch)
.await

View File

@@ -11,10 +11,22 @@ Affected Rows: 0
insert into json2_table (ts, j)
values (1, '{"a": {"b": 1}, "c": "s1"}'),
(2, '{"a": {"b": 2}, "c": "s2"}'),
(3, '{"a": {"b": 3}, "c": "s3"}');
(2, '{"a": {"b": 2}, "c": "s2"}');
Affected Rows: 3
Affected Rows: 2
admin flush_table('json2_table');
+----------------------------------+
| ADMIN flush_table('json2_table') |
+----------------------------------+
| 0 |
+----------------------------------+
insert into json2_table (ts, j)
values (3, '{"a": {"b": 3}, "c": "s3"}');
Affected Rows: 1
insert into json2_table
values (4, '{"a": {"b": 4}}'),
@@ -23,6 +35,14 @@ values (4, '{"a": {"b": 4}}'),
Affected Rows: 3
admin flush_table('json2_table');
+----------------------------------+
| ADMIN flush_table('json2_table') |
+----------------------------------+
| 0 |
+----------------------------------+
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');

View File

@@ -9,14 +9,20 @@ create table json2_table
insert into json2_table (ts, j)
values (1, '{"a": {"b": 1}, "c": "s1"}'),
(2, '{"a": {"b": 2}, "c": "s2"}'),
(3, '{"a": {"b": 3}, "c": "s3"}');
(2, '{"a": {"b": 2}, "c": "s2"}');
admin flush_table('json2_table');
insert into json2_table (ts, j)
values (3, '{"a": {"b": 3}, "c": "s3"}');
insert into json2_table
values (4, '{"a": {"b": 4}}'),
(5, '{"a": {}, "c": "s5"}'),
(6, '{"c": "s6"}');
admin flush_table('json2_table');
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');