chore: meta mock test (#379)

* chore: meta mock

* chore: refactor datanode selector

* chore: create route mock test

* chore: add mock module

* chore: memory store for test

* chore: mock meta for test

* chore: ensure memory store has the same behaviour as etcd

* chore: replace tokio lock with parking_lot
Author: Jiachun Feng
Date: 2022-11-03 18:33:29 +08:00
Committed by: GitHub
Parent: 750310c648
Commit: e19b63f4f5
20 changed files with 870 additions and 98 deletions
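In short: meta-client tests can now run against an in-process meta-srv instead of a real etcd cluster. A minimal sketch of the intended usage, pieced together from the helpers added below (the test name and key are illustrative; PutRequest and RangeRequest are assumed to live in crate::rpc alongside the other request types shown in the diff):

use crate::mocks;
use crate::rpc::{PutRequest, RangeRequest};

#[tokio::test]
async fn put_then_get_against_mock_meta() {
    // An in-process meta-srv backed by the new MemStore, reached by a
    // MetaClient over an in-memory channel (no TCP port, no etcd).
    let client = mocks::mock_client_with_memstore().await;

    let put = PutRequest::new()
        .with_key(b"greeting".to_vec())
        .with_value(b"hello".to_vec());
    assert!(client.put(put).await.is_ok());

    let range = RangeRequest::new().with_key(b"greeting".to_vec());
    let mut kvs = client.range(range).await.unwrap().take_kvs();
    assert_eq!(b"hello".to_vec(), kvs.pop().unwrap().take_value());
}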

Cargo.lock (generated)

@@ -2702,15 +2702,19 @@ name = "meta-client"
version = "0.1.0"
dependencies = [
"api",
"async-trait",
"common-error",
"common-grpc",
"common-telemetry",
"etcd-client",
"futures",
"meta-srv",
"rand 0.8.5",
"snafu",
"tokio",
"tokio-stream",
"tonic",
"tower",
"tracing",
"tracing-subscriber",
]
@@ -2723,6 +2727,7 @@ dependencies = [
"async-trait",
"common-base",
"common-error",
"common-grpc",
"common-runtime",
"common-telemetry",
"common-time",
@@ -2731,6 +2736,7 @@ dependencies = [
"h2",
"http-body",
"lazy_static",
"parking_lot",
"regex",
"serde",
"serde_json",
@@ -2738,6 +2744,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tonic",
"tower",
"tracing",
"tracing-subscriber",
"url",


@@ -47,6 +47,7 @@ message RangeResponse {
// kvs is the list of key-value pairs matched by the range request.
repeated KeyValue kvs = 2;
// more indicates if there are more keys to return in the requested range.
bool more = 3;
}
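A hedged illustration of the new flag (not part of this commit): the MemStore added later in this change sets more whenever a limit was applied to the range, so a caller can detect a truncated scan roughly like this, assuming a store already holding keys key-0 through key-9:

use api::v1::meta::RangeRequest;

let res = store
    .range(RangeRequest {
        key: b"key-0".to_vec(),
        range_end: b"key-9".to_vec(),
        limit: 2, // return at most two kvs
        ..Default::default()
    })
    .await
    .unwrap();
assert_eq!(2, res.kvs.len());
assert!(res.more); // the scan was cut short by the limit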


@@ -5,6 +5,7 @@ edition = "2021"
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-telemetry = { path = "../common/telemetry" }
@@ -16,5 +17,8 @@ tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
[dev-dependencies]
futures = "0.3"
meta-srv = { path = "../meta-srv", features = ["mock"]}
tower = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }


@@ -74,7 +74,7 @@ async fn run() {
value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()],
};
let table_name = TableName::new("test_catlog", "test_schema", "test_table");
let table_name = TableName::new("test_catalog", "test_schema", "test_table");
let create_req = CreateRequest::new(table_name)
.add_partition(p1)


@@ -301,7 +301,18 @@ impl MetaClient {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::HeartbeatRequest;
use api::v1::meta::Peer;
use meta_srv::metasrv::Context;
use meta_srv::selector::Namespace;
use meta_srv::selector::Selector;
use meta_srv::Result as MetaResult;
use super::*;
use crate::mocks;
use crate::rpc::Partition;
use crate::rpc::TableName;
#[tokio::test]
@@ -385,7 +396,341 @@ mod tests {
#[should_panic]
#[test]
fn test_enable_at_least_one_client() {
fn test_failed_when_start_nothing() {
let _ = MetaClientBuilder::new(0, 0).build();
}
#[tokio::test]
async fn test_ask_leader() {
let client = mocks::mock_client_with_noopstore().await;
let res = client.ask_leader().await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_heartbeat() {
let client = mocks::mock_client_with_noopstore().await;
let (sender, mut receiver) = client.heartbeat().await.unwrap();
// send heartbeats
tokio::spawn(async move {
for _ in 0..5 {
let req = HeartbeatRequest {
peer: Some(Peer {
id: 1,
addr: "meta_client_peer".to_string(),
}),
..Default::default()
};
sender.send(req).await.unwrap();
}
});
tokio::spawn(async move {
while let Some(res) = receiver.message().await.unwrap() {
assert_eq!(1000, res.header.unwrap().cluster_id);
}
});
}
struct MockSelector;
#[async_trait::async_trait]
impl Selector for MockSelector {
type Context = Context;
type Output = Vec<Peer>;
async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> MetaResult<Self::Output> {
Ok(vec![
Peer {
id: 0,
addr: "peer0".to_string(),
},
Peer {
id: 1,
addr: "peer1".to_string(),
},
Peer {
id: 2,
addr: "peer2".to_string(),
},
])
}
}
#[tokio::test]
async fn test_create_route() {
let selector = Arc::new(MockSelector {});
let client = mocks::mock_client_with_selector(selector).await;
let p1 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"k1".to_vec(), b"k2".to_vec()],
};
let p2 = Partition {
column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()],
value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()],
};
let table_name = TableName::new("test_catalog", "test_schema", "test_table");
let req = CreateRequest::new(table_name)
.add_partition(p1)
.add_partition(p2);
let res = client.create_route(req).await.unwrap();
let table_routes = res.table_routes;
assert_eq!(1, table_routes.len());
let table_route = table_routes.get(0).unwrap();
let table = &table_route.table;
let table_name = &table.table_name;
assert_eq!("test_catalog", table_name.catalog_name);
assert_eq!("test_schema", table_name.schema_name);
assert_eq!("test_table", table_name.table_name);
let region_routes = &table_route.region_routes;
assert_eq!(2, region_routes.len());
let r0 = region_routes.get(0).unwrap();
let r1 = region_routes.get(1).unwrap();
assert_eq!(0, r0.region.as_ref().unwrap().id);
assert_eq!(1, r1.region.as_ref().unwrap().id);
assert_eq!("peer0", r0.leader_peer.as_ref().unwrap().addr);
assert_eq!("peer1", r1.leader_peer.as_ref().unwrap().addr);
}
async fn gen_data(client: &MetaClient) {
for i in 0..10 {
let req = PutRequest::new()
.with_key(format!("{}-{}", "key", i).into_bytes())
.with_value(format!("{}-{}", "value", i).into_bytes())
.with_prev_kv();
let res = client.put(req).await;
assert!(res.is_ok());
}
}
#[tokio::test]
async fn test_range_get() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let req = RangeRequest::new().with_key(b"key-0".to_vec());
let res = client.range(req).await;
let mut kvs = res.unwrap().take_kvs();
assert_eq!(1, kvs.len());
let mut kv = kvs.pop().unwrap();
assert_eq!(b"key-0".to_vec(), kv.take_key());
assert_eq!(b"value-0".to_vec(), kv.take_value());
}
#[tokio::test]
async fn test_range_get_prefix() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let req = RangeRequest::new().with_prefix(b"key-".to_vec());
let res = client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(10, kvs.len());
for (i, mut kv) in kvs.into_iter().enumerate() {
assert_eq!(format!("{}-{}", "key", i).into_bytes(), kv.take_key());
assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
}
}
#[tokio::test]
async fn test_range() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let req = RangeRequest::new().with_range(b"key-5".to_vec(), b"key-8".to_vec());
let res = client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(3, kvs.len());
for (i, mut kv) in kvs.into_iter().enumerate() {
assert_eq!(format!("{}-{}", "key", i + 5).into_bytes(), kv.take_key());
assert_eq!(
format!("{}-{}", "value", i + 5).into_bytes(),
kv.take_value()
);
}
}
#[tokio::test]
async fn test_range_keys_only() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let req = RangeRequest::new()
.with_range(b"key-5".to_vec(), b"key-8".to_vec())
.with_keys_only();
let res = client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(3, kvs.len());
for (i, mut kv) in kvs.into_iter().enumerate() {
assert_eq!(format!("{}-{}", "key", i + 5).into_bytes(), kv.take_key());
assert!(kv.take_value().is_empty());
}
}
#[tokio::test]
async fn test_put() {
let client = mocks::mock_client_with_memstore().await;
let req = PutRequest::new()
.with_key(b"key".to_vec())
.with_value(b"value".to_vec());
let res = client.put(req).await;
assert!(res.unwrap().take_prev_kv().is_none());
}
#[tokio::test]
async fn test_put_with_prev_kv() {
let client = mocks::mock_client_with_memstore().await;
let req = PutRequest::new()
.with_key(b"key".to_vec())
.with_value(b"value".to_vec())
.with_prev_kv();
let res = client.put(req).await;
assert!(res.unwrap().take_prev_kv().is_none());
let req = PutRequest::new()
.with_key(b"key".to_vec())
.with_value(b"value1".to_vec())
.with_prev_kv();
let res = client.put(req).await;
let mut kv = res.unwrap().take_prev_kv().unwrap();
assert_eq!(b"key".to_vec(), kv.take_key());
assert_eq!(b"value".to_vec(), kv.take_value());
}
#[tokio::test]
async fn test_batch_put() {
let client = mocks::mock_client_with_memstore().await;
let req = BatchPutRequest::new()
.add_kv(b"key".to_vec(), b"value".to_vec())
.add_kv(b"key2".to_vec(), b"value2".to_vec());
let res = client.batch_put(req).await;
assert_eq!(0, res.unwrap().take_prev_kvs().len());
let req = RangeRequest::new().with_range(b"key".to_vec(), b"key3".to_vec());
let res = client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(2, kvs.len());
}
#[tokio::test]
async fn test_batch_put_with_prev_kv() {
let client = mocks::mock_client_with_memstore().await;
let req = BatchPutRequest::new().add_kv(b"key".to_vec(), b"value".to_vec());
let res = client.batch_put(req).await;
assert_eq!(0, res.unwrap().take_prev_kvs().len());
let req = BatchPutRequest::new()
.add_kv(b"key".to_vec(), b"value-".to_vec())
.add_kv(b"key2".to_vec(), b"value2-".to_vec())
.with_prev_kv();
let res = client.batch_put(req).await;
let mut kvs = res.unwrap().take_prev_kvs();
assert_eq!(1, kvs.len());
let mut kv = kvs.pop().unwrap();
assert_eq!(b"key".to_vec(), kv.take_key());
assert_eq!(b"value".to_vec(), kv.take_value());
}
#[tokio::test]
async fn test_compare_and_put() {
let client = mocks::mock_client_with_memstore().await;
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_expect(b"expect".to_vec())
.with_value(b"value".to_vec());
let res = client.compare_and_put(req).await;
assert!(!res.unwrap().is_success());
// a compare-and-put with an empty expect value is not allowed
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_value(b"value".to_vec());
let res = client.compare_and_put(req).await;
let mut res = res.unwrap();
assert!(!res.is_success());
let mut kv = res.take_prev_kv().unwrap();
assert_eq!(b"key".to_vec(), kv.take_key());
assert!(kv.take_value().is_empty());
let req = PutRequest::new()
.with_key(b"key".to_vec())
.with_value(b"value".to_vec());
let res = client.put(req).await;
assert!(res.is_ok());
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_expect(b"value".to_vec())
.with_value(b"value2".to_vec());
let res = client.compare_and_put(req).await;
let mut res = res.unwrap();
assert!(res.is_success());
assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
}
#[tokio::test]
async fn test_delete_with_key() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let req = DeleteRangeRequest::new()
.with_key(b"key-0".to_vec())
.with_prev_kv();
let res = client.delete_range(req).await;
let mut res = res.unwrap();
assert_eq!(1, res.deleted());
let mut kvs = res.take_prev_kvs();
assert_eq!(1, kvs.len());
let mut kv = kvs.pop().unwrap();
assert_eq!(b"value-0".to_vec(), kv.take_value());
}
#[tokio::test]
async fn test_delete_with_prefix() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let req = DeleteRangeRequest::new()
.with_prefix(b"key-".to_vec())
.with_prev_kv();
let res = client.delete_range(req).await;
let mut res = res.unwrap();
assert_eq!(10, res.deleted());
let kvs = res.take_prev_kvs();
assert_eq!(10, kvs.len());
for (i, mut kv) in kvs.into_iter().enumerate() {
assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
}
}
#[tokio::test]
async fn test_delete_with_range() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let req = DeleteRangeRequest::new()
.with_range(b"key-2".to_vec(), b"key-7".to_vec())
.with_prev_kv();
let res = client.delete_range(req).await;
let mut res = res.unwrap();
assert_eq!(5, res.deleted());
let kvs = res.take_prev_kvs();
assert_eq!(5, kvs.len());
for (i, mut kv) in kvs.into_iter().enumerate() {
assert_eq!(
format!("{}-{}", "value", i + 2).into_bytes(),
kv.take_value()
);
}
}
}


@@ -1,3 +1,5 @@
pub mod client;
mod error;
pub mod error;
#[cfg(test)]
mod mocks;
pub mod rpc;


@@ -0,0 +1,47 @@
use meta_srv::metasrv::SelectorRef;
use meta_srv::mocks as server_mock;
use meta_srv::mocks::MockInfo;
use crate::client::MetaClient;
use crate::client::MetaClientBuilder;
pub async fn mock_client_with_noopstore() -> MetaClient {
let mock_info = server_mock::mock_with_noopstore().await;
mock_client_by(mock_info).await
}
pub async fn mock_client_with_memstore() -> MetaClient {
let mock_info = server_mock::mock_with_memstore().await;
mock_client_by(mock_info).await
}
#[allow(dead_code)]
pub async fn mock_client_with_etcdstore(addr: &str) -> MetaClient {
let mock_info = server_mock::mock_with_etcdstore(addr).await;
mock_client_by(mock_info).await
}
pub async fn mock_client_with_selector(selector: SelectorRef) -> MetaClient {
let mock_info = server_mock::mock_with_selector(selector).await;
mock_client_by(mock_info).await
}
pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient {
let MockInfo {
server_addr,
channel_manager,
} = mock_info;
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::new(id.0, id.1)
.enable_heartbeat()
.enable_router()
.enable_store()
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();
// required only when the heartbeat_client is enabled
meta_client.ask_leader().await.unwrap();
meta_client
}


@@ -3,11 +3,15 @@ name = "meta-srv"
version = "0.1.0"
edition = "2021"
[features]
mock = []
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-grpc = { path = "../common/grpc" }
common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
@@ -16,6 +20,7 @@ futures = "0.3"
h2 = "0.3"
http-body = "0.4"
lazy_static = "1.4"
parking_lot = "0.12"
regex = "1.6"
serde = "1.0"
serde_json = "1.0"
@@ -23,6 +28,7 @@ snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
tower = "0.4"
url = "2.3"
[dev-dependencies]


@@ -22,7 +22,7 @@ pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> {
})?;
let listener = TcpListenerStream::new(listener);
let meta_srv = MetaSrv::new(opts, kv_store).await;
let meta_srv = MetaSrv::new(opts, kv_store, None).await;
tonic::transport::Server::builder()
.accept_http1(true) // for admin services


@@ -65,12 +65,12 @@ pub enum Instruction {}
pub type Pusher = Sender<std::result::Result<HeartbeatResponse, tonic::Status>>;
#[derive(Clone, Default)]
pub struct HeartbeatHandlers {
pub struct HeartbeatHandlerGroup {
handlers: Arc<RwLock<Vec<Box<dyn HeartbeatHandler>>>>,
pushers: Arc<RwLock<BTreeMap<String, Pusher>>>,
}
impl HeartbeatHandlers {
impl HeartbeatHandlerGroup {
pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) {
let mut handlers = self.handlers.write().await;
handlers.push(Box::new(handler));


@@ -1,9 +1,13 @@
#![feature(btree_drain_filter)]
pub mod bootstrap;
pub mod error;
pub mod handler;
mod keys;
pub mod lease;
pub mod metasrv;
#[cfg(feature = "mock")]
pub mod mocks;
pub mod selector;
pub mod service;
mod util;


@@ -1,9 +1,14 @@
use std::sync::Arc;
use api::v1::meta::Peer;
use serde::Deserialize;
use serde::Serialize;
use crate::handler::datanode_lease::DatanodeLeaseHandler;
use crate::handler::response_header::ResponseHeaderHandler;
use crate::handler::HeartbeatHandlers;
use crate::handler::HeartbeatHandlerGroup;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::Selector;
use crate::service::store::kv::KvStoreRef;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
@@ -25,23 +30,38 @@ impl Default for MetaSrvOptions {
}
}
#[derive(Clone)]
pub struct Context {
pub datanode_lease_secs: i64,
pub kv_store: KvStoreRef,
}
pub type SelectorRef = Arc<dyn Selector<Context = Context, Output = Vec<Peer>>>;
#[derive(Clone)]
pub struct MetaSrv {
options: MetaSrvOptions,
kv_store: KvStoreRef,
heartbeat_handlers: HeartbeatHandlers,
selector: SelectorRef,
handler_group: HeartbeatHandlerGroup,
}
impl MetaSrv {
pub async fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self {
let heartbeat_handlers = HeartbeatHandlers::default();
heartbeat_handlers.add_handler(ResponseHeaderHandler).await;
heartbeat_handlers.add_handler(DatanodeLeaseHandler).await;
pub async fn new(
options: MetaSrvOptions,
kv_store: KvStoreRef,
selector: Option<SelectorRef>,
) -> Self {
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {}));
let handler_group = HeartbeatHandlerGroup::default();
handler_group.add_handler(ResponseHeaderHandler).await;
handler_group.add_handler(DatanodeLeaseHandler).await;
Self {
options,
kv_store,
heartbeat_handlers,
selector,
handler_group,
}
}
@@ -56,7 +76,22 @@ impl MetaSrv {
}
#[inline]
pub fn heartbeat_handlers(&self) -> HeartbeatHandlers {
self.heartbeat_handlers.clone()
pub fn selector(&self) -> SelectorRef {
self.selector.clone()
}
#[inline]
pub fn handler_group(&self) -> HeartbeatHandlerGroup {
self.handler_group.clone()
}
#[inline]
pub fn new_ctx(&self) -> Context {
let datanode_lease_secs = self.options().datanode_lease_secs;
let kv_store = self.kv_store();
Context {
datanode_lease_secs,
kv_store,
}
}
}

src/meta-srv/src/mocks.rs (new file)

@@ -0,0 +1,89 @@
use std::sync::Arc;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use common_grpc::channel_manager::ChannelConfig;
use common_grpc::channel_manager::ChannelManager;
use tower::service_fn;
use crate::metasrv::MetaSrv;
use crate::metasrv::MetaSrvOptions;
use crate::metasrv::SelectorRef;
use crate::service::store::etcd::EtcdStore;
use crate::service::store::kv::KvStoreRef;
use crate::service::store::memory::MemStore;
use crate::service::store::noop::NoopKvStore;
pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,
}
pub async fn mock_with_noopstore() -> MockInfo {
let kv_store = Arc::new(NoopKvStore {});
mock(Default::default(), kv_store, None).await
}
pub async fn mock_with_memstore() -> MockInfo {
let kv_store = Arc::new(MemStore::default());
mock(Default::default(), kv_store, None).await
}
pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
let kv_store = EtcdStore::with_endpoints([addr]).await.unwrap();
mock(Default::default(), kv_store, None).await
}
pub async fn mock_with_selector(selector: SelectorRef) -> MockInfo {
let kv_store = Arc::new(NoopKvStore {});
mock(Default::default(), kv_store, Some(selector)).await
}
pub async fn mock(
opts: MetaSrvOptions,
kv_store: KvStoreRef,
selector: Option<SelectorRef>,
) -> MockInfo {
let server_addr = opts.server_addr.clone();
let meta_srv = MetaSrv::new(opts, kv_store, selector).await;
let (client, server) = tokio::io::duplex(1024);
tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::new(meta_srv.clone()))
.add_service(RouterServer::new(meta_srv.clone()))
.add_service(StoreServer::new(meta_srv.clone()))
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});
let config = ChannelConfig::new();
let channel_manager = ChannelManager::with_config(config);
// Move client to an option so we can _move_ the inner value
// on the first attempt to connect. All other attempts will fail.
let mut client = Some(client);
let res = channel_manager.reset_with_connector(
&server_addr,
service_fn(move |_| {
let client = client.take();
async move {
if let Some(client) = client {
Ok(client)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Client already taken",
))
}
}
}),
);
assert!(res.is_ok());
MockInfo {
server_addr,
channel_manager,
}
}


@@ -0,0 +1,13 @@
pub mod lease_based;
use crate::error::Result;
pub type Namespace = u64;
#[async_trait::async_trait]
pub trait Selector: Send + Sync {
type Context;
type Output;
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output>;
}


@@ -0,0 +1,39 @@
use api::v1::meta::Peer;
use common_time::util as time_util;
use super::Namespace;
use super::Selector;
use crate::error::Result;
use crate::keys::LeaseKey;
use crate::keys::LeaseValue;
use crate::lease;
use crate::metasrv::Context;
pub struct LeaseBasedSelector;
#[async_trait::async_trait]
impl Selector for LeaseBasedSelector {
type Context = Context;
type Output = Vec<Peer>;
async fn select(&self, ns: Namespace, ctx: &Self::Context) -> Result<Self::Output> {
// filter out nodes whose lease has expired
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs
};
let mut lease_kvs = lease::alive_datanodes(ns, ctx.kv_store.clone(), lease_filter).await?;
// TODO(jiachun): At the moment we are just pushing the latest to the forefront,
// and it is better to use load-based strategies in the future.
lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis));
let peers = lease_kvs
.into_iter()
.map(|(k, v)| Peer {
id: k.node_id,
addr: v.node_addr,
})
.collect::<Vec<_>>();
Ok(peers)
}
}


@@ -38,7 +38,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
) -> GrpcResult<Self::HeartbeatStream> {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let handlers = self.heartbeat_handlers();
let handler_group = self.handler_group();
let ctx = Context {
server_addr: self.options().server_addr.clone(),
kv_store: self.kv_store(),
@@ -56,13 +56,13 @@ impl heartbeat_server::Heartbeat for MetaSrv {
peer.id,
PUSHER_ID.fetch_add(1, Ordering::Relaxed)
);
handlers.register(&key, tx.clone()).await;
handler_group.register(&key, tx.clone()).await;
pusher_key = Some(key);
}
}
tx.send(
handlers
handler_group
.handle(req, ctx.clone())
.await
.map_err(|e| e.into()),
@@ -91,7 +91,7 @@ impl heartbeat_server::Heartbeat for MetaSrv {
pusher_key.as_ref().unwrap_or(&"unknow".to_string())
);
if let Some(key) = pusher_key {
let _ = handlers.unregister(&key);
let _ = handler_group.unregister(&key);
}
});
@@ -147,7 +147,7 @@ mod tests {
#[tokio::test]
async fn test_ask_leader() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = AskLeaderRequest {
header: Some(RequestHeader::new((1, 1))),


@@ -1,6 +1,5 @@
use api::v1::meta::router_server;
use api::v1::meta::CreateRequest;
use api::v1::meta::Peer;
use api::v1::meta::Region;
use api::v1::meta::RegionRoute;
use api::v1::meta::ResponseHeader;
@@ -8,91 +7,44 @@ use api::v1::meta::RouteRequest;
use api::v1::meta::RouteResponse;
use api::v1::meta::Table;
use api::v1::meta::TableRoute;
use common_time::util as time_util;
use snafu::OptionExt;
use tonic::Request;
use tonic::Response;
use super::store::kv::KvStoreRef;
use super::GrpcResult;
use crate::error;
use crate::error::Result;
use crate::keys::LeaseKey;
use crate::keys::LeaseValue;
use crate::lease;
use crate::metasrv::Context;
use crate::metasrv::MetaSrv;
#[derive(Clone)]
struct Context {
pub datanode_lease_secs: i64,
pub kv_store: KvStoreRef,
}
use crate::metasrv::SelectorRef;
#[async_trait::async_trait]
impl router_server::Router for MetaSrv {
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let ctx = Context {
datanode_lease_secs: self.options().datanode_lease_secs,
kv_store: self.kv_store(),
};
let res = handle_route(req, ctx).await?;
let res = handle_route(req).await?;
Ok(Response::new(res))
}
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let ctx = Context {
datanode_lease_secs: self.options().datanode_lease_secs,
kv_store: self.kv_store(),
};
let res = handle_create(req, ctx, LeaseBasedSelector::default()).await?;
let ctx = self.new_ctx();
let selector = self.selector();
let res = handle_create(req, ctx, selector).await?;
Ok(Response::new(res))
}
}
#[async_trait::async_trait]
trait DatanodeSelector {
async fn select(&self, id: u64, ctx: &Context) -> Result<Vec<Peer>>;
}
#[derive(Default)]
struct LeaseBasedSelector;
#[async_trait::async_trait]
impl DatanodeSelector for LeaseBasedSelector {
async fn select(&self, id: u64, ctx: &Context) -> Result<Vec<Peer>> {
// filter out nodes whose lease has expired
let lease_filter = |_: &LeaseKey, v: &LeaseValue| {
time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs
};
let mut lease_kvs = lease::alive_datanodes(id, ctx.kv_store.clone(), lease_filter).await?;
// TODO(jiachun): At the moment we are just pushing the latest to the forefront,
// and it is better to use load-based strategies in the future.
lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis));
let peers = lease_kvs
.into_iter()
.map(|(k, v)| Peer {
id: k.node_id,
addr: v.node_addr,
})
.collect::<Vec<_>>();
Ok(peers)
}
}
async fn handle_route(_req: RouteRequest, _ctx: Context) -> Result<RouteResponse> {
async fn handle_route(_req: RouteRequest) -> Result<RouteResponse> {
todo!()
}
async fn handle_create(
req: CreateRequest,
ctx: Context,
selector: impl DatanodeSelector,
selector: SelectorRef,
) -> Result<RouteResponse> {
let CreateRequest {
header,
@@ -145,13 +97,14 @@ mod tests {
use super::*;
use crate::metasrv::MetaSrvOptions;
use crate::selector::{Namespace, Selector};
use crate::service::store::noop::NoopKvStore;
#[should_panic]
#[tokio::test]
async fn test_handle_route() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = RouteRequest {
header: Some(RequestHeader::new((1, 1))),
@@ -180,8 +133,11 @@ mod tests {
struct MockSelector;
#[async_trait::async_trait]
impl DatanodeSelector for MockSelector {
async fn select(&self, _: u64, _: &Context) -> Result<Vec<Peer>> {
impl Selector for MockSelector {
type Context = Context;
type Output = Vec<Peer>;
async fn select(&self, _ns: Namespace, _ctx: &Self::Context) -> Result<Self::Output> {
Ok(vec![
Peer {
id: 0,
@@ -220,7 +176,8 @@ mod tests {
datanode_lease_secs: 10,
kv_store,
};
let res = handle_create(req, ctx, MockSelector {}).await.unwrap();
let selector = Arc::new(MockSelector {});
let res = handle_create(req, ctx, selector).await.unwrap();
assert_eq!(
vec![


@@ -1,7 +1,7 @@
pub mod etcd;
pub mod kv;
#[cfg(test)]
pub(crate) mod noop;
pub mod memory;
pub mod noop;
use api::v1::meta::store_server;
use api::v1::meta::BatchPutRequest;
@@ -79,7 +79,7 @@ mod tests {
#[tokio::test]
async fn test_range() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = RangeRequest::default();
let res = meta_srv.range(req.into_request()).await;
@@ -89,7 +89,7 @@ mod tests {
#[tokio::test]
async fn test_put() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = PutRequest::default();
let res = meta_srv.put(req.into_request()).await;
@@ -99,7 +99,7 @@ mod tests {
#[tokio::test]
async fn test_batch_put() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = BatchPutRequest::default();
let res = meta_srv.batch_put(req.into_request()).await;
@@ -109,7 +109,7 @@ mod tests {
#[tokio::test]
async fn test_compare_and_put() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = CompareAndPutRequest::default();
let res = meta_srv.compare_and_put(req.into_request()).await;
@@ -119,7 +119,7 @@ mod tests {
#[tokio::test]
async fn test_delete_range() {
let kv_store = Arc::new(NoopKvStore {});
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await;
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None).await;
let req = DeleteRangeRequest::default();
let res = meta_srv.delete_range(req.into_request()).await;


@@ -159,29 +159,33 @@ impl KvStore for EtcdStore {
.await
.context(error::EtcdFailedSnafu)?;
let success = txn_res.succeeded();
let prev_kv;
let op_res = txn_res
.op_responses()
.pop()
.context(error::InvalidTxnResultSnafu {
err_msg: "empty response",
})?;
if success {
prev_kv = Some(KeyValue { key, value: expect });
let prev_kv = if success {
Some(KeyValue { key, value: expect })
} else {
match op_res {
TxnOpResponse::Get(get_res) => {
ensure!(
get_res.count() == 1,
error::InvalidTxnResultSnafu {
err_msg: format!("expect 1 response, actual {}", get_res.count())
}
);
prev_kv = Some(KeyValue::from(KvPair::new(&get_res.kvs()[0])));
if get_res.count() == 0 {
// the key does not exist
Some(KeyValue { key, value: vec![] })
} else {
ensure!(
get_res.count() == 1,
error::InvalidTxnResultSnafu {
err_msg: format!("expect 1 response, actual {}", get_res.count())
}
);
Some(KeyValue::from(KvPair::new(&get_res.kvs()[0])))
}
}
_ => unreachable!(), // never get here
}
}
};
let header = Some(ResponseHeader::success(cluster_id));
Ok(CompareAndPutResponse {


@@ -0,0 +1,219 @@
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;
use api::v1::meta::BatchPutRequest;
use api::v1::meta::BatchPutResponse;
use api::v1::meta::CompareAndPutRequest;
use api::v1::meta::CompareAndPutResponse;
use api::v1::meta::DeleteRangeRequest;
use api::v1::meta::DeleteRangeResponse;
use api::v1::meta::KeyValue;
use api::v1::meta::PutRequest;
use api::v1::meta::PutResponse;
use api::v1::meta::RangeRequest;
use api::v1::meta::RangeResponse;
use api::v1::meta::ResponseHeader;
use parking_lot::RwLock;
use super::kv::KvStore;
use crate::error::Result;
/// An in-memory kv store, intended for mock tests only
#[derive(Clone)]
pub struct MemStore {
inner: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
}
impl Default for MemStore {
fn default() -> Self {
Self::new()
}
}
impl MemStore {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(Default::default())),
}
}
}
#[async_trait::async_trait]
impl KvStore for MemStore {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let RangeRequest {
header,
key,
range_end,
limit,
keys_only,
} = req;
let memory = self.inner.read();
let mut kvs = if range_end.is_empty() {
memory.get_key_value(&key).map_or(vec![], |(k, v)| {
vec![KeyValue {
key: k.clone(),
value: if keys_only { vec![] } else { v.clone() },
}]
})
} else {
let range = Range {
start: key,
end: range_end,
};
memory
.range(range)
.map(|kv| KeyValue {
key: kv.0.clone(),
value: if keys_only { vec![] } else { kv.1.clone() },
})
.collect::<Vec<_>>()
};
let more = if limit > 0 {
kvs.truncate(limit as usize);
true
} else {
false
};
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
Ok(RangeResponse { header, kvs, more })
}
async fn put(&self, req: PutRequest) -> Result<PutResponse> {
let PutRequest {
header,
key,
value,
prev_kv,
} = req;
let mut memory = self.inner.write();
let prev_value = memory.insert(key.clone(), value);
let prev_kv = if prev_kv {
prev_value.map(|value| KeyValue { key, value })
} else {
None
};
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
Ok(PutResponse { header, prev_kv })
}
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
let BatchPutRequest {
header,
kvs,
prev_kv,
} = req;
let mut memory = self.inner.write();
let prev_kvs = if prev_kv {
kvs.into_iter()
.map(|kv| (kv.key.clone(), memory.insert(kv.key, kv.value)))
.filter(|(_, v)| v.is_some())
.map(|(key, value)| KeyValue {
key,
value: value.unwrap(),
})
.collect()
} else {
for kv in kvs.into_iter() {
memory.insert(kv.key, kv.value);
}
vec![]
};
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
Ok(BatchPutResponse { header, prev_kvs })
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
let CompareAndPutRequest {
header,
key,
expect,
value,
} = req;
let mut memory = self.inner.write();
let (success, prev_kv) = if expect.is_empty() {
(
false,
Some(KeyValue {
key: key.clone(),
value: vec![],
}),
)
} else {
let prev_val = memory.get(&key);
let success = prev_val
.map(|v| expect.cmp(v) == Ordering::Equal)
.unwrap_or(false);
(
success,
prev_val.map(|v| KeyValue {
key: key.clone(),
value: v.clone(),
}),
)
};
if success {
memory.insert(key, value);
}
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
Ok(CompareAndPutResponse {
header,
success,
prev_kv,
})
}
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let DeleteRangeRequest {
header,
key,
range_end,
prev_kv,
} = req;
let mut memory = self.inner.write();
let prev_kvs = if range_end.is_empty() {
let prev_val = memory.remove(&key);
prev_val.map_or(vec![], |value| vec![KeyValue { key, value }])
} else {
let range = Range {
start: key,
end: range_end,
};
memory
.drain_filter(|key, _| range.contains(key))
.map(|(key, value)| KeyValue { key, value })
.collect::<Vec<_>>()
};
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
Ok(DeleteRangeResponse {
header,
deleted: prev_kvs.len() as i64,
prev_kvs: if prev_kv {
prev_kvs
} else {
Default::default()
},
})
}
}
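Tying back to the "same behaviour as etcd" bullet: on a failed compare-and-put for an absent key, both stores now surface at most an empty previous value (see the EtcdStore change above). A hedged parity sketch against the MemStore, assuming the prost-generated Default impls on the request type (test name is illustrative):

use api::v1::meta::CompareAndPutRequest;
use crate::service::store::kv::KvStore;
use crate::service::store::memory::MemStore;

#[tokio::test]
async fn cas_on_absent_key_fails() {
    let store = MemStore::default();
    let req = CompareAndPutRequest {
        key: b"k".to_vec(),
        expect: b"v0".to_vec(), // non-empty expect, but the key is absent
        value: b"v1".to_vec(),
        ..Default::default()
    };
    let res = store.compare_and_put(req).await.unwrap();
    assert!(!res.success);
    // an absent key yields either no prev_kv or one with an empty value
    assert!(res.prev_kv.map_or(true, |kv| kv.value.is_empty()));
}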