feat: support altering multiple logical table in one remote write request (#6137)

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-05-20 19:22:38 +08:00
committed by GitHub
parent 43c30b55ae
commit 7ae0e150e5
3 changed files with 91 additions and 3 deletions

View File

@@ -843,8 +843,46 @@ impl StatementExecutor {
} }
); );
self.alter_logical_tables_procedure(alter_table_exprs, query_context) // group by physical table id
.await?; let mut groups: HashMap<TableId, Vec<AlterTableExpr>> = HashMap::new();
for expr in alter_table_exprs {
// Get table_id from catalog_manager
let catalog = if expr.catalog_name.is_empty() {
query_context.current_catalog()
} else {
&expr.catalog_name
};
let schema = if expr.schema_name.is_empty() {
query_context.current_schema()
} else {
expr.schema_name.to_string()
};
let table_name = &expr.table_name;
let table = self
.catalog_manager
.table(catalog, &schema, table_name, Some(&query_context))
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(catalog, &schema, table_name),
})?;
let table_id = table.table_info().ident.table_id;
let physical_table_id = self
.table_metadata_manager
.table_route_manager()
.get_physical_table_id(table_id)
.await
.context(TableMetadataManagerSnafu)?;
groups.entry(physical_table_id).or_default().push(expr);
}
// Submit procedure for each physical table
let mut handles = Vec::with_capacity(groups.len());
for (_physical_table_id, exprs) in groups {
let fut = self.alter_logical_tables_procedure(exprs, query_context.clone());
handles.push(fut);
}
let _results = futures::future::try_join_all(handles).await?;
Ok(Output::new_with_affected_rows(0)) Ok(Output::new_with_affected_rows(0))
} }

View File

@@ -475,6 +475,38 @@ pub fn mock_timeseries() -> Vec<TimeSeries> {
] ]
} }
/// Add new labels to the mock timeseries.
pub fn mock_timeseries_new_label() -> Vec<TimeSeries> {
let ts_demo_metrics = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "demo_metrics".to_string()),
new_label("idc".to_string(), "idc3".to_string()),
new_label("new_label1".to_string(), "foo".to_string()),
],
samples: vec![Sample {
value: 42.0,
timestamp: 3000,
}],
..Default::default()
};
let ts_multi_labels = TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
new_label("idc".to_string(), "idc4".to_string()),
new_label("env".to_string(), "prod".to_string()),
new_label("host".to_string(), "host9".to_string()),
new_label("new_label2".to_string(), "bar".to_string()),
],
samples: vec![Sample {
value: 99.0,
timestamp: 4000,
}],
..Default::default()
};
vec![ts_demo_metrics, ts_multi_labels]
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc; use std::sync::Arc;

View File

@@ -43,7 +43,7 @@ use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response}; use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::test_helpers::{TestClient, TestResponse}; use servers::http::test_helpers::{TestClient, TestResponse};
use servers::http::GreptimeQueryOutput; use servers::http::GreptimeQueryOutput;
use servers::prom_store; use servers::prom_store::{self, mock_timeseries_new_label};
use table::table_name::TableName; use table::table_name::TableName;
use tests_integration::test_util::{ use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend, setup_test_http_app, setup_test_http_app_with_frontend,
@@ -1270,6 +1270,24 @@ pub async fn test_prometheus_remote_write(store_type: StorageType) {
) )
.await; .await;
// Write snappy encoded data with new labels
let write_request = WriteRequest {
timeseries: mock_timeseries_new_label(),
..Default::default()
};
let serialized_request = write_request.encode_to_vec();
let compressed_request =
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");
let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);
guard.remove_all().await; guard.remove_all().await;
} }