mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 04:20:39 +00:00
@@ -129,7 +129,7 @@ impl From<ColumnDataTypeWrapper> for ConcreteDataType {
|
||||
};
|
||||
ConcreteDataType::json_native_datatype(inner_type.into())
|
||||
}
|
||||
None => ConcreteDataType::Json(JsonType::null()),
|
||||
None => ConcreteDataType::Json(JsonType::new(JsonFormat::Json2)),
|
||||
_ => {
|
||||
// invalid state, type extension is missing or invalid
|
||||
ConcreteDataType::null_datatype()
|
||||
@@ -461,6 +461,7 @@ impl TryFrom<ConcreteDataType> for ColumnDataTypeWrapper {
|
||||
})
|
||||
}
|
||||
}
|
||||
JsonFormat::Json2 => Some(ColumnDataTypeExtension { type_ext: None }),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub mod json2_get;
|
||||
pub mod json_get;
|
||||
mod json_is;
|
||||
mod json_path_exists;
|
||||
@@ -24,6 +25,7 @@ use json_is::{
|
||||
JsonIsArray, JsonIsBool, JsonIsFloat, JsonIsInt, JsonIsNull, JsonIsObject, JsonIsString,
|
||||
};
|
||||
use json_to_string::JsonToStringFunction;
|
||||
use json2_get::Json2GetFunction;
|
||||
use parse_json::ParseJsonFunction;
|
||||
|
||||
use crate::function_registry::FunctionRegistry;
|
||||
@@ -42,6 +44,7 @@ impl JsonFunction {
|
||||
registry.register_scalar(JsonGetBool::default());
|
||||
registry.register_scalar(JsonGetObject::default());
|
||||
registry.register_scalar(JsonGetWithType::default());
|
||||
registry.register_scalar(Json2GetFunction::default());
|
||||
|
||||
registry.register_scalar(JsonIsNull::default());
|
||||
registry.register_scalar(JsonIsInt::default());
|
||||
|
||||
136
src/common/function/src/scalars/json/json2_get.rs
Normal file
136
src/common/function/src/scalars/json/json2_get.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_cast::display::array_value_to_string;
|
||||
use datafusion_common::arrow::array::{
|
||||
Array, ArrayRef, StringViewBuilder, StructArray, new_null_array,
|
||||
};
|
||||
use datafusion_common::arrow::datatypes::DataType;
|
||||
use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err};
|
||||
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, TypeSignature, Volatility};
|
||||
use derive_more::Display;
|
||||
|
||||
use crate::function::Function;
|
||||
|
||||
#[derive(Display, Debug)]
|
||||
#[display("{}", Self::NAME.to_ascii_uppercase())]
|
||||
pub struct Json2GetFunction {
|
||||
signature: Signature,
|
||||
}
|
||||
|
||||
impl Json2GetFunction {
|
||||
pub const NAME: &'static str = "json2_get";
|
||||
}
|
||||
|
||||
impl Function for Json2GetFunction {
|
||||
fn name(&self) -> &str {
|
||||
Self::NAME
|
||||
}
|
||||
|
||||
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
|
||||
Ok(DataType::Utf8View)
|
||||
}
|
||||
|
||||
fn signature(&self) -> &Signature {
|
||||
&self.signature
|
||||
}
|
||||
|
||||
fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
|
||||
if args.args.len() != 2 {
|
||||
return exec_err!("json2_get expects 2 arguments, got {}", args.args.len());
|
||||
}
|
||||
|
||||
let input = args.args[0].to_array(args.number_rows)?;
|
||||
let path = path_from_arg(&args.args[1])?;
|
||||
|
||||
let segments: Vec<&str> = if path.is_empty() {
|
||||
Vec::new()
|
||||
} else {
|
||||
path.split('.').collect()
|
||||
};
|
||||
let Some(struct_path) = resolve_struct_path(&input, &segments) else {
|
||||
return Ok(ColumnarValue::Array(new_null_array(
|
||||
args.return_type(),
|
||||
input.len(),
|
||||
)));
|
||||
};
|
||||
|
||||
let values = display_array_from_path(&struct_path)?;
|
||||
Ok(ColumnarValue::Array(values))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Json2GetFunction {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
signature: Signature::one_of(vec![TypeSignature::Any(2)], Volatility::Immutable),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn path_from_arg(arg: &ColumnarValue) -> Result<&String> {
|
||||
match arg {
|
||||
ColumnarValue::Scalar(ScalarValue::Utf8(Some(path)))
|
||||
| ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(path)))
|
||||
| ColumnarValue::Scalar(ScalarValue::Utf8View(Some(path))) => Ok(path),
|
||||
ColumnarValue::Scalar(_) => exec_err!("json2_get expects a string path"),
|
||||
ColumnarValue::Array(_) => exec_err!("json2_get expects a literal path"),
|
||||
}
|
||||
}
|
||||
|
||||
struct StructPath {
|
||||
parents: Vec<ArrayRef>,
|
||||
leaf: ArrayRef,
|
||||
}
|
||||
|
||||
fn resolve_struct_path(array: &ArrayRef, segments: &[&str]) -> Option<StructPath> {
|
||||
let mut current = array.clone();
|
||||
let mut parents = Vec::with_capacity(segments.len());
|
||||
|
||||
for segment in segments {
|
||||
let struct_array = current.as_any().downcast_ref::<StructArray>()?;
|
||||
let DataType::Struct(fields) = current.data_type() else {
|
||||
unreachable!()
|
||||
};
|
||||
let index = fields.iter().position(|field| field.name() == *segment)?;
|
||||
parents.push(current.clone());
|
||||
current = struct_array.column(index).clone();
|
||||
}
|
||||
|
||||
Some(StructPath {
|
||||
parents,
|
||||
leaf: current,
|
||||
})
|
||||
}
|
||||
|
||||
fn struct_path_is_null(parents: &[ArrayRef], index: usize) -> bool {
|
||||
parents.iter().any(|parent| parent.is_null(index))
|
||||
}
|
||||
|
||||
fn display_array_from_path(path: &StructPath) -> Result<ArrayRef> {
|
||||
let mut builder = StringViewBuilder::with_capacity(path.leaf.len());
|
||||
for index in 0..path.leaf.len() {
|
||||
if struct_path_is_null(&path.parents, index) || path.leaf.is_null(index) {
|
||||
builder.append_null();
|
||||
continue;
|
||||
}
|
||||
|
||||
let value = array_value_to_string(path.leaf.as_ref(), index)
|
||||
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
|
||||
builder.append_value(value);
|
||||
}
|
||||
Ok(Arc::new(builder.finish()))
|
||||
}
|
||||
@@ -395,6 +395,11 @@ pub fn align_json_array(json_array: &ArrayRef, schema_type: &ArrowDataType) -> R
|
||||
&array_columns[j],
|
||||
schema_field.data_type(),
|
||||
)?);
|
||||
} else if schema_field.data_type() != array_field.data_type() {
|
||||
aligned.push(
|
||||
compute::cast(&array_columns[j], schema_field.data_type())
|
||||
.context(ArrowComputeSnafu)?,
|
||||
);
|
||||
} else {
|
||||
aligned.push(array_columns[j].clone());
|
||||
}
|
||||
|
||||
@@ -306,7 +306,7 @@ pub(crate) fn parse_string_to_value(
|
||||
let v = parse_string_to_jsonb(&s).context(DatatypeSnafu)?;
|
||||
Ok(Value::Binary(v.into()))
|
||||
}
|
||||
JsonFormat::Native(_) => {
|
||||
JsonFormat::Native(_) | JsonFormat::Json2 => {
|
||||
let extension_type: Option<JsonExtensionType> =
|
||||
column_schema.extension_type().context(DatatypeSnafu)?;
|
||||
let json_structure_settings = extension_type
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
mod column_schema;
|
||||
pub mod constraint;
|
||||
pub mod ext;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
|
||||
25
src/datatypes/src/schema/ext.rs
Normal file
25
src/datatypes/src/schema/ext.rs
Normal file
@@ -0,0 +1,25 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::extension::json;
|
||||
|
||||
/// Extension methods for arrow schemas.
pub trait ArrowSchemaExt {
    /// Returns `true` when at least one field of the schema is a JSON extension type.
    fn has_json_extension_field(&self) -> bool;
}
|
||||
|
||||
impl ArrowSchemaExt for arrow_schema::Schema {
|
||||
fn has_json_extension_field(&self) -> bool {
|
||||
self.fields().iter().any(json::is_json_extension_type)
|
||||
}
|
||||
}
|
||||
@@ -12,12 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, LazyLock};
|
||||
|
||||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
use arrow_schema::Fields;
|
||||
use common_base::bytes::Bytes;
|
||||
use regex::{Captures, Regex};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -33,6 +35,7 @@ use crate::type_id::LogicalTypeId;
|
||||
use crate::types::{ListType, StructField, StructType};
|
||||
use crate::value::Value;
|
||||
use crate::vectors::json::builder::JsonVectorBuilder;
|
||||
use crate::vectors::json::builder2::Json2VectorBuilder;
|
||||
use crate::vectors::{BinaryVectorBuilder, MutableVector};
|
||||
|
||||
pub const JSON_TYPE_NAME: &str = "Json";
|
||||
@@ -164,6 +167,7 @@ pub enum JsonFormat {
|
||||
#[default]
|
||||
Jsonb,
|
||||
Native(Box<JsonNativeType>),
|
||||
Json2,
|
||||
}
|
||||
|
||||
/// JsonType is a data type for JSON data. It is stored as binary data of jsonb format.
|
||||
@@ -192,6 +196,7 @@ impl JsonType {
|
||||
match &self.format {
|
||||
JsonFormat::Jsonb => &JsonNativeType::String,
|
||||
JsonFormat::Native(x) => x.as_ref(),
|
||||
JsonFormat::Json2 => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,15 +217,24 @@ impl JsonType {
|
||||
ConcreteDataType::Struct(t) => t.clone(),
|
||||
x => plain_json_struct_type(x),
|
||||
},
|
||||
JsonFormat::Json2 => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to merge this json type with others, error on datatype conflict.
|
||||
pub fn merge(&mut self, other: &JsonType) -> Result<()> {
    // Strict merge: lifting is disabled.
    let lift = false;
    self.merge_with(other, lift)
}
|
||||
|
||||
/// Merges `other` into this json type with lifting enabled.
///
/// Unlike the strict merge, conflicting native datatypes fall back to
/// `JsonNativeType::String` instead of producing an error.
pub fn merge_with_lifting(&mut self, other: &JsonType) -> Result<()> {
    let lift = true;
    self.merge_with(other, lift)
}
|
||||
|
||||
fn merge_with(&mut self, other: &JsonType, lift: bool) -> Result<()> {
|
||||
match (&self.format, &other.format) {
|
||||
(JsonFormat::Jsonb, JsonFormat::Jsonb) => Ok(()),
|
||||
(JsonFormat::Native(this), JsonFormat::Native(that)) => {
|
||||
let merged = merge(this.as_ref(), that.as_ref())?;
|
||||
let merged = merge(this.as_ref(), that.as_ref(), lift)?;
|
||||
self.format = JsonFormat::Native(Box::new(merged));
|
||||
Ok(())
|
||||
}
|
||||
@@ -313,13 +327,17 @@ fn is_mergeable(this: &JsonNativeType, that: &JsonNativeType) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
fn merge(this: &JsonNativeType, that: &JsonNativeType) -> Result<JsonNativeType> {
|
||||
fn merge_object(this: &JsonObjectType, that: &JsonObjectType) -> Result<JsonObjectType> {
|
||||
fn merge(this: &JsonNativeType, that: &JsonNativeType, lift: bool) -> Result<JsonNativeType> {
|
||||
fn merge_object(
|
||||
this: &JsonObjectType,
|
||||
that: &JsonObjectType,
|
||||
lift: bool,
|
||||
) -> Result<JsonObjectType> {
|
||||
let mut this = this.clone();
|
||||
// merge "that" into "this" directly:
|
||||
for (type_name, that_type) in that {
|
||||
if let Some(this_type) = this.get_mut(type_name) {
|
||||
let merged_type = merge(this_type, that_type)?;
|
||||
let merged_type = merge(this_type, that_type, lift)?;
|
||||
*this_type = merged_type;
|
||||
} else {
|
||||
this.insert(type_name.clone(), that_type.clone());
|
||||
@@ -331,16 +349,45 @@ fn merge(this: &JsonNativeType, that: &JsonNativeType) -> Result<JsonNativeType>
|
||||
match (this, that) {
|
||||
(this, that) if this == that => Ok(this.clone()),
|
||||
(JsonNativeType::Array(this), JsonNativeType::Array(that)) => {
|
||||
merge(this.as_ref(), that.as_ref()).map(|x| JsonNativeType::Array(Box::new(x)))
|
||||
merge(this.as_ref(), that.as_ref(), lift).map(|x| JsonNativeType::Array(Box::new(x)))
|
||||
}
|
||||
(JsonNativeType::Object(this), JsonNativeType::Object(that)) => {
|
||||
merge_object(this, that).map(JsonNativeType::Object)
|
||||
merge_object(this, that, lift).map(JsonNativeType::Object)
|
||||
}
|
||||
(JsonNativeType::Null, x) | (x, JsonNativeType::Null) => Ok(x.clone()),
|
||||
_ => MergeJsonDatatypeSnafu {
|
||||
reason: format!("datatypes have conflict, this: {this}, that: {that}"),
|
||||
_ => {
|
||||
if lift {
|
||||
Ok(JsonNativeType::String)
|
||||
} else {
|
||||
MergeJsonDatatypeSnafu {
|
||||
reason: format!("datatypes have conflict, this: {this}, that: {that}"),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn merge_as_json_type<'a>(
|
||||
left: &'a ArrowDataType,
|
||||
right: &ArrowDataType,
|
||||
) -> Cow<'a, ArrowDataType> {
|
||||
if left == right {
|
||||
return Cow::Borrowed(left);
|
||||
}
|
||||
|
||||
let mut left = JsonType::from(left);
|
||||
let right = JsonType::from(right);
|
||||
Cow::Owned(if left.merge_with_lifting(&right).is_ok() {
|
||||
left.as_arrow_type()
|
||||
} else {
|
||||
ArrowDataType::Utf8
|
||||
})
|
||||
}
|
||||
|
||||
impl From<&ArrowDataType> for JsonType {
|
||||
fn from(t: &ArrowDataType) -> Self {
|
||||
JsonType::new_native(JsonNativeType::from(&ConcreteDataType::from_arrow_type(t)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -349,6 +396,7 @@ impl DataType for JsonType {
|
||||
match &self.format {
|
||||
JsonFormat::Jsonb => JSON_TYPE_NAME.to_string(),
|
||||
JsonFormat::Native(x) => format!("Json<{x}>"),
|
||||
JsonFormat::Json2 => "JSON2".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -364,6 +412,7 @@ impl DataType for JsonType {
|
||||
match self.format {
|
||||
JsonFormat::Jsonb => ArrowDataType::Binary,
|
||||
JsonFormat::Native(_) => self.as_struct_type().as_arrow_type(),
|
||||
JsonFormat::Json2 => ArrowDataType::Struct(Fields::empty()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -371,6 +420,7 @@ impl DataType for JsonType {
|
||||
match &self.format {
|
||||
JsonFormat::Jsonb => Box::new(BinaryVectorBuilder::with_capacity(capacity)),
|
||||
JsonFormat::Native(x) => Box::new(JsonVectorBuilder::new(*x.clone(), capacity)),
|
||||
JsonFormat::Json2 => Box::new(Json2VectorBuilder::new(JsonNativeType::Null, capacity)),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3206,7 +3206,7 @@ pub(crate) mod tests {
|
||||
]
|
||||
.into(),
|
||||
)),
|
||||
48,
|
||||
56,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -13,3 +13,4 @@
|
||||
// limitations under the License.
|
||||
|
||||
pub(crate) mod builder;
|
||||
pub(crate) mod builder2;
|
||||
|
||||
163
src/datatypes/src/vectors/json/builder2.rs
Normal file
163
src/datatypes/src/vectors/json/builder2.rs
Normal file
@@ -0,0 +1,163 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::borrow::Cow;
|
||||
use std::sync::LazyLock;
|
||||
|
||||
use crate::data_type::ConcreteDataType;
|
||||
use crate::error::{Result, TryFromValueSnafu, UnsupportedOperationSnafu};
|
||||
use crate::json::value::{JsonValue, JsonValueRef, JsonVariant};
|
||||
use crate::prelude::{ValueRef, Vector, VectorRef};
|
||||
use crate::types::JsonType;
|
||||
use crate::types::json_type::JsonNativeType;
|
||||
use crate::vectors::{MutableVector, StructVectorBuilder};
|
||||
|
||||
pub(crate) struct Json2VectorBuilder {
|
||||
merged_type: JsonType,
|
||||
capacity: usize,
|
||||
values: Vec<JsonValue>,
|
||||
}
|
||||
|
||||
impl Json2VectorBuilder {
|
||||
pub(crate) fn new(json_type: JsonNativeType, capacity: usize) -> Self {
|
||||
Self {
|
||||
merged_type: JsonType::new_native(json_type),
|
||||
capacity,
|
||||
values: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
fn build(&self) -> VectorRef {
|
||||
let mut builder = StructVectorBuilder::with_type_and_capacity(
|
||||
self.merged_type.as_struct_type(),
|
||||
self.capacity,
|
||||
);
|
||||
for value in self.values.iter() {
|
||||
let value = align_json_value_with_type(&self.merged_type, value);
|
||||
builder
|
||||
.try_push_value_ref(&(*value).as_ref().as_value_ref())
|
||||
// Safety: after the `align_json_value_with_type`, the values to push must have
|
||||
// the same types with the builder, so it's not expected to meet any errors here.
|
||||
.unwrap_or_else(|e| panic!("Failed to push JSON value {value}: {e:?}"));
|
||||
}
|
||||
builder.to_vector()
|
||||
}
|
||||
}
|
||||
|
||||
impl MutableVector for Json2VectorBuilder {
|
||||
fn data_type(&self) -> ConcreteDataType {
|
||||
ConcreteDataType::Json(self.merged_type.clone())
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.values.len()
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn as_mut_any(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn to_vector(&mut self) -> VectorRef {
|
||||
self.build()
|
||||
}
|
||||
|
||||
fn to_vector_cloned(&self) -> VectorRef {
|
||||
self.build()
|
||||
}
|
||||
|
||||
fn try_push_value_ref(&mut self, value: &ValueRef) -> Result<()> {
|
||||
let ValueRef::Json(value) = value else {
|
||||
return TryFromValueSnafu {
|
||||
reason: format!("expected json value, got {value:?}"),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
let json_type = value.json_type();
|
||||
self.merged_type.merge_with_lifting(json_type)?;
|
||||
|
||||
let value = JsonValue::from(value.clone().into_variant());
|
||||
self.values.push(value);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn push_null(&mut self) {
|
||||
static NULL_JSON: LazyLock<ValueRef> =
|
||||
LazyLock::new(|| ValueRef::Json(Box::new(JsonValueRef::null())));
|
||||
self.try_push_value_ref(&NULL_JSON)
|
||||
// Safety: learning from the method "try_push_value_ref", a null json value should be
|
||||
// always able to push into any json vectors.
|
||||
.unwrap_or_else(|e| panic!("failed to push null json value, error: {e}"));
|
||||
}
|
||||
|
||||
fn extend_slice_of(&mut self, _: &dyn Vector, _: usize, _: usize) -> Result<()> {
|
||||
UnsupportedOperationSnafu {
|
||||
op: "extend_slice_of",
|
||||
vector_type: "JsonVector",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
|
||||
fn align_json_value_with_type<'a>(
|
||||
expected_type: &JsonType,
|
||||
value: &'a JsonValue,
|
||||
) -> Cow<'a, JsonValue> {
|
||||
if value.json_type() == expected_type {
|
||||
return Cow::Borrowed(value);
|
||||
}
|
||||
|
||||
fn helper(expected_type: &JsonNativeType, value: JsonVariant) -> JsonVariant {
|
||||
match (expected_type, value) {
|
||||
(_, JsonVariant::Null) | (JsonNativeType::Null, _) => JsonVariant::Null,
|
||||
(JsonNativeType::Bool, JsonVariant::Bool(v)) => JsonVariant::Bool(v),
|
||||
(JsonNativeType::Number(_), JsonVariant::Number(v)) => JsonVariant::Number(v),
|
||||
(JsonNativeType::String, JsonVariant::String(v)) => JsonVariant::String(v),
|
||||
|
||||
(JsonNativeType::Array(item_type), JsonVariant::Array(items)) => JsonVariant::Array(
|
||||
items
|
||||
.into_iter()
|
||||
.map(|item| helper(item_type.as_ref(), item))
|
||||
.collect(),
|
||||
),
|
||||
|
||||
(JsonNativeType::Object(expected_fields), JsonVariant::Object(object)) => {
|
||||
JsonVariant::Object(
|
||||
expected_fields
|
||||
.iter()
|
||||
.map(|(field_name, expected_field_type)| {
|
||||
let value =
|
||||
object.get(field_name).cloned().unwrap_or(JsonVariant::Null);
|
||||
(field_name.clone(), helper(expected_field_type, value))
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
||||
(JsonNativeType::String, v) => {
|
||||
let json: serde_json::Value = JsonValue::from(v).into();
|
||||
JsonVariant::String(json.to_string())
|
||||
}
|
||||
|
||||
(t, v) => panic!("unsupported json alignment cast from {v} to {t}"),
|
||||
}
|
||||
}
|
||||
|
||||
let value = helper(expected_type.native_type(), value.clone().into_variant());
|
||||
Cow::Owned(JsonValue::from(value))
|
||||
}
|
||||
@@ -981,17 +981,17 @@ async fn test_list_ssts_with_format(
|
||||
#[tokio::test]
|
||||
async fn test_all_index_metas_list_all_types() {
|
||||
test_all_index_metas_list_all_types_with_format(false, r#"
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await;
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await;
|
||||
test_all_index_metas_list_all_types_with_format(true, r#"
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6500), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await;
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "bloom_filter", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 751, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":640,\"row_count\":20,\"rows_per_segment\":2,\"segment_count\":10}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "fulltext_bloom", target_type: "column", target_key: "4", target_json: "{\"column\":4}", blob_size: 89, meta_json: Some("{\"bloom\":{\"bloom_filter_size\":64,\"row_count\":20,\"rows_per_segment\":4,\"segment_count\":5},\"fulltext\":{\"analyzer\":\"English\",\"case_sensitive\":false}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "fulltext_tantivy", target_type: "column", target_key: "5", target_json: "{\"column\":5}", blob_size: 1100, meta_json: Some("{\"fulltext\":{\"analyzer\":\"Chinese\",\"case_sensitive\":true}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "inverted", target_type: "column", target_key: "1", target_json: "{\"column\":1}", blob_size: 518, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":150,\"inverted_index_size\":518,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }
|
||||
PuffinIndexMetaEntry { table_dir: "test/", index_file_path: "test/11_0000000001/index/<file_id>.puffin", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_file_size: Some(6600), index_type: "inverted", target_type: "column", target_key: "2", target_json: "{\"column\":2}", blob_size: 515, meta_json: Some("{\"inverted\":{\"base_offset\":0,\"bitmap_type\":\"Roaring\",\"fst_size\":147,\"inverted_index_size\":515,\"null_bitmap_size\":8,\"relative_fst_offset\":368,\"relative_null_bitmap_offset\":0,\"segment_row_count\":1024,\"total_row_count\":20}}"), node_id: None }"#).await;
|
||||
}
|
||||
|
||||
async fn test_all_index_metas_list_all_types_with_format(flat_format: bool, expect_format: &str) {
|
||||
|
||||
@@ -22,6 +22,7 @@ use std::time::Instant;
|
||||
|
||||
use common_telemetry::{debug, error, info};
|
||||
use datatypes::arrow::datatypes::SchemaRef;
|
||||
use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use either::Either;
|
||||
use partition::expr::PartitionExpr;
|
||||
use smallvec::{SmallVec, smallvec};
|
||||
@@ -40,6 +41,7 @@ use crate::error::{
|
||||
RegionTruncatedSnafu, Result,
|
||||
};
|
||||
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
|
||||
use crate::memtable;
|
||||
use crate::memtable::bulk::ENCODE_ROW_THRESHOLD;
|
||||
use crate::memtable::{
|
||||
BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions,
|
||||
@@ -587,6 +589,7 @@ impl RegionFlushTask {
|
||||
&version.metadata,
|
||||
&FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
|
||||
);
|
||||
let batch_schema = maybe_merge_json_fields(batch_schema, &mem_ranges);
|
||||
let flat_sources = memtable_flat_sources(
|
||||
batch_schema,
|
||||
mem_ranges,
|
||||
@@ -762,6 +765,16 @@ struct FlatSources {
|
||||
encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>,
|
||||
}
|
||||
|
||||
fn maybe_merge_json_fields(base: SchemaRef, mem_ranges: &MemtableRanges) -> SchemaRef {
|
||||
if !base.has_json_extension_field() {
|
||||
return base;
|
||||
}
|
||||
let Some(schema) = mem_ranges.schema() else {
|
||||
return base;
|
||||
};
|
||||
memtable::merge_json_extension_fields(&base, &[schema])
|
||||
}
|
||||
|
||||
/// Returns the max sequence and [FlatSource] for the given memtable.
|
||||
fn memtable_flat_sources(
|
||||
schema: SchemaRef,
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Memtables are write buffers for regions.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
@@ -59,6 +60,10 @@ pub use bulk::part::{
|
||||
BulkPart, BulkPartEncoder, BulkPartMeta, UnorderedPart, record_batch_estimated_size,
|
||||
sort_primary_key_record_batch,
|
||||
};
|
||||
use datatypes::arrow::datatypes::{Schema, SchemaRef};
|
||||
use datatypes::extension::json;
|
||||
use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use datatypes::types::json_type;
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
pub use time_partition::filter_record_batch;
|
||||
|
||||
@@ -225,6 +230,55 @@ impl MemtableRanges {
|
||||
.max()
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
pub(crate) fn schema(&self) -> Option<SchemaRef> {
|
||||
let mut schemas = self
|
||||
.ranges
|
||||
.values()
|
||||
.filter_map(|x| x.record_batch_schema())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if schemas.iter().all(|x| !x.has_json_extension_field()) {
|
||||
// If there are no JSON extension fields in any schemas, the invariant must be hold,
|
||||
// that all schemas are same (they are all derived from same region metadata).
|
||||
// So it's ok to return the first one as the schema of the whole memtable ranges.
|
||||
return (!schemas.is_empty()).then(|| schemas.swap_remove(0));
|
||||
}
|
||||
|
||||
// If there are JSON extension fields, by convention, only their concrete data types
|
||||
// (Arrow's Struct) may differ. Other things like the metadata or the fields count are same.
|
||||
// So to produce the final schema, we can solely merge the data types.
|
||||
schemas
|
||||
.split_first()
|
||||
.map(|(first, rest)| merge_json_extension_fields(first, rest))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn merge_json_extension_fields(base: &SchemaRef, others: &[SchemaRef]) -> SchemaRef {
|
||||
let mut fields = base.fields().iter().cloned().collect::<Vec<_>>();
|
||||
for (i, field) in fields.iter_mut().enumerate() {
|
||||
if !json::is_json_extension_type(field) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let merged = others
|
||||
.iter()
|
||||
.map(|x| Cow::Borrowed(x.field(i).data_type()))
|
||||
.reduce(|acc, e| {
|
||||
Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned())
|
||||
});
|
||||
if let Some(merged) = merged
|
||||
&& field.data_type() != merged.as_ref()
|
||||
{
|
||||
let merged =
|
||||
json_type::merge_as_json_type(field.data_type(), merged.as_ref()).into_owned();
|
||||
|
||||
let mut new = field.as_ref().clone();
|
||||
new.set_data_type(merged);
|
||||
*field = Arc::new(new);
|
||||
}
|
||||
}
|
||||
Arc::new(Schema::new_with_metadata(fields, base.metadata().clone()))
|
||||
}
|
||||
|
||||
impl IterBuilder for MemtableRanges {
|
||||
@@ -552,6 +606,11 @@ pub trait IterBuilder: Send + Sync {
|
||||
.fail()
|
||||
}
|
||||
|
||||
/// Returns the schema of record batches produced by this iterator.
|
||||
fn record_batch_schema(&self) -> Option<SchemaRef> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Returns the [EncodedRange] if the range is already encoded into SST.
|
||||
fn encoded_range(&self) -> Option<EncodedRange> {
|
||||
None
|
||||
@@ -646,6 +705,11 @@ impl MemtableRange {
|
||||
self.context.builder.is_record_batch()
|
||||
}
|
||||
|
||||
/// Returns the schema of record batches if this range supports record batch iteration.
|
||||
pub fn record_batch_schema(&self) -> Option<SchemaRef> {
|
||||
self.context.builder.record_batch_schema()
|
||||
}
|
||||
|
||||
pub fn num_rows(&self) -> usize {
|
||||
self.stats.num_rows
|
||||
}
|
||||
|
||||
@@ -819,6 +819,10 @@ impl IterBuilder for BulkRangeIterBuilder {
|
||||
fn encoded_range(&self) -> Option<EncodedRange> {
|
||||
None
|
||||
}
|
||||
|
||||
fn record_batch_schema(&self) -> Option<SchemaRef> {
|
||||
Some(self.part.batch.schema())
|
||||
}
|
||||
}
|
||||
|
||||
impl IterBuilder for MultiBulkRangeIterBuilder {
|
||||
@@ -850,6 +854,10 @@ impl IterBuilder for MultiBulkRangeIterBuilder {
|
||||
fn encoded_range(&self) -> Option<EncodedRange> {
|
||||
None
|
||||
}
|
||||
|
||||
fn record_batch_schema(&self) -> Option<SchemaRef> {
|
||||
self.part.record_batch_schema()
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator builder for encoded bulk range
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Bulk part encoder/decoder.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -23,7 +24,7 @@ use api::v1::bulk_wal_entry::Body;
|
||||
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
|
||||
use bytes::Bytes;
|
||||
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
|
||||
use common_recordbatch::DfRecordBatch as RecordBatch;
|
||||
use common_recordbatch::{DfRecordBatch as RecordBatch, recordbatch};
|
||||
use common_time::Timestamp;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use datatypes::arrow;
|
||||
@@ -39,7 +40,9 @@ use datatypes::arrow::datatypes::{
|
||||
};
|
||||
use datatypes::arrow_array::BinaryArray;
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::extension::json::is_json_extension_type;
|
||||
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
|
||||
use datatypes::types::json_type;
|
||||
use datatypes::value::{Value, ValueRef};
|
||||
use datatypes::vectors::Helper;
|
||||
use mito_codec::key_values::{KeyValue, KeyValues, KeyValuesRef};
|
||||
@@ -62,7 +65,7 @@ use table::predicate::Predicate;
|
||||
use crate::error::{
|
||||
self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
|
||||
DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
|
||||
InvalidRequestSnafu, NewRecordBatchSnafu, Result, UnexpectedSnafu,
|
||||
InvalidRequestSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
|
||||
};
|
||||
use crate::memtable::bulk::context::BulkIterContextRef;
|
||||
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
|
||||
@@ -436,11 +439,10 @@ impl UnorderedPart {
|
||||
return Ok(Some(self.parts[0].batch.clone()));
|
||||
}
|
||||
|
||||
// Get the schema from the first part
|
||||
// Get the schema from the first part and normalize JSON2 columns across all parts.
|
||||
let schema = self.parts[0].batch.schema();
|
||||
let (schema, batches) = normalize_json_columns_for_concat(schema, &self.parts)?;
|
||||
|
||||
// Concatenate all record batches
|
||||
let batches: Vec<RecordBatch> = self.parts.iter().map(|p| p.batch.clone()).collect();
|
||||
let concatenated =
|
||||
arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?;
|
||||
|
||||
@@ -477,6 +479,75 @@ impl UnorderedPart {
|
||||
self.max_timestamp = i64::MIN;
|
||||
self.max_sequence = 0;
|
||||
}
|
||||
|
||||
pub(crate) fn parts(&self) -> &[BulkPart] {
|
||||
&self.parts
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_json_columns_for_concat(
|
||||
base_schema: SchemaRef,
|
||||
parts: &[BulkPart],
|
||||
) -> Result<(SchemaRef, Vec<RecordBatch>)> {
|
||||
let mut merged_json_types = HashMap::new();
|
||||
for (index, field) in base_schema.fields().iter().enumerate() {
|
||||
if !is_json_extension_type(field) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let merged = parts
|
||||
.iter()
|
||||
.map(|x| Cow::Borrowed(x.batch.schema_ref().field(index).data_type()))
|
||||
.reduce(|acc, e| {
|
||||
Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned())
|
||||
});
|
||||
if let Some(merged) = merged
|
||||
&& merged.as_ref() != field.data_type()
|
||||
{
|
||||
merged_json_types.insert(index, merged.into_owned());
|
||||
}
|
||||
}
|
||||
|
||||
if merged_json_types.is_empty() {
|
||||
let batches = parts.iter().map(|p| p.batch.clone()).collect();
|
||||
return Ok((base_schema, batches));
|
||||
}
|
||||
|
||||
let fields = base_schema
|
||||
.fields()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(index, field)| {
|
||||
if let Some(data_type) = merged_json_types.get(&index) {
|
||||
Arc::new(
|
||||
Field::new(field.name().clone(), data_type.clone(), field.is_nullable())
|
||||
.with_metadata(field.metadata().clone()),
|
||||
)
|
||||
} else {
|
||||
field.clone()
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let normalized_schema = Arc::new(Schema::new(fields));
|
||||
|
||||
let mut normalized_batches = Vec::with_capacity(parts.len());
|
||||
for part in parts {
|
||||
let mut columns = Vec::with_capacity(part.batch.num_columns());
|
||||
for (index, column) in part.batch.columns().iter().enumerate() {
|
||||
if let Some(target_type) = merged_json_types.get(&index) {
|
||||
columns.push(
|
||||
recordbatch::align_json_array(column, target_type).context(RecordBatchSnafu)?,
|
||||
);
|
||||
} else {
|
||||
columns.push(column.clone());
|
||||
}
|
||||
}
|
||||
let batch = RecordBatch::try_new(normalized_schema.clone(), columns)
|
||||
.context(NewRecordBatchSnafu)?;
|
||||
normalized_batches.push(batch);
|
||||
}
|
||||
|
||||
Ok((normalized_schema, normalized_batches))
|
||||
}
|
||||
|
||||
/// More accurate estimation of the size of a record batch.
|
||||
@@ -693,7 +764,8 @@ impl BulkPartConverter {
|
||||
columns.push(values.sequence.to_arrow_array());
|
||||
columns.push(values.op_type.to_arrow_array());
|
||||
|
||||
let batch = RecordBatch::try_new(self.schema, columns).context(NewRecordBatchSnafu)?;
|
||||
let schema = align_schema_with_json_array(self.schema, &columns);
|
||||
let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
|
||||
// Sorts the record batch.
|
||||
let batch = sort_primary_key_record_batch(&batch)?;
|
||||
|
||||
@@ -708,6 +780,26 @@ impl BulkPartConverter {
|
||||
}
|
||||
}
|
||||
|
||||
fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
|
||||
if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
|
||||
return schema;
|
||||
}
|
||||
|
||||
let mut fields = Vec::with_capacity(schema.fields().len());
|
||||
for (field, array) in schema.fields().iter().zip(columns) {
|
||||
if !is_json_extension_type(field) {
|
||||
fields.push(field.clone());
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut field = field.as_ref().clone();
|
||||
field.set_data_type(array.data_type().clone());
|
||||
fields.push(Arc::new(field));
|
||||
}
|
||||
|
||||
Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
|
||||
}
|
||||
|
||||
fn new_primary_key_column_builders(
|
||||
metadata: &RegionMetadata,
|
||||
capacity: usize,
|
||||
@@ -1346,6 +1438,11 @@ impl MultiBulkPart {
|
||||
self.series_count
|
||||
}
|
||||
|
||||
/// Returns the schema of batches in this part.
|
||||
pub(crate) fn record_batch_schema(&self) -> Option<SchemaRef> {
|
||||
self.batches.first().map(|batch| batch.schema())
|
||||
}
|
||||
|
||||
/// Returns the number of record batches in this part.
|
||||
pub fn num_batches(&self) -> usize {
|
||||
self.batches.len()
|
||||
|
||||
@@ -917,7 +917,9 @@ impl ValueBuilder {
|
||||
size += field_value.data_size();
|
||||
if !field_value.is_null() || self.fields[idx].is_some() {
|
||||
if let Some(field) = self.fields[idx].as_mut() {
|
||||
let _ = field.push(field_value);
|
||||
field
|
||||
.push(field_value)
|
||||
.unwrap_or_else(|e| panic!("Failed to push field value: {e:?}"));
|
||||
} else {
|
||||
let mut mutable_vector =
|
||||
if let ConcreteDataType::String(_) = &self.field_types[idx] {
|
||||
|
||||
@@ -40,6 +40,7 @@ use crate::sst::{
|
||||
///
|
||||
/// This mapper supports duplicate and unsorted projection indices.
|
||||
/// The output schema is determined by the projection indices.
|
||||
#[derive(Clone)]
|
||||
pub struct FlatProjectionMapper {
|
||||
/// Metadata of the region.
|
||||
metadata: RegionMetadataRef,
|
||||
@@ -237,6 +238,10 @@ impl FlatProjectionMapper {
|
||||
self.output_schema.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn with_output_schema(&mut self, output_schema: SchemaRef) {
|
||||
self.output_schema = output_schema;
|
||||
}
|
||||
|
||||
/// Returns an empty [RecordBatch].
|
||||
pub(crate) fn empty_record_batch(&self) -> RecordBatch {
|
||||
RecordBatch::new_empty(self.output_schema.clone())
|
||||
|
||||
@@ -40,6 +40,7 @@ use crate::read::flat_projection::FlatProjectionMapper;
|
||||
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
|
||||
|
||||
/// Wrapper enum for different projection mapper implementations.
|
||||
#[derive(Clone)]
|
||||
pub enum ProjectionMapper {
|
||||
/// Projection mapper for primary key format.
|
||||
PrimaryKey(PrimaryKeyProjectionMapper),
|
||||
@@ -148,6 +149,12 @@ impl ProjectionMapper {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_flat_output_schema(&mut self, output_schema: SchemaRef) {
|
||||
if let ProjectionMapper::Flat(m) = self {
|
||||
m.with_output_schema(output_schema)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an empty [RecordBatch].
|
||||
// TODO(yingwen): This is unused now. Use it after we finish the flat format.
|
||||
pub fn empty_record_batch(&self) -> RecordBatch {
|
||||
@@ -159,6 +166,7 @@ impl ProjectionMapper {
|
||||
}
|
||||
|
||||
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
|
||||
#[derive(Clone)]
|
||||
pub struct PrimaryKeyProjectionMapper {
|
||||
/// Metadata of the region.
|
||||
metadata: RegionMetadataRef,
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! Structs for partition ranges.
|
||||
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::datatypes::SchemaRef;
|
||||
use smallvec::{SmallVec, smallvec};
|
||||
use store_api::region_engine::PartitionRange;
|
||||
use store_api::storage::TimeSeriesDistribution;
|
||||
@@ -477,6 +478,11 @@ impl MemRangeBuilder {
|
||||
pub(crate) fn stats(&self) -> &MemtableStats {
|
||||
&self.stats
|
||||
}
|
||||
|
||||
/// Returns the record batch schema for this memtable range if available.
|
||||
pub(crate) fn record_batch_schema(&self) -> Option<SchemaRef> {
|
||||
self.range.record_batch_schema()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Scans a region according to the scan request.
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU64;
|
||||
@@ -30,6 +31,10 @@ use common_time::range::TimestampRange;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::Expr;
|
||||
use datafusion_expr::utils::expr_to_columns;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::extension::json::is_json_extension_type;
|
||||
use datatypes::schema::Schema;
|
||||
use datatypes::types::json_type;
|
||||
use futures::StreamExt;
|
||||
use partition::expr::PartitionExpr;
|
||||
use smallvec::SmallVec;
|
||||
@@ -566,6 +571,8 @@ impl ScanRegion {
|
||||
} else {
|
||||
input
|
||||
};
|
||||
|
||||
let input = maybe_concretize_flat_json2_schema(input);
|
||||
Ok(input)
|
||||
}
|
||||
|
||||
@@ -792,6 +799,59 @@ impl ScanRegion {
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_concretize_flat_json2_schema(input: ScanInput) -> ScanInput {
|
||||
let Some(flat_mapper) = input.mapper.as_flat() else {
|
||||
return input;
|
||||
};
|
||||
let output_schema = flat_mapper.output_schema();
|
||||
let output_arrow_schema = output_schema.arrow_schema();
|
||||
|
||||
let mem_schemas: Vec<_> = input
|
||||
.memtables
|
||||
.iter()
|
||||
.filter_map(|mem| mem.record_batch_schema())
|
||||
.collect();
|
||||
if mem_schemas.is_empty() {
|
||||
return input;
|
||||
}
|
||||
|
||||
let mut column_schemas = output_schema.column_schemas().to_vec();
|
||||
let mut changed = false;
|
||||
for (idx, column_schema) in column_schemas.iter_mut().enumerate() {
|
||||
let output_field = &output_arrow_schema.fields()[idx];
|
||||
if !is_json_extension_type(output_field) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let merged = mem_schemas
|
||||
.iter()
|
||||
.filter_map(|x| {
|
||||
x.column_with_name(&column_schema.name)
|
||||
.map(|(_, f)| Cow::Borrowed(f.data_type()))
|
||||
})
|
||||
.reduce(|acc, e| {
|
||||
Cow::Owned(json_type::merge_as_json_type(acc.as_ref(), e.as_ref()).into_owned())
|
||||
});
|
||||
if let Some(merged) = merged
|
||||
&& merged.as_ref() != output_field.data_type()
|
||||
{
|
||||
column_schema.data_type = ConcreteDataType::from_arrow_type(merged.as_ref());
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
if changed {
|
||||
let mut mapper = Arc::unwrap_or_clone(input.mapper);
|
||||
mapper.with_flat_output_schema(Arc::new(Schema::new(column_schemas)));
|
||||
ScanInput {
|
||||
mapper: Arc::new(mapper),
|
||||
..input
|
||||
}
|
||||
} else {
|
||||
input
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the time range of a SST `file` matches the `predicate`.
|
||||
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
|
||||
if predicate == &TimestampRange::min_to_max() {
|
||||
|
||||
@@ -87,6 +87,7 @@ impl FlatWriteFormat {
|
||||
}
|
||||
|
||||
/// Gets the arrow schema to store in parquet.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn arrow_schema(&self) -> &SchemaRef {
|
||||
&self.arrow_schema
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ use datatypes::arrow::error::ArrowError;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::DataType;
|
||||
use datatypes::schema::ext::ArrowSchemaExt;
|
||||
use mito_codec::row_converter::build_primary_key_codec;
|
||||
use object_store::ObjectStore;
|
||||
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
|
||||
@@ -412,7 +413,11 @@ impl ParquetReaderBuilder {
|
||||
let projection_mask = ProjectionMask::roots(parquet_schema_desc, indices.iter().copied());
|
||||
|
||||
// Computes the field levels.
|
||||
let hint = Some(read_format.arrow_schema().fields());
|
||||
let hint = if read_format.arrow_schema().has_json_extension_field() {
|
||||
None
|
||||
} else {
|
||||
Some(read_format.arrow_schema().fields())
|
||||
};
|
||||
let field_levels =
|
||||
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
|
||||
.context(ReadDataPartSnafu)?;
|
||||
|
||||
@@ -450,7 +450,7 @@ where
|
||||
let arrow_batch = flat_format.convert_batch(&record_batch)?;
|
||||
|
||||
let start = Instant::now();
|
||||
self.maybe_init_writer(flat_format.arrow_schema(), opts)
|
||||
self.maybe_init_writer(arrow_batch.schema_ref(), opts)
|
||||
.await?
|
||||
.write(&arrow_batch)
|
||||
.await
|
||||
|
||||
@@ -301,12 +301,22 @@ impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> {
|
||||
.or_insert_with(|| value_type.clone());
|
||||
|
||||
if !merged_type.is_include(&value_type) {
|
||||
merged_type.merge(&value_type).map_err(|e| {
|
||||
if column_schema
|
||||
.data_type
|
||||
.as_json()
|
||||
.map(|x| x.is_native_type())
|
||||
.unwrap_or(false)
|
||||
{
|
||||
merged_type.merge(&value_type)
|
||||
} else {
|
||||
merged_type.merge_with_lifting(&value_type)
|
||||
}
|
||||
.map_err(|e| {
|
||||
InvalidInsertRequestSnafu {
|
||||
reason: format!(r#"cannot merge "{value_type}" into "{merged_type}": {e}"#),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
})?
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -323,7 +333,17 @@ impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> {
|
||||
for (column_name, merged_type) in self.merged_value_types.iter() {
|
||||
let Some(column_type) = insert_columns
|
||||
.iter()
|
||||
.find_map(|x| (&x.name == column_name).then(|| x.data_type.as_json()))
|
||||
.find_map(|x| {
|
||||
(&x.name == column_name).then(|| {
|
||||
if let ConcreteDataType::Json(t) = &x.data_type
|
||||
&& t.is_native_type()
|
||||
{
|
||||
Some(t)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
})
|
||||
.flatten()
|
||||
else {
|
||||
continue;
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! Planner, QueryEngine implementations based on DataFusion.
|
||||
|
||||
mod error;
|
||||
mod json2_expr_planner;
|
||||
mod planner;
|
||||
|
||||
use std::any::Any;
|
||||
|
||||
54
src/query/src/datafusion/json2_expr_planner.rs
Normal file
54
src/query/src/datafusion/json2_expr_planner.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_schema::Field;
|
||||
use arrow_schema::extension::ExtensionType;
|
||||
use common_function::scalars::json::json2_get::Json2GetFunction;
|
||||
use common_function::scalars::udf::create_udf;
|
||||
use datafusion_common::{Column, Result, ScalarValue, TableReference};
|
||||
use datafusion_expr::Expr;
|
||||
use datafusion_expr::expr::ScalarFunction;
|
||||
use datafusion_expr::planner::{ExprPlanner, PlannerResult};
|
||||
use datatypes::extension::json::JsonExtensionType;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Json2ExprPlanner;
|
||||
|
||||
fn json2_get(base: Expr, path: String) -> Expr {
|
||||
let args = vec![base, Expr::Literal(ScalarValue::Utf8(Some(path)), None)];
|
||||
let function = create_udf(Arc::new(Json2GetFunction::default()));
|
||||
Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(function), args))
|
||||
}
|
||||
|
||||
impl ExprPlanner for Json2ExprPlanner {
|
||||
fn plan_compound_identifier(
|
||||
&self,
|
||||
field: &Field,
|
||||
qualifier: Option<&TableReference>,
|
||||
nested_names: &[String],
|
||||
) -> Result<PlannerResult<Vec<Expr>>> {
|
||||
if field.extension_type_name() != Some(JsonExtensionType::NAME) {
|
||||
return Ok(PlannerResult::Original(Vec::new()));
|
||||
}
|
||||
|
||||
let path = nested_names.join(".");
|
||||
let column = Column::from((qualifier, field));
|
||||
Ok(PlannerResult::Planned(json2_get(
|
||||
Expr::Column(column),
|
||||
path,
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -38,6 +38,7 @@ use datafusion_sql::parser::Statement as DfStatement;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{Location, ResultExt};
|
||||
|
||||
use crate::datafusion::json2_expr_planner::Json2ExprPlanner;
|
||||
use crate::error::{CatalogSnafu, Result};
|
||||
use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
|
||||
|
||||
@@ -87,6 +88,9 @@ impl DfContextProviderAdapter {
|
||||
.map(|format| (format.get_ext().to_lowercase(), format))
|
||||
.collect();
|
||||
|
||||
let mut expr_planners = SessionStateDefaults::default_expr_planners();
|
||||
expr_planners.insert(0, Arc::new(Json2ExprPlanner));
|
||||
|
||||
Ok(Self {
|
||||
engine_state,
|
||||
session_state,
|
||||
@@ -94,7 +98,7 @@ impl DfContextProviderAdapter {
|
||||
table_provider,
|
||||
query_ctx,
|
||||
file_formats,
|
||||
expr_planners: SessionStateDefaults::default_expr_planners(),
|
||||
expr_planners,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,7 +153,16 @@ pub fn column_to_schema(
|
||||
|
||||
column_schema.set_inverted_index(column.extensions.inverted_index_options.is_some());
|
||||
|
||||
if matches!(column.data_type(), SqlDataType::JSON) {
|
||||
let is_json2_column = if let SqlDataType::Custom(object_name, _) = column.data_type() {
|
||||
object_name
|
||||
.0
|
||||
.first()
|
||||
.map(|x| x.to_string_unquoted().eq_ignore_ascii_case("JSON2"))
|
||||
.unwrap_or_default()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
if is_json2_column || matches!(column.data_type(), SqlDataType::JSON) {
|
||||
let settings = column
|
||||
.extensions
|
||||
.build_json_structure_settings()?
|
||||
@@ -290,22 +299,25 @@ pub fn sql_data_type_to_concrete_data_type(
|
||||
};
|
||||
Ok(ConcreteDataType::Json(JsonType::new(format)))
|
||||
}
|
||||
// Vector type
|
||||
SqlDataType::Custom(name, d)
|
||||
if name.0.as_slice().len() == 1
|
||||
&& name.0.as_slice()[0]
|
||||
.to_string_unquoted()
|
||||
.to_ascii_uppercase()
|
||||
== VECTOR_TYPE_NAME
|
||||
&& d.len() == 1 =>
|
||||
{
|
||||
let dim = d[0].parse().map_err(|e| {
|
||||
error::ParseSqlValueSnafu {
|
||||
msg: format!("Failed to parse vector dimension: {}", e),
|
||||
// Vector type and JSON2 type
|
||||
SqlDataType::Custom(name, d) if name.0.len() == 1 => {
|
||||
let name = name.0[0].to_string_unquoted().to_ascii_uppercase();
|
||||
match name.as_str() {
|
||||
VECTOR_TYPE_NAME if d.len() == 1 => {
|
||||
let dim = d[0].parse().map_err(|e| {
|
||||
error::ParseSqlValueSnafu {
|
||||
msg: format!(r#"Failed to parse vector dimension "{}": {}"#, d[0], e),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
Ok(ConcreteDataType::vector_datatype(dim))
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
Ok(ConcreteDataType::vector_datatype(dim))
|
||||
"JSON2" => Ok(ConcreteDataType::Json(JsonType::new(JsonFormat::Json2))),
|
||||
_ => error::SqlTypeNotSupportedSnafu {
|
||||
t: data_type.clone(),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
_ => error::SqlTypeNotSupportedSnafu {
|
||||
t: data_type.clone(),
|
||||
|
||||
@@ -377,32 +377,35 @@ impl ColumnExtensions {
|
||||
None
|
||||
};
|
||||
|
||||
options
|
||||
let format = options
|
||||
.get(JSON_OPT_FORMAT)
|
||||
.map(|format| match format {
|
||||
JSON_FORMAT_FULL_STRUCTURED => Ok(JsonStructureSettings::Structured(fields)),
|
||||
JSON_FORMAT_PARTIAL => {
|
||||
let fields = fields.map(|fields| {
|
||||
let mut fields = Arc::unwrap_or_clone(fields.fields());
|
||||
fields.push(datatypes::types::StructField::new(
|
||||
JsonStructureSettings::RAW_FIELD.to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
));
|
||||
StructType::new(Arc::new(fields))
|
||||
});
|
||||
Ok(JsonStructureSettings::PartialUnstructuredByKey {
|
||||
fields,
|
||||
unstructured_keys,
|
||||
})
|
||||
.unwrap_or(JSON_FORMAT_FULL_STRUCTURED);
|
||||
let settings = match format {
|
||||
JSON_FORMAT_FULL_STRUCTURED => JsonStructureSettings::Structured(fields),
|
||||
JSON_FORMAT_PARTIAL => {
|
||||
let fields = fields.map(|fields| {
|
||||
let mut fields = Arc::unwrap_or_clone(fields.fields());
|
||||
fields.push(datatypes::types::StructField::new(
|
||||
JsonStructureSettings::RAW_FIELD.to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
));
|
||||
StructType::new(Arc::new(fields))
|
||||
});
|
||||
JsonStructureSettings::PartialUnstructuredByKey {
|
||||
fields,
|
||||
unstructured_keys,
|
||||
}
|
||||
JSON_FORMAT_RAW => Ok(JsonStructureSettings::UnstructuredRaw),
|
||||
_ => InvalidSqlSnafu {
|
||||
}
|
||||
JSON_FORMAT_RAW => JsonStructureSettings::UnstructuredRaw,
|
||||
_ => {
|
||||
return InvalidSqlSnafu {
|
||||
msg: format!("unknown JSON datatype 'format': {format}"),
|
||||
}
|
||||
.fail(),
|
||||
})
|
||||
.transpose()
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
Ok(Some(settings))
|
||||
}
|
||||
|
||||
pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) {
|
||||
|
||||
@@ -1,82 +0,0 @@
|
||||
CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured") DEFAULT '{"foo": "bar"}');
|
||||
|
||||
Error: 1001(Unsupported), Unsupported default constraint for column: 'j', reason: json column cannot have a default value
|
||||
|
||||
CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured"));
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| j | Json<"<Null>"> | | YES | | FIELD |
|
||||
+--------+----------------------+-----+------+---------+---------------+
|
||||
|
||||
INSERT INTO t VALUES
|
||||
(1762128001000, '{"int": 1}'),
|
||||
(1762128002000, '{"int": 2, "list": [0.1, 0.2, 0.3]}'),
|
||||
(1762128003000, '{"int": 3, "list": [0.4, 0.5, 0.6], "nested": {"a": {"x": "hello"}, "b": {"y": -1}}}');
|
||||
|
||||
Affected Rows: 3
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| j | Json<{"int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>"},"b":{"y":"<Number>"}}}> | | YES | | FIELD |
|
||||
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
|
||||
INSERT INTO t VALUES
|
||||
(1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'),
|
||||
(1762128005000, '{"int": 5, "bool": false, "nested": {"b": {"x": "world"}}}');
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| j | Json<{"bool":"<Bool>","int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>","y":"<Number>"},"b":{"x":"<String>","y":"<Number>"}}}> | | YES | | FIELD |
|
||||
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
|
||||
INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
|
||||
| j | Json<{"bool":"<Bool>","int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>","y":"<Number>"},"b":{"x":"<String>","y":"<Number>"}}}> | | YES | | FIELD |
|
||||
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
|
||||
|
||||
INSERT INTO t VALUES (1762128011000, '{}');
|
||||
|
||||
Error: 1004(InvalidArguments), Invalid InsertRequest, reason: empty json object is not supported, consider adding a dummy field
|
||||
|
||||
SELECT ts, j FROM t order by ts;
|
||||
|
||||
+---------------------+----------------------------------------------------------------------------------------+
|
||||
| ts | j |
|
||||
+---------------------+----------------------------------------------------------------------------------------+
|
||||
| 2025-11-03T00:00:01 | {bool: , int: 1, list: , nested: } |
|
||||
| 2025-11-03T00:00:02 | {bool: , int: 2, list: [0.1, 0.2, 0.3], nested: } |
|
||||
| 2025-11-03T00:00:03 | {bool: , int: 3, list: [0.4, 0.5, 0.6], nested: {a: {x: hello, y: }, b: {x: , y: -1}}} |
|
||||
| 2025-11-03T00:00:04 | {bool: true, int: 4, list: , nested: {a: {x: , y: 1}, b: }} |
|
||||
| 2025-11-03T00:00:05 | {bool: false, int: 5, list: , nested: {a: , b: {x: world, y: }}} |
|
||||
| 2025-11-03T00:00:06 | {bool: true, int: 6, list: [-6.0], nested: {a: {x: ax, y: 66}, b: {x: bx, y: -66}}} |
|
||||
+---------------------+----------------------------------------------------------------------------------------+
|
||||
|
||||
DROP table t;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -1,28 +0,0 @@
|
||||
CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured") DEFAULT '{"foo": "bar"}');
|
||||
|
||||
CREATE TABLE t (ts TIMESTAMP TIME INDEX, j JSON(format = "structured"));
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
INSERT INTO t VALUES
|
||||
(1762128001000, '{"int": 1}'),
|
||||
(1762128002000, '{"int": 2, "list": [0.1, 0.2, 0.3]}'),
|
||||
(1762128003000, '{"int": 3, "list": [0.4, 0.5, 0.6], "nested": {"a": {"x": "hello"}, "b": {"y": -1}}}');
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
INSERT INTO t VALUES
|
||||
(1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'),
|
||||
(1762128005000, '{"int": 5, "bool": false, "nested": {"b": {"x": "world"}}}');
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}');
|
||||
|
||||
DESC TABLE t;
|
||||
|
||||
INSERT INTO t VALUES (1762128011000, '{}');
|
||||
|
||||
SELECT ts, j FROM t order by ts;
|
||||
|
||||
DROP table t;
|
||||
129
tests/cases/standalone/common/types/json/json2.result
Normal file
129
tests/cases/standalone/common/types/json/json2.result
Normal file
@@ -0,0 +1,129 @@
|
||||
create table json2_table
|
||||
(
|
||||
ts timestamp time index,
|
||||
j json2
|
||||
) with (
|
||||
'append_mode' = 'true',
|
||||
'sst_format' = 'flat',
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into json2_table (ts, j)
|
||||
values (1, '{"a": {"b": 1}, "c": "s1"}'),
|
||||
(2, '{"a": {"b": 2}, "c": "s2"}');
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
admin flush_table('json2_table');
|
||||
|
||||
+----------------------------------+
|
||||
| ADMIN flush_table('json2_table') |
|
||||
+----------------------------------+
|
||||
| 0 |
|
||||
+----------------------------------+
|
||||
|
||||
insert into json2_table (ts, j)
|
||||
values (3, '{"a": {"b": 3}, "c": "s3"}');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
insert into json2_table
|
||||
values (4, '{"a": {"b": 4}}'),
|
||||
(5, '{"a": {}, "c": "s5"}'),
|
||||
(6, '{"c": "s6"}');
|
||||
|
||||
Affected Rows: 3
|
||||
|
||||
admin flush_table('json2_table');
|
||||
|
||||
+----------------------------------+
|
||||
| ADMIN flush_table('json2_table') |
|
||||
+----------------------------------+
|
||||
| 0 |
|
||||
+----------------------------------+
|
||||
|
||||
insert into json2_table
|
||||
values (7, '{"a": {"b": "s7"}, "c": [1]}'),
|
||||
(8, '{"a": {"b": 8}, "c": "s8"}');
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
insert into json2_table
|
||||
values (9, '{"a": {"x": true}, "c": "s9"}'),
|
||||
(10, '{"a": {"b": 10}, "y": false}');
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
select j.a.b from json2_table order by ts;
|
||||
|
||||
+--------------------------------------+
|
||||
| json2_get(json2_table.j,Utf8("a.b")) |
|
||||
+--------------------------------------+
|
||||
| 1 |
|
||||
| 2 |
|
||||
| 3 |
|
||||
| 4 |
|
||||
| |
|
||||
| |
|
||||
| s7 |
|
||||
| 8 |
|
||||
| |
|
||||
| 10 |
|
||||
+--------------------------------------+
|
||||
|
||||
select j.a.x from json2_table order by ts;
|
||||
|
||||
+--------------------------------------+
|
||||
| json2_get(json2_table.j,Utf8("a.x")) |
|
||||
+--------------------------------------+
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| true |
|
||||
| |
|
||||
+--------------------------------------+
|
||||
|
||||
select j.c from json2_table order by ts;
|
||||
|
||||
+------------------------------------+
|
||||
| json2_get(json2_table.j,Utf8("c")) |
|
||||
+------------------------------------+
|
||||
| s1 |
|
||||
| s2 |
|
||||
| s3 |
|
||||
| |
|
||||
| s5 |
|
||||
| s6 |
|
||||
| [1] |
|
||||
| s8 |
|
||||
| s9 |
|
||||
| |
|
||||
+------------------------------------+
|
||||
|
||||
select j.y from json2_table order by ts;
|
||||
|
||||
+------------------------------------+
|
||||
| json2_get(json2_table.j,Utf8("y")) |
|
||||
+------------------------------------+
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| |
|
||||
| false |
|
||||
+------------------------------------+
|
||||
|
||||
drop table json2_table;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
42
tests/cases/standalone/common/types/json/json2.sql
Normal file
42
tests/cases/standalone/common/types/json/json2.sql
Normal file
@@ -0,0 +1,42 @@
|
||||
create table json2_table
|
||||
(
|
||||
ts timestamp time index,
|
||||
j json2
|
||||
) with (
|
||||
'append_mode' = 'true',
|
||||
'sst_format' = 'flat',
|
||||
);
|
||||
|
||||
insert into json2_table (ts, j)
|
||||
values (1, '{"a": {"b": 1}, "c": "s1"}'),
|
||||
(2, '{"a": {"b": 2}, "c": "s2"}');
|
||||
|
||||
admin flush_table('json2_table');
|
||||
|
||||
insert into json2_table (ts, j)
|
||||
values (3, '{"a": {"b": 3}, "c": "s3"}');
|
||||
|
||||
insert into json2_table
|
||||
values (4, '{"a": {"b": 4}}'),
|
||||
(5, '{"a": {}, "c": "s5"}'),
|
||||
(6, '{"c": "s6"}');
|
||||
|
||||
admin flush_table('json2_table');
|
||||
|
||||
insert into json2_table
|
||||
values (7, '{"a": {"b": "s7"}, "c": [1]}'),
|
||||
(8, '{"a": {"b": 8}, "c": "s8"}');
|
||||
|
||||
insert into json2_table
|
||||
values (9, '{"a": {"x": true}, "c": "s9"}'),
|
||||
(10, '{"a": {"b": 10}, "y": false}');
|
||||
|
||||
select j.a.b from json2_table order by ts;
|
||||
|
||||
select j.a.x from json2_table order by ts;
|
||||
|
||||
select j.c from json2_table order by ts;
|
||||
|
||||
select j.y from json2_table order by ts;
|
||||
|
||||
drop table json2_table;
|
||||
Reference in New Issue
Block a user