From 344006deca43b4a38e696f86002e8c9f7ad38575 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Wed, 25 Jun 2025 19:33:28 +0800
Subject: [PATCH] feat: Implements collect_physical_region_metadata

Add partition_manager and node_manager to PromBulkState and PromBulkContext

Signed-off-by: evenyag
---
 Cargo.lock                          |   1 +
 src/frontend/src/instance.rs        |  10 +++
 src/frontend/src/server.rs          |   2 +
 src/operator/src/insert.rs          |   8 ++
 src/operator/src/schema_helper.rs   |   3 +-
 src/servers/Cargo.toml              |   1 +
 src/servers/src/batch_builder.rs    | 109 +++++++++++++++++++++++++---
 src/servers/src/http/prom_store.rs  |  16 ++--
 src/servers/src/prom_row_builder.rs |  23 +++---
 src/servers/src/proto.rs            |  12 +--
 10 files changed, 146 insertions(+), 39 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4a25143946..1d6ed173c1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11286,6 +11286,7 @@ dependencies = [
  "operator",
  "otel-arrow-rust",
  "parking_lot 0.12.3",
+ "partition",
  "permutation",
  "pgwire",
  "pin-project",
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index d9bf7135a0..e4c79de63b 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -39,6 +39,7 @@ use common_config::KvBackendConfig;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::kv_backend::KvBackendRef;
+use common_meta::node_manager::NodeManagerRef;
 use common_meta::state_store::KvStateStore;
 use common_procedure::local::{LocalManager, ManagerConfig};
 use common_procedure::options::ProcedureConfig;
@@ -51,6 +52,7 @@ use operator::delete::DeleterRef;
 use operator::insert::InserterRef;
 use operator::schema_helper::SchemaHelper;
 use operator::statement::{StatementExecutor, StatementExecutorRef};
+use partition::manager::PartitionRuleManagerRef;
 use pipeline::pipeline_operator::PipelineOperator;
 use prometheus::HistogramTimer;
 use promql_parser::label::Matcher;
@@ -171,6 +173,14 @@ impl Instance {
             self.statement_executor.cache_invalidator().clone(),
         )
     }
+
+    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
+        self.inserter.partition_manager()
+    }
+
+    pub fn node_manager(&self) -> &NodeManagerRef {
+        self.inserter.node_manager()
+    }
 }
 
 fn parse_stmt(sql: &str, dialect: &(dyn Dialect + Send + Sync)) -> Result<Vec<Statement>> {
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index 30baf4de24..43b6be8371 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -99,6 +99,8 @@ where
         let bulk_state = if opts.prom_store.bulk_mode {
             Some(PromBulkState {
                 schema_helper: self.instance.create_schema_helper(),
+                partition_manager: self.instance.partition_manager().clone(),
+                node_manager: self.instance.node_manager().clone(),
             })
         } else {
             None
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index b6dcf0a231..ff4e8c45cd 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -136,6 +136,14 @@ impl Inserter {
         }
     }
 
+    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
+        &self.partition_manager
+    }
+
+    pub fn node_manager(&self) -> &NodeManagerRef {
+        &self.node_manager
+    }
+
     pub async fn handle_column_inserts(
         &self,
         requests: InsertRequests,
diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs
index efb6e56334..df171d756d 100644
--- a/src/operator/src/schema_helper.rs
+++ b/src/operator/src/schema_helper.rs
@@ -730,8 +730,7 @@ pub async fn ensure_logical_tables_for_metrics(
 
 /// Gets the list of metadatas for a list of region ids.
 // TODO(yingwen): Should we return RegionMetadataRef?
-#[allow(dead_code)]
-async fn metadatas_for_region_ids(
+pub async fn metadatas_for_region_ids(
     partition_manager: &PartitionRuleManagerRef,
     node_manager: &NodeManagerRef,
     region_ids: &[RegionId],
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index eb1d071134..5861624d1e 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -82,6 +82,7 @@ object-pool = "0.5"
 once_cell.workspace = true
 openmetrics-parser = "0.4"
 operator.workspace = true
+partition.workspace = true
 simd-json.workspace = true
 socket2 = "0.5"
 # use crates.io version once the following PRs is merged into the nextest release
diff --git a/src/servers/src/batch_builder.rs b/src/servers/src/batch_builder.rs
index 1a0c8f8616..a48f65b30d 100644
--- a/src/servers/src/batch_builder.rs
+++ b/src/servers/src/batch_builder.rs
@@ -24,19 +24,23 @@ use arrow::array::{
 use arrow::compute;
 use arrow_schema::Field;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
+use common_meta::node_manager::NodeManagerRef;
 use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, GREPTIME_TIMESTAMP, GREPTIME_VALUE};
+use itertools::Itertools;
 use metric_engine::row_modifier::{RowModifier, RowsIter};
 use mito_codec::row_converter::SparsePrimaryKeyCodec;
 use operator::schema_helper::{
-    ensure_logical_tables_for_metrics, LogicalSchema, LogicalSchemas, SchemaHelper,
+    ensure_logical_tables_for_metrics, metadatas_for_region_ids, LogicalSchema, LogicalSchemas,
+    SchemaHelper,
 };
+use partition::manager::PartitionRuleManagerRef;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
-use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::metadata::RegionMetadataRef;
 use store_api::storage::consts::{
     ReservedColumnId, OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
 };
-use store_api::storage::ColumnId;
+use store_api::storage::{ColumnId, RegionId};
 use table::metadata::TableId;
 use table::table_name::TableName;
 
@@ -46,13 +50,21 @@ use crate::prom_row_builder::{PromCtx, TableBuilder};
 pub struct MetricsBatchBuilder {
     schema_helper: SchemaHelper,
     builders: HashMap,
+    partition_manager: PartitionRuleManagerRef,
+    node_manager: NodeManagerRef,
 }
 
 impl MetricsBatchBuilder {
-    pub fn new(schema_helper: SchemaHelper) -> Self {
+    pub fn new(
+        schema_helper: SchemaHelper,
+        partition_manager: PartitionRuleManagerRef,
+        node_manager: NodeManagerRef,
+    ) -> Self {
         MetricsBatchBuilder {
             schema_helper,
             builders: Default::default(),
+            partition_manager,
+            node_manager,
         }
     }
 
@@ -132,14 +144,89 @@ impl MetricsBatchBuilder {
     }
 
     /// Retrieves physical region metadata of given logical table names.
-    pub async fn collect_physical_region_metadata(
+    ///
+    /// `logical_tables` is a list of table names; each entry contains the schema name and the table name.
+    /// Returns the following mapping: `schema => logical table => (logical table id, region 0 metadata of the physical table)`.
+    pub(crate) async fn collect_physical_region_metadata(
         &self,
-        logical_table_names: &[String],
-    ) -> HashMap<
-        TableName, /*logical table name*/
-        RegionMetadataRef, /*Region metadata for physical re*/
-    > {
-        todo!()
+        logical_tables: &[(String, String)],
+        query_ctx: &QueryContextRef,
+    ) -> error::Result<HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>> {
+        let catalog = query_ctx.current_catalog();
+        // Logical and physical table ids.
+        let mut table_ids = Vec::with_capacity(logical_tables.len());
+        let mut physical_region_ids = HashSet::new();
+        for (schema, table_name) in logical_tables {
+            let logical_table = self
+                .schema_helper
+                .get_table(catalog, schema, table_name)
+                .await
+                .context(error::OperatorSnafu)?
+                .context(error::TableNotFoundSnafu {
+                    catalog,
+                    schema,
+                    table: table_name,
+                })?;
+            let logical_table_id = logical_table.table_info().table_id();
+            let physical_table_id = self
+                .schema_helper
+                .table_route_manager()
+                .get_physical_table_id(logical_table_id)
+                .await
+                .context(error::CommonMetaSnafu)?;
+            table_ids.push((logical_table_id, physical_table_id));
+            // We only get metadata from region 0.
+            physical_region_ids.insert(RegionId::new(physical_table_id, 0));
+        }
+
+        // Batch get physical metadata.
+        let physical_region_ids = physical_region_ids.into_iter().collect_vec();
+        let region_metadatas = metadatas_for_region_ids(
+            &self.partition_manager,
+            &self.node_manager,
+            &physical_region_ids,
+            query_ctx,
+        )
+        .await
+        .context(error::OperatorSnafu)?;
+        let mut result_map: HashMap<_, HashMap<_, _>> = HashMap::new();
+        let region_metadatas: HashMap<_, _> = region_metadatas
+            .into_iter()
+            .flatten()
+            .map(|meta| (meta.region_id, Arc::new(meta)))
+            .collect();
+        for (i, (schema, table_name)) in logical_tables.iter().enumerate() {
+            let physical_table_id = table_ids[i].1;
+            let physical_region_id = RegionId::new(physical_table_id, 0);
+            let physical_metadata =
+                region_metadatas.get(&physical_region_id).with_context(|| {
+                    error::UnexpectedResultSnafu {
+                        reason: format!(
+                            "Physical region metadata {} for table {} not found",
+                            physical_region_id, table_name
+                        ),
+                    }
+                })?;
+
+            match result_map.get_mut(schema) {
+                Some(table_map) => {
+                    table_map.insert(
+                        table_name.clone(),
+                        (table_ids[i].0, physical_metadata.clone()),
+                    );
+                }
+                None => {
+                    let mut table_map = HashMap::new();
+                    table_map.insert(
+                        table_name.clone(),
+                        (table_ids[i].0, physical_metadata.clone()),
+                    );
+                    result_map.insert(schema.to_string(), table_map);
+                }
+            }
+        }
+
+        Ok(result_map)
     }
 
     /// Builds [RecordBatch] from rows with primary key encoded.
diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs
index d9b0e076b0..3b83788655 100644
--- a/src/servers/src/http/prom_store.rs
+++ b/src/servers/src/http/prom_store.rs
@@ -22,12 +22,14 @@ use axum::response::IntoResponse;
 use axum::Extension;
 use axum_extra::TypedHeader;
 use common_catalog::consts::DEFAULT_SCHEMA_NAME;
+use common_meta::node_manager::NodeManagerRef;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
 use common_telemetry::tracing;
 use hyper::HeaderMap;
 use lazy_static::lazy_static;
 use object_pool::Pool;
 use operator::schema_helper::SchemaHelper;
+use partition::manager::PartitionRuleManagerRef;
 use pipeline::util::to_pipeline_version;
 use pipeline::{ContextReq, PipelineDefinition};
 use prost::Message;
@@ -57,6 +59,8 @@ pub const VM_PROTO_VERSION: &str = "1";
 #[derive(Clone)]
 pub struct PromBulkState {
     pub schema_helper: SchemaHelper,
+    pub partition_manager: PartitionRuleManagerRef,
+    pub node_manager: NodeManagerRef,
 }
 
 #[derive(Clone)]
@@ -212,9 +216,11 @@ fn try_decompress(is_zstd: bool, body: &[u8]) -> Result {
 }
 
 /// Context for processing remote write requests in bulk mode.
-struct PromBulkContext {
-    schema_helper: SchemaHelper,
-    query_ctx: QueryContextRef,
+pub struct PromBulkContext {
+    pub(crate) schema_helper: SchemaHelper,
+    pub(crate) query_ctx: QueryContextRef,
+    pub(crate) partition_manager: PartitionRuleManagerRef,
+    pub(crate) node_manager: NodeManagerRef,
 }
 
 async fn decode_remote_write_request(
@@ -280,9 +286,7 @@ async fn decode_remote_write_request_to_batch(
         .merge(buf, prom_validation_mode, processor)
         .context(error::DecodePromRemoteRequestSnafu)?;
 
-    request
-        .as_record_batch(bulk.schema_helper, &bulk.query_ctx)
-        .await
+    request.as_record_batch(&bulk).await
 }
 
 async fn decode_remote_read_request(body: Bytes) -> Result {
diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs
index a2759ee35c..6ba2b3fff3 100644
--- a/src/servers/src/prom_row_builder.rs
+++ b/src/servers/src/prom_row_builder.rs
@@ -20,13 +20,12 @@ use api::prom_store::remote::Sample;
 use api::v1::value::ValueData;
 use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
-use operator::schema_helper::SchemaHelper;
 use pipeline::{ContextOpt, ContextReq};
 use prost::DecodeError;
-use session::context::QueryContextRef;
 
 use crate::batch_builder::MetricsBatchBuilder;
 use crate::error::Result;
+use crate::http::prom_store::PromBulkContext;
 use crate::http::PromValidationMode;
 use crate::proto::{decode_string, PromLabel};
 use crate::repeated_field::Clear;
@@ -97,20 +96,22 @@ impl TablesBuilder {
     }
 
     /// Converts [TablesBuilder] to record batch and clears inner states.
-    pub(crate) async fn as_record_batch(
-        &mut self,
-        schema_helper: SchemaHelper,
-        query_ctx: &QueryContextRef,
-    ) -> Result<()> {
-        let mut batch_builder = MetricsBatchBuilder::new(schema_helper);
-        let mut tables = std::mem::take(&mut self.tables);
+    pub(crate) async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
+        let batch_builder = MetricsBatchBuilder::new(
+            bulk_ctx.schema_helper.clone(),
+            bulk_ctx.partition_manager.clone(),
+            bulk_ctx.node_manager.clone(),
+        );
+        let tables = std::mem::take(&mut self.tables);
 
         batch_builder
-            .create_or_alter_physical_tables(&tables, query_ctx)
+            .create_or_alter_physical_tables(&tables, &bulk_ctx.query_ctx)
             .await?;
 
         // Gather all region metadata for region 0 of physical tables.
-        let physical_region_metadata = batch_builder.collect_physical_region_metadata(&[]).await;
+        let physical_region_metadata = batch_builder
+            .collect_physical_region_metadata(&[], &bulk_ctx.query_ctx)
+            .await;
 
         batch_builder
             .append_rows_to_batch(None, None, &mut tables, &physical_region_metadata)
diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs
index a62ad04f48..c257700fc3 100644
--- a/src/servers/src/proto.rs
+++ b/src/servers/src/proto.rs
@@ -21,7 +21,6 @@ use api::prom_store::remote::Sample;
 use bytes::{Buf, Bytes};
 use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
 use common_telemetry::debug;
-use operator::schema_helper::SchemaHelper;
 use pipeline::{ContextReq, GreptimePipelineParams, PipelineContext, PipelineDefinition, Value};
 use prost::encoding::message::merge;
 use prost::encoding::{decode_key, decode_varint, WireType};
@@ -31,6 +30,7 @@ use snafu::OptionExt;
 
 use crate::error::{InternalSnafu, Result};
 use crate::http::event::PipelineIngestRequest;
+use crate::http::prom_store::PromBulkContext;
 use crate::http::PromValidationMode;
 use crate::pipeline::run_pipeline;
 use crate::prom_row_builder::{PromCtx, TablesBuilder};
@@ -355,14 +355,8 @@ impl PromWriteRequest {
     }
 
     /// Converts the write request into a record batch and reset the table data.
-    pub async fn as_record_batch(
-        &mut self,
-        schema_helper: SchemaHelper,
-        query_ctx: &QueryContextRef,
-    ) -> Result<()> {
-        self.table_data
-            .as_record_batch(schema_helper, query_ctx)
-            .await
+    pub async fn as_record_batch(&mut self, bulk_ctx: &PromBulkContext) -> Result<()> {
+        self.table_data.as_record_batch(bulk_ctx).await
     }
 }
 
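Note (illustration only, not part of the patch above): a minimal, self-contained Rust sketch of the nested map that collect_physical_region_metadata returns, schema => logical table => (logical table id, region 0 metadata of the physical table), and of the same insertion the patch performs with an explicit match on get_mut, written here with the entry API. TableId and RegionMetadataRef are stood in by placeholder aliases so the sketch compiles on its own, and PhysicalMetadataMap, insert_physical_metadata, and the sample values are hypothetical names, not identifiers from the codebase.

use std::collections::HashMap;
use std::sync::Arc;

// Placeholder aliases standing in for table::metadata::TableId and
// store_api::metadata::RegionMetadataRef so the sketch is self-contained.
type TableId = u32;
type RegionMetadataRef = Arc<String>;

/// schema => logical table => (logical table id, region 0 metadata of the physical table)
type PhysicalMetadataMap = HashMap<String, HashMap<String, (TableId, RegionMetadataRef)>>;

/// Inserts one entry into the nested map; this is the same update the patch
/// performs with a match on `get_mut`, expressed with the entry API.
fn insert_physical_metadata(
    map: &mut PhysicalMetadataMap,
    schema: &str,
    table: &str,
    logical_table_id: TableId,
    metadata: RegionMetadataRef,
) {
    map.entry(schema.to_string())
        .or_default()
        .insert(table.to_string(), (logical_table_id, metadata));
}

fn main() {
    let mut map = PhysicalMetadataMap::new();
    let metadata = Arc::new("region 0 metadata of the physical table".to_string());
    insert_physical_metadata(&mut map, "public", "http_requests_total", 1024, metadata);

    // Look up one logical table by schema and table name.
    let (logical_table_id, metadata) = &map["public"]["http_requests_total"];
    println!("logical table {logical_table_id}: {metadata}");
}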