feat: support frontend-meta heartbeat (#1555)

* feat: support frontend heartbeat

* fix: typo "reponse" -> "response"

* add ut

* enable start heartbeat task

* chore: frontend id is specified by metasrv, not in the frontend startup parameter

* fix typo

* self-cr

* cr

* cr

* cr

* remove unnecessary headers

* use the member id in the header as the node id
This commit is contained in:
fys
2023-05-15 17:54:45 +08:00
committed by GitHub
parent 8d54d40b21
commit 027707d969
23 changed files with 304 additions and 81 deletions

3
Cargo.lock generated
View File

@@ -3860,7 +3860,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1552a21e77752f1baf8062b937584b80de84e0b3#1552a21e77752f1baf8062b937584b80de84e0b3"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=6bfb02057c40da0e397c0cb4f6b87bd769669d50#6bfb02057c40da0e397c0cb4f6b87bd769669d50"
dependencies = [
"prost",
"tonic 0.9.2",
@@ -4859,6 +4859,7 @@ dependencies = [
"http-body",
"lazy_static",
"metrics",
"once_cell",
"parking_lot",
"prost",
"rand",

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1552a21e77752f1baf8062b937584b80de84e0b3" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "6bfb02057c40da0e397c0cb4f6b87bd769669d50" }
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -126,6 +126,7 @@ impl HeartbeatTask {
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
let meta_client = self.meta_client.clone();
let catalog_manager_clone = self.catalog_manager.clone();
let handler_executor = self.heartbeat_response_handler_exector.clone();
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fs, path};
use api::v1::meta::Role;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::readable_size::ReadableSize;
@@ -513,7 +514,7 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOptions) -> Re
let mut channel_manager = ChannelManager::with_config(config);
channel_manager.start_channel_recycle();
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id)
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use api::v1::meta::Role;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_srv::mocks::MockInfo;
use storage::compaction::noop::NoopCompactionScheduler;
@@ -42,7 +43,7 @@ async fn mock_meta_client(mock_info: MockInfo, node_id: u64) -> MetaClient {
} = mock_info;
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::new(id.0, node_id)
let mut meta_client = MetaClientBuilder::new(id.0, node_id, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()

View File

@@ -170,7 +170,13 @@ pub enum Error {
source: meta_client::error::Error,
},
#[snafu(display("Failed to request Meta, source: {}", source))]
#[snafu(display("Failed to create heartbeat stream to Metasrv, source: {}", source))]
CreateMetaHeartbeatStream {
source: meta_client::error::Error,
location: Location,
},
#[snafu(display("Failed to request Metasrv, source: {}", source))]
RequestMeta {
#[snafu(backtrace)]
source: meta_client::error::Error,
@@ -620,9 +626,10 @@ impl ErrorExt for Error {
Error::Catalog { source, .. } => source.status_code(),
Error::CatalogEntrySerde { source, .. } => source.status_code(),
Error::StartMetaClient { source } | Error::RequestMeta { source } => {
source.status_code()
}
Error::StartMetaClient { source }
| Error::RequestMeta { source }
| Error::CreateMetaHeartbeatStream { source, .. } => source.status_code(),
Error::BuildCreateExprOnInsertion { source }
| Error::ToTableInsertRequest { source }
| Error::ToTableDeleteRequest { source }

View File

@@ -0,0 +1,116 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse};
use common_telemetry::tracing::trace;
use common_telemetry::{error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use snafu::ResultExt;
use crate::error;
use crate::error::Result;
/// Background task that keeps a heartbeat channel open between this frontend
/// node and metasrv: it periodically sends heartbeat requests and consumes the
/// responses streamed back.
///
/// `Clone` is derived deliberately: the response-stream loop holds a cloned
/// copy of the task so it can re-run `start()` to rebuild the connection after
/// a stream error.
#[derive(Clone)]
pub struct HeartbeatTask {
    // Client used to open the bidirectional heartbeat stream to metasrv.
    meta_client: Arc<MetaClient>,
    // Interval, in seconds, between two consecutive heartbeat reports.
    report_interval: u64,
    // Interval, in seconds, between reconnection attempts after a stream error.
    retry_interval: u64,
}

impl HeartbeatTask {
    /// Creates a heartbeat task. Both intervals are in seconds — they are fed
    /// to `Duration::from_secs` at their use sites below.
    pub fn new(meta_client: Arc<MetaClient>, report_interval: u64, retry_interval: u64) -> Self {
        HeartbeatTask {
            meta_client,
            report_interval,
            retry_interval,
        }
    }

    /// Establishes the heartbeat stream to metasrv and spawns the two
    /// background loops: one handling incoming responses, one sending
    /// periodic heartbeat requests.
    ///
    /// # Errors
    /// Returns `CreateMetaHeartbeatStream` when the stream cannot be created.
    pub async fn start(&self) -> Result<()> {
        let (req_sender, resp_stream) = self
            .meta_client
            .heartbeat()
            .await
            .context(error::CreateMetaHeartbeatStreamSnafu)?;

        info!("A heartbeat connection has been established with metasrv");

        self.start_handle_resp_stream(resp_stream);

        self.start_heartbeat_report(req_sender);

        Ok(())
    }

    /// Spawns a background loop that drains heartbeat responses.
    ///
    /// On a read error the entire connection is rebuilt via
    /// `start_with_retry`; on a clean end-of-stream (`Ok(None)`) the loop
    /// simply exits without reconnecting — NOTE(review): confirm that a clean
    /// server-side close is not supposed to trigger a reconnect too.
    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream) {
        // Cloned so the spawned task can call `start()` again on failure.
        let capture_self = self.clone();
        let retry_interval = self.retry_interval;

        common_runtime::spawn_bg(async move {
            loop {
                match resp_stream.message().await {
                    Ok(Some(resp)) => capture_self.handle_response(resp).await,
                    Ok(None) => break,
                    Err(e) => {
                        error!(e; "Occur error while reading heartbeat response");
                        // Rebuild the whole connection (new stream + new
                        // report loop), then stop this stale reader.
                        capture_self
                            .start_with_retry(Duration::from_secs(retry_interval))
                            .await;

                        break;
                    }
                }
            }
        });
    }

    /// Spawns a background loop that sends a default (empty) heartbeat
    /// request every `report_interval` seconds. The loop ends when the send
    /// half of the stream is closed or fails; the old loop thereby dies on
    /// its own after a reconnect replaces the stream.
    fn start_heartbeat_report(&self, req_sender: HeartbeatSender) {
        let report_interval = self.report_interval;

        common_runtime::spawn_bg(async move {
            loop {
                let req = HeartbeatRequest::default();

                if let Err(e) = req_sender.send(req.clone()).await {
                    error!(e; "Failed to send heartbeat to metasrv");
                    break;
                } else {
                    trace!("Send a heartbeat request to metasrv, content: {:?}", req);
                }

                tokio::time::sleep(Duration::from_secs(report_interval)).await;
            }
        });
    }

    /// Handles one heartbeat response. Currently only trace-logs it —
    /// presumably a placeholder for real response dispatching; TODO confirm.
    async fn handle_response(&self, resp: HeartbeatResponse) {
        trace!("Received a heartbeat response: {:?}", resp);
    }

    /// Re-establishes the heartbeat connection, sleeping `retry_interval`
    /// before every attempt and looping until `start()` succeeds.
    async fn start_with_retry(&self, retry_interval: Duration) {
        loop {
            tokio::time::sleep(retry_interval).await;

            info!("Try to re-establish the heartbeat connection to metasrv.");

            if self.start().await.is_ok() {
                break;
            }
        }
    }
}

View File

@@ -27,6 +27,7 @@ use std::time::Duration;
use api::v1::alter_expr::Kind;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
use api::v1::meta::Role;
use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
@@ -74,6 +75,7 @@ use crate::error::{
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::heartbeat::HeartbeatTask;
use crate::instance::standalone::StandaloneGrpcQueryHandler;
use crate::metrics;
use crate::script::ScriptExecutor;
@@ -114,6 +116,8 @@ pub struct Instance {
plugins: Arc<Plugins>,
servers: Arc<ServerHandlers>,
heartbeat_task: Option<HeartbeatTask>,
}
impl Instance {
@@ -137,7 +141,7 @@ impl Instance {
FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone());
let dist_instance = DistInstance::new(
meta_client,
meta_client.clone(),
Arc::new(catalog_manager.clone()),
datanode_clients,
);
@@ -161,6 +165,8 @@ impl Instance {
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
let heartbeat_task = Some(HeartbeatTask::new(meta_client, 5, 5));
Ok(Instance {
catalog_manager,
script_executor,
@@ -170,6 +176,7 @@ impl Instance {
grpc_query_handler: dist_instance,
plugins: plugins.clone(),
servers: Arc::new(HashMap::new()),
heartbeat_task,
})
}
@@ -193,9 +200,10 @@ impl Instance {
let mut channel_manager = ChannelManager::with_config(channel_config);
channel_manager.start_channel_recycle();
let mut meta_client = MetaClientBuilder::new(0, 0)
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Frontend)
.enable_router()
.enable_store()
.enable_heartbeat()
.channel_manager(channel_manager)
.build();
meta_client
@@ -226,6 +234,7 @@ impl Instance {
grpc_query_handler: StandaloneGrpcQueryHandler::arc(dn_instance.clone()),
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
})
}
@@ -262,6 +271,7 @@ impl Instance {
grpc_query_handler: dist_instance,
plugins: Default::default(),
servers: Arc::new(HashMap::new()),
heartbeat_task: None,
}
}
@@ -432,6 +442,10 @@ impl FrontendInstance for Instance {
async fn start(&mut self) -> Result<()> {
// TODO(hl): Frontend init should move to here
if let Some(heartbeat_task) = &self.heartbeat_task {
heartbeat_task.start().await?;
}
futures::future::try_join_all(self.servers.values().map(start_server))
.await
.context(error::StartServerSnafu)

View File

@@ -21,6 +21,7 @@ pub mod error;
pub mod expr_factory;
pub mod frontend;
pub mod grpc;
pub mod heartbeat;
pub mod influxdb;
pub mod instance;
pub(crate) mod metrics;

View File

@@ -14,6 +14,7 @@
use std::time::Duration;
use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::rpc::lock::{LockRequest, UnlockRequest};
@@ -33,7 +34,7 @@ async fn run() {
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode)
.enable_lock()
.channel_manager(channel_manager)
.build();

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, Peer};
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use chrono::DateTime;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use datatypes::prelude::ConcreteDataType;
@@ -43,7 +43,7 @@ async fn run() {
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()

View File

@@ -18,6 +18,7 @@ mod lock;
mod router;
mod store;
use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::info;
use heartbeat::Client as HeartbeatClient;
@@ -43,6 +44,7 @@ pub type Id = (u64, u64);
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
id: Id,
role: Role,
enable_heartbeat: bool,
enable_router: bool,
enable_store: bool,
@@ -51,9 +53,10 @@ pub struct MetaClientBuilder {
}
impl MetaClientBuilder {
pub fn new(cluster_id: u64, member_id: u64) -> Self {
pub fn new(cluster_id: u64, member_id: u64, role: Role) -> Self {
Self {
id: (cluster_id, member_id),
role,
..Default::default()
}
}
@@ -107,16 +110,16 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();
if self.enable_heartbeat {
client.heartbeat = Some(HeartbeatClient::new(self.id, mgr.clone()));
client.heartbeat = Some(HeartbeatClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_router {
client.router = Some(RouterClient::new(self.id, mgr.clone()));
client.router = Some(RouterClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_store {
client.store = Some(StoreClient::new(self.id, mgr.clone()));
client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_lock {
client.lock = Some(LockClient::new(self.id, mgr));
client.lock = Some(LockClient::new(self.id, self.role, mgr));
}
client
@@ -409,28 +412,34 @@ mod tests {
async fn test_meta_client_builder() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0).enable_heartbeat().build();
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_heartbeat()
.build();
assert!(meta_client.heartbeat_client().is_ok());
assert!(meta_client.router_client().is_err());
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
assert!(meta_client.heartbeat_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new(0, 0).enable_router().build();
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_router()
.build();
assert!(meta_client.heartbeat_client().is_err());
assert!(meta_client.router_client().is_ok());
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
assert!(meta_client.router_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new(0, 0).enable_store().build();
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_store()
.build();
assert!(meta_client.heartbeat_client().is_err());
assert!(meta_client.router_client().is_err());
assert!(meta_client.store_client().is_ok());
meta_client.start(urls).await.unwrap();
assert!(meta_client.store_client().unwrap().is_started().await);
let mut meta_client = MetaClientBuilder::new(1, 2)
let mut meta_client = MetaClientBuilder::new(1, 2, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
@@ -449,7 +458,7 @@ mod tests {
#[tokio::test]
async fn test_not_start_heartbeat_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0)
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_router()
.enable_store()
.build();
@@ -494,7 +503,7 @@ mod tests {
#[tokio::test]
async fn test_not_start_router_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0)
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_heartbeat()
.enable_store()
.build();
@@ -509,7 +518,7 @@ mod tests {
#[tokio::test]
async fn test_not_start_store_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0)
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_heartbeat()
.enable_router()
.build();
@@ -522,7 +531,7 @@ mod tests {
#[should_panic]
#[test]
fn test_failed_when_start_nothing() {
let _ = MetaClientBuilder::new(0, 0).build();
let _ = MetaClientBuilder::new(0, 0, Role::Datanode).build();
}
#[tokio::test]

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, HeartbeatRequest, HeartbeatResponse, RequestHeader};
use api::v1::meta::{AskLeaderRequest, HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::{debug, info};
use snafu::{ensure, OptionExt, ResultExt};
@@ -32,13 +32,14 @@ use crate::rpc::util;
pub struct HeartbeatSender {
id: Id,
role: Role,
sender: mpsc::Sender<HeartbeatRequest>,
}
impl HeartbeatSender {
#[inline]
fn new(id: Id, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
Self { id, sender }
fn new(id: Id, role: Role, sender: mpsc::Sender<HeartbeatRequest>) -> Self {
Self { id, role, sender }
}
#[inline]
@@ -48,7 +49,7 @@ impl HeartbeatSender {
#[inline]
pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> {
req.set_header(self.id);
req.set_header(self.id, self.role);
self.sender.send(req).await.map_err(|e| {
error::SendHeartbeatSnafu {
err_msg: e.to_string(),
@@ -92,9 +93,10 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, channel_manager: ChannelManager) -> Self {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
peers: HashSet::default(),
leader: None,
@@ -132,6 +134,7 @@ impl Client {
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
peers: HashSet<String>,
leader: Option<String>,
@@ -167,7 +170,7 @@ impl Inner {
}
);
let header = RequestHeader::new(self.id);
let header = RequestHeader::new(self.id, self.role);
let mut leader = None;
for addr in &self.peers {
let req = AskLeaderRequest {
@@ -195,8 +198,10 @@ impl Inner {
let mut leader = self.make_client(leader)?;
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
let header = RequestHeader::new(self.id, self.role);
let handshake = HeartbeatRequest {
header: Some(RequestHeader::new(self.id)),
header: Some(header),
..Default::default()
};
sender.send(handshake).await.map_err(|e| {
@@ -221,7 +226,7 @@ impl Inner {
info!("Success to create heartbeat stream to server: {:#?}", res);
Ok((
HeartbeatSender::new(self.id, sender),
HeartbeatSender::new(self.id, self.role, sender),
HeartbeatStream::new(self.id, stream),
))
}
@@ -247,7 +252,7 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
@@ -258,7 +263,7 @@ mod test {
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
@@ -274,7 +279,7 @@ mod test {
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
@@ -285,7 +290,7 @@ mod test {
#[tokio::test]
async fn test_heartbeat_stream() {
let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);
let sender = HeartbeatSender::new((8, 8), sender);
let sender = HeartbeatSender::new((8, 8), Role::Datanode, sender);
tokio::spawn(async move {
for _ in 0..10 {
sender.send(HeartbeatRequest::default()).await.unwrap();

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::lock_client::LockClient;
use api::v1::meta::{LockRequest, LockResponse, UnlockRequest, UnlockResponse};
use api::v1::meta::{LockRequest, LockResponse, Role, UnlockRequest, UnlockResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
@@ -32,9 +32,10 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, channel_manager: ChannelManager) -> Self {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
peers: vec![],
}));
@@ -70,6 +71,7 @@ impl Client {
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
peers: Vec<String>,
}
@@ -125,7 +127,7 @@ impl Inner {
async fn lock(&self, mut req: LockRequest) -> Result<LockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client.lock(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -133,7 +135,7 @@ impl Inner {
async fn unlock(&self, mut req: UnlockRequest) -> Result<UnlockResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client.unlock(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -146,7 +148,7 @@ mod tests {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
@@ -157,7 +159,7 @@ mod tests {
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
@@ -173,7 +175,7 @@ mod tests {
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
use api::v1::meta::{CreateRequest, DeleteRequest, Role, RouteRequest, RouteResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
@@ -32,9 +32,10 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, channel_manager: ChannelManager) -> Self {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
peers: vec![],
}));
@@ -75,6 +76,7 @@ impl Client {
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
peers: Vec<String>,
}
@@ -105,7 +107,7 @@ impl Inner {
async fn create(&self, mut req: CreateRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client.create(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -113,7 +115,7 @@ impl Inner {
async fn route(&self, mut req: RouteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client.route(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -121,7 +123,7 @@ impl Inner {
async fn delete(&self, mut req: DeleteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client.delete(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -159,7 +161,7 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
@@ -170,7 +172,7 @@ mod test {
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
@@ -186,7 +188,7 @@ mod test {
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await

View File

@@ -20,7 +20,7 @@ use api::v1::meta::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse,
RangeRequest, RangeResponse,
RangeRequest, RangeResponse, Role,
};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
@@ -37,9 +37,10 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, channel_manager: ChannelManager) -> Self {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
peers: vec![],
}));
@@ -108,6 +109,7 @@ impl Client {
#[derive(Debug)]
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
peers: Vec<String>,
}
@@ -138,7 +140,7 @@ impl Inner {
async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client.range(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -146,7 +148,7 @@ impl Inner {
async fn put(&self, mut req: PutRequest) -> Result<PutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client.put(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
@@ -154,7 +156,7 @@ impl Inner {
async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client
.batch_get(req)
@@ -166,7 +168,7 @@ impl Inner {
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client
.batch_put(req)
.await
@@ -177,7 +179,7 @@ impl Inner {
async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client
.batch_delete(req)
.await
@@ -191,7 +193,7 @@ impl Inner {
mut req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client
.compare_and_put(req)
.await
@@ -202,7 +204,7 @@ impl Inner {
async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client
.delete_range(req)
.await
@@ -213,7 +215,7 @@ impl Inner {
async fn move_value(&self, mut req: MoveValueRequest) -> Result<MoveValueResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
req.set_header(self.id, self.role);
let res = client
.move_value(req)
.await
@@ -254,7 +256,7 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
@@ -265,7 +267,7 @@ mod test {
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await
@@ -281,7 +283,7 @@ mod test {
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new((0, 0), ChannelManager::default());
let mut client = Client::new((0, 0), Role::Frontend, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::Role;
use meta_srv::metasrv::SelectorRef;
use meta_srv::mocks as server_mock;
use meta_srv::mocks::MockInfo;
@@ -41,7 +42,7 @@ pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient {
} = mock_info;
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()

View File

@@ -30,6 +30,7 @@ h2 = "0.3"
http-body = "0.4"
lazy_static = "1.4"
metrics.workspace = true
once_cell = "1.17"
parking_lot = "0.12"
prost.workspace = true
rand.workspace = true

View File

@@ -313,6 +313,9 @@ pub enum Error {
err_msg: String,
location: Location,
},
#[snafu(display("Missing request header"))]
MissingRequestHeader { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -362,6 +365,7 @@ impl ErrorExt for Error {
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
| Error::MissingRequestHeader { .. }
| Error::EmptyTableName { .. }
| Error::InvalidLeaseKey { .. }
| Error::InvalidStatKey { .. }

View File

@@ -85,11 +85,11 @@ pub struct Pusher {
impl Pusher {
pub fn new(
sender: Sender<std::result::Result<HeartbeatResponse, tonic::Status>>,
req_header: &Option<RequestHeader>,
req_header: &RequestHeader,
) -> Self {
let res_header = ResponseHeader {
protocol_version: PROTOCOL_VERSION,
cluster_id: req_header.as_ref().map_or(0, |h| h.cluster_id),
cluster_id: req_header.cluster_id,
..Default::default()
};
@@ -350,7 +350,7 @@ mod tests {
protocol_version: PROTOCOL_VERSION,
..Default::default()
};
let pusher: Pusher = Pusher::new(pusher_tx, &Option::from(res_header));
let pusher: Pusher = Pusher::new(pusher_tx, &res_header);
let handler_group = HeartbeatHandlerGroup::default();
handler_group
.register(format!("{}-{}", Role::Datanode as i32, datanode_id), pusher)

View File

@@ -78,7 +78,7 @@ mod tests {
};
let req = HeartbeatRequest {
header: Some(RequestHeader::new((1, 2))),
header: Some(RequestHeader::new((1, 2), Role::Datanode)),
..Default::default()
};
let mut acc = HeartbeatAccumulator::default();

View File

@@ -13,13 +13,16 @@
// limitations under the License.
use std::io::ErrorKind;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use api::v1::meta::{
heartbeat_server, AskLeaderRequest, AskLeaderResponse, HeartbeatRequest, HeartbeatResponse,
Peer, ResponseHeader,
Peer, RequestHeader, ResponseHeader, Role,
};
use common_telemetry::{error, info, warn};
use futures::StreamExt;
use once_cell::sync::OnceCell;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Streaming};
@@ -48,14 +51,22 @@ impl heartbeat_server::Heartbeat for MetaSrv {
let mut quit = false;
match msg {
Ok(req) => {
let role = req.header.as_ref().map_or(0, |h| h.role);
if pusher_key.is_none() {
if let Some(peer) = &req.peer {
let key = format!("{}-{}", role, peer.id,);
let pusher = Pusher::new(tx.clone(), &req.header);
handler_group.register(&key, pusher).await;
pusher_key = Some(key);
let header = match req.header.as_ref() {
Some(header) => header,
None => {
let err = error::MissingRequestHeaderSnafu {}.build();
tx.send(Err(err.into())).await.expect("working rx");
break;
}
};
if pusher_key.is_none() {
let node_id = get_node_id(header);
let role = header.role() as i32;
let key = format!("{}-{}", role, node_id);
let pusher = Pusher::new(tx.clone(), header);
handler_group.register(&key, pusher).await;
pusher_key = Some(key);
}
let res = handler_group
@@ -136,6 +147,20 @@ async fn handle_ask_leader(req: AskLeaderRequest, ctx: Context) -> Result<AskLea
Ok(AskLeaderResponse { header, leader })
}
/// Returns the node id under which a heartbeat pusher is registered.
///
/// A datanode reports its own `member_id` in the request header, so that id
/// is used directly. A frontend's id is assigned by metasrv instead (see the
/// commit intent: "frontend id is specified by metasrv"), drawn from a
/// process-wide monotonically increasing counter; any `member_id` a frontend
/// sends is ignored.
fn get_node_id(header: &RequestHeader) -> u64 {
    // `AtomicU64::new` is const, so a plain static suffices; the previous
    // `OnceCell<Arc<AtomicU64>>` wrapped the same process-wide counter in two
    // layers of unnecessary indirection.
    static ID: AtomicU64 = AtomicU64::new(0);

    match header.role() {
        Role::Frontend => ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
        Role::Datanode => header.member_id,
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -144,6 +169,7 @@ mod tests {
use api::v1::meta::*;
use tonic::IntoRequest;
use super::get_node_id;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::service::store::memory::MemStore;
@@ -154,7 +180,7 @@ mod tests {
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = AskLeaderRequest {
header: Some(RequestHeader::new((1, 1))),
header: Some(RequestHeader::new((1, 1), Role::Datanode)),
};
let res = meta_srv.ask_leader(req.into_request()).await.unwrap();
@@ -162,4 +188,31 @@ mod tests {
assert_eq!(1, res.header.unwrap().cluster_id);
assert_eq!(meta_srv.options().bind_addr, res.leader.unwrap().addr);
}
/// Verifies `get_node_id`: datanodes keep their reported `member_id`, while
/// frontends are assigned ids from a process-wide counter.
///
/// NOTE: the frontend assertions depend on counter state carried across the
/// sections below (0..10, then 10..20), so this test must be the only caller
/// of `get_node_id` with `Role::Frontend` in the test process.
#[test]
fn test_get_node_id() {
    // Datanode: the member_id from the header is returned verbatim.
    let header = RequestHeader {
        role: Role::Datanode.into(),
        member_id: 11,
        ..Default::default()
    };
    assert_eq!(11, get_node_id(&header));

    // Frontend: ids come from the shared counter, starting at 0.
    let header = RequestHeader {
        role: Role::Frontend.into(),
        ..Default::default()
    };
    for i in 0..10 {
        assert_eq!(i, get_node_id(&header));
    }

    // A frontend-supplied member_id is ignored; the counter just continues.
    let header = RequestHeader {
        role: Role::Frontend.into(),
        member_id: 11,
        ..Default::default()
    };
    for i in 10..20 {
        assert_eq!(i, get_node_id(&header));
    }
}
}

View File

@@ -20,6 +20,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::remote::{MetaKvBackend, RemoteCatalogManager};
use client::Client;
@@ -286,7 +287,7 @@ pub(crate) async fn create_distributed_instance(test_name: &str) -> MockDistribu
server_addr,
channel_manager,
} = meta_srv.clone();
let mut meta_client = MetaClientBuilder::new(1000, 0)
let mut meta_client = MetaClientBuilder::new(1000, 0, Role::Frontend)
.enable_router()
.enable_store()
.channel_manager(channel_manager)