diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 76d026682b..73da3e75f8 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -161,15 +161,18 @@ jobs: uses: Swatinem/rust-cache@v2 - name: Install latest nextest release uses: taiki-e/install-action@nextest + - name: Install cargo-llvm-cov + uses: taiki-e/install-action@cargo-llvm-cov - name: Install Python uses: actions/setup-python@v4 with: python-version: '3.10' - name: Install PyArrow Package run: pip install pyarrow - - name: Install cargo-llvm-cov - uses: taiki-e/install-action@cargo-llvm-cov - - name: Collect coverage data + - name: Setup etcd server + working-directory: tests-integration/fixtures/etcd + run: docker compose -f docker-compose-standalone.yml up -d --wait + - name: Run nextest cases run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard env: CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld" @@ -179,6 +182,7 @@ jobs: GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }} GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }} GT_S3_REGION: ${{ secrets.S3_REGION }} + GT_ETCD_ENDPOINTS: http://127.0.0.1:2379 UNITTEST_LOG_DIR: "__unittest_logs" - name: Codecov upload uses: codecov/codecov-action@v2 diff --git a/Cargo.lock b/Cargo.lock index 5da1245210..def2104e34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3016,9 +3016,9 @@ dependencies = [ [[package]] name = "etcd-client" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4846d3e260468c0bb5244d74b7b9e81885cf255dbd7c74031aeb117a97e81ff2" +checksum = "f5231ad671c74ee5dc02753a0a9c855fe6e90de2a07acb2582f8a702470e04d1" dependencies = [ "http", "prost 0.12.2", @@ -7001,7 +7001,7 @@ dependencies = [ "quick-xml 0.31.0", "rand", "reqwest", - "rsa 0.9.3", + "rsa 0.9.4", "rust-ini 0.20.0", "serde", "serde_json", @@ -7214,9 +7214,9 @@ dependencies = [ [[package]] name = "rsa" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ef35bf3e7fe15a53c4ab08a998e42271eab13eb0db224126bc7bc4c4bad96d" +checksum = "6a3211b01eea83d80687da9eef70e39d65144a3894866a5153a2723e425a157f" dependencies = [ "const-oid 0.9.5", "digest", @@ -8035,18 +8035,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.192" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.192" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", diff --git a/src/common/meta/src/kv_backend.rs b/src/common/meta/src/kv_backend.rs index cf8cc89168..5c31ff2529 100644 --- a/src/common/meta/src/kv_backend.rs +++ b/src/common/meta/src/kv_backend.rs @@ -12,11 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod etcd; -pub mod memory; -pub mod test; -pub mod txn; - use std::any::Any; use std::sync::Arc; @@ -32,6 +27,12 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; +pub mod chroot; +pub mod etcd; +pub mod memory; +pub mod test; +pub mod txn; + pub type KvBackendRef = Arc + Send + Sync>; #[async_trait] diff --git a/src/common/meta/src/kv_backend/chroot.rs b/src/common/meta/src/kv_backend/chroot.rs new file mode 100644 index 0000000000..ba2e35f41b --- /dev/null +++ b/src/common/meta/src/kv_backend/chroot.rs @@ -0,0 +1,256 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use crate::error::Error; +use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnResponse}; +use crate::kv_backend::{KvBackend, KvBackendRef, TxnService}; +use crate::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, + BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest, + DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, +}; +use crate::rpc::KeyValue; + +pub struct ChrootKvBackend { + root: Vec, + inner: KvBackendRef, +} + +impl ChrootKvBackend { + pub fn new(root: Vec, inner: KvBackendRef) -> ChrootKvBackend { + debug_assert!(!root.is_empty()); + ChrootKvBackend { root, inner } + } +} + +#[async_trait::async_trait] +impl TxnService for ChrootKvBackend { + type Error = Error; + + async fn txn(&self, txn: Txn) -> Result { + let txn = txn_prepend_root(&self.root, txn); + let txn_res = self.inner.txn(txn).await?; + Ok(chroot_txn_response(&self.root, txn_res)) + } +} + +#[async_trait::async_trait] +impl KvBackend for ChrootKvBackend { + fn name(&self) -> &str { + self.inner.name() + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, mut req: RangeRequest) -> Result { + req.key = key_prepend_root(&self.root, req.key); + req.range_end = range_end_prepend_root(&self.root, req.range_end); + let mut res = self.inner.range(req).await?; + res.kvs = res + .kvs + .drain(..) + .map(chroot_key_value_with(&self.root)) + .collect(); + Ok(res) + } + + async fn put(&self, mut req: PutRequest) -> Result { + req.key = key_prepend_root(&self.root, req.key); + let mut res = self.inner.put(req).await?; + res.prev_kv = res.prev_kv.take().map(chroot_key_value_with(&self.root)); + Ok(res) + } + + async fn batch_put(&self, mut req: BatchPutRequest) -> Result { + for kv in req.kvs.iter_mut() { + kv.key = key_prepend_root(&self.root, kv.key.drain(..).collect()); + } + let mut res = self.inner.batch_put(req).await?; + res.prev_kvs = res + .prev_kvs + .drain(..) + .map(chroot_key_value_with(&self.root)) + .collect(); + Ok(res) + } + + async fn batch_get(&self, mut req: BatchGetRequest) -> Result { + req.keys = req + .keys + .drain(..) + .map(|key| key_prepend_root(&self.root, key)) + .collect(); + let mut res = self.inner.batch_get(req).await?; + res.kvs = res + .kvs + .drain(..) + .map(chroot_key_value_with(&self.root)) + .collect(); + Ok(res) + } + + async fn compare_and_put( + &self, + mut req: CompareAndPutRequest, + ) -> Result { + req.key = key_prepend_root(&self.root, req.key); + let mut res = self.inner.compare_and_put(req).await?; + res.prev_kv = res.prev_kv.take().map(chroot_key_value_with(&self.root)); + Ok(res) + } + + async fn delete_range( + &self, + mut req: DeleteRangeRequest, + ) -> Result { + req.key = key_prepend_root(&self.root, req.key); + req.range_end = range_end_prepend_root(&self.root, req.range_end); + let mut res = self.inner.delete_range(req).await?; + res.prev_kvs = res + .prev_kvs + .drain(..) + .map(chroot_key_value_with(&self.root)) + .collect(); + Ok(res) + } + + async fn batch_delete( + &self, + mut req: BatchDeleteRequest, + ) -> Result { + req.keys = req + .keys + .drain(..) + .map(|key| key_prepend_root(&self.root, key)) + .collect(); + let mut res = self.inner.batch_delete(req).await?; + res.prev_kvs = res + .prev_kvs + .drain(..) + .map(chroot_key_value_with(&self.root)) + .collect(); + Ok(res) + } +} + +fn key_strip_root(root: &[u8], mut key: Vec) -> Vec { + debug_assert!( + key.starts_with(root), + "key={}, root={}", + String::from_utf8_lossy(&key), + String::from_utf8_lossy(root), + ); + key.split_off(root.len()) +} + +fn chroot_key_value_with(root: &[u8]) -> impl FnMut(KeyValue) -> KeyValue + '_ { + |kv| KeyValue { + key: key_strip_root(root, kv.key), + value: kv.value, + } +} +fn chroot_txn_response(root: &[u8], mut txn_res: TxnResponse) -> TxnResponse { + for resp in txn_res.responses.iter_mut() { + match resp { + TxnOpResponse::ResponsePut(r) => { + r.prev_kv = r.prev_kv.take().map(chroot_key_value_with(root)); + } + TxnOpResponse::ResponseGet(r) => { + r.kvs = r.kvs.drain(..).map(chroot_key_value_with(root)).collect(); + } + TxnOpResponse::ResponseDelete(r) => { + r.prev_kvs = r + .prev_kvs + .drain(..) + .map(chroot_key_value_with(root)) + .collect(); + } + } + } + txn_res +} + +fn key_prepend_root(root: &[u8], mut key: Vec) -> Vec { + let mut new_key = root.to_vec(); + new_key.append(&mut key); + new_key +} + +// see namespace.prefixInterval - https://github.com/etcd-io/etcd/blob/v3.5.10/client/v3/namespace/util.go +fn range_end_prepend_root(root: &[u8], mut range_end: Vec) -> Vec { + if range_end == [0] { + // the edge of the keyspace + let mut new_end = root.to_vec(); + let mut ok = false; + for i in (0..new_end.len()).rev() { + new_end[i] = new_end[i].wrapping_add(1); + if new_end[i] != 0 { + ok = true; + break; + } + } + if !ok { + // 0xff..ff => 0x00 + new_end = vec![0]; + } + new_end + } else if !range_end.is_empty() { + let mut new_end = root.to_vec(); + new_end.append(&mut range_end); + new_end + } else { + vec![] + } +} + +fn txn_prepend_root(root: &[u8], mut txn: Txn) -> Txn { + fn op_prepend_root(root: &[u8], op: TxnOp) -> TxnOp { + match op { + TxnOp::Put(k, v) => TxnOp::Put(key_prepend_root(root, k), v), + TxnOp::Get(k) => TxnOp::Get(key_prepend_root(root, k)), + TxnOp::Delete(k) => TxnOp::Delete(key_prepend_root(root, k)), + } + } + + txn.req.success = txn + .req + .success + .drain(..) + .map(|op| op_prepend_root(root, op)) + .collect(); + + txn.req.failure = txn + .req + .failure + .drain(..) + .map(|op| op_prepend_root(root, op)) + .collect(); + + txn.req.compare = txn + .req + .compare + .drain(..) + .map(|cmp| super::txn::Compare { + key: key_prepend_root(root, cmp.key), + cmp: cmp.cmp, + target: cmp.target, + }) + .collect(); + + txn +} diff --git a/src/common/meta/src/kv_backend/etcd.rs b/src/common/meta/src/kv_backend/etcd.rs index ee1ebb05aa..5becb4b332 100644 --- a/src/common/meta/src/kv_backend/etcd.rs +++ b/src/common/meta/src/kv_backend/etcd.rs @@ -33,36 +33,17 @@ use crate::rpc::store::{ }; use crate::rpc::KeyValue; -pub struct KvPair<'a>(&'a etcd_client::KeyValue); - -impl<'a> KvPair<'a> { - /// Creates a `KvPair` from etcd KeyValue - #[inline] - pub fn new(kv: &'a etcd_client::KeyValue) -> Self { - Self(kv) - } - - #[inline] - pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue { - KeyValue::from(KvPair::new(kv)) - } -} - -impl<'a> From> for KeyValue { - fn from(kv: KvPair<'a>) -> Self { - Self { - key: kv.0.key().to_vec(), - value: kv.0.value().to_vec(), - } - } -} - // Maximum number of operations permitted in a transaction. // The etcd default configuration's `--max-txn-ops` is 128. // // For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/ const MAX_TXN_SIZE: usize = 128; +fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue { + let (key, value) = kv.into_key_value(); + KeyValue { key, value } +} + pub struct EtcdStore { client: Client, } @@ -124,7 +105,7 @@ impl KvBackend for EtcdStore { async fn range(&self, req: RangeRequest) -> Result { let Get { key, options } = req.try_into()?; - let res = self + let mut res = self .client .kv_client() .get(key, options) @@ -132,9 +113,9 @@ impl KvBackend for EtcdStore { .context(error::EtcdFailedSnafu)?; let kvs = res - .kvs() - .iter() - .map(KvPair::from_etcd_kv) + .take_kvs() + .into_iter() + .map(convert_key_value) .collect::>(); Ok(RangeResponse { @@ -150,14 +131,14 @@ impl KvBackend for EtcdStore { options, } = req.try_into()?; - let res = self + let mut res = self .client .kv_client() .put(key, value, options) .await .context(error::EtcdFailedSnafu)?; - let prev_kv = res.prev_key().map(KvPair::from_etcd_kv); + let prev_kv = res.take_prev_key().map(convert_key_value); Ok(PutResponse { prev_kv }) } @@ -166,7 +147,7 @@ impl KvBackend for EtcdStore { let put_ops = kvs .into_iter() - .map(|kv| (TxnOp::put(kv.key, kv.value, options.clone()))) + .map(|kv| TxnOp::put(kv.key, kv.value, options.clone())) .collect::>(); let txn_responses = self.do_multi_txn(put_ops).await?; @@ -175,9 +156,9 @@ impl KvBackend for EtcdStore { for txn_res in txn_responses { for op_res in txn_res.op_responses() { match op_res { - TxnOpResponse::Put(put_res) => { - if let Some(prev_kv) = put_res.prev_key() { - prev_kvs.push(KvPair::from_etcd_kv(prev_kv)); + TxnOpResponse::Put(mut put_res) => { + if let Some(prev_kv) = put_res.take_prev_key().map(convert_key_value) { + prev_kvs.push(prev_kv); } } _ => unreachable!(), @@ -193,7 +174,7 @@ impl KvBackend for EtcdStore { let get_ops: Vec<_> = keys .into_iter() - .map(|k| TxnOp::get(k, options.clone())) + .map(|key| TxnOp::get(key, options.clone())) .collect(); let txn_responses = self.do_multi_txn(get_ops).await?; @@ -201,12 +182,11 @@ impl KvBackend for EtcdStore { let mut kvs = vec![]; for txn_res in txn_responses { for op_res in txn_res.op_responses() { - let get_res = match op_res { + let mut get_res = match op_res { TxnOpResponse::Get(get_res) => get_res, _ => unreachable!(), }; - - kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv)); + kvs.extend(get_res.take_kvs().into_iter().map(convert_key_value)); } } @@ -252,8 +232,8 @@ impl KvBackend for EtcdStore { })?; let prev_kv = match op_res { - TxnOpResponse::Put(res) => res.prev_key().map(KvPair::from_etcd_kv), - TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::from_etcd_kv), + TxnOpResponse::Put(mut res) => res.take_prev_key().map(convert_key_value), + TxnOpResponse::Get(mut res) => res.take_kvs().into_iter().next().map(convert_key_value), _ => unreachable!(), }; @@ -263,7 +243,7 @@ impl KvBackend for EtcdStore { async fn delete_range(&self, req: DeleteRangeRequest) -> Result { let Delete { key, options } = req.try_into()?; - let res = self + let mut res = self .client .kv_client() .delete(key, options) @@ -271,9 +251,9 @@ impl KvBackend for EtcdStore { .context(error::EtcdFailedSnafu)?; let prev_kvs = res - .prev_kvs() - .iter() - .map(KvPair::from_etcd_kv) + .take_prev_kvs() + .into_iter() + .map(convert_key_value) .collect::>(); Ok(DeleteRangeResponse { @@ -289,7 +269,7 @@ impl KvBackend for EtcdStore { let delete_ops = keys .into_iter() - .map(|k| TxnOp::delete(k, options.clone())) + .map(|key| TxnOp::delete(key, options.clone())) .collect::>(); let txn_responses = self.do_multi_txn(delete_ops).await?; @@ -297,10 +277,14 @@ impl KvBackend for EtcdStore { for txn_res in txn_responses { for op_res in txn_res.op_responses() { match op_res { - TxnOpResponse::Delete(delete_res) => { - delete_res.prev_kvs().iter().for_each(|kv| { - prev_kvs.push(KvPair::from_etcd_kv(kv)); - }); + TxnOpResponse::Delete(mut delete_res) => { + delete_res + .take_prev_kvs() + .into_iter() + .map(convert_key_value) + .for_each(|kv| { + prev_kvs.push(kv); + }); } _ => unreachable!(), } diff --git a/src/common/meta/src/kv_backend/txn.rs b/src/common/meta/src/kv_backend/txn.rs index b07fe1ae33..d69a1e1992 100644 --- a/src/common/meta/src/kv_backend/txn.rs +++ b/src/common/meta/src/kv_backend/txn.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod etcd; - use common_error::ext::ErrorExt; use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse}; +mod etcd; + #[async_trait::async_trait] pub trait TxnService: Sync + Send { type Error: ErrorExt; @@ -122,7 +122,8 @@ pub struct TxnResponse { #[derive(Debug, Clone, Default, PartialEq)] pub struct Txn { - req: TxnRequest, + // HACK - chroot would modify this field + pub(super) req: TxnRequest, c_when: bool, c_then: bool, c_else: bool, diff --git a/tests-integration/fixtures/etcd/docker-compose-standalone.yml b/tests-integration/fixtures/etcd/docker-compose-standalone.yml new file mode 100644 index 0000000000..3700a42ba9 --- /dev/null +++ b/tests-integration/fixtures/etcd/docker-compose-standalone.yml @@ -0,0 +1,13 @@ +version: '3.8' +services: + etcd: + image: bitnami/etcd:latest + ports: + - "2379:2379" + - "2380:2380" + environment: + ALLOW_NONE_AUTHENTICATION: "yes" + ETCD_NAME: etcd + ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379 + ETCD_ADVERTISE_CLIENT_URLS: http://etcd:2379 + ETCD_MAX_REQUEST_BYTES: 10485760 diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 3e2c1383a7..2f84fd756d 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::env; use std::sync::Arc; use std::time::Duration; @@ -21,6 +22,8 @@ use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::kv_backend::chroot::ChrootKvBackend; +use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; @@ -63,10 +66,25 @@ pub struct GreptimeDbClusterBuilder { } impl GreptimeDbClusterBuilder { - pub fn new(cluster_name: &str) -> Self { + pub async fn new(cluster_name: &str) -> Self { + let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default(); + + let kv_backend: KvBackendRef = if endpoints.is_empty() { + Arc::new(MemoryKvBackend::new()) + } else { + let endpoints = endpoints + .split(',') + .map(|s| s.to_string()) + .collect::>(); + let backend = EtcdStore::with_endpoints(endpoints) + .await + .expect("malformed endpoints"); + Arc::new(ChrootKvBackend::new(cluster_name.into(), backend)) + }; + Self { cluster_name: cluster_name.to_string(), - kv_backend: Arc::new(MemoryKvBackend::new()), + kv_backend, store_config: None, datanodes: None, } diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs index 8848c0a679..3dc647bd51 100644 --- a/tests-integration/src/influxdb.rs +++ b/tests-integration/src/influxdb.rs @@ -56,8 +56,10 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_distributed_put_influxdb_lines_without_time_column() { - let instance = - tests::create_distributed_instance("test_distributed_put_influxdb_lines").await; + let instance = tests::create_distributed_instance( + "test_distributed_put_influxdb_lines_without_time_column", + ) + .await; test_put_influxdb_lines_without_time_column(&instance.frontend()).await; } diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 2781d04b2f..692a9de8d6 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -42,6 +42,7 @@ impl MockDistributedInstance { } pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInstance { - let cluster = GreptimeDbClusterBuilder::new(test_name).build().await; + let builder = GreptimeDbClusterBuilder::new(test_name).await; + let cluster = builder.build().await; MockDistributedInstance(cluster) } diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 612a1fd4ab..47dac1ca31 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -89,7 +89,8 @@ pub async fn test_region_failover(store_type: StorageType) { let (store_config, _guard) = get_test_store_config(&store_type); let datanodes = 5u64; - let cluster = GreptimeDbClusterBuilder::new(cluster_name) + let builder = GreptimeDbClusterBuilder::new(cluster_name).await; + let cluster = builder .with_datanodes(datanodes as u32) .with_store_config(store_config) .build()