From 78b07996b1afb75c6f4448710fa73343fd9c65fc Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Mon, 26 Jun 2023 17:12:48 +0800 Subject: [PATCH] feat: txn for meta (#1828) * feat: txn for meta kvstore * feat: txn * chore: add unit test * chore: more test * chore: more test * Update src/meta-srv/src/service/store/memory.rs Co-authored-by: LFC * chore: by cr --------- Co-authored-by: LFC --- src/meta-srv/src/error.rs | 4 + src/meta-srv/src/metrics.rs | 1 + src/meta-srv/src/sequence.rs | 3 + src/meta-srv/src/service/store.rs | 2 + src/meta-srv/src/service/store/etcd.rs | 50 +-- src/meta-srv/src/service/store/etcd_util.rs | 39 ++ src/meta-srv/src/service/store/kv.rs | 3 +- src/meta-srv/src/service/store/memory.rs | 72 ++- src/meta-srv/src/service/store/txn.rs | 469 ++++++++++++++++++++ src/meta-srv/src/service/store/txn/etcd.rs | 142 ++++++ 10 files changed, 758 insertions(+), 27 deletions(-) create mode 100644 src/meta-srv/src/service/store/etcd_util.rs create mode 100644 src/meta-srv/src/service/store/txn.rs create mode 100644 src/meta-srv/src/service/store/txn/etcd.rs diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 3ed8819d6e..c62f2e65d7 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -360,6 +360,9 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Etcd txn got an error: {err_msg}"))] + EtcdTxnOpResponse { err_msg: String, location: Location }, + // this error is used for custom error mapping // please do not delete it #[snafu(display("Other error, source: {}", source))] @@ -437,6 +440,7 @@ impl ErrorExt for Error { | Error::InvalidTxnResult { .. } | Error::InvalidUtf8Value { .. } | Error::UnexpectedInstructionReply { .. } + | Error::EtcdTxnOpResponse { .. } | Error::Unexpected { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::InvalidCatalogValue { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/metrics.rs b/src/meta-srv/src/metrics.rs index cac6598991..13cdd98c02 100644 --- a/src/meta-srv/src/metrics.rs +++ b/src/meta-srv/src/metrics.rs @@ -15,6 +15,7 @@ pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog"; pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema"; pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request"; +pub(crate) const METRIC_META_TXN_REQUEST: &str = "meta.txn_request"; pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request"; pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num"; pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute"; diff --git a/src/meta-srv/src/sequence.rs b/src/meta-srv/src/sequence.rs index a4e763944f..747e488fb8 100644 --- a/src/meta-srv/src/sequence.rs +++ b/src/meta-srv/src/sequence.rs @@ -165,6 +165,7 @@ mod tests { use super::*; use crate::service::store::kv::KvStore; use crate::service::store::memory::MemStore; + use crate::service::store::txn::TxnService; #[tokio::test] async fn test_sequence() { @@ -199,6 +200,8 @@ mod tests { async fn test_sequence_force_quit() { struct Noop; + impl TxnService for Noop {} + #[async_trait::async_trait] impl KvStore for Noop { async fn range( diff --git a/src/meta-srv/src/service/store.rs b/src/meta-srv/src/service/store.rs index 926bd2f663..47c012a014 100644 --- a/src/meta-srv/src/service/store.rs +++ b/src/meta-srv/src/service/store.rs @@ -13,9 +13,11 @@ // limitations under the License. pub mod etcd; +pub(crate) mod etcd_util; pub mod ext; pub mod kv; pub mod memory; +pub mod txn; use api::v1::meta::{ store_server, BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, diff --git a/src/meta-srv/src/service/store/etcd.rs b/src/meta-srv/src/service/store/etcd.rs index c97543137c..820ec813e2 100644 --- a/src/meta-srv/src/service/store/etcd.rs +++ b/src/meta-srv/src/service/store/etcd.rs @@ -29,8 +29,10 @@ use etcd_client::{ use crate::error; use crate::error::Result; -use crate::metrics::METRIC_META_KV_REQUEST; +use crate::metrics::{METRIC_META_KV_REQUEST, METRIC_META_TXN_REQUEST}; +use crate::service::store::etcd_util::KvPair; use crate::service::store::kv::{KvStore, KvStoreRef}; +use crate::service::store::txn::TxnService; // Maximum number of operations permitted in a transaction. // The etcd default configuration's `--max-txn-ops` is 128. @@ -463,6 +465,28 @@ impl KvStore for EtcdStore { } } +#[async_trait::async_trait] +impl TxnService for EtcdStore { + async fn txn( + &self, + txn: crate::service::store::txn::Txn, + ) -> Result { + let _timer = timer!( + METRIC_META_TXN_REQUEST, + &[("target", "etcd".to_string()), ("op", "txn".to_string()),] + ); + + let etcd_txn: Txn = txn.into(); + let txn_res = self + .client + .kv_client() + .txn(etcd_txn) + .await + .context(error::EtcdFailedSnafu)?; + txn_res.try_into() + } +} + struct Get { cluster_id: u64, key: Vec, @@ -704,30 +728,6 @@ impl TryFrom for MoveValue { } } -struct KvPair<'a>(&'a etcd_client::KeyValue); - -impl<'a> KvPair<'a> { - /// Creates a `KvPair` from etcd KeyValue - #[inline] - fn new(kv: &'a etcd_client::KeyValue) -> Self { - Self(kv) - } - - #[inline] - fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue { - KeyValue::from(KvPair::new(kv)) - } -} - -impl<'a> From> for KeyValue { - fn from(kv: KvPair<'a>) -> Self { - Self { - key: kv.0.key().to_vec(), - value: kv.0.value().to_vec(), - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/meta-srv/src/service/store/etcd_util.rs b/src/meta-srv/src/service/store/etcd_util.rs new file mode 100644 index 0000000000..e97e8edba3 --- /dev/null +++ b/src/meta-srv/src/service/store/etcd_util.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::KeyValue; + +pub struct KvPair<'a>(&'a etcd_client::KeyValue); + +impl<'a> KvPair<'a> { + /// Creates a `KvPair` from etcd KeyValue + #[inline] + pub fn new(kv: &'a etcd_client::KeyValue) -> Self { + Self(kv) + } + + #[inline] + pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue { + KeyValue::from(KvPair::new(kv)) + } +} + +impl<'a> From> for KeyValue { + fn from(kv: KvPair<'a>) -> Self { + Self { + key: kv.0.key().to_vec(), + value: kv.0.value().to_vec(), + } + } +} diff --git a/src/meta-srv/src/service/store/kv.rs b/src/meta-srv/src/service/store/kv.rs index efbb6a4f00..5c03e9362e 100644 --- a/src/meta-srv/src/service/store/kv.rs +++ b/src/meta-srv/src/service/store/kv.rs @@ -22,12 +22,13 @@ use api::v1::meta::{ }; use crate::error::Result; +use crate::service::store::txn::TxnService; pub type KvStoreRef = Arc; pub type ResettableKvStoreRef = Arc; #[async_trait::async_trait] -pub trait KvStore: Send + Sync { +pub trait KvStore: TxnService { async fn range(&self, req: RangeRequest) -> Result; async fn put(&self, req: PutRequest) -> Result; diff --git a/src/meta-srv/src/service/store/memory.rs b/src/meta-srv/src/service/store/memory.rs index a0d2ed0920..b4cc67ad88 100644 --- a/src/meta-srv/src/service/store/memory.rs +++ b/src/meta-srv/src/service/store/memory.rs @@ -27,8 +27,9 @@ use parking_lot::RwLock; use super::ext::KvStoreExt; use crate::error::Result; -use crate::metrics::METRIC_META_KV_REQUEST; +use crate::metrics::{METRIC_META_KV_REQUEST, METRIC_META_TXN_REQUEST}; use crate::service::store::kv::{KvStore, ResettableKvStore}; +use crate::service::store::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse, TxnService}; pub struct MemStore { inner: RwLock, Vec>>, @@ -119,6 +120,7 @@ impl KvStore for MemStore { } = req; let mut memory = self.inner.write(); + let prev_value = memory.insert(key.clone(), value); let prev_kv = if prev_kv { prev_value.map(|value| KeyValue { key, value }) @@ -164,6 +166,7 @@ impl KvStore for MemStore { } = req; let mut memory = self.inner.write(); + let prev_kvs = if prev_kv { kvs.into_iter() .map(|kv| (kv.key.clone(), memory.insert(kv.key, kv.value))) @@ -198,6 +201,7 @@ impl KvStore for MemStore { } = req; let mut memory = self.inner.write(); + let prev_kvs = if prev_kv { keys.into_iter() .filter_map(|key| memory.remove(&key).map(|value| KeyValue { key, value })) @@ -330,6 +334,72 @@ impl KvStore for MemStore { } } +#[async_trait::async_trait] +impl TxnService for MemStore { + async fn txn(&self, txn: Txn) -> Result { + let _timer = timer!( + METRIC_META_TXN_REQUEST, + &[("target", "memory".to_string()), ("op", "txn".to_string()),] + ); + + let TxnRequest { + compare, + success, + failure, + } = txn.into(); + + let mut memory = self.inner.write(); + + let succeeded = compare + .iter() + .all(|x| x.compare_with_value(memory.get(&x.key))); + + let do_txn = |txn_op| match txn_op { + TxnOp::Put(key, value) => { + let prev_value = memory.insert(key.clone(), value); + let prev_kv = prev_value.map(|value| KeyValue { key, value }); + let put_res = PutResponse { + prev_kv, + ..Default::default() + }; + TxnOpResponse::ResponsePut(put_res) + } + TxnOp::Get(key) => { + let value = memory.get(&key); + let kv = value.map(|value| KeyValue { + key, + value: value.clone(), + }); + let get_res = RangeResponse { + kvs: kv.map(|kv| vec![kv]).unwrap_or(vec![]), + ..Default::default() + }; + TxnOpResponse::ResponseGet(get_res) + } + TxnOp::Delete(key) => { + let prev_value = memory.remove(&key); + let prev_kv = prev_value.map(|value| KeyValue { key, value }); + let delete_res = DeleteRangeResponse { + prev_kvs: prev_kv.map(|kv| vec![kv]).unwrap_or(vec![]), + ..Default::default() + }; + TxnOpResponse::ResponseDelete(delete_res) + } + }; + + let responses: Vec<_> = if succeeded { + success.into_iter().map(do_txn).collect() + } else { + failure.into_iter().map(do_txn).collect() + }; + + Ok(TxnResponse { + succeeded, + responses, + }) + } +} + #[cfg(test)] mod tests { use std::sync::atomic::{AtomicU8, Ordering}; diff --git a/src/meta-srv/src/service/store/txn.rs b/src/meta-srv/src/service/store/txn.rs new file mode 100644 index 0000000000..aaaf7b953a --- /dev/null +++ b/src/meta-srv/src/service/store/txn.rs @@ -0,0 +1,469 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{DeleteRangeResponse, PutResponse, RangeResponse}; + +use crate::error::Result; + +mod etcd; + +#[async_trait::async_trait] +pub trait TxnService: Sync + Send { + async fn txn(&self, _txn: Txn) -> Result { + unimplemented!("txn is not implemented") + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum CompareOp { + Equal, + Greater, + Less, + NotEqual, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct Compare { + pub key: Vec, + pub cmp: CompareOp, + /// None means the key does not exist. + pub target: Option>, +} + +impl Compare { + pub fn new(key: Vec, cmp: CompareOp, target: Option>) -> Self { + Self { key, cmp, target } + } + + pub fn with_value(key: Vec, cmp: CompareOp, target: Vec) -> Self { + Self::new(key, cmp, Some(target)) + } + + pub fn with_not_exist_value(key: Vec, cmp: CompareOp) -> Self { + Self::new(key, cmp, None) + } + + pub fn compare_with_value(&self, value: Option<&Vec>) -> bool { + match (value, &self.target) { + (Some(value), Some(target)) => match self.cmp { + CompareOp::Equal => *value == *target, + CompareOp::Greater => *value > *target, + CompareOp::Less => *value < *target, + CompareOp::NotEqual => *value != *target, + }, + (Some(_), None) => match self.cmp { + CompareOp::Equal => false, + CompareOp::Greater => true, + CompareOp::Less => false, + CompareOp::NotEqual => true, + }, + (None, Some(_)) => match self.cmp { + CompareOp::Equal => false, + CompareOp::Greater => false, + CompareOp::Less => true, + CompareOp::NotEqual => true, + }, + (None, None) => match self.cmp { + CompareOp::Equal => true, + CompareOp::Greater => false, + CompareOp::Less => false, + CompareOp::NotEqual => false, + }, + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum TxnOp { + Put(Vec, Vec), + Get(Vec), + Delete(Vec), +} + +#[derive(Debug, Clone, Default, PartialEq)] +pub struct TxnRequest { + pub compare: Vec, + pub success: Vec, + pub failure: Vec, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum TxnOpResponse { + ResponsePut(PutResponse), + ResponseGet(RangeResponse), + ResponseDelete(DeleteRangeResponse), +} + +pub struct TxnResponse { + pub succeeded: bool, + pub responses: Vec, +} + +#[derive(Debug, Clone, Default, PartialEq)] +pub struct Txn { + req: TxnRequest, + c_when: bool, + c_then: bool, + c_else: bool, +} + +impl Txn { + pub fn new() -> Self { + Txn::default() + } + + /// Takes a list of comparison. If all comparisons passed in succeed, + /// the operations passed into `and_then()` will be executed. Or the operations + /// passed into `or_else()` will be executed. + #[inline] + pub fn when(mut self, compares: impl Into>) -> Self { + assert!(!self.c_when, "cannot call `when` twice"); + assert!(!self.c_then, "cannot call `when` after `and_then`"); + assert!(!self.c_else, "cannot call `when` after `or_else`"); + + self.c_when = true; + self.req.compare = compares.into(); + self + } + + /// Takes a list of operations. The operations list will be executed, if the + /// comparisons passed into `when()` succeed. + #[inline] + pub fn and_then(mut self, operations: impl Into>) -> Self { + assert!(!self.c_then, "cannot call `and_then` twice"); + assert!(!self.c_else, "cannot call `and_then` after `or_else`"); + + self.c_then = true; + self.req.success = operations.into(); + self + } + + /// Takes a list of operations. The operations list will be executed, if the + /// comparisons passed into `when()` fail. + #[inline] + pub fn or_else(mut self, operations: impl Into>) -> Self { + assert!(!self.c_else, "cannot call `or_else` twice"); + + self.c_else = true; + self.req.failure = operations.into(); + self + } +} + +impl From for TxnRequest { + fn from(txn: Txn) -> Self { + txn.req + } +} + +#[cfg(test)] +mod tests { + use api::v1::meta::{KeyValue, PutRequest}; + + use super::*; + use crate::service::store::ext::KvStoreExt; + use crate::service::store::kv::KvStoreRef; + + #[test] + fn test_compare() { + // Equal + let compare = Compare::with_value(vec![1], CompareOp::Equal, vec![1]); + assert!(compare.compare_with_value(Some(&vec![1]))); + assert!(!compare.compare_with_value(None)); + let compare = Compare::with_not_exist_value(vec![1], CompareOp::Equal); + assert!(compare.compare_with_value(None)); + + // Greater + let compare = Compare::with_value(vec![1], CompareOp::Greater, vec![1]); + assert!(compare.compare_with_value(Some(&vec![2]))); + assert!(!compare.compare_with_value(None)); + let compare = Compare::with_not_exist_value(vec![1], CompareOp::Greater); + assert!(!compare.compare_with_value(None)); + assert!(compare.compare_with_value(Some(&vec![1]))); + + // Less + let compare = Compare::with_value(vec![1], CompareOp::Less, vec![1]); + assert!(compare.compare_with_value(Some(&vec![0]))); + assert!(compare.compare_with_value(None)); + let compare = Compare::with_not_exist_value(vec![1], CompareOp::Less); + assert!(!compare.compare_with_value(None)); + assert!(!compare.compare_with_value(Some(&vec![1]))); + + // NotEqual + let compare = Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]); + assert!(!compare.compare_with_value(Some(&vec![1]))); + assert!(compare.compare_with_value(Some(&vec![2]))); + assert!(compare.compare_with_value(None)); + let compare = Compare::with_not_exist_value(vec![1], CompareOp::NotEqual); + assert!(!compare.compare_with_value(None)); + assert!(compare.compare_with_value(Some(&vec![1]))); + } + + #[test] + fn test_txn() { + let txn = Txn::new() + .when(vec![Compare::with_value( + vec![1], + CompareOp::Equal, + vec![1], + )]) + .and_then(vec![TxnOp::Put(vec![1], vec![1])]) + .or_else(vec![TxnOp::Put(vec![1], vec![2])]); + + assert_eq!( + txn, + Txn { + req: TxnRequest { + compare: vec![Compare::with_value(vec![1], CompareOp::Equal, vec![1])], + success: vec![TxnOp::Put(vec![1], vec![1])], + failure: vec![TxnOp::Put(vec![1], vec![2])], + }, + c_when: true, + c_then: true, + c_else: true, + } + ); + } + + #[tokio::test] + async fn test_txn_one_compare_op() { + let kv_store = create_kv_store().await; + + let _ = kv_store + .put(PutRequest { + key: vec![11], + value: vec![3], + ..Default::default() + }) + .await + .unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_value( + vec![11], + CompareOp::Greater, + vec![1], + )]) + .and_then(vec![TxnOp::Put(vec![11], vec![1])]) + .or_else(vec![TxnOp::Put(vec![11], vec![2])]); + + let txn_response = kv_store.txn(txn).await.unwrap(); + + assert!(txn_response.succeeded); + assert_eq!(txn_response.responses.len(), 1); + } + + #[tokio::test] + async fn test_txn_multi_compare_op() { + let kv_store = create_kv_store().await; + + for i in 1..3 { + let _ = kv_store + .put(PutRequest { + key: vec![i], + value: vec![i], + ..Default::default() + }) + .await + .unwrap(); + } + + let when: Vec<_> = (1..3u8) + .map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i])) + .collect(); + + let txn = Txn::new() + .when(when) + .and_then(vec![ + TxnOp::Put(vec![1], vec![10]), + TxnOp::Put(vec![2], vec![20]), + ]) + .or_else(vec![TxnOp::Put(vec![1], vec![11])]); + + let txn_response = kv_store.txn(txn).await.unwrap(); + + assert!(txn_response.succeeded); + assert_eq!(txn_response.responses.len(), 2); + } + + #[tokio::test] + async fn test_txn_compare_equal() { + let kv_store = create_kv_store().await; + let key = vec![101u8]; + kv_store.delete(key.clone(), false).await.unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_not_exist_value( + key.clone(), + CompareOp::Equal, + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) + .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); + let txn_response = kv_store.txn(txn.clone()).await.unwrap(); + assert!(txn_response.succeeded); + + let txn_response = kv_store.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Equal, + vec![2], + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) + .or_else(vec![TxnOp::Put(key, vec![4])]); + let txn_response = kv_store.txn(txn).await.unwrap(); + assert!(txn_response.succeeded); + } + + #[tokio::test] + async fn test_txn_compare_greater() { + let kv_store = create_kv_store().await; + let key = vec![102u8]; + kv_store.delete(key.clone(), false).await.unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_not_exist_value( + key.clone(), + CompareOp::Greater, + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) + .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); + let txn_response = kv_store.txn(txn.clone()).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn_response = kv_store.txn(txn).await.unwrap(); + assert!(txn_response.succeeded); + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Greater, + vec![1], + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) + .or_else(vec![TxnOp::Get(key.clone())]); + let mut txn_response = kv_store.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + let res = txn_response.responses.pop().unwrap(); + assert_eq!( + res, + TxnOpResponse::ResponseGet(RangeResponse { + header: None, + kvs: vec![KeyValue { + key, + value: vec![1], + }], + more: false, + }) + ); + } + + #[tokio::test] + async fn test_txn_compare_less() { + let kv_store = create_kv_store().await; + let key = vec![103u8]; + kv_store.delete(vec![3], false).await.unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_not_exist_value( + key.clone(), + CompareOp::Less, + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) + .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); + let txn_response = kv_store.txn(txn.clone()).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn_response = kv_store.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Less, + vec![2], + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) + .or_else(vec![TxnOp::Get(key.clone())]); + let mut txn_response = kv_store.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + let res = txn_response.responses.pop().unwrap(); + assert_eq!( + res, + TxnOpResponse::ResponseGet(RangeResponse { + header: None, + kvs: vec![KeyValue { + key, + value: vec![2], + }], + more: false, + }) + ); + } + + #[tokio::test] + async fn test_txn_compare_not_equal() { + let kv_store = create_kv_store().await; + let key = vec![104u8]; + kv_store.delete(key.clone(), false).await.unwrap(); + + let txn = Txn::new() + .when(vec![Compare::with_not_exist_value( + key.clone(), + CompareOp::NotEqual, + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![1])]) + .or_else(vec![TxnOp::Put(key.clone(), vec![2])]); + let txn_response = kv_store.txn(txn.clone()).await.unwrap(); + assert!(!txn_response.succeeded); + + let txn_response = kv_store.txn(txn).await.unwrap(); + assert!(txn_response.succeeded); + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Equal, + vec![2], + )]) + .and_then(vec![TxnOp::Put(key.clone(), vec![3])]) + .or_else(vec![TxnOp::Get(key.clone())]); + let mut txn_response = kv_store.txn(txn).await.unwrap(); + assert!(!txn_response.succeeded); + let res = txn_response.responses.pop().unwrap(); + assert_eq!( + res, + TxnOpResponse::ResponseGet(RangeResponse { + header: None, + kvs: vec![KeyValue { + key, + value: vec![1], + }], + more: false, + }) + ); + } + + async fn create_kv_store() -> KvStoreRef { + std::sync::Arc::new(crate::service::store::memory::MemStore::new()) + // TODO(jiachun): Add a feature to test against etcd in github CI + // + // The same test can be run against etcd by uncommenting the following line + // crate::service::store::etcd::EtcdStore::with_endpoints(["127.0.0.1:2379"]) + // .await + // .unwrap() + } +} diff --git a/src/meta-srv/src/service/store/txn/etcd.rs b/src/meta-srv/src/service/store/txn/etcd.rs new file mode 100644 index 0000000000..e2fc3c0315 --- /dev/null +++ b/src/meta-srv/src/service/store/txn/etcd.rs @@ -0,0 +1,142 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::{DeleteRangeResponse, PutResponse, RangeResponse}; +use etcd_client::{ + Compare as EtcdCompare, CompareOp as EtcdCompareOp, Txn as EtcdTxn, TxnOp as EtcdTxnOp, + TxnOpResponse as EtcdTxnOpResponse, TxnResponse as EtcdTxnResponse, +}; + +use crate::error::{self, Result}; +use crate::service::store::etcd_util::KvPair; +use crate::service::store::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnResponse}; + +impl From for EtcdTxn { + fn from(txn: Txn) -> Self { + let mut etcd_txn = EtcdTxn::new(); + if txn.c_when { + let compares = txn + .req + .compare + .into_iter() + .map(EtcdCompare::from) + .collect::>(); + etcd_txn = etcd_txn.when(compares); + } + if txn.c_then { + let success = txn + .req + .success + .into_iter() + .map(EtcdTxnOp::from) + .collect::>(); + etcd_txn = etcd_txn.and_then(success); + } + if txn.c_else { + let failure = txn + .req + .failure + .into_iter() + .map(EtcdTxnOp::from) + .collect::>(); + etcd_txn = etcd_txn.or_else(failure); + } + etcd_txn + } +} + +impl From for EtcdCompare { + fn from(cmp: Compare) -> Self { + let etcd_cmp = match cmp.cmp { + CompareOp::Equal => EtcdCompareOp::Equal, + CompareOp::Greater => EtcdCompareOp::Greater, + CompareOp::Less => EtcdCompareOp::Less, + CompareOp::NotEqual => EtcdCompareOp::NotEqual, + }; + match cmp.target { + Some(target) => EtcdCompare::value(cmp.key, etcd_cmp, target), + // create revision 0 means key was not exist + None => EtcdCompare::create_revision(cmp.key, etcd_cmp, 0), + } + } +} + +impl From for EtcdTxnOp { + fn from(op: TxnOp) -> Self { + match op { + TxnOp::Put(key, value) => EtcdTxnOp::put(key, value, None), + TxnOp::Get(key) => EtcdTxnOp::get(key, None), + TxnOp::Delete(key) => EtcdTxnOp::delete(key, None), + } + } +} + +impl TryFrom for TxnOpResponse { + type Error = error::Error; + + fn try_from(op_resp: EtcdTxnOpResponse) -> Result { + match op_resp { + EtcdTxnOpResponse::Put(res) => { + let prev_kv = res.prev_key().map(KvPair::from_etcd_kv); + let put_res = PutResponse { + prev_kv, + ..Default::default() + }; + Ok(TxnOpResponse::ResponsePut(put_res)) + } + EtcdTxnOpResponse::Get(res) => { + let kvs = res.kvs().iter().map(KvPair::from_etcd_kv).collect(); + let range_res = RangeResponse { + kvs, + ..Default::default() + }; + Ok(TxnOpResponse::ResponseGet(range_res)) + } + EtcdTxnOpResponse::Delete(res) => { + let prev_kvs = res + .prev_kvs() + .iter() + .map(KvPair::from_etcd_kv) + .collect::>(); + let delete_res = DeleteRangeResponse { + prev_kvs, + deleted: res.deleted(), + ..Default::default() + }; + Ok(TxnOpResponse::ResponseDelete(delete_res)) + } + EtcdTxnOpResponse::Txn(_) => error::EtcdTxnOpResponseSnafu { + err_msg: "nested txn is not supported", + } + .fail(), + } + } +} + +impl TryFrom for TxnResponse { + type Error = error::Error; + + fn try_from(resp: EtcdTxnResponse) -> Result { + let succeeded = resp.succeeded(); + let responses = resp + .op_responses() + .into_iter() + .map(TxnOpResponse::try_from) + .collect::>>()?; + Ok(Self { + succeeded, + responses, + }) + } +}