From 151273d1df148f8462b3b1439e95d5a3caa908d5 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 23 Jun 2025 19:21:09 +0800 Subject: [PATCH] feat: get region metadata by ids Signed-off-by: evenyag --- src/operator/src/error.rs | 9 ++++ src/operator/src/schema_helper.rs | 88 +++++++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 4 deletions(-) diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index c488efe8cc..457a2d1ffd 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -860,6 +860,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to decode object from json"))] + DecodeJson { + #[snafu(source)] + error: serde_json::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -991,6 +999,7 @@ impl ErrorExt for Error { Error::InvalidProcessId { .. } => StatusCode::InvalidArguments, Error::ProcessManagerMissing { .. } => StatusCode::Unexpected, Error::PathNotFound { .. } => StatusCode::InvalidArguments, + Error::DecodeJson { .. } => StatusCode::Unexpected, } } diff --git a/src/operator/src/schema_helper.rs b/src/operator/src/schema_helper.rs index bd1d586eca..fddee7347e 100644 --- a/src/operator/src/schema_helper.rs +++ b/src/operator/src/schema_helper.rs @@ -18,6 +18,8 @@ use std::collections::HashMap; use std::sync::Arc; use api::v1::alter_table_expr::Kind; +use api::v1::region::region_request::Body; +use api::v1::region::{ListMetadataRequest, RegionRequest, RegionRequestHeader}; use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType}; use catalog::CatalogManagerRef; use common_catalog::consts::{ @@ -30,17 +32,23 @@ use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef}; use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::TableMetadataManagerRef; +use common_meta::node_manager::NodeManagerRef; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::Partition; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use common_query::Output; use common_telemetry::tracing; +use common_telemetry::tracing_context::TracingContext; +use futures::future; +use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; use sql::statements::create::Partitions; +use store_api::metadata::RegionMetadata; use store_api::metric_engine_consts::{ LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY, }; +use store_api::storage::RegionId; use table::dist_table::DistTable; use table::metadata::{RawTableInfo, TableId, TableInfo}; use table::table_name::TableName; @@ -48,13 +56,15 @@ use table::table_reference::TableReference; use table::TableRef; use crate::error::{ - CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, EmptyDdlExprSnafu, - ExecuteDdlSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, InvalidateTableCacheSnafu, - Result, SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu, - TableMetadataManagerSnafu, TableNotFoundSnafu, UnexpectedSnafu, + CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DecodeJsonSnafu, + EmptyDdlExprSnafu, ExecuteDdlSnafu, FindRegionLeaderSnafu, InvalidPartitionRuleSnafu, + InvalidTableNameSnafu, InvalidateTableCacheSnafu, JoinTaskSnafu, RequestRegionSnafu, Result, + SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, + TableNotFoundSnafu, UnexpectedSnafu, }; use crate::expr_helper; use crate::insert::{build_create_table_expr, fill_table_options_for_create, AutoCreateTableType}; +use crate::region_req_factory::RegionRequestFactory; use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter, NAME_PATTERN_REG}; /// Helper to query and manipulate (CREATE/ALTER) table schemas. @@ -714,3 +724,73 @@ async fn ensure_logical_tables_for_metrics( Ok(()) } + +/// Gets the list of metadatas for a list of region ids. +// TODO(yingwen): Should we return RegionMetadataRef? +async fn metadatas_for_region_ids( + partition_manager: &PartitionRuleManagerRef, + node_manager: &NodeManagerRef, + region_ids: &[RegionId], + ctx: &QueryContextRef, +) -> Result>> { + // Groups regions by peers. + // This map contains: peer => (ListMetadataRequest, A vec of indices of regions). + let mut request_per_region = HashMap::new(); + for (index, region_id) in region_ids.iter().copied().enumerate() { + let peer = partition_manager + .find_region_leader(region_id) + .await + .context(FindRegionLeaderSnafu)?; + let request_indices = request_per_region + .entry(peer) + .or_insert_with(|| (ListMetadataRequest::default(), Vec::new())); + request_indices.0.region_ids.push(region_id.as_u64()); + request_indices.1.push(index); + } + + // Sends requests to datanode and waits for responses. + let tasks = request_per_region + .into_iter() + .map(|(peer, (request, indices))| { + let node_manager = node_manager.clone(); + let request_factory = RegionRequestFactory::new(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + dbname: ctx.get_db_string(), + ..Default::default() + }); + + common_runtime::spawn_global(async move { + let request = request_factory.build_request(Body::ListMetadata(request)); + let resp = node_manager + .datanode(&peer) + .await + .handle(request) + .await + .context(RequestRegionSnafu)?; + + let metadatas: Vec> = + serde_json::from_slice(&resp.metadata).context(DecodeJsonSnafu)?; + Ok((metadatas, indices)) + }) + }); + let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?; + let mut output_metadatas = vec![None; region_ids.len()]; + for result in results { + let (mut metadatas, indices) = result?; + ensure!( + metadatas.len() == indices.len(), + UnexpectedSnafu { + violated: format!( + "Length mismatch between request and response, expected {} metadatas, got {}", + indices.len(), + metadatas.len() + ), + } + ); + for index in indices { + output_metadatas[index] = metadatas[index].take(); + } + } + + Ok(output_metadatas) +}