feat: datanode stats are stored in the mem_kv of the meta leader (#943)

* store heartbeat data in memory, instead of etcd

* fix: typo

* fix: license header

* cr
This commit is contained in:
fys
2023-02-07 17:09:28 +08:00
committed by GitHub
parent 926022e14c
commit 83509f31f4
21 changed files with 888 additions and 239 deletions

View File

@@ -21,6 +21,7 @@ fn main() {
"greptime/v1/meta/heartbeat.proto",
"greptime/v1/meta/route.proto",
"greptime/v1/meta/store.proto",
"greptime/v1/meta/cluster.proto",
"prometheus/remote/remote.proto",
],
&["."],

View File

@@ -0,0 +1,27 @@
syntax = "proto3";

package greptime.v1.meta;

import "greptime/v1/meta/common.proto";
import "greptime/v1/meta/store.proto";

// Cluster service is used for communication between meta nodes.
// RangeRequest/RangeResponse are reused from the store.proto import above.
service Cluster {
  // Batch get kvs by input keys from leader's in_memory kv store.
  rpc BatchGet(BatchGetRequest) returns (BatchGetResponse);

  // Range get the kvs from leader's in_memory kv store.
  rpc Range(RangeRequest) returns (RangeResponse);
}

// Request to look up the values of an explicit list of keys.
message BatchGetRequest {
  // Common request header.
  RequestHeader header = 1;
  // Raw keys to look up in the leader's in-memory kv store.
  repeated bytes keys = 2;
}

// Response carrying the key-value pairs found for BatchGetRequest.keys.
message BatchGetResponse {
  // Common response header; reports errors such as "not leader".
  ResponseHeader header = 1;
  // Key-value pairs for the keys that were found.
  repeated KeyValue kvs = 2;
}

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
@@ -22,10 +23,16 @@ use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::Router;
use crate::cluster::MetaPeerClient;
use crate::election::etcd::EtcdElection;
use crate::metasrv::{MetaSrv, MetaSrvOptions};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::SelectorType;
use crate::service::admin;
use crate::service::store::etcd::EtcdStore;
use crate::service::store::kv::ResetableKvStoreRef;
use crate::service::store::memory::MemStore;
use crate::{error, Result};
@@ -57,6 +64,7 @@ pub fn router(meta_srv: MetaSrv) -> Router {
.add_service(HeartbeatServer::new(meta_srv.clone()))
.add_service(RouterServer::new(meta_srv.clone()))
.add_service(StoreServer::new(meta_srv.clone()))
.add_service(ClusterServer::new(meta_srv.clone()))
.add_service(admin::make_admin_service(meta_srv))
}
@@ -69,8 +77,28 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result<MetaSrv> {
Some(EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?),
)
};
let selector = opts.selector.clone().into();
let meta_srv = MetaSrv::new(opts, kv_store, Some(selector), election, None).await;
let in_memory = Arc::new(MemStore::default()) as ResetableKvStoreRef;
let meta_peer_client = MetaPeerClient::new(in_memory.clone(), election.clone());
let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector {
meta_peer_client: meta_peer_client.clone(),
}) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
};
let meta_srv = MetaSrvBuilder::new()
.options(opts)
.kv_store(kv_store)
.in_memory(in_memory)
.selector(selector)
.election(election)
.meta_peer_client(meta_peer_client)
.build()
.await;
meta_srv.start().await;
Ok(meta_srv)
}

245
src/meta-srv/src/cluster.rs Normal file
View File

@@ -0,0 +1,245 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::meta::cluster_client::ClusterClient;
use api::v1::meta::{
BatchGetRequest, BatchGetResponse, KeyValue, RangeRequest, RangeResponse, ResponseHeader,
};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::Result;
use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX};
use crate::metasrv::ElectionRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::ResetableKvStoreRef;
use crate::{error, util};
/// Client for reading datanode stat kvs that live in the leader meta node's
/// in-memory kv store. When this node is the leader it reads the local store
/// directly; otherwise it contacts the leader over the `Cluster` gRPC service.
#[derive(Clone)]
pub struct MetaPeerClient {
    // Used to determine leadership; `None` means this node is treated as leader.
    election: Option<ElectionRef>,
    // Local in-memory kv store, consulted directly when this node is the leader.
    in_memory: ResetableKvStoreRef,
    // Manages gRPC channels to the leader for follower-side requests.
    channel_manager: ChannelManager,
}
impl MetaPeerClient {
    /// Creates a client backed by the given local in-memory store and an
    /// optional election handle (`None` means "always act as leader").
    pub fn new(in_mem: ResetableKvStoreRef, election: Option<ElectionRef>) -> Self {
        Self {
            election,
            in_memory: in_mem,
            channel_manager: ChannelManager::default(),
        }
    }

