test: use EtcdStore in IT cases (#2734)

* test: use EtcdStore in IT cases

Signed-off-by: tison <wander4096@gmail.com>

* retrigger CI

Signed-off-by: tison <wander4096@gmail.com>

* refactor: KvPair can take etcd KeyValue

Signed-off-by: tison <wander4096@gmail.com>

* temporary use fork

Signed-off-by: tison <wander4096@gmail.com>

* drop cloned

Signed-off-by: tison <wander4096@gmail.com>

* chroot_key_value

Signed-off-by: tison <wander4096@gmail.com>

* chroot and prepend in each point

Signed-off-by: tison <wander4096@gmail.com>

* adjust call points

Signed-off-by: tison <wander4096@gmail.com>

* cargo clippy

Signed-off-by: tison <wander4096@gmail.com>

* point to upstream etcd-client

Signed-off-by: tison <wander4096@gmail.com>

* test etcd chroot

Signed-off-by: tison <wander4096@gmail.com>

* add NO_CHROOT constant

Signed-off-by: tison <wander4096@gmail.com>

* check

Signed-off-by: tison <wander4096@gmail.com>

* handle range end

Signed-off-by: tison <wander4096@gmail.com>

* handle special encoded key or range_end

Signed-off-by: tison <wander4096@gmail.com>

* fixup implementation

Signed-off-by: tison <wander4096@gmail.com>

* clippy

Signed-off-by: tison <wander4096@gmail.com>

* avoid test name conflict

Signed-off-by: tison <wander4096@gmail.com>

* chroot to kvbackend level

Signed-off-by: tison <wander4096@gmail.com>

* fixup all occurances

Signed-off-by: tison <wander4096@gmail.com>

* fix type

Signed-off-by: tison <wander4096@gmail.com>

* Update src/common/meta/src/kv_backend/txn.rs

* make github happy

---------

Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: LFC <990479+MichaelScofield@users.noreply.github.com>
This commit is contained in:
tison
2023-11-23 13:47:00 +08:00
committed by GitHub
parent 56fc77e573
commit 102e43aace
11 changed files with 356 additions and 75 deletions

View File

@@ -161,15 +161,18 @@ jobs:
uses: Swatinem/rust-cache@v2
- name: Install latest nextest release
uses: taiki-e/install-action@nextest
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install PyArrow Package
run: pip install pyarrow
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Collect coverage data
- name: Setup etcd server
working-directory: tests-integration/fixtures/etcd
run: docker compose -f docker-compose-standalone.yml up -d --wait
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
env:
CARGO_BUILD_RUSTFLAGS: "-C link-arg=-fuse-ld=lld"
@@ -179,6 +182,7 @@ jobs:
GT_S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}
GT_S3_ACCESS_KEY: ${{ secrets.S3_ACCESS_KEY }}
GT_S3_REGION: ${{ secrets.S3_REGION }}
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
UNITTEST_LOG_DIR: "__unittest_logs"
- name: Codecov upload
uses: codecov/codecov-action@v2

18
Cargo.lock generated
View File

