test(tests-integration): add a naive test with kafka wal (#3071)

* chore(tests-integration): add setup tests with kafka wal to README.md

* feat(tests-integration): add meta wal config

* fix(tests-integration): fix sign of both_instances_cases_with_kafka_wal

* chore(tests-integration): set num_topic to 3 for tests

* test(tests-integration): add a naive test with kafka wal

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-01-02 18:05:20 +09:00
committed by GitHub
parent 2b181e91e0
commit 128d3717fa
7 changed files with 185 additions and 27 deletions

View File

@@ -19,3 +19,5 @@ GT_GCS_BUCKET = GCS bucket
GT_GCS_SCOPE = GCS scope
GT_GCS_CREDENTIAL_PATH = GCS credential path
GT_GCS_ENDPOINT = GCS end point
# Settings for kafka wal test
GT_KAFKA_ENDPOINTS = localhost:9092

View File

@@ -1,4 +1,4 @@
## Setup
## Setup tests for multiple storage backend
To run the integration test, please copy `.env.example` to `.env` in the project root folder and change the values on need.
@@ -13,7 +13,7 @@ GT_S3_ACCESS_KEY=S3 secret access key
```
## Run
### Run
Execute the following command in the project root folder:
@@ -37,4 +37,23 @@ Test azblob storage:
```
cargo test azblob
```
```
## Setup tests with Kafka wal
To run the integration test, please copy `.env.example` to `.env` in the project root folder and change the values on need.
```sh
GT_KAFKA_ENDPOINTS = localhost:9092
```
### Setup kafka standalone
```
cd tests-integration/fixtures/kafka
docker compose -f docker-compose-standalone.yml up
```

View File

@@ -31,6 +31,7 @@ use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::wal::WalConfig as MetaWalConfig;
use common_meta::DatanodeId;
use common_runtime::Builder as RuntimeBuilder;
use common_test_util::temp_dir::create_temp_dir;
@@ -73,6 +74,7 @@ pub struct GreptimeDbClusterBuilder {
store_providers: Option<Vec<StorageType>>,
datanodes: Option<u32>,
wal_config: WalConfig,
meta_wal_config: MetaWalConfig,
}
impl GreptimeDbClusterBuilder {
@@ -99,6 +101,7 @@ impl GreptimeDbClusterBuilder {
store_providers: None,
datanodes: None,
wal_config: WalConfig::default(),
meta_wal_config: MetaWalConfig::default(),
}
}
@@ -122,13 +125,29 @@ impl GreptimeDbClusterBuilder {
self
}
pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
self.meta_wal_config = wal_meta;
self
}
pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
let datanode_clients = Arc::new(DatanodeClients::new(channel_config));
let meta_srv = self.build_metasrv(datanode_clients.clone()).await;
let opt = MetaSrvOptions {
procedure: ProcedureConfig {
// Due to large network delay during cross data-center.
// We only make max_retry_times and retry_delay large than the default in tests.
max_retry_times: 5,
retry_delay: Duration::from_secs(1),
},
wal: self.meta_wal_config.clone(),
..Default::default()
};
let meta_srv = self.build_metasrv(opt, datanode_clients.clone()).await;
let (datanode_instances, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
@@ -156,17 +175,11 @@ impl GreptimeDbClusterBuilder {
}
}
async fn build_metasrv(&self, datanode_clients: Arc<DatanodeClients>) -> MockInfo {
let opt = MetaSrvOptions {
procedure: ProcedureConfig {
// Due to large network delay during cross data-center.
// We only make max_retry_times and retry_delay large than the default in tests.
max_retry_times: 5,
retry_delay: Duration::from_secs(1),
},
..Default::default()
};
async fn build_metasrv(
&self,
opt: MetaSrvOptions,
datanode_clients: Arc<DatanodeClients>,
) -> MockInfo {
meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await
}

View File

@@ -24,7 +24,7 @@ use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::{WalConfig as MetaSrvWalConfig, WalOptionsAllocator};
use common_meta::wal::{WalConfig as MetaWalConfig, WalOptionsAllocator};
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::LoggingOptions;
use datanode::config::DatanodeOptions;
@@ -47,6 +47,7 @@ pub struct GreptimeDbStandalone {
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: WalConfig,
meta_wal_config: MetaWalConfig,
store_providers: Option<Vec<StorageType>>,
default_store: Option<StorageType>,
plugin: Option<Plugins>,
@@ -60,6 +61,7 @@ impl GreptimeDbStandaloneBuilder {
plugin: None,
default_store: None,
wal_config: WalConfig::default(),
meta_wal_config: MetaWalConfig::default(),
}
}
@@ -91,6 +93,11 @@ impl GreptimeDbStandaloneBuilder {
self
}
pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self {
self.meta_wal_config = wal_meta;
self
}
pub async fn build(self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.unwrap_or_default();
@@ -132,7 +139,7 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let wal_meta = MetaSrvWalConfig::default();
let wal_meta = self.meta_wal_config.clone();
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
wal_meta.clone(),
kv_backend.clone(),

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod instance_kafka_wal_test;
mod instance_test;
mod promql_test;
// TODO(weny): Remove it.
#[allow(dead_code, unused_macros)]
mod test_util;
use std::collections::HashMap;

View File

@@ -0,0 +1,97 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_query::Output;
use common_recordbatch::util;
use datatypes::vectors::{TimestampMillisecondVector, VectorRef};
use frontend::error::Result;
use frontend::instance::Instance;
use rstest::rstest;
use rstest_reuse::apply;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use crate::tests::test_util::*;
#[apply(both_instances_cases_with_kafka_wal)]
async fn test_create_database_and_insert_query(instance: Option<Box<dyn RebuildableMockInstance>>) {
let Some(instance) = instance else { return };
let instance = instance.frontend();
let output = execute_sql(&instance, "create database test").await;
assert!(matches!(output, Output::AffectedRows(1)));
let output = execute_sql(
&instance,
r#"create table greptime.test.demo(
host STRING,
cpu DOUBLE,
memory DOUBLE,
ts timestamp,
TIME INDEX(ts)
)"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(0)));
let output = execute_sql(
&instance,
r#"insert into test.demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));
let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await;
match query_output {
Output::Stream(s) => {
let batches = util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(
Arc::new(TimestampMillisecondVector::from_vec(vec![
1655276557000_i64
])) as VectorRef,
*batches[0].column(0)
);
}
_ => unreachable!(),
}
}
async fn execute_sql(instance: &Arc<Instance>, sql: &str) -> Output {
execute_sql_with(instance, sql, QueryContext::arc()).await
}
async fn try_execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Result<Output> {
instance.do_query(sql, query_ctx).await.remove(0)
}
async fn execute_sql_with(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> Output {
try_execute_sql_with(instance, sql, query_ctx)
.await
.unwrap()
}

View File

@@ -17,6 +17,8 @@ use std::sync::Arc;
use common_config::wal::KafkaConfig;
use common_config::WalConfig;
use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig;
use common_meta::wal::WalConfig as MetaWalConfig;
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::warn;
@@ -61,6 +63,8 @@ impl MockInstance for MockDistributedInstance {
}
}
/// For test purpose.
#[allow(clippy::large_enum_variant)]
pub(crate) enum MockInstanceBuilder {
Standalone(GreptimeDbStandaloneBuilder),
Distributed(GreptimeDbClusterBuilder),
@@ -153,14 +157,22 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option<Box<dyn RebuildableMoc
return None;
}
let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect();
let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbStandaloneBuilder::new(&test_name).with_wal_config(WalConfig::Kafka(
KafkaConfig {
broker_endpoints: endpoints,
let builder = GreptimeDbStandaloneBuilder::new(&test_name)
.with_wal_config(WalConfig::Kafka(KafkaConfig {
broker_endpoints: endpoints.clone(),
..Default::default()
},
));
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
broker_endpoints: endpoints,
topic_name_prefix: test_name.to_string(),
num_topics: 3,
..Default::default()
}));
let instance = TestContext::new(MockInstanceBuilder::Standalone(builder)).await;
Some(Box::new(instance))
}
@@ -174,12 +186,21 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMo
return None;
}
let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect();
let endpoints = endpoints
.split(',')
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbClusterBuilder::new(&test_name)
.await
.with_wal_config(WalConfig::Kafka(KafkaConfig {
broker_endpoints: endpoints.clone(),
..Default::default()
}))
.with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig {
broker_endpoints: endpoints,
topic_name_prefix: test_name.to_string(),
num_topics: 3,
..Default::default()
}));
let instance = TestContext::new(MockInstanceBuilder::Distributed(builder)).await;
@@ -195,7 +216,7 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMo
pub(crate) fn both_instances_cases_with_kafka_wal(
#[future]
#[case]
instance: Arc<dyn MockInstance>,
instance: Option<Box<dyn RebuildableMockInstance>>,
) {
}