feat: add greptime's arrow json extension type (#7168)

* feat: add arrow json extension type

* feat: add json structure settings to extension type

* refactor: store json structure settings as extension metadata

* chore: make binary an acceptable type for extension
This commit is contained in:
Ning Sun
2025-11-06 02:34:57 +08:00
committed by GitHub
parent 0939dc1d32
commit fb92e4d0b2
11 changed files with 204 additions and 25 deletions

1
Cargo.lock generated
View File

@@ -214,6 +214,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
name = "api"
version = "0.18.0"
dependencies = [
"arrow-schema",
"common-base",
"common-decimal",
"common-error",

View File

@@ -8,6 +8,7 @@ license.workspace = true
workspace = true
[dependencies]
arrow-schema.workspace = true
common-base.workspace = true
common-decimal.workspace = true
common-error.workspace = true

View File

@@ -14,10 +14,11 @@
use std::collections::HashMap;
use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, JSON_STRUCTURE_SETTINGS_KEY,
SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -68,8 +69,14 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
if let Some(skipping_index) = options.options.get(SKIPPING_INDEX_GRPC_KEY) {
metadata.insert(SKIPPING_INDEX_KEY.to_string(), skipping_index.to_owned());
}
if let Some(settings) = options.options.get(JSON_STRUCTURE_SETTINGS_KEY) {
metadata.insert(JSON_STRUCTURE_SETTINGS_KEY.to_string(), settings.clone());
if let Some(extension_name) = options.options.get(EXTENSION_TYPE_NAME_KEY) {
metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), extension_name.clone());
}
if let Some(extension_metadata) = options.options.get(EXTENSION_TYPE_METADATA_KEY) {
metadata.insert(
EXTENSION_TYPE_METADATA_KEY.to_string(),
extension_metadata.clone(),
);
}
}
@@ -142,10 +149,16 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<Column
.options
.insert(SKIPPING_INDEX_GRPC_KEY.to_string(), skipping_index.clone());
}
if let Some(settings) = column_schema.metadata().get(JSON_STRUCTURE_SETTINGS_KEY) {
if let Some(extension_name) = column_schema.metadata().get(EXTENSION_TYPE_NAME_KEY) {
options
.options
.insert(JSON_STRUCTURE_SETTINGS_KEY.to_string(), settings.clone());
.insert(EXTENSION_TYPE_NAME_KEY.to_string(), extension_name.clone());
}
if let Some(extension_metadata) = column_schema.metadata().get(EXTENSION_TYPE_METADATA_KEY) {
options.options.insert(
EXTENSION_TYPE_METADATA_KEY.to_string(),
extension_metadata.clone(),
);
}
(!options.options.is_empty()).then_some(options)

View File

@@ -266,6 +266,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse or serialize arrow metadata"))]
ArrowMetadata {
#[snafu(source)]
error: arrow::error::ArrowError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -307,7 +315,8 @@ impl ErrorExt for Error {
| ConvertArrowArrayToScalars { .. }
| ConvertScalarToArrowArray { .. }
| ParseExtendedType { .. }
| InconsistentStructFieldsAndItems { .. } => StatusCode::Internal,
| InconsistentStructFieldsAndItems { .. }
| ArrowMetadata { .. } => StatusCode::Internal,
}
}

View File

@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod json;

View File

@@ -0,0 +1,104 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_schema::extension::ExtensionType;
use arrow_schema::{ArrowError, DataType};
use serde::{Deserialize, Serialize};
use crate::json::JsonStructureSettings;
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct JsonMetadata {
/// Indicates how to handle JSON is stored in underlying data type
///
/// This field can be `None` for data is converted to complete structured in-memory form.
pub json_structure_settings: Option<JsonStructureSettings>,
}
#[derive(Debug, Clone)]
pub struct JsonExtensionType(Arc<JsonMetadata>);
impl JsonExtensionType {
pub fn new(metadata: Arc<JsonMetadata>) -> Self {
JsonExtensionType(metadata)
}
}
impl ExtensionType for JsonExtensionType {
const NAME: &'static str = "greptime.json";
type Metadata = Arc<JsonMetadata>;
fn metadata(&self) -> &Self::Metadata {
&self.0
}
fn serialize_metadata(&self) -> Option<String> {
serde_json::to_string(self.metadata()).ok()
}
fn deserialize_metadata(metadata: Option<&str>) -> Result<Self::Metadata, ArrowError> {
if let Some(metadata) = metadata {
let metadata = serde_json::from_str(metadata).map_err(|e| {
ArrowError::ParseError(format!("Failed to deserialize JSON metadata: {}", e))
})?;
Ok(Arc::new(metadata))
} else {
Ok(Arc::new(JsonMetadata::default()))
}
}
fn supports_data_type(&self, data_type: &DataType) -> Result<(), ArrowError> {
match data_type {
// object
DataType::Struct(_)
// array
| DataType::List(_)
| DataType::ListView(_)
| DataType::LargeList(_)
| DataType::LargeListView(_)
// string
| DataType::Utf8
| DataType::Utf8View
| DataType::LargeUtf8
// number
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float32
| DataType::Float64
// boolean
| DataType::Boolean
// null
| DataType::Null
// legacy json type
| DataType::Binary => Ok(()),
dt => Err(ArrowError::SchemaError(format!(
"Unexpected data type {dt}"
))),
}
}
fn try_new(data_type: &DataType, metadata: Self::Metadata) -> Result<Self, ArrowError> {
let json = Self(metadata);
json.supports_data_type(data_type)?;
Ok(json)
}
}

View File

@@ -19,6 +19,7 @@ pub mod arrow_array;
pub mod data_type;
pub mod duration;
pub mod error;
pub mod extension;
pub mod interval;
pub mod json;
pub mod macros;

View File

@@ -32,9 +32,8 @@ pub use crate::schema::column_schema::{
COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, ColumnExtType, ColumnSchema, FULLTEXT_KEY,
FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY,
JSON_STRUCTURE_SETTINGS_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType, TIME_INDEX_KEY,
FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata,
SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;

View File

@@ -17,13 +17,17 @@ use std::fmt;
use std::str::FromStr;
use arrow::datatypes::Field;
use arrow_schema::extension::{
EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY, ExtensionType,
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use sqlparser_derive::{Visit, VisitMut};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result};
use crate::json::JsonStructureSettings;
use crate::error::{
self, ArrowMetadataSnafu, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result,
};
use crate::schema::TYPE_KEY;
use crate::schema::constraint::ColumnDefaultConstraint;
use crate::value::Value;
@@ -42,7 +46,6 @@ pub const FULLTEXT_KEY: &str = "greptime:fulltext";
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Key used to store skip options in arrow field's metadata.
pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";
pub const JSON_STRUCTURE_SETTINGS_KEY: &str = "greptime:json:structure_settings";
/// Keys used in fulltext options
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
@@ -394,18 +397,38 @@ impl ColumnSchema {
Ok(())
}
pub fn json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> {
self.metadata
.get(JSON_STRUCTURE_SETTINGS_KEY)
.map(|json| serde_json::from_str(json).context(error::DeserializeSnafu { json }))
.transpose()
pub fn extension_type<E>(&self) -> Result<Option<E>>
where
E: ExtensionType,
{
let extension_type_name = self.metadata.get(EXTENSION_TYPE_NAME_KEY);
if extension_type_name.map(|s| s.as_str()) == Some(E::NAME) {
let extension_metadata = self.metadata.get(EXTENSION_TYPE_METADATA_KEY);
let extension_metadata =
E::deserialize_metadata(extension_metadata.map(|s| s.as_str()))
.context(ArrowMetadataSnafu)?;
let extension = E::try_new(&self.data_type.as_arrow_type(), extension_metadata)
.context(ArrowMetadataSnafu)?;
Ok(Some(extension))
} else {
Ok(None)
}
}
pub fn with_json_structure_settings(&mut self, settings: &JsonStructureSettings) -> Result<()> {
self.metadata.insert(
JSON_STRUCTURE_SETTINGS_KEY.to_string(),
serde_json::to_string(settings).context(error::SerializeSnafu)?,
);
pub fn with_extension_type<E>(&mut self, extension_type: &E) -> Result<()>
where
E: ExtensionType,
{
self.metadata
.insert(EXTENSION_TYPE_NAME_KEY.to_string(), E::NAME.to_string());
if let Some(extension_metadata) = extension_type.serialize_metadata() {
self.metadata
.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), extension_metadata);
}
Ok(())
}
}

View File

@@ -16,7 +16,9 @@
use std::collections::HashMap;
use arrow_schema::extension::ExtensionType;
use common_meta::SchemaOptions;
use datatypes::extension::json::JsonExtensionType;
use datatypes::schema::{
COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
@@ -159,7 +161,12 @@ fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Colu
extensions.inverted_index_options = Some(HashMap::new().into());
}
if let Some(settings) = column_schema.json_structure_settings()? {
if let Some(json_extension) = column_schema.extension_type::<JsonExtensionType>()? {
let settings = json_extension
.metadata()
.json_structure_settings
.clone()
.unwrap_or_default();
extensions.set_json_structure_settings(settings);
}

View File

@@ -32,10 +32,13 @@ pub mod tql;
pub(crate) mod transform;
pub mod truncate;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::SemanticType;
use common_sql::default_constraint::parse_column_default_constraint;
use common_time::timezone::Timezone;
use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::TimestampType;
@@ -149,8 +152,11 @@ pub fn column_to_schema(
.extensions
.build_json_structure_settings()?
.unwrap_or_default();
let extension = JsonExtensionType::new(Arc::new(JsonMetadata {
json_structure_settings: Some(settings.clone()),
}));
column_schema
.with_json_structure_settings(&settings)
.with_extension_type(&extension)
.with_context(|_| SetJsonStructureSettingsSnafu {
value: format!("{settings:?}"),
})?;