diff --git a/src/api/greptime/v1/meta/route.proto b/src/api/greptime/v1/meta/route.proto index 2c5a43b45d..d5bd2e043b 100644 --- a/src/api/greptime/v1/meta/route.proto +++ b/src/api/greptime/v1/meta/route.proto @@ -5,6 +5,8 @@ package greptime.v1.meta; import "greptime/v1/meta/common.proto"; service Router { + rpc Create(CreateRequest) returns (RouteResponse) {} + // Fetch routing information for tables. The smallest unit is the complete // routing information(all regions) of a table. // @@ -26,7 +28,14 @@ service Router { // rpc Route(RouteRequest) returns (RouteResponse) {} - rpc Create(CreateRequest) returns (RouteResponse) {} + rpc Delete(DeleteRequest) returns (RouteResponse) {} +} + +message CreateRequest { + RequestHeader header = 1; + + TableName table_name = 2; + repeated Partition partitions = 3; } message RouteRequest { @@ -35,6 +44,12 @@ message RouteRequest { repeated TableName table_names = 2; } +message DeleteRequest { + RequestHeader header = 1; + + TableName table_name = 2; +} + message RouteResponse { ResponseHeader header = 1; @@ -42,13 +57,6 @@ message RouteResponse { repeated TableRoute table_routes = 3; } -message CreateRequest { - RequestHeader header = 1; - - TableName table_name = 2; - repeated Partition partitions = 3; -} - message TableRoute { Table table = 1; repeated RegionRoute region_routes = 2; diff --git a/src/api/greptime/v1/meta/store.proto b/src/api/greptime/v1/meta/store.proto index 3931cc1af1..cd951f454e 100644 --- a/src/api/greptime/v1/meta/store.proto +++ b/src/api/greptime/v1/meta/store.proto @@ -20,6 +20,9 @@ service Store { // DeleteRange deletes the given range from the key-value store. rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse); + + // MoveValue atomically renames the key to the given updated key. + rpc MoveValue(MoveValueRequest) returns (MoveValueResponse); } message RangeRequest { @@ -136,3 +139,21 @@ message DeleteRangeResponse { // returned. repeated KeyValue prev_kvs = 3; } + +message MoveValueRequest { + RequestHeader header = 1; + + // If from_key dose not exist, return the value of to_key (if it exists). + // If from_key exists, move the value of from_key to to_key (i.e. rename), + // and return the value. + bytes from_key = 2; + bytes to_key = 3; +} + +message MoveValueResponse { + ResponseHeader header = 1; + + // If from_key dose not exist, return the value of to_key (if it exists). + // If from_key exists, return the value of from_key. + KeyValue kv = 2; +} diff --git a/src/api/src/v1/meta.rs b/src/api/src/v1/meta.rs index 2959e08d68..d2db34c5fc 100644 --- a/src/api/src/v1/meta.rs +++ b/src/api/src/v1/meta.rs @@ -145,10 +145,12 @@ gen_set_header!(HeartbeatRequest); gen_set_header!(RouteRequest); gen_set_header!(CreateRequest); gen_set_header!(RangeRequest); +gen_set_header!(DeleteRequest); gen_set_header!(PutRequest); gen_set_header!(BatchPutRequest); gen_set_header!(CompareAndPutRequest); gen_set_header!(DeleteRangeRequest); +gen_set_header!(MoveValueRequest); #[cfg(test)] mod tests { diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 547bf63870..1c88c832c1 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -27,10 +27,11 @@ use store::Client as StoreClient; pub use self::heartbeat::{HeartbeatSender, HeartbeatStream}; use crate::error; use crate::error::Result; +use crate::rpc::router::DeleteRequest; use crate::rpc::{ BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, CreateRequest, - DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, - RouteRequest, RouteResponse, + DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, RouteRequest, RouteResponse, }; pub type Id = (u64, u64); @@ -206,6 +207,13 @@ impl MetaClient { self.router_client()?.route(req.into()).await?.try_into() } + /// Can be called repeatedly, the first call will delete and return the + /// table of routing information, the nth call can still return the + /// deleted route information. + pub async fn delete_route(&self, req: DeleteRequest) -> Result { + self.router_client()?.delete(req.into()).await?.try_into() + } + /// Range gets the keys in the range from the key-value store. pub async fn range(&self, req: RangeRequest) -> Result { self.store_client()?.range(req.into()).await?.try_into() @@ -241,6 +249,14 @@ impl MetaClient { .try_into() } + /// MoveValue atomically renames the key to the given updated key. + pub async fn move_value(&self, req: MoveValueRequest) -> Result { + self.store_client()? + .move_value(req.into()) + .await? + .try_into() + } + #[inline] pub fn heartbeat_client(&self) -> Result { self.heartbeat.clone().context(error::NotStartedSnafu { @@ -286,6 +302,52 @@ mod tests { use crate::mocks; use crate::rpc::{Partition, TableName}; + const TEST_KEY_PREFIX: &str = "__unit_test__meta__"; + + struct TestClient { + ns: String, + client: MetaClient, + } + + impl TestClient { + async fn new(ns: impl Into) -> Self { + // can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await; + let client = mocks::mock_client_with_memstore().await; + Self { + ns: ns.into(), + client, + } + } + + fn key(&self, name: &str) -> Vec { + format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes() + } + + async fn gen_data(&self) { + for i in 0..10 { + let req = PutRequest::new() + .with_key(self.key(&format!("key-{}", i))) + .with_value(format!("{}-{}", "value", i).into_bytes()) + .with_prev_kv(); + let res = self.client.put(req).await; + assert!(res.is_ok()); + } + } + + async fn clear_data(&self) { + let req = + DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns)); + let res = self.client.delete_range(req).await; + assert!(res.is_ok()); + } + } + + async fn new_client(ns: impl Into) -> TestClient { + let client = TestClient::new(ns).await; + client.clear_data().await; + client + } + #[tokio::test] async fn test_meta_client_builder() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; @@ -373,15 +435,15 @@ mod tests { #[tokio::test] async fn test_ask_leader() { - let client = mocks::mock_client_with_memstore().await; - let res = client.ask_leader().await; + let tc = new_client("test_ask_leader").await; + let res = tc.client.ask_leader().await; assert!(res.is_ok()); } #[tokio::test] async fn test_heartbeat() { - let client = mocks::mock_client_with_memstore().await; - let (sender, mut receiver) = client.heartbeat().await.unwrap(); + let tc = new_client("test_heartbeat").await; + let (sender, mut receiver) = tc.client.heartbeat().await.unwrap(); // send heartbeats tokio::spawn(async move { for _ in 0..5 { @@ -449,66 +511,58 @@ mod tests { let res = client.create_route(req).await.unwrap(); assert_eq!(1, res.table_routes.len()); - let req = RouteRequest::new().add_table_name(table_name); + let req = RouteRequest::new().add_table_name(table_name.clone()); let res = client.route(req).await.unwrap(); // empty table_routes since no TableGlobalValue is stored by datanode assert!(res.table_routes.is_empty()); - } - async fn gen_data(client: &MetaClient) { - for i in 0..10 { - let req = PutRequest::new() - .with_key(format!("{}-{}", "key", i).into_bytes()) - .with_value(format!("{}-{}", "value", i).into_bytes()) - .with_prev_kv(); - let res = client.put(req).await; - assert!(res.is_ok()); - } + let req = DeleteRequest::new(table_name.clone()); + let res = client.delete_route(req).await; + // empty table_routes since no TableGlobalValue is stored by datanode + assert!(res.is_err()); } #[tokio::test] async fn test_range_get() { - let client = mocks::mock_client_with_memstore().await; + let tc = new_client("test_range_get").await; + tc.gen_data().await; - gen_data(&client).await; - - let req = RangeRequest::new().with_key(b"key-0".to_vec()); - let res = client.range(req).await; + let key = tc.key("key-0"); + let req = RangeRequest::new().with_key(key.as_slice()); + let res = tc.client.range(req).await; let mut kvs = res.unwrap().take_kvs(); assert_eq!(1, kvs.len()); let mut kv = kvs.pop().unwrap(); - assert_eq!(b"key-0".to_vec(), kv.take_key()); + assert_eq!(key, kv.take_key()); assert_eq!(b"value-0".to_vec(), kv.take_value()); } #[tokio::test] async fn test_range_get_prefix() { - let client = mocks::mock_client_with_memstore().await; + let tc = new_client("test_range_get_prefix").await; + tc.gen_data().await; - gen_data(&client).await; - - let req = RangeRequest::new().with_prefix(b"key-".to_vec()); - let res = client.range(req).await; + let req = RangeRequest::new().with_prefix(tc.key("key-")); + let res = tc.client.range(req).await; let kvs = res.unwrap().take_kvs(); assert_eq!(10, kvs.len()); for (i, mut kv) in kvs.into_iter().enumerate() { - assert_eq!(format!("{}-{}", "key", i).into_bytes(), kv.take_key()); + assert_eq!(tc.key(&format!("key-{}", i)), kv.take_key()); assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value()); } } #[tokio::test] async fn test_range() { - let client = mocks::mock_client_with_memstore().await; + let tc = new_client("test_range").await; + tc.gen_data().await; - gen_data(&client).await; - - let req = RangeRequest::new().with_range(b"key-5".to_vec(), b"key-8".to_vec()); - let res = client.range(req).await; + let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8")); + let res = tc.client.range(req).await; let kvs = res.unwrap().take_kvs(); assert_eq!(3, kvs.len()); for (i, mut kv) in kvs.into_iter().enumerate() { - assert_eq!(format!("{}-{}", "key", i + 5).into_bytes(), kv.take_key()); + assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key()); assert_eq!( format!("{}-{}", "value", i + 5).into_bytes(), kv.take_value() @@ -518,121 +572,129 @@ mod tests { #[tokio::test] async fn test_range_keys_only() { - let client = mocks::mock_client_with_memstore().await; - - gen_data(&client).await; + let tc = new_client("test_range_keys_only").await; + tc.gen_data().await; let req = RangeRequest::new() - .with_range(b"key-5".to_vec(), b"key-8".to_vec()) + .with_range(tc.key("key-5"), tc.key("key-8")) .with_keys_only(); - let res = client.range(req).await; + let res = tc.client.range(req).await; let kvs = res.unwrap().take_kvs(); assert_eq!(3, kvs.len()); for (i, mut kv) in kvs.into_iter().enumerate() { - assert_eq!(format!("{}-{}", "key", i + 5).into_bytes(), kv.take_key()); + assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key()); assert!(kv.take_value().is_empty()); } } #[tokio::test] async fn test_put() { - let client = mocks::mock_client_with_memstore().await; + let tc = new_client("test_put").await; + let req = PutRequest::new() - .with_key(b"key".to_vec()) + .with_key(tc.key("key")) .with_value(b"value".to_vec()); - let res = client.put(req).await; + let res = tc.client.put(req).await; assert!(res.unwrap().take_prev_kv().is_none()); } #[tokio::test] async fn test_put_with_prev_kv() { - let client = mocks::mock_client_with_memstore().await; + let tc = new_client("test_put_with_prev_kv").await; + + let key = tc.key("key"); let req = PutRequest::new() - .with_key(b"key".to_vec()) + .with_key(key.as_slice()) .with_value(b"value".to_vec()) .with_prev_kv(); - let res = client.put(req).await; + let res = tc.client.put(req).await; assert!(res.unwrap().take_prev_kv().is_none()); let req = PutRequest::new() - .with_key(b"key".to_vec()) + .with_key(key.as_slice()) .with_value(b"value1".to_vec()) .with_prev_kv(); - let res = client.put(req).await; + let res = tc.client.put(req).await; let mut kv = res.unwrap().take_prev_kv().unwrap(); - assert_eq!(b"key".to_vec(), kv.take_key()); + assert_eq!(key, kv.take_key()); assert_eq!(b"value".to_vec(), kv.take_value()); } #[tokio::test] async fn test_batch_put() { - let client = mocks::mock_client_with_memstore().await; + let tc = new_client("test_batch_put").await; + let req = BatchPutRequest::new() - .add_kv(b"key".to_vec(), b"value".to_vec()) - .add_kv(b"key2".to_vec(), b"value2".to_vec()); - let res = client.batch_put(req).await; + .add_kv(tc.key("key"), b"value".to_vec()) + .add_kv(tc.key("key2"), b"value2".to_vec()); + let res = tc.client.batch_put(req).await; assert_eq!(0, res.unwrap().take_prev_kvs().len()); - let req = RangeRequest::new().with_range(b"key".to_vec(), b"key3".to_vec()); - let res = client.range(req).await; + let req = RangeRequest::new().with_range(tc.key("key"), tc.key("key3")); + let res = tc.client.range(req).await; let kvs = res.unwrap().take_kvs(); assert_eq!(2, kvs.len()); } #[tokio::test] async fn test_batch_put_with_prev_kv() { - let client = mocks::mock_client_with_memstore().await; - let req = BatchPutRequest::new().add_kv(b"key".to_vec(), b"value".to_vec()); - let res = client.batch_put(req).await; + let tc = new_client("test_batch_put_with_prev_kv").await; + + let key = tc.key("key"); + let key2 = tc.key("key2"); + let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec()); + let res = tc.client.batch_put(req).await; assert_eq!(0, res.unwrap().take_prev_kvs().len()); let req = BatchPutRequest::new() - .add_kv(b"key".to_vec(), b"value-".to_vec()) - .add_kv(b"key2".to_vec(), b"value2-".to_vec()) + .add_kv(key.as_slice(), b"value-".to_vec()) + .add_kv(key2.as_slice(), b"value2-".to_vec()) .with_prev_kv(); - let res = client.batch_put(req).await; + let res = tc.client.batch_put(req).await; let mut kvs = res.unwrap().take_prev_kvs(); assert_eq!(1, kvs.len()); let mut kv = kvs.pop().unwrap(); - assert_eq!(b"key".to_vec(), kv.take_key()); + assert_eq!(key, kv.take_key()); assert_eq!(b"value".to_vec(), kv.take_value()); } #[tokio::test] async fn test_compare_and_put() { - let client = mocks::mock_client_with_memstore().await; + let tc = new_client("test_compare_and_put").await; + + let key = tc.key("key"); let req = CompareAndPutRequest::new() - .with_key(b"key".to_vec()) + .with_key(key.as_slice()) .with_expect(b"expect".to_vec()) .with_value(b"value".to_vec()); - let res = client.compare_and_put(req).await; + let res = tc.client.compare_and_put(req).await; assert!(!res.unwrap().is_success()); // create if absent let req = CompareAndPutRequest::new() - .with_key(b"key".to_vec()) + .with_key(key.as_slice()) .with_value(b"value".to_vec()); - let res = client.compare_and_put(req).await; + let res = tc.client.compare_and_put(req).await; let mut res = res.unwrap(); assert!(res.is_success()); assert!(res.take_prev_kv().is_none()); // compare and put fail let req = CompareAndPutRequest::new() - .with_key(b"key".to_vec()) + .with_key(key.as_slice()) .with_expect(b"not_eq".to_vec()) .with_value(b"value2".to_vec()); - let res = client.compare_and_put(req).await; + let res = tc.client.compare_and_put(req).await; let mut res = res.unwrap(); assert!(!res.is_success()); assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value()); // compare and put success let req = CompareAndPutRequest::new() - .with_key(b"key".to_vec()) + .with_key(key.as_slice()) .with_expect(b"value".to_vec()) .with_value(b"value2".to_vec()); - let res = client.compare_and_put(req).await; + let res = tc.client.compare_and_put(req).await; let mut res = res.unwrap(); assert!(res.is_success()); assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value()); @@ -640,14 +702,13 @@ mod tests { #[tokio::test] async fn test_delete_with_key() { - let client = mocks::mock_client_with_memstore().await; - - gen_data(&client).await; + let tc = new_client("test_delete_with_key").await; + tc.gen_data().await; let req = DeleteRangeRequest::new() - .with_key(b"key-0".to_vec()) + .with_key(tc.key("key-0")) .with_prev_kv(); - let res = client.delete_range(req).await; + let res = tc.client.delete_range(req).await; let mut res = res.unwrap(); assert_eq!(1, res.deleted()); let mut kvs = res.take_prev_kvs(); @@ -658,14 +719,13 @@ mod tests { #[tokio::test] async fn test_delete_with_prefix() { - let client = mocks::mock_client_with_memstore().await; - - gen_data(&client).await; + let tc = new_client("test_delete_with_prefix").await; + tc.gen_data().await; let req = DeleteRangeRequest::new() - .with_prefix(b"key-".to_vec()) + .with_prefix(tc.key("key-")) .with_prev_kv(); - let res = client.delete_range(req).await; + let res = tc.client.delete_range(req).await; let mut res = res.unwrap(); assert_eq!(10, res.deleted()); let kvs = res.take_prev_kvs(); @@ -677,14 +737,13 @@ mod tests { #[tokio::test] async fn test_delete_with_range() { - let client = mocks::mock_client_with_memstore().await; - - gen_data(&client).await; + let tc = new_client("test_delete_with_range").await; + tc.gen_data().await; let req = DeleteRangeRequest::new() - .with_range(b"key-2".to_vec(), b"key-7".to_vec()) + .with_range(tc.key("key-2"), tc.key("key-7")) .with_prev_kv(); - let res = client.delete_range(req).await; + let res = tc.client.delete_range(req).await; let mut res = res.unwrap(); assert_eq!(5, res.deleted()); let kvs = res.take_prev_kvs(); @@ -696,4 +755,38 @@ mod tests { ); } } + + #[tokio::test] + async fn test_move_value() { + let tc = new_client("test_move_value").await; + + let from_key = tc.key("from_key"); + let to_key = tc.key("to_key"); + + let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice()); + let res = tc.client.move_value(req).await; + assert!(res.unwrap().take_kv().is_none()); + + let req = PutRequest::new() + .with_key(to_key.as_slice()) + .with_value(b"value".to_vec()); + let _ = tc.client.put(req).await; + + let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice()); + let res = tc.client.move_value(req).await; + let mut kv = res.unwrap().take_kv().unwrap(); + assert_eq!(to_key.clone(), kv.take_key()); + assert_eq!(b"value".to_vec(), kv.take_value()); + + let req = PutRequest::new() + .with_key(from_key.as_slice()) + .with_value(b"value2".to_vec()); + let _ = tc.client.put(req).await; + + let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice()); + let res = tc.client.move_value(req).await; + let mut kv = res.unwrap().take_kv().unwrap(); + assert_eq!(from_key, kv.take_key()); + assert_eq!(b"value2".to_vec(), kv.take_value()); + } } diff --git a/src/meta-client/src/client/router.rs b/src/meta-client/src/client/router.rs index ec3126f483..ab6f3b459f 100644 --- a/src/meta-client/src/client/router.rs +++ b/src/meta-client/src/client/router.rs @@ -16,7 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::router_client::RouterClient; -use api::v1::meta::{CreateRequest, RouteRequest, RouteResponse}; +use api::v1::meta::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse}; use common_grpc::channel_manager::ChannelManager; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::RwLock; @@ -65,6 +65,11 @@ impl Client { let inner = self.inner.read().await; inner.route(req).await } + + pub async fn delete(&self, req: DeleteRequest) -> Result { + let inner = self.inner.read().await; + inner.delete(req).await + } } #[derive(Debug)] @@ -98,6 +103,14 @@ impl Inner { Ok(()) } + async fn create(&self, mut req: CreateRequest) -> Result { + let mut client = self.random_client()?; + req.set_header(self.id); + let res = client.create(req).await.context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + async fn route(&self, mut req: RouteRequest) -> Result { let mut client = self.random_client()?; req.set_header(self.id); @@ -106,10 +119,10 @@ impl Inner { Ok(res.into_inner()) } - async fn create(&self, mut req: CreateRequest) -> Result { + async fn delete(&self, mut req: DeleteRequest) -> Result { let mut client = self.random_client()?; req.set_header(self.id); - let res = client.create(req).await.context(error::TonicStatusSnafu)?; + let res = client.delete(req).await.context(error::TonicStatusSnafu)?; Ok(res.into_inner()) } diff --git a/src/meta-client/src/client/store.rs b/src/meta-client/src/client/store.rs index 400e2fbe4f..be860419f9 100644 --- a/src/meta-client/src/client/store.rs +++ b/src/meta-client/src/client/store.rs @@ -18,7 +18,8 @@ use std::sync::Arc; use api::v1::meta::store_client::StoreClient; use api::v1::meta::{ BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, }; use common_grpc::channel_manager::ChannelManager; use snafu::{ensure, OptionExt, ResultExt}; @@ -86,6 +87,11 @@ impl Client { let inner = self.inner.read().await; inner.delete_range(req).await } + + pub async fn move_value(&self, req: MoveValueRequest) -> Result { + let inner = self.inner.read().await; + inner.move_value(req).await + } } #[derive(Debug)] @@ -171,6 +177,17 @@ impl Inner { Ok(res.into_inner()) } + async fn move_value(&self, mut req: MoveValueRequest) -> Result { + let mut client = self.random_client()?; + req.set_header(self.id); + let res = client + .move_value(req) + .await + .context(error::TonicStatusSnafu)?; + + Ok(res.into_inner()) + } + fn random_client(&self) -> Result> { let len = self.peers.len(); let peer = lb::random_get(len, |i| Some(&self.peers[i])).context( diff --git a/src/meta-client/src/rpc.rs b/src/meta-client/src/rpc.rs index b6f0dc7b5c..23c4f2ac58 100644 --- a/src/meta-client/src/rpc.rs +++ b/src/meta-client/src/rpc.rs @@ -28,7 +28,8 @@ pub use router::{ use serde::{Deserialize, Serialize}; pub use store::{ BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, }; #[derive(Debug, Clone)] diff --git a/src/meta-client/src/rpc/router.rs b/src/meta-client/src/rpc/router.rs index 361b2fe788..9cc63acb70 100644 --- a/src/meta-client/src/rpc/router.rs +++ b/src/meta-client/src/rpc/router.rs @@ -15,8 +15,9 @@ use std::collections::HashMap; use api::v1::meta::{ - CreateRequest as PbCreateRequest, Partition as PbPartition, Region as PbRegion, - RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable, + CreateRequest as PbCreateRequest, DeleteRequest as PbDeleteRequest, Partition as PbPartition, + Region as PbRegion, RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, + Table as PbTable, }; use serde::{Deserialize, Serialize, Serializer}; use snafu::OptionExt; @@ -25,6 +26,38 @@ use crate::error; use crate::error::Result; use crate::rpc::{util, Peer, TableName}; +#[derive(Debug, Clone)] +pub struct CreateRequest { + pub table_name: TableName, + pub partitions: Vec, +} + +impl From for PbCreateRequest { + fn from(mut req: CreateRequest) -> Self { + Self { + header: None, + table_name: Some(req.table_name.into()), + partitions: req.partitions.drain(..).map(Into::into).collect(), + } + } +} + +impl CreateRequest { + #[inline] + pub fn new(table_name: TableName) -> Self { + Self { + table_name, + partitions: vec![], + } + } + + #[inline] + pub fn add_partition(mut self, partition: Partition) -> Self { + self.partitions.push(partition); + self + } +} + #[derive(Debug, Clone, Default)] pub struct RouteRequest { pub table_names: Vec, @@ -55,34 +88,23 @@ impl RouteRequest { } #[derive(Debug, Clone)] -pub struct CreateRequest { +pub struct DeleteRequest { pub table_name: TableName, - pub partitions: Vec, } -impl From for PbCreateRequest { - fn from(mut req: CreateRequest) -> Self { +impl From for PbDeleteRequest { + fn from(req: DeleteRequest) -> Self { Self { header: None, table_name: Some(req.table_name.into()), - partitions: req.partitions.drain(..).map(Into::into).collect(), } } } -impl CreateRequest { +impl DeleteRequest { #[inline] pub fn new(table_name: TableName) -> Self { - Self { - table_name, - partitions: vec![], - } - } - - #[inline] - pub fn add_partition(mut self, partition: Partition) -> Self { - self.partitions.push(partition); - self + Self { table_name } } } @@ -275,33 +297,14 @@ impl From for Partition { #[cfg(test)] mod tests { use api::v1::meta::{ - Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute, - RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable, - TableName as PbTableName, TableRoute as PbTableRoute, + DeleteRequest as PbDeleteRequest, Partition as PbPartition, Peer as PbPeer, + Region as PbRegion, RegionRoute as PbRegionRoute, RouteRequest as PbRouteRequest, + RouteResponse as PbRouteResponse, Table as PbTable, TableName as PbTableName, + TableRoute as PbTableRoute, }; use super::*; - #[test] - fn test_route_request_trans() { - let req = RouteRequest { - table_names: vec![ - TableName::new("c1", "s1", "t1"), - TableName::new("c2", "s2", "t2"), - ], - }; - - let into_req: PbRouteRequest = req.into(); - - assert!(into_req.header.is_none()); - assert_eq!("c1", into_req.table_names.get(0).unwrap().catalog_name); - assert_eq!("s1", into_req.table_names.get(0).unwrap().schema_name); - assert_eq!("t1", into_req.table_names.get(0).unwrap().table_name); - assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name); - assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name); - assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name); - } - #[test] fn test_create_request_trans() { let req = CreateRequest { @@ -343,6 +346,40 @@ mod tests { ); } + #[test] + fn test_route_request_trans() { + let req = RouteRequest { + table_names: vec![ + TableName::new("c1", "s1", "t1"), + TableName::new("c2", "s2", "t2"), + ], + }; + + let into_req: PbRouteRequest = req.into(); + + assert!(into_req.header.is_none()); + assert_eq!("c1", into_req.table_names.get(0).unwrap().catalog_name); + assert_eq!("s1", into_req.table_names.get(0).unwrap().schema_name); + assert_eq!("t1", into_req.table_names.get(0).unwrap().table_name); + assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name); + assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name); + assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name); + } + + #[test] + fn test_delete_request_trans() { + let req = DeleteRequest { + table_name: TableName::new("c1", "s1", "t1"), + }; + + let into_req: PbDeleteRequest = req.into(); + + assert!(into_req.header.is_none()); + assert_eq!("c1", into_req.table_name.as_ref().unwrap().catalog_name); + assert_eq!("s1", into_req.table_name.as_ref().unwrap().schema_name); + assert_eq!("t1", into_req.table_name.as_ref().unwrap().table_name); + } + #[test] fn test_route_response_trans() { let res = PbRouteResponse { diff --git a/src/meta-client/src/rpc/store.rs b/src/meta-client/src/rpc/store.rs index 73fa8e002d..9c7f53dc6e 100644 --- a/src/meta-client/src/rpc/store.rs +++ b/src/meta-client/src/rpc/store.rs @@ -17,6 +17,7 @@ use api::v1::meta::{ CompareAndPutRequest as PbCompareAndPutRequest, CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest, DeleteRangeResponse as PbDeleteRangeResponse, KeyValue as PbKeyValue, + MoveValueRequest as PbMoveValueRequest, MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest, PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, }; @@ -511,6 +512,7 @@ impl DeleteRangeResponse { self.0.header.take().map(ResponseHeader::new) } + #[inline] pub fn deleted(&self) -> i64 { self.0.deleted } @@ -521,6 +523,65 @@ impl DeleteRangeResponse { } } +#[derive(Debug, Clone, Default)] +pub struct MoveValueRequest { + /// If from_key dose not exist, return the value of to_key (if it exists). + /// If from_key exists, move the value of from_key to to_key (i.e. rename), + /// and return the value. + pub from_key: Vec, + pub to_key: Vec, +} + +impl From for PbMoveValueRequest { + fn from(req: MoveValueRequest) -> Self { + Self { + header: None, + from_key: req.from_key, + to_key: req.to_key, + } + } +} + +impl MoveValueRequest { + #[inline] + pub fn new(from_key: impl Into>, to_key: impl Into>) -> Self { + Self { + from_key: from_key.into(), + to_key: to_key.into(), + } + } +} + +#[derive(Debug, Clone)] +pub struct MoveValueResponse(PbMoveValueResponse); + +impl TryFrom for MoveValueResponse { + type Error = error::Error; + + fn try_from(pb: PbMoveValueResponse) -> Result { + util::check_response_header(pb.header.as_ref())?; + + Ok(Self::new(pb)) + } +} + +impl MoveValueResponse { + #[inline] + pub fn new(res: PbMoveValueResponse) -> Self { + Self(res) + } + + #[inline] + pub fn take_header(&mut self) -> Option { + self.0.header.take().map(ResponseHeader::new) + } + + #[inline] + pub fn take_kv(&mut self) -> Option { + self.0.kv.take().map(KeyValue::new) + } +} + #[cfg(test)] mod tests { use api::v1::meta::{ @@ -528,8 +589,10 @@ mod tests { CompareAndPutRequest as PbCompareAndPutRequest, CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest, DeleteRangeResponse as PbDeleteRangeResponse, - KeyValue as PbKeyValue, PutRequest as PbPutRequest, PutResponse as PbPutResponse, - RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, + KeyValue as PbKeyValue, MoveValueRequest as PbMoveValueRequest, + MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest, + PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, + RangeResponse as PbRangeResponse, }; use super::*; @@ -775,4 +838,35 @@ mod tests { assert_eq!(b"v2".to_vec(), kv1.value().to_vec()); assert_eq!(b"v2".to_vec(), kv1.take_value()); } + + #[test] + fn test_move_value_request_trans() { + let (from_key, to_key) = (b"test_key1".to_vec(), b"test_key2".to_vec()); + + let req = MoveValueRequest::new(from_key.clone(), to_key.clone()); + + let into_req: PbMoveValueRequest = req.into(); + assert!(into_req.header.is_none()); + assert_eq!(from_key, into_req.from_key); + assert_eq!(to_key, into_req.to_key); + } + + #[test] + fn test_move_value_response_trans() { + let pb_res = PbMoveValueResponse { + header: None, + kv: Some(PbKeyValue { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + }), + }; + + let mut res = MoveValueResponse::new(pb_res); + assert!(res.take_header().is_none()); + let mut kv = res.take_kv().unwrap(); + assert_eq!(b"k1".to_vec(), kv.key().to_vec()); + assert_eq!(b"k1".to_vec(), kv.take_key()); + assert_eq!(b"v1".to_vec(), kv.value().to_vec()); + assert_eq!(b"v1".to_vec(), kv.take_value()); + } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 359547c461..f010303a98 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -123,6 +123,15 @@ pub enum Error { #[snafu(display("MetaSrv has no leader at this moment"))] NoLeader { backtrace: Backtrace }, + + #[snafu(display("Table {} not found", name))] + TableNotFound { name: String, backtrace: Backtrace }, + + #[snafu(display( + "Failed to move the value of {} because other clients caused a race condition", + key + ))] + MoveValue { key: String, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -162,7 +171,9 @@ impl ErrorExt for Error { | Error::UnexceptedSequenceValue { .. } | Error::TableRouteNotFound { .. } | Error::NextSequence { .. } + | Error::MoveValue { .. } | Error::InvalidTxnResult { .. } => StatusCode::Unexpected, + Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidCatalogValue { source, .. } => source.status_code(), } } diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index b7e215fec9..6add27c86b 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -24,6 +24,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::error; use crate::error::Result; +pub(crate) const REMOVED_PREFIX: &str = "__removed"; pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease"; pub(crate) const SEQ_PREFIX: &str = "__meta_seq"; pub(crate) const TABLE_ROUTE_PREFIX: &str = "__meta_table_route"; @@ -149,6 +150,7 @@ impl<'a> TableRouteKey<'a> { } } + #[inline] pub fn prefix(&self) -> String { format!( "{}-{}-{}-{}", @@ -156,9 +158,15 @@ impl<'a> TableRouteKey<'a> { ) } + #[inline] pub fn key(&self) -> String { format!("{}-{}", self.prefix(), self.table_id) } + + #[inline] + pub fn removed_key(&self) -> String { + format!("{}-{}", REMOVED_PREFIX, self.key()) + } } #[cfg(test)] diff --git a/src/meta-srv/src/sequence.rs b/src/meta-srv/src/sequence.rs index 2737925bbd..ffbf250b17 100644 --- a/src/meta-srv/src/sequence.rs +++ b/src/meta-srv/src/sequence.rs @@ -205,6 +205,13 @@ mod tests { ) -> Result { unreachable!() } + + async fn move_value( + &self, + _: api::v1::meta::MoveValueRequest, + ) -> Result { + unreachable!() + } } let kv_store = Arc::new(Noop {}); diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 0c502be094..1162b34dac 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -13,8 +13,9 @@ // limitations under the License. use api::v1::meta::{ - router_server, CreateRequest, Error, PeerDict, PutRequest, RangeRequest, Region, RegionRoute, - ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute, TableRouteValue, + router_server, CreateRequest, DeleteRequest, Error, MoveValueRequest, Peer, PeerDict, + PutRequest, RangeRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse, + Table, TableRoute, TableRouteValue, }; use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_telemetry::warn; @@ -31,14 +32,6 @@ use crate::service::GrpcResult; #[async_trait::async_trait] impl router_server::Router for MetaSrv { - async fn route(&self, req: Request) -> GrpcResult { - let req = req.into_inner(); - let ctx = self.new_ctx(); - let res = handle_route(req, ctx).await?; - - Ok(Response::new(res)) - } - async fn create(&self, req: Request) -> GrpcResult { let req = req.into_inner(); let ctx = self.new_ctx(); @@ -48,56 +41,22 @@ impl router_server::Router for MetaSrv { Ok(Response::new(res)) } -} -async fn handle_route(req: RouteRequest, ctx: Context) -> Result { - let RouteRequest { - header, - table_names, - } = req; - let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); - let table_global_keys = table_names.into_iter().map(|t| TableGlobalKey { - catalog_name: t.catalog_name, - schema_name: t.schema_name, - table_name: t.table_name, - }); - let tables = fetch_tables(&ctx.kv_store, table_global_keys).await?; + async fn route(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let ctx = self.new_ctx(); + let res = handle_route(req, ctx).await?; - let mut peer_dict = PeerDict::default(); - let mut table_routes = vec![]; - for (tg, tr) in tables { - let TableRouteValue { - peers, - mut table_route, - } = tr; - if let Some(table_route) = &mut table_route { - for rr in &mut table_route.region_routes { - if let Some(peer) = peers.get(rr.leader_peer_index as usize) { - rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64; - } - for index in &mut rr.follower_peer_indexes { - if let Some(peer) = peers.get(*index as usize) { - *index = peer_dict.get_or_insert(peer.clone()) as u64; - } - } - } - - if let Some(table) = &mut table_route.table { - table.table_schema = tg.as_bytes().context(error::InvalidCatalogValueSnafu)?; - } - } - if let Some(table_route) = table_route { - table_routes.push(table_route) - } + Ok(Response::new(res)) } - let peers = peer_dict.into_peers(); - let header = Some(ResponseHeader::success(cluster_id)); - Ok(RouteResponse { - header, - peers, - table_routes, - }) + async fn delete(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let ctx = self.new_ctx(); + let res = handle_delete(req, ctx).await?; + + Ok(Response::new(res)) + } } async fn handle_create( @@ -169,6 +128,90 @@ async fn handle_create( }) } +async fn handle_route(req: RouteRequest, ctx: Context) -> Result { + let RouteRequest { + header, + table_names, + } = req; + let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); + let table_global_keys = table_names.into_iter().map(|t| TableGlobalKey { + catalog_name: t.catalog_name, + schema_name: t.schema_name, + table_name: t.table_name, + }); + let tables = fetch_tables(&ctx.kv_store, table_global_keys).await?; + let (peers, table_routes) = fill_table_routes(tables)?; + + let header = Some(ResponseHeader::success(cluster_id)); + Ok(RouteResponse { + header, + peers, + table_routes, + }) +} + +async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result { + let DeleteRequest { header, table_name } = req; + let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); + let tgk = table_name + .map(|t| TableGlobalKey { + catalog_name: t.catalog_name, + schema_name: t.schema_name, + table_name: t.table_name, + }) + .context(error::EmptyTableNameSnafu)?; + + let tgv = get_table_global_value(&ctx.kv_store, &tgk) + .await? + .with_context(|| error::TableNotFoundSnafu { + name: format!("{}", tgk), + })?; + let trk = TableRouteKey::with_table_global_key(tgv.table_id() as u64, &tgk); + let (_, trv) = remove_table_route_value(&ctx.kv_store, &trk).await?; + let (peers, table_routes) = fill_table_routes(vec![(tgv, trv)])?; + + let header = Some(ResponseHeader::success(cluster_id)); + Ok(RouteResponse { + header, + peers, + table_routes, + }) +} + +fn fill_table_routes( + tables: Vec<(TableGlobalValue, TableRouteValue)>, +) -> Result<(Vec, Vec)> { + let mut peer_dict = PeerDict::default(); + let mut table_routes = vec![]; + for (tgv, trv) in tables { + let TableRouteValue { + peers, + mut table_route, + } = trv; + if let Some(table_route) = &mut table_route { + for rr in &mut table_route.region_routes { + if let Some(peer) = peers.get(rr.leader_peer_index as usize) { + rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64; + } + for index in &mut rr.follower_peer_indexes { + if let Some(peer) = peers.get(*index as usize) { + *index = peer_dict.get_or_insert(peer.clone()) as u64; + } + } + } + + if let Some(table) = &mut table_route.table { + table.table_schema = tgv.as_bytes().context(error::InvalidCatalogValueSnafu)?; + } + } + if let Some(table_route) = table_route { + table_routes.push(table_route) + } + } + + Ok((peer_dict.into_peers(), table_routes)) +} + async fn fetch_tables( kv_store: &KvStoreRef, keys: impl Iterator, @@ -176,18 +219,18 @@ async fn fetch_tables( let mut tables = vec![]; // Maybe we can optimize the for loop in the future, but in general, // there won't be many keys, in fact, there is usually just one. - for tk in keys { - let tv = get_table_global_value(kv_store, &tk).await?; - if tv.is_none() { - warn!("Table global value is absent: {}", tk); + for tgk in keys { + let tgv = get_table_global_value(kv_store, &tgk).await?; + if tgv.is_none() { + warn!("Table global value is absent: {}", tgk); continue; } - let tv = tv.unwrap(); + let tgv = tgv.unwrap(); - let tr_key = TableRouteKey::with_table_global_key(tv.table_id() as u64, &tk); - let tr = get_table_route_value(kv_store, &tr_key).await?; + let trk = TableRouteKey::with_table_global_key(tgv.table_id() as u64, &tgk); + let trv = get_table_route_value(kv_store, &trk).await?; - tables.push((tv, tr)); + tables.push((tgv, trv)); } Ok(tables) @@ -197,15 +240,32 @@ async fn get_table_route_value( kv_store: &KvStoreRef, key: &TableRouteKey<'_>, ) -> Result { - let tr = get_from_store(kv_store, key.key().into_bytes()) + let trv = get_from_store(kv_store, key.key().into_bytes()) .await? .context(error::TableRouteNotFoundSnafu { key: key.key() })?; - let tr: TableRouteValue = tr + let trv: TableRouteValue = trv .as_slice() .try_into() .context(error::DecodeTableRouteSnafu)?; - Ok(tr) + Ok(trv) +} + +async fn remove_table_route_value( + kv_store: &KvStoreRef, + key: &TableRouteKey<'_>, +) -> Result<(Vec, TableRouteValue)> { + let from_key = key.key().into_bytes(); + let to_key = key.removed_key().into_bytes(); + let v = move_value(kv_store, from_key, to_key) + .await? + .context(error::TableRouteNotFoundSnafu { key: key.key() })?; + let trv: TableRouteValue = + v.1.as_slice() + .try_into() + .context(error::DecodeTableRouteSnafu)?; + + Ok((v.0, trv)) } async fn get_table_global_value( @@ -223,6 +283,23 @@ async fn get_table_global_value( } } +async fn move_value( + kv_store: &KvStoreRef, + from_key: impl Into>, + to_key: impl Into>, +) -> Result, Vec)>> { + let from_key = from_key.into(); + let to_key = to_key.into(); + let move_req = MoveValueRequest { + from_key, + to_key, + ..Default::default() + }; + let res = kv_store.move_value(move_req).await?; + + Ok(res.kv.map(|kv| (kv.key, kv.value))) +} + async fn put_into_store( kv_store: &KvStoreRef, key: impl Into>, diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 0bb202a9b9..fc11900827 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -18,7 +18,8 @@ pub mod memory; use api::v1::meta::{ store_server, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, }; use tonic::{Request, Response}; @@ -67,6 +68,13 @@ impl store_server::Store for MetaSrv { Ok(Response::new(res)) } + + async fn move_value(&self, req: Request) -> GrpcResult { + let req = req.into_inner(); + let res = self.kv_store().move_value(req).await?; + + Ok(Response::new(res)) + } } #[cfg(test)] @@ -130,4 +138,14 @@ mod tests { assert!(res.is_ok()); } + + #[tokio::test] + async fn test_move_value() { + let kv_store = Arc::new(MemStore::new()); + let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None).await; + let req = MoveValueRequest::default(); + let res = meta_srv.move_value(req.into_request()).await; + + assert!(res.is_ok()); + } } diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index ffffabac9b..19b8f8da10 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -16,10 +16,11 @@ use std::sync::Arc; use api::v1::meta::{ BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, KeyValue, PutRequest, PutResponse, RangeRequest, - RangeResponse, ResponseHeader, + DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, + PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader, }; use common_error::prelude::*; +use common_telemetry::warn; use etcd_client::{ Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, }; @@ -63,11 +64,7 @@ impl KvStore for EtcdStore { .await .context(error::EtcdFailedSnafu)?; - let kvs = res - .kvs() - .iter() - .map(|kv| KvPair::new(kv).into()) - .collect::>(); + let kvs = res.kvs().iter().map(KvPair::to_kv).collect::>(); let header = Some(ResponseHeader::success(cluster_id)); Ok(RangeResponse { @@ -92,7 +89,7 @@ impl KvStore for EtcdStore { .await .context(error::EtcdFailedSnafu)?; - let prev_kv = res.prev_key().map(|kv| KvPair::new(kv).into()); + let prev_kv = res.prev_key().map(KvPair::to_kv); let header = Some(ResponseHeader::success(cluster_id)); Ok(PutResponse { header, prev_kv }) @@ -123,7 +120,7 @@ impl KvStore for EtcdStore { match op_res { TxnOpResponse::Put(put_res) => { if let Some(prev_kv) = put_res.prev_key() { - prev_kvs.push(KvPair::new(prev_kv).into()); + prev_kvs.push(KvPair::to_kv(prev_kv)); } } _ => unreachable!(), // never get here @@ -140,20 +137,23 @@ impl KvStore for EtcdStore { key, expect, value, - options, + put_options, } = req.try_into()?; - let put_op = vec![TxnOp::put(key.clone(), value, options)]; - let get_op = vec![TxnOp::get(key.clone(), None)]; - let mut txn = if expect.is_empty() { + let compare = if expect.is_empty() { // create if absent // revision 0 means key was not exist - Txn::new().when(vec![Compare::create_revision(key, CompareOp::Equal, 0)]) + Compare::create_revision(key.clone(), CompareOp::Equal, 0) } else { // compare and put - Txn::new().when(vec![Compare::value(key, CompareOp::Equal, expect)]) + Compare::value(key.clone(), CompareOp::Equal, expect) }; - txn = txn.and_then(put_op).or_else(get_op); + let put = TxnOp::put(key.clone(), value, put_options); + let get = TxnOp::get(key, None); + let txn = Txn::new() + .when(vec![compare]) + .and_then(vec![put]) + .or_else(vec![get]); let txn_res = self .client @@ -171,23 +171,8 @@ impl KvStore for EtcdStore { })?; let prev_kv = match op_res { - TxnOpResponse::Put(put_res) => { - put_res.prev_key().map(|kv| KeyValue::from(KvPair::new(kv))) - } - TxnOpResponse::Get(get_res) => { - if get_res.count() == 0 { - // do not exists - None - } else { - ensure!( - get_res.count() == 1, - error::InvalidTxnResultSnafu { - err_msg: format!("expect 1 response, actual {}", get_res.count()) - } - ); - Some(KeyValue::from(KvPair::new(&get_res.kvs()[0]))) - } - } + TxnOpResponse::Put(res) => res.prev_key().map(KvPair::to_kv), + TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::to_kv), _ => unreachable!(), // never get here }; @@ -213,11 +198,7 @@ impl KvStore for EtcdStore { .await .context(error::EtcdFailedSnafu)?; - let prev_kvs = res - .prev_kvs() - .iter() - .map(|kv| KvPair::new(kv).into()) - .collect::>(); + let prev_kvs = res.prev_kvs().iter().map(KvPair::to_kv).collect::>(); let header = Some(ResponseHeader::success(cluster_id)); Ok(DeleteRangeResponse { @@ -226,6 +207,83 @@ impl KvStore for EtcdStore { prev_kvs, }) } + + async fn move_value(&self, req: MoveValueRequest) -> Result { + let MoveValue { + cluster_id, + from_key, + to_key, + delete_options, + } = req.try_into()?; + + let mut client = self.client.kv_client(); + + let header = Some(ResponseHeader::success(cluster_id)); + // TODO(jiachun): Maybe it's better to let the users control it in the request + const MAX_RETRIES: usize = 8; + for _ in 0..MAX_RETRIES { + let from_key = from_key.as_slice(); + let to_key = to_key.as_slice(); + + let res = client + .get(from_key, None) + .await + .context(error::EtcdFailedSnafu)?; + + let txn = match res.kvs().first() { + None => { + // get `to_key` if `from_key` absent + // revision 0 means key was not exist + let compare = Compare::create_revision(from_key, CompareOp::Equal, 0); + let get = TxnOp::get(to_key, None); + Txn::new().when(vec![compare]).and_then(vec![get]) + } + Some(kv) => { + // compare `from_key` and move to `to_key` + let value = kv.value(); + let compare = Compare::value(from_key, CompareOp::Equal, value); + let delete = TxnOp::delete(from_key, delete_options.clone()); + let put = TxnOp::put(to_key, value, None); + Txn::new().when(vec![compare]).and_then(vec![delete, put]) + } + }; + + let txn_res = client.txn(txn).await.context(error::EtcdFailedSnafu)?; + + if !txn_res.succeeded() { + warn!( + "Failed to atomically move {:?} to {:?}, try again...", + String::from_utf8_lossy(from_key), + String::from_utf8_lossy(to_key) + ); + continue; + } + + // [`get_res'] or [`delete_res`, `put_res`], `put_res` will be ignored. + for op_res in txn_res.op_responses() { + match op_res { + TxnOpResponse::Get(res) => { + return Ok(MoveValueResponse { + header, + kv: res.kvs().first().map(KvPair::to_kv), + }); + } + TxnOpResponse::Delete(res) => { + return Ok(MoveValueResponse { + header, + kv: res.prev_kvs().first().map(KvPair::to_kv), + }); + } + _ => {} + } + } + } + + error::MoveValueSnafu { + key: String::from_utf8_lossy(&from_key), + } + .fail() + } } struct Get { @@ -333,7 +391,7 @@ struct CompareAndPut { key: Vec, expect: Vec, value: Vec, - options: Option, + put_options: Option, } impl TryFrom for CompareAndPut { @@ -352,7 +410,7 @@ impl TryFrom for CompareAndPut { key, expect, value, - options: Some(PutOptions::default().with_prev_key()), + put_options: Some(PutOptions::default().with_prev_key()), }) } } @@ -392,6 +450,32 @@ impl TryFrom for Delete { } } +struct MoveValue { + cluster_id: u64, + from_key: Vec, + to_key: Vec, + delete_options: Option, +} + +impl TryFrom for MoveValue { + type Error = error::Error; + + fn try_from(req: MoveValueRequest) -> Result { + let MoveValueRequest { + header, + from_key, + to_key, + } = req; + + Ok(MoveValue { + cluster_id: header.map_or(0, |h| h.cluster_id), + from_key, + to_key, + delete_options: Some(DeleteOptions::default().with_prev_key()), + }) + } +} + struct KvPair<'a>(&'a etcd_client::KeyValue); impl<'a> KvPair<'a> { @@ -400,6 +484,11 @@ impl<'a> KvPair<'a> { fn new(kv: &'a etcd_client::KeyValue) -> Self { Self(kv) } + + #[inline] + fn to_kv(kv: &etcd_client::KeyValue) -> KeyValue { + KeyValue::from(KvPair::new(kv)) + } } impl<'a> From> for KeyValue { @@ -479,7 +568,7 @@ mod tests { assert_eq!(b"test_key".to_vec(), compare_and_put.key); assert_eq!(b"test_expect".to_vec(), compare_and_put.expect); assert_eq!(b"test_value".to_vec(), compare_and_put.value); - assert!(compare_and_put.options.is_some()); + assert!(compare_and_put.put_options.is_some()); } #[test] @@ -496,4 +585,19 @@ mod tests { assert_eq!(b"test_key".to_vec(), delete.key); assert!(delete.options.is_some()); } + + #[test] + fn test_parse_move_value() { + let req = MoveValueRequest { + from_key: b"test_from_key".to_vec(), + to_key: b"test_to_key".to_vec(), + ..Default::default() + }; + + let move_value: MoveValue = req.try_into().unwrap(); + + assert_eq!(b"test_from_key".to_vec(), move_value.from_key); + assert_eq!(b"test_to_key".to_vec(), move_value.to_key); + assert!(move_value.delete_options.is_some()); + } } diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index 9238422831..7b5b43f9db 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -16,7 +16,8 @@ use std::sync::Arc; use api::v1::meta::{ BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, + DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, }; use crate::error::Result; @@ -34,4 +35,6 @@ pub trait KvStore: Send + Sync { async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result; async fn delete_range(&self, req: DeleteRangeRequest) -> Result; + + async fn move_value(&self, req: MoveValueRequest) -> Result; } diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs index 2e52e2bbe8..03efc74292 100644 --- a/src/meta-srv/src/service/store/memory.rs +++ b/src/meta-srv/src/service/store/memory.rs @@ -19,8 +19,8 @@ use std::sync::Arc; use api::v1::meta::{ BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, - DeleteRangeRequest, DeleteRangeResponse, KeyValue, PutRequest, PutResponse, RangeRequest, - RangeResponse, ResponseHeader, + DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse, + PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader, }; use parking_lot::RwLock; @@ -219,4 +219,28 @@ impl KvStore for MemStore { }, }) } + + async fn move_value(&self, req: MoveValueRequest) -> Result { + let MoveValueRequest { + header, + from_key, + to_key, + } = req; + + let mut memory = self.inner.write(); + + let kv = match memory.remove(&from_key) { + Some(v) => { + memory.insert(to_key, v.clone()); + Some((from_key, v)) + } + None => memory.get(&to_key).map(|v| (to_key, v.clone())), + }; + + let kv = kv.map(|(key, value)| KeyValue { key, value }); + + let cluster_id = header.map_or(0, |h| h.cluster_id); + let header = Some(ResponseHeader::success(cluster_id)); + Ok(MoveValueResponse { header, kv }) + } }