mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat/frontend-stager:
### Add `MetricsBatchBuilder` for Physical Table Management - **New Feature**: Introduced `MetricsBatchBuilder` in `batch_builder.rs` to manage the creation and alteration of physical tables based on logical table tags. - **Error Handling**: Added `CommonMeta` error variant in `error.rs` to handle errors from `common_meta`. - **Enhancements**: - Added `tags` method in `prom_row_builder.rs` to retrieve tag names from `TableBuilder`. - Implemented `primary_key_names` method in `metadata.rs` to return primary key names from `TableMeta`. Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
129
src/servers/src/batch_builder.rs
Normal file
129
src/servers/src/batch_builder.rs
Normal file
@@ -0,0 +1,129 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use ahash::{HashMap, HashMapExt, HashSet};
|
||||
use api::v1::CreateTableExpr;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_meta::key::table_route::TableRouteManagerRef;
|
||||
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
|
||||
use snafu::ResultExt;
|
||||
use table::metadata::{TableId, TableInfoRef};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error;
|
||||
use crate::prom_row_builder::{PromCtx, TableBuilder};
|
||||
|
||||
pub struct MetricsBatchBuilder {
|
||||
table_route_manager: TableRouteManagerRef,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
}
|
||||
|
||||
impl MetricsBatchBuilder {
|
||||
/// Detected the DDL requirements according to the staged table rows.
|
||||
pub async fn create_or_alter_physical_tables(
|
||||
&self,
|
||||
tables: &HashMap<PromCtx, HashMap<String, TableBuilder>>,
|
||||
) -> error::Result<()> {
|
||||
// Physical table id -> all tags
|
||||
let mut existing_tables: HashMap<TableId, HashSet<_>> = HashMap::new();
|
||||
// Physical table name to create -> all tags.
|
||||
let mut tables_to_create: HashMap<String, HashSet<String>> = HashMap::new();
|
||||
|
||||
for (ctx, tables) in tables {
|
||||
for (table_name, table_builder) in tables {
|
||||
let physical_table = self
|
||||
.determine_physical_table(table_name, &ctx.physical_table, "todo", "todo")
|
||||
.await?;
|
||||
if let Some(physical_table) = physical_table {
|
||||
let tags_in_table = existing_tables
|
||||
.entry(physical_table.table_info().ident.table_id)
|
||||
.or_insert_with(|| {
|
||||
physical_table
|
||||
.table_info()
|
||||
.meta
|
||||
.primary_key_names()
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>()
|
||||
});
|
||||
tags_in_table.extend(table_builder.tags().cloned());
|
||||
} else {
|
||||
// physical table not exist, build create expr according to logical table tags.
|
||||
if let Some(tags) = tables_to_create.get_mut(table_name) {
|
||||
tags.extend(table_builder.tags().cloned());
|
||||
} else {
|
||||
// populate tags for table.
|
||||
tables_to_create.insert(
|
||||
table_name.to_string(),
|
||||
table_builder.tags().cloned().collect(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Builds create table expr from provided tag set.
|
||||
fn build_create_table_expr(tags: HashMap<String, HashSet<String>>) -> Vec<CreateTableExpr> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Builds [AlterTableExpr] by finding new tags.
|
||||
fn build_alter_table_expr(table: TableInfoRef, all_tags: HashMap<String, HashSet<String>>) {
|
||||
// todo
|
||||
// 1. Find new added tags according to `all_tags` and existing table schema
|
||||
// 2. Build AlterTableExpr
|
||||
}
|
||||
|
||||
/// Finds physical table id for logical table.
|
||||
async fn determine_physical_table(
|
||||
&self,
|
||||
logical_table_name: &str,
|
||||
physical_table_name: &Option<String>,
|
||||
current_catalog: &str,
|
||||
current_schema: &str,
|
||||
) -> error::Result<Option<TableRef>> {
|
||||
let logical_table = self
|
||||
.catalog_manager
|
||||
.table(current_catalog, current_schema, logical_table_name, None)
|
||||
.await
|
||||
.context(error::CatalogSnafu)?;
|
||||
if let Some(logical_table) = logical_table {
|
||||
// logical table already exist, just return the physical table
|
||||
let logical_table_id = logical_table.table_info().table_id();
|
||||
let physical_table_id = self
|
||||
.table_route_manager
|
||||
.get_physical_table_id(logical_table_id)
|
||||
.await
|
||||
.context(error::CommonMetaSnafu)?;
|
||||
let physical_table = self
|
||||
.catalog_manager
|
||||
.tables_by_ids(current_catalog, current_schema, &[physical_table_id])
|
||||
.await
|
||||
.context(error::CatalogSnafu)?
|
||||
.swap_remove(0);
|
||||
return Ok(Some(physical_table));
|
||||
}
|
||||
|
||||
// Logical table not exist, try assign logical table to a physical table.
|
||||
let physical_table_name = physical_table_name
|
||||
.as_deref()
|
||||
.unwrap_or(GREPTIME_PHYSICAL_TABLE);
|
||||
|
||||
self.catalog_manager
|
||||
.table(current_catalog, current_schema, physical_table_name, None)
|
||||
.await
|
||||
.context(error::CatalogSnafu)
|
||||
}
|
||||
}
|
||||
@@ -624,6 +624,13 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Unknown hint: {}", hint))]
|
||||
UnknownHint { hint: String },
|
||||
|
||||
#[snafu(display("Failed to invoke common_meta"))]
|
||||
CommonMeta {
|
||||
source: common_meta::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -747,6 +754,7 @@ impl ErrorExt for Error {
|
||||
DurationOverflow { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
HandleOtelArrowRequest { .. } => StatusCode::Internal,
|
||||
CommonMeta { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use datafusion_expr::LogicalPlan;
|
||||
use datatypes::schema::Schema;
|
||||
|
||||
pub mod addrs;
|
||||
mod batch_builder;
|
||||
pub mod configurator;
|
||||
pub(crate) mod elasticsearch;
|
||||
pub mod error;
|
||||
|
||||
@@ -210,6 +210,13 @@ impl TableBuilder {
|
||||
rows: Some(Rows { schema, rows }),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn tags(&self) -> impl Iterator<Item = &String> {
|
||||
self.schema
|
||||
.iter()
|
||||
.filter(|v| v.semantic_type == SemanticType::Tag as i32)
|
||||
.map(|c| &c.column_name)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -213,6 +213,14 @@ impl TableMeta {
|
||||
.map(|(_, cs)| &cs.name)
|
||||
}
|
||||
|
||||
/// Returns names of primary keys.
|
||||
pub fn primary_key_names(&self) -> impl Iterator<Item = &String> {
|
||||
let columns_schemas = self.schema.column_schemas();
|
||||
self.primary_key_indices
|
||||
.iter()
|
||||
.map(|pk_idx| &columns_schemas[*pk_idx].name)
|
||||
}
|
||||
|
||||
/// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
|
||||
///
|
||||
/// The returned builder would derive the next column id of this meta.
|
||||
|
||||
Reference in New Issue
Block a user