diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs
index 40092252e8..ee03172025 100644
--- a/src/servers/src/batch_builder.rs
+++ b/src/servers/src/batch_builder.rs
@@ -15,15 +15,18 @@
 use ahash::{HashMap, HashMapExt, HashSet};
 use api::v1::CreateTableExpr;
 use catalog::CatalogManagerRef;
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_meta::key::table_route::TableRouteManagerRef;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
 use snafu::ResultExt;
 use table::metadata::{TableId, TableInfoRef};
+use table::table_name::TableName;
 use table::TableRef;
 
 use crate::error;
 use crate::prom_row_builder::{PromCtx, TableBuilder};
 
+#[allow(dead_code)]
 pub struct MetricsBatchBuilder {
     table_route_manager: TableRouteManagerRef,
     catalog_manager: CatalogManagerRef,
@@ -34,16 +37,34 @@ impl MetricsBatchBuilder {
     pub async fn create_or_alter_physical_tables(
         &self,
         tables: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
+        current_catalog: Option<String>,
+        current_schema: Option<String>,
     ) -> error::Result<()> {
         // Physical table id -> all tags
         let mut existing_tables: HashMap<TableId, HashSet<String>> = HashMap::new();
         // Physical table name to create -> all tags.
         let mut tables_to_create: HashMap<String, HashSet<String>> = HashMap::new();
+        // Logical table name -> physical table ref.
+        let mut physical_tables: HashMap<TableName, TableRef> = HashMap::new();
 
         for (ctx, tables) in tables {
-            for (table_name, table_builder) in tables {
+            for (logical_table_name, table_builder) in tables {
+                // Use the session catalog, falling back to the default catalog.
+                let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
+                // The schema in PromCtx takes precedence over the session schema.
+                let schema = ctx
+                    .schema
+                    .as_deref()
+                    .or(current_schema.as_deref())
+                    .unwrap_or(DEFAULT_SCHEMA_NAME);
+
                 let physical_table = self
-                    .determine_physical_table(table_name, &ctx.physical_table, "todo", "todo")
+                    .determine_physical_table(
+                        logical_table_name,
+                        &ctx.physical_table,
+                        catalog,
+                        schema,
+                    )
                     .await?;
                 if let Some(physical_table) = physical_table {
                     let tags_in_table = existing_tables
@@ -57,14 +78,18 @@ impl MetricsBatchBuilder {
                             .collect::<HashSet<_>>()
                         });
                     tags_in_table.extend(table_builder.tags().cloned());
+                    physical_tables.insert(
+                        TableName::new(catalog, schema, logical_table_name),
+                        physical_table,
+                    );
                 } else {
                     // physical table not exist, build create expr according to logical table tags.
-                    if let Some(tags) = tables_to_create.get_mut(table_name) {
+                    if let Some(tags) = tables_to_create.get_mut(logical_table_name) {
                         tags.extend(table_builder.tags().cloned());
                     } else {
                         // populate tags for table.
                         tables_to_create.insert(
-                            table_name.to_string(),
+                            logical_table_name.to_string(),
                             table_builder.tags().cloned().collect(),
                         );
                     }
@@ -72,15 +97,18 @@ impl MetricsBatchBuilder {
             }
         }
         todo!()
+        // Generate create/alter table requests and submit DDL procedures to ensure schema
+        // compatibility. Store the created table references into [physical_tables].
     }
 
-    /// Builds create table expr from provided tag set.
-    fn build_create_table_expr(tags: HashMap<String, HashSet<String>>) -> Vec<CreateTableExpr> {
+    /// Builds create table exprs from the provided tag sets. We must also add the timestamp and
+    /// field columns, because `tags` only contains the primary key columns.
+    fn build_create_table_expr(_tags: HashMap<String, HashSet<String>>) -> Vec<CreateTableExpr> {
         todo!()
     }
 
     /// Builds [AlterTableExpr] by finding new tags.
-    fn build_alter_table_expr(table: TableInfoRef, all_tags: HashMap<String, HashSet<String>>) {
+    fn build_alter_table_expr(_table: TableInfoRef, _all_tags: HashMap<String, HashSet<String>>) {
         // todo
         // 1. Find new added tags according to `all_tags` and existing table schema
         // 2. Build AlterTableExpr
@@ -91,12 +119,12 @@ impl MetricsBatchBuilder {
         &self,
         logical_table_name: &str,
         physical_table_name: &Option<String>,
-        current_catalog: &str,
-        current_schema: &str,
+        catalog: &str,
+        schema: &str,
     ) -> error::Result<Option<TableRef>> {
         let logical_table = self
             .catalog_manager
-            .table(current_catalog, current_schema, logical_table_name, None)
+            .table(catalog, schema, logical_table_name, None)
             .await
             .context(error::CatalogSnafu)?;
         if let Some(logical_table) = logical_table {
@@ -109,7 +137,7 @@ impl MetricsBatchBuilder {
                 .context(error::CommonMetaSnafu)?;
             let physical_table = self
                 .catalog_manager
-                .tables_by_ids(current_catalog, current_schema, &[physical_table_id])
+                .tables_by_ids(catalog, schema, &[physical_table_id])
                 .await
                 .context(error::CatalogSnafu)?
                 .swap_remove(0);
@@ -122,8 +150,47 @@ impl MetricsBatchBuilder {
             .unwrap_or(GREPTIME_PHYSICAL_TABLE);
 
         self.catalog_manager
-            .table(current_catalog, current_schema, physical_table_name, None)
+            .table(catalog, schema, physical_table_name, None)
             .await
             .context(error::CatalogSnafu)
     }
+
+    /// Builds [RecordBatch] from rows with the primary key encoded.
+    /// Potentially we also need to rename the timestamp and value field columns to
+    /// match the schema of the physical tables.
+    /// Note:
+    /// Make sure all logical and physical tables have been created before reaching here, and
+    /// that the mapping from logical table name to physical table ref is stored in [physical_tables].
+    fn rows_to_batch(
+        &self,
+        current_catalog: Option<String>,
+        current_schema: Option<String>,
+        table_data: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
+        physical_tables: &HashMap<TableName, TableRef>,
+    ) -> error::Result<()> {
+        for (ctx, tables_in_schema) in table_data {
+            for (logical_table_name, _table) in tables_in_schema {
+                // Use the session catalog, falling back to the default catalog.
+                let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
+                // The schema in PromCtx takes precedence over the session schema.
+                let schema = ctx
+                    .schema
+                    .as_deref()
+                    .or(current_schema.as_deref())
+                    .unwrap_or(DEFAULT_SCHEMA_NAME);
+                let logical_table = TableName::new(catalog, schema, logical_table_name);
+                let Some(_physical_table) = physical_tables.get(&logical_table) else {
+                    // All physical tables must have been created by the time we reach here.
+                    return error::TableNotFoundSnafu {
+                        catalog,
+                        schema,
+                        table: logical_table_name,
+                    }
+                    .fail();
+                };
+            }
+        }
+
+        todo!()
+    }
 }
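
The catalog/schema resolution now appears verbatim in both `create_or_alter_physical_tables` and `rows_to_batch`. A minimal sketch of a shared helper that makes the precedence explicit; `resolve_catalog_schema` is a hypothetical name, not part of this patch:

```rust
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

use crate::prom_row_builder::PromCtx;

// Hypothetical helper: factors out the duplicated resolution logic.
// Precedence: PromCtx schema > session schema > DEFAULT_SCHEMA_NAME,
// and session catalog > DEFAULT_CATALOG_NAME.
fn resolve_catalog_schema<'a>(
    ctx: &'a PromCtx,
    current_catalog: Option<&'a str>,
    current_schema: Option<&'a str>,
) -> (&'a str, &'a str) {
    let catalog = current_catalog.unwrap_or(DEFAULT_CATALOG_NAME);
    let schema = ctx
        .schema
        .as_deref()
        .or(current_schema)
        .unwrap_or(DEFAULT_SCHEMA_NAME);
    (catalog, schema)
}
```

Both call sites would then reduce to `let (catalog, schema) = resolve_catalog_schema(ctx, current_catalog.as_deref(), current_schema.as_deref());`.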
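
For the `build_create_table_expr` todo, a hedged sketch of what filling it in could look like. The column names `greptime_timestamp`/`greptime_value` and the `"metric"` engine name are assumptions, and the exact proto field set should be checked against `api::v1`:

```rust
use ahash::{HashMap, HashSet};
use api::v1::{ColumnDataType, ColumnDef, CreateTableExpr, SemanticType};

// Sketch only: builds one CreateTableExpr per pending table, with all tags as
// primary-key columns plus the timestamp and value columns that `tags` lacks.
fn build_create_table_expr_sketch(
    catalog: &str,
    schema: &str,
    tables_to_create: HashMap<String, HashSet<String>>,
) -> Vec<CreateTableExpr> {
    tables_to_create
        .into_iter()
        .map(|(table_name, tags)| {
            // Tag columns first: `tags` only carries the primary key columns.
            let mut column_defs: Vec<ColumnDef> = tags
                .iter()
                .map(|tag| ColumnDef {
                    name: tag.clone(),
                    data_type: ColumnDataType::String as i32,
                    is_nullable: true,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                })
                .collect();
            // The timestamp and value columns are not in `tags`; add them explicitly.
            column_defs.push(ColumnDef {
                name: "greptime_timestamp".to_string(), // assumed column name
                data_type: ColumnDataType::TimestampMillisecond as i32,
                is_nullable: false,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            });
            column_defs.push(ColumnDef {
                name: "greptime_value".to_string(), // assumed column name
                data_type: ColumnDataType::Float64 as i32,
                is_nullable: true,
                semantic_type: SemanticType::Field as i32,
                ..Default::default()
            });
            CreateTableExpr {
                catalog_name: catalog.to_string(),
                schema_name: schema.to_string(),
                table_name,
                column_defs,
                time_index: "greptime_timestamp".to_string(),
                // HashSet order is arbitrary; sort for a deterministic primary key.
                primary_keys: {
                    let mut keys: Vec<String> = tags.into_iter().collect();
                    keys.sort();
                    keys
                },
                create_if_not_exists: true,
                engine: "metric".to_string(), // assumed engine name
                ..Default::default()
            }
        })
        .collect()
}
```

Sorting the primary key makes repeated creates of the same metric produce identical schemas regardless of tag arrival order.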
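
Likewise for the two todo steps in `build_alter_table_expr`, a sketch simplified to a single table's tag set (the patch keys `_all_tags` by table name); the `AddColumn`/`AddColumns` field names are assumptions from the proto and may need adjusting:

```rust
use ahash::HashSet;
use api::v1::alter_table_expr::Kind;
use api::v1::{AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef, SemanticType};
use table::metadata::TableInfoRef;

// Sketch only: diff `all_tags` against the existing schema, then emit an
// AlterTableExpr that adds the missing tags as nullable string columns.
fn build_alter_table_expr_sketch(
    table: &TableInfoRef,
    all_tags: &HashSet<String>,
) -> Option<AlterTableExpr> {
    let schema = &table.meta.schema;
    // 1. Find newly added tags: in `all_tags` but absent from the table schema.
    let new_tags: Vec<String> = all_tags
        .iter()
        .filter(|tag| schema.column_schema_by_name(tag.as_str()).is_none())
        .cloned()
        .collect();
    if new_tags.is_empty() {
        // The schema already covers every tag; no ALTER needed.
        return None;
    }
    // 2. Build the AlterTableExpr adding one column per new tag.
    let add_columns = new_tags
        .into_iter()
        .map(|tag| AddColumn {
            column_def: Some(ColumnDef {
                name: tag,
                data_type: ColumnDataType::String as i32,
                is_nullable: true,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            }),
            ..Default::default()
        })
        .collect();
    Some(AlterTableExpr {
        catalog_name: table.catalog_name.clone(),
        schema_name: table.schema_name.clone(),
        table_name: table.name.clone(),
        kind: Some(Kind::AddColumns(AddColumns { add_columns })),
        ..Default::default()
    })
}
```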