From ed9312f8e35dc323b4c7052972c9b2e3aa1fa49d Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Mon, 1 Jun 2026 07:51:14 +0800 Subject: [PATCH] feat: global switch for creating tables automatically (#8203) * feat: global switch for creating table automatically Signed-off-by: Dennis Zhuang * chore: make auto_create_table as comment by default Signed-off-by: Dennis Zhuang * feat: respect gloabl switch for metric engine Signed-off-by: Dennis Zhuang --------- Signed-off-by: Dennis Zhuang --- config/config.md | 2 + config/frontend.example.toml | 4 ++ config/standalone.example.toml | 4 ++ src/cmd/tests/load_config_test.rs | 2 + src/flow/src/server.rs | 4 ++ src/frontend/src/frontend.rs | 6 +++ src/frontend/src/instance/builder.rs | 1 + src/operator/src/insert.rs | 59 +++++++++++++++------ src/standalone/src/options.rs | 6 +++ tests-integration/src/standalone.rs | 9 ++++ tests-integration/src/test_util.rs | 24 +++++++++ tests-integration/tests/grpc.rs | 79 +++++++++++++++++++++++++++- tests-integration/tests/http.rs | 1 + 13 files changed, 185 insertions(+), 16 deletions(-) diff --git a/config/config.md b/config/config.md index 0fae0caaa4..aa0c6701a0 100644 --- a/config/config.md +++ b/config/config.md @@ -14,6 +14,7 @@ | --- | -----| ------- | ----------- | | `default_timezone` | String | Unset | The default timezone of the server. | | `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. | +| `auto_create_table` | Bool | `true` | Server-side global switch for auto table creation on write.
When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`. | | `user_provider` | String | Unset | The user provider for authentication.
Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" | | `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
Set to 0 to disable the limit. Default: "0" (unlimited) | | `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.
Options: "wait" (default, 10s timeout), "wait()" (e.g., "wait(30s)"), "fail" | @@ -230,6 +231,7 @@ | --- | -----| ------- | ----------- | | `default_timezone` | String | Unset | The default timezone of the server. | | `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. | +| `auto_create_table` | Bool | `true` | Server-side global switch for auto table creation on write.
When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`. | | `user_provider` | String | Unset | The user provider for authentication.
Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" | | `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
Set to 0 to disable the limit. Default: "0" (unlimited) | | `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.
Options: "wait" (default, 10s timeout), "wait()" (e.g., "wait(30s)"), "fail" | diff --git a/config/frontend.example.toml b/config/frontend.example.toml index 39f38fbef9..a044aebda6 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -6,6 +6,10 @@ default_timezone = "UTC" ## @toml2docs:none-default default_column_prefix = "greptime" +## Server-side global switch for auto table creation on write. +## When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`. +#+ auto_create_table = true + ## The user provider for authentication. ## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" ## @toml2docs:none-default diff --git a/config/standalone.example.toml b/config/standalone.example.toml index d5c42e744c..5740e0e1cf 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -6,6 +6,10 @@ default_timezone = "UTC" ## @toml2docs:none-default default_column_prefix = "greptime" +## Server-side global switch for auto table creation on write. +## When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`. +#+ auto_create_table = true + ## The user provider for authentication. ## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" ## @toml2docs:none-default diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 6cffcd67c2..cee29e4456 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -114,6 +114,7 @@ fn test_load_frontend_example_config() { component: FrontendOptions { default_timezone: Some("UTC".to_string()), default_column_prefix: Some("greptime".to_string()), + auto_create_table: true, meta_client: Some(MetaClientOptions { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], timeout: Duration::from_secs(3), @@ -267,6 +268,7 @@ fn test_load_standalone_example_config() { component: StandaloneOptions { default_timezone: Some("UTC".to_string()), default_column_prefix: Some("greptime".to_string()), + auto_create_table: true, wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)), sync_period: Some(Duration::from_secs(10)), diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 913128f386..c2d986f645 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -566,11 +566,15 @@ impl FrontendInvoker { name: TABLE_FLOWNODE_SET_CACHE_NAME, })?; + // TODO(auto_create_table): flow sink tables are created through a controlled + // `CREATE FLOW` path, not client writes, so they are intentionally exempt from + // the frontend's global auto-create switch. Revisit if flow should honor it. let inserter = Arc::new(Inserter::new( catalog_manager.clone(), partition_manager.clone(), node_manager.clone(), table_flownode_cache, + true, )); let deleter = Arc::new(Deleter::new( diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index fb3b096f06..918185cb8f 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -44,6 +44,11 @@ pub struct FrontendOptions { pub node_id: Option, pub default_timezone: Option, pub default_column_prefix: Option, + /// Server-side global switch for auto table creation on write. + /// Acts as an upper bound: when `false`, missing tables are never auto-created + /// even if a request sets the `auto_create_table` hint to `true`. When `true` + /// (default), the per-request hint still applies. Default: `true`. + pub auto_create_table: bool, /// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight). /// Set to 0 to disable the limit. Default: "0" (unlimited) pub max_in_flight_write_bytes: ReadableSize, @@ -82,6 +87,7 @@ impl Default for FrontendOptions { node_id: None, default_timezone: None, default_column_prefix: None, + auto_create_table: true, max_in_flight_write_bytes: ReadableSize(0), write_bytes_exhausted_policy: OnExhaustedPolicy::default(), http: HttpOptions::default(), diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index ff857ed768..526d8aac73 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -185,6 +185,7 @@ impl FrontendBuilder { partition_manager.clone(), node_manager.clone(), table_flownode_cache, + self.options.auto_create_table, )); let deleter = Arc::new(Deleter::new( self.catalog_manager.clone(), diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index ff8ed2b78b..9ef4d2ff94 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -83,6 +83,10 @@ pub struct Inserter { pub(crate) partition_manager: PartitionRuleManagerRef, pub(crate) node_manager: NodeManagerRef, pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef, + /// Server-side upper bound for auto table creation on write. + /// When `false`, missing tables are never auto-created regardless of the + /// per-request `auto_create_table` hint. When `true`, the hint still applies. + auto_create_table: bool, } pub type InserterRef = Arc; @@ -135,12 +139,14 @@ impl Inserter { partition_manager: PartitionRuleManagerRef, node_manager: NodeManagerRef, table_flownode_set_cache: TableFlownodeSetCacheRef, + auto_create_table: bool, ) -> Self { Self { catalog_manager, partition_manager, node_manager, table_flownode_set_cache, + auto_create_table, } } @@ -469,6 +475,30 @@ impl Inserter { Ok(inserts) } + /// Returns `None` if auto table creation is allowed, or `Some(reason)` if + /// disabled by either the global config or the request hint. The reason tells + /// which one, for a clearer error. + fn auto_create_disabled_reason(&self, ctx: &QueryContextRef) -> Result> { + let auto_create_table_hint = ctx + .extension(AUTO_CREATE_TABLE_KEY) + .map(|v| v.parse::()) + .transpose() + .map_err(|_| { + InvalidInsertRequestSnafu { + reason: "`auto_create_table` hint must be a boolean", + } + .build() + })? + .unwrap_or(true); + Ok(if !self.auto_create_table { + Some("auto-create table is disabled by frontend config") + } else if !auto_create_table_hint { + Some("`auto_create_table` hint is disabled") + } else { + None + }) + } + /// Creates or alter tables on demand: /// - if table does not exist, create table by inferred CreateExpr /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr` @@ -498,19 +528,7 @@ impl Inserter { let schema = ctx.current_schema(); let mut table_infos = HashMap::new(); - // If `auto_create_table` hint is disabled, skip creating/altering tables. - let auto_create_table_hint = ctx - .extension(AUTO_CREATE_TABLE_KEY) - .map(|v| v.parse::()) - .transpose() - .map_err(|_| { - InvalidInsertRequestSnafu { - reason: "`auto_create_table` hint must be a boolean", - } - .build() - })? - .unwrap_or(true); - if !auto_create_table_hint { + if let Some(disabled_reason) = self.auto_create_disabled_reason(ctx)? { let mut instant_table_ids = HashSet::new(); for req in &requests.inserts { let table = self @@ -518,8 +536,8 @@ impl Inserter { .await? .context(InvalidInsertRequestSnafu { reason: format!( - "Table `{}` does not exist, and `auto_create_table` hint is disabled", - req.table_name + "Table `{}` does not exist, and {}", + req.table_name, disabled_reason ), })?; let table_info = table.table_info(); @@ -767,6 +785,16 @@ impl Inserter { return Ok(()); } + // Gate here too, otherwise a disabled switch would still leak the physical table. + if let Some(disabled_reason) = self.auto_create_disabled_reason(ctx)? { + return InvalidInsertRequestSnafu { + reason: format!( + "Physical table `{physical_table}` does not exist, and {disabled_reason}" + ), + } + .fail(); + } + let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table); info!("Physical metric table `{table_reference}` does not exist, try creating table"); @@ -1333,6 +1361,7 @@ mod tests { Cache::new(100), kv_backend.clone(), )), + true, ); let alter_expr = inserter .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true) diff --git a/src/standalone/src/options.rs b/src/standalone/src/options.rs index dece6389f0..bf1000fd58 100644 --- a/src/standalone/src/options.rs +++ b/src/standalone/src/options.rs @@ -38,6 +38,10 @@ pub struct StandaloneOptions { pub enable_telemetry: bool, pub default_timezone: Option, pub default_column_prefix: Option, + /// Server-side global switch for auto table creation on write. + /// Upper bound: when `false`, missing tables are never auto-created even if a + /// request sets the `auto_create_table` hint to `true`. Default: `true`. + pub auto_create_table: bool, /// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight). /// Set to 0 to disable the limit. Default: "0" (unlimited) pub max_in_flight_write_bytes: ReadableSize, @@ -77,6 +81,7 @@ impl Default for StandaloneOptions { enable_telemetry: true, default_timezone: None, default_column_prefix: None, + auto_create_table: true, max_in_flight_write_bytes: ReadableSize(0), write_bytes_exhausted_policy: OnExhaustedPolicy::default(), http: HttpOptions::default(), @@ -130,6 +135,7 @@ impl StandaloneOptions { let cloned_opts = self.clone(); FrontendOptions { default_timezone: cloned_opts.default_timezone, + auto_create_table: cloned_opts.auto_create_table, max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes, write_bytes_exhausted_policy: cloned_opts.write_bytes_exhausted_policy, http: cloned_opts.http, diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b013b8b0d4..74a1501207 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -80,6 +80,7 @@ pub struct GreptimeDbStandaloneBuilder { default_store: Option, plugin: Option, slow_query_options: SlowQueryOptions, + auto_create_table: bool, } impl GreptimeDbStandaloneBuilder { @@ -97,9 +98,16 @@ impl GreptimeDbStandaloneBuilder { threshold: Duration::from_secs(1), ..Default::default() }, + auto_create_table: true, } } + #[must_use] + pub fn with_auto_create_table(mut self, auto_create_table: bool) -> Self { + self.auto_create_table = auto_create_table; + self + } + #[must_use] pub fn with_default_store_type(self, store_type: StorageType) -> Self { Self { @@ -347,6 +355,7 @@ impl GreptimeDbStandaloneBuilder { wal: self.metasrv_wal_config.clone().into(), grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"), slow_query: self.slow_query_options.clone(), + auto_create_table: self.auto_create_table, ..StandaloneOptions::default() }; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 15d65c34ea..60e0d9143e 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -649,6 +649,20 @@ pub async fn setup_grpc_server_with_user_provider( setup_grpc_server_with(store_type, name, user_provider, None, None).await } +/// Sets up a gRPC server backed by a standalone instance whose frontend has auto +/// table creation disabled, for testing the server-side global switch. +pub async fn setup_grpc_server_with_auto_create_table_disabled( + store_type: StorageType, + name: &str, +) -> (GreptimeDbStandalone, Arc) { + let instance = GreptimeDbStandaloneBuilder::new(name) + .with_default_store_type(store_type) + .with_auto_create_table(false) + .build() + .await; + setup_grpc_server_for_instance(instance, None, None, None).await +} + pub async fn setup_grpc_server_with( store_type: StorageType, name: &str, @@ -657,7 +671,17 @@ pub async fn setup_grpc_server_with( memory_limiter: Option, ) -> (GreptimeDbStandalone, Arc) { let instance = setup_standalone_instance(name, store_type).await; + setup_grpc_server_for_instance(instance, user_provider, grpc_config, memory_limiter).await +} +/// Builds and starts a gRPC server on top of an already-constructed standalone +/// instance. This is the shared core behind the `setup_grpc_server_*` helpers. +async fn setup_grpc_server_for_instance( + instance: GreptimeDbStandalone, + user_provider: Option, + grpc_config: Option, + memory_limiter: Option, +) -> (GreptimeDbStandalone, Arc) { let runtime: Runtime = RuntimeBuilder::default() .worker_threads(2) .thread_name("grpc-handlers") diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 0f1112ff4a..d6b96484af 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -44,7 +44,8 @@ use servers::request_memory_limiter::ServerMemoryLimiter; use servers::server::Server; use servers::tls::{TlsMode, TlsOption}; use tests_integration::test_util::{ - StorageType, setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider, + StorageType, setup_grpc_server, setup_grpc_server_with, + setup_grpc_server_with_auto_create_table_disabled, setup_grpc_server_with_user_provider, }; use tonic::Request; use tonic::metadata::MetadataValue; @@ -82,6 +83,7 @@ macro_rules! grpc_tests { test_invalid_dbname, test_auto_create_table, test_auto_create_table_with_hints, + test_auto_create_table_disabled_by_config, test_otel_arrow_auth, test_insert_and_select, test_dbname, @@ -405,6 +407,81 @@ pub async fn test_auto_create_table_with_hints(store_type: StorageType) { let _ = fe_grpc_server.shutdown().await; } +/// When the frontend global switch disables auto table creation, a write to a +/// missing table must fail even if the request sets `auto_create_table=true`, +/// proving the global config is an upper bound that hints cannot bypass. +pub async fn test_auto_create_table_disabled_by_config(store_type: StorageType) { + let (_db, fe_grpc_server) = setup_grpc_server_with_auto_create_table_disabled( + store_type, + "test_auto_create_table_disabled_by_config", + ) + .await; + let addr = fe_grpc_server.bind_addr().unwrap().to_string(); + + let grpc_client = Client::with_urls(vec![addr]); + let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client); + + // Plain row insert to a missing table: must fail even with `auto_create_table=true`. + let (host, cpu, mem, ts) = expect_data(); + let request = InsertRequest { + table_name: "demo".to_string(), + columns: vec![host, cpu, mem, ts], + row_count: 4, + }; + let result = db + .insert_with_hints( + InsertRequests { + inserts: vec![request], + }, + &[("auto_create_table", "true")], + ) + .await; + let err = result.unwrap_err().to_string(); + assert!( + err.contains("does not exist") && err.contains("disabled by frontend config"), + "unexpected error: {err}" + ); + + // Metric path (via `physical_table` hint): must also fail without leaking the physical table. + let (host, cpu, mem, ts) = expect_data(); + let request = InsertRequest { + table_name: "demo_metric".to_string(), + columns: vec![host, cpu, mem, ts], + row_count: 4, + }; + let result = db + .insert_with_hints( + InsertRequests { + inserts: vec![request], + }, + &[ + ("auto_create_table", "true"), + ("physical_table", "greptime_physical_table"), + ], + ) + .await; + let err = result.unwrap_err().to_string(); + assert!( + err.contains("does not exist") && err.contains("disabled by frontend config"), + "unexpected error: {err}" + ); + + // The physical table must not have been created before the failure. + let output = db.sql("SHOW TABLES").await.unwrap(); + let record_batches = match output.data { + OutputData::RecordBatches(record_batches) => record_batches, + OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(), + OutputData::AffectedRows(_) => unreachable!(), + }; + let tables = record_batches.pretty_print().unwrap(); + assert!( + !tables.contains("greptime_physical_table"), + "physical table leaked despite disabled auto-create:\n{tables}" + ); + + let _ = fe_grpc_server.shutdown().await; +} + fn expect_data() -> (Column, Column, Column, Column) { // testing data: let expected_host_col = Column { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7f411cdec2..6e35ed7656 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1491,6 +1491,7 @@ mem_threshold_on_create = "auto" let expected_toml_str = format!( r#" enable_telemetry = true +auto_create_table = true max_in_flight_write_bytes = "0KiB" write_bytes_exhausted_policy = "wait" init_regions_in_background = false