mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-01 12:50:40 +00:00
feat: global switch for creating tables automatically (#8203)
* feat: global switch for creating table automatically Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * chore: make auto_create_table as comment by default Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * feat: respect gloabl switch for metric engine Signed-off-by: Dennis Zhuang <killme2008@gmail.com> --------- Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `default_timezone` | String | Unset | The default timezone of the server. |
|
||||
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
|
||||
| `auto_create_table` | Bool | `true` | Server-side global switch for auto table creation on write.<br/>When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`. |
|
||||
| `user_provider` | String | Unset | The user provider for authentication.<br/>Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
|
||||
| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
|
||||
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.<br/>Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
|
||||
@@ -230,6 +231,7 @@
|
||||
| --- | -----| ------- | ----------- |
|
||||
| `default_timezone` | String | Unset | The default timezone of the server. |
|
||||
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
|
||||
| `auto_create_table` | Bool | `true` | Server-side global switch for auto table creation on write.<br/>When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`. |
|
||||
| `user_provider` | String | Unset | The user provider for authentication.<br/>Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
|
||||
| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
|
||||
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.<br/>Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
|
||||
|
||||
@@ -6,6 +6,10 @@ default_timezone = "UTC"
|
||||
## @toml2docs:none-default
|
||||
default_column_prefix = "greptime"
|
||||
|
||||
## Server-side global switch for auto table creation on write.
|
||||
## When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`.
|
||||
#+ auto_create_table = true
|
||||
|
||||
## The user provider for authentication.
|
||||
## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
|
||||
## @toml2docs:none-default
|
||||
|
||||
@@ -6,6 +6,10 @@ default_timezone = "UTC"
|
||||
## @toml2docs:none-default
|
||||
default_column_prefix = "greptime"
|
||||
|
||||
## Server-side global switch for auto table creation on write.
|
||||
## When `false`, a missing table is never auto-created even if the request sets the `auto_create_table` hint to `true`. Default: `true`.
|
||||
#+ auto_create_table = true
|
||||
|
||||
## The user provider for authentication.
|
||||
## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
|
||||
## @toml2docs:none-default
|
||||
|
||||
@@ -114,6 +114,7 @@ fn test_load_frontend_example_config() {
|
||||
component: FrontendOptions {
|
||||
default_timezone: Some("UTC".to_string()),
|
||||
default_column_prefix: Some("greptime".to_string()),
|
||||
auto_create_table: true,
|
||||
meta_client: Some(MetaClientOptions {
|
||||
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
|
||||
timeout: Duration::from_secs(3),
|
||||
@@ -267,6 +268,7 @@ fn test_load_standalone_example_config() {
|
||||
component: StandaloneOptions {
|
||||
default_timezone: Some("UTC".to_string()),
|
||||
default_column_prefix: Some("greptime".to_string()),
|
||||
auto_create_table: true,
|
||||
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
|
||||
dir: Some(format!("{}/{}", DEFAULT_DATA_HOME, WAL_DIR)),
|
||||
sync_period: Some(Duration::from_secs(10)),
|
||||
|
||||
@@ -566,11 +566,15 @@ impl FrontendInvoker {
|
||||
name: TABLE_FLOWNODE_SET_CACHE_NAME,
|
||||
})?;
|
||||
|
||||
// TODO(auto_create_table): flow sink tables are created through a controlled
|
||||
// `CREATE FLOW` path, not client writes, so they are intentionally exempt from
|
||||
// the frontend's global auto-create switch. Revisit if flow should honor it.
|
||||
let inserter = Arc::new(Inserter::new(
|
||||
catalog_manager.clone(),
|
||||
partition_manager.clone(),
|
||||
node_manager.clone(),
|
||||
table_flownode_cache,
|
||||
true,
|
||||
));
|
||||
|
||||
let deleter = Arc::new(Deleter::new(
|
||||
|
||||
@@ -44,6 +44,11 @@ pub struct FrontendOptions {
|
||||
pub node_id: Option<String>,
|
||||
pub default_timezone: Option<String>,
|
||||
pub default_column_prefix: Option<String>,
|
||||
/// Server-side global switch for auto table creation on write.
|
||||
/// Acts as an upper bound: when `false`, missing tables are never auto-created
|
||||
/// even if a request sets the `auto_create_table` hint to `true`. When `true`
|
||||
/// (default), the per-request hint still applies. Default: `true`.
|
||||
pub auto_create_table: bool,
|
||||
/// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
|
||||
/// Set to 0 to disable the limit. Default: "0" (unlimited)
|
||||
pub max_in_flight_write_bytes: ReadableSize,
|
||||
@@ -82,6 +87,7 @@ impl Default for FrontendOptions {
|
||||
node_id: None,
|
||||
default_timezone: None,
|
||||
default_column_prefix: None,
|
||||
auto_create_table: true,
|
||||
max_in_flight_write_bytes: ReadableSize(0),
|
||||
write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
|
||||
http: HttpOptions::default(),
|
||||
|
||||
@@ -185,6 +185,7 @@ impl FrontendBuilder {
|
||||
partition_manager.clone(),
|
||||
node_manager.clone(),
|
||||
table_flownode_cache,
|
||||
self.options.auto_create_table,
|
||||
));
|
||||
let deleter = Arc::new(Deleter::new(
|
||||
self.catalog_manager.clone(),
|
||||
|
||||
@@ -83,6 +83,10 @@ pub struct Inserter {
|
||||
pub(crate) partition_manager: PartitionRuleManagerRef,
|
||||
pub(crate) node_manager: NodeManagerRef,
|
||||
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
/// Server-side upper bound for auto table creation on write.
|
||||
/// When `false`, missing tables are never auto-created regardless of the
|
||||
/// per-request `auto_create_table` hint. When `true`, the hint still applies.
|
||||
auto_create_table: bool,
|
||||
}
|
||||
|
||||
pub type InserterRef = Arc<Inserter>;
|
||||
@@ -135,12 +139,14 @@ impl Inserter {
|
||||
partition_manager: PartitionRuleManagerRef,
|
||||
node_manager: NodeManagerRef,
|
||||
table_flownode_set_cache: TableFlownodeSetCacheRef,
|
||||
auto_create_table: bool,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_manager,
|
||||
partition_manager,
|
||||
node_manager,
|
||||
table_flownode_set_cache,
|
||||
auto_create_table,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -469,6 +475,30 @@ impl Inserter {
|
||||
Ok(inserts)
|
||||
}
|
||||
|
||||
/// Returns `None` if auto table creation is allowed, or `Some(reason)` if
|
||||
/// disabled by either the global config or the request hint. The reason tells
|
||||
/// which one, for a clearer error.
|
||||
fn auto_create_disabled_reason(&self, ctx: &QueryContextRef) -> Result<Option<&'static str>> {
|
||||
let auto_create_table_hint = ctx
|
||||
.extension(AUTO_CREATE_TABLE_KEY)
|
||||
.map(|v| v.parse::<bool>())
|
||||
.transpose()
|
||||
.map_err(|_| {
|
||||
InvalidInsertRequestSnafu {
|
||||
reason: "`auto_create_table` hint must be a boolean",
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.unwrap_or(true);
|
||||
Ok(if !self.auto_create_table {
|
||||
Some("auto-create table is disabled by frontend config")
|
||||
} else if !auto_create_table_hint {
|
||||
Some("`auto_create_table` hint is disabled")
|
||||
} else {
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates or alter tables on demand:
|
||||
/// - if table does not exist, create table by inferred CreateExpr
|
||||
/// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
|
||||
@@ -498,19 +528,7 @@ impl Inserter {
|
||||
let schema = ctx.current_schema();
|
||||
|
||||
let mut table_infos = HashMap::new();
|
||||
// If `auto_create_table` hint is disabled, skip creating/altering tables.
|
||||
let auto_create_table_hint = ctx
|
||||
.extension(AUTO_CREATE_TABLE_KEY)
|
||||
.map(|v| v.parse::<bool>())
|
||||
.transpose()
|
||||
.map_err(|_| {
|
||||
InvalidInsertRequestSnafu {
|
||||
reason: "`auto_create_table` hint must be a boolean",
|
||||
}
|
||||
.build()
|
||||
})?
|
||||
.unwrap_or(true);
|
||||
if !auto_create_table_hint {
|
||||
if let Some(disabled_reason) = self.auto_create_disabled_reason(ctx)? {
|
||||
let mut instant_table_ids = HashSet::new();
|
||||
for req in &requests.inserts {
|
||||
let table = self
|
||||
@@ -518,8 +536,8 @@ impl Inserter {
|
||||
.await?
|
||||
.context(InvalidInsertRequestSnafu {
|
||||
reason: format!(
|
||||
"Table `{}` does not exist, and `auto_create_table` hint is disabled",
|
||||
req.table_name
|
||||
"Table `{}` does not exist, and {}",
|
||||
req.table_name, disabled_reason
|
||||
),
|
||||
})?;
|
||||
let table_info = table.table_info();
|
||||
@@ -767,6 +785,16 @@ impl Inserter {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Gate here too, otherwise a disabled switch would still leak the physical table.
|
||||
if let Some(disabled_reason) = self.auto_create_disabled_reason(ctx)? {
|
||||
return InvalidInsertRequestSnafu {
|
||||
reason: format!(
|
||||
"Physical table `{physical_table}` does not exist, and {disabled_reason}"
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
|
||||
info!("Physical metric table `{table_reference}` does not exist, try creating table");
|
||||
|
||||
@@ -1333,6 +1361,7 @@ mod tests {
|
||||
Cache::new(100),
|
||||
kv_backend.clone(),
|
||||
)),
|
||||
true,
|
||||
);
|
||||
let alter_expr = inserter
|
||||
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
|
||||
|
||||
@@ -38,6 +38,10 @@ pub struct StandaloneOptions {
|
||||
pub enable_telemetry: bool,
|
||||
pub default_timezone: Option<String>,
|
||||
pub default_column_prefix: Option<String>,
|
||||
/// Server-side global switch for auto table creation on write.
|
||||
/// Upper bound: when `false`, missing tables are never auto-created even if a
|
||||
/// request sets the `auto_create_table` hint to `true`. Default: `true`.
|
||||
pub auto_create_table: bool,
|
||||
/// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
|
||||
/// Set to 0 to disable the limit. Default: "0" (unlimited)
|
||||
pub max_in_flight_write_bytes: ReadableSize,
|
||||
@@ -77,6 +81,7 @@ impl Default for StandaloneOptions {
|
||||
enable_telemetry: true,
|
||||
default_timezone: None,
|
||||
default_column_prefix: None,
|
||||
auto_create_table: true,
|
||||
max_in_flight_write_bytes: ReadableSize(0),
|
||||
write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
|
||||
http: HttpOptions::default(),
|
||||
@@ -130,6 +135,7 @@ impl StandaloneOptions {
|
||||
let cloned_opts = self.clone();
|
||||
FrontendOptions {
|
||||
default_timezone: cloned_opts.default_timezone,
|
||||
auto_create_table: cloned_opts.auto_create_table,
|
||||
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
|
||||
write_bytes_exhausted_policy: cloned_opts.write_bytes_exhausted_policy,
|
||||
http: cloned_opts.http,
|
||||
|
||||
@@ -80,6 +80,7 @@ pub struct GreptimeDbStandaloneBuilder {
|
||||
default_store: Option<StorageType>,
|
||||
plugin: Option<Plugins>,
|
||||
slow_query_options: SlowQueryOptions,
|
||||
auto_create_table: bool,
|
||||
}
|
||||
|
||||
impl GreptimeDbStandaloneBuilder {
|
||||
@@ -97,9 +98,16 @@ impl GreptimeDbStandaloneBuilder {
|
||||
threshold: Duration::from_secs(1),
|
||||
..Default::default()
|
||||
},
|
||||
auto_create_table: true,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_auto_create_table(mut self, auto_create_table: bool) -> Self {
|
||||
self.auto_create_table = auto_create_table;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn with_default_store_type(self, store_type: StorageType) -> Self {
|
||||
Self {
|
||||
@@ -347,6 +355,7 @@ impl GreptimeDbStandaloneBuilder {
|
||||
wal: self.metasrv_wal_config.clone().into(),
|
||||
grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"),
|
||||
slow_query: self.slow_query_options.clone(),
|
||||
auto_create_table: self.auto_create_table,
|
||||
..StandaloneOptions::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -649,6 +649,20 @@ pub async fn setup_grpc_server_with_user_provider(
|
||||
setup_grpc_server_with(store_type, name, user_provider, None, None).await
|
||||
}
|
||||
|
||||
/// Sets up a gRPC server backed by a standalone instance whose frontend has auto
|
||||
/// table creation disabled, for testing the server-side global switch.
|
||||
pub async fn setup_grpc_server_with_auto_create_table_disabled(
|
||||
store_type: StorageType,
|
||||
name: &str,
|
||||
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
|
||||
let instance = GreptimeDbStandaloneBuilder::new(name)
|
||||
.with_default_store_type(store_type)
|
||||
.with_auto_create_table(false)
|
||||
.build()
|
||||
.await;
|
||||
setup_grpc_server_for_instance(instance, None, None, None).await
|
||||
}
|
||||
|
||||
pub async fn setup_grpc_server_with(
|
||||
store_type: StorageType,
|
||||
name: &str,
|
||||
@@ -657,7 +671,17 @@ pub async fn setup_grpc_server_with(
|
||||
memory_limiter: Option<servers::request_memory_limiter::ServerMemoryLimiter>,
|
||||
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
|
||||
let instance = setup_standalone_instance(name, store_type).await;
|
||||
setup_grpc_server_for_instance(instance, user_provider, grpc_config, memory_limiter).await
|
||||
}
|
||||
|
||||
/// Builds and starts a gRPC server on top of an already-constructed standalone
|
||||
/// instance. This is the shared core behind the `setup_grpc_server_*` helpers.
|
||||
async fn setup_grpc_server_for_instance(
|
||||
instance: GreptimeDbStandalone,
|
||||
user_provider: Option<UserProviderRef>,
|
||||
grpc_config: Option<GrpcServerConfig>,
|
||||
memory_limiter: Option<servers::request_memory_limiter::ServerMemoryLimiter>,
|
||||
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
|
||||
let runtime: Runtime = RuntimeBuilder::default()
|
||||
.worker_threads(2)
|
||||
.thread_name("grpc-handlers")
|
||||
|
||||
@@ -44,7 +44,8 @@ use servers::request_memory_limiter::ServerMemoryLimiter;
|
||||
use servers::server::Server;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use tests_integration::test_util::{
|
||||
StorageType, setup_grpc_server, setup_grpc_server_with, setup_grpc_server_with_user_provider,
|
||||
StorageType, setup_grpc_server, setup_grpc_server_with,
|
||||
setup_grpc_server_with_auto_create_table_disabled, setup_grpc_server_with_user_provider,
|
||||
};
|
||||
use tonic::Request;
|
||||
use tonic::metadata::MetadataValue;
|
||||
@@ -82,6 +83,7 @@ macro_rules! grpc_tests {
|
||||
test_invalid_dbname,
|
||||
test_auto_create_table,
|
||||
test_auto_create_table_with_hints,
|
||||
test_auto_create_table_disabled_by_config,
|
||||
test_otel_arrow_auth,
|
||||
test_insert_and_select,
|
||||
test_dbname,
|
||||
@@ -405,6 +407,81 @@ pub async fn test_auto_create_table_with_hints(store_type: StorageType) {
|
||||
let _ = fe_grpc_server.shutdown().await;
|
||||
}
|
||||
|
||||
/// When the frontend global switch disables auto table creation, a write to a
|
||||
/// missing table must fail even if the request sets `auto_create_table=true`,
|
||||
/// proving the global config is an upper bound that hints cannot bypass.
|
||||
pub async fn test_auto_create_table_disabled_by_config(store_type: StorageType) {
|
||||
let (_db, fe_grpc_server) = setup_grpc_server_with_auto_create_table_disabled(
|
||||
store_type,
|
||||
"test_auto_create_table_disabled_by_config",
|
||||
)
|
||||
.await;
|
||||
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
|
||||
|
||||
let grpc_client = Client::with_urls(vec![addr]);
|
||||
let db = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, grpc_client);
|
||||
|
||||
// Plain row insert to a missing table: must fail even with `auto_create_table=true`.
|
||||
let (host, cpu, mem, ts) = expect_data();
|
||||
let request = InsertRequest {
|
||||
table_name: "demo".to_string(),
|
||||
columns: vec![host, cpu, mem, ts],
|
||||
row_count: 4,
|
||||
};
|
||||
let result = db
|
||||
.insert_with_hints(
|
||||
InsertRequests {
|
||||
inserts: vec![request],
|
||||
},
|
||||
&[("auto_create_table", "true")],
|
||||
)
|
||||
.await;
|
||||
let err = result.unwrap_err().to_string();
|
||||
assert!(
|
||||
err.contains("does not exist") && err.contains("disabled by frontend config"),
|
||||
"unexpected error: {err}"
|
||||
);
|
||||
|
||||
// Metric path (via `physical_table` hint): must also fail without leaking the physical table.
|
||||
let (host, cpu, mem, ts) = expect_data();
|
||||
let request = InsertRequest {
|
||||
table_name: "demo_metric".to_string(),
|
||||
columns: vec![host, cpu, mem, ts],
|
||||
row_count: 4,
|
||||
};
|
||||
let result = db
|
||||
.insert_with_hints(
|
||||
InsertRequests {
|
||||
inserts: vec![request],
|
||||
},
|
||||
&[
|
||||
("auto_create_table", "true"),
|
||||
("physical_table", "greptime_physical_table"),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
let err = result.unwrap_err().to_string();
|
||||
assert!(
|
||||
err.contains("does not exist") && err.contains("disabled by frontend config"),
|
||||
"unexpected error: {err}"
|
||||
);
|
||||
|
||||
// The physical table must not have been created before the failure.
|
||||
let output = db.sql("SHOW TABLES").await.unwrap();
|
||||
let record_batches = match output.data {
|
||||
OutputData::RecordBatches(record_batches) => record_batches,
|
||||
OutputData::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(),
|
||||
OutputData::AffectedRows(_) => unreachable!(),
|
||||
};
|
||||
let tables = record_batches.pretty_print().unwrap();
|
||||
assert!(
|
||||
!tables.contains("greptime_physical_table"),
|
||||
"physical table leaked despite disabled auto-create:\n{tables}"
|
||||
);
|
||||
|
||||
let _ = fe_grpc_server.shutdown().await;
|
||||
}
|
||||
|
||||
fn expect_data() -> (Column, Column, Column, Column) {
|
||||
// testing data:
|
||||
let expected_host_col = Column {
|
||||
|
||||
@@ -1491,6 +1491,7 @@ mem_threshold_on_create = "auto"
|
||||
let expected_toml_str = format!(
|
||||
r#"
|
||||
enable_telemetry = true
|
||||
auto_create_table = true
|
||||
max_in_flight_write_bytes = "0KiB"
|
||||
write_bytes_exhausted_policy = "wait"
|
||||
init_regions_in_background = false
|
||||
|
||||
Reference in New Issue
Block a user