Branch: feat/frontend-stager

### Commit Summary

 - **Enhancements in `MetricsBatchBuilder`:**
   - Added support for resolving the session catalog and schema in `create_or_alter_physical_tables` (resolution order sketched below).
   - Introduced a `physical_tables` mapping from logical table names to physical table references.
   - Implemented `rows_to_batch` to build a `RecordBatch` from rows with the primary key encoded.
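
The resolution order introduced for `create_or_alter_physical_tables` is: the schema carried in `PromCtx` first, then the session schema, then the default; the catalog falls back from the session catalog to the default. A minimal, self-contained sketch of that precedence, using literal constants and a stripped-down `PromCtx` as stand-ins for `common_catalog::consts` and the real context type:

```rust
// Stand-ins for illustration only; the real values come from common_catalog::consts.
const DEFAULT_CATALOG_NAME: &str = "greptime";
const DEFAULT_SCHEMA_NAME: &str = "public";

/// Stripped-down stand-in for crate::prom_row_builder::PromCtx.
struct PromCtx {
    schema: Option<String>,
}

/// Resolve (catalog, schema): PromCtx schema > session schema > default.
fn resolve_catalog_schema<'a>(
    ctx: &'a PromCtx,
    current_catalog: Option<&'a str>,
    current_schema: Option<&'a str>,
) -> (&'a str, &'a str) {
    let catalog = current_catalog.unwrap_or(DEFAULT_CATALOG_NAME);
    let schema = ctx
        .schema
        .as_deref()
        .or(current_schema)
        .unwrap_or(DEFAULT_SCHEMA_NAME);
    (catalog, schema)
}

fn main() {
    let ctx = PromCtx {
        schema: Some("metrics".to_string()),
    };
    // The schema from PromCtx wins over the session schema.
    assert_eq!(
        resolve_catalog_schema(&ctx, None, Some("session_schema")),
        ("greptime", "metrics")
    );
}
```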

 - **Function Modifications:**
   - Updated `determine_physical_table` to use the session catalog and schema.
   - Modified `build_create_table_expr` and `build_alter_table_expr` to include additional parameters for schema compatibility (the tag-diffing step for alters is sketched below).
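
`build_alter_table_expr` is still a stub, but its todo comments in the diff spell out the intent: find tags in `all_tags` that the physical table does not yet have as columns, and build an `AlterTableExpr` for just those. A minimal sketch of that tag-diffing step, with plain string sets standing in for `TableInfoRef` and the real expression type (the helper name and the keying of `all_tags` by logical table name are assumptions for illustration):

```rust
use std::collections::{HashMap, HashSet};

/// Given tag names collected per logical table and the column names that
/// already exist on the physical table, return the tags that would need
/// ADD COLUMN clauses. Illustrative only; the real code inspects a
/// TableInfoRef and builds an AlterTableExpr instead of returning strings.
fn new_tags_to_add(
    all_tags: &HashMap<String, HashSet<String>>,
    existing_columns: &HashSet<String>,
) -> HashSet<String> {
    all_tags
        .values()
        .flatten()
        .filter(|tag| !existing_columns.contains(*tag))
        .cloned()
        .collect()
}

fn main() {
    let mut all_tags = HashMap::new();
    all_tags.insert(
        "http_requests_total".to_string(),
        HashSet::from(["job".to_string(), "instance".to_string(), "region".to_string()]),
    );
    let existing = HashSet::from(["job".to_string(), "instance".to_string()]);
    // Only "region" is new, so only it needs an ALTER TABLE ADD COLUMN.
    assert_eq!(
        new_tags_to_add(&all_tags, &existing),
        HashSet::from(["region".to_string()])
    );
}
```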

 - **Error Handling:**
   - Added error handling in `rows_to_batch` for logical tables whose physical table is missing (the error-construction pattern is sketched below).
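
In the diff, the missing-table case is reported through a snafu context selector (`error::TableNotFoundSnafu { .. }.fail()`). A minimal, standalone sketch of that pattern; the error enum and lookup helper here are illustrative, not the crate's actual `error` module:

```rust
use std::collections::HashMap;

use snafu::Snafu;

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Table not found: {catalog}.{schema}.{table}"))]
    TableNotFound {
        catalog: String,
        schema: String,
        table: String,
    },
}

/// Look up the physical table mapped to a logical table. Every logical table
/// is expected to have been created beforehand, so a miss is a hard error.
fn physical_table_of<'a>(
    physical_tables: &'a HashMap<(String, String, String), String>,
    catalog: &str,
    schema: &str,
    table: &str,
) -> Result<&'a str, Error> {
    let key = (catalog.to_string(), schema.to_string(), table.to_string());
    match physical_tables.get(&key) {
        Some(physical) => Ok(physical.as_str()),
        // Same shape as `error::TableNotFoundSnafu { .. }.fail()` in the diff.
        None => TableNotFoundSnafu {
            catalog,
            schema,
            table,
        }
        .fail(),
    }
}
```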

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Author: Lei, HUANG
Date: 2025-06-23 12:50:00 +00:00
Parent: 5da3f86d0c
Commit: 80b14965a6


@@ -15,15 +15,18 @@
 use ahash::{HashMap, HashMapExt, HashSet};
 use api::v1::CreateTableExpr;
 use catalog::CatalogManagerRef;
+use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_meta::key::table_route::TableRouteManagerRef;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
 use snafu::ResultExt;
 use table::metadata::{TableId, TableInfoRef};
+use table::table_name::TableName;
 use table::TableRef;
 use crate::error;
 use crate::prom_row_builder::{PromCtx, TableBuilder};
 #[allow(dead_code)]
 pub struct MetricsBatchBuilder {
     table_route_manager: TableRouteManagerRef,
     catalog_manager: CatalogManagerRef,
@@ -34,16 +37,34 @@ impl MetricsBatchBuilder {
     pub async fn create_or_alter_physical_tables(
         &self,
         tables: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
+        current_catalog: Option<String>,
+        current_schema: Option<String>,
     ) -> error::Result<()> {
         // Physical table id -> all tags
         let mut existing_tables: HashMap<TableId, HashSet<_>> = HashMap::new();
         // Physical table name to create -> all tags.
         let mut tables_to_create: HashMap<String, HashSet<String>> = HashMap::new();
+        // Logical table name -> physical table ref.
+        let mut physical_tables: HashMap<TableName, TableRef> = HashMap::new();
         for (ctx, tables) in tables {
-            for (table_name, table_builder) in tables {
+            for (logical_table_name, table_builder) in tables {
+                // use session catalog.
+                let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
+                // schema in PromCtx precedes session schema.
+                let schema = ctx
+                    .schema
+                    .as_deref()
+                    .or(current_schema.as_deref())
+                    .unwrap_or(DEFAULT_SCHEMA_NAME);
                 let physical_table = self
-                    .determine_physical_table(table_name, &ctx.physical_table, "todo", "todo")
+                    .determine_physical_table(
+                        logical_table_name,
+                        &ctx.physical_table,
+                        catalog,
+                        schema,
+                    )
                     .await?;
                 if let Some(physical_table) = physical_table {
                     let tags_in_table = existing_tables
@@ -57,14 +78,18 @@ impl MetricsBatchBuilder {
                                 .collect::<HashSet<_>>()
                         });
                     tags_in_table.extend(table_builder.tags().cloned());
+                    physical_tables.insert(
+                        TableName::new(catalog, schema, logical_table_name),
+                        physical_table,
+                    );
                 } else {
                     // physical table not exist, build create expr according to logical table tags.
-                    if let Some(tags) = tables_to_create.get_mut(table_name) {
+                    if let Some(tags) = tables_to_create.get_mut(logical_table_name) {
                         tags.extend(table_builder.tags().cloned());
                     } else {
                         // populate tags for table.
                         tables_to_create.insert(
-                            table_name.to_string(),
+                            logical_table_name.to_string(),
                             table_builder.tags().cloned().collect(),
                         );
                     }
@@ -72,15 +97,18 @@ impl MetricsBatchBuilder {
                 }
             }
         }
         todo!()
+        // Generate create table and alter table requests and submit DDL procedure to ensure
+        // schema compatibility. Store the created table reference into [physical_tables]
     }
-    /// Builds create table expr from provided tag set.
-    fn build_create_table_expr(tags: HashMap<String, HashSet<String>>) -> Vec<CreateTableExpr> {
+    /// Builds create table expr from provided tag set. We should also add the timestamp and field
+    /// columns because `tags` only contains primary key.
+    fn build_create_table_expr(_tags: HashMap<String, HashSet<String>>) -> Vec<CreateTableExpr> {
         todo!()
     }
     /// Builds [AlterTableExpr] by finding new tags.
-    fn build_alter_table_expr(table: TableInfoRef, all_tags: HashMap<String, HashSet<String>>) {
+    fn build_alter_table_expr(_table: TableInfoRef, _all_tags: HashMap<String, HashSet<String>>) {
         // todo
         // 1. Find new added tags according to `all_tags` and existing table schema
         // 2. Build AlterTableExpr
@@ -91,12 +119,12 @@ impl MetricsBatchBuilder {
         &self,
         logical_table_name: &str,
         physical_table_name: &Option<String>,
-        current_catalog: &str,
-        current_schema: &str,
+        catalog: &str,
+        schema: &str,
     ) -> error::Result<Option<TableRef>> {
         let logical_table = self
             .catalog_manager
-            .table(current_catalog, current_schema, logical_table_name, None)
+            .table(catalog, schema, logical_table_name, None)
             .await
             .context(error::CatalogSnafu)?;
         if let Some(logical_table) = logical_table {
@@ -109,7 +137,7 @@ impl MetricsBatchBuilder {
                 .context(error::CommonMetaSnafu)?;
             let physical_table = self
                 .catalog_manager
-                .tables_by_ids(current_catalog, current_schema, &[physical_table_id])
+                .tables_by_ids(catalog, schema, &[physical_table_id])
                 .await
                 .context(error::CatalogSnafu)?
                 .swap_remove(0);
@@ -122,8 +150,47 @@ impl MetricsBatchBuilder {
             .unwrap_or(GREPTIME_PHYSICAL_TABLE);
         self.catalog_manager
-            .table(current_catalog, current_schema, physical_table_name, None)
+            .table(catalog, schema, physical_table_name, None)
             .await
             .context(error::CatalogSnafu)
     }
+    /// Builds [RecordBatch] from rows with primary key encoded.
+    /// Potentially we also need to modify the column name of timestamp and value field to
+    /// match the schema of physical tables.
+    /// Note:
+    /// Make sure all logical table and physical table are created when reach here and the mapping
+    /// from logical table name to physical table ref is stored in [physical_tables].
+    fn rows_to_batch(
+        &self,
+        current_catalog: Option<String>,
+        current_schema: Option<String>,
+        table_data: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
+        physical_tables: &HashMap<TableName, TableRef>,
+    ) -> error::Result<()> {
+        for (ctx, tables_in_schema) in table_data {
+            for (logical_table_name, _table) in tables_in_schema {
+                // use session catalog.
+                let catalog = current_catalog.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
+                // schema in PromCtx precedes session schema.
+                let schema = ctx
+                    .schema
+                    .as_deref()
+                    .or(current_schema.as_deref())
+                    .unwrap_or(DEFAULT_SCHEMA_NAME);
+                let logical_table = TableName::new(catalog, schema, logical_table_name);
+                let Some(_physical_table) = physical_tables.get(&logical_table) else {
+                    // all physical tables must be created when reach here.
+                    return error::TableNotFoundSnafu {
+                        catalog,
+                        schema,
+                        table: logical_table_name,
+                    }
+                    .fail();
+                };
+            }
+        }
+        todo!()
+    }
 }
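
`rows_to_batch` currently stops after the physical-table lookup and ends in `todo!()`. For a sense of where it is headed, here is a minimal, self-contained sketch of packing rows into a `RecordBatch` with all tags folded into one primary-key column. It uses plain arrow-rs rather than GreptimeDB's own `datatypes` types, and the key encoding, column names, and `Row` struct are illustrative assumptions, not the metric engine's actual format:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BinaryArray, Float64Array, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// One Prometheus-style sample; tags are assumed to be pre-sorted by name.
struct Row {
    tags: Vec<(String, String)>,
    timestamp_ms: i64,
    value: f64,
}

fn rows_to_batch(rows: &[Row]) -> Result<RecordBatch, ArrowError> {
    // Encode the "primary key" by concatenating sorted tag pairs; purely
    // illustrative, not the real row-key encoding.
    let keys: Vec<Vec<u8>> = rows
        .iter()
        .map(|r| {
            r.tags
                .iter()
                .flat_map(|(k, v)| format!("{k}={v},").into_bytes())
                .collect()
        })
        .collect();

    let pk: ArrayRef = Arc::new(BinaryArray::from_iter_values(
        keys.iter().map(|k| k.as_slice()),
    ));
    let ts: ArrayRef = Arc::new(TimestampMillisecondArray::from_iter_values(
        rows.iter().map(|r| r.timestamp_ms),
    ));
    let val: ArrayRef = Arc::new(Float64Array::from_iter_values(rows.iter().map(|r| r.value)));

    let schema = Arc::new(Schema::new(vec![
        Field::new("__primary_key", DataType::Binary, false),
        Field::new("greptime_timestamp", DataType::Timestamp(TimeUnit::Millisecond, None), false),
        Field::new("greptime_value", DataType::Float64, false),
    ]));
    RecordBatch::try_new(schema, vec![pk, ts, val])
}
```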