// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeSet, HashMap};
use std::env;
use std::net::TcpListener;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::time::Duration;

use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use cache::{
    build_datanode_cache_registry, build_fundamental_cache_registry,
    with_default_composite_cache_registry,
};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::process_manager::ProcessManager;
use client::Client;
use client::client_manager::NodeClients;
use cmd::frontend::create_heartbeat_task;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::DatanodeId;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_runtime::Builder as RuntimeBuilder;
use common_runtime::runtime::BuilderBuild;
use common_test_util::temp_dir::create_temp_dir;
use common_time::util::DefaultSystemTimer;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::Instance as FeInstance;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use futures::TryStreamExt;
use hyper_util::rt::TokioIo;
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::discovery;
use meta_srv::gc::GcSchedulerOptions;
use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use mito2::gc::GcConfig;
use mito2::region::MitoRegionRef;
use object_store::config::ObjectStoreConfig;
use rand::Rng;
use servers::grpc::GrpcOptions;
use servers::grpc::flight::FlightCraftWrapper;
use servers::grpc::region_server::RegionServerRequestHandler;
use servers::server::ServerHandlers;
use store_api::storage::RegionId;
use tempfile::TempDir;
use tonic::codec::CompressionEncoding;
use tonic::transport::Server;
use tower::service_fn;
use uuid::Uuid;

use crate::test_util::{
    self, FileDirGuard, PEER_PLACEHOLDER_ADDR, StorageType, TestGuard, create_datanode_opts,
    create_tmp_dir_and_datanode_opts,
};

/// A test cluster assembled by [`GreptimeDbClusterBuilder`]: a mock metasrv, a set of
/// datanodes and a single frontend.
pub struct GreptimeDbCluster {
    /// Guards for the temporary directories created for the datanodes
    /// (presumably they must outlive the cluster — confirm against `TestGuard`).
    pub guards: Vec<TestGuard>,
    /// The options each datanode was started with.
    pub datanode_options: Vec<DatanodeOptions>,
    /// Running datanodes, keyed by their node id.
    pub datanode_instances: HashMap<DatanodeId, Datanode>,
    /// The metadata KV backend shared with the metasrv.
    pub kv_backend: KvBackendRef,
    pub metasrv: Arc<Metasrv>,
    pub frontend: Arc<Frontend>,
}

impl GreptimeDbCluster {
    /// Returns the frontend instance of this cluster.
    pub fn fe_instance(&self) -> &Arc<FeInstance> {
        &self.frontend.instance
    }

    /// Lists the SST file paths reported by `all_ssts_from_storage` on every datanode.
    ///
    /// # Panics
    ///
    /// Panics if a datanode has no mito engine or if listing its files fails.
    pub async fn list_sst_files_from_all_datanodes(&self) -> BTreeSet<String> {
        let mut collected = BTreeSet::new();
        for datanode in self.datanode_instances.values() {
            let region_server = datanode.region_server();
            let engine = region_server.mito_engine().unwrap();
            let entries: Vec<_> = engine.all_ssts_from_storage().try_collect().await.unwrap();
            collected.extend(entries.into_iter().map(|entry| entry.file_path));
        }
        collected
    }

    /// Lists every file referenced by the manifests of all datanodes: each SST data file
    /// and, when the manifest records one, its index file.
pub async fn list_sst_files_from_manifests(&self) -> BTreeSet<String> {
        let mut sst_files = BTreeSet::new();
        for datanode in self.datanode_instances.values() {
            let region_server = datanode.region_server();
            let mito = region_server.mito_engine().unwrap();
            let all_files = mito
                .all_ssts_from_manifest()
                .await
                .into_iter()
                // Each manifest entry yields its data file followed by its optional index file.
                .flat_map(|e| std::iter::once(e.file_path).chain(e.index_file_path));
            sst_files.extend(all_files);
        }
        sst_files
    }

