From 83509f31f4ad226b81b36bf31f9fe84172dfa41a Mon Sep 17 00:00:00 2001 From: fys <40801205+Fengys123@users.noreply.github.com> Date: Tue, 7 Feb 2023 17:09:28 +0800 Subject: [PATCH] feat: datanode stats is stored in the mem_kv of meta leader (#943) * store heartbeat data in memory, instead of etcd * fix: typo * fix: license header * cr --- src/api/build.rs | 1 + src/api/greptime/v1/meta/cluster.proto | 27 ++ src/meta-srv/src/bootstrap.rs | 34 ++- src/meta-srv/src/cluster.rs | 245 ++++++++++++++++++ src/meta-srv/src/error.rs | 48 ++++ .../src/handler/persist_stats_handler.rs | 4 +- src/meta-srv/src/keys.rs | 4 +- src/meta-srv/src/lib.rs | 1 + src/meta-srv/src/metasrv.rs | 59 +---- src/meta-srv/src/metasrv/builder.rs | 147 +++++++++++ src/meta-srv/src/mocks.rs | 14 +- src/meta-srv/src/selector.rs | 18 +- src/meta-srv/src/selector/load_based.rs | 160 ++---------- src/meta-srv/src/service.rs | 1 + src/meta-srv/src/service/admin.rs | 10 +- src/meta-srv/src/service/admin/heartbeat.rs | 63 +++++ src/meta-srv/src/service/cluster.rs | 70 +++++ src/meta-srv/src/service/heartbeat.rs | 6 +- src/meta-srv/src/service/router.rs | 34 +-- src/meta-srv/src/service/store.rs | 28 +- src/meta-srv/src/service/store/ext.rs | 153 +++++++++++ 21 files changed, 888 insertions(+), 239 deletions(-) create mode 100644 src/api/greptime/v1/meta/cluster.proto create mode 100644 src/meta-srv/src/cluster.rs create mode 100644 src/meta-srv/src/metasrv/builder.rs create mode 100644 src/meta-srv/src/service/admin/heartbeat.rs create mode 100644 src/meta-srv/src/service/cluster.rs create mode 100644 src/meta-srv/src/service/store/ext.rs diff --git a/src/api/build.rs b/src/api/build.rs index 913d4d1519..1844fae618 100644 --- a/src/api/build.rs +++ b/src/api/build.rs @@ -21,6 +21,7 @@ fn main() { "greptime/v1/meta/heartbeat.proto", "greptime/v1/meta/route.proto", "greptime/v1/meta/store.proto", + "greptime/v1/meta/cluster.proto", "prometheus/remote/remote.proto", ], &["."], diff --git a/src/api/greptime/v1/meta/cluster.proto b/src/api/greptime/v1/meta/cluster.proto new file mode 100644 index 0000000000..7bacb0b1ab --- /dev/null +++ b/src/api/greptime/v1/meta/cluster.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package greptime.v1.meta; + +import "greptime/v1/meta/common.proto"; +import "greptime/v1/meta/store.proto"; + +// Cluster service is used for communication between meta nodes. +service Cluster { + // Batch get kvs by input keys from leader's in_memory kv store. + rpc BatchGet(BatchGetRequest) returns (BatchGetResponse); + + // Range get the kvs from leader's in_memory kv store. + rpc Range(RangeRequest) returns (RangeResponse); +} + +message BatchGetRequest { + RequestHeader header = 1; + + repeated bytes keys = 2; +} + +message BatchGetResponse { + ResponseHeader header = 1; + + repeated KeyValue kvs = 2; +} diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 028219a9df..5ca0ac3809 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::router_server::RouterServer; use api::v1::meta::store_server::StoreServer; @@ -22,10 +23,16 @@ use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::server::Router; +use crate::cluster::MetaPeerClient; use crate::election::etcd::EtcdElection; -use crate::metasrv::{MetaSrv, MetaSrvOptions}; +use crate::metasrv::builder::MetaSrvBuilder; +use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; +use crate::selector::lease_based::LeaseBasedSelector; +use crate::selector::load_based::LoadBasedSelector; +use crate::selector::SelectorType; use crate::service::admin; use crate::service::store::etcd::EtcdStore; +use crate::service::store::kv::ResetableKvStoreRef; use crate::service::store::memory::MemStore; use crate::{error, Result}; @@ -57,6 +64,7 @@ pub fn router(meta_srv: MetaSrv) -> Router { .add_service(HeartbeatServer::new(meta_srv.clone())) .add_service(RouterServer::new(meta_srv.clone())) .add_service(StoreServer::new(meta_srv.clone())) + .add_service(ClusterServer::new(meta_srv.clone())) .add_service(admin::make_admin_service(meta_srv)) } @@ -69,8 +77,28 @@ pub async fn make_meta_srv(opts: MetaSrvOptions) -> Result { Some(EtcdElection::with_endpoints(&opts.server_addr, [&opts.store_addr]).await?), ) }; - let selector = opts.selector.clone().into(); - let meta_srv = MetaSrv::new(opts, kv_store, Some(selector), election, None).await; + + let in_memory = Arc::new(MemStore::default()) as ResetableKvStoreRef; + let meta_peer_client = MetaPeerClient::new(in_memory.clone(), election.clone()); + + let selector = match opts.selector { + SelectorType::LoadBased => Arc::new(LoadBasedSelector { + meta_peer_client: meta_peer_client.clone(), + }) as SelectorRef, + SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, + }; + + let meta_srv = MetaSrvBuilder::new() + .options(opts) + .kv_store(kv_store) + .in_memory(in_memory) + .selector(selector) + .election(election) + .meta_peer_client(meta_peer_client) + .build() + .await; + meta_srv.start().await; + Ok(meta_srv) } diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs new file mode 100644 index 0000000000..a8934d8110 --- /dev/null +++ b/src/meta-srv/src/cluster.rs @@ -0,0 +1,245 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use api::v1::meta::cluster_client::ClusterClient; +use api::v1::meta::{ + BatchGetRequest, BatchGetResponse, KeyValue, RangeRequest, RangeResponse, ResponseHeader, +}; +use common_grpc::channel_manager::ChannelManager; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error::Result; +use crate::keys::{StatKey, StatValue, DN_STAT_PREFIX}; +use crate::metasrv::ElectionRef; +use crate::service::store::ext::KvStoreExt; +use crate::service::store::kv::ResetableKvStoreRef; +use crate::{error, util}; + +#[derive(Clone)] +pub struct MetaPeerClient { + election: Option, + in_memory: ResetableKvStoreRef, + channel_manager: ChannelManager, +} + +impl MetaPeerClient { + pub fn new(in_mem: ResetableKvStoreRef, election: Option) -> Self { + Self { + election, + in_memory: in_mem, + channel_manager: ChannelManager::default(), + } + } + + // Get all datanode stat kvs from leader meta. + pub async fn get_all_dn_stat_kvs(&self) -> Result> { + let stat_prefix = format!("{DN_STAT_PREFIX}-").into_bytes(); + let range_end = util::get_prefix_end_key(&stat_prefix); + let req = RangeRequest { + key: stat_prefix.clone(), + range_end, + ..Default::default() + }; + + if self.is_leader() { + let kvs = self.in_memory.range(req).await?.kvs; + return to_stat_kv_map(kvs); + } + + // Safety: when self.is_leader() == false, election must not empty. + let election = self.election.as_ref().unwrap(); + + let leader_addr = election.leader().await?.0; + + let channel = self + .channel_manager + .get(&leader_addr) + .context(error::CreateChannelSnafu)?; + + let request = tonic::Request::new(req); + + let response: RangeResponse = ClusterClient::new(channel) + .range(request) + .await + .context(error::BatchGetSnafu)? + .into_inner(); + + check_resp_header(&response.header, Context { addr: &leader_addr })?; + + to_stat_kv_map(response.kvs) + } + + // Get datanode stat kvs from leader meta by input keys. + pub async fn get_dn_stat_kvs(&self, keys: Vec) -> Result> { + let stat_keys = keys.into_iter().map(|key| key.into()).collect(); + let stat_kvs = self.batch_get(stat_keys).await?; + + let mut result = HashMap::with_capacity(stat_kvs.len()); + for stat_kv in stat_kvs { + let stat_key = stat_kv.key.try_into()?; + let stat_val = stat_kv.value.try_into()?; + result.insert(stat_key, stat_val); + } + Ok(result) + } + + // Get kv information from the leader's in_mem kv store + async fn batch_get(&self, keys: Vec>) -> Result> { + if self.is_leader() { + return self.in_memory.batch_get(keys).await; + } + + // Safety: when self.is_leader() == false, election must not empty. + let election = self.election.as_ref().unwrap(); + + let leader_addr = election.leader().await?.0; + + let channel = self + .channel_manager + .get(&leader_addr) + .context(error::CreateChannelSnafu)?; + + let request = tonic::Request::new(BatchGetRequest { + keys: keys.clone(), + ..Default::default() + }); + + let response: BatchGetResponse = ClusterClient::new(channel.clone()) + .batch_get(request) + .await + .context(error::BatchGetSnafu)? + .into_inner(); + + check_resp_header(&response.header, Context { addr: &leader_addr })?; + + Ok(response.kvs) + } + + // Check if the meta node is a leader node. + // Note: when self.election is None, we also consider the meta node is leader + fn is_leader(&self) -> bool { + self.election + .as_ref() + .map(|election| election.is_leader()) + .unwrap_or(true) + } +} + +fn to_stat_kv_map(kvs: Vec) -> Result> { + let mut map = HashMap::with_capacity(kvs.len()); + for kv in kvs { + map.insert(kv.key.try_into()?, kv.value.try_into()?); + } + Ok(map) +} + +struct Context<'a> { + addr: &'a str, +} + +fn check_resp_header(header: &Option, ctx: Context) -> Result<()> { + let header = header + .as_ref() + .context(error::ResponseHeaderNotFoundSnafu)?; + + ensure!( + !header.is_not_leader(), + error::IsNotLeaderSnafu { + node_addr: ctx.addr + } + ); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use api::v1::meta::{Error, ErrorCode, KeyValue, ResponseHeader}; + + use super::{check_resp_header, to_stat_kv_map, Context}; + use crate::error; + use crate::handler::node_stat::Stat; + use crate::keys::{StatKey, StatValue}; + + #[test] + fn test_to_stat_kv_map() { + let stat_key = StatKey { + cluster_id: 0, + node_id: 100, + }; + + let stat = Stat { + cluster_id: 0, + id: 100, + addr: "127.0.0.1:3001".to_string(), + is_leader: true, + ..Default::default() + }; + let stat_val = StatValue { stats: vec![stat] }.try_into().unwrap(); + + let kv = KeyValue { + key: stat_key.clone().into(), + value: stat_val, + }; + + let kv_map = to_stat_kv_map(vec![kv]).unwrap(); + assert_eq!(1, kv_map.len()); + assert!(kv_map.get(&stat_key).is_some()); + + let stat_val = kv_map.get(&stat_key).unwrap(); + let stat = stat_val.stats.get(0).unwrap(); + + assert_eq!(0, stat.cluster_id); + assert_eq!(100, stat.id); + assert_eq!("127.0.0.1:3001", stat.addr); + assert!(stat.is_leader); + } + + #[test] + fn test_check_resp_header() { + let header = Some(ResponseHeader { + error: None, + ..Default::default() + }); + let result = check_resp_header(&header, mock_ctx()); + assert!(result.is_ok()); + + let result = check_resp_header(&None, mock_ctx()); + assert!(result.is_err()); + assert!(matches!( + result.err().unwrap(), + error::Error::ResponseHeaderNotFound { .. } + )); + + let header = Some(ResponseHeader { + error: Some(Error { + code: ErrorCode::NotLeader as i32, + err_msg: "The current meta is not leader".to_string(), + }), + ..Default::default() + }); + let result = check_resp_header(&header, mock_ctx()); + assert!(result.is_err()); + assert!(matches!( + result.err().unwrap(), + error::Error::IsNotLeader { .. } + )); + } + + fn mock_ctx<'a>() -> Context<'a> { + Context { addr: "addr" } + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 9e3bfdac67..81f8d4a4c6 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use common_error::prelude::*; +use tonic::codegen::http; use tonic::{Code, Status}; #[derive(Debug, Snafu)] @@ -165,6 +166,46 @@ pub enum Error { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("Invalid KVs length, expected: {}, actual: {}", expected, actual))] + InvalidKvsLength { + expected: usize, + actual: usize, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to create gRPC channel, source: {}", source))] + CreateChannel { + #[snafu(backtrace)] + source: common_grpc::error::Error, + }, + + #[snafu(display( + "Failed to batch get KVs from leader's in_memory kv store, source: {}", + source + ))] + BatchGet { + source: tonic::Status, + backtrace: Backtrace, + }, + + #[snafu(display("Response header not found"))] + ResponseHeaderNotFound { backtrace: Backtrace }, + + #[snafu(display("The requested meta node is not leader, node addr: {}", node_addr))] + IsNotLeader { + node_addr: String, + backtrace: Backtrace, + }, + + #[snafu(display("MetaSrv has no meta peer client"))] + NoMetaPeerClient { backtrace: Backtrace }, + + #[snafu(display("Invalid http body, source: {}", source))] + InvalidHttpBody { + source: http::Error, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -194,6 +235,12 @@ impl ErrorExt for Error { | Error::DeserializeFromJson { .. } | Error::DecodeTableRoute { .. } | Error::NoLeader { .. } + | Error::CreateChannel { .. } + | Error::BatchGet { .. } + | Error::ResponseHeaderNotFound { .. } + | Error::IsNotLeader { .. } + | Error::NoMetaPeerClient { .. } + | Error::InvalidHttpBody { .. } | Error::StartGrpc { .. } => StatusCode::Internal, Error::EmptyKey { .. } | Error::EmptyTableName { .. } @@ -210,6 +257,7 @@ impl ErrorExt for Error { | Error::TableRouteNotFound { .. } | Error::NextSequence { .. } | Error::MoveValue { .. } + | Error::InvalidKvsLength { .. } | Error::InvalidTxnResult { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidCatalogValue { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index c1e36ba9dc..0dff1fac1d 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -51,7 +51,7 @@ impl HeartbeatHandler for PersistStatsHandler { ..Default::default() }; - ctx.kv_store.put(put).await?; + ctx.in_memory.put(put).await?; Ok(()) } @@ -112,7 +112,7 @@ mod tests { ..Default::default() }; - let res = ctx.kv_store.range(req).await.unwrap(); + let res = ctx.in_memory.range(req).await.unwrap(); assert_eq!(1, res.kvs.len()); diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index d4b0040b20..df4cde082b 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -174,7 +174,7 @@ impl<'a> TableRouteKey<'a> { } } -#[derive(Eq, PartialEq, Debug, Hash)] +#[derive(Eq, PartialEq, Debug, Clone, Hash)] pub struct StatKey { pub cluster_id: u64, pub node_id: u64, @@ -246,7 +246,7 @@ impl TryFrom for Vec { fn try_from(stats: StatValue) -> Result { Ok(serde_json::to_string(&stats) - .context(crate::error::SerializeToJsonSnafu { + .context(error::SerializeToJsonSnafu { input: format!("{stats:?}"), })? .into_bytes()) diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 02e671de62..c0b00e446b 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -14,6 +14,7 @@ #![feature(btree_drain_filter)] pub mod bootstrap; +pub mod cluster; pub mod election; pub mod error; pub mod handler; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 8e9fcfaaca..62633cc950 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod builder; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -19,16 +21,12 @@ use api::v1::meta::Peer; use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; +use crate::cluster::MetaPeerClient; use crate::election::Election; -use crate::handler::{ - CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler, - OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler, -}; -use crate::selector::lease_based::LeaseBasedSelector; +use crate::handler::HeartbeatHandlerGroup; use crate::selector::{Selector, SelectorType}; -use crate::sequence::{Sequence, SequenceRef}; +use crate::sequence::SequenceRef; use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef}; -use crate::service::store::memory::MemStore; pub const TABLE_ID_SEQ: &str = "table_id"; @@ -100,50 +98,10 @@ pub struct MetaSrv { selector: SelectorRef, handler_group: HeartbeatHandlerGroup, election: Option, + meta_peer_client: Option, } impl MetaSrv { - pub async fn new( - options: MetaSrvOptions, - kv_store: KvStoreRef, - selector: Option, - election: Option, - handler_group: Option, - ) -> Self { - let started = Arc::new(AtomicBool::new(false)); - let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); - let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {})); - let in_memory = Arc::new(MemStore::default()); - let handler_group = match handler_group { - Some(hg) => hg, - None => { - let group = HeartbeatHandlerGroup::default(); - let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone()); - group.add_handler(ResponseHeaderHandler::default()).await; - // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, - // because even if the current meta-server node is no longer the leader it can - // still help the datanode to keep lease. - group.add_handler(keep_lease_handler).await; - group.add_handler(CheckLeaderHandler::default()).await; - group.add_handler(OnLeaderStartHandler::default()).await; - group.add_handler(CollectStatsHandler::default()).await; - group.add_handler(PersistStatsHandler::default()).await; - group - } - }; - - Self { - started, - options, - in_memory, - kv_store, - table_id_sequence, - selector, - handler_group, - election, - } - } - pub async fn start(&self) { if self .started @@ -211,6 +169,11 @@ impl MetaSrv { self.election.clone() } + #[inline] + pub fn meta_peer_client(&self) -> Option { + self.meta_peer_client.clone() + } + #[inline] pub fn new_ctx(&self) -> Context { let datanode_lease_secs = self.options().datanode_lease_secs; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs new file mode 100644 index 0000000000..730dc3d54a --- /dev/null +++ b/src/meta-srv/src/metasrv/builder.rs @@ -0,0 +1,147 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +use crate::cluster::MetaPeerClient; +use crate::handler::{ + CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler, + OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler, +}; +use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ}; +use crate::selector::lease_based::LeaseBasedSelector; +use crate::sequence::Sequence; +use crate::service::store::kv::{KvStoreRef, ResetableKvStoreRef}; +use crate::service::store::memory::MemStore; + +// TODO(fys): try use derive_builder macro +pub struct MetaSrvBuilder { + options: Option, + kv_store: Option, + in_memory: Option, + selector: Option, + handler_group: Option, + election: Option, + meta_peer_client: Option, +} + +impl MetaSrvBuilder { + pub fn new() -> Self { + Self { + kv_store: None, + in_memory: None, + selector: None, + handler_group: None, + meta_peer_client: None, + election: None, + options: None, + } + } + + pub fn options(mut self, options: MetaSrvOptions) -> Self { + self.options = Some(options); + self + } + + pub fn kv_store(mut self, kv_store: KvStoreRef) -> Self { + self.kv_store = Some(kv_store); + self + } + + pub fn in_memory(mut self, in_memory: ResetableKvStoreRef) -> Self { + self.in_memory = Some(in_memory); + self + } + + pub fn selector(mut self, selector: SelectorRef) -> Self { + self.selector = Some(selector); + self + } + + pub fn heartbeat_handler(mut self, handler_group: HeartbeatHandlerGroup) -> Self { + self.handler_group = Some(handler_group); + self + } + + pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClient) -> Self { + self.meta_peer_client = Some(meta_peer_client); + self + } + + pub fn election(mut self, election: Option) -> Self { + self.election = election; + self + } + + pub async fn build(self) -> MetaSrv { + let started = Arc::new(AtomicBool::new(false)); + + let MetaSrvBuilder { + election, + meta_peer_client, + options, + kv_store, + in_memory, + selector, + handler_group, + } = self; + + let options = options.unwrap_or_default(); + + let kv_store = kv_store.unwrap_or_else(|| Arc::new(MemStore::default())); + + let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemStore::default())); + + let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector)); + + let handler_group = match handler_group { + Some(handler_group) => handler_group, + None => { + let group = HeartbeatHandlerGroup::default(); + let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone()); + group.add_handler(ResponseHeaderHandler::default()).await; + // `KeepLeaseHandler` should preferably be in front of `CheckLeaderHandler`, + // because even if the current meta-server node is no longer the leader it can + // still help the datanode to keep lease. + group.add_handler(keep_lease_handler).await; + group.add_handler(CheckLeaderHandler::default()).await; + group.add_handler(OnLeaderStartHandler::default()).await; + group.add_handler(CollectStatsHandler::default()).await; + group.add_handler(PersistStatsHandler::default()).await; + group + } + }; + + let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); + + MetaSrv { + started, + options, + in_memory, + kv_store, + table_id_sequence, + selector, + handler_group, + election, + meta_peer_client, + } + } +} + +impl Default for MetaSrvBuilder { + fn default() -> Self { + Self::new() + } +} diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index a59c7d608d..db38a15b35 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -20,7 +20,8 @@ use api::v1::meta::store_server::StoreServer; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use tower::service_fn; -use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; +use crate::metasrv::builder::MetaSrvBuilder; +use crate::metasrv::{MetaSrvOptions, SelectorRef}; use crate::service::store::etcd::EtcdStore; use crate::service::store::kv::KvStoreRef; use crate::service::store::memory::MemStore; @@ -52,7 +53,16 @@ pub async fn mock( selector: Option, ) -> MockInfo { let server_addr = opts.server_addr.clone(); - let meta_srv = MetaSrv::new(opts, kv_store, selector, None, None).await; + + let builder = MetaSrvBuilder::new().options(opts).kv_store(kv_store); + + let builder = match selector { + Some(s) => builder.selector(s), + None => builder, + }; + + let meta_srv = builder.build().await; + let (client, server) = tokio::io::duplex(1024); tokio::spawn(async move { tonic::transport::Server::builder() diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index 065f8f95bf..952af5886d 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -15,15 +15,10 @@ pub mod lease_based; pub mod load_based; -use std::sync::Arc; - use serde::{Deserialize, Serialize}; -use self::lease_based::LeaseBasedSelector; -use self::load_based::LoadBasedSelector; use crate::error; use crate::error::Result; -use crate::metasrv::SelectorRef; pub type Namespace = u64; @@ -41,15 +36,6 @@ pub enum SelectorType { LeaseBased, } -impl From for SelectorRef { - fn from(selector_type: SelectorType) -> Self { - match selector_type { - SelectorType::LoadBased => Arc::new(LoadBasedSelector) as SelectorRef, - SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, - } - } -} - impl Default for SelectorType { fn default() -> Self { SelectorType::LeaseBased @@ -91,8 +77,8 @@ mod tests { let selector_type = loadbased.try_into().unwrap(); assert_eq!(SelectorType::LoadBased, selector_type); - let unknow = "unknow"; - let selector_type: Result = unknow.try_into(); + let unknown = "unknown"; + let selector_type: Result = unknown.try_into(); assert!(selector_type.is_err()); } } diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 15ec2754a7..9ea135ae7b 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -14,17 +14,19 @@ use std::collections::HashMap; -use api::v1::meta::{Peer, RangeRequest}; +use api::v1::meta::Peer; use common_time::util as time_util; -use super::{Namespace, Selector}; +use crate::cluster::MetaPeerClient; use crate::error::Result; -use crate::keys::{LeaseKey, LeaseValue, StatKey, StatValue, DN_STAT_PREFIX}; +use crate::keys::{LeaseKey, LeaseValue, StatKey}; +use crate::lease; use crate::metasrv::Context; -use crate::service::store::kv::KvStoreRef; -use crate::{lease, util}; +use crate::selector::{Namespace, Selector}; -pub struct LoadBasedSelector; +pub struct LoadBasedSelector { + pub meta_peer_client: MetaPeerClient, +} #[async_trait::async_trait] impl Selector for LoadBasedSelector { @@ -42,18 +44,25 @@ impl Selector for LoadBasedSelector { .into_iter() .collect(); - // get stats of datanodes - let stat_kvs = all_stat_kvs(ns, &ctx.kv_store).await?; + // get stats of alive datanodes + let stat_keys: Vec = lease_kvs + .keys() + .map(|k| StatKey { + cluster_id: k.cluster_id, + node_id: k.node_id, + }) + .collect(); + let stat_kvs = self.meta_peer_client.get_dn_stat_kvs(stat_keys).await?; - // filter out expired datanodes and nodes that cannot get region number - let mut tuples: Vec<_> = stat_kvs - .iter() + // aggregate lease and stat information + let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = stat_kvs + .into_iter() .filter_map(|(stat_key, stat_val)| { - match ( - lease_kvs.get(&to_lease_key(stat_key)), - stat_val.region_num(), - ) { - (Some(lease_val), Some(region_num)) => Some((stat_key, lease_val, region_num)), + let lease_key = to_lease_key(&stat_key); + match (lease_kvs.get(&lease_key), stat_val.region_num()) { + (Some(lease_val), Some(region_num)) => { + Some((lease_key, lease_val.clone(), region_num)) + } _ => None, } }) @@ -66,38 +75,12 @@ impl Selector for LoadBasedSelector { .into_iter() .map(|(stat_key, lease_val, _)| Peer { id: stat_key.node_id, - addr: lease_val.node_addr.clone(), + addr: lease_val.node_addr, }) .collect()) } } -// get all stat kvs from store -pub async fn all_stat_kvs( - cluster_id: u64, - kv_store: &KvStoreRef, -) -> Result> { - let key = stat_prefix(cluster_id); - let range_end = util::get_prefix_end_key(&key); - let req = RangeRequest { - key, - range_end, - ..Default::default() - }; - - let kvs = kv_store.range(req).await?.kvs; - - let mut stat_kvs = Vec::with_capacity(kvs.len()); - - for kv in kvs { - let key: StatKey = kv.key.try_into()?; - let value: StatValue = kv.value.try_into()?; - stat_kvs.push((key, value)); - } - - Ok(stat_kvs) -} - fn to_lease_key(k: &StatKey) -> LeaseKey { LeaseKey { cluster_id: k.cluster_id, @@ -105,36 +88,10 @@ fn to_lease_key(k: &StatKey) -> LeaseKey { } } -fn stat_prefix(cluster_id: u64) -> Vec { - format!("{DN_STAT_PREFIX}-{cluster_id}").into_bytes() -} - #[cfg(test)] mod tests { - use std::sync::Arc; - - use api::v1::meta::PutRequest; - - use super::{all_stat_kvs, to_lease_key}; - use crate::handler::node_stat::Stat; - use crate::keys::{StatKey, StatValue}; - use crate::selector::load_based::stat_prefix; - use crate::service::store::kv::{KvStore, KvStoreRef}; - use crate::service::store::memory::MemStore; - - #[tokio::test] - async fn test_all_stat_kvs() { - let kv_store = Arc::new(MemStore::new()) as Arc; - let kvs = all_stat_kvs(0, &kv_store).await.unwrap(); - assert!(kvs.is_empty()); - - let mut kv_store = Arc::new(MemStore::new()) as Arc; - put_stats_to_store(&mut kv_store).await; - let kvs = all_stat_kvs(0, &kv_store).await.unwrap(); - assert_eq!(2, kvs.len()); - let kvs = all_stat_kvs(1, &kv_store).await.unwrap(); - assert_eq!(1, kvs.len()); - } + use super::to_lease_key; + use crate::keys::StatKey; #[test] fn test_to_lease_key() { @@ -146,65 +103,4 @@ mod tests { assert_eq!(1, lease_key.cluster_id); assert_eq!(101, lease_key.node_id); } - - #[test] - fn test_stat_prefix() { - assert_eq!(stat_prefix(1), b"__meta_dnstat-1"); - } - - async fn put_stats_to_store(store: &mut KvStoreRef) { - let put1 = PutRequest { - key: StatKey { - cluster_id: 0, - node_id: 101, - } - .into(), - value: StatValue { - stats: vec![Stat { - region_num: Some(100), - ..Default::default() - }], - } - .try_into() - .unwrap(), - ..Default::default() - }; - store.put(put1).await.unwrap(); - - let put2 = PutRequest { - key: StatKey { - cluster_id: 0, - node_id: 102, - } - .into(), - value: StatValue { - stats: vec![Stat { - region_num: Some(99), - ..Default::default() - }], - } - .try_into() - .unwrap(), - ..Default::default() - }; - store.put(put2).await.unwrap(); - - let put3 = PutRequest { - key: StatKey { - cluster_id: 1, - node_id: 103, - } - .into(), - value: StatValue { - stats: vec![Stat { - region_num: Some(98), - ..Default::default() - }], - } - .try_into() - .unwrap(), - ..Default::default() - }; - store.put(put3).await.unwrap(); - } } diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index ed52d3bab8..d8a5eaf4c9 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -18,6 +18,7 @@ use futures::Stream; use tonic::{Response, Status}; pub mod admin; +pub mod cluster; mod heartbeat; pub mod router; pub mod store; diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 05986b7873..5d15c173ce 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -13,6 +13,7 @@ // limitations under the License. mod health; +mod heartbeat; use std::collections::HashMap; use std::convert::Infallible; @@ -25,9 +26,16 @@ use tonic::transport::NamedService; use crate::metasrv::MetaSrv; -pub fn make_admin_service(_: MetaSrv) -> Admin { +pub fn make_admin_service(meta_srv: MetaSrv) -> Admin { let router = Router::new().route("/health", health::HealthHandler); + let router = router.route( + "/heartbeat", + heartbeat::HeartBeatHandler { + meta_peer_client: meta_srv.meta_peer_client(), + }, + ); + let router = Router::nest("/admin", router); Admin::new(router) diff --git a/src/meta-srv/src/service/admin/heartbeat.rs b/src/meta-srv/src/service/admin/heartbeat.rs new file mode 100644 index 0000000000..489c236b56 --- /dev/null +++ b/src/meta-srv/src/service/admin/heartbeat.rs @@ -0,0 +1,63 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; +use tonic::codegen::http; + +use crate::cluster::MetaPeerClient; +use crate::error::{self, Result}; +use crate::keys::StatValue; +use crate::service::admin::HttpHandler; + +pub struct HeartBeatHandler { + pub meta_peer_client: Option, +} + +#[async_trait::async_trait] +impl HttpHandler for HeartBeatHandler { + async fn handle(&self, _: &str, _: &HashMap) -> Result> { + let meta_peer_client = self + .meta_peer_client + .as_ref() + .context(error::NoMetaPeerClientSnafu)?; + + let stat_kvs = meta_peer_client.get_all_dn_stat_kvs().await?; + let stat_vals: Vec = stat_kvs.into_values().collect(); + let result = StatValues { stat_vals }.try_into()?; + + http::Response::builder() + .status(http::StatusCode::OK) + .body(result) + .context(error::InvalidHttpBodySnafu) + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct StatValues { + pub stat_vals: Vec, +} + +impl TryFrom for String { + type Error = error::Error; + + fn try_from(vals: StatValues) -> Result { + serde_json::to_string(&vals).context(error::SerializeToJsonSnafu { + input: format!("{vals:?}"), + }) + } +} diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs new file mode 100644 index 0000000000..02e378738b --- /dev/null +++ b/src/meta-srv/src/service/cluster.rs @@ -0,0 +1,70 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{ + cluster_server, BatchGetRequest, BatchGetResponse, Error, RangeRequest, RangeResponse, + ResponseHeader, +}; +use tonic::{Request, Response}; + +use crate::metasrv::MetaSrv; +use crate::service::store::ext::KvStoreExt; +use crate::service::GrpcResult; + +#[async_trait::async_trait] +impl cluster_server::Cluster for MetaSrv { + async fn batch_get(&self, req: Request) -> GrpcResult { + if !self.is_leader() { + let is_not_leader = ResponseHeader::failed(0, Error::is_not_leader()); + let resp = BatchGetResponse { + header: Some(is_not_leader), + ..Default::default() + }; + return Ok(Response::new(resp)); + } + + let req = req.into_inner(); + let kvs = self.in_memory().batch_get(req.keys).await?; + let success = ResponseHeader::success(0); + + let get_resp = BatchGetResponse { + kvs, + header: Some(success), + }; + + Ok(Response::new(get_resp)) + } + + async fn range(&self, req: Request) -> GrpcResult { + if !self.is_leader() { + let is_not_leader = ResponseHeader::failed(0, Error::is_not_leader()); + let resp = RangeResponse { + header: Some(is_not_leader), + ..Default::default() + }; + return Ok(Response::new(resp)); + } + + let req = req.into_inner(); + let res = self.in_memory().range(req).await?; + + Ok(Response::new(res)) + } +} + +impl MetaSrv { + pub fn is_leader(&self) -> bool { + self.election().map(|x| x.is_leader()).unwrap_or(false) + } +} diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 25216e1546..83cf5e26ea 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -149,14 +149,14 @@ mod tests { use api::v1::meta::*; use tonic::IntoRequest; - use super::*; - use crate::metasrv::MetaSrvOptions; + use crate::metasrv::builder::MetaSrvBuilder; use crate::service::store::memory::MemStore; #[tokio::test] async fn test_ask_leader() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await; + + let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; let req = AskLeaderRequest { header: Some(RequestHeader::new((1, 1))), diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index ffe102d294..07fa2a7e96 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -14,8 +14,8 @@ use api::v1::meta::{ router_server, CreateRequest, DeleteRequest, Error, MoveValueRequest, Peer, PeerDict, - PutRequest, RangeRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse, - Table, TableName, TableRoute, TableRouteValue, + PutRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse, Table, TableName, + TableRoute, TableRouteValue, }; use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_telemetry::warn; @@ -27,6 +27,7 @@ use crate::error::Result; use crate::keys::TableRouteKey; use crate::metasrv::{Context, MetaSrv, SelectorRef}; use crate::sequence::SequenceRef; +use crate::service::store::ext::KvStoreExt; use crate::service::store::kv::KvStoreRef; use crate::service::GrpcResult; @@ -260,10 +261,12 @@ async fn get_table_route_value( kv_store: &KvStoreRef, key: &TableRouteKey<'_>, ) -> Result { - let trv = get_from_store(kv_store, key.key().into_bytes()) + let trkv = kv_store + .get(key.key().into_bytes()) .await? .context(error::TableRouteNotFoundSnafu { key: key.key() })?; - let trv: TableRouteValue = trv + let trv: TableRouteValue = trkv + .value .as_slice() .try_into() .context(error::DecodeTableRouteSnafu)?; @@ -293,10 +296,11 @@ async fn get_table_global_value( key: &TableGlobalKey, ) -> Result> { let tg_key = format!("{key}").into_bytes(); - let tv = get_from_store(kv_store, tg_key).await?; - match tv { - Some(tv) => { - let tv = TableGlobalValue::from_bytes(tv).context(error::InvalidCatalogValueSnafu)?; + let tkv = kv_store.get(tg_key).await?; + match tkv { + Some(tkv) => { + let tv = + TableGlobalValue::from_bytes(tkv.value).context(error::InvalidCatalogValueSnafu)?; Ok(Some(tv)) } None => Ok(None), @@ -336,17 +340,3 @@ async fn put_into_store( Ok(()) } - -async fn get_from_store(kv_store: &KvStoreRef, key: Vec) -> Result>> { - let req = RangeRequest { - key, - ..Default::default() - }; - let res = kv_store.range(req).await?; - let mut kvs = res.kvs; - if kvs.is_empty() { - Ok(None) - } else { - Ok(Some(kvs.pop().unwrap().value)) - } -} diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 4f0f355c6c..a972c1ff59 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod etcd; +pub mod ext; pub mod kv; pub mod memory; @@ -85,14 +86,15 @@ mod tests { use api::v1::meta::*; use tonic::IntoRequest; - use super::*; - use crate::metasrv::MetaSrvOptions; + use crate::metasrv::builder::MetaSrvBuilder; use crate::service::store::memory::MemStore; #[tokio::test] async fn test_range() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await; + + let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; + let req = RangeRequest::default(); let res = meta_srv.range(req.into_request()).await; @@ -102,7 +104,9 @@ mod tests { #[tokio::test] async fn test_put() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await; + + let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; + let req = PutRequest::default(); let res = meta_srv.put(req.into_request()).await; @@ -112,7 +116,9 @@ mod tests { #[tokio::test] async fn test_batch_put() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await; + + let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; + let req = BatchPutRequest::default(); let res = meta_srv.batch_put(req.into_request()).await; @@ -122,7 +128,9 @@ mod tests { #[tokio::test] async fn test_compare_and_put() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await; + + let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; + let req = CompareAndPutRequest::default(); let res = meta_srv.compare_and_put(req.into_request()).await; @@ -132,7 +140,9 @@ mod tests { #[tokio::test] async fn test_delete_range() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await; + + let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; + let req = DeleteRangeRequest::default(); let res = meta_srv.delete_range(req.into_request()).await; @@ -142,7 +152,9 @@ mod tests { #[tokio::test] async fn test_move_value() { let kv_store = Arc::new(MemStore::new()); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None, None).await; + + let meta_srv = MetaSrvBuilder::new().kv_store(kv_store).build().await; + let req = MoveValueRequest::default(); let res = meta_srv.move_value(req.into_request()).await; diff --git a/src/meta-srv/src/service/store/ext.rs b/src/meta-srv/src/service/store/ext.rs new file mode 100644 index 0000000000..ba0cce2a46 --- /dev/null +++ b/src/meta-srv/src/service/store/ext.rs @@ -0,0 +1,153 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{KeyValue, RangeRequest}; +use snafu::ensure; + +use crate::error::{self, Result}; +use crate::service::store::kv::KvStore; + +#[async_trait::async_trait] +pub trait KvStoreExt { + async fn get(&self, key: Vec) -> Result>; + + async fn batch_get(&self, key: Vec>) -> Result>; +} + +#[async_trait::async_trait] +impl KvStoreExt for T +where + T: KvStore + ?Sized, +{ + async fn get(&self, key: Vec) -> Result> { + let req = RangeRequest { + key, + ..Default::default() + }; + + let mut kvs = self.range(req).await?.kvs; + + if kvs.is_empty() { + return Ok(None); + } + + ensure!( + kvs.len() == 1, + error::InvalidKvsLengthSnafu { + expected: 1_usize, + actual: kvs.len(), + } + ); + + // Safety: the length check has been performed before using unwrap() + Ok(Some(kvs.pop().unwrap())) + } + + async fn batch_get(&self, keys: Vec>) -> Result> { + let mut kvs = Vec::with_capacity(keys.len()); + + for key in keys { + if let Some(kv) = self.get(key).await? { + kvs.push(kv); + } + } + + Ok(kvs) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::meta::PutRequest; + + use crate::service::store::ext::KvStoreExt; + use crate::service::store::kv::KvStoreRef; + use crate::service::store::memory::MemStore; + + #[tokio::test] + async fn test_get() { + let mut in_mem = Arc::new(MemStore::new()) as KvStoreRef; + + put_stats_to_store(&mut in_mem).await; + + let kv = in_mem + .get("test_key1".as_bytes().to_vec()) + .await + .unwrap() + .unwrap(); + + assert_eq!("test_key1".as_bytes(), kv.key); + assert_eq!("test_val1".as_bytes(), kv.value); + + let kv = in_mem + .get("test_key2".as_bytes().to_vec()) + .await + .unwrap() + .unwrap(); + + assert_eq!("test_key2".as_bytes(), kv.key); + assert_eq!("test_val2".as_bytes(), kv.value); + + let may_kv = in_mem.get("test_key3".as_bytes().to_vec()).await.unwrap(); + + assert!(may_kv.is_none()); + } + + #[tokio::test] + async fn test_batch_get() { + let mut in_mem = Arc::new(MemStore::new()) as KvStoreRef; + + put_stats_to_store(&mut in_mem).await; + + let keys = vec![ + "test_key1".as_bytes().to_vec(), + "test_key1".as_bytes().to_vec(), + "test_key2".as_bytes().to_vec(), + ]; + + let kvs = in_mem.batch_get(keys).await.unwrap(); + + assert_eq!(3, kvs.len()); + + assert_eq!("test_key1".as_bytes(), kvs[0].key); + assert_eq!("test_key1".as_bytes(), kvs[1].key); + assert_eq!("test_key2".as_bytes(), kvs[2].key); + + assert_eq!("test_val1".as_bytes(), kvs[0].value); + assert_eq!("test_val1".as_bytes(), kvs[1].value); + assert_eq!("test_val2".as_bytes(), kvs[2].value); + } + + async fn put_stats_to_store(store: &mut KvStoreRef) { + store + .put(PutRequest { + key: "test_key1".as_bytes().to_vec(), + value: "test_val1".as_bytes().to_vec(), + ..Default::default() + }) + .await + .unwrap(); + + store + .put(PutRequest { + key: "test_key2".as_bytes().to_vec(), + value: "test_val2".as_bytes().to_vec(), + ..Default::default() + }) + .await + .unwrap(); + } +}