diff --git a/Cargo.lock b/Cargo.lock index b10396cb7b..e75c0edbcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4237,7 +4237,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ae26136accd82fbdf8be540cd502f2e94951077e#ae26136accd82fbdf8be540cd502f2e94951077e" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a70a6af9c69e40f9a918936a48717343402b4393#a70a6af9c69e40f9a918936a48717343402b4393" dependencies = [ "prost 0.12.6", "serde", diff --git a/Cargo.toml b/Cargo.toml index 0cb9d9b939..9921484f82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ae26136accd82fbdf8be540cd502f2e94951077e" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a70a6af9c69e40f9a918936a48717343402b4393" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index 75da644fff..0cf593f1ca 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -99,6 +99,7 @@ pub struct NodeInfo { pub enum Role { Datanode, Frontend, + Flownode, Metasrv, } @@ -106,6 +107,7 @@ pub enum Role { pub enum NodeStatus { Datanode(DatanodeStatus), Frontend(FrontendStatus), + Flownode(FlownodeStatus), Metasrv(MetasrvStatus), Standalone, } @@ -116,6 +118,7 @@ impl NodeStatus { match self { NodeStatus::Datanode(_) => "DATANODE", NodeStatus::Frontend(_) => "FRONTEND", + NodeStatus::Flownode(_) => "FLOWNODE", NodeStatus::Metasrv(_) => "METASRV", NodeStatus::Standalone => "STANDALONE", } @@ -139,6 +142,10 @@ pub struct DatanodeStatus { #[derive(Debug, Serialize, Deserialize)] pub struct FrontendStatus {} +/// The status of a flownode. +#[derive(Debug, Serialize, Deserialize)] +pub struct FlownodeStatus {} + /// The status of a metasrv. #[derive(Debug, Serialize, Deserialize)] pub struct MetasrvStatus { @@ -235,7 +242,8 @@ impl From for i32 { match role { Role::Datanode => 0, Role::Frontend => 1, - Role::Metasrv => 2, + Role::Flownode => 2, + Role::Metasrv => 99, } } } @@ -247,7 +255,8 @@ impl TryFrom for Role { match role { 0 => Ok(Self::Datanode), 1 => Ok(Self::Frontend), - 2 => Ok(Self::Metasrv), + 2 => Ok(Self::Flownode), + 99 => Ok(Self::Metasrv), _ => InvalidRoleSnafu { role }.fail(), } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index ed518fcebe..b8474866bf 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -331,7 +331,6 @@ impl HeartbeatTask { // TODO(ruihang): scratch more info rcus: 0, wcus: 0, - approximate_rows: 0, }; region_stats.push(region_stat); } diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index 6fc3937e4e..f42e84acd4 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -14,7 +14,9 @@ use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role}; use common_meta::cluster; -use common_meta::cluster::{DatanodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus}; +use common_meta::cluster::{ + DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, +}; use common_meta::peer::Peer; use common_meta::rpc::store::PutRequest; use snafu::ResultExt; @@ -59,6 +61,39 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler { } } +/// The handler to collect cluster info from the heartbeat request of flownode. +pub struct CollectFlownodeClusterInfoHandler; +#[async_trait::async_trait] +impl HeartbeatHandler for CollectFlownodeClusterInfoHandler { + fn is_acceptable(&self, role: Role) -> bool { + role == Role::Flownode + } + + async fn handle( + &self, + req: &HeartbeatRequest, + ctx: &mut Context, + _acc: &mut HeartbeatAccumulator, + ) -> Result { + let Some((key, peer, info)) = extract_base_info(req, Role::Flownode) else { + return Ok(HandleControl::Continue); + }; + + let value = NodeInfo { + peer, + last_activity_ts: common_time::util::current_time_millis(), + status: NodeStatus::Flownode(FlownodeStatus {}), + version: info.version, + git_commit: info.git_commit, + start_time_ms: info.start_time_ms, + }; + + save_to_mem_store(key, value, ctx).await?; + + Ok(HandleControl::Continue) + } +} + /// The handler to collect cluster info from the heartbeat request of datanode. pub struct CollectDatanodeClusterInfoHandler; @@ -132,6 +167,7 @@ fn extract_base_info( role: match role { Role::Datanode => cluster::Role::Datanode, Role::Frontend => cluster::Role::Frontend, + Role::Flownode => cluster::Role::Flownode, }, node_id: peer.id, }, diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 75d15ae2c2..dffeb2eb18 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -140,7 +140,6 @@ mod tests { rcus: 0, wcus: 0, approximate_bytes: 0, - approximate_rows: 0, engine: default_engine().to_string(), role: RegionRole::Follower, } diff --git a/src/meta-srv/src/handler/node_stat.rs b/src/meta-srv/src/handler/node_stat.rs index 09a2f98213..e8f3f4d67f 100644 --- a/src/meta-srv/src/handler/node_stat.rs +++ b/src/meta-srv/src/handler/node_stat.rs @@ -53,8 +53,6 @@ pub struct RegionStat { pub wcus: i64, /// Approximate bytes of this region pub approximate_bytes: i64, - /// Approximate number of rows in this region - pub approximate_rows: i64, /// The engine name. pub engine: String, /// The region role. @@ -142,7 +140,6 @@ impl TryFrom for RegionStat { rcus: value.rcus, wcus: value.wcus, approximate_bytes: value.approximate_bytes, - approximate_rows: value.approximate_rows, engine: value.engine.to_string(), role: RegionRole::from(value.role()), }) diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 83d190ef8a..2481e86c8f 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -134,7 +134,6 @@ mod test { rcus: 0, wcus: 0, approximate_bytes: 0, - approximate_rows: 0, engine: String::new(), } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index ce812cfba8..9e7032b6a8 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -483,7 +483,6 @@ impl Metasrv { .await } - #[inline] pub fn options(&self) -> &MetasrvOptions { &self.options } diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index c897ebf174..b1a75c6ad9 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -194,7 +194,6 @@ mod tests { rcus: 1, wcus: 1, approximate_bytes: 1, - approximate_rows: 1, engine: "mito2".to_string(), role: RegionRole::Leader, }], @@ -211,7 +210,6 @@ mod tests { rcus: 1, wcus: 1, approximate_bytes: 1, - approximate_rows: 1, engine: "mito2".to_string(), role: RegionRole::Leader, }], @@ -228,7 +226,6 @@ mod tests { rcus: 1, wcus: 1, approximate_bytes: 1, - approximate_rows: 1, engine: "mito2".to_string(), role: RegionRole::Leader, }], diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 542793b128..19ac09249f 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -164,7 +164,7 @@ fn get_node_id(header: &RequestHeader) -> u64 { match header.role() { Role::Frontend => next_id(), - Role::Datanode => header.member_id, + Role::Datanode | Role::Flownode => header.member_id, } }