feat: create table with new json datatype (#7128)

* feat: create table with new json datatype

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-10-24 10:16:49 +08:00
committed by GitHub
parent 2f637a262e
commit b53a0b86fb
16 changed files with 226 additions and 14 deletions

1
Cargo.lock generated
View File

@@ -11964,6 +11964,7 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-sql",
"datatypes",
"either",
"hex",
"humantime",
"iso8601",

View File

@@ -16,8 +16,8 @@ use std::collections::HashMap;
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, JSON_STRUCTURE_SETTINGS_KEY,
SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -68,6 +68,9 @@ pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
if let Some(skipping_index) = options.options.get(SKIPPING_INDEX_GRPC_KEY) {
metadata.insert(SKIPPING_INDEX_KEY.to_string(), skipping_index.to_owned());
}
if let Some(settings) = options.options.get(JSON_STRUCTURE_SETTINGS_KEY) {
metadata.insert(JSON_STRUCTURE_SETTINGS_KEY.to_string(), settings.clone());
}
}
ColumnSchema::new(&column_def.name, data_type.into(), column_def.is_nullable)
@@ -139,6 +142,11 @@ pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<Column
.options
.insert(SKIPPING_INDEX_GRPC_KEY.to_string(), skipping_index.clone());
}
if let Some(settings) = column_schema.metadata().get(JSON_STRUCTURE_SETTINGS_KEY) {
options
.options
.insert(JSON_STRUCTURE_SETTINGS_KEY.to_string(), settings.clone());
}
(!options.options.is_empty()).then_some(options)
}

View File

