From 5da3f86d0c3da85c0895f4ca54860b3b7797a803 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 23 Jun 2025 12:08:26 +0000 Subject: [PATCH] feat/frontend-stager: ### Add `MetricsBatchBuilder` for Physical Table Management - **New Feature**: Introduced `MetricsBatchBuilder` in `batch_builder.rs` to manage the creation and alteration of physical tables based on logical table tags. - **Error Handling**: Added `CommonMeta` error variant in `error.rs` to handle errors from `common_meta`. - **Enhancements**: - Added `tags` method in `prom_row_builder.rs` to retrieve tag names from `TableBuilder`. - Implemented `primary_key_names` method in `metadata.rs` to return primary key names from `TableMeta`. Signed-off-by: Lei, HUANG --- src/servers/src/batch_builder.rs | 129 ++++++++++++++++++++++++++++ src/servers/src/error.rs | 8 ++ src/servers/src/lib.rs | 1 + src/servers/src/prom_row_builder.rs | 7 ++ src/table/src/metadata.rs | 8 ++ 5 files changed, 153 insertions(+) create mode 100644 src/servers/src/batch_builder.rs diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs new file mode 100644 index 0000000000..40092252e8 --- /dev/null +++ b/src/servers/src/batch_builder.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ahash::{HashMap, HashMapExt, HashSet}; +use api::v1::CreateTableExpr; +use catalog::CatalogManagerRef; +use common_meta::key::table_route::TableRouteManagerRef; +use common_query::prelude::GREPTIME_PHYSICAL_TABLE; +use snafu::ResultExt; +use table::metadata::{TableId, TableInfoRef}; +use table::TableRef; + +use crate::error; +use crate::prom_row_builder::{PromCtx, TableBuilder}; + +pub struct MetricsBatchBuilder { + table_route_manager: TableRouteManagerRef, + catalog_manager: CatalogManagerRef, +} + +impl MetricsBatchBuilder { + /// Detected the DDL requirements according to the staged table rows. + pub async fn create_or_alter_physical_tables( + &self, + tables: &HashMap>, + ) -> error::Result<()> { + // Physical table id -> all tags + let mut existing_tables: HashMap> = HashMap::new(); + // Physical table name to create -> all tags. + let mut tables_to_create: HashMap> = HashMap::new(); + + for (ctx, tables) in tables { + for (table_name, table_builder) in tables { + let physical_table = self + .determine_physical_table(table_name, &ctx.physical_table, "todo", "todo") + .await?; + if let Some(physical_table) = physical_table { + let tags_in_table = existing_tables + .entry(physical_table.table_info().ident.table_id) + .or_insert_with(|| { + physical_table + .table_info() + .meta + .primary_key_names() + .cloned() + .collect::>() + }); + tags_in_table.extend(table_builder.tags().cloned()); + } else { + // physical table not exist, build create expr according to logical table tags. + if let Some(tags) = tables_to_create.get_mut(table_name) { + tags.extend(table_builder.tags().cloned()); + } else { + // populate tags for table. + tables_to_create.insert( + table_name.to_string(), + table_builder.tags().cloned().collect(), + ); + } + } + } + } + todo!() + } + + /// Builds create table expr from provided tag set. + fn build_create_table_expr(tags: HashMap>) -> Vec { + todo!() + } + + /// Builds [AlterTableExpr] by finding new tags. + fn build_alter_table_expr(table: TableInfoRef, all_tags: HashMap>) { + // todo + // 1. Find new added tags according to `all_tags` and existing table schema + // 2. Build AlterTableExpr + } + + /// Finds physical table id for logical table. + async fn determine_physical_table( + &self, + logical_table_name: &str, + physical_table_name: &Option, + current_catalog: &str, + current_schema: &str, + ) -> error::Result> { + let logical_table = self + .catalog_manager + .table(current_catalog, current_schema, logical_table_name, None) + .await + .context(error::CatalogSnafu)?; + if let Some(logical_table) = logical_table { + // logical table already exist, just return the physical table + let logical_table_id = logical_table.table_info().table_id(); + let physical_table_id = self + .table_route_manager + .get_physical_table_id(logical_table_id) + .await + .context(error::CommonMetaSnafu)?; + let physical_table = self + .catalog_manager + .tables_by_ids(current_catalog, current_schema, &[physical_table_id]) + .await + .context(error::CatalogSnafu)? + .swap_remove(0); + return Ok(Some(physical_table)); + } + + // Logical table not exist, try assign logical table to a physical table. + let physical_table_name = physical_table_name + .as_deref() + .unwrap_or(GREPTIME_PHYSICAL_TABLE); + + self.catalog_manager + .table(current_catalog, current_schema, physical_table_name, None) + .await + .context(error::CatalogSnafu) + } +} diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index a756156d0b..26995da2e4 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -624,6 +624,13 @@ pub enum Error { #[snafu(display("Unknown hint: {}", hint))] UnknownHint { hint: String }, + + #[snafu(display("Failed to invoke common_meta"))] + CommonMeta { + source: common_meta::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -747,6 +754,7 @@ impl ErrorExt for Error { DurationOverflow { .. } => StatusCode::InvalidArguments, HandleOtelArrowRequest { .. } => StatusCode::Internal, + CommonMeta { source, .. } => source.status_code(), } } diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 699dea0f84..8b183e62ef 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -22,6 +22,7 @@ use datafusion_expr::LogicalPlan; use datatypes::schema::Schema; pub mod addrs; +mod batch_builder; pub mod configurator; pub(crate) mod elasticsearch; pub mod error; diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 95f9f4ee83..cd298d0582 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -210,6 +210,13 @@ impl TableBuilder { rows: Some(Rows { schema, rows }), } } + + pub(crate) fn tags(&self) -> impl Iterator { + self.schema + .iter() + .filter(|v| v.semantic_type == SemanticType::Tag as i32) + .map(|c| &c.column_name) + } } #[cfg(test)] diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 5f644cbc10..9ecbe969ad 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -213,6 +213,14 @@ impl TableMeta { .map(|(_, cs)| &cs.name) } + /// Returns names of primary keys. + pub fn primary_key_names(&self) -> impl Iterator { + let columns_schemas = self.schema.column_schemas(); + self.primary_key_indices + .iter() + .map(|pk_idx| &columns_schemas[*pk_idx].name) + } + /// Returns the new [TableMetaBuilder] after applying given `alter_kind`. /// /// The returned builder would derive the next column id of this meta.