From 128d3717fab12c4af362de1e4854b94c5c021685 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 2 Jan 2024 18:05:20 +0900 Subject: [PATCH] test(tests-integration): add a naive test with kafka wal (#3071) * chore(tests-integration): add setup tests with kafka wal to README.md * feat(tests-integration): add meta wal config * fix(tests-integration): fix sign of both_instances_cases_with_kafka_wal * chore(tests-integration): set num_topic to 3 for tests * test(tests-integration): add a naive test with kafka wal * chore: apply suggestions from CR --- .env.example | 2 + tests-integration/README.md | 25 ++++- tests-integration/src/cluster.rs | 37 ++++--- tests-integration/src/standalone.rs | 11 ++- tests-integration/src/tests.rs | 3 +- .../src/tests/instance_kafka_wal_test.rs | 97 +++++++++++++++++++ tests-integration/src/tests/test_util.rs | 37 +++++-- 7 files changed, 185 insertions(+), 27 deletions(-) create mode 100644 tests-integration/src/tests/instance_kafka_wal_test.rs diff --git a/.env.example b/.env.example index 4d45913df0..2f51a7cc65 100644 --- a/.env.example +++ b/.env.example @@ -19,3 +19,5 @@ GT_GCS_BUCKET = GCS bucket GT_GCS_SCOPE = GCS scope GT_GCS_CREDENTIAL_PATH = GCS credential path GT_GCS_ENDPOINT = GCS end point +# Settings for kafka wal test +GT_KAFKA_ENDPOINTS = localhost:9092 diff --git a/tests-integration/README.md b/tests-integration/README.md index 27f66da867..a1622174a1 100644 --- a/tests-integration/README.md +++ b/tests-integration/README.md @@ -1,4 +1,4 @@ -## Setup +## Setup tests for multiple storage backend To run the integration test, please copy `.env.example` to `.env` in the project root folder and change the values on need. @@ -13,7 +13,7 @@ GT_S3_ACCESS_KEY=S3 secret access key ``` -## Run +### Run Execute the following command in the project root folder: @@ -37,4 +37,23 @@ Test azblob storage: ``` cargo test azblob -``` \ No newline at end of file +``` + +## Setup tests with Kafka wal + +To run the integration test, please copy `.env.example` to `.env` in the project root folder and change the values on need. + +```sh +GT_KAFKA_ENDPOINTS = localhost:9092 +``` + +### Setup kafka standalone + +``` +cd tests-integration/fixtures/kafka + +docker compose -f docker-compose-standalone.yml up +``` + + + diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index ae54a00d98..336d1d68d0 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -31,6 +31,7 @@ use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; +use common_meta::wal::WalConfig as MetaWalConfig; use common_meta::DatanodeId; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::create_temp_dir; @@ -73,6 +74,7 @@ pub struct GreptimeDbClusterBuilder { store_providers: Option>, datanodes: Option, wal_config: WalConfig, + meta_wal_config: MetaWalConfig, } impl GreptimeDbClusterBuilder { @@ -99,6 +101,7 @@ impl GreptimeDbClusterBuilder { store_providers: None, datanodes: None, wal_config: WalConfig::default(), + meta_wal_config: MetaWalConfig::default(), } } @@ -122,13 +125,29 @@ impl GreptimeDbClusterBuilder { self } + pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self { + self.meta_wal_config = wal_meta; + self + } + pub async fn build(self) -> GreptimeDbCluster { let datanodes = self.datanodes.unwrap_or(4); let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20)); let datanode_clients = Arc::new(DatanodeClients::new(channel_config)); - let meta_srv = self.build_metasrv(datanode_clients.clone()).await; + let opt = MetaSrvOptions { + procedure: ProcedureConfig { + // Due to large network delay during cross data-center. + // We only make max_retry_times and retry_delay large than the default in tests. + max_retry_times: 5, + retry_delay: Duration::from_secs(1), + }, + wal: self.meta_wal_config.clone(), + ..Default::default() + }; + + let meta_srv = self.build_metasrv(opt, datanode_clients.clone()).await; let (datanode_instances, storage_guards, dir_guards) = self.build_datanodes(meta_srv.clone(), datanodes).await; @@ -156,17 +175,11 @@ impl GreptimeDbClusterBuilder { } } - async fn build_metasrv(&self, datanode_clients: Arc) -> MockInfo { - let opt = MetaSrvOptions { - procedure: ProcedureConfig { - // Due to large network delay during cross data-center. - // We only make max_retry_times and retry_delay large than the default in tests. - max_retry_times: 5, - retry_delay: Duration::from_secs(1), - }, - ..Default::default() - }; - + async fn build_metasrv( + &self, + opt: MetaSrvOptions, + datanode_clients: Arc, + ) -> MockInfo { meta_srv::mocks::mock(opt, self.kv_backend.clone(), None, Some(datanode_clients)).await } diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 20348c462a..9c6956a6bb 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -24,7 +24,7 @@ use common_meta::ddl_manager::DdlManager; use common_meta::key::TableMetadataManager; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; -use common_meta::wal::{WalConfig as MetaSrvWalConfig, WalOptionsAllocator}; +use common_meta::wal::{WalConfig as MetaWalConfig, WalOptionsAllocator}; use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use datanode::config::DatanodeOptions; @@ -47,6 +47,7 @@ pub struct GreptimeDbStandalone { pub struct GreptimeDbStandaloneBuilder { instance_name: String, wal_config: WalConfig, + meta_wal_config: MetaWalConfig, store_providers: Option>, default_store: Option, plugin: Option, @@ -60,6 +61,7 @@ impl GreptimeDbStandaloneBuilder { plugin: None, default_store: None, wal_config: WalConfig::default(), + meta_wal_config: MetaWalConfig::default(), } } @@ -91,6 +93,11 @@ impl GreptimeDbStandaloneBuilder { self } + pub fn with_meta_wal_config(mut self, wal_meta: MetaWalConfig) -> Self { + self.meta_wal_config = wal_meta; + self + } + pub async fn build(self) -> GreptimeDbStandalone { let default_store_type = self.default_store.unwrap_or(StorageType::File); let store_types = self.store_providers.unwrap_or_default(); @@ -132,7 +139,7 @@ impl GreptimeDbStandaloneBuilder { .step(10) .build(), ); - let wal_meta = MetaSrvWalConfig::default(); + let wal_meta = self.meta_wal_config.clone(); let wal_options_allocator = Arc::new(WalOptionsAllocator::new( wal_meta.clone(), kv_backend.clone(), diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 8d1e421738..22c0d591de 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -12,10 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod instance_kafka_wal_test; mod instance_test; mod promql_test; -// TODO(weny): Remove it. -#[allow(dead_code, unused_macros)] mod test_util; use std::collections::HashMap; diff --git a/tests-integration/src/tests/instance_kafka_wal_test.rs b/tests-integration/src/tests/instance_kafka_wal_test.rs new file mode 100644 index 0000000000..c882f17f95 --- /dev/null +++ b/tests-integration/src/tests/instance_kafka_wal_test.rs @@ -0,0 +1,97 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_query::Output; +use common_recordbatch::util; +use datatypes::vectors::{TimestampMillisecondVector, VectorRef}; +use frontend::error::Result; +use frontend::instance::Instance; +use rstest::rstest; +use rstest_reuse::apply; +use servers::query_handler::sql::SqlQueryHandler; +use session::context::{QueryContext, QueryContextRef}; + +use crate::tests::test_util::*; + +#[apply(both_instances_cases_with_kafka_wal)] +async fn test_create_database_and_insert_query(instance: Option>) { + let Some(instance) = instance else { return }; + + let instance = instance.frontend(); + + let output = execute_sql(&instance, "create database test").await; + assert!(matches!(output, Output::AffectedRows(1))); + + let output = execute_sql( + &instance, + r#"create table greptime.test.demo( + host STRING, + cpu DOUBLE, + memory DOUBLE, + ts timestamp, + TIME INDEX(ts) +)"#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(0))); + + let output = execute_sql( + &instance, + r#"insert into test.demo(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(2))); + + let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await; + match query_output { + Output::Stream(s) => { + let batches = util::collect(s).await.unwrap(); + assert_eq!(1, batches[0].num_columns()); + assert_eq!( + Arc::new(TimestampMillisecondVector::from_vec(vec![ + 1655276557000_i64 + ])) as VectorRef, + *batches[0].column(0) + ); + } + _ => unreachable!(), + } +} + +async fn execute_sql(instance: &Arc, sql: &str) -> Output { + execute_sql_with(instance, sql, QueryContext::arc()).await +} + +async fn try_execute_sql_with( + instance: &Arc, + sql: &str, + query_ctx: QueryContextRef, +) -> Result { + instance.do_query(sql, query_ctx).await.remove(0) +} + +async fn execute_sql_with( + instance: &Arc, + sql: &str, + query_ctx: QueryContextRef, +) -> Output { + try_execute_sql_with(instance, sql, query_ctx) + .await + .unwrap() +} diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 32be423e69..5a2f41316b 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -17,6 +17,8 @@ use std::sync::Arc; use common_config::wal::KafkaConfig; use common_config::WalConfig; +use common_meta::wal::kafka::KafkaConfig as MetaKafkaConfig; +use common_meta::wal::WalConfig as MetaWalConfig; use common_query::Output; use common_recordbatch::util; use common_telemetry::warn; @@ -61,6 +63,8 @@ impl MockInstance for MockDistributedInstance { } } +/// For test purpose. +#[allow(clippy::large_enum_variant)] pub(crate) enum MockInstanceBuilder { Standalone(GreptimeDbStandaloneBuilder), Distributed(GreptimeDbClusterBuilder), @@ -153,14 +157,22 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option>(); let test_name = uuid::Uuid::new_v4().to_string(); - let builder = GreptimeDbStandaloneBuilder::new(&test_name).with_wal_config(WalConfig::Kafka( - KafkaConfig { - broker_endpoints: endpoints, + let builder = GreptimeDbStandaloneBuilder::new(&test_name) + .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints.clone(), ..Default::default() - }, - )); + })) + .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig { + broker_endpoints: endpoints, + topic_name_prefix: test_name.to_string(), + num_topics: 3, + ..Default::default() + })); let instance = TestContext::new(MockInstanceBuilder::Standalone(builder)).await; Some(Box::new(instance)) } @@ -174,12 +186,21 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option>(); let test_name = uuid::Uuid::new_v4().to_string(); let builder = GreptimeDbClusterBuilder::new(&test_name) .await .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints.clone(), + ..Default::default() + })) + .with_meta_wal_config(MetaWalConfig::Kafka(MetaKafkaConfig { broker_endpoints: endpoints, + topic_name_prefix: test_name.to_string(), + num_topics: 3, ..Default::default() })); let instance = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; @@ -195,7 +216,7 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option, + instance: Option>, ) { }