diff --git a/src/common/meta/src/kv_backend/chroot.rs b/src/common/meta/src/kv_backend/chroot.rs index ba2e35f41b..6f097cd83e 100644 --- a/src/common/meta/src/kv_backend/chroot.rs +++ b/src/common/meta/src/kv_backend/chroot.rs @@ -41,9 +41,9 @@ impl TxnService for ChrootKvBackend { type Error = Error; async fn txn(&self, txn: Txn) -> Result { - let txn = txn_prepend_root(&self.root, txn); + let txn = self.txn_prepend_root(txn); let txn_res = self.inner.txn(txn).await?; - Ok(chroot_txn_response(&self.root, txn_res)) + Ok(self.chroot_txn_response(txn_res)) } } @@ -58,33 +58,33 @@ impl KvBackend for ChrootKvBackend { } async fn range(&self, mut req: RangeRequest) -> Result { - req.key = key_prepend_root(&self.root, req.key); - req.range_end = range_end_prepend_root(&self.root, req.range_end); + req.key = self.key_prepend_root(req.key); + req.range_end = self.range_end_prepend_root(req.range_end); let mut res = self.inner.range(req).await?; res.kvs = res .kvs .drain(..) - .map(chroot_key_value_with(&self.root)) + .map(self.chroot_key_value_with()) .collect(); Ok(res) } async fn put(&self, mut req: PutRequest) -> Result { - req.key = key_prepend_root(&self.root, req.key); + req.key = self.key_prepend_root(req.key); let mut res = self.inner.put(req).await?; - res.prev_kv = res.prev_kv.take().map(chroot_key_value_with(&self.root)); + res.prev_kv = res.prev_kv.take().map(self.chroot_key_value_with()); Ok(res) } async fn batch_put(&self, mut req: BatchPutRequest) -> Result { for kv in req.kvs.iter_mut() { - kv.key = key_prepend_root(&self.root, kv.key.drain(..).collect()); + kv.key = self.key_prepend_root(kv.key.drain(..).collect()); } let mut res = self.inner.batch_put(req).await?; res.prev_kvs = res .prev_kvs .drain(..) - .map(chroot_key_value_with(&self.root)) + .map(self.chroot_key_value_with()) .collect(); Ok(res) } @@ -93,13 +93,13 @@ impl KvBackend for ChrootKvBackend { req.keys = req .keys .drain(..) - .map(|key| key_prepend_root(&self.root, key)) + .map(|key| self.key_prepend_root(key)) .collect(); let mut res = self.inner.batch_get(req).await?; res.kvs = res .kvs .drain(..) - .map(chroot_key_value_with(&self.root)) + .map(self.chroot_key_value_with()) .collect(); Ok(res) } @@ -108,9 +108,9 @@ impl KvBackend for ChrootKvBackend { &self, mut req: CompareAndPutRequest, ) -> Result { - req.key = key_prepend_root(&self.root, req.key); + req.key = self.key_prepend_root(req.key); let mut res = self.inner.compare_and_put(req).await?; - res.prev_kv = res.prev_kv.take().map(chroot_key_value_with(&self.root)); + res.prev_kv = res.prev_kv.take().map(self.chroot_key_value_with()); Ok(res) } @@ -118,13 +118,13 @@ impl KvBackend for ChrootKvBackend { &self, mut req: DeleteRangeRequest, ) -> Result { - req.key = key_prepend_root(&self.root, req.key); - req.range_end = range_end_prepend_root(&self.root, req.range_end); + req.key = self.key_prepend_root(req.key); + req.range_end = self.range_end_prepend_root(req.range_end); let mut res = self.inner.delete_range(req).await?; res.prev_kvs = res .prev_kvs .drain(..) - .map(chroot_key_value_with(&self.root)) + .map(self.chroot_key_value_with()) .collect(); Ok(res) } @@ -136,121 +136,138 @@ impl KvBackend for ChrootKvBackend { req.keys = req .keys .drain(..) - .map(|key| key_prepend_root(&self.root, key)) + .map(|key| self.key_prepend_root(key)) .collect(); let mut res = self.inner.batch_delete(req).await?; res.prev_kvs = res .prev_kvs .drain(..) - .map(chroot_key_value_with(&self.root)) + .map(self.chroot_key_value_with()) .collect(); Ok(res) } } -fn key_strip_root(root: &[u8], mut key: Vec) -> Vec { - debug_assert!( - key.starts_with(root), - "key={}, root={}", - String::from_utf8_lossy(&key), - String::from_utf8_lossy(root), - ); - key.split_off(root.len()) -} - -fn chroot_key_value_with(root: &[u8]) -> impl FnMut(KeyValue) -> KeyValue + '_ { - |kv| KeyValue { - key: key_strip_root(root, kv.key), - value: kv.value, +impl ChrootKvBackend { + fn key_strip_root(&self, mut key: Vec) -> Vec { + let root = &self.root; + debug_assert!( + key.starts_with(root), + "key={}, root={}", + String::from_utf8_lossy(&key), + String::from_utf8_lossy(root), + ); + key.split_off(root.len()) } -} -fn chroot_txn_response(root: &[u8], mut txn_res: TxnResponse) -> TxnResponse { - for resp in txn_res.responses.iter_mut() { - match resp { - TxnOpResponse::ResponsePut(r) => { - r.prev_kv = r.prev_kv.take().map(chroot_key_value_with(root)); - } - TxnOpResponse::ResponseGet(r) => { - r.kvs = r.kvs.drain(..).map(chroot_key_value_with(root)).collect(); - } - TxnOpResponse::ResponseDelete(r) => { - r.prev_kvs = r - .prev_kvs - .drain(..) - .map(chroot_key_value_with(root)) - .collect(); - } + + fn chroot_key_value_with(&self) -> impl FnMut(KeyValue) -> KeyValue + '_ { + |kv| KeyValue { + key: self.key_strip_root(kv.key), + value: kv.value, } } - txn_res -} - -fn key_prepend_root(root: &[u8], mut key: Vec) -> Vec { - let mut new_key = root.to_vec(); - new_key.append(&mut key); - new_key -} - -// see namespace.prefixInterval - https://github.com/etcd-io/etcd/blob/v3.5.10/client/v3/namespace/util.go -fn range_end_prepend_root(root: &[u8], mut range_end: Vec) -> Vec { - if range_end == [0] { - // the edge of the keyspace - let mut new_end = root.to_vec(); - let mut ok = false; - for i in (0..new_end.len()).rev() { - new_end[i] = new_end[i].wrapping_add(1); - if new_end[i] != 0 { - ok = true; - break; + fn chroot_txn_response(&self, mut txn_res: TxnResponse) -> TxnResponse { + for resp in txn_res.responses.iter_mut() { + match resp { + TxnOpResponse::ResponsePut(r) => { + r.prev_kv = r.prev_kv.take().map(self.chroot_key_value_with()); + } + TxnOpResponse::ResponseGet(r) => { + r.kvs = r.kvs.drain(..).map(self.chroot_key_value_with()).collect(); + } + TxnOpResponse::ResponseDelete(r) => { + r.prev_kvs = r + .prev_kvs + .drain(..) + .map(self.chroot_key_value_with()) + .collect(); + } } } - if !ok { - // 0xff..ff => 0x00 - new_end = vec![0]; - } - new_end - } else if !range_end.is_empty() { - let mut new_end = root.to_vec(); - new_end.append(&mut range_end); - new_end - } else { - vec![] + txn_res } -} -fn txn_prepend_root(root: &[u8], mut txn: Txn) -> Txn { - fn op_prepend_root(root: &[u8], op: TxnOp) -> TxnOp { - match op { - TxnOp::Put(k, v) => TxnOp::Put(key_prepend_root(root, k), v), - TxnOp::Get(k) => TxnOp::Get(key_prepend_root(root, k)), - TxnOp::Delete(k) => TxnOp::Delete(key_prepend_root(root, k)), + fn key_prepend_root(&self, mut key: Vec) -> Vec { + let mut new_key = self.root.clone(); + new_key.append(&mut key); + new_key + } + + // see namespace.prefixInterval - https://github.com/etcd-io/etcd/blob/v3.5.10/client/v3/namespace/util.go + fn range_end_prepend_root(&self, mut range_end: Vec) -> Vec { + let root = &self.root; + if range_end == [0] { + // the edge of the keyspace + let mut new_end = root.clone(); + let mut ok = false; + for i in (0..new_end.len()).rev() { + new_end[i] = new_end[i].wrapping_add(1); + if new_end[i] != 0 { + ok = true; + break; + } + } + if !ok { + // 0xff..ff => 0x00 + new_end = vec![0]; + } + new_end + } else if !range_end.is_empty() { + let mut new_end = root.clone(); + new_end.append(&mut range_end); + new_end + } else { + vec![] } } - txn.req.success = txn - .req - .success - .drain(..) - .map(|op| op_prepend_root(root, op)) - .collect(); - - txn.req.failure = txn - .req - .failure - .drain(..) - .map(|op| op_prepend_root(root, op)) - .collect(); - - txn.req.compare = txn - .req - .compare - .drain(..) - .map(|cmp| super::txn::Compare { - key: key_prepend_root(root, cmp.key), - cmp: cmp.cmp, - target: cmp.target, - }) - .collect(); - - txn + fn txn_prepend_root(&self, mut txn: Txn) -> Txn { + let op_prepend_root = |op: TxnOp| match op { + TxnOp::Put(k, v) => TxnOp::Put(self.key_prepend_root(k), v), + TxnOp::Get(k) => TxnOp::Get(self.key_prepend_root(k)), + TxnOp::Delete(k) => TxnOp::Delete(self.key_prepend_root(k)), + }; + txn.req.success = txn.req.success.drain(..).map(op_prepend_root).collect(); + txn.req.failure = txn.req.failure.drain(..).map(op_prepend_root).collect(); + txn.req.compare = txn + .req + .compare + .drain(..) + .map(|cmp| super::txn::Compare { + key: self.key_prepend_root(cmp.key), + cmp: cmp.cmp, + target: cmp.target, + }) + .collect(); + txn + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::kv_backend::chroot::ChrootKvBackend; + use crate::kv_backend::memory::MemoryKvBackend; + + #[test] + fn test_prefix_key_and_range_end() { + fn run_test_case(pfx: &[u8], key: &[u8], end: &[u8], w_key: &[u8], w_end: &[u8]) { + let chroot = ChrootKvBackend::new(pfx.into(), Arc::new(MemoryKvBackend::new())); + assert_eq!(chroot.key_prepend_root(key.into()), w_key); + assert_eq!(chroot.range_end_prepend_root(end.into()), w_end); + } + + // single key + run_test_case(b"pfx/", b"a", b"", b"pfx/a", b""); + + // range + run_test_case(b"pfx/", b"abc", b"def", b"pfx/abc", b"pfx/def"); + + // one-sided range (HACK - b'/' + 1 = b'0') + run_test_case(b"pfx/", b"abc", b"\0", b"pfx/abc", b"pfx0"); + + // one-sided range, end of keyspace + run_test_case(b"\xFF\xFF", b"abc", b"\0", b"\xff\xffabc", b"\0"); + } } diff --git a/tests-integration/fixtures/etcd/docker-compose-standalone.yml b/tests-integration/fixtures/etcd/docker-compose-standalone.yml index 3700a42ba9..2f57f9b425 100644 --- a/tests-integration/fixtures/etcd/docker-compose-standalone.yml +++ b/tests-integration/fixtures/etcd/docker-compose-standalone.yml @@ -1,7 +1,7 @@ version: '3.8' services: etcd: - image: bitnami/etcd:latest + image: public.ecr.aws/bitnami/etcd:3.5 ports: - "2379:2379" - "2380:2380"