feat: get region metadata by ids

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-06-23 19:21:09 +08:00
committed by Lei, HUANG
parent b0289dbdde
commit 151273d1df
2 changed files with 93 additions and 4 deletions

View File

@@ -860,6 +860,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode object from json"))]
DecodeJson {
#[snafu(source)]
error: serde_json::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -991,6 +999,7 @@ impl ErrorExt for Error {
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
Error::PathNotFound { .. } => StatusCode::InvalidArguments,
Error::DecodeJson { .. } => StatusCode::Unexpected,
}
}

View File

@@ -18,6 +18,8 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::v1::alter_table_expr::Kind;
use api::v1::region::region_request::Body;
use api::v1::region::{ListMetadataRequest, RegionRequest, RegionRequestHeader};
use api::v1::{AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, SemanticType};
use catalog::CatalogManagerRef;
use common_catalog::consts::{
@@ -30,17 +32,23 @@ use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::Partition;
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
use common_query::Output;
use common_telemetry::tracing;
use common_telemetry::tracing_context::TracingContext;
use futures::future;
use partition::manager::PartitionRuleManagerRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::create::Partitions;
use store_api::metadata::RegionMetadata;
use store_api::metric_engine_consts::{
LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::storage::RegionId;
use table::dist_table::DistTable;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_name::TableName;
@@ -48,13 +56,15 @@ use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, EmptyDdlExprSnafu,
ExecuteDdlSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, InvalidateTableCacheSnafu,
Result, SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnexpectedSnafu,
CatalogSnafu, CreateLogicalTablesSnafu, CreateTableInfoSnafu, DecodeJsonSnafu,
EmptyDdlExprSnafu, ExecuteDdlSnafu, FindRegionLeaderSnafu, InvalidPartitionRuleSnafu,
InvalidTableNameSnafu, InvalidateTableCacheSnafu, JoinTaskSnafu, RequestRegionSnafu, Result,
SchemaNotFoundSnafu, SchemaReadOnlySnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu,
TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::expr_helper;
use crate::insert::{build_create_table_expr, fill_table_options_for_create, AutoCreateTableType};
use crate::region_req_factory::RegionRequestFactory;
use crate::statement::ddl::{create_table_info, parse_partitions, verify_alter, NAME_PATTERN_REG};
/// Helper to query and manipulate (CREATE/ALTER) table schemas.
@@ -714,3 +724,73 @@ async fn ensure_logical_tables_for_metrics(
Ok(())
}
/// Gets the list of metadatas for a list of region ids.
// TODO(yingwen): Should we return RegionMetadataRef?
async fn metadatas_for_region_ids(
partition_manager: &PartitionRuleManagerRef,
node_manager: &NodeManagerRef,
region_ids: &[RegionId],
ctx: &QueryContextRef,
) -> Result<Vec<Option<RegionMetadata>>> {
// Groups regions by peers.
// This map contains: peer => (ListMetadataRequest, A vec of indices of regions).
let mut request_per_region = HashMap::new();
for (index, region_id) in region_ids.iter().copied().enumerate() {
let peer = partition_manager
.find_region_leader(region_id)
.await
.context(FindRegionLeaderSnafu)?;
let request_indices = request_per_region
.entry(peer)
.or_insert_with(|| (ListMetadataRequest::default(), Vec::new()));
request_indices.0.region_ids.push(region_id.as_u64());
request_indices.1.push(index);
}
// Sends requests to datanode and waits for responses.
let tasks = request_per_region
.into_iter()
.map(|(peer, (request, indices))| {
let node_manager = node_manager.clone();
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
dbname: ctx.get_db_string(),
..Default::default()
});
common_runtime::spawn_global(async move {
let request = request_factory.build_request(Body::ListMetadata(request));
let resp = node_manager
.datanode(&peer)
.await
.handle(request)
.await
.context(RequestRegionSnafu)?;
let metadatas: Vec<Option<RegionMetadata>> =
serde_json::from_slice(&resp.metadata).context(DecodeJsonSnafu)?;
Ok((metadatas, indices))
})
});
let results = future::try_join_all(tasks).await.context(JoinTaskSnafu)?;
let mut output_metadatas = vec![None; region_ids.len()];
for result in results {
let (mut metadatas, indices) = result?;
ensure!(
metadatas.len() == indices.len(),
UnexpectedSnafu {
violated: format!(
"Length mismatch between request and response, expected {} metadatas, got {}",
indices.len(),
metadatas.len()
),
}
);
for index in indices {
output_metadatas[index] = metadatas[index].take();
}
}
Ok(output_metadatas)
}