From 027707d9698006732fdd643f112fe105e8225566 Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Mon, 15 May 2023 17:54:45 +0800 Subject: [PATCH] feat: support frontend-meta heartbeat (#1555) * feat: support frontend heartbeat * fix: typo "reponse" -> "response" * add ut * enable start heartbeat task * chore: frontend id is specified by metasrv, not in the frontend startup parameter * fix typo * self-cr * cr * cr * cr * remove unnecessary headers * use the member id in the header as the node id --- Cargo.lock | 3 +- src/api/Cargo.toml | 2 +- src/datanode/src/heartbeat.rs | 1 + src/datanode/src/instance.rs | 3 +- src/datanode/src/mock.rs | 3 +- src/frontend/src/error.rs | 15 ++- src/frontend/src/heartbeat.rs | 116 ++++++++++++++++++ src/frontend/src/instance.rs | 18 ++- src/frontend/src/lib.rs | 1 + src/meta-client/examples/lock.rs | 3 +- src/meta-client/examples/meta_client.rs | 4 +- src/meta-client/src/client.rs | 35 ++++-- src/meta-client/src/client/heartbeat.rs | 29 +++-- src/meta-client/src/client/lock.rs | 16 +-- src/meta-client/src/client/router.rs | 18 +-- src/meta-client/src/client/store.rs | 28 +++-- src/meta-client/src/mocks.rs | 3 +- src/meta-srv/Cargo.toml | 1 + src/meta-srv/src/error.rs | 4 + src/meta-srv/src/handler.rs | 6 +- .../src/handler/response_header_handler.rs | 2 +- src/meta-srv/src/service/heartbeat.rs | 71 +++++++++-- tests-integration/src/tests.rs | 3 +- 23 files changed, 304 insertions(+), 81 deletions(-) create mode 100644 src/frontend/src/heartbeat.rs diff --git a/Cargo.lock b/Cargo.lock index c883770ef1..c4966c45be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3860,7 +3860,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1552a21e77752f1baf8062b937584b80de84e0b3#1552a21e77752f1baf8062b937584b80de84e0b3" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=6bfb02057c40da0e397c0cb4f6b87bd769669d50#6bfb02057c40da0e397c0cb4f6b87bd769669d50" dependencies = [ "prost", "tonic 0.9.2", @@ -4859,6 +4859,7 @@ dependencies = [ "http-body", "lazy_static", "metrics", + "once_cell", "parking_lot", "prost", "rand", diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index d8e718b73e..677c15f754 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -10,7 +10,7 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1552a21e77752f1baf8062b937584b80de84e0b3" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6bfb02057c40da0e397c0cb4f6b87bd769669d50" } prost.workspace = true snafu = { version = "0.7", features = ["backtraces"] } tonic.workspace = true diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 9ce24d066e..4e0f9f6d9b 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -126,6 +126,7 @@ impl HeartbeatTask { let addr = resolve_addr(&self.server_addr, &self.server_hostname); let meta_client = self.meta_client.clone(); let catalog_manager_clone = self.catalog_manager.clone(); + let handler_executor = self.heartbeat_response_handler_exector.clone(); let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16); diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 707d8c3232..95c79ded04 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::time::Duration; use std::{fs, path}; +use api::v1::meta::Role; use catalog::remote::MetaKvBackend; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; use common_base::readable_size::ReadableSize; @@ -513,7 +514,7 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOptions) -> Re let mut channel_manager = ChannelManager::with_config(config); channel_manager.start_channel_recycle(); - let mut meta_client = MetaClientBuilder::new(cluster_id, member_id) + let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode) .enable_heartbeat() .enable_router() .enable_store() diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 179fda6ac2..9ffd562abb 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::meta::Role; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_srv::mocks::MockInfo; use storage::compaction::noop::NoopCompactionScheduler; @@ -42,7 +43,7 @@ async fn mock_meta_client(mock_info: MockInfo, node_id: u64) -> MetaClient { } = mock_info; let id = (1000u64, 2000u64); - let mut meta_client = MetaClientBuilder::new(id.0, node_id) + let mut meta_client = MetaClientBuilder::new(id.0, node_id, Role::Datanode) .enable_heartbeat() .enable_router() .enable_store() diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index c2b6c7d49c..bef9d550db 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -170,7 +170,13 @@ pub enum Error { source: meta_client::error::Error, }, - #[snafu(display("Failed to request Meta, source: {}", source))] + #[snafu(display("Failed to create heartbeat stream to Metasrv, source: {}", source))] + CreateMetaHeartbeatStream { + source: meta_client::error::Error, + location: Location, + }, + + #[snafu(display("Failed to request Metasrv, source: {}", source))] RequestMeta { #[snafu(backtrace)] source: meta_client::error::Error, @@ -620,9 +626,10 @@ impl ErrorExt for Error { Error::Catalog { source, .. } => source.status_code(), Error::CatalogEntrySerde { source, .. } => source.status_code(), - Error::StartMetaClient { source } | Error::RequestMeta { source } => { - source.status_code() - } + Error::StartMetaClient { source } + | Error::RequestMeta { source } + | Error::CreateMetaHeartbeatStream { source, .. } => source.status_code(), + Error::BuildCreateExprOnInsertion { source } | Error::ToTableInsertRequest { source } | Error::ToTableDeleteRequest { source } diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs new file mode 100644 index 0000000000..4ff1bf1120 --- /dev/null +++ b/src/frontend/src/heartbeat.rs @@ -0,0 +1,116 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use api::v1::meta::{HeartbeatRequest, HeartbeatResponse}; +use common_telemetry::tracing::trace; +use common_telemetry::{error, info}; +use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; +use snafu::ResultExt; + +use crate::error; +use crate::error::Result; + +#[derive(Clone)] +pub struct HeartbeatTask { + meta_client: Arc, + report_interval: u64, + retry_interval: u64, +} + +impl HeartbeatTask { + pub fn new(meta_client: Arc, report_interval: u64, retry_interval: u64) -> Self { + HeartbeatTask { + meta_client, + report_interval, + retry_interval, + } + } + + pub async fn start(&self) -> Result<()> { + let (req_sender, resp_stream) = self + .meta_client + .heartbeat() + .await + .context(error::CreateMetaHeartbeatStreamSnafu)?; + + info!("A heartbeat connection has been established with metasrv"); + + self.start_handle_resp_stream(resp_stream); + + self.start_heartbeat_report(req_sender); + + Ok(()) + } + + fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream) { + let capture_self = self.clone(); + let retry_interval = self.retry_interval; + + common_runtime::spawn_bg(async move { + loop { + match resp_stream.message().await { + Ok(Some(resp)) => capture_self.handle_response(resp).await, + Ok(None) => break, + Err(e) => { + error!(e; "Occur error while reading heartbeat response"); + + capture_self + .start_with_retry(Duration::from_secs(retry_interval)) + .await; + + break; + } + } + } + }); + } + + fn start_heartbeat_report(&self, req_sender: HeartbeatSender) { + let report_interval = self.report_interval; + + common_runtime::spawn_bg(async move { + loop { + let req = HeartbeatRequest::default(); + + if let Err(e) = req_sender.send(req.clone()).await { + error!(e; "Failed to send heartbeat to metasrv"); + break; + } else { + trace!("Send a heartbeat request to metasrv, content: {:?}", req); + } + + tokio::time::sleep(Duration::from_secs(report_interval)).await; + } + }); + } + + async fn handle_response(&self, resp: HeartbeatResponse) { + trace!("Received a heartbeat response: {:?}", resp); + } + + async fn start_with_retry(&self, retry_interval: Duration) { + loop { + tokio::time::sleep(retry_interval).await; + + info!("Try to re-establish the heartbeat connection to metasrv."); + + if self.start().await.is_ok() { + break; + } + } + } +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index fbbed42af7..e3e5efd42b 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -27,6 +27,7 @@ use std::time::Duration; use api::v1::alter_expr::Kind; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; +use api::v1::meta::Role; use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest}; use async_trait::async_trait; use catalog::remote::MetaKvBackend; @@ -74,6 +75,7 @@ use crate::error::{ }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; +use crate::heartbeat::HeartbeatTask; use crate::instance::standalone::StandaloneGrpcQueryHandler; use crate::metrics; use crate::script::ScriptExecutor; @@ -114,6 +116,8 @@ pub struct Instance { plugins: Arc, servers: Arc, + + heartbeat_task: Option, } impl Instance { @@ -137,7 +141,7 @@ impl Instance { FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone()); let dist_instance = DistInstance::new( - meta_client, + meta_client.clone(), Arc::new(catalog_manager.clone()), datanode_clients, ); @@ -161,6 +165,8 @@ impl Instance { plugins.insert::(statement_executor.clone()); + let heartbeat_task = Some(HeartbeatTask::new(meta_client, 5, 5)); + Ok(Instance { catalog_manager, script_executor, @@ -170,6 +176,7 @@ impl Instance { grpc_query_handler: dist_instance, plugins: plugins.clone(), servers: Arc::new(HashMap::new()), + heartbeat_task, }) } @@ -193,9 +200,10 @@ impl Instance { let mut channel_manager = ChannelManager::with_config(channel_config); channel_manager.start_channel_recycle(); - let mut meta_client = MetaClientBuilder::new(0, 0) + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Frontend) .enable_router() .enable_store() + .enable_heartbeat() .channel_manager(channel_manager) .build(); meta_client @@ -226,6 +234,7 @@ impl Instance { grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()), plugins: Default::default(), servers: Arc::new(HashMap::new()), + heartbeat_task: None, }) } @@ -262,6 +271,7 @@ impl Instance { grpc_query_handler: dist_instance, plugins: Default::default(), servers: Arc::new(HashMap::new()), + heartbeat_task: None, } } @@ -432,6 +442,10 @@ impl FrontendInstance for Instance { async fn start(&mut self) -> Result<()> { // TODO(hl): Frontend init should move to here + if let Some(heartbeat_task) = &self.heartbeat_task { + heartbeat_task.start().await?; + } + futures::future::try_join_all(self.servers.values().map(start_server)) .await .context(error::StartServerSnafu) diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 8ba07cf21c..412c19c79d 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -21,6 +21,7 @@ pub mod error; pub mod expr_factory; pub mod frontend; pub mod grpc; +pub mod heartbeat; pub mod influxdb; pub mod instance; pub(crate) mod metrics; diff --git a/src/meta-client/examples/lock.rs b/src/meta-client/examples/lock.rs index 2591b4da4b..121f3719fc 100644 --- a/src/meta-client/examples/lock.rs +++ b/src/meta-client/examples/lock.rs @@ -14,6 +14,7 @@ use std::time::Duration; +use api::v1::meta::Role; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::rpc::lock::{LockRequest, UnlockRequest}; @@ -33,7 +34,7 @@ async fn run() { .connect_timeout(Duration::from_secs(5)) .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config); - let mut meta_client = MetaClientBuilder::new(id.0, id.1) + let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode) .enable_lock() .channel_manager(channel_manager) .build(); diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index ecfc1a0c12..f48bab7ea3 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, Peer}; +use api::v1::meta::{HeartbeatRequest, Peer, Role}; use chrono::DateTime; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use datatypes::prelude::ConcreteDataType; @@ -43,7 +43,7 @@ async fn run() { .connect_timeout(Duration::from_secs(5)) .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config); - let mut meta_client = MetaClientBuilder::new(id.0, id.1) + let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode) .enable_heartbeat() .enable_router() .enable_store() diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 6bab64de5b..c48bc8e885 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -18,6 +18,7 @@ mod lock; mod router; mod store; +use api::v1::meta::Role; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_telemetry::info; use heartbeat::Client as HeartbeatClient; @@ -43,6 +44,7 @@ pub type Id = (u64, u64); #[derive(Clone, Debug, Default)] pub struct MetaClientBuilder { id: Id, + role: Role, enable_heartbeat: bool, enable_router: bool, enable_store: bool, @@ -51,9 +53,10 @@ pub struct MetaClientBuilder { } impl MetaClientBuilder { - pub fn new(cluster_id: u64, member_id: u64) -> Self { + pub fn new(cluster_id: u64, member_id: u64, role: Role) -> Self { Self { id: (cluster_id, member_id), + role, ..Default::default() } } @@ -107,16 +110,16 @@ impl MetaClientBuilder { let mgr = client.channel_manager.clone(); if self.enable_heartbeat { - client.heartbeat = Some(HeartbeatClient::new(self.id, mgr.clone())); + client.heartbeat = Some(HeartbeatClient::new(self.id, self.role, mgr.clone())); } if self.enable_router { - client.router = Some(RouterClient::new(self.id, mgr.clone())); + client.router = Some(RouterClient::new(self.id, self.role, mgr.clone())); } if self.enable_store { - client.store = Some(StoreClient::new(self.id, mgr.clone())); + client.store = Some(StoreClient::new(self.id, self.role, mgr.clone())); } if self.enable_lock { - client.lock = Some(LockClient::new(self.id, mgr)); + client.lock = Some(LockClient::new(self.id, self.role, mgr)); } client @@ -409,28 +412,34 @@ mod tests { async fn test_meta_client_builder() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new(0, 0).enable_heartbeat().build(); + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) + .enable_heartbeat() + .build(); assert!(meta_client.heartbeat_client().is_ok()); assert!(meta_client.router_client().is_err()); assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); assert!(meta_client.heartbeat_client().unwrap().is_started().await); - let mut meta_client = MetaClientBuilder::new(0, 0).enable_router().build(); + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) + .enable_router() + .build(); assert!(meta_client.heartbeat_client().is_err()); assert!(meta_client.router_client().is_ok()); assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); assert!(meta_client.router_client().unwrap().is_started().await); - let mut meta_client = MetaClientBuilder::new(0, 0).enable_store().build(); + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) + .enable_store() + .build(); assert!(meta_client.heartbeat_client().is_err()); assert!(meta_client.router_client().is_err()); assert!(meta_client.store_client().is_ok()); meta_client.start(urls).await.unwrap(); assert!(meta_client.store_client().unwrap().is_started().await); - let mut meta_client = MetaClientBuilder::new(1, 2) + let mut meta_client = MetaClientBuilder::new(1, 2, Role::Datanode) .enable_heartbeat() .enable_router() .enable_store() @@ -449,7 +458,7 @@ mod tests { #[tokio::test] async fn test_not_start_heartbeat_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new(0, 0) + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) .enable_router() .enable_store() .build(); @@ -494,7 +503,7 @@ mod tests { #[tokio::test] async fn test_not_start_router_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new(0, 0) + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) .enable_heartbeat() .enable_store() .build(); @@ -509,7 +518,7 @@ mod tests { #[tokio::test] async fn test_not_start_store_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new(0, 0) + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) .enable_heartbeat() .enable_router() .build(); @@ -522,7 +531,7 @@ mod tests { #[should_panic] #[test] fn test_failed_when_start_nothing() { - let _ = MetaClientBuilder::new(0, 0).build(); + let _ = MetaClientBuilder::new(0, 0, Role::Datanode).build(); } #[tokio::test] diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 2b2e648434..72129a9c1e 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::heartbeat_client::HeartbeatClient; -use api::v1::meta::{AskLeaderRequest, HeartbeatRequest, HeartbeatResponse, RequestHeader}; +use api::v1::meta::{AskLeaderRequest, HeartbeatRequest, HeartbeatResponse, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; use common_telemetry::{debug, info}; use snafu::{ensure, OptionExt, ResultExt}; @@ -32,13 +32,14 @@ use crate::rpc::util; pub struct HeartbeatSender { id: Id, + role: Role, sender: mpsc::Sender, } impl HeartbeatSender { #[inline] - fn new(id: Id, sender: mpsc::Sender) -> Self { - Self { id, sender } + fn new(id: Id, role: Role, sender: mpsc::Sender) -> Self { + Self { id, role, sender } } #[inline] @@ -48,7 +49,7 @@ impl HeartbeatSender { #[inline] pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> { - req.set_header(self.id); + req.set_header(self.id, self.role); self.sender.send(req).await.map_err(|e| { error::SendHeartbeatSnafu { err_msg: e.to_string(), @@ -92,9 +93,10 @@ pub struct Client { } impl Client { - pub fn new(id: Id, channel_manager: ChannelManager) -> Self { + pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { let inner = Arc::new(RwLock::new(Inner { id, + role, channel_manager, peers: HashSet::default(), leader: None, @@ -132,6 +134,7 @@ impl Client { #[derive(Debug)] struct Inner { id: Id, + role: Role, channel_manager: ChannelManager, peers: HashSet, leader: Option, @@ -167,7 +170,7 @@ impl Inner { } ); - let header = RequestHeader::new(self.id); + let header = RequestHeader::new(self.id, self.role); let mut leader = None; for addr in &self.peers { let req = AskLeaderRequest { @@ -195,8 +198,10 @@ impl Inner { let mut leader = self.make_client(leader)?; let (sender, receiver) = mpsc::channel::(128); + + let header = RequestHeader::new(self.id, self.role); let handshake = HeartbeatRequest { - header: Some(RequestHeader::new(self.id)), + header: Some(header), ..Default::default() }; sender.send(handshake).await.map_err(|e| { @@ -221,7 +226,7 @@ impl Inner { info!("Success to create heartbeat stream to server: {:#?}", res); Ok(( - HeartbeatSender::new(self.id, sender), + HeartbeatSender::new(self.id, self.role, sender), HeartbeatStream::new(self.id, stream), )) } @@ -247,7 +252,7 @@ mod test { #[tokio::test] async fn test_start_client() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); assert!(!client.is_started().await); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) @@ -258,7 +263,7 @@ mod test { #[tokio::test] async fn test_already_start() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await @@ -274,7 +279,7 @@ mod test { #[tokio::test] async fn test_start_with_duplicate_peers() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await @@ -285,7 +290,7 @@ mod test { #[tokio::test] async fn test_heartbeat_stream() { let (sender, mut receiver) = mpsc::channel::(100); - let sender = HeartbeatSender::new((8, 8), sender); + let sender = HeartbeatSender::new((8, 8), Role::Datanode, sender); tokio::spawn(async move { for _ in 0..10 { sender.send(HeartbeatRequest::default()).await.unwrap(); diff --git a/src/meta-client/src/client/lock.rs b/src/meta-client/src/client/lock.rs index eddef25f3e..68098255a7 100644 --- a/src/meta-client/src/client/lock.rs +++ b/src/meta-client/src/client/lock.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::lock_client::LockClient; -use api::v1::meta::{LockRequest, LockResponse, UnlockRequest, UnlockResponse}; +use api::v1::meta::{LockRequest, LockResponse, Role, UnlockRequest, UnlockResponse}; use common_grpc::channel_manager::ChannelManager; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::RwLock; @@ -32,9 +32,10 @@ pub struct Client { } impl Client { - pub fn new(id: Id, channel_manager: ChannelManager) -> Self { + pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { let inner = Arc::new(RwLock::new(Inner { id, + role, channel_manager, peers: vec![], })); @@ -70,6 +71,7 @@ impl Client { #[derive(Debug)] struct Inner { id: Id, + role: Role, channel_manager: ChannelManager, peers: Vec, } @@ -125,7 +127,7 @@ impl Inner { async fn lock(&self, mut req: LockRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client.lock(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -133,7 +135,7 @@ impl Inner { async fn unlock(&self, mut req: UnlockRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client.unlock(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -146,7 +148,7 @@ mod tests { #[tokio::test] async fn test_start_client() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); assert!(!client.is_started().await); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) @@ -157,7 +159,7 @@ mod tests { #[tokio::test] async fn test_already_start() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await @@ -173,7 +175,7 @@ mod tests { #[tokio::test] async fn test_start_with_duplicate_peers() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs index 486edfa11f..a32020e9cf 100644 --- a/src/meta-client/src/client/router.rs +++ b/src/meta-client/src/client/router.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::router_client::RouterClient; -use api::v1::meta::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse}; +use api::v1::meta::{CreateRequest, DeleteRequest, Role, RouteRequest, RouteResponse}; use common_grpc::channel_manager::ChannelManager; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::RwLock; @@ -32,9 +32,10 @@ pub struct Client { } impl Client { - pub fn new(id: Id, channel_manager: ChannelManager) -> Self { + pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { let inner = Arc::new(RwLock::new(Inner { id, + role, channel_manager, peers: vec![], })); @@ -75,6 +76,7 @@ impl Client { #[derive(Debug)] struct Inner { id: Id, + role: Role, channel_manager: ChannelManager, peers: Vec, } @@ -105,7 +107,7 @@ impl Inner { async fn create(&self, mut req: CreateRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client.create(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -113,7 +115,7 @@ impl Inner { async fn route(&self, mut req: RouteRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client.route(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -121,7 +123,7 @@ impl Inner { async fn delete(&self, mut req: DeleteRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client.delete(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -159,7 +161,7 @@ mod test { #[tokio::test] async fn test_start_client() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); assert!(!client.is_started().await); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) @@ -170,7 +172,7 @@ mod test { #[tokio::test] async fn test_already_start() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await @@ -186,7 +188,7 @@ mod test { #[tokio::test] async fn test_start_with_duplicate_peers() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index 124e0fcb3f..1164f36633 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -20,7 +20,7 @@ use api::v1::meta::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, - RangeRequest, RangeResponse, + RangeRequest, RangeResponse, Role, }; use common_grpc::channel_manager::ChannelManager; use snafu::{ensure, OptionExt, ResultExt}; @@ -37,9 +37,10 @@ pub struct Client { } impl Client { - pub fn new(id: Id, channel_manager: ChannelManager) -> Self { + pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { let inner = Arc::new(RwLock::new(Inner { id, + role, channel_manager, peers: vec![], })); @@ -108,6 +109,7 @@ impl Client { #[derive(Debug)] struct Inner { id: Id, + role: Role, channel_manager: ChannelManager, peers: Vec, } @@ -138,7 +140,7 @@ impl Inner { async fn range(&self, mut req: RangeRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client.range(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -146,7 +148,7 @@ impl Inner { async fn put(&self, mut req: PutRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client.put(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -154,7 +156,7 @@ impl Inner { async fn batch_get(&self, mut req: BatchGetRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client .batch_get(req) @@ -166,7 +168,7 @@ impl Inner { async fn batch_put(&self, mut req: BatchPutRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client .batch_put(req) .await @@ -177,7 +179,7 @@ impl Inner { async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client .batch_delete(req) .await @@ -191,7 +193,7 @@ impl Inner { mut req: CompareAndPutRequest, ) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client .compare_and_put(req) .await @@ -202,7 +204,7 @@ impl Inner { async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client .delete_range(req) .await @@ -213,7 +215,7 @@ impl Inner { async fn move_value(&self, mut req: MoveValueRequest) -> Result { let mut client = self.random_client()?; - req.set_header(self.id); + req.set_header(self.id, self.role); let res = client .move_value(req) .await @@ -254,7 +256,7 @@ mod test { #[tokio::test] async fn test_start_client() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); assert!(!client.is_started().await); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) @@ -265,7 +267,7 @@ mod test { #[tokio::test] async fn test_already_start() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await @@ -281,7 +283,7 @@ mod test { #[tokio::test] async fn test_start_with_duplicate_peers() { - let mut client = Client::new((0, 0), ChannelManager::default()); + let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await diff --git a/src/meta-client/src/mocks.rs b/src/meta-client/src/mocks.rs index 0e9eb52232..168c382ce6 100644 --- a/src/meta-client/src/mocks.rs +++ b/src/meta-client/src/mocks.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::meta::Role; use meta_srv::metasrv::SelectorRef; use meta_srv::mocks as server_mock; use meta_srv::mocks::MockInfo; @@ -41,7 +42,7 @@ pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient { } = mock_info; let id = (1000u64, 2000u64); - let mut meta_client = MetaClientBuilder::new(id.0, id.1) + let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode) .enable_heartbeat() .enable_router() .enable_store() diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 3e3491da52..9234367db3 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -30,6 +30,7 @@ h2 = "0.3" http-body = "0.4" lazy_static = "1.4" metrics.workspace = true +once_cell = "1.17" parking_lot = "0.12" prost.workspace = true rand.workspace = true diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 9bc7be2c32..1856d65634 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -313,6 +313,9 @@ pub enum Error { err_msg: String, location: Location, }, + + #[snafu(display("Missing request header"))] + MissingRequestHeader { location: Location }, } pub type Result = std::result::Result; @@ -362,6 +365,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::MissingRequiredParameter { .. } + | Error::MissingRequestHeader { .. } | Error::EmptyTableName { .. } | Error::InvalidLeaseKey { .. } | Error::InvalidStatKey { .. } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 64b829e2c6..34064e1591 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -85,11 +85,11 @@ pub struct Pusher { impl Pusher { pub fn new( sender: Sender>, - req_header: &Option, + req_header: &RequestHeader, ) -> Self { let res_header = ResponseHeader { protocol_version: PROTOCOL_VERSION, - cluster_id: req_header.as_ref().map_or(0, |h| h.cluster_id), + cluster_id: req_header.cluster_id, ..Default::default() }; @@ -350,7 +350,7 @@ mod tests { protocol_version: PROTOCOL_VERSION, ..Default::default() }; - let pusher: Pusher = Pusher::new(pusher_tx, &Option::from(res_header)); + let pusher: Pusher = Pusher::new(pusher_tx, &res_header); let handler_group = HeartbeatHandlerGroup::default(); handler_group .register(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher) diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index bc80730751..1325cba36b 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -78,7 +78,7 @@ mod tests { }; let req = HeartbeatRequest { - header: Some(RequestHeader::new((1, 2))), + header: Some(RequestHeader::new((1, 2), Role::Datanode)), ..Default::default() }; let mut acc = HeartbeatAccumulator::default(); diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index bc9e42fdc7..489a10b530 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -13,13 +13,16 @@ // limitations under the License. use std::io::ErrorKind; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; use api::v1::meta::{ heartbeat_server, AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse, - Peer, ResponseHeader, + Peer, RequestHeader, ResponseHeader, Role, }; use common_telemetry::{error, info, warn}; use futures::StreamExt; +use once_cell::sync::OnceCell; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Streaming}; @@ -48,14 +51,22 @@ impl heartbeat_server::Heartbeat for MetaSrv { let mut quit = false; match msg { Ok(req) => { - let role = req.header.as_ref().map_or(0, |h| h.role); - if pusher_key.is_none() { - if let Some(peer) = &req.peer { - let key = format!("{}-{}", role, peer.id,); - let pusher = Pusher::new(tx.clone(), &req.header); - handler_group.register(&key, pusher).await; - pusher_key = Some(key); + let header = match req.header.as_ref() { + Some(header) => header, + None => { + let err = error::MissingRequestHeaderSnafu {}.build(); + tx.send(Err(err.into())).await.expect("working rx"); + break; } + }; + + if pusher_key.is_none() { + let node_id = get_node_id(header); + let role = header.role() as i32; + let key = format!("{}-{}", role, node_id); + let pusher = Pusher::new(tx.clone(), header); + handler_group.register(&key, pusher).await; + pusher_key = Some(key); } let res = handler_group @@ -136,6 +147,20 @@ async fn handle_ask_leader(req: AskLeaderRequest, ctx: Context) -> Result u64 { + static ID: OnceCell> = OnceCell::new(); + + fn next_id() -> u64 { + let id = ID.get_or_init(|| Arc::new(AtomicU64::new(0))).clone(); + id.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + } + + match header.role() { + Role::Frontend => next_id(), + Role::Datanode => header.member_id, + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -144,6 +169,7 @@ mod tests { use api::v1::meta::*; use tonic::IntoRequest; + use super::get_node_id; use crate::metasrv::builder::MetaSrvBuilder; use crate::service::store::memory::MemStore; @@ -154,7 +180,7 @@ mod tests { let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; let req = AskLeaderRequest { - header: Some(RequestHeader::new((1, 1))), + header: Some(RequestHeader::new((1, 1), Role::Datanode)), }; let res = meta_srv.ask_leader(req.into_request()).await.unwrap(); @@ -162,4 +188,31 @@ mod tests { assert_eq!(1, res.header.unwrap().cluster_id); assert_eq!(meta_srv.options().bind_addr, res.leader.unwrap().addr); } + + #[test] + fn test_get_node_id() { + let header = RequestHeader { + role: Role::Datanode.into(), + member_id: 11, + ..Default::default() + }; + assert_eq!(11, get_node_id(&header)); + + let header = RequestHeader { + role: Role::Frontend.into(), + ..Default::default() + }; + for i in 0..10 { + assert_eq!(i, get_node_id(&header)); + } + + let header = RequestHeader { + role: Role::Frontend.into(), + member_id: 11, + ..Default::default() + }; + for i in 10..20 { + assert_eq!(i, get_node_id(&header)); + } + } } diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 255997cd28..f58cfca5bb 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -20,6 +20,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use api::v1::meta::Role; use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; use catalog::remote::{MetaKvBackend, RemoteCatalogManager}; use client::Client; @@ -286,7 +287,7 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu server_addr, channel_manager, } = meta_srv.clone(); - let mut meta_client = MetaClientBuilder::new(1000, 0) + let mut meta_client = MetaClientBuilder::new(1000, 0, Role::Frontend) .enable_router() .enable_store() .channel_manager(channel_manager)