From 24da3367c10e9f7b52ff1ab95cb86a428e4f284b Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 24 Jun 2025 07:26:41 +0000 Subject: [PATCH] poc/create-alter-for-metrics: - **Add `operator` dependency**: Updated `Cargo.lock` and `Cargo.toml` to include the `operator` dependency. - **Expose structs and functions in `schema_helper.rs`**: Made `LogicalSchema`, `LogicalSchemas`, and `ensure_logical_tables_for_metrics` public in `src/operator/src/schema_helper.rs`. - **Refactor `batch_builder.rs`**: - Changed the logic for handling physical and logical tables, including the introduction of `tags_to_logical_schemas` function. - Modified `determine_physical_table_name` to return a string instead of a table reference. - Updated logic for managing tags and logical schemas in `MetricsBatchBuilder`. Signed-off-by: Lei, HUANG --- Cargo.lock | 1 + src/operator/src/schema_helper.rs | 14 ++-- src/servers/Cargo.toml | 1 + src/servers/src/batch_builder.rs | 125 +++++++++++++++--------------- 4 files changed, 72 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3361f2568d..8381d83409 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11280,6 +11280,7 @@ dependencies = [ "openmetrics-parser", "opensrv-mysql", "opentelemetry-proto 0.27.0", + "operator", "otel-arrow-rust", "parking_lot 0.12.3", "permutation", diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs index fddee7347e..599e3b1c3e 100644 --- a/src/operator/src/schema_helper.rs +++ b/src/operator/src/schema_helper.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use api::v1::alter_table_expr::Kind; use api::v1::region::region_request::Body; -use api::v1::region::{ListMetadataRequest, RegionRequest, RegionRequestHeader}; +use api::v1::region::{ListMetadataRequest, RegionRequestHeader}; use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType}; use catalog::CatalogManagerRef; use common_catalog::consts::{ @@ -611,22 +611,22 @@ impl SchemaHelper { } /// Schema of a logical table. -struct LogicalSchema { +pub struct LogicalSchema { /// Name of the logical table. - name: String, + pub name: String, /// Schema of columns in the logical table. - columns: Vec, + pub columns: Vec, } /// Logical table schemas. -struct LogicalSchemas { +pub struct LogicalSchemas { /// Logical table schemas group by physical table name. - schemas: HashMap>, + pub schemas: HashMap>, } /// Creates or alters logical tables to match the provided schemas /// for prometheus metrics. -async fn ensure_logical_tables_for_metrics( +pub async fn ensure_logical_tables_for_metrics( helper: &SchemaHelper, schemas: &LogicalSchemas, query_ctx: &QueryContextRef, diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index bc64e19485..0a6cb2f756 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -78,6 +78,7 @@ mime_guess = "2.0" notify.workspace = true object-pool = "0.5" once_cell.workspace = true +operator.workspace = true openmetrics-parser = "0.4" simd-json.workspace = true socket2 = "0.5" diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs index ee03172025..86dd11bc31 100644 --- a/src/servers/src/batch_builder.rs +++ b/src/servers/src/batch_builder.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::{HashMap, HashMapExt, HashSet}; -use api::v1::CreateTableExpr; +use std::collections::{HashMap, HashSet}; + +use api::v1::{ColumnDataType, ColumnSchema, SemanticType}; use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::key::table_route::TableRouteManagerRef; -use common_query::prelude::GREPTIME_PHYSICAL_TABLE; +use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE}; +use operator::schema_helper::{LogicalSchema, LogicalSchemas}; use snafu::ResultExt; -use table::metadata::{TableId, TableInfoRef}; use table::table_name::TableName; use table::TableRef; @@ -40,12 +41,8 @@ impl MetricsBatchBuilder { current_catalog: Option, current_schema: Option, ) -> error::Result<()> { - // Physical table id -> all tags - let mut existing_tables: HashMap> = HashMap::new(); - // Physical table name to create -> all tags. - let mut tables_to_create: HashMap> = HashMap::new(); - // Logical table name -> physical table ref. - let mut physical_tables: HashMap = HashMap::new(); + // Physical table name -> logical tables -> tags in logical table + let mut tags: HashMap>> = HashMap::new(); for (ctx, tables) in tables { for (logical_table_name, table_builder) in tables { @@ -58,70 +55,35 @@ impl MetricsBatchBuilder { .or(current_schema.as_deref()) .unwrap_or(DEFAULT_SCHEMA_NAME); - let physical_table = self - .determine_physical_table( + let physical_table_name = self + .determine_physical_table_name( logical_table_name, &ctx.physical_table, catalog, schema, ) .await?; - if let Some(physical_table) = physical_table { - let tags_in_table = existing_tables - .entry(physical_table.table_info().ident.table_id) - .or_insert_with(|| { - physical_table - .table_info() - .meta - .primary_key_names() - .cloned() - .collect::>() - }); - tags_in_table.extend(table_builder.tags().cloned()); - physical_tables.insert( - TableName::new(catalog, schema, logical_table_name), - physical_table, - ); - } else { - // physical table not exist, build create expr according to logical table tags. - if let Some(tags) = tables_to_create.get_mut(logical_table_name) { - tags.extend(table_builder.tags().cloned()); - } else { - // populate tags for table. - tables_to_create.insert( - logical_table_name.to_string(), - table_builder.tags().cloned().collect(), - ); - } - } + tags.entry(physical_table_name) + .or_default() + .entry(logical_table_name.clone()) + .or_default() + .extend(table_builder.tags().cloned()); } } - todo!() - // Generate create table and alter table requests and submit DDL procedure to ensure - // schema compatibility. Store the created table reference into [physical_tables] - } + let _logical_schemas = tags_to_logical_schemas(tags); - /// Builds create table expr from provided tag set. We should also add the timestamp and field - /// columns because `tags` only contains primary key. - fn build_create_table_expr(_tags: HashMap>) -> Vec { todo!() - } - - /// Builds [AlterTableExpr] by finding new tags. - fn build_alter_table_expr(_table: TableInfoRef, _all_tags: HashMap>) { - // todo - // 1. Find new added tags according to `all_tags` and existing table schema - // 2. Build AlterTableExpr + // Call [ensure_logical_tables_for_metrics] } /// Finds physical table id for logical table. - async fn determine_physical_table( + async fn determine_physical_table_name( &self, logical_table_name: &str, physical_table_name: &Option, catalog: &str, schema: &str, - ) -> error::Result> { + ) -> error::Result { let logical_table = self .catalog_manager .table(catalog, schema, logical_table_name, None) @@ -141,18 +103,14 @@ impl MetricsBatchBuilder { .await .context(error::CatalogSnafu)? .swap_remove(0); - return Ok(Some(physical_table)); + return Ok(physical_table.table_info().name.clone()); } // Logical table not exist, try assign logical table to a physical table. let physical_table_name = physical_table_name .as_deref() .unwrap_or(GREPTIME_PHYSICAL_TABLE); - - self.catalog_manager - .table(catalog, schema, physical_table_name, None) - .await - .context(error::CatalogSnafu) + Ok(physical_table_name.to_string()) } /// Builds [RecordBatch] from rows with primary key encoded. @@ -194,3 +152,46 @@ impl MetricsBatchBuilder { todo!() } } + +fn tags_to_logical_schemas( + tags: HashMap>>, +) -> LogicalSchemas { + let schemas: HashMap> = tags + .into_iter() + .map(|(physical, logical_tables)| { + let schemas: Vec<_> = logical_tables + .into_iter() + .map(|(logical, tags)| { + let mut columns: Vec<_> = tags + .into_iter() + .map(|tag_name| ColumnSchema { + column_name: tag_name, + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }) + .collect(); + columns.push(ColumnSchema { + column_name: GREPTIME_TIMESTAMP.to_string(), + datatype: ColumnDataType::TimestampNanosecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }); + columns.push(ColumnSchema { + column_name: GREPTIME_VALUE.to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }); + LogicalSchema { + name: logical, + columns, + } + }) + .collect(); + (physical, schemas) + }) + .collect(); + + LogicalSchemas { schemas } +}