1use std::path::Path;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::Plugins;
22use common_error::ext::BoxedError;
23use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
24use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
25use common_meta::cache_invalidator::CacheInvalidatorRef;
26use common_meta::datanode::TopicStatsReporter;
27use common_meta::key::runtime_switch::RuntimeSwitchManager;
28use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
29use common_meta::kv_backend::KvBackendRef;
30pub use common_procedure::options::ProcedureConfig;
31use common_query::prelude::set_default_prefix;
32use common_stat::ResourceStatImpl;
33use common_telemetry::{error, info, warn};
34use common_wal::config::DatanodeWalConfig;
35use common_wal::config::kafka::DatanodeKafkaConfig;
36use common_wal::config::raft_engine::RaftEngineConfig;
37use file_engine::engine::FileRegionEngine;
38use log_store::kafka::log_store::KafkaLogStore;
39use log_store::kafka::{GlobalIndexCollector, default_index_file};
40use log_store::noop::log_store::NoopLogStore;
41use log_store::raft_engine::log_store::RaftEngineLogStore;
42use meta_client::MetaClientRef;
43use metric_engine::engine::MetricEngine;
44use mito2::config::MitoConfig;
45use mito2::engine::{MitoEngine, MitoEngineBuilder};
46use mito2::region::opener::PartitionExprFetcherRef;
47use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
48use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
49use object_store::util::normalize_dir;
50use query::QueryEngineFactory;
51use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
52use servers::server::ServerHandlers;
53use snafu::{OptionExt, ResultExt, ensure};
54use store_api::path_utils::WAL_DIR;
55use store_api::region_engine::{
56 RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
57};
58use tokio::fs;
59use tokio::sync::Notify;
60
61use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
62use crate::error::{
63 self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
64 GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
65 ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
66};
67use crate::event_listener::{
68 NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
69 new_region_server_event_channel,
70};
71use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
72use crate::heartbeat::HeartbeatTask;
73use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
74use crate::region_server::{DummyTableProviderFactory, RegionServer};
75use crate::store::{self, new_object_store_without_cache};
76use crate::utils::{RegionOpenRequests, build_region_open_requests};
77
78pub struct Datanode {
80 services: ServerHandlers,
81 heartbeat_task: Option<HeartbeatTask>,
82 region_event_receiver: Option<RegionServerEventReceiver>,
83 region_server: RegionServer,
84 greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
85 leases_notifier: Option<Arc<Notify>>,
86 plugins: Plugins,
87}
88
89impl Datanode {
90 pub async fn start(&mut self) -> Result<()> {
91 info!("Starting datanode instance...");
92
93 self.start_heartbeat().await?;
94 self.wait_coordinated().await;
95
96 self.start_telemetry();
97
98 self.services.start_all().await.context(StartServerSnafu)
99 }
100
101 pub fn server_handlers(&self) -> &ServerHandlers {
102 &self.services
103 }
104
105 pub fn start_telemetry(&self) {
106 if let Err(e) = self.greptimedb_telemetry_task.start() {
107 warn!(e; "Failed to start telemetry task!");
108 }
109 }
110
111 pub async fn start_heartbeat(&mut self) -> Result<()> {
112 if let Some(task) = &self.heartbeat_task {
113 let receiver = self.region_event_receiver.take().unwrap();
115
116 task.start(receiver, self.leases_notifier.clone()).await?;
117 }
118 Ok(())
119 }
120
121 pub async fn wait_coordinated(&mut self) {
123 if let Some(notifier) = self.leases_notifier.take() {
124 notifier.notified().await;
125 }
126 }
127
128 pub fn setup_services(&mut self, services: ServerHandlers) {
129 self.services = services;
130 }
131
132 pub async fn shutdown(&mut self) -> Result<()> {
133 self.services
134 .shutdown_all()
135 .await
136 .context(ShutdownServerSnafu)?;
137
138 let _ = self.greptimedb_telemetry_task.stop().await;
139 if let Some(heartbeat_task) = &self.heartbeat_task {
140 heartbeat_task
141 .close()
142 .map_err(BoxedError::new)
143 .context(ShutdownInstanceSnafu)?;
144 }
145 self.region_server.stop().await?;
146 Ok(())
147 }
148
149 pub fn region_server(&self) -> RegionServer {
150 self.region_server.clone()
151 }
152
153 pub fn plugins(&self) -> Plugins {
154 self.plugins.clone()
155 }
156}
157
158pub struct DatanodeBuilder {
159 opts: DatanodeOptions,
160 table_provider_factory: Option<TableProviderFactoryRef>,
161 plugins: Plugins,
162 meta_client: Option<MetaClientRef>,
163 kv_backend: KvBackendRef,
164 cache_registry: Option<Arc<LayeredCacheRegistry>>,
165 topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
166 open_regions_writable_override: Option<bool>,
167 #[cfg(feature = "enterprise")]
168 extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
169}
170
171impl DatanodeBuilder {
172 pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
173 Self {
174 opts,
175 table_provider_factory: None,
176 plugins,
177 meta_client: None,
178 kv_backend,
179 cache_registry: None,
180 open_regions_writable_override: None,
181 #[cfg(feature = "enterprise")]
182 extension_range_provider_factory: None,
183 topic_stats_reporter: None,
184 }
185 }
186
187 pub fn options(&self) -> &DatanodeOptions {
188 &self.opts
189 }
190
191 pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
192 self.meta_client = Some(client);
193 self
194 }
195
196 pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
197 self.cache_registry = Some(registry);
198 self
199 }
200
201 pub fn kv_backend(&self) -> &KvBackendRef {
202 &self.kv_backend
203 }
204
205 pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
206 self.table_provider_factory = Some(factory);
207 self
208 }
209
210 pub fn with_open_regions_writable_override(&mut self, writable: bool) -> &mut Self {
220 self.open_regions_writable_override = Some(writable);
221 self
222 }
223
224 #[cfg(feature = "enterprise")]
225 pub fn with_extension_range_provider(
226 &mut self,
227 extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
228 ) -> &mut Self {
229 self.extension_range_provider_factory = Some(extension_range_provider_factory);
230 self
231 }
232
233 pub async fn build(mut self) -> Result<Datanode> {
234 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
235 set_default_prefix(self.opts.default_column_prefix.as_deref())
236 .map_err(BoxedError::new)
237 .context(BuildDatanodeSnafu)?;
238
239 let meta_client = self.meta_client.take();
240
241 let controlled_by_metasrv = meta_client.is_some();
245
246 let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
248 let (tx, rx) = new_region_server_event_channel();
249 (Box::new(tx) as _, Some(rx))
250 } else {
251 (Box::new(NoopRegionServerEventListener) as _, None)
252 };
253
254 let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
255 let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
256 let table_id_schema_cache: TableSchemaCacheRef =
257 cache_registry.get().context(MissingCacheSnafu)?;
258
259 let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
260 table_id_schema_cache,
261 schema_cache,
262 ));
263
264 let gc_enabled = self.opts.region_engine.iter().any(|engine| {
265 if let RegionEngineConfig::Mito(config) = engine {
266 config.gc.enable
267 } else {
268 false
269 }
270 });
271
272 let file_ref_manager = Arc::new(FileReferenceManager::with_gc_enabled(
273 Some(node_id),
274 gc_enabled,
275 ));
276 let region_server = self
277 .new_region_server(
278 schema_metadata_manager,
279 region_event_listener,
280 file_ref_manager,
281 )
282 .await?;
283
284 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
286 let is_recovery_mode = runtime_switch_manager
287 .recovery_mode()
288 .await
289 .context(GetMetadataSnafu)?;
290
291 let region_open_requests =
292 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
293 let open_with_writable = self
294 .open_regions_writable_override
295 .unwrap_or(!controlled_by_metasrv);
296 let open_all_regions = open_all_regions(
297 region_server.clone(),
298 region_open_requests,
299 open_with_writable,
300 self.opts.init_regions_parallelism,
301 is_recovery_mode,
303 );
304
305 if self.opts.init_regions_in_background {
306 common_runtime::spawn_global(async move {
308 if let Err(err) = open_all_regions.await {
309 error!(err; "Failed to open regions during the startup.");
310 }
311 });
312 } else {
313 open_all_regions.await?;
314 }
315
316 let heartbeat_task = if let Some(meta_client) = meta_client {
317 let task = self
318 .create_heartbeat_task(®ion_server, meta_client, cache_registry)
319 .await?;
320 Some(task)
321 } else {
322 None
323 };
324
325 let is_standalone = heartbeat_task.is_none();
326 let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
327 Some(self.opts.storage.data_home.clone()),
328 is_standalone && self.opts.enable_telemetry,
329 )
330 .await;
331
332 let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
333 Some(Arc::new(Notify::new()))
334 } else {
335 None
336 };
337
338 Ok(Datanode {
339 services: ServerHandlers::default(),
340 heartbeat_task,
341 region_server,
342 greptimedb_telemetry_task,
343 region_event_receiver,
344 leases_notifier,
345 plugins: self.plugins.clone(),
346 })
347 }
348
349 async fn create_heartbeat_task(
350 &self,
351 region_server: &RegionServer,
352 meta_client: MetaClientRef,
353 cache_invalidator: CacheInvalidatorRef,
354 ) -> Result<HeartbeatTask> {
355 let stat = {
356 let mut stat = ResourceStatImpl::default();
357 stat.start_collect_cpu_usage();
358 Arc::new(stat)
359 };
360
361 HeartbeatTask::try_new(
362 &self.opts,
363 region_server.clone(),
364 meta_client,
365 self.kv_backend.clone(),
366 cache_invalidator,
367 self.plugins.clone(),
368 stat,
369 )
370 .await
371 }
372
373 pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
375 let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
376 let default_name = cfg.store.config_name();
377 let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
378 for store in &cfg.providers {
379 object_store_manager.add(
380 store.config_name(),
381 store::new_object_store(store.clone(), &cfg.data_home).await?,
382 );
383 }
384 Ok(Arc::new(object_store_manager))
385 }
386
387 #[cfg(test)]
388 async fn initialize_region_server(
390 &self,
391 region_server: &RegionServer,
392 open_with_writable: bool,
393 ) -> Result<()> {
394 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
395
396 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
398 let is_recovery_mode = runtime_switch_manager
399 .recovery_mode()
400 .await
401 .context(GetMetadataSnafu)?;
402 let region_open_requests =
403 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
404
405 open_all_regions(
406 region_server.clone(),
407 region_open_requests,
408 open_with_writable,
409 self.opts.init_regions_parallelism,
410 is_recovery_mode,
411 )
412 .await
413 }
414
415 async fn new_region_server(
416 &mut self,
417 schema_metadata_manager: SchemaMetadataManagerRef,
418 event_listener: RegionServerEventListenerRef,
419 file_ref_manager: FileReferenceManagerRef,
420 ) -> Result<RegionServer> {
421 let opts: &DatanodeOptions = &self.opts;
422
423 let query_engine_factory = QueryEngineFactory::new_with_plugins(
424 DummyCatalogManager::arc(),
426 None,
427 None,
428 None,
429 None,
430 None,
431 false,
432 self.plugins.clone(),
433 opts.query.clone(),
434 );
435 let query_engine = query_engine_factory.query_engine();
436
437 let table_provider_factory = self
438 .table_provider_factory
439 .clone()
440 .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
441
442 let mut region_server = RegionServer::with_table_provider(
443 query_engine,
444 common_runtime::global_runtime(),
445 event_listener,
446 table_provider_factory,
447 opts.max_concurrent_queries,
448 opts.concurrent_query_limiter_timeout,
449 opts.grpc.flight_compression,
450 );
451
452 let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
453 let engines = self
454 .build_store_engines(
455 object_store_manager,
456 schema_metadata_manager,
457 file_ref_manager,
458 self.plugins.clone(),
459 )
460 .await?;
461 for engine in engines {
462 region_server.register_engine(engine);
463 }
464 if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
465 region_server.set_topic_stats_reporter(topic_stats_reporter);
466 }
467
468 Ok(region_server)
469 }
470
471 async fn build_store_engines(
475 &mut self,
476 object_store_manager: ObjectStoreManagerRef,
477 schema_metadata_manager: SchemaMetadataManagerRef,
478 file_ref_manager: FileReferenceManagerRef,
479 plugins: Plugins,
480 ) -> Result<Vec<RegionEngineRef>> {
481 let mut metric_engine_config = metric_engine::config::EngineConfig::default();
482 let mut mito_engine_config = MitoConfig::default();
483 let mut file_engine_config = file_engine::config::EngineConfig::default();
484
485 for engine in &self.opts.region_engine {
486 match engine {
487 RegionEngineConfig::Mito(config) => {
488 mito_engine_config = config.clone();
489 }
490 RegionEngineConfig::File(config) => {
491 file_engine_config = config.clone();
492 }
493 RegionEngineConfig::Metric(metric_config) => {
494 metric_engine_config = metric_config.clone();
495 }
496 }
497 }
498
499 let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
501 let mito_engine = self
502 .build_mito_engine(
503 object_store_manager.clone(),
504 mito_engine_config,
505 schema_metadata_manager.clone(),
506 file_ref_manager.clone(),
507 fetcher.clone(),
508 plugins.clone(),
509 )
510 .await?;
511
512 let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
513 .context(BuildMetricEngineSnafu)?;
514
515 let file_engine = FileRegionEngine::new(
516 file_engine_config,
517 object_store_manager.default_object_store().clone(), );
519
520 Ok(vec![
521 Arc::new(mito_engine) as _,
522 Arc::new(metric_engine) as _,
523 Arc::new(file_engine) as _,
524 ])
525 }
526
527 async fn build_mito_engine(
529 &mut self,
530 object_store_manager: ObjectStoreManagerRef,
531 mut config: MitoConfig,
532 schema_metadata_manager: SchemaMetadataManagerRef,
533 file_ref_manager: FileReferenceManagerRef,
534 partition_expr_fetcher: PartitionExprFetcherRef,
535 plugins: Plugins,
536 ) -> Result<MitoEngine> {
537 let opts = &self.opts;
538 if opts.storage.is_object_storage() {
539 config.enable_write_cache = true;
541 info!("Configured 'enable_write_cache=true' for mito engine.");
542 }
543
544 let mito_engine = match &opts.wal {
545 DatanodeWalConfig::RaftEngine(raft_engine_config) => {
546 let log_store =
547 Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
548 .await?;
549
550 let builder = MitoEngineBuilder::new(
551 &opts.storage.data_home,
552 config,
553 log_store,
554 object_store_manager,
555 schema_metadata_manager,
556 file_ref_manager,
557 partition_expr_fetcher.clone(),
558 plugins,
559 );
560
561 #[cfg(feature = "enterprise")]
562 let builder = builder.with_extension_range_provider_factory(
563 self.extension_range_provider_factory.take(),
564 );
565
566 builder.try_build().await.context(BuildMitoEngineSnafu)?
567 }
568 DatanodeWalConfig::Kafka(kafka_config) => {
569 if kafka_config.create_index && opts.node_id.is_none() {
570 warn!("The WAL index creation only available in distributed mode.")
571 }
572 let global_index_collector = if kafka_config.create_index
573 && let Some(node_id) = opts.node_id
574 {
575 let operator = new_object_store_without_cache(
576 &opts.storage.store,
577 &opts.storage.data_home,
578 )
579 .await?;
580 let path = default_index_file(node_id);
581 Some(Self::build_global_index_collector(
582 kafka_config.dump_index_interval,
583 operator,
584 path,
585 ))
586 } else {
587 None
588 };
589
590 let log_store =
591 Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
592 self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
593 let builder = MitoEngineBuilder::new(
594 &opts.storage.data_home,
595 config,
596 log_store,
597 object_store_manager,
598 schema_metadata_manager,
599 file_ref_manager,
600 partition_expr_fetcher,
601 plugins,
602 );
603
604 #[cfg(feature = "enterprise")]
605 let builder = builder.with_extension_range_provider_factory(
606 self.extension_range_provider_factory.take(),
607 );
608
609 builder.try_build().await.context(BuildMitoEngineSnafu)?
610 }
611 DatanodeWalConfig::Noop => {
612 let log_store = Arc::new(NoopLogStore);
613
614 let builder = MitoEngineBuilder::new(
615 &opts.storage.data_home,
616 config,
617 log_store,
618 object_store_manager,
619 schema_metadata_manager,
620 file_ref_manager,
621 partition_expr_fetcher.clone(),
622 plugins,
623 );
624
625 #[cfg(feature = "enterprise")]
626 let builder = builder.with_extension_range_provider_factory(
627 self.extension_range_provider_factory.take(),
628 );
629
630 builder.try_build().await.context(BuildMitoEngineSnafu)?
631 }
632 };
633 Ok(mito_engine)
634 }
635
636 async fn build_raft_engine_log_store(
638 data_home: &str,
639 config: &RaftEngineConfig,
640 ) -> Result<Arc<RaftEngineLogStore>> {
641 let data_home = normalize_dir(data_home);
642 let wal_dir = match &config.dir {
643 Some(dir) => dir.clone(),
644 None => format!("{}{WAL_DIR}", data_home),
645 };
646
647 fs::create_dir_all(Path::new(&wal_dir))
649 .await
650 .context(CreateDirSnafu { dir: &wal_dir })?;
651 info!(
652 "Creating raft-engine logstore with config: {:?} and storage path: {}",
653 config, &wal_dir
654 );
655 let logstore = RaftEngineLogStore::try_new(wal_dir, config)
656 .await
657 .map_err(Box::new)
658 .context(OpenLogStoreSnafu)?;
659
660 Ok(Arc::new(logstore))
661 }
662
663 async fn build_kafka_log_store(
665 config: &DatanodeKafkaConfig,
666 global_index_collector: Option<GlobalIndexCollector>,
667 ) -> Result<Arc<KafkaLogStore>> {
668 KafkaLogStore::try_new(config, global_index_collector)
669 .await
670 .map_err(Box::new)
671 .context(OpenLogStoreSnafu)
672 .map(Arc::new)
673 }
674
675 fn build_global_index_collector(
677 dump_index_interval: Duration,
678 operator: object_store::ObjectStore,
679 path: String,
680 ) -> GlobalIndexCollector {
681 GlobalIndexCollector::new(dump_index_interval, operator, path)
682 }
683}
684
685async fn open_all_regions(
687 region_server: RegionServer,
688 region_open_requests: RegionOpenRequests,
689 open_with_writable: bool,
690 init_regions_parallelism: usize,
691 ignore_nonexistent_region: bool,
692) -> Result<()> {
693 let RegionOpenRequests {
694 leader_regions,
695 #[cfg(feature = "enterprise")]
696 follower_regions,
697 } = region_open_requests;
698
699 let leader_region_num = leader_regions.len();
700 info!("going to open {} region(s)", leader_region_num);
701 let now = Instant::now();
702 let open_regions = region_server
703 .handle_batch_open_requests(
704 init_regions_parallelism,
705 leader_regions,
706 ignore_nonexistent_region,
707 )
708 .await?;
709 info!(
710 "Opened {} regions in {:?}",
711 open_regions.len(),
712 now.elapsed()
713 );
714 if !ignore_nonexistent_region {
715 ensure!(
716 open_regions.len() == leader_region_num,
717 error::UnexpectedSnafu {
718 violated: format!(
719 "Expected to open {} of regions, only {} of regions has opened",
720 leader_region_num,
721 open_regions.len()
722 )
723 }
724 );
725 } else if open_regions.len() != leader_region_num {
726 warn!(
727 "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
728 leader_region_num,
729 open_regions.len()
730 );
731 }
732
733 for region_id in open_regions {
734 if open_with_writable {
735 let res = region_server.set_region_role(region_id, RegionRole::Leader);
736 match res {
737 Ok(_) => {
738 if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
740 .set_region_role_state_gracefully(
741 region_id,
742 SettableRegionRoleState::Leader,
743 )
744 .await?
745 {
746 error!(err; "failed to convert region {region_id} to leader");
747 }
748 }
749 Err(e) => {
750 error!(e; "failed to convert region {region_id} to leader");
751 }
752 }
753 }
754 }
755
756 #[cfg(feature = "enterprise")]
757 if !follower_regions.is_empty() {
758 use tokio::time::Instant;
759
760 let follower_region_num = follower_regions.len();
761 info!("going to open {} follower region(s)", follower_region_num);
762
763 let now = Instant::now();
764 let open_regions = region_server
765 .handle_batch_open_requests(
766 init_regions_parallelism,
767 follower_regions,
768 ignore_nonexistent_region,
769 )
770 .await?;
771 info!(
772 "Opened {} follower regions in {:?}",
773 open_regions.len(),
774 now.elapsed()
775 );
776
777 if !ignore_nonexistent_region {
778 ensure!(
779 open_regions.len() == follower_region_num,
780 error::UnexpectedSnafu {
781 violated: format!(
782 "Expected to open {} of follower regions, only {} of regions has opened",
783 follower_region_num,
784 open_regions.len()
785 )
786 }
787 );
788 } else if open_regions.len() != follower_region_num {
789 warn!(
790 "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
791 follower_region_num,
792 open_regions.len()
793 );
794 }
795 }
796
797 info!("all regions are opened");
798
799 Ok(())
800}
801
802#[cfg(test)]
803mod tests {
804 use std::assert_matches;
805 use std::collections::{BTreeMap, HashMap};
806 use std::sync::Arc;
807
808 use cache::build_datanode_cache_registry;
809 use common_base::Plugins;
810 use common_meta::cache::LayeredCacheRegistryBuilder;
811 use common_meta::key::RegionRoleSet;
812 use common_meta::key::datanode_table::DatanodeTableManager;
813 use common_meta::kv_backend::KvBackendRef;
814 use common_meta::kv_backend::memory::MemoryKvBackend;
815 use mito2::engine::MITO_ENGINE_NAME;
816 use store_api::region_request::RegionRequest;
817 use store_api::storage::RegionId;
818
819 use crate::config::DatanodeOptions;
820 use crate::datanode::DatanodeBuilder;
821 use crate::tests::{MockRegionEngine, mock_region_server};
822
823 async fn setup_table_datanode(kv: &KvBackendRef) {
824 let mgr = DatanodeTableManager::new(kv.clone());
825 let txn = mgr
826 .build_create_txn(
827 1028,
828 MITO_ENGINE_NAME,
829 "foo/bar/weny",
830 HashMap::from([("foo".to_string(), "bar".to_string())]),
831 HashMap::default(),
832 BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
833 )
834 .unwrap();
835
836 let r = kv.txn(txn).await.unwrap();
837 assert!(r.succeeded);
838 }
839
840 #[tokio::test]
841 async fn test_initialize_region_server() {
842 common_telemetry::init_default_ut_logging();
843 let mut mock_region_server = mock_region_server();
844 let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);
845
846 mock_region_server.register_engine(mock_region.clone());
847
848 let kv_backend = Arc::new(MemoryKvBackend::new());
849 let layered_cache_registry = Arc::new(
850 LayeredCacheRegistryBuilder::default()
851 .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
852 .build(),
853 );
854
855 let mut builder = DatanodeBuilder::new(
856 DatanodeOptions {
857 node_id: Some(0),
858 ..Default::default()
859 },
860 Plugins::default(),
861 kv_backend.clone(),
862 );
863 builder.with_cache_registry(layered_cache_registry);
864 setup_table_datanode(&(kv_backend as _)).await;
865
866 builder
867 .initialize_region_server(&mock_region_server, false)
868 .await
869 .unwrap();
870
871 for i in 0..3 {
872 let (region_id, req) = mock_region_handler.recv().await.unwrap();
873 assert_eq!(region_id, RegionId::new(1028, i));
874 if let RegionRequest::Open(req) = req {
875 assert_eq!(
876 req.options,
877 HashMap::from([("foo".to_string(), "bar".to_string())])
878 )
879 } else {
880 unreachable!()
881 }
882 }
883
884 assert_matches!(
885 mock_region_handler.try_recv(),
886 Err(tokio::sync::mpsc::error::TryRecvError::Empty)
887 );
888 }
889}