    // Get all datanode stat kvs from leader meta.
    pub async fn get_all_dn_stat_kvs(&self) -> Result<HashMap<StatKey, StatValue>> {
        let stat_prefix = format!("{DN_STAT_PREFIX}-").into_bytes();
        let range_end = util::get_prefix_end_key(&stat_prefix);
        let req = RangeRequest {
            // fix: `stat_prefix` is not used again, so move it instead of cloning
            key: stat_prefix,
            range_end,
            ..Default::default()
        };

        // Leader can serve the request from its own in-memory store.
        if self.is_leader() {
            let kvs = self.in_memory.range(req).await?.kvs;
            return to_stat_kv_map(kvs);
        }

        // Safety: when self.is_leader() == false, election must not be empty.
        let election = self.election.as_ref().unwrap();

        let leader_addr = election.leader().await?.0;

        let channel = self
            .channel_manager
            .get(&leader_addr)
            .context(error::CreateChannelSnafu)?;

        let request = tonic::Request::new(req);

        // NOTE(review): this is a Range RPC, but failures are reported via
        // `BatchGetSnafu` ("Failed to batch get KVs..."), which is misleading;
        // consider adding a dedicated Range error variant.
        let response: RangeResponse = ClusterClient::new(channel)
            .range(request)
            .await
            .context(error::BatchGetSnafu)?
            .into_inner();

        check_resp_header(&response.header, Context { addr: &leader_addr })?;

        to_stat_kv_map(response.kvs)
    }

    // Get datanode stat kvs from leader meta by input keys.
    pub async fn get_dn_stat_kvs(&self, keys: Vec<StatKey>) -> Result<HashMap<StatKey, StatValue>> {
        let stat_keys = keys.into_iter().map(|key| key.into()).collect();

        let stat_kvs = self.batch_get(stat_keys).await?;

        let mut result = HashMap::with_capacity(stat_kvs.len());
        for stat_kv in stat_kvs {
            let stat_key = stat_kv.key.try_into()?;
            let stat_val = stat_kv.value.try_into()?;
            result.insert(stat_key, stat_val);
        }

        Ok(result)
    }

    // Get kv information from the leader's in_mem kv store.
    async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
        // Leader can serve the request from its own in-memory store.
        if self.is_leader() {
            return self.in_memory.batch_get(keys).await;
        }

        // Safety: when self.is_leader() == false, election must not be empty.
        let election = self.election.as_ref().unwrap();

        let leader_addr = election.leader().await?.0;

        let channel = self
            .channel_manager
            .get(&leader_addr)
            .context(error::CreateChannelSnafu)?;

        let request = tonic::Request::new(BatchGetRequest {
            // fix: `keys` is not used again, so move it instead of cloning
            keys,
            ..Default::default()
        });

        // fix: dropped redundant `channel.clone()` — the channel is consumed here
        let response: BatchGetResponse = ClusterClient::new(channel)
            .batch_get(request)
            .await
            .context(error::BatchGetSnafu)?
            .into_inner();

        check_resp_header(&response.header, Context { addr: &leader_addr })?;

        Ok(response.kvs)
    }

    // Check if the meta node is a leader node.
    // Note: when self.election is None, we also consider the meta node is leader
    fn is_leader(&self) -> bool {
        self.election
            .as_ref()
            .map(|election| election.is_leader())
            .unwrap_or(true)
    }
}
// Decodes raw `KeyValue`s into a `StatKey -> StatValue` map, failing on the
// first key or value that cannot be converted.
fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<StatKey, StatValue>> {
    kvs.into_iter()
        .map(|kv| Ok((kv.key.try_into()?, kv.value.try_into()?)))
        .collect()
}
// Context for `check_resp_header` error reporting: the address of the remote
// meta node whose response is being validated.
struct Context<'a> {
    addr: &'a str,
}
fn check_resp_header(header: &Option<ResponseHeader>, ctx: Context) -> Result<()> {
let header = header
.as_ref()
.context(error::ResponseHeaderNotFoundSnafu)?;
ensure!(
!header.is_not_leader(),
error::IsNotLeaderSnafu {
node_addr: ctx.addr
}
);
Ok(())
}
#[cfg(test)]
mod tests {
    use api::v1::meta::{Error, ErrorCode, KeyValue, ResponseHeader};

    use super::{check_resp_header, to_stat_kv_map, Context};
    use crate::error;
    use crate::handler::node_stat::Stat;
    use crate::keys::{StatKey, StatValue};

    // A single KeyValue should decode into a one-entry StatKey -> StatValue map
    // whose contents round-trip the original Stat.
    #[test]
    fn test_to_stat_kv_map() {
        let stat_key = StatKey {
            cluster_id: 0,
            node_id: 100,
        };
        let stat = Stat {
            cluster_id: 0,
            id: 100,
            addr: "127.0.0.1:3001".to_string(),
            is_leader: true,
            ..Default::default()
        };
        // StatValue -> Vec<u8> conversion (serialization) used as the raw value.
        let stat_val = StatValue { stats: vec![stat] }.try_into().unwrap();
        let kv = KeyValue {
            key: stat_key.clone().into(),
            value: stat_val,
        };

        let kv_map = to_stat_kv_map(vec![kv]).unwrap();
        assert_eq!(1, kv_map.len());
        assert!(kv_map.get(&stat_key).is_some());

        let stat_val = kv_map.get(&stat_key).unwrap();
        let stat = stat_val.stats.get(0).unwrap();
        assert_eq!(0, stat.cluster_id);
        assert_eq!(100, stat.id);
        assert_eq!("127.0.0.1:3001", stat.addr);
        assert!(stat.is_leader);
    }

