poc/create-alter-for-metrics:

- **Add `operator` dependency**: Updated `Cargo.lock` and `Cargo.toml` to include the `operator` dependency.
- **Expose structs and functions in `schema_helper.rs`**: Made `LogicalSchema`, `LogicalSchemas`, and `ensure_logical_tables_for_metrics` public in `src/operator/src/schema_helper.rs`.
- **Refactor `batch_builder.rs`**:
   - Changed the logic for handling physical and logical tables, including the introduction of `tags_to_logical_schemas` function.
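   - Renamed `determine_physical_table` to `determine_physical_table_name` and changed it to return the physical table name as a `String` instead of an optional table reference (`Option<TableRef>`).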
   - Updated logic for managing tags and logical schemas in `MetricsBatchBuilder`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2025-06-24 07:26:41 +00:00
parent 80b14965a6
commit 24da3367c1
4 changed files with 72 additions and 69 deletions

1
Cargo.lock generated
View File

@@ -11280,6 +11280,7 @@ dependencies = [
"openmetrics-parser",
"opensrv-mysql",
"opentelemetry-proto 0.27.0",
"operator",
"otel-arrow-rust",
"parking_lot 0.12.3",
"permutation",

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{ListMetadataRequest, RegionRequest, RegionRequestHeader};
use api::v1::region::{ListMetadataRequest, RegionRequestHeader};
use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
use catalog::CatalogManagerRef;
use common_catalog::consts::{
@@ -611,22 +611,22 @@ impl SchemaHelper {
}
/// Schema of a logical table.
struct LogicalSchema {
pub struct LogicalSchema {
/// Name of the logical table.
name: String,
pub name: String,
/// Schema of columns in the logical table.
columns: Vec<ColumnSchema>,
pub columns: Vec<ColumnSchema>,
}
/// Logical table schemas.
struct LogicalSchemas {
pub struct LogicalSchemas {
/// Logical table schemas group by physical table name.
schemas: HashMap<String, Vec<LogicalSchema>>,
pub schemas: HashMap<String, Vec<LogicalSchema>>,
}
/// Creates or alters logical tables to match the provided schemas
/// for prometheus metrics.
async fn ensure_logical_tables_for_metrics(
pub async fn ensure_logical_tables_for_metrics(
helper: &SchemaHelper,
schemas: &LogicalSchemas,
query_ctx: &QueryContextRef,

View File

@@ -78,6 +78,7 @@ mime_guess = "2.0"
notify.workspace = true
object-pool = "0.5"
once_cell.workspace = true
operator.workspace = true
openmetrics-parser = "0.4"
simd-json.workspace = true
socket2 = "0.5"

View File

@@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::{HashMap, HashMapExt, HashSet};
use api::v1::CreateTableExpr;
use std::collections::{HashMap, HashSet};
use api::v1::{ColumnDataType, ColumnSchema, SemanticType};
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::table_route::TableRouteManagerRef;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use operator::schema_helper::{LogicalSchema, LogicalSchemas};
use snafu::ResultExt;
use table::metadata::{TableId, TableInfoRef};
use table::table_name::TableName;
use table::TableRef;
@@ -40,12 +41,8 @@ impl MetricsBatchBuilder {
current_catalog: Option<String>,
current_schema: Option<String>,
) -> error::Result<()> {
// Physical table id -> all tags
let mut existing_tables: HashMap<TableId, HashSet<_>> = HashMap::new();
// Physical table name to create -> all tags.
let mut tables_to_create: HashMap<String, HashSet<String>> = HashMap::new();
// Logical table name -> physical table ref.
let mut physical_tables: HashMap<TableName, TableRef> = HashMap::new();
// Physical table name -> logical tables -> tags in logical table
let mut tags: HashMap<String, HashMap<String, HashSet<String>>> = HashMap::new();
for (ctx, tables) in tables {
for (logical_table_name, table_builder) in tables {
@@ -58,70 +55,35 @@ impl MetricsBatchBuilder {
.or(current_schema.as_deref())
.unwrap_or(DEFAULT_SCHEMA_NAME);
let physical_table = self
.determine_physical_table(
let physical_table_name = self
.determine_physical_table_name(
logical_table_name,
&ctx.physical_table,
catalog,
schema,
)
.await?;
if let Some(physical_table) = physical_table {
let tags_in_table = existing_tables
.entry(physical_table.table_info().ident.table_id)
.or_insert_with(|| {
physical_table
.table_info()
.meta
.primary_key_names()
.cloned()
.collect::<HashSet<_>>()
});
tags_in_table.extend(table_builder.tags().cloned());
physical_tables.insert(
TableName::new(catalog, schema, logical_table_name),
physical_table,
);
} else {
// physical table not exist, build create expr according to logical table tags.
if let Some(tags) = tables_to_create.get_mut(logical_table_name) {
tags.extend(table_builder.tags().cloned());
} else {
// populate tags for table.
tables_to_create.insert(
logical_table_name.to_string(),
table_builder.tags().cloned().collect(),
);
}
}
tags.entry(physical_table_name)
.or_default()
.entry(logical_table_name.clone())
.or_default()
.extend(table_builder.tags().cloned());
}
}
todo!()
// Generate create table and alter table requests and submit DDL procedure to ensure
// schema compatibility. Store the created table reference into [physical_tables]
}
let _logical_schemas = tags_to_logical_schemas(tags);
/// Builds create table expr from provided tag set. We should also add the timestamp and field
/// columns because `tags` only contains primary key.
fn build_create_table_expr(_tags: HashMap<String, HashSet<String>>) -> Vec<CreateTableExpr> {
todo!()
}
/// Builds [AlterTableExpr] by finding new tags.
fn build_alter_table_expr(_table: TableInfoRef, _all_tags: HashMap<String, HashSet<String>>) {
// todo
// 1. Find new added tags according to `all_tags` and existing table schema
// 2. Build AlterTableExpr
// Call [ensure_logical_tables_for_metrics]
}
/// Finds physical table id for logical table.
async fn determine_physical_table(
async fn determine_physical_table_name(
&self,
logical_table_name: &str,
physical_table_name: &Option<String>,
catalog: &str,
schema: &str,
) -> error::Result<Option<TableRef>> {
) -> error::Result<String> {
let logical_table = self
.catalog_manager
.table(catalog, schema, logical_table_name, None)
@@ -141,18 +103,14 @@ impl MetricsBatchBuilder {
.await
.context(error::CatalogSnafu)?
.swap_remove(0);
return Ok(Some(physical_table));
return Ok(physical_table.table_info().name.clone());
}
// Logical table not exist, try assign logical table to a physical table.
let physical_table_name = physical_table_name
.as_deref()
.unwrap_or(GREPTIME_PHYSICAL_TABLE);
self.catalog_manager
.table(catalog, schema, physical_table_name, None)
.await
.context(error::CatalogSnafu)
Ok(physical_table_name.to_string())
}
/// Builds [RecordBatch] from rows with primary key encoded.
@@ -194,3 +152,46 @@ impl MetricsBatchBuilder {
todo!()
}
}
/// Converts the collected tag sets into [`LogicalSchemas`].
///
/// `tags` maps physical table name -> logical table name -> tag column names
/// of that logical table. For every logical table a [`LogicalSchema`] is built
/// that contains:
///
/// 1. one `String` column with [`SemanticType::Tag`] per tag name,
/// 2. the `GREPTIME_TIMESTAMP` column with [`SemanticType::Timestamp`],
/// 3. the `GREPTIME_VALUE` `Float64` column with [`SemanticType::Field`].
///
/// The resulting schemas are grouped by physical table name.
///
/// Tag columns are emitted in lexicographic order of their names, and the
/// logical schemas of each physical table are ordered by table name, so the
/// output does not depend on hash iteration order.
fn tags_to_logical_schemas(
    tags: HashMap<String, HashMap<String, HashSet<String>>>,
) -> LogicalSchemas {
    let schemas: HashMap<String, Vec<LogicalSchema>> = tags
        .into_iter()
        .map(|(physical, logical_tables)| {
            let mut logical_schemas: Vec<LogicalSchema> = logical_tables
                .into_iter()
                .map(|(logical, tag_set)| {
                    // Hash sets iterate in arbitrary order; sort the names so that
                    // the same input always yields the same column layout.
                    let mut tag_names: Vec<String> = tag_set.into_iter().collect();
                    tag_names.sort_unstable();

                    // Tags plus the timestamp and value columns appended below.
                    let mut columns = Vec::with_capacity(tag_names.len() + 2);
                    columns.extend(tag_names.into_iter().map(|tag_name| ColumnSchema {
                        column_name: tag_name,
                        datatype: ColumnDataType::String as i32,
                        semantic_type: SemanticType::Tag as i32,
                        ..Default::default()
                    }));
                    // NOTE(review): nanosecond precision is kept as-is; confirm it
                    // matches the time index type expected for metric tables.
                    columns.push(ColumnSchema {
                        column_name: GREPTIME_TIMESTAMP.to_string(),
                        datatype: ColumnDataType::TimestampNanosecond as i32,
                        semantic_type: SemanticType::Timestamp as i32,
                        ..Default::default()
                    });
                    columns.push(ColumnSchema {
                        column_name: GREPTIME_VALUE.to_string(),
                        datatype: ColumnDataType::Float64 as i32,
                        semantic_type: SemanticType::Field as i32,
                        ..Default::default()
                    });

                    LogicalSchema {
                        name: logical,
                        columns,
                    }
                })
                .collect();
            // Same reasoning as for the tags: make the per-physical-table list
            // independent of hash map iteration order.
            logical_schemas.sort_unstable_by(|a, b| a.name.cmp(&b.name));
            (physical, logical_schemas)
        })
        .collect();

    LogicalSchemas { schemas }
}