feat: compact json2 data

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-19 16:57:40 +08:00
parent 72434ee5d7
commit ada53d143c
26 changed files with 426 additions and 351 deletions

View File

@@ -22,6 +22,7 @@ use common_time::timestamp::TimeUnit;
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
use datatypes::json::value::{JsonNumber, JsonValue, JsonValueRef, JsonVariant};
use datatypes::prelude::{ConcreteDataType, ValueRef};
use datatypes::types::json_type::JsonNativeType;
use datatypes::types::{
IntervalType, JsonFormat, JsonType, StructField, StructType, TimeType, TimestampType,
};
@@ -127,7 +128,9 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
datatype: type_ext.datatype(),
datatype_ext: type_ext.datatype_extension.clone().map(|d| *d),
};
ConcreteDataType::json_native_datatype(inner_type.into())
ConcreteDataType::json2(JsonNativeType::from(&ConcreteDataType::from(
inner_type,
)))
}
None => ConcreteDataType::Json(JsonType::null()),
_ => {
@@ -448,7 +451,8 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
if native_type.is_null() {
None
} else {
let native_type = ConcreteDataType::from(native_type.as_ref());
let native_type =
ConcreteDataType::from_arrow_type(&native_type.as_arrow_type());
let (datatype, datatype_extension) =
ColumnDataTypeWrapper::try_from(native_type)?.into_parts();
Some(ColumnDataTypeExtension {
@@ -1157,6 +1161,7 @@ mod tests {
use common_time::interval::IntervalUnit;
use datatypes::scalars::ScalarVector;
use datatypes::types::json_type::JsonObjectType;
use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type};
use datatypes::value::{ListValue, StructValue};
use datatypes::vectors::{
@@ -1393,9 +1398,12 @@ mod tests {
.into()
);
assert_eq!(
ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
struct_type.clone()
)),
ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([
("address".to_string(), JsonNativeType::String),
("age".to_string(), JsonNativeType::i64()),
("id".to_string(), JsonNativeType::i64()),
("name".to_string(), JsonNativeType::String),
]))),
ColumnDataTypeWrapper::new(
ColumnDataType::Json,
Some(ColumnDataTypeExtension {
@@ -1579,20 +1587,6 @@ mod tests {
]))).try_into().expect("Failed to create column datatype from Struct(StructType { fields: [StructField { name: \"a\", data_type: Int64(Int64Type) }, StructField { name: \"a.a\", data_type: List(ListType { item_type: String(StringType) }) }] })")
);
let struct_type = StructType::new(Arc::new(vec![
StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true),
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
StructField::new(
"address".to_string(),
ConcreteDataType::string_datatype(),
true,
),
]));
assert_eq!(
ColumnDataTypeWrapper::new(
ColumnDataType::Json,
@@ -1636,9 +1630,12 @@ mod tests {
})))
})
),
ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
struct_type.clone()
))
ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([
("address".to_string(), JsonNativeType::String),
("age".to_string(), JsonNativeType::i64()),
("id".to_string(), JsonNativeType::i64()),
("name".to_string(), JsonNativeType::String),
])))
.try_into()
.expect("failed to convert json type")
);

View File

