mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-01 12:50:40 +00:00
feat: enable metasrv to accept flownode's heartbeats (#4160)
* feat: make metasrv can accept flownode's heartbeat * chore: proto
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4237,7 +4237,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ae26136accd82fbdf8be540cd502f2e94951077e#ae26136accd82fbdf8be540cd502f2e94951077e"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a70a6af9c69e40f9a918936a48717343402b4393#a70a6af9c69e40f9a918936a48717343402b4393"
|
||||
dependencies = [
|
||||
"prost 0.12.6",
|
||||
"serde",
|
||||
|
||||
@@ -121,7 +121,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ae26136accd82fbdf8be540cd502f2e94951077e" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a70a6af9c69e40f9a918936a48717343402b4393" }
|
||||
humantime = "2.1"
|
||||
humantime-serde = "1.1"
|
||||
itertools = "0.10"
|
||||
|
||||
@@ -99,6 +99,7 @@ pub struct NodeInfo {
|
||||
pub enum Role {
|
||||
Datanode,
|
||||
Frontend,
|
||||
Flownode,
|
||||
Metasrv,
|
||||
}
|
||||
|
||||
@@ -106,6 +107,7 @@ pub enum Role {
|
||||
pub enum NodeStatus {
|
||||
Datanode(DatanodeStatus),
|
||||
Frontend(FrontendStatus),
|
||||
Flownode(FlownodeStatus),
|
||||
Metasrv(MetasrvStatus),
|
||||
Standalone,
|
||||
}
|
||||
@@ -116,6 +118,7 @@ impl NodeStatus {
|
||||
match self {
|
||||
NodeStatus::Datanode(_) => "DATANODE",
|
||||
NodeStatus::Frontend(_) => "FRONTEND",
|
||||
NodeStatus::Flownode(_) => "FLOWNODE",
|
||||
NodeStatus::Metasrv(_) => "METASRV",
|
||||
NodeStatus::Standalone => "STANDALONE",
|
||||
}
|
||||
@@ -139,6 +142,10 @@ pub struct DatanodeStatus {
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct FrontendStatus {}
|
||||
|
||||
/// The status of a flownode.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct FlownodeStatus {}
|
||||
|
||||
/// The status of a metasrv.
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct MetasrvStatus {
|
||||
@@ -235,7 +242,8 @@ impl From<Role> for i32 {
|
||||
match role {
|
||||
Role::Datanode => 0,
|
||||
Role::Frontend => 1,
|
||||
Role::Metasrv => 2,
|
||||
Role::Flownode => 2,
|
||||
Role::Metasrv => 99,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -247,7 +255,8 @@ impl TryFrom<i32> for Role {
|
||||
match role {
|
||||
0 => Ok(Self::Datanode),
|
||||
1 => Ok(Self::Frontend),
|
||||
2 => Ok(Self::Metasrv),
|
||||
2 => Ok(Self::Flownode),
|
||||
99 => Ok(Self::Metasrv),
|
||||
_ => InvalidRoleSnafu { role }.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -331,7 +331,6 @@ impl HeartbeatTask {
|
||||
// TODO(ruihang): scratch more info
|
||||
rcus: 0,
|
||||
wcus: 0,
|
||||
approximate_rows: 0,
|
||||
};
|
||||
region_stats.push(region_stat);
|
||||
}
|
||||
|
||||
@@ -14,7 +14,9 @@
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
|
||||
use common_meta::cluster;
|
||||
use common_meta::cluster::{DatanodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus};
|
||||
use common_meta::cluster::{
|
||||
DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
|
||||
};
|
||||
use common_meta::peer::Peer;
|
||||
use common_meta::rpc::store::PutRequest;
|
||||
use snafu::ResultExt;
|
||||
@@ -59,6 +61,39 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// The handler to collect cluster info from the heartbeat request of flownode.
|
||||
pub struct CollectFlownodeClusterInfoHandler;
|
||||
#[async_trait::async_trait]
|
||||
impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
|
||||
fn is_acceptable(&self, role: Role) -> bool {
|
||||
role == Role::Flownode
|
||||
}
|
||||
|
||||
async fn handle(
|
||||
&self,
|
||||
req: &HeartbeatRequest,
|
||||
ctx: &mut Context,
|
||||
_acc: &mut HeartbeatAccumulator,
|
||||
) -> Result<HandleControl> {
|
||||
let Some((key, peer, info)) = extract_base_info(req, Role::Flownode) else {
|
||||
return Ok(HandleControl::Continue);
|
||||
};
|
||||
|
||||
let value = NodeInfo {
|
||||
peer,
|
||||
last_activity_ts: common_time::util::current_time_millis(),
|
||||
status: NodeStatus::Flownode(FlownodeStatus {}),
|
||||
version: info.version,
|
||||
git_commit: info.git_commit,
|
||||
start_time_ms: info.start_time_ms,
|
||||
};
|
||||
|
||||
save_to_mem_store(key, value, ctx).await?;
|
||||
|
||||
Ok(HandleControl::Continue)
|
||||
}
|
||||
}
|
||||
|
||||
/// The handler to collect cluster info from the heartbeat request of datanode.
|
||||
pub struct CollectDatanodeClusterInfoHandler;
|
||||
|
||||
@@ -132,6 +167,7 @@ fn extract_base_info(
|
||||
role: match role {
|
||||
Role::Datanode => cluster::Role::Datanode,
|
||||
Role::Frontend => cluster::Role::Frontend,
|
||||
Role::Flownode => cluster::Role::Flownode,
|
||||
},
|
||||
node_id: peer.id,
|
||||
},
|
||||
|
||||
@@ -140,7 +140,6 @@ mod tests {
|
||||
rcus: 0,
|
||||
wcus: 0,
|
||||
approximate_bytes: 0,
|
||||
approximate_rows: 0,
|
||||
engine: default_engine().to_string(),
|
||||
role: RegionRole::Follower,
|
||||
}
|
||||
|
||||
@@ -53,8 +53,6 @@ pub struct RegionStat {
|
||||
pub wcus: i64,
|
||||
/// Approximate bytes of this region
|
||||
pub approximate_bytes: i64,
|
||||
/// Approximate number of rows in this region
|
||||
pub approximate_rows: i64,
|
||||
/// The engine name.
|
||||
pub engine: String,
|
||||
/// The region role.
|
||||
@@ -142,7 +140,6 @@ impl TryFrom<api::v1::meta::RegionStat> for RegionStat {
|
||||
rcus: value.rcus,
|
||||
wcus: value.wcus,
|
||||
approximate_bytes: value.approximate_bytes,
|
||||
approximate_rows: value.approximate_rows,
|
||||
engine: value.engine.to_string(),
|
||||
role: RegionRole::from(value.role()),
|
||||
})
|
||||
|
||||
@@ -134,7 +134,6 @@ mod test {
|
||||
rcus: 0,
|
||||
wcus: 0,
|
||||
approximate_bytes: 0,
|
||||
approximate_rows: 0,
|
||||
engine: String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -483,7 +483,6 @@ impl Metasrv {
|
||||
.await
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn options(&self) -> &MetasrvOptions {
|
||||
&self.options
|
||||
}
|
||||
|
||||
@@ -194,7 +194,6 @@ mod tests {
|
||||
rcus: 1,
|
||||
wcus: 1,
|
||||
approximate_bytes: 1,
|
||||
approximate_rows: 1,
|
||||
engine: "mito2".to_string(),
|
||||
role: RegionRole::Leader,
|
||||
}],
|
||||
@@ -211,7 +210,6 @@ mod tests {
|
||||
rcus: 1,
|
||||
wcus: 1,
|
||||
approximate_bytes: 1,
|
||||
approximate_rows: 1,
|
||||
engine: "mito2".to_string(),
|
||||
role: RegionRole::Leader,
|
||||
}],
|
||||
@@ -228,7 +226,6 @@ mod tests {
|
||||
rcus: 1,
|
||||
wcus: 1,
|
||||
approximate_bytes: 1,
|
||||
approximate_rows: 1,
|
||||
engine: "mito2".to_string(),
|
||||
role: RegionRole::Leader,
|
||||
}],
|
||||
|
||||
@@ -164,7 +164,7 @@ fn get_node_id(header: &RequestHeader) -> u64 {
|
||||
|
||||
match header.role() {
|
||||
Role::Frontend => next_id(),
|
||||
Role::Datanode => header.member_id,
|
||||
Role::Datanode | Role::Flownode => header.member_id,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user