@@ -28,7 +28,7 @@ pub async fn check_output_stream(output: OutputData, expected: &str) {
_ => unreachable!(),
};
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print);
assert_eq!(pretty_print, expected.trim(), "actual: \n{}", pretty_print);
}
pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {

View File

@@ -24,6 +24,7 @@ use std::sync::Arc;
use common_base::bytes::StringBytes;
use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value as Json};
use snafu::{ResultExt, ensure};
@@ -45,7 +46,7 @@ use crate::value::{ListValue, StructValue, Value};
/// convert them to fully structured StructValue for user-facing APIs: the UI protocol and the UDF interface.
///
/// **Important**: This settings only controls the internal form of JSON encoding.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JsonStructureSettings {
// TODO(sunng87): provide a limit
Structured(Option<StructType>),
@@ -111,6 +112,12 @@ impl JsonStructureSettings {
}
}
impl Default for JsonStructureSettings {
fn default() -> Self {
Self::Structured(None)
}
}
impl<'a> JsonContext<'a> {
/// Create a new context with an updated key path
pub fn with_key(&self, key: &str) -> JsonContext<'a> {

View File

@@ -32,8 +32,9 @@ pub use crate::schema::column_schema::{
COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_FULLTEXT_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_FALSE_POSITIVE_RATE, COLUMN_SKIPPING_INDEX_OPT_KEY_GRANULARITY,
COLUMN_SKIPPING_INDEX_OPT_KEY_TYPE, COMMENT_KEY, ColumnExtType, ColumnSchema, FULLTEXT_KEY,
FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata,
SKIPPING_INDEX_KEY, SkippingIndexOptions, SkippingIndexType, TIME_INDEX_KEY,
FulltextAnalyzer, FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY,
JSON_STRUCTURE_SETTINGS_KEY, Metadata, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType, TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;

View File

@@ -23,6 +23,7 @@ use sqlparser_derive::{Visit, VisitMut};
use crate::data_type::{ConcreteDataType, DataType};
use crate::error::{self, Error, InvalidFulltextOptionSnafu, ParseExtendedTypeSnafu, Result};
use crate::json::JsonStructureSettings;
use crate::schema::TYPE_KEY;
use crate::schema::constraint::ColumnDefaultConstraint;
use crate::value::Value;
@@ -41,6 +42,7 @@ pub const FULLTEXT_KEY: &str = "greptime:fulltext";
pub const INVERTED_INDEX_KEY: &str = "greptime:inverted_index";
/// Key used to store skip options in arrow field's metadata.
pub const SKIPPING_INDEX_KEY: &str = "greptime:skipping_index";
pub const JSON_STRUCTURE_SETTINGS_KEY: &str = "greptime:json:structure_settings";
/// Keys used in fulltext options
pub const COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE: &str = "enable";
@@ -391,6 +393,21 @@ impl ColumnSchema {
self.metadata.remove(SKIPPING_INDEX_KEY);
Ok(())
}
pub fn json_structure_settings(&self) -> Result<Option<JsonStructureSettings>> {
self.metadata
.get(JSON_STRUCTURE_SETTINGS_KEY)
.map(|json| serde_json::from_str(json).context(error::DeserializeSnafu { json }))
.transpose()
}
pub fn with_json_structure_settings(&mut self, settings: &JsonStructureSettings) -> Result<()> {
self.metadata.insert(
JSON_STRUCTURE_SETTINGS_KEY.to_string(),
serde_json::to_string(settings).context(error::SerializeSnafu)?,
);
Ok(())
}
}
/// Column extended type set in column schema's metadata.

View File

@@ -15,6 +15,7 @@
use std::str::FromStr;
use arrow::datatypes::DataType as ArrowDataType;
use arrow_schema::Fields;
use common_base::bytes::Bytes;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -63,7 +64,10 @@ impl DataType for JsonType {
}
fn as_arrow_type(&self) -> ArrowDataType {
ArrowDataType::Binary
match self.format {
JsonFormat::Jsonb => ArrowDataType::Binary,
JsonFormat::Native(_) => ArrowDataType::Struct(Fields::empty()),
}
}
fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {

View File

@@ -353,6 +353,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
Datatypes {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -406,9 +413,10 @@ impl ErrorExt for Error {
MissingTableMutationHandler { .. } => StatusCode::Unexpected,
GetRegionMetadata { .. } => StatusCode::RegionNotReady,
TableReadOnly { .. } => StatusCode::Unsupported,
GetFulltextOptions { source, .. } | GetSkippingIndexOptions { source, .. } => {
source.status_code()
}
GetFulltextOptions { source, .. }
| GetSkippingIndexOptions { source, .. }
| Datatypes { source, .. } => source.status_code(),
}
}

View File

@@ -159,6 +159,10 @@ fn create_column(column_schema: &ColumnSchema, quote_style: char) -> Result<Colu
extensions.inverted_index_options = Some(HashMap::new().into());
}
if let Some(settings) = column_schema.json_structure_settings()? {
extensions.set_json_structure_settings(settings);
}
Ok(Column {
column_def: ColumnDef {
name: Ident::with_quote(quote_style, name),

View File

@@ -29,6 +29,7 @@ datafusion-expr.workspace = true
datafusion-physical-expr.workspace = true
datafusion-sql.workspace = true
datatypes.workspace = true
either.workspace = true
hex = "0.4"
humantime.workspace = true
iso8601 = "0.6.1"

View File

@@ -332,6 +332,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to set JSON structure settings: {value}"))]
SetJsonStructureSettings {
value: String,
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -377,7 +385,9 @@ impl ErrorExt for Error {
#[cfg(feature = "enterprise")]
InvalidTriggerWebhookOption { .. } => StatusCode::InvalidArguments,
SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
SerializeColumnDefaultConstraint { source, .. }
| SetJsonStructureSettings { source, .. } => source.status_code(),
ConvertToGrpcDataType { source, .. } => source.status_code(),
SqlCommon { source, .. } => source.status_code(),
ConvertToDfStatement { .. } => StatusCode::Internal,

View File

@@ -49,8 +49,8 @@ use crate::ast::{
};
use crate::error::{
self, ConvertToGrpcDataTypeSnafu, ConvertValueSnafu, Result,
SerializeColumnDefaultConstraintSnafu, SetFulltextOptionSnafu, SetSkippingIndexOptionSnafu,
SqlCommonSnafu,
SerializeColumnDefaultConstraintSnafu, SetFulltextOptionSnafu, SetJsonStructureSettingsSnafu,
SetSkippingIndexOptionSnafu, SqlCommonSnafu,
};
use crate::statements::create::Column;
pub use crate::statements::option_map::OptionMap;
@@ -144,6 +144,18 @@ pub fn column_to_schema(
column_schema.set_inverted_index(column.extensions.inverted_index_options.is_some());
if matches!(column.data_type(), SqlDataType::JSON) {
let settings = column
.extensions
.build_json_structure_settings()?
.unwrap_or_default();
column_schema
.with_json_structure_settings(&settings)
.with_context(|_| SetJsonStructureSettingsSnafu {
value: format!("{settings:?}"),
})?;
}
Ok(column_schema)
}

View File

@@ -32,6 +32,7 @@ use crate::error::{
use crate::statements::OptionMap;
use crate::statements::statement::Statement;
use crate::statements::tql::Tql;
use crate::util::OptionValue;
const LINE_SEP: &str = ",\n";
const COMMA_SEP: &str = ", ";
@@ -166,7 +167,20 @@ impl Display for Column {
return Ok(());
}
write!(f, "{}", self.column_def)?;
write!(f, "{} {}", self.column_def.name, self.column_def.data_type)?;
if let Some(options) = &self.extensions.json_datatype_options {
write!(
f,
"({})",
options
.entries()
.map(|(k, v)| format!("{k} = {v}"))
.join(COMMA_SEP)
)?;
}
for option in &self.column_def.options {
write!(f, " {option}")?;
}
if let Some(fulltext_options) = &self.extensions.fulltext_index_options {
if !fulltext_options.is_empty() {
@@ -251,6 +265,34 @@ impl ColumnExtensions {
})
.transpose()
}
pub fn set_json_structure_settings(&mut self, settings: JsonStructureSettings) {
let mut map = OptionMap::default();
let format = match settings {
JsonStructureSettings::Structured(_) => JSON_FORMAT_FULL_STRUCTURED,
JsonStructureSettings::PartialUnstructuredByKey { .. } => JSON_FORMAT_PARTIAL,
JsonStructureSettings::UnstructuredRaw => JSON_FORMAT_RAW,
};
map.insert(JSON_OPT_FORMAT.to_string(), format.to_string());
if let JsonStructureSettings::PartialUnstructuredByKey {
fields: _,
unstructured_keys,
} = settings
{
let value = OptionValue::from(
unstructured_keys
.iter()
.map(|x| x.as_str())
.sorted()
.collect::<Vec<_>>(),
);
map.insert_options(JSON_OPT_UNSTRUCTURED_KEYS, value);
}
self.json_datatype_options = Some(map);
}
}
/// Partition on columns or values.

View File

@@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap};
use std::ops::ControlFlow;
use common_base::secrets::{ExposeSecret, ExposeSecretMut, SecretString};
use either::Either;
use serde::Serialize;
use sqlparser::ast::{Visit, VisitMut, Visitor, VisitorMut};
@@ -56,6 +57,17 @@ impl OptionMap {
}
}
pub fn insert_options(&mut self, key: &str, value: OptionValue) {
if REDACTED_OPTIONS.contains(&key) {
self.secrets.insert(
key.to_string(),
SecretString::new(Box::new(value.to_string())),
);
} else {
self.options.insert(key.to_string(), value);
}
}
pub fn get(&self, k: &str) -> Option<&str> {
if let Some(value) = self.options.get(k) {
value.as_string()
@@ -130,6 +142,18 @@ impl OptionMap {
}
result
}
pub fn entries(&self) -> impl Iterator<Item = (&str, Either<&OptionValue, &str>)> {
let options = self
.options
.iter()
.map(|(k, v)| (k.as_str(), Either::Left(v)));
let secrets = self
.secrets
.keys()
.map(|k| (k.as_str(), Either::Right("******")));
std::iter::chain(options, secrets)
}
}
impl<I: IntoIterator<Item = (String, String)>> From<I> for OptionMap {

View File

@@ -15,6 +15,7 @@
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use itertools::Itertools;
use serde::Serialize;
use snafu::ensure;
use sqlparser::ast::{
@@ -131,6 +132,22 @@ impl From<Vec<&str>> for OptionValue {
}
}
impl Display for OptionValue {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(s) = self.as_string() {
write!(f, "'{s}'")
} else if let Some(s) = self.as_list() {
write!(
f,
"[{}]",
s.into_iter().map(|x| format!("'{x}'")).join(", ")
)
} else {
write!(f, "'{}'", self.0)
}
}
}
pub fn parse_option_string(option: SqlOption) -> Result<(String, OptionValue)> {
let SqlOption::KeyValue { key, value } = option else {
return InvalidSqlSnafu {

View File

@@ -2338,3 +2338,59 @@ async fn test_copy_parquet_map_to_binary(instance: Arc<dyn MockInstance>) {
+----+-----------------------------------------+"#;
check_output_stream(output, expected).await;
}
#[apply(both_instances_cases)]
async fn test_create_table_with_json_datatype(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();
let sql = r#"
CREATE TABLE a (
j JSON(format = "partial", unstructured_keys = ["foo", "foo.bar"]),
ts TIMESTAMP TIME INDEX,
)"#;
let output = execute_sql(&instance, sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
// "show create table" finds the information from table metadata.
// So if the output is expected, we know the options are really set.
let output = execute_sql(&instance, "SHOW CREATE TABLE a").await.data;
let expected = r#"
+-------+------------------------------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------------------------------+
| a | CREATE TABLE IF NOT EXISTS "a" ( |
| | "j" JSON(format = 'partial', unstructured_keys = ['foo', 'foo.bar']) NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------+------------------------------------------------------------------------------+"#;
check_output_stream(output, expected).await;
// test the default options
let sql = r#"
CREATE TABLE b (
j JSON,
ts TIMESTAMP TIME INDEX,
)"#;
let output = execute_sql(&instance, sql).await.data;
assert!(matches!(output, OutputData::AffectedRows(0)));
let output = execute_sql(&instance, "SHOW CREATE TABLE b").await.data;
let expected = r#"
+-------+-----------------------------------------+
| Table | Create Table |
+-------+-----------------------------------------+
| b | CREATE TABLE IF NOT EXISTS "b" ( |
| | "j" JSON(format = 'structured') NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts") |
| | ) |
| | |
| | ENGINE=mito |
| | |
+-------+-----------------------------------------+"#;
check_output_stream(output, expected).await;
}