feat: simple read write new json type values (#7175)

feat: basic json read and write

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2025-11-27 20:40:35 +08:00
committed by GitHub
parent 4c07d2d5de
commit fdab75ce27
35 changed files with 1049 additions and 289 deletions

View File

@@ -762,7 +762,8 @@ pub(crate) fn to_alter_table_expr(
target_type,
} => {
let target_type =
sql_data_type_to_concrete_data_type(&target_type).context(ParseSqlSnafu)?;
sql_data_type_to_concrete_data_type(&target_type, &Default::default())
.context(ParseSqlSnafu)?;
let (target_type, target_type_extension) = ColumnDataTypeWrapper::try_from(target_type)
.map(|w| w.to_parts())
.context(ColumnDataTypeSnafu)?;

View File

@@ -353,10 +353,11 @@ impl Inserter {
&self,
insert: &Insert,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
let (inserts, table_info) =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx)
.convert(insert, ctx, statement_executor)
.await?;
let table_infos =

View File

@@ -63,7 +63,7 @@ impl ImpureDefaultFiller {
column.default_constraint()
),
})?;
let grpc_default_value = api::helper::to_proto_value(default_value);
let grpc_default_value = api::helper::to_grpc_value(default_value);
let def = column_schemas_to_defs(vec![column], &pk_names)?.swap_remove(0);
let grpc_column_schema = api::v1::ColumnSchema {
column_name: def.name,

View File

@@ -12,13 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
use std::cell::LazyCell;
use std::collections::HashMap;
use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::options_from_column_schema;
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
use api::v1::{
AlterTableExpr, ColumnSchema as GrpcColumnSchema, ModifyColumnType, ModifyColumnTypes, Row,
Rows,
};
use catalog::CatalogManager;
use common_telemetry::info;
use common_time::Timezone;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::types::JsonType;
use datatypes::value::Value;
use partition::manager::PartitionRuleManager;
use session::context::{QueryContext, QueryContextRef};
use snafu::{OptionExt, ResultExt, ensure};
@@ -30,12 +41,13 @@ use table::metadata::TableInfoRef;
use crate::error::{
CatalogSnafu, ColumnDataTypeSnafu, ColumnDefaultValueSnafu, ColumnNoneDefaultValueSnafu,
ColumnNotFoundSnafu, InvalidSqlSnafu, MissingInsertBodySnafu, ParseSqlSnafu, Result,
SchemaReadOnlySnafu, TableNotFoundSnafu,
ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidSqlSnafu, MissingInsertBodySnafu,
ParseSqlSnafu, Result, SchemaReadOnlySnafu, TableNotFoundSnafu,
};
use crate::insert::InstantAndNormalInsertRequests;
use crate::req_convert::common::partitioner::Partitioner;
use crate::req_convert::insert::semantic_type;
use crate::statement::StatementExecutor;
const DEFAULT_PLACEHOLDER_VALUE: &str = "default";
@@ -62,12 +74,12 @@ impl<'a> StatementToRegion<'a> {
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<(InstantAndNormalInsertRequests, TableInfoRef)> {
let name = stmt.table_name().context(ParseSqlSnafu)?;
let (catalog, schema, table_name) = self.get_full_name(name)?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let mut table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
let table_info = table.table_info();
ensure!(
!common_catalog::consts::is_readonly_schema(&schema),
@@ -94,7 +106,6 @@ impl<'a> StatementToRegion<'a> {
Ok(())
})?;
let mut schema = Vec::with_capacity(column_count);
let mut rows = vec![
Row {
values: Vec::with_capacity(column_count)
@@ -102,17 +113,57 @@ impl<'a> StatementToRegion<'a> {
row_count
];
for (i, column_name) in column_names.into_iter().enumerate() {
let column_schema = table_schema
.column_schema_by_name(column_name)
.with_context(|| ColumnNotFoundSnafu {
msg: format!("Column {} not found in table {}", column_name, &table_name),
})?;
fn find_insert_columns<'a>(
table: &'a TableRef,
column_names: &[&String],
) -> Result<Vec<&'a ColumnSchema>> {
let schema = table.schema_ref();
column_names
.iter()
.map(|name| {
schema
.column_schema_by_name(name)
.context(ColumnNotFoundSnafu { msg: *name })
})
.collect::<Result<Vec<_>>>()
}
let mut insert_columns = find_insert_columns(&table, &column_names)?;
let converter = SqlRowConverter::new(&insert_columns, query_ctx);
// Convert the SQL values to GreptimeDB values, and merge a "largest" JSON types of all
// values on the way by `JsonColumnTypeUpdater`.
let mut updater = JsonColumnTypeUpdater::new(statement_executor, query_ctx);
let value_rows = converter.convert(&mut updater, &sql_rows)?;
// If the JSON values have a "larger" json type than the one in the table schema, modify
// the column's json type first, by executing an "alter table" DDL.
if updater
.maybe_update_column_type(&catalog, &schema, &table_name, &insert_columns)
.await?
{
// Update with the latest schema, if changed.
table = self.get_table(&catalog, &schema, &table_name).await?;
insert_columns = find_insert_columns(&table, &column_names)?;
}
// Finally convert GreptimeDB values to GRPC values, ready to do insertion on Datanode.
for (i, row) in value_rows.into_iter().enumerate() {
for value in row {
let grpc_value = to_grpc_value(value);
rows[i].values.push(grpc_value);
}
}
let table_info = table.table_info();
let mut schema = Vec::with_capacity(column_count);
for column_schema in insert_columns {
let (datatype, datatype_extension) =
ColumnDataTypeWrapper::try_from(column_schema.data_type.clone())
.context(ColumnDataTypeSnafu)?
.to_parts();
let column_name = &column_schema.name;
let semantic_type = semantic_type(&table_info, column_name)?;
let grpc_column_schema = GrpcColumnSchema {
@@ -123,16 +174,6 @@ impl<'a> StatementToRegion<'a> {
options: options_from_column_schema(column_schema),
};
schema.push(grpc_column_schema);
for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) {
let value = sql_value_to_grpc_value(
column_schema,
&sql_row[i],
Some(&query_ctx.timezone()),
query_ctx.auto_string_to_numeric(),
)?;
grpc_row.values.push(value);
}
}
let requests = Partitioner::new(self.partition_manager)
@@ -194,6 +235,147 @@ impl<'a> StatementToRegion<'a> {
}
}
struct SqlRowConverter<'a, 'b> {
insert_columns: &'a [&'a ColumnSchema],
query_context: &'b QueryContextRef,
}
impl<'a, 'b> SqlRowConverter<'a, 'b> {
fn new(insert_columns: &'a [&'a ColumnSchema], query_context: &'b QueryContextRef) -> Self {
Self {
insert_columns,
query_context,
}
}
fn convert(
&self,
updater: &mut JsonColumnTypeUpdater<'_, 'a>,
sql_rows: &[Vec<SqlValue>],
) -> Result<Vec<Vec<Value>>> {
let timezone = Some(&self.query_context.timezone());
let auto_string_to_numeric = self.query_context.auto_string_to_numeric();
let mut value_rows = Vec::with_capacity(sql_rows.len());
for sql_row in sql_rows {
let mut value_row = Vec::with_capacity(self.insert_columns.len());
for (insert_column, sql_value) in self.insert_columns.iter().zip(sql_row) {
let value =
sql_value_to_value(insert_column, sql_value, timezone, auto_string_to_numeric)?;
updater.merge_types(insert_column, &value)?;
value_row.push(value);
}
value_rows.push(value_row);
}
Ok(value_rows)
}
}
struct JsonColumnTypeUpdater<'a, 'b> {
statement_executor: &'a StatementExecutor,
query_context: &'a QueryContextRef,
merged_value_types: LazyCell<HashMap<&'b str, JsonType>>,
}
impl<'a, 'b> JsonColumnTypeUpdater<'a, 'b> {
fn new(statement_executor: &'a StatementExecutor, query_context: &'a QueryContextRef) -> Self {
Self {
statement_executor,
query_context,
merged_value_types: LazyCell::new(Default::default),
}
}
fn merge_types(&mut self, column_schema: &'b ColumnSchema, value: &Value) -> Result<()> {
if !matches!(value, Value::Json(_)) {
return Ok(());
}
if let ConcreteDataType::Json(value_type) = value.data_type() {
let merged_type = self
.merged_value_types
.entry(&column_schema.name)
.or_insert_with(|| value_type.clone());
if !merged_type.is_include(&value_type) {
merged_type.merge(&value_type).map_err(|e| {
InvalidInsertRequestSnafu {
reason: format!(r#"cannot merge "{value_type}" into "{merged_type}": {e}"#),
}
.build()
})?;
}
}
Ok(())
}
async fn maybe_update_column_type(
self,
catalog: &str,
schema: &str,
table: &str,
insert_columns: &[&ColumnSchema],
) -> Result<bool> {
let mut has_update = false;
for (column_name, merged_type) in self.merged_value_types.iter() {
let Some(column_type) = insert_columns
.iter()
.find_map(|x| (&x.name == column_name).then(|| x.data_type.as_json()))
.flatten()
else {
continue;
};
if column_type.is_include(merged_type) {
continue;
}
let new_column_type = {
let mut x = column_type.clone();
x.merge(merged_type)
.map_err(|e| {
InvalidInsertRequestSnafu {
reason: format!(
r#"cannot merge "{merged_type}" into "{column_type}": {e}"#
),
}
.build()
})
.map(|()| x)
}?;
info!(
"updating table {}.{}.{} column {} json type: {} => {}",
catalog, schema, table, column_name, column_type, new_column_type,
);
let (target_type, target_type_extension) =
ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(new_column_type))
.context(ColumnDataTypeSnafu)?
.into_parts();
let alter_expr = AlterTableExpr {
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types: vec![ModifyColumnType {
column_name: column_name.to_string(),
target_type: target_type as i32,
target_type_extension,
}],
})),
};
self.statement_executor
.alter_table_inner(alter_expr, self.query_context.clone())
.await?;
has_update = true;
}
Ok(has_update)
}
}
fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a String> {
if !stmt.columns().is_empty() {
stmt.columns()
@@ -209,12 +391,12 @@ fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a St
/// Converts SQL value to gRPC value according to the column schema.
/// If `auto_string_to_numeric` is true, tries to cast the string value to numeric values,
/// and fills the default value if the cast fails.
fn sql_value_to_grpc_value(
fn sql_value_to_value(
column_schema: &ColumnSchema,
sql_val: &SqlValue,
timezone: Option<&Timezone>,
auto_string_to_numeric: bool,
) -> Result<GrpcValue> {
) -> Result<Value> {
let column = &column_schema.name;
let value = if replace_default(sql_val) {
let default_value = column_schema
@@ -237,9 +419,25 @@ fn sql_value_to_grpc_value(
)
.context(crate::error::SqlCommonSnafu)?
};
validate(&value)?;
Ok(value)
}
let grpc_value = value_to_grpc_value(value);
Ok(grpc_value)
fn validate(value: &Value) -> Result<()> {
match value {
Value::Json(value) => {
// Json object will be stored as Arrow struct in parquet, and it has the restriction:
// "Parquet does not support writing empty structs".
ensure!(
!value.is_empty_object(),
InvalidInsertRequestSnafu {
reason: "empty json object is not supported, consider adding a dummy field"
}
);
Ok(())
}
_ => Ok(()),
}
}
fn replace_default(sql_val: &SqlValue) -> bool {

View File

@@ -28,7 +28,7 @@ impl StatementExecutor {
if insert.can_extract_values() {
// Fast path: plain insert ("insert with literal values") is executed directly
self.inserter
.handle_statement_insert(insert.as_ref(), &query_ctx)
.handle_statement_insert(insert.as_ref(), &query_ctx, self)
.await
} else {
// Slow path: insert with subquery. Execute using query engine.