feat: expose node info for placement selectors (#8095)

* feat: expose node info for placement selectors

Return `NodeInfo` from `PeerDiscovery` methods, while the OSS selectors keep mapping the results back to `Peer`.
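
A minimal sketch of the OSS selector side, with stand-in types for `NodeInfo` and `Peer` (the real definitions live in `common_meta`): selectors that only place regions simply project the richer node info back down to a peer.

#[derive(Debug, Clone, PartialEq)]
struct Peer {
    id: u64,
    addr: String,
}

struct NodeInfo {
    peer: Peer,
    env_vars: std::collections::HashMap<String, String>,
}

// OSS selectors only need placement targets, so they map NodeInfo back to Peer.
fn to_peers(nodes: Vec<NodeInfo>) -> Vec<Peer> {
    nodes.into_iter().map(|node| node.peer).collect()
}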

Carry `__greptime_origin_frontend.addr` from frontend create-table DDLs into selector `extensions`, and thread `PeerAllocContext` through table-route allocation.
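
A hedged sketch of the consuming side: a placement-aware selector can read the carried address from the new `extensions` map. `SelectorOptions` is reduced to just that field here; the key mirrors `ORIGIN_FRONTEND_ADDR_EXTENSION_KEY`.

use std::collections::HashMap;

// Mirrors common_meta::rpc::ddl::ORIGIN_FRONTEND_ADDR_EXTENSION_KEY.
const ORIGIN_FRONTEND_ADDR_EXTENSION_KEY: &str = "__greptime_origin_frontend.addr";

// Reduced stand-in for the selector options carrying placement context.
struct SelectorOptions {
    extensions: HashMap<String, String>,
}

// Which frontend submitted the create-table DDL, if the context carried it.
fn origin_frontend(opts: &SelectorOptions) -> Option<&str> {
    opts.extensions
        .get(ORIGIN_FRONTEND_ADDR_EXTENSION_KEY)
        .map(String::as_str)
}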

Persist datanode `NodeInfo` when heartbeat stats are absent so collected env vars remain available after restart.
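
Roughly, when the stored info record is missing, the metasrv synthesizes a minimal `NodeInfo` from the lease instead of dropping the node; a simplified sketch with stand-in types (the real fallback is `datanode_node_info_from_lease` in `src/meta-srv/src/discovery/utils.rs`).

struct LeaseValue {
    timestamp_millis: i64,
    node_addr: String,
}

struct NodeInfo {
    addr: String,
    last_activity_ts: i64,
}

// Prefer the persisted NodeInfo (it carries env vars, hostname, version);
// otherwise build a minimal one from the lease so the node stays visible.
fn node_info_or_lease(stored: Option<NodeInfo>, lease: &LeaseValue) -> NodeInfo {
    stored.unwrap_or_else(|| NodeInfo {
        addr: lease.node_addr.clone(),
        last_activity_ts: lease.timestamp_millis,
    })
}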

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: skip datanode node info without stats

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: avoid unnecessary workload clones

Skip workload cloning for inactive nodes and for active node-info lookups without workload filters.
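
The shape of the optimization, sketched with stand-in types: check liveness first, and only clone the workloads when a filter will actually inspect them.

#[derive(Clone)]
struct Workloads {
    types: Vec<i32>,
}

struct Node {
    alive: bool,
    workloads: Workloads,
}

fn passes(node: &Node, filter: Option<fn(&Workloads) -> bool>) -> bool {
    if !node.alive {
        return false; // inactive node: no clone, no filter call
    }
    match filter {
        None => true, // no filter: accept without touching the workloads
        Some(f) => {
            // Clone only on this path, where the filter's wrapper needs owned data.
            let workloads = node.workloads.clone();
            f(&workloads)
        }
    }
}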

Files: `src/meta-srv/src/discovery/utils.rs`
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: require frontend origin address

Require `StatementExecutor` to carry a concrete frontend origin address and always attach it to meta DDL query contexts.
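
Conceptually, the executor always stamps the reserved key with its own address when converting a session context into a meta DDL `QueryContext`, so a client-supplied value cannot spoof the origin. A condensed sketch (the real helper is `to_meta_query_context_with_origin_frontend` in `src/operator/src/utils.rs`):

use std::collections::HashMap;

const ORIGIN_FRONTEND_ADDR_EXTENSION_KEY: &str = "__greptime_origin_frontend.addr";

struct MetaQueryContext {
    extensions: HashMap<String, String>,
}

// Insert overwrites any pre-existing value for the reserved key.
fn attach_origin(mut ctx: MetaQueryContext, origin_frontend_addr: &str) -> MetaQueryContext {
    ctx.extensions.insert(
        ORIGIN_FRONTEND_ADDR_EXTENSION_KEY.to_string(),
        origin_frontend_addr.to_string(),
    );
    ctx
}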

Files: `src/operator/src/statement.rs`, `src/operator/src/statement/ddl.rs`, `src/operator/src/utils.rs`, `src/frontend/src/instance/builder.rs`, `src/frontend/src/heartbeat.rs`, `src/flow/src/server.rs`, `src/cmd/src/standalone.rs`, `src/cmd/src/flownode.rs`.
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor: reuse resolved frontend address

Resolve the frontend peer address once in the frontend builder, store it on the instance, and reuse it for heartbeat and flow invoker origins.
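
The resolution rule, condensed (the real helper is `frontend_peer_addr` in `src/frontend/src/heartbeat.rs`; `resolve_addr` below is a stand-in with assumed prefer-the-advertised-address behavior): the internal gRPC address wins when configured, otherwise the public one is used.

struct GrpcOptions {
    bind_addr: String,
    server_addr: String,
}

struct FrontendOptions {
    grpc: GrpcOptions,
    internal_grpc: Option<GrpcOptions>,
}

// Stand-in for servers::addrs::resolve_addr; assumed to prefer the advertised
// server address when present, falling back to the bind address.
fn resolve_addr(bind_addr: &str, server_addr: Option<&str>) -> String {
    match server_addr {
        Some(addr) if !addr.is_empty() => addr.to_string(),
        _ => bind_addr.to_string(),
    }
}

fn frontend_peer_addr(opts: &FrontendOptions) -> String {
    // Peers only need a reachable address; internal wins when configured.
    if let Some(internal) = &opts.internal_grpc {
        resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
    } else {
        resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
    }
}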

Files: `src/frontend/src/instance/builder.rs`, `src/frontend/src/instance.rs`, `src/frontend/src/heartbeat.rs`, `src/cmd/src/frontend.rs`, `src/cmd/src/standalone.rs`, `src/frontend/src/frontend.rs`, `src/frontend/src/heartbeat/tests.rs`.
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: preserve datanode lease liveness

Filter active datanode node infos through lease timestamps and workloads while preserving node info fields such as reported env vars.
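
In outline: liveness is decided from the lease record, but the returned value is the richer stored `NodeInfo`, with its activity timestamp refreshed; a simplified sketch.

use std::collections::HashMap;

struct LeaseValue {
    timestamp_millis: i64,
}

struct NodeInfo {
    last_activity_ts: i64,
    env_vars: HashMap<String, String>,
}

// Keep a node only if its lease is fresh, but return the stored NodeInfo so
// reported fields such as env vars survive; refresh its timestamp from the lease.
fn alive_infos(
    now: i64,
    active_ms: i64,
    leases: Vec<(u64, LeaseValue)>,
    mut infos: HashMap<u64, NodeInfo>,
) -> Vec<NodeInfo> {
    leases
        .into_iter()
        .filter(|(_, lease)| now - lease.timestamp_millis < active_ms)
        .filter_map(|(id, lease)| {
            infos.remove(&id).map(|mut info| {
                info.last_activity_ts = lease.timestamp_millis;
                info
            })
        })
        .collect()
}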

Files: `src/meta-srv/src/discovery/utils.rs`, `src/meta-srv/src/discovery/lease.rs`.
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* Remove stale datanode lease helper

- `discovery`: remove the obsolete `alive_datanodes` helper and related tests in `src/meta-srv/src/discovery/utils.rs` and `src/meta-srv/src/discovery/lease.rs`
- `integration`: update cluster and standalone setup paths in `tests-integration/src/cluster.rs` and `tests-integration/src/standalone.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/env-based-region-selector-oss: simplify lease discovery

- `lease-discovery`: simplify logic and remove unused utilities in `src/meta-srv/src/discovery/lease.rs` and `src/meta-srv/src/discovery/utils.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Authored by Lei, HUANG on 2026-05-15 15:35:18 +08:00; committed by GitHub.
commit 2fdbe6c8c3 (parent ebaa9272ee)
38 changed files with 570 additions and 446 deletions

View File

@@ -43,6 +43,7 @@ use flow::{
};
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::flownode::context::GrpcConfigureContext;
use servers::addrs;
use servers::configurator::GrpcBuilderConfiguratorRef;
use snafu::{OptionExt, ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;
@@ -422,6 +423,7 @@ impl StartCommand {
layered_cache_registry.clone(),
meta_client.clone(),
client,
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
)
.await
.context(StartFlownodeSnafu)?;

View File

@@ -519,7 +519,13 @@ pub fn create_heartbeat_task(
Arc::new(stat)
};
HeartbeatTask::new(options, meta_client, executor, stat)
HeartbeatTask::new(
instance.frontend_peer_addr().to_string(),
options,
meta_client,
executor,
stat,
)
}
#[cfg(test)]

View File

@@ -600,6 +600,7 @@ impl StartCommand {
layered_cache_registry.clone(),
procedure_executor,
node_manager.clone(),
fe_instance.frontend_peer_addr().to_string(),
)
.await
.context(StartFlownodeSnafu)?;

View File

@@ -86,6 +86,7 @@ impl FrontendSelector for MetaClientSelector {
peers
.into_iter()
.map(|node| node.peer)
.filter(predicate)
.map(|peer| {
let channel = self

View File

@@ -18,7 +18,7 @@ use common_telemetry::debug;
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::error::Result;
use crate::peer::PeerAllocator;
use crate::peer::{PeerAllocContext, PeerAllocator};
use crate::rpc::router::{Region, RegionRoute};
pub type RegionRoutesAllocatorRef = Arc<dyn RegionRoutesAllocator>;
@@ -29,6 +29,7 @@ pub trait RegionRoutesAllocator: Send + Sync {
&self,
table_id: TableId,
regions_and_partitions: &[(RegionNumber, &str)],
ctx: &PeerAllocContext,
) -> Result<Vec<RegionRoute>>;
}
@@ -38,9 +39,10 @@ impl<T: PeerAllocator> RegionRoutesAllocator for T {
&self,
table_id: TableId,
regions_and_partitions: &[(RegionNumber, &str)],
ctx: &PeerAllocContext,
) -> Result<Vec<RegionRoute>> {
let regions = regions_and_partitions.len().max(1);
let peers = self.alloc(regions).await?;
let peers = self.alloc(regions, ctx).await?;
debug!("Allocated peers {:?} for table {}", peers, table_id,);
let mut region_routes = regions_and_partitions

View File

@@ -43,8 +43,9 @@ use crate::error::{self, Result};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::metrics;
use crate::peer::PeerAllocContext;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::ddl::{CreateTableTask, QueryContext};
use crate::rpc::router::{RegionRoute, operating_leader_region_roles};
pub struct CreateTableProcedure {
@@ -76,11 +77,19 @@ impl CreateTableProcedure {
pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateTable";
pub fn new(task: CreateTableTask, context: DdlContext) -> Result<Self> {
Self::new_with_query_context(task, QueryContext::default(), context)
}
pub fn new_with_query_context(
task: CreateTableTask,
query_context: QueryContext,
context: DdlContext,
) -> Result<Self> {
let executor = build_executor_from_create_table_data(&task.create_table)?;
Ok(Self {
context,
data: CreateTableData::new(task),
data: CreateTableData::new(task, query_context),
opening_regions: vec![],
executor,
})
@@ -154,7 +163,12 @@ impl CreateTableProcedure {
} = self
.context
.table_metadata_allocator
.create(&self.data.task)
.create_with_context(
&self.data.task,
&PeerAllocContext {
extensions: self.data.query_context.extensions.clone(),
},
)
.await?;
self.set_allocated_metadata(table_id, table_route, region_wal_options);
@@ -356,6 +370,8 @@ pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
#[serde(default)]
pub query_context: QueryContext,
#[serde(default)]
pub column_metadatas: Vec<ColumnMetadata>,
/// None stands for not allocated yet.
pub(crate) table_route: Option<PhysicalTableRouteValue>,
@@ -364,11 +380,12 @@ pub struct CreateTableData {
}
impl CreateTableData {
pub fn new(task: CreateTableTask) -> Self {
pub fn new(task: CreateTableTask, query_context: QueryContext) -> Self {
CreateTableData {
state: CreateTableState::Prepare,
column_metadatas: vec![],
task,
query_context,
table_route: None,
region_wal_options: None,
}

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use crate::error::Result;
use crate::key::FlowId;
use crate::peer::{NoopPeerAllocator, Peer, PeerAllocatorRef};
use crate::peer::{NoopPeerAllocator, Peer, PeerAllocContext, PeerAllocatorRef};
use crate::sequence::SequenceRef;
/// The reference of [FlowMetadataAllocator].
@@ -59,7 +59,10 @@ impl FlowMetadataAllocator {
/// Allocates the [FlowId] and [Peer]s.
pub async fn create(&self, partitions: usize) -> Result<(FlowId, Vec<Peer>)> {
let flow_id = self.allocate_flow_id().await?;
let peers = self.peer_allocator.alloc(partitions).await?;
let peers = self
.peer_allocator
.alloc(partitions, &PeerAllocContext::default())
.await?;
Ok((flow_id, peers))
}

View File

@@ -25,7 +25,7 @@ use crate::ddl::allocator::resource_id::ResourceIdAllocatorRef;
use crate::ddl::allocator::wal_options::WalOptionsAllocatorRef;
use crate::error::{Result, UnsupportedSnafu};
use crate::key::table_route::PhysicalTableRouteValue;
use crate::peer::{NoopPeerAllocator, PeerAllocatorRef};
use crate::peer::{NoopPeerAllocator, PeerAllocContext, PeerAllocatorRef};
use crate::rpc::ddl::CreateTableTask;
pub type TableMetadataAllocatorRef = Arc<TableMetadataAllocator>;
@@ -108,6 +108,7 @@ impl TableMetadataAllocator {
&self,
table_id: TableId,
partition_exprs: &[&str],
alloc_context: &PeerAllocContext,
) -> Result<PhysicalTableRouteValue> {
let region_number_and_partition_exprs = partition_exprs
.iter()
@@ -116,7 +117,7 @@ impl TableMetadataAllocator {
.collect::<Vec<_>>();
let region_routes = self
.region_routes_allocator
.allocate(table_id, &region_number_and_partition_exprs)
.allocate(table_id, &region_number_and_partition_exprs, alloc_context)
.await?;
Ok(PhysicalTableRouteValue::new(region_routes))
@@ -133,13 +134,24 @@ impl TableMetadataAllocator {
}
pub async fn create(&self, task: &CreateTableTask) -> Result<TableMetadata> {
self.create_with_context(task, &PeerAllocContext::default())
.await
}
pub async fn create_with_context(
&self,
task: &CreateTableTask,
alloc_context: &PeerAllocContext,
) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(&task.create_table.table_id).await?;
let partition_exprs = task
.partitions
.iter()
.map(|p| p.expression.as_str())
.collect::<Vec<_>>();
let table_route = self.create_table_route(table_id, &partition_exprs).await?;
let table_route = self
.create_table_route(table_id, &partition_exprs, alloc_context)
.await?;
let region_numbers = table_route
.region_routes
.iter()

View File

@@ -353,10 +353,15 @@ impl DdlManager {
pub async fn submit_create_table_task(
&self,
create_table_task: CreateTableTask,
query_context: QueryContext,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
let procedure = CreateTableProcedure::new(create_table_task, context)?;
let procedure = CreateTableProcedure::new_with_query_context(
create_table_task,
query_context,
context,
)?;
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
@@ -593,7 +598,7 @@ impl DdlManager {
debug!("Submitting Ddl task: {:?}", request.task);
match request.task {
CreateTable(create_table_task) => {
handle_create_table_task(self, create_table_task).await
handle_create_table_task(self, create_table_task, request.query_context).await
}
DropTable(drop_table_task) => handle_drop_table_task(self, drop_table_task).await,
AlterTable(alter_table_task) => {
@@ -746,9 +751,10 @@ async fn handle_drop_table_task(
async fn handle_create_table_task(
ddl_manager: &DdlManager,
create_table_task: CreateTableTask,
query_context: QueryContext,
) -> Result<SubmitDdlTaskResponse> {
let (id, output) = ddl_manager
.submit_create_table_task(create_table_task)
.submit_create_table_task(create_table_task, query_context)
.await?;
let procedure_id = id.to_string();

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
pub use api::v1::meta::Peer;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use crate::cluster::NodeInfo;
use crate::error::Error;
use crate::{DatanodeId, FlownodeId};
@@ -48,7 +50,7 @@ pub trait PeerDiscovery: Send + Sync {
///
/// A frontend is considered active if it has reported a heartbeat within the most recent heartbeat interval,
/// as determined by the in-memory backend.
async fn active_frontends(&self) -> Result<Vec<Peer>, Error>;
async fn active_frontends(&self) -> Result<Vec<NodeInfo>, Error>;
/// Returns all currently active datanodes, optionally filtered by a predicate on their workloads.
///
@@ -58,7 +60,7 @@ pub trait PeerDiscovery: Send + Sync {
async fn active_datanodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>, Error>;
) -> Result<Vec<NodeInfo>, Error>;
/// Returns all currently active flownodes, optionally filtered by a predicate on their workloads.
///
@@ -68,23 +70,28 @@ pub trait PeerDiscovery: Send + Sync {
async fn active_flownodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>, Error>;
) -> Result<Vec<NodeInfo>, Error>;
}
pub type PeerDiscoveryRef = Arc<dyn PeerDiscovery>;
#[derive(Debug, Clone, Default)]
pub struct PeerAllocContext {
pub extensions: HashMap<String, String>,
}
/// [`PeerAllocator`] allocates [`Peer`]s for creating region or flow.
#[async_trait::async_trait]
pub trait PeerAllocator: Send + Sync {
async fn alloc(&self, num: usize) -> Result<Vec<Peer>, Error>;
async fn alloc(&self, num: usize, ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error>;
}
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
#[async_trait::async_trait]
impl<T: PeerAllocator + ?Sized> PeerAllocator for Arc<T> {
async fn alloc(&self, num: usize) -> Result<Vec<Peer>, Error> {
T::alloc(self, num).await
async fn alloc(&self, num: usize, ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error> {
T::alloc(self, num, ctx).await
}
}
@@ -92,7 +99,7 @@ pub struct NoopPeerAllocator;
#[async_trait::async_trait]
impl PeerAllocator for NoopPeerAllocator {
async fn alloc(&self, num: usize) -> Result<Vec<Peer>, Error> {
async fn alloc(&self, num: usize, _ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error> {
Ok(vec![Peer::default(); num])
}
}

View File

@@ -58,6 +58,9 @@ use crate::error::{
};
use crate::key::FlowId;
/// Reserved query-context extension key for the frontend peer address that submitted a DDL request.
pub const ORIGIN_FRONTEND_ADDR_EXTENSION_KEY: &str = "__greptime_origin_frontend.addr";
/// DDL tasks
#[derive(Debug, Clone)]
pub enum DdlTask {

View File

@@ -208,6 +208,7 @@ impl FrontendClient {
meta_client
.active_frontends()
.await
.map(|nodes| nodes.into_iter().map(|node| node.peer).collect())
.map_err(BoxedError::new)
.context(ExternalSnafu)
}

View File

@@ -544,6 +544,7 @@ impl FrontendInvoker {
layered_cache_registry: LayeredCacheRegistryRef,
procedure_executor: ProcedureExecutorRef,
node_manager: NodeManagerRef,
origin_frontend_addr: String,
) -> Result<FrontendInvoker, Error> {
let table_route_cache: TableRouteCacheRef =
layered_cache_registry.get().context(CacheRequiredSnafu {
@@ -589,6 +590,7 @@ impl FrontendInvoker {
inserter.clone(),
partition_manager,
None,
origin_frontend_addr,
));
let invoker = FrontendInvoker::new(inserter, deleter, statement_executor);

View File

@@ -326,6 +326,7 @@ mod tests {
Arc::new(SuspendHandler::new(instance.suspend_state())),
]));
let heartbeat_task = Some(HeartbeatTask::new(
instance.frontend_peer_addr().to_string(),
options,
meta_client,
executor,

View File

@@ -51,20 +51,14 @@ pub struct HeartbeatTask {
impl HeartbeatTask {
pub fn new(
peer_addr: String,
opts: &FrontendOptions,
meta_client: Arc<MetaClient>,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
resource_stat: ResourceStatRef,
) -> Self {
HeartbeatTask {
// if internal grpc is configured, use its address as the peer address
// otherwise use the public grpc address, because peer address only promises to be reachable
// by other components, it doesn't matter whether it's internal or external
peer_addr: if let Some(internal) = &opts.internal_grpc {
addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
} else {
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
},
peer_addr,
meta_client,
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
@@ -273,3 +267,14 @@ impl HeartbeatTask {
}
}
}
pub(crate) fn frontend_peer_addr(opts: &FrontendOptions) -> String {
// if internal grpc is configured, use its address as the peer address
// otherwise use the public grpc address, because peer address only promises to be reachable
// by other components, it doesn't matter whether it's internal or external
if let Some(internal) = &opts.internal_grpc {
addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
} else {
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
}
}

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use api::v1::meta::HeartbeatResponse;
use api::v1::meta::{HeartbeatResponse, Role};
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::{
@@ -26,9 +26,14 @@ use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::MetadataKey;
use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
use common_meta::key::table_info::TableInfoKey;
use common_stat::ResourceStatImpl;
use common_telemetry::tracing_context::TracingContext;
use meta_client::client::MetaClient;
use tokio::sync::mpsc;
use super::HeartbeatTask;
use crate::frontend::FrontendOptions;
#[derive(Default)]
pub struct MockKvCacheInvalidator {
inner: Mutex<HashMap<Vec<u8>, i32>>,
@@ -151,3 +156,21 @@ async fn test_invalidate_schema_key_handler() {
)
.await;
}
#[test]
fn test_heartbeat_task_uses_resolved_peer_addr() {
let options = FrontendOptions::default();
let meta_client = Arc::new(MetaClient::new(0, Role::Frontend));
let executor = Arc::new(HandlerGroupExecutor::new(vec![]));
let stat = Arc::new(ResourceStatImpl::default());
let task = HeartbeatTask::new(
"10.0.0.1:4001".to_string(),
&options,
meta_client,
executor,
stat,
);
assert_eq!(task.peer_addr, "10.0.0.1:4001");
}

View File

@@ -110,6 +110,7 @@ lazy_static! {
/// [`servers::query_handler::sql::SqlQueryHandler`], etc.
#[derive(Clone)]
pub struct Instance {
frontend_peer_addr: String,
catalog_manager: CatalogManagerRef,
pipeline_operator: Arc<PipelineOperator>,
statement_executor: Arc<StatementExecutor>,
@@ -131,6 +132,10 @@ pub struct Instance {
}
impl Instance {
pub fn frontend_peer_addr(&self) -> &str {
&self.frontend_peer_addr
}
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}

View File

@@ -48,6 +48,7 @@ use snafu::{OptionExt, ResultExt};
use crate::error::{self, ExternalSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::frontend::FrontendOptions;
use crate::heartbeat::frontend_peer_addr;
use crate::instance::Instance;
use crate::instance::region_query::FrontendRegionQueryHandler;
@@ -223,6 +224,7 @@ impl FrontendBuilder {
)
.query_engine();
let frontend_peer_addr = frontend_peer_addr(&self.options);
let statement_executor = StatementExecutor::new(
self.catalog_manager.clone(),
query_engine.clone(),
@@ -232,6 +234,7 @@ impl FrontendBuilder {
inserter.clone(),
partition_manager,
Some(process_manager.clone()),
frontend_peer_addr.clone(),
);
let statement_executor =
@@ -265,6 +268,7 @@ impl FrontendBuilder {
))));
Ok(Instance {
frontend_peer_addr,
catalog_manager: self.catalog_manager,
pipeline_operator,
statement_executor,

View File

@@ -45,7 +45,7 @@ use common_meta::error::{
};
use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::{Peer, PeerDiscovery};
use common_meta::peer::PeerDiscovery;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::KeyValue;
@@ -590,7 +590,7 @@ impl ClusterInfo for MetaClient {
// maybe we need to use the timestamp from metasrv in the future.
#[async_trait::async_trait]
impl PeerDiscovery for MetaClient {
async fn active_frontends(&self) -> MetaResult<Vec<Peer>> {
async fn active_frontends(&self) -> MetaResult<Vec<NodeInfo>> {
let nodes = self
.list_nodes(Some(ClusterRole::Frontend))
.await
@@ -608,7 +608,7 @@ impl PeerDiscovery for MetaClient {
async fn active_datanodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> MetaResult<Vec<Peer>> {
) -> MetaResult<Vec<NodeInfo>> {
let nodes = self
.list_nodes(Some(ClusterRole::Datanode))
.await
@@ -625,7 +625,7 @@ impl PeerDiscovery for MetaClient {
async fn active_flownodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> MetaResult<Vec<Peer>> {
) -> MetaResult<Vec<NodeInfo>> {
let nodes = self
.list_nodes(Some(ClusterRole::Flownode))
.await

View File

@@ -15,7 +15,6 @@
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{ErrorCode, ResponseHeader};
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::peer::Peer;
use common_time::util::SystemTimer;
use tonic::{Code, Status};
@@ -49,14 +48,14 @@ pub(crate) fn alive_frontends(
timer: &impl SystemTimer,
nodes: Vec<NodeInfo>,
active_duration: std::time::Duration,
) -> Vec<Peer> {
) -> Vec<NodeInfo> {
nodes
.into_iter()
.filter_map(|node| {
if matches!(node.status, NodeStatus::Frontend(_))
&& is_active_node(timer, node.last_activity_ts, active_duration)
{
Some(node.peer)
Some(node)
} else {
None
}
@@ -69,17 +68,17 @@ pub(crate) fn alive_datanodes(
nodes: Vec<NodeInfo>,
active_duration: std::time::Duration,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Vec<Peer> {
) -> Vec<NodeInfo> {
let filter = filter.unwrap_or(|_| true);
nodes
.into_iter()
.filter_map(|node| {
if let NodeStatus::Datanode(status) = node.status
if let NodeStatus::Datanode(status) = &node.status
&& is_active_node(timer, node.last_activity_ts, active_duration)
{
let workloads = NodeWorkloads::Datanode(status.workloads);
filter(&workloads).then_some(node.peer)
let workloads = NodeWorkloads::Datanode(status.workloads.clone());
filter(&workloads).then_some(node)
} else {
None
}
@@ -92,17 +91,17 @@ pub(crate) fn alive_flownodes(
nodes: Vec<NodeInfo>,
active_duration: std::time::Duration,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Vec<Peer> {
) -> Vec<NodeInfo> {
let filter = filter.unwrap_or(|_| true);
nodes
.into_iter()
.filter_map(|node| {
if let NodeStatus::Flownode(status) = node.status
if let NodeStatus::Flownode(status) = &node.status
&& is_active_node(timer, node.last_activity_ts, active_duration)
{
let workloads = NodeWorkloads::Flownode(status.workloads);
filter(&workloads).then_some(node.peer)
let workloads = NodeWorkloads::Flownode(status.workloads.clone());
filter(&workloads).then_some(node)
} else {
None
}
@@ -202,7 +201,10 @@ mod tests {
assert_eq!(
vec![1],
peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
peers
.into_iter()
.map(|node| node.peer.id)
.collect::<Vec<_>>()
);
}
@@ -229,7 +231,10 @@ mod tests {
assert_eq!(
vec![1],
peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
peers
.into_iter()
.map(|node| node.peer.id)
.collect::<Vec<_>>()
);
}
@@ -249,7 +254,10 @@ mod tests {
assert_eq!(
vec![1],
peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
peers
.into_iter()
.map(|node| node.peer.id)
.collect::<Vec<_>>()
);
}
@@ -282,7 +290,10 @@ mod tests {
assert_eq!(
vec![1],
peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
peers
.into_iter()
.map(|node| node.peer.id)
.collect::<Vec<_>>()
);
}
}

View File

@@ -18,6 +18,7 @@ pub mod utils;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::cluster::NodeInfo;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::error::Result;
use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
@@ -30,8 +31,8 @@ use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
#[async_trait::async_trait]
impl PeerDiscovery for MetaPeerClient {
async fn active_frontends(&self) -> Result<Vec<Peer>> {
utils::alive_frontends(
async fn active_frontends(&self) -> Result<Vec<NodeInfo>> {
utils::alive_frontend_infos(
&DefaultSystemTimer,
self,
default_distributed_time_constants().frontend_heartbeat_interval,
@@ -44,8 +45,8 @@ impl PeerDiscovery for MetaPeerClient {
async fn active_datanodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
utils::alive_datanodes(
) -> Result<Vec<NodeInfo>> {
utils::alive_datanode_infos(
&DefaultSystemTimer,
self,
default_distributed_time_constants().datanode_lease,
@@ -59,8 +60,8 @@ impl PeerDiscovery for MetaPeerClient {
async fn active_flownodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
utils::alive_flownodes(
) -> Result<Vec<NodeInfo>> {
utils::alive_flownode_infos(
&DefaultSystemTimer,
self,
default_distributed_time_constants().flownode_lease,

View File

@@ -95,22 +95,22 @@ impl LeaseValueAccessor for MetaPeerClient {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;
use std::collections::HashMap;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads};
use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
use common_meta::cluster::{
DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role,
};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::peer::{Peer, PeerDiscovery};
use common_meta::rpc::store::PutRequest;
use common_time::util::{DefaultSystemTimer, SystemTimer, current_time_millis};
use common_time::util::current_time_millis;
use common_workload::DatanodeWorkloadType;
use crate::discovery::utils::{self, accept_ingest_workload};
use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue};
use crate::discovery::utils::accept_ingest_workload;
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::test_util::create_meta_peer_client;
async fn put_lease_value(
@@ -128,14 +128,10 @@ mod tests {
.unwrap();
}
async fn put_flownode_lease_value(
kv_backend: &ResettableKvBackendRef,
key: FlownodeLeaseKey,
value: LeaseValue,
) {
async fn put_node_info(kv_backend: &ResettableKvBackendRef, key: NodeInfoKey, value: NodeInfo) {
kv_backend
.put(PutRequest {
key: key.try_into().unwrap(),
key: (&key).into(),
value: value.try_into().unwrap(),
prev_kv: false,
})
@@ -143,332 +139,170 @@ mod tests {
.unwrap();
}
struct MockTimer {
current: Arc<AtomicI64>,
}
impl SystemTimer for MockTimer {
fn current_time_millis(&self) -> i64 {
self.current.fetch_add(1, Ordering::Relaxed)
}
fn current_time_rfc3339(&self) -> String {
unimplemented!()
}
}
#[tokio::test]
async fn test_alive_datanodes() {
async fn test_active_datanodes_uses_lease_liveness_with_stale_node_info() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = DefaultSystemTimer;
let lease = default_distributed_time_constants().datanode_lease;
// put a stale lease value for node 1
let key = DatanodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
let mut env_vars = HashMap::new();
env_vars.insert("AZ".to_string(), "az-a".to_string());
// put a fresh lease value for node 2
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_datanodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
}
#[tokio::test]
async fn test_alive_datanodes_with_timer() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = MockTimer {
current: Arc::new(AtomicI64::new(current_time_millis())),
};
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_datanodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
}
#[tokio::test]
async fn test_alive_datanodes_with_condition() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = DefaultSystemTimer;
// put a lease value for node 1 without mode info
let key = DatanodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: timer.current_time_millis() - 20 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a lease value for node 2 with mode info
let key = DatanodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a lease value for node 3 with mode info
let key = DatanodeLeaseKey { node_id: 3 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20203".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![i32::MAX],
}),
};
put_lease_value(&in_memory, key, value).await;
// put a lease value for node 3 with mode info
let key = DatanodeLeaseKey { node_id: 4 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20204".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![i32::MAX],
}),
};
put_lease_value(&in_memory, key, value).await;
let peers = utils::alive_datanodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs),
Some(accept_ingest_workload),
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert!(peers.contains(&Peer::new(2, "127.0.0.1:20202".to_string())));
}
#[tokio::test]
async fn test_alive_flownodes() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = DefaultSystemTimer;
// put a stale lease value for node 1
let key = FlownodeLeaseKey { node_id: 1 };
let value = LeaseValue {
// 20s ago
timestamp_millis: timer.current_time_millis() - lease_secs * 2 * 1000,
node_addr: "127.0.0.1:20201".to_string(),
workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
};
put_flownode_lease_value(&in_memory, key, value).await;
// put a fresh lease value for node 2
let key = FlownodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
};
put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_flownodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
}
#[tokio::test]
async fn test_alive_flownodes_with_timer() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = MockTimer {
current: Arc::new(AtomicI64::new(current_time_millis())),
};
let key = FlownodeLeaseKey { node_id: 2 };
let value = LeaseValue {
timestamp_millis: timer.current_time_millis(),
node_addr: "127.0.0.1:20202".to_string(),
workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
};
put_flownode_lease_value(&in_memory, key.clone(), value.clone()).await;
let peers = utils::alive_flownodes(
&timer,
client.as_ref(),
Duration::from_secs(lease_secs as u64),
None,
)
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers, vec![Peer::new(2, "127.0.0.1:20202".to_string())]);
}
#[tokio::test]
async fn test_lookup_frontends() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = DefaultSystemTimer;
let active_frontend_node = NodeInfo {
peer: Peer {
id: 0,
addr: "127.0.0.1:20201".to_string(),
put_node_info(
&in_memory,
NodeInfoKey {
role: Role::Datanode,
node_id: 1,
},
last_activity_ts: timer.current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: current_time_millis() as u64,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
NodeInfo {
peer: Peer::new(1, "127.0.0.1:4001".to_string()),
last_activity_ts: current_time_millis() - (lease.as_millis() * 2) as i64,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads: DatanodeWorkloads {
types: vec![i32::MAX],
},
}),
version: String::new(),
git_commit: String::new(),
start_time_ms: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
env_vars,
},
)
.await;
let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
put_lease_value(
&in_memory,
DatanodeLeaseKey { node_id: 1 },
LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:4001".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
},
)
.await;
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "0").into(),
value: active_frontend_node.try_into().unwrap(),
prev_kv: false,
})
let nodes = client
.active_datanodes(Some(accept_ingest_workload))
.await
.unwrap();
let inactive_frontend_node = NodeInfo {
peer: Peer {
id: 1,
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts: timer.current_time_millis() - 20 * 1000,
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: current_time_millis() as u64,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "1").into(),
value: inactive_frontend_node.try_into().unwrap(),
prev_kv: false,
})
.await
.unwrap();
let peers =
utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs))
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].id, 0);
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].peer.id, 1);
assert_eq!(
nodes[0].env_vars.get("AZ").map(String::as_str),
Some("az-a")
);
}
#[tokio::test]
async fn test_lookup_frontends_with_timer() {
async fn test_active_datanodes_returns_node_info_with_env_vars() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease_secs = 10;
let timer = MockTimer {
current: Arc::new(AtomicI64::new(current_time_millis())),
};
let active_frontend_node = NodeInfo {
peer: Peer {
id: 0,
addr: "127.0.0.1:20201".to_string(),
let mut env_vars = HashMap::new();
env_vars.insert("AZ".to_string(), "az-a".to_string());
put_node_info(
&in_memory,
NodeInfoKey {
role: Role::Datanode,
node_id: 1,
},
last_activity_ts: timer.current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: current_time_millis() as u64,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};
let key_prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
in_memory
.put(PutRequest {
key: format!("{}{}", key_prefix, "0").into(),
value: active_frontend_node.try_into().unwrap(),
prev_kv: false,
})
NodeInfo {
peer: Peer::new(1, "127.0.0.1:4001".to_string()),
last_activity_ts: current_time_millis(),
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads: DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
},
}),
version: String::new(),
git_commit: String::new(),
start_time_ms: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
env_vars,
},
)
.await;
put_lease_value(
&in_memory,
DatanodeLeaseKey { node_id: 1 },
LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:4001".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
},
)
.await;
let nodes = client
.active_datanodes(Some(accept_ingest_workload))
.await
.unwrap();
let peers =
utils::alive_frontends(&timer, client.as_ref(), Duration::from_secs(lease_secs))
.await
.unwrap();
assert_eq!(peers.len(), 1);
assert_eq!(peers[0].id, 0);
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].peer.id, 1);
assert_eq!(
nodes[0].env_vars.get("AZ").map(String::as_str),
Some("az-a")
);
}
#[tokio::test]
async fn test_active_flownodes_returns_node_info() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
put_node_info(
&in_memory,
NodeInfoKey {
role: Role::Flownode,
node_id: 11,
},
NodeInfo {
peer: Peer::new(11, "127.0.0.1:5001".to_string()),
last_activity_ts: current_time_millis(),
status: NodeStatus::Flownode(FlownodeStatus {
workloads: FlownodeWorkloads { types: vec![7] },
}),
version: String::new(),
git_commit: String::new(),
start_time_ms: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
env_vars: Default::default(),
},
)
.await;
let nodes = client.active_flownodes(None).await.unwrap();
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].peer.id, 11);
}
#[tokio::test]

View File

@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_meta::DatanodeId;
use common_meta::cluster::NodeInfo;
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_time::util::SystemTimer;
@@ -57,68 +58,137 @@ pub fn build_active_filter<T: LastActiveTs>(active_duration: Duration) -> impl F
}
}
/// Returns the alive datanodes.
pub async fn alive_datanodes(
timer: &impl SystemTimer,
accessor: &impl LeaseValueAccessor,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
let active_filter = build_active_filter(active_duration);
let condition = condition.unwrap_or(|_| true);
let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?;
let now = timer.current_time_millis();
Ok(lease_values
.into_iter()
.filter_map(|(peer_id, lease_value)| {
if active_filter(now, &lease_value) && condition(&lease_value.workloads) {
Some(Peer::new(peer_id, lease_value.node_addr))
} else {
None
}
})
.collect::<Vec<_>>())
}
/// Returns the alive flownodes.
pub async fn alive_flownodes(
timer: &impl SystemTimer,
accessor: &impl LeaseValueAccessor,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<Peer>> {
let active_filter = build_active_filter(active_duration);
let condition = condition.unwrap_or(|_| true);
let lease_values = accessor.lease_values(LeaseValueType::Flownode).await?;
let now = timer.current_time_millis();
Ok(lease_values
.into_iter()
.filter_map(|(peer_id, lease_value)| {
if active_filter(now, &lease_value) && condition(&lease_value.workloads) {
Some(Peer::new(peer_id, lease_value.node_addr))
} else {
None
}
})
.collect::<Vec<_>>())
}
/// Returns the alive frontends.
pub async fn alive_frontends(
/// Returns the alive frontend node infos.
pub async fn alive_frontend_infos(
timer: &impl SystemTimer,
lister: &impl NodeInfoAccessor,
active_duration: Duration,
) -> Result<Vec<Peer>> {
) -> Result<Vec<NodeInfo>> {
let active_filter = build_active_filter(active_duration);
let node_infos = lister.node_infos(NodeInfoType::Frontend).await?;
let now = timer.current_time_millis();
Ok(node_infos
.into_iter()
.filter_map(|(_, node_info)| active_filter(now, &node_info).then_some(node_info))
.collect::<Vec<_>>())
}
/// Returns the alive datanode node infos.
pub async fn alive_datanode_infos(
timer: &impl SystemTimer,
accessor: &(impl LeaseValueAccessor + NodeInfoAccessor),
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<NodeInfo>> {
let active_filter = build_active_filter(active_duration);
let condition = condition.unwrap_or(|_| true);
let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?;
let mut node_infos = accessor
.node_infos(NodeInfoType::Datanode)
.await?
.into_iter()
.collect::<HashMap<_, _>>();
let now = timer.current_time_millis();
Ok(lease_values
.into_iter()
.filter_map(|(peer_id, lease_value)| {
if !active_filter(now, &lease_value) || !condition(&lease_value.workloads) {
return None;
}
let peer = Peer::new(peer_id, lease_value.node_addr.clone());
let mut node_info = node_infos
.remove(&peer_id)
.filter(|node_info| matches!(node_info.status, NodeStatus::Datanode(_)))
.unwrap_or_else(|| datanode_node_info_from_lease(peer.clone(), &lease_value));
node_info.peer = peer;
node_info.last_activity_ts = lease_value.timestamp_millis;
if let (NodeStatus::Datanode(status), NodeWorkloads::Datanode(workloads)) =
(&mut node_info.status, &lease_value.workloads)
{
status.workloads = workloads.clone();
}
Some(node_info)
})
.collect::<Vec<_>>())
}
fn datanode_node_info_from_lease(peer: Peer, lease_value: &LeaseValue) -> NodeInfo {
let workloads = match &lease_value.workloads {
NodeWorkloads::Datanode(workloads) => workloads.clone(),
_ => Default::default(),
};
NodeInfo {
peer,
last_activity_ts: lease_value.timestamp_millis,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads,
}),
version: String::new(),
git_commit: String::new(),
start_time_ms: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
env_vars: Default::default(),
}
}
/// Returns the alive flownode node infos.
pub async fn alive_flownode_infos(
timer: &impl SystemTimer,
lister: &impl NodeInfoAccessor,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<NodeInfo>> {
alive_node_infos(
timer,
lister,
NodeInfoType::Flownode,
active_duration,
condition,
)
.await
}
async fn alive_node_infos(
timer: &impl SystemTimer,
lister: &impl NodeInfoAccessor,
node_info_type: NodeInfoType,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<NodeInfo>> {
let active_filter = build_active_filter(active_duration);
let node_infos = lister.node_infos(node_info_type).await?;
let now = timer.current_time_millis();
Ok(node_infos
.into_iter()
.filter_map(|(_, node_info)| {
if active_filter(now, &node_info) {
Some(node_info.peer)
} else {
None
if !active_filter(now, &node_info) {
return None;
}
match (&node_info.status, condition) {
(NodeStatus::Datanode(status), Some(condition)) => {
let workloads = NodeWorkloads::Datanode(status.workloads.clone());
condition(&workloads).then_some(node_info)
}
(NodeStatus::Flownode(status), Some(condition)) => {
let workloads = NodeWorkloads::Flownode(status.workloads.clone());
condition(&workloads).then_some(node_info)
}
(NodeStatus::Datanode(_), None) | (NodeStatus::Flownode(_), None) => {
Some(node_info)
}
_ => None,
}
})
.collect::<Vec<_>>())

View File

@@ -17,7 +17,7 @@ use std::collections::HashSet;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::error::{ExternalSnafu, Result as MetaResult};
use common_meta::peer::{Peer, PeerAllocator};
use common_meta::peer::{Peer, PeerAllocContext, PeerAllocator};
use snafu::{ResultExt, ensure};
use crate::discovery::utils::accept_ingest_workload;
@@ -55,7 +55,11 @@ impl MetasrvPeerAllocator {
/// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is
/// no guarantee about how the returned peers are used, like whether they are from the same
/// table or not. So this method isn't idempotent.
async fn alloc(&self, min_required_items: usize) -> Result<Vec<Peer>> {
async fn alloc(
&self,
min_required_items: usize,
alloc_context: &PeerAllocContext,
) -> Result<Vec<Peer>> {
if let Some(max_items) = self.max_items {
ensure!(
min_required_items <= max_items as usize,
@@ -71,6 +75,7 @@ impl MetasrvPeerAllocator {
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
workload_filter: Some(accept_ingest_workload),
extensions: alloc_context.extensions.clone(),
},
)
.await
@@ -79,8 +84,8 @@ impl MetasrvPeerAllocator {
#[async_trait]
impl PeerAllocator for MetasrvPeerAllocator {
async fn alloc(&self, regions: usize) -> MetaResult<Vec<Peer>> {
self.alloc(regions)
async fn alloc(&self, regions: usize, ctx: &PeerAllocContext) -> MetaResult<Vec<Peer>> {
self.alloc(regions, ctx)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)

View File

@@ -21,6 +21,7 @@ use common_meta::ddl::create_table::template::{
};
use common_meta::lock_key::TableLock;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::PeerAllocContext;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{debug, info};
@@ -156,6 +157,7 @@ impl ExecutePlan {
.iter()
.map(|(n, p)| (*n, p.as_str()))
.collect::<Vec<_>>(),
&PeerAllocContext::default(),
)
.await
.context(error::AllocateRegionRoutesSnafu { table_id })?;

View File

@@ -620,6 +620,7 @@ impl RegionSupervisor {
allow_duplication: true,
exclude_peer_ids,
workload_filter: Some(accept_ingest_workload),
extensions: Default::default(),
};
let peers = selector.select(&self.selector_context, opt).await?;
ensure!(

View File

@@ -20,7 +20,7 @@ pub mod round_robin;
pub(crate) mod test_utils;
pub mod weight_compute;
pub mod weighted_choose;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use api::v1::meta::heartbeat_request::NodeWorkloads;
use serde::{Deserialize, Serialize};
@@ -66,6 +66,8 @@ pub struct SelectorOptions {
pub exclude_peer_ids: HashSet<u64>,
/// The filter to select the peers based on their workloads.
pub workload_filter: Option<fn(&NodeWorkloads) -> bool>,
/// Additional placement context, such as the frontend identity that originated the request.
pub extensions: HashMap<String, String>,
}
impl Default for SelectorOptions {
@@ -75,6 +77,7 @@ impl Default for SelectorOptions {
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
extensions: HashMap::new(),
}
}
}

View File

@@ -86,7 +86,7 @@ where
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use common_meta::peer::Peer;
@@ -140,6 +140,7 @@ mod tests {
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
extensions: HashMap::new(),
};
let selected_peers: HashSet<_> =
@@ -156,6 +157,7 @@ mod tests {
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
extensions: HashMap::new(),
};
let selected_result =
@@ -168,6 +170,7 @@ mod tests {
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
extensions: HashMap::new(),
};
let selected_peers =

View File

@@ -36,7 +36,10 @@ impl Selector for LeaseBasedSelector {
.peer_discovery
.active_datanodes(opts.workload_filter)
.await
.context(ListActiveDatanodesSnafu)?;
.context(ListActiveDatanodesSnafu)?
.into_iter()
.map(|node| node.peer)
.collect::<Vec<_>>();
// 2. compute weight array, but the weight of each item is the same.
let mut weight_array = alive_datanodes

View File

@@ -55,7 +55,10 @@ where
.peer_discovery
.active_datanodes(opts.workload_filter)
.await
.context(ListActiveDatanodesSnafu)?;
.context(ListActiveDatanodesSnafu)?
.into_iter()
.map(|node| node.peer)
.collect::<Vec<_>>();
// 2. get stat kvs and filter out expired datanodes.
let stat_keys = alive_datanodes

View File

@@ -65,6 +65,7 @@ impl RoundRobinSelector {
// 2. filter out excluded datanodes.
alive_datanodes
.into_iter()
.map(|node| node.peer)
.filter(|p| !opts.exclude_peer_ids.contains(&p.id))
.collect::<Vec<_>>()
}
@@ -72,7 +73,10 @@ impl RoundRobinSelector {
.peer_discovery
.active_flownodes(opts.workload_filter)
.await
.context(ListActiveFlownodesSnafu)?,
.context(ListActiveFlownodesSnafu)?
.into_iter()
.map(|node| node.peer)
.collect::<Vec<_>>(),
};
ensure!(
@@ -150,6 +154,7 @@ mod test {
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
extensions: Default::default(),
},
)
.await
@@ -168,6 +173,7 @@ mod test {
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
workload_filter: None,
extensions: Default::default(),
},
)
.await
@@ -210,6 +216,7 @@ mod test {
allow_duplication: true,
exclude_peer_ids: HashSet::from([2, 5]),
workload_filter: None,
extensions: Default::default(),
},
)
.await

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use api::v1::meta::DatanodeWorkloads;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -57,6 +58,7 @@ pub(crate) fn create_meta_peer_client() -> MetaPeerClientRef {
pub(crate) async fn put_datanodes(meta_peer_client: &MetaPeerClientRef, datanodes: Vec<Peer>) {
let backend = meta_peer_client.memory_backend();
for datanode in datanodes {
let peer = datanode.clone();
let lease_key = DatanodeLeaseKey {
node_id: datanode.id,
};
@@ -69,9 +71,45 @@ pub(crate) async fn put_datanodes(meta_peer_client: &MetaPeerClientRef, datanode
};
let lease_key_bytes: Vec<u8> = lease_key.try_into().unwrap();
let lease_value_bytes: Vec<u8> = lease_value.try_into().unwrap();
backend
.put(common_meta::rpc::store::PutRequest {
key: lease_key_bytes,
value: lease_value_bytes,
..Default::default()
})
.await
.unwrap();
let workloads = DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid.to_i32()],
};
let node_info = NodeInfo {
peer,
last_activity_ts: time_util::current_time_millis(),
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads,
}),
version: String::new(),
git_commit: String::new(),
start_time_ms: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
env_vars: Default::default(),
};
let node_info_key = NodeInfoKey {
role: Role::Datanode,
node_id: datanode.id,
};
let put_request = common_meta::rpc::store::PutRequest {
key: lease_key_bytes,
value: lease_value_bytes,
key: (&node_info_key).into(),
value: node_info.try_into().unwrap(),
..Default::default()
};
backend.put(put_request).await.unwrap();

View File

@@ -105,7 +105,7 @@ impl DatabaseOperator {
let urls = frontends
.into_iter()
.map(|peer| peer.addr)
.map(|node| node.peer.addr)
.collect::<Vec<_>>();
debug!("Available frontend addresses: {:?}", urls);

View File

@@ -118,6 +118,7 @@ pub struct StatementExecutor {
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
process_manager: Option<ProcessManagerRef>,
origin_frontend_addr: String,
#[cfg(feature = "enterprise")]
trigger_querier: Option<TriggerQuerierRef>,
}
@@ -153,6 +154,7 @@ impl StatementExecutor {
inserter: InserterRef,
partition_manager: PartitionRuleManagerRef,
process_manager: Option<ProcessManagerRef>,
origin_frontend_addr: String,
) -> Self {
Self {
catalog_manager,
@@ -165,6 +167,7 @@ impl StatementExecutor {
cache_invalidator,
inserter,
process_manager,
origin_frontend_addr,
#[cfg(feature = "enterprise")]
trigger_querier: None,
}

View File

@@ -105,7 +105,7 @@ use crate::error::{
use crate::expr_helper::{self, RepartitionRequest};
use crate::statement::StatementExecutor;
use crate::statement::show::create_partitions_stmt;
use crate::utils::to_meta_query_context;
use crate::utils::{to_meta_query_context, to_meta_query_context_with_origin_frontend};
#[derive(Debug, Clone, Copy)]
struct DdlSubmitOptions {
@@ -1824,7 +1824,7 @@ impl StatementExecutor {
.collect::<Result<Vec<_>>>()?;
let request = SubmitDdlTaskRequest::new(
to_meta_query_context(query_context),
to_meta_query_context_with_origin_frontend(query_context, &self.origin_frontend_addr),
DdlTask::new_create_table(create_table, partitions, table_info),
);
@@ -1840,7 +1840,7 @@ impl StatementExecutor {
query_context: QueryContextRef,
) -> Result<SubmitDdlTaskResponse> {
let request = SubmitDdlTaskRequest::new(
to_meta_query_context(query_context),
to_meta_query_context_with_origin_frontend(query_context, &self.origin_frontend_addr),
DdlTask::new_create_logical_tables(tables_data),
);

View File

@@ -34,6 +34,18 @@ pub fn to_meta_query_context(
}
}
pub fn to_meta_query_context_with_origin_frontend(
query_context: QueryContextRef,
origin_frontend_addr: &str,
) -> common_meta::rpc::ddl::QueryContext {
let mut meta_query_context = to_meta_query_context(query_context);
meta_query_context.extensions.insert(
common_meta::rpc::ddl::ORIGIN_FRONTEND_ADDR_EXTENSION_KEY.to_string(),
origin_frontend_addr.to_string(),
);
meta_query_context
}
pub fn try_to_session_query_context(
value: common_meta::rpc::ddl::QueryContext,
) -> Result<session::context::QueryContext, Error> {
@@ -57,10 +69,14 @@ mod tests {
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use common_meta::rpc::ddl::ORIGIN_FRONTEND_ADDR_EXTENSION_KEY;
use common_time::Timezone;
use session::context::QueryContextBuilder;
use super::{to_meta_query_context, try_to_session_query_context};
use super::{
to_meta_query_context, to_meta_query_context_with_origin_frontend,
try_to_session_query_context,
};
#[test]
fn test_query_context_meta_roundtrip_with_sequences() {
@@ -84,4 +100,26 @@ mod tests {
assert_eq!(roundtrip.sst_min_sequences(), HashMap::from([(10, 90)]));
assert_eq!(roundtrip.extension("flow.return_region_seq"), Some("true"));
}
#[test]
fn test_meta_query_context_with_origin_frontend_overrides_reserved_key() {
let session_ctx = Arc::new(
QueryContextBuilder::default()
.set_extension(
ORIGIN_FRONTEND_ADDR_EXTENSION_KEY.to_string(),
"spoofed".to_string(),
)
.build(),
);
let meta_ctx = to_meta_query_context_with_origin_frontend(session_ctx, "127.0.0.1:4000");
assert_eq!(
meta_ctx
.extensions
.get(ORIGIN_FRONTEND_ADDR_EXTENSION_KEY)
.map(String::as_str),
Some("127.0.0.1:4000")
);
}
}

View File

@@ -401,7 +401,7 @@ impl GreptimeDbClusterBuilder {
expected_datanodes: usize,
) {
for _ in 0..100 {
let alive_datanodes = discovery::utils::alive_datanodes(
let alive_datanodes = discovery::utils::alive_datanode_infos(
&DefaultSystemTimer,
meta_peer_client.as_ref(),
Duration::from_secs(u64::MAX),

View File

@@ -286,6 +286,7 @@ impl GreptimeDbStandaloneBuilder {
cache_registry.clone(),
procedure_executor.clone(),
node_manager.clone(),
instance.frontend_peer_addr().to_string(),
)
.await
.context(StartFlownodeSnafu)