    // Covers the three header cases: present-and-ok, missing, and not-leader.
    #[test]
    fn test_check_resp_header() {
        // Present header without an error passes the check.
        let header = Some(ResponseHeader {
            error: None,
            ..Default::default()
        });
        let result = check_resp_header(&header, mock_ctx());
        assert!(result.is_ok());

        // A missing header maps to Error::ResponseHeaderNotFound.
        let result = check_resp_header(&None, mock_ctx());
        assert!(result.is_err());
        assert!(matches!(
            result.err().unwrap(),
            error::Error::ResponseHeaderNotFound { .. }
        ));

        // A NotLeader error code maps to Error::IsNotLeader.
        let header = Some(ResponseHeader {
            error: Some(Error {
                code: ErrorCode::NotLeader as i32,
                err_msg: "The current meta is not leader".to_string(),
            }),
            ..Default::default()
        });
        let result = check_resp_header(&header, mock_ctx());
        assert!(result.is_err());
        assert!(matches!(
            result.err().unwrap(),
            error::Error::IsNotLeader { .. }
        ));
    }

    // Builds a Context with a placeholder address for the checks above.
    fn mock_ctx<'a>() -> Context<'a> {
        Context { addr: "addr" }
    }
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use common_error::prelude::*;
use tonic::codegen::http;
use tonic::{Code, Status};
#[derive(Debug, Snafu)]
@@ -165,6 +166,46 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Invalid KVs length, expected: {}, actual: {}", expected, actual))]
InvalidKvsLength {
expected: usize,
actual: usize,
backtrace: Backtrace,
},
#[snafu(display("Failed to create gRPC channel, source: {}", source))]
CreateChannel {
#[snafu(backtrace)]
source: common_grpc::error::Error,
},
#[snafu(display(
"Failed to batch get KVs from leader's in_memory kv store, source: {}",
source
))]
BatchGet {
source: tonic::Status,
backtrace: Backtrace,
},
#[snafu(display("Response header not found"))]
ResponseHeaderNotFound { backtrace: Backtrace },
#[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))]
IsNotLeader {
node_addr: String,
backtrace: Backtrace,
},
#[snafu(display("MetaSrv has no meta peer client"))]
NoMetaPeerClient { backtrace: Backtrace },
#[snafu(display("Invalid http body, source: {}", source))]
InvalidHttpBody {
source: http::Error,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -194,6 +235,12 @@ impl ErrorExt for Error {
| Error::DeserializeFromJson { .. }
| Error::DecodeTableRoute { .. }
| Error::NoLeader { .. }
| Error::CreateChannel { .. }
| Error::BatchGet { .. }
| Error::ResponseHeaderNotFound { .. }
| Error::IsNotLeader { .. }
| Error::NoMetaPeerClient { .. }
| Error::InvalidHttpBody { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::EmptyTableName { .. }
@@ -210,6 +257,7 @@ impl ErrorExt for Error {
| Error::TableRouteNotFound { .. }
| Error::NextSequence { .. }
| Error::MoveValue { .. }
| Error::InvalidKvsLength { .. }
| Error::InvalidTxnResult { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidCatalogValue { source, .. } => source.status_code(),

View File

@@ -51,7 +51,7 @@ impl HeartbeatHandler for PersistStatsHandler {
..Default::default()
};
ctx.kv_store.put(put).await?;
ctx.in_memory.put(put).await?;
Ok(())
}
@@ -112,7 +112,7 @@ mod tests {
..Default::default()
};
let res = ctx.kv_store.range(req).await.unwrap();
let res = ctx.in_memory.range(req).await.unwrap();
assert_eq!(1, res.kvs.len());

View File

@@ -174,7 +174,7 @@ impl<'a> TableRouteKey<'a> {
}
}
#[derive(Eq, PartialEq, Debug, Hash)]
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
pub struct StatKey {
pub cluster_id: u64,
pub node_id: u64,
@@ -246,7 +246,7 @@ impl TryFrom<StatValue> for Vec<u8> {
fn try_from(stats: StatValue) -> Result<Self> {
Ok(serde_json::to_string(&stats)
.context(crate::error::SerializeToJsonSnafu {
.context(error::SerializeToJsonSnafu {
input: format!("{stats:?}"),
})?
.into_bytes())

View File

@@ -14,6 +14,7 @@
#![feature(btree_drain_filter)]
pub mod bootstrap;
pub mod cluster;
pub mod election;
pub mod error;
pub mod handler;

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod builder;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -19,16 +21,12 @@ use api::v1::meta::Peer;
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use crate::cluster::MetaPeerClient;
use crate::election::Election;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler,
OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::handler::HeartbeatHandlerGroup;
use crate::selector::{Selector, SelectorType};
use crate::sequence::{Sequence, SequenceRef};
use crate::sequence::SequenceRef;
use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef};
use crate::service::store::memory::MemStore;
pub const TABLE_ID_SEQ: &str = "table_id";
@@ -100,50 +98,10 @@ pub struct MetaSrv {
selector: SelectorRef,
handler_group: HeartbeatHandlerGroup,
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClient>,
}
impl MetaSrv {
pub async fn new(
options: MetaSrvOptions,
kv_store: KvStoreRef,
selector: Option<SelectorRef>,
election: Option<ElectionRef>,
handler_group: Option<HeartbeatHandlerGroup>,
) -> Self {
let started = Arc::new(AtomicBool::new(false));
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {}));
let in_memory = Arc::new(MemStore::default());
let handler_group = match handler_group {
Some(hg) => hg,
None => {
let group = HeartbeatHandlerGroup::default();
let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone());
group.add_handler(ResponseHeaderHandler::default()).await;
// `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
// because even if the current meta-server node is no longer the leader it can
// still help the datanode to keep lease.
group.add_handler(keep_lease_handler).await;
group.add_handler(CheckLeaderHandler::default()).await;
group.add_handler(OnLeaderStartHandler::default()).await;
group.add_handler(CollectStatsHandler::default()).await;
group.add_handler(PersistStatsHandler::default()).await;
group
}
};
Self {
started,
options,
in_memory,
kv_store,
table_id_sequence,
selector,
handler_group,
election,
}
}
pub async fn start(&self) {
if self
.started
@@ -211,6 +169,11 @@ impl MetaSrv {
self.election.clone()
}
#[inline]
pub fn meta_peer_client(&self) -> Option<MetaPeerClient> {
self.meta_peer_client.clone()
}
#[inline]
pub fn new_ctx(&self) -> Context {
let datanode_lease_secs = self.options().datanode_lease_secs;

View File

@@ -0,0 +1,147 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use crate::cluster::MetaPeerClient;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler,
OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler,
};
use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::sequence::Sequence;
use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef};
use crate::service::store::memory::MemStore;
// TODO(fys): try use derive_builder macro
/// Builder for [`MetaSrv`]. Every field is optional; `build` supplies
/// defaults for anything left unset (see `MetaSrvBuilder::build`).
pub struct MetaSrvBuilder {
    // Server options; defaults to `MetaSrvOptions::default()`.
    options: Option<MetaSrvOptions>,
    // Persistent kv store; defaults to an in-process `MemStore`.
    kv_store: Option<KvStoreRef>,
    // Resettable in-memory kv store (holds datanode stats on the leader).
    in_memory: Option<ResetableKvStoreRef>,
    // Datanode selector; defaults to `LeaseBasedSelector`.
    selector: Option<SelectorRef>,
    // Heartbeat handler group; a default pipeline is assembled if unset.
    handler_group: Option<HeartbeatHandlerGroup>,
    // Leader election handle; `None` for standalone deployments.
    election: Option<ElectionRef>,
    // Client for reading stats from the leader's in-memory store.
    meta_peer_client: Option<MetaPeerClient>,
}
impl MetaSrvBuilder {
    /// Creates a builder with no fields set; `build` fills in defaults.
    pub fn new() -> Self {
        Self {
            kv_store: None,
            in_memory: None,
            selector: None,
            handler_group: None,
            meta_peer_client: None,
            election: None,
            options: None,
        }
    }

    /// Sets the server options.
    pub fn options(mut self, options: MetaSrvOptions) -> Self {
        self.options = Some(options);
        self
    }

    /// Sets the persistent kv store.
    pub fn kv_store(mut self, kv_store: KvStoreRef) -> Self {
        self.kv_store = Some(kv_store);
        self
    }

    /// Sets the resettable in-memory kv store.
    pub fn in_memory(mut self, in_memory: ResetableKvStoreRef) -> Self {
        self.in_memory = Some(in_memory);
        self
    }

    /// Sets the datanode selector.
    pub fn selector(mut self, selector: SelectorRef) -> Self {
        self.selector = Some(selector);
        self
    }

    /// Sets the heartbeat handler group.
    // NOTE(review): method is named `heartbeat_handler` but sets the
    // `handler_group` field — consider renaming for consistency.
    pub fn heartbeat_handler(mut self, handler_group: HeartbeatHandlerGroup) -> Self {
        self.handler_group = Some(handler_group);
        self
    }

    /// Sets the meta peer client.
    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClient) -> Self {
        self.meta_peer_client = Some(meta_peer_client);
        self
    }

    /// Sets (or clears) the election handle. Note this takes an `Option`,
    /// unlike the other setters, so callers can explicitly pass `None`.
    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
        self.election = election;
        self
    }

    /// Assembles a [`MetaSrv`], substituting defaults for unset fields:
    /// in-process `MemStore`s for both kv stores, a `LeaseBasedSelector`,
    /// and the default heartbeat handler pipeline.
    pub async fn build(self) -> MetaSrv {
        // The server starts in the "not started" state; `MetaSrv::start` flips this.
        let started = Arc::new(AtomicBool::new(false));

        let MetaSrvBuilder {
            election,
            meta_peer_client,
            options,
            kv_store,
            in_memory,
            selector,
            handler_group,
        } = self;

        let options = options.unwrap_or_default();
        let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default()));
        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default()));
        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));

        // Default heartbeat pipeline; registration order below is significant.
        let handler_group = match handler_group {
            Some(handler_group) => handler_group,
            None => {
                let group = HeartbeatHandlerGroup::default();
                let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone());
                group.add_handler(ResponseHeaderHandler::default()).await;
                // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`,
                // because even if the current meta-server node is no longer the leader it can
                // still help the datanode to keep lease.
                group.add_handler(keep_lease_handler).await;
                group.add_handler(CheckLeaderHandler::default()).await;
                group.add_handler(OnLeaderStartHandler::default()).await;
                group.add_handler(CollectStatsHandler::default()).await;
                group.add_handler(PersistStatsHandler::default()).await;
                group
            }
        };

        // Table-id sequence is backed by the persistent store.
        let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));

        MetaSrv {
            started,
            options,
            in_memory,
            kv_store,
            table_id_sequence,
            selector,
            handler_group,
            election,
            meta_peer_client,
        }
    }
}
// A default builder is simply an empty one; `build` supplies all defaults.
impl Default for MetaSrvBuilder {
    fn default() -> Self {
        Self::new()
    }
}

