mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 23:10:37 +00:00
@@ -22,6 +22,7 @@ use common_time::timestamp::TimeUnit;
|
||||
use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
|
||||
use datatypes::json::value::{JsonNumber, JsonValue, JsonValueRef, JsonVariant};
|
||||
use datatypes::prelude::{ConcreteDataType, ValueRef};
|
||||
use datatypes::types::json_type::JsonNativeType;
|
||||
use datatypes::types::{
|
||||
IntervalType, JsonFormat, JsonType, StructField, StructType, TimeType, TimestampType,
|
||||
};
|
||||
@@ -127,7 +128,9 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
|
||||
datatype: type_ext.datatype(),
|
||||
datatype_ext: type_ext.datatype_extension.clone().map(|d| *d),
|
||||
};
|
||||
ConcreteDataType::json_native_datatype(inner_type.into())
|
||||
ConcreteDataType::json2(JsonNativeType::from(&ConcreteDataType::from(
|
||||
inner_type,
|
||||
)))
|
||||
}
|
||||
None => ConcreteDataType::Json(JsonType::null()),
|
||||
_ => {
|
||||
@@ -448,7 +451,8 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
|
||||
if native_type.is_null() {
|
||||
None
|
||||
} else {
|
||||
let native_type = ConcreteDataType::from(native_type.as_ref());
|
||||
let native_type =
|
||||
ConcreteDataType::from_arrow_type(&native_type.as_arrow_type());
|
||||
let (datatype, datatype_extension) =
|
||||
ColumnDataTypeWrapper::try_from(native_type)?.into_parts();
|
||||
Some(ColumnDataTypeExtension {
|
||||
@@ -1157,6 +1161,7 @@ mod tests {
|
||||
|
||||
use common_time::interval::IntervalUnit;
|
||||
use datatypes::scalars::ScalarVector;
|
||||
use datatypes::types::json_type::JsonObjectType;
|
||||
use datatypes::types::{Int8Type, Int32Type, UInt8Type, UInt32Type};
|
||||
use datatypes::value::{ListValue, StructValue};
|
||||
use datatypes::vectors::{
|
||||
@@ -1393,9 +1398,12 @@ mod tests {
|
||||
.into()
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
|
||||
struct_type.clone()
|
||||
)),
|
||||
ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([
|
||||
("address".to_string(), JsonNativeType::String),
|
||||
("age".to_string(), JsonNativeType::i64()),
|
||||
("id".to_string(), JsonNativeType::i64()),
|
||||
("name".to_string(), JsonNativeType::String),
|
||||
]))),
|
||||
ColumnDataTypeWrapper::new(
|
||||
ColumnDataType::Json,
|
||||
Some(ColumnDataTypeExtension {
|
||||
@@ -1579,20 +1587,6 @@ mod tests {
|
||||
]))).try_into().expect("Failed to create column datatype from Struct(StructType { fields: [StructField { name: \"a\", data_type: Int64(Int64Type) }, StructField { name: \"a.a\", data_type: List(ListType { item_type: String(StringType) }) }] })")
|
||||
);
|
||||
|
||||
let struct_type = StructType::new(Arc::new(vec![
|
||||
StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true),
|
||||
StructField::new(
|
||||
"name".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
StructField::new("age".to_string(), ConcreteDataType::int32_datatype(), true),
|
||||
StructField::new(
|
||||
"address".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
]));
|
||||
assert_eq!(
|
||||
ColumnDataTypeWrapper::new(
|
||||
ColumnDataType::Json,
|
||||
@@ -1636,9 +1630,12 @@ mod tests {
|
||||
})))
|
||||
})
|
||||
),
|
||||
ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
|
||||
struct_type.clone()
|
||||
))
|
||||
ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([
|
||||
("address".to_string(), JsonNativeType::String),
|
||||
("age".to_string(), JsonNativeType::i64()),
|
||||
("id".to_string(), JsonNativeType::i64()),
|
||||
("name".to_string(), JsonNativeType::String),
|
||||
])))
|
||||
.try_into()
|
||||
.expect("failed to convert json type")
|
||||
);
|
||||
|
||||
@@ -28,6 +28,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::{self, Error, Result};
|
||||
use crate::type_id::LogicalTypeId;
|
||||
use crate::types::json_type::JsonNativeType;
|
||||
use crate::types::{
|
||||
BinaryType, BooleanType, DateType, Decimal128Type, DictionaryType, DurationMicrosecondType,
|
||||
DurationMillisecondType, DurationNanosecondType, DurationSecondType, DurationType, Float32Type,
|
||||
@@ -475,7 +476,7 @@ impl TryFrom<&ArrowDataType> for ConcreteDataType {
|
||||
ArrowDataType::Decimal128(precision, scale) => {
|
||||
ConcreteDataType::decimal128_datatype(*precision, *scale)
|
||||
}
|
||||
ArrowDataType::Struct(fields) => ConcreteDataType::Struct(fields.try_into()?),
|
||||
ArrowDataType::Struct(fields) => ConcreteDataType::Struct(StructType::from(fields)),
|
||||
ArrowDataType::Float16
|
||||
| ArrowDataType::Date64
|
||||
| ArrowDataType::FixedSizeBinary(_)
|
||||
@@ -683,8 +684,8 @@ impl ConcreteDataType {
|
||||
Self::vector_datatype(0)
|
||||
}
|
||||
|
||||
pub fn json_native_datatype(inner_type: ConcreteDataType) -> ConcreteDataType {
|
||||
ConcreteDataType::Json(JsonType::new_json2((&inner_type).into()))
|
||||
pub fn json2(native_type: JsonNativeType) -> ConcreteDataType {
|
||||
ConcreteDataType::Json(JsonType::new_json2(native_type))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -394,22 +394,25 @@ impl JsonValue {
|
||||
},
|
||||
JsonVariant::String(x) => Value::String(x.into()),
|
||||
JsonVariant::Array(array) => {
|
||||
let item_type = if let Some(first) = array.first() {
|
||||
first.native_type()
|
||||
} else {
|
||||
JsonNativeType::Null
|
||||
};
|
||||
Value::List(ListValue::new(
|
||||
array.into_iter().map(helper).collect(),
|
||||
Arc::new((&item_type).into()),
|
||||
))
|
||||
let values = array.into_iter().map(helper).collect::<Vec<_>>();
|
||||
debug_assert!(
|
||||
values
|
||||
.windows(2)
|
||||
.all(|w| w[0].data_type() == w[1].data_type())
|
||||
);
|
||||
let item_type = values
|
||||
.first()
|
||||
.map(|x| x.data_type())
|
||||
.unwrap_or_else(ConcreteDataType::null_datatype);
|
||||
Value::List(ListValue::new(values, Arc::new(item_type)))
|
||||
}
|
||||
JsonVariant::Object(object) => {
|
||||
let mut fields = Vec::with_capacity(object.len());
|
||||
let mut items = Vec::with_capacity(object.len());
|
||||
for (k, v) in object {
|
||||
fields.push(StructField::new(k, (&v.native_type()).into(), true));
|
||||
items.push(helper(v));
|
||||
let v = helper(v);
|
||||
fields.push(StructField::new(k, v.data_type(), true));
|
||||
items.push(v);
|
||||
}
|
||||
Value::Struct(StructValue::new(items, StructType::new(Arc::new(fields))))
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::str::FromStr;
|
||||
use std::sync::{Arc, LazyLock};
|
||||
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
use arrow_schema::{Field, Fields};
|
||||
use common_base::bytes::Bytes;
|
||||
use regex::{Captures, Regex};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -32,7 +33,7 @@ use crate::error::{
|
||||
use crate::prelude::ConcreteDataType;
|
||||
use crate::scalars::ScalarVectorBuilder;
|
||||
use crate::type_id::LogicalTypeId;
|
||||
use crate::types::{ListType, StructField, StructType};
|
||||
use crate::types::{StructField, StructType};
|
||||
use crate::value::Value;
|
||||
use crate::vectors::json::builder::JsonVectorBuilder;
|
||||
use crate::vectors::{BinaryVectorBuilder, MutableVector};
|
||||
@@ -117,30 +118,28 @@ impl JsonNativeType {
|
||||
_ => JsonNativeType::Variant,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&JsonNativeType> for ConcreteDataType {
|
||||
fn from(value: &JsonNativeType) -> Self {
|
||||
match value {
|
||||
JsonNativeType::Null => ConcreteDataType::null_datatype(),
|
||||
JsonNativeType::Bool => ConcreteDataType::boolean_datatype(),
|
||||
JsonNativeType::Number(JsonNumberType::U64) => ConcreteDataType::uint64_datatype(),
|
||||
JsonNativeType::Number(JsonNumberType::I64) => ConcreteDataType::int64_datatype(),
|
||||
JsonNativeType::Number(JsonNumberType::F64) => ConcreteDataType::float64_datatype(),
|
||||
JsonNativeType::String => ConcreteDataType::string_datatype(),
|
||||
JsonNativeType::Array(item_type) => {
|
||||
ConcreteDataType::List(ListType::new(Arc::new(item_type.as_ref().into())))
|
||||
pub fn as_arrow_type(&self) -> ArrowDataType {
|
||||
match self {
|
||||
JsonNativeType::Null => ArrowDataType::Null,
|
||||
JsonNativeType::Bool => ArrowDataType::Boolean,
|
||||
JsonNativeType::Number(n) => match n {
|
||||
JsonNumberType::U64 => ArrowDataType::UInt64,
|
||||
JsonNumberType::I64 => ArrowDataType::Int64,
|
||||
JsonNumberType::F64 => ArrowDataType::Float64,
|
||||
},
|
||||
JsonNativeType::String => ArrowDataType::Utf8,
|
||||
JsonNativeType::Array(array) => {
|
||||
ArrowDataType::List(Arc::new(Field::new("item", array.as_arrow_type(), true)))
|
||||
}
|
||||
JsonNativeType::Object(object) => {
|
||||
let fields = object
|
||||
.iter()
|
||||
.map(|(type_name, field_type)| {
|
||||
StructField::new(type_name.clone(), field_type.into(), true)
|
||||
})
|
||||
.collect();
|
||||
ConcreteDataType::Struct(StructType::new(Arc::new(fields)))
|
||||
.map(|(k, v)| Arc::new(Field::new(k, v.as_arrow_type(), true)))
|
||||
.collect::<Vec<_>>();
|
||||
ArrowDataType::Struct(Fields::from(fields))
|
||||
}
|
||||
JsonNativeType::Variant => ConcreteDataType::binary_datatype(),
|
||||
JsonNativeType::Variant => ArrowDataType::Binary,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -304,9 +303,10 @@ impl JsonType {
|
||||
pub(crate) fn as_struct_type(&self) -> StructType {
|
||||
match &self.format {
|
||||
JsonFormat::Jsonb => StructType::default(),
|
||||
JsonFormat::Json2(inner) => match ConcreteDataType::from(inner.as_ref()) {
|
||||
ConcreteDataType::Struct(t) => t.clone(),
|
||||
x => plain_json_struct_type(x),
|
||||
JsonFormat::Json2(native_type) => match native_type.as_arrow_type() {
|
||||
// TODO(LFC): Direct use Arrow's Struct datatype here.
|
||||
ArrowDataType::Struct(fields) => StructType::from(&fields),
|
||||
data_type => plain_json_struct_type(&data_type),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -370,8 +370,12 @@ fn is_include(this: &JsonNativeType, that: &JsonNativeType) -> bool {
|
||||
|
||||
/// A special struct type for denoting "plain"(not object) json value. It has only one field, with
|
||||
/// fixed name [JSON_PLAIN_FIELD_NAME] and with metadata [JSON_PLAIN_FIELD_METADATA_KEY] = `"true"`.
|
||||
pub(crate) fn plain_json_struct_type(item_type: ConcreteDataType) -> StructType {
|
||||
let mut field = StructField::new(JSON_PLAIN_FIELD_NAME.to_string(), item_type, true);
|
||||
fn plain_json_struct_type(data_type: &ArrowDataType) -> StructType {
|
||||
let mut field = StructField::new(
|
||||
JSON_PLAIN_FIELD_NAME.to_string(),
|
||||
ConcreteDataType::from_arrow_type(data_type),
|
||||
true,
|
||||
);
|
||||
field.insert_metadata(JSON_PLAIN_FIELD_METADATA_KEY, true);
|
||||
StructType::new(Arc::new(vec![field]))
|
||||
}
|
||||
|
||||
@@ -28,22 +28,21 @@ pub struct StructType {
|
||||
fields: Arc<Vec<StructField>>,
|
||||
}
|
||||
|
||||
impl TryFrom<&Fields> for StructType {
|
||||
type Error = crate::error::Error;
|
||||
fn try_from(value: &Fields) -> Result<Self, Self::Error> {
|
||||
impl From<&Fields> for StructType {
|
||||
fn from(value: &Fields) -> Self {
|
||||
let fields = value
|
||||
.iter()
|
||||
.map(|field| {
|
||||
Ok(StructField::new(
|
||||
StructField::new(
|
||||
field.name().clone(),
|
||||
ConcreteDataType::try_from(field.data_type())?,
|
||||
ConcreteDataType::from_arrow_type(field.data_type()),
|
||||
field.is_nullable(),
|
||||
))
|
||||
)
|
||||
})
|
||||
.collect::<Result<Vec<StructField>, Self::Error>>()?;
|
||||
Ok(StructType {
|
||||
.collect::<Vec<_>>();
|
||||
StructType {
|
||||
fields: Arc::new(fields),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1205,7 +1205,7 @@ impl TryFrom<ScalarValue> for Value {
|
||||
.map(|v| Value::Decimal128(Decimal128::new(v, p, s)))
|
||||
.unwrap_or(Value::Null),
|
||||
ScalarValue::Struct(struct_array) => {
|
||||
let struct_type: StructType = (struct_array.fields()).try_into()?;
|
||||
let struct_type = StructType::from(struct_array.fields());
|
||||
let items = struct_array
|
||||
.columns()
|
||||
.iter()
|
||||
@@ -1741,6 +1741,7 @@ pub(crate) mod tests {
|
||||
use super::*;
|
||||
use crate::json::value::{JsonVariant, JsonVariantRef};
|
||||
use crate::types::StructField;
|
||||
use crate::types::json_type::{JsonNativeType, JsonObjectType};
|
||||
use crate::vectors::ListVectorBuilder;
|
||||
|
||||
pub(crate) fn build_struct_type() -> StructType {
|
||||
@@ -1791,10 +1792,6 @@ pub(crate) mod tests {
|
||||
ScalarValue::Struct(Arc::new(struct_arrow_array))
|
||||
}
|
||||
|
||||
pub fn build_list_type() -> ConcreteDataType {
|
||||
ConcreteDataType::list_datatype(Arc::new(ConcreteDataType::boolean_datatype()))
|
||||
}
|
||||
|
||||
pub(crate) fn build_list_value() -> ListValue {
|
||||
let items = vec![Value::Boolean(true), Value::Boolean(false)];
|
||||
ListValue::new(items, Arc::new(ConcreteDataType::boolean_datatype()))
|
||||
@@ -2274,39 +2271,26 @@ pub(crate) mod tests {
|
||||
);
|
||||
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::json_native_datatype(ConcreteDataType::boolean_datatype()),
|
||||
&ConcreteDataType::json2(JsonNativeType::Bool),
|
||||
&Value::Json(Box::new(true.into())),
|
||||
);
|
||||
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::json_native_datatype(build_list_type()),
|
||||
&ConcreteDataType::json2(JsonNativeType::Array(Box::new(JsonNativeType::Bool))),
|
||||
&Value::Json(Box::new([true].into())),
|
||||
);
|
||||
|
||||
check_type_and_value(
|
||||
&ConcreteDataType::json_native_datatype(ConcreteDataType::struct_datatype(
|
||||
StructType::new(Arc::new(vec![
|
||||
StructField::new(
|
||||
"address".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
StructField::new("age".to_string(), ConcreteDataType::uint64_datatype(), true),
|
||||
StructField::new(
|
||||
"awards".to_string(),
|
||||
ConcreteDataType::list_datatype(Arc::new(
|
||||
ConcreteDataType::boolean_datatype(),
|
||||
)),
|
||||
true,
|
||||
),
|
||||
StructField::new("id".to_string(), ConcreteDataType::int64_datatype(), true),
|
||||
StructField::new(
|
||||
"name".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
])),
|
||||
)),
|
||||
&ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([
|
||||
("address".to_string(), JsonNativeType::String),
|
||||
("age".to_string(), JsonNativeType::u64()),
|
||||
(
|
||||
"awards".to_string(),
|
||||
JsonNativeType::Array(Box::new(JsonNativeType::Bool)),
|
||||
),
|
||||
("id".to_string(), JsonNativeType::i64()),
|
||||
("name".to_string(), JsonNativeType::String),
|
||||
]))),
|
||||
&Value::Json(Box::new(
|
||||
[
|
||||
("id", JsonVariant::from(1i64)),
|
||||
|
||||
@@ -240,7 +240,7 @@ impl Helper {
|
||||
ConstantVector::new(Arc::new(vector), length)
|
||||
}
|
||||
ScalarValue::Struct(v) => {
|
||||
let struct_type = StructType::try_from(v.fields())?;
|
||||
let struct_type = StructType::from(v.fields());
|
||||
ConstantVector::new(
|
||||
Arc::new(StructVector::try_new(struct_type, (*v).clone())?),
|
||||
length,
|
||||
@@ -401,7 +401,7 @@ impl Helper {
|
||||
.downcast_ref::<StructArray>()
|
||||
.unwrap();
|
||||
Arc::new(StructVector::try_new(
|
||||
StructType::try_from(fields)?,
|
||||
StructType::from(fields),
|
||||
array.clone(),
|
||||
)?)
|
||||
}
|
||||
|
||||
@@ -220,7 +220,7 @@ impl TryFrom<StructArray> for StructVector {
|
||||
|
||||
fn try_from(array: StructArray) -> Result<Self> {
|
||||
let fields = match array.data_type() {
|
||||
ArrowDataType::Struct(fields) => StructType::try_from(fields)?,
|
||||
ArrowDataType::Struct(fields) => StructType::from(fields),
|
||||
other => ConversionSnafu {
|
||||
from: other.to_string(),
|
||||
}
|
||||
|
||||
@@ -182,6 +182,7 @@ fn bulk_part_converter(c: &mut Criterion) {
|
||||
&FlatSchemaOptions {
|
||||
raw_pk_columns: false,
|
||||
string_pk_use_dict: false,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
let mut converter = BulkPartConverter::new(&metadata, schema, rows, codec, false);
|
||||
@@ -208,6 +209,7 @@ fn bulk_part_converter(c: &mut Criterion) {
|
||||
&FlatSchemaOptions {
|
||||
raw_pk_columns: true,
|
||||
string_pk_use_dict: true,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
let mut converter =
|
||||
|
||||
@@ -29,6 +29,7 @@ use std::time::Instant;
|
||||
|
||||
use api::v1::region::compact_request;
|
||||
use api::v1::region::compact_request::Options;
|
||||
use arrow_schema::Schema;
|
||||
use common_base::Plugins;
|
||||
use common_base::cancellation::CancellationHandle;
|
||||
use common_memory_manager::OnExhaustedPolicy;
|
||||
@@ -39,6 +40,11 @@ use common_time::timestamp::TimeUnit;
|
||||
use common_time::{TimeToLive, Timestamp};
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::Expr;
|
||||
use datatypes::extension::json::is_json_extension_type;
|
||||
use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use datatypes::types::json_type::JsonNativeType;
|
||||
use parquet::arrow::parquet_to_arrow_schema;
|
||||
use parquet::file::metadata::PageIndexPolicy;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
@@ -54,13 +60,15 @@ use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker};
|
||||
use crate::compaction::task::CompactionTaskImpl;
|
||||
use crate::config::MitoConfig;
|
||||
use crate::error::{
|
||||
CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu,
|
||||
ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu,
|
||||
RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu,
|
||||
CompactRegionSnafu, CompactionCancelledSnafu, DataTypeMismatchSnafu, Error,
|
||||
GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, ParquetToArrowSchemaSnafu,
|
||||
RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
|
||||
TimeRangePredicateOverflowSnafu, TimeoutSnafu,
|
||||
};
|
||||
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
|
||||
use crate::read::FlatSource;
|
||||
use crate::read::flat_projection::FlatProjectionMapper;
|
||||
use crate::read::read_columns::ReadColumns;
|
||||
use crate::read::scan_region::{PredicateGroup, ScanInput};
|
||||
use crate::read::seq_scan::SeqScan;
|
||||
use crate::region::options::{MergeMode, RegionOptions};
|
||||
@@ -72,6 +80,7 @@ use crate::schedule::remote_job_scheduler::{
|
||||
};
|
||||
use crate::schedule::scheduler::SchedulerRef;
|
||||
use crate::sst::file::{FileHandle, FileMeta, Level};
|
||||
use crate::sst::parquet::reader::MetadataCacheMetrics;
|
||||
use crate::sst::version::LevelMeta;
|
||||
use crate::worker::WorkerListener;
|
||||
|
||||
@@ -996,19 +1005,56 @@ impl CompactionSstReaderBuilder<'_> {
|
||||
/// Build a [FlatSource] that yields Arrow `RecordBatch`s from reading all the input SST files,
|
||||
/// for compaction. The schema of the [FlatSource] is unified.
|
||||
async fn build_flat_sst_reader(self) -> Result<FlatSource> {
|
||||
let scan_input = self.build_scan_input()?.with_compaction(true);
|
||||
let scan_input = self.build_scan_input().await?.with_compaction(true);
|
||||
|
||||
let schema = scan_input.mapper.output_schema();
|
||||
let schema = schema.arrow_schema();
|
||||
|
||||
SeqScan::new(scan_input)
|
||||
let stream = SeqScan::new(scan_input)
|
||||
.build_flat_reader_for_compaction()
|
||||
.await
|
||||
.map(|stream| FlatSource::new_stream(schema.clone(), stream))
|
||||
.await?;
|
||||
Ok(FlatSource::new_stream(schema.clone(), stream))
|
||||
}
|
||||
|
||||
fn build_scan_input(self) -> Result<ScanInput> {
|
||||
let mapper = FlatProjectionMapper::all(&self.metadata)?;
|
||||
async fn build_scan_input(self) -> Result<ScanInput> {
|
||||
let schema = self.metadata.schema.arrow_schema();
|
||||
let json_type_hint = if schema.has_json_extension_field() {
|
||||
let mut json_type_hint = schema
|
||||
.fields()
|
||||
.iter()
|
||||
.filter(|&field| is_json_extension_type(field))
|
||||
.map(|field| (field.name().clone(), JsonNativeType::Null))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
let schemas = self.collect_arrow_schemas_from_parquet().await?;
|
||||
for schema in schemas {
|
||||
for field in schema.fields() {
|
||||
let Some(merged) = json_type_hint.get_mut(field.name()) else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let json_type = JsonNativeType::try_from(field.data_type())
|
||||
.context(DataTypeMismatchSnafu)?;
|
||||
merged.merge(&json_type);
|
||||
}
|
||||
}
|
||||
|
||||
Some(json_type_hint)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let projection = (0..self.metadata.column_metadatas.len()).collect();
|
||||
let read_columns = ReadColumns::from_deduped_column_ids(
|
||||
self.metadata.column_metadatas.iter().map(|x| x.column_id),
|
||||
);
|
||||
let mapper = FlatProjectionMapper::new_with_read_columns(
|
||||
&self.metadata,
|
||||
projection,
|
||||
read_columns,
|
||||
json_type_hint.as_ref(),
|
||||
)?;
|
||||
|
||||
let mut scan_input = ScanInput::new(self.sst_layer, mapper)
|
||||
.with_files(self.inputs.to_vec())
|
||||
.with_append_mode(self.append_mode)
|
||||
@@ -1028,6 +1074,43 @@ impl CompactionSstReaderBuilder<'_> {
|
||||
|
||||
Ok(scan_input)
|
||||
}
|
||||
|
||||
async fn collect_arrow_schemas_from_parquet(&self) -> Result<Vec<Schema>> {
|
||||
let mut schemas = Vec::with_capacity(self.inputs.len());
|
||||
|
||||
for file_handle in self.inputs {
|
||||
let file_path =
|
||||
file_handle.file_path(self.sst_layer.table_dir(), self.sst_layer.path_type());
|
||||
let file_size = file_handle.meta_ref().file_size;
|
||||
let parquet_metadata = match self
|
||||
.sst_layer
|
||||
.read_sst(file_handle.clone())
|
||||
.cache(CacheStrategy::Compaction(self.cache.clone()))
|
||||
.read_parquet_metadata(
|
||||
&file_path,
|
||||
file_size,
|
||||
&mut MetadataCacheMetrics::default(),
|
||||
PageIndexPolicy::default(),
|
||||
)
|
||||
.await
|
||||
.map(|x| x.0.parquet_metadata())
|
||||
{
|
||||
Ok(x) => x,
|
||||
Err(e) if e.is_object_not_found() => continue,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
let file_metadata = parquet_metadata.file_metadata();
|
||||
|
||||
let schema = parquet_to_arrow_schema(
|
||||
file_metadata.schema_descr(),
|
||||
file_metadata.key_value_metadata(),
|
||||
)
|
||||
.context(ParquetToArrowSchemaSnafu { file: file_path })?;
|
||||
|
||||
schemas.push(schema);
|
||||
}
|
||||
Ok(schemas)
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts time range to predicates so that rows outside the range will be filtered.
|
||||
|
||||
@@ -1241,6 +1241,15 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to generate Arrow schema from Parquet file: {}", file))]
|
||||
ParquetToArrowSchema {
|
||||
file: String,
|
||||
#[snafu(source)]
|
||||
error: parquet::errors::ParquetError,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -1331,7 +1340,8 @@ impl ErrorExt for Error {
|
||||
| BuildEntry { .. }
|
||||
| Metadata { .. }
|
||||
| CastColumn { .. }
|
||||
| MitoManifestInfo { .. } => StatusCode::Internal,
|
||||
| MitoManifestInfo { .. }
|
||||
| ParquetToArrowSchema { .. } => StatusCode::Internal,
|
||||
|
||||
FetchManifests { source, .. } => source.status_code(),
|
||||
|
||||
|
||||
@@ -2115,6 +2115,7 @@ mod tests {
|
||||
&FlatSchemaOptions {
|
||||
raw_pk_columns: false,
|
||||
string_pk_use_dict: true,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
@@ -2552,6 +2553,7 @@ mod tests {
|
||||
&FlatSchemaOptions {
|
||||
raw_pk_columns: false,
|
||||
string_pk_use_dict: true,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -1163,8 +1163,7 @@ impl FlatSource {
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(unused)]
|
||||
fn schema(&self) -> &SchemaRef {
|
||||
pub(crate) fn schema(&self) -> &SchemaRef {
|
||||
&self.schema
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,9 @@ use datatypes::arrow::compute::{TakeOptions, take};
|
||||
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::extension::json::is_json_extension_type;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use datatypes::vectors::json::array::JsonArray;
|
||||
@@ -43,15 +45,19 @@ use crate::error::{
|
||||
EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
|
||||
};
|
||||
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
|
||||
use crate::sst::parquet::flat_format::primary_key_column_index;
|
||||
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
|
||||
use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
|
||||
use crate::sst::parquet::format::{INTERNAL_COLUMN_NUM, PrimaryKeyArray};
|
||||
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
|
||||
|
||||
/// Returns true if `left` and `right` have same columns and primary key encoding.
|
||||
/// Returns true if the columns in the `projection_mapper` and `read_format` have same data types
|
||||
/// and primary key encodings.
|
||||
pub(crate) fn has_same_columns_and_pk_encoding(
|
||||
left: &RegionMetadata,
|
||||
right: &RegionMetadata,
|
||||
projection_mapper: &FlatProjectionMapper,
|
||||
read_format: &FlatReadFormat,
|
||||
compaction: bool,
|
||||
) -> bool {
|
||||
let left = projection_mapper.metadata();
|
||||
let right = read_format.metadata();
|
||||
if left.primary_key_encoding != right.primary_key_encoding {
|
||||
return false;
|
||||
}
|
||||
@@ -61,17 +67,13 @@ pub(crate) fn has_same_columns_and_pk_encoding(
|
||||
}
|
||||
|
||||
for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
|
||||
if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
|
||||
if left_col.column_id != right_col.column_id {
|
||||
return false;
|
||||
}
|
||||
debug_assert_eq!(
|
||||
left_col.column_schema.data_type,
|
||||
right_col.column_schema.data_type
|
||||
);
|
||||
debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
|
||||
}
|
||||
|
||||
true
|
||||
&projection_mapper.input_arrow_schema(compaction) == read_format.arrow_schema()
|
||||
}
|
||||
|
||||
/// A helper struct to adapt schema of the batch to an expected schema.
|
||||
@@ -88,16 +90,28 @@ impl FlatCompatBatch {
|
||||
/// Creates a [FlatCompatBatch].
|
||||
///
|
||||
/// - `mapper` is built from the metadata users expect to see.
|
||||
/// - `actual` is the [RegionMetadata] of the input parquet.
|
||||
/// - `format_projection` is the projection of the read format for the input parquet.
|
||||
/// - `read_format` is the [FlatReadFormat] of the input parquet.
|
||||
/// - `compaction` indicates whether the reader is for compaction.
|
||||
pub(crate) fn try_new(
|
||||
mapper: &FlatProjectionMapper,
|
||||
actual: &RegionMetadataRef,
|
||||
format_projection: &FormatProjection,
|
||||
read_format: &FlatReadFormat,
|
||||
compaction: bool,
|
||||
) -> Result<Option<Self>> {
|
||||
let actual_schema = flat_projected_columns(actual, format_projection);
|
||||
let actual = read_format.metadata();
|
||||
let format_projection = read_format.format_projection();
|
||||
let mut actual_schema = flat_projected_columns(actual, format_projection);
|
||||
if read_format.arrow_schema().has_json_extension_field() {
|
||||
for field in read_format.arrow_schema().fields() {
|
||||
if is_json_extension_type(field)
|
||||
&& let Some(column_id) =
|
||||
actual.column_by_name(field.name()).map(|x| x.column_id)
|
||||
&& let Some(i) = actual_schema.iter().position(|x| x.0 == column_id)
|
||||
{
|
||||
actual_schema[i].1 = ConcreteDataType::from_arrow_type(field.data_type());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let expect_schema = mapper.batch_schema();
|
||||
if expect_schema == actual_schema {
|
||||
// Although the SST has a different schema, but the schema after projection is the same
|
||||
@@ -152,10 +166,12 @@ impl FlatCompatBatch {
|
||||
expect_column.column_id,
|
||||
)));
|
||||
} else {
|
||||
fields.push(Arc::new(with_field_id(
|
||||
(**column_field).clone(),
|
||||
let field = with_field_id(
|
||||
Arc::unwrap_or_clone(column_field.clone()),
|
||||
expect_column.column_id,
|
||||
)));
|
||||
)
|
||||
.with_data_type(expect_data_type.as_arrow_type());
|
||||
fields.push(Arc::new(field))
|
||||
};
|
||||
|
||||
if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
|
||||
@@ -720,12 +736,10 @@ mod tests {
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
let format_projection = read_format.format_projection();
|
||||
|
||||
let compat_batch =
|
||||
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
|
||||
tag_builder.append_value("tag1");
|
||||
@@ -804,6 +818,7 @@ mod tests {
|
||||
&expected_metadata,
|
||||
vec![1, 2],
|
||||
ReadColumns::from_deduped_column_ids([1, 2, 3]),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
let read_format = FlatReadFormat::new(
|
||||
@@ -814,12 +829,10 @@ mod tests {
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
let format_projection = read_format.format_projection();
|
||||
|
||||
let compat_batch =
|
||||
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
|
||||
tag_builder.append_value("tag1");
|
||||
@@ -905,12 +918,10 @@ mod tests {
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
let format_projection = read_format.format_projection();
|
||||
|
||||
let compat_batch =
|
||||
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
// Tag array.
|
||||
let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
|
||||
@@ -999,12 +1010,10 @@ mod tests {
|
||||
true,
|
||||
)
|
||||
.unwrap();
|
||||
let format_projection = read_format.format_projection();
|
||||
|
||||
let compat_batch =
|
||||
FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, true)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
let sparse_k1 = encode_sparse_key(&[]);
|
||||
let input_columns: Vec<ArrayRef> = vec![
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Utilities for projection on flat format.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
@@ -27,6 +28,7 @@ use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
|
||||
use datatypes::extension::json::is_json_extension_type;
|
||||
use datatypes::prelude::{ConcreteDataType, DataType};
|
||||
use datatypes::schema::{Schema, SchemaRef};
|
||||
use datatypes::types::json_type::JsonNativeType;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::Helper;
|
||||
use datatypes::vectors::json::array::JsonArray;
|
||||
@@ -84,7 +86,7 @@ impl FlatProjectionMapper {
|
||||
let projection: Vec<_> = projection.into_iter().collect();
|
||||
let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
|
||||
let read_cols = ReadColumns::from_deduped_column_ids(read_column_ids);
|
||||
Self::new_with_read_columns(metadata, projection, read_cols)
|
||||
Self::new_with_read_columns(metadata, projection, read_cols, None)
|
||||
}
|
||||
|
||||
/// Returns a new mapper with output projection and explicit read columns.
|
||||
@@ -92,6 +94,7 @@ impl FlatProjectionMapper {
|
||||
metadata: &RegionMetadataRef,
|
||||
projection: Vec<usize>,
|
||||
read_cols: ReadColumns,
|
||||
json_type_hint: Option<&HashMap<String, JsonNativeType>>,
|
||||
) -> Result<Self> {
|
||||
// If the original projection is empty.
|
||||
let is_empty_projection = projection.is_empty();
|
||||
@@ -109,7 +112,17 @@ impl FlatProjectionMapper {
|
||||
reason: format!("projection index {} is out of bound", idx),
|
||||
})?;
|
||||
output_col_ids.push(col.column_id);
|
||||
col_schemas.push(col.column_schema.clone());
|
||||
|
||||
let mut schema = col.column_schema.clone();
|
||||
if let Some(concretized) = json_type_hint
|
||||
.and_then(|x| x.get(&schema.name))
|
||||
.cloned()
|
||||
.map(ConcreteDataType::json2)
|
||||
&& schema.data_type.is_json()
|
||||
{
|
||||
schema.data_type = concretized;
|
||||
}
|
||||
col_schemas.push(schema);
|
||||
}
|
||||
|
||||
// Creates a map to lookup index.
|
||||
@@ -123,7 +136,21 @@ impl FlatProjectionMapper {
|
||||
read_cols.clone(),
|
||||
);
|
||||
|
||||
let batch_schema = flat_projected_columns(metadata, &format_projection);
|
||||
let mut batch_schema = flat_projected_columns(metadata, &format_projection);
|
||||
|
||||
if let Some(json_type_hint) = json_type_hint
|
||||
&& !json_type_hint.is_empty()
|
||||
{
|
||||
for (column_id, data_type) in batch_schema.iter_mut() {
|
||||
if let Some(concretized) = metadata
|
||||
.column_by_id(*column_id)
|
||||
.and_then(|x| json_type_hint.get(&x.column_schema.name).cloned())
|
||||
.map(ConcreteDataType::json2)
|
||||
{
|
||||
*data_type = concretized;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Safety: We get the column id from the metadata.
|
||||
let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
|
||||
@@ -228,10 +255,15 @@ impl FlatProjectionMapper {
|
||||
self.input_arrow_schema.clone()
|
||||
} else {
|
||||
// For compaction, we need to build a different schema from encoding.
|
||||
to_flat_sst_arrow_schema(
|
||||
&self.metadata,
|
||||
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
|
||||
)
|
||||
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
|
||||
options.concretized_json_types = self
|
||||
.input_arrow_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.filter(|&field| is_json_extension_type(field))
|
||||
.map(|field| (field.name().clone(), field.data_type().clone()))
|
||||
.collect();
|
||||
to_flat_sst_arrow_schema(&self.metadata, &options)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,10 +274,6 @@ impl FlatProjectionMapper {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn with_output_schema(&mut self, schema: SchemaRef) {
|
||||
self.output_schema = schema;
|
||||
}
|
||||
|
||||
/// Converts a flat format [RecordBatch] to a normal [RecordBatch].
|
||||
///
|
||||
/// The batch must match the `projection` using to build the mapper.
|
||||
@@ -407,13 +435,14 @@ pub(crate) fn compute_input_arrow_schema(
|
||||
batch_schema: &[(ColumnId, ConcreteDataType)],
|
||||
) -> datatypes::arrow::datatypes::SchemaRef {
|
||||
let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
|
||||
for (column_id, _) in batch_schema {
|
||||
for (column_id, data_type) in batch_schema {
|
||||
let column_metadata = metadata.column_by_id(*column_id).unwrap();
|
||||
let field = Field::new(
|
||||
&column_metadata.column_schema.name,
|
||||
column_metadata.column_schema.data_type.as_arrow_type(),
|
||||
data_type.as_arrow_type(),
|
||||
column_metadata.column_schema.is_nullable(),
|
||||
);
|
||||
)
|
||||
.with_metadata(column_metadata.column_schema.metadata().clone());
|
||||
let field = with_field_id(field, *column_id);
|
||||
if column_metadata.semantic_type == SemanticType::Tag {
|
||||
new_fields.push(tag_maybe_to_dictionary_field(
|
||||
@@ -454,7 +483,8 @@ impl CompactionProjectionMapper {
|
||||
|
||||
let read_col_ids = metadata.column_metadatas.iter().map(|col| col.column_id);
|
||||
let read_cols = ReadColumns::from_deduped_column_ids(read_col_ids);
|
||||
let mapper = FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols)?;
|
||||
let mapper =
|
||||
FlatProjectionMapper::new_with_read_columns(metadata, projection, read_cols, None)?;
|
||||
let assembler = DfBatchAssembler::new(mapper.output_schema());
|
||||
|
||||
Ok(Self { mapper, assembler })
|
||||
|
||||
@@ -296,6 +296,7 @@ mod tests {
|
||||
&metadata,
|
||||
vec![4, 1],
|
||||
ReadColumns::from_deduped_column_ids([4, 1, 3]),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(&[4, 1, 3], mapper.read_columns().column_ids().as_slice());
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Scans a region according to the scan request.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::Arc;
|
||||
@@ -31,11 +31,7 @@ use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::Expr;
|
||||
use datafusion_expr::utils::expr_to_columns;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::extension::json::is_json_extension_type;
|
||||
use datatypes::schema::Schema;
|
||||
use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use datatypes::types::json_type::JsonNativeType;
|
||||
use futures::StreamExt;
|
||||
use partition::expr::PartitionExpr;
|
||||
use smallvec::SmallVec;
|
||||
@@ -429,15 +425,24 @@ impl ScanRegion {
|
||||
let read_col_ids = read_cols.column_ids();
|
||||
|
||||
// The mapper always computes projected column ids as the schema of SSTs may change.
|
||||
let mut mapper = match self.request.projection_indices() {
|
||||
Some(p) => FlatProjectionMapper::new_with_read_columns(
|
||||
&self.version.metadata,
|
||||
p.to_vec(),
|
||||
read_cols,
|
||||
)?,
|
||||
None => FlatProjectionMapper::all(&self.version.metadata)?,
|
||||
};
|
||||
concretize_json_types(&mut mapper, &self.request.json_type_hint);
|
||||
let projection = self
|
||||
.request
|
||||
.projection_indices()
|
||||
.map(|x| x.to_vec())
|
||||
.unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
|
||||
let json_type_hint = self
|
||||
.version
|
||||
.metadata
|
||||
.schema
|
||||
.arrow_schema()
|
||||
.has_json_extension_field()
|
||||
.then_some(&self.request.json_type_hint);
|
||||
let mapper = FlatProjectionMapper::new_with_read_columns(
|
||||
&self.version.metadata,
|
||||
projection,
|
||||
read_cols,
|
||||
json_type_hint,
|
||||
)?;
|
||||
|
||||
let ssts = &self.version.ssts;
|
||||
let mut files = Vec::new();
|
||||
@@ -733,34 +738,6 @@ impl ScanRegion {
|
||||
}
|
||||
}
|
||||
|
||||
fn concretize_json_types(
|
||||
mapper: &mut FlatProjectionMapper,
|
||||
json_type_hint: &HashMap<String, JsonNativeType>,
|
||||
) {
|
||||
let output_schema = mapper.output_schema();
|
||||
let output_arrow_schema = output_schema.arrow_schema();
|
||||
if !output_arrow_schema.has_json_extension_field() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut column_schemas = output_schema.column_schemas().to_vec();
|
||||
for (idx, column_schema) in column_schemas.iter_mut().enumerate() {
|
||||
if !is_json_extension_type(&output_arrow_schema.fields()[idx]) {
|
||||
continue;
|
||||
}
|
||||
let Some(json_type) = json_type_hint.get(&column_schema.name) else {
|
||||
continue;
|
||||
};
|
||||
column_schema.data_type = ConcreteDataType::from(json_type);
|
||||
}
|
||||
|
||||
let output_schema = Arc::new(Schema::new_with_version(
|
||||
column_schemas,
|
||||
output_schema.version(),
|
||||
));
|
||||
mapper.with_output_schema(output_schema);
|
||||
}
|
||||
|
||||
/// Returns true if the time range of a SST `file` matches the `predicate`.
|
||||
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
|
||||
if predicate == &TimestampRange::min_to_max() {
|
||||
@@ -1126,16 +1103,16 @@ impl ScanInput {
|
||||
};
|
||||
|
||||
let need_compat = !compat::has_same_columns_and_pk_encoding(
|
||||
self.mapper.metadata(),
|
||||
file_range_ctx.read_format().metadata(),
|
||||
&self.mapper,
|
||||
file_range_ctx.read_format(),
|
||||
self.compaction,
|
||||
);
|
||||
if need_compat {
|
||||
// They have different schema. We need to adapt the batch first so the
|
||||
// mapper can convert it.
|
||||
let compat = FlatCompatBatch::try_new(
|
||||
&self.mapper,
|
||||
file_range_ctx.read_format().metadata(),
|
||||
file_range_ctx.read_format().format_projection(),
|
||||
file_range_ctx.read_format(),
|
||||
self.compaction,
|
||||
)?;
|
||||
file_range_ctx.set_compat_batch(compat);
|
||||
@@ -1774,19 +1751,15 @@ impl PredicateGroup {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::physical_plan::expressions::lit as physical_lit;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{col, lit};
|
||||
use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use datatypes::types::json_type::JsonObjectType;
|
||||
use datatypes::value::Value;
|
||||
use partition::expr::col as partition_col;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
|
||||
use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
use store_api::metadata::RegionMetadataBuilder;
|
||||
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
||||
|
||||
use super::*;
|
||||
use crate::cache::CacheManager;
|
||||
@@ -1818,113 +1791,6 @@ mod tests {
|
||||
lit(ScalarValue::TimestampMillisecond(Some(val), None))
|
||||
}
|
||||
|
||||
fn metadata_with_json_field() -> RegionMetadataRef {
|
||||
let mut json_column = ColumnSchema::new(
|
||||
"payload",
|
||||
ConcreteDataType::from(&JsonNativeType::Object(JsonObjectType::new())),
|
||||
true,
|
||||
);
|
||||
json_column
|
||||
.with_extension_type(&JsonExtensionType::new(Arc::new(JsonMetadata::default())))
|
||||
.unwrap();
|
||||
|
||||
let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"host",
|
||||
ConcreteDataType::string_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: 1,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts",
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 2,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: json_column,
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 3,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 4,
|
||||
})
|
||||
.primary_key(vec![1]);
|
||||
|
||||
Arc::new(builder.build().unwrap())
|
||||
}
|
||||
|
||||
fn concrete_json_type_hint() -> JsonNativeType {
|
||||
JsonNativeType::Object(JsonObjectType::from([
|
||||
("active".to_string(), JsonNativeType::Bool),
|
||||
("name".to_string(), JsonNativeType::String),
|
||||
]))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_concretize_json_types_rewrites_json_output_schema() -> Result<()> {
|
||||
let metadata = metadata_with_json_field();
|
||||
let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3])?;
|
||||
let output_schema = mapper.output_schema();
|
||||
let original_arrow_schema = output_schema.arrow_schema();
|
||||
assert!(original_arrow_schema.has_json_extension_field());
|
||||
assert!(is_json_extension_type(&original_arrow_schema.fields()[1]));
|
||||
|
||||
let expected_type = concrete_json_type_hint();
|
||||
concretize_json_types(
|
||||
&mut mapper,
|
||||
&HashMap::from([("payload".to_string(), expected_type.clone())]),
|
||||
);
|
||||
|
||||
let output_schema = mapper.output_schema();
|
||||
let output_arrow_schema = output_schema.arrow_schema();
|
||||
assert!(output_arrow_schema.has_json_extension_field());
|
||||
assert_eq!(
|
||||
ConcreteDataType::string_datatype(),
|
||||
output_schema.column_schemas()[0].data_type
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::from(&expected_type),
|
||||
output_schema.column_schemas()[1].data_type
|
||||
);
|
||||
assert_eq!(
|
||||
ConcreteDataType::int64_datatype(),
|
||||
output_schema.column_schemas()[2].data_type
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_concretize_json_types_keeps_json_without_hint() {
|
||||
let metadata = metadata_with_json_field();
|
||||
let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
|
||||
|
||||
concretize_json_types(
|
||||
&mut mapper,
|
||||
&HashMap::from([("missing".to_string(), JsonNativeType::String)]),
|
||||
);
|
||||
|
||||
let output_schema = mapper.output_schema();
|
||||
let output_arrow_schema = output_schema.arrow_schema();
|
||||
assert!(output_arrow_schema.has_json_extension_field());
|
||||
assert!(is_json_extension_type(&output_arrow_schema.fields()[1]));
|
||||
|
||||
// Assert that the expected JSON type stays un-concretized: empty object.
|
||||
assert_eq!(
|
||||
ConcreteDataType::from(&JsonNativeType::Object(JsonObjectType::new())),
|
||||
output_schema.column_schemas()[1].data_type
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_scan_fingerprint_for_eligible_scan() {
|
||||
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
|
||||
|
||||
@@ -130,7 +130,7 @@ impl SeqScan {
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the compaction flag is not set.
|
||||
pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
|
||||
pub(crate) async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
|
||||
assert!(self.stream_ctx.input.compaction);
|
||||
|
||||
let metrics_set = ExecutionPlanMetricsSet::new();
|
||||
|
||||
@@ -14,9 +14,11 @@
|
||||
|
||||
//! Sorted strings tables.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
use arrow_schema::DataType;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use datatypes::arrow::datatypes::{
|
||||
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
|
||||
@@ -108,6 +110,9 @@ pub struct FlatSchemaOptions {
|
||||
/// when storing primary key columns.
|
||||
/// Only takes effect when `raw_pk_columns` is true.
|
||||
pub string_pk_use_dict: bool,
|
||||
/// The column's concretized JSON types, to be set into Arrow schema.
|
||||
/// Otherwise it's empty struct in the Arrow schema.
|
||||
pub concretized_json_types: HashMap<String, DataType>,
|
||||
}
|
||||
|
||||
impl Default for FlatSchemaOptions {
|
||||
@@ -115,6 +120,7 @@ impl Default for FlatSchemaOptions {
|
||||
Self {
|
||||
raw_pk_columns: true,
|
||||
string_pk_use_dict: true,
|
||||
concretized_json_types: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -128,6 +134,7 @@ impl FlatSchemaOptions {
|
||||
Self {
|
||||
raw_pk_columns: false,
|
||||
string_pk_use_dict: false,
|
||||
concretized_json_types: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -159,6 +166,7 @@ pub fn to_flat_sst_arrow_schema(
|
||||
&metadata.column_metadatas[pk_index].column_schema.data_type,
|
||||
old_field,
|
||||
);
|
||||
let new_field = concretize_json_type(new_field, options);
|
||||
fields.push(Arc::new(with_field_id((*new_field).clone(), column_id)));
|
||||
}
|
||||
}
|
||||
@@ -169,8 +177,9 @@ pub fn to_flat_sst_arrow_schema(
|
||||
.zip(&metadata.column_metadatas)
|
||||
.filter_map(|(field, column_meta)| {
|
||||
if column_meta.semantic_type == SemanticType::Field {
|
||||
let field = concretize_json_type(field.clone(), options);
|
||||
Some(Arc::new(with_field_id(
|
||||
(**field).clone(),
|
||||
Arc::unwrap_or_clone(field),
|
||||
column_meta.column_id,
|
||||
)))
|
||||
} else {
|
||||
@@ -189,6 +198,16 @@ pub fn to_flat_sst_arrow_schema(
|
||||
Arc::new(Schema::new(fields))
|
||||
}
|
||||
|
||||
fn concretize_json_type(field: Arc<Field>, options: &FlatSchemaOptions) -> Arc<Field> {
|
||||
if let Some(data_type) = options.concretized_json_types.get(field.name()) {
|
||||
let mut field = Arc::unwrap_or_clone(field);
|
||||
field.set_data_type(data_type.clone());
|
||||
Arc::new(field)
|
||||
} else {
|
||||
field
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the number of columns in the flat format.
|
||||
pub fn flat_sst_arrow_schema_column_num(
|
||||
metadata: &RegionMetadata,
|
||||
|
||||
@@ -165,10 +165,11 @@ impl FlatReadFormat {
|
||||
pub fn new(
|
||||
metadata: RegionMetadataRef,
|
||||
read_cols: ReadColumns,
|
||||
num_columns: Option<usize>,
|
||||
file_schema: Option<SchemaRef>,
|
||||
file_path: &str,
|
||||
skip_auto_convert: bool,
|
||||
) -> Result<FlatReadFormat> {
|
||||
let num_columns = file_schema.as_ref().map(|x| x.fields().len());
|
||||
let is_legacy = match num_columns {
|
||||
Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
|
||||
None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
|
||||
@@ -189,7 +190,9 @@ impl FlatReadFormat {
|
||||
))
|
||||
}
|
||||
} else {
|
||||
ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols))
|
||||
let file_schema = file_schema
|
||||
.unwrap_or_else(|| to_flat_sst_arrow_schema(&metadata, &Default::default()));
|
||||
ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols, file_schema))
|
||||
};
|
||||
|
||||
Ok(FlatReadFormat {
|
||||
@@ -248,9 +251,6 @@ impl FlatReadFormat {
|
||||
}
|
||||
|
||||
/// Gets the arrow schema of the SST file.
|
||||
///
|
||||
/// This schema is computed from the region metadata but should be the same
|
||||
/// as the arrow schema decoded from the file metadata.
|
||||
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
|
||||
match &self.parquet_adapter {
|
||||
ParquetAdapter::Flat(p) => &p.arrow_schema,
|
||||
@@ -483,10 +483,13 @@ struct ParquetFlat {
|
||||
|
||||
impl ParquetFlat {
|
||||
/// Creates a helper with existing `metadata` and `column_ids` to read.
|
||||
fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> ParquetFlat {
|
||||
fn new(
|
||||
metadata: RegionMetadataRef,
|
||||
read_cols: ReadColumns,
|
||||
arrow_schema: SchemaRef,
|
||||
) -> ParquetFlat {
|
||||
// Creates a map to lookup index.
|
||||
let id_to_index = sst_column_id_indices(&metadata);
|
||||
let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
|
||||
let sst_column_num =
|
||||
flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
|
||||
let format_projection =
|
||||
|
||||
@@ -1323,7 +1323,7 @@ mod tests {
|
||||
let mut format = FlatReadFormat::new(
|
||||
metadata,
|
||||
ReadColumns::from_deduped_column_ids(std::iter::once(1)), // Just read tag0
|
||||
Some(8),
|
||||
Some(build_test_flat_sst_schema()),
|
||||
"test",
|
||||
false,
|
||||
)
|
||||
@@ -1540,7 +1540,7 @@ mod tests {
|
||||
let format = FlatReadFormat::new(
|
||||
metadata.clone(),
|
||||
ReadColumns::from_deduped_column_ids(column_ids),
|
||||
Some(6),
|
||||
Some(build_test_arrow_schema()),
|
||||
"test",
|
||||
false,
|
||||
)
|
||||
|
||||
@@ -37,8 +37,8 @@ use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use futures::StreamExt;
|
||||
use mito_codec::row_converter::build_primary_key_codec;
|
||||
use object_store::ObjectStore;
|
||||
use parquet::arrow::ProjectionMask;
|
||||
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions, RowSelection};
|
||||
use parquet::arrow::{ProjectionMask, parquet_to_arrow_schema};
|
||||
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
|
||||
use partition::expr::PartitionExpr;
|
||||
use snafu::ResultExt;
|
||||
@@ -53,7 +53,9 @@ use crate::cache::index::result_cache::PredicateKey;
|
||||
use crate::cache::{CacheStrategy, CachedSstMeta};
|
||||
#[cfg(feature = "vector_index")]
|
||||
use crate::error::ApplyVectorIndexSnafu;
|
||||
use crate::error::{ReadDataPartSnafu, Result, SerializePartitionExprSnafu};
|
||||
use crate::error::{
|
||||
ParquetToArrowSchemaSnafu, ReadDataPartSnafu, Result, SerializePartitionExprSnafu,
|
||||
};
|
||||
use crate::metrics::{
|
||||
PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
|
||||
READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
|
||||
@@ -390,10 +392,16 @@ impl ParquetReaderBuilder {
|
||||
.map(|col| col.column_id),
|
||||
)
|
||||
};
|
||||
|
||||
let file_metadata = parquet_meta.file_metadata();
|
||||
let parquet_schema_desc = file_metadata.schema_descr();
|
||||
let file_schema =
|
||||
parquet_to_arrow_schema(parquet_schema_desc, file_metadata.key_value_metadata())
|
||||
.context(ParquetToArrowSchemaSnafu { file: &file_path })?;
|
||||
let mut read_format = FlatReadFormat::new(
|
||||
region_meta.clone(),
|
||||
read_cols,
|
||||
Some(parquet_meta.file_metadata().schema_descr().num_columns()),
|
||||
Some(Arc::new(file_schema)),
|
||||
&file_path,
|
||||
skip_auto_convert,
|
||||
)?;
|
||||
@@ -403,7 +411,6 @@ impl ParquetReaderBuilder {
|
||||
}
|
||||
|
||||
// Computes the projection mask.
|
||||
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
|
||||
let parquet_read_cols = read_format.parquet_read_columns();
|
||||
let projection_plan = build_projection_plan(parquet_read_cols, parquet_schema_desc);
|
||||
let selection = self
|
||||
@@ -585,7 +592,7 @@ impl ParquetReaderBuilder {
|
||||
|
||||
/// Reads parquet metadata of specific file.
|
||||
/// Returns (fused metadata, cache_miss_flag).
|
||||
async fn read_parquet_metadata(
|
||||
pub(crate) async fn read_parquet_metadata(
|
||||
&self,
|
||||
file_path: &str,
|
||||
file_size: u64,
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Parquet writer.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
@@ -31,6 +32,8 @@ use datatypes::arrow::array::{
|
||||
use datatypes::arrow::compute::{max, min};
|
||||
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::extension::json::is_json_extension_type;
|
||||
use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use object_store::{FuturesAsyncWriter, ObjectStore};
|
||||
use parquet::arrow::AsyncArrowWriter;
|
||||
use parquet::basic::{Compression, Encoding, ZstdLevel};
|
||||
@@ -271,12 +274,21 @@ where
|
||||
override_sequence: Option<SequenceNumber>,
|
||||
opts: &WriteOptions,
|
||||
) -> Result<SstInfoArray> {
|
||||
let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
|
||||
|
||||
if source.schema().has_json_extension_field() {
|
||||
options.concretized_json_types = source
|
||||
.schema()
|
||||
.fields()
|
||||
.iter()
|
||||
.filter(|&field| is_json_extension_type(field))
|
||||
.map(|field| (field.name().clone(), field.data_type().clone()))
|
||||
.collect::<HashMap<_, _>>();
|
||||
}
|
||||
|
||||
let converter = FlatBatchConverter::Flat(
|
||||
FlatWriteFormat::new(
|
||||
self.metadata.clone(),
|
||||
&FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
|
||||
)
|
||||
.with_override_sequence(override_sequence),
|
||||
FlatWriteFormat::new(self.metadata.clone(), &options)
|
||||
.with_override_sequence(override_sequence),
|
||||
);
|
||||
let res = self.write_all_flat_inner(source, &converter, opts).await;
|
||||
if res.is_err() {
|
||||
|
||||
@@ -42,6 +42,14 @@ admin flush_table('json2_table');
|
||||
| 0 |
|
||||
+----------------------------------+
|
||||
|
||||
admin compact_table('json2_table', 'swcs', '86400');
|
||||
|
||||
+-----------------------------------------------------+
|
||||
| ADMIN compact_table('json2_table', 'swcs', '86400') |
|
||||
+-----------------------------------------------------+
|
||||
| 0 |
|
||||
+-----------------------------------------------------+
|
||||
|
||||
insert into json2_table
|
||||
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
|
||||
(8, '{"a": {"b": 8}, "c": "s8"}');
|
||||
@@ -131,10 +139,10 @@ select j.d from json2_table order by ts;
|
||||
+-----------------------------------+
|
||||
| json_get(json2_table.j,Utf8("d")) |
|
||||
+-----------------------------------+
|
||||
| [{e: {f: 0.1}}] |
|
||||
| [{e: {f: 0.2}}] |
|
||||
| [{e: {f: 0.1, g: }}] |
|
||||
| [{e: {f: 0.2, g: }}] |
|
||||
| |
|
||||
| [{e: {g: -0.4}}] |
|
||||
| [{e: {f: , g: -0.4}}] |
|
||||
| |
|
||||
| |
|
||||
| [{e: {g: -0.7}}] |
|
||||
|
||||
@@ -22,6 +22,8 @@ values (4, '{"a": {"b": -4}, "d": [{"e": {"g": -0.4}}]}'),
|
||||
|
||||
admin flush_table('json2_table');
|
||||
|
||||
admin compact_table('json2_table', 'swcs', '86400');
|
||||
|
||||
insert into json2_table
|
||||
values (7, '{"a": {"b": "s7"}, "c": [1], "d": [{"e": {"g": -0.7}}]}'),
|
||||
(8, '{"a": {"b": 8}, "c": "s8"}');
|
||||
|
||||
Reference in New Issue
Block a user