feat: add kafka wal integration test utils (#3069)

* feat(tests-integration): add wal_config

* feat: add kafka wal integration test utils
This commit is contained in:
Weny Xu
2024-01-02 16:38:43 +09:00
committed by GitHub
parent 5653389063
commit d87ab06b28
6 changed files with 161 additions and 10 deletions

View File

@@ -22,6 +22,7 @@ use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend};
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
use common_config::WalConfig;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
@@ -64,12 +65,14 @@ pub struct GreptimeDbCluster {
pub frontend: Arc<FeInstance>,
}
#[derive(Clone)]
pub struct GreptimeDbClusterBuilder {
cluster_name: String,
kv_backend: KvBackendRef,
store_config: Option<ObjectStoreConfig>,
store_providers: Option<Vec<StorageType>>,
datanodes: Option<u32>,
wal_config: WalConfig,
}
impl GreptimeDbClusterBuilder {
@@ -95,6 +98,7 @@ impl GreptimeDbClusterBuilder {
store_config: None,
store_providers: None,
datanodes: None,
wal_config: WalConfig::default(),
}
}
@@ -113,6 +117,11 @@ impl GreptimeDbClusterBuilder {
self
}
pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
self.wal_config = wal_config;
self
}
pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
@@ -176,19 +185,27 @@ impl GreptimeDbClusterBuilder {
for i in 0..datanodes {
let datanode_id = i as u64 + 1;
let mode = Mode::Distributed;
let mut opts = if let Some(store_config) = &self.store_config {
let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
dir_guards.push(FileDirGuard::new(home_tmp_dir));
create_datanode_opts(store_config.clone(), vec![], home_dir)
create_datanode_opts(
mode,
store_config.clone(),
vec![],
home_dir,
self.wal_config.clone(),
)
} else {
let (opts, guard) = create_tmp_dir_and_datanode_opts(
mode,
StorageType::File,
self.store_providers.clone().unwrap_or_default(),
&format!("{}-dn-{}", self.cluster_name, datanode_id),
self.wal_config.clone(),
);
storage_guards.push(guard.storage_guards);
@@ -197,7 +214,6 @@ impl GreptimeDbClusterBuilder {
opts
};
opts.node_id = Some(datanode_id);
opts.mode = Mode::Distributed;
let datanode = self.create_datanode(opts, meta_srv.clone()).await;

View File

@@ -21,7 +21,7 @@ mod otlp;
mod prom_store;
pub mod test_util;
mod standalone;
pub mod standalone;
#[cfg(test)]
mod tests;

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use cmd::options::MixOptions;
use common_base::Plugins;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::KvBackendConfig;
use common_config::{KvBackendConfig, WalConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl_manager::DdlManager;
@@ -32,6 +32,7 @@ use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager};
use servers::Mode;
use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard};
@@ -42,8 +43,10 @@ pub struct GreptimeDbStandalone {
pub guard: TestGuard,
}
#[derive(Clone)]
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: WalConfig,
store_providers: Option<Vec<StorageType>>,
default_store: Option<StorageType>,
plugin: Option<Plugins>,
@@ -56,6 +59,7 @@ impl GreptimeDbStandaloneBuilder {
store_providers: None,
plugin: None,
default_store: None,
wal_config: WalConfig::default(),
}
}
@@ -82,12 +86,22 @@ impl GreptimeDbStandaloneBuilder {
}
}
pub fn with_wal_config(mut self, wal_config: WalConfig) -> Self {
self.wal_config = wal_config;
self
}
pub async fn build(self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.unwrap_or_default();
let (opts, guard) =
create_tmp_dir_and_datanode_opts(default_store_type, store_types, &self.instance_name);
let (opts, guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);
let procedure_config = ProcedureConfig::default();
let kv_backend_config = KvBackendConfig::default();

View File

@@ -21,6 +21,7 @@ use std::time::Duration;
use auth::UserProviderRef;
use axum::Router;
use catalog::kvbackend::KvBackendCatalogManager;
use common_config::WalConfig;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::Output;
@@ -294,9 +295,11 @@ impl TestGuard {
}
pub fn create_tmp_dir_and_datanode_opts(
mode: Mode,
default_store_type: StorageType,
store_provider_types: Vec<StorageType>,
name: &str,
wal_config: WalConfig,
) -> (DatanodeOptions, TestGuard) {
let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
@@ -314,7 +317,7 @@ pub fn create_tmp_dir_and_datanode_opts(
store_providers.push(store);
storage_guards.push(StorageGuard(data_tmp_dir))
}
let opts = create_datanode_opts(default_store, store_providers, home_dir);
let opts = create_datanode_opts(mode, default_store, store_providers, home_dir, wal_config);
(
opts,
@@ -326,9 +329,11 @@ pub fn create_tmp_dir_and_datanode_opts(
}
pub(crate) fn create_datanode_opts(
mode: Mode,
default_store: ObjectStoreConfig,
providers: Vec<ObjectStoreConfig>,
home_dir: String,
wal_config: WalConfig,
) -> DatanodeOptions {
DatanodeOptions {
node_id: Some(0),
@@ -339,7 +344,8 @@ pub(crate) fn create_datanode_opts(
store: default_store,
..Default::default()
},
mode: Mode::Standalone,
mode,
wal: wal_config,
..Default::default()
}
}

View File

@@ -14,6 +14,8 @@
mod instance_test;
mod promql_test;
// TODO(weny): Remove it.
#[allow(dead_code, unused_macros)]
mod test_util;
use std::collections::HashMap;

View File

@@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::env;
use std::sync::Arc;
use common_config::wal::KafkaConfig;
use common_config::WalConfig;
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::warn;
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use rstest_reuse::{self, template};
@@ -25,7 +29,13 @@ use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
use crate::test_util::StorageType;
use crate::tests::{create_distributed_instance, MockDistributedInstance};
pub(crate) trait MockInstance {
#[async_trait::async_trait]
pub(crate) trait RebuildableMockInstance: MockInstance {
// Rebuilds the instance and returns rebuilt frontend instance.
async fn rebuild(&mut self) -> Arc<Instance>;
}
pub(crate) trait MockInstance: Sync + Send {
fn frontend(&self) -> Arc<Instance>;
fn is_distributed_mode(&self) -> bool;
@@ -51,6 +61,54 @@ impl MockInstance for MockDistributedInstance {
}
}
pub(crate) enum MockInstanceBuilder {
Standalone(GreptimeDbStandaloneBuilder),
Distributed(GreptimeDbClusterBuilder),
}
impl MockInstanceBuilder {
async fn build(&self) -> Arc<dyn MockInstance> {
match self {
MockInstanceBuilder::Standalone(builder) => Arc::new(builder.clone().build().await),
MockInstanceBuilder::Distributed(builder) => {
Arc::new(MockDistributedInstance(builder.clone().build().await))
}
}
}
}
pub(crate) struct TestContext {
instance: Arc<dyn MockInstance>,
builder: MockInstanceBuilder,
}
impl TestContext {
async fn new(builder: MockInstanceBuilder) -> Self {
let instance = builder.build().await;
Self { instance, builder }
}
}
#[async_trait::async_trait]
impl RebuildableMockInstance for TestContext {
async fn rebuild(&mut self) -> Arc<Instance> {
let instance = self.builder.build().await;
self.instance = instance;
self.instance.frontend()
}
}
impl MockInstance for TestContext {
fn frontend(&self) -> Arc<Instance> {
self.instance.frontend()
}
fn is_distributed_mode(&self) -> bool {
self.instance.is_distributed_mode()
}
}
pub(crate) async fn standalone() -> Arc<dyn MockInstance> {
let test_name = uuid::Uuid::new_v4().to_string();
let instance = GreptimeDbStandaloneBuilder::new(&test_name).build().await;
@@ -86,6 +144,61 @@ pub(crate) async fn distributed_with_multiple_object_stores() -> Arc<dyn MockIns
Arc::new(MockDistributedInstance(cluster))
}
pub(crate) async fn standalone_with_kafka_wal() -> Option<Box<dyn RebuildableMockInstance>> {
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return None;
}
let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbStandaloneBuilder::new(&test_name).with_wal_config(WalConfig::Kafka(
KafkaConfig {
broker_endpoints: endpoints,
..Default::default()
},
));
let instance = TestContext::new(MockInstanceBuilder::Standalone(builder)).await;
Some(Box::new(instance))
}
pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMockInstance>> {
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return None;
}
let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbClusterBuilder::new(&test_name)
.await
.with_wal_config(WalConfig::Kafka(KafkaConfig {
broker_endpoints: endpoints,
..Default::default()
}));
let instance = TestContext::new(MockInstanceBuilder::Distributed(builder)).await;
Some(Box::new(instance))
}
#[template]
#[rstest]
#[case::test_with_standalone(standalone_with_kafka_wal())]
#[case::test_with_distributed(distributed_with_kafka_wal())]
#[awt]
#[tokio::test(flavor = "multi_thread")]
pub(crate) fn both_instances_cases_with_kafka_wal(
#[future]
#[case]
instance: Arc<dyn MockInstance>,
) {
}
#[template]
#[rstest]
#[case::test_with_standalone(standalone_with_multiple_object_stores())]