diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index 53710f5048..c6b5082f82 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -581,7 +581,7 @@ impl FrontendInvoker {
             .start_timer();
 
         self.inserter
-            .handle_row_inserts(requests, ctx, &self.statement_executor)
+            .handle_row_inserts(requests, ctx, &self.statement_executor, false)
             .await
             .map_err(BoxedError::new)
             .context(common_frontend::error::ExternalSnafu)
diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs
index 4a83a99d12..2a90f07c56 100644
--- a/src/frontend/src/instance/grpc.rs
+++ b/src/frontend/src/instance/grpc.rs
@@ -72,7 +72,10 @@ impl GrpcQueryHandler for Instance {
 
         let output = match request {
             Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
-            Request::RowInserts(requests) => self.handle_row_inserts(requests, ctx.clone()).await?,
+            Request::RowInserts(requests) => {
+                self.handle_row_inserts(requests, ctx.clone(), false)
+                    .await?
+            }
             Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?,
             Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?,
             Request::Query(query_request) => {
@@ -407,9 +410,15 @@ impl Instance {
         &self,
         requests: RowInsertRequests,
         ctx: QueryContextRef,
+        accommodate_existing_schema: bool,
     ) -> Result<Output> {
         self.inserter
-            .handle_row_inserts(requests, ctx, self.statement_executor.as_ref())
+            .handle_row_inserts(
+                requests,
+                ctx,
+                self.statement_executor.as_ref(),
+                accommodate_existing_schema,
+            )
             .await
             .context(TableOperationSnafu)
     }
@@ -421,7 +430,7 @@ impl Instance {
         ctx: QueryContextRef,
     ) -> Result<Output> {
         self.inserter
-            .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref())
+            .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref(), true)
             .await
             .context(TableOperationSnafu)
     }
diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs
index 6baf7a440e..75b5d3966d 100644
--- a/src/frontend/src/instance/opentsdb.rs
+++ b/src/frontend/src/instance/opentsdb.rs
@@ -53,7 +53,7 @@ impl OpentsdbProtocolHandler for Instance {
         };
 
         let output = self
-            .handle_row_inserts(requests, ctx)
+            .handle_row_inserts(requests, ctx, true)
             .await
             .map_err(BoxedError::new)
             .context(servers::error::ExecuteGrpcQuerySnafu)?;
diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index 4e355cc433..f6332d3bdb 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -63,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
             None
         };
 
-        self.handle_row_inserts(requests, ctx)
+        self.handle_row_inserts(requests, ctx, false)
             .await
             .map_err(BoxedError::new)
             .context(error::ExecuteGrpcQuerySnafu)
diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs
index 9b1a06487c..f7e06ffc5f 100644
--- a/src/frontend/src/instance/prom_store.rs
+++ b/src/frontend/src/instance/prom_store.rs
@@ -195,7 +195,7 @@ impl PromStoreProtocolHandler for Instance {
                 .map_err(BoxedError::new)
                 .context(error::ExecuteGrpcQuerySnafu)?
         } else {
-            self.handle_row_inserts(request, ctx.clone())
+            self.handle_row_inserts(request, ctx.clone(), true)
                 .await
                 .map_err(BoxedError::new)
                 .context(error::ExecuteGrpcQuerySnafu)?
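The frontend hunks above make every ingest path state explicitly whether the existing table schema should win over the request schema. The sketch below is illustrative only; the `IngestPath` enum and helper function are hypothetical and not part of the patch. It just summarizes which handlers pass `true` and which pass `false`.

```rust
// Hypothetical summary of the flag choices in the hunks above; this enum and
// function exist only for illustration and are not part of the patch.
enum IngestPath {
    Grpc,            // handle_row_inserts(.., false)
    Otlp,            // handle_row_inserts(.., false)
    OpenTsdb,        // handle_row_inserts(.., true)
    PromRemoteWrite, // handle_row_inserts(.., true)
}

fn accommodate_existing_schema(path: &IngestPath) -> bool {
    match path {
        // The request schema is authored by the client, so honor it.
        IngestPath::Grpc | IngestPath::Otlp => false,
        // The wire format hardcodes timestamp/value column names, so remap
        // them onto whatever names the existing table already uses.
        IngestPath::OpenTsdb | IngestPath::PromRemoteWrite => true,
    }
}

fn main() {
    assert!(!accommodate_existing_schema(&IngestPath::Grpc));
    assert!(!accommodate_existing_schema(&IngestPath::Otlp));
    assert!(accommodate_existing_schema(&IngestPath::OpenTsdb));
    assert!(accommodate_existing_schema(&IngestPath::PromRemoteWrite));
}
```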
diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml
index f4ee399b42..7cfb8bf58f 100644
--- a/src/operator/Cargo.toml
+++ b/src/operator/Cargo.toml
@@ -63,5 +63,6 @@ tokio-util.workspace = true
 tonic.workspace = true
 
 [dev-dependencies]
+common-meta = { workspace = true, features = ["testing"] }
 common-test-util.workspace = true
 path-slash = "0.2"
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 637da8c731..24b61269df 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -147,7 +147,7 @@ impl Inserter {
         statement_executor: &StatementExecutor,
     ) -> Result<Output> {
         let row_inserts = ColumnToRow::convert(requests)?;
-        self.handle_row_inserts(row_inserts, ctx, statement_executor)
+        self.handle_row_inserts(row_inserts, ctx, statement_executor, false)
             .await
     }
 
@@ -157,6 +157,7 @@ impl Inserter {
         mut requests: RowInsertRequests,
         ctx: QueryContextRef,
         statement_executor: &StatementExecutor,
+        accommodate_existing_schema: bool,
     ) -> Result<Output> {
         preprocess_row_insert_requests(&mut requests.inserts)?;
         self.handle_row_inserts_with_create_type(
@@ -164,6 +165,7 @@ impl Inserter {
             ctx,
             statement_executor,
             AutoCreateTableType::Physical,
+            accommodate_existing_schema,
         )
         .await
     }
@@ -180,6 +182,7 @@ impl Inserter {
             ctx,
             statement_executor,
             AutoCreateTableType::Log,
+            false,
         )
         .await
     }
@@ -195,6 +198,7 @@ impl Inserter {
             ctx,
             statement_executor,
             AutoCreateTableType::Trace,
+            false,
         )
         .await
     }
@@ -205,12 +209,14 @@ impl Inserter {
         requests: RowInsertRequests,
         ctx: QueryContextRef,
         statement_executor: &StatementExecutor,
+        accommodate_existing_schema: bool,
     ) -> Result<Output> {
         self.handle_row_inserts_with_create_type(
             requests,
             ctx,
             statement_executor,
             AutoCreateTableType::LastNonNull,
+            accommodate_existing_schema,
         )
         .await
     }
@@ -222,6 +228,7 @@ impl Inserter {
         ctx: QueryContextRef,
         statement_executor: &StatementExecutor,
         create_type: AutoCreateTableType,
+        accommodate_existing_schema: bool,
     ) -> Result<Output> {
         // remove empty requests
         requests.inserts.retain(|req| {
@@ -236,7 +243,13 @@ impl Inserter {
             instant_table_ids,
             table_infos,
         } = self
-            .create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
+            .create_or_alter_tables_on_demand(
+                &mut requests,
+                &ctx,
+                create_type,
+                statement_executor,
+                accommodate_existing_schema,
+            )
             .await?;
 
         let name_to_info = table_infos
@@ -281,10 +294,11 @@ impl Inserter {
             table_infos,
         } = self
             .create_or_alter_tables_on_demand(
-                &requests,
+                &mut requests,
                 &ctx,
                 AutoCreateTableType::Logical(physical_table.to_string()),
                 statement_executor,
+                true,
             )
             .await?;
         let name_to_info = table_infos
@@ -448,12 +462,18 @@ impl Inserter {
     ///
     /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
     /// This mapping is used in the conversion of RowToRegion.
+    ///
+    /// `accommodate_existing_schema` determines whether the existing schema should override the request schema.
+    /// It only applies to TIME_INDEX and VALUE columns. This covers the case where the user creates a table with a
+    /// custom schema and then inserts data through endpoints that carry a fixed default schema, like Prometheus
+    /// remote write. This will modify the `RowInsertRequests` in place.
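To make the new doc comment concrete, here is a minimal, self-contained sketch of the behavior it describes, with plain structs standing in for the `api::v1` types (`ReqColumn`, `Semantic`, and `accommodate` are hypothetical names, not the patch's API): when the flag is set, the time-index column, and the value column when the table has exactly one field, are renamed in place to match the existing table instead of triggering an `ALTER TABLE`.

```rust
// Plain stand-ins for the request schema types; illustrative only.
enum Semantic {
    Tag,
    Timestamp,
    Field,
}

struct ReqColumn {
    name: String,
    semantic: Semantic,
}

/// Renames the request's timestamp column, and its field column when the
/// table has exactly one field, to match the existing table schema.
fn accommodate(req: &mut [ReqColumn], table_ts: &str, table_fields: &[&str]) {
    // Only remap the value column when the table has exactly one field;
    // otherwise there is no unambiguous target name.
    let single_field = (table_fields.len() == 1).then(|| table_fields[0]);
    for col in req.iter_mut() {
        match col.semantic {
            // The time index always has exactly one target name.
            Semantic::Timestamp => col.name = table_ts.to_string(),
            Semantic::Field => {
                if let Some(f) = single_field {
                    col.name = f.to_string();
                }
            }
            // Tags are never renamed; a new label is a genuine new column.
            Semantic::Tag => {}
        }
    }
}

fn main() {
    // Prometheus remote write always sends its default column names, but the
    // user created the table with `my_ts`/`my_field`.
    let mut req = vec![
        ReqColumn { name: "greptime_timestamp".into(), semantic: Semantic::Timestamp },
        ReqColumn { name: "greptime_value".into(), semantic: Semantic::Field },
        ReqColumn { name: "job".into(), semantic: Semantic::Tag },
    ];
    accommodate(&mut req, "my_ts", &["my_field"]);
    assert_eq!(req[0].name, "my_ts");
    assert_eq!(req[1].name, "my_field");
    assert_eq!(req[2].name, "job"); // tags pass through untouched
}
```

Leaving tag columns untouched is what keeps auto-alteration alive under this flag: a genuinely new label still produces an add-column expression.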
     async fn create_or_alter_tables_on_demand(
         &self,
-        requests: &RowInsertRequests,
+        requests: &mut RowInsertRequests,
         ctx: &QueryContextRef,
         auto_create_table_type: AutoCreateTableType,
         statement_executor: &StatementExecutor,
+        accommodate_existing_schema: bool,
     ) -> Result<CreateAlterTableResult> {
         let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
             .with_label_values(&[auto_create_table_type.as_str()])
             .start_timer();
@@ -504,7 +524,7 @@ impl Inserter {
         let mut alter_tables = vec![];
         let mut instant_table_ids = HashSet::new();
 
-        for req in &requests.inserts {
+        for req in &mut requests.inserts {
             match self.get_table(catalog, &schema, &req.table_name).await? {
                 Some(table) => {
                     let table_info = table.table_info();
@@ -512,9 +532,12 @@ impl Inserter {
                         instant_table_ids.insert(table_info.table_id());
                     }
                     table_infos.insert(table_info.table_id(), table.table_info());
-                    if let Some(alter_expr) =
-                        self.get_alter_table_expr_on_demand(req, &table, ctx)?
-                    {
+                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
+                        req,
+                        &table,
+                        ctx,
+                        accommodate_existing_schema,
+                    )? {
                         alter_tables.push(alter_expr);
                     }
                 }
@@ -784,12 +807,16 @@ impl Inserter {
     }
 
     /// Returns an alter table expression if it finds new columns in the request.
-    /// It always adds columns if not exist.
+    /// When `accommodate_existing_schema` is false, it always adds columns that do not exist.
+    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
+    /// accommodate the existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
+    /// for more details.
     fn get_alter_table_expr_on_demand(
         &self,
-        req: &RowInsertRequest,
+        req: &mut RowInsertRequest,
         table: &TableRef,
         ctx: &QueryContextRef,
+        accommodate_existing_schema: bool,
     ) -> Result<Option<AlterTableExpr>> {
         let catalog_name = ctx.current_catalog();
         let schema_name = ctx.current_schema();
@@ -798,10 +825,64 @@ impl Inserter {
         let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
         let column_exprs = ColumnExpr::from_column_schemas(request_schema);
         let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
-        let Some(add_columns) = add_columns else {
+        let Some(mut add_columns) = add_columns else {
             return Ok(None);
         };
 
+        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
+        if accommodate_existing_schema {
+            let table_schema = table.schema();
+            // Find timestamp column name
+            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
+            // Find field column name if there is only one
+            let mut field_col_name = None;
+            let mut multiple_field_cols = false;
+            table.field_columns().for_each(|col| {
+                if field_col_name.is_none() {
+                    field_col_name = Some(col.name.clone());
+                } else {
+                    multiple_field_cols = true;
+                }
+            });
+            if multiple_field_cols {
+                field_col_name = None;
+            }
+
+            // Update column name in request schema for Timestamp/Field columns
+            if let Some(rows) = req.rows.as_mut() {
+                for col in &mut rows.schema {
+                    match col.semantic_type {
+                        x if x == SemanticType::Timestamp as i32 => {
+                            if let Some(ref ts_name) = ts_col_name {
+                                if col.column_name != *ts_name {
+                                    col.column_name = ts_name.clone();
+                                }
+                            }
+                        }
+                        x if x == SemanticType::Field as i32 => {
+                            if let Some(ref field_name) = field_col_name {
+                                if col.column_name != *field_name {
+                                    col.column_name = field_name.clone();
+                                }
+                            }
+                        }
+                        _ => {}
+                    }
+                }
+            }
+
+            // Remove from add_columns any column that is timestamp or field (if there is only one field column)
+            add_columns.add_columns.retain(|col| {
+                let def = col.column_def.as_ref().unwrap();
+                def.semantic_type != SemanticType::Timestamp as i32
+                    && (def.semantic_type != SemanticType::Field as i32 || field_col_name.is_none())
+            });
+
+            if add_columns.add_columns.is_empty() {
+                return Ok(None);
+            }
+        }
+
         Ok(Some(AlterTableExpr {
             catalog_name: catalog_name.to_string(),
             schema_name: schema_name.to_string(),
@@ -1035,3 +1116,124 @@ impl FlowMirrorTask {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
+    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+    use common_meta::cache::new_table_flownode_set_cache;
+    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
+    use common_meta::test_util::MockDatanodeManager;
+    use datatypes::data_type::ConcreteDataType;
+    use datatypes::schema::ColumnSchema;
+    use moka::future::Cache;
+    use session::context::QueryContext;
+    use table::dist_table::DummyDataSource;
+    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
+    use table::TableRef;
+
+    use super::*;
+    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
+
+    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
+        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
+            ColumnSchema::new(
+                ts_name,
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                false,
+            )
+            .with_time_index(true),
+            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
+        ])
+        .unwrap()
+        .build()
+        .unwrap();
+        let meta = TableMetaBuilder::empty()
+            .schema(Arc::new(schema))
+            .primary_key_indices(vec![])
+            .value_indices(vec![1])
+            .engine("mito")
+            .next_column_id(0)
+            .options(Default::default())
+            .created_on(Default::default())
+            .region_numbers(vec![0])
+            .build()
+            .unwrap();
+        let info = Arc::new(
+            TableInfoBuilder::default()
+                .table_id(1)
+                .table_version(0)
+                .name("test_table")
+                .schema_name(DEFAULT_SCHEMA_NAME)
+                .catalog_name(DEFAULT_CATALOG_NAME)
+                .desc(None)
+                .table_type(TableType::Base)
+                .meta(meta)
+                .build()
+                .unwrap(),
+        );
+        Arc::new(table::Table::new(
+            info,
+            table::metadata::FilterPushDownType::Unsupported,
+            Arc::new(DummyDataSource),
+        ))
+    }
+
+    #[tokio::test]
+    async fn test_accommodate_existing_schema_logic() {
+        let ts_name = "my_ts";
+        let field_name = "my_field";
+        let table = make_table_ref_with_schema(ts_name, field_name);
+
+        // The request uses different names for timestamp and field columns
+        let mut req = RowInsertRequest {
+            table_name: "test_table".to_string(),
+            rows: Some(Rows {
+                schema: vec![
+                    GrpcColumnSchema {
+                        column_name: "ts_wrong".to_string(),
+                        datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
+                        semantic_type: SemanticType::Timestamp as i32,
+                        ..Default::default()
+                    },
+                    GrpcColumnSchema {
+                        column_name: "field_wrong".to_string(),
+                        datatype: api::v1::ColumnDataType::Float64 as i32,
+                        semantic_type: SemanticType::Field as i32,
+                        ..Default::default()
+                    },
+                ],
+                rows: vec![api::v1::Row {
+                    values: vec![Value::default(), Value::default()],
+                }],
+            }),
+        };
+        let ctx = Arc::new(QueryContext::with(
+            DEFAULT_CATALOG_NAME,
+            DEFAULT_SCHEMA_NAME,
+        ));
+
+        let kv_backend = prepare_mocked_backend().await;
+        let inserter = Inserter::new(
+            catalog::memory::MemoryCatalogManager::new(),
+            create_partition_rule_manager(kv_backend.clone()).await,
+            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
+            Arc::new(new_table_flownode_set_cache(
+                String::new(),
+                Cache::new(100),
+                kv_backend.clone(),
+            )),
+        );
+        let alter_expr = inserter
+            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true)
+            .unwrap();
+        assert!(alter_expr.is_none());
+
+        // The request's schema should have updated names for timestamp and field columns
+        let req_schema = req.rows.as_ref().unwrap().schema.clone();
+        assert_eq!(req_schema[0].column_name, ts_name);
+        assert_eq!(req_schema[1].column_name, field_name);
+    }
+}
diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs
index ae00ef40dd..0772148345 100644
--- a/src/operator/src/lib.rs
+++ b/src/operator/src/lib.rs
@@ -14,6 +14,7 @@
 
 #![feature(assert_matches)]
 #![feature(if_let_guard)]
+#![feature(let_chains)]
 
 pub mod delete;
 pub mod error;
diff --git a/src/operator/src/req_convert/delete/table_to_region.rs b/src/operator/src/req_convert/delete/table_to_region.rs
index ba93ad0f4a..5baaa078a4 100644
--- a/src/operator/src/req_convert/delete/table_to_region.rs
+++ b/src/operator/src/req_convert/delete/table_to_region.rs
@@ -57,33 +57,13 @@ mod tests {
     use api::v1::value::ValueData;
     use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-    use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
-    use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
-    use common_meta::kv_backend::memory::MemoryKvBackend;
-    use common_meta::kv_backend::KvBackendRef;
     use datatypes::vectors::{Int32Vector, VectorRef};
     use store_api::storage::RegionId;
 
     use super::*;
-    use crate::tests::{create_partition_rule_manager, new_test_table_info};
-
-    async fn prepare_mocked_backend() -> KvBackendRef {
-        let backend = Arc::new(MemoryKvBackend::default());
-
-        let catalog_manager = CatalogManager::new(backend.clone());
-        let schema_manager = SchemaManager::new(backend.clone());
-
-        catalog_manager
-            .create(CatalogNameKey::default(), false)
-            .await
-            .unwrap();
-        schema_manager
-            .create(SchemaNameKey::default(), None, false)
-            .await
-            .unwrap();
-
-        backend
-    }
+    use crate::tests::{
+        create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
+    };
 
     #[tokio::test]
     async fn test_delete_request_table_to_region() {
diff --git a/src/operator/src/req_convert/insert/table_to_region.rs b/src/operator/src/req_convert/insert/table_to_region.rs
index ac79cce503..abc435ee03 100644
--- a/src/operator/src/req_convert/insert/table_to_region.rs
+++ b/src/operator/src/req_convert/insert/table_to_region.rs
@@ -73,33 +73,13 @@ mod tests {
     use api::v1::value::ValueData;
     use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType, Value};
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-    use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
-    use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
-    use common_meta::kv_backend::memory::MemoryKvBackend;
-    use common_meta::kv_backend::KvBackendRef;
     use datatypes::vectors::{Int32Vector, VectorRef};
     use store_api::storage::RegionId;
 
     use super::*;
-    use crate::tests::{create_partition_rule_manager, new_test_table_info};
-
-    async fn prepare_mocked_backend() -> KvBackendRef {
-        let backend = Arc::new(MemoryKvBackend::default());
-
-        let catalog_manager = CatalogManager::new(backend.clone());
-        let schema_manager = SchemaManager::new(backend.clone());
-
-        catalog_manager
-            .create(CatalogNameKey::default(), false)
-            .await
-            .unwrap();
-        schema_manager
-            .create(SchemaNameKey::default(), None, false)
-            .await
-            .unwrap();
-
-        backend
-    }
+    use crate::tests::{
+        create_partition_rule_manager, new_test_table_info, prepare_mocked_backend,
+    };
 
     #[tokio::test]
     async fn test_insert_request_table_to_region() {
diff --git a/src/operator/src/tests.rs b/src/operator/src/tests.rs
index e124293beb..9263fa4cca 100644
--- a/src/operator/src/tests.rs
+++ b/src/operator/src/tests.rs
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod kv_backend;
 mod partition_manager;
 
+pub(crate) use kv_backend::prepare_mocked_backend;
 pub(crate) use partition_manager::{create_partition_rule_manager, new_test_table_info};
diff --git a/src/operator/src/tests/kv_backend.rs b/src/operator/src/tests/kv_backend.rs
new file mode 100644
index 0000000000..2106c46a76
--- /dev/null
+++ b/src/operator/src/tests/kv_backend.rs
@@ -0,0 +1,38 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_meta::key::catalog_name::{CatalogManager, CatalogNameKey};
+use common_meta::key::schema_name::{SchemaManager, SchemaNameKey};
+use common_meta::kv_backend::memory::MemoryKvBackend;
+use common_meta::kv_backend::KvBackendRef;
+
+pub async fn prepare_mocked_backend() -> KvBackendRef {
+    let backend = Arc::new(MemoryKvBackend::default());
+
+    let catalog_manager = CatalogManager::new(backend.clone());
+    let schema_manager = SchemaManager::new(backend.clone());
+
+    catalog_manager
+        .create(CatalogNameKey::default(), false)
+        .await
+        .unwrap();
+    schema_manager
+        .create(SchemaNameKey::default(), None, false)
+        .await
+        .unwrap();
+
+    backend
+}
diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs
index 649bab9f6e..bfd1a8c626 100644
--- a/src/pipeline/src/manager/table.rs
+++ b/src/pipeline/src/manager/table.rs
@@ -249,6 +249,7 @@ impl PipelineTable {
                 requests,
                 Self::query_ctx(&table_info),
                 &self.statement_executor,
+                false,
             )
             .await
             .context(InsertPipelineSnafu)?;
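With `prepare_mocked_backend` hoisted into `crate::tests`, any test in `src/operator` can reuse the same mocked catalog/schema setup instead of keeping a per-file copy. A rough usage sketch follows; the test name and body are illustrative only, mirroring the existing `table_to_region` tests.

```rust
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

#[tokio::test]
async fn test_with_shared_backend() {
    // The backend comes pre-populated with the default catalog and schema.
    let backend = prepare_mocked_backend().await;
    let partition_manager = create_partition_rule_manager(backend.clone()).await;
    // ... exercise request conversion against `partition_manager` ...
}
```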