mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-13 08:43:00 +00:00
chore: internal ChrootKvBackend refactor and test (#2799)
* try avoid rate limit Signed-off-by: tison <wander4096@gmail.com> * chroot utilities as method Signed-off-by: tison <wander4096@gmail.com> * add test Signed-off-by: tison <wander4096@gmail.com> * make clippy happy Signed-off-by: tison <wander4096@gmail.com> --------- Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
@@ -41,9 +41,9 @@ impl TxnService for ChrootKvBackend {
|
||||
type Error = Error;
|
||||
|
||||
async fn txn(&self, txn: Txn) -> Result<TxnResponse, Self::Error> {
|
||||
let txn = txn_prepend_root(&self.root, txn);
|
||||
let txn = self.txn_prepend_root(txn);
|
||||
let txn_res = self.inner.txn(txn).await?;
|
||||
Ok(chroot_txn_response(&self.root, txn_res))
|
||||
Ok(self.chroot_txn_response(txn_res))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,33 +58,33 @@ impl KvBackend for ChrootKvBackend {
|
||||
}
|
||||
|
||||
async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse, Self::Error> {
|
||||
req.key = key_prepend_root(&self.root, req.key);
|
||||
req.range_end = range_end_prepend_root(&self.root, req.range_end);
|
||||
req.key = self.key_prepend_root(req.key);
|
||||
req.range_end = self.range_end_prepend_root(req.range_end);
|
||||
let mut res = self.inner.range(req).await?;
|
||||
res.kvs = res
|
||||
.kvs
|
||||
.drain(..)
|
||||
.map(chroot_key_value_with(&self.root))
|
||||
.map(self.chroot_key_value_with())
|
||||
.collect();
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn put(&self, mut req: PutRequest) -> Result<PutResponse, Self::Error> {
|
||||
req.key = key_prepend_root(&self.root, req.key);
|
||||
req.key = self.key_prepend_root(req.key);
|
||||
let mut res = self.inner.put(req).await?;
|
||||
res.prev_kv = res.prev_kv.take().map(chroot_key_value_with(&self.root));
|
||||
res.prev_kv = res.prev_kv.take().map(self.chroot_key_value_with());
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
|
||||
for kv in req.kvs.iter_mut() {
|
||||
kv.key = key_prepend_root(&self.root, kv.key.drain(..).collect());
|
||||
kv.key = self.key_prepend_root(kv.key.drain(..).collect());
|
||||
}
|
||||
let mut res = self.inner.batch_put(req).await?;
|
||||
res.prev_kvs = res
|
||||
.prev_kvs
|
||||
.drain(..)
|
||||
.map(chroot_key_value_with(&self.root))
|
||||
.map(self.chroot_key_value_with())
|
||||
.collect();
|
||||
Ok(res)
|
||||
}
|
||||
@@ -93,13 +93,13 @@ impl KvBackend for ChrootKvBackend {
|
||||
req.keys = req
|
||||
.keys
|
||||
.drain(..)
|
||||
.map(|key| key_prepend_root(&self.root, key))
|
||||
.map(|key| self.key_prepend_root(key))
|
||||
.collect();
|
||||
let mut res = self.inner.batch_get(req).await?;
|
||||
res.kvs = res
|
||||
.kvs
|
||||
.drain(..)
|
||||
.map(chroot_key_value_with(&self.root))
|
||||
.map(self.chroot_key_value_with())
|
||||
.collect();
|
||||
Ok(res)
|
||||
}
|
||||
@@ -108,9 +108,9 @@ impl KvBackend for ChrootKvBackend {
|
||||
&self,
|
||||
mut req: CompareAndPutRequest,
|
||||
) -> Result<CompareAndPutResponse, Self::Error> {
|
||||
req.key = key_prepend_root(&self.root, req.key);
|
||||
req.key = self.key_prepend_root(req.key);
|
||||
let mut res = self.inner.compare_and_put(req).await?;
|
||||
res.prev_kv = res.prev_kv.take().map(chroot_key_value_with(&self.root));
|
||||
res.prev_kv = res.prev_kv.take().map(self.chroot_key_value_with());
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@@ -118,13 +118,13 @@ impl KvBackend for ChrootKvBackend {
|
||||
&self,
|
||||
mut req: DeleteRangeRequest,
|
||||
) -> Result<DeleteRangeResponse, Self::Error> {
|
||||
req.key = key_prepend_root(&self.root, req.key);
|
||||
req.range_end = range_end_prepend_root(&self.root, req.range_end);
|
||||
req.key = self.key_prepend_root(req.key);
|
||||
req.range_end = self.range_end_prepend_root(req.range_end);
|
||||
let mut res = self.inner.delete_range(req).await?;
|
||||
res.prev_kvs = res
|
||||
.prev_kvs
|
||||
.drain(..)
|
||||
.map(chroot_key_value_with(&self.root))
|
||||
.map(self.chroot_key_value_with())
|
||||
.collect();
|
||||
Ok(res)
|
||||
}
|
||||
@@ -136,121 +136,138 @@ impl KvBackend for ChrootKvBackend {
|
||||
req.keys = req
|
||||
.keys
|
||||
.drain(..)
|
||||
.map(|key| key_prepend_root(&self.root, key))
|
||||
.map(|key| self.key_prepend_root(key))
|
||||
.collect();
|
||||
let mut res = self.inner.batch_delete(req).await?;
|
||||
res.prev_kvs = res
|
||||
.prev_kvs
|
||||
.drain(..)
|
||||
.map(chroot_key_value_with(&self.root))
|
||||
.map(self.chroot_key_value_with())
|
||||
.collect();
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
fn key_strip_root(root: &[u8], mut key: Vec<u8>) -> Vec<u8> {
|
||||
debug_assert!(
|
||||
key.starts_with(root),
|
||||
"key={}, root={}",
|
||||
String::from_utf8_lossy(&key),
|
||||
String::from_utf8_lossy(root),
|
||||
);
|
||||
key.split_off(root.len())
|
||||
}
|
||||
|
||||
fn chroot_key_value_with(root: &[u8]) -> impl FnMut(KeyValue) -> KeyValue + '_ {
|
||||
|kv| KeyValue {
|
||||
key: key_strip_root(root, kv.key),
|
||||
value: kv.value,
|
||||
impl ChrootKvBackend {
|
||||
fn key_strip_root(&self, mut key: Vec<u8>) -> Vec<u8> {
|
||||
let root = &self.root;
|
||||
debug_assert!(
|
||||
key.starts_with(root),
|
||||
"key={}, root={}",
|
||||
String::from_utf8_lossy(&key),
|
||||
String::from_utf8_lossy(root),
|
||||
);
|
||||
key.split_off(root.len())
|
||||
}
|
||||
}
|
||||
fn chroot_txn_response(root: &[u8], mut txn_res: TxnResponse) -> TxnResponse {
|
||||
for resp in txn_res.responses.iter_mut() {
|
||||
match resp {
|
||||
TxnOpResponse::ResponsePut(r) => {
|
||||
r.prev_kv = r.prev_kv.take().map(chroot_key_value_with(root));
|
||||
}
|
||||
TxnOpResponse::ResponseGet(r) => {
|
||||
r.kvs = r.kvs.drain(..).map(chroot_key_value_with(root)).collect();
|
||||
}
|
||||
TxnOpResponse::ResponseDelete(r) => {
|
||||
r.prev_kvs = r
|
||||
.prev_kvs
|
||||
.drain(..)
|
||||
.map(chroot_key_value_with(root))
|
||||
.collect();
|
||||
}
|
||||
|
||||
fn chroot_key_value_with(&self) -> impl FnMut(KeyValue) -> KeyValue + '_ {
|
||||
|kv| KeyValue {
|
||||
key: self.key_strip_root(kv.key),
|
||||
value: kv.value,
|
||||
}
|
||||
}
|
||||
txn_res
|
||||
}
|
||||
|
||||
fn key_prepend_root(root: &[u8], mut key: Vec<u8>) -> Vec<u8> {
|
||||
let mut new_key = root.to_vec();
|
||||
new_key.append(&mut key);
|
||||
new_key
|
||||
}
|
||||
|
||||
// see namespace.prefixInterval - https://github.com/etcd-io/etcd/blob/v3.5.10/client/v3/namespace/util.go
|
||||
fn range_end_prepend_root(root: &[u8], mut range_end: Vec<u8>) -> Vec<u8> {
|
||||
if range_end == [0] {
|
||||
// the edge of the keyspace
|
||||
let mut new_end = root.to_vec();
|
||||
let mut ok = false;
|
||||
for i in (0..new_end.len()).rev() {
|
||||
new_end[i] = new_end[i].wrapping_add(1);
|
||||
if new_end[i] != 0 {
|
||||
ok = true;
|
||||
break;
|
||||
fn chroot_txn_response(&self, mut txn_res: TxnResponse) -> TxnResponse {
|
||||
for resp in txn_res.responses.iter_mut() {
|
||||
match resp {
|
||||
TxnOpResponse::ResponsePut(r) => {
|
||||
r.prev_kv = r.prev_kv.take().map(self.chroot_key_value_with());
|
||||
}
|
||||
TxnOpResponse::ResponseGet(r) => {
|
||||
r.kvs = r.kvs.drain(..).map(self.chroot_key_value_with()).collect();
|
||||
}
|
||||
TxnOpResponse::ResponseDelete(r) => {
|
||||
r.prev_kvs = r
|
||||
.prev_kvs
|
||||
.drain(..)
|
||||
.map(self.chroot_key_value_with())
|
||||
.collect();
|
||||
}
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
// 0xff..ff => 0x00
|
||||
new_end = vec![0];
|
||||
}
|
||||
new_end
|
||||
} else if !range_end.is_empty() {
|
||||
let mut new_end = root.to_vec();
|
||||
new_end.append(&mut range_end);
|
||||
new_end
|
||||
} else {
|
||||
vec![]
|
||||
txn_res
|
||||
}
|
||||
}
|
||||
|
||||
fn txn_prepend_root(root: &[u8], mut txn: Txn) -> Txn {
|
||||
fn op_prepend_root(root: &[u8], op: TxnOp) -> TxnOp {
|
||||
match op {
|
||||
TxnOp::Put(k, v) => TxnOp::Put(key_prepend_root(root, k), v),
|
||||
TxnOp::Get(k) => TxnOp::Get(key_prepend_root(root, k)),
|
||||
TxnOp::Delete(k) => TxnOp::Delete(key_prepend_root(root, k)),
|
||||
fn key_prepend_root(&self, mut key: Vec<u8>) -> Vec<u8> {
|
||||
let mut new_key = self.root.clone();
|
||||
new_key.append(&mut key);
|
||||
new_key
|
||||
}
|
||||
|
||||
// see namespace.prefixInterval - https://github.com/etcd-io/etcd/blob/v3.5.10/client/v3/namespace/util.go
|
||||
fn range_end_prepend_root(&self, mut range_end: Vec<u8>) -> Vec<u8> {
|
||||
let root = &self.root;
|
||||
if range_end == [0] {
|
||||
// the edge of the keyspace
|
||||
let mut new_end = root.clone();
|
||||
let mut ok = false;
|
||||
for i in (0..new_end.len()).rev() {
|
||||
new_end[i] = new_end[i].wrapping_add(1);
|
||||
if new_end[i] != 0 {
|
||||
ok = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !ok {
|
||||
// 0xff..ff => 0x00
|
||||
new_end = vec![0];
|
||||
}
|
||||
new_end
|
||||
} else if !range_end.is_empty() {
|
||||
let mut new_end = root.clone();
|
||||
new_end.append(&mut range_end);
|
||||
new_end
|
||||
} else {
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
|
||||
txn.req.success = txn
|
||||
.req
|
||||
.success
|
||||
.drain(..)
|
||||
.map(|op| op_prepend_root(root, op))
|
||||
.collect();
|
||||
|
||||
txn.req.failure = txn
|
||||
.req
|
||||
.failure
|
||||
.drain(..)
|
||||
.map(|op| op_prepend_root(root, op))
|
||||
.collect();
|
||||
|
||||
txn.req.compare = txn
|
||||
.req
|
||||
.compare
|
||||
.drain(..)
|
||||
.map(|cmp| super::txn::Compare {
|
||||
key: key_prepend_root(root, cmp.key),
|
||||
cmp: cmp.cmp,
|
||||
target: cmp.target,
|
||||
})
|
||||
.collect();
|
||||
|
||||
txn
|
||||
fn txn_prepend_root(&self, mut txn: Txn) -> Txn {
|
||||
let op_prepend_root = |op: TxnOp| match op {
|
||||
TxnOp::Put(k, v) => TxnOp::Put(self.key_prepend_root(k), v),
|
||||
TxnOp::Get(k) => TxnOp::Get(self.key_prepend_root(k)),
|
||||
TxnOp::Delete(k) => TxnOp::Delete(self.key_prepend_root(k)),
|
||||
};
|
||||
txn.req.success = txn.req.success.drain(..).map(op_prepend_root).collect();
|
||||
txn.req.failure = txn.req.failure.drain(..).map(op_prepend_root).collect();
|
||||
txn.req.compare = txn
|
||||
.req
|
||||
.compare
|
||||
.drain(..)
|
||||
.map(|cmp| super::txn::Compare {
|
||||
key: self.key_prepend_root(cmp.key),
|
||||
cmp: cmp.cmp,
|
||||
target: cmp.target,
|
||||
})
|
||||
.collect();
|
||||
txn
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::kv_backend::chroot::ChrootKvBackend;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
|
||||
#[test]
|
||||
fn test_prefix_key_and_range_end() {
|
||||
fn run_test_case(pfx: &[u8], key: &[u8], end: &[u8], w_key: &[u8], w_end: &[u8]) {
|
||||
let chroot = ChrootKvBackend::new(pfx.into(), Arc::new(MemoryKvBackend::new()));
|
||||
assert_eq!(chroot.key_prepend_root(key.into()), w_key);
|
||||
assert_eq!(chroot.range_end_prepend_root(end.into()), w_end);
|
||||
}
|
||||
|
||||
// single key
|
||||
run_test_case(b"pfx/", b"a", b"", b"pfx/a", b"");
|
||||
|
||||
// range
|
||||
run_test_case(b"pfx/", b"abc", b"def", b"pfx/abc", b"pfx/def");
|
||||
|
||||
// one-sided range (HACK - b'/' + 1 = b'0')
|
||||
run_test_case(b"pfx/", b"abc", b"\0", b"pfx/abc", b"pfx0");
|
||||
|
||||
// one-sided range, end of keyspace
|
||||
run_test_case(b"\xFF\xFF", b"abc", b"\0", b"\xff\xffabc", b"\0");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
version: '3.8'
|
||||
services:
|
||||
etcd:
|
||||
image: bitnami/etcd:latest
|
||||
image: public.ecr.aws/bitnami/etcd:3.5
|
||||
ports:
|
||||
- "2379:2379"
|
||||
- "2380:2380"
|
||||
|
||||
Reference in New Issue
Block a user