feat: get metasrv clusterinfo (#3696)

* feat: add doc for MetasrvOptions

* feat: register candidate before election

* feat: get all peers metasrv

* chore: simplify code

* chore: proto rev

* Update src/common/meta/src/cluster.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* Update src/meta-client/src/client.rs

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* fmt

Signed-off-by: tison <wander4096@gmail.com>

* Apply suggestions from code review

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* impl<T: AsRef<[u8]>> From<T> for LeaderValue

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: tison <wander4096@gmail.com>
Author: Jeremyhi
Date: 2024-04-15 16:10:48 +08:00
Committed by: GitHub
Parent: 7a04bfe50a
Commit: 87795248dd
9 changed files with 289 additions and 61 deletions

Cargo.lock (generated)
View File

@@ -3775,7 +3775,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e#b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=8da84a04b137c4104262459807eab1c04b92f3cc#8da84a04b137c4104262459807eab1c04b92f3cc"
dependencies = [
"prost 0.12.3",
"serde",

View File

@@ -104,7 +104,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b97efbf92a0bf9abcfa1d8fe0ffe8741a2e7309e" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "8da84a04b137c4104262459807eab1c04b92f3cc" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -50,11 +50,13 @@ pub trait ClusterInfo {
}
/// The key of [NodeInfo] in the storage. The format is `__meta_cluster_node_info-{cluster_id}-{role}-{node_id}`.
/// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
/// a `cluster_id`; it serves multiple clusters.
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct NodeInfoKey {
/// The cluster id.
pub cluster_id: u64,
/// The role of the node. It can be [Role::Datanode], [Role::Frontend], or [Role::Metasrv].
/// The role of the node. It can be `[Role::Datanode]` or `[Role::Frontend]`.
pub role: Role,
/// The node id.
pub node_id: u64,
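
For reference, the prefix helpers used later in this PR (`NodeInfoKey::key_prefix_with_cluster_id` and `NodeInfoKey::key_prefix_with_role`) nest inside each other under this key layout. A small illustrative sketch; the concrete role discriminant values are assumptions, not taken from this PR:

// Illustrative only: sample keys under the documented layout.
let datanode_key = "__meta_cluster_node_info-0-0-42"; // cluster 0, hypothetical Datanode discriminant 0, node 42
let role_prefix = "__meta_cluster_node_info-0-0-";    // roughly NodeInfoKey::key_prefix_with_role(0, Role::Datanode)
let cluster_prefix = "__meta_cluster_node_info-0-";   // roughly NodeInfoKey::key_prefix_with_cluster_id(0)
assert!(datanode_key.starts_with(role_prefix));
assert!(role_prefix.starts_with(cluster_prefix));

This nesting is what lets `list_nodes(None)` scan one prefix for every role, while `list_nodes(Some(role))` narrows the range request to a single role.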

View File

@@ -26,6 +26,9 @@ use api::v1::meta::Role;
use cluster::Client as ClusterClient;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{
ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
@@ -46,8 +49,9 @@ use snafu::{OptionExt, ResultExt};
use store::Client as StoreClient;
pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::error;
use crate::error::{ConvertMetaResponseSnafu, Result};
use crate::error::{
ConvertMetaRequestSnafu, ConvertMetaResponseSnafu, Error, NotStartedSnafu, Result,
};
pub type Id = (u64, u64);
@@ -239,6 +243,57 @@ impl ProcedureExecutor for MetaClient {
}
}
#[async_trait::async_trait]
impl ClusterInfo for MetaClient {
type Error = Error;
async fn list_nodes(&self, role: Option<ClusterRole>) -> Result<Vec<NodeInfo>> {
let cluster_client = self.cluster_client()?;
let (get_metasrv_nodes, nodes_key_prefix) = match role {
None => (
true,
Some(NodeInfoKey::key_prefix_with_cluster_id(self.id.0)),
),
Some(ClusterRole::Metasrv) => (true, None),
Some(role) => (
false,
Some(NodeInfoKey::key_prefix_with_role(self.id.0, role)),
),
};
let mut nodes = if get_metasrv_nodes {
let last_activity_ts = -1; // Metasrv does not provide this information.
let (leader, followers) = cluster_client.get_metasrv_peers().await?;
followers
.into_iter()
.map(|peer| NodeInfo {
peer,
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
})
.chain(leader.into_iter().map(|leader| NodeInfo {
peer: leader,
last_activity_ts,
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
}))
.collect::<Vec<_>>()
} else {
Vec::new()
};
if let Some(prefix) = nodes_key_prefix {
let req = RangeRequest::new().with_prefix(prefix);
let res = cluster_client.range(req).await?;
for kv in res.kvs {
nodes.push(NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu)?);
}
}
Ok(nodes)
}
}
impl MetaClient {
pub fn new(id: Id) -> Self {
Self {
@@ -405,40 +460,46 @@ impl MetaClient {
) -> Result<SubmitDdlTaskResponse> {
let res = self
.procedure_client()?
.submit_ddl_task(req.try_into().context(error::ConvertMetaRequestSnafu)?)
.submit_ddl_task(req.try_into().context(ConvertMetaRequestSnafu)?)
.await?
.try_into()
.context(error::ConvertMetaResponseSnafu)?;
.context(ConvertMetaResponseSnafu)?;
Ok(res)
}
#[inline]
pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
self.heartbeat.clone().context(error::NotStartedSnafu {
self.heartbeat.clone().context(NotStartedSnafu {
name: "heartbeat_client",
})
}
#[inline]
pub fn store_client(&self) -> Result<StoreClient> {
self.store.clone().context(error::NotStartedSnafu {
self.store.clone().context(NotStartedSnafu {
name: "store_client",
})
}
#[inline]
pub fn lock_client(&self) -> Result<LockClient> {
self.lock.clone().context(error::NotStartedSnafu {
self.lock.clone().context(NotStartedSnafu {
name: "lock_client",
})
}
#[inline]
pub fn procedure_client(&self) -> Result<ProcedureClient> {
self.procedure
.clone()
.context(error::NotStartedSnafu { name: "ddl_client" })
self.procedure.clone().context(NotStartedSnafu {
name: "procedure_client",
})
}
pub fn cluster_client(&self) -> Result<ClusterClient> {
self.cluster.clone().context(NotStartedSnafu {
name: "cluster_client",
})
}
#[inline]
@@ -460,7 +521,7 @@ mod tests {
use meta_srv::Result as MetaResult;
use super::*;
use crate::mocks;
use crate::{error, mocks};
const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
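
With the `ClusterInfo` implementation above, callers can query the whole topology, or a single role, straight from a `MetaClient`. A hedged usage sketch; the module paths and error handling outside this diff are assumptions:

use common_meta::cluster::{ClusterInfo, Role as ClusterRole};
use meta_client::client::MetaClient;

async fn show_cluster(client: &MetaClient) -> Result<(), Box<dyn std::error::Error>> {
    // `None` lists every role; Metasrv entries are synthesized from the election
    // peers, everything else comes from the `__meta_cluster_node_info-...` keys.
    for node in client.list_nodes(None).await? {
        println!("{:?} (last_activity_ts={})", node.peer, node.last_activity_ts);
    }
    // Asking for a specific non-Metasrv role scans only that role's key prefix.
    let datanodes = client.list_nodes(Some(ClusterRole::Datanode)).await?;
    println!("{} datanode(s) registered", datanodes.len());
    Ok(())
}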

View File

@@ -17,10 +17,9 @@ use std::sync::Arc;
use api::greptime_proto::v1;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{ResponseHeader, Role};
use api::v1::meta::{MetasrvPeersRequest, ResponseHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::cluster;
use common_meta::cluster::{ClusterInfo, NodeInfo, NodeInfoKey};
use common_meta::peer::Peer;
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse, RangeRequest, RangeResponse};
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
@@ -72,27 +71,10 @@ impl Client {
let inner = self.inner.read().await;
inner.batch_get(req).await
}
}
#[async_trait::async_trait]
impl ClusterInfo for Client {
type Error = Error;
async fn list_nodes(&self, role: Option<cluster::Role>) -> Result<Vec<NodeInfo>> {
let cluster_id = self.inner.read().await.id.0;
let key_prefix = match role {
None => NodeInfoKey::key_prefix_with_cluster_id(cluster_id),
Some(role) => NodeInfoKey::key_prefix_with_role(cluster_id, role),
};
let req = RangeRequest::new().with_prefix(key_prefix);
let res = self.range(req).await?;
res.kvs
.into_iter()
.map(|kv| NodeInfo::try_from(kv.value).context(ConvertMetaResponseSnafu))
.collect::<Result<Vec<_>>>()
pub async fn get_metasrv_peers(&self) -> Result<(Option<Peer>, Vec<Peer>)> {
let inner = self.inner.read().await;
inner.get_metasrv_peers().await
}
}
@@ -239,4 +221,27 @@ impl Inner {
.try_into()
.context(ConvertMetaResponseSnafu)
}
async fn get_metasrv_peers(&self) -> Result<(Option<Peer>, Vec<Peer>)> {
self.with_retry(
"get_metasrv_peers",
move |mut client| {
let inner_req = tonic::Request::new(MetasrvPeersRequest::default());
async move {
client
.metasrv_peers(inner_req)
.await
.map(|res| res.into_inner())
}
},
|res| &res.header,
)
.await
.map(|res| {
let leader = res.leader.map(|x| x.into());
let peers = res.followers.into_iter().map(|x| x.into()).collect();
(leader, peers)
})
}
}
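
`get_metasrv_peers` deliberately returns the leader separately from the remaining candidates, so callers do not have to re-derive leadership from the peer list. A minimal, illustrative caller; the leader field is optional in the proto, so it is handled defensively:

// Sketch only; `Client` is the cluster client defined in this file.
async fn report_metasrv_topology(client: &Client) -> Result<()> {
    let (leader, followers) = client.get_metasrv_peers().await?;
    match leader {
        Some(peer) => println!("metasrv leader: {}", peer.addr),
        None => println!("no metasrv leader reported"),
    }
    for follower in followers {
        println!("metasrv follower: {}", follower.addr);
    }
    Ok(())
}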

View File

@@ -21,8 +21,10 @@ use etcd_client::LeaderKey;
use tokio::sync::broadcast::Receiver;
use crate::error::Result;
use crate::metasrv::LeaderValue;
pub const ELECTION_KEY: &str = "__metasrv_election";
pub const CANDIDATES_ROOT: &str = "__metasrv_election_candidates/";
#[derive(Debug, Clone)]
pub enum LeaderChangeMessage {
@@ -65,6 +67,12 @@ pub trait Election: Send + Sync {
/// Note: a new leader will return `true` only on the first call.
fn in_infancy(&self) -> bool;
/// Registers a candidate for the election.
async fn register_candidate(&self) -> Result<()>;
/// Gets all candidates in the election.
async fn all_candidates(&self) -> Result<Vec<LeaderValue>>;
/// Campaign waits to acquire leadership in an election.
///
/// Multiple sessions can participate in the election,

View File

@@ -18,13 +18,13 @@ use std::time::Duration;
use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
use common_telemetry::{error, info, warn};
use etcd_client::Client;
use etcd_client::{Client, GetOptions, PutOptions};
use snafu::{OptionExt, ResultExt};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::broadcast::Receiver;
use crate::election::{Election, LeaderChangeMessage, ELECTION_KEY};
use crate::election::{Election, LeaderChangeMessage, CANDIDATES_ROOT, ELECTION_KEY};
use crate::error;
use crate::error::Result;
use crate::metasrv::{ElectionRef, LeaderValue};
@@ -105,11 +105,15 @@ impl EtcdElection {
}
fn election_key(&self) -> String {
if self.store_key_prefix.is_empty() {
ELECTION_KEY.to_string()
} else {
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
format!("{}{}", self.store_key_prefix, ELECTION_KEY)
}
fn candidate_root(&self) -> String {
format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
}
fn candidate_key(&self) -> String {
format!("{}{}", self.candidate_root(), self.leader_value)
}
}
@@ -127,6 +131,61 @@ impl Election for EtcdElection {
.is_ok()
}
async fn register_candidate(&self) -> Result<()> {
const CANDIDATE_LEASE_SECS: u64 = 600;
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
let mut lease_client = self.client.lease_client();
let res = lease_client
.grant(CANDIDATE_LEASE_SECS as i64, None)
.await
.context(error::EtcdFailedSnafu)?;
let lease_id = res.id();
// The registration info: the key is the candidate key, the value is its leader value.
let key = self.candidate_key().into_bytes();
let value = self.leader_value.clone().into_bytes();
// Put the key-value pair with the lease id attached.
self.client
.kv_client()
.put(key, value, Some(PutOptions::new().with_lease(lease_id)))
.await
.context(error::EtcdFailedSnafu)?;
let (mut keeper, mut receiver) = lease_client
.keep_alive(lease_id)
.await
.context(error::EtcdFailedSnafu)?;
let mut keep_alive_interval =
tokio::time::interval(Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS));
loop {
let _ = keep_alive_interval.tick().await;
keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
if let Some(res) = receiver.message().await.context(error::EtcdFailedSnafu)? {
if res.ttl() <= 0 {
// Failed to keep alive, just break the loop.
break;
}
}
}
Ok(())
}
async fn all_candidates(&self) -> Result<Vec<LeaderValue>> {
let key = self.candidate_root().into_bytes();
let res = self
.client
.kv_client()
.get(key, Some(GetOptions::new().with_prefix()))
.await
.context(error::EtcdFailedSnafu)?;
res.kvs().iter().map(|kv| Ok(kv.value().into())).collect()
}
async fn campaign(&self) -> Result<()> {
let mut lease_client = self.client.lease_client();
let mut election_client = self.client.election_client();
@@ -201,7 +260,7 @@ impl Election for EtcdElection {
async fn leader(&self) -> Result<LeaderValue> {
if self.is_leader.load(Ordering::Relaxed) {
Ok(LeaderValue(self.leader_value.clone()))
Ok(self.leader_value.as_bytes().into())
} else {
let res = self
.client
@@ -210,8 +269,7 @@ impl Election for EtcdElection {
.await
.context(error::EtcdFailedSnafu)?;
let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
let leader_value = String::from_utf8_lossy(leader_value).to_string();
Ok(LeaderValue(leader_value))
Ok(leader_value.into())
}
}
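
Taken together, `election_key`, `candidate_root`, and `candidate_key` lay the candidate records out next to the election key, scoped by the same `store_key_prefix`. An illustrative sketch of the resulting etcd keys; the prefix and address values are examples only:

let store_key_prefix = "greptime/";
let leader_value = "127.0.0.1:3002"; // the advertised server address
let election_key = format!("{store_key_prefix}__metasrv_election");
let candidate_key = format!("{store_key_prefix}__metasrv_election_candidates/{leader_value}");
assert_eq!(election_key, "greptime/__metasrv_election");
assert_eq!(candidate_key, "greptime/__metasrv_election_candidates/127.0.0.1:3002");

Each candidate key is written with a 600-second lease and refreshed every 300 seconds, so a crashed metasrv drops out of `all_candidates` once its lease expires.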

View File

@@ -64,21 +64,38 @@ pub const METASRV_HOME: &str = "/tmp/metasrv";
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
/// The address the server listens on.
pub bind_addr: String,
/// The address the server advertises to the clients.
pub server_addr: String,
/// The address of the store, e.g., etcd.
pub store_addr: String,
/// The type of selector.
pub selector: SelectorType,
/// Whether to use the memory store.
pub use_memory_store: bool,
/// Whether to enable region failover.
pub enable_region_failover: bool,
/// The HTTP server options.
pub http: HttpOptions,
/// The logging options.
pub logging: LoggingOptions,
/// The procedure options.
pub procedure: ProcedureConfig,
/// The failure detector options.
pub failure_detector: PhiAccrualFailureDetectorOptions,
/// The datanode options.
pub datanode: DatanodeOptions,
/// Whether to enable telemetry.
pub enable_telemetry: bool,
/// The data home directory.
pub data_home: String,
/// The WAL options.
pub wal: MetasrvWalConfig,
/// The metrics export options.
pub export_metrics: ExportMetricsOption,
/// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
/// This is useful when multiple metasrv clusters share the same store.
pub store_key_prefix: String,
/// The max operations per txn
///
@@ -191,6 +208,13 @@ impl Context {
pub struct LeaderValue(pub String);
impl<T: AsRef<[u8]>> From<T> for LeaderValue {
fn from(value: T) -> Self {
let string = String::from_utf8_lossy(value.as_ref());
Self(string.to_string())
}
}
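
The blanket `From` impl above is what lets `leader()` in the etcd election return either its locally cached `String` or the raw bytes read from etcd without a `String::from_utf8_lossy` at each call site. A small illustrative sketch of the conversions it covers:

// Illustrative only: all of these go through the same blanket impl.
let a: LeaderValue = "127.0.0.1:3002".into();                 // &str
let b: LeaderValue = String::from("127.0.0.1:3002").into();   // String
let c: LeaderValue = b"127.0.0.1:3002".as_slice().into();     // &[u8], e.g. an etcd value
assert_eq!(a.0, b.0);
assert_eq!(b.0, c.0);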
#[derive(Clone)]
pub struct SelectorContext {
pub server_addr: String,
@@ -341,18 +365,35 @@ impl Metasrv {
state_handler.on_become_follower().await;
});
let election = election.clone();
let started = self.started.clone();
let _handle = common_runtime::spawn_bg(async move {
while started.load(Ordering::Relaxed) {
let res = election.campaign().await;
if let Err(e) = res {
warn!("Metasrv election error: {}", e);
// Register candidate and keep lease in background.
{
let election = election.clone();
let started = self.started.clone();
let _handle = common_runtime::spawn_bg(async move {
while started.load(Ordering::Relaxed) {
let res = election.register_candidate().await;
if let Err(e) = res {
warn!("Metasrv register candidate error: {}", e);
}
}
info!("Metasrv re-initiate election");
}
info!("Metasrv stopped");
});
});
}
// Campaign
{
let election = election.clone();
let started = self.started.clone();
let _handle = common_runtime::spawn_bg(async move {
while started.load(Ordering::Relaxed) {
let res = election.campaign().await;
if let Err(e) = res {
warn!("Metasrv election error: {}", e);
}
info!("Metasrv re-initiate election");
}
info!("Metasrv stopped");
});
}
} else {
if let Err(e) = self.wal_options_allocator.start().await {
error!(e; "Failed to start wal options allocator");

View File

@@ -14,7 +14,8 @@
use api::v1::meta::{
cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
Error, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
Error, MetasrvPeersRequest, MetasrvPeersResponse, Peer, RangeRequest as PbRangeRequest,
RangeResponse as PbRangeResponse, ResponseHeader,
};
use common_telemetry::warn;
use snafu::ResultExt;
@@ -34,7 +35,7 @@ impl cluster_server::Cluster for Metasrv {
..Default::default()
};
warn!("The current meta is not leader, but a batch_get request have reached the meta. Detail: {:?}.", req);
warn!("The current meta is not leader, but a `batch_get` request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}
@@ -57,7 +58,7 @@ impl cluster_server::Cluster for Metasrv {
..Default::default()
};
warn!("The current meta is not leader, but a range request have reached the meta. Detail: {:?}.", req);
warn!("The current meta is not leader, but a `range` request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}
@@ -71,6 +72,58 @@ impl cluster_server::Cluster for Metasrv {
let resp = res.to_proto_resp(ResponseHeader::success(0));
Ok(Response::new(resp))
}
async fn metasrv_peers(
&self,
req: Request<MetasrvPeersRequest>,
) -> GrpcResult<MetasrvPeersResponse> {
if !self.is_leader() {
let is_not_leader = ResponseHeader::failed(0, Error::is_not_leader());
let resp = MetasrvPeersResponse {
header: Some(is_not_leader),
..Default::default()
};
warn!("The current meta is not leader, but a `metasrv_peers` request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}
let (leader, followers) = match self.election() {
Some(election) => {
let leader = election.leader().await?;
let peers = election.all_candidates().await?;
let followers = peers
.into_iter()
.filter(|peer| peer.0 != leader.0)
.map(|peer| Peer {
addr: peer.0,
..Default::default()
})
.collect();
(
Some(Peer {
addr: leader.0,
..Default::default()
}),
followers,
)
}
None => (
Some(Peer {
addr: self.options().server_addr.clone(),
..Default::default()
}),
vec![],
),
};
let resp = MetasrvPeersResponse {
header: Some(ResponseHeader::success(0)),
leader,
followers,
};
Ok(Response::new(resp))
}
}
impl Metasrv {
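
For completeness, the new RPC can also be exercised directly with the tonic-generated stub, which is how the meta client above reaches it. A hedged, standalone sketch; the endpoint address is a placeholder, and `connect` assumes the generated client is built with tonic's transport feature:

use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::MetasrvPeersRequest;

async fn print_metasrv_peers() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder address; point this at a running metasrv gRPC endpoint.
    let mut client = ClusterClient::connect("http://127.0.0.1:3002").await?;
    let resp = client
        .metasrv_peers(tonic::Request::new(MetasrvPeersRequest::default()))
        .await?
        .into_inner();
    println!("leader: {:?}", resp.leader);
    println!("followers: {:?}", resp.followers);
    Ok(())
}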