feat: txn for meta (#1828)

* feat: txn for meta kvstore

* feat: txn

* chore: add unit test

* chore: more test

* chore: more test

* Update src/meta-srv/src/service/store/memory.rs

Co-authored-by: LFC <bayinamine@gmail.com>

* chore: by cr

---------

Co-authored-by: LFC <bayinamine@gmail.com>
This commit is contained in:
JeremyHi
2023-06-26 17:12:48 +08:00
committed by GitHub
parent 034564fd27
commit 78b07996b1
10 changed files with 758 additions and 27 deletions

View File

@@ -360,6 +360,9 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Etcd txn got an error: {err_msg}"))]
EtcdTxnOpResponse { err_msg: String, location: Location },
// this error is used for custom error mapping
// please do not delete it
#[snafu(display("Other error, source: {}", source))]
@@ -437,6 +440,7 @@ impl ErrorExt for Error {
| Error::InvalidTxnResult { .. }
| Error::InvalidUtf8Value { .. }
| Error::UnexpectedInstructionReply { .. }
| Error::EtcdTxnOpResponse { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidCatalogValue { source, .. } => source.status_code(),

View File

@@ -15,6 +15,7 @@
pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog";
pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema";
pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_TXN_REQUEST: &str = "meta.txn_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";

View File

@@ -165,6 +165,7 @@ mod tests {
use super::*;
use crate::service::store::kv::KvStore;
use crate::service::store::memory::MemStore;
use crate::service::store::txn::TxnService;
#[tokio::test]
async fn test_sequence() {
@@ -199,6 +200,8 @@ mod tests {
async fn test_sequence_force_quit() {
struct Noop;
impl TxnService for Noop {}
#[async_trait::async_trait]
impl KvStore for Noop {
async fn range(

View File

@@ -13,9 +13,11 @@
// limitations under the License.
pub mod etcd;
pub(crate) mod etcd_util;
pub mod ext;
pub mod kv;
pub mod memory;
pub mod txn;
use api::v1::meta::{
store_server, BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,

View File

@@ -29,8 +29,10 @@ use etcd_client::{
use crate::error;
use crate::error::Result;
use crate::metrics::METRIC_META_KV_REQUEST;
use crate::metrics::{METRIC_META_KV_REQUEST, METRIC_META_TXN_REQUEST};
use crate::service::store::etcd_util::KvPair;
use crate::service::store::kv::{KvStore, KvStoreRef};
use crate::service::store::txn::TxnService;
// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
@@ -463,6 +465,28 @@ impl KvStore for EtcdStore {
}
}
#[async_trait::async_trait]
impl TxnService for EtcdStore {
async fn txn(
&self,
txn: crate::service::store::txn::Txn,
) -> Result<crate::service::store::txn::TxnResponse> {
let _timer = timer!(
METRIC_META_TXN_REQUEST,
&[("target", "etcd".to_string()), ("op", "txn".to_string()),]
);
let etcd_txn: Txn = txn.into();
let txn_res = self
.client
.kv_client()
.txn(etcd_txn)
.await
.context(error::EtcdFailedSnafu)?;
txn_res.try_into()
}
}
struct Get {
cluster_id: u64,
key: Vec<u8>,
@@ -704,30 +728,6 @@ impl TryFrom<MoveValueRequest> for MoveValue {
}
}
struct KvPair<'a>(&'a etcd_client::KeyValue);
impl<'a> KvPair<'a> {
/// Creates a `KvPair` from etcd KeyValue
#[inline]
fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}
#[inline]
fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}
impl<'a> From<KvPair<'a>> for KeyValue {
fn from(kv: KvPair<'a>) -> Self {
Self {
key: kv.0.key().to_vec(),
value: kv.0.value().to_vec(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -0,0 +1,39 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::KeyValue;
pub struct KvPair<'a>(&'a etcd_client::KeyValue);
impl<'a> KvPair<'a> {
/// Creates a `KvPair` from etcd KeyValue
#[inline]
pub fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}
#[inline]
pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}
impl<'a> From<KvPair<'a>> for KeyValue {
fn from(kv: KvPair<'a>) -> Self {
Self {
key: kv.0.key().to_vec(),
value: kv.0.value().to_vec(),
}
}
}

View File

@@ -22,12 +22,13 @@ use api::v1::meta::{
};
use crate::error::Result;
use crate::service::store::txn::TxnService;
pub type KvStoreRef = Arc<dyn KvStore>;
pub type ResettableKvStoreRef = Arc<dyn ResettableKvStore>;
#[async_trait::async_trait]
pub trait KvStore: Send + Sync {
pub trait KvStore: TxnService {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse>;
async fn put(&self, req: PutRequest) -> Result<PutResponse>;

View File

@@ -27,8 +27,9 @@ use parking_lot::RwLock;
use super::ext::KvStoreExt;
use crate::error::Result;
use crate::metrics::METRIC_META_KV_REQUEST;
use crate::metrics::{METRIC_META_KV_REQUEST, METRIC_META_TXN_REQUEST};
use crate::service::store::kv::{KvStore, ResettableKvStore};
use crate::service::store::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse, TxnService};
pub struct MemStore {
inner: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
@@ -119,6 +120,7 @@ impl KvStore for MemStore {
} = req;
let mut memory = self.inner.write();
let prev_value = memory.insert(key.clone(), value);
let prev_kv = if prev_kv {
prev_value.map(|value| KeyValue { key, value })
@@ -164,6 +166,7 @@ impl KvStore for MemStore {
} = req;
let mut memory = self.inner.write();
let prev_kvs = if prev_kv {
kvs.into_iter()
.map(|kv| (kv.key.clone(), memory.insert(kv.key, kv.value)))
@@ -198,6 +201,7 @@ impl KvStore for MemStore {
} = req;
let mut memory = self.inner.write();
let prev_kvs = if prev_kv {
keys.into_iter()
.filter_map(|key| memory.remove(&key).map(|value| KeyValue { key, value }))
@@ -330,6 +334,72 @@ impl KvStore for MemStore {
}
}
#[async_trait::async_trait]
impl TxnService for MemStore {
async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
let _timer = timer!(
METRIC_META_TXN_REQUEST,
&[("target", "memory".to_string()), ("op", "txn".to_string()),]
);
let TxnRequest {
compare,
success,
failure,
} = txn.into();
let mut memory = self.inner.write();
let succeeded = compare
.iter()
.all(|x| x.compare_with_value(memory.get(&x.key)));
let do_txn = |txn_op| match txn_op {
TxnOp::Put(key, value) => {
let prev_value = memory.insert(key.clone(), value);
let prev_kv = prev_value.map(|value| KeyValue { key, value });
let put_res = PutResponse {
prev_kv,
..Default::default()
};
TxnOpResponse::ResponsePut(put_res)
}
TxnOp::Get(key) => {
let value = memory.get(&key);
let kv = value.map(|value| KeyValue {
key,
value: value.clone(),
});
let get_res = RangeResponse {
kvs: kv.map(|kv| vec![kv]).unwrap_or(vec![]),
..Default::default()
};
TxnOpResponse::ResponseGet(get_res)
}
TxnOp::Delete(key) => {
let prev_value = memory.remove(&key);
let prev_kv = prev_value.map(|value| KeyValue { key, value });
let delete_res = DeleteRangeResponse {
prev_kvs: prev_kv.map(|kv| vec![kv]).unwrap_or(vec![]),
..Default::default()
};
TxnOpResponse::ResponseDelete(delete_res)
}
};
let responses: Vec<_> = if succeeded {
success.into_iter().map(do_txn).collect()
} else {
failure.into_iter().map(do_txn).collect()
};
Ok(TxnResponse {
succeeded,
responses,
})
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU8, Ordering};

View File

@@ -0,0 +1,469 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{DeleteRangeResponse, PutResponse, RangeResponse};
use crate::error::Result;
mod etcd;
#[async_trait::async_trait]
pub trait TxnService: Sync + Send {
async fn txn(&self, _txn: Txn) -> Result<TxnResponse> {
unimplemented!("txn is not implemented")
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum CompareOp {
Equal,
Greater,
Less,
NotEqual,
}
#[derive(Debug, Clone, PartialEq)]
pub struct Compare {
pub key: Vec<u8>,
pub cmp: CompareOp,
/// None means the key does not exist.
pub target: Option<Vec<u8>>,
}
impl Compare {
pub fn new(key: Vec<u8>, cmp: CompareOp, target: Option<Vec<u8>>) -> Self {
Self { key, cmp, target }
}
pub fn with_value(key: Vec<u8>, cmp: CompareOp, target: Vec<u8>) -> Self {
Self::new(key, cmp, Some(target))
}
pub fn with_not_exist_value(key: Vec<u8>, cmp: CompareOp) -> Self {
Self::new(key, cmp, None)
}
pub fn compare_with_value(&self, value: Option<&Vec<u8>>) -> bool {
match (value, &self.target) {
(Some(value), Some(target)) => match self.cmp {
CompareOp::Equal => *value == *target,
CompareOp::Greater => *value > *target,
CompareOp::Less => *value < *target,
CompareOp::NotEqual => *value != *target,
},
(Some(_), None) => match self.cmp {
CompareOp::Equal => false,
CompareOp::Greater => true,
CompareOp::Less => false,
CompareOp::NotEqual => true,
},
(None, Some(_)) => match self.cmp {
CompareOp::Equal => false,
CompareOp::Greater => false,
CompareOp::Less => true,
CompareOp::NotEqual => true,
},
(None, None) => match self.cmp {
CompareOp::Equal => true,
CompareOp::Greater => false,
CompareOp::Less => false,
CompareOp::NotEqual => false,
},
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum TxnOp {
Put(Vec<u8>, Vec<u8>),
Get(Vec<u8>),
Delete(Vec<u8>),
}
#[derive(Debug, Clone, Default, PartialEq)]
pub struct TxnRequest {
pub compare: Vec<Compare>,
pub success: Vec<TxnOp>,
pub failure: Vec<TxnOp>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum TxnOpResponse {
ResponsePut(PutResponse),
ResponseGet(RangeResponse),
ResponseDelete(DeleteRangeResponse),
}
pub struct TxnResponse {
pub succeeded: bool,
pub responses: Vec<TxnOpResponse>,
}
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Txn {
req: TxnRequest,
c_when: bool,
c_then: bool,
c_else: bool,
}
impl Txn {
pub fn new() -> Self {
Txn::default()
}
/// Takes a list of comparison. If all comparisons passed in succeed,
/// the operations passed into `and_then()` will be executed. Or the operations
/// passed into `or_else()` will be executed.
#[inline]
pub fn when(mut self, compares: impl Into<Vec<Compare>>) -> Self {
assert!(!self.c_when, "cannot call `when` twice");
assert!(!self.c_then, "cannot call `when` after `and_then`");
assert!(!self.c_else, "cannot call `when` after `or_else`");
self.c_when = true;
self.req.compare = compares.into();
self
}
/// Takes a list of operations. The operations list will be executed, if the
/// comparisons passed into `when()` succeed.
#[inline]
pub fn and_then(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
assert!(!self.c_then, "cannot call `and_then` twice");
assert!(!self.c_else, "cannot call `and_then` after `or_else`");
self.c_then = true;
self.req.success = operations.into();
self
}
/// Takes a list of operations. The operations list will be executed, if the
/// comparisons passed into `when()` fail.
#[inline]
pub fn or_else(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
assert!(!self.c_else, "cannot call `or_else` twice");
self.c_else = true;
self.req.failure = operations.into();
self
}
}
impl From<Txn> for TxnRequest {
fn from(txn: Txn) -> Self {
txn.req
}
}
#[cfg(test)]
mod tests {
use api::v1::meta::{KeyValue, PutRequest};
use super::*;
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;
#[test]
fn test_compare() {
// Equal
let compare = Compare::with_value(vec![1], CompareOp::Equal, vec![1]);
assert!(compare.compare_with_value(Some(&vec![1])));
assert!(!compare.compare_with_value(None));
let compare = Compare::with_not_exist_value(vec![1], CompareOp::Equal);
assert!(compare.compare_with_value(None));
// Greater
let compare = Compare::with_value(vec![1], CompareOp::Greater, vec![1]);
assert!(compare.compare_with_value(Some(&vec![2])));
assert!(!compare.compare_with_value(None));
let compare = Compare::with_not_exist_value(vec![1], CompareOp::Greater);
assert!(!compare.compare_with_value(None));
assert!(compare.compare_with_value(Some(&vec![1])));
// Less
let compare = Compare::with_value(vec![1], CompareOp::Less, vec![1]);
assert!(compare.compare_with_value(Some(&vec![0])));
assert!(compare.compare_with_value(None));
let compare = Compare::with_not_exist_value(vec![1], CompareOp::Less);
assert!(!compare.compare_with_value(None));
assert!(!compare.compare_with_value(Some(&vec![1])));
// NotEqual
let compare = Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]);
assert!(!compare.compare_with_value(Some(&vec![1])));
assert!(compare.compare_with_value(Some(&vec![2])));
assert!(compare.compare_with_value(None));
let compare = Compare::with_not_exist_value(vec![1], CompareOp::NotEqual);
assert!(!compare.compare_with_value(None));
assert!(compare.compare_with_value(Some(&vec![1])));
}
#[test]
fn test_txn() {
let txn = Txn::new()
.when(vec![Compare::with_value(
vec![1],
CompareOp::Equal,
vec![1],
)])
.and_then(vec![TxnOp::Put(vec![1], vec![1])])
.or_else(vec![TxnOp::Put(vec![1], vec![2])]);
assert_eq!(
txn,
Txn {
req: TxnRequest {
compare: vec![Compare::with_value(vec![1], CompareOp::Equal, vec![1])],
success: vec![TxnOp::Put(vec![1], vec![1])],
failure: vec![TxnOp::Put(vec![1], vec![2])],
},
c_when: true,
c_then: true,
c_else: true,
}
);
}
#[tokio::test]
async fn test_txn_one_compare_op() {
let kv_store = create_kv_store().await;
let _ = kv_store
.put(PutRequest {
key: vec![11],
value: vec![3],
..Default::default()
})
.await
.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_value(
vec![11],
CompareOp::Greater,
vec![1],
)])
.and_then(vec![TxnOp::Put(vec![11], vec![1])])
.or_else(vec![TxnOp::Put(vec![11], vec![2])]);
let txn_response = kv_store.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
assert_eq!(txn_response.responses.len(), 1);
}
#[tokio::test]
async fn test_txn_multi_compare_op() {
let kv_store = create_kv_store().await;
for i in 1..3 {
let _ = kv_store
.put(PutRequest {
key: vec![i],
value: vec![i],
..Default::default()
})
.await
.unwrap();
}
let when: Vec<_> = (1..3u8)
.map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i]))
.collect();
let txn = Txn::new()
.when(when)
.and_then(vec![
TxnOp::Put(vec![1], vec![10]),
TxnOp::Put(vec![2], vec![20]),
])
.or_else(vec![TxnOp::Put(vec![1], vec![11])]);
let txn_response = kv_store.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
assert_eq!(txn_response.responses.len(), 2);
}
#[tokio::test]
async fn test_txn_compare_equal() {
let kv_store = create_kv_store().await;
let key = vec![101u8];
kv_store.delete(key.clone(), false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
key.clone(),
CompareOp::Equal,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_store.txn(txn.clone()).await.unwrap();
assert!(txn_response.succeeded);
let txn_response = kv_store.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Put(key, vec![4])]);
let txn_response = kv_store.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
}
#[tokio::test]
async fn test_txn_compare_greater() {
let kv_store = create_kv_store().await;
let key = vec![102u8];
kv_store.delete(key.clone(), false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
key.clone(),
CompareOp::Greater,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_store.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_store.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Greater,
vec![1],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_store.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
header: None,
kvs: vec![KeyValue {
key,
value: vec![1],
}],
more: false,
})
);
}
#[tokio::test]
async fn test_txn_compare_less() {
let kv_store = create_kv_store().await;
let key = vec![103u8];
kv_store.delete(vec![3], false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
key.clone(),
CompareOp::Less,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_store.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_store.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Less,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_store.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
header: None,
kvs: vec![KeyValue {
key,
value: vec![2],
}],
more: false,
})
);
}
#[tokio::test]
async fn test_txn_compare_not_equal() {
let kv_store = create_kv_store().await;
let key = vec![104u8];
kv_store.delete(key.clone(), false).await.unwrap();
let txn = Txn::new()
.when(vec![Compare::with_not_exist_value(
key.clone(),
CompareOp::NotEqual,
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![1])])
.or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
let txn_response = kv_store.txn(txn.clone()).await.unwrap();
assert!(!txn_response.succeeded);
let txn_response = kv_store.txn(txn).await.unwrap();
assert!(txn_response.succeeded);
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
vec![2],
)])
.and_then(vec![TxnOp::Put(key.clone(), vec![3])])
.or_else(vec![TxnOp::Get(key.clone())]);
let mut txn_response = kv_store.txn(txn).await.unwrap();
assert!(!txn_response.succeeded);
let res = txn_response.responses.pop().unwrap();
assert_eq!(
res,
TxnOpResponse::ResponseGet(RangeResponse {
header: None,
kvs: vec![KeyValue {
key,
value: vec![1],
}],
more: false,
})
);
}
async fn create_kv_store() -> KvStoreRef {
std::sync::Arc::new(crate::service::store::memory::MemStore::new())
// TODO(jiachun): Add a feature to test against etcd in github CI
//
// The same test can be run against etcd by uncommenting the following line
// crate::service::store::etcd::EtcdStore::with_endpoints(["127.0.0.1:2379"])
// .await
// .unwrap()
}
}

View File

@@ -0,0 +1,142 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::{DeleteRangeResponse, PutResponse, RangeResponse};
use etcd_client::{
Compare as EtcdCompare, CompareOp as EtcdCompareOp, Txn as EtcdTxn, TxnOp as EtcdTxnOp,
TxnOpResponse as EtcdTxnOpResponse, TxnResponse as EtcdTxnResponse,
};
use crate::error::{self, Result};
use crate::service::store::etcd_util::KvPair;
use crate::service::store::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnResponse};
impl From<Txn> for EtcdTxn {
fn from(txn: Txn) -> Self {
let mut etcd_txn = EtcdTxn::new();
if txn.c_when {
let compares = txn
.req
.compare
.into_iter()
.map(EtcdCompare::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.when(compares);
}
if txn.c_then {
let success = txn
.req
.success
.into_iter()
.map(EtcdTxnOp::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.and_then(success);
}
if txn.c_else {
let failure = txn
.req
.failure
.into_iter()
.map(EtcdTxnOp::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.or_else(failure);
}
etcd_txn
}
}
impl From<Compare> for EtcdCompare {
fn from(cmp: Compare) -> Self {
let etcd_cmp = match cmp.cmp {
CompareOp::Equal => EtcdCompareOp::Equal,
CompareOp::Greater => EtcdCompareOp::Greater,
CompareOp::Less => EtcdCompareOp::Less,
CompareOp::NotEqual => EtcdCompareOp::NotEqual,
};
match cmp.target {
Some(target) => EtcdCompare::value(cmp.key, etcd_cmp, target),
// create revision 0 means key was not exist
None => EtcdCompare::create_revision(cmp.key, etcd_cmp, 0),
}
}
}
impl From<TxnOp> for EtcdTxnOp {
fn from(op: TxnOp) -> Self {
match op {
TxnOp::Put(key, value) => EtcdTxnOp::put(key, value, None),
TxnOp::Get(key) => EtcdTxnOp::get(key, None),
TxnOp::Delete(key) => EtcdTxnOp::delete(key, None),
}
}
}
impl TryFrom<EtcdTxnOpResponse> for TxnOpResponse {
type Error = error::Error;
fn try_from(op_resp: EtcdTxnOpResponse) -> Result<Self> {
match op_resp {
EtcdTxnOpResponse::Put(res) => {
let prev_kv = res.prev_key().map(KvPair::from_etcd_kv);
let put_res = PutResponse {
prev_kv,
..Default::default()
};
Ok(TxnOpResponse::ResponsePut(put_res))
}
EtcdTxnOpResponse::Get(res) => {
let kvs = res.kvs().iter().map(KvPair::from_etcd_kv).collect();
let range_res = RangeResponse {
kvs,
..Default::default()
};
Ok(TxnOpResponse::ResponseGet(range_res))
}
EtcdTxnOpResponse::Delete(res) => {
let prev_kvs = res
.prev_kvs()
.iter()
.map(KvPair::from_etcd_kv)
.collect::<Vec<_>>();
let delete_res = DeleteRangeResponse {
prev_kvs,
deleted: res.deleted(),
..Default::default()
};
Ok(TxnOpResponse::ResponseDelete(delete_res))
}
EtcdTxnOpResponse::Txn(_) => error::EtcdTxnOpResponseSnafu {
err_msg: "nested txn is not supported",
}
.fail(),
}
}
}
impl TryFrom<EtcdTxnResponse> for TxnResponse {
type Error = error::Error;
fn try_from(resp: EtcdTxnResponse) -> Result<Self> {
let succeeded = resp.succeeded();
let responses = resp
.op_responses()
.into_iter()
.map(TxnOpResponse::try_from)
.collect::<Result<Vec<_>>>()?;
Ok(Self {
succeeded,
responses,
})
}
}