diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 8ea9ff3917..d5e0f0b049 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -843,8 +843,46 @@ impl StatementExecutor { } ); - self.alter_logical_tables_procedure(alter_table_exprs, query_context) - .await?; + // group by physical table id + let mut groups: HashMap> = HashMap::new(); + for expr in alter_table_exprs { + // Get table_id from catalog_manager + let catalog = if expr.catalog_name.is_empty() { + query_context.current_catalog() + } else { + &expr.catalog_name + }; + let schema = if expr.schema_name.is_empty() { + query_context.current_schema() + } else { + expr.schema_name.to_string() + }; + let table_name = &expr.table_name; + let table = self + .catalog_manager + .table(catalog, &schema, table_name, Some(&query_context)) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name(catalog, &schema, table_name), + })?; + let table_id = table.table_info().ident.table_id; + let physical_table_id = self + .table_metadata_manager + .table_route_manager() + .get_physical_table_id(table_id) + .await + .context(TableMetadataManagerSnafu)?; + groups.entry(physical_table_id).or_default().push(expr); + } + + // Submit procedure for each physical table + let mut handles = Vec::with_capacity(groups.len()); + for (_physical_table_id, exprs) in groups { + let fut = self.alter_logical_tables_procedure(exprs, query_context.clone()); + handles.push(fut); + } + let _results = futures::future::try_join_all(handles).await?; Ok(Output::new_with_affected_rows(0)) } diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 34d332e81d..ee1586f94e 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -475,6 +475,38 @@ pub fn mock_timeseries() -> Vec { ] } +/// Add new labels to the mock timeseries. +pub fn mock_timeseries_new_label() -> Vec { + let ts_demo_metrics = TimeSeries { + labels: vec![ + new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()), + new_label("idc".to_string(), "idc3".to_string()), + new_label("new_label1".to_string(), "foo".to_string()), + ], + samples: vec![Sample { + value: 42.0, + timestamp: 3000, + }], + ..Default::default() + }; + let ts_multi_labels = TimeSeries { + labels: vec![ + new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()), + new_label("idc".to_string(), "idc4".to_string()), + new_label("env".to_string(), "prod".to_string()), + new_label("host".to_string(), "host9".to_string()), + new_label("new_label2".to_string(), "bar".to_string()), + ], + samples: vec![Sample { + value: 99.0, + timestamp: 4000, + }], + ..Default::default() + }; + + vec![ts_demo_metrics, ts_multi_labels] +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 2c663e99b9..bc83eb5904 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -43,7 +43,7 @@ use servers::http::result::greptime_result_v1::GreptimedbV1Response; use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response}; use servers::http::test_helpers::{TestClient, TestResponse}; use servers::http::GreptimeQueryOutput; -use servers::prom_store; +use servers::prom_store::{self, mock_timeseries_new_label}; use table::table_name::TableName; use tests_integration::test_util::{ setup_test_http_app, setup_test_http_app_with_frontend, @@ -1223,6 +1223,24 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::NO_CONTENT); + // Write snappy encoded data with new labels + let write_request = WriteRequest { + timeseries: mock_timeseries_new_label(), + ..Default::default() + }; + let serialized_request = write_request.encode_to_vec(); + let compressed_request = + prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy"); + + let res = client + .post("/v1/prometheus/write") + .header("Content-Encoding", "snappy") + .body(compressed_request) + .send() + .await; + + assert_eq!(res.status(), StatusCode::NO_CONTENT); + guard.remove_all().await; }