mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-06 05:12:54 +00:00
Compare commits
1 Commits
feat/bulk-
...
flow/choos
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e2c7dfcaba |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4312,6 +4312,7 @@ dependencies = [
|
|||||||
"prometheus",
|
"prometheus",
|
||||||
"prost 0.13.5",
|
"prost 0.13.5",
|
||||||
"query",
|
"query",
|
||||||
|
"rand 0.9.0",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"servers",
|
"servers",
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ partition.workspace = true
|
|||||||
prometheus.workspace = true
|
prometheus.workspace = true
|
||||||
prost.workspace = true
|
prost.workspace = true
|
||||||
query.workspace = true
|
query.workspace = true
|
||||||
|
rand.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
servers.workspace = true
|
servers.workspace = true
|
||||||
session.workspace = true
|
session.workspace = true
|
||||||
|
|||||||
@@ -27,8 +27,9 @@ use common_meta::peer::Peer;
|
|||||||
use common_meta::rpc::store::RangeRequest;
|
use common_meta::rpc::store::RangeRequest;
|
||||||
use common_query::Output;
|
use common_query::Output;
|
||||||
use common_telemetry::warn;
|
use common_telemetry::warn;
|
||||||
use itertools::Itertools;
|
|
||||||
use meta_client::client::MetaClient;
|
use meta_client::client::MetaClient;
|
||||||
|
use rand::rng;
|
||||||
|
use rand::seq::SliceRandom;
|
||||||
use servers::query_handler::grpc::GrpcQueryHandler;
|
use servers::query_handler::grpc::GrpcQueryHandler;
|
||||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||||
use snafu::{OptionExt, ResultExt};
|
use snafu::{OptionExt, ResultExt};
|
||||||
@@ -201,17 +202,17 @@ impl FrontendClient {
|
|||||||
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
|
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
for retry in 0..GRPC_MAX_RETRIES {
|
for retry in 0..GRPC_MAX_RETRIES {
|
||||||
let frontends = self.scan_for_frontend().await?;
|
let mut frontends = self.scan_for_frontend().await?;
|
||||||
let now_in_ms = SystemTime::now()
|
let now_in_ms = SystemTime::now()
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.as_millis() as i64;
|
.as_millis() as i64;
|
||||||
|
// shuffle the frontends to avoid always pick the same one
|
||||||
|
frontends.shuffle(&mut rng());
|
||||||
|
|
||||||
// found node with maximum last_activity_ts
|
// found node with maximum last_activity_ts
|
||||||
for (_, node_info) in frontends
|
for (_, node_info) in frontends
|
||||||
.iter()
|
.iter()
|
||||||
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
|
|
||||||
.rev()
|
|
||||||
// filter out frontend that have been down for more than 1 min
|
// filter out frontend that have been down for more than 1 min
|
||||||
.filter(|(_, node_info)| {
|
.filter(|(_, node_info)| {
|
||||||
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
|
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
|
||||||
|
|||||||
Reference in New Issue
Block a user