    /// Collects the regions reported by each datanode's mito engine, keyed by region id.
    ///
    /// # Panics
    ///
    /// Panics if a datanode has no mito engine.
    pub async fn list_all_regions(&self) -> HashMap<RegionId, MitoRegionRef> {
        let mut regions = HashMap::new();
        for datanode in self.datanode_instances.values() {
            let region_server = datanode.region_server();
            let mito = region_server.mito_engine().unwrap();
            for region in mito.regions() {
                regions.insert(region.region_id(), region);
            }
        }
        regions
    }
}

/// Builder for [`GreptimeDbCluster`].
pub struct GreptimeDbClusterBuilder {
    cluster_name: String,
    kv_backend: KvBackendRef,
    store_config: Option<ObjectStoreConfig>,
    store_providers: Option<Vec<StorageType>>,
    datanodes: Option<u32>,
    datanode_wal_config: DatanodeWalConfig,
    metasrv_wal_config: MetasrvWalConfig,
    datanode_gc_config: GcConfig,
    metasrv_gc_config: GcSchedulerOptions,
    shared_home_dir: Option<Arc<TempDir>>,
    meta_selector: Option<SelectorRef>,
}

impl GreptimeDbClusterBuilder {
    /// Creates a builder for a cluster named `cluster_name`.
    ///
    /// The metadata backend is an in-memory KV store, unless the `GT_ETCD_ENDPOINTS`
    /// environment variable holds a comma-separated list of etcd endpoints. In that case an
    /// etcd store is used, chrooted under `cluster_name` followed by a fresh UUID.
    ///
    /// # Panics
    ///
    /// Panics if `EtcdStore::with_endpoints` fails for the configured endpoints.
    pub async fn new(cluster_name: &str) -> Self {
        let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();

        let kv_backend: KvBackendRef = if endpoints.is_empty() {
            Arc::new(MemoryKvBackend::new())
        } else {
            let endpoints = endpoints
                .split(',')
                .map(|s| s.to_string())
                .collect::<Vec<_>>();
            let backend = EtcdStore::with_endpoints(endpoints, 128)
                .await
                .expect("malformed endpoints");
            // Each retry requires a new isolation namespace.
            let chroot = format!("{}{}", cluster_name, Uuid::new_v4());
            Arc::new(ChrootKvBackend::new(chroot.into(), backend))
        };

        Self {
            cluster_name: cluster_name.to_string(),
            kv_backend,
            store_config: None,
            store_providers: None,
            datanodes: None,
            datanode_wal_config: DatanodeWalConfig::default(),
            metasrv_wal_config: MetasrvWalConfig::default(),
            datanode_gc_config: GcConfig::default(),
            metasrv_gc_config: GcSchedulerOptions::default(),
            shared_home_dir: None,
            meta_selector: None,
        }
    }

    /// Uses `store_config` as the object store of every datanode.
    #[must_use]
    pub fn with_store_config(mut self, store_config: ObjectStoreConfig) -> Self {
        self.store_config = Some(store_config);
        self
    }

    /// Sets the storage providers passed to `create_tmp_dir_and_datanode_opts`.
    /// This only applies when no store config is set.
    #[must_use]
    pub fn with_store_providers(mut self, store_providers: Vec<StorageType>) -> Self {
        self.store_providers = Some(store_providers);
        self
    }

    /// Sets how many datanodes [`Self::build`] starts (4 when unset).
    #[must_use]
    pub fn with_datanodes(mut self, datanodes: u32) -> Self {
        self.datanodes = Some(datanodes);
        self
    }

    /// Sets the WAL config used by every datanode.
    #[must_use]
    pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self {
        self.datanode_wal_config = datanode_wal_config;
        self
    }