View File

@@ -20,7 +20,8 @@ use api::v1::meta::store_server::StoreServer;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use tower::service_fn;
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrvOptions, SelectorRef};
use crate::service::store::etcd::EtcdStore;
use crate::service::store::kv::KvStoreRef;
use crate::service::store::memory::MemStore;
@@ -52,7 +53,16 @@ pub async fn mock(
selector: Option<SelectorRef>,
) -> MockInfo {
let server_addr = opts.server_addr.clone();
let meta_srv = MetaSrv::new(opts, kv_store, selector, None, None).await;
let builder = MetaSrvBuilder::new().options(opts).kv_store(kv_store);
let builder = match selector {
Some(s) => builder.selector(s),
None => builder,
};
let meta_srv = builder.build().await;
let (client, server) = tokio::io::duplex(1024);
tokio::spawn(async move {
tonic::transport::Server::builder()

View File

@@ -15,15 +15,10 @@
pub mod lease_based;
pub mod load_based;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use self::lease_based::LeaseBasedSelector;
use self::load_based::LoadBasedSelector;
use crate::error;
use crate::error::Result;
use crate::metasrv::SelectorRef;
pub type Namespace = u64;
@@ -41,15 +36,6 @@ pub enum SelectorType {
LeaseBased,
}
impl From<SelectorType> for SelectorRef {
fn from(selector_type: SelectorType) -> Self {
match selector_type {
SelectorType::LoadBased => Arc::new(LoadBasedSelector) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
}
}
}
impl Default for SelectorType {
fn default() -> Self {
SelectorType::LeaseBased
@@ -91,8 +77,8 @@ mod tests {
let selector_type = loadbased.try_into().unwrap();
assert_eq!(SelectorType::LoadBased, selector_type);
let unknow = "unknow";
let selector_type: Result<SelectorType> = unknow.try_into();
let unknown = "unknown";
let selector_type: Result<SelectorType> = unknown.try_into();
assert!(selector_type.is_err());
}
}

View File

@@ -14,17 +14,19 @@
use std::collections::HashMap;
use api::v1::meta::{Peer, RangeRequest};
use api::v1::meta::Peer;
use common_time::util as time_util;
use super::{Namespace, Selector};
use crate::cluster::MetaPeerClient;
use crate::error::Result;
use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue, DN_STAT_PREFIX};
use crate::keys::{LeaseKey, LeaseValue, StatKey};
use crate::lease;
use crate::metasrv::Context;
use crate::service::store::kv::KvStoreRef;
use crate::{lease, util};
use crate::selector::{Namespace, Selector};
pub struct LoadBasedSelector;
pub struct LoadBasedSelector {
pub meta_peer_client: MetaPeerClient,
}
#[async_trait::async_trait]
impl Selector for LoadBasedSelector {
@@ -42,18 +44,25 @@ impl Selector for LoadBasedSelector {
.into_iter()
.collect();
// get stats of datanodes
let stat_kvs = all_stat_kvs(ns, &ctx.kv_store).await?;
// get stats of alive datanodes
let stat_keys: Vec<StatKey> = lease_kvs
.keys()
.map(|k| StatKey {
cluster_id: k.cluster_id,
node_id: k.node_id,
})
.collect();
let stat_kvs = self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;
// filter out expired datanodes and nodes that cannot get region number
let mut tuples: Vec<_> = stat_kvs
.iter()
// aggregate lease and stat information
let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = stat_kvs
.into_iter()
.filter_map(|(stat_key, stat_val)| {
match (
lease_kvs.get(&to_lease_key(stat_key)),
stat_val.region_num(),
) {
(Some(lease_val), Some(region_num)) => Some((stat_key, lease_val, region_num)),
let lease_key = to_lease_key(&stat_key);
match (lease_kvs.get(&lease_key), stat_val.region_num()) {
(Some(lease_val), Some(region_num)) => {
Some((lease_key, lease_val.clone(), region_num))
}
_ => None,
}
})
@@ -66,38 +75,12 @@ impl Selector for LoadBasedSelector {
.into_iter()
.map(|(stat_key, lease_val, _)| Peer {
id: stat_key.node_id,
addr: lease_val.node_addr.clone(),
addr: lease_val.node_addr,
})
.collect())
}
}
// get all stat kvs from store
pub async fn all_stat_kvs(
cluster_id: u64,
kv_store: &KvStoreRef,
) -> Result<Vec<(StatKey, StatValue)>> {
let key = stat_prefix(cluster_id);
let range_end = util::get_prefix_end_key(&key);
let req = RangeRequest {
key,
range_end,
..Default::default()
};
let kvs = kv_store.range(req).await?.kvs;
let mut stat_kvs = Vec::with_capacity(kvs.len());
for kv in kvs {
let key: StatKey = kv.key.try_into()?;
let value: StatValue = kv.value.try_into()?;
stat_kvs.push((key, value));
}
Ok(stat_kvs)
}
fn to_lease_key(k: &StatKey) -> LeaseKey {
LeaseKey {
cluster_id: k.cluster_id,
@@ -105,36 +88,10 @@ fn to_lease_key(k: &StatKey) -> LeaseKey {
}
}
fn stat_prefix(cluster_id: u64) -> Vec<u8> {
format!("{DN_STAT_PREFIX}-{cluster_id}").into_bytes()
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::PutRequest;
use super::{all_stat_kvs, to_lease_key};
use crate::handler::node_stat::Stat;
use crate::keys::{StatKey, StatValue};
use crate::selector::load_based::stat_prefix;
use crate::service::store::kv::{KvStore, KvStoreRef};
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_all_stat_kvs() {
let kv_store = Arc::new(MemStore::new()) as Arc<dyn KvStore>;
let kvs = all_stat_kvs(0, &kv_store).await.unwrap();
assert!(kvs.is_empty());
let mut kv_store = Arc::new(MemStore::new()) as Arc<dyn KvStore>;
put_stats_to_store(&mut kv_store).await;
let kvs = all_stat_kvs(0, &kv_store).await.unwrap();
assert_eq!(2, kvs.len());
let kvs = all_stat_kvs(1, &kv_store).await.unwrap();
assert_eq!(1, kvs.len());
}
use super::to_lease_key;
use crate::keys::StatKey;
#[test]
fn test_to_lease_key() {
@@ -146,65 +103,4 @@ mod tests {
assert_eq!(1, lease_key.cluster_id);
assert_eq!(101, lease_key.node_id);
}
#[test]
fn test_stat_prefix() {
assert_eq!(stat_prefix(1), b"__meta_dnstat-1");
}
async fn put_stats_to_store(store: &mut KvStoreRef) {
let put1 = PutRequest {
key: StatKey {
cluster_id: 0,
node_id: 101,
}
.into(),
value: StatValue {
stats: vec![Stat {
region_num: Some(100),
..Default::default()
}],
}
.try_into()
.unwrap(),
..Default::default()
};
store.put(put1).await.unwrap();
let put2 = PutRequest {
key: StatKey {
cluster_id: 0,
node_id: 102,
}
.into(),
value: StatValue {
stats: vec![Stat {
region_num: Some(99),
..Default::default()
}],
}
.try_into()
.unwrap(),
..Default::default()
};
store.put(put2).await.unwrap();
let put3 = PutRequest {
key: StatKey {
cluster_id: 1,
node_id: 103,
}
.into(),
value: StatValue {
stats: vec![Stat {
region_num: Some(98),
..Default::default()
}],
}
.try_into()
.unwrap(),
..Default::default()
};
store.put(put3).await.unwrap();
}
}

View File

@@ -18,6 +18,7 @@ use futures::Stream;
use tonic::{Response, Status};
pub mod admin;
pub mod cluster;
mod heartbeat;
pub mod router;
pub mod store;

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod health;
mod heartbeat;
use std::collections::HashMap;
use std::convert::Infallible;
@@ -25,9 +26,16 @@ use tonic::transport::NamedService;
use crate::metasrv::MetaSrv;
pub fn make_admin_service(_: MetaSrv) -> Admin {
pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
let router = Router::new().route("/health", health::HealthHandler);
let router = router.route(
"/heartbeat",
heartbeat::HeartBeatHandler {
meta_peer_client: meta_srv.meta_peer_client(),
},
);
let router = Router::nest("/admin", router);
Admin::new(router)

View File

@@ -0,0 +1,63 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tonic::codegen::http;
use crate::cluster::MetaPeerClient;
use crate::error::{self, Result};
use crate::keys::StatValue;
use crate::service::admin::HttpHandler;
/// Admin HTTP handler that dumps all datanode stat values as JSON.
pub struct HeartBeatHandler {
    // `None` when the MetaSrv was built without a peer client; the handler
    // then responds with a `NoMetaPeerClient` error.
    pub meta_peer_client: Option<MetaPeerClient>,
}
#[async_trait::async_trait]
impl HttpHandler for HeartBeatHandler {
    /// Serves the admin heartbeat endpoint: fetches every datanode stat value
    /// via the meta peer client and returns them as a JSON array. Path and
    /// query parameters are ignored.
    async fn handle(&self, _: &str, _: &HashMap<String, String>) -> Result<http::Response<String>> {
        let client = self
            .meta_peer_client
            .as_ref()
            .context(error::NoMetaPeerClientSnafu)?;

        // Keys are dropped here; only the stat values are exposed.
        let stat_kvs = client.get_all_dn_stat_kvs().await?;
        let stat_vals = stat_kvs.into_values().collect::<Vec<StatValue>>();
        let body: String = StatValues { stat_vals }.try_into()?;

        http::Response::builder()
            .status(http::StatusCode::OK)
            .body(body)
            .context(error::InvalidHttpBodySnafu)
    }
}
/// Wrapper used to serialize a list of stat values; `#[serde(transparent)]`
/// makes it serialize as a bare JSON array rather than an object.
#[derive(Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct StatValues {
    pub stat_vals: Vec<StatValue>,
}
impl TryFrom<StatValues> for String {
type Error = error::Error;
fn try_from(vals: StatValues) -> Result<Self> {
serde_json::to_string(&vals).context(error::SerializeToJsonSnafu {
input: format!("{vals:?}"),
})
}
}

View File

@@ -0,0 +1,70 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{
cluster_server, BatchGetRequest, BatchGetResponse, Error, RangeRequest, RangeResponse,
ResponseHeader,
};
use tonic::{Request, Response};
use crate::metasrv::MetaSrv;
use crate::service::store::ext::KvStoreExt;
use crate::service::GrpcResult;
#[async_trait::async_trait]
impl cluster_server::Cluster for MetaSrv {
    /// Batch-gets keys from this node's in-memory kv store.
    /// Only the leader serves the request; a follower replies with a
    /// `NotLeader` header instead of data so the caller can retry elsewhere.
    async fn batch_get(&self, req: Request<BatchGetRequest>) -> GrpcResult<BatchGetResponse> {
        if !self.is_leader() {
            let is_not_leader = ResponseHeader::failed(0, Error::is_not_leader());
            let resp = BatchGetResponse {
                header: Some(is_not_leader),
                ..Default::default()
            };
            return Ok(Response::new(resp));
        }

        let req = req.into_inner();
        let kvs = self.in_memory().batch_get(req.keys).await?;
        let success = ResponseHeader::success(0);
        let get_resp = BatchGetResponse {
            kvs,
            header: Some(success),
        };

        Ok(Response::new(get_resp))
    }

    /// Range-scans this node's in-memory kv store.
    /// Like `batch_get`, a follower answers with a `NotLeader` header.
    async fn range(&self, req: Request<RangeRequest>) -> GrpcResult<RangeResponse> {
        if !self.is_leader() {
            let is_not_leader = ResponseHeader::failed(0, Error::is_not_leader());
            let resp = RangeResponse {
                header: Some(is_not_leader),
                ..Default::default()
            };
            return Ok(Response::new(resp));
        }

        let req = req.into_inner();
        // NOTE(review): the range response from the store is forwarded without
        // attaching a success header, unlike `batch_get` — confirm intentional.
        let res = self.in_memory().range(req).await?;

        Ok(Response::new(res))
    }
}
impl MetaSrv {
    // Whether this meta node currently holds leadership.
    // NOTE(review): returns false when no election is configured, whereas
    // `MetaPeerClient::is_leader` treats a missing election as leader —
    // confirm this asymmetry is intentional (standalone vs. clustered mode).
    pub fn is_leader(&self) -> bool {
        self.election().map(|x| x.is_leader()).unwrap_or(false)
    }
}

View File

@@ -149,14 +149,14 @@ mod tests {
use api::v1::meta::*;
use tonic::IntoRequest;
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_ask_leader() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await;
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = AskLeaderRequest {
header: Some(RequestHeader::new((1, 1))),

View File

@@ -14,8 +14,8 @@
use api::v1::meta::{
router_server, CreateRequest, DeleteRequest, Error, MoveValueRequest, Peer, PeerDict,
PutRequest, RangeRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse,
Table, TableName, TableRoute, TableRouteValue,
PutRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse, Table, TableName,
TableRoute, TableRouteValue,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_telemetry::warn;
@@ -27,6 +27,7 @@ use crate::error::Result;
use crate::keys::TableRouteKey;
use crate::metasrv::{Context, MetaSrv, SelectorRef};
use crate::sequence::SequenceRef;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;
use crate::service::GrpcResult;
@@ -260,10 +261,12 @@ async fn get_table_route_value(
kv_store: &KvStoreRef,
key: &TableRouteKey<'_>,
) -> Result<TableRouteValue> {
let trv = get_from_store(kv_store, key.key().into_bytes())
let trkv = kv_store
.get(key.key().into_bytes())
.await?
.context(error::TableRouteNotFoundSnafu { key: key.key() })?;
let trv: TableRouteValue = trv
let trv: TableRouteValue = trkv
.value
.as_slice()
.try_into()
.context(error::DecodeTableRouteSnafu)?;
@@ -293,10 +296,11 @@ async fn get_table_global_value(
key: &TableGlobalKey,
) -> Result<Option<TableGlobalValue>> {
let tg_key = format!("{key}").into_bytes();
let tv = get_from_store(kv_store, tg_key).await?;
match tv {
Some(tv) => {
let tv = TableGlobalValue::from_bytes(tv).context(error::InvalidCatalogValueSnafu)?;
let tkv = kv_store.get(tg_key).await?;
match tkv {
Some(tkv) => {
let tv =
TableGlobalValue::from_bytes(tkv.value).context(error::InvalidCatalogValueSnafu)?;
Ok(Some(tv))
}
None => Ok(None),
@@ -336,17 +340,3 @@ async fn put_into_store(
Ok(())
}
async fn get_from_store(kv_store: &KvStoreRef, key: Vec<u8>) -> Result<Option<Vec<u8>>> {
    // Single-key range read: only the exact key (if present) is returned.
    let req = RangeRequest {
        key,
        ..Default::default()
    };
    let mut kvs = kv_store.range(req).await?.kvs;
    // Absent key -> None; otherwise yield the value of the last entry,
    // matching the original pop-based extraction.
    Ok(kvs.pop().map(|kv| kv.value))
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
pub mod etcd;
pub mod ext;
pub mod kv;
pub mod memory;
@@ -85,14 +86,15 @@ mod tests {
use api::v1::meta::*;
use tonic::IntoRequest;
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::metasrv::builder::MetaSrvBuilder;
use crate::service::store::memory::MemStore;
#[tokio::test]
async fn test_range() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await;
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = RangeRequest::default();
let res = meta_srv.range(req.into_request()).await;
@@ -102,7 +104,9 @@ mod tests {
#[tokio::test]
async fn test_put() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await;
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = PutRequest::default();
let res = meta_srv.put(req.into_request()).await;
@@ -112,7 +116,9 @@ mod tests {
#[tokio::test]
async fn test_batch_put() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await;
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = BatchPutRequest::default();
let res = meta_srv.batch_put(req.into_request()).await;
@@ -122,7 +128,9 @@ mod tests {
#[tokio::test]
async fn test_compare_and_put() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await;
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = CompareAndPutRequest::default();
let res = meta_srv.compare_and_put(req.into_request()).await;
@@ -132,7 +140,9 @@ mod tests {
#[tokio::test]
async fn test_delete_range() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await;
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = DeleteRangeRequest::default();
let res = meta_srv.delete_range(req.into_request()).await;
@@ -142,7 +152,9 @@ mod tests {
#[tokio::test]
async fn test_move_value() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await;
let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await;
let req = MoveValueRequest::default();
let res = meta_srv.move_value(req.into_request()).await;

View File

@@ -0,0 +1,153 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{KeyValue, RangeRequest};
use snafu::ensure;
use crate::error::{self, Result};
use crate::service::store::kv::KvStore;
#[async_trait::async_trait]
pub trait KvStoreExt {
    /// Fetches at most one [`KeyValue`] for the exact `key`.
    ///
    /// Returns `Ok(None)` when the key does not exist. Implementations may
    /// return an error if the lookup unexpectedly yields multiple entries.
    async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>>;

    /// Fetches the [`KeyValue`] for each of `keys`, preserving request order.
    ///
    /// Keys that are not found are skipped, so the result may be shorter
    /// than `keys`.
    // NOTE: parameter renamed from `key` to `keys` to match the impl below;
    // Rust trait-method parameter names are not part of the call syntax,
    // so this is fully backward-compatible for callers.
    async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>>;
}
#[async_trait::async_trait]
impl<T> KvStoreExt for T
where
    T: KvStore + ?Sized,
{
    /// Looks up a single key via a point `range` request.
    async fn get(&self, key: Vec<u8>) -> Result<Option<KeyValue>> {
        let point_read = RangeRequest {
            key,
            ..Default::default()
        };
        let mut found = self.range(point_read).await?.kvs;

        if found.is_empty() {
            return Ok(None);
        }

        // A point read must match at most one entry; anything else is a
        // store invariant violation.
        ensure!(
            found.len() == 1,
            error::InvalidKvsLengthSnafu {
                expected: 1_usize,
                actual: found.len(),
            }
        );

        // Length is exactly 1 here, so pop() yields Some(..).
        Ok(found.pop())
    }

    /// Fetches each key individually, skipping keys that are absent.
    async fn batch_get(&self, keys: Vec<Vec<u8>>) -> Result<Vec<KeyValue>> {
        let mut collected = Vec::with_capacity(keys.len());

        for key in keys {
            match self.get(key).await? {
                Some(kv) => collected.push(kv),
                None => continue,
            }
        }

        Ok(collected)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::PutRequest;

    use crate::service::store::ext::KvStoreExt;
    use crate::service::store::kv::KvStoreRef;
    use crate::service::store::memory::MemStore;

    #[tokio::test]
    async fn test_get() {
        // `Arc`-backed store is used through shared references only, so the
        // `mut` binding and `&mut` parameter of the original were needless.
        let in_mem = Arc::new(MemStore::new()) as KvStoreRef;
        put_stats_to_store(&in_mem).await;

        let kv = in_mem
            .get("test_key1".as_bytes().to_vec())
            .await
            .unwrap()
            .unwrap();
        assert_eq!("test_key1".as_bytes(), kv.key);
        assert_eq!("test_val1".as_bytes(), kv.value);

        let kv = in_mem
            .get("test_key2".as_bytes().to_vec())
            .await
            .unwrap()
            .unwrap();
        assert_eq!("test_key2".as_bytes(), kv.key);
        assert_eq!("test_val2".as_bytes(), kv.value);

        // A key that was never written yields None rather than an error.
        let may_kv = in_mem.get("test_key3".as_bytes().to_vec()).await.unwrap();
        assert!(may_kv.is_none());
    }

    #[tokio::test]
    async fn test_batch_get() {
        let in_mem = Arc::new(MemStore::new()) as KvStoreRef;
        put_stats_to_store(&in_mem).await;

        // Duplicate keys are looked up independently, so both occurrences
        // of "test_key1" come back.
        let keys = vec![
            "test_key1".as_bytes().to_vec(),
            "test_key1".as_bytes().to_vec(),
            "test_key2".as_bytes().to_vec(),
        ];

        let kvs = in_mem.batch_get(keys).await.unwrap();

        assert_eq!(3, kvs.len());
        assert_eq!("test_key1".as_bytes(), kvs[0].key);
        assert_eq!("test_key1".as_bytes(), kvs[1].key);
        assert_eq!("test_key2".as_bytes(), kvs[2].key);
        assert_eq!("test_val1".as_bytes(), kvs[0].value);
        assert_eq!("test_val1".as_bytes(), kvs[1].value);
        assert_eq!("test_val2".as_bytes(), kvs[2].value);
    }

    /// Seeds the store with the two fixed key/value pairs the tests expect.
    async fn put_stats_to_store(store: &KvStoreRef) {
        store
            .put(PutRequest {
                key: "test_key1".as_bytes().to_vec(),
                value: "test_val1".as_bytes().to_vec(),
                ..Default::default()
            })
            .await
            .unwrap();
        store
            .put(PutRequest {
                key: "test_key2".as_bytes().to_vec(),
                value: "test_val2".as_bytes().to_vec(),
                ..Default::default()
            })
            .await
            .unwrap();
    }
}