mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
feat: use txn to impl cas (#3936)
* feat: usr txn to impl cas * chore: fix test
This commit is contained in:
@@ -457,9 +457,8 @@ mod tests {
|
||||
use common_meta::kv_backend::{KvBackend, TxnService};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
|
||||
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
|
||||
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest,
|
||||
RangeResponse,
|
||||
BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
|
||||
PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
use common_meta::rpc::KeyValue;
|
||||
use dashmap::DashMap;
|
||||
@@ -519,13 +518,6 @@ mod tests {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn compare_and_put(
|
||||
&self,
|
||||
_req: CompareAndPutRequest,
|
||||
) -> Result<CompareAndPutResponse, Self::Error> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn delete_range(
|
||||
&self,
|
||||
_req: DeleteRangeRequest,
|
||||
|
||||
@@ -20,6 +20,7 @@ use common_error::ext::ErrorExt;
|
||||
pub use txn::TxnService;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::kv_backend::txn::{Txn, TxnOpResponse};
|
||||
use crate::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
@@ -52,11 +53,6 @@ where
|
||||
|
||||
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
|
||||
|
||||
async fn compare_and_put(
|
||||
&self,
|
||||
req: CompareAndPutRequest,
|
||||
) -> Result<CompareAndPutResponse, Self::Error>;
|
||||
|
||||
async fn delete_range(
|
||||
&self,
|
||||
req: DeleteRangeRequest,
|
||||
@@ -80,6 +76,33 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// CAS: Compares the value at the key with the given value, and if they are
|
||||
/// equal, puts the new value at the key.
|
||||
async fn compare_and_put(
|
||||
&self,
|
||||
req: CompareAndPutRequest,
|
||||
) -> Result<CompareAndPutResponse, Self::Error> {
|
||||
let CompareAndPutRequest { key, expect, value } = req;
|
||||
let txn = if expect.is_empty() {
|
||||
Txn::put_if_not_exists(key, value)
|
||||
} else {
|
||||
Txn::compare_and_put(key, expect, value)
|
||||
};
|
||||
let txn_res = self.txn(txn).await?;
|
||||
|
||||
let success = txn_res.succeeded;
|
||||
// The response is guaranteed to have at most one element.
|
||||
let op_res = txn_res.responses.into_iter().next();
|
||||
let prev_kv = match op_res {
|
||||
Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
|
||||
Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
|
||||
Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(CompareAndPutResponse { success, prev_kv })
|
||||
}
|
||||
|
||||
/// Puts a value at a key. If `if_not_exists` is `true`, the operation
|
||||
/// ensures the key does not exist before applying the PUT operation.
|
||||
/// Otherwise, it simply applies the PUT operation without checking for
|
||||
|
||||
@@ -16,10 +16,9 @@ use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use etcd_client::{
|
||||
Client, Compare, CompareOp, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse,
|
||||
TxnResponse,
|
||||
Client, DeleteOptions, GetOptions, PutOptions, Txn, TxnOp, TxnOpResponse, TxnResponse,
|
||||
};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use super::KvBackendRef;
|
||||
use crate::error::{self, Error, Result};
|
||||
@@ -28,8 +27,8 @@ use crate::kv_backend::{KvBackend, TxnService};
|
||||
use crate::metrics::METRIC_META_TXN_REQUEST;
|
||||
use crate::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
};
|
||||
use crate::rpc::KeyValue;
|
||||
|
||||
@@ -202,53 +201,6 @@ impl KvBackend for EtcdStore {
|
||||
Ok(BatchGetResponse { kvs })
|
||||
}
|
||||
|
||||
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
|
||||
let CompareAndPut {
|
||||
key,
|
||||
expect,
|
||||
value,
|
||||
put_options,
|
||||
} = req.try_into()?;
|
||||
|
||||
let compare = if expect.is_empty() {
|
||||
// create if absent
|
||||
// revision 0 means key was not exist
|
||||
Compare::create_revision(key.clone(), CompareOp::Equal, 0)
|
||||
} else {
|
||||
// compare and put
|
||||
Compare::value(key.clone(), CompareOp::Equal, expect)
|
||||
};
|
||||
let put = TxnOp::put(key.clone(), value, put_options);
|
||||
let get = TxnOp::get(key, None);
|
||||
let txn = Txn::new()
|
||||
.when(vec![compare])
|
||||
.and_then(vec![put])
|
||||
.or_else(vec![get]);
|
||||
|
||||
let txn_res = self
|
||||
.client
|
||||
.kv_client()
|
||||
.txn(txn)
|
||||
.await
|
||||
.context(error::EtcdFailedSnafu)?;
|
||||
|
||||
let success = txn_res.succeeded();
|
||||
let op_res = txn_res
|
||||
.op_responses()
|
||||
.pop()
|
||||
.context(error::InvalidTxnResultSnafu {
|
||||
err_msg: "empty response",
|
||||
})?;
|
||||
|
||||
let prev_kv = match op_res {
|
||||
TxnOpResponse::Put(mut res) => res.take_prev_key().map(convert_key_value),
|
||||
TxnOpResponse::Get(mut res) => res.take_kvs().into_iter().next().map(convert_key_value),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
Ok(CompareAndPutResponse { success, prev_kv })
|
||||
}
|
||||
|
||||
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
|
||||
let Delete { key, options } = req.try_into()?;
|
||||
|
||||
@@ -461,28 +413,6 @@ impl TryFrom<BatchDeleteRequest> for BatchDelete {
|
||||
}
|
||||
}
|
||||
|
||||
struct CompareAndPut {
|
||||
key: Vec<u8>,
|
||||
expect: Vec<u8>,
|
||||
value: Vec<u8>,
|
||||
put_options: Option<PutOptions>,
|
||||
}
|
||||
|
||||
impl TryFrom<CompareAndPutRequest> for CompareAndPut {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(req: CompareAndPutRequest) -> Result<Self> {
|
||||
let CompareAndPutRequest { key, expect, value } = req;
|
||||
|
||||
Ok(CompareAndPut {
|
||||
key,
|
||||
expect,
|
||||
value,
|
||||
put_options: Some(PutOptions::default().with_prev_key()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct Delete {
|
||||
key: Vec<u8>,
|
||||
options: Option<DeleteOptions>,
|
||||
@@ -597,22 +527,6 @@ mod tests {
|
||||
let _ = batch_delete.options.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_compare_and_put() {
|
||||
let req = CompareAndPutRequest {
|
||||
key: b"test_key".to_vec(),
|
||||
expect: b"test_expect".to_vec(),
|
||||
value: b"test_value".to_vec(),
|
||||
};
|
||||
|
||||
let compare_and_put: CompareAndPut = req.try_into().unwrap();
|
||||
|
||||
assert_eq!(b"test_key".to_vec(), compare_and_put.key);
|
||||
assert_eq!(b"test_expect".to_vec(), compare_and_put.expect);
|
||||
assert_eq!(b"test_value".to_vec(), compare_and_put.value);
|
||||
let _ = compare_and_put.put_options.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_delete() {
|
||||
let req = DeleteRangeRequest {
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::collections::btree_map::Entry;
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::marker::PhantomData;
|
||||
@@ -29,8 +28,8 @@ use crate::kv_backend::{KvBackend, TxnService};
|
||||
use crate::metrics::METRIC_META_TXN_REQUEST;
|
||||
use crate::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
};
|
||||
use crate::rpc::KeyValue;
|
||||
|
||||
@@ -190,41 +189,6 @@ impl<T: ErrorExt + Send + Sync + 'static> KvBackend for MemoryKvBackend<T> {
|
||||
Ok(BatchGetResponse { kvs })
|
||||
}
|
||||
|
||||
async fn compare_and_put(
|
||||
&self,
|
||||
req: CompareAndPutRequest,
|
||||
) -> Result<CompareAndPutResponse, Self::Error> {
|
||||
let CompareAndPutRequest { key, expect, value } = req;
|
||||
|
||||
let mut kvs = self.kvs.write().unwrap();
|
||||
|
||||
let existed = kvs.entry(key);
|
||||
let (success, prev_kv) = match existed {
|
||||
Entry::Vacant(e) => {
|
||||
let expected = expect.is_empty();
|
||||
if expected {
|
||||
let _ = e.insert(value);
|
||||
}
|
||||
(expected, None)
|
||||
}
|
||||
Entry::Occupied(mut existed) => {
|
||||
let expected = existed.get() == &expect;
|
||||
let prev_kv = if expected {
|
||||
let _ = existed.insert(value);
|
||||
None
|
||||
} else {
|
||||
Some(KeyValue {
|
||||
key: existed.key().clone(),
|
||||
value: existed.get().clone(),
|
||||
})
|
||||
};
|
||||
(expected, prev_kv)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(CompareAndPutResponse { success, prev_kv })
|
||||
}
|
||||
|
||||
async fn delete_range(
|
||||
&self,
|
||||
req: DeleteRangeRequest,
|
||||
|
||||
@@ -170,13 +170,13 @@ impl Txn {
|
||||
}
|
||||
|
||||
/// Builds a transaction that puts a value at a key if the key exists and the value
|
||||
/// is equal to `old_value`.
|
||||
pub fn compare_and_put(key: Vec<u8>, old_value: Vec<u8>, value: Vec<u8>) -> Self {
|
||||
/// is equal to `expect`.
|
||||
pub fn compare_and_put(key: Vec<u8>, expect: Vec<u8>, value: Vec<u8>) -> Self {
|
||||
Self::new()
|
||||
.when(vec![Compare::with_value(
|
||||
key.clone(),
|
||||
CompareOp::Equal,
|
||||
old_value,
|
||||
expect,
|
||||
)])
|
||||
.and_then(vec![TxnOp::Put(key.clone(), value)])
|
||||
.or_else(vec![TxnOp::Get(key)])
|
||||
|
||||
@@ -25,8 +25,8 @@ use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnRes
|
||||
use common_meta::kv_backend::{KvBackend, TxnService};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
|
||||
RangeRequest, RangeResponse,
|
||||
};
|
||||
use common_meta::rpc::KeyValue;
|
||||
use common_meta::util::get_next_prefix_key;
|
||||
@@ -277,42 +277,6 @@ impl KvBackend for RaftEngineBackend {
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn compare_and_put(
|
||||
&self,
|
||||
req: CompareAndPutRequest,
|
||||
) -> Result<CompareAndPutResponse, Self::Error> {
|
||||
let CompareAndPutRequest { key, expect, value } = req;
|
||||
|
||||
let mut batch = LogBatch::with_capacity(1);
|
||||
let engine = self.engine.write().unwrap();
|
||||
let existing = engine_get(&engine, &key)?;
|
||||
let eq = existing
|
||||
.as_ref()
|
||||
.map(|kv| kv.value == expect)
|
||||
.unwrap_or_else(|| {
|
||||
// if the associated value of key does not exist and expect is empty,
|
||||
// then we still consider them as equal.
|
||||
expect.is_empty()
|
||||
});
|
||||
|
||||
if eq {
|
||||
batch
|
||||
.put(SYSTEM_NAMESPACE, key, value)
|
||||
.context(RaftEngineSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::ExternalSnafu)?;
|
||||
engine
|
||||
.write(&mut batch, false)
|
||||
.context(RaftEngineSnafu)
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::ExternalSnafu)?;
|
||||
}
|
||||
Ok(CompareAndPutResponse {
|
||||
success: eq,
|
||||
prev_kv: existing,
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_range(
|
||||
&self,
|
||||
req: DeleteRangeRequest,
|
||||
@@ -436,6 +400,7 @@ mod tests {
|
||||
prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
|
||||
test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
|
||||
};
|
||||
use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse};
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use raft_engine::{Config, ReadableSize, RecoveryMode};
|
||||
|
||||
@@ -510,7 +475,8 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(success);
|
||||
assert_eq!(b"word".as_slice(), &prev_kv.unwrap().value);
|
||||
// Do not return prev_kv on success
|
||||
assert!(prev_kv.is_none());
|
||||
|
||||
assert_eq!(
|
||||
b"world".as_slice(),
|
||||
|
||||
Reference in New Issue
Block a user