    /// Sets the WAL config of the metasrv.
    #[must_use]
    pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetasrvWalConfig) -> Self {
        self.metasrv_wal_config = metasrv_wal_config;
        self
    }

    /// Sets the GC config used by every datanode.
    #[must_use]
    pub fn with_datanode_gc_config(mut self, datanode_gc_config: GcConfig) -> Self {
        self.datanode_gc_config = datanode_gc_config;
        self
    }

    /// Sets the GC scheduler options of the metasrv.
    #[must_use]
    pub fn with_metasrv_gc_config(mut self, metasrv_gc_config: GcSchedulerOptions) -> Self {
        self.metasrv_gc_config = metasrv_gc_config;
        self
    }

    /// Makes all datanodes share `shared_home_dir` as their home directory.
    /// This only applies together with a store config.
    #[must_use]
    pub fn with_shared_home_dir(mut self, shared_home_dir: Arc<TempDir>) -> Self {
        self.shared_home_dir = Some(shared_home_dir);
        self
    }

    /// Sets the selector handed to the mock metasrv.
    #[must_use]
    pub fn with_meta_selector(mut self, selector: SelectorRef) -> Self {
        self.meta_selector = Some(selector);
        self
    }

    /// Builds a cluster from explicit datanode options.
    ///
    /// Steps:
    /// 1. Starts a mock metasrv.
    /// 2. Starts one datanode per entry of `datanode_options` and registers a client for each.
    /// 3. Waits until the metasrv sees them all as alive.
    /// 4. Builds and starts the frontend.
    ///
    /// `guards` are moved into the returned cluster. When `start_frontend_servers` is false,
    /// the frontend gets no protocol servers.
    ///
    /// Note: `build_datanode_clients` looks datanodes up by ids `1..=n`, so the `node_id`s
    /// in `datanode_options` are expected to follow that numbering.
    pub async fn build_with(
        &self,
        datanode_options: Vec<DatanodeOptions>,
        start_frontend_servers: bool,
        guards: Vec<TestGuard>,
    ) -> GreptimeDbCluster {
        let datanodes = datanode_options.len();
        let channel_config = ChannelConfig::new().timeout(Some(Duration::from_secs(20)));
        let datanode_clients = Arc::new(NodeClients::new(channel_config));

        let opt = MetasrvOptions {
            procedure: ProcedureConfig {
                // Due to large network delay during cross data-center.
                // We only make max_retry_times and retry_delay larger than the default in tests.
                max_retry_times: 5,
                retry_delay: Duration::from_secs(1),
                max_metadata_value_size: None,
                max_running_procedures: 128,
            },
            wal: self.metasrv_wal_config.clone(),
            grpc: GrpcOptions {
                server_addr: "127.0.0.1:3002".to_string(),
                ..Default::default()
            },
            gc: self.metasrv_gc_config.clone(),
            ..Default::default()
        };

        let metasrv = meta_srv::mocks::mock(
            opt,
            self.kv_backend.clone(),
            self.meta_selector.clone(),
            Some(datanode_clients.clone()),
            None,
        )
        .await;

        let datanode_instances = self
            .build_datanodes_with_options(&metasrv, &datanode_options)
            .await;

        build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await;

        self.wait_datanodes_alive(metasrv.metasrv.meta_peer_client(), datanodes)
            .await;

        let mut frontend = self
            .build_frontend(metasrv.clone(), datanode_clients, start_frontend_servers)
            .await;

        test_util::prepare_another_catalog_and_schema(&frontend.instance).await;

        frontend.start().await.unwrap();

        GreptimeDbCluster {
            datanode_options,
            guards,
            datanode_instances,
            kv_backend: self.kv_backend.clone(),
            metasrv: metasrv.metasrv,
            frontend: Arc::new(frontend),
        }
    }

    /// Builds a cluster with the configured number of datanodes (4 when unset).
    /// Their options and directory guards are generated automatically.
    pub async fn build(&self, start_frontend_servers: bool) -> GreptimeDbCluster {
        let datanodes = self.datanodes.unwrap_or(4);
        let (datanode_options, guards) = self.build_datanode_options_and_guards(datanodes).await;
        self.build_with(datanode_options, start_frontend_servers, guards)
            .await
    }

    /// Generates options for `datanodes` datanodes with node ids `1..=datanodes`.
    /// Guards for any temporary directories created along the way are returned alongside.
    async fn build_datanode_options_and_guards(
        &self,
        datanodes: u32,
    ) -> (Vec<DatanodeOptions>, Vec<TestGuard>) {
        let mut options = Vec::with_capacity(datanodes as usize);
        let mut guards = Vec::with_capacity(datanodes as usize);

        for i in 0..datanodes {
            let datanode_id = i as u64 + 1;
            let mut opts = if let Some(store_config) = &self.store_config {
                let home_dir = if let Some(home_dir) = &self.shared_home_dir {
                    home_dir.path().to_str().unwrap().to_string()
                } else {
                    let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name));
                    let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
                    guards.push(TestGuard {
                        home_guard: FileDirGuard::new(home_tmp_dir),
                        storage_guards: Vec::new(),
                    });
                    home_dir
                };

                create_datanode_opts(
                    store_config.clone(),
                    vec![],
                    home_dir,
                    self.datanode_wal_config.clone(),
                    self.datanode_gc_config.clone(),
                )
            } else {
                let (opts, guard) = create_tmp_dir_and_datanode_opts(
                    StorageType::File,
                    self.store_providers.clone().unwrap_or_default(),
                    &format!("{}-dn-{}", self.cluster_name, datanode_id),
                    self.datanode_wal_config.clone(),
                    self.datanode_gc_config.clone(),
                );
                guards.push(guard);
                opts
            };
            opts.node_id = Some(datanode_id);
            options.push(opts);
        }
        (options, guards)
    }

    /// Starts one datanode per entry of `options`, keyed by its `node_id`.
    ///
    /// # Panics
    ///
    /// Panics if an option has no `node_id`.
    async fn build_datanodes_with_options(
        &self,
        metasrv: &MockInfo,
        options: &[DatanodeOptions],
    ) -> HashMap<DatanodeId, Datanode> {
        let mut instances = HashMap::with_capacity(options.len());
        for opts in options {
            let datanode = self.create_datanode(opts.clone(), metasrv.clone()).await;
            instances.insert(opts.node_id.unwrap(), datanode);
        }
        instances
    }

    /// Polls the metasrv until exactly `expected_datanodes` datanodes are reported alive.
    ///
    /// # Panics
    ///
    /// Panics if that count is not reached within roughly 10 seconds.
    async fn wait_datanodes_alive(
        &self,
        meta_peer_client: &MetaPeerClientRef,
        expected_datanodes: usize,
    ) {
        // 100 polls * 100ms ~= 10s, which is the budget the panic message promises.
        // Polling every 100µs would give up after ~10ms and make cluster startup flaky.
        const MAX_POLLS: usize = 100;
        const POLL_INTERVAL: Duration = Duration::from_millis(100);

        for _ in 0..MAX_POLLS {
            let alive_datanodes = discovery::utils::alive_datanodes(
                &DefaultSystemTimer,
                meta_peer_client.as_ref(),
                // An effectively unbounded lease; presumably any datanode that has sent a
                // heartbeat counts as alive — confirm against `alive_datanodes`.
                Duration::from_secs(u64::MAX),
                None,
            )
            .await
            .unwrap()
            .len();
            if alive_datanodes == expected_datanodes {
                return;
            }
            tokio::time::sleep(POLL_INTERVAL).await
        }
        panic!("Some Datanodes are not alive in 10 seconds!")
    }

    /// Starts a datanode with `opts`, connected to the mock metasrv, and starts its heartbeat.
    ///
    /// # Panics
    ///
    /// Panics if `opts.node_id` is unset or if any startup step fails.
    async fn create_datanode(&self, opts: DatanodeOptions, metasrv: MockInfo) -> Datanode {
        let mut meta_client = MetaClientBuilder::datanode_default_options(opts.node_id.unwrap())
            .channel_manager(metasrv.channel_manager)
            .build();
        meta_client.start(&[&metasrv.server_addr]).await.unwrap();
        let meta_client = Arc::new(meta_client);

        let meta_backend = Arc::new(MetaKvBackend {
            client: meta_client.clone(),
        });

        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(opts, Plugins::default(), meta_backend);
        builder
            .with_cache_registry(layered_cache_registry)
            .with_meta_client(meta_client);
        let mut datanode = builder.build().await.unwrap();

        datanode.start_heartbeat().await.unwrap();

        datanode
    }

    /// Builds (but does not start) the frontend. It talks to the metasrv and reaches the
    /// datanodes through `datanode_clients`.
    async fn build_frontend(
        &self,
        metasrv: MockInfo,
        datanode_clients: Arc<NodeClients>,
        start_frontend_servers: bool,
    ) -> Frontend {
        let mut meta_client = MetaClientBuilder::frontend_default_options()
            .channel_manager(metasrv.channel_manager)
            .enable_access_cluster_info()
            .build();
        meta_client.start(&[&metasrv.server_addr]).await.unwrap();
        let meta_client = Arc::new(meta_client);

        let cached_meta_backend = Arc::new(
            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
        );

        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
            CacheRegistryBuilder::default()
                .add_cache(cached_meta_backend.clone())
                .build(),
        );
        let fundamental_cache_registry =
            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
        let cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .unwrap()
            .build(),
        );

        let information_extension = Arc::new(DistributedInformationExtension::new(
            meta_client.clone(),
            datanode_clients.clone(),
        ));
        let catalog_manager = KvBackendCatalogManagerBuilder::new(
            information_extension,
            cached_meta_backend.clone(),
            cache_registry.clone(),
        )
        .build();

        let fe_opts = self.build_frontend_options();

        let instance = FrontendBuilder::new(
            fe_opts.clone(),
            cached_meta_backend.clone(),
            cache_registry.clone(),
            catalog_manager,
            datanode_clients,
            meta_client.clone(),
            Arc::new(ProcessManager::new(fe_opts.grpc.server_addr.clone(), None)),
        )
        .with_local_cache_invalidator(cache_registry)
        .try_build()
        .await
        .unwrap();

        let heartbeat_task = create_heartbeat_task(&fe_opts, meta_client, &instance);

        let instance = Arc::new(instance);

        // Build the servers for the frontend.
        let servers = if start_frontend_servers {
            Services::new(fe_opts, instance.clone(), Plugins::default())
                .build()
                .unwrap()
        } else {
            ServerHandlers::default()
        };

        Frontend {
            instance,
            servers,
            heartbeat_task: Some(heartbeat_task),
        }
    }

    /// Builds frontend options for the HTTP, gRPC, MySQL and PostgreSQL servers.
    /// Each server listens on `127.0.0.1` on its own distinct random port.
    ///
    /// # Panics
    ///
    /// Panics if no free, not-yet-assigned port can be found.
    fn build_frontend_options(&self) -> FrontendOptions {
        let mut fe_opts = FrontendOptions::default();

        // Choose a random unused port between [14000, 24000] for local test to avoid conflicts.
        let port_range = 14000..=24000;
        let max_attempts = 10;
        let localhost = "127.0.0.1";
        let construct_addr = |port: u16| format!("{}:{}", localhost, port);

        // `choose_random_unused_port` releases each probed port immediately, so two calls
        // can return the same port. Remember the ports already handed out so that the
        // servers of this frontend never collide with each other.
        let mut chosen_ports: Vec<u16> = Vec::with_capacity(4);
        let mut next_port = || {
            for _ in 0..max_attempts {
                let port =
                    self.choose_random_unused_port(port_range.clone(), max_attempts, localhost);
                if !chosen_ports.contains(&port) {
                    chosen_ports.push(port);
                    return port;
                }
            }
            panic!("No unused port found");
        };

        fe_opts.http.addr = construct_addr(next_port());

        let grpc_port = next_port();
        fe_opts.grpc.bind_addr = construct_addr(grpc_port);
        fe_opts.grpc.server_addr = construct_addr(grpc_port);

        fe_opts.mysql.addr = construct_addr(next_port());
        fe_opts.postgres.addr = construct_addr(next_port());

        fe_opts
    }

    /// Chooses a random port in `port_range` that can currently be bound on `addr`.
    /// At most `max_attempts` candidates are tried.
