From e19b63f4f54875bfeb080646356c1dbbf922f085 Mon Sep 17 00:00:00 2001 From: Jiachun Feng Date: Thu, 3 Nov 2022 18:33:29 +0800 Subject: [PATCH] chore: meta mock test (#379) * chore: meta mock * chore: refacor datanode selector * chore: create route mock test * chore: add mock module * chore: memory store for test * chore: mock meta for test * chore: ensure memorysotre has the same behavious with etcd * chore: replace tokio lock to parking_lot --- Cargo.lock | 7 + src/api/greptime/v1/meta/store.proto | 1 + src/meta-client/Cargo.toml | 4 + src/meta-client/examples/meta_client.rs | 2 +- src/meta-client/src/client.rs | 347 ++++++++++++++++++++++- src/meta-client/src/lib.rs | 4 +- src/meta-client/src/mocks.rs | 47 +++ src/meta-srv/Cargo.toml | 6 + src/meta-srv/src/bootstrap.rs | 2 +- src/meta-srv/src/handler.rs | 4 +- src/meta-srv/src/lib.rs | 4 + src/meta-srv/src/metasrv.rs | 53 +++- src/meta-srv/src/mocks.rs | 89 ++++++ src/meta-srv/src/selector.rs | 13 + src/meta-srv/src/selector/lease_based.rs | 39 +++ src/meta-srv/src/service/heartbeat.rs | 10 +- src/meta-srv/src/service/router.rs | 77 ++--- src/meta-srv/src/service/store.rs | 14 +- src/meta-srv/src/service/store/etcd.rs | 26 +- src/meta-srv/src/service/store/memory.rs | 219 ++++++++++++++ 20 files changed, 870 insertions(+), 98 deletions(-) create mode 100644 src/meta-client/src/mocks.rs create mode 100644 src/meta-srv/src/mocks.rs create mode 100644 src/meta-srv/src/selector.rs create mode 100644 src/meta-srv/src/selector/lease_based.rs create mode 100644 src/meta-srv/src/service/store/memory.rs diff --git a/Cargo.lock b/Cargo.lock index ba78692f7f..e37712f389 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2702,15 +2702,19 @@ name = "meta-client" version = "0.1.0" dependencies = [ "api", + "async-trait", "common-error", "common-grpc", "common-telemetry", "etcd-client", + "futures", + "meta-srv", "rand 0.8.5", "snafu", "tokio", "tokio-stream", "tonic", + "tower", "tracing", "tracing-subscriber", ] @@ -2723,6 +2727,7 @@ dependencies = [ "async-trait", "common-base", "common-error", + "common-grpc", "common-runtime", "common-telemetry", "common-time", @@ -2731,6 +2736,7 @@ dependencies = [ "h2", "http-body", "lazy_static", + "parking_lot", "regex", "serde", "serde_json", @@ -2738,6 +2744,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tower", "tracing", "tracing-subscriber", "url", diff --git a/src/api/greptime/v1/meta/store.proto b/src/api/greptime/v1/meta/store.proto index c710c2928f..3931cc1af1 100644 --- a/src/api/greptime/v1/meta/store.proto +++ b/src/api/greptime/v1/meta/store.proto @@ -47,6 +47,7 @@ message RangeResponse { // kvs is the list of key-value pairs matched by the range request. repeated KeyValue kvs = 2; + // more indicates if there are more keys to return in the requested range. bool more = 3; } diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 2bb57d0111..c3f51f32be 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] api = { path = "../api" } +async-trait = "0.1" common-error = { path = "../common/error" } common-grpc = { path = "../common/grpc" } common-telemetry = { path = "../common/telemetry" } @@ -16,5 +17,8 @@ tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" [dev-dependencies] +futures = "0.3" +meta-srv = { path = "../meta-srv", features = ["mock"]} +tower = "0.4" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index 302950bcdf..c2935b926c 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -74,7 +74,7 @@ async fn run() { value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()], }; - let table_name = TableName::new("test_catlog", "test_schema", "test_table"); + let table_name = TableName::new("test_catalog", "test_schema", "test_table"); let create_req = CreateRequest::new(table_name) .add_partition(p1) diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index f9e697dc7d..13d46f3de8 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -301,7 +301,18 @@ impl MetaClient { #[cfg(test)] mod tests { + use std::sync::Arc; + + use api::v1::meta::HeartbeatRequest; + use api::v1::meta::Peer; + use meta_srv::metasrv::Context; + use meta_srv::selector::Namespace; + use meta_srv::selector::Selector; + use meta_srv::Result as MetaResult; + use super::*; + use crate::mocks; + use crate::rpc::Partition; use crate::rpc::TableName; #[tokio::test] @@ -385,7 +396,341 @@ mod tests { #[should_panic] #[test] - fn test_enable_at_least_one_client() { + fn test_failed_when_start_nothing() { let _ = MetaClientBuilder::new(0, 0).build(); } + + #[tokio::test] + async fn test_ask_leader() { + let client = mocks::mock_client_with_noopstore().await; + let res = client.ask_leader().await; + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_heartbeat() { + let client = mocks::mock_client_with_noopstore().await; + let (sender, mut receiver) = client.heartbeat().await.unwrap(); + // send heartbeats + tokio::spawn(async move { + for _ in 0..5 { + let req = HeartbeatRequest { + peer: Some(Peer { + id: 1, + addr: "meta_client_peer".to_string(), + }), + ..Default::default() + }; + sender.send(req).await.unwrap(); + } + }); + + tokio::spawn(async move { + while let Some(res) = receiver.message().await.unwrap() { + assert_eq!(1000, res.header.unwrap().cluster_id); + } + }); + } + + struct MockSelector; + + #[async_trait::async_trait] + impl Selector for MockSelector { + type Context = Context; + type Output = Vec; + + async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> MetaResult { + Ok(vec![ + Peer { + id: 0, + addr: "peer0".to_string(), + }, + Peer { + id: 1, + addr: "peer1".to_string(), + }, + Peer { + id: 2, + addr: "peer2".to_string(), + }, + ]) + } + } + + #[tokio::test] + async fn test_create_route() { + let selector = Arc::new(MockSelector {}); + let client = mocks::mock_client_with_selector(selector).await; + let p1 = Partition { + column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()], + value_list: vec![b"k1".to_vec(), b"k2".to_vec()], + }; + let p2 = Partition { + column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()], + value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()], + }; + let table_name = TableName::new("test_catalog", "test_schema", "test_table"); + let req = CreateRequest::new(table_name) + .add_partition(p1) + .add_partition(p2); + + let res = client.create_route(req).await.unwrap(); + + let table_routes = res.table_routes; + assert_eq!(1, table_routes.len()); + let table_route = table_routes.get(0).unwrap(); + let table = &table_route.table; + let table_name = &table.table_name; + assert_eq!("test_catalog", table_name.catalog_name); + assert_eq!("test_schema", table_name.schema_name); + assert_eq!("test_table", table_name.table_name); + + let region_routes = &table_route.region_routes; + assert_eq!(2, region_routes.len()); + let r0 = region_routes.get(0).unwrap(); + let r1 = region_routes.get(1).unwrap(); + assert_eq!(0, r0.region.as_ref().unwrap().id); + assert_eq!(1, r1.region.as_ref().unwrap().id); + assert_eq!("peer0", r0.leader_peer.as_ref().unwrap().addr); + assert_eq!("peer1", r1.leader_peer.as_ref().unwrap().addr); + } + + async fn gen_data(client: &MetaClient) { + for i in 0..10 { + let req = PutRequest::new() + .with_key(format!("{}-{}", "key", i).into_bytes()) + .with_value(format!("{}-{}", "value", i).into_bytes()) + .with_prev_kv(); + let res = client.put(req).await; + assert!(res.is_ok()); + } + } + + #[tokio::test] + async fn test_range_get() { + let client = mocks::mock_client_with_memstore().await; + + gen_data(&client).await; + + let req = RangeRequest::new().with_key(b"key-0".to_vec()); + let res = client.range(req).await; + let mut kvs = res.unwrap().take_kvs(); + assert_eq!(1, kvs.len()); + let mut kv = kvs.pop().unwrap(); + assert_eq!(b"key-0".to_vec(), kv.take_key()); + assert_eq!(b"value-0".to_vec(), kv.take_value()); + } + + #[tokio::test] + async fn test_range_get_prefix() { + let client = mocks::mock_client_with_memstore().await; + + gen_data(&client).await; + + let req = RangeRequest::new().with_prefix(b"key-".to_vec()); + let res = client.range(req).await; + let kvs = res.unwrap().take_kvs(); + assert_eq!(10, kvs.len()); + for (i, mut kv) in kvs.into_iter().enumerate() { + assert_eq!(format!("{}-{}", "key", i).into_bytes(), kv.take_key()); + assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value()); + } + } + + #[tokio::test] + async fn test_range() { + let client = mocks::mock_client_with_memstore().await; + + gen_data(&client).await; + + let req = RangeRequest::new().with_range(b"key-5".to_vec(), b"key-8".to_vec()); + let res = client.range(req).await; + let kvs = res.unwrap().take_kvs(); + assert_eq!(3, kvs.len()); + for (i, mut kv) in kvs.into_iter().enumerate() { + assert_eq!(format!("{}-{}", "key", i + 5).into_bytes(), kv.take_key()); + assert_eq!( + format!("{}-{}", "value", i + 5).into_bytes(), + kv.take_value() + ); + } + } + + #[tokio::test] + async fn test_range_keys_only() { + let client = mocks::mock_client_with_memstore().await; + + gen_data(&client).await; + + let req = RangeRequest::new() + .with_range(b"key-5".to_vec(), b"key-8".to_vec()) + .with_keys_only(); + let res = client.range(req).await; + let kvs = res.unwrap().take_kvs(); + assert_eq!(3, kvs.len()); + for (i, mut kv) in kvs.into_iter().enumerate() { + assert_eq!(format!("{}-{}", "key", i + 5).into_bytes(), kv.take_key()); + assert!(kv.take_value().is_empty()); + } + } + + #[tokio::test] + async fn test_put() { + let client = mocks::mock_client_with_memstore().await; + let req = PutRequest::new() + .with_key(b"key".to_vec()) + .with_value(b"value".to_vec()); + let res = client.put(req).await; + assert!(res.unwrap().take_prev_kv().is_none()); + } + + #[tokio::test] + async fn test_put_with_prev_kv() { + let client = mocks::mock_client_with_memstore().await; + let req = PutRequest::new() + .with_key(b"key".to_vec()) + .with_value(b"value".to_vec()) + .with_prev_kv(); + let res = client.put(req).await; + assert!(res.unwrap().take_prev_kv().is_none()); + + let req = PutRequest::new() + .with_key(b"key".to_vec()) + .with_value(b"value1".to_vec()) + .with_prev_kv(); + let res = client.put(req).await; + let mut kv = res.unwrap().take_prev_kv().unwrap(); + assert_eq!(b"key".to_vec(), kv.take_key()); + assert_eq!(b"value".to_vec(), kv.take_value()); + } + + #[tokio::test] + async fn test_batch_put() { + let client = mocks::mock_client_with_memstore().await; + let req = BatchPutRequest::new() + .add_kv(b"key".to_vec(), b"value".to_vec()) + .add_kv(b"key2".to_vec(), b"value2".to_vec()); + let res = client.batch_put(req).await; + assert_eq!(0, res.unwrap().take_prev_kvs().len()); + + let req = RangeRequest::new().with_range(b"key".to_vec(), b"key3".to_vec()); + let res = client.range(req).await; + let kvs = res.unwrap().take_kvs(); + assert_eq!(2, kvs.len()); + } + + #[tokio::test] + async fn test_batch_put_with_prev_kv() { + let client = mocks::mock_client_with_memstore().await; + let req = BatchPutRequest::new().add_kv(b"key".to_vec(), b"value".to_vec()); + let res = client.batch_put(req).await; + assert_eq!(0, res.unwrap().take_prev_kvs().len()); + + let req = BatchPutRequest::new() + .add_kv(b"key".to_vec(), b"value-".to_vec()) + .add_kv(b"key2".to_vec(), b"value2-".to_vec()) + .with_prev_kv(); + let res = client.batch_put(req).await; + let mut kvs = res.unwrap().take_prev_kvs(); + assert_eq!(1, kvs.len()); + let mut kv = kvs.pop().unwrap(); + assert_eq!(b"key".to_vec(), kv.take_key()); + assert_eq!(b"value".to_vec(), kv.take_value()); + } + + #[tokio::test] + async fn test_compare_and_put() { + let client = mocks::mock_client_with_memstore().await; + let req = CompareAndPutRequest::new() + .with_key(b"key".to_vec()) + .with_expect(b"expect".to_vec()) + .with_value(b"value".to_vec()); + let res = client.compare_and_put(req).await; + assert!(!res.unwrap().is_success()); + + // empty expect key is not allowed + let req = CompareAndPutRequest::new() + .with_key(b"key".to_vec()) + .with_value(b"value".to_vec()); + let res = client.compare_and_put(req).await; + let mut res = res.unwrap(); + assert!(!res.is_success()); + let mut kv = res.take_prev_kv().unwrap(); + assert_eq!(b"key".to_vec(), kv.take_key()); + assert!(kv.take_value().is_empty()); + + let req = PutRequest::new() + .with_key(b"key".to_vec()) + .with_value(b"value".to_vec()); + let res = client.put(req).await; + assert!(res.is_ok()); + + let req = CompareAndPutRequest::new() + .with_key(b"key".to_vec()) + .with_expect(b"value".to_vec()) + .with_value(b"value2".to_vec()); + let res = client.compare_and_put(req).await; + let mut res = res.unwrap(); + assert!(res.is_success()); + assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value()); + } + + #[tokio::test] + async fn test_delete_with_key() { + let client = mocks::mock_client_with_memstore().await; + + gen_data(&client).await; + + let req = DeleteRangeRequest::new() + .with_key(b"key-0".to_vec()) + .with_prev_kv(); + let res = client.delete_range(req).await; + let mut res = res.unwrap(); + assert_eq!(1, res.deleted()); + let mut kvs = res.take_prev_kvs(); + assert_eq!(1, kvs.len()); + let mut kv = kvs.pop().unwrap(); + assert_eq!(b"value-0".to_vec(), kv.take_value()); + } + + #[tokio::test] + async fn test_delete_with_prefix() { + let client = mocks::mock_client_with_memstore().await; + + gen_data(&client).await; + + let req = DeleteRangeRequest::new() + .with_prefix(b"key-".to_vec()) + .with_prev_kv(); + let res = client.delete_range(req).await; + let mut res = res.unwrap(); + assert_eq!(10, res.deleted()); + let kvs = res.take_prev_kvs(); + assert_eq!(10, kvs.len()); + for (i, mut kv) in kvs.into_iter().enumerate() { + assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value()); + } + } + + #[tokio::test] + async fn test_delete_with_range() { + let client = mocks::mock_client_with_memstore().await; + + gen_data(&client).await; + + let req = DeleteRangeRequest::new() + .with_range(b"key-2".to_vec(), b"key-7".to_vec()) + .with_prev_kv(); + let res = client.delete_range(req).await; + let mut res = res.unwrap(); + assert_eq!(5, res.deleted()); + let kvs = res.take_prev_kvs(); + assert_eq!(5, kvs.len()); + for (i, mut kv) in kvs.into_iter().enumerate() { + assert_eq!( + format!("{}-{}", "value", i + 2).into_bytes(), + kv.take_value() + ); + } + } } diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 85e7ff643a..0d07295c41 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -1,3 +1,5 @@ pub mod client; -mod error; +pub mod error; +#[cfg(test)] +mod mocks; pub mod rpc; diff --git a/src/meta-client/src/mocks.rs b/src/meta-client/src/mocks.rs new file mode 100644 index 0000000000..7ec0f2f271 --- /dev/null +++ b/src/meta-client/src/mocks.rs @@ -0,0 +1,47 @@ +use meta_srv::metasrv::SelectorRef; +use meta_srv::mocks as server_mock; +use meta_srv::mocks::MockInfo; + +use crate::client::MetaClient; +use crate::client::MetaClientBuilder; + +pub async fn mock_client_with_noopstore() -> MetaClient { + let mock_info = server_mock::mock_with_noopstore().await; + mock_client_by(mock_info).await +} + +pub async fn mock_client_with_memstore() -> MetaClient { + let mock_info = server_mock::mock_with_memstore().await; + mock_client_by(mock_info).await +} + +#[allow(dead_code)] +pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient { + let mock_info = server_mock::mock_with_etcdstore(addr).await; + mock_client_by(mock_info).await +} + +pub async fn mock_client_with_selector(selector: SelectorRef) -> MetaClient { + let mock_info = server_mock::mock_with_selector(selector).await; + mock_client_by(mock_info).await +} + +pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient { + let MockInfo { + server_addr, + channel_manager, + } = mock_info; + + let id = (1000u64, 2000u64); + let mut meta_client = MetaClientBuilder::new(id.0, id.1) + .enable_heartbeat() + .enable_router() + .enable_store() + .channel_manager(channel_manager) + .build(); + meta_client.start(&[&server_addr]).await.unwrap(); + // required only when the heartbeat_client is enabled + meta_client.ask_leader().await.unwrap(); + + meta_client +} diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index f9b8064c3a..cab35cf317 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -3,11 +3,15 @@ name = "meta-srv" version = "0.1.0" edition = "2021" +[features] +mock = [] + [dependencies] api = { path = "../api" } async-trait = "0.1" common-base = { path = "../common/base" } common-error = { path = "../common/error" } +common-grpc = { path = "../common/grpc" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } @@ -16,6 +20,7 @@ futures = "0.3" h2 = "0.3" http-body = "0.4" lazy_static = "1.4" +parking_lot = "0.12" regex = "1.6" serde = "1.0" serde_json = "1.0" @@ -23,6 +28,7 @@ snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.0", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" +tower = "0.4" url = "2.3" [dev-dependencies] diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index e7d0cddbe3..85b9208bcf 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -22,7 +22,7 @@ pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> { })?; let listener = TcpListenerStream::new(listener); - let meta_srv = MetaSrv::new(opts, kv_store).await; + let meta_srv = MetaSrv::new(opts, kv_store, None).await; tonic::transport::Server::builder() .accept_http1(true) // for admin services diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index e7439d19a1..697ea62c29 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -65,12 +65,12 @@ pub enum Instruction {} pub type Pusher = Sender>; #[derive(Clone, Default)] -pub struct HeartbeatHandlers { +pub struct HeartbeatHandlerGroup { handlers: Arc>>>, pushers: Arc>>, } -impl HeartbeatHandlers { +impl HeartbeatHandlerGroup { pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) { let mut handlers = self.handlers.write().await; handlers.push(Box::new(handler)); diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 296749906c..f67ce21989 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -1,9 +1,13 @@ +#![feature(btree_drain_filter)] pub mod bootstrap; pub mod error; pub mod handler; mod keys; pub mod lease; pub mod metasrv; +#[cfg(feature = "mock")] +pub mod mocks; +pub mod selector; pub mod service; mod util; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 206ad16ff7..c7ab3d5454 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -1,9 +1,14 @@ +use std::sync::Arc; + +use api::v1::meta::Peer; use serde::Deserialize; use serde::Serialize; use crate::handler::datanode_lease::DatanodeLeaseHandler; use crate::handler::response_header::ResponseHeaderHandler; -use crate::handler::HeartbeatHandlers; +use crate::handler::HeartbeatHandlerGroup; +use crate::selector::lease_based::LeaseBasedSelector; +use crate::selector::Selector; use crate::service::store::kv::KvStoreRef; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -25,23 +30,38 @@ impl Default for MetaSrvOptions { } } +#[derive(Clone)] +pub struct Context { + pub datanode_lease_secs: i64, + pub kv_store: KvStoreRef, +} + +pub type SelectorRef = Arc>>; + #[derive(Clone)] pub struct MetaSrv { options: MetaSrvOptions, kv_store: KvStoreRef, - heartbeat_handlers: HeartbeatHandlers, + selector: SelectorRef, + handler_group: HeartbeatHandlerGroup, } impl MetaSrv { - pub async fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self { - let heartbeat_handlers = HeartbeatHandlers::default(); - heartbeat_handlers.add_handler(ResponseHeaderHandler).await; - heartbeat_handlers.add_handler(DatanodeLeaseHandler).await; + pub async fn new( + options: MetaSrvOptions, + kv_store: KvStoreRef, + selector: Option, + ) -> Self { + let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {})); + let handler_group = HeartbeatHandlerGroup::default(); + handler_group.add_handler(ResponseHeaderHandler).await; + handler_group.add_handler(DatanodeLeaseHandler).await; Self { options, kv_store, - heartbeat_handlers, + selector, + handler_group, } } @@ -56,7 +76,22 @@ impl MetaSrv { } #[inline] - pub fn heartbeat_handlers(&self) -> HeartbeatHandlers { - self.heartbeat_handlers.clone() + pub fn selector(&self) -> SelectorRef { + self.selector.clone() + } + + #[inline] + pub fn handler_group(&self) -> HeartbeatHandlerGroup { + self.handler_group.clone() + } + + #[inline] + pub fn new_ctx(&self) -> Context { + let datanode_lease_secs = self.options().datanode_lease_secs; + let kv_store = self.kv_store(); + Context { + datanode_lease_secs, + kv_store, + } } } diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs new file mode 100644 index 0000000000..cc44d210cf --- /dev/null +++ b/src/meta-srv/src/mocks.rs @@ -0,0 +1,89 @@ +use std::sync::Arc; + +use api::v1::meta::heartbeat_server::HeartbeatServer; +use api::v1::meta::router_server::RouterServer; +use api::v1::meta::store_server::StoreServer; +use common_grpc::channel_manager::ChannelConfig; +use common_grpc::channel_manager::ChannelManager; +use tower::service_fn; + +use crate::metasrv::MetaSrv; +use crate::metasrv::MetaSrvOptions; +use crate::metasrv::SelectorRef; +use crate::service::store::etcd::EtcdStore; +use crate::service::store::kv::KvStoreRef; +use crate::service::store::memory::MemStore; +use crate::service::store::noop::NoopKvStore; + +pub struct MockInfo { + pub server_addr: String, + pub channel_manager: ChannelManager, +} + +pub async fn mock_with_noopstore() -> MockInfo { + let kv_store = Arc::new(NoopKvStore {}); + mock(Default::default(), kv_store, None).await +} + +pub async fn mock_with_memstore() -> MockInfo { + let kv_store = Arc::new(MemStore::default()); + mock(Default::default(), kv_store, None).await +} + +pub async fn mock_with_etcdstore(addr: &str) -> MockInfo { + let kv_store = EtcdStore::with_endpoints([addr]).await.unwrap(); + mock(Default::default(), kv_store, None).await +} + +pub async fn mock_with_selector(selector: SelectorRef) -> MockInfo { + let kv_store = Arc::new(NoopKvStore {}); + mock(Default::default(), kv_store, Some(selector)).await +} + +pub async fn mock( + opts: MetaSrvOptions, + kv_store: KvStoreRef, + selector: Option, +) -> MockInfo { + let server_addr = opts.server_addr.clone(); + let meta_srv = MetaSrv::new(opts, kv_store, selector).await; + let (client, server) = tokio::io::duplex(1024); + tokio::spawn(async move { + tonic::transport::Server::builder() + .add_service(HeartbeatServer::new(meta_srv.clone())) + .add_service(RouterServer::new(meta_srv.clone())) + .add_service(StoreServer::new(meta_srv.clone())) + .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .await + }); + + let config = ChannelConfig::new(); + let channel_manager = ChannelManager::with_config(config); + + // Move client to an option so we can _move_ the inner value + // on the first attempt to connect. All other attempts will fail. + let mut client = Some(client); + let res = channel_manager.reset_with_connector( + &server_addr, + service_fn(move |_| { + let client = client.take(); + + async move { + if let Some(client) = client { + Ok(client) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Client already taken", + )) + } + } + }), + ); + assert!(res.is_ok()); + + MockInfo { + server_addr, + channel_manager, + } +} diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs new file mode 100644 index 0000000000..1b03960477 --- /dev/null +++ b/src/meta-srv/src/selector.rs @@ -0,0 +1,13 @@ +pub mod lease_based; + +use crate::error::Result; + +pub type Namespace = u64; + +#[async_trait::async_trait] +pub trait Selector: Send + Sync { + type Context; + type Output; + + async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result; +} diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs new file mode 100644 index 0000000000..2065654c81 --- /dev/null +++ b/src/meta-srv/src/selector/lease_based.rs @@ -0,0 +1,39 @@ +use api::v1::meta::Peer; +use common_time::util as time_util; + +use super::Namespace; +use super::Selector; +use crate::error::Result; +use crate::keys::LeaseKey; +use crate::keys::LeaseValue; +use crate::lease; +use crate::metasrv::Context; + +pub struct LeaseBasedSelector; + +#[async_trait::async_trait] +impl Selector for LeaseBasedSelector { + type Context = Context; + type Output = Vec; + + async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result { + // filter out the nodes out lease + let lease_filter = |_: &LeaseKey, v: &LeaseValue| { + time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs + }; + let mut lease_kvs = lease::alive_datanodes(ns, ctx.kv_store.clone(), lease_filter).await?; + // TODO(jiachun): At the moment we are just pushing the latest to the forefront, + // and it is better to use load-based strategies in the future. + lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis)); + + let peers = lease_kvs + .into_iter() + .map(|(k, v)| Peer { + id: k.node_id, + addr: v.node_addr, + }) + .collect::>(); + + Ok(peers) + } +} diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 9c971125cd..462ef69519 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -38,7 +38,7 @@ impl heartbeat_server::Heartbeat for MetaSrv { ) -> GrpcResult { let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); - let handlers = self.heartbeat_handlers(); + let handler_group = self.handler_group(); let ctx = Context { server_addr: self.options().server_addr.clone(), kv_store: self.kv_store(), @@ -56,13 +56,13 @@ impl heartbeat_server::Heartbeat for MetaSrv { peer.id, PUSHER_ID.fetch_add(1, Ordering::Relaxed) ); - handlers.register(&key, tx.clone()).await; + handler_group.register(&key, tx.clone()).await; pusher_key = Some(key); } } tx.send( - handlers + handler_group .handle(req, ctx.clone()) .await .map_err(|e| e.into()), @@ -91,7 +91,7 @@ impl heartbeat_server::Heartbeat for MetaSrv { pusher_key.as_ref().unwrap_or(&"unknow".to_string()) ); if let Some(key) = pusher_key { - let _ = handlers.unregister(&key); + let _ = handler_group.unregister(&key); } }); @@ -147,7 +147,7 @@ mod tests { #[tokio::test] async fn test_ask_leader() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; let req = AskLeaderRequest { header: Some(RequestHeader::new((1, 1))), diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index e169e72c02..abc7e4a03d 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -1,6 +1,5 @@ use api::v1::meta::router_server; use api::v1::meta::CreateRequest; -use api::v1::meta::Peer; use api::v1::meta::Region; use api::v1::meta::RegionRoute; use api::v1::meta::ResponseHeader; @@ -8,91 +7,44 @@ use api::v1::meta::RouteRequest; use api::v1::meta::RouteResponse; use api::v1::meta::Table; use api::v1::meta::TableRoute; -use common_time::util as time_util; use snafu::OptionExt; use tonic::Request; use tonic::Response; -use super::store::kv::KvStoreRef; use super::GrpcResult; use crate::error; use crate::error::Result; -use crate::keys::LeaseKey; -use crate::keys::LeaseValue; -use crate::lease; +use crate::metasrv::Context; use crate::metasrv::MetaSrv; - -#[derive(Clone)] -struct Context { - pub datanode_lease_secs: i64, - pub kv_store: KvStoreRef, -} +use crate::metasrv::SelectorRef; #[async_trait::async_trait] impl router_server::Router for MetaSrv { async fn route(&self, req: Request) -> GrpcResult { let req = req.into_inner(); - let ctx = Context { - datanode_lease_secs: self.options().datanode_lease_secs, - kv_store: self.kv_store(), - }; - let res = handle_route(req, ctx).await?; + let res = handle_route(req).await?; Ok(Response::new(res)) } async fn create(&self, req: Request) -> GrpcResult { let req = req.into_inner(); - let ctx = Context { - datanode_lease_secs: self.options().datanode_lease_secs, - kv_store: self.kv_store(), - }; - let res = handle_create(req, ctx, LeaseBasedSelector::default()).await?; + let ctx = self.new_ctx(); + let selector = self.selector(); + let res = handle_create(req, ctx, selector).await?; Ok(Response::new(res)) } } -#[async_trait::async_trait] -trait DatanodeSelector { - async fn select(&self, id: u64, ctx: &Context) -> Result>; -} - -#[derive(Default)] -struct LeaseBasedSelector; - -#[async_trait::async_trait] -impl DatanodeSelector for LeaseBasedSelector { - async fn select(&self, id: u64, ctx: &Context) -> Result> { - // filter out the nodes out lease - let lease_filter = |_: &LeaseKey, v: &LeaseValue| { - time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs - }; - let mut lease_kvs = lease::alive_datanodes(id, ctx.kv_store.clone(), lease_filter).await?; - // TODO(jiachun): At the moment we are just pushing the latest to the forefront, - // and it is better to use load-based strategies in the future. - lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis)); - - let peers = lease_kvs - .into_iter() - .map(|(k, v)| Peer { - id: k.node_id, - addr: v.node_addr, - }) - .collect::>(); - - Ok(peers) - } -} - -async fn handle_route(_req: RouteRequest, _ctx: Context) -> Result { +async fn handle_route(_req: RouteRequest) -> Result { todo!() } async fn handle_create( req: CreateRequest, ctx: Context, - selector: impl DatanodeSelector, + selector: SelectorRef, ) -> Result { let CreateRequest { header, @@ -145,13 +97,14 @@ mod tests { use super::*; use crate::metasrv::MetaSrvOptions; + use crate::selector::{Namespace, Selector}; use crate::service::store::noop::NoopKvStore; #[should_panic] #[tokio::test] async fn test_handle_route() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; let req = RouteRequest { header: Some(RequestHeader::new((1, 1))), @@ -180,8 +133,11 @@ mod tests { struct MockSelector; #[async_trait::async_trait] - impl DatanodeSelector for MockSelector { - async fn select(&self, _: u64, _: &Context) -> Result> { + impl Selector for MockSelector { + type Context = Context; + type Output = Vec; + + async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result { Ok(vec![ Peer { id: 0, @@ -220,7 +176,8 @@ mod tests { datanode_lease_secs: 10, kv_store, }; - let res = handle_create(req, ctx, MockSelector {}).await.unwrap(); + let selector = Arc::new(MockSelector {}); + let res = handle_create(req, ctx, selector).await.unwrap(); assert_eq!( vec![ diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index a205817880..70c56194a7 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -1,7 +1,7 @@ pub mod etcd; pub mod kv; -#[cfg(test)] -pub(crate) mod noop; +pub mod memory; +pub mod noop; use api::v1::meta::store_server; use api::v1::meta::BatchPutRequest; @@ -79,7 +79,7 @@ mod tests { #[tokio::test] async fn test_range() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; let req = RangeRequest::default(); let res = meta_srv.range(req.into_request()).await; @@ -89,7 +89,7 @@ mod tests { #[tokio::test] async fn test_put() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; let req = PutRequest::default(); let res = meta_srv.put(req.into_request()).await; @@ -99,7 +99,7 @@ mod tests { #[tokio::test] async fn test_batch_put() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; let req = BatchPutRequest::default(); let res = meta_srv.batch_put(req.into_request()).await; @@ -109,7 +109,7 @@ mod tests { #[tokio::test] async fn test_compare_and_put() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; let req = CompareAndPutRequest::default(); let res = meta_srv.compare_and_put(req.into_request()).await; @@ -119,7 +119,7 @@ mod tests { #[tokio::test] async fn test_delete_range() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await; let req = DeleteRangeRequest::default(); let res = meta_srv.delete_range(req.into_request()).await; diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index 66c359c68e..7753c01c57 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -159,29 +159,33 @@ impl KvStore for EtcdStore { .await .context(error::EtcdFailedSnafu)?; let success = txn_res.succeeded(); - let prev_kv; let op_res = txn_res .op_responses() .pop() .context(error::InvalidTxnResultSnafu { err_msg: "empty response", })?; - if success { - prev_kv = Some(KeyValue { key, value: expect }); + let prev_kv = if success { + Some(KeyValue { key, value: expect }) } else { match op_res { TxnOpResponse::Get(get_res) => { - ensure!( - get_res.count() == 1, - error::InvalidTxnResultSnafu { - err_msg: format!("expect 1 response, actual {}", get_res.count()) - } - ); - prev_kv = Some(KeyValue::from(KvPair::new(&get_res.kvs()[0]))); + if get_res.count() == 0 { + // do not exists + Some(KeyValue { key, value: vec![] }) + } else { + ensure!( + get_res.count() == 1, + error::InvalidTxnResultSnafu { + err_msg: format!("expect 1 response, actual {}", get_res.count()) + } + ); + Some(KeyValue::from(KvPair::new(&get_res.kvs()[0]))) + } } _ => unreachable!(), // never get here } - } + }; let header = Some(ResponseHeader::success(cluster_id)); Ok(CompareAndPutResponse { diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs new file mode 100644 index 0000000000..52f9b96cdc --- /dev/null +++ b/src/meta-srv/src/service/store/memory.rs @@ -0,0 +1,219 @@ +use std::cmp::Ordering; +use std::collections::BTreeMap; +use std::ops::Range; +use std::sync::Arc; + +use api::v1::meta::BatchPutRequest; +use api::v1::meta::BatchPutResponse; +use api::v1::meta::CompareAndPutRequest; +use api::v1::meta::CompareAndPutResponse; +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::DeleteRangeResponse; +use api::v1::meta::KeyValue; +use api::v1::meta::PutRequest; +use api::v1::meta::PutResponse; +use api::v1::meta::RangeRequest; +use api::v1::meta::RangeResponse; +use api::v1::meta::ResponseHeader; +use parking_lot::RwLock; + +use super::kv::KvStore; +use crate::error::Result; + +/// Only for mock test +#[derive(Clone)] +pub struct MemStore { + inner: Arc, Vec>>>, +} + +impl Default for MemStore { + fn default() -> Self { + Self::new() + } +} + +impl MemStore { + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(Default::default())), + } + } +} + +#[async_trait::async_trait] +impl KvStore for MemStore { + async fn range(&self, req: RangeRequest) -> Result { + let RangeRequest { + header, + key, + range_end, + limit, + keys_only, + } = req; + + let memory = self.inner.read(); + + let mut kvs = if range_end.is_empty() { + memory.get_key_value(&key).map_or(vec![], |(k, v)| { + vec![KeyValue { + key: k.clone(), + value: if keys_only { vec![] } else { v.clone() }, + }] + }) + } else { + let range = Range { + start: key, + end: range_end, + }; + memory + .range(range) + .map(|kv| KeyValue { + key: kv.0.clone(), + value: if keys_only { vec![] } else { kv.1.clone() }, + }) + .collect::>() + }; + + let more = if limit > 0 { + kvs.truncate(limit as usize); + true + } else { + false + }; + + let cluster_id = header.map_or(0, |h| h.cluster_id); + let header = Some(ResponseHeader::success(cluster_id)); + Ok(RangeResponse { header, kvs, more }) + } + + async fn put(&self, req: PutRequest) -> Result { + let PutRequest { + header, + key, + value, + prev_kv, + } = req; + + let mut memory = self.inner.write(); + let prev_value = memory.insert(key.clone(), value); + let prev_kv = if prev_kv { + prev_value.map(|value| KeyValue { key, value }) + } else { + None + }; + + let cluster_id = header.map_or(0, |h| h.cluster_id); + let header = Some(ResponseHeader::success(cluster_id)); + Ok(PutResponse { header, prev_kv }) + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + let BatchPutRequest { + header, + kvs, + prev_kv, + } = req; + + let mut memory = self.inner.write(); + let prev_kvs = if prev_kv { + kvs.into_iter() + .map(|kv| (kv.key.clone(), memory.insert(kv.key, kv.value))) + .filter(|(_, v)| v.is_some()) + .map(|(key, value)| KeyValue { + key, + value: value.unwrap(), + }) + .collect() + } else { + for kv in kvs.into_iter() { + memory.insert(kv.key, kv.value); + } + vec![] + }; + + let cluster_id = header.map_or(0, |h| h.cluster_id); + let header = Some(ResponseHeader::success(cluster_id)); + Ok(BatchPutResponse { header, prev_kvs }) + } + + async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { + let CompareAndPutRequest { + header, + key, + expect, + value, + } = req; + + let mut memory = self.inner.write(); + let (success, prev_kv) = if expect.is_empty() { + ( + false, + Some(KeyValue { + key: key.clone(), + value: vec![], + }), + ) + } else { + let prev_val = memory.get(&key); + let success = prev_val + .map(|v| expect.cmp(v) == Ordering::Equal) + .unwrap_or(false); + ( + success, + prev_val.map(|v| KeyValue { + key: key.clone(), + value: v.clone(), + }), + ) + }; + + if success { + memory.insert(key, value); + } + + let cluster_id = header.map_or(0, |h| h.cluster_id); + let header = Some(ResponseHeader::success(cluster_id)); + Ok(CompareAndPutResponse { + header, + success, + prev_kv, + }) + } + + async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + let DeleteRangeRequest { + header, + key, + range_end, + prev_kv, + } = req; + + let mut memory = self.inner.write(); + + let prev_kvs = if range_end.is_empty() { + let prev_val = memory.remove(&key); + prev_val.map_or(vec![], |value| vec![KeyValue { key, value }]) + } else { + let range = Range { + start: key, + end: range_end, + }; + memory + .drain_filter(|key, _| range.contains(key)) + .map(|(key, value)| KeyValue { key, value }) + .collect::>() + }; + + let cluster_id = header.map_or(0, |h| h.cluster_id); + let header = Some(ResponseHeader::success(cluster_id)); + Ok(DeleteRangeResponse { + header, + deleted: prev_kvs.len() as i64, + prev_kvs: if prev_kv { + prev_kvs + } else { + Default::default() + }, + }) + } +}