diff --git a/Cargo.lock b/Cargo.lock index 5d3aae350f..575c657d5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2718,11 +2718,15 @@ dependencies = [ "common-error", "common-runtime", "common-telemetry", + "common-time", "etcd-client", "futures", "h2", "http-body", + "lazy_static", + "regex", "serde", + "serde_json", "snafu", "tokio", "tokio-stream", diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 2d3da3f29f..00ca366dc0 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -1,3 +1,4 @@ bind_addr = '127.0.0.1:3002' server_addr = '0.0.0.0:3002' store_addr = '127.0.0.1:2380' +datanode_lease_secs = 30 diff --git a/src/api/greptime/v1/meta/store.proto b/src/api/greptime/v1/meta/store.proto index acb2312ab1..c710c2928f 100644 --- a/src/api/greptime/v1/meta/store.proto +++ b/src/api/greptime/v1/meta/store.proto @@ -11,6 +11,13 @@ service Store { // Put puts the given key into the key-value store. rpc Put(PutRequest) returns (PutResponse); + // BatchPut atomically puts the given keys into the key-value store. + rpc BatchPut(BatchPutRequest) returns (BatchPutResponse); + + // CompareAndPut atomically puts the value to the given updated + // value if the current value == the expected value. + rpc CompareAndPut(CompareAndPutRequest) returns (CompareAndPutResponse); + // DeleteRange deletes the given range from the key-value store. rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse); } @@ -64,6 +71,42 @@ message PutResponse { KeyValue prev_kv = 2; } +message BatchPutRequest { + RequestHeader header = 1; + + repeated KeyValue kvs = 2; + // If prev_kv is set, gets the previous key-value pairs before changing it. + // The previous key-value pairs will be returned in the batch put response. + bool prev_kv = 3; +} + +message BatchPutResponse { + ResponseHeader header = 1; + + // If prev_kv is set in the request, the previous key-value pairs will be + // returned. + repeated KeyValue prev_kvs = 2; +} + +message CompareAndPutRequest { + RequestHeader header = 1; + + // key is the key, in bytes, to put into the key-value store. + bytes key = 2; + // expect is the previous value, in bytes + bytes expect = 3; + // value is the value, in bytes, to associate with the key in the + // key-value store. + bytes value = 4; +} + +message CompareAndPutResponse { + ResponseHeader header = 1; + + bool success = 2; + KeyValue prev_kv = 3; +} + message DeleteRangeRequest { RequestHeader header = 1; diff --git a/src/api/src/v1/meta.rs b/src/api/src/v1/meta.rs index ecde5a81e5..cddcdc2571 100644 --- a/src/api/src/v1/meta.rs +++ b/src/api/src/v1/meta.rs @@ -2,18 +2,34 @@ tonic::include_proto!("greptime.v1.meta"); pub const PROTOCOL_VERSION: u64 = 1; -pub const fn request_header((cluster_id, member_id): (u64, u64)) -> Option { - Some(RequestHeader::new((cluster_id, member_id))) -} - impl RequestHeader { #[inline] - pub const fn new((cluster_id, member_id): (u64, u64)) -> Self { - Self { + pub fn new((cluster_id, member_id): (u64, u64)) -> Option { + Some(Self { protocol_version: PROTOCOL_VERSION, cluster_id, member_id, - } + }) + } +} + +impl ResponseHeader { + #[inline] + pub fn success(cluster_id: u64) -> Option { + Some(Self { + protocol_version: PROTOCOL_VERSION, + cluster_id, + ..Default::default() + }) + } + + #[inline] + pub fn failed(cluster_id: u64, error: Error) -> Option { + Some(Self { + protocol_version: PROTOCOL_VERSION, + cluster_id, + error: Some(error), + }) } } diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 28c3630cf3..0e98b5419a 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -117,5 +117,6 @@ mod tests { assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr); assert_eq!("0.0.0.0:3002".to_string(), options.server_addr); assert_eq!("127.0.0.1:2380".to_string(), options.store_addr); + assert_eq!(30, options.datanode_lease_secs); } } diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index bdb97460cc..302950bcdf 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -5,6 +5,8 @@ use api::v1::meta::Peer; use common_grpc::channel_manager::ChannelConfig; use common_grpc::channel_manager::ChannelManager; use meta_client::client::MetaClientBuilder; +use meta_client::rpc::BatchPutRequest; +use meta_client::rpc::CompareAndPutRequest; use meta_client::rpc::CreateRequest; use meta_client::rpc::DeleteRangeRequest; use meta_client::rpc::Partition; @@ -90,10 +92,40 @@ async fn run() { event!(Level::INFO, "put result: {:#?}", res); // get - let range = RangeRequest::new().with_key(b"key2".to_vec()); + let range = RangeRequest::new().with_key(b"key1".to_vec()); let res = meta_client.range(range.clone()).await.unwrap(); event!(Level::INFO, "get range result: {:#?}", res); + // get prefix + let range2 = RangeRequest::new().with_prefix(b"key1".to_vec()); + let res = meta_client.range(range2.clone()).await.unwrap(); + event!(Level::INFO, "get prefix result: {:#?}", res); + + // batch put + let batch_put = BatchPutRequest::new() + .add_kv(b"batch_put1".to_vec(), b"batch_put_v1".to_vec()) + .add_kv(b"batch_put2".to_vec(), b"batch_put_v2".to_vec()) + .with_prev_kv(); + let res = meta_client.batch_put(batch_put).await.unwrap(); + event!(Level::INFO, "batch put result: {:#?}", res); + + // cas + let cas = CompareAndPutRequest::new() + .with_key(b"batch_put1".to_vec()) + .with_expect(b"batch_put_v_fail".to_vec()) + .with_value(b"batch_put_v111".to_vec()); + + let res = meta_client.compare_and_put(cas).await.unwrap(); + event!(Level::INFO, "cas 0 result: {:#?}", res); + + let cas = CompareAndPutRequest::new() + .with_key(b"batch_put1".to_vec()) + .with_expect(b"batch_put_v1".to_vec()) + .with_value(b"batch_put_v111".to_vec()); + + let res = meta_client.compare_and_put(cas).await.unwrap(); + event!(Level::INFO, "cas 1 result: {:#?}", res); + // delete let delete_range = DeleteRangeRequest::new().with_key(b"key1".to_vec()); let res = meta_client.delete_range(delete_range).await.unwrap(); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 77c4d30eea..f9e697dc7d 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -15,6 +15,10 @@ use self::heartbeat::HeartbeatSender; use self::heartbeat::HeartbeatStream; use crate::error; use crate::error::Result; +use crate::rpc::BatchPutRequest; +use crate::rpc::BatchPutResponse; +use crate::rpc::CompareAndPutRequest; +use crate::rpc::CompareAndPutResponse; use crate::rpc::CreateRequest; use crate::rpc::DeleteRangeRequest; use crate::rpc::DeleteRangeResponse; @@ -217,8 +221,8 @@ impl MetaClient { name: "store_client", })? .range(req.into()) - .await - .map(Into::into) + .await? + .try_into() } /// Put puts the given key into the key-value store. @@ -228,8 +232,34 @@ impl MetaClient { name: "store_client", })? .put(req.into()) - .await - .map(Into::into) + .await? + .try_into() + } + + /// BatchPut atomically puts the given keys into the key-value store. + pub async fn batch_put(&self, req: BatchPutRequest) -> Result { + self.store_client() + .context(error::NotStartedSnafu { + name: "store_client", + })? + .batch_put(req.into()) + .await? + .try_into() + } + + /// CompareAndPut atomically puts the value to the given updated + /// value if the current value == the expected value. + pub async fn compare_and_put( + &self, + req: CompareAndPutRequest, + ) -> Result { + self.store_client() + .context(error::NotStartedSnafu { + name: "store_client", + })? + .compare_and_put(req.into()) + .await? + .try_into() } /// DeleteRange deletes the given range from the key-value store. @@ -239,8 +269,8 @@ impl MetaClient { name: "store_client", })? .delete_range(req.into()) - .await - .map(Into::into) + .await? + .try_into() } #[inline] @@ -318,49 +348,38 @@ mod tests { #[tokio::test] async fn test_not_start_heartbeat_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new(0, 0) .enable_router() .enable_store() .build(); - meta_client.start(urls).await.unwrap(); - let res = meta_client.ask_leader().await; - assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } #[tokio::test] async fn test_not_start_router_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new(0, 0) .enable_heartbeat() .enable_store() .build(); - meta_client.start(urls).await.unwrap(); - let req = CreateRequest::new(TableName::new("c", "s", "t")); let res = meta_client.create_route(req).await; - assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } #[tokio::test] async fn test_not_start_store_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; - let mut meta_client = MetaClientBuilder::new(0, 0) .enable_heartbeat() .enable_router() .build(); meta_client.start(urls).await.unwrap(); - let res = meta_client.put(PutRequest::default()).await; - assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index a6388e3352..8bed9dee4e 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -2,10 +2,10 @@ use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::heartbeat_client::HeartbeatClient; -use api::v1::meta::request_header; use api::v1::meta::AskLeaderRequest; use api::v1::meta::HeartbeatRequest; use api::v1::meta::HeartbeatResponse; +use api::v1::meta::RequestHeader; use common_grpc::channel_manager::ChannelManager; use common_telemetry::debug; use common_telemetry::info; @@ -40,7 +40,7 @@ impl HeartbeatSender { #[inline] pub async fn send(&self, mut req: HeartbeatRequest) -> Result<()> { - req.header = request_header(self.id); + req.header = RequestHeader::new(self.id); self.sender.send(req).await.map_err(|e| { error::SendHeartbeatSnafu { err_msg: e.to_string(), @@ -154,7 +154,7 @@ impl Inner { } ); - let header = request_header(self.id); + let header = RequestHeader::new(self.id); let mut leader = None; for addr in &self.peers { let req = AskLeaderRequest { @@ -182,9 +182,8 @@ impl Inner { let mut leader = self.make_client(leader)?; let (sender, receiver) = mpsc::channel::(128); - let header = request_header(self.id); let handshake = HeartbeatRequest { - header, + header: RequestHeader::new(self.id), ..Default::default() }; sender.send(handshake).await.map_err(|e| { @@ -236,14 +235,11 @@ mod test { #[tokio::test] async fn test_start_client() { let mut client = Client::new((0, 0), ChannelManager::default()); - assert!(!client.is_started().await); - client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); } @@ -254,13 +250,9 @@ mod test { .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); - let res = client.start(&["127.0.0.1:1002"]).await; - assert!(res.is_err()); - assert!(matches!( res.err(), Some(error::Error::IllegalGrpcClientState { .. }) @@ -274,48 +266,18 @@ mod test { .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await .unwrap(); - assert_eq!(1, client.inner.write().await.peers.len()); } - #[tokio::test] - async fn test_ask_leader_unavailable() { - let mut client = Client::new((0, 0), ChannelManager::default()); - client.start(&["unavailable_peer"]).await.unwrap(); - - let res = client.ask_leader().await; - - assert!(res.is_err()); - - let err = res.err().unwrap(); - assert!(matches!(err, error::Error::AskLeader { .. })); - } - - #[tokio::test] - async fn test_heartbeat_unavailable() { - let mut client = Client::new((0, 0), ChannelManager::default()); - client.start(&["unavailable_peer"]).await.unwrap(); - client.inner.write().await.leader = Some("unavailable".to_string()); - - let res = client.heartbeat().await; - - assert!(res.is_err()); - - let err = res.err().unwrap(); - assert!(matches!(err, error::Error::TonicStatus { .. })); - } - #[tokio::test] async fn test_heartbeat_stream() { let (sender, mut receiver) = mpsc::channel::(100); let sender = HeartbeatSender::new((8, 8), sender); - tokio::spawn(async move { for _ in 0..10 { sender.send(HeartbeatRequest::default()).await.unwrap(); } }); - while let Some(req) = receiver.recv().await { let header = req.header.unwrap(); assert_eq!(8, header.cluster_id); diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs index 0f16963ef1..d5ad3c03d4 100644 --- a/src/meta-client/src/client/router.rs +++ b/src/meta-client/src/client/router.rs @@ -1,9 +1,9 @@ use std::collections::HashSet; use std::sync::Arc; -use api::v1::meta::request_header; use api::v1::meta::router_client::RouterClient; use api::v1::meta::CreateRequest; +use api::v1::meta::RequestHeader; use api::v1::meta::RouteRequest; use api::v1::meta::RouteResponse; use common_grpc::channel_manager::ChannelManager; @@ -92,7 +92,7 @@ impl Inner { async fn route(&self, mut req: RouteRequest) -> Result { let mut client = self.random_client()?; - req.header = request_header(self.id); + req.header = RequestHeader::new(self.id); let res = client.route(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -100,7 +100,7 @@ impl Inner { async fn create(&self, mut req: CreateRequest) -> Result { let mut client = self.random_client()?; - req.header = request_header(self.id); + req.header = RequestHeader::new(self.id); let res = client.create(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -139,14 +139,11 @@ mod test { #[tokio::test] async fn test_start_client() { let mut client = Client::new((0, 0), ChannelManager::default()); - assert!(!client.is_started().await); - client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); } @@ -157,13 +154,9 @@ mod test { .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); - let res = client.start(&["127.0.0.1:1002"]).await; - assert!(res.is_err()); - assert!(matches!( res.err(), Some(error::Error::IllegalGrpcClientState { .. }) @@ -180,42 +173,4 @@ mod test { assert_eq!(1, client.inner.write().await.peers.len()); } - - #[tokio::test] - async fn test_create_unavailable() { - let mut client = Client::new((0, 0), ChannelManager::default()); - client.start(&["unavailable_peer"]).await.unwrap(); - - let req = CreateRequest { - header: request_header((0, 0)), - ..Default::default() - }; - let res = client.create(req).await; - - assert!(res.is_err()); - - let err = res.err().unwrap(); - assert!( - matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) - ); - } - - #[tokio::test] - async fn test_route_unavailable() { - let mut client = Client::new((0, 0), ChannelManager::default()); - client.start(&["unavailable_peer"]).await.unwrap(); - - let req = RouteRequest { - header: request_header((0, 0)), - ..Default::default() - }; - let res = client.route(req).await; - - assert!(res.is_err()); - - let err = res.err().unwrap(); - assert!( - matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) - ); - } } diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index b1353b3ba5..bbaa46043e 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -1,14 +1,18 @@ use std::collections::HashSet; use std::sync::Arc; -use api::v1::meta::request_header; use api::v1::meta::store_client::StoreClient; +use api::v1::meta::BatchPutRequest; +use api::v1::meta::BatchPutResponse; +use api::v1::meta::CompareAndPutRequest; +use api::v1::meta::CompareAndPutResponse; use api::v1::meta::DeleteRangeRequest; use api::v1::meta::DeleteRangeResponse; use api::v1::meta::PutRequest; use api::v1::meta::PutResponse; use api::v1::meta::RangeRequest; use api::v1::meta::RangeResponse; +use api::v1::meta::RequestHeader; use common_grpc::channel_manager::ChannelManager; use snafu::ensure; use snafu::OptionExt; @@ -61,6 +65,19 @@ impl Client { inner.put(req).await } + pub async fn batch_put(&self, req: BatchPutRequest) -> Result { + let inner = self.inner.read().await; + inner.batch_put(req).await + } + + pub async fn compare_and_put( + &self, + req: CompareAndPutRequest, + ) -> Result { + let inner = self.inner.read().await; + inner.compare_and_put(req).await + } + pub async fn delete_range(&self, req: DeleteRangeRequest) -> Result { let inner = self.inner.read().await; inner.delete_range(req).await @@ -100,7 +117,7 @@ impl Inner { async fn range(&self, mut req: RangeRequest) -> Result { let mut client = self.random_client()?; - req.header = request_header(self.id); + req.header = RequestHeader::new(self.id); let res = client.range(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) @@ -108,15 +125,40 @@ impl Inner { async fn put(&self, mut req: PutRequest) -> Result { let mut client = self.random_client()?; - req.header = request_header(self.id); + req.header = RequestHeader::new(self.id); let res = client.put(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) } + async fn batch_put(&self, mut req: BatchPutRequest) -> Result { + let mut client = self.random_client()?; + req.header = RequestHeader::new(self.id); + let res = client + .batch_put(req) + .await + .context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + + async fn compare_and_put( + &self, + mut req: CompareAndPutRequest, + ) -> Result { + let mut client = self.random_client()?; + req.header = RequestHeader::new(self.id); + let res = client + .compare_and_put(req) + .await + .context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result { let mut client = self.random_client()?; - req.header = request_header(self.id); + req.header = RequestHeader::new(self.id); let res = client .delete_range(req) .await @@ -158,14 +200,11 @@ mod test { #[tokio::test] async fn test_start_client() { let mut client = Client::new((0, 0), ChannelManager::default()); - assert!(!client.is_started().await); - client .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); } @@ -176,13 +215,9 @@ mod test { .start(&["127.0.0.1:1000", "127.0.0.1:1001"]) .await .unwrap(); - assert!(client.is_started().await); - let res = client.start(&["127.0.0.1:1002"]).await; - assert!(res.is_err()); - assert!(matches!( res.err(), Some(error::Error::IllegalGrpcClientState { .. }) @@ -196,66 +231,6 @@ mod test { .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) .await .unwrap(); - assert_eq!(1, client.inner.write().await.peers.len()); } - - #[tokio::test] - async fn test_range_unavailable() { - let mut client = Client::new((0, 0), ChannelManager::default()); - client.start(&["unknow_peer"]).await.unwrap(); - - let req = RangeRequest { - key: b"key1".to_vec(), - ..Default::default() - }; - let res = client.range(req).await; - - assert!(res.is_err()); - - let err = res.err().unwrap(); - assert!( - matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) - ); - } - - #[tokio::test] - async fn test_put_unavailable() { - let mut client = Client::new((0, 0), ChannelManager::default()); - client.start(&["unavailable_peer"]).await.unwrap(); - - let req = PutRequest { - key: b"key1".to_vec(), - value: b"value1".to_vec(), - prev_kv: true, - ..Default::default() - }; - let res = client.put(req).await; - - assert!(res.is_err()); - - let err = res.err().unwrap(); - assert!( - matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) - ); - } - - #[tokio::test] - async fn test_delete_range_unavailable() { - let mut client = Client::new((0, 0), ChannelManager::default()); - client.start(&["unavailable_peer"]).await.unwrap(); - - let req = DeleteRangeRequest { - key: b"key1".to_vec(), - ..Default::default() - }; - let res = client.delete_range(req).await; - - assert!(res.is_err()); - - let err = res.err().unwrap(); - assert!( - matches!(err, error::Error::TonicStatus { source, .. } if source.code() == tonic::Code::Unavailable) - ); - } } diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index fac406ff7e..af857aaf7d 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -51,6 +51,13 @@ pub enum Error { err_msg: String, backtrace: Backtrace, }, + + #[snafu(display("Illegal state from server, code: {}, error: {}", code, err_msg))] + IllegalServerState { + code: i32, + err_msg: String, + backtrace: Backtrace, + }, } #[allow(dead_code)] @@ -75,7 +82,8 @@ impl ErrorExt for Error { | Error::NotStarted { .. } | Error::SendHeartbeat { .. } | Error::CreateHeartbeatStream { .. } - | Error::CreateChannel { .. } => StatusCode::Internal, + | Error::CreateChannel { .. } + | Error::IllegalServerState { .. } => StatusCode::Internal, Error::RouteInfoCorrupted { .. } => StatusCode::Unexpected, } } @@ -96,12 +104,10 @@ mod tests { fn throw_tonic_error() -> StdResult { tonic::transport::Endpoint::new("http//http").map(|_| ()) } - let e = throw_tonic_error() .context(ConnectFailedSnafu { url: "" }) .err() .unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -112,7 +118,6 @@ mod tests { .context(IllegalGrpcClientStateSnafu { err_msg: "" }) .err() .unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -122,12 +127,10 @@ mod tests { fn throw_tonic_status_error() -> StdResult { Err(tonic::Status::new(tonic::Code::Aborted, "")) } - let e = throw_tonic_status_error() .context(TonicStatusSnafu) .err() .unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -135,7 +138,6 @@ mod tests { #[test] fn test_ask_leader_error() { let e = throw_none_option().context(AskLeaderSnafu).err().unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -143,7 +145,6 @@ mod tests { #[test] fn test_no_leader_error() { let e = throw_none_option().context(NoLeaderSnafu).err().unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -155,12 +156,20 @@ mod tests { .map(|_| ()) .context(common_grpc::error::CreateChannelSnafu) } - let e = throw_common_grpc_error() .context(CreateChannelSnafu) .err() .unwrap(); + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + #[test] + fn test_not_started_error() { + let e = throw_none_option() + .context(NotStartedSnafu { name: "" }) + .err() + .unwrap(); assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -171,7 +180,6 @@ mod tests { .context(SendHeartbeatSnafu { err_msg: "" }) .err() .unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -197,4 +205,18 @@ mod tests { assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Unexpected); } + + #[test] + fn test_illegal_server_state_error() { + let e = throw_none_option() + .context(IllegalServerStateSnafu { + code: 1, + err_msg: "", + }) + .err() + .unwrap(); + + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } } diff --git a/src/meta-client/src/rpc.rs b/src/meta-client/src/rpc.rs index 721cea6825..0e34f0057c 100644 --- a/src/meta-client/src/rpc.rs +++ b/src/meta-client/src/rpc.rs @@ -1,5 +1,6 @@ mod router; mod store; +mod util; use api::v1::meta::KeyValue as PbKeyValue; use api::v1::meta::Peer as PbPeer; @@ -12,6 +13,10 @@ pub use router::RouteRequest; pub use router::RouteResponse; pub use router::Table; pub use router::TableRoute; +pub use store::BatchPutRequest; +pub use store::BatchPutResponse; +pub use store::CompareAndPutRequest; +pub use store::CompareAndPutResponse; pub use store::DeleteRangeRequest; pub use store::DeleteRangeResponse; pub use store::PutRequest; diff --git a/src/meta-client/src/rpc/router.rs b/src/meta-client/src/rpc/router.rs index e5ca8c8f14..024aa9da92 100644 --- a/src/meta-client/src/rpc/router.rs +++ b/src/meta-client/src/rpc/router.rs @@ -8,6 +8,7 @@ use api::v1::meta::RouteResponse as PbRouteResponse; use api::v1::meta::Table as PbTable; use snafu::OptionExt; +use super::util; use super::Peer; use super::TableName; use crate::error; @@ -83,6 +84,8 @@ impl TryFrom for RouteResponse { type Error = error::Error; fn try_from(pb: PbRouteResponse) -> Result { + util::check_response_header(pb.header.as_ref())?; + let peers: Vec = pb.peers.into_iter().map(Into::into).collect(); let get_peer = |index: u64| peers.get(index as usize).map(ToOwned::to_owned); let mut table_routes = Vec::with_capacity(pb.table_routes.len()); diff --git a/src/meta-client/src/rpc/store.rs b/src/meta-client/src/rpc/store.rs index 1fbe4e9a0f..343c1e1d93 100644 --- a/src/meta-client/src/rpc/store.rs +++ b/src/meta-client/src/rpc/store.rs @@ -1,12 +1,20 @@ +use api::v1::meta::BatchPutRequest as PbBatchPutRequest; +use api::v1::meta::BatchPutResponse as PbBatchPutResponse; +use api::v1::meta::CompareAndPutRequest as PbCompareAndPutRequest; +use api::v1::meta::CompareAndPutResponse as PbCompareAndPutResponse; use api::v1::meta::DeleteRangeRequest as PbDeleteRangeRequest; use api::v1::meta::DeleteRangeResponse as PbDeleteRangeResponse; +use api::v1::meta::KeyValue as PbKeyValue; use api::v1::meta::PutRequest as PbPutRequest; use api::v1::meta::PutResponse as PbPutResponse; use api::v1::meta::RangeRequest as PbRangeRequest; use api::v1::meta::RangeResponse as PbRangeResponse; +use super::util; use super::KeyValue; use super::ResponseHeader; +use crate::error; +use crate::error::Result; #[derive(Debug, Clone, Default)] pub struct RangeRequest { @@ -58,6 +66,9 @@ impl RangeRequest { self } + /// key is the first key for the range, If range_end is not given, the + /// request only looks up key. + /// /// range_end is the upper bound on the requested range [key, range_end). /// If range_end is '\0', the range is all keys >= key. /// If range_end is key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), @@ -65,11 +76,21 @@ impl RangeRequest { /// If both key and range_end are '\0', then the range request returns all /// keys. #[inline] - pub fn with_range_end(mut self, range_end: impl Into>) -> Self { + pub fn with_range(mut self, key: impl Into>, range_end: impl Into>) -> Self { + self.key = key.into(); self.range_end = range_end.into(); self } + /// Gets all keys prefixed with key. + /// range_end is the key plus one (e.g., "aa"+1 == "ab", "a\xff"+1 == "b"), + #[inline] + pub fn with_prefix(mut self, key: impl Into>) -> Self { + self.key = key.into(); + self.range_end = util::get_prefix_end_key(&self.key); + self + } + /// limit is a limit on the number of keys returned for the request. When /// limit is set to 0, it is treated as no limit. #[inline] @@ -89,9 +110,13 @@ impl RangeRequest { #[derive(Debug, Clone)] pub struct RangeResponse(PbRangeResponse); -impl From for RangeResponse { - fn from(res: PbRangeResponse) -> Self { - Self::new(res) +impl TryFrom for RangeResponse { + type Error = error::Error; + + fn try_from(pb: PbRangeResponse) -> Result { + util::check_response_header(pb.header.as_ref())?; + + Ok(Self::new(pb)) } } @@ -177,9 +202,13 @@ impl PutRequest { #[derive(Debug, Clone)] pub struct PutResponse(PbPutResponse); -impl From for PutResponse { - fn from(res: PbPutResponse) -> Self { - Self::new(res) +impl TryFrom for PutResponse { + type Error = error::Error; + + fn try_from(pb: PbPutResponse) -> Result { + util::check_response_header(pb.header.as_ref())?; + + Ok(Self::new(pb)) } } @@ -200,6 +229,170 @@ impl PutResponse { } } +#[derive(Debug, Clone, Default)] +pub struct BatchPutRequest { + pub kvs: Vec, + /// If prev_kv is set, gets the previous key-value pairs before changing it. + /// The previous key-value pairs will be returned in the batch put response. + pub prev_kv: bool, +} + +impl From for PbBatchPutRequest { + fn from(req: BatchPutRequest) -> Self { + Self { + header: None, + kvs: req.kvs, + prev_kv: req.prev_kv, + } + } +} + +impl BatchPutRequest { + #[inline] + pub fn new() -> Self { + Self { + kvs: vec![], + prev_kv: false, + } + } + + #[inline] + pub fn add_kv(mut self, key: impl Into>, value: impl Into>) -> Self { + self.kvs.push(PbKeyValue { + key: key.into(), + value: value.into(), + }); + self + } + + /// If prev_kv is set, gets the previous key-value pair before changing it. + /// The previous key-value pair will be returned in the put response. + #[inline] + pub fn with_prev_kv(mut self) -> Self { + self.prev_kv = true; + self + } +} + +#[derive(Debug, Clone)] +pub struct BatchPutResponse(PbBatchPutResponse); + +impl TryFrom for BatchPutResponse { + type Error = error::Error; + + fn try_from(pb: PbBatchPutResponse) -> Result { + util::check_response_header(pb.header.as_ref())?; + + Ok(Self::new(pb)) + } +} + +impl BatchPutResponse { + #[inline] + pub fn new(res: PbBatchPutResponse) -> Self { + Self(res) + } + + #[inline] + pub fn take_header(&mut self) -> Option { + self.0.header.take().map(ResponseHeader::new) + } + + #[inline] + pub fn take_prev_kvs(&mut self) -> Vec { + self.0.prev_kvs.drain(..).map(KeyValue::new).collect() + } +} + +#[derive(Debug, Clone, Default)] +pub struct CompareAndPutRequest { + /// key is the key, in bytes, to put into the key-value store. + pub key: Vec, + pub expect: Vec, + /// value is the value, in bytes, to associate with the key in the + /// key-value store. + pub value: Vec, +} + +impl From for PbCompareAndPutRequest { + fn from(req: CompareAndPutRequest) -> Self { + Self { + header: None, + key: req.key, + expect: req.expect, + value: req.value, + } + } +} + +impl CompareAndPutRequest { + #[inline] + pub fn new() -> Self { + Self { + key: vec![], + expect: vec![], + value: vec![], + } + } + + /// key is the key, in bytes, to put into the key-value store. + #[inline] + pub fn with_key(mut self, key: impl Into>) -> Self { + self.key = key.into(); + self + } + + /// expect is the previous value, in bytes + #[inline] + pub fn with_expect(mut self, expect: impl Into>) -> Self { + self.expect = expect.into(); + self + } + + /// value is the value, in bytes, to associate with the key in the + /// key-value store. + #[inline] + pub fn with_value(mut self, value: impl Into>) -> Self { + self.value = value.into(); + self + } +} + +#[derive(Debug, Clone)] +pub struct CompareAndPutResponse(PbCompareAndPutResponse); + +impl TryFrom for CompareAndPutResponse { + type Error = error::Error; + + fn try_from(pb: PbCompareAndPutResponse) -> Result { + util::check_response_header(pb.header.as_ref())?; + + Ok(Self::new(pb)) + } +} + +impl CompareAndPutResponse { + #[inline] + pub fn new(res: PbCompareAndPutResponse) -> Self { + Self(res) + } + + #[inline] + pub fn take_header(&mut self) -> Option { + self.0.header.take().map(ResponseHeader::new) + } + + #[inline] + pub fn is_success(&self) -> bool { + self.0.success + } + + #[inline] + pub fn take_prev_kv(&mut self) -> Option { + self.0.prev_kv.take().map(KeyValue::new) + } +} + #[derive(Debug, Clone, Default)] pub struct DeleteRangeRequest { /// key is the first key to delete in the range. @@ -242,13 +435,16 @@ impl DeleteRangeRequest { } } - /// key is the first key to delete in the range. + /// key is the first key to delete in the range. If range_end is not given, + /// the range is defined to contain only the key argument. #[inline] pub fn with_key(mut self, key: impl Into>) -> Self { self.key = key.into(); self } + /// key is the first key to delete in the range. + /// /// range_end is the key following the last key to delete for the range /// [key, range_end). /// If range_end is not given, the range is defined to contain only the key @@ -258,11 +454,21 @@ impl DeleteRangeRequest { /// If range_end is '\0', the range is all keys greater than or equal to the /// key argument. #[inline] - pub fn with_range_end(mut self, range_end: impl Into>) -> Self { + pub fn with_range(mut self, key: impl Into>, range_end: impl Into>) -> Self { + self.key = key.into(); self.range_end = range_end.into(); self } + /// Deletes all keys prefixed with key. + /// range_end is one bit larger than the given key. + #[inline] + pub fn with_prefix(mut self, key: impl Into>) -> Self { + self.key = key.into(); + self.range_end = util::get_prefix_end_key(&self.key); + self + } + /// If prev_kv is set, gets the previous key-value pairs before deleting it. /// The previous key-value pairs will be returned in the delete response. #[inline] @@ -275,9 +481,13 @@ impl DeleteRangeRequest { #[derive(Debug, Clone)] pub struct DeleteRangeResponse(PbDeleteRangeResponse); -impl From for DeleteRangeResponse { - fn from(res: PbDeleteRangeResponse) -> Self { - Self::new(res) +impl TryFrom for DeleteRangeResponse { + type Error = error::Error; + + fn try_from(pb: PbDeleteRangeResponse) -> Result { + util::check_response_header(pb.header.as_ref())?; + + Ok(Self::new(pb)) } } @@ -304,6 +514,10 @@ impl DeleteRangeResponse { #[cfg(test)] mod tests { + use api::v1::meta::BatchPutRequest as PbBatchPutRequest; + use api::v1::meta::BatchPutResponse as PbBatchPutResponse; + use api::v1::meta::CompareAndPutRequest as PbCompareAndPutRequest; + use api::v1::meta::CompareAndPutResponse as PbCompareAndPutResponse; use api::v1::meta::DeleteRangeRequest as PbDeleteRangeRequest; use api::v1::meta::DeleteRangeResponse as PbDeleteRangeResponse; use api::v1::meta::KeyValue as PbKeyValue; @@ -319,8 +533,7 @@ mod tests { let (key, range_end, limit) = (b"test_key1".to_vec(), b"test_range_end1".to_vec(), 1); let req = RangeRequest::new() - .with_key(key.clone()) - .with_range_end(range_end.clone()) + .with_range(key.clone(), range_end.clone()) .with_limit(limit) .with_keys_only(); @@ -332,6 +545,23 @@ mod tests { assert!(into_req.keys_only); } + #[test] + fn test_prefix_request_trans() { + let (key, limit) = (b"test_key1".to_vec(), 1); + + let req = RangeRequest::new() + .with_prefix(key.clone()) + .with_limit(limit) + .with_keys_only(); + + let into_req: PbRangeRequest = req.into(); + assert!(into_req.header.is_none()); + assert_eq!(key, into_req.key); + assert_eq!(b"test_key2".to_vec(), into_req.range_end); + assert_eq!(limit, into_req.limit); + assert!(into_req.keys_only); + } + #[test] fn test_range_response_trans() { let pb_res = PbRangeResponse { @@ -400,13 +630,88 @@ mod tests { assert_eq!(b"v1".to_vec(), kv.take_value()); } + #[test] + fn test_batch_put_request_trans() { + let req = BatchPutRequest::new() + .add_kv(b"test_key1".to_vec(), b"test_value1".to_vec()) + .add_kv(b"test_key2".to_vec(), b"test_value2".to_vec()) + .add_kv(b"test_key3".to_vec(), b"test_value3".to_vec()) + .with_prev_kv(); + + let into_req: PbBatchPutRequest = req.into(); + assert!(into_req.header.is_none()); + assert_eq!(b"test_key1".to_vec(), into_req.kvs.get(0).unwrap().key); + assert_eq!(b"test_key2".to_vec(), into_req.kvs.get(1).unwrap().key); + assert_eq!(b"test_key3".to_vec(), into_req.kvs.get(2).unwrap().key); + assert_eq!(b"test_value1".to_vec(), into_req.kvs.get(0).unwrap().value); + assert_eq!(b"test_value2".to_vec(), into_req.kvs.get(1).unwrap().value); + assert_eq!(b"test_value3".to_vec(), into_req.kvs.get(2).unwrap().value); + assert!(into_req.prev_kv); + } + + #[test] + fn test_batch_put_response_trans() { + let pb_res = PbBatchPutResponse { + header: None, + prev_kvs: vec![PbKeyValue { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + }], + }; + + let mut res = BatchPutResponse::new(pb_res); + assert!(res.take_header().is_none()); + let kvs = res.take_prev_kvs(); + assert_eq!(b"k1".to_vec(), kvs[0].key().to_vec()); + assert_eq!(b"v1".to_vec(), kvs[0].value().to_vec()); + } + + #[test] + fn test_compare_and_put_request_trans() { + let (key, expect, value) = ( + b"test_key1".to_vec(), + b"test_expect1".to_vec(), + b"test_value1".to_vec(), + ); + + let req = CompareAndPutRequest::new() + .with_key(key.clone()) + .with_expect(expect.clone()) + .with_value(value.clone()); + + let into_req: PbCompareAndPutRequest = req.into(); + assert!(into_req.header.is_none()); + assert_eq!(key, into_req.key); + assert_eq!(expect, into_req.expect); + assert_eq!(value, into_req.value); + } + + #[test] + fn test_compare_and_put_response_trans() { + let pb_res = PbCompareAndPutResponse { + header: None, + success: true, + prev_kv: Some(PbKeyValue { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + }), + }; + + let mut res = CompareAndPutResponse::new(pb_res); + assert!(res.take_header().is_none()); + let mut kv = res.take_prev_kv().unwrap(); + assert_eq!(b"k1".to_vec(), kv.key().to_vec()); + assert_eq!(b"k1".to_vec(), kv.take_key()); + assert_eq!(b"v1".to_vec(), kv.value().to_vec()); + assert_eq!(b"v1".to_vec(), kv.take_value()); + } + #[test] fn test_delete_range_request_trans() { let (key, range_end) = (b"test_key1".to_vec(), b"test_range_end1".to_vec()); let req = DeleteRangeRequest::new() - .with_key(key.clone()) - .with_range_end(range_end.clone()) + .with_range(key.clone(), range_end.clone()) .with_prev_kv(); let into_req: PbDeleteRangeRequest = req.into(); @@ -416,6 +721,21 @@ mod tests { assert!(into_req.prev_kv); } + #[test] + fn test_delete_prefix_request_trans() { + let key = b"test_key1".to_vec(); + + let req = DeleteRangeRequest::new() + .with_prefix(key.clone()) + .with_prev_kv(); + + let into_req: PbDeleteRangeRequest = req.into(); + assert!(into_req.header.is_none()); + assert_eq!(key, into_req.key); + assert_eq!(b"test_key2".to_vec(), into_req.range_end); + assert!(into_req.prev_kv); + } + #[test] fn test_delete_range_response_trans() { let pb_res = PbDeleteRangeResponse { @@ -433,7 +753,7 @@ mod tests { ], }; - let mut res: DeleteRangeResponse = pb_res.into(); + let mut res: DeleteRangeResponse = pb_res.try_into().unwrap(); assert!(res.take_header().is_none()); assert_eq!(2, res.deleted()); let mut kvs = res.take_prev_kvs(); diff --git a/src/meta-client/src/rpc/util.rs b/src/meta-client/src/rpc/util.rs new file mode 100644 index 0000000000..d563b4a860 --- /dev/null +++ b/src/meta-client/src/rpc/util.rs @@ -0,0 +1,32 @@ +use api::v1::meta::ResponseHeader; + +use crate::error; +use crate::error::Result; + +#[inline] +pub(crate) fn check_response_header(header: Option<&ResponseHeader>) -> Result<()> { + if let Some(header) = header { + if let Some(ref error) = header.error { + let code = error.code; + let err_msg = &error.err_msg; + return error::IllegalServerStateSnafu { code, err_msg }.fail(); + } + } + + Ok(()) +} + +/// Get prefix end key of `key`. +#[inline] +pub fn get_prefix_end_key(key: &[u8]) -> Vec { + for (i, v) in key.iter().enumerate().rev() { + if *v < 0xFF { + let mut end = Vec::from(&key[..=i]); + end[i] = *v + 1; + return end; + } + } + + // next prefix does not exist (e.g., 0xffff); + vec![0] +} diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index f5cf35aa81..f9b8064c3a 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -10,11 +10,15 @@ common-base = { path = "../common/base" } common-error = { path = "../common/error" } common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } +common-time = { path = "../common/time" } etcd-client = "0.10" futures = "0.3" h2 = "0.3" http-body = "0.4" +lazy_static = "1.4" +regex = "1.6" serde = "1.0" +serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.0", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 63989ca8bf..25d2dd49ee 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -37,6 +37,48 @@ pub enum Error { #[snafu(display("Empty table name"))] EmptyTableName { backtrace: Backtrace }, + + #[snafu(display("Invalid datanode lease key: {}", key))] + InvalidLeaseKey { key: String, backtrace: Backtrace }, + + #[snafu(display("Failed to parse datanode lease key from utf8: {}", source))] + LeaseKeyFromUtf8 { + source: std::string::FromUtf8Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to serialize to json: {}", input))] + SerializeToJson { + input: String, + source: serde_json::error::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to deserialize from json: {}", input))] + DeserializeFromJson { + input: String, + source: serde_json::error::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to parse number: {}, source: {}", err_msg, source))] + ParseNum { + err_msg: String, + source: std::num::ParseIntError, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid arguments: {}", err_msg))] + InvalidArguments { + err_msg: String, + backtrace: Backtrace, + }, + + #[snafu(display("Invalid result with a txn response: {}", err_msg))] + InvalidTxnResult { + err_msg: String, + backtrace: Backtrace, + }, } pub type Result = std::result::Result; @@ -62,8 +104,17 @@ impl ErrorExt for Error { | Error::EtcdFailed { .. } | Error::ConnectEtcd { .. } | Error::TcpBind { .. } + | Error::SerializeToJson { .. } + | Error::DeserializeFromJson { .. } | Error::StartGrpc { .. } => StatusCode::Internal, - Error::EmptyKey { .. } | Error::EmptyTableName { .. } => StatusCode::InvalidArguments, + Error::EmptyKey { .. } + | Error::EmptyTableName { .. } + | Error::InvalidLeaseKey { .. } + | Error::ParseNum { .. } + | Error::InvalidArguments { .. } => StatusCode::InvalidArguments, + Error::LeaseKeyFromUtf8 { .. } | Error::InvalidTxnResult { .. } => { + StatusCode::Unexpected + } } } } @@ -106,10 +157,13 @@ mod tests { Err(etcd_client::Error::InvalidArgs("".to_string())) } + fn throw_serde_json_error() -> StdResult { + serde_json::from_str("invalid json") + } + #[test] fn test_stream_node_error() { let e = throw_none_option().context(StreamNoneSnafu).err().unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -117,7 +171,6 @@ mod tests { #[test] fn test_empty_key_error() { let e = throw_none_option().context(EmptyKeySnafu).err().unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::InvalidArguments); } @@ -128,7 +181,6 @@ mod tests { .context(EtcdFailedSnafu) .err() .unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -139,7 +191,6 @@ mod tests { .context(ConnectEtcdSnafu) .err() .unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -153,7 +204,6 @@ mod tests { .context(TcpBindSnafu { addr: "127.0.0.1" }) .err() .unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -163,9 +213,7 @@ mod tests { fn throw_tonic_error() -> StdResult { tonic::transport::Endpoint::new("http//http").map(|_| ()) } - let e = throw_tonic_error().context(StartGrpcSnafu).err().unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::Internal); } @@ -176,8 +224,84 @@ mod tests { .context(EmptyTableNameSnafu) .err() .unwrap(); - assert!(e.backtrace_opt().is_some()); assert_eq!(e.status_code(), StatusCode::InvalidArguments); } + + #[test] + fn test_invalid_lease_key_error() { + let e = throw_none_option() + .context(InvalidLeaseKeySnafu { key: "test" }) + .err() + .unwrap(); + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_lease_key_fromutf8_test() { + fn throw_fromutf8_error() -> StdResult { + let sparkle_heart = vec![0, 159, 146, 150]; + String::from_utf8(sparkle_heart).map(|_| ()) + } + let e = throw_fromutf8_error() + .context(LeaseKeyFromUtf8Snafu) + .err() + .unwrap(); + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Unexpected); + } + + #[test] + fn test_serialize_to_json_error() { + let e = throw_serde_json_error() + .context(SerializeToJsonSnafu { input: "" }) + .err() + .unwrap(); + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_deserialize_from_json_error() { + let e = throw_serde_json_error() + .context(DeserializeFromJsonSnafu { input: "" }) + .err() + .unwrap(); + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Internal); + } + + #[test] + fn test_parse_num_error() { + fn throw_parse_int_error() -> StdResult { + "invalid num".parse::().map(|_| ()) + } + let e = throw_parse_int_error() + .context(ParseNumSnafu { err_msg: "" }) + .err() + .unwrap(); + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_invalid_arguments_error() { + let e = throw_none_option() + .context(InvalidArgumentsSnafu { err_msg: "test" }) + .err() + .unwrap(); + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::InvalidArguments); + } + + #[test] + fn test_invalid_txn_error() { + let e = throw_none_option() + .context(InvalidTxnResultSnafu { err_msg: "test" }) + .err() + .unwrap(); + assert!(e.backtrace_opt().is_some()); + assert_eq!(e.status_code(), StatusCode::Unexpected); + } } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index d9563b7d26..e7439d19a1 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -1,3 +1,4 @@ +pub(crate) mod datanode_lease; pub(crate) mod response_header; use std::collections::BTreeMap; @@ -18,11 +19,29 @@ pub trait HeartbeatHandler: Send + Sync { async fn handle( &self, req: &HeartbeatRequest, - ctx: &mut HeartbeatAccumulator, - store: KvStoreRef, + ctx: &Context, + acc: &mut HeartbeatAccumulator, ) -> Result<()>; } +#[derive(Clone)] +pub struct Context { + pub server_addr: String, // also server_id + pub kv_store: KvStoreRef, +} + +impl Context { + #[inline] + pub fn server_addr(&self) -> &str { + &self.server_addr + } + + #[inline] + pub fn kv_store(&self) -> KvStoreRef { + self.kv_store.clone() + } +} + #[derive(Debug, Default)] pub struct HeartbeatAccumulator { pub header: Option, @@ -45,22 +64,13 @@ pub enum Instruction {} pub type Pusher = Sender>; -#[derive(Clone)] +#[derive(Clone, Default)] pub struct HeartbeatHandlers { - kv_store: KvStoreRef, handlers: Arc>>>, pushers: Arc>>, } impl HeartbeatHandlers { - pub fn new(kv_store: KvStoreRef) -> Self { - Self { - kv_store, - handlers: Arc::new(RwLock::new(Default::default())), - pushers: Arc::new(RwLock::new(Default::default())), - } - } - pub async fn add_handler(&self, handler: impl HeartbeatHandler + 'static) { let mut handlers = self.handlers.write().await; handlers.push(Box::new(handler)); @@ -80,11 +90,11 @@ impl HeartbeatHandlers { pushers.remove(key) } - pub async fn handle(&self, req: HeartbeatRequest) -> Result { + pub async fn handle(&self, req: HeartbeatRequest, ctx: Context) -> Result { let mut acc = HeartbeatAccumulator::default(); let handlers = self.handlers.read().await; for h in handlers.iter() { - h.handle(&req, &mut acc, self.kv_store.clone()).await?; + h.handle(&req, &ctx, &mut acc).await?; } let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { diff --git a/src/meta-srv/src/handler/datanode_lease.rs b/src/meta-srv/src/handler/datanode_lease.rs new file mode 100644 index 0000000000..ea1ae6d4ee --- /dev/null +++ b/src/meta-srv/src/handler/datanode_lease.rs @@ -0,0 +1,50 @@ +use api::v1::meta::HeartbeatRequest; +use api::v1::meta::PutRequest; +use common_telemetry::info; +use common_time::util as time_util; + +use super::Context; +use super::HeartbeatAccumulator; +use super::HeartbeatHandler; +use crate::error::Result; +use crate::keys::LeaseKey; +use crate::keys::LeaseValue; + +pub struct DatanodeLeaseHandler; + +#[async_trait::async_trait] +impl HeartbeatHandler for DatanodeLeaseHandler { + async fn handle( + &self, + req: &HeartbeatRequest, + ctx: &Context, + _acc: &mut HeartbeatAccumulator, + ) -> Result<()> { + let HeartbeatRequest { header, peer, .. } = req; + if let Some(ref peer) = peer { + let key = LeaseKey { + cluster_id: header.as_ref().map_or(0, |h| h.cluster_id), + node_id: peer.id, + }; + let value = LeaseValue { + timestamp_millis: time_util::current_time_millis(), + node_addr: peer.addr.clone(), + }; + + info!("Receive a heartbeat from datanode: {:?}, {:?}", key, value); + + let key = key.try_into()?; + let value = value.try_into()?; + let put = PutRequest { + key, + value, + ..Default::default() + }; + + let kv_store = ctx.kv_store(); + let _ = kv_store.put(put).await?; + } + + Ok(()) + } +} diff --git a/src/meta-srv/src/handler/response_header.rs b/src/meta-srv/src/handler/response_header.rs index 94d07c6cbb..d808cd26ac 100644 --- a/src/meta-srv/src/handler/response_header.rs +++ b/src/meta-srv/src/handler/response_header.rs @@ -2,10 +2,10 @@ use api::v1::meta::HeartbeatRequest; use api::v1::meta::ResponseHeader; use api::v1::meta::PROTOCOL_VERSION; +use super::Context; use super::HeartbeatAccumulator; use super::HeartbeatHandler; use crate::error::Result; -use crate::service::store::kv::KvStoreRef; pub struct ResponseHeaderHandler; @@ -14,8 +14,8 @@ impl HeartbeatHandler for ResponseHeaderHandler { async fn handle( &self, req: &HeartbeatRequest, + _ctx: &Context, acc: &mut HeartbeatAccumulator, - _store: KvStoreRef, ) -> Result<()> { let HeartbeatRequest { header, .. } = req; let res_header = ResponseHeader { @@ -32,25 +32,27 @@ impl HeartbeatHandler for ResponseHeaderHandler { mod tests { use std::sync::Arc; - use api::v1::meta::{request_header, HeartbeatResponse}; + use api::v1::meta::{HeartbeatResponse, RequestHeader}; use super::*; - use crate::service::store::noop::NoopKvStore; + use crate::{handler::Context, service::store::noop::NoopKvStore}; #[tokio::test] async fn test_handle_heartbeat_resp_header() { let kv_store = Arc::new(NoopKvStore {}); + let ctx = Context { + server_addr: "0.0.0.0:0000".to_string(), + kv_store, + }; + let req = HeartbeatRequest { - header: request_header((1, 2)), + header: RequestHeader::new((1, 2)), ..Default::default() }; let mut acc = HeartbeatAccumulator::default(); let response_handler = ResponseHeaderHandler {}; - response_handler - .handle(&req, &mut acc, kv_store) - .await - .unwrap(); + response_handler.handle(&req, &ctx, &mut acc).await.unwrap(); let header = std::mem::take(&mut acc.header); let res = HeartbeatResponse { header, diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs new file mode 100644 index 0000000000..47416dc4f9 --- /dev/null +++ b/src/meta-srv/src/keys.rs @@ -0,0 +1,140 @@ +use std::str::FromStr; + +use lazy_static::lazy_static; +use regex::Regex; +use serde::Deserialize; +use serde::Serialize; +use snafu::ensure; +use snafu::OptionExt; +use snafu::ResultExt; + +use crate::error; +use crate::error::Result; + +pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease"; + +lazy_static! { + static ref DATANODE_KEY_PATTERN: Regex = + Regex::new(&format!("^{}-([0-9]+)-([0-9]+)$", DN_LEASE_PREFIX)).unwrap(); +} +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct LeaseKey { + pub cluster_id: u64, + pub node_id: u64, +} + +impl FromStr for LeaseKey { + type Err = error::Error; + + fn from_str(key: &str) -> Result { + let caps = DATANODE_KEY_PATTERN + .captures(key) + .context(error::InvalidLeaseKeySnafu { key })?; + + ensure!(caps.len() == 3, error::InvalidLeaseKeySnafu { key }); + + let cluster_id = caps[1].to_string(); + let node_id = caps[2].to_string(); + let cluster_id = cluster_id.parse::().context(error::ParseNumSnafu { + err_msg: format!("invalid cluster_id: {}", cluster_id), + })?; + let node_id = node_id.parse::().context(error::ParseNumSnafu { + err_msg: format!("invalid node_id: {}", node_id), + })?; + + Ok(Self { + cluster_id, + node_id, + }) + } +} + +impl TryFrom> for LeaseKey { + type Error = error::Error; + + fn try_from(bytes: Vec) -> Result { + String::from_utf8(bytes) + .context(error::LeaseKeyFromUtf8Snafu {}) + .map(|x| x.parse())? + } +} + +impl TryFrom for Vec { + type Error = error::Error; + + fn try_from(dn_key: LeaseKey) -> Result { + Ok(format!( + "{}-{}-{}", + DN_LEASE_PREFIX, dn_key.cluster_id, dn_key.node_id + ) + .into_bytes()) + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct LeaseValue { + // last activity + pub timestamp_millis: i64, + pub node_addr: String, +} + +impl FromStr for LeaseValue { + type Err = error::Error; + + fn from_str(value: &str) -> Result { + serde_json::from_str(value).context(error::DeserializeFromJsonSnafu { input: value }) + } +} + +impl TryFrom> for LeaseValue { + type Error = error::Error; + + fn try_from(bytes: Vec) -> Result { + String::from_utf8(bytes) + .context(error::LeaseKeyFromUtf8Snafu {}) + .map(|x| x.parse())? + } +} + +impl TryFrom for Vec { + type Error = error::Error; + + fn try_from(dn_value: LeaseValue) -> Result { + Ok(serde_json::to_string(&dn_value) + .context(error::SerializeToJsonSnafu { + input: format!("{:?}", dn_value), + })? + .into_bytes()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_datanode_lease_key() { + let key = LeaseKey { + cluster_id: 0, + node_id: 1, + }; + + let key_bytes: Vec = key.clone().try_into().unwrap(); + let new_key: LeaseKey = key_bytes.try_into().unwrap(); + + assert_eq!(new_key, key); + } + + #[test] + fn test_datanode_lease_value() { + let value = LeaseValue { + timestamp_millis: 111, + node_addr: "0.0.0.0:3002".to_string(), + }; + + let value_bytes: Vec = value.clone().try_into().unwrap(); + let new_value: LeaseValue = value_bytes.try_into().unwrap(); + + assert_eq!(new_value, value); + } +} diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs new file mode 100644 index 0000000000..d57f258243 --- /dev/null +++ b/src/meta-srv/src/lease.rs @@ -0,0 +1,46 @@ +use api::v1::meta::RangeRequest; +use api::v1::meta::RangeResponse; + +use crate::error::Result; +use crate::keys::LeaseKey; +use crate::keys::LeaseValue; +use crate::keys::DN_LEASE_PREFIX; +use crate::service::store::kv::KvStoreRef; +use crate::util; + +pub async fn alive_datanodes

( + cluster_id: u64, + kv_store: KvStoreRef, + predicate: P, +) -> Result> +where + P: Fn(&LeaseKey, &LeaseValue) -> bool, +{ + let key = get_lease_prefix(cluster_id); + let range_end = util::get_prefix_end_key(&key); + let req = RangeRequest { + key, + range_end, + ..Default::default() + }; + + let res = kv_store.range(req).await?; + + let RangeResponse { kvs, .. } = res; + let mut lease_kvs = vec![]; + for kv in kvs { + let lease_key: LeaseKey = kv.key.try_into()?; + let lease_value: LeaseValue = kv.value.try_into()?; + if !predicate(&lease_key, &lease_value) { + continue; + } + lease_kvs.push((lease_key, lease_value)); + } + + Ok(lease_kvs) +} + +#[inline] +pub fn get_lease_prefix(cluster_id: u64) -> Vec { + format!("{}-{}", DN_LEASE_PREFIX, cluster_id).into_bytes() +} diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 740fcde1e2..296749906c 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -1,7 +1,10 @@ pub mod bootstrap; pub mod error; pub mod handler; +mod keys; +pub mod lease; pub mod metasrv; pub mod service; +mod util; pub use crate::error::Result; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 16858fedaf..206ad16ff7 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -1,6 +1,7 @@ use serde::Deserialize; use serde::Serialize; +use crate::handler::datanode_lease::DatanodeLeaseHandler; use crate::handler::response_header::ResponseHeaderHandler; use crate::handler::HeartbeatHandlers; use crate::service::store::kv::KvStoreRef; @@ -10,6 +11,7 @@ pub struct MetaSrvOptions { pub bind_addr: String, pub server_addr: String, pub store_addr: String, + pub datanode_lease_secs: i64, } impl Default for MetaSrvOptions { @@ -18,6 +20,7 @@ impl Default for MetaSrvOptions { bind_addr: "0.0.0.0:3002".to_string(), server_addr: "0.0.0.0:3002".to_string(), store_addr: "0.0.0.0:2380".to_string(), + datanode_lease_secs: 15, } } } @@ -31,8 +34,10 @@ pub struct MetaSrv { impl MetaSrv { pub async fn new(options: MetaSrvOptions, kv_store: KvStoreRef) -> Self { - let heartbeat_handlers = HeartbeatHandlers::new(kv_store.clone()); + let heartbeat_handlers = HeartbeatHandlers::default(); heartbeat_handlers.add_handler(ResponseHeaderHandler).await; + heartbeat_handlers.add_handler(DatanodeLeaseHandler).await; + Self { options, kv_store, diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index cd3829d5b8..5d7b524f49 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -23,6 +23,7 @@ use super::GrpcResult; use super::GrpcStream; use crate::error; use crate::error::Result; +use crate::handler::Context; use crate::metasrv::MetaSrv; static PUSHER_ID: AtomicU64 = AtomicU64::new(0); @@ -38,6 +39,10 @@ impl heartbeat_server::Heartbeat for MetaSrv { let mut in_stream = req.into_inner(); let (tx, rx) = mpsc::channel(128); let handlers = self.heartbeat_handlers(); + let ctx = Context { + server_addr: self.options().server_addr.clone(), + kv_store: self.kv_store(), + }; common_runtime::spawn_bg(async move { let mut pusher_key = None; while let Some(msg) = in_stream.next().await { @@ -56,9 +61,14 @@ impl heartbeat_server::Heartbeat for MetaSrv { } } - tx.send(handlers.handle(req).await.map_err(|e| e.into())) - .await - .expect("working rx"); + tx.send( + handlers + .handle(req, ctx.clone()) + .await + .map_err(|e| e.into()), + ) + .await + .expect("working rx"); } Err(err) => { if let Some(io_err) = error::match_for_io_error(&err) { @@ -140,7 +150,7 @@ mod tests { let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; let req = AskLeaderRequest { - header: request_header((1, 1)), + header: RequestHeader::new((1, 1)), }; let res = meta_srv.ask_leader(req.into_request()).await.unwrap(); diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 5473b18775..711dfa9ea3 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -1,8 +1,14 @@ use api::v1::meta::router_server; use api::v1::meta::CreateRequest; use api::v1::meta::Peer; +use api::v1::meta::Region; +use api::v1::meta::RegionRoute; +use api::v1::meta::ResponseHeader; use api::v1::meta::RouteRequest; use api::v1::meta::RouteResponse; +use api::v1::meta::Table; +use api::v1::meta::TableRoute; +use common_time::util as time_util; use snafu::OptionExt; use tonic::Request; use tonic::Response; @@ -11,44 +17,120 @@ use super::store::kv::KvStoreRef; use super::GrpcResult; use crate::error; use crate::error::Result; +use crate::keys::LeaseKey; +use crate::keys::LeaseValue; +use crate::lease; use crate::metasrv::MetaSrv; +#[derive(Clone)] +struct Context { + pub datanode_lease_secs: i64, + pub kv_store: KvStoreRef, +} + #[async_trait::async_trait] impl router_server::Router for MetaSrv { async fn route(&self, req: Request) -> GrpcResult { let req = req.into_inner(); - let kv_store = self.kv_store(); - let res = handle_route(req, kv_store).await?; + let ctx = Context { + datanode_lease_secs: self.options().datanode_lease_secs, + kv_store: self.kv_store(), + }; + let res = handle_route(req, ctx).await?; Ok(Response::new(res)) } async fn create(&self, req: Request) -> GrpcResult { let req = req.into_inner(); - let kv_store = self.kv_store(); - let res = handle_create(req, kv_store).await?; + let ctx = Context { + datanode_lease_secs: self.options().datanode_lease_secs, + kv_store: self.kv_store(), + }; + let res = handle_create(req, ctx, LeaseBasedSelector::default()).await?; Ok(Response::new(res)) } } -async fn handle_route(_req: RouteRequest, _kv_store: KvStoreRef) -> Result { +#[async_trait::async_trait] +trait DatanodeSelector { + async fn select(&self, id: u64, ctx: &Context) -> Result>; +} + +#[derive(Default)] +struct LeaseBasedSelector; + +#[async_trait::async_trait] +impl DatanodeSelector for LeaseBasedSelector { + async fn select(&self, id: u64, ctx: &Context) -> Result> { + // filter out the nodes out lease + let lease_filter = |_: &LeaseKey, v: &LeaseValue| { + time_util::current_time_millis() - v.timestamp_millis < ctx.datanode_lease_secs + }; + let mut lease_kvs = lease::alive_datanodes(id, ctx.kv_store.clone(), lease_filter).await?; + // TODO(jiachun): At the moment we are just pushing the latest to the forefront, + // and it is better to use load-based strategies in the future. + lease_kvs.sort_by(|a, b| b.1.timestamp_millis.cmp(&a.1.timestamp_millis)); + + let peers = lease_kvs + .into_iter() + .map(|(k, v)| Peer { + id: k.node_id, + addr: v.node_addr, + }) + .collect::>(); + + Ok(peers) + } +} + +async fn handle_route(_req: RouteRequest, _ctx: Context) -> Result { todo!() } -async fn handle_create(req: CreateRequest, _kv_store: KvStoreRef) -> Result { - let CreateRequest { table_name, .. } = req; - let _table_name = table_name.context(error::EmptyTableNameSnafu)?; +async fn handle_create( + req: CreateRequest, + ctx: Context, + selector: impl DatanodeSelector, +) -> Result { + let CreateRequest { + header, + table_name, + partitions, + } = req; + let table_name = table_name.context(error::EmptyTableNameSnafu)?; + let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); - // TODO(jiachun): - let peers = vec![Peer { - id: 0, - addr: "127.0.0.1:3000".to_string(), - }]; + let peers = selector.select(cluster_id, &ctx).await?; + + let table = Table { + table_name: Some(table_name), + ..Default::default() + }; + let region_num = partitions.len(); + let mut region_routes = Vec::with_capacity(region_num); + for i in 0..region_num { + let region = Region { + id: i as u64, + ..Default::default() + }; + let region_route = RegionRoute { + region: Some(region), + leader_peer_index: (i % peers.len()) as u64, + follower_peer_indexes: vec![(i % peers.len()) as u64], + }; + region_routes.push(region_route); + } + let table_route = TableRoute { + table: Some(table), + region_routes, + }; Ok(RouteResponse { + header: ResponseHeader::success(cluster_id), peers, - ..Default::default() + table_routes: vec![table_route], }) } @@ -71,7 +153,7 @@ mod tests { let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; let req = RouteRequest { - header: request_header((1, 1)), + header: RequestHeader::new((1, 1)), ..Default::default() }; let req = req @@ -82,32 +164,60 @@ mod tests { let _res = meta_srv.route(req.into_request()).await.unwrap(); } + struct MockSelector; + + #[async_trait::async_trait] + impl DatanodeSelector for MockSelector { + async fn select(&self, _: u64, _: &Context) -> Result> { + Ok(vec![ + Peer { + id: 0, + addr: "127.0.0.1:3000".to_string(), + }, + Peer { + id: 1, + addr: "127.0.0.1:3001".to_string(), + }, + ]) + } + } + #[tokio::test] async fn test_handle_create() { let kv_store = Arc::new(NoopKvStore {}); - let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; - let table_name = TableName::new("test_catalog", "test_db", "table1"); let req = CreateRequest { - header: request_header((1, 1)), + header: RequestHeader::new((1, 1)), table_name: Some(table_name), ..Default::default() }; - let p0 = Partition::new() .column_list(vec![b"col1".to_vec(), b"col2".to_vec()]) .value_list(vec![b"v1".to_vec(), b"v2".to_vec()]); - let p1 = Partition::new() .column_list(vec![b"col1".to_vec(), b"col2".to_vec()]) .value_list(vec![b"v11".to_vec(), b"v22".to_vec()]); - let req = req.add_partition(p0).add_partition(p1); + let ctx = Context { + datanode_lease_secs: 10, + kv_store, + }; + let res = handle_create(req, ctx, MockSelector {}).await.unwrap(); - let res = meta_srv.create(req.into_request()).await.unwrap(); - - for r in res.into_inner().peers { - assert_eq!("127.0.0.1:3000", r.addr); - } + assert_eq!( + vec![ + Peer { + id: 0, + addr: "127.0.0.1:3000".to_string(), + }, + Peer { + id: 1, + addr: "127.0.0.1:3001".to_string(), + }, + ], + res.peers + ); + assert_eq!(1, res.table_routes.len()); + assert_eq!(2, res.table_routes.get(0).unwrap().region_routes.len()); } } diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 5cef83aa24..a205817880 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -1,8 +1,13 @@ pub mod etcd; pub mod kv; +#[cfg(test)] pub(crate) mod noop; use api::v1::meta::store_server; +use api::v1::meta::BatchPutRequest; +use api::v1::meta::BatchPutResponse; +use api::v1::meta::CompareAndPutRequest; +use api::v1::meta::CompareAndPutResponse; use api::v1::meta::DeleteRangeRequest; use api::v1::meta::DeleteRangeResponse; use api::v1::meta::PutRequest; @@ -31,6 +36,23 @@ impl store_server::Store for MetaSrv { Ok(Response::new(res)) } + async fn batch_put(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let res = self.kv_store().batch_put(req).await?; + + Ok(Response::new(res)) + } + + async fn compare_and_put( + &self, + req: Request, + ) -> GrpcResult { + let req = req.into_inner(); + let res = self.kv_store().compare_and_put(req).await?; + + Ok(Response::new(res)) + } + async fn delete_range( &self, req: Request, @@ -74,6 +96,26 @@ mod tests { assert!(res.is_ok()); } + #[tokio::test] + async fn test_batch_put() { + let kv_store = Arc::new(NoopKvStore {}); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let req = BatchPutRequest::default(); + let res = meta_srv.batch_put(req.into_request()).await; + + assert!(res.is_ok()); + } + + #[tokio::test] + async fn test_compare_and_put() { + let kv_store = Arc::new(NoopKvStore {}); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store).await; + let req = CompareAndPutRequest::default(); + let res = meta_srv.compare_and_put(req.into_request()).await; + + assert!(res.is_ok()); + } + #[tokio::test] async fn test_delete_range() { let kv_store = Arc::new(NoopKvStore {}); diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index d5256e9614..59bd70f771 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -1,5 +1,9 @@ use std::sync::Arc; +use api::v1::meta::BatchPutRequest; +use api::v1::meta::BatchPutResponse; +use api::v1::meta::CompareAndPutRequest; +use api::v1::meta::CompareAndPutResponse; use api::v1::meta::DeleteRangeRequest; use api::v1::meta::DeleteRangeResponse; use api::v1::meta::KeyValue; @@ -7,11 +11,17 @@ use api::v1::meta::PutRequest; use api::v1::meta::PutResponse; use api::v1::meta::RangeRequest; use api::v1::meta::RangeResponse; +use api::v1::meta::ResponseHeader; use common_error::prelude::*; use etcd_client::Client; +use etcd_client::Compare; +use etcd_client::CompareOp; use etcd_client::DeleteOptions; use etcd_client::GetOptions; use etcd_client::PutOptions; +use etcd_client::Txn; +use etcd_client::TxnOp; +use etcd_client::TxnOpResponse; use super::kv::KvStore; use super::kv::KvStoreRef; @@ -40,7 +50,11 @@ impl EtcdStore { #[async_trait::async_trait] impl KvStore for EtcdStore { async fn range(&self, req: RangeRequest) -> Result { - let Get { key, options } = req.try_into()?; + let Get { + cluster_id, + key, + options, + } = req.try_into()?; let res = self .client @@ -56,14 +70,15 @@ impl KvStore for EtcdStore { .collect::>(); Ok(RangeResponse { + header: ResponseHeader::success(cluster_id), kvs, more: res.more(), - ..Default::default() }) } async fn put(&self, req: PutRequest) -> Result { let Put { + cluster_id, key, value, options, @@ -79,13 +94,111 @@ impl KvStore for EtcdStore { let prev_kv = res.prev_key().map(|kv| KvPair::new(kv).into()); Ok(PutResponse { + header: ResponseHeader::success(cluster_id), + prev_kv, + }) + } + + async fn batch_put(&self, req: BatchPutRequest) -> Result { + let BatchPut { + cluster_id, + kvs, + options, + } = req.try_into()?; + + let put_ops = kvs + .into_iter() + .map(|kv| (TxnOp::put(kv.key, kv.value, options.clone()))) + .collect::>(); + let txn = Txn::new().and_then(put_ops); + + let txn_res = self + .client + .kv_client() + .txn(txn) + .await + .context(error::EtcdFailedSnafu)?; + + let mut prev_kvs = vec![]; + for op_res in txn_res.op_responses() { + match op_res { + TxnOpResponse::Put(put_res) => { + if let Some(prev_kv) = put_res.prev_key() { + prev_kvs.push(KvPair::new(prev_kv).into()); + } + } + _ => unreachable!(), // never get here + } + } + + Ok(BatchPutResponse { + header: ResponseHeader::success(cluster_id), + prev_kvs, + }) + } + + async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { + let CompareAndPut { + cluster_id, + key, + expect, + value, + options, + } = req.try_into()?; + + let txn = Txn::new() + .when(vec![Compare::value( + key.clone(), + CompareOp::Equal, + expect.clone(), + )]) + .and_then(vec![TxnOp::put(key.clone(), value, options)]) + .or_else(vec![TxnOp::get(key.clone(), None)]); + + let txn_res = self + .client + .kv_client() + .txn(txn) + .await + .context(error::EtcdFailedSnafu)?; + let success = txn_res.succeeded(); + let prev_kv; + let op_res = txn_res + .op_responses() + .pop() + .context(error::InvalidTxnResultSnafu { + err_msg: "empty response", + })?; + if success { + prev_kv = Some(KeyValue { key, value: expect }); + } else { + match op_res { + TxnOpResponse::Get(get_res) => { + ensure!( + get_res.count() == 1, + error::InvalidTxnResultSnafu { + err_msg: format!("expect 1 response, actual {}", get_res.count()) + } + ); + prev_kv = Some(KeyValue::from(KvPair::new(&get_res.kvs()[0]))); + } + _ => unreachable!(), // never get here + } + } + + Ok(CompareAndPutResponse { + header: ResponseHeader::success(cluster_id), + success, prev_kv, - ..Default::default() }) } async fn delete_range(&self, req: DeleteRangeRequest) -> Result { - let Delete { key, options } = req.try_into()?; + let Delete { + cluster_id, + key, + options, + } = req.try_into()?; let res = self .client @@ -101,14 +214,15 @@ impl KvStore for EtcdStore { .collect::>(); Ok(DeleteRangeResponse { + header: ResponseHeader::success(cluster_id), deleted: res.deleted(), prev_kvs, - ..Default::default() }) } } struct Get { + cluster_id: u64, key: Vec, options: Option, } @@ -118,11 +232,11 @@ impl TryFrom for Get { fn try_from(req: RangeRequest) -> Result { let RangeRequest { + header, key, range_end, limit, keys_only, - .. } = req; ensure!(!key.is_empty(), error::EmptyKeySnafu); @@ -139,6 +253,7 @@ impl TryFrom for Get { } Ok(Get { + cluster_id: header.map_or(0, |h| h.cluster_id), key, options: Some(options), }) @@ -146,6 +261,7 @@ impl TryFrom for Get { } struct Put { + cluster_id: u64, key: Vec, value: Vec, options: Option, @@ -156,10 +272,10 @@ impl TryFrom for Put { fn try_from(req: PutRequest) -> Result { let PutRequest { + header, key, value, prev_kv, - .. } = req; let mut options = PutOptions::default(); @@ -168,6 +284,7 @@ impl TryFrom for Put { } Ok(Put { + cluster_id: header.map_or(0, |h| h.cluster_id), key, value, options: Some(options), @@ -175,7 +292,66 @@ impl TryFrom for Put { } } +struct BatchPut { + cluster_id: u64, + kvs: Vec, + options: Option, +} + +impl TryFrom for BatchPut { + type Error = error::Error; + + fn try_from(req: BatchPutRequest) -> Result { + let BatchPutRequest { + header, + kvs, + prev_kv, + } = req; + + let mut options = PutOptions::default(); + if prev_kv { + options = options.with_prev_key(); + } + + Ok(BatchPut { + cluster_id: header.map_or(0, |h| h.cluster_id), + kvs, + options: Some(options), + }) + } +} + +struct CompareAndPut { + cluster_id: u64, + key: Vec, + expect: Vec, + value: Vec, + options: Option, +} + +impl TryFrom for CompareAndPut { + type Error = error::Error; + + fn try_from(req: CompareAndPutRequest) -> Result { + let CompareAndPutRequest { + header, + key, + expect, + value, + } = req; + + Ok(CompareAndPut { + cluster_id: header.map_or(0, |h| h.cluster_id), + key, + expect, + value, + options: Some(PutOptions::default().with_prev_key()), + }) + } +} + struct Delete { + cluster_id: u64, key: Vec, options: Option, } @@ -185,10 +361,10 @@ impl TryFrom for Delete { fn try_from(req: DeleteRangeRequest) -> Result { let DeleteRangeRequest { + header, key, range_end, prev_kv, - .. } = req; ensure!(!key.is_empty(), error::EmptyKeySnafu); @@ -202,6 +378,7 @@ impl TryFrom for Delete { } Ok(Delete { + cluster_id: header.map_or(0, |h| h.cluster_id), key, options: Some(options), }) @@ -213,7 +390,7 @@ struct KvPair<'a>(&'a etcd_client::KeyValue); impl<'a> KvPair<'a> { /// Creates a `KvPair` from etcd KeyValue #[inline] - const fn new(kv: &'a etcd_client::KeyValue) -> Self { + fn new(kv: &'a etcd_client::KeyValue) -> Self { Self(kv) } } @@ -263,6 +440,41 @@ mod tests { assert!(put.options.is_some()); } + #[test] + fn test_parse_batch_put() { + let req = BatchPutRequest { + kvs: vec![KeyValue { + key: b"test_key".to_vec(), + value: b"test_value".to_vec(), + }], + prev_kv: true, + ..Default::default() + }; + + let batch_put: BatchPut = req.try_into().unwrap(); + + assert_eq!(b"test_key".to_vec(), batch_put.kvs.get(0).unwrap().key); + assert_eq!(b"test_value".to_vec(), batch_put.kvs.get(0).unwrap().value); + assert!(batch_put.options.is_some()); + } + + #[test] + fn test_parse_compare_and_put() { + let req = CompareAndPutRequest { + key: b"test_key".to_vec(), + expect: b"test_expect".to_vec(), + value: b"test_value".to_vec(), + ..Default::default() + }; + + let compare_and_put: CompareAndPut = req.try_into().unwrap(); + + assert_eq!(b"test_key".to_vec(), compare_and_put.key); + assert_eq!(b"test_expect".to_vec(), compare_and_put.expect); + assert_eq!(b"test_value".to_vec(), compare_and_put.value); + assert!(compare_and_put.options.is_some()); + } + #[test] fn test_parse_delete() { let req = DeleteRangeRequest { diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index f079de71b7..98978ba63e 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -1,5 +1,9 @@ use std::sync::Arc; +use api::v1::meta::BatchPutRequest; +use api::v1::meta::BatchPutResponse; +use api::v1::meta::CompareAndPutRequest; +use api::v1::meta::CompareAndPutResponse; use api::v1::meta::DeleteRangeRequest; use api::v1::meta::DeleteRangeResponse; use api::v1::meta::PutRequest; @@ -17,5 +21,9 @@ pub trait KvStore: Send + Sync { async fn put(&self, req: PutRequest) -> Result; + async fn batch_put(&self, req: BatchPutRequest) -> Result; + + async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result; + async fn delete_range(&self, req: DeleteRangeRequest) -> Result; } diff --git a/src/meta-srv/src/service/store/noop.rs b/src/meta-srv/src/service/store/noop.rs index c4852003d6..e113ebfdcb 100644 --- a/src/meta-srv/src/service/store/noop.rs +++ b/src/meta-srv/src/service/store/noop.rs @@ -1,3 +1,7 @@ +use api::v1::meta::BatchPutRequest; +use api::v1::meta::BatchPutResponse; +use api::v1::meta::CompareAndPutRequest; +use api::v1::meta::CompareAndPutResponse; use api::v1::meta::DeleteRangeRequest; use api::v1::meta::DeleteRangeResponse; use api::v1::meta::PutRequest; @@ -23,6 +27,14 @@ impl KvStore for NoopKvStore { Ok(PutResponse::default()) } + async fn batch_put(&self, _req: BatchPutRequest) -> Result { + Ok(BatchPutResponse::default()) + } + + async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result { + Ok(CompareAndPutResponse::default()) + } + async fn delete_range(&self, _req: DeleteRangeRequest) -> Result { Ok(DeleteRangeResponse::default()) } diff --git a/src/meta-srv/src/util.rs b/src/meta-srv/src/util.rs new file mode 100644 index 0000000000..d8b2ec7e60 --- /dev/null +++ b/src/meta-srv/src/util.rs @@ -0,0 +1,34 @@ +/// Get prefix end key of `key`. +#[inline] +pub fn get_prefix_end_key(key: &[u8]) -> Vec { + for (i, v) in key.iter().enumerate().rev() { + if *v < 0xFF { + let mut end = Vec::from(&key[..=i]); + end[i] = *v + 1; + return end; + } + } + + // next prefix does not exist (e.g., 0xffff); + vec![0] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_get_prefix() { + let key = b"testa"; + assert_eq!(b"testb".to_vec(), get_prefix_end_key(key)); + + let key = vec![0, 0, 26]; + assert_eq!(vec![0, 0, 27], get_prefix_end_key(&key)); + + let key = vec![0, 0, 255]; + assert_eq!(vec![0, 1], get_prefix_end_key(&key)); + + let key = vec![0, 255, 255]; + assert_eq!(vec![1], get_prefix_end_key(&key)); + } +}