mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
refactor: update MetricsBatchBuilder to use physical region metadata
- Update append_rows_to_batch to accept physical_region_metadata parameter - Refactor metadata lookup to use HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>> - Fix collect_physical_region_metadata call in TablesBuilder to extract logical table info - Remove unused TableName import from batch_builder.rs Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -42,7 +42,6 @@ use store_api::storage::consts::{
|
||||
};
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
use table::metadata::TableId;
|
||||
use table::table_name::TableName;
|
||||
|
||||
use crate::error;
|
||||
use crate::prom_row_builder::{PromCtx, TableBuilder};
|
||||
@@ -234,13 +233,13 @@ impl MetricsBatchBuilder {
|
||||
/// match the schema of physical tables.
|
||||
/// Note:
|
||||
/// Make sure all logical table and physical table are created when reach here and the mapping
|
||||
/// from logical table name to physical table ref is stored in [physical_tables].
|
||||
/// from logical table name to physical table ref is stored in [physical_region_metadata].
|
||||
pub(crate) async fn append_rows_to_batch(
|
||||
&mut self,
|
||||
current_catalog: Option<String>,
|
||||
current_schema: Option<String>,
|
||||
table_data: &mut HashMap<PromCtx, HashMap<String, TableBuilder>>,
|
||||
physical_table_metadata: &HashMap<TableName, RegionMetadataRef>,
|
||||
physical_region_metadata: &HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>,
|
||||
) -> error::Result<()> {
|
||||
for (ctx, tables_in_schema) in table_data {
|
||||
for (logical_table_name, table) in tables_in_schema {
|
||||
@@ -252,34 +251,26 @@ impl MetricsBatchBuilder {
|
||||
.as_deref()
|
||||
.or(current_schema.as_deref())
|
||||
.unwrap_or(DEFAULT_SCHEMA_NAME);
|
||||
let logical_table_ref = self
|
||||
.schema_helper
|
||||
.catalog_manager()
|
||||
.table(&catalog, &schema, &logical_table_name, None)
|
||||
.await
|
||||
.context(error::CatalogSnafu)?
|
||||
// Look up physical region metadata by schema and table name
|
||||
let schema_metadata = physical_region_metadata.get(schema)
|
||||
.context(error::TableNotFoundSnafu {
|
||||
catalog,
|
||||
schema,
|
||||
table: logical_table_name,
|
||||
})?;
|
||||
let logical_table = TableName::new(catalog, schema, logical_table_name.clone());
|
||||
let Some(physical_table) = physical_table_metadata.get(&logical_table) else {
|
||||
// all physical tables must be created when reach here.
|
||||
return error::TableNotFoundSnafu {
|
||||
let (logical_table_id, physical_table) = schema_metadata.get(logical_table_name)
|
||||
.context(error::TableNotFoundSnafu {
|
||||
catalog,
|
||||
schema,
|
||||
table: logical_table_name,
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
})?;
|
||||
|
||||
let encoder = self
|
||||
.builders
|
||||
.entry(physical_table.region_id.table_id())
|
||||
.or_insert_with(|| Self::create_sparse_encoder(&physical_table));
|
||||
encoder.append_rows(
|
||||
logical_table_ref.table_info().table_id(),
|
||||
*logical_table_id,
|
||||
std::mem::take(table),
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -97,21 +97,33 @@ impl TablesBuilder {
|
||||
|
||||
/// Converts [TablesBuilder] to record batch and clears inner states.
|
||||
pub(crate) async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
|
||||
let batch_builder = MetricsBatchBuilder::new(
|
||||
let mut batch_builder = MetricsBatchBuilder::new(
|
||||
bulk_ctx.schema_helper.clone(),
|
||||
bulk_ctx.partition_manager.clone(),
|
||||
bulk_ctx.node_manager.clone(),
|
||||
);
|
||||
let tables = std::mem::take(&mut self.tables);
|
||||
let mut tables = std::mem::take(&mut self.tables);
|
||||
|
||||
batch_builder
|
||||
.create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx)
|
||||
.await?;
|
||||
|
||||
// Extract logical table names from tables for metadata collection
|
||||
let current_schema = bulk_ctx.query_ctx.current_schema();
|
||||
let logical_tables: Vec<(String, String)> = tables
|
||||
.iter()
|
||||
.flat_map(|(ctx, table_map)| {
|
||||
let schema = ctx.schema.as_deref().unwrap_or(¤t_schema);
|
||||
table_map
|
||||
.keys()
|
||||
.map(|table_name| (schema.to_string(), table_name.clone()))
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Gather all region metadata for region 0 of physical tables.
|
||||
let physical_region_metadata = batch_builder
|
||||
.collect_physical_region_metadata(&[], &bulk_ctx.query_ctx)
|
||||
.await;
|
||||
.collect_physical_region_metadata(&logical_tables, &bulk_ctx.query_ctx)
|
||||
.await?;
|
||||
|
||||
batch_builder
|
||||
.append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)
|
||||
|
||||
Reference in New Issue
Block a user