diff --git a/src/api/greptime/v1/meta/common.proto b/src/api/greptime/v1/meta/common.proto index 8d3cff5d22..af0fcd47e7 100644 --- a/src/api/greptime/v1/meta/common.proto +++ b/src/api/greptime/v1/meta/common.proto @@ -24,11 +24,7 @@ message Error { message Peer { uint64 id = 1; - Endpoint endpoint = 2; -} - -message Endpoint { - string addr = 1; + string addr = 2; } message TableName { diff --git a/src/api/greptime/v1/meta/heartbeat.proto b/src/api/greptime/v1/meta/heartbeat.proto index b5c96c2b67..4cc89bc943 100644 --- a/src/api/greptime/v1/meta/heartbeat.proto +++ b/src/api/greptime/v1/meta/heartbeat.proto @@ -88,5 +88,5 @@ message AskLeaderRequest { message AskLeaderResponse { ResponseHeader header = 1; - Endpoint leader = 2; + Peer leader = 2; } diff --git a/src/api/greptime/v1/meta/route.proto b/src/api/greptime/v1/meta/route.proto index 07ac11cbc0..33976bf91d 100644 --- a/src/api/greptime/v1/meta/route.proto +++ b/src/api/greptime/v1/meta/route.proto @@ -8,19 +8,21 @@ service Router { // Fetch routing information for tables. The smallest unit is the complete // routing information(all regions) of a table. // + // ```text // table_1 // table_name // table_schema // regions // region_1 - // mutate_endpoint - // select_endpoint_1, select_endpoint_2 + // leader_peer + // follower_peer_1, follower_peer_2 // region_2 - // mutate_endpoint - // select_endpoint_1, select_endpoint_2, select_endpoint_3 + // leader_peer + // follower_peer_1, follower_peer_2, follower_peer_3 // region_xxx // table_2 // ... + // ``` // rpc Route(RouteRequest) returns (RouteResponse) {} diff --git a/src/api/src/v1/meta.rs b/src/api/src/v1/meta.rs index 78e64905c6..ecde5a81e5 100644 --- a/src/api/src/v1/meta.rs +++ b/src/api/src/v1/meta.rs @@ -2,56 +2,19 @@ tonic::include_proto!("greptime.v1.meta"); pub const PROTOCOL_VERSION: u64 = 1; -impl Peer { - pub fn new(id: u64, addr: impl AsRef) -> Self { - Self { - id, - endpoint: Some(addr.as_ref().into()), - } - } -} - -impl From<&str> for Endpoint { - fn from(s: &str) -> Self { - Self { - addr: s.to_string(), - } - } +pub const fn request_header((cluster_id, member_id): (u64, u64)) -> Option { + Some(RequestHeader::new((cluster_id, member_id))) } impl RequestHeader { - pub fn new(cluster_id: u64, member_id: u64) -> Self { + #[inline] + pub const fn new((cluster_id, member_id): (u64, u64)) -> Self { Self { protocol_version: PROTOCOL_VERSION, cluster_id, member_id, } } - - pub fn with_id((cluster_id, member_id): (u64, u64)) -> Self { - Self { - protocol_version: PROTOCOL_VERSION, - cluster_id, - member_id, - } - } -} - -impl HeartbeatRequest { - pub fn new(header: RequestHeader) -> Self { - Self { - header: Some(header), - ..Default::default() - } - } -} - -impl AskLeaderRequest { - pub fn new(header: RequestHeader) -> Self { - Self { - header: Some(header), - } - } } impl TableName { @@ -69,13 +32,6 @@ impl TableName { } impl RouteRequest { - pub fn new(header: RequestHeader) -> Self { - Self { - header: Some(header), - ..Default::default() - } - } - pub fn add_table(mut self, table_name: TableName) -> Self { self.table_names.push(table_name); self @@ -83,14 +39,6 @@ impl RouteRequest { } impl CreateRequest { - pub fn new(header: RequestHeader, table_name: TableName) -> Self { - Self { - header: Some(header), - table_name: Some(table_name), - ..Default::default() - } - } - pub fn add_partition(mut self, partition: Partition) -> Self { self.partitions.push(partition); self @@ -128,20 +76,3 @@ impl Partition { self } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_peer() { - let peer = Peer::new(1, "test_addr"); - assert_eq!(1, peer.id); - assert_eq!( - Endpoint { - addr: "test_addr".to_string() - }, - peer.endpoint.unwrap() - ); - } -} diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index 0d0f4866c6..bdb97460cc 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -1,16 +1,16 @@ use std::time::Duration; -use api::v1::meta::CreateRequest; -use api::v1::meta::DeleteRangeRequest; use api::v1::meta::HeartbeatRequest; -use api::v1::meta::Partition; -use api::v1::meta::PutRequest; -use api::v1::meta::RangeRequest; -use api::v1::meta::RequestHeader; -use api::v1::meta::TableName; +use api::v1::meta::Peer; use common_grpc::channel_manager::ChannelConfig; use common_grpc::channel_manager::ChannelManager; use meta_client::client::MetaClientBuilder; +use meta_client::rpc::CreateRequest; +use meta_client::rpc::DeleteRangeRequest; +use meta_client::rpc::Partition; +use meta_client::rpc::PutRequest; +use meta_client::rpc::RangeRequest; +use meta_client::rpc::TableName; use tracing::event; use tracing::subscriber; use tracing::Level; @@ -44,28 +44,37 @@ async fn run() { // send heartbeats tokio::spawn(async move { for _ in 0..5 { - let req = HeartbeatRequest::new(RequestHeader::with_id(id)); + let req = HeartbeatRequest { + peer: Some(Peer { + id: 1, + addr: "meta_client_peer".to_string(), + }), + ..Default::default() + }; sender.send(req).await.unwrap(); } + tokio::time::sleep(Duration::from_secs(10)).await; + }); + + tokio::spawn(async move { + while let Some(res) = receiver.message().await.unwrap() { + event!(Level::INFO, "heartbeat response: {:#?}", res); + } }); - while let Some(res) = receiver.message().await.unwrap() { - event!(Level::INFO, "heartbeat response: {:#?}", res); - } + let p1 = Partition { + column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()], + value_list: vec![b"k1".to_vec(), b"k2".to_vec()], + }; - let header = RequestHeader::with_id(id); - - let p1 = Partition::new() - .column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()]) - .value_list(vec![b"k1".to_vec(), b"k2".to_vec()]); - - let p2 = Partition::new() - .column_list(vec![b"col_1".to_vec(), b"col_2".to_vec()]) - .value_list(vec![b"Max1".to_vec(), b"Max2".to_vec()]); + let p2 = Partition { + column_list: vec![b"col_1".to_vec(), b"col_2".to_vec()], + value_list: vec![b"Max1".to_vec(), b"Max2".to_vec()], + }; let table_name = TableName::new("test_catlog", "test_schema", "test_table"); - let create_req = CreateRequest::new(header, table_name) + let create_req = CreateRequest::new(table_name) .add_partition(p1) .add_partition(p2); @@ -73,32 +82,24 @@ async fn run() { event!(Level::INFO, "create_route result: {:#?}", res); // put - let put_req = PutRequest { - key: b"key1".to_vec(), - value: b"value1".to_vec(), - prev_kv: true, - ..Default::default() - }; - let res = meta_client.put(put_req).await.unwrap(); + let put = PutRequest::new() + .with_key(b"key1".to_vec()) + .with_value(b"value1".to_vec()) + .with_prev_kv(); + let res = meta_client.put(put).await.unwrap(); event!(Level::INFO, "put result: {:#?}", res); // get - let range_req = RangeRequest { - key: b"key1".to_vec(), - ..Default::default() - }; - let res = meta_client.range(range_req.clone()).await.unwrap(); + let range = RangeRequest::new().with_key(b"key2".to_vec()); + let res = meta_client.range(range.clone()).await.unwrap(); event!(Level::INFO, "get range result: {:#?}", res); // delete - let delete_range_req = DeleteRangeRequest { - key: b"key1".to_vec(), - ..Default::default() - }; - let res = meta_client.delete_range(delete_range_req).await.unwrap(); + let delete_range = DeleteRangeRequest::new().with_key(b"key1".to_vec()); + let res = meta_client.delete_range(delete_range).await.unwrap(); event!(Level::INFO, "delete range result: {:#?}", res); // get none - let res = meta_client.range(range_req).await; + let res = meta_client.range(range).await.unwrap(); event!(Level::INFO, "get range result: {:#?}", res); } diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 4894627c7a..77c4d30eea 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -3,15 +3,6 @@ mod load_balance; mod router; mod store; -use api::v1::meta::CreateRequest; -use api::v1::meta::DeleteRangeRequest; -use api::v1::meta::DeleteRangeResponse; -use api::v1::meta::PutRequest; -use api::v1::meta::PutResponse; -use api::v1::meta::RangeRequest; -use api::v1::meta::RangeResponse; -use api::v1::meta::RouteRequest; -use api::v1::meta::RouteResponse; use common_grpc::channel_manager::ChannelConfig; use common_grpc::channel_manager::ChannelManager; use common_telemetry::info; @@ -24,6 +15,15 @@ use self::heartbeat::HeartbeatSender; use self::heartbeat::HeartbeatStream; use crate::error; use crate::error::Result; +use crate::rpc::CreateRequest; +use crate::rpc::DeleteRangeRequest; +use crate::rpc::DeleteRangeResponse; +use crate::rpc::PutRequest; +use crate::rpc::PutResponse; +use crate::rpc::RangeRequest; +use crate::rpc::RangeResponse; +use crate::rpc::RouteRequest; +use crate::rpc::RouteResponse; pub type Id = (u64, u64); @@ -176,8 +176,9 @@ impl MetaClient { .context(error::NotStartedSnafu { name: "route_client", })? - .create(req) - .await + .create(req.into()) + .await? + .try_into() } /// Fetch routing information for tables. The smallest unit is the complete @@ -189,11 +190,11 @@ impl MetaClient { /// table_schema /// regions /// region_1 - /// mutate_endpoint - /// select_endpoint_1, select_endpoint_2 + /// leader_peer + /// follower_peer_1, follower_peer_2 /// region_2 - /// mutate_endpoint - /// select_endpoint_1, select_endpoint_2, select_endpoint_3 + /// leader_peer + /// follower_peer_1, follower_peer_2, follower_peer_3 /// region_xxx /// table_2 /// ... @@ -204,8 +205,9 @@ impl MetaClient { .context(error::NotStartedSnafu { name: "route_client", })? - .route(req) - .await + .route(req.into()) + .await? + .try_into() } /// Range gets the keys in the range from the key-value store. @@ -214,8 +216,9 @@ impl MetaClient { .context(error::NotStartedSnafu { name: "store_client", })? - .range(req) + .range(req.into()) .await + .map(Into::into) } /// Put puts the given key into the key-value store. @@ -224,8 +227,9 @@ impl MetaClient { .context(error::NotStartedSnafu { name: "store_client", })? - .put(req) + .put(req.into()) .await + .map(Into::into) } /// DeleteRange deletes the given range from the key-value store. @@ -234,8 +238,9 @@ impl MetaClient { .context(error::NotStartedSnafu { name: "store_client", })? - .delete_range(req) + .delete_range(req.into()) .await + .map(Into::into) } #[inline] @@ -267,6 +272,7 @@ impl MetaClient { #[cfg(test)] mod tests { use super::*; + use crate::rpc::TableName; #[tokio::test] async fn test_meta_client_builder() { @@ -336,7 +342,8 @@ mod tests { meta_client.start(urls).await.unwrap(); - let res = meta_client.create_route(CreateRequest::default()).await; + let req = CreateRequest::new(TableName::new("c", "s", "t")); + let res = meta_client.create_route(req).await; assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 03955edf09..a6388e3352 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -2,10 +2,10 @@ use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::heartbeat_client::HeartbeatClient; +use api::v1::meta::request_header; use api::v1::meta::AskLeaderRequest; use api::v1::meta::HeartbeatRequest; use api::v1::meta::HeartbeatResponse; -use api::v1::meta::RequestHeader; use common_grpc::channel_manager::ChannelManager; use common_telemetry::debug; use common_telemetry::info; @@ -23,17 +23,24 @@ use crate::error; use crate::error::Result; pub struct HeartbeatSender { + id: Id, sender: mpsc::Sender, } impl HeartbeatSender { #[inline] - const fn new(sender: mpsc::Sender) -> Self { - Self { sender } + const fn new(id: Id, sender: mpsc::Sender) -> Self { + Self { id, sender } } #[inline] - pub async fn send(&self, req: HeartbeatRequest) -> Result<()> { + pub fn id(&self) -> Id { + self.id + } + + #[inline] + pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> { + req.header = request_header(self.id); self.sender.send(req).await.map_err(|e| { error::SendHeartbeatSnafu { err_msg: e.to_string(), @@ -45,13 +52,19 @@ impl HeartbeatSender { #[derive(Debug)] pub struct HeartbeatStream { + id: Id, stream: Streaming, } impl HeartbeatStream { #[inline] - const fn new(stream: Streaming) -> Self { - Self { stream } + const fn new(id: Id, stream: Streaming) -> Self { + Self { id, stream } + } + + #[inline] + pub fn id(&self) -> Id { + self.id } /// Fetch the next message from this stream. @@ -141,11 +154,12 @@ impl Inner { } ); - // TODO(jiachun): set cluster_id and member_id - let header = RequestHeader::with_id(self.id); + let header = request_header(self.id); let mut leader = None; for addr in &self.peers { - let req = AskLeaderRequest::new(header.clone()); + let req = AskLeaderRequest { + header: header.clone(), + }; let mut client = self.make_client(addr)?; match client.ask_leader(req).await { Ok(res) => { @@ -168,7 +182,11 @@ impl Inner { let mut leader = self.make_client(leader)?; let (sender, receiver) = mpsc::channel::(128); - let handshake = HeartbeatRequest::new(RequestHeader::with_id(self.id)); + let header = request_header(self.id); + let handshake = HeartbeatRequest { + header, + ..Default::default() + }; sender.send(handshake).await.map_err(|e| { error::SendHeartbeatSnafu { err_msg: e.to_string(), @@ -190,7 +208,10 @@ impl Inner { .context(error::CreateHeartbeatStreamSnafu)?; info!("Success to create heartbeat stream to server: {:#?}", res); - Ok((HeartbeatSender::new(sender), HeartbeatStream::new(stream))) + Ok(( + HeartbeatSender::new(self.id, sender), + HeartbeatStream::new(self.id, stream), + )) } fn make_client(&self, addr: impl AsRef) -> Result> { @@ -287,23 +308,18 @@ mod test { #[tokio::test] async fn test_heartbeat_stream() { let (sender, mut receiver) = mpsc::channel::(100); - let sender = HeartbeatSender::new(sender); + let sender = HeartbeatSender::new((8, 8), sender); tokio::spawn(async move { - for i in 0..10 { - sender - .send(HeartbeatRequest::new(RequestHeader::new(i, i))) - .await - .unwrap(); + for _ in 0..10 { + sender.send(HeartbeatRequest::default()).await.unwrap(); } }); - let mut i = 0; while let Some(req) = receiver.recv().await { let header = req.header.unwrap(); - assert_eq!(i, header.cluster_id); - assert_eq!(i, header.member_id); - i += 1; + assert_eq!(8, header.cluster_id); + assert_eq!(8, header.member_id); } } } diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs index aa1d8544e4..0f16963ef1 100644 --- a/src/meta-client/src/client/router.rs +++ b/src/meta-client/src/client/router.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::sync::Arc; +use api::v1::meta::request_header; use api::v1::meta::router_client::RouterClient; use api::v1::meta::CreateRequest; use api::v1::meta::RouteRequest; @@ -60,8 +61,7 @@ impl Client { #[derive(Debug)] struct Inner { - #[allow(dead_code)] - id: Id, // TODO(jiachun): will use it later + id: Id, channel_manager: ChannelManager, peers: Vec, } @@ -90,17 +90,17 @@ impl Inner { Ok(()) } - async fn route(&self, req: RouteRequest) -> Result { + async fn route(&self, mut req: RouteRequest) -> Result { let mut client = self.random_client()?; - + req.header = request_header(self.id); let res = client.route(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) } - async fn create(&self, req: CreateRequest) -> Result { + async fn create(&self, mut req: CreateRequest) -> Result { let mut client = self.random_client()?; - + req.header = request_header(self.id); let res = client.create(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -134,8 +134,6 @@ impl Inner { #[cfg(test)] mod test { - use api::v1::meta::{RequestHeader, TableName}; - use super::*; #[tokio::test] @@ -188,8 +186,10 @@ mod test { let mut client = Client::new((0, 0), ChannelManager::default()); client.start(&["unavailable_peer"]).await.unwrap(); - let header = RequestHeader::new(0, 0); - let req = CreateRequest::new(header, TableName::default()); + let req = CreateRequest { + header: request_header((0, 0)), + ..Default::default() + }; let res = client.create(req).await; assert!(res.is_err()); @@ -205,8 +205,10 @@ mod test { let mut client = Client::new((0, 0), ChannelManager::default()); client.start(&["unavailable_peer"]).await.unwrap(); - let header = RequestHeader::new(0, 0); - let req = RouteRequest::new(header); + let req = RouteRequest { + header: request_header((0, 0)), + ..Default::default() + }; let res = client.route(req).await; assert!(res.is_err()); diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index d861133c7a..b1353b3ba5 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::sync::Arc; +use api::v1::meta::request_header; use api::v1::meta::store_client::StoreClient; use api::v1::meta::DeleteRangeRequest; use api::v1::meta::DeleteRangeResponse; @@ -68,8 +69,7 @@ impl Client { #[derive(Debug)] struct Inner { - #[allow(dead_code)] - id: Id, // TODO(jiachun): will use it later + id: Id, channel_manager: ChannelManager, peers: Vec, } @@ -98,25 +98,25 @@ impl Inner { Ok(()) } - async fn range(&self, req: RangeRequest) -> Result { + async fn range(&self, mut req: RangeRequest) -> Result { let mut client = self.random_client()?; - + req.header = request_header(self.id); let res = client.range(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) } - async fn put(&self, req: PutRequest) -> Result { + async fn put(&self, mut req: PutRequest) -> Result { let mut client = self.random_client()?; - + req.header = request_header(self.id); let res = client.put(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) } - async fn delete_range(&self, req: DeleteRangeRequest) -> Result { + async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result { let mut client = self.random_client()?; - + req.header = request_header(self.id); let res = client .delete_range(req) .await diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index 68043256c9..fac406ff7e 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -45,6 +45,12 @@ pub enum Error { #[snafu(display("Failed create heartbeat stream to server"))] CreateHeartbeatStream { backtrace: Backtrace }, + + #[snafu(display("Route info corruped: {}", err_msg))] + RouteInfoCorrupted { + err_msg: String, + backtrace: Backtrace, + }, } #[allow(dead_code)] @@ -70,6 +76,7 @@ impl ErrorExt for Error { | Error::SendHeartbeat { .. } | Error::CreateHeartbeatStream { .. } | Error::CreateChannel { .. } => StatusCode::Internal, + Error::RouteInfoCorrupted { .. } => StatusCode::Unexpected, } } } @@ -179,4 +186,15 @@ mod tests { assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } + + #[test] + fn test_route_info_corruped_error() { + let e = throw_none_option() + .context(RouteInfoCorruptedSnafu { err_msg: "" }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Unexpected); + } } diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 610c471ca3..85e7ff643a 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -1,2 +1,3 @@ pub mod client; mod error; +pub mod rpc; diff --git a/src/meta-client/src/rpc.rs b/src/meta-client/src/rpc.rs new file mode 100644 index 0000000000..721cea6825 --- /dev/null +++ b/src/meta-client/src/rpc.rs @@ -0,0 +1,176 @@ +mod router; +mod store; + +use api::v1::meta::KeyValue as PbKeyValue; +use api::v1::meta::Peer as PbPeer; +use api::v1::meta::ResponseHeader as PbResponseHeader; +use api::v1::meta::TableName as PbTableName; +pub use router::CreateRequest; +pub use router::Partition; +pub use router::Region; +pub use router::RouteRequest; +pub use router::RouteResponse; +pub use router::Table; +pub use router::TableRoute; +pub use store::DeleteRangeRequest; +pub use store::DeleteRangeResponse; +pub use store::PutRequest; +pub use store::PutResponse; +pub use store::RangeRequest; +pub use store::RangeResponse; + +#[derive(Debug, Clone)] +pub struct ResponseHeader(PbResponseHeader); + +impl ResponseHeader { + #[inline] + pub(crate) fn new(header: PbResponseHeader) -> Self { + Self(header) + } + + #[inline] + pub fn protocol_version(&self) -> u64 { + self.0.protocol_version + } + + #[inline] + pub fn cluster_id(&self) -> u64 { + self.0.cluster_id + } + + #[inline] + pub fn error_code(&self) -> i32 { + match self.0.error.as_ref() { + Some(err) => err.code, + None => 0, + } + } + + #[inline] + pub fn error_msg(&self) -> String { + match self.0.error.as_ref() { + Some(err) => err.err_msg.clone(), + None => "None".to_string(), + } + } +} + +#[derive(Debug, Clone)] +pub struct KeyValue(PbKeyValue); + +impl KeyValue { + #[inline] + pub(crate) fn new(kv: PbKeyValue) -> Self { + Self(kv) + } + + #[inline] + pub fn key(&self) -> &[u8] { + &self.0.key + } + + #[inline] + pub fn take_key(&mut self) -> Vec { + std::mem::take(&mut self.0.key) + } + + #[inline] + pub fn value(&self) -> &[u8] { + &self.0.value + } + + #[inline] + pub fn take_value(&mut self) -> Vec { + std::mem::take(&mut self.0.value) + } +} + +#[derive(Debug, Clone)] +pub struct TableName { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, +} + +impl TableName { + pub fn new( + catalog_name: impl Into, + schema_name: impl Into, + table_name: impl Into, + ) -> Self { + Self { + catalog_name: catalog_name.into(), + schema_name: schema_name.into(), + table_name: table_name.into(), + } + } +} + +impl From for PbTableName { + fn from(tb: TableName) -> Self { + Self { + catalog_name: tb.catalog_name, + schema_name: tb.schema_name, + table_name: tb.table_name, + } + } +} + +impl From for TableName { + fn from(tb: PbTableName) -> Self { + Self { + catalog_name: tb.catalog_name, + schema_name: tb.schema_name, + table_name: tb.table_name, + } + } +} + +#[derive(Debug, Clone)] +pub struct Peer { + pub id: u64, + pub addr: String, +} + +impl From for Peer { + fn from(p: PbPeer) -> Self { + Self { + id: p.id, + addr: p.addr, + } + } +} + +impl Peer { + pub fn new(id: u64, addr: impl Into) -> Self { + Self { + id, + addr: addr.into(), + } + } +} + +#[cfg(test)] +mod tests { + use api::v1::meta::{Error, ResponseHeader as PbResponseHeader}; + + use super::*; + + #[test] + fn test_response_header_trans() { + let pb_header = PbResponseHeader { + protocol_version: 101, + cluster_id: 1, + error: Some(Error { + code: 100, + err_msg: "test".to_string(), + }), + }; + + let header = ResponseHeader::new(pb_header); + assert_eq!(101, header.protocol_version()); + assert_eq!(1, header.cluster_id()); + assert_eq!(100, header.error_code()); + assert_eq!("test".to_string(), header.error_msg()); + } +} diff --git a/src/meta-client/src/rpc/router.rs b/src/meta-client/src/rpc/router.rs new file mode 100644 index 0000000000..e5ca8c8f14 --- /dev/null +++ b/src/meta-client/src/rpc/router.rs @@ -0,0 +1,345 @@ +use std::collections::HashMap; + +use api::v1::meta::CreateRequest as PbCreateRequest; +use api::v1::meta::Partition as PbPartition; +use api::v1::meta::Region as PbRegion; +use api::v1::meta::RouteRequest as PbRouteRequest; +use api::v1::meta::RouteResponse as PbRouteResponse; +use api::v1::meta::Table as PbTable; +use snafu::OptionExt; + +use super::Peer; +use super::TableName; +use crate::error; +use crate::error::Result; + +#[derive(Debug, Clone, Default)] +pub struct RouteRequest { + pub table_names: Vec, +} + +impl From for PbRouteRequest { + fn from(mut req: RouteRequest) -> Self { + Self { + header: None, + table_names: req.table_names.drain(..).map(Into::into).collect(), + } + } +} + +impl RouteRequest { + #[inline] + pub fn new() -> Self { + Self { + table_names: vec![], + } + } + + #[inline] + pub fn add_table_name(mut self, table_name: TableName) -> Self { + self.table_names.push(table_name); + self + } +} + +#[derive(Debug, Clone)] +pub struct CreateRequest { + pub table_name: TableName, + pub partitions: Vec, +} + +impl From for PbCreateRequest { + fn from(mut req: CreateRequest) -> Self { + Self { + header: None, + table_name: Some(req.table_name.into()), + partitions: req.partitions.drain(..).map(Into::into).collect(), + } + } +} + +impl CreateRequest { + #[inline] + pub fn new(table_name: TableName) -> Self { + Self { + table_name, + partitions: vec![], + } + } + + #[inline] + pub fn add_partition(mut self, partition: Partition) -> Self { + self.partitions.push(partition); + self + } +} + +#[derive(Debug, Clone)] +pub struct RouteResponse { + pub table_routes: Vec, +} + +impl TryFrom for RouteResponse { + type Error = error::Error; + + fn try_from(pb: PbRouteResponse) -> Result { + let peers: Vec = pb.peers.into_iter().map(Into::into).collect(); + let get_peer = |index: u64| peers.get(index as usize).map(ToOwned::to_owned); + let mut table_routes = Vec::with_capacity(pb.table_routes.len()); + for table_route in pb.table_routes.into_iter() { + let table = table_route + .table + .context(error::RouteInfoCorruptedSnafu { + err_msg: "table required", + })? + .try_into()?; + let region_routes = table_route + .region_routes + .into_iter() + .map(|region_route| { + let region = region_route.region.map(Into::into); + let leader_peer = get_peer(region_route.leader_peer_index); + let follower_peers = region_route + .follower_peer_indexes + .into_iter() + .filter_map(get_peer) + .collect::>(); + + RegionRoute { + region, + leader_peer, + follower_peers, + } + }) + .collect::>(); + + table_routes.push(TableRoute { + table, + region_routes, + }); + } + + Ok(Self { table_routes }) + } +} + +#[derive(Debug, Clone)] +pub struct TableRoute { + pub table: Table, + pub region_routes: Vec, +} + +#[derive(Debug, Clone)] +pub struct Table { + pub table_name: TableName, + pub table_schema: Vec, +} + +impl TryFrom for Table { + type Error = error::Error; + + fn try_from(t: PbTable) -> Result { + let table_name = t + .table_name + .context(error::RouteInfoCorruptedSnafu { + err_msg: "table name requied", + })? + .into(); + Ok(Self { + table_name, + table_schema: t.table_schema, + }) + } +} + +#[derive(Debug, Clone, Default)] +pub struct RegionRoute { + pub region: Option, + pub leader_peer: Option, + pub follower_peers: Vec, +} + +#[derive(Debug, Clone, Default)] +pub struct Region { + pub id: u64, + pub name: String, + pub partition: Option, + pub attrs: HashMap, +} + +impl From for Region { + fn from(r: PbRegion) -> Self { + Self { + id: r.id, + name: r.name, + partition: r.partition.map(Into::into), + attrs: r.attrs, + } + } +} + +#[derive(Debug, Clone)] +pub struct Partition { + pub column_list: Vec>, + pub value_list: Vec>, +} + +impl From for PbPartition { + fn from(p: Partition) -> Self { + Self { + column_list: p.column_list, + value_list: p.value_list, + } + } +} + +impl From for Partition { + fn from(p: PbPartition) -> Self { + Self { + column_list: p.column_list, + value_list: p.value_list, + } + } +} + +#[cfg(test)] +mod tests { + use api::v1::meta::Partition as PbPartition; + use api::v1::meta::Peer as PbPeer; + use api::v1::meta::Region as PbRegion; + use api::v1::meta::RegionRoute as PbRegionRoute; + use api::v1::meta::RouteRequest as PbRouteRequest; + use api::v1::meta::RouteResponse as PbRouteResponse; + use api::v1::meta::Table as PbTable; + use api::v1::meta::TableName as PbTableName; + use api::v1::meta::TableRoute as PbTableRoute; + + use super::*; + + #[test] + fn test_route_request_trans() { + let req = RouteRequest { + table_names: vec![ + TableName::new("c1", "s1", "t1"), + TableName::new("c2", "s2", "t2"), + ], + }; + + let into_req: PbRouteRequest = req.into(); + + assert!(into_req.header.is_none()); + assert_eq!("c1", into_req.table_names.get(0).unwrap().catalog_name); + assert_eq!("s1", into_req.table_names.get(0).unwrap().schema_name); + assert_eq!("t1", into_req.table_names.get(0).unwrap().table_name); + assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name); + assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name); + assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name); + } + + #[test] + fn test_create_request_trans() { + let req = CreateRequest { + table_name: TableName::new("c1", "s1", "t1"), + partitions: vec![ + Partition { + column_list: vec![b"c1".to_vec(), b"c2".to_vec()], + value_list: vec![b"v1".to_vec(), b"v2".to_vec()], + }, + Partition { + column_list: vec![b"c1".to_vec(), b"c2".to_vec()], + value_list: vec![b"v11".to_vec(), b"v22".to_vec()], + }, + ], + }; + + let into_req: PbCreateRequest = req.into(); + + assert!(into_req.header.is_none()); + let table_name = into_req.table_name; + assert_eq!("c1", table_name.as_ref().unwrap().catalog_name); + assert_eq!("s1", table_name.as_ref().unwrap().schema_name); + assert_eq!("t1", table_name.as_ref().unwrap().table_name); + assert_eq!( + vec![b"c1".to_vec(), b"c2".to_vec()], + into_req.partitions.get(0).unwrap().column_list + ); + assert_eq!( + vec![b"v1".to_vec(), b"v2".to_vec()], + into_req.partitions.get(0).unwrap().value_list + ); + assert_eq!( + vec![b"c1".to_vec(), b"c2".to_vec()], + into_req.partitions.get(1).unwrap().column_list + ); + assert_eq!( + vec![b"v11".to_vec(), b"v22".to_vec()], + into_req.partitions.get(1).unwrap().value_list + ); + } + + #[test] + fn test_route_response_trans() { + let res = PbRouteResponse { + header: None, + peers: vec![ + PbPeer { + id: 1, + addr: "peer1".to_string(), + }, + PbPeer { + id: 2, + addr: "peer2".to_string(), + }, + ], + table_routes: vec![PbTableRoute { + table: Some(PbTable { + table_name: Some(PbTableName { + catalog_name: "c1".to_string(), + schema_name: "s1".to_string(), + table_name: "t1".to_string(), + }), + table_schema: b"schema".to_vec(), + }), + region_routes: vec![PbRegionRoute { + region: Some(PbRegion { + id: 1, + name: "region1".to_string(), + partition: Some(PbPartition { + column_list: vec![b"c1".to_vec(), b"c2".to_vec()], + value_list: vec![b"v1".to_vec(), b"v2".to_vec()], + }), + attrs: Default::default(), + }), + leader_peer_index: 0, + follower_peer_indexes: vec![1], + }], + }], + }; + + let res: RouteResponse = res.try_into().unwrap(); + let mut table_routes = res.table_routes; + assert_eq!(1, table_routes.len()); + let table_route = table_routes.remove(0); + let table = table_route.table; + assert_eq!("c1", table.table_name.catalog_name); + assert_eq!("s1", table.table_name.schema_name); + assert_eq!("t1", table.table_name.table_name); + + let mut region_routes = table_route.region_routes; + assert_eq!(1, region_routes.len()); + let region_route = region_routes.remove(0); + let region = region_route.region.unwrap(); + assert_eq!(1, region.id); + assert_eq!("region1", region.name); + let partition = region.partition.unwrap(); + assert_eq!(vec![b"c1".to_vec(), b"c2".to_vec()], partition.column_list); + assert_eq!(vec![b"v1".to_vec(), b"v2".to_vec()], partition.value_list); + + assert_eq!(1, region_route.leader_peer.as_ref().unwrap().id); + assert_eq!("peer1", region_route.leader_peer.as_ref().unwrap().addr); + + assert_eq!(1, region_route.follower_peers.len()); + assert_eq!(2, region_route.follower_peers.get(0).unwrap().id); + assert_eq!("peer2", region_route.follower_peers.get(0).unwrap().addr); + } +} diff --git a/src/meta-client/src/rpc/store.rs b/src/meta-client/src/rpc/store.rs new file mode 100644 index 0000000000..1fbe4e9a0f --- /dev/null +++ b/src/meta-client/src/rpc/store.rs @@ -0,0 +1,452 @@ +use api::v1::meta::DeleteRangeRequest as PbDeleteRangeRequest; +use api::v1::meta::DeleteRangeResponse as PbDeleteRangeResponse; +use api::v1::meta::PutRequest as PbPutRequest; +use api::v1::meta::PutResponse as PbPutResponse; +use api::v1::meta::RangeRequest as PbRangeRequest; +use api::v1::meta::RangeResponse as PbRangeResponse; + +use super::KeyValue; +use super::ResponseHeader; + +#[derive(Debug, Clone, Default)] +pub struct RangeRequest { + /// key is the first key for the range, If range_end is not given, the + /// request only looks up key. + pub key: Vec, + /// range_end is the upper bound on the requested range [key, range_end). + /// If range_end is '\0', the range is all keys >= key. + /// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), + /// then the range request gets all keys prefixed with key. + /// If both key and range_end are '\0', then the range request returns all + /// keys. + pub range_end: Vec, + /// limit is a limit on the number of keys returned for the request. When + /// limit is set to 0, it is treated as no limit. + pub limit: i64, + /// keys_only when set returns only the keys and not the values. + pub keys_only: bool, +} + +impl From for PbRangeRequest { + fn from(req: RangeRequest) -> Self { + Self { + header: None, + key: req.key, + range_end: req.range_end, + limit: req.limit, + keys_only: req.keys_only, + } + } +} + +impl RangeRequest { + #[inline] + pub fn new() -> Self { + Self { + key: vec![], + range_end: vec![], + limit: 0, + keys_only: false, + } + } + + /// key is the first key for the range, If range_end is not given, the + /// request only looks up key. + #[inline] + pub fn with_key(mut self, key: impl Into>) -> Self { + self.key = key.into(); + self + } + + /// range_end is the upper bound on the requested range [key, range_end). + /// If range_end is '\0', the range is all keys >= key. + /// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), + /// then the range request gets all keys prefixed with key. + /// If both key and range_end are '\0', then the range request returns all + /// keys. + #[inline] + pub fn with_range_end(mut self, range_end: impl Into>) -> Self { + self.range_end = range_end.into(); + self + } + + /// limit is a limit on the number of keys returned for the request. When + /// limit is set to 0, it is treated as no limit. + #[inline] + pub fn with_limit(mut self, limit: i64) -> Self { + self.limit = limit; + self + } + + /// keys_only when set returns only the keys and not the values. + #[inline] + pub fn with_keys_only(mut self) -> Self { + self.keys_only = true; + self + } +} + +#[derive(Debug, Clone)] +pub struct RangeResponse(PbRangeResponse); + +impl From for RangeResponse { + fn from(res: PbRangeResponse) -> Self { + Self::new(res) + } +} + +impl RangeResponse { + #[inline] + pub fn new(res: PbRangeResponse) -> Self { + Self(res) + } + + #[inline] + pub fn take_header(&mut self) -> Option { + self.0.header.take().map(ResponseHeader::new) + } + + #[inline] + pub fn take_kvs(&mut self) -> Vec { + self.0.kvs.drain(..).map(KeyValue::new).collect() + } + + #[inline] + pub fn more(&self) -> bool { + self.0.more + } +} + +#[derive(Debug, Clone, Default)] +pub struct PutRequest { + /// key is the key, in bytes, to put into the key-value store. + pub key: Vec, + /// value is the value, in bytes, to associate with the key in the + /// key-value store. + pub value: Vec, + /// If prev_kv is set, gets the previous key-value pair before changing it. + /// The previous key-value pair will be returned in the put response. + pub prev_kv: bool, +} + +impl From for PbPutRequest { + fn from(req: PutRequest) -> Self { + Self { + header: None, + key: req.key, + value: req.value, + prev_kv: req.prev_kv, + } + } +} + +impl PutRequest { + #[inline] + pub fn new() -> Self { + Self { + key: vec![], + value: vec![], + prev_kv: false, + } + } + + /// key is the key, in bytes, to put into the key-value store. + #[inline] + pub fn with_key(mut self, key: impl Into>) -> Self { + self.key = key.into(); + self + } + + /// value is the value, in bytes, to associate with the key in the + /// key-value store. + #[inline] + pub fn with_value(mut self, value: impl Into>) -> Self { + self.value = value.into(); + self + } + + /// If prev_kv is set, gets the previous key-value pair before changing it. + /// The previous key-value pair will be returned in the put response. + #[inline] + pub fn with_prev_kv(mut self) -> Self { + self.prev_kv = true; + self + } +} + +#[derive(Debug, Clone)] +pub struct PutResponse(PbPutResponse); + +impl From for PutResponse { + fn from(res: PbPutResponse) -> Self { + Self::new(res) + } +} + +impl PutResponse { + #[inline] + pub fn new(res: PbPutResponse) -> Self { + Self(res) + } + + #[inline] + pub fn take_header(&mut self) -> Option { + self.0.header.take().map(ResponseHeader::new) + } + + #[inline] + pub fn take_prev_kv(&mut self) -> Option { + self.0.prev_kv.take().map(KeyValue::new) + } +} + +#[derive(Debug, Clone, Default)] +pub struct DeleteRangeRequest { + /// key is the first key to delete in the range. + pub key: Vec, + /// range_end is the key following the last key to delete for the range + /// [key, range_end). + /// If range_end is not given, the range is defined to contain only the key + /// argument. + /// If range_end is one bit larger than the given key, then the range is all + /// the keys with the prefix (the given key). + /// If range_end is '\0', the range is all keys greater than or equal to the + /// key argument. + pub range_end: Vec, + /// If prev_kv is set, gets the previous key-value pairs before deleting it. + /// The previous key-value pairs will be returned in the delete response. + pub prev_kv: bool, + // TODO(jiachun): + // Add a "limit" in delete request? + // To avoid a huge delete block everything. +} + +impl From for PbDeleteRangeRequest { + fn from(req: DeleteRangeRequest) -> Self { + Self { + header: None, + key: req.key, + range_end: req.range_end, + prev_kv: req.prev_kv, + } + } +} + +impl DeleteRangeRequest { + #[inline] + pub fn new() -> Self { + Self { + key: vec![], + range_end: vec![], + prev_kv: false, + } + } + + /// key is the first key to delete in the range. + #[inline] + pub fn with_key(mut self, key: impl Into>) -> Self { + self.key = key.into(); + self + } + + /// range_end is the key following the last key to delete for the range + /// [key, range_end). + /// If range_end is not given, the range is defined to contain only the key + /// argument. + /// If range_end is one bit larger than the given key, then the range is all + /// the keys with the prefix (the given key). + /// If range_end is '\0', the range is all keys greater than or equal to the + /// key argument. + #[inline] + pub fn with_range_end(mut self, range_end: impl Into>) -> Self { + self.range_end = range_end.into(); + self + } + + /// If prev_kv is set, gets the previous key-value pairs before deleting it. + /// The previous key-value pairs will be returned in the delete response. + #[inline] + pub fn with_prev_kv(mut self) -> Self { + self.prev_kv = true; + self + } +} + +#[derive(Debug, Clone)] +pub struct DeleteRangeResponse(PbDeleteRangeResponse); + +impl From for DeleteRangeResponse { + fn from(res: PbDeleteRangeResponse) -> Self { + Self::new(res) + } +} + +impl DeleteRangeResponse { + #[inline] + pub fn new(res: PbDeleteRangeResponse) -> Self { + Self(res) + } + + #[inline] + pub fn take_header(&mut self) -> Option { + self.0.header.take().map(ResponseHeader::new) + } + + pub fn deleted(&self) -> i64 { + self.0.deleted + } + + #[inline] + pub fn take_prev_kvs(&mut self) -> Vec { + self.0.prev_kvs.drain(..).map(KeyValue::new).collect() + } +} + +#[cfg(test)] +mod tests { + use api::v1::meta::DeleteRangeRequest as PbDeleteRangeRequest; + use api::v1::meta::DeleteRangeResponse as PbDeleteRangeResponse; + use api::v1::meta::KeyValue as PbKeyValue; + use api::v1::meta::PutRequest as PbPutRequest; + use api::v1::meta::PutResponse as PbPutResponse; + use api::v1::meta::RangeRequest as PbRangeRequest; + use api::v1::meta::RangeResponse as PbRangeResponse; + + use super::*; + + #[test] + fn test_range_request_trans() { + let (key, range_end, limit) = (b"test_key1".to_vec(), b"test_range_end1".to_vec(), 1); + + let req = RangeRequest::new() + .with_key(key.clone()) + .with_range_end(range_end.clone()) + .with_limit(limit) + .with_keys_only(); + + let into_req: PbRangeRequest = req.into(); + assert!(into_req.header.is_none()); + assert_eq!(key, into_req.key); + assert_eq!(range_end, into_req.range_end); + assert_eq!(limit, into_req.limit); + assert!(into_req.keys_only); + } + + #[test] + fn test_range_response_trans() { + let pb_res = PbRangeResponse { + header: None, + kvs: vec![ + PbKeyValue { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + }, + PbKeyValue { + key: b"k2".to_vec(), + value: b"v2".to_vec(), + }, + ], + more: true, + }; + + let mut res = RangeResponse::new(pb_res); + assert!(res.take_header().is_none()); + assert!(res.more()); + let mut kvs = res.take_kvs(); + let kv0 = kvs.get_mut(0).unwrap(); + assert_eq!(b"k1".to_vec(), kv0.key().to_vec()); + assert_eq!(b"k1".to_vec(), kv0.take_key()); + assert_eq!(b"v1".to_vec(), kv0.value().to_vec()); + assert_eq!(b"v1".to_vec(), kv0.take_value()); + + let kv1 = kvs.get_mut(1).unwrap(); + assert_eq!(b"k2".to_vec(), kv1.key().to_vec()); + assert_eq!(b"k2".to_vec(), kv1.take_key()); + assert_eq!(b"v2".to_vec(), kv1.value().to_vec()); + assert_eq!(b"v2".to_vec(), kv1.take_value()); + } + + #[test] + fn test_put_request_trans() { + let (key, value) = (b"test_key1".to_vec(), b"test_value1".to_vec()); + + let req = PutRequest::new() + .with_key(key.clone()) + .with_value(value.clone()) + .with_prev_kv(); + + let into_req: PbPutRequest = req.into(); + assert!(into_req.header.is_none()); + assert_eq!(key, into_req.key); + assert_eq!(value, into_req.value); + } + + #[test] + fn test_put_response_trans() { + let pb_res = PbPutResponse { + header: None, + prev_kv: Some(PbKeyValue { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + }), + }; + + let mut res = PutResponse::new(pb_res); + assert!(res.take_header().is_none()); + let mut kv = res.take_prev_kv().unwrap(); + assert_eq!(b"k1".to_vec(), kv.key().to_vec()); + assert_eq!(b"k1".to_vec(), kv.take_key()); + assert_eq!(b"v1".to_vec(), kv.value().to_vec()); + assert_eq!(b"v1".to_vec(), kv.take_value()); + } + + #[test] + fn test_delete_range_request_trans() { + let (key, range_end) = (b"test_key1".to_vec(), b"test_range_end1".to_vec()); + + let req = DeleteRangeRequest::new() + .with_key(key.clone()) + .with_range_end(range_end.clone()) + .with_prev_kv(); + + let into_req: PbDeleteRangeRequest = req.into(); + assert!(into_req.header.is_none()); + assert_eq!(key, into_req.key); + assert_eq!(range_end, into_req.range_end); + assert!(into_req.prev_kv); + } + + #[test] + fn test_delete_range_response_trans() { + let pb_res = PbDeleteRangeResponse { + header: None, + deleted: 2, + prev_kvs: vec![ + PbKeyValue { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + }, + PbKeyValue { + key: b"k2".to_vec(), + value: b"v2".to_vec(), + }, + ], + }; + + let mut res: DeleteRangeResponse = pb_res.into(); + assert!(res.take_header().is_none()); + assert_eq!(2, res.deleted()); + let mut kvs = res.take_prev_kvs(); + let kv0 = kvs.get_mut(0).unwrap(); + assert_eq!(b"k1".to_vec(), kv0.key().to_vec()); + assert_eq!(b"k1".to_vec(), kv0.take_key()); + assert_eq!(b"v1".to_vec(), kv0.value().to_vec()); + assert_eq!(b"v1".to_vec(), kv0.take_value()); + + let kv1 = kvs.get_mut(1).unwrap(); + assert_eq!(b"k2".to_vec(), kv1.key().to_vec()); + assert_eq!(b"k2".to_vec(), kv1.take_key()); + assert_eq!(b"v2".to_vec(), kv1.value().to_vec()); + assert_eq!(b"v2".to_vec(), kv1.take_value()); + } +} diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 015c133f9d..e7d0cddbe3 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -22,7 +22,7 @@ pub async fn bootstrap_meta_srv(opts: MetaSrvOptions) -> crate::Result<()> { })?; let listener = TcpListenerStream::new(listener); - let meta_srv = MetaSrv::new(opts, kv_store); + let meta_srv = MetaSrv::new(opts, kv_store).await; tonic::transport::Server::builder() .accept_http1(true) // for admin services diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs new file mode 100644 index 0000000000..d9563b7d26 --- /dev/null +++ b/src/meta-srv/src/handler.rs @@ -0,0 +1,96 @@ +pub(crate) mod response_header; + +use std::collections::BTreeMap; +use std::sync::Arc; + +use api::v1::meta::HeartbeatRequest; +use api::v1::meta::HeartbeatResponse; +use api::v1::meta::ResponseHeader; +use common_telemetry::info; +use tokio::sync::mpsc::Sender; +use tokio::sync::RwLock; + +use crate::error::Result; +use crate::service::store::kv::KvStoreRef; + +#[async_trait::async_trait] +pub trait HeartbeatHandler: Send + Sync { + async fn handle( + &self, + req: &HeartbeatRequest, + ctx: &mut HeartbeatAccumulator, + store: KvStoreRef, + ) -> Result<()>; +} + +#[derive(Debug, Default)] +pub struct HeartbeatAccumulator { + pub header: Option, + pub states: Vec, + pub instructions: Vec, +} + +impl HeartbeatAccumulator { + pub fn into_payload(self) -> Vec> { + // TODO(jiachun): to HeartbeatResponse payload + vec![] + } +} + +#[derive(Debug)] +pub enum State {} + +#[derive(Debug)] +pub enum Instruction {} + +pub type Pusher = Sender>; + +#[derive(Clone)] +pub struct HeartbeatHandlers { + kv_store: KvStoreRef, + handlers: Arc>>>, + pushers: Arc>>, +} + +impl HeartbeatHandlers { + pub fn new(kv_store: KvStoreRef) -> Self { + Self { + kv_store, + handlers: Arc::new(RwLock::new(Default::default())), + pushers: Arc::new(RwLock::new(Default::default())), + } + } + + pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) { + let mut handlers = self.handlers.write().await; + handlers.push(Box::new(handler)); + } + + pub async fn register(&self, key: impl AsRef, pusher: Pusher) { + let mut pushers = self.pushers.write().await; + let key = key.as_ref(); + info!("Pusher register: {}", key); + pushers.insert(key.into(), pusher); + } + + pub async fn unregister(&self, key: impl AsRef) -> Option { + let mut pushers = self.pushers.write().await; + let key = key.as_ref(); + info!("Pusher unregister: {}", key); + pushers.remove(key) + } + + pub async fn handle(&self, req: HeartbeatRequest) -> Result { + let mut acc = HeartbeatAccumulator::default(); + let handlers = self.handlers.read().await; + for h in handlers.iter() { + h.handle(&req, &mut acc, self.kv_store.clone()).await?; + } + let header = std::mem::take(&mut acc.header); + let res = HeartbeatResponse { + header, + payload: acc.into_payload(), + }; + Ok(res) + } +} diff --git a/src/meta-srv/src/handler/response_header.rs b/src/meta-srv/src/handler/response_header.rs new file mode 100644 index 0000000000..94d07c6cbb --- /dev/null +++ b/src/meta-srv/src/handler/response_header.rs @@ -0,0 +1,61 @@ +use api::v1::meta::HeartbeatRequest; +use api::v1::meta::ResponseHeader; +use api::v1::meta::PROTOCOL_VERSION; + +use super::HeartbeatAccumulator; +use super::HeartbeatHandler; +use crate::error::Result; +use crate::service::store::kv::KvStoreRef; + +pub struct ResponseHeaderHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for ResponseHeaderHandler { + async fn handle( + &self, + req: &HeartbeatRequest, + acc: &mut HeartbeatAccumulator, + _store: KvStoreRef, + ) -> Result<()> { + let HeartbeatRequest { header, .. } = req; + let res_header = ResponseHeader { + protocol_version: PROTOCOL_VERSION, + cluster_id: header.as_ref().map_or(0, |h| h.cluster_id), + ..Default::default() + }; + acc.header = Some(res_header); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::meta::{request_header, HeartbeatResponse}; + + use super::*; + use crate::service::store::noop::NoopKvStore; + + #[tokio::test] + async fn test_handle_heartbeat_resp_header() { + let kv_store = Arc::new(NoopKvStore {}); + let req = HeartbeatRequest { + header: request_header((1, 2)), + ..Default::default() + }; + let mut acc = HeartbeatAccumulator::default(); + + let response_handler = ResponseHeaderHandler {}; + response_handler + .handle(&req, &mut acc, kv_store) + .await + .unwrap(); + let header = std::mem::take(&mut acc.header); + let res = HeartbeatResponse { + header, + payload: acc.into_payload(), + }; + assert_eq!(1, res.header.unwrap().cluster_id); + } +} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index e42b7ced31..740fcde1e2 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -1,5 +1,6 @@ pub mod bootstrap; pub mod error; +pub mod handler; pub mod metasrv; pub mod service; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index b490956eb6..16858fedaf 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -1,6 +1,8 @@ use serde::Deserialize; use serde::Serialize; +use crate::handler::response_header::ResponseHeaderHandler; +use crate::handler::HeartbeatHandlers; use crate::service::store::kv::KvStoreRef; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -24,18 +26,32 @@ impl Default for MetaSrvOptions { pub struct MetaSrv { options: MetaSrvOptions, kv_store: KvStoreRef, + heartbeat_handlers: HeartbeatHandlers, } impl MetaSrv { - pub fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self { - Self { options, kv_store } + pub async fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self { + let heartbeat_handlers = HeartbeatHandlers::new(kv_store.clone()); + heartbeat_handlers.add_handler(ResponseHeaderHandler).await; + Self { + options, + kv_store, + heartbeat_handlers, + } } + #[inline] pub fn options(&self) -> &MetaSrvOptions { &self.options } + #[inline] pub fn kv_store(&self) -> KvStoreRef { self.kv_store.clone() } + + #[inline] + pub fn heartbeat_handlers(&self) -> HeartbeatHandlers { + self.heartbeat_handlers.clone() + } } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 2afb3fea45..cd3829d5b8 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -1,11 +1,13 @@ use std::io::ErrorKind; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; use api::v1::meta::heartbeat_server; use api::v1::meta::AskLeaderRequest; use api::v1::meta::AskLeaderResponse; -use api::v1::meta::Endpoint; use api::v1::meta::HeartbeatRequest; use api::v1::meta::HeartbeatResponse; +use api::v1::meta::Peer; use api::v1::meta::ResponseHeader; use api::v1::meta::PROTOCOL_VERSION; use common_telemetry::error; @@ -17,13 +19,14 @@ use tonic::Request; use tonic::Response; use tonic::Streaming; -use super::store::kv::KvStoreRef; use super::GrpcResult; use super::GrpcStream; use crate::error; use crate::error::Result; use crate::metasrv::MetaSrv; +static PUSHER_ID: AtomicU64 = AtomicU64::new(0); + #[async_trait::async_trait] impl heartbeat_server::Heartbeat for MetaSrv { type HeartbeatStream = GrpcStream; @@ -34,19 +37,29 @@ impl heartbeat_server::Heartbeat for MetaSrv { ) -> GrpcResult { let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); - - let kv_store = self.kv_store(); + let handlers = self.heartbeat_handlers(); common_runtime::spawn_bg(async move { + let mut pusher_key = None; while let Some(msg) = in_stream.next().await { match msg { - Ok(req) => tx - .send( - handle_heartbeat(req, kv_store.clone()) - .await - .map_err(|e| e.into()), - ) - .await - .expect("working rx"), + Ok(req) => { + if pusher_key.is_none() { + if let Some(ref peer) = req.peer { + let key = format!( + "{}-{}-{}", + peer.addr, + peer.id, + PUSHER_ID.fetch_add(1, Ordering::Relaxed) + ); + handlers.register(&key, tx.clone()).await; + pusher_key = Some(key); + } + } + + tx.send(handlers.handle(req).await.map_err(|e| e.into())) + .await + .expect("working rx"); + } Err(err) => { if let Some(io_err) = error::match_for_io_error(&err) { if io_err.kind() == ErrorKind::BrokenPipe { @@ -63,12 +76,18 @@ impl heartbeat_server::Heartbeat for MetaSrv { } } } - info!("Heartbeat stream broken: {:?}", in_stream); + info!( + "Heartbeat stream broken: {:?}", + pusher_key.as_ref().unwrap_or(&"unknow".to_string()) + ); + if let Some(key) = pusher_key { + let _ = handlers.unregister(&key); + } }); let out_stream = ReceiverStream::new(rx); - Ok(Response::new(Box::pin(out_stream) as Self::HeartbeatStream)) + Ok(Response::new(Box::pin(out_stream))) } async fn ask_leader(&self, req: Request) -> GrpcResult { @@ -93,7 +112,8 @@ impl MetaSrv { // TODO(jiachun): return leader let res = AskLeaderResponse { header: Some(res_header), - leader: Some(Endpoint { + leader: Some(Peer { + id: 0, addr: self.options().server_addr.clone(), }), }; @@ -102,28 +122,6 @@ impl MetaSrv { } } -async fn handle_heartbeat( - req: HeartbeatRequest, - _kv_store: KvStoreRef, -) -> Result { - let HeartbeatRequest { header, .. } = req; - - let res_header = ResponseHeader { - protocol_version: PROTOCOL_VERSION, - cluster_id: header.map_or(0, |h| h.cluster_id), - ..Default::default() - }; - - // TODO(jiachun) Do something high-end - - let res = HeartbeatResponse { - header: Some(res_header), - ..Default::default() - }; - - Ok(res) -} - #[cfg(test)] mod tests { use std::sync::Arc; @@ -134,48 +132,16 @@ mod tests { use super::*; use crate::metasrv::MetaSrvOptions; - use crate::service::store::kv::KvStore; - - #[derive(Clone)] - pub struct NoopKvStore; - - #[async_trait::async_trait] - impl KvStore for NoopKvStore { - async fn range(&self, _req: RangeRequest) -> crate::Result { - unreachable!() - } - - async fn put(&self, _req: PutRequest) -> crate::Result { - unreachable!() - } - - async fn delete_range( - &self, - _req: DeleteRangeRequest, - ) -> crate::Result { - unreachable!() - } - } - - #[tokio::test] - async fn test_handle_heartbeat_resp_header() { - let kv_store = Arc::new(NoopKvStore {}); - - let header = RequestHeader::new(1, 2); - let req = HeartbeatRequest::new(header); - - let res = handle_heartbeat(req, kv_store).await.unwrap(); - - assert_eq!(1, res.header.unwrap().cluster_id); - } + use crate::service::store::noop::NoopKvStore; #[tokio::test] async fn test_ask_leader() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; - let header = RequestHeader::new(1, 1); - let req = AskLeaderRequest::new(header); + let req = AskLeaderRequest { + header: request_header((1, 1)), + }; let res = meta_srv.ask_leader(req.into_request()).await.unwrap(); let res = res.into_inner(); diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index efb599430a..5473b18775 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -41,7 +41,10 @@ async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result crate::Result { - unreachable!() - } - - async fn put(&self, _req: PutRequest) -> crate::Result { - unreachable!() - } - - async fn delete_range( - &self, - _req: DeleteRangeRequest, - ) -> crate::Result { - unreachable!() - } - } + use crate::service::store::noop::NoopKvStore; #[should_panic] #[tokio::test] async fn test_handle_route() { - let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); + let kv_store = Arc::new(NoopKvStore {}); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; - let header = RequestHeader::new(1, 1); - let req = RouteRequest::new(header); + let req = RouteRequest { + header: request_header((1, 1)), + ..Default::default() + }; let req = req .add_table(TableName::new("catalog1", "schema1", "table1")) .add_table(TableName::new("catalog1", "schema1", "table2")) @@ -99,12 +84,15 @@ mod tests { #[tokio::test] async fn test_handle_create() { - let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); + let kv_store = Arc::new(NoopKvStore {}); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; - let header = RequestHeader::new(1, 1); let table_name = TableName::new("test_catalog", "test_db", "table1"); - let req = CreateRequest::new(header, table_name); + let req = CreateRequest { + header: request_header((1, 1)), + table_name: Some(table_name), + ..Default::default() + }; let p0 = Partition::new() .column_list(vec![b"col1".to_vec(), b"col2".to_vec()]) @@ -119,7 +107,7 @@ mod tests { let res = meta_srv.create(req.into_request()).await.unwrap(); for r in res.into_inner().peers { - assert_eq!("127.0.0.1:3000", r.endpoint.unwrap().addr); + assert_eq!("127.0.0.1:3000", r.addr); } } } diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index b4ad937fcb..5cef83aa24 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -1,5 +1,6 @@ pub mod etcd; pub mod kv; +pub(crate) mod noop; use api::v1::meta::store_server; use api::v1::meta::DeleteRangeRequest; @@ -51,32 +52,12 @@ mod tests { use super::*; use crate::metasrv::MetaSrvOptions; - use crate::service::store::kv::KvStore; - - struct MockKvStore; - - #[async_trait::async_trait] - impl KvStore for MockKvStore { - async fn range(&self, _req: RangeRequest) -> crate::Result { - Ok(RangeResponse::default()) - } - - async fn put(&self, _req: PutRequest) -> crate::Result { - Ok(PutResponse::default()) - } - - async fn delete_range( - &self, - _req: DeleteRangeRequest, - ) -> crate::Result { - Ok(DeleteRangeResponse::default()) - } - } + use crate::service::store::noop::NoopKvStore; #[tokio::test] async fn test_range() { - let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); + let kv_store = Arc::new(NoopKvStore {}); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; let req = RangeRequest::default(); let res = meta_srv.range(req.into_request()).await; @@ -85,8 +66,8 @@ mod tests { #[tokio::test] async fn test_put() { - let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); + let kv_store = Arc::new(NoopKvStore {}); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; let req = PutRequest::default(); let res = meta_srv.put(req.into_request()).await; @@ -95,8 +76,8 @@ mod tests { #[tokio::test] async fn test_delete_range() { - let kv_store = Arc::new(MockKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store); + let kv_store = Arc::new(NoopKvStore {}); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; let req = DeleteRangeRequest::default(); let res = meta_srv.delete_range(req.into_request()).await; diff --git a/src/meta-srv/src/service/store/noop.rs b/src/meta-srv/src/service/store/noop.rs new file mode 100644 index 0000000000..c4852003d6 --- /dev/null +++ b/src/meta-srv/src/service/store/noop.rs @@ -0,0 +1,29 @@ +use api::v1::meta::DeleteRangeRequest; +use api::v1::meta::DeleteRangeResponse; +use api::v1::meta::PutRequest; +use api::v1::meta::PutResponse; +use api::v1::meta::RangeRequest; +use api::v1::meta::RangeResponse; + +use super::kv::KvStore; +use crate::error::Result; + +/// A noop kv_store which only for test +// TODO(jiachun): Add a test feature +#[derive(Clone)] +pub struct NoopKvStore; + +#[async_trait::async_trait] +impl KvStore for NoopKvStore { + async fn range(&self, _req: RangeRequest) -> Result { + Ok(RangeResponse::default()) + } + + async fn put(&self, _req: PutRequest) -> Result { + Ok(PutResponse::default()) + } + + async fn delete_range(&self, _req: DeleteRangeRequest) -> Result { + Ok(DeleteRangeResponse::default()) + } +}