feat: accommodate default column name with pre-created table schema (#6126)

* refactor: prepare_mocked_backend

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* modify request in place

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply to influx line protocol

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* return on empty alter expr list

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* expose to other write paths

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Ruihang Xia authored on 2025-05-20 15:22:13 +08:00; committed by Zhenchi
parent 6c672b96bf, commit f3ca5f5d7f
13 changed files with 278 additions and 64 deletions
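In effect: when a table was pre-created with a custom time-index or value column name and data then arrives over a protocol whose rows carry fixed default column names (Prometheus remote write, OpenTSDB), the inserter now renames the request's columns in place to match the existing schema instead of issuing an ALTER TABLE ADD COLUMN. A minimal self-contained sketch of that renaming rule (the types below are hypothetical stand-ins, not the real API; the actual logic is in get_alter_table_expr_on_demand further down):

    enum Semantic {
        Timestamp,
        Field,
        Tag,
    }

    struct ReqColumn {
        name: String,
        semantic: Semantic,
    }

    // Bend a request's default column names toward the table's existing schema:
    // the timestamp column always follows the table's time index, while a field
    // column follows the table's value column only when the table has exactly
    // one field column (otherwise the mapping would be ambiguous).
    fn accommodate(req: &mut [ReqColumn], table_ts: &str, table_field: Option<&str>) {
        for col in req.iter_mut() {
            match col.semantic {
                Semantic::Timestamp => col.name = table_ts.to_string(),
                Semantic::Field => {
                    if let Some(field) = table_field {
                        col.name = field.to_string();
                    }
                }
                Semantic::Tag => {} // tags keep their names and may still be added
            }
        }
    }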


@@ -581,7 +581,7 @@ impl FrontendInvoker {
.start_timer();
self.inserter
- .handle_row_inserts(requests, ctx, &self.statement_executor)
+ .handle_row_inserts(requests, ctx, &self.statement_executor, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)


@@ -72,7 +72,10 @@ impl GrpcQueryHandler for Instance {
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
- Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
+ Request::RowInserts(requests) => {
+ self.handle_row_inserts(requests, ctx.clone(), false)
+ .await?
+ }
Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
Request::Query(query_request) => {
@@ -407,9 +410,15 @@ impl Instance {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
+ accommodate_existing_schema: bool,
) -> Result<Output> {
self.inserter
- .handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
+ .handle_row_inserts(
+ requests,
+ ctx,
+ self.statement_executor.as_ref(),
+ accommodate_existing_schema,
+ )
.await
.context(TableOperationSnafu)
}
@@ -421,7 +430,7 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
- .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref())
+ .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
.await
.context(TableOperationSnafu)
}


@@ -53,7 +53,7 @@ impl OpentsdbProtocolHandler for Instance {
};
let output = self
- .handle_row_inserts(requests, ctx)
+ .handle_row_inserts(requests, ctx, true)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;


@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
None
};
- self.handle_row_inserts(requests, ctx)
+ self.handle_row_inserts(requests, ctx, false)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)


@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
- self.handle_row_inserts(request, ctx.clone())
+ self.handle_row_inserts(request, ctx.clone(), true)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?


@@ -63,5 +63,6 @@ tokio-util.workspace = true
tonic.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
+ common-test-util.workspace = true
path-slash = "0.2"


@@ -147,7 +147,7 @@ impl Inserter {
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
- self.handle_row_inserts(row_inserts, ctx, statement_executor)
+ self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
.await
}
@@ -157,6 +157,7 @@ impl Inserter {
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
+ accommodate_existing_schema: bool,
) -> Result<Output> {
preprocess_row_insert_requests(&mut requests.inserts)?;
self.handle_row_inserts_with_create_type(
@@ -164,6 +165,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Physical,
+ accommodate_existing_schema,
)
.await
}
@@ -180,6 +182,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Log,
+ false,
)
.await
}
@@ -195,6 +198,7 @@ impl Inserter {
ctx,
statement_executor,
AutoCreateTableType::Trace,
+ false,
)
.await
}
@@ -205,12 +209,14 @@ impl Inserter {
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
+ accommodate_existing_schema: bool,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::LastNonNull,
+ accommodate_existing_schema,
)
.await
}
@@ -222,6 +228,7 @@ impl Inserter {
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
+ accommodate_existing_schema: bool,
) -> Result<Output> {
// remove empty requests
requests.inserts.retain(|req| {
@@ -236,7 +243,13 @@ impl Inserter {
instant_table_ids,
table_infos,
} = self
- .create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
+ .create_or_alter_tables_on_demand(
+ &mut requests,
+ &ctx,
+ create_type,
+ statement_executor,
+ accommodate_existing_schema,
+ )
.await?;
let name_to_info = table_infos
@@ -281,10 +294,11 @@ impl Inserter {
table_infos,
} = self
.create_or_alter_tables_on_demand(
- &requests,
+ &mut requests,
&ctx,
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
+ true,
)
.await?;
let name_to_info = table_infos
@@ -448,12 +462,18 @@ impl Inserter {
///
/// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
/// This mapping is used in the conversion of RowToRegion.
+ ///
+ /// `accommodate_existing_schema` is used to determine whether the existing schema should override the new schema.
+ /// It only works for TIME_INDEX and VALUE columns. This is for the case where the user creates a table with a
+ /// custom schema, and then inserts data through endpoints that assume a default schema, like Prometheus
+ /// remote write. This will modify the `RowInsertRequests` in place.
async fn create_or_alter_tables_on_demand(
&self,
- requests: &RowInsertRequests,
+ requests: &mut RowInsertRequests,
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
+ accommodate_existing_schema: bool,
) -> Result<CreateAlterTableResult> {
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
.with_label_values(&[auto_create_table_type.as_str()])
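Which write paths opt in is hard-coded at each call site in this diff. Gathered into one hedged sketch for overview (the enum is hypothetical, invented only to summarize; the per-path values come from the hunks above, and the metric-engine last-non-null and logical-table paths also pass true):

    enum WritePath {
        PromRemoteWrite,
        Opentsdb,
        Otlp,
        GrpcRowInserts,
        LogIngestion,
        TraceIngestion,
        Pipeline,
    }

    // `true` only where the wire format fixes the column names, so the defaults
    // should bend to a pre-created table's schema; everywhere else the request's
    // own column names win and missing columns are added as before.
    fn accommodate_existing_schema(path: WritePath) -> bool {
        matches!(path, WritePath::PromRemoteWrite | WritePath::Opentsdb)
    }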
@@ -504,7 +524,7 @@ impl Inserter {
let mut alter_tables = vec![];
let mut instant_table_ids = HashSet::new();
- for req in &requests.inserts {
+ for req in &mut requests.inserts {
match self.get_table(catalog, &schema, &req.table_name).await? {
Some(table) => {
let table_info = table.table_info();
@@ -512,9 +532,12 @@ impl Inserter {
instant_table_ids.insert(table_info.table_id());
}
table_infos.insert(table_info.table_id(), table.table_info());
- if let Some(alter_expr) =
- self.get_alter_table_expr_on_demand(req, &table, ctx)?
- {
+ if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
+ req,
+ &table,
+ ctx,
+ accommodate_existing_schema,
+ )? {
alter_tables.push(alter_expr);
}
}
@@ -784,12 +807,16 @@ impl Inserter {
}
/// Returns an alter table expression if it finds new columns in the request.
- /// It always adds columns if not exist.
+ /// When `accommodate_existing_schema` is false, it always adds columns that do not exist.
+ /// When `accommodate_existing_schema` is true, it may modify the input `req` to
+ /// accommodate it to the existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
+ /// for more details.
fn get_alter_table_expr_on_demand(
&self,
- req: &RowInsertRequest,
+ req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
+ accommodate_existing_schema: bool,
) -> Result<Option<AlterTableExpr>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
@@ -798,10 +825,64 @@ impl Inserter {
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let column_exprs = ColumnExpr::from_column_schemas(request_schema);
let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
- let Some(add_columns) = add_columns else {
+ let Some(mut add_columns) = add_columns else {
return Ok(None);
};
+ // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
+ if accommodate_existing_schema {
+ let table_schema = table.schema();
+ // Find timestamp column name
+ let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
+ // Find field column name if there is only one
+ let mut field_col_name = None;
+ let mut multiple_field_cols = false;
+ table.field_columns().for_each(|col| {
+ if field_col_name.is_none() {
+ field_col_name = Some(col.name.clone());
+ } else {
+ multiple_field_cols = true;
+ }
+ });
+ if multiple_field_cols {
+ field_col_name = None;
+ }
+ // Update column name in request schema for Timestamp/Field columns
+ if let Some(rows) = req.rows.as_mut() {
+ for col in &mut rows.schema {
+ match col.semantic_type {
+ x if x == SemanticType::Timestamp as i32 => {
+ if let Some(ref ts_name) = ts_col_name {
+ if col.column_name != *ts_name {
+ col.column_name = ts_name.clone();
+ }
+ }
+ }
+ x if x == SemanticType::Field as i32 => {
+ if let Some(ref field_name) = field_col_name {
+ if col.column_name != *field_name {
+ col.column_name = field_name.clone();
+ }
+ }
+ }
+ _ => {}
+ }
+ }
+ }
+ // Remove from add_columns any column that is a timestamp, or a field when the
+ // table has exactly one field column (i.e. when `field_col_name` is set)
+ add_columns.add_columns.retain(|col| {
+ let def = col.column_def.as_ref().unwrap();
+ def.semantic_type != SemanticType::Timestamp as i32
+ && !(def.semantic_type == SemanticType::Field as i32 && field_col_name.is_some())
+ });
+ if add_columns.add_columns.is_empty() {
+ return Ok(None);
+ }
+ }
Ok(Some(AlterTableExpr {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
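The pruning rule above, isolated as a sketch (here single_field means the table has exactly one field column; the assumption, matching the code comment, is that tags and any extra fields should still become ADD COLUMNs):

    use api::v1::SemanticType;

    // Should a pending ADD COLUMN survive accommodation? Drop the time-index
    // column always, and the value column only when the table has exactly one
    // field column to map it onto.
    fn keep_in_alter(semantic_type: i32, single_field: bool) -> bool {
        semantic_type != SemanticType::Timestamp as i32
            && !(semantic_type == SemanticType::Field as i32 && single_field)
    }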
@@ -1035,3 +1116,124 @@ impl FlowMirrorTask {
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::cache::new_table_flownode_set_cache;
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use common_meta::test_util::MockDatanodeManager;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use moka::future::Cache;
use session::context::QueryContext;
use table::dist_table::DummyDataSource;
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
use table::TableRef;
use super::*;
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
ColumnSchema::new(
ts_name,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_time_index(true),
ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
])
.unwrap()
.build()
.unwrap();
let meta = TableMetaBuilder::empty()
.schema(Arc::new(schema))
.primary_key_indices(vec![])
.value_indices(vec![1])
.engine("mito")
.next_column_id(0)
.options(Default::default())
.created_on(Default::default())
.region_numbers(vec![0])
.build()
.unwrap();
let info = Arc::new(
TableInfoBuilder::default()
.table_id(1)
.table_version(0)
.name("test_table")
.schema_name(DEFAULT_SCHEMA_NAME)
.catalog_name(DEFAULT_CATALOG_NAME)
.desc(None)
.table_type(TableType::Base)
.meta(meta)
.build()
.unwrap(),
);
Arc::new(table::Table::new(
info,
table::metadata::FilterPushDownType::Unsupported,
Arc::new(DummyDataSource),
))
}
#[tokio::test]
async fn test_accommodate_existing_schema_logic() {
let ts_name = "my_ts";
let field_name = "my_field";
let table = make_table_ref_with_schema(ts_name, field_name);
// The request uses different names for timestamp and field columns
let mut req = RowInsertRequest {
table_name: "test_table".to_string(),
rows: Some(Rows {
schema: vec![
GrpcColumnSchema {
column_name: "ts_wrong".to_string(),
datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
semantic_type: SemanticType::Timestamp as i32,
..Default::default()
},
GrpcColumnSchema {
column_name: "field_wrong".to_string(),
datatype: api::v1::ColumnDataType::Float64 as i32,
semantic_type: SemanticType::Field as i32,
..Default::default()
},
],
rows: vec![api::v1::Row {
values: vec![Value::default(), Value::default()],
}],
}),
};
let ctx = Arc::new(QueryContext::with(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
));
let kv_backend = prepare_mocked_backend().await;
let inserter = Inserter::new(
catalog::memory::MemoryCatalogManager::new(),
create_partition_rule_manager(kv_backend.clone()).await,
Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
Arc::new(new_table_flownode_set_cache(
String::new(),
Cache::new(100),
kv_backend.clone(),
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
.unwrap();
assert!(alter_expr.is_none());
// The request's schema should have updated names for timestamp and field columns
let req_schema = req.rows.as_ref().unwrap().schema.clone();
assert_eq!(req_schema[0].column_name, ts_name);
assert_eq!(req_schema[1].column_name, field_name);
}
}


@@ -14,6 +14,7 @@
#![feature(assert_matches)]
#![feature(if_let_guard)]
#![feature(let_chains)]
pub mod delete;
pub mod error;


@@ -57,33 +57,13 @@ mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
- use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
- use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
- use common_meta::kv_backend::memory::MemoryKvBackend;
- use common_meta::kv_backend::KvBackendRef;
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;
use super::*;
- use crate::tests::{create_partition_rule_manager, new_test_table_info};
- async fn prepare_mocked_backend() -> KvBackendRef {
- let backend = Arc::new(MemoryKvBackend::default());
- let catalog_manager = CatalogManager::new(backend.clone());
- let schema_manager = SchemaManager::new(backend.clone());
- catalog_manager
- .create(CatalogNameKey::default(), false)
- .await
- .unwrap();
- schema_manager
- .create(SchemaNameKey::default(), None, false)
- .await
- .unwrap();
- backend
- }
+ use crate::tests::{
+ create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
+ };
#[tokio::test]
async fn test_delete_request_table_to_region() {


@@ -73,33 +73,13 @@ mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
- use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
- use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
- use common_meta::kv_backend::memory::MemoryKvBackend;
- use common_meta::kv_backend::KvBackendRef;
use datatypes::vectors::{Int32Vector, VectorRef};
use store_api::storage::RegionId;
use super::*;
- use crate::tests::{create_partition_rule_manager, new_test_table_info};
- async fn prepare_mocked_backend() -> KvBackendRef {
- let backend = Arc::new(MemoryKvBackend::default());
- let catalog_manager = CatalogManager::new(backend.clone());
- let schema_manager = SchemaManager::new(backend.clone());
- catalog_manager
- .create(CatalogNameKey::default(), false)
- .await
- .unwrap();
- schema_manager
- .create(SchemaNameKey::default(), None, false)
- .await
- .unwrap();
- backend
- }
+ use crate::tests::{
+ create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
+ };
#[tokio::test]
async fn test_insert_request_table_to_region() {


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+ mod kv_backend;
mod partition_manager;
+ pub(crate) use kv_backend::prepare_mocked_backend;
pub(crate) use partition_manager::{create_partition_rule_manager, new_test_table_info};


@@ -0,0 +1,38 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
pub async fn prepare_mocked_backend() -> KvBackendRef {
let backend = Arc::new(MemoryKvBackend::default());
let catalog_manager = CatalogManager::new(backend.clone());
let schema_manager = SchemaManager::new(backend.clone());
catalog_manager
.create(CatalogNameKey::default(), false)
.await
.unwrap();
schema_manager
.create(SchemaNameKey::default(), None, false)
.await
.unwrap();
backend
}


@@ -249,6 +249,7 @@ impl PipelineTable {
requests,
Self::query_ctx(&table_info),
&self.statement_executor,
+ false,
)
.await
.context(InsertPipelineSnafu)?;