From e2c7dfcaba2000ec99341a311c856f70bfd570ff Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 9 May 2025 17:05:30 +0800 Subject: [PATCH] fix: choose frontend randomly --- Cargo.lock | 1 + src/flow/Cargo.toml | 1 + src/flow/src/batching_mode/frontend_client.rs | 9 +++++---- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e5c85eba7..bd42003c4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4312,6 +4312,7 @@ dependencies = [ "prometheus", "prost 0.13.5", "query", + "rand 0.9.0", "serde", "serde_json", "servers", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 27566c8bd5..18e29d62af 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -59,6 +59,7 @@ partition.workspace = true prometheus.workspace = true prost.workspace = true query.workspace = true +rand.workspace = true serde.workspace = true servers.workspace = true session.workspace = true diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index bf7a843ae1..04e8975d72 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -27,8 +27,9 @@ use common_meta::peer::Peer; use common_meta::rpc::store::RangeRequest; use common_query::Output; use common_telemetry::warn; -use itertools::Itertools; use meta_client::client::MetaClient; +use rand::rng; +use rand::seq::SliceRandom; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; @@ -201,17 +202,17 @@ impl FrontendClient { let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT); interval.tick().await; for retry in 0..GRPC_MAX_RETRIES { - let frontends = self.scan_for_frontend().await?; + let mut frontends = self.scan_for_frontend().await?; let now_in_ms = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_millis() as i64; + // shuffle the frontends to avoid always pick the same one + frontends.shuffle(&mut rng()); // found node with maximum last_activity_ts for (_, node_info) in frontends .iter() - .sorted_by_key(|(_, node_info)| node_info.last_activity_ts) - .rev() // filter out frontend that have been down for more than 1 min .filter(|(_, node_info)| { node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64