fix: fix MockInstance rebuild issue (#3218)

* fix: fix MockInstance rebuild issue

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-01-24 16:52:47 +09:00
committed by GitHub
parent 1711ad4631
commit f82ddc9491
4 changed files with 183 additions and 83 deletions

View File

@@ -62,7 +62,8 @@ use crate::test_util::{
pub struct GreptimeDbCluster {
pub storage_guards: Vec<StorageGuard>,
pub _dir_guards: Vec<FileDirGuard>,
pub dir_guards: Vec<FileDirGuard>,
pub datanode_options: Vec<DatanodeOptions>,
pub datanode_instances: HashMap<DatanodeId, Datanode>,
pub kv_backend: KvBackendRef,
@@ -70,7 +71,6 @@ pub struct GreptimeDbCluster {
pub frontend: Arc<FeInstance>,
}
#[derive(Clone)]
pub struct GreptimeDbClusterBuilder {
cluster_name: String,
kv_backend: KvBackendRef,
@@ -157,9 +157,13 @@ impl GreptimeDbClusterBuilder {
self
}
pub async fn build(self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
pub async fn build_with(
&self,
datanode_options: Vec<DatanodeOptions>,
storage_guards: Vec<StorageGuard>,
dir_guards: Vec<FileDirGuard>,
) -> GreptimeDbCluster {
let datanodes = datanode_options.len();
let channel_config = ChannelConfig::new().timeout(Duration::from_secs(20));
let datanode_clients = Arc::new(DatanodeClients::new(channel_config));
@@ -182,8 +186,9 @@ impl GreptimeDbClusterBuilder {
)
.await;
let (datanode_instances, storage_guards, dir_guards) =
self.build_datanodes(meta_srv.clone(), datanodes).await;
let datanode_instances = self
.build_datanodes_with_options(&meta_srv, &datanode_options)
.await;
build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await;
@@ -199,8 +204,9 @@ impl GreptimeDbClusterBuilder {
frontend.start().await.unwrap();
GreptimeDbCluster {
datanode_options,
storage_guards,
_dir_guards: dir_guards,
dir_guards,
datanode_instances,
kv_backend: self.kv_backend.clone(),
meta_srv: meta_srv.meta_srv,
@@ -208,16 +214,19 @@ impl GreptimeDbClusterBuilder {
}
}
async fn build_datanodes(
pub async fn build(&self) -> GreptimeDbCluster {
let datanodes = self.datanodes.unwrap_or(4);
let (datanode_options, storage_guards, dir_guards) =
self.build_datanode_options_and_guards(datanodes).await;
self.build_with(datanode_options, storage_guards, dir_guards)
.await
}
async fn build_datanode_options_and_guards(
&self,
meta_srv: MockInfo,
datanodes: u32,
) -> (
HashMap<DatanodeId, Datanode>,
Vec<StorageGuard>,
Vec<FileDirGuard>,
) {
let mut instances = HashMap::with_capacity(datanodes as usize);
) -> (Vec<DatanodeOptions>, Vec<StorageGuard>, Vec<FileDirGuard>) {
let mut options = Vec::with_capacity(datanodes as usize);
let mut storage_guards = Vec::with_capacity(datanodes as usize);
let mut dir_guards = Vec::with_capacity(datanodes as usize);
@@ -258,28 +267,41 @@ impl GreptimeDbClusterBuilder {
};
opts.node_id = Some(datanode_id);
let datanode = self.create_datanode(opts, meta_srv.clone()).await;
instances.insert(datanode_id, datanode);
options.push(opts);
}
(
instances,
options,
storage_guards.into_iter().flatten().collect(),
dir_guards,
)
}
async fn build_datanodes_with_options(
&self,
meta_srv: &MockInfo,
options: &[DatanodeOptions],
) -> HashMap<DatanodeId, Datanode> {
let mut instances = HashMap::with_capacity(options.len());
for opts in options {
let datanode = self.create_datanode(opts.clone(), meta_srv.clone()).await;
instances.insert(opts.node_id.unwrap(), datanode);
}
instances
}
async fn wait_datanodes_alive(
&self,
meta_peer_client: &MetaPeerClientRef,
expected_datanodes: u32,
expected_datanodes: usize,
) {
for _ in 0..10 {
let alive_datanodes =
meta_srv::lease::filter_datanodes(1000, meta_peer_client, |_, _| true)
.await
.unwrap()
.len() as u32;
.len();
if alive_datanodes == expected_datanodes {
return;
}
@@ -355,7 +377,7 @@ impl GreptimeDbClusterBuilder {
async fn build_datanode_clients(
clients: Arc<DatanodeClients>,
instances: &HashMap<DatanodeId, Datanode>,
datanodes: u32,
datanodes: usize,
) {
for i in 0..datanodes {
let datanode_id = i as u64 + 1;

View File

@@ -22,13 +22,14 @@ use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::WalOptionsAllocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
@@ -39,12 +40,13 @@ use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, Test
pub struct GreptimeDbStandalone {
pub instance: Arc<Instance>,
pub datanode_opts: DatanodeOptions,
pub mix_options: MixOptions,
pub guard: TestGuard,
// Used in rebuild.
pub kv_backend: KvBackendRef,
pub procedure_manager: ProcedureManagerRef,
}
#[derive(Clone)]
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: DatanodeWalConfig,
@@ -104,31 +106,16 @@ impl GreptimeDbStandaloneBuilder {
self
}
pub async fn build(self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.unwrap_or_default();
pub async fn build_with(
&self,
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
guard: TestGuard,
mix_options: MixOptions,
) -> GreptimeDbStandalone {
let plugins = self.plugin.clone().unwrap_or_default();
let (opts, guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);
let procedure_config = ProcedureConfig::default();
let kv_backend_config = KvBackendConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config.clone(),
procedure_config.clone(),
)
.await
.unwrap();
let plugins = self.plugin.unwrap_or_default();
let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone())
let datanode = DatanodeBuilder::new(mix_options.datanode.clone(), plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
@@ -145,9 +132,8 @@ impl GreptimeDbStandaloneBuilder {
.step(10)
.build(),
);
let wal_meta = self.meta_wal_config.clone();
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
wal_meta.clone(),
mix_options.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = TableMetadataAllocator::new(
@@ -168,11 +154,12 @@ impl GreptimeDbStandaloneBuilder {
.unwrap(),
);
let instance = FrontendBuilder::new(kv_backend, datanode_manager, ddl_task_executor)
.with_plugin(plugins)
.try_build()
.await
.unwrap();
let instance =
FrontendBuilder::new(kv_backend.clone(), datanode_manager, ddl_task_executor)
.with_plugin(plugins)
.try_build()
.await
.unwrap();
procedure_manager.start().await.unwrap();
wal_options_allocator.start().await.unwrap();
@@ -183,17 +170,47 @@ impl GreptimeDbStandaloneBuilder {
GreptimeDbStandalone {
instance: Arc::new(instance),
datanode_opts: opts.clone(),
mix_options: MixOptions {
data_home: opts.storage.data_home.to_string(),
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
},
mix_options,
guard,
kv_backend,
procedure_manager,
}
}
pub async fn build(&self) -> GreptimeDbStandalone {
let default_store_type = self.default_store.unwrap_or(StorageType::File);
let store_types = self.store_providers.clone().unwrap_or_default();
let (opts, guard) = create_tmp_dir_and_datanode_opts(
Mode::Standalone,
default_store_type,
store_types,
&self.instance_name,
self.wal_config.clone(),
);
let kv_backend_config = KvBackendConfig::default();
let procedure_config = ProcedureConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config.clone(),
procedure_config.clone(),
)
.await
.unwrap();
let wal_meta = self.meta_wal_config.clone();
let mix_options = MixOptions {
data_home: opts.storage.data_home.to_string(),
procedure: procedure_config,
metadata_store: kv_backend_config,
frontend: FrontendOptions::default(),
datanode: opts,
logging: LoggingOptions::default(),
wal_meta,
};
self.build_with(kv_backend, procedure_manager, guard, mix_options)
.await
}
}

View File

@@ -392,7 +392,7 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
None,
)
.with_metrics_handler(MetricsHandler)
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
.with_greptime_config_options(instance.mix_options.datanode.to_toml_string())
.build();
(http_server.build(http_server.make_app()), instance.guard)
}
@@ -463,7 +463,7 @@ pub async fn setup_test_prom_app_with_frontend(
)
.with_prom_handler(frontend_ref.clone(), true)
.with_prometheus_handler(frontend_ref)
.with_greptime_config_options(instance.datanode_opts.to_toml_string())
.with_greptime_config_options(instance.mix_options.datanode.to_toml_string())
.build();
let app = http_server.build(http_server.make_app());
(app, instance.guard)

View File

@@ -24,7 +24,7 @@ use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use frontend::instance::Instance;
use rstest_reuse::{self, template};
use crate::cluster::GreptimeDbClusterBuilder;
use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder};
use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
use crate::test_util::StorageType;
use crate::tests::{create_distributed_instance, MockDistributedInstance};
@@ -32,7 +32,7 @@ use crate::tests::{create_distributed_instance, MockDistributedInstance};
#[async_trait::async_trait]
pub(crate) trait RebuildableMockInstance: MockInstance {
// Rebuilds the instance and returns rebuilt frontend instance.
async fn rebuild(&mut self) -> Arc<Instance>;
async fn rebuild(&mut self);
}
pub(crate) trait MockInstance: Sync + Send {
@@ -68,19 +68,78 @@ pub(crate) enum MockInstanceBuilder {
Distributed(GreptimeDbClusterBuilder),
}
impl MockInstanceBuilder {
async fn build(&self) -> Arc<dyn MockInstance> {
pub(crate) enum MockInstanceImpl {
Standalone(GreptimeDbStandalone),
Distributed(GreptimeDbCluster),
}
impl MockInstance for MockInstanceImpl {
fn frontend(&self) -> Arc<Instance> {
match self {
MockInstanceBuilder::Standalone(builder) => Arc::new(builder.clone().build().await),
MockInstanceImpl::Standalone(instance) => instance.frontend(),
MockInstanceImpl::Distributed(instance) => instance.frontend.clone(),
}
}
fn is_distributed_mode(&self) -> bool {
matches!(self, &MockInstanceImpl::Distributed(_))
}
}
impl MockInstanceBuilder {
async fn build(&self) -> MockInstanceImpl {
match self {
MockInstanceBuilder::Standalone(builder) => {
MockInstanceImpl::Standalone(builder.build().await)
}
MockInstanceBuilder::Distributed(builder) => {
Arc::new(MockDistributedInstance(builder.clone().build().await))
MockInstanceImpl::Distributed(builder.build().await)
}
}
}
async fn rebuild(&self, instance: MockInstanceImpl) -> MockInstanceImpl {
match self {
MockInstanceBuilder::Standalone(builder) => {
let MockInstanceImpl::Standalone(instance) = instance else {
unreachable!()
};
let GreptimeDbStandalone {
mix_options,
guard,
kv_backend,
procedure_manager,
..
} = instance;
MockInstanceImpl::Standalone(
builder
.build_with(kv_backend, procedure_manager, guard, mix_options)
.await,
)
}
MockInstanceBuilder::Distributed(builder) => {
let MockInstanceImpl::Distributed(instance) = instance else {
unreachable!()
};
let GreptimeDbCluster {
storage_guards,
dir_guards,
datanode_options,
..
} = instance;
MockInstanceImpl::Distributed(
builder
.build_with(datanode_options, storage_guards, dir_guards)
.await,
)
}
}
}
}
pub(crate) struct TestContext {
instance: Arc<dyn MockInstance>,
instance: Option<MockInstanceImpl>,
builder: MockInstanceBuilder,
}
@@ -88,26 +147,28 @@ impl TestContext {
async fn new(builder: MockInstanceBuilder) -> Self {
let instance = builder.build().await;
Self { instance, builder }
Self {
instance: Some(instance),
builder,
}
}
}
#[async_trait::async_trait]
impl RebuildableMockInstance for TestContext {
async fn rebuild(&mut self) -> Arc<Instance> {
let instance = self.builder.build().await;
self.instance = instance;
self.instance.frontend()
async fn rebuild(&mut self) {
let instance = self.builder.rebuild(self.instance.take().unwrap()).await;
self.instance = Some(instance);
}
}
impl MockInstance for TestContext {
fn frontend(&self) -> Arc<Instance> {
self.instance.frontend()
self.instance.as_ref().unwrap().frontend()
}
fn is_distributed_mode(&self) -> bool {
self.instance.is_distributed_mode()
self.instance.as_ref().unwrap().is_distributed_mode()
}
}