feat: move_value & delete_route (#707)

* feat: move_value & delete_route

* chore: minor refactor

* chore: refactor unit test of metaclient

* chore: map to kv

* Update src/meta-srv/src/service/router.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* Update src/meta-srv/src/service/router.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: by code review

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Jiachun Feng
2022-12-09 11:07:48 +08:00
committed by GitHub
parent b26982c5d7
commit 949cd3e3af
17 changed files with 793 additions and 255 deletions

View File

@@ -5,6 +5,8 @@ package greptime.v1.meta;
import "greptime/v1/meta/common.proto";
service Router {
rpc Create(CreateRequest) returns (RouteResponse) {}
// Fetch routing information for tables. The smallest unit is the complete
// routing information(all regions) of a table.
//
@@ -26,7 +28,14 @@ service Router {
//
rpc Route(RouteRequest) returns (RouteResponse) {}
rpc Create(CreateRequest) returns (RouteResponse) {}
rpc Delete(DeleteRequest) returns (RouteResponse) {}
}
message CreateRequest {
RequestHeader header = 1;
TableName table_name = 2;
repeated Partition partitions = 3;
}
message RouteRequest {
@@ -35,6 +44,12 @@ message RouteRequest {
repeated TableName table_names = 2;
}
message DeleteRequest {
RequestHeader header = 1;
TableName table_name = 2;
}
message RouteResponse {
ResponseHeader header = 1;
@@ -42,13 +57,6 @@ message RouteResponse {
repeated TableRoute table_routes = 3;
}
message CreateRequest {
RequestHeader header = 1;
TableName table_name = 2;
repeated Partition partitions = 3;
}
message TableRoute {
Table table = 1;
repeated RegionRoute region_routes = 2;

View File

@@ -20,6 +20,9 @@ service Store {
// DeleteRange deletes the given range from the key-value store.
rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse);
// MoveValue atomically renames the key to the given updated key.
rpc MoveValue(MoveValueRequest) returns (MoveValueResponse);
}
message RangeRequest {
@@ -136,3 +139,21 @@ message DeleteRangeResponse {
// returned.
repeated KeyValue prev_kvs = 3;
}
message MoveValueRequest {
RequestHeader header = 1;
// If from_key dose not exist, return the value of to_key (if it exists).
// If from_key exists, move the value of from_key to to_key (i.e. rename),
// and return the value.
bytes from_key = 2;
bytes to_key = 3;
}
message MoveValueResponse {
ResponseHeader header = 1;
// If from_key dose not exist, return the value of to_key (if it exists).
// If from_key exists, return the value of from_key.
KeyValue kv = 2;
}

View File

@@ -145,10 +145,12 @@ gen_set_header!(HeartbeatRequest);
gen_set_header!(RouteRequest);
gen_set_header!(CreateRequest);
gen_set_header!(RangeRequest);
gen_set_header!(DeleteRequest);
gen_set_header!(PutRequest);
gen_set_header!(BatchPutRequest);
gen_set_header!(CompareAndPutRequest);
gen_set_header!(DeleteRangeRequest);
gen_set_header!(MoveValueRequest);
#[cfg(test)]
mod tests {

View File

@@ -27,10 +27,11 @@ use store::Client as StoreClient;
pub use self::heartbeat::{HeartbeatSender, HeartbeatStream};
use crate::error;
use crate::error::Result;
use crate::rpc::router::DeleteRequest;
use crate::rpc::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, CreateRequest,
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
RouteRequest, RouteResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse, RouteRequest, RouteResponse,
};
pub type Id = (u64, u64);
@@ -206,6 +207,13 @@ impl MetaClient {
self.router_client()?.route(req.into()).await?.try_into()
}
/// Can be called repeatedly, the first call will delete and return the
/// table of routing information, the nth call can still return the
/// deleted route information.
pub async fn delete_route(&self, req: DeleteRequest) -> Result<RouteResponse> {
self.router_client()?.delete(req.into()).await?.try_into()
}
/// Range gets the keys in the range from the key-value store.
pub async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
self.store_client()?.range(req.into()).await?.try_into()
@@ -241,6 +249,14 @@ impl MetaClient {
.try_into()
}
/// MoveValue atomically renames the key to the given updated key.
pub async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
self.store_client()?
.move_value(req.into())
.await?
.try_into()
}
#[inline]
pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
self.heartbeat.clone().context(error::NotStartedSnafu {
@@ -286,6 +302,52 @@ mod tests {
use crate::mocks;
use crate::rpc::{Partition, TableName};
const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
struct TestClient {
ns: String,
client: MetaClient,
}
impl TestClient {
async fn new(ns: impl Into<String>) -> Self {
// can also test with etcd: mocks::mock_client_with_etcdstore("127.0.0.1:2379").await;
let client = mocks::mock_client_with_memstore().await;
Self {
ns: ns.into(),
client,
}
}
fn key(&self, name: &str) -> Vec<u8> {
format!("{}-{}-{}", TEST_KEY_PREFIX, self.ns, name).into_bytes()
}
async fn gen_data(&self) {
for i in 0..10 {
let req = PutRequest::new()
.with_key(self.key(&format!("key-{}", i)))
.with_value(format!("{}-{}", "value", i).into_bytes())
.with_prev_kv();
let res = self.client.put(req).await;
assert!(res.is_ok());
}
}
async fn clear_data(&self) {
let req =
DeleteRangeRequest::new().with_prefix(format!("{}-{}", TEST_KEY_PREFIX, self.ns));
let res = self.client.delete_range(req).await;
assert!(res.is_ok());
}
}
async fn new_client(ns: impl Into<String>) -> TestClient {
let client = TestClient::new(ns).await;
client.clear_data().await;
client
}
#[tokio::test]
async fn test_meta_client_builder() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
@@ -373,15 +435,15 @@ mod tests {
#[tokio::test]
async fn test_ask_leader() {
let client = mocks::mock_client_with_memstore().await;
let res = client.ask_leader().await;
let tc = new_client("test_ask_leader").await;
let res = tc.client.ask_leader().await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_heartbeat() {
let client = mocks::mock_client_with_memstore().await;
let (sender, mut receiver) = client.heartbeat().await.unwrap();
let tc = new_client("test_heartbeat").await;
let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
// send heartbeats
tokio::spawn(async move {
for _ in 0..5 {
@@ -449,66 +511,58 @@ mod tests {
let res = client.create_route(req).await.unwrap();
assert_eq!(1, res.table_routes.len());
let req = RouteRequest::new().add_table_name(table_name);
let req = RouteRequest::new().add_table_name(table_name.clone());
let res = client.route(req).await.unwrap();
// empty table_routes since no TableGlobalValue is stored by datanode
assert!(res.table_routes.is_empty());
}
async fn gen_data(client: &MetaClient) {
for i in 0..10 {
let req = PutRequest::new()
.with_key(format!("{}-{}", "key", i).into_bytes())
.with_value(format!("{}-{}", "value", i).into_bytes())
.with_prev_kv();
let res = client.put(req).await;
assert!(res.is_ok());
}
let req = DeleteRequest::new(table_name.clone());
let res = client.delete_route(req).await;
// empty table_routes since no TableGlobalValue is stored by datanode
assert!(res.is_err());
}
#[tokio::test]
async fn test_range_get() {
let client = mocks::mock_client_with_memstore().await;
let tc = new_client("test_range_get").await;
tc.gen_data().await;
gen_data(&client).await;
let req = RangeRequest::new().with_key(b"key-0".to_vec());
let res = client.range(req).await;
let key = tc.key("key-0");
let req = RangeRequest::new().with_key(key.as_slice());
let res = tc.client.range(req).await;
let mut kvs = res.unwrap().take_kvs();
assert_eq!(1, kvs.len());
let mut kv = kvs.pop().unwrap();
assert_eq!(b"key-0".to_vec(), kv.take_key());
assert_eq!(key, kv.take_key());
assert_eq!(b"value-0".to_vec(), kv.take_value());
}
#[tokio::test]
async fn test_range_get_prefix() {
let client = mocks::mock_client_with_memstore().await;
let tc = new_client("test_range_get_prefix").await;
tc.gen_data().await;
gen_data(&client).await;
let req = RangeRequest::new().with_prefix(b"key-".to_vec());
let res = client.range(req).await;
let req = RangeRequest::new().with_prefix(tc.key("key-"));
let res = tc.client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(10, kvs.len());
for (i, mut kv) in kvs.into_iter().enumerate() {
assert_eq!(format!("{}-{}", "key", i).into_bytes(), kv.take_key());
assert_eq!(tc.key(&format!("key-{}", i)), kv.take_key());
assert_eq!(format!("{}-{}", "value", i).into_bytes(), kv.take_value());
}
}
#[tokio::test]
async fn test_range() {
let client = mocks::mock_client_with_memstore().await;
let tc = new_client("test_range").await;
tc.gen_data().await;
gen_data(&client).await;
let req = RangeRequest::new().with_range(b"key-5".to_vec(), b"key-8".to_vec());
let res = client.range(req).await;
let req = RangeRequest::new().with_range(tc.key("key-5"), tc.key("key-8"));
let res = tc.client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(3, kvs.len());
for (i, mut kv) in kvs.into_iter().enumerate() {
assert_eq!(format!("{}-{}", "key", i + 5).into_bytes(), kv.take_key());
assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
assert_eq!(
format!("{}-{}", "value", i + 5).into_bytes(),
kv.take_value()
@@ -518,121 +572,129 @@ mod tests {
#[tokio::test]
async fn test_range_keys_only() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let tc = new_client("test_range_keys_only").await;
tc.gen_data().await;
let req = RangeRequest::new()
.with_range(b"key-5".to_vec(), b"key-8".to_vec())
.with_range(tc.key("key-5"), tc.key("key-8"))
.with_keys_only();
let res = client.range(req).await;
let res = tc.client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(3, kvs.len());
for (i, mut kv) in kvs.into_iter().enumerate() {
assert_eq!(format!("{}-{}", "key", i + 5).into_bytes(), kv.take_key());
assert_eq!(tc.key(&format!("key-{}", i + 5)), kv.take_key());
assert!(kv.take_value().is_empty());
}
}
#[tokio::test]
async fn test_put() {
let client = mocks::mock_client_with_memstore().await;
let tc = new_client("test_put").await;
let req = PutRequest::new()
.with_key(b"key".to_vec())
.with_key(tc.key("key"))
.with_value(b"value".to_vec());
let res = client.put(req).await;
let res = tc.client.put(req).await;
assert!(res.unwrap().take_prev_kv().is_none());
}
#[tokio::test]
async fn test_put_with_prev_kv() {
let client = mocks::mock_client_with_memstore().await;
let tc = new_client("test_put_with_prev_kv").await;
let key = tc.key("key");
let req = PutRequest::new()
.with_key(b"key".to_vec())
.with_key(key.as_slice())
.with_value(b"value".to_vec())
.with_prev_kv();
let res = client.put(req).await;
let res = tc.client.put(req).await;
assert!(res.unwrap().take_prev_kv().is_none());
let req = PutRequest::new()
.with_key(b"key".to_vec())
.with_key(key.as_slice())
.with_value(b"value1".to_vec())
.with_prev_kv();
let res = client.put(req).await;
let res = tc.client.put(req).await;
let mut kv = res.unwrap().take_prev_kv().unwrap();
assert_eq!(b"key".to_vec(), kv.take_key());
assert_eq!(key, kv.take_key());
assert_eq!(b"value".to_vec(), kv.take_value());
}
#[tokio::test]
async fn test_batch_put() {
let client = mocks::mock_client_with_memstore().await;
let tc = new_client("test_batch_put").await;
let req = BatchPutRequest::new()
.add_kv(b"key".to_vec(), b"value".to_vec())
.add_kv(b"key2".to_vec(), b"value2".to_vec());
let res = client.batch_put(req).await;
.add_kv(tc.key("key"), b"value".to_vec())
.add_kv(tc.key("key2"), b"value2".to_vec());
let res = tc.client.batch_put(req).await;
assert_eq!(0, res.unwrap().take_prev_kvs().len());
let req = RangeRequest::new().with_range(b"key".to_vec(), b"key3".to_vec());
let res = client.range(req).await;
let req = RangeRequest::new().with_range(tc.key("key"), tc.key("key3"));
let res = tc.client.range(req).await;
let kvs = res.unwrap().take_kvs();
assert_eq!(2, kvs.len());
}
#[tokio::test]
async fn test_batch_put_with_prev_kv() {
let client = mocks::mock_client_with_memstore().await;
let req = BatchPutRequest::new().add_kv(b"key".to_vec(), b"value".to_vec());
let res = client.batch_put(req).await;
let tc = new_client("test_batch_put_with_prev_kv").await;
let key = tc.key("key");
let key2 = tc.key("key2");
let req = BatchPutRequest::new().add_kv(key.as_slice(), b"value".to_vec());
let res = tc.client.batch_put(req).await;
assert_eq!(0, res.unwrap().take_prev_kvs().len());
let req = BatchPutRequest::new()
.add_kv(b"key".to_vec(), b"value-".to_vec())
.add_kv(b"key2".to_vec(), b"value2-".to_vec())
.add_kv(key.as_slice(), b"value-".to_vec())
.add_kv(key2.as_slice(), b"value2-".to_vec())
.with_prev_kv();
let res = client.batch_put(req).await;
let res = tc.client.batch_put(req).await;
let mut kvs = res.unwrap().take_prev_kvs();
assert_eq!(1, kvs.len());
let mut kv = kvs.pop().unwrap();
assert_eq!(b"key".to_vec(), kv.take_key());
assert_eq!(key, kv.take_key());
assert_eq!(b"value".to_vec(), kv.take_value());
}
#[tokio::test]
async fn test_compare_and_put() {
let client = mocks::mock_client_with_memstore().await;
let tc = new_client("test_compare_and_put").await;
let key = tc.key("key");
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_key(key.as_slice())
.with_expect(b"expect".to_vec())
.with_value(b"value".to_vec());
let res = client.compare_and_put(req).await;
let res = tc.client.compare_and_put(req).await;
assert!(!res.unwrap().is_success());
// create if absent
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_key(key.as_slice())
.with_value(b"value".to_vec());
let res = client.compare_and_put(req).await;
let res = tc.client.compare_and_put(req).await;
let mut res = res.unwrap();
assert!(res.is_success());
assert!(res.take_prev_kv().is_none());
// compare and put fail
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_key(key.as_slice())
.with_expect(b"not_eq".to_vec())
.with_value(b"value2".to_vec());
let res = client.compare_and_put(req).await;
let res = tc.client.compare_and_put(req).await;
let mut res = res.unwrap();
assert!(!res.is_success());
assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
// compare and put success
let req = CompareAndPutRequest::new()
.with_key(b"key".to_vec())
.with_key(key.as_slice())
.with_expect(b"value".to_vec())
.with_value(b"value2".to_vec());
let res = client.compare_and_put(req).await;
let res = tc.client.compare_and_put(req).await;
let mut res = res.unwrap();
assert!(res.is_success());
assert_eq!(b"value".to_vec(), res.take_prev_kv().unwrap().take_value());
@@ -640,14 +702,13 @@ mod tests {
#[tokio::test]
async fn test_delete_with_key() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let tc = new_client("test_delete_with_key").await;
tc.gen_data().await;
let req = DeleteRangeRequest::new()
.with_key(b"key-0".to_vec())
.with_key(tc.key("key-0"))
.with_prev_kv();
let res = client.delete_range(req).await;
let res = tc.client.delete_range(req).await;
let mut res = res.unwrap();
assert_eq!(1, res.deleted());
let mut kvs = res.take_prev_kvs();
@@ -658,14 +719,13 @@ mod tests {
#[tokio::test]
async fn test_delete_with_prefix() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let tc = new_client("test_delete_with_prefix").await;
tc.gen_data().await;
let req = DeleteRangeRequest::new()
.with_prefix(b"key-".to_vec())
.with_prefix(tc.key("key-"))
.with_prev_kv();
let res = client.delete_range(req).await;
let res = tc.client.delete_range(req).await;
let mut res = res.unwrap();
assert_eq!(10, res.deleted());
let kvs = res.take_prev_kvs();
@@ -677,14 +737,13 @@ mod tests {
#[tokio::test]
async fn test_delete_with_range() {
let client = mocks::mock_client_with_memstore().await;
gen_data(&client).await;
let tc = new_client("test_delete_with_range").await;
tc.gen_data().await;
let req = DeleteRangeRequest::new()
.with_range(b"key-2".to_vec(), b"key-7".to_vec())
.with_range(tc.key("key-2"), tc.key("key-7"))
.with_prev_kv();
let res = client.delete_range(req).await;
let res = tc.client.delete_range(req).await;
let mut res = res.unwrap();
assert_eq!(5, res.deleted());
let kvs = res.take_prev_kvs();
@@ -696,4 +755,38 @@ mod tests {
);
}
}
#[tokio::test]
async fn test_move_value() {
let tc = new_client("test_move_value").await;
let from_key = tc.key("from_key");
let to_key = tc.key("to_key");
let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice());
let res = tc.client.move_value(req).await;
assert!(res.unwrap().take_kv().is_none());
let req = PutRequest::new()
.with_key(to_key.as_slice())
.with_value(b"value".to_vec());
let _ = tc.client.put(req).await;
let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice());
let res = tc.client.move_value(req).await;
let mut kv = res.unwrap().take_kv().unwrap();
assert_eq!(to_key.clone(), kv.take_key());
assert_eq!(b"value".to_vec(), kv.take_value());
let req = PutRequest::new()
.with_key(from_key.as_slice())
.with_value(b"value2".to_vec());
let _ = tc.client.put(req).await;
let req = MoveValueRequest::new(from_key.as_slice(), to_key.as_slice());
let res = tc.client.move_value(req).await;
let mut kv = res.unwrap().take_kv().unwrap();
assert_eq!(from_key, kv.take_key());
assert_eq!(b"value2".to_vec(), kv.take_value());
}
}

View File

@@ -16,7 +16,7 @@ use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::router_client::RouterClient;
use api::v1::meta::{CreateRequest, RouteRequest, RouteResponse};
use api::v1::meta::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::RwLock;
@@ -65,6 +65,11 @@ impl Client {
let inner = self.inner.read().await;
inner.route(req).await
}
pub async fn delete(&self, req: DeleteRequest) -> Result<RouteResponse> {
let inner = self.inner.read().await;
inner.delete(req).await
}
}
#[derive(Debug)]
@@ -98,6 +103,14 @@ impl Inner {
Ok(())
}
async fn create(&self, mut req: CreateRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client.create(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
async fn route(&self, mut req: RouteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
@@ -106,10 +119,10 @@ impl Inner {
Ok(res.into_inner())
}
async fn create(&self, mut req: CreateRequest) -> Result<RouteResponse> {
async fn delete(&self, mut req: DeleteRequest) -> Result<RouteResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client.create(req).await.context(error::TonicStatusSnafu)?;
let res = client.delete(req).await.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}

View File

@@ -18,7 +18,8 @@ use std::sync::Arc;
use api::v1::meta::store_client::StoreClient;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use common_grpc::channel_manager::ChannelManager;
use snafu::{ensure, OptionExt, ResultExt};
@@ -86,6 +87,11 @@ impl Client {
let inner = self.inner.read().await;
inner.delete_range(req).await
}
pub async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
let inner = self.inner.read().await;
inner.move_value(req).await
}
}
#[derive(Debug)]
@@ -171,6 +177,17 @@ impl Inner {
Ok(res.into_inner())
}
async fn move_value(&self, mut req: MoveValueRequest) -> Result<MoveValueResponse> {
let mut client = self.random_client()?;
req.set_header(self.id);
let res = client
.move_value(req)
.await
.context(error::TonicStatusSnafu)?;
Ok(res.into_inner())
}
fn random_client(&self) -> Result<StoreClient<Channel>> {
let len = self.peers.len();
let peer = lb::random_get(len, |i| Some(&self.peers[i])).context(

View File

@@ -28,7 +28,8 @@ pub use router::{
use serde::{Deserialize, Serialize};
pub use store::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
#[derive(Debug, Clone)]

View File

@@ -15,8 +15,9 @@
use std::collections::HashMap;
use api::v1::meta::{
CreateRequest as PbCreateRequest, Partition as PbPartition, Region as PbRegion,
RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable,
CreateRequest as PbCreateRequest, DeleteRequest as PbDeleteRequest, Partition as PbPartition,
Region as PbRegion, RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse,
Table as PbTable,
};
use serde::{Deserialize, Serialize, Serializer};
use snafu::OptionExt;
@@ -25,6 +26,38 @@ use crate::error;
use crate::error::Result;
use crate::rpc::{util, Peer, TableName};
#[derive(Debug, Clone)]
pub struct CreateRequest {
pub table_name: TableName,
pub partitions: Vec<Partition>,
}
impl From<CreateRequest> for PbCreateRequest {
fn from(mut req: CreateRequest) -> Self {
Self {
header: None,
table_name: Some(req.table_name.into()),
partitions: req.partitions.drain(..).map(Into::into).collect(),
}
}
}
impl CreateRequest {
#[inline]
pub fn new(table_name: TableName) -> Self {
Self {
table_name,
partitions: vec![],
}
}
#[inline]
pub fn add_partition(mut self, partition: Partition) -> Self {
self.partitions.push(partition);
self
}
}
#[derive(Debug, Clone, Default)]
pub struct RouteRequest {
pub table_names: Vec<TableName>,
@@ -55,34 +88,23 @@ impl RouteRequest {
}
#[derive(Debug, Clone)]
pub struct CreateRequest {
pub struct DeleteRequest {
pub table_name: TableName,
pub partitions: Vec<Partition>,
}
impl From<CreateRequest> for PbCreateRequest {
fn from(mut req: CreateRequest) -> Self {
impl From<DeleteRequest> for PbDeleteRequest {
fn from(req: DeleteRequest) -> Self {
Self {
header: None,
table_name: Some(req.table_name.into()),
partitions: req.partitions.drain(..).map(Into::into).collect(),
}
}
}
impl CreateRequest {
impl DeleteRequest {
#[inline]
pub fn new(table_name: TableName) -> Self {
Self {
table_name,
partitions: vec![],
}
}
#[inline]
pub fn add_partition(mut self, partition: Partition) -> Self {
self.partitions.push(partition);
self
Self { table_name }
}
}
@@ -275,33 +297,14 @@ impl From<PbPartition> for Partition {
#[cfg(test)]
mod tests {
use api::v1::meta::{
Partition as PbPartition, Peer as PbPeer, Region as PbRegion, RegionRoute as PbRegionRoute,
RouteRequest as PbRouteRequest, RouteResponse as PbRouteResponse, Table as PbTable,
TableName as PbTableName, TableRoute as PbTableRoute,
DeleteRequest as PbDeleteRequest, Partition as PbPartition, Peer as PbPeer,
Region as PbRegion, RegionRoute as PbRegionRoute, RouteRequest as PbRouteRequest,
RouteResponse as PbRouteResponse, Table as PbTable, TableName as PbTableName,
TableRoute as PbTableRoute,
};
use super::*;
#[test]
fn test_route_request_trans() {
let req = RouteRequest {
table_names: vec![
TableName::new("c1", "s1", "t1"),
TableName::new("c2", "s2", "t2"),
],
};
let into_req: PbRouteRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!("c1", into_req.table_names.get(0).unwrap().catalog_name);
assert_eq!("s1", into_req.table_names.get(0).unwrap().schema_name);
assert_eq!("t1", into_req.table_names.get(0).unwrap().table_name);
assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name);
assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name);
assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name);
}
#[test]
fn test_create_request_trans() {
let req = CreateRequest {
@@ -343,6 +346,40 @@ mod tests {
);
}
#[test]
fn test_route_request_trans() {
let req = RouteRequest {
table_names: vec![
TableName::new("c1", "s1", "t1"),
TableName::new("c2", "s2", "t2"),
],
};
let into_req: PbRouteRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!("c1", into_req.table_names.get(0).unwrap().catalog_name);
assert_eq!("s1", into_req.table_names.get(0).unwrap().schema_name);
assert_eq!("t1", into_req.table_names.get(0).unwrap().table_name);
assert_eq!("c2", into_req.table_names.get(1).unwrap().catalog_name);
assert_eq!("s2", into_req.table_names.get(1).unwrap().schema_name);
assert_eq!("t2", into_req.table_names.get(1).unwrap().table_name);
}
#[test]
fn test_delete_request_trans() {
let req = DeleteRequest {
table_name: TableName::new("c1", "s1", "t1"),
};
let into_req: PbDeleteRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!("c1", into_req.table_name.as_ref().unwrap().catalog_name);
assert_eq!("s1", into_req.table_name.as_ref().unwrap().schema_name);
assert_eq!("t1", into_req.table_name.as_ref().unwrap().table_name);
}
#[test]
fn test_route_response_trans() {
let res = PbRouteResponse {

View File

@@ -17,6 +17,7 @@ use api::v1::meta::{
CompareAndPutRequest as PbCompareAndPutRequest,
CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest,
DeleteRangeResponse as PbDeleteRangeResponse, KeyValue as PbKeyValue,
MoveValueRequest as PbMoveValueRequest, MoveValueResponse as PbMoveValueResponse,
PutRequest as PbPutRequest, PutResponse as PbPutResponse, RangeRequest as PbRangeRequest,
RangeResponse as PbRangeResponse,
};
@@ -511,6 +512,7 @@ impl DeleteRangeResponse {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn deleted(&self) -> i64 {
self.0.deleted
}
@@ -521,6 +523,65 @@ impl DeleteRangeResponse {
}
}
#[derive(Debug, Clone, Default)]
pub struct MoveValueRequest {
/// If from_key dose not exist, return the value of to_key (if it exists).
/// If from_key exists, move the value of from_key to to_key (i.e. rename),
/// and return the value.
pub from_key: Vec<u8>,
pub to_key: Vec<u8>,
}
impl From<MoveValueRequest> for PbMoveValueRequest {
fn from(req: MoveValueRequest) -> Self {
Self {
header: None,
from_key: req.from_key,
to_key: req.to_key,
}
}
}
impl MoveValueRequest {
#[inline]
pub fn new(from_key: impl Into<Vec<u8>>, to_key: impl Into<Vec<u8>>) -> Self {
Self {
from_key: from_key.into(),
to_key: to_key.into(),
}
}
}
#[derive(Debug, Clone)]
pub struct MoveValueResponse(PbMoveValueResponse);
impl TryFrom<PbMoveValueResponse> for MoveValueResponse {
type Error = error::Error;
fn try_from(pb: PbMoveValueResponse) -> Result<Self> {
util::check_response_header(pb.header.as_ref())?;
Ok(Self::new(pb))
}
}
impl MoveValueResponse {
#[inline]
pub fn new(res: PbMoveValueResponse) -> Self {
Self(res)
}
#[inline]
pub fn take_header(&mut self) -> Option<ResponseHeader> {
self.0.header.take().map(ResponseHeader::new)
}
#[inline]
pub fn take_kv(&mut self) -> Option<KeyValue> {
self.0.kv.take().map(KeyValue::new)
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::{
@@ -528,8 +589,10 @@ mod tests {
CompareAndPutRequest as PbCompareAndPutRequest,
CompareAndPutResponse as PbCompareAndPutResponse,
DeleteRangeRequest as PbDeleteRangeRequest, DeleteRangeResponse as PbDeleteRangeResponse,
KeyValue as PbKeyValue, PutRequest as PbPutRequest, PutResponse as PbPutResponse,
RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
KeyValue as PbKeyValue, MoveValueRequest as PbMoveValueRequest,
MoveValueResponse as PbMoveValueResponse, PutRequest as PbPutRequest,
PutResponse as PbPutResponse, RangeRequest as PbRangeRequest,
RangeResponse as PbRangeResponse,
};
use super::*;
@@ -775,4 +838,35 @@ mod tests {
assert_eq!(b"v2".to_vec(), kv1.value().to_vec());
assert_eq!(b"v2".to_vec(), kv1.take_value());
}
#[test]
fn test_move_value_request_trans() {
let (from_key, to_key) = (b"test_key1".to_vec(), b"test_key2".to_vec());
let req = MoveValueRequest::new(from_key.clone(), to_key.clone());
let into_req: PbMoveValueRequest = req.into();
assert!(into_req.header.is_none());
assert_eq!(from_key, into_req.from_key);
assert_eq!(to_key, into_req.to_key);
}
#[test]
fn test_move_value_response_trans() {
let pb_res = PbMoveValueResponse {
header: None,
kv: Some(PbKeyValue {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
}),
};
let mut res = MoveValueResponse::new(pb_res);
assert!(res.take_header().is_none());
let mut kv = res.take_kv().unwrap();
assert_eq!(b"k1".to_vec(), kv.key().to_vec());
assert_eq!(b"k1".to_vec(), kv.take_key());
assert_eq!(b"v1".to_vec(), kv.value().to_vec());
assert_eq!(b"v1".to_vec(), kv.take_value());
}
}

View File

@@ -123,6 +123,15 @@ pub enum Error {
#[snafu(display("MetaSrv has no leader at this moment"))]
NoLeader { backtrace: Backtrace },
#[snafu(display("Table {} not found", name))]
TableNotFound { name: String, backtrace: Backtrace },
#[snafu(display(
"Failed to move the value of {} because other clients caused a race condition",
key
))]
MoveValue { key: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -162,7 +171,9 @@ impl ErrorExt for Error {
| Error::UnexceptedSequenceValue { .. }
| Error::TableRouteNotFound { .. }
| Error::NextSequence { .. }
| Error::MoveValue { .. }
| Error::InvalidTxnResult { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidCatalogValue { source, .. } => source.status_code(),
}
}

View File

@@ -24,6 +24,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error;
use crate::error::Result;
pub(crate) const REMOVED_PREFIX: &str = "__removed";
pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease";
pub(crate) const SEQ_PREFIX: &str = "__meta_seq";
pub(crate) const TABLE_ROUTE_PREFIX: &str = "__meta_table_route";
@@ -149,6 +150,7 @@ impl<'a> TableRouteKey<'a> {
}
}
#[inline]
pub fn prefix(&self) -> String {
format!(
"{}-{}-{}-{}",
@@ -156,9 +158,15 @@ impl<'a> TableRouteKey<'a> {
)
}
#[inline]
pub fn key(&self) -> String {
format!("{}-{}", self.prefix(), self.table_id)
}
#[inline]
pub fn removed_key(&self) -> String {
format!("{}-{}", REMOVED_PREFIX, self.key())
}
}
#[cfg(test)]

View File

@@ -205,6 +205,13 @@ mod tests {
) -> Result<api::v1::meta::DeleteRangeResponse> {
unreachable!()
}
async fn move_value(
&self,
_: api::v1::meta::MoveValueRequest,
) -> Result<api::v1::meta::MoveValueResponse> {
unreachable!()
}
}
let kv_store = Arc::new(Noop {});

View File

@@ -13,8 +13,9 @@
// limitations under the License.
use api::v1::meta::{
router_server, CreateRequest, Error, PeerDict, PutRequest, RangeRequest, Region, RegionRoute,
ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute, TableRouteValue,
router_server, CreateRequest, DeleteRequest, Error, MoveValueRequest, Peer, PeerDict,
PutRequest, RangeRequest, Region, RegionRoute, ResponseHeader, RouteRequest, RouteResponse,
Table, TableRoute, TableRouteValue,
};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_telemetry::warn;
@@ -31,14 +32,6 @@ use crate::service::GrpcResult;
#[async_trait::async_trait]
impl router_server::Router for MetaSrv {
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let ctx = self.new_ctx();
let res = handle_route(req, ctx).await?;
Ok(Response::new(res))
}
async fn create(&self, req: Request<CreateRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let ctx = self.new_ctx();
@@ -48,56 +41,22 @@ impl router_server::Router for MetaSrv {
Ok(Response::new(res))
}
}
async fn handle_route(req: RouteRequest, ctx: Context) -> Result<RouteResponse> {
let RouteRequest {
header,
table_names,
} = req;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let table_global_keys = table_names.into_iter().map(|t| TableGlobalKey {
catalog_name: t.catalog_name,
schema_name: t.schema_name,
table_name: t.table_name,
});
let tables = fetch_tables(&ctx.kv_store, table_global_keys).await?;
async fn route(&self, req: Request<RouteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let ctx = self.new_ctx();
let res = handle_route(req, ctx).await?;
let mut peer_dict = PeerDict::default();
let mut table_routes = vec![];
for (tg, tr) in tables {
let TableRouteValue {
peers,
mut table_route,
} = tr;
if let Some(table_route) = &mut table_route {
for rr in &mut table_route.region_routes {
if let Some(peer) = peers.get(rr.leader_peer_index as usize) {
rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64;
}
for index in &mut rr.follower_peer_indexes {
if let Some(peer) = peers.get(*index as usize) {
*index = peer_dict.get_or_insert(peer.clone()) as u64;
}
}
}
if let Some(table) = &mut table_route.table {
table.table_schema = tg.as_bytes().context(error::InvalidCatalogValueSnafu)?;
}
}
if let Some(table_route) = table_route {
table_routes.push(table_route)
}
Ok(Response::new(res))
}
let peers = peer_dict.into_peers();
let header = Some(ResponseHeader::success(cluster_id));
Ok(RouteResponse {
header,
peers,
table_routes,
})
async fn delete(&self, req: Request<DeleteRequest>) -> GrpcResult<RouteResponse> {
let req = req.into_inner();
let ctx = self.new_ctx();
let res = handle_delete(req, ctx).await?;
Ok(Response::new(res))
}
}
async fn handle_create(
@@ -169,6 +128,90 @@ async fn handle_create(
})
}
async fn handle_route(req: RouteRequest, ctx: Context) -> Result<RouteResponse> {
let RouteRequest {
header,
table_names,
} = req;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let table_global_keys = table_names.into_iter().map(|t| TableGlobalKey {
catalog_name: t.catalog_name,
schema_name: t.schema_name,
table_name: t.table_name,
});
let tables = fetch_tables(&ctx.kv_store, table_global_keys).await?;
let (peers, table_routes) = fill_table_routes(tables)?;
let header = Some(ResponseHeader::success(cluster_id));
Ok(RouteResponse {
header,
peers,
table_routes,
})
}
async fn handle_delete(req: DeleteRequest, ctx: Context) -> Result<RouteResponse> {
let DeleteRequest { header, table_name } = req;
let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id);
let tgk = table_name
.map(|t| TableGlobalKey {
catalog_name: t.catalog_name,
schema_name: t.schema_name,
table_name: t.table_name,
})
.context(error::EmptyTableNameSnafu)?;
let tgv = get_table_global_value(&ctx.kv_store, &tgk)
.await?
.with_context(|| error::TableNotFoundSnafu {
name: format!("{}", tgk),
})?;
let trk = TableRouteKey::with_table_global_key(tgv.table_id() as u64, &tgk);
let (_, trv) = remove_table_route_value(&ctx.kv_store, &trk).await?;
let (peers, table_routes) = fill_table_routes(vec![(tgv, trv)])?;
let header = Some(ResponseHeader::success(cluster_id));
Ok(RouteResponse {
header,
peers,
table_routes,
})
}
fn fill_table_routes(
tables: Vec<(TableGlobalValue, TableRouteValue)>,
) -> Result<(Vec<Peer>, Vec<TableRoute>)> {
let mut peer_dict = PeerDict::default();
let mut table_routes = vec![];
for (tgv, trv) in tables {
let TableRouteValue {
peers,
mut table_route,
} = trv;
if let Some(table_route) = &mut table_route {
for rr in &mut table_route.region_routes {
if let Some(peer) = peers.get(rr.leader_peer_index as usize) {
rr.leader_peer_index = peer_dict.get_or_insert(peer.clone()) as u64;
}
for index in &mut rr.follower_peer_indexes {
if let Some(peer) = peers.get(*index as usize) {
*index = peer_dict.get_or_insert(peer.clone()) as u64;
}
}
}
if let Some(table) = &mut table_route.table {
table.table_schema = tgv.as_bytes().context(error::InvalidCatalogValueSnafu)?;
}
}
if let Some(table_route) = table_route {
table_routes.push(table_route)
}
}
Ok((peer_dict.into_peers(), table_routes))
}
async fn fetch_tables(
kv_store: &KvStoreRef,
keys: impl Iterator<Item = TableGlobalKey>,
@@ -176,18 +219,18 @@ async fn fetch_tables(
let mut tables = vec![];
// Maybe we can optimize the for loop in the future, but in general,
// there won't be many keys, in fact, there is usually just one.
for tk in keys {
let tv = get_table_global_value(kv_store, &tk).await?;
if tv.is_none() {
warn!("Table global value is absent: {}", tk);
for tgk in keys {
let tgv = get_table_global_value(kv_store, &tgk).await?;
if tgv.is_none() {
warn!("Table global value is absent: {}", tgk);
continue;
}
let tv = tv.unwrap();
let tgv = tgv.unwrap();
let tr_key = TableRouteKey::with_table_global_key(tv.table_id() as u64, &tk);
let tr = get_table_route_value(kv_store, &tr_key).await?;
let trk = TableRouteKey::with_table_global_key(tgv.table_id() as u64, &tgk);
let trv = get_table_route_value(kv_store, &trk).await?;
tables.push((tv, tr));
tables.push((tgv, trv));
}
Ok(tables)
@@ -197,15 +240,32 @@ async fn get_table_route_value(
kv_store: &KvStoreRef,
key: &TableRouteKey<'_>,
) -> Result<TableRouteValue> {
let tr = get_from_store(kv_store, key.key().into_bytes())
let trv = get_from_store(kv_store, key.key().into_bytes())
.await?
.context(error::TableRouteNotFoundSnafu { key: key.key() })?;
let tr: TableRouteValue = tr
let trv: TableRouteValue = trv
.as_slice()
.try_into()
.context(error::DecodeTableRouteSnafu)?;
Ok(tr)
Ok(trv)
}
async fn remove_table_route_value(
kv_store: &KvStoreRef,
key: &TableRouteKey<'_>,
) -> Result<(Vec<u8>, TableRouteValue)> {
let from_key = key.key().into_bytes();
let to_key = key.removed_key().into_bytes();
let v = move_value(kv_store, from_key, to_key)
.await?
.context(error::TableRouteNotFoundSnafu { key: key.key() })?;
let trv: TableRouteValue =
v.1.as_slice()
.try_into()
.context(error::DecodeTableRouteSnafu)?;
Ok((v.0, trv))
}
async fn get_table_global_value(
@@ -223,6 +283,23 @@ async fn get_table_global_value(
}
}
async fn move_value(
kv_store: &KvStoreRef,
from_key: impl Into<Vec<u8>>,
to_key: impl Into<Vec<u8>>,
) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let from_key = from_key.into();
let to_key = to_key.into();
let move_req = MoveValueRequest {
from_key,
to_key,
..Default::default()
};
let res = kv_store.move_value(move_req).await?;
Ok(res.kv.map(|kv| (kv.key, kv.value)))
}
async fn put_into_store(
kv_store: &KvStoreRef,
key: impl Into<Vec<u8>>,

View File

@@ -18,7 +18,8 @@ pub mod memory;
use api::v1::meta::{
store_server, BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use tonic::{Request, Response};
@@ -67,6 +68,13 @@ impl store_server::Store for MetaSrv {
Ok(Response::new(res))
}
async fn move_value(&self, req: Request<MoveValueRequest>) -> GrpcResult<MoveValueResponse> {
let req = req.into_inner();
let res = self.kv_store().move_value(req).await?;
Ok(Response::new(res))
}
}
#[cfg(test)]
@@ -130,4 +138,14 @@ mod tests {
assert!(res.is_ok());
}
#[tokio::test]
async fn test_move_value() {
let kv_store = Arc::new(MemStore::new());
let meta_srv = MetaSrv::new(MetaSrvOptions::default(), kv_store, None, None).await;
let req = MoveValueRequest::default();
let res = meta_srv.move_value(req.into_request()).await;
assert!(res.is_ok());
}
}

View File

@@ -16,10 +16,11 @@ use std::sync::Arc;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, KeyValue, PutRequest, PutResponse, RangeRequest,
RangeResponse, ResponseHeader,
DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse,
PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader,
};
use common_error::prelude::*;
use common_telemetry::warn;
use etcd_client::{
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
};
@@ -63,11 +64,7 @@ impl KvStore for EtcdStore {
.await
.context(error::EtcdFailedSnafu)?;
let kvs = res
.kvs()
.iter()
.map(|kv| KvPair::new(kv).into())
.collect::<Vec<_>>();
let kvs = res.kvs().iter().map(KvPair::to_kv).collect::<Vec<_>>();
let header = Some(ResponseHeader::success(cluster_id));
Ok(RangeResponse {
@@ -92,7 +89,7 @@ impl KvStore for EtcdStore {
.await
.context(error::EtcdFailedSnafu)?;
let prev_kv = res.prev_key().map(|kv| KvPair::new(kv).into());
let prev_kv = res.prev_key().map(KvPair::to_kv);
let header = Some(ResponseHeader::success(cluster_id));
Ok(PutResponse { header, prev_kv })
@@ -123,7 +120,7 @@ impl KvStore for EtcdStore {
match op_res {
TxnOpResponse::Put(put_res) => {
if let Some(prev_kv) = put_res.prev_key() {
prev_kvs.push(KvPair::new(prev_kv).into());
prev_kvs.push(KvPair::to_kv(prev_kv));
}
}
_ => unreachable!(), // never get here
@@ -140,20 +137,23 @@ impl KvStore for EtcdStore {
key,
expect,
value,
options,
put_options,
} = req.try_into()?;
let put_op = vec![TxnOp::put(key.clone(), value, options)];
let get_op = vec![TxnOp::get(key.clone(), None)];
let mut txn = if expect.is_empty() {
let compare = if expect.is_empty() {
// create if absent
// revision 0 means key was not exist
Txn::new().when(vec![Compare::create_revision(key, CompareOp::Equal, 0)])
Compare::create_revision(key.clone(), CompareOp::Equal, 0)
} else {
// compare and put
Txn::new().when(vec![Compare::value(key, CompareOp::Equal, expect)])
Compare::value(key.clone(), CompareOp::Equal, expect)
};
txn = txn.and_then(put_op).or_else(get_op);
let put = TxnOp::put(key.clone(), value, put_options);
let get = TxnOp::get(key, None);
let txn = Txn::new()
.when(vec![compare])
.and_then(vec![put])
.or_else(vec![get]);
let txn_res = self
.client
@@ -171,23 +171,8 @@ impl KvStore for EtcdStore {
})?;
let prev_kv = match op_res {
TxnOpResponse::Put(put_res) => {
put_res.prev_key().map(|kv| KeyValue::from(KvPair::new(kv)))
}
TxnOpResponse::Get(get_res) => {
if get_res.count() == 0 {
// do not exists
None
} else {
ensure!(
get_res.count() == 1,
error::InvalidTxnResultSnafu {
err_msg: format!("expect 1 response, actual {}", get_res.count())
}
);
Some(KeyValue::from(KvPair::new(&get_res.kvs()[0])))
}
}
TxnOpResponse::Put(res) => res.prev_key().map(KvPair::to_kv),
TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::to_kv),
_ => unreachable!(), // never get here
};
@@ -213,11 +198,7 @@ impl KvStore for EtcdStore {
.await
.context(error::EtcdFailedSnafu)?;
let prev_kvs = res
.prev_kvs()
.iter()
.map(|kv| KvPair::new(kv).into())
.collect::<Vec<_>>();
let prev_kvs = res.prev_kvs().iter().map(KvPair::to_kv).collect::<Vec<_>>();
let header = Some(ResponseHeader::success(cluster_id));
Ok(DeleteRangeResponse {
@@ -226,6 +207,83 @@ impl KvStore for EtcdStore {
prev_kvs,
})
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
let MoveValue {
cluster_id,
from_key,
to_key,
delete_options,
} = req.try_into()?;
let mut client = self.client.kv_client();
let header = Some(ResponseHeader::success(cluster_id));
// TODO(jiachun): Maybe it's better to let the users control it in the request
const MAX_RETRIES: usize = 8;
for _ in 0..MAX_RETRIES {
let from_key = from_key.as_slice();
let to_key = to_key.as_slice();
let res = client
.get(from_key, None)
.await
.context(error::EtcdFailedSnafu)?;
let txn = match res.kvs().first() {
None => {
// get `to_key` if `from_key` absent
// revision 0 means key was not exist
let compare = Compare::create_revision(from_key, CompareOp::Equal, 0);
let get = TxnOp::get(to_key, None);
Txn::new().when(vec![compare]).and_then(vec![get])
}
Some(kv) => {
// compare `from_key` and move to `to_key`
let value = kv.value();
let compare = Compare::value(from_key, CompareOp::Equal, value);
let delete = TxnOp::delete(from_key, delete_options.clone());
let put = TxnOp::put(to_key, value, None);
Txn::new().when(vec![compare]).and_then(vec![delete, put])
}
};
let txn_res = client.txn(txn).await.context(error::EtcdFailedSnafu)?;
if !txn_res.succeeded() {
warn!(
"Failed to atomically move {:?} to {:?}, try again...",
String::from_utf8_lossy(from_key),
String::from_utf8_lossy(to_key)
);
continue;
}
// [`get_res'] or [`delete_res`, `put_res`], `put_res` will be ignored.
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Get(res) => {
return Ok(MoveValueResponse {
header,
kv: res.kvs().first().map(KvPair::to_kv),
});
}
TxnOpResponse::Delete(res) => {
return Ok(MoveValueResponse {
header,
kv: res.prev_kvs().first().map(KvPair::to_kv),
});
}
_ => {}
}
}
}
error::MoveValueSnafu {
key: String::from_utf8_lossy(&from_key),
}
.fail()
}
}
struct Get {
@@ -333,7 +391,7 @@ struct CompareAndPut {
key: Vec<u8>,
expect: Vec<u8>,
value: Vec<u8>,
options: Option<PutOptions>,
put_options: Option<PutOptions>,
}
impl TryFrom<CompareAndPutRequest> for CompareAndPut {
@@ -352,7 +410,7 @@ impl TryFrom<CompareAndPutRequest> for CompareAndPut {
key,
expect,
value,
options: Some(PutOptions::default().with_prev_key()),
put_options: Some(PutOptions::default().with_prev_key()),
})
}
}
@@ -392,6 +450,32 @@ impl TryFrom<DeleteRangeRequest> for Delete {
}
}
struct MoveValue {
cluster_id: u64,
from_key: Vec<u8>,
to_key: Vec<u8>,
delete_options: Option<DeleteOptions>,
}
impl TryFrom<MoveValueRequest> for MoveValue {
type Error = error::Error;
fn try_from(req: MoveValueRequest) -> Result<Self> {
let MoveValueRequest {
header,
from_key,
to_key,
} = req;
Ok(MoveValue {
cluster_id: header.map_or(0, |h| h.cluster_id),
from_key,
to_key,
delete_options: Some(DeleteOptions::default().with_prev_key()),
})
}
}
struct KvPair<'a>(&'a etcd_client::KeyValue);
impl<'a> KvPair<'a> {
@@ -400,6 +484,11 @@ impl<'a> KvPair<'a> {
fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}
#[inline]
fn to_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}
impl<'a> From<KvPair<'a>> for KeyValue {
@@ -479,7 +568,7 @@ mod tests {
assert_eq!(b"test_key".to_vec(), compare_and_put.key);
assert_eq!(b"test_expect".to_vec(), compare_and_put.expect);
assert_eq!(b"test_value".to_vec(), compare_and_put.value);
assert!(compare_and_put.options.is_some());
assert!(compare_and_put.put_options.is_some());
}
#[test]
@@ -496,4 +585,19 @@ mod tests {
assert_eq!(b"test_key".to_vec(), delete.key);
assert!(delete.options.is_some());
}
#[test]
fn test_parse_move_value() {
let req = MoveValueRequest {
from_key: b"test_from_key".to_vec(),
to_key: b"test_to_key".to_vec(),
..Default::default()
};
let move_value: MoveValue = req.try_into().unwrap();
assert_eq!(b"test_from_key".to_vec(), move_value.from_key);
assert_eq!(b"test_to_key".to_vec(), move_value.to_key);
assert!(move_value.delete_options.is_some());
}
}

View File

@@ -16,7 +16,8 @@ use std::sync::Arc;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest,
PutResponse, RangeRequest, RangeResponse,
};
use crate::error::Result;
@@ -34,4 +35,6 @@ pub trait KvStore: Send + Sync {
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse>;
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse>;
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse>;
}

View File

@@ -19,8 +19,8 @@ use std::sync::Arc;
use api::v1::meta::{
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, KeyValue, PutRequest, PutResponse, RangeRequest,
RangeResponse, ResponseHeader,
DeleteRangeRequest, DeleteRangeResponse, KeyValue, MoveValueRequest, MoveValueResponse,
PutRequest, PutResponse, RangeRequest, RangeResponse, ResponseHeader,
};
use parking_lot::RwLock;
@@ -219,4 +219,28 @@ impl KvStore for MemStore {
},
})
}
async fn move_value(&self, req: MoveValueRequest) -> Result<MoveValueResponse> {
let MoveValueRequest {
header,
from_key,
to_key,
} = req;
let mut memory = self.inner.write();
let kv = match memory.remove(&from_key) {
Some(v) => {
memory.insert(to_key, v.clone());
Some((from_key, v))
}
None => memory.get(&to_key).map(|v| (to_key, v.clone())),
};
let kv = kv.map(|(key, value)| KeyValue { key, value });
let cluster_id = header.map_or(0, |h| h.cluster_id);
let header = Some(ResponseHeader::success(cluster_id));
Ok(MoveValueResponse { header, kv })
}
}