@@ -3016,9 +3016,9 @@ dependencies = [
[[package]]
name = "etcd-client"
version = "0.12.2"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4846d3e260468c0bb5244d74b7b9e81885cf255dbd7c74031aeb117a97e81ff2"
checksum = "f5231ad671c74ee5dc02753a0a9c855fe6e90de2a07acb2582f8a702470e04d1"
dependencies = [
"http",
"prost 0.12.2",
@@ -7001,7 +7001,7 @@ dependencies = [
"quick-xml 0.31.0",
"rand",
"reqwest",
"rsa 0.9.3",
"rsa 0.9.4",
"rust-ini 0.20.0",
"serde",
"serde_json",
@@ -7214,9 +7214,9 @@ dependencies = [
[[package]]
name = "rsa"
version = "0.9.3"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86ef35bf3e7fe15a53c4ab08a998e42271eab13eb0db224126bc7bc4c4bad96d"
checksum = "6a3211b01eea83d80687da9eef70e39d65144a3894866a5153a2723e425a157f"
dependencies = [
"const-oid 0.9.5",
"digest",
@@ -8035,18 +8035,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
version = "1.0.192"
version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001"
checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.192"
version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1"
checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -12,11 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod etcd;
pub mod memory;
pub mod test;
pub mod txn;
use std::any::Any;
use std::sync::Arc;
@@ -32,6 +27,12 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
pub mod chroot;
pub mod etcd;
pub mod memory;
pub mod test;
pub mod txn;
pub type KvBackendRef = Arc<dyn KvBackend<Error = Error> + Send + Sync>;
#[async_trait]

View File

@@ -0,0 +1,256 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use crate::error::Error;
use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnResponse};
use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;
pub struct ChrootKvBackend {
root: Vec<u8>,
inner: KvBackendRef,
}
impl ChrootKvBackend {
pub fn new(root: Vec<u8>, inner: KvBackendRef) -> ChrootKvBackend {
debug_assert!(!root.is_empty());
ChrootKvBackend { root, inner }
}
}
#[async_trait::async_trait]
impl TxnService for ChrootKvBackend {
type Error = Error;
async fn txn(&self, txn: Txn) -> Result<TxnResponse, Self::Error> {
let txn = txn_prepend_root(&self.root, txn);
let txn_res = self.inner.txn(txn).await?;
Ok(chroot_txn_response(&self.root, txn_res))
}
}
#[async_trait::async_trait]
impl KvBackend for ChrootKvBackend {
fn name(&self) -> &str {
self.inner.name()
}
fn as_any(&self) -> &dyn Any {
self
}
async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse, Self::Error> {
req.key = key_prepend_root(&self.root, req.key);
req.range_end = range_end_prepend_root(&self.root, req.range_end);
let mut res = self.inner.range(req).await?;
res.kvs = res
.kvs
.drain(..)
.map(chroot_key_value_with(&self.root))
.collect();
Ok(res)
}
async fn put(&self, mut req: PutRequest) -> Result<PutResponse, Self::Error> {
req.key = key_prepend_root(&self.root, req.key);
let mut res = self.inner.put(req).await?;
res.prev_kv = res.prev_kv.take().map(chroot_key_value_with(&self.root));
Ok(res)
}
async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
for kv in req.kvs.iter_mut() {
kv.key = key_prepend_root(&self.root, kv.key.drain(..).collect());
}
let mut res = self.inner.batch_put(req).await?;
res.prev_kvs = res
.prev_kvs
.drain(..)
.map(chroot_key_value_with(&self.root))
.collect();
Ok(res)
}
async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
req.keys = req
.keys
.drain(..)
.map(|key| key_prepend_root(&self.root, key))
.collect();
let mut res = self.inner.batch_get(req).await?;
res.kvs = res
.kvs
.drain(..)
.map(chroot_key_value_with(&self.root))
.collect();
Ok(res)
}
async fn compare_and_put(
&self,
mut req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
req.key = key_prepend_root(&self.root, req.key);
let mut res = self.inner.compare_and_put(req).await?;
res.prev_kv = res.prev_kv.take().map(chroot_key_value_with(&self.root));
Ok(res)
}
async fn delete_range(
&self,
mut req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error> {
req.key = key_prepend_root(&self.root, req.key);
req.range_end = range_end_prepend_root(&self.root, req.range_end);
let mut res = self.inner.delete_range(req).await?;
res.prev_kvs = res
.prev_kvs
.drain(..)
.map(chroot_key_value_with(&self.root))
.collect();
Ok(res)
}
async fn batch_delete(
&self,
mut req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse, Self::Error> {
req.keys = req
.keys
.drain(..)
.map(|key| key_prepend_root(&self.root, key))
.collect();
let mut res = self.inner.batch_delete(req).await?;
res.prev_kvs = res
.prev_kvs
.drain(..)
.map(chroot_key_value_with(&self.root))
.collect();
Ok(res)
}
}
fn key_strip_root(root: &[u8], mut key: Vec<u8>) -> Vec<u8> {
debug_assert!(
key.starts_with(root),
"key={}, root={}",
String::from_utf8_lossy(&key),
String::from_utf8_lossy(root),
);
key.split_off(root.len())
}
fn chroot_key_value_with(root: &[u8]) -> impl FnMut(KeyValue) -> KeyValue + '_ {
|kv| KeyValue {
key: key_strip_root(root, kv.key),
value: kv.value,
}
}
fn chroot_txn_response(root: &[u8], mut txn_res: TxnResponse) -> TxnResponse {
for resp in txn_res.responses.iter_mut() {
match resp {
TxnOpResponse::ResponsePut(r) => {
r.prev_kv = r.prev_kv.take().map(chroot_key_value_with(root));
}
TxnOpResponse::ResponseGet(r) => {
r.kvs = r.kvs.drain(..).map(chroot_key_value_with(root)).collect();
}
TxnOpResponse::ResponseDelete(r) => {
r.prev_kvs = r
.prev_kvs
.drain(..)
.map(chroot_key_value_with(root))
.collect();
}
}
}
txn_res
}
fn key_prepend_root(root: &[u8], mut key: Vec<u8>) -> Vec<u8> {
let mut new_key = root.to_vec();
new_key.append(&mut key);
new_key
}
// see namespace.prefixInterval - https://github.com/etcd-io/etcd/blob/v3.5.10/client/v3/namespace/util.go
fn range_end_prepend_root(root: &[u8], mut range_end: Vec<u8>) -> Vec<u8> {
if range_end == [0] {
// the edge of the keyspace
let mut new_end = root.to_vec();
let mut ok = false;
for i in (0..new_end.len()).rev() {
new_end[i] = new_end[i].wrapping_add(1);
if new_end[i] != 0 {
ok = true;
break;
}
}
if !ok {
// 0xff..ff => 0x00
new_end = vec![0];
}
new_end
} else if !range_end.is_empty() {
let mut new_end = root.to_vec();
new_end.append(&mut range_end);
new_end
} else {
vec![]
}
}
fn txn_prepend_root(root: &[u8], mut txn: Txn) -> Txn {
fn op_prepend_root(root: &[u8], op: TxnOp) -> TxnOp {
match op {
TxnOp::Put(k, v) => TxnOp::Put(key_prepend_root(root, k), v),
TxnOp::Get(k) => TxnOp::Get(key_prepend_root(root, k)),
TxnOp::Delete(k) => TxnOp::Delete(key_prepend_root(root, k)),
}
}
txn.req.success = txn
.req
.success
.drain(..)
.map(|op| op_prepend_root(root, op))
.collect();
txn.req.failure = txn
.req
.failure
.drain(..)
.map(|op| op_prepend_root(root, op))
.collect();
txn.req.compare = txn
.req
.compare
.drain(..)
.map(|cmp| super::txn::Compare {
key: key_prepend_root(root, cmp.key),
cmp: cmp.cmp,
target: cmp.target,
})
.collect();
txn
}

View File

@@ -33,36 +33,17 @@ use crate::rpc::store::{
};
use crate::rpc::KeyValue;
pub struct KvPair<'a>(&'a etcd_client::KeyValue);
impl<'a> KvPair<'a> {
/// Creates a `KvPair` from etcd KeyValue
#[inline]
pub fn new(kv: &'a etcd_client::KeyValue) -> Self {
Self(kv)
}
#[inline]
pub fn from_etcd_kv(kv: &etcd_client::KeyValue) -> KeyValue {
KeyValue::from(KvPair::new(kv))
}
}
impl<'a> From<KvPair<'a>> for KeyValue {
fn from(kv: KvPair<'a>) -> Self {
Self {
key: kv.0.key().to_vec(),
value: kv.0.value().to_vec(),
}
}
}
// Maximum number of operations permitted in a transaction.
// The etcd default configuration's `--max-txn-ops` is 128.
//
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
const MAX_TXN_SIZE: usize = 128;
fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue {
let (key, value) = kv.into_key_value();
KeyValue { key, value }
}
pub struct EtcdStore {
client: Client,
}
@@ -124,7 +105,7 @@ impl KvBackend for EtcdStore {
async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
let Get { key, options } = req.try_into()?;
let res = self
let mut res = self
.client
.kv_client()
.get(key, options)
@@ -132,9 +113,9 @@ impl KvBackend for EtcdStore {
.context(error::EtcdFailedSnafu)?;
let kvs = res
.kvs()
.iter()
.map(KvPair::from_etcd_kv)
.take_kvs()
.into_iter()
.map(convert_key_value)
.collect::<Vec<_>>();
Ok(RangeResponse {
@@ -150,14 +131,14 @@ impl KvBackend for EtcdStore {
options,
} = req.try_into()?;
let res = self
let mut res = self
.client
.kv_client()
.put(key, value, options)
.await
.context(error::EtcdFailedSnafu)?;
let prev_kv = res.prev_key().map(KvPair::from_etcd_kv);
let prev_kv = res.take_prev_key().map(convert_key_value);
Ok(PutResponse { prev_kv })
}
@@ -166,7 +147,7 @@ impl KvBackend for EtcdStore {
let put_ops = kvs
.into_iter()
.map(|kv| (TxnOp::put(kv.key, kv.value, options.clone())))
.map(|kv| TxnOp::put(kv.key, kv.value, options.clone()))
.collect::<Vec<_>>();
let txn_responses = self.do_multi_txn(put_ops).await?;
@@ -175,9 +156,9 @@ impl KvBackend for EtcdStore {
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Put(put_res) => {
if let Some(prev_kv) = put_res.prev_key() {
prev_kvs.push(KvPair::from_etcd_kv(prev_kv));
TxnOpResponse::Put(mut put_res) => {
if let Some(prev_kv) = put_res.take_prev_key().map(convert_key_value) {
prev_kvs.push(prev_kv);
}
}
_ => unreachable!(),
@@ -193,7 +174,7 @@ impl KvBackend for EtcdStore {
let get_ops: Vec<_> = keys
.into_iter()
.map(|k| TxnOp::get(k, options.clone()))
.map(|key| TxnOp::get(key, options.clone()))
.collect();
let txn_responses = self.do_multi_txn(get_ops).await?;
@@ -201,12 +182,11 @@ impl KvBackend for EtcdStore {
let mut kvs = vec![];
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
let get_res = match op_res {
let mut get_res = match op_res {
TxnOpResponse::Get(get_res) => get_res,
_ => unreachable!(),
};
kvs.extend(get_res.kvs().iter().map(KvPair::from_etcd_kv));
kvs.extend(get_res.take_kvs().into_iter().map(convert_key_value));
}
}
@@ -252,8 +232,8 @@ impl KvBackend for EtcdStore {
})?;
let prev_kv = match op_res {
TxnOpResponse::Put(res) => res.prev_key().map(KvPair::from_etcd_kv),
TxnOpResponse::Get(res) => res.kvs().first().map(KvPair::from_etcd_kv),
TxnOpResponse::Put(mut res) => res.take_prev_key().map(convert_key_value),
TxnOpResponse::Get(mut res) => res.take_kvs().into_iter().next().map(convert_key_value),
_ => unreachable!(),
};
@@ -263,7 +243,7 @@ impl KvBackend for EtcdStore {
async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let Delete { key, options } = req.try_into()?;
let res = self
let mut res = self
.client
.kv_client()
.delete(key, options)
@@ -271,9 +251,9 @@ impl KvBackend for EtcdStore {
.context(error::EtcdFailedSnafu)?;
let prev_kvs = res
.prev_kvs()
.iter()
.map(KvPair::from_etcd_kv)
.take_prev_kvs()
.into_iter()
.map(convert_key_value)
.collect::<Vec<_>>();
Ok(DeleteRangeResponse {
@@ -289,7 +269,7 @@ impl KvBackend for EtcdStore {
let delete_ops = keys
.into_iter()
.map(|k| TxnOp::delete(k, options.clone()))
.map(|key| TxnOp::delete(key, options.clone()))
.collect::<Vec<_>>();
let txn_responses = self.do_multi_txn(delete_ops).await?;
@@ -297,10 +277,14 @@ impl KvBackend for EtcdStore {
for txn_res in txn_responses {
for op_res in txn_res.op_responses() {
match op_res {
TxnOpResponse::Delete(delete_res) => {
delete_res.prev_kvs().iter().for_each(|kv| {
prev_kvs.push(KvPair::from_etcd_kv(kv));
});
TxnOpResponse::Delete(mut delete_res) => {
delete_res
.take_prev_kvs()
.into_iter()
.map(convert_key_value)
.for_each(|kv| {
prev_kvs.push(kv);
});
}
_ => unreachable!(),
}

View File

@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod etcd;
use common_error::ext::ErrorExt;
use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
mod etcd;
#[async_trait::async_trait]
pub trait TxnService: Sync + Send {
type Error: ErrorExt;
@@ -122,7 +122,8 @@ pub struct TxnResponse {
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Txn {
req: TxnRequest,
// HACK - chroot would modify this field
pub(super) req: TxnRequest,
c_when: bool,
c_then: bool,
c_else: bool,

View File

@@ -0,0 +1,13 @@
version: '3.8'
services:
etcd:
image: bitnami/etcd:latest
ports:
- "2379:2379"
- "2380:2380"
environment:
ALLOW_NONE_AUTHENTICATION: "yes"
ETCD_NAME: etcd
ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
ETCD_ADVERTISE_CLIENT_URLS: http://etcd:2379
ETCD_MAX_REQUEST_BYTES: 10485760

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::time::Duration;
@@ -21,6 +22,8 @@ use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
@@ -63,10 +66,25 @@ pub struct GreptimeDbClusterBuilder {
}
impl GreptimeDbClusterBuilder {
pub fn new(cluster_name: &str) -> Self {
pub async fn new(cluster_name: &str) -> Self {
let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
let kv_backend: KvBackendRef = if endpoints.is_empty() {
Arc::new(MemoryKvBackend::new())
} else {
let endpoints = endpoints
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
let backend = EtcdStore::with_endpoints(endpoints)
.await
.expect("malformed endpoints");
Arc::new(ChrootKvBackend::new(cluster_name.into(), backend))
};
Self {
cluster_name: cluster_name.to_string(),
kv_backend: Arc::new(MemoryKvBackend::new()),
kv_backend,
store_config: None,
datanodes: None,
}

View File

@@ -56,8 +56,10 @@ mod test {
#[tokio::test(flavor = "multi_thread")]
async fn test_distributed_put_influxdb_lines_without_time_column() {
let instance =
tests::create_distributed_instance("test_distributed_put_influxdb_lines").await;
let instance = tests::create_distributed_instance(
"test_distributed_put_influxdb_lines_without_time_column",
)
.await;
test_put_influxdb_lines_without_time_column(&instance.frontend()).await;
}

View File

@@ -42,6 +42,7 @@ impl MockDistributedInstance {
}
pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInstance {
let cluster = GreptimeDbClusterBuilder::new(test_name).build().await;
let builder = GreptimeDbClusterBuilder::new(test_name).await;
let cluster = builder.build().await;
MockDistributedInstance(cluster)
}

View File

@@ -89,7 +89,8 @@ pub async fn test_region_failover(store_type: StorageType) {
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 5u64;
let cluster = GreptimeDbClusterBuilder::new(cluster_name)
let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.build()