diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs index a48f65b30d..30b5e09a18 100644 --- a/src/servers/src/batch_builder.rs +++ b/src/servers/src/batch_builder.rs @@ -42,7 +42,6 @@ use store_api::storage::consts::{ }; use store_api::storage::{ColumnId, RegionId}; use table::metadata::TableId; -use table::table_name::TableName; use crate::error; use crate::prom_row_builder::{PromCtx, TableBuilder}; @@ -234,13 +233,13 @@ impl MetricsBatchBuilder { /// match the schema of physical tables. /// Note: /// Make sure all logical table and physical table are created when reach here and the mapping - /// from logical table name to physical table ref is stored in [physical_tables]. + /// from logical table name to physical table ref is stored in [physical_region_metadata]. pub(crate) async fn append_rows_to_batch( &mut self, current_catalog: Option<String>, current_schema: Option<String>, table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>, - physical_table_metadata: &HashMap, + physical_region_metadata: &HashMap>, ) -> error::Result<()> { for (ctx, tables_in_schema) in table_data { for (logical_table_name, table) in tables_in_schema { @@ -252,34 +251,26 @@ impl MetricsBatchBuilder { .as_deref() .or(current_schema.as_deref()) .unwrap_or(DEFAULT_SCHEMA_NAME); - let logical_table_ref = self - .schema_helper - .catalog_manager() - .table(&catalog, &schema, &logical_table_name, None) - .await - .context(error::CatalogSnafu)? + // Look up physical region metadata by schema and table name + let schema_metadata = physical_region_metadata.get(schema) .context(error::TableNotFoundSnafu { catalog, schema, table: logical_table_name, })?; - let logical_table = TableName::new(catalog, schema, logical_table_name.clone()); - let Some(physical_table) = physical_table_metadata.get(&logical_table) else { - // all physical tables must be created when reach here. 
- return error::TableNotFoundSnafu { + let (logical_table_id, physical_table) = schema_metadata.get(logical_table_name) + .context(error::TableNotFoundSnafu { catalog, schema, table: logical_table_name, - } - .fail(); - }; + })?; let encoder = self .builders .entry(physical_table.region_id.table_id()) .or_insert_with(|| Self::create_sparse_encoder(&physical_table)); encoder.append_rows( - logical_table_ref.table_info().table_id(), + *logical_table_id, std::mem::take(table), )?; } diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 6ba2b3fff3..6db3da31a7 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -97,21 +97,33 @@ impl TablesBuilder { /// Converts [TablesBuilder] to record batch and clears inner states. pub(crate) async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> { - let batch_builder = MetricsBatchBuilder::new( + let mut batch_builder = MetricsBatchBuilder::new( bulk_ctx.schema_helper.clone(), bulk_ctx.partition_manager.clone(), bulk_ctx.node_manager.clone(), ); - let tables = std::mem::take(&mut self.tables); + let mut tables = std::mem::take(&mut self.tables); batch_builder .create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx) .await?; + // Extract logical table names from tables for metadata collection + let current_schema = bulk_ctx.query_ctx.current_schema(); + let logical_tables: Vec<(String, String)> = tables + .iter() + .flat_map(|(ctx, table_map)| { + let schema = ctx.schema.as_deref().unwrap_or(&current_schema); + table_map + .keys() + .map(|table_name| (schema.to_string(), table_name.clone())) + }) + .collect(); + // Gather all region metadata for region 0 of physical tables. 
let physical_region_metadata = batch_builder - .collect_physical_region_metadata(&[], &bulk_ctx.query_ctx) - .await; + .collect_physical_region_metadata(&logical_tables, &bulk_ctx.query_ctx) + .await?; batch_builder .append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)