mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-26 08:00:01 +00:00
feat: add metasrv start time to node info (#3883)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3894,7 +3894,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=47af36e2b218bdb09fa3a84a31999245152db2ee#47af36e2b218bdb09fa3a84a31999245152db2ee"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e152fcbf173b8759dd8a91ce7f6f4b0ca987828e#e152fcbf173b8759dd8a91ce7f6f4b0ca987828e"
|
||||
dependencies = [
|
||||
"prost 0.12.4",
|
||||
"serde",
|
||||
|
||||
@@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "47af36e2b218bdb09fa3a84a31999245152db2ee" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e152fcbf173b8759dd8a91ce7f6f4b0ca987828e" }
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
|
||||
@@ -264,7 +264,6 @@ impl ClusterInfo for MetaClient {
|
||||
|
||||
let mut nodes = if get_metasrv_nodes {
|
||||
let last_activity_ts = -1; // Metasrv does not provide this information.
|
||||
let start_time_ms = 0;
|
||||
|
||||
let (leader, followers) = cluster_client.get_metasrv_peers().await?;
|
||||
followers
|
||||
@@ -275,7 +274,7 @@ impl ClusterInfo for MetaClient {
|
||||
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
|
||||
version: node.version,
|
||||
git_commit: node.git_commit,
|
||||
start_time_ms,
|
||||
start_time_ms: node.start_time_ms,
|
||||
})
|
||||
.chain(leader.into_iter().map(|node| NodeInfo {
|
||||
peer: node.peer.map(|p| p.into()).unwrap_or_default(),
|
||||
@@ -283,7 +282,7 @@ impl ClusterInfo for MetaClient {
|
||||
status: NodeStatus::Metasrv(MetasrvStatus { is_leader: true }),
|
||||
version: node.version,
|
||||
git_commit: node.git_commit,
|
||||
start_time_ms,
|
||||
start_time_ms: node.start_time_ms,
|
||||
}))
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
|
||||
@@ -68,7 +68,7 @@ pub trait Election: Send + Sync {
|
||||
fn in_infancy(&self) -> bool;
|
||||
|
||||
/// Registers a candidate for the election.
|
||||
async fn register_candidate(&self) -> Result<()>;
|
||||
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()>;
|
||||
|
||||
/// Gets all candidates in the election.
|
||||
async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>>;
|
||||
|
||||
@@ -131,7 +131,7 @@ impl Election for EtcdElection {
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
async fn register_candidate(&self) -> Result<()> {
|
||||
async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
|
||||
const CANDIDATE_LEASE_SECS: u64 = 600;
|
||||
const KEEP_ALIVE_INTERVAL_SECS: u64 = CANDIDATE_LEASE_SECS / 2;
|
||||
|
||||
@@ -144,15 +144,9 @@ impl Election for EtcdElection {
|
||||
|
||||
// The register info: key is the candidate key, value is its node info(addr, version, git_commit).
|
||||
let key = self.candidate_key().into_bytes();
|
||||
let build_info = common_version::build_info();
|
||||
let value = MetasrvNodeInfo {
|
||||
addr: self.leader_value.clone(),
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
};
|
||||
let value = serde_json::to_string(&value)
|
||||
let value = serde_json::to_string(node_info)
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: format!("{value:?}"),
|
||||
input: format!("{node_info:?}"),
|
||||
})?
|
||||
.into_bytes();
|
||||
// Puts with the lease id
|
||||
|
||||
@@ -225,6 +225,8 @@ pub struct MetasrvNodeInfo {
|
||||
pub version: String,
|
||||
// The node build git commit hash
|
||||
pub git_commit: String,
|
||||
// The node start timestamp in milliseconds
|
||||
pub start_time_ms: u64,
|
||||
}
|
||||
|
||||
impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
|
||||
@@ -236,6 +238,7 @@ impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
|
||||
}),
|
||||
version: node_info.version,
|
||||
git_commit: node_info.git_commit,
|
||||
start_time_ms: node_info.start_time_ms,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -305,6 +308,7 @@ impl MetaStateHandler {
|
||||
pub struct Metasrv {
|
||||
state: StateRef,
|
||||
started: Arc<AtomicBool>,
|
||||
start_time_ms: u64,
|
||||
options: MetasrvOptions,
|
||||
// It is only valid at the leader node and is used to temporarily
|
||||
// store some data that will not be persisted.
|
||||
@@ -339,7 +343,11 @@ impl Metasrv {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.create_default_schema_if_not_exist().await?;
|
||||
// Creates default schema if not exists
|
||||
self.table_metadata_manager
|
||||
.init()
|
||||
.await
|
||||
.context(InitMetadataSnafu)?;
|
||||
|
||||
if let Some(election) = self.election() {
|
||||
let procedure_manager = self.procedure_manager.clone();
|
||||
@@ -394,9 +402,10 @@ impl Metasrv {
|
||||
{
|
||||
let election = election.clone();
|
||||
let started = self.started.clone();
|
||||
let node_info = self.node_info();
|
||||
let _handle = common_runtime::spawn_bg(async move {
|
||||
while started.load(Ordering::Relaxed) {
|
||||
let res = election.register_candidate().await;
|
||||
let res = election.register_candidate(&node_info).await;
|
||||
if let Err(e) = res {
|
||||
warn!("Metasrv register candidate error: {}", e);
|
||||
}
|
||||
@@ -435,14 +444,8 @@ impl Metasrv {
|
||||
}
|
||||
|
||||
info!("Metasrv started");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_default_schema_if_not_exist(&self) -> Result<()> {
|
||||
self.table_metadata_manager
|
||||
.init()
|
||||
.await
|
||||
.context(InitMetadataSnafu)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> Result<()> {
|
||||
@@ -453,6 +456,20 @@ impl Metasrv {
|
||||
.context(StopProcedureManagerSnafu)
|
||||
}
|
||||
|
||||
pub fn start_time_ms(&self) -> u64 {
|
||||
self.start_time_ms
|
||||
}
|
||||
|
||||
pub fn node_info(&self) -> MetasrvNodeInfo {
|
||||
let build_info = common_version::build_info();
|
||||
MetasrvNodeInfo {
|
||||
addr: self.options().server_addr.clone(),
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: self.start_time_ms(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Lookup a peer by peer_id, return it only when it's alive.
|
||||
pub(crate) async fn lookup_peer(
|
||||
&self,
|
||||
|
||||
@@ -360,6 +360,7 @@ impl MetasrvBuilder {
|
||||
Ok(Metasrv {
|
||||
state,
|
||||
started,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
options,
|
||||
in_memory,
|
||||
kv_backend,
|
||||
|
||||
@@ -88,35 +88,23 @@ impl cluster_server::Cluster for Metasrv {
|
||||
return Ok(Response::new(resp));
|
||||
}
|
||||
|
||||
fn make_node_info(addr: String) -> Option<MetasrvNodeInfo> {
|
||||
let build_info = common_version::build_info();
|
||||
Some(
|
||||
metasrv::MetasrvNodeInfo {
|
||||
addr,
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
|
||||
let leader_addr = &self.options().server_addr;
|
||||
let (leader, followers) = match self.election() {
|
||||
Some(election) => {
|
||||
let leader = election.leader().await?;
|
||||
let nodes = election.all_candidates().await?;
|
||||
let followers = nodes
|
||||
.into_iter()
|
||||
.filter(|node_info| node_info.addr != leader.0)
|
||||
.filter(|node_info| &node_info.addr != leader_addr)
|
||||
.map(api::v1::meta::MetasrvNodeInfo::from)
|
||||
.collect();
|
||||
(make_node_info(leader.0.clone()), followers)
|
||||
(self.node_info().into(), followers)
|
||||
}
|
||||
None => (make_node_info(self.options().server_addr.clone()), vec![]),
|
||||
None => (self.make_node_info(leader_addr), vec![]),
|
||||
};
|
||||
|
||||
let resp = MetasrvPeersResponse {
|
||||
header: Some(ResponseHeader::success(0)),
|
||||
leader,
|
||||
leader: Some(leader),
|
||||
followers,
|
||||
};
|
||||
|
||||
@@ -129,4 +117,15 @@ impl Metasrv {
|
||||
// Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader.
|
||||
self.election().map(|x| x.is_leader()).unwrap_or(true)
|
||||
}
|
||||
|
||||
fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
|
||||
let build_info = common_version::build_info();
|
||||
metasrv::MetasrvNodeInfo {
|
||||
addr: addr.to_string(),
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms: self.start_time_ms(),
|
||||
}
|
||||
.into()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ DESC TABLE CLUSTER_INFO;
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
|
||||
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|FRONTEND|127.0.0.1:4001|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
@@ -35,7 +35,7 @@ SELECT * FROM CLUSTER_INFO ORDER BY peer_type;
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'METASRV' ORDER BY peer_type;
|
||||
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
@@ -55,7 +55,7 @@ SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE = 'FRONTEND' ORDER BY peer_type;
|
||||
-- SQLNESS REPLACE [\s\-]+
|
||||
SELECT * FROM CLUSTER_INFO WHERE PEER_TYPE != 'FRONTEND' ORDER BY peer_type;
|
||||
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash||||+++++++++
|
||||
+++++++++|peer_id|peer_type|peer_addr|node_version|git_commit|start_time|uptime|active_time|+++++++++|1|DATANODE|127.0.0.1:4101|Version|Hash|Start_time|Duration|Duration||2|DATANODE|127.0.0.1:4102|Version|Hash|Start_time|Duration|Duration||3|DATANODE|127.0.0.1:4103|Version|Hash|Start_time|Duration|Duration||1|METASRV|127.0.0.1:3002|Version|Hash|Start_time|Duration||+++++++++
|
||||
|
||||
-- SQLNESS REPLACE version node_version
|
||||
-- SQLNESS REPLACE (\s\d\.\d\.\d\s) Version
|
||||
|
||||
Reference in New Issue
Block a user