fn choose_random_unused_port(
        &self,
        port_range: RangeInclusive<u16>,
        max_attempts: u16,
        addr: &str,
    ) -> u16 {
        let mut rng = rand::rng();
        for _ in 0..max_attempts {
            let candidate = rng.random_range(port_range.clone());
            // The probe listener is a temporary, so the port is released right away.
            if TcpListener::bind(format!("{addr}:{candidate}")).is_ok() {
                return candidate;
            }
        }
        panic!("No unused port found");
    }
}

/// Registers a client for each datanode with node ids `1..=datanodes` in `clients`.
/// Each client is created by `create_datanode_client`.
///
/// # Panics
///
/// Panics if any id in `1..=datanodes` is missing from `instances`.
async fn build_datanode_clients(
    clients: Arc<NodeClients>,
    instances: &HashMap<DatanodeId, Datanode>,
    datanodes: usize,
) {
    for datanode_id in (1..=datanodes).map(|id| id as u64) {
        let Some(instance) = instances.get(&datanode_id) else {
            panic!("called `Option::unwrap()` on a `None` value");
        };
        let (addr, client) = create_datanode_client(instance).await;
        clients
            .insert_client(Peer::new(datanode_id, addr), client)
            .await;
    }
}

/// Serves `datanode`'s Flight and region gRPC services over an in-memory duplex stream.
///
/// Returns a client wired to that stream, together with the placeholder address the
/// client is registered under.
async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
    let (client_io, server_io) = tokio::io::duplex(1024);

    let runtime = RuntimeBuilder::default()
        .worker_threads(2)
        .thread_name("grpc-handlers")
        .build()
        .unwrap();

    let flight_handler = FlightCraftWrapper(datanode.region_server());
    let region_server_handler =
        RegionServerRequestHandler::new(Arc::new(datanode.region_server()), runtime);

    let _handle = tokio::spawn(async move {
        let flight_service = FlightServiceServer::new(flight_handler)
            .accept_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Zstd);
        let region_service = RegionServer::new(region_server_handler)
            .accept_compressed(CompressionEncoding::Gzip)
            .accept_compressed(CompressionEncoding::Zstd)
            .send_compressed(CompressionEncoding::Gzip)
            .send_compressed(CompressionEncoding::Zstd);
        // The server accepts exactly one connection: the server half of the duplex.
        let incoming = futures::stream::iter(vec![Ok::<_, std::io::Error>(server_io)]);

        Server::builder()
            .add_service(flight_service)
            .add_service(region_service)
            .serve_with_incoming(incoming)
            .await
    });

    // The client half can be handed out only once. The connector moves it out of this
    // `Option` on the first call; any later connection attempt gets an error.
    let mut pending_io = Some(client_io);

    // `PEER_PLACEHOLDER_ADDR` is only a placeholder key; nothing ever dials it.
    let addr = PEER_PLACEHOLDER_ADDR;
    let channel_manager = ChannelManager::new();
    let _ = channel_manager
        .reset_with_connector(
            addr,
            service_fn(move |_| {
                let taken = pending_io.take();
                async move {
                    match taken {
                        Some(io) => Ok(TokioIo::new(io)),
                        None => Err(std::io::Error::other("Client already taken")),
                    }
                }
            }),
        )
        .unwrap();

    let client = Client::with_manager_and_urls(channel_manager, vec![addr]);
    (addr.to_string(), client)
}