From 2fdbe6c8c30d2d9a04b106f6f4d7accb330338c2 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 15 May 2026 15:35:18 +0800 Subject: [PATCH] feat: expose node info for placement selectors (#8095) * feat: expose node info for placement selectors Return `NodeInfo` from `PeerDiscovery` methods and keep OSS selectors mapping back to `Peer`. Carry `__greptime_origin_frontend.addr` from frontend create-table DDLs into selector `extensions`, and thread `PeerAllocContext` through table-route allocation. Persist datanode `NodeInfo` when heartbeat stats are absent so collected env vars remain available after restart. Signed-off-by: Lei, HUANG * fix: skip datanode node info without stats Signed-off-by: Lei, HUANG * fix: avoid unnecessary workload clones Skip workload cloning for inactive nodes and for active node-info lookups without workload filters. Files: `src/meta-srv/src/discovery/utils.rs` Signed-off-by: Lei, HUANG * fix: require frontend origin address Require `StatementExecutor` to carry a concrete frontend origin address and always attach it to meta DDL query contexts. Files: `src/operator/src/statement.rs`, `src/operator/src/statement/ddl.rs`, `src/operator/src/utils.rs`, `src/frontend/src/instance/builder.rs`, `src/frontend/src/heartbeat.rs`, `src/flow/src/server.rs`, `src/cmd/src/standalone.rs`, `src/cmd/src/flownode.rs`. Signed-off-by: Lei, HUANG * refactor: reuse resolved frontend address Resolve the frontend peer address once in the frontend builder, store it on the instance, and reuse it for heartbeat and flow invoker origins. Files: `src/frontend/src/instance/builder.rs`, `src/frontend/src/instance.rs`, `src/frontend/src/heartbeat.rs`, `src/cmd/src/frontend.rs`, `src/cmd/src/standalone.rs`, `src/frontend/src/frontend.rs`, `src/frontend/src/heartbeat/tests.rs`. Signed-off-by: Lei, HUANG * fix: preserve datanode lease liveness Filter active datanode node infos through lease timestamps and workloads while preserving node info fields such as reported env vars. Files: `src/meta-srv/src/discovery/utils.rs`, `src/meta-srv/src/discovery/lease.rs`. Signed-off-by: Lei, HUANG * Remove stale datanode lease helper - `discovery`: remove the obsolete `alive_datanodes` helper and related tests in `src/meta-srv/src/discovery/utils.rs` and `src/meta-srv/src/discovery/lease.rs` - `integration`: update cluster and standalone setup paths in `tests-integration/src/cluster.rs` and `tests-integration/src/standalone.rs` Signed-off-by: Lei, HUANG * feat/env-based-region-selector-oss: simplify lease discovery - `lease-discovery`: simplify logic and remove unused utilities in `src/meta-srv/src/discovery/lease.rs` and `src/meta-srv/src/discovery/utils.rs` Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/cmd/src/flownode.rs | 2 + src/cmd/src/frontend.rs | 8 +- src/cmd/src/standalone.rs | 1 + src/common/frontend/src/selector.rs | 1 + .../meta/src/ddl/allocator/region_routes.rs | 6 +- src/common/meta/src/ddl/create_table.rs | 25 +- src/common/meta/src/ddl/flow_meta.rs | 7 +- src/common/meta/src/ddl/table_meta.rs | 18 +- src/common/meta/src/ddl_manager.rs | 12 +- src/common/meta/src/peer.rs | 21 +- src/common/meta/src/rpc/ddl.rs | 3 + src/flow/src/batching_mode/frontend_client.rs | 1 + src/flow/src/server.rs | 2 + src/frontend/src/frontend.rs | 1 + src/frontend/src/heartbeat.rs | 21 +- src/frontend/src/heartbeat/tests.rs | 25 +- src/frontend/src/instance.rs | 5 + src/frontend/src/instance/builder.rs | 4 + src/meta-client/src/client.rs | 8 +- src/meta-client/src/client/util.rs | 41 +- src/meta-srv/src/discovery.rs | 13 +- src/meta-srv/src/discovery/lease.rs | 472 ++++++------------ src/meta-srv/src/discovery/utils.rs | 178 +++++-- src/meta-srv/src/peer.rs | 13 +- .../procedure/repartition/allocate_region.rs | 2 + src/meta-srv/src/region/supervisor.rs | 1 + src/meta-srv/src/selector.rs | 5 +- src/meta-srv/src/selector/common.rs | 5 +- src/meta-srv/src/selector/lease_based.rs | 5 +- src/meta-srv/src/selector/load_based.rs | 5 +- src/meta-srv/src/selector/round_robin.rs | 9 +- src/meta-srv/src/test_util.rs | 42 +- src/meta-srv/src/utils/database.rs | 2 +- src/operator/src/statement.rs | 3 + src/operator/src/statement/ddl.rs | 6 +- src/operator/src/utils.rs | 40 +- tests-integration/src/cluster.rs | 2 +- tests-integration/src/standalone.rs | 1 + 38 files changed, 570 insertions(+), 446 deletions(-) diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index df7a0725e9..6228cbd3f3 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -43,6 +43,7 @@ use flow::{ }; use meta_client::{MetaClientOptions, MetaClientType}; use plugins::flownode::context::GrpcConfigureContext; +use servers::addrs; use servers::configurator::GrpcBuilderConfiguratorRef; use snafu::{OptionExt, ResultExt, ensure}; use tracing_appender::non_blocking::WorkerGuard; @@ -422,6 +423,7 @@ impl StartCommand { layered_cache_registry.clone(), meta_client.clone(), client, + addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)), ) .await .context(StartFlownodeSnafu)?; diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 5e19e90f89..b192bc7608 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -519,7 +519,13 @@ pub fn create_heartbeat_task( Arc::new(stat) }; - HeartbeatTask::new(options, meta_client, executor, stat) + HeartbeatTask::new( + instance.frontend_peer_addr().to_string(), + options, + meta_client, + executor, + stat, + ) } #[cfg(test)] diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 6f35e74b65..b8a21a98c4 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -600,6 +600,7 @@ impl StartCommand { layered_cache_registry.clone(), procedure_executor, node_manager.clone(), + fe_instance.frontend_peer_addr().to_string(), ) .await .context(StartFlownodeSnafu)?; diff --git a/src/common/frontend/src/selector.rs b/src/common/frontend/src/selector.rs index bba36ef81b..c9d7ae22f9 100644 --- a/src/common/frontend/src/selector.rs +++ b/src/common/frontend/src/selector.rs @@ -86,6 +86,7 @@ impl FrontendSelector for MetaClientSelector { peers .into_iter() + .map(|node| node.peer) .filter(predicate) .map(|peer| { let channel = self diff --git a/src/common/meta/src/ddl/allocator/region_routes.rs b/src/common/meta/src/ddl/allocator/region_routes.rs index 04665c5a31..44419001b3 100644 --- a/src/common/meta/src/ddl/allocator/region_routes.rs +++ b/src/common/meta/src/ddl/allocator/region_routes.rs @@ -18,7 +18,7 @@ use common_telemetry::debug; use store_api::storage::{RegionId, RegionNumber, TableId}; use crate::error::Result; -use crate::peer::PeerAllocator; +use crate::peer::{PeerAllocContext, PeerAllocator}; use crate::rpc::router::{Region, RegionRoute}; pub type RegionRoutesAllocatorRef = Arc; @@ -29,6 +29,7 @@ pub trait RegionRoutesAllocator: Send + Sync { &self, table_id: TableId, regions_and_partitions: &[(RegionNumber, &str)], + ctx: &PeerAllocContext, ) -> Result>; } @@ -38,9 +39,10 @@ impl RegionRoutesAllocator for T { &self, table_id: TableId, regions_and_partitions: &[(RegionNumber, &str)], + ctx: &PeerAllocContext, ) -> Result> { let regions = regions_and_partitions.len().max(1); - let peers = self.alloc(regions).await?; + let peers = self.alloc(regions, ctx).await?; debug!("Allocated peers {:?} for table {}", peers, table_id,); let mut region_routes = regions_and_partitions diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 650a706296..ed84c0cd1b 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -43,8 +43,9 @@ use crate::error::{self, Result}; use crate::key::table_route::PhysicalTableRouteValue; use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock}; use crate::metrics; +use crate::peer::PeerAllocContext; use crate::region_keeper::OperatingRegionGuard; -use crate::rpc::ddl::CreateTableTask; +use crate::rpc::ddl::{CreateTableTask, QueryContext}; use crate::rpc::router::{RegionRoute, operating_leader_region_roles}; pub struct CreateTableProcedure { @@ -76,11 +77,19 @@ impl CreateTableProcedure { pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable"; pub fn new(task: CreateTableTask, context: DdlContext) -> Result { + Self::new_with_query_context(task, QueryContext::default(), context) + } + + pub fn new_with_query_context( + task: CreateTableTask, + query_context: QueryContext, + context: DdlContext, + ) -> Result { let executor = build_executor_from_create_table_data(&task.create_table)?; Ok(Self { context, - data: CreateTableData::new(task), + data: CreateTableData::new(task, query_context), opening_regions: vec![], executor, }) @@ -154,7 +163,12 @@ impl CreateTableProcedure { } = self .context .table_metadata_allocator - .create(&self.data.task) + .create_with_context( + &self.data.task, + &PeerAllocContext { + extensions: self.data.query_context.extensions.clone(), + }, + ) .await?; self.set_allocated_metadata(table_id, table_route, region_wal_options); @@ -356,6 +370,8 @@ pub struct CreateTableData { pub state: CreateTableState, pub task: CreateTableTask, #[serde(default)] + pub query_context: QueryContext, + #[serde(default)] pub column_metadatas: Vec, /// None stands for not allocated yet. pub(crate) table_route: Option, @@ -364,11 +380,12 @@ pub struct CreateTableData { } impl CreateTableData { - pub fn new(task: CreateTableTask) -> Self { + pub fn new(task: CreateTableTask, query_context: QueryContext) -> Self { CreateTableData { state: CreateTableState::Prepare, column_metadatas: vec![], task, + query_context, table_route: None, region_wal_options: None, } diff --git a/src/common/meta/src/ddl/flow_meta.rs b/src/common/meta/src/ddl/flow_meta.rs index 85c1f3e989..bca6779dc4 100644 --- a/src/common/meta/src/ddl/flow_meta.rs +++ b/src/common/meta/src/ddl/flow_meta.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use crate::error::Result; use crate::key::FlowId; -use crate::peer::{NoopPeerAllocator, Peer, PeerAllocatorRef}; +use crate::peer::{NoopPeerAllocator, Peer, PeerAllocContext, PeerAllocatorRef}; use crate::sequence::SequenceRef; /// The reference of [FlowMetadataAllocator]. @@ -59,7 +59,10 @@ impl FlowMetadataAllocator { /// Allocates the [FlowId] and [Peer]s. pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec)> { let flow_id = self.allocate_flow_id().await?; - let peers = self.peer_allocator.alloc(partitions).await?; + let peers = self + .peer_allocator + .alloc(partitions, &PeerAllocContext::default()) + .await?; Ok((flow_id, peers)) } diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index 10782d7fa1..4a0ecf7fb0 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -25,7 +25,7 @@ use crate::ddl::allocator::resource_id::ResourceIdAllocatorRef; use crate::ddl::allocator::wal_options::WalOptionsAllocatorRef; use crate::error::{Result, UnsupportedSnafu}; use crate::key::table_route::PhysicalTableRouteValue; -use crate::peer::{NoopPeerAllocator, PeerAllocatorRef}; +use crate::peer::{NoopPeerAllocator, PeerAllocContext, PeerAllocatorRef}; use crate::rpc::ddl::CreateTableTask; pub type TableMetadataAllocatorRef = Arc; @@ -108,6 +108,7 @@ impl TableMetadataAllocator { &self, table_id: TableId, partition_exprs: &[&str], + alloc_context: &PeerAllocContext, ) -> Result { let region_number_and_partition_exprs = partition_exprs .iter() @@ -116,7 +117,7 @@ impl TableMetadataAllocator { .collect::>(); let region_routes = self .region_routes_allocator - .allocate(table_id, ®ion_number_and_partition_exprs) + .allocate(table_id, ®ion_number_and_partition_exprs, alloc_context) .await?; Ok(PhysicalTableRouteValue::new(region_routes)) @@ -133,13 +134,24 @@ impl TableMetadataAllocator { } pub async fn create(&self, task: &CreateTableTask) -> Result { + self.create_with_context(task, &PeerAllocContext::default()) + .await + } + + pub async fn create_with_context( + &self, + task: &CreateTableTask, + alloc_context: &PeerAllocContext, + ) -> Result { let table_id = self.allocate_table_id(&task.create_table.table_id).await?; let partition_exprs = task .partitions .iter() .map(|p| p.expression.as_str()) .collect::>(); - let table_route = self.create_table_route(table_id, &partition_exprs).await?; + let table_route = self + .create_table_route(table_id, &partition_exprs, alloc_context) + .await?; let region_numbers = table_route .region_routes .iter() diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index d0619ca74f..52af4a36af 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -353,10 +353,15 @@ impl DdlManager { pub async fn submit_create_table_task( &self, create_table_task: CreateTableTask, + query_context: QueryContext, ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); - let procedure = CreateTableProcedure::new(create_table_task, context)?; + let procedure = CreateTableProcedure::new_with_query_context( + create_table_task, + query_context, + context, + )?; let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -593,7 +598,7 @@ impl DdlManager { debug!("Submitting Ddl task: {:?}", request.task); match request.task { CreateTable(create_table_task) => { - handle_create_table_task(self, create_table_task).await + handle_create_table_task(self, create_table_task, request.query_context).await } DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await, AlterTable(alter_table_task) => { @@ -746,9 +751,10 @@ async fn handle_drop_table_task( async fn handle_create_table_task( ddl_manager: &DdlManager, create_table_task: CreateTableTask, + query_context: QueryContext, ) -> Result { let (id, output) = ddl_manager - .submit_create_table_task(create_table_task) + .submit_create_table_task(create_table_task, query_context) .await?; let procedure_id = id.to_string(); diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs index f849facbb4..924c293bbe 100644 --- a/src/common/meta/src/peer.rs +++ b/src/common/meta/src/peer.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; pub use api::v1::meta::Peer; use api::v1::meta::heartbeat_request::NodeWorkloads; +use crate::cluster::NodeInfo; use crate::error::Error; use crate::{DatanodeId, FlownodeId}; @@ -48,7 +50,7 @@ pub trait PeerDiscovery: Send + Sync { /// /// A frontend is considered active if it has reported a heartbeat within the most recent heartbeat interval, /// as determined by the in-memory backend. - async fn active_frontends(&self) -> Result, Error>; + async fn active_frontends(&self) -> Result, Error>; /// Returns all currently active datanodes, optionally filtered by a predicate on their workloads. /// @@ -58,7 +60,7 @@ pub trait PeerDiscovery: Send + Sync { async fn active_datanodes( &self, filter: Option fn(&'a NodeWorkloads) -> bool>, - ) -> Result, Error>; + ) -> Result, Error>; /// Returns all currently active flownodes, optionally filtered by a predicate on their workloads. /// @@ -68,23 +70,28 @@ pub trait PeerDiscovery: Send + Sync { async fn active_flownodes( &self, filter: Option fn(&'a NodeWorkloads) -> bool>, - ) -> Result, Error>; + ) -> Result, Error>; } pub type PeerDiscoveryRef = Arc; +#[derive(Debug, Clone, Default)] +pub struct PeerAllocContext { + pub extensions: HashMap, +} + /// [`PeerAllocator`] allocates [`Peer`]s for creating region or flow. #[async_trait::async_trait] pub trait PeerAllocator: Send + Sync { - async fn alloc(&self, num: usize) -> Result, Error>; + async fn alloc(&self, num: usize, ctx: &PeerAllocContext) -> Result, Error>; } pub type PeerAllocatorRef = Arc; #[async_trait::async_trait] impl PeerAllocator for Arc { - async fn alloc(&self, num: usize) -> Result, Error> { - T::alloc(self, num).await + async fn alloc(&self, num: usize, ctx: &PeerAllocContext) -> Result, Error> { + T::alloc(self, num, ctx).await } } @@ -92,7 +99,7 @@ pub struct NoopPeerAllocator; #[async_trait::async_trait] impl PeerAllocator for NoopPeerAllocator { - async fn alloc(&self, num: usize) -> Result, Error> { + async fn alloc(&self, num: usize, _ctx: &PeerAllocContext) -> Result, Error> { Ok(vec![Peer::default(); num]) } } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index ed6f78154a..d2b700d30c 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -58,6 +58,9 @@ use crate::error::{ }; use crate::key::FlowId; +/// Reserved query-context extension key for the frontend peer address that submitted a DDL request. +pub const ORIGIN_FRONTEND_ADDR_EXTENSION_KEY: &str = "__greptime_origin_frontend.addr"; + /// DDL tasks #[derive(Debug, Clone)] pub enum DdlTask { diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index c29c52846b..7382f214e5 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -208,6 +208,7 @@ impl FrontendClient { meta_client .active_frontends() .await + .map(|nodes| nodes.into_iter().map(|node| node.peer).collect()) .map_err(BoxedError::new) .context(ExternalSnafu) } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 3dcb54c0a7..913128f386 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -544,6 +544,7 @@ impl FrontendInvoker { layered_cache_registry: LayeredCacheRegistryRef, procedure_executor: ProcedureExecutorRef, node_manager: NodeManagerRef, + origin_frontend_addr: String, ) -> Result { let table_route_cache: TableRouteCacheRef = layered_cache_registry.get().context(CacheRequiredSnafu { @@ -589,6 +590,7 @@ impl FrontendInvoker { inserter.clone(), partition_manager, None, + origin_frontend_addr, )); let invoker = FrontendInvoker::new(inserter, deleter, statement_executor); diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 6e862fdce4..fb3b096f06 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -326,6 +326,7 @@ mod tests { Arc::new(SuspendHandler::new(instance.suspend_state())), ])); let heartbeat_task = Some(HeartbeatTask::new( + instance.frontend_peer_addr().to_string(), options, meta_client, executor, diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 937c74be19..5ce695b7ce 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -51,20 +51,14 @@ pub struct HeartbeatTask { impl HeartbeatTask { pub fn new( + peer_addr: String, opts: &FrontendOptions, meta_client: Arc, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, resource_stat: ResourceStatRef, ) -> Self { HeartbeatTask { - // if internal grpc is configured, use its address as the peer address - // otherwise use the public grpc address, because peer address only promises to be reachable - // by other components, it doesn't matter whether it's internal or external - peer_addr: if let Some(internal) = &opts.internal_grpc { - addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr)) - } else { - addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)) - }, + peer_addr, meta_client, resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, @@ -273,3 +267,14 @@ impl HeartbeatTask { } } } + +pub(crate) fn frontend_peer_addr(opts: &FrontendOptions) -> String { + // if internal grpc is configured, use its address as the peer address + // otherwise use the public grpc address, because peer address only promises to be reachable + // by other components, it doesn't matter whether it's internal or external + if let Some(internal) = &opts.internal_grpc { + addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr)) + } else { + addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)) + } +} diff --git a/src/frontend/src/heartbeat/tests.rs b/src/frontend/src/heartbeat/tests.rs index f93584855d..d6c314afba 100644 --- a/src/frontend/src/heartbeat/tests.rs +++ b/src/frontend/src/heartbeat/tests.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use api::v1::meta::HeartbeatResponse; +use api::v1::meta::{HeartbeatResponse, Role}; use common_meta::cache_invalidator::KvCacheInvalidator; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::{ @@ -26,9 +26,14 @@ use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::key::MetadataKey; use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::key::table_info::TableInfoKey; +use common_stat::ResourceStatImpl; use common_telemetry::tracing_context::TracingContext; +use meta_client::client::MetaClient; use tokio::sync::mpsc; +use super::HeartbeatTask; +use crate::frontend::FrontendOptions; + #[derive(Default)] pub struct MockKvCacheInvalidator { inner: Mutex, i32>>, @@ -151,3 +156,21 @@ async fn test_invalidate_schema_key_handler() { ) .await; } + +#[test] +fn test_heartbeat_task_uses_resolved_peer_addr() { + let options = FrontendOptions::default(); + let meta_client = Arc::new(MetaClient::new(0, Role::Frontend)); + let executor = Arc::new(HandlerGroupExecutor::new(vec![])); + let stat = Arc::new(ResourceStatImpl::default()); + + let task = HeartbeatTask::new( + "10.0.0.1:4001".to_string(), + &options, + meta_client, + executor, + stat, + ); + + assert_eq!(task.peer_addr, "10.0.0.1:4001"); +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index dbce6a693e..ac0c52fccd 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -110,6 +110,7 @@ lazy_static! { /// [`servers::query_handler::sql::SqlQueryHandler`], etc. #[derive(Clone)] pub struct Instance { + frontend_peer_addr: String, catalog_manager: CatalogManagerRef, pipeline_operator: Arc, statement_executor: Arc, @@ -131,6 +132,10 @@ pub struct Instance { } impl Instance { + pub fn frontend_peer_addr(&self) -> &str { + &self.frontend_peer_addr + } + pub fn catalog_manager(&self) -> &CatalogManagerRef { &self.catalog_manager } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index ba08cda7a2..6ab1427067 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -48,6 +48,7 @@ use snafu::{OptionExt, ResultExt}; use crate::error::{self, ExternalSnafu, Result}; use crate::events::EventHandlerImpl; use crate::frontend::FrontendOptions; +use crate::heartbeat::frontend_peer_addr; use crate::instance::Instance; use crate::instance::region_query::FrontendRegionQueryHandler; @@ -223,6 +224,7 @@ impl FrontendBuilder { ) .query_engine(); + let frontend_peer_addr = frontend_peer_addr(&self.options); let statement_executor = StatementExecutor::new( self.catalog_manager.clone(), query_engine.clone(), @@ -232,6 +234,7 @@ impl FrontendBuilder { inserter.clone(), partition_manager, Some(process_manager.clone()), + frontend_peer_addr.clone(), ); let statement_executor = @@ -265,6 +268,7 @@ impl FrontendBuilder { )))); Ok(Instance { + frontend_peer_addr, catalog_manager: self.catalog_manager, pipeline_operator, statement_executor, diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 46b537a68d..cbd9b43151 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -45,7 +45,7 @@ use common_meta::error::{ }; use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager}; use common_meta::kv_backend::KvBackendRef; -use common_meta::peer::{Peer, PeerDiscovery}; +use common_meta::peer::PeerDiscovery; use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor}; use common_meta::range_stream::PaginationStream; use common_meta::rpc::KeyValue; @@ -590,7 +590,7 @@ impl ClusterInfo for MetaClient { // maybe we need to use the timestamp from metasrv in the future. #[async_trait::async_trait] impl PeerDiscovery for MetaClient { - async fn active_frontends(&self) -> MetaResult> { + async fn active_frontends(&self) -> MetaResult> { let nodes = self .list_nodes(Some(ClusterRole::Frontend)) .await @@ -608,7 +608,7 @@ impl PeerDiscovery for MetaClient { async fn active_datanodes( &self, filter: Option fn(&'a NodeWorkloads) -> bool>, - ) -> MetaResult> { + ) -> MetaResult> { let nodes = self .list_nodes(Some(ClusterRole::Datanode)) .await @@ -625,7 +625,7 @@ impl PeerDiscovery for MetaClient { async fn active_flownodes( &self, filter: Option fn(&'a NodeWorkloads) -> bool>, - ) -> MetaResult> { + ) -> MetaResult> { let nodes = self .list_nodes(Some(ClusterRole::Flownode)) .await diff --git a/src/meta-client/src/client/util.rs b/src/meta-client/src/client/util.rs index aa08871e04..61a80a83e3 100644 --- a/src/meta-client/src/client/util.rs +++ b/src/meta-client/src/client/util.rs @@ -15,7 +15,6 @@ use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::{ErrorCode, ResponseHeader}; use common_meta::cluster::{NodeInfo, NodeStatus}; -use common_meta::peer::Peer; use common_time::util::SystemTimer; use tonic::{Code, Status}; @@ -49,14 +48,14 @@ pub(crate) fn alive_frontends( timer: &impl SystemTimer, nodes: Vec, active_duration: std::time::Duration, -) -> Vec { +) -> Vec { nodes .into_iter() .filter_map(|node| { if matches!(node.status, NodeStatus::Frontend(_)) && is_active_node(timer, node.last_activity_ts, active_duration) { - Some(node.peer) + Some(node) } else { None } @@ -69,17 +68,17 @@ pub(crate) fn alive_datanodes( nodes: Vec, active_duration: std::time::Duration, filter: Option fn(&'a NodeWorkloads) -> bool>, -) -> Vec { +) -> Vec { let filter = filter.unwrap_or(|_| true); nodes .into_iter() .filter_map(|node| { - if let NodeStatus::Datanode(status) = node.status + if let NodeStatus::Datanode(status) = &node.status && is_active_node(timer, node.last_activity_ts, active_duration) { - let workloads = NodeWorkloads::Datanode(status.workloads); - filter(&workloads).then_some(node.peer) + let workloads = NodeWorkloads::Datanode(status.workloads.clone()); + filter(&workloads).then_some(node) } else { None } @@ -92,17 +91,17 @@ pub(crate) fn alive_flownodes( nodes: Vec, active_duration: std::time::Duration, filter: Option fn(&'a NodeWorkloads) -> bool>, -) -> Vec { +) -> Vec { let filter = filter.unwrap_or(|_| true); nodes .into_iter() .filter_map(|node| { - if let NodeStatus::Flownode(status) = node.status + if let NodeStatus::Flownode(status) = &node.status && is_active_node(timer, node.last_activity_ts, active_duration) { - let workloads = NodeWorkloads::Flownode(status.workloads); - filter(&workloads).then_some(node.peer) + let workloads = NodeWorkloads::Flownode(status.workloads.clone()); + filter(&workloads).then_some(node) } else { None } @@ -202,7 +201,10 @@ mod tests { assert_eq!( vec![1], - peers.into_iter().map(|peer| peer.id).collect::>() + peers + .into_iter() + .map(|node| node.peer.id) + .collect::>() ); } @@ -229,7 +231,10 @@ mod tests { assert_eq!( vec![1], - peers.into_iter().map(|peer| peer.id).collect::>() + peers + .into_iter() + .map(|node| node.peer.id) + .collect::>() ); } @@ -249,7 +254,10 @@ mod tests { assert_eq!( vec![1], - peers.into_iter().map(|peer| peer.id).collect::>() + peers + .into_iter() + .map(|node| node.peer.id) + .collect::>() ); } @@ -282,7 +290,10 @@ mod tests { assert_eq!( vec![1], - peers.into_iter().map(|peer| peer.id).collect::>() + peers + .into_iter() + .map(|node| node.peer.id) + .collect::>() ); } } diff --git a/src/meta-srv/src/discovery.rs b/src/meta-srv/src/discovery.rs index c65d401232..b0d14e4a94 100644 --- a/src/meta-srv/src/discovery.rs +++ b/src/meta-srv/src/discovery.rs @@ -18,6 +18,7 @@ pub mod utils; use api::v1::meta::heartbeat_request::NodeWorkloads; use common_error::ext::BoxedError; +use common_meta::cluster::NodeInfo; use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::error::Result; use common_meta::peer::{Peer, PeerDiscovery, PeerResolver}; @@ -30,8 +31,8 @@ use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType}; #[async_trait::async_trait] impl PeerDiscovery for MetaPeerClient { - async fn active_frontends(&self) -> Result> { - utils::alive_frontends( + async fn active_frontends(&self) -> Result> { + utils::alive_frontend_infos( &DefaultSystemTimer, self, default_distributed_time_constants().frontend_heartbeat_interval, @@ -44,8 +45,8 @@ impl PeerDiscovery for MetaPeerClient { async fn active_datanodes( &self, filter: Option fn(&'a NodeWorkloads) -> bool>, - ) -> Result> { - utils::alive_datanodes( + ) -> Result> { + utils::alive_datanode_infos( &DefaultSystemTimer, self, default_distributed_time_constants().datanode_lease, @@ -59,8 +60,8 @@ impl PeerDiscovery for MetaPeerClient { async fn active_flownodes( &self, filter: Option fn(&'a NodeWorkloads) -> bool>, - ) -> Result> { - utils::alive_flownodes( + ) -> Result> { + utils::alive_flownode_infos( &DefaultSystemTimer, self, default_distributed_time_constants().flownode_lease, diff --git a/src/meta-srv/src/discovery/lease.rs b/src/meta-srv/src/discovery/lease.rs index 9fcc52f423..f3c603af00 100644 --- a/src/meta-srv/src/discovery/lease.rs +++ b/src/meta-srv/src/discovery/lease.rs @@ -95,22 +95,22 @@ impl LeaseValueAccessor for MetaPeerClient { #[cfg(test)] mod tests { - use std::sync::Arc; - use std::sync::atomic::{AtomicI64, Ordering}; - use std::time::Duration; + use std::collections::HashMap; use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads}; - use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role}; + use common_meta::cluster::{ + DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role, + }; use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::kv_backend::ResettableKvBackendRef; use common_meta::peer::{Peer, PeerDiscovery}; use common_meta::rpc::store::PutRequest; - use common_time::util::{DefaultSystemTimer, SystemTimer, current_time_millis}; + use common_time::util::current_time_millis; use common_workload::DatanodeWorkloadType; - use crate::discovery::utils::{self, accept_ingest_workload}; - use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue}; + use crate::discovery::utils::accept_ingest_workload; + use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::test_util::create_meta_peer_client; async fn put_lease_value( @@ -128,14 +128,10 @@ mod tests { .unwrap(); } - async fn put_flownode_lease_value( - kv_backend: &ResettableKvBackendRef, - key: FlownodeLeaseKey, - value: LeaseValue, - ) { + async fn put_node_info(kv_backend: &ResettableKvBackendRef, key: NodeInfoKey, value: NodeInfo) { kv_backend .put(PutRequest { - key: key.try_into().unwrap(), + key: (&key).into(), value: value.try_into().unwrap(), prev_kv: false, }) @@ -143,332 +139,170 @@ mod tests { .unwrap(); } - struct MockTimer { - current: Arc, - } - - impl SystemTimer for MockTimer { - fn current_time_millis(&self) -> i64 { - self.current.fetch_add(1, Ordering::Relaxed) - } - - fn current_time_rfc3339(&self) -> String { - unimplemented!() - } - } - #[tokio::test] - async fn test_alive_datanodes() { + async fn test_active_datanodes_uses_lease_liveness_with_stale_node_info() { let client = create_meta_peer_client(); let in_memory = client.memory_backend(); - let lease_secs = 10; - let timer = DefaultSystemTimer; + let lease = default_distributed_time_constants().datanode_lease; - // put a stale lease value for node 1 - let key = DatanodeLeaseKey { node_id: 1 }; - let value = LeaseValue { - // 20s ago - timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000, - node_addr: "127.0.0.1:20201".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key, value).await; + let mut env_vars = HashMap::new(); + env_vars.insert("AZ".to_string(), "az-a".to_string()); - // put a fresh lease value for node 2 - let key = DatanodeLeaseKey { node_id: 2 }; - let value = LeaseValue { - timestamp_millis: timer.current_time_millis(), - node_addr: "127.0.0.1:20202".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key.clone(), value.clone()).await; - let peers = utils::alive_datanodes( - &timer, - client.as_ref(), - Duration::from_secs(lease_secs as u64), - None, - ) - .await - .unwrap(); - assert_eq!(peers.len(), 1); - assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]); - } - - #[tokio::test] - async fn test_alive_datanodes_with_timer() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease_secs = 10; - let timer = MockTimer { - current: Arc::new(AtomicI64::new(current_time_millis())), - }; - - let key = DatanodeLeaseKey { node_id: 2 }; - let value = LeaseValue { - timestamp_millis: timer.current_time_millis(), - node_addr: "127.0.0.1:20202".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key.clone(), value.clone()).await; - let peers = utils::alive_datanodes( - &timer, - client.as_ref(), - Duration::from_secs(lease_secs as u64), - None, - ) - .await - .unwrap(); - assert_eq!(peers.len(), 1); - assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]); - } - - #[tokio::test] - async fn test_alive_datanodes_with_condition() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease_secs = 10; - let timer = DefaultSystemTimer; - - // put a lease value for node 1 without mode info - let key = DatanodeLeaseKey { node_id: 1 }; - let value = LeaseValue { - // 20s ago - timestamp_millis: timer.current_time_millis() - 20 * 1000, - node_addr: "127.0.0.1:20201".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key, value).await; - - // put a lease value for node 2 with mode info - let key = DatanodeLeaseKey { node_id: 2 }; - let value = LeaseValue { - timestamp_millis: timer.current_time_millis(), - node_addr: "127.0.0.1:20202".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }; - put_lease_value(&in_memory, key, value).await; - - // put a lease value for node 3 with mode info - let key = DatanodeLeaseKey { node_id: 3 }; - let value = LeaseValue { - timestamp_millis: timer.current_time_millis(), - node_addr: "127.0.0.1:20203".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![i32::MAX], - }), - }; - put_lease_value(&in_memory, key, value).await; - - // put a lease value for node 3 with mode info - let key = DatanodeLeaseKey { node_id: 4 }; - let value = LeaseValue { - timestamp_millis: timer.current_time_millis(), - node_addr: "127.0.0.1:20204".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![i32::MAX], - }), - }; - put_lease_value(&in_memory, key, value).await; - - let peers = utils::alive_datanodes( - &timer, - client.as_ref(), - Duration::from_secs(lease_secs), - Some(accept_ingest_workload), - ) - .await - .unwrap(); - assert_eq!(peers.len(), 1); - assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string()))); - } - - #[tokio::test] - async fn test_alive_flownodes() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease_secs = 10; - let timer = DefaultSystemTimer; - - // put a stale lease value for node 1 - let key = FlownodeLeaseKey { node_id: 1 }; - let value = LeaseValue { - // 20s ago - timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000, - node_addr: "127.0.0.1:20201".to_string(), - workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }), - }; - put_flownode_lease_value(&in_memory, key, value).await; - - // put a fresh lease value for node 2 - let key = FlownodeLeaseKey { node_id: 2 }; - let value = LeaseValue { - timestamp_millis: timer.current_time_millis(), - node_addr: "127.0.0.1:20202".to_string(), - workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }), - }; - put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await; - let peers = utils::alive_flownodes( - &timer, - client.as_ref(), - Duration::from_secs(lease_secs as u64), - None, - ) - .await - .unwrap(); - assert_eq!(peers.len(), 1); - assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]); - } - - #[tokio::test] - async fn test_alive_flownodes_with_timer() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease_secs = 10; - let timer = MockTimer { - current: Arc::new(AtomicI64::new(current_time_millis())), - }; - - let key = FlownodeLeaseKey { node_id: 2 }; - let value = LeaseValue { - timestamp_millis: timer.current_time_millis(), - node_addr: "127.0.0.1:20202".to_string(), - workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }), - }; - put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await; - let peers = utils::alive_flownodes( - &timer, - client.as_ref(), - Duration::from_secs(lease_secs as u64), - None, - ) - .await - .unwrap(); - assert_eq!(peers.len(), 1); - assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]); - } - - #[tokio::test] - async fn test_lookup_frontends() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease_secs = 10; - let timer = DefaultSystemTimer; - - let active_frontend_node = NodeInfo { - peer: Peer { - id: 0, - addr: "127.0.0.1:20201".to_string(), + put_node_info( + &in_memory, + NodeInfoKey { + role: Role::Datanode, + node_id: 1, }, - last_activity_ts: timer.current_time_millis(), - status: NodeStatus::Frontend(FrontendStatus {}), - version: "1.0.0".to_string(), - git_commit: "1234567890".to_string(), - start_time_ms: current_time_millis() as u64, - total_cpu_millicores: 0, - total_memory_bytes: 0, - cpu_usage_millicores: 0, - memory_usage_bytes: 0, - hostname: "test_hostname".to_string(), - env_vars: Default::default(), - }; + NodeInfo { + peer: Peer::new(1, "127.0.0.1:4001".to_string()), + last_activity_ts: current_time_millis() - (lease.as_millis() * 2) as i64, + status: NodeStatus::Datanode(DatanodeStatus { + rcus: 0, + wcus: 0, + leader_regions: 0, + follower_regions: 0, + workloads: DatanodeWorkloads { + types: vec![i32::MAX], + }, + }), + version: String::new(), + git_commit: String::new(), + start_time_ms: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + hostname: String::new(), + env_vars, + }, + ) + .await; - let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend); + put_lease_value( + &in_memory, + DatanodeLeaseKey { node_id: 1 }, + LeaseValue { + timestamp_millis: current_time_millis(), + node_addr: "127.0.0.1:4001".to_string(), + workloads: NodeWorkloads::Datanode(DatanodeWorkloads { + types: vec![DatanodeWorkloadType::Hybrid as i32], + }), + }, + ) + .await; - in_memory - .put(PutRequest { - key: format!("{}{}", key_prefix, "0").into(), - value: active_frontend_node.try_into().unwrap(), - prev_kv: false, - }) + let nodes = client + .active_datanodes(Some(accept_ingest_workload)) .await .unwrap(); - let inactive_frontend_node = NodeInfo { - peer: Peer { - id: 1, - addr: "127.0.0.1:20201".to_string(), - }, - last_activity_ts: timer.current_time_millis() - 20 * 1000, - status: NodeStatus::Frontend(FrontendStatus {}), - version: "1.0.0".to_string(), - git_commit: "1234567890".to_string(), - start_time_ms: current_time_millis() as u64, - total_cpu_millicores: 0, - total_memory_bytes: 0, - cpu_usage_millicores: 0, - memory_usage_bytes: 0, - hostname: "test_hostname".to_string(), - env_vars: Default::default(), - }; - - in_memory - .put(PutRequest { - key: format!("{}{}", key_prefix, "1").into(), - value: inactive_frontend_node.try_into().unwrap(), - prev_kv: false, - }) - .await - .unwrap(); - - let peers = - utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs)) - .await - .unwrap(); - assert_eq!(peers.len(), 1); - assert_eq!(peers[0].id, 0); + assert_eq!(nodes.len(), 1); + assert_eq!(nodes[0].peer.id, 1); + assert_eq!( + nodes[0].env_vars.get("AZ").map(String::as_str), + Some("az-a") + ); } #[tokio::test] - async fn test_lookup_frontends_with_timer() { + async fn test_active_datanodes_returns_node_info_with_env_vars() { let client = create_meta_peer_client(); let in_memory = client.memory_backend(); - let lease_secs = 10; - let timer = MockTimer { - current: Arc::new(AtomicI64::new(current_time_millis())), - }; - let active_frontend_node = NodeInfo { - peer: Peer { - id: 0, - addr: "127.0.0.1:20201".to_string(), + let mut env_vars = HashMap::new(); + env_vars.insert("AZ".to_string(), "az-a".to_string()); + + put_node_info( + &in_memory, + NodeInfoKey { + role: Role::Datanode, + node_id: 1, }, - last_activity_ts: timer.current_time_millis(), - status: NodeStatus::Frontend(FrontendStatus {}), - version: "1.0.0".to_string(), - git_commit: "1234567890".to_string(), - start_time_ms: current_time_millis() as u64, - total_cpu_millicores: 0, - total_memory_bytes: 0, - cpu_usage_millicores: 0, - memory_usage_bytes: 0, - hostname: "test_hostname".to_string(), - env_vars: Default::default(), - }; - let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend); - in_memory - .put(PutRequest { - key: format!("{}{}", key_prefix, "0").into(), - value: active_frontend_node.try_into().unwrap(), - prev_kv: false, - }) + NodeInfo { + peer: Peer::new(1, "127.0.0.1:4001".to_string()), + last_activity_ts: current_time_millis(), + status: NodeStatus::Datanode(DatanodeStatus { + rcus: 0, + wcus: 0, + leader_regions: 0, + follower_regions: 0, + workloads: DatanodeWorkloads { + types: vec![DatanodeWorkloadType::Hybrid as i32], + }, + }), + version: String::new(), + git_commit: String::new(), + start_time_ms: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + hostname: String::new(), + env_vars, + }, + ) + .await; + put_lease_value( + &in_memory, + DatanodeLeaseKey { node_id: 1 }, + LeaseValue { + timestamp_millis: current_time_millis(), + node_addr: "127.0.0.1:4001".to_string(), + workloads: NodeWorkloads::Datanode(DatanodeWorkloads { + types: vec![DatanodeWorkloadType::Hybrid as i32], + }), + }, + ) + .await; + + let nodes = client + .active_datanodes(Some(accept_ingest_workload)) .await .unwrap(); - let peers = - utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs)) - .await - .unwrap(); - assert_eq!(peers.len(), 1); - assert_eq!(peers[0].id, 0); + + assert_eq!(nodes.len(), 1); + assert_eq!(nodes[0].peer.id, 1); + assert_eq!( + nodes[0].env_vars.get("AZ").map(String::as_str), + Some("az-a") + ); + } + + #[tokio::test] + async fn test_active_flownodes_returns_node_info() { + let client = create_meta_peer_client(); + let in_memory = client.memory_backend(); + + put_node_info( + &in_memory, + NodeInfoKey { + role: Role::Flownode, + node_id: 11, + }, + NodeInfo { + peer: Peer::new(11, "127.0.0.1:5001".to_string()), + last_activity_ts: current_time_millis(), + status: NodeStatus::Flownode(FlownodeStatus { + workloads: FlownodeWorkloads { types: vec![7] }, + }), + version: String::new(), + git_commit: String::new(), + start_time_ms: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + hostname: String::new(), + env_vars: Default::default(), + }, + ) + .await; + + let nodes = client.active_flownodes(None).await.unwrap(); + + assert_eq!(nodes.len(), 1); + assert_eq!(nodes[0].peer.id, 11); } #[tokio::test] diff --git a/src/meta-srv/src/discovery/utils.rs b/src/meta-srv/src/discovery/utils.rs index 317033e0cf..2db2d73881 100644 --- a/src/meta-srv/src/discovery/utils.rs +++ b/src/meta-srv/src/discovery/utils.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::time::Duration; use api::v1::meta::heartbeat_request::NodeWorkloads; use common_meta::DatanodeId; -use common_meta::cluster::NodeInfo; +use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus}; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_time::util::SystemTimer; @@ -57,68 +58,137 @@ pub fn build_active_filter(active_duration: Duration) -> impl F } } -/// Returns the alive datanodes. -pub async fn alive_datanodes( - timer: &impl SystemTimer, - accessor: &impl LeaseValueAccessor, - active_duration: Duration, - condition: Option bool>, -) -> Result> { - let active_filter = build_active_filter(active_duration); - let condition = condition.unwrap_or(|_| true); - let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?; - let now = timer.current_time_millis(); - Ok(lease_values - .into_iter() - .filter_map(|(peer_id, lease_value)| { - if active_filter(now, &lease_value) && condition(&lease_value.workloads) { - Some(Peer::new(peer_id, lease_value.node_addr)) - } else { - None - } - }) - .collect::>()) -} - -/// Returns the alive flownodes. -pub async fn alive_flownodes( - timer: &impl SystemTimer, - accessor: &impl LeaseValueAccessor, - active_duration: Duration, - condition: Option bool>, -) -> Result> { - let active_filter = build_active_filter(active_duration); - let condition = condition.unwrap_or(|_| true); - let lease_values = accessor.lease_values(LeaseValueType::Flownode).await?; - let now = timer.current_time_millis(); - Ok(lease_values - .into_iter() - .filter_map(|(peer_id, lease_value)| { - if active_filter(now, &lease_value) && condition(&lease_value.workloads) { - Some(Peer::new(peer_id, lease_value.node_addr)) - } else { - None - } - }) - .collect::>()) -} - -/// Returns the alive frontends. -pub async fn alive_frontends( +/// Returns the alive frontend node infos. +pub async fn alive_frontend_infos( timer: &impl SystemTimer, lister: &impl NodeInfoAccessor, active_duration: Duration, -) -> Result> { +) -> Result> { let active_filter = build_active_filter(active_duration); let node_infos = lister.node_infos(NodeInfoType::Frontend).await?; let now = timer.current_time_millis(); + Ok(node_infos + .into_iter() + .filter_map(|(_, node_info)| active_filter(now, &node_info).then_some(node_info)) + .collect::>()) +} + +/// Returns the alive datanode node infos. +pub async fn alive_datanode_infos( + timer: &impl SystemTimer, + accessor: &(impl LeaseValueAccessor + NodeInfoAccessor), + active_duration: Duration, + condition: Option bool>, +) -> Result> { + let active_filter = build_active_filter(active_duration); + let condition = condition.unwrap_or(|_| true); + let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?; + let mut node_infos = accessor + .node_infos(NodeInfoType::Datanode) + .await? + .into_iter() + .collect::>(); + let now = timer.current_time_millis(); + + Ok(lease_values + .into_iter() + .filter_map(|(peer_id, lease_value)| { + if !active_filter(now, &lease_value) || !condition(&lease_value.workloads) { + return None; + } + + let peer = Peer::new(peer_id, lease_value.node_addr.clone()); + let mut node_info = node_infos + .remove(&peer_id) + .filter(|node_info| matches!(node_info.status, NodeStatus::Datanode(_))) + .unwrap_or_else(|| datanode_node_info_from_lease(peer.clone(), &lease_value)); + node_info.peer = peer; + node_info.last_activity_ts = lease_value.timestamp_millis; + if let (NodeStatus::Datanode(status), NodeWorkloads::Datanode(workloads)) = + (&mut node_info.status, &lease_value.workloads) + { + status.workloads = workloads.clone(); + } + Some(node_info) + }) + .collect::>()) +} + +fn datanode_node_info_from_lease(peer: Peer, lease_value: &LeaseValue) -> NodeInfo { + let workloads = match &lease_value.workloads { + NodeWorkloads::Datanode(workloads) => workloads.clone(), + _ => Default::default(), + }; + + NodeInfo { + peer, + last_activity_ts: lease_value.timestamp_millis, + status: NodeStatus::Datanode(DatanodeStatus { + rcus: 0, + wcus: 0, + leader_regions: 0, + follower_regions: 0, + workloads, + }), + version: String::new(), + git_commit: String::new(), + start_time_ms: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + hostname: String::new(), + env_vars: Default::default(), + } +} + +/// Returns the alive flownode node infos. +pub async fn alive_flownode_infos( + timer: &impl SystemTimer, + lister: &impl NodeInfoAccessor, + active_duration: Duration, + condition: Option bool>, +) -> Result> { + alive_node_infos( + timer, + lister, + NodeInfoType::Flownode, + active_duration, + condition, + ) + .await +} + +async fn alive_node_infos( + timer: &impl SystemTimer, + lister: &impl NodeInfoAccessor, + node_info_type: NodeInfoType, + active_duration: Duration, + condition: Option bool>, +) -> Result> { + let active_filter = build_active_filter(active_duration); + let node_infos = lister.node_infos(node_info_type).await?; + let now = timer.current_time_millis(); Ok(node_infos .into_iter() .filter_map(|(_, node_info)| { - if active_filter(now, &node_info) { - Some(node_info.peer) - } else { - None + if !active_filter(now, &node_info) { + return None; + } + + match (&node_info.status, condition) { + (NodeStatus::Datanode(status), Some(condition)) => { + let workloads = NodeWorkloads::Datanode(status.workloads.clone()); + condition(&workloads).then_some(node_info) + } + (NodeStatus::Flownode(status), Some(condition)) => { + let workloads = NodeWorkloads::Flownode(status.workloads.clone()); + condition(&workloads).then_some(node_info) + } + (NodeStatus::Datanode(_), None) | (NodeStatus::Flownode(_), None) => { + Some(node_info) + } + _ => None, } }) .collect::>()) diff --git a/src/meta-srv/src/peer.rs b/src/meta-srv/src/peer.rs index 2672a239e0..9572006182 100644 --- a/src/meta-srv/src/peer.rs +++ b/src/meta-srv/src/peer.rs @@ -17,7 +17,7 @@ use std::collections::HashSet; use async_trait::async_trait; use common_error::ext::BoxedError; use common_meta::error::{ExternalSnafu, Result as MetaResult}; -use common_meta::peer::{Peer, PeerAllocator}; +use common_meta::peer::{Peer, PeerAllocContext, PeerAllocator}; use snafu::{ResultExt, ensure}; use crate::discovery::utils::accept_ingest_workload; @@ -55,7 +55,11 @@ impl MetasrvPeerAllocator { /// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is /// no guarantee that how the returned peers are used, like whether they are from the same /// table or not. So this method isn't idempotent. - async fn alloc(&self, min_required_items: usize) -> Result> { + async fn alloc( + &self, + min_required_items: usize, + alloc_context: &PeerAllocContext, + ) -> Result> { if let Some(max_items) = self.max_items { ensure!( min_required_items <= max_items as usize, @@ -71,6 +75,7 @@ impl MetasrvPeerAllocator { allow_duplication: true, exclude_peer_ids: HashSet::new(), workload_filter: Some(accept_ingest_workload), + extensions: alloc_context.extensions.clone(), }, ) .await @@ -79,8 +84,8 @@ impl MetasrvPeerAllocator { #[async_trait] impl PeerAllocator for MetasrvPeerAllocator { - async fn alloc(&self, regions: usize) -> MetaResult> { - self.alloc(regions) + async fn alloc(&self, regions: usize, ctx: &PeerAllocContext) -> MetaResult> { + self.alloc(regions, ctx) .await .map_err(BoxedError::new) .context(ExternalSnafu) diff --git a/src/meta-srv/src/procedure/repartition/allocate_region.rs b/src/meta-srv/src/procedure/repartition/allocate_region.rs index e38d6d3a95..a3293e8c3e 100644 --- a/src/meta-srv/src/procedure/repartition/allocate_region.rs +++ b/src/meta-srv/src/procedure/repartition/allocate_region.rs @@ -21,6 +21,7 @@ use common_meta::ddl::create_table::template::{ }; use common_meta::lock_key::TableLock; use common_meta::node_manager::NodeManagerRef; +use common_meta::peer::PeerAllocContext; use common_meta::rpc::router::RegionRoute; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{debug, info}; @@ -156,6 +157,7 @@ impl ExecutePlan { .iter() .map(|(n, p)| (*n, p.as_str())) .collect::>(), + &PeerAllocContext::default(), ) .await .context(error::AllocateRegionRoutesSnafu { table_id })?; diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 05716e60c8..441d6498e0 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -620,6 +620,7 @@ impl RegionSupervisor { allow_duplication: true, exclude_peer_ids, workload_filter: Some(accept_ingest_workload), + extensions: Default::default(), }; let peers = selector.select(&self.selector_context, opt).await?; ensure!( diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index 2562c6bfb7..a02d294d82 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -20,7 +20,7 @@ pub mod round_robin; pub(crate) mod test_utils; pub mod weight_compute; pub mod weighted_choose; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use api::v1::meta::heartbeat_request::NodeWorkloads; use serde::{Deserialize, Serialize}; @@ -66,6 +66,8 @@ pub struct SelectorOptions { pub exclude_peer_ids: HashSet, /// The filter to select the peers based on their workloads. pub workload_filter: Option bool>, + /// Additional placement context, such as the frontend identity that originated the request. + pub extensions: HashMap, } impl Default for SelectorOptions { @@ -75,6 +77,7 @@ impl Default for SelectorOptions { allow_duplication: false, exclude_peer_ids: HashSet::new(), workload_filter: None, + extensions: HashMap::new(), } } } diff --git a/src/meta-srv/src/selector/common.rs b/src/meta-srv/src/selector/common.rs index 70da22aedd..2a31c1ce86 100644 --- a/src/meta-srv/src/selector/common.rs +++ b/src/meta-srv/src/selector/common.rs @@ -86,7 +86,7 @@ where #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; use common_meta::peer::Peer; @@ -140,6 +140,7 @@ mod tests { allow_duplication: false, exclude_peer_ids: HashSet::new(), workload_filter: None, + extensions: HashMap::new(), }; let selected_peers: HashSet<_> = @@ -156,6 +157,7 @@ mod tests { allow_duplication: false, exclude_peer_ids: HashSet::new(), workload_filter: None, + extensions: HashMap::new(), }; let selected_result = @@ -168,6 +170,7 @@ mod tests { allow_duplication: true, exclude_peer_ids: HashSet::new(), workload_filter: None, + extensions: HashMap::new(), }; let selected_peers = diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index ad5993696c..4a44bfca1d 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -36,7 +36,10 @@ impl Selector for LeaseBasedSelector { .peer_discovery .active_datanodes(opts.workload_filter) .await - .context(ListActiveDatanodesSnafu)?; + .context(ListActiveDatanodesSnafu)? + .into_iter() + .map(|node| node.peer) + .collect::>(); // 2. compute weight array, but the weight of each item is the same. let mut weight_array = alive_datanodes diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 02c8581522..9f36ba051f 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -55,7 +55,10 @@ where .peer_discovery .active_datanodes(opts.workload_filter) .await - .context(ListActiveDatanodesSnafu)?; + .context(ListActiveDatanodesSnafu)? + .into_iter() + .map(|node| node.peer) + .collect::>(); // 2. get stat kvs and filter out expired datanodes. let stat_keys = alive_datanodes diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs index 0f8727fba6..f9cf71e949 100644 --- a/src/meta-srv/src/selector/round_robin.rs +++ b/src/meta-srv/src/selector/round_robin.rs @@ -65,6 +65,7 @@ impl RoundRobinSelector { // 2. filter out excluded datanodes. alive_datanodes .into_iter() + .map(|node| node.peer) .filter(|p| !opts.exclude_peer_ids.contains(&p.id)) .collect::>() } @@ -72,7 +73,10 @@ impl RoundRobinSelector { .peer_discovery .active_flownodes(opts.workload_filter) .await - .context(ListActiveFlownodesSnafu)?, + .context(ListActiveFlownodesSnafu)? + .into_iter() + .map(|node| node.peer) + .collect::>(), }; ensure!( @@ -150,6 +154,7 @@ mod test { allow_duplication: true, exclude_peer_ids: HashSet::new(), workload_filter: None, + extensions: Default::default(), }, ) .await @@ -168,6 +173,7 @@ mod test { allow_duplication: true, exclude_peer_ids: HashSet::new(), workload_filter: None, + extensions: Default::default(), }, ) .await @@ -210,6 +216,7 @@ mod test { allow_duplication: true, exclude_peer_ids: HashSet::from([2, 5]), workload_filter: None, + extensions: Default::default(), }, ) .await diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index 1d9437ef12..1169a93309 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use api::v1::meta::DatanodeWorkloads; use api::v1::meta::heartbeat_request::NodeWorkloads; +use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus, Role}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; @@ -57,6 +58,7 @@ pub(crate) fn create_meta_peer_client() -> MetaPeerClientRef { pub(crate) async fn put_datanodes(meta_peer_client: &MetaPeerClientRef, datanodes: Vec) { let backend = meta_peer_client.memory_backend(); for datanode in datanodes { + let peer = datanode.clone(); let lease_key = DatanodeLeaseKey { node_id: datanode.id, }; @@ -69,9 +71,45 @@ pub(crate) async fn put_datanodes(meta_peer_client: &MetaPeerClientRef, datanode }; let lease_key_bytes: Vec = lease_key.try_into().unwrap(); let lease_value_bytes: Vec = lease_value.try_into().unwrap(); + backend + .put(common_meta::rpc::store::PutRequest { + key: lease_key_bytes, + value: lease_value_bytes, + ..Default::default() + }) + .await + .unwrap(); + + let workloads = DatanodeWorkloads { + types: vec![DatanodeWorkloadType::Hybrid.to_i32()], + }; + let node_info = NodeInfo { + peer, + last_activity_ts: time_util::current_time_millis(), + status: NodeStatus::Datanode(DatanodeStatus { + rcus: 0, + wcus: 0, + leader_regions: 0, + follower_regions: 0, + workloads, + }), + version: String::new(), + git_commit: String::new(), + start_time_ms: 0, + total_cpu_millicores: 0, + total_memory_bytes: 0, + cpu_usage_millicores: 0, + memory_usage_bytes: 0, + hostname: String::new(), + env_vars: Default::default(), + }; + let node_info_key = NodeInfoKey { + role: Role::Datanode, + node_id: datanode.id, + }; let put_request = common_meta::rpc::store::PutRequest { - key: lease_key_bytes, - value: lease_value_bytes, + key: (&node_info_key).into(), + value: node_info.try_into().unwrap(), ..Default::default() }; backend.put(put_request).await.unwrap(); diff --git a/src/meta-srv/src/utils/database.rs b/src/meta-srv/src/utils/database.rs index bf8cad535f..976be8a85b 100644 --- a/src/meta-srv/src/utils/database.rs +++ b/src/meta-srv/src/utils/database.rs @@ -105,7 +105,7 @@ impl DatabaseOperator { let urls = frontends .into_iter() - .map(|peer| peer.addr) + .map(|node| node.peer.addr) .collect::>(); debug!("Available frontend addresses: {:?}", urls); diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index f3931255d0..34b2550acf 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -118,6 +118,7 @@ pub struct StatementExecutor { cache_invalidator: CacheInvalidatorRef, inserter: InserterRef, process_manager: Option, + origin_frontend_addr: String, #[cfg(feature = "enterprise")] trigger_querier: Option, } @@ -153,6 +154,7 @@ impl StatementExecutor { inserter: InserterRef, partition_manager: PartitionRuleManagerRef, process_manager: Option, + origin_frontend_addr: String, ) -> Self { Self { catalog_manager, @@ -165,6 +167,7 @@ impl StatementExecutor { cache_invalidator, inserter, process_manager, + origin_frontend_addr, #[cfg(feature = "enterprise")] trigger_querier: None, } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 6fd6e4adb4..08d3206548 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -105,7 +105,7 @@ use crate::error::{ use crate::expr_helper::{self, RepartitionRequest}; use crate::statement::StatementExecutor; use crate::statement::show::create_partitions_stmt; -use crate::utils::to_meta_query_context; +use crate::utils::{to_meta_query_context, to_meta_query_context_with_origin_frontend}; #[derive(Debug, Clone, Copy)] struct DdlSubmitOptions { @@ -1824,7 +1824,7 @@ impl StatementExecutor { .collect::>>()?; let request = SubmitDdlTaskRequest::new( - to_meta_query_context(query_context), + to_meta_query_context_with_origin_frontend(query_context, &self.origin_frontend_addr), DdlTask::new_create_table(create_table, partitions, table_info), ); @@ -1840,7 +1840,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest::new( - to_meta_query_context(query_context), + to_meta_query_context_with_origin_frontend(query_context, &self.origin_frontend_addr), DdlTask::new_create_logical_tables(tables_data), ); diff --git a/src/operator/src/utils.rs b/src/operator/src/utils.rs index 6e9386b3fa..fbb760c4fc 100644 --- a/src/operator/src/utils.rs +++ b/src/operator/src/utils.rs @@ -34,6 +34,18 @@ pub fn to_meta_query_context( } } +pub fn to_meta_query_context_with_origin_frontend( + query_context: QueryContextRef, + origin_frontend_addr: &str, +) -> common_meta::rpc::ddl::QueryContext { + let mut meta_query_context = to_meta_query_context(query_context); + meta_query_context.extensions.insert( + common_meta::rpc::ddl::ORIGIN_FRONTEND_ADDR_EXTENSION_KEY.to_string(), + origin_frontend_addr.to_string(), + ); + meta_query_context +} + pub fn try_to_session_query_context( value: common_meta::rpc::ddl::QueryContext, ) -> Result { @@ -57,10 +69,14 @@ mod tests { use std::collections::HashMap; use std::sync::{Arc, RwLock}; + use common_meta::rpc::ddl::ORIGIN_FRONTEND_ADDR_EXTENSION_KEY; use common_time::Timezone; use session::context::QueryContextBuilder; - use super::{to_meta_query_context, try_to_session_query_context}; + use super::{ + to_meta_query_context, to_meta_query_context_with_origin_frontend, + try_to_session_query_context, + }; #[test] fn test_query_context_meta_roundtrip_with_sequences() { @@ -84,4 +100,26 @@ mod tests { assert_eq!(roundtrip.sst_min_sequences(), HashMap::from([(10, 90)])); assert_eq!(roundtrip.extension("flow.return_region_seq"), Some("true")); } + + #[test] + fn test_meta_query_context_with_origin_frontend_overrides_reserved_key() { + let session_ctx = Arc::new( + QueryContextBuilder::default() + .set_extension( + ORIGIN_FRONTEND_ADDR_EXTENSION_KEY.to_string(), + "spoofed".to_string(), + ) + .build(), + ); + + let meta_ctx = to_meta_query_context_with_origin_frontend(session_ctx, "127.0.0.1:4000"); + + assert_eq!( + meta_ctx + .extensions + .get(ORIGIN_FRONTEND_ADDR_EXTENSION_KEY) + .map(String::as_str), + Some("127.0.0.1:4000") + ); + } } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 0719fcd9f1..3021f684fe 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -401,7 +401,7 @@ impl GreptimeDbClusterBuilder { expected_datanodes: usize, ) { for _ in 0..100 { - let alive_datanodes = discovery::utils::alive_datanodes( + let alive_datanodes = discovery::utils::alive_datanode_infos( &DefaultSystemTimer, meta_peer_client.as_ref(), Duration::from_secs(u64::MAX), diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index acdd0e60b5..b013b8b0d4 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -286,6 +286,7 @@ impl GreptimeDbStandaloneBuilder { cache_registry.clone(), procedure_executor.clone(), node_manager.clone(), + instance.frontend_peer_addr().to_string(), ) .await .context(StartFlownodeSnafu)