diff --git a/Cargo.lock b/Cargo.lock index 7873434f40..8469f5696a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2639,6 +2639,7 @@ dependencies = [ "rand 0.8.5", "snafu", "tokio", + "tokio-stream", "tonic", "tracing", "tracing-subscriber", @@ -2652,9 +2653,11 @@ dependencies = [ "async-trait", "common-base", "common-error", + "common-runtime", "common-telemetry", "etcd-client", "futures", + "h2", "http-body", "serde", "snafu", diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 1fdf4bc8cf..2d3da3f29f 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -1,2 +1,3 @@ +bind_addr = '127.0.0.1:3002' server_addr = '0.0.0.0:3002' store_addr = '127.0.0.1:2380' diff --git a/src/api/greptime/v1/meta/common.proto b/src/api/greptime/v1/meta/common.proto index 3d3259b7f1..8d3cff5d22 100644 --- a/src/api/greptime/v1/meta/common.proto +++ b/src/api/greptime/v1/meta/common.proto @@ -31,6 +31,12 @@ message Endpoint { string addr = 1; } +message TableName { + string catalog_name = 1; + string schema_name = 2; + string table_name = 3; +} + message TimeInterval { // The unix timestamp in millis of the start of this period. uint64 start_timestamp_millis = 1; diff --git a/src/api/greptime/v1/meta/heartbeat.proto b/src/api/greptime/v1/meta/heartbeat.proto index c8c48363f7..b5c96c2b67 100644 --- a/src/api/greptime/v1/meta/heartbeat.proto +++ b/src/api/greptime/v1/meta/heartbeat.proto @@ -18,10 +18,10 @@ service Heartbeat { message HeartbeatRequest { RequestHeader header = 1; + // Self peer + Peer peer = 2; // Leader node - bool is_leader = 2; - // Leader Peer - Endpoint leader_endpoint = 3; + bool is_leader = 3; // Actually reported time interval TimeInterval report_interval = 4; // Node stat @@ -48,15 +48,25 @@ message NodeStat { double read_io_rate = 7; // Write disk I/O in the node double write_io_rate = 8; + + // Others + map attrs = 100; } message RegionStat { - string table_name = 1; - uint64 region_id = 2; + uint64 region_id = 1; + TableName table_name = 2; // The read capacity units during this period uint64 rcus = 3; // The write capacity units during this period uint64 wcus = 4; + // Approximate region size + uint64 approximate_size = 5; + // Approximate number of rows + uint64 approximate_rows = 6; + + // Others + map attrs = 100; } message ReplicaStat { diff --git a/src/api/greptime/v1/meta/route.proto b/src/api/greptime/v1/meta/route.proto index 448b74619c..07ac11cbc0 100644 --- a/src/api/greptime/v1/meta/route.proto +++ b/src/api/greptime/v1/meta/route.proto @@ -24,7 +24,7 @@ service Router { // rpc Route(RouteRequest) returns (RouteResponse) {} - rpc Create(CreateRequest) returns (CreateResponse) {} + rpc Create(CreateRequest) returns (RouteResponse) {} } message RouteRequest { @@ -44,13 +44,7 @@ message CreateRequest { RequestHeader header = 1; TableName table_name = 2; - repeated Region regions = 3; -} - -message CreateResponse { - ResponseHeader header = 1; - - repeated Region regions = 2; + repeated Partition partitions = 3; } message TableRoute { @@ -66,12 +60,6 @@ message RegionRoute { repeated uint64 follower_peer_indexes = 3; } -message TableName { - string catalog_name = 1; - string schema_name = 2; - string table_name = 3; -} - message Table { TableName table_name = 1; bytes table_schema = 2; @@ -80,14 +68,13 @@ message Table { message Region { uint64 id = 1; string name = 2; - Peer peer = 3; - - // PARTITION `region_name` VALUES LESS THAN (value_list) - message Partition { - repeated bytes column_list = 1; - repeated bytes value_list = 2; - } - Partition partition = 4; - - map attrs = 5; + Partition partition = 3; + + map attrs = 100; +} + +// PARTITION `region_name` VALUES LESS THAN (value_list) +message Partition { + repeated bytes column_list = 1; + repeated bytes value_list = 2; } diff --git a/src/api/src/v1/meta.rs b/src/api/src/v1/meta.rs index 16fbc506cb..78e64905c6 100644 --- a/src/api/src/v1/meta.rs +++ b/src/api/src/v1/meta.rs @@ -20,8 +20,16 @@ impl From<&str> for Endpoint { } impl RequestHeader { - pub fn new(cluster_id: u64, member_id: u64) -> RequestHeader { - RequestHeader { + pub fn new(cluster_id: u64, member_id: u64) -> Self { + Self { + protocol_version: PROTOCOL_VERSION, + cluster_id, + member_id, + } + } + + pub fn with_id((cluster_id, member_id): (u64, u64)) -> Self { + Self { protocol_version: PROTOCOL_VERSION, cluster_id, member_id, @@ -83,14 +91,14 @@ impl CreateRequest { } } - pub fn add_region(mut self, region: Region) -> Self { - self.regions.push(region); + pub fn add_partition(mut self, partition: Partition) -> Self { + self.partitions.push(partition); self } } impl Region { - pub fn new(id: u64, name: impl Into, partition: region::Partition) -> Self { + pub fn new(id: u64, name: impl Into, partition: Partition) -> Self { Self { id, name: name.into(), @@ -105,7 +113,7 @@ impl Region { } } -impl region::Partition { +impl Partition { pub fn new() -> Self { Default::default() } diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index f070b5d37e..28c3630cf3 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -36,6 +36,8 @@ impl SubCommand { #[derive(Debug, Parser)] struct StartCommand { + #[clap(long)] + bind_addr: Option, #[clap(long)] server_addr: Option, #[clap(long)] @@ -68,6 +70,9 @@ impl TryFrom for MetaSrvOptions { MetaSrvOptions::default() }; + if let Some(addr) = cmd.bind_addr { + opts.bind_addr = addr; + } if let Some(addr) = cmd.server_addr { opts.server_addr = addr; } @@ -86,11 +91,13 @@ mod tests { #[test] fn test_read_from_cmd() { let cmd = StartCommand { + bind_addr: Some("127.0.0.1:3002".to_string()), server_addr: Some("0.0.0.0:3002".to_string()), store_addr: Some("127.0.0.1:2380".to_string()), config_file: None, }; let options: MetaSrvOptions = cmd.try_into().unwrap(); + assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!("0.0.0.0:3002".to_string(), options.server_addr); assert_eq!("127.0.0.1:2380".to_string(), options.store_addr); } @@ -98,6 +105,7 @@ mod tests { #[test] fn test_read_from_config_file() { let cmd = StartCommand { + bind_addr: None, server_addr: None, store_addr: None, config_file: Some(format!( @@ -106,6 +114,7 @@ mod tests { )), }; let options: MetaSrvOptions = cmd.try_into().unwrap(); + assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!("0.0.0.0:3002".to_string(), options.server_addr); assert_eq!("127.0.0.1:2380".to_string(), options.store_addr); } diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 25545c5475..b247ca57cf 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -14,10 +14,95 @@ const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; #[derive(Clone, Debug)] pub struct ChannelManager { - config: Option, + config: ChannelConfig, pool: Arc>, } +impl Default for ChannelManager { + fn default() -> Self { + ChannelManager::with_config(ChannelConfig::default()) + } +} + +impl ChannelManager { + pub fn new() -> Self { + Default::default() + } + + pub fn with_config(config: ChannelConfig) -> Self { + let pool = Pool { + channels: HashMap::default(), + }; + let pool = Arc::new(Mutex::new(pool)); + let cloned_pool = pool.clone(); + + common_runtime::spawn_bg(async move { + recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await; + }); + + Self { pool, config } + } + + pub fn config(&self) -> &ChannelConfig { + &self.config + } + + pub fn get(&self, addr: impl AsRef) -> Result { + let addr = addr.as_ref(); + let mut pool = self.pool.lock().unwrap(); + if let Some(ch) = pool.get_mut(addr) { + ch.access += 1; + return Ok(ch.channel.clone()); + } + + let mut endpoint = + Endpoint::new(format!("http://{}", addr)).context(error::CreateChannelSnafu)?; + + if let Some(dur) = self.config.timeout { + endpoint = endpoint.timeout(dur); + } + if let Some(dur) = self.config.connect_timeout { + endpoint = endpoint.connect_timeout(dur); + } + if let Some(limit) = self.config.concurrency_limit { + endpoint = endpoint.concurrency_limit(limit); + } + if let Some((limit, dur)) = self.config.rate_limit { + endpoint = endpoint.rate_limit(limit, dur); + } + if let Some(size) = self.config.initial_stream_window_size { + endpoint = endpoint.initial_stream_window_size(size); + } + if let Some(size) = self.config.initial_connection_window_size { + endpoint = endpoint.initial_connection_window_size(size); + } + if let Some(dur) = self.config.http2_keep_alive_interval { + endpoint = endpoint.http2_keep_alive_interval(dur); + } + if let Some(dur) = self.config.http2_keep_alive_timeout { + endpoint = endpoint.keep_alive_timeout(dur); + } + if let Some(enabled) = self.config.http2_keep_alive_while_idle { + endpoint = endpoint.keep_alive_while_idle(enabled); + } + if let Some(enabled) = self.config.http2_adaptive_window { + endpoint = endpoint.http2_adaptive_window(enabled); + } + endpoint = endpoint + .tcp_keepalive(self.config.tcp_keepalive) + .tcp_nodelay(self.config.tcp_nodelay); + + let inner_channel = endpoint.connect_lazy(); + let channel = Channel { + channel: inner_channel.clone(), + access: 1, + }; + pool.put(addr, channel); + + Ok(inner_channel) + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct ChannelConfig { pub timeout: Option, @@ -172,85 +257,6 @@ struct Pool { channels: HashMap, } -#[derive(Debug)] -struct Channel { - channel: InnerChannel, - access: usize, -} - -impl ChannelManager { - pub fn new() -> Self { - Default::default() - } - - pub fn with_config(config: ChannelConfig) -> Self { - let mut manager = ChannelManager::new(); - manager.config = Some(config); - manager - } - - pub fn config(&self) -> Option { - self.config.clone() - } - - pub fn get(&self, addr: impl AsRef) -> Result { - let addr = addr.as_ref(); - let mut pool = self.pool.lock().unwrap(); - if let Some(ch) = pool.get_mut(addr) { - ch.access += 1; - return Ok(ch.channel.clone()); - } - - let mut endpoint = - Endpoint::new(format!("http://{}", addr)).context(error::CreateChannelSnafu)?; - - if let Some(cfg) = &self.config { - if let Some(dur) = cfg.timeout { - endpoint = endpoint.timeout(dur); - } - if let Some(dur) = cfg.connect_timeout { - endpoint = endpoint.connect_timeout(dur); - } - if let Some(limit) = cfg.concurrency_limit { - endpoint = endpoint.concurrency_limit(limit); - } - if let Some((limit, dur)) = cfg.rate_limit { - endpoint = endpoint.rate_limit(limit, dur); - } - if let Some(size) = cfg.initial_stream_window_size { - endpoint = endpoint.initial_stream_window_size(size); - } - if let Some(size) = cfg.initial_connection_window_size { - endpoint = endpoint.initial_connection_window_size(size); - } - if let Some(dur) = cfg.http2_keep_alive_interval { - endpoint = endpoint.http2_keep_alive_interval(dur); - } - if let Some(dur) = cfg.http2_keep_alive_timeout { - endpoint = endpoint.keep_alive_timeout(dur); - } - if let Some(enabled) = cfg.http2_keep_alive_while_idle { - endpoint = endpoint.keep_alive_while_idle(enabled); - } - if let Some(enabled) = cfg.http2_adaptive_window { - endpoint = endpoint.http2_adaptive_window(enabled); - } - endpoint = endpoint - .tcp_keepalive(cfg.tcp_keepalive) - .tcp_nodelay(cfg.tcp_nodelay); - } - - let inner_channel = endpoint.connect_lazy(); - let channel = Channel { - channel: inner_channel.clone(), - access: 1, - }; - pool.put(addr, channel); - - Ok(inner_channel) - } -} - impl Pool { #[inline] fn get_mut(&mut self, addr: &str) -> Option<&mut Channel> { @@ -271,20 +277,10 @@ impl Pool { } } -impl Default for ChannelManager { - fn default() -> Self { - let pool = Pool { - channels: HashMap::default(), - }; - let pool = Arc::new(Mutex::new(pool)); - let cloned_pool = pool.clone(); - - common_runtime::spawn_bg(async move { - recycle_channel_in_loop(cloned_pool, RECYCLE_CHANNEL_INTERVAL_SECS).await; - }); - - Self { pool, config: None } - } +#[derive(Debug)] +struct Channel { + channel: InnerChannel, + access: usize, } async fn recycle_channel_in_loop(pool: Arc>, interval_secs: u64) { @@ -315,7 +311,10 @@ mod tests { channels: HashMap::default(), }; let pool = Arc::new(Mutex::new(pool)); - let mgr = ChannelManager { pool, config: None }; + let mgr = ChannelManager { + pool, + ..Default::default() + }; let addr = "http://test"; let _ = mgr.get(addr).unwrap(); @@ -340,10 +339,7 @@ mod tests { .http2_adaptive_window(true) .tcp_keepalive(Duration::from_secs(1)) .tcp_nodelay(true); - let mgr = ChannelManager { - pool, - config: Some(config), - }; + let mgr = ChannelManager { pool, config }; let addr = "test_uri"; for i in 0..10 { diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 13cd828f61..415834f871 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -13,6 +13,7 @@ etcd-client = "0.10" rand = "0.8" snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.18", features = ["full"] } +tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" [dev-dependencies] diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index 1012f6dc03..0d0f4866c6 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -1,11 +1,11 @@ use std::time::Duration; -use api::v1::meta::region::Partition; use api::v1::meta::CreateRequest; use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::HeartbeatRequest; +use api::v1::meta::Partition; use api::v1::meta::PutRequest; use api::v1::meta::RangeRequest; -use api::v1::meta::Region; use api::v1::meta::RequestHeader; use api::v1::meta::TableName; use common_grpc::channel_manager::ChannelConfig; @@ -23,22 +23,37 @@ fn main() { #[tokio::main] async fn run() { + let id = (1000u64, 2000u64); let config = ChannelConfig::new() .timeout(Duration::from_secs(3)) .connect_timeout(Duration::from_secs(5)) .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config); - let mut meta_client = MetaClientBuilder::new() - .heartbeat_client(true) - .router_client(true) - .store_client(true) + let mut meta_client = MetaClientBuilder::new(id.0, id.1) + .enable_heartbeat() + .enable_router() + .enable_store() .channel_manager(channel_manager) .build(); meta_client.start(&["127.0.0.1:3002"]).await.unwrap(); // required only when the heartbeat_client is enabled meta_client.ask_leader().await.unwrap(); - let header = RequestHeader::new(0, 0); + let (sender, mut receiver) = meta_client.heartbeat().await.unwrap(); + + // send heartbeats + tokio::spawn(async move { + for _ in 0..5 { + let req = HeartbeatRequest::new(RequestHeader::with_id(id)); + sender.send(req).await.unwrap(); + } + }); + + while let Some(res) = receiver.message().await.unwrap() { + event!(Level::INFO, "heartbeat response: {:#?}", res); + } + + let header = RequestHeader::with_id(id); let p1 = Partition::new() .column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()]) @@ -51,8 +66,8 @@ async fn run() { let table_name = TableName::new("test_catlog", "test_schema", "test_table"); let create_req = CreateRequest::new(header, table_name) - .add_region(Region::new(0, "test_region1", p1)) - .add_region(Region::new(1, "test_region2", p2)); + .add_partition(p1) + .add_partition(p2); let res = meta_client.create_route(create_req).await.unwrap(); event!(Level::INFO, "create_route result: {:#?}", res); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index f096093b82..4894627c7a 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -4,7 +4,6 @@ mod router; mod store; use api::v1::meta::CreateRequest; -use api::v1::meta::CreateResponse; use api::v1::meta::DeleteRangeRequest; use api::v1::meta::DeleteRangeResponse; use api::v1::meta::PutRequest; @@ -21,39 +20,47 @@ use router::Client as RouterClient; use snafu::OptionExt; use store::Client as StoreClient; +use self::heartbeat::HeartbeatSender; +use self::heartbeat::HeartbeatStream; use crate::error; use crate::error::Result; +pub type Id = (u64, u64); + #[derive(Clone, Debug, Default)] pub struct MetaClientBuilder { - heartbeat_client: bool, - router_client: bool, - store_client: bool, + id: Id, + enable_heartbeat: bool, + enable_router: bool, + enable_store: bool, channel_manager: Option, } impl MetaClientBuilder { - pub fn new() -> Self { - MetaClientBuilder::default() + pub fn new(cluster_id: u64, member_id: u64) -> Self { + Self { + id: (cluster_id, member_id), + ..Default::default() + } } - pub fn heartbeat_client(self, enabled: bool) -> Self { + pub fn enable_heartbeat(self) -> Self { Self { - heartbeat_client: enabled, + enable_heartbeat: true, ..self } } - pub fn router_client(self, enabled: bool) -> Self { + pub fn enable_router(self) -> Self { Self { - router_client: enabled, + enable_router: true, ..self } } - pub fn store_client(self, enabled: bool) -> Self { + pub fn enable_store(self) -> Self { Self { - store_client: enabled, + enable_store: true, ..self } } @@ -66,39 +73,37 @@ impl MetaClientBuilder { } pub fn build(self) -> MetaClient { - let mut meta_client = if let Some(mgr) = self.channel_manager { - MetaClient { - channel_manager: mgr, - ..Default::default() - } + let mut client = if let Some(mgr) = self.channel_manager { + MetaClient::with_channel_manager(self.id, mgr) } else { - Default::default() + MetaClient::new(self.id) }; if let (false, false, false) = - (self.heartbeat_client, self.router_client, self.store_client) + (self.enable_heartbeat, self.enable_router, self.enable_store) { panic!("At least one client needs to be enabled.") } - let mgr = meta_client.channel_manager.clone(); + let mgr = client.channel_manager.clone(); - if self.heartbeat_client { - meta_client.heartbeat_client = Some(HeartbeatClient::new(mgr.clone())); + if self.enable_heartbeat { + client.heartbeat_client = Some(HeartbeatClient::new(self.id, mgr.clone())); } - if self.router_client { - meta_client.router_client = Some(RouterClient::new(mgr.clone())); + if self.enable_router { + client.router_client = Some(RouterClient::new(self.id, mgr.clone())); } - if self.store_client { - meta_client.store_client = Some(StoreClient::new(mgr)); + if self.enable_store { + client.store_client = Some(StoreClient::new(self.id, mgr)); } - meta_client + client } } #[derive(Clone, Debug, Default)] pub struct MetaClient { + id: Id, channel_manager: ChannelManager, heartbeat_client: Option, router_client: Option, @@ -106,6 +111,21 @@ pub struct MetaClient { } impl MetaClient { + pub fn new(id: Id) -> Self { + Self { + id, + ..Default::default() + } + } + + pub fn with_channel_manager(id: Id, channel_manager: ChannelManager) -> Self { + Self { + id, + channel_manager, + ..Default::default() + } + } + pub async fn start(&mut self, urls: A) -> Result<()> where U: AsRef, @@ -142,7 +162,16 @@ impl MetaClient { todo!() } - pub async fn create_route(&self, req: CreateRequest) -> Result { + pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> { + self.heartbeat_client() + .context(error::NotStartedSnafu { + name: "heartbeat_client", + })? + .heartbeat() + .await + } + + pub async fn create_route(&self, req: CreateRequest) -> Result { self.router_client() .context(error::NotStartedSnafu { name: "route_client", @@ -224,9 +253,15 @@ impl MetaClient { self.store_client.clone() } - pub fn channel_config(&self) -> Option { + #[inline] + pub fn channel_config(&self) -> &ChannelConfig { self.channel_manager.config() } + + #[inline] + pub fn id(&self) -> Id { + self.id + } } #[cfg(test)] @@ -237,32 +272,34 @@ mod tests { async fn test_meta_client_builder() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new().heartbeat_client(true).build(); + let mut meta_client = MetaClientBuilder::new(0, 0).enable_heartbeat().build(); assert!(meta_client.heartbeat_client().is_some()); assert!(meta_client.router_client().is_none()); assert!(meta_client.store_client().is_none()); meta_client.start(urls).await.unwrap(); assert!(meta_client.heartbeat_client().unwrap().is_started().await); - let mut meta_client = MetaClientBuilder::new().router_client(true).build(); + let mut meta_client = MetaClientBuilder::new(0, 0).enable_router().build(); assert!(meta_client.heartbeat_client().is_none()); assert!(meta_client.router_client().is_some()); assert!(meta_client.store_client().is_none()); meta_client.start(urls).await.unwrap(); assert!(meta_client.router_client().unwrap().is_started().await); - let mut meta_client = MetaClientBuilder::new().store_client(true).build(); + let mut meta_client = MetaClientBuilder::new(0, 0).enable_store().build(); assert!(meta_client.heartbeat_client().is_none()); assert!(meta_client.router_client().is_none()); assert!(meta_client.store_client().is_some()); meta_client.start(urls).await.unwrap(); assert!(meta_client.store_client().unwrap().is_started().await); - let mut meta_client = MetaClientBuilder::new() - .heartbeat_client(true) - .router_client(true) - .store_client(true) + let mut meta_client = MetaClientBuilder::new(1, 2) + .enable_heartbeat() + .enable_router() + .enable_store() .build(); + assert_eq!(1, meta_client.id().0); + assert_eq!(2, meta_client.id().1); assert!(meta_client.heartbeat_client().is_some()); assert!(meta_client.router_client().is_some()); assert!(meta_client.store_client().is_some()); @@ -276,9 +313,9 @@ mod tests { async fn test_not_start_heartbeat_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new() - .router_client(true) - .store_client(true) + let mut meta_client = MetaClientBuilder::new(0, 0) + .enable_router() + .enable_store() .build(); meta_client.start(urls).await.unwrap(); @@ -292,9 +329,9 @@ mod tests { async fn test_not_start_router_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new() - .heartbeat_client(true) - .store_client(true) + let mut meta_client = MetaClientBuilder::new(0, 0) + .enable_heartbeat() + .enable_store() .build(); meta_client.start(urls).await.unwrap(); @@ -308,9 +345,9 @@ mod tests { async fn test_not_start_store_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new() - .heartbeat_client(true) - .router_client(true) + let mut meta_client = MetaClientBuilder::new(0, 0) + .enable_heartbeat() + .enable_router() .build(); meta_client.start(urls).await.unwrap(); @@ -323,10 +360,6 @@ mod tests { #[should_panic] #[test] fn test_enable_at_least_one_client() { - let _ = MetaClientBuilder::new() - .heartbeat_client(false) - .router_client(false) - .store_client(false) - .build(); + let _ = MetaClientBuilder::new(0, 0).build(); } } diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index a6e3188f26..03955edf09 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -3,34 +3,79 @@ use std::sync::Arc; use api::v1::meta::heartbeat_client::HeartbeatClient; use api::v1::meta::AskLeaderRequest; +use api::v1::meta::HeartbeatRequest; +use api::v1::meta::HeartbeatResponse; use api::v1::meta::RequestHeader; use common_grpc::channel_manager::ChannelManager; use common_telemetry::debug; +use common_telemetry::info; use snafu::ensure; use snafu::OptionExt; use snafu::ResultExt; +use tokio::sync::mpsc; use tokio::sync::RwLock; +use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; +use tonic::Streaming; +use super::Id; use crate::error; use crate::error::Result; +pub struct HeartbeatSender { + sender: mpsc::Sender, +} + +impl HeartbeatSender { + #[inline] + const fn new(sender: mpsc::Sender) -> Self { + Self { sender } + } + + #[inline] + pub async fn send(&self, req: HeartbeatRequest) -> Result<()> { + self.sender.send(req).await.map_err(|e| { + error::SendHeartbeatSnafu { + err_msg: e.to_string(), + } + .build() + }) + } +} + +#[derive(Debug)] +pub struct HeartbeatStream { + stream: Streaming, +} + +impl HeartbeatStream { + #[inline] + const fn new(stream: Streaming) -> Self { + Self { stream } + } + + /// Fetch the next message from this stream. + #[inline] + pub async fn message(&mut self) -> Result> { + self.stream.message().await.context(error::TonicStatusSnafu) + } +} + #[derive(Clone, Debug)] pub struct Client { inner: Arc>, } impl Client { - pub fn new(channel_manager: ChannelManager) -> Self { - let inner = Inner { + pub fn new(id: Id, channel_manager: ChannelManager) -> Self { + let inner = Arc::new(RwLock::new(Inner { + id, channel_manager, peers: HashSet::default(), leader: None, - }; + })); - Self { - inner: Arc::new(RwLock::new(inner)), - } + Self { inner } } pub async fn start(&mut self, urls: A) -> Result<()> @@ -47,16 +92,20 @@ impl Client { inner.ask_leader().await } + pub async fn heartbeat(&mut self) -> Result<(HeartbeatSender, HeartbeatStream)> { + let inner = self.inner.read().await; + inner.heartbeat().await + } + pub async fn is_started(&self) -> bool { let inner = self.inner.read().await; inner.is_started() } - - // TODO(jiachun) send heartbeat } #[derive(Debug)] struct Inner { + id: Id, channel_manager: ChannelManager, peers: HashSet, leader: Option, @@ -93,7 +142,7 @@ impl Inner { ); // TODO(jiachun): set cluster_id and member_id - let header = RequestHeader::new(0, 0); + let header = RequestHeader::with_id(self.id); let mut leader = None; for addr in &self.peers { let req = AskLeaderRequest::new(header.clone()); @@ -114,6 +163,36 @@ impl Inner { Ok(()) } + async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> { + let leader = self.leader.as_ref().context(error::NoLeaderSnafu)?; + let mut leader = self.make_client(leader)?; + + let (sender, receiver) = mpsc::channel::(128); + let handshake = HeartbeatRequest::new(RequestHeader::with_id(self.id)); + sender.send(handshake).await.map_err(|e| { + error::SendHeartbeatSnafu { + err_msg: e.to_string(), + } + .build() + })?; + let receiver = ReceiverStream::new(receiver); + + let mut stream = leader + .heartbeat(receiver) + .await + .context(error::TonicStatusSnafu)? + .into_inner(); + + let res = stream + .message() + .await + .context(error::TonicStatusSnafu)? + .context(error::CreateHeartbeatStreamSnafu)?; + info!("Success to create heartbeat stream to server: {:#?}", res); + + Ok((HeartbeatSender::new(sender), HeartbeatStream::new(stream))) + } + fn make_client(&self, addr: impl AsRef) -> Result> { let channel = self .channel_manager @@ -135,7 +214,7 @@ mod test { #[tokio::test] async fn test_start_client() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); assert!(!client.is_started().await); @@ -149,7 +228,7 @@ mod test { #[tokio::test] async fn test_already_start() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await @@ -169,7 +248,7 @@ mod test { #[tokio::test] async fn test_start_with_duplicate_peers() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await @@ -180,7 +259,7 @@ mod test { #[tokio::test] async fn test_ask_leader_unavailable() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client.start(&["unavailable_peer"]).await.unwrap(); let res = client.ask_leader().await; @@ -190,4 +269,41 @@ mod test { let err = res.err().unwrap(); assert!(matches!(err, error::Error::AskLeader { .. })); } + + #[tokio::test] + async fn test_heartbeat_unavailable() { + let mut client = Client::new((0, 0), ChannelManager::default()); + client.start(&["unavailable_peer"]).await.unwrap(); + client.inner.write().await.leader = Some("unavailable".to_string()); + + let res = client.heartbeat().await; + + assert!(res.is_err()); + + let err = res.err().unwrap(); + assert!(matches!(err, error::Error::TonicStatus { .. })); + } + + #[tokio::test] + async fn test_heartbeat_stream() { + let (sender, mut receiver) = mpsc::channel::(100); + let sender = HeartbeatSender::new(sender); + + tokio::spawn(async move { + for i in 0..10 { + sender + .send(HeartbeatRequest::new(RequestHeader::new(i, i))) + .await + .unwrap(); + } + }); + + let mut i = 0; + while let Some(req) = receiver.recv().await { + let header = req.header.unwrap(); + assert_eq!(i, header.cluster_id); + assert_eq!(i, header.member_id); + i += 1; + } + } } diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs index b048cecfd4..aa1d8544e4 100644 --- a/src/meta-client/src/client/router.rs +++ b/src/meta-client/src/client/router.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use api::v1::meta::router_client::RouterClient; use api::v1::meta::CreateRequest; -use api::v1::meta::CreateResponse; use api::v1::meta::RouteRequest; use api::v1::meta::RouteResponse; use common_grpc::channel_manager::ChannelManager; @@ -13,6 +12,7 @@ use snafu::ResultExt; use tokio::sync::RwLock; use tonic::transport::Channel; +use super::Id; use crate::client::load_balance as lb; use crate::error; use crate::error::Result; @@ -23,15 +23,14 @@ pub struct Client { } impl Client { - pub fn new(channel_manager: ChannelManager) -> Self { - let inner = Inner { + pub fn new(id: Id, channel_manager: ChannelManager) -> Self { + let inner = Arc::new(RwLock::new(Inner { + id, channel_manager, peers: vec![], - }; + })); - Self { - inner: Arc::new(RwLock::new(inner)), - } + Self { inner } } pub async fn start(&mut self, urls: A) -> Result<()> @@ -48,7 +47,7 @@ impl Client { inner.is_started() } - pub async fn create(&self, req: CreateRequest) -> Result { + pub async fn create(&self, req: CreateRequest) -> Result { let inner = self.inner.read().await; inner.create(req).await } @@ -61,6 +60,8 @@ impl Client { #[derive(Debug)] struct Inner { + #[allow(dead_code)] + id: Id, // TODO(jiachun): will use it later channel_manager: ChannelManager, peers: Vec, } @@ -97,7 +98,7 @@ impl Inner { Ok(res.into_inner()) } - async fn create(&self, req: CreateRequest) -> Result { + async fn create(&self, req: CreateRequest) -> Result { let mut client = self.random_client()?; let res = client.create(req).await.context(error::TonicStatusSnafu)?; @@ -139,7 +140,7 @@ mod test { #[tokio::test] async fn test_start_client() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); assert!(!client.is_started().await); @@ -153,7 +154,7 @@ mod test { #[tokio::test] async fn test_already_start() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await @@ -173,7 +174,7 @@ mod test { #[tokio::test] async fn test_start_with_duplicate_peers() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await @@ -184,7 +185,7 @@ mod test { #[tokio::test] async fn test_create_unavailable() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client.start(&["unavailable_peer"]).await.unwrap(); let header = RequestHeader::new(0, 0); @@ -201,7 +202,7 @@ mod test { #[tokio::test] async fn test_route_unavailable() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client.start(&["unavailable_peer"]).await.unwrap(); let header = RequestHeader::new(0, 0); diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index f6a587a229..d861133c7a 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -15,6 +15,7 @@ use snafu::ResultExt; use tokio::sync::RwLock; use tonic::transport::Channel; +use super::Id; use crate::client::load_balance as lb; use crate::error; use crate::error::Result; @@ -25,15 +26,14 @@ pub struct Client { } impl Client { - pub fn new(channel_manager: ChannelManager) -> Self { - let inner = Inner { + pub fn new(id: Id, channel_manager: ChannelManager) -> Self { + let inner = Arc::new(RwLock::new(Inner { + id, channel_manager, peers: vec![], - }; + })); - Self { - inner: Arc::new(RwLock::new(inner)), - } + Self { inner } } pub async fn start(&mut self, urls: A) -> Result<()> @@ -68,6 +68,8 @@ impl Client { #[derive(Debug)] struct Inner { + #[allow(dead_code)] + id: Id, // TODO(jiachun): will use it later channel_manager: ChannelManager, peers: Vec, } @@ -155,7 +157,7 @@ mod test { #[tokio::test] async fn test_start_client() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); assert!(!client.is_started().await); @@ -169,7 +171,7 @@ mod test { #[tokio::test] async fn test_already_start() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await @@ -189,7 +191,7 @@ mod test { #[tokio::test] async fn test_start_with_duplicate_peers() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await @@ -200,7 +202,7 @@ mod test { #[tokio::test] async fn test_range_unavailable() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client.start(&["unknow_peer"]).await.unwrap(); let req = RangeRequest { @@ -219,7 +221,7 @@ mod test { #[tokio::test] async fn test_put_unavailable() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client.start(&["unavailable_peer"]).await.unwrap(); let req = PutRequest { @@ -240,7 +242,7 @@ mod test { #[tokio::test] async fn test_delete_range_unavailable() { - let mut client = Client::new(ChannelManager::default()); + let mut client = Client::new((0, 0), ChannelManager::default()); client.start(&["unavailable_peer"]).await.unwrap(); let req = DeleteRangeRequest { diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index 0aa215ffb6..68043256c9 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -25,6 +25,9 @@ pub enum Error { #[snafu(display("Failed to ask leader from all endpoints"))] AskLeader { backtrace: Backtrace }, + #[snafu(display("No leader, should ask leader first"))] + NoLeader { backtrace: Backtrace }, + #[snafu(display("Failed to create gRPC channel, source: {}", source))] CreateChannel { #[snafu(backtrace)] @@ -33,6 +36,15 @@ pub enum Error { #[snafu(display("{} not started", name))] NotStarted { name: String, backtrace: Backtrace }, + + #[snafu(display("Failed to send heartbeat: {}", err_msg))] + SendHeartbeat { + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Failed create heartbeat stream to server"))] + CreateHeartbeatStream { backtrace: Backtrace }, } #[allow(dead_code)] @@ -53,7 +65,10 @@ impl ErrorExt for Error { | Error::IllegalGrpcClientState { .. } | Error::TonicStatus { .. } | Error::AskLeader { .. } + | Error::NoLeader { .. } | Error::NotStarted { .. } + | Error::SendHeartbeat { .. } + | Error::CreateHeartbeatStream { .. } | Error::CreateChannel { .. } => StatusCode::Internal, } } @@ -118,6 +133,14 @@ mod tests { assert_eq!(e.status_code(), StatusCode::Internal); } + #[test] + fn test_no_leader_error() { + let e = throw_none_option().context(NoLeaderSnafu).err().unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + #[test] fn test_create_channel_error() { fn throw_common_grpc_error() -> StdResult { @@ -134,4 +157,26 @@ mod tests { assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } + + #[test] + fn test_send_heartbeat_error() { + let e = throw_none_option() + .context(SendHeartbeatSnafu { err_msg: "" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_create_heartbeat_stream_error() { + let e = throw_none_option() + .context(CreateHeartbeatStreamSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index e058219d34..3e7014f706 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -9,10 +9,12 @@ api = { path = "../api" } async-trait = "0.1" common-base = { path = "../common/base" } common-error = { path = "../common/error" } +common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } etcd-client = "0.10" futures = "0.3" http-body = "0.4" +h2 = "0.3" serde = "1.0" snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.0", features = ["full"] } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 7eae327227..015c133f9d 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -14,15 +14,16 @@ use crate::service::store::etcd::EtcdStore; // Bootstrap the rpc server to serve incoming request pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> { let kv_store = EtcdStore::with_endpoints([&opts.store_addr]).await?; - let meta_srv = MetaSrv::new(kv_store); - let listener = TcpListener::bind(&opts.server_addr) + let listener = TcpListener::bind(&opts.bind_addr) .await .context(error::TcpBindSnafu { - addr: &opts.server_addr, + addr: &opts.bind_addr, })?; let listener = TcpListenerStream::new(listener); + let meta_srv = MetaSrv::new(opts, kv_store); + tonic::transport::Server::builder() .accept_http1(true) // for admin services .add_service(HeartbeatServer::new(meta_srv.clone())) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index e10bf905c1..63989ca8bf 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -34,6 +34,9 @@ pub enum Error { source: tonic::transport::Error, backtrace: Backtrace, }, + + #[snafu(display("Empty table name"))] + EmptyTableName { backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -60,11 +63,35 @@ impl ErrorExt for Error { | Error::ConnectEtcd { .. } | Error::TcpBind { .. } | Error::StartGrpc { .. } => StatusCode::Internal, - Error::EmptyKey { .. } => StatusCode::InvalidArguments, + Error::EmptyKey { .. } | Error::EmptyTableName { .. } => StatusCode::InvalidArguments, } } } +// for form tonic +pub(crate) fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> { + let mut err: &(dyn std::error::Error + 'static) = err_status; + + loop { + if let Some(io_err) = err.downcast_ref::() { + return Some(io_err); + } + + // h2::Error do not expose std::io::Error with `source()` + // https://github.com/hyperium/h2/pull/462 + if let Some(h2_err) = err.downcast_ref::() { + if let Some(io_err) = h2_err.get_io() { + return Some(io_err); + } + } + + err = match err.source() { + Some(err) => err, + None => return None, + }; + } +} + #[cfg(test)] mod tests { use super::*; @@ -142,4 +169,15 @@ mod tests { assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } + + #[test] + fn test_empty_table_error() { + let e = throw_none_option() + .context(EmptyTableNameSnafu) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index f3e07cb5e5..b490956eb6 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -5,6 +5,7 @@ use crate::service::store::kv::KvStoreRef; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct MetaSrvOptions { + pub bind_addr: String, pub server_addr: String, pub store_addr: String, } @@ -12,6 +13,7 @@ pub struct MetaSrvOptions { impl Default for MetaSrvOptions { fn default() -> Self { Self { + bind_addr: "0.0.0.0:3002".to_string(), server_addr: "0.0.0.0:3002".to_string(), store_addr: "0.0.0.0:2380".to_string(), } @@ -20,12 +22,17 @@ impl Default for MetaSrvOptions { #[derive(Clone)] pub struct MetaSrv { + options: MetaSrvOptions, kv_store: KvStoreRef, } impl MetaSrv { - pub fn new(kv_store: KvStoreRef) -> Self { - Self { kv_store } + pub fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self { + Self { options, kv_store } + } + + pub fn options(&self) -> &MetaSrvOptions { + &self.options } pub fn kv_store(&self) -> KvStoreRef { diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 6f2a9b1614..2afb3fea45 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -1,3 +1,5 @@ +use std::io::ErrorKind; + use api::v1::meta::heartbeat_server; use api::v1::meta::AskLeaderRequest; use api::v1::meta::AskLeaderResponse; @@ -6,9 +8,11 @@ use api::v1::meta::HeartbeatRequest; use api::v1::meta::HeartbeatResponse; use api::v1::meta::ResponseHeader; use api::v1::meta::PROTOCOL_VERSION; +use common_telemetry::error; +use common_telemetry::info; use futures::StreamExt; -use futures::TryFutureExt; -use snafu::OptionExt; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use tonic::Request; use tonic::Response; use tonic::Streaming; @@ -28,30 +32,81 @@ impl heartbeat_server::Heartbeat for MetaSrv { &self, req: Request>, ) -> GrpcResult { - let msg = req - .into_inner() - .next() - .await - .context(error::StreamNoneSnafu {})??; + let mut in_stream = req.into_inner(); + let (tx, rx) = mpsc::channel(128); - let res = handle_heartbeat(msg).map_err(|e| e.into()); + let kv_store = self.kv_store(); + common_runtime::spawn_bg(async move { + while let Some(msg) = in_stream.next().await { + match msg { + Ok(req) => tx + .send( + handle_heartbeat(req, kv_store.clone()) + .await + .map_err(|e| e.into()), + ) + .await + .expect("working rx"), + Err(err) => { + if let Some(io_err) = error::match_for_io_error(&err) { + if io_err.kind() == ErrorKind::BrokenPipe { + // client disconnected in unexpected way + error!("Client disconnected: broken pipe"); + break; + } + } - let output = futures::stream::once(res); + match tx.send(Err(err)).await { + Ok(_) => (), + Err(_err) => break, // response was droped + } + } + } + } + info!("Heartbeat stream broken: {:?}", in_stream); + }); - Ok(Response::new(Box::pin(output))) + let out_stream = ReceiverStream::new(rx); + + Ok(Response::new(Box::pin(out_stream) as Self::HeartbeatStream)) } async fn ask_leader(&self, req: Request) -> GrpcResult { let req = req.into_inner(); - let kv_store = self.kv_store(); - let res = handle_ask_leader(req, kv_store).await?; + let res = self.handle_ask_leader(req).await?; Ok(Response::new(res)) } } -async fn handle_heartbeat(msg: HeartbeatRequest) -> Result { - let HeartbeatRequest { header, .. } = msg; +impl MetaSrv { + // TODO(jiachun): move out when we can get the leader peer from kv store + async fn handle_ask_leader(&self, req: AskLeaderRequest) -> Result { + let AskLeaderRequest { header, .. } = req; + + let res_header = ResponseHeader { + protocol_version: PROTOCOL_VERSION, + cluster_id: header.map_or(0u64, |h| h.cluster_id), + ..Default::default() + }; + + // TODO(jiachun): return leader + let res = AskLeaderResponse { + header: Some(res_header), + leader: Some(Endpoint { + addr: self.options().server_addr.clone(), + }), + }; + + Ok(res) + } +} + +async fn handle_heartbeat( + req: HeartbeatRequest, + _kv_store: KvStoreRef, +) -> Result { + let HeartbeatRequest { header, .. } = req; let res_header = ResponseHeader { protocol_version: PROTOCOL_VERSION, @@ -69,29 +124,6 @@ async fn handle_heartbeat(msg: HeartbeatRequest) -> Result { Ok(res) } -async fn handle_ask_leader( - req: AskLeaderRequest, - _kv_store: KvStoreRef, -) -> Result { - let AskLeaderRequest { header, .. } = req; - - let res_header = ResponseHeader { - protocol_version: PROTOCOL_VERSION, - cluster_id: header.map_or(0u64, |h| h.cluster_id), - ..Default::default() - }; - - // TODO(jiachun): return leader - let res = AskLeaderResponse { - header: Some(res_header), - leader: Some(Endpoint { - addr: "127.0.0.1:3002".to_string(), - }), - }; - - Ok(res) -} - #[cfg(test)] mod tests { use std::sync::Arc; @@ -105,15 +137,7 @@ mod tests { use crate::service::store::kv::KvStore; #[derive(Clone)] - pub struct NoopKvStore { - _opts: MetaSrvOptions, - } - - impl NoopKvStore { - pub fn new(opts: MetaSrvOptions) -> Self { - Self { _opts: opts } - } - } + pub struct NoopKvStore; #[async_trait::async_trait] impl KvStore for NoopKvStore { @@ -135,18 +159,20 @@ mod tests { #[tokio::test] async fn test_handle_heartbeat_resp_header() { + let kv_store = Arc::new(NoopKvStore {}); + let header = RequestHeader::new(1, 2); let req = HeartbeatRequest::new(header); - let res = handle_heartbeat(req).await.unwrap(); + let res = handle_heartbeat(req, kv_store).await.unwrap(); assert_eq!(1, res.header.unwrap().cluster_id); } #[tokio::test] async fn test_ask_leader() { - let kv_store = Arc::new(NoopKvStore::new(MetaSrvOptions::default())); - let meta_srv = MetaSrv::new(kv_store); + let kv_store = Arc::new(NoopKvStore {}); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); let header = RequestHeader::new(1, 1); let req = AskLeaderRequest::new(header); @@ -154,6 +180,6 @@ mod tests { let res = meta_srv.ask_leader(req.into_request()).await.unwrap(); let res = res.into_inner(); assert_eq!(1, res.header.unwrap().cluster_id); - assert_eq!("127.0.0.1:3002".to_string(), res.leader.unwrap().addr); + assert_eq!(meta_srv.options().bind_addr, res.leader.unwrap().addr); } } diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index b1775df5ab..efb599430a 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -1,14 +1,15 @@ use api::v1::meta::router_server; use api::v1::meta::CreateRequest; -use api::v1::meta::CreateResponse; use api::v1::meta::Peer; use api::v1::meta::RouteRequest; use api::v1::meta::RouteResponse; +use snafu::OptionExt; use tonic::Request; use tonic::Response; use super::store::kv::KvStoreRef; use super::GrpcResult; +use crate::error; use crate::error::Result; use crate::metasrv::MetaSrv; @@ -22,7 +23,7 @@ impl router_server::Router for MetaSrv { Ok(Response::new(res)) } - async fn create(&self, req: Request) -> GrpcResult { + async fn create(&self, req: Request) -> GrpcResult { let req = req.into_inner(); let kv_store = self.kv_store(); let res = handle_create(req, kv_store).await?; @@ -35,16 +36,15 @@ async fn handle_route(_req: RouteRequest, _kv_store: KvStoreRef) -> Result Result { - let CreateRequest { mut regions, .. } = req; +async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result { + let CreateRequest { table_name, .. } = req; + let _table_name = table_name.context(error::EmptyTableNameSnafu)?; - // TODO(jiachun): route table - for r in &mut regions { - r.peer = Some(Peer::new(0, "127.0.0.1:3000")); - } + // TODO(jiachun): + let peers = vec![Peer::new(0, "127.0.0.1:3000")]; - Ok(CreateResponse { - regions, + Ok(RouteResponse { + peers, ..Default::default() }) } @@ -58,6 +58,7 @@ mod tests { use tonic::IntoRequest; use super::*; + use crate::metasrv::MetaSrvOptions; use crate::service::store::kv::KvStore; struct MockKvStore; @@ -84,7 +85,7 @@ mod tests { #[tokio::test] async fn test_handle_route() { let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(kv_store); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); let header = RequestHeader::new(1, 1); let req = RouteRequest::new(header); @@ -99,28 +100,26 @@ mod tests { #[tokio::test] async fn test_handle_create() { let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(kv_store); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); let header = RequestHeader::new(1, 1); let table_name = TableName::new("test_catalog", "test_db", "table1"); let req = CreateRequest::new(header, table_name); - let p = region::Partition::new() + let p0 = Partition::new() .column_list(vec![b"col1".to_vec(), b"col2".to_vec()]) .value_list(vec![b"v1".to_vec(), b"v2".to_vec()]); - let r1 = Region::new(1, "region1", p); - let p = region::Partition::new() + let p1 = Partition::new() .column_list(vec![b"col1".to_vec(), b"col2".to_vec()]) .value_list(vec![b"v11".to_vec(), b"v22".to_vec()]); - let r2 = Region::new(1, "region2", p); - let req = req.add_region(r1).add_region(r2); + let req = req.add_partition(p0).add_partition(p1); let res = meta_srv.create(req.into_request()).await.unwrap(); - for r in res.into_inner().regions { - assert_eq!("127.0.0.1:3000", r.peer.unwrap().endpoint.unwrap().addr); + for r in res.into_inner().peers { + assert_eq!("127.0.0.1:3000", r.endpoint.unwrap().addr); } } } diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 848c1a1801..b4ad937fcb 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -50,6 +50,7 @@ mod tests { use tonic::IntoRequest; use super::*; + use crate::metasrv::MetaSrvOptions; use crate::service::store::kv::KvStore; struct MockKvStore; @@ -75,7 +76,7 @@ mod tests { #[tokio::test] async fn test_range() { let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(kv_store); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); let req = RangeRequest::default(); let res = meta_srv.range(req.into_request()).await; @@ -85,7 +86,7 @@ mod tests { #[tokio::test] async fn test_put() { let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(kv_store); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); let req = PutRequest::default(); let res = meta_srv.put(req.into_request()).await; @@ -95,7 +96,7 @@ mod tests { #[tokio::test] async fn test_delete_range() { let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(kv_store); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); let req = DeleteRangeRequest::default(); let res = meta_srv.delete_range(req.into_request()).await;