@@ -355,7 +355,7 @@ pub fn merge_record_batches(schema: SchemaRef, batches: &[RecordBatch]) -> Resul
Ok(RecordBatch::from_df_record_batch(schema, record_batch))
}
fn maybe_align_json_array_with_schema(
pub fn maybe_align_json_array_with_schema(
schema: &ArrowSchemaRef,
arrays: Vec<ArrayRef>,
) -> Result<Vec<ArrayRef>> {

View File

@@ -28,6 +28,7 @@ use serde::{Deserialize, Serialize};
use crate::error::{self, Error, Result};
use crate::type_id::LogicalTypeId;
use crate::types::json_type::JsonNativeType;
use crate::types::{
BinaryType, BooleanType, DateType, Decimal128Type, DictionaryType, DurationMicrosecondType,
DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type,
@@ -475,7 +476,7 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
ArrowDataType::Decimal128(precision, scale) => {
ConcreteDataType::decimal128_datatype(*precision, *scale)
}
ArrowDataType::Struct(fields) => ConcreteDataType::Struct(fields.try_into()?),
ArrowDataType::Struct(fields) => ConcreteDataType::Struct(StructType::from(fields)),
ArrowDataType::Float16
| ArrowDataType::Date64
| ArrowDataType::FixedSizeBinary(_)
@@ -683,8 +684,8 @@ impl ConcreteDataType {
Self::vector_datatype(0)
}
pub fn json_native_datatype(inner_type: ConcreteDataType) -> ConcreteDataType {
ConcreteDataType::Json(JsonType::new_json2((&inner_type).into()))
pub fn json2(native_type: JsonNativeType) -> ConcreteDataType {
ConcreteDataType::Json(JsonType::new_json2(native_type))
}
}

View File

@@ -394,22 +394,25 @@ impl JsonValue {
},
JsonVariant::String(x) => Value::String(x.into()),
JsonVariant::Array(array) => {
let item_type = if let Some(first) = array.first() {
first.native_type()
} else {
JsonNativeType::Null
};
Value::List(ListValue::new(
array.into_iter().map(helper).collect(),
Arc::new((&item_type).into()),
))
let values = array.into_iter().map(helper).collect::<Vec<_>>();
debug_assert!(
values
.windows(2)
.all(|w| w[0].data_type() == w[1].data_type())
);
let item_type = values
.first()
.map(|x| x.data_type())
.unwrap_or_else(ConcreteDataType::null_datatype);
Value::List(ListValue::new(values, Arc::new(item_type)))
}
JsonVariant::Object(object) => {
let mut fields = Vec::with_capacity(object.len());
let mut items = Vec::with_capacity(object.len());
for (k, v) in object {
fields.push(StructField::new(k, (&v.native_type()).into(), true));
items.push(helper(v));
let v = helper(v);
fields.push(StructField::new(k, v.data_type(), true));
items.push(v);
}
Value::Struct(StructValue::new(items, StructType::new(Arc::new(fields))))
}

View File

@@ -18,6 +18,7 @@ use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use arrow::datatypes::DataType as ArrowDataType;
use arrow_schema::{Field, Fields};
use common_base::bytes::Bytes;
use regex::{Captures, Regex};
use serde::{Deserialize, Serialize};
@@ -32,7 +33,7 @@ use crate::error::{
use crate::prelude::ConcreteDataType;
use crate::scalars::ScalarVectorBuilder;
use crate::type_id::LogicalTypeId;
use crate::types::{ListType, StructField, StructType};
use crate::types::{StructField, StructType};
use crate::value::Value;
use crate::vectors::json::builder::JsonVectorBuilder;
use crate::vectors::{BinaryVectorBuilder, MutableVector};
@@ -117,30 +118,28 @@ impl JsonNativeType {
_ => JsonNativeType::Variant,
};
}
}
impl From<&JsonNativeType> for ConcreteDataType {
fn from(value: &JsonNativeType) -> Self {
match value {
JsonNativeType::Null => ConcreteDataType::null_datatype(),
JsonNativeType::Bool => ConcreteDataType::boolean_datatype(),
JsonNativeType::Number(JsonNumberType::U64) => ConcreteDataType::uint64_datatype(),
JsonNativeType::Number(JsonNumberType::I64) => ConcreteDataType::int64_datatype(),
JsonNativeType::Number(JsonNumberType::F64) => ConcreteDataType::float64_datatype(),
JsonNativeType::String => ConcreteDataType::string_datatype(),
JsonNativeType::Array(item_type) => {
ConcreteDataType::List(ListType::new(Arc::new(item_type.as_ref().into())))
pub fn as_arrow_type(&self) -> ArrowDataType {
match self {
JsonNativeType::Null => ArrowDataType::Null,
JsonNativeType::Bool => ArrowDataType::Boolean,
JsonNativeType::Number(n) => match n {
JsonNumberType::U64 => ArrowDataType::UInt64,
JsonNumberType::I64 => ArrowDataType::Int64,
JsonNumberType::F64 => ArrowDataType::Float64,
},
JsonNativeType::String => ArrowDataType::Utf8,
JsonNativeType::Array(array) => {
ArrowDataType::List(Arc::new(Field::new("item", array.as_arrow_type(), true)))
}
JsonNativeType::Object(object) => {
let fields = object
.iter()
.map(|(type_name, field_type)| {
StructField::new(type_name.clone(), field_type.into(), true)
})
.collect();
ConcreteDataType::Struct(StructType::new(Arc::new(fields)))
.map(|(k, v)| Arc::new(Field::new(k, v.as_arrow_type(), true)))
.collect::<Vec<_>>();
ArrowDataType::Struct(Fields::from(fields))
}
JsonNativeType::Variant => ConcreteDataType::binary_datatype(),
JsonNativeType::Variant => ArrowDataType::Binary,
}
}
}
@@ -304,9 +303,10 @@ impl JsonType {
pub(crate) fn as_struct_type(&self) -> StructType {
match &self.format {
JsonFormat::Jsonb => StructType::default(),
JsonFormat::Json2(inner) => match ConcreteDataType::from(inner.as_ref()) {
ConcreteDataType::Struct(t) => t.clone(),
x => plain_json_struct_type(x),
JsonFormat::Json2(native_type) => match native_type.as_arrow_type() {
// TODO(LFC): Directly use Arrow's Struct datatype here.
ArrowDataType::Struct(fields) => StructType::from(&fields),
data_type => plain_json_struct_type(&data_type),
},
}
}
@@ -370,8 +370,12 @@ fn is_include(this: &JsonNativeType, that: &JsonNativeType) -> bool {
/// A special struct type for denoting a "plain" (non-object) JSON value. It has only one field,
/// with the fixed name [JSON_PLAIN_FIELD_NAME] and the metadata [JSON_PLAIN_FIELD_METADATA_KEY] = `"true"`.
pub(crate) fn plain_json_struct_type(item_type: ConcreteDataType) -> StructType {
let mut field = StructField::new(JSON_PLAIN_FIELD_NAME.to_string(), item_type, true);
fn plain_json_struct_type(data_type: &ArrowDataType) -> StructType {
let mut field = StructField::new(
JSON_PLAIN_FIELD_NAME.to_string(),
ConcreteDataType::from_arrow_type(data_type),
true,
);
field.insert_metadata(JSON_PLAIN_FIELD_METADATA_KEY, true);
StructType::new(Arc::new(vec![field]))
}

View File

@@ -28,22 +28,21 @@ pub struct StructType {
fields: Arc<Vec<StructField>>,
}
impl TryFrom<&Fields> for StructType {
type Error = crate::error::Error;
fn try_from(value: &Fields) -> Result<Self, Self::Error> {
impl From<&Fields> for StructType {
fn from(value: &Fields) -> Self {
let fields = value
.iter()
.map(|field| {
Ok(StructField::new(
StructField::new(
field.name().clone(),
ConcreteDataType::try_from(field.data_type())?,
ConcreteDataType::from_arrow_type(field.data_type()),
field.is_nullable(),
))
)
})
.collect::<Result<Vec<StructField>, Self::Error>>()?;
Ok(StructType {
.collect::<Vec<_>>();
StructType {
fields: Arc::new(fields),
})
}
}
}

View File

@@ -1205,7 +1205,7 @@ impl TryFrom<ScalarValue> for Value {
.map(|v| Value::Decimal128(Decimal128::new(v, p, s)))
.unwrap_or(Value::Null),
ScalarValue::Struct(struct_array) => {
let struct_type: StructType = (struct_array.fields()).try_into()?;
let struct_type = StructType::from(struct_array.fields());
let items = struct_array
.columns()
.iter()
@@ -1741,6 +1741,7 @@ pub(crate) mod tests {
use super::*;
use crate::json::value::{JsonVariant, JsonVariantRef};
use crate::types::StructField;
use crate::types::json_type::{JsonNativeType, JsonObjectType};
use crate::vectors::ListVectorBuilder;
pub(crate) fn build_struct_type() -> StructType {
@@ -1791,10 +1792,6 @@ pub(crate) mod tests {
ScalarValue::Struct(Arc::new(struct_arrow_array))
}
pub fn build_list_type() -> ConcreteDataType {
ConcreteDataType::list_datatype(Arc::new(ConcreteDataType::boolean_datatype()))
}
pub(crate) fn build_list_value() -> ListValue {
let items = vec![Value::Boolean(true), Value::Boolean(false)];
ListValue::new(items, Arc::new(ConcreteDataType::boolean_datatype()))
@@ -2274,39 +2271,26 @@ pub(crate) mod tests {
);
check_type_and_value(
&ConcreteDataType::json_native_datatype(ConcreteDataType::boolean_datatype()),
&ConcreteDataType::json2(JsonNativeType::Bool),
&Value::Json(Box::new(true.into())),
);
check_type_and_value(
&ConcreteDataType::json_native_datatype(build_list_type()),
&ConcreteDataType::json2(JsonNativeType::Array(Box::new(JsonNativeType::Bool))),
&Value::Json(Box::new([true].into())),
);
check_type_and_value(
&ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
StructType::new(Arc::new(vec![
StructField::new(
"address".to_string(),
ConcreteDataType::string_datatype(),
true,
),
StructField::new("age".to_string(), ConcreteDataType::uint64_datatype(), true),
StructField::new(
"awards".to_string(),
ConcreteDataType::list_datatype(Arc::new(
ConcreteDataType::boolean_datatype(),
)),
true,
),
StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true),
StructField::new(
"name".to_string(),
ConcreteDataType::string_datatype(),
true,
),
])),
)),
&ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([
("address".to_string(), JsonNativeType::String),
("age".to_string(), JsonNativeType::u64()),
(
"awards".to_string(),
JsonNativeType::Array(Box::new(JsonNativeType::Bool)),
),
("id".to_string(), JsonNativeType::i64()),
("name".to_string(), JsonNativeType::String),
]))),
&Value::Json(Box::new(
[
("id", JsonVariant::from(1i64)),

View File

@@ -240,7 +240,7 @@ impl Helper {
ConstantVector::new(Arc::new(vector), length)
}
ScalarValue::Struct(v) => {
let struct_type = StructType::try_from(v.fields())?;
let struct_type = StructType::from(v.fields());
ConstantVector::new(
Arc::new(StructVector::try_new(struct_type, (*v).clone())?),
length,
@@ -401,7 +401,7 @@ impl Helper {
.downcast_ref::<StructArray>()
.unwrap();
Arc::new(StructVector::try_new(
StructType::try_from(fields)?,
StructType::from(fields),
array.clone(),
)?)
}

View File

@@ -220,7 +220,7 @@ impl TryFrom<StructArray> for StructVector {
fn try_from(array: StructArray) -> Result<Self> {
let fields = match array.data_type() {
ArrowDataType::Struct(fields) => StructType::try_from(fields)?,
ArrowDataType::Struct(fields) => StructType::from(fields),
other => ConversionSnafu {
from: other.to_string(),
}

View File

@@ -182,6 +182,7 @@ fn bulk_part_converter(c: &mut Criterion) {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: false,
..Default::default()
},
);
let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false);
@@ -208,6 +209,7 @@ fn bulk_part_converter(c: &mut Criterion) {
&FlatSchemaOptions {
raw_pk_columns: true,
string_pk_use_dict: true,
..Default::default()
},
);
let mut converter =

View File

@@ -29,6 +29,7 @@ use std::time::Instant;
use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use arrow_schema::Schema;
use common_base::Plugins;
use common_base::cancellation::CancellationHandle;
use common_memory_manager::OnExhaustedPolicy;
@@ -39,6 +40,11 @@ use common_time::timestamp::TimeUnit;
use common_time::{TimeToLive, Timestamp};
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type::JsonNativeType;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::metadata::PageIndexPolicy;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -54,13 +60,15 @@ use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::config::MitoConfig;
use crate::error::{
CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu,
ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error,
GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, ParquetToArrowSchemaSnafu,
RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::FlatSource;
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::read_columns::ReadColumns;
use crate::read::scan_region::{PredicateGroup, ScanInput};
use crate::read::seq_scan::SeqScan;
use crate::region::options::{MergeMode, RegionOptions};
@@ -72,6 +80,7 @@ use crate::schedule::remote_job_scheduler::{
};
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::{FileHandle, FileMeta, Level};
use crate::sst::parquet::reader::MetadataCacheMetrics;
use crate::sst::version::LevelMeta;
use crate::worker::WorkerListener;
@@ -996,19 +1005,56 @@ impl CompactionSstReaderBuilder<'_> {
/// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files,
/// for compaction. The schema of the [FlatSource] is unified.
async fn build_flat_sst_reader(self) -> Result<FlatSource> {
let scan_input = self.build_scan_input()?.with_compaction(true);
let scan_input = self.build_scan_input().await?.with_compaction(true);
let schema = scan_input.mapper.output_schema();
let schema = schema.arrow_schema();
SeqScan::new(scan_input)
let stream = SeqScan::new(scan_input)
.build_flat_reader_for_compaction()
.await
.map(|stream| FlatSource::new_stream(schema.clone(), stream))
.await?;
Ok(FlatSource::new_stream(schema.clone(), stream))
}
fn build_scan_input(self) -> Result<ScanInput> {
let mapper = FlatProjectionMapper::all(&self.metadata)?;
async fn build_scan_input(self) -> Result<ScanInput> {
let schema = self.metadata.schema.arrow_schema();
let json_type_hint = if schema.has_json_extension_field() {
let mut json_type_hint = schema
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.map(|field| (field.name().clone(), JsonNativeType::Null))
.collect::<HashMap<_, _>>();
let schemas = self.collect_arrow_schemas_from_parquet().await?;
for schema in schemas {
for field in schema.fields() {
let Some(merged) = json_type_hint.get_mut(field.name()) else {
continue;
};
let json_type = JsonNativeType::try_from(field.data_type())
.context(DataTypeMismatchSnafu)?;
merged.merge(&json_type);
}
}
Some(json_type_hint)
} else {
None
};
let projection = (0..self.metadata.column_metadatas.len()).collect();
let read_columns = ReadColumns::from_deduped_column_ids(
self.metadata.column_metadatas.iter().map(|x| x.column_id),
);
let mapper = FlatProjectionMapper::new_with_read_columns(
&self.metadata,
projection,
read_columns,
json_type_hint.as_ref(),
)?;
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
@@ -1028,6 +1074,43 @@ impl CompactionSstReaderBuilder<'_> {
Ok(scan_input)
}
/// Collects the Arrow schema of each input SST file, derived from the file's Parquet metadata.
///
/// For every file in `self.inputs`, the Parquet metadata is read through the SST layer using the
/// compaction cache strategy, and its schema descriptor (plus key-value metadata) is converted
/// to an Arrow [Schema].
///
/// Input files whose metadata read fails with an "object not found" error are skipped, so the
/// returned vector may be shorter than `self.inputs`.
///
/// # Errors
///
/// Returns any other metadata read error as is, or a `ParquetToArrowSchema` error (carrying the
/// file path) if the Parquet schema cannot be converted to an Arrow schema.
async fn collect_arrow_schemas_from_parquet(&self) -> Result<Vec<Schema>> {
    let mut collected = Vec::with_capacity(self.inputs.len());

    for input in self.inputs {
        let path = input.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type());
        let size = input.meta_ref().file_size;

        // The whole read chain stays in the `match` scrutinee so that the temporaries it
        // creates live exactly as long as they need to while the result is inspected.
        let parquet_metadata = match self
            .sst_layer
            .read_sst(input.clone())
            .cache(CacheStrategy::Compaction(self.cache.clone()))
            .read_parquet_metadata(
                &path,
                size,
                &mut MetadataCacheMetrics::default(),
                PageIndexPolicy::default(),
            )
            .await
        {
            Ok(loaded) => loaded.0.parquet_metadata(),
            // A missing file is not fatal here: just ignore it and move on to the next input.
            Err(err) if err.is_object_not_found() => continue,
            Err(err) => return Err(err),
        };

        let parquet_file_meta = parquet_metadata.file_metadata();
        let descriptor = parquet_file_meta.schema_descr();
        let key_values = parquet_file_meta.key_value_metadata();
        let arrow_schema = parquet_to_arrow_schema(descriptor, key_values)
            .context(ParquetToArrowSchemaSnafu { file: path })?;
        collected.push(arrow_schema);
    }

    Ok(collected)
}
}
/// Converts time range to predicates so that rows outside the range will be filtered.

View File

@@ -1241,6 +1241,15 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to generate Arrow schema from Parquet file: {}", file))]
ParquetToArrowSchema {
file: String,
#[snafu(source)]
error: parquet::errors::ParquetError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1331,7 +1340,8 @@ impl ErrorExt for Error {
| BuildEntry { .. }
| Metadata { .. }
| CastColumn { .. }
| MitoManifestInfo { .. } => StatusCode::Internal,
| MitoManifestInfo { .. }
| ParquetToArrowSchema { .. } => StatusCode::Internal,
FetchManifests { source, .. } => source.status_code(),

View File

@@ -2115,6 +2115,7 @@ mod tests {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
..Default::default()
},
);
@@ -2552,6 +2553,7 @@ mod tests {
&FlatSchemaOptions {
raw_pk_columns: false,
string_pk_use_dict: true,
..Default::default()
},
);

View File

@@ -1163,8 +1163,7 @@ impl FlatSource {
}
}
#[expect(unused)]
fn schema(&self) -> &SchemaRef {
pub(crate) fn schema(&self) -> &SchemaRef {
&self.schema
}

View File

@@ -25,7 +25,9 @@ use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::DataType;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use datatypes::vectors::json::array::JsonArray;
@@ -43,15 +45,19 @@ use crate::error::{
EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
use crate::sst::parquet::format::{INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
/// Returns true if `left` and `right` have same columns and primary key encoding.
/// Returns true if the columns in the `projection_mapper` and `read_format` have same data types
/// and primary key encodings.
pub(crate) fn has_same_columns_and_pk_encoding(
left: &RegionMetadata,
right: &RegionMetadata,
projection_mapper: &FlatProjectionMapper,
read_format: &FlatReadFormat,
compaction: bool,
) -> bool {
let left = projection_mapper.metadata();
let right = read_format.metadata();
if left.primary_key_encoding != right.primary_key_encoding {
return false;
}
@@ -61,17 +67,13 @@ pub(crate) fn has_same_columns_and_pk_encoding(
}
for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
if left_col.column_id != right_col.column_id {
return false;
}
debug_assert_eq!(
left_col.column_schema.data_type,
right_col.column_schema.data_type
);
debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
}
true
&projection_mapper.input_arrow_schema(compaction) == read_format.arrow_schema()
}
/// A helper struct to adapt schema of the batch to an expected schema.
@@ -88,16 +90,28 @@ impl FlatCompatBatch {
/// Creates a [FlatCompatBatch].
///
/// - `mapper` is built from the metadata users expect to see.
/// - `actual` is the [RegionMetadata] of the input parquet.
/// - `format_projection` is the projection of the read format for the input parquet.
/// - `read_format` is the [FlatReadFormat] of the input parquet.
/// - `compaction` indicates whether the reader is for compaction.
pub(crate) fn try_new(
mapper: &FlatProjectionMapper,
actual: &RegionMetadataRef,
format_projection: &FormatProjection,
read_format: &FlatReadFormat,
compaction: bool,
) -> Result<Option<Self>> {
let actual_schema = flat_projected_columns(actual, format_projection);
let actual = read_format.metadata();
let format_projection = read_format.format_projection();
let mut actual_schema = flat_projected_columns(actual, format_projection);
if read_format.arrow_schema().has_json_extension_field() {
for field in read_format.arrow_schema().fields() {
if is_json_extension_type(field)
&& let Some(column_id) =
actual.column_by_name(field.name()).map(|x| x.column_id)
&& let Some(i) = actual_schema.iter().position(|x| x.0 == column_id)
{
actual_schema[i].1 = ConcreteDataType::from_arrow_type(field.data_type());
}
}
}
let expect_schema = mapper.batch_schema();
if expect_schema == actual_schema {
// Although the SST has a different schema, but the schema after projection is the same
@@ -152,10 +166,12 @@ impl FlatCompatBatch {
expect_column.column_id,
)));
} else {
fields.push(Arc::new(with_field_id(
(**column_field).clone(),
let field = with_field_id(
Arc::unwrap_or_clone(column_field.clone()),
expect_column.column_id,
)));
)
.with_data_type(expect_data_type.as_arrow_type());
fields.push(Arc::new(field))
};
if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
@@ -275,6 +291,9 @@ impl FlatCompatBatch {
)
.collect::<Result<Vec<_>>>()?;
common_telemetry::debug!("index_or_defaults: {:?}", self.index_or_defaults);
common_telemetry::debug!("arrow_schema: {:?}", self.arrow_schema);
common_telemetry::debug!("columns: {:?}", columns);
let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
.context(NewRecordBatchSnafu)?;
@@ -720,12 +739,10 @@ mod tests {
false,
)
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
.unwrap()
.unwrap();
let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
.unwrap()
.unwrap();
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
tag_builder.append_value("tag1");
@@ -804,6 +821,7 @@ mod tests {
&expected_metadata,
vec![1, 2],
ReadColumns::from_deduped_column_ids([1, 2, 3]),
None,
)
.unwrap();
let read_format = FlatReadFormat::new(
@@ -814,12 +832,10 @@ mod tests {
false,
)
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
.unwrap()
.unwrap();
let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
.unwrap()
.unwrap();
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
tag_builder.append_value("tag1");
@@ -905,12 +921,10 @@ mod tests {
false,
)
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
.unwrap()
.unwrap();
let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
.unwrap()
.unwrap();
// Tag array.
let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
@@ -999,12 +1013,10 @@ mod tests {
true,
)
.unwrap();
let format_projection = read_format.format_projection();
let compat_batch =
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
.unwrap()
.unwrap();
let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, true)
.unwrap()
.unwrap();
let sparse_k1 = encode_sparse_key(&[]);
let input_columns: Vec<ArrayRef> = vec![

View File

@@ -14,6 +14,7 @@
//! Utilities for projection on flat format.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
@@ -27,6 +28,7 @@ use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::types::json_type::JsonNativeType;
use datatypes::value::Value;
use datatypes::vectors::Helper;
use datatypes::vectors::json::array::JsonArray;
@@ -84,7 +86,7 @@ impl FlatProjectionMapper {
let projection: Vec<_> = projection.into_iter().collect();
let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
let read_cols = ReadColumns::from_deduped_column_ids(read_column_ids);
Self::new_with_read_columns(metadata, projection, read_cols)
Self::new_with_read_columns(metadata, projection, read_cols, None)
}
/// Returns a new mapper with output projection and explicit read columns.
@@ -92,6 +94,7 @@ impl FlatProjectionMapper {
metadata: &RegionMetadataRef,
projection: Vec<usize>,
read_cols: ReadColumns,
json_type_hint: Option<&HashMap<String, JsonNativeType>>,
) -> Result<Self> {
// If the original projection is empty.
let is_empty_projection = projection.is_empty();
@@ -109,7 +112,16 @@ impl FlatProjectionMapper {
reason: format!("projection index {} is out of bound", idx),
})?;
output_col_ids.push(col.column_id);
col_schemas.push(col.column_schema.clone());
let mut schema = col.column_schema.clone();
if let Some(concretized) = json_type_hint
.and_then(|x| x.get(&schema.name))
.cloned()
.map(ConcreteDataType::json2)
{
schema.data_type = concretized;
}
col_schemas.push(schema);
}
// Creates a map to lookup index.
@@ -123,7 +135,21 @@ impl FlatProjectionMapper {
read_cols.clone(),
);
let batch_schema = flat_projected_columns(metadata, &format_projection);
let mut batch_schema = flat_projected_columns(metadata, &format_projection);
if let Some(json_type_hint) = json_type_hint
&& !json_type_hint.is_empty()
{
for (column_id, data_type) in batch_schema.iter_mut() {
if let Some(concretized) = metadata
.column_by_id(*column_id)
.and_then(|x| json_type_hint.get(&x.column_schema.name).cloned())
.map(ConcreteDataType::json2)
{
*data_type = concretized;
}
}
}
// Safety: We get the column id from the metadata.
let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
@@ -228,10 +254,15 @@ impl FlatProjectionMapper {
self.input_arrow_schema.clone()
} else {
// For compaction, we need to build a different schema from encoding.
to_flat_sst_arrow_schema(
&self.metadata,
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
options.concretized_json_types = self
.input_arrow_schema
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.map(|field| (field.name().clone(), field.data_type().clone()))
.collect();
to_flat_sst_arrow_schema(&self.metadata, &options)
}
}
@@ -242,10 +273,6 @@ impl FlatProjectionMapper {
self.output_schema.clone()
}
pub(crate) fn with_output_schema(&mut self, schema: SchemaRef) {
self.output_schema = schema;
}
/// Converts a flat format [RecordBatch] to a normal [RecordBatch].
///
/// The batch must match the `projection` using to build the mapper.
@@ -407,13 +434,14 @@ pub(crate) fn compute_input_arrow_schema(
batch_schema: &[(ColumnId, ConcreteDataType)],
) -> datatypes::arrow::datatypes::SchemaRef {
let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
for (column_id, _) in batch_schema {
for (column_id, data_type) in batch_schema {
let column_metadata = metadata.column_by_id(*column_id).unwrap();
let field = Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
);
)
.with_metadata(column_metadata.column_schema.metadata().clone());
let field = with_field_id(field, *column_id);
if column_metadata.semantic_type == SemanticType::Tag {
new_fields.push(tag_maybe_to_dictionary_field(
@@ -454,7 +482,8 @@ impl CompactionProjectionMapper {
let read_col_ids = metadata.column_metadatas.iter().map(|col| col.column_id);
let read_cols = ReadColumns::from_deduped_column_ids(read_col_ids);
let mapper = FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols)?;
let mapper =
FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols, None)?;
let assembler = DfBatchAssembler::new(mapper.output_schema());
Ok(Self { mapper, assembler })

View File

@@ -296,6 +296,7 @@ mod tests {
&metadata,
vec![4, 1],
ReadColumns::from_deduped_column_ids([4, 1, 3]),
None,
)
.unwrap();
assert_eq!(&[4, 1, 3], mapper.read_columns().column_ids().as_slice());

View File

@@ -14,7 +14,7 @@
//! Scans a region according to the scan request.
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
@@ -31,11 +31,7 @@ use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::Schema;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type::JsonNativeType;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
@@ -429,15 +425,24 @@ impl ScanRegion {
let read_col_ids = read_cols.column_ids();
// The mapper always computes projected column ids as the schema of SSTs may change.
let mut mapper = match self.request.projection_indices() {
Some(p) => FlatProjectionMapper::new_with_read_columns(
&self.version.metadata,
p.to_vec(),
read_cols,
)?,
None => FlatProjectionMapper::all(&self.version.metadata)?,
};
concretize_json_types(&mut mapper, &self.request.json_type_hint);
let projection = self
.request
.projection_indices()
.map(|x| x.to_vec())
.unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
let json_type_hint = self
.version
.metadata
.schema
.arrow_schema()
.has_json_extension_field()
.then_some(&self.request.json_type_hint);
let mapper = FlatProjectionMapper::new_with_read_columns(
&self.version.metadata,
projection,
read_cols,
json_type_hint,
)?;
let ssts = &self.version.ssts;
let mut files = Vec::new();
@@ -733,34 +738,6 @@ impl ScanRegion {
}
}
/// Rewrites the output schema of `mapper` so that JSON extension columns take
/// the concrete data type given in `json_type_hint`.
///
/// `json_type_hint` is looked up by column name. Columns that are not JSON
/// extension fields, or that have no hint, keep their data type. Nothing is
/// changed if the output Arrow schema has no JSON extension field at all.
/// The rewritten schema keeps the version of the previous output schema.
fn concretize_json_types(
    mapper: &mut FlatProjectionMapper,
    json_type_hint: &HashMap<String, JsonNativeType>,
) {
    let current_schema = mapper.output_schema();
    let arrow_schema = current_schema.arrow_schema();
    if !arrow_schema.has_json_extension_field() {
        return;
    }

    let mut columns = current_schema.column_schemas().to_vec();
    // Column schemas and Arrow fields share the same positions, so the Arrow
    // field at `idx` tells whether the column at `idx` is a JSON extension.
    let json_columns = columns
        .iter_mut()
        .enumerate()
        .filter(|(idx, _)| is_json_extension_type(&arrow_schema.fields()[*idx]));
    for (_, column) in json_columns {
        if let Some(hinted_type) = json_type_hint.get(&column.name) {
            column.data_type = ConcreteDataType::from(hinted_type);
        }
    }

    let version = current_schema.version();
    let rewritten = Schema::new_with_version(columns, version);
    mapper.with_output_schema(Arc::new(rewritten));
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
@@ -1126,16 +1103,16 @@ impl ScanInput {
};
let need_compat = !compat::has_same_columns_and_pk_encoding(
self.mapper.metadata(),
file_range_ctx.read_format().metadata(),
&self.mapper,
file_range_ctx.read_format(),
self.compaction,
);
if need_compat {
// They have different schema. We need to adapt the batch first so the
// mapper can convert it.
let compat = FlatCompatBatch::try_new(
&self.mapper,
file_range_ctx.read_format().metadata(),
file_range_ctx.read_format().format_projection(),
file_range_ctx.read_format(),
self.compaction,
)?;
file_range_ctx.set_compat_batch(compat);
@@ -1774,19 +1751,15 @@ impl PredicateGroup {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::physical_plan::expressions::lit as physical_lit;
use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit};
use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
use datatypes::schema::ColumnSchema;
use datatypes::types::json_type::JsonObjectType;
use datatypes::value::Value;
use partition::expr::col as partition_col;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};
use store_api::metadata::RegionMetadataBuilder;
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
use super::*;
use crate::cache::CacheManager;
@@ -1818,113 +1791,6 @@ mod tests {
lit(ScalarValue::TimestampMillisecond(Some(val), None))
}
/// Builds region metadata for region (1024, 1) with four columns:
/// tag `host` (id 1, the primary key), timestamp `ts` (id 2), a nullable JSON
/// extension field `payload` (id 3) typed as an empty JSON object, and a
/// nullable int64 field `value` (id 4).
fn metadata_with_json_field() -> RegionMetadataRef {
    let host = ColumnMetadata {
        column_schema: ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
        semantic_type: SemanticType::Tag,
        column_id: 1,
    };
    let ts = ColumnMetadata {
        column_schema: ColumnSchema::new(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
        ),
        semantic_type: SemanticType::Timestamp,
        column_id: 2,
    };

    // The JSON column starts as an empty object and carries the JSON extension type.
    let empty_object = JsonNativeType::Object(JsonObjectType::new());
    let mut payload_schema =
        ColumnSchema::new("payload", ConcreteDataType::from(&empty_object), true);
    let json_extension = JsonExtensionType::new(Arc::new(JsonMetadata::default()));
    payload_schema.with_extension_type(&json_extension).unwrap();
    let payload = ColumnMetadata {
        column_schema: payload_schema,
        semantic_type: SemanticType::Field,
        column_id: 3,
    };

    let value = ColumnMetadata {
        column_schema: ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };

    let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
    for column in [host, ts, payload, value] {
        builder.push_column_metadata(column);
    }
    builder.primary_key(vec![1]);
    Arc::new(builder.build().unwrap())
}
/// Returns a concrete JSON object type with a bool field `active` and a
/// string field `name`.
fn concrete_json_type_hint() -> JsonNativeType {
    let fields = [
        ("active".to_string(), JsonNativeType::Bool),
        ("name".to_string(), JsonNativeType::String),
    ];
    JsonNativeType::Object(JsonObjectType::from(fields))
}
/// A hint for the JSON column replaces its data type in the output schema,
/// while the other projected columns keep theirs.
#[test]
fn test_concretize_json_types_rewrites_json_output_schema() -> Result<()> {
    let metadata = metadata_with_json_field();
    let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3])?;

    // Before the rewrite the projected schema has the JSON extension at index 1.
    let schema_before = mapper.output_schema();
    let arrow_before = schema_before.arrow_schema();
    assert!(arrow_before.has_json_extension_field());
    assert!(is_json_extension_type(&arrow_before.fields()[1]));

    let hinted_type = concrete_json_type_hint();
    let hints = HashMap::from([("payload".to_string(), hinted_type.clone())]);
    concretize_json_types(&mut mapper, &hints);

    let schema_after = mapper.output_schema();
    let arrow_after = schema_after.arrow_schema();
    assert!(arrow_after.has_json_extension_field());

    let expected_types = [
        ConcreteDataType::string_datatype(),
        ConcreteDataType::from(&hinted_type),
        ConcreteDataType::int64_datatype(),
    ];
    for (idx, expected) in expected_types.into_iter().enumerate() {
        assert_eq!(expected, schema_after.column_schemas()[idx].data_type);
    }
    Ok(())
}
/// Without a hint for the JSON column, its data type is left untouched.
#[test]
fn test_concretize_json_types_keeps_json_without_hint() {
    let metadata = metadata_with_json_field();
    let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();

    // The only hint is keyed by a name that no column in the metadata has.
    let unrelated_hint = HashMap::from([("missing".to_string(), JsonNativeType::String)]);
    concretize_json_types(&mut mapper, &unrelated_hint);

    let schema = mapper.output_schema();
    let arrow_schema = schema.arrow_schema();
    assert!(arrow_schema.has_json_extension_field());
    assert!(is_json_extension_type(&arrow_schema.fields()[1]));

    // The JSON column must stay un-concretized, i.e. an empty object.
    let empty_object = JsonNativeType::Object(JsonObjectType::new());
    assert_eq!(
        ConcreteDataType::from(&empty_object),
        schema.column_schemas()[1].data_type
    );
}
#[tokio::test]
async fn test_build_scan_fingerprint_for_eligible_scan() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));

View File

@@ -130,7 +130,7 @@ impl SeqScan {
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
pub(crate) async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
assert!(self.stream_ctx.input.compaction);
let metrics_set = ExecutionPlanMetricsSet::new();

View File

@@ -14,9 +14,11 @@
//! Sorted strings tables.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use arrow_schema::DataType;
use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
@@ -108,6 +110,9 @@ pub struct FlatSchemaOptions {
/// when storing primary key columns.
/// Only takes effect when `raw_pk_columns` is true.
pub string_pk_use_dict: bool,
/// Concretized JSON types of the columns, keyed by column name, to be set into the Arrow schema.
/// A JSON column without an entry here stays an empty struct in the Arrow schema.
pub concretized_json_types: HashMap<String, DataType>,
}
impl Default for FlatSchemaOptions {
@@ -115,6 +120,7 @@ impl Default for FlatSchemaOptions {
Self {
raw_pk_columns: true,
string_pk_use_dict: true,
concretized_json_types: HashMap::new(),
}
}
}
@@ -128,6 +134,7 @@ impl FlatSchemaOptions {
Self {
raw_pk_columns: false,
string_pk_use_dict: false,
concretized_json_types: HashMap::new(),
}
}
}
@@ -159,6 +166,7 @@ pub fn to_flat_sst_arrow_schema(
&metadata.column_metadatas[pk_index].column_schema.data_type,
old_field,
);
let new_field = concretize_json_type(new_field, options);
fields.push(Arc::new(with_field_id((*new_field).clone(), column_id)));
}
}
@@ -169,8 +177,9 @@ pub fn to_flat_sst_arrow_schema(
.zip(&metadata.column_metadatas)
.filter_map(|(field, column_meta)| {
if column_meta.semantic_type == SemanticType::Field {
let field = concretize_json_type(field.clone(), options);
Some(Arc::new(with_field_id(
(**field).clone(),
Arc::unwrap_or_clone(field),
column_meta.column_id,
)))
} else {
@@ -189,6 +198,16 @@ pub fn to_flat_sst_arrow_schema(
Arc::new(Schema::new(fields))
}
/// Returns `field` with its data type replaced by the concretized JSON type
/// registered under the field's name in `options.concretized_json_types`.
///
/// The field is returned unchanged when no entry exists for its name.
fn concretize_json_type(field: Arc<Field>, options: &FlatSchemaOptions) -> Arc<Field> {
    let Some(concrete_type) = options.concretized_json_types.get(field.name()) else {
        return field;
    };
    // Only clones the field if the `Arc` is shared with other owners.
    let mut owned = Arc::unwrap_or_clone(field);
    owned.set_data_type(concrete_type.clone());
    Arc::new(owned)
}
/// Returns the number of columns in the flat format.
pub fn flat_sst_arrow_schema_column_num(
metadata: &RegionMetadata,

View File

@@ -41,7 +41,8 @@ use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType};
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
use parquet::file::metadata::RowGroupMetaData;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::metadata::{FileMetaData, RowGroupMetaData};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
@@ -49,7 +50,7 @@ use store_api::storage::{ColumnId, SequenceNumber};
use crate::error::{
ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
NewRecordBatchSnafu, Result,
NewRecordBatchSnafu, ParquetToArrowSchemaSnafu, Result,
};
use crate::read::read_columns::ReadColumns;
use crate::sst::parquet::format::{
@@ -165,10 +166,11 @@ impl FlatReadFormat {
pub fn new(
metadata: RegionMetadataRef,
read_cols: ReadColumns,
num_columns: Option<usize>,
file_metadata: Option<&FileMetaData>,
file_path: &str,
skip_auto_convert: bool,
) -> Result<FlatReadFormat> {
let num_columns = file_metadata.map(|x| x.schema_descr().num_columns());
let is_legacy = match num_columns {
Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
@@ -189,7 +191,13 @@ impl FlatReadFormat {
))
}
} else {
ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols))
let arrow_schema = file_metadata
.map(|x| parquet_to_arrow_schema(x.schema_descr(), x.key_value_metadata()))
.transpose()
.context(ParquetToArrowSchemaSnafu { file: file_path })?
.map(Arc::new)
.unwrap_or_else(|| to_flat_sst_arrow_schema(&metadata, &Default::default()));
ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols, arrow_schema))
};
Ok(FlatReadFormat {
@@ -248,9 +256,6 @@ impl FlatReadFormat {
}
/// Gets the arrow schema of the SST file.
///
/// This schema is computed from the region metadata but should be the same
/// as the arrow schema decoded from the file metadata.
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
match &self.parquet_adapter {
ParquetAdapter::Flat(p) => &p.arrow_schema,
@@ -483,10 +488,13 @@ struct ParquetFlat {
impl ParquetFlat {
/// Creates a helper with existing `metadata` and the columns to read (`read_cols`).
fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> ParquetFlat {
fn new(
metadata: RegionMetadataRef,
read_cols: ReadColumns,
arrow_schema: SchemaRef,
) -> ParquetFlat {
// Creates a map to lookup index.
let id_to_index = sst_column_id_indices(&metadata);
let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let sst_column_num =
flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
let format_projection =

View File

@@ -832,6 +832,9 @@ mod tests {
use mito_codec::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
};
use parquet::basic::Type as PhysicalType;
use parquet::file::metadata::FileMetaData;
use parquet::schema::types::{SchemaDescriptor, Type};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
@@ -890,6 +893,33 @@ mod tests {
Arc::new(builder.build().unwrap())
}
/// Builds a parquet [FileMetaData] for tests.
///
/// The schema is a group named `x` that holds one INT64 primitive column for
/// each name in `fields`, in the given order. Version and row count are 0 and
/// all optional metadata is `None`.
fn build_test_file_metadata(fields: Vec<&str>) -> FileMetaData {
    let columns: Vec<_> = fields
        .into_iter()
        .map(|name| {
            let column = Type::primitive_type_builder(name, PhysicalType::INT64)
                .build()
                .unwrap();
            Arc::new(column)
        })
        .collect();
    let root = Type::group_type_builder("x")
        .with_fields(columns)
        .build()
        .unwrap();
    let schema_descr = Arc::new(SchemaDescriptor::new(Arc::new(root)));
    FileMetaData::new(0, 0, None, None, schema_descr, None)
}
fn build_test_arrow_schema() -> SchemaRef {
let fields = vec![
make_field("field1", ArrowDataType::Int64, true, Some(4)),
@@ -1323,7 +1353,9 @@ mod tests {
let mut format = FlatReadFormat::new(
metadata,
ReadColumns::from_deduped_column_ids(std::iter::once(1)), // Just read tag0
Some(8),
Some(&build_test_file_metadata(vec![
"tag0", "field0", "tag1", "field1", "ts", "tag2", "field2", "tag3",
])),
"test",
false,
)
@@ -1540,7 +1572,9 @@ mod tests {
let format = FlatReadFormat::new(
metadata.clone(),
ReadColumns::from_deduped_column_ids(column_ids),
Some(6),
Some(&build_test_file_metadata(vec![
"tag0", "field0", "tag1", "field1", "ts", "tag2",
])),
"test",
false,
)

View File

@@ -393,7 +393,7 @@ impl ParquetReaderBuilder {
let mut read_format = FlatReadFormat::new(
region_meta.clone(),
read_cols,
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
Some(parquet_meta.file_metadata()),
&file_path,
skip_auto_convert,
)?;
@@ -585,7 +585,7 @@ impl ParquetReaderBuilder {
/// Reads parquet metadata of specific file.
/// Returns (fused metadata, cache_miss_flag).
async fn read_parquet_metadata(
pub(crate) async fn read_parquet_metadata(
&self,
file_path: &str,
file_size: u64,

View File

@@ -14,6 +14,7 @@
//! Parquet writer.
use std::collections::HashMap;
use std::future::Future;
use std::mem;
use std::pin::Pin;
@@ -31,6 +32,8 @@ use datatypes::arrow::array::{
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::ext::ArrowSchemaExt;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
@@ -271,12 +274,21 @@ where
override_sequence: Option<SequenceNumber>,
opts: &WriteOptions,
) -> Result<SstInfoArray> {
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
if source.schema().has_json_extension_field() {
options.concretized_json_types = source
.schema()
.fields()
.iter()
.filter(|&field| is_json_extension_type(field))
.map(|field| (field.name().clone(), field.data_type().clone()))
.collect::<HashMap<_, _>>();
}
let converter = FlatBatchConverter::Flat(
FlatWriteFormat::new(
self.metadata.clone(),
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
)
.with_override_sequence(override_sequence),
FlatWriteFormat::new(self.metadata.clone(), &options)
.with_override_sequence(override_sequence),
);
let res = self.write_all_flat_inner(source, &converter, opts).await;
if res.is_err() {

View File

@@ -42,6 +42,14 @@ admin flush_table('json2_table');
| 0 |
+----------------------------------+
admin compact_table('json2_table', 'swcs', '86400');
+-----------------------------------------------------+
| ADMIN compact_table('json2_table', 'swcs', '86400') |
+-----------------------------------------------------+
| 0 |
+-----------------------------------------------------+
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');
@@ -131,10 +139,10 @@ select j.d from json2_table order by ts;
+-----------------------------------+
| json_get(json2_table.j,Utf8("d")) |
+-----------------------------------+
| [{e: {f: 0.1}}] |
| [{e: {f: 0.2}}] |
| [{e: {f: 0.1, g: }}] |
| [{e: {f: 0.2, g: }}] |
| |
| [{e: {g: -0.4}}] |
| [{e: {f: , g: -0.4}}] |
| |
| |
| [{e: {g: -0.7}}] |

View File

@@ -22,6 +22,8 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
admin flush_table('json2_table');
admin compact_table('json2_table', 'swcs', '86400');
insert into json2_table
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
(8, '{"a": {"b": 8}, "c": "s8"}');