1pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_telemetry::{error, info, warn};
29use crossbeam_utils::atomic::AtomicCell;
30use partition::expr::PartitionExpr;
31use snafu::{OptionExt, ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::codec::PrimaryKeyEncoding;
34use store_api::logstore::provider::Provider;
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37 RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
38};
39use store_api::region_request::PathType;
40use store_api::sst_entry::ManifestSstEntry;
41use store_api::storage::{FileId, RegionId, SequenceNumber};
42use tokio::sync::RwLockWriteGuard;
43pub use utils::*;
44
45use crate::access_layer::AccessLayerRef;
46use crate::error::{
47 FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
48 RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
49};
50use crate::manifest::action::{
51 RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
52};
53use crate::manifest::manager::RegionManifestManager;
54use crate::region::version::{VersionControlRef, VersionRef};
55use crate::request::{OnFailure, OptionOutputTx};
56use crate::sst::file::FileMeta;
57use crate::sst::file_purger::FilePurgerRef;
58use crate::sst::location::{index_file_path, sst_file_path};
59use crate::time_provider::TimeProviderRef;
60
/// Ratio applied to the memtable size to estimate the WAL size of a region
/// (used by `MitoRegion::estimated_wal_usage`).
const ESTIMATED_WAL_FACTOR: f32 = 0.428_25;
63
64#[derive(Debug)]
66pub struct RegionUsage {
67 pub region_id: RegionId,
68 pub wal_usage: u64,
69 pub sst_usage: u64,
70 pub manifest_usage: u64,
71}
72
73impl RegionUsage {
74 pub fn disk_usage(&self) -> u64 {
75 self.wal_usage + self.sst_usage + self.manifest_usage
76 }
77}
78
/// State of a region while it acts as the leader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionLeaderState {
    /// Normal leader state; the region accepts writes.
    Writable,
    /// Staging mode; the region still accepts writes.
    Staging,
    /// Transitioning into staging mode.
    EnteringStaging,
    /// An alter operation is in progress.
    Altering,
    /// A drop operation is in progress.
    Dropping,
    /// A truncate operation is in progress.
    Truncating,
    /// An edit operation is in progress.
    Editing,
    /// The leader is being downgraded.
    Downgrading,
}

/// Role state of a region: either a leader (with its [`RegionLeaderState`]) or a follower.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegionRoleState {
    Leader(RegionLeaderState),
    Follower,
}

impl RegionRoleState {
    /// Returns the leader sub-state, or `None` for a follower.
    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
        if let RegionRoleState::Leader(leader_state) = self {
            Some(leader_state)
        } else {
            None
        }
    }
}
114
115#[derive(Debug)]
121pub struct MitoRegion {
122 pub(crate) region_id: RegionId,
127
128 pub(crate) version_control: VersionControlRef,
132 pub(crate) access_layer: AccessLayerRef,
134 pub(crate) manifest_ctx: ManifestContextRef,
136 pub(crate) file_purger: FilePurgerRef,
138 pub(crate) provider: Provider,
140 last_flush_millis: AtomicI64,
142 last_compaction_millis: AtomicI64,
144 time_provider: TimeProviderRef,
146 pub(crate) topic_latest_entry_id: AtomicU64,
156 pub(crate) written_bytes: Arc<AtomicU64>,
158 pub(crate) staging_partition_expr: Mutex<Option<String>>,
166 stats: ManifestStats,
168}
169
170pub type MitoRegionRef = Arc<MitoRegion>;
171
172impl MitoRegion {
173 pub(crate) async fn stop(&self) {
175 self.manifest_ctx
176 .manifest_manager
177 .write()
178 .await
179 .stop()
180 .await;
181
182 info!(
183 "Stopped region manifest manager, region_id: {}",
184 self.region_id
185 );
186 }
187
188 pub(crate) fn metadata(&self) -> RegionMetadataRef {
190 let version_data = self.version_control.current();
191 version_data.version.metadata.clone()
192 }
193
194 pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
196 let version_data = self.version_control.current();
197 version_data.version.metadata.primary_key_encoding
198 }
199
200 pub(crate) fn version(&self) -> VersionRef {
202 let version_data = self.version_control.current();
203 version_data.version
204 }
205
206 pub(crate) fn last_flush_millis(&self) -> i64 {
208 self.last_flush_millis.load(Ordering::Relaxed)
209 }
210
211 pub(crate) fn update_flush_millis(&self) {
213 let now = self.time_provider.current_time_millis();
214 self.last_flush_millis.store(now, Ordering::Relaxed);
215 }
216
217 pub(crate) fn last_compaction_millis(&self) -> i64 {
219 self.last_compaction_millis.load(Ordering::Relaxed)
220 }
221
222 pub(crate) fn update_compaction_millis(&self) {
224 let now = self.time_provider.current_time_millis();
225 self.last_compaction_millis.store(now, Ordering::Relaxed);
226 }
227
228 pub(crate) fn table_dir(&self) -> &str {
230 self.access_layer.table_dir()
231 }
232
233 pub(crate) fn path_type(&self) -> PathType {
235 self.access_layer.path_type()
236 }
237
238 pub(crate) fn is_writable(&self) -> bool {
240 matches!(
241 self.manifest_ctx.state.load(),
242 RegionRoleState::Leader(RegionLeaderState::Writable)
243 | RegionRoleState::Leader(RegionLeaderState::Staging)
244 )
245 }
246
247 pub(crate) fn is_flushable(&self) -> bool {
249 matches!(
250 self.manifest_ctx.state.load(),
251 RegionRoleState::Leader(RegionLeaderState::Writable)
252 | RegionRoleState::Leader(RegionLeaderState::Staging)
253 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
254 )
255 }
256
257 pub(crate) fn should_abort_index(&self) -> bool {
259 matches!(
260 self.manifest_ctx.state.load(),
261 RegionRoleState::Follower
262 | RegionRoleState::Leader(RegionLeaderState::Dropping)
263 | RegionRoleState::Leader(RegionLeaderState::Truncating)
264 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
265 | RegionRoleState::Leader(RegionLeaderState::Staging)
266 )
267 }
268
269 pub(crate) fn is_downgrading(&self) -> bool {
271 matches!(
272 self.manifest_ctx.state.load(),
273 RegionRoleState::Leader(RegionLeaderState::Downgrading)
274 )
275 }
276
277 #[allow(dead_code)]
279 pub(crate) fn is_staging(&self) -> bool {
280 self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
281 }
282
283 pub fn region_id(&self) -> RegionId {
284 self.region_id
285 }
286
287 pub fn find_committed_sequence(&self) -> SequenceNumber {
288 self.version_control.committed_sequence()
289 }
290
291 pub fn is_follower(&self) -> bool {
293 self.manifest_ctx.state.load() == RegionRoleState::Follower
294 }
295
296 pub(crate) fn state(&self) -> RegionRoleState {
298 self.manifest_ctx.state.load()
299 }
300
301 pub(crate) fn set_role(&self, next_role: RegionRole) {
303 self.manifest_ctx.set_role(next_role, self.region_id);
304 }
305
306 pub(crate) fn set_altering(&self) -> Result<()> {
309 self.compare_exchange_state(
310 RegionLeaderState::Writable,
311 RegionRoleState::Leader(RegionLeaderState::Altering),
312 )
313 }
314
315 pub(crate) fn set_dropping(&self) -> Result<()> {
318 self.compare_exchange_state(
319 RegionLeaderState::Writable,
320 RegionRoleState::Leader(RegionLeaderState::Dropping),
321 )
322 }
323
324 pub(crate) fn set_truncating(&self) -> Result<()> {
327 self.compare_exchange_state(
328 RegionLeaderState::Writable,
329 RegionRoleState::Leader(RegionLeaderState::Truncating),
330 )
331 }
332
333 pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
336 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
337 }
338
339 pub(crate) async fn set_staging(
345 &self,
346 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
347 ) -> Result<()> {
348 manager.store().clear_staging_manifests().await?;
349
350 self.compare_exchange_state(
351 RegionLeaderState::Writable,
352 RegionRoleState::Leader(RegionLeaderState::Staging),
353 )
354 }
355
356 pub(crate) fn set_entering_staging(&self) -> Result<()> {
358 self.compare_exchange_state(
359 RegionLeaderState::Writable,
360 RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
361 )
362 }
363
364 pub fn exit_staging(&self) -> Result<()> {
369 *self.staging_partition_expr.lock().unwrap() = None;
370 self.compare_exchange_state(
371 RegionLeaderState::Staging,
372 RegionRoleState::Leader(RegionLeaderState::Writable),
373 )
374 }
375
376 pub(crate) async fn set_role_state_gracefully(
378 &self,
379 state: SettableRegionRoleState,
380 ) -> Result<()> {
381 let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
382 self.manifest_ctx.manifest_manager.write().await;
383 let current_state = self.state();
384
385 match state {
386 SettableRegionRoleState::Leader => {
387 match current_state {
390 RegionRoleState::Leader(RegionLeaderState::Staging) => {
391 info!("Exiting staging mode for region {}", self.region_id);
392 self.exit_staging_on_success(&mut manager).await?;
394 }
395 RegionRoleState::Leader(RegionLeaderState::Writable) => {
396 info!("Region {} already in normal leader mode", self.region_id);
398 }
399 _ => {
400 return Err(RegionStateSnafu {
402 region_id: self.region_id,
403 state: current_state,
404 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
405 }
406 .build());
407 }
408 }
409 }
410
411 SettableRegionRoleState::StagingLeader => {
412 match current_state {
415 RegionRoleState::Leader(RegionLeaderState::Writable) => {
416 info!("Entering staging mode for region {}", self.region_id);
417 self.set_staging(&mut manager).await?;
418 }
419 RegionRoleState::Leader(RegionLeaderState::Staging) => {
420 info!("Region {} already in staging mode", self.region_id);
422 }
423 _ => {
424 return Err(RegionStateSnafu {
425 region_id: self.region_id,
426 state: current_state,
427 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
428 }
429 .build());
430 }
431 }
432 }
433
434 SettableRegionRoleState::Follower => {
435 match current_state {
437 RegionRoleState::Leader(RegionLeaderState::Staging) => {
438 info!(
439 "Exiting staging and demoting region {} to follower",
440 self.region_id
441 );
442 self.exit_staging()?;
443 self.set_role(RegionRole::Follower);
444 }
445 RegionRoleState::Leader(_) => {
446 info!("Demoting region {} from leader to follower", self.region_id);
447 self.set_role(RegionRole::Follower);
448 }
449 RegionRoleState::Follower => {
450 info!("Region {} already in follower mode", self.region_id);
452 }
453 }
454 }
455
456 SettableRegionRoleState::DowngradingLeader => {
457 match current_state {
459 RegionRoleState::Leader(RegionLeaderState::Staging) => {
460 info!(
461 "Exiting staging and entering downgrade for region {}",
462 self.region_id
463 );
464 self.exit_staging()?;
465 self.set_role(RegionRole::DowngradingLeader);
466 }
467 RegionRoleState::Leader(RegionLeaderState::Writable) => {
468 info!("Starting downgrade for region {}", self.region_id);
469 self.set_role(RegionRole::DowngradingLeader);
470 }
471 RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
472 info!("Region {} already in downgrading mode", self.region_id);
474 }
475 _ => {
476 warn!(
477 "Cannot start downgrade for region {} from state {:?}",
478 self.region_id, current_state
479 );
480 }
481 }
482 }
483 }
484
485 if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
487 let manifest_meta = &manager.manifest().metadata;
489 let current_version = self.version();
490 let current_meta = ¤t_version.metadata;
491 if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
492 let action = RegionMetaAction::Change(RegionChange {
493 metadata: current_meta.clone(),
494 sst_format: current_version.options.sst_format.unwrap_or_default(),
495 });
496 let result = manager
497 .update(RegionMetaActionList::with_action(action), false)
498 .await;
499
500 match result {
501 Ok(version) => {
502 info!(
503 "Successfully persisted backfilled metadata for region {}, version: {}",
504 self.region_id, version
505 );
506 }
507 Err(e) => {
508 warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
509 }
510 }
511 }
512 }
513
514 drop(manager);
515
516 Ok(())
517 }
518
519 pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
522 if let Err(e) = self
523 .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
524 {
525 error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
526 }
527 }
528
529 pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
532 if let Err(e) =
533 self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
534 {
535 error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
536 }
537 }
538
539 pub(crate) fn region_statistic(&self) -> RegionStatistic {
541 let version = self.version();
542 let memtables = &version.memtables;
543 let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
544
545 let sst_usage = version.ssts.sst_usage();
546 let index_usage = version.ssts.index_usage();
547 let flushed_entry_id = version.flushed_entry_id;
548
549 let wal_usage = self.estimated_wal_usage(memtable_usage);
550 let manifest_usage = self.stats.total_manifest_size();
551 let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
552 let num_files = version.ssts.num_files();
553 let manifest_version = self.stats.manifest_version();
554 let file_removed_cnt = self.stats.file_removed_cnt();
555
556 let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
557 let written_bytes = self.written_bytes.load(Ordering::Relaxed);
558
559 RegionStatistic {
560 num_rows,
561 memtable_size: memtable_usage,
562 wal_size: wal_usage,
563 manifest_size: manifest_usage,
564 sst_size: sst_usage,
565 sst_num: num_files,
566 index_size: index_usage,
567 manifest: RegionManifestInfo::Mito {
568 manifest_version,
569 flushed_entry_id,
570 file_removed_cnt,
571 },
572 data_topic_latest_entry_id: topic_latest_entry_id,
573 metadata_topic_latest_entry_id: topic_latest_entry_id,
574 written_bytes,
575 }
576 }
577
578 fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
581 ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
582 }
583
584 fn compare_exchange_state(
587 &self,
588 expect: RegionLeaderState,
589 state: RegionRoleState,
590 ) -> Result<()> {
591 self.manifest_ctx
592 .state
593 .compare_exchange(RegionRoleState::Leader(expect), state)
594 .map_err(|actual| {
595 RegionStateSnafu {
596 region_id: self.region_id,
597 state: actual,
598 expect: RegionRoleState::Leader(expect),
599 }
600 .build()
601 })?;
602 Ok(())
603 }
604
605 pub fn access_layer(&self) -> AccessLayerRef {
606 self.access_layer.clone()
607 }
608
609 pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
611 let table_dir = self.table_dir();
612 let path_type = self.access_layer.path_type();
613
614 let visible_ssts = self
615 .version()
616 .ssts
617 .levels()
618 .iter()
619 .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
620 .collect::<HashSet<_>>();
621
622 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
623 let staging_files = self
624 .manifest_ctx
625 .staging_manifest()
626 .await
627 .map(|m| m.files.clone())
628 .unwrap_or_default();
629 let files = manifest_files
630 .into_iter()
631 .chain(staging_files.into_iter())
632 .collect::<HashMap<_, _>>();
633
634 files
635 .values()
636 .map(|meta| {
637 let region_id = self.region_id;
638 let origin_region_id = meta.region_id;
639 let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
640 {
641 let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
642 (
643 meta.index_version,
644 Some(index_file_path),
645 Some(meta.index_file_size),
646 )
647 } else {
648 (0, None, None)
649 };
650 let visible = visible_ssts.contains(&meta.file_id);
651 ManifestSstEntry {
652 table_dir: table_dir.to_string(),
653 region_id,
654 table_id: region_id.table_id(),
655 region_number: region_id.region_number(),
656 region_group: region_id.region_group(),
657 region_sequence: region_id.region_sequence(),
658 file_id: meta.file_id.to_string(),
659 index_version,
660 level: meta.level,
661 file_path: sst_file_path(table_dir, meta.file_id(), path_type),
662 file_size: meta.file_size,
663 index_file_path,
664 index_file_size,
665 num_rows: meta.num_rows,
666 num_row_groups: meta.num_row_groups,
667 num_series: Some(meta.num_series),
668 min_ts: meta.time_range.0,
669 max_ts: meta.time_range.1,
670 sequence: meta.sequence.map(|s| s.get()),
671 origin_region_id,
672 node_id: None,
673 visible,
674 }
675 })
676 .collect()
677 }
678
679 pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
681 let manifest_files = self.manifest_ctx.manifest().await.files.clone();
682
683 file_ids
684 .iter()
685 .map(|file_id| manifest_files.get(file_id).cloned())
686 .collect::<Vec<_>>()
687 }
688
689 pub(crate) async fn exit_staging_on_success(
691 &self,
692 manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
693 ) -> Result<()> {
694 let current_state = self.manifest_ctx.current_state();
695 ensure!(
696 current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
697 RegionStateSnafu {
698 region_id: self.region_id,
699 state: current_state,
700 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
701 }
702 );
703
704 let merged_actions = match manager.merge_staged_actions(current_state).await? {
706 Some(actions) => actions,
707 None => {
708 info!(
709 "No staged manifests to merge for region {}, exiting staging mode without changes",
710 self.region_id
711 );
712 self.exit_staging()?;
714 return Ok(());
715 }
716 };
717 let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
718 let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
719 ensure!(
720 expect_change,
721 UnexpectedSnafu {
722 reason: "expect a change action in merged actions"
723 }
724 );
725 ensure!(
726 expect_edit,
727 UnexpectedSnafu {
728 reason: "expect an edit action in merged actions"
729 }
730 );
731
732 let new_version = manager.update(merged_actions.clone(), false).await?;
735
736 info!(
737 "Successfully submitted merged staged manifests for region {}, new version: {}",
738 self.region_id, new_version
739 );
740
741 let (merged_change, merged_edit) = merged_actions.split_region_change_and_edit();
743 let new_metadata = merged_change.as_ref().unwrap().metadata.clone();
745 self.version_control.alter_schema(new_metadata);
746 self.version_control
747 .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
748
749 if let Err(e) = manager.clear_staging_manifest_and_dir().await {
751 error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
752 }
753 self.exit_staging()?;
754
755 Ok(())
756 }
757
758 pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
764 let is_staging = self.is_staging();
765 if is_staging {
766 let staging_partition_expr = self.staging_partition_expr.lock().unwrap();
767 if staging_partition_expr.is_none() {
768 warn!(
769 "Staging partition expr is none for region {} in staging state",
770 self.region_id
771 );
772 }
773 staging_partition_expr.clone()
774 } else {
775 let version = self.version();
776 version.metadata.partition_expr.clone()
777 }
778 }
779}
780
781#[derive(Debug)]
783pub(crate) struct ManifestContext {
784 pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
786 state: AtomicCell<RegionRoleState>,
789}
790
791impl ManifestContext {
792 pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
793 ManifestContext {
794 manifest_manager: tokio::sync::RwLock::new(manager),
795 state: AtomicCell::new(state),
796 }
797 }
798
799 pub(crate) async fn manifest_version(&self) -> ManifestVersion {
800 self.manifest_manager
801 .read()
802 .await
803 .manifest()
804 .manifest_version
805 }
806
807 pub(crate) async fn has_update(&self) -> Result<bool> {
808 self.manifest_manager.read().await.has_update().await
809 }
810
811 pub(crate) fn current_state(&self) -> RegionRoleState {
813 self.state.load()
814 }
815
816 pub(crate) async fn install_manifest_to(
822 &self,
823 version: ManifestVersion,
824 ) -> Result<Arc<RegionManifest>> {
825 let mut manager = self.manifest_manager.write().await;
826 manager.install_manifest_to(version).await?;
827
828 Ok(manager.manifest())
829 }
830
/// Applies `action_list` to the manifest if the region is in the expected leader state.
///
/// The manifest write lock is held for both the validation and the update.
///
/// State rules:
/// - If `expect_state` is not `Downgrading`, the region must be `Leader(expect_state)`
///   or `Leader(Downgrading)`.
/// - If `expect_state` is `Downgrading`, the region must be exactly `Leader(Downgrading)`.
///
/// When the manifest records a truncated entry id, edit actions are also checked
/// against it.
///
/// # Errors
/// - `UpdateManifest` or `RegionState` if the state check fails.
/// - `RegionTruncated` if an edit conflicts with the recorded truncation.
/// - Any error from the manifest manager's `update` (also logged).
pub(crate) async fn update_manifest(
    &self,
    expect_state: RegionLeaderState,
    action_list: RegionMetaActionList,
    is_staging: bool,
) -> Result<ManifestVersion> {
    let mut manager = self.manifest_manager.write().await;
    // Snapshot of the manifest before the update; used for validation and region id.
    let manifest = manager.manifest();
    let current_state = self.state.load();

    if expect_state != RegionLeaderState::Downgrading {
        // A downgrading leader may still update the manifest; log when that happens.
        if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
            info!(
                "Region {} is in downgrading leader state, updating manifest. state is {:?}",
                manifest.metadata.region_id, expect_state
            );
        }
        ensure!(
            current_state == RegionRoleState::Leader(expect_state)
                || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
            UpdateManifestSnafu {
                region_id: manifest.metadata.region_id,
                state: current_state,
            }
        );
    } else {
        // A caller that expects `Downgrading` requires exactly that state.
        ensure!(
            current_state == RegionRoleState::Leader(expect_state),
            RegionStateSnafu {
                region_id: manifest.metadata.region_id,
                state: current_state,
                expect: RegionRoleState::Leader(expect_state),
            }
        );
    }

    // Validate edit actions against a truncation recorded in the manifest, if any.
    for action in &action_list.actions {
        let RegionMetaAction::Edit(edit) = &action else {
            continue;
        };

        let Some(truncated_entry_id) = manifest.truncated_entry_id else {
            continue;
        };

        // The edit's flushed entry must come after the truncated entry.
        if let Some(flushed_entry_id) = edit.flushed_entry_id {
            ensure!(
                truncated_entry_id < flushed_entry_id,
                RegionTruncatedSnafu {
                    region_id: manifest.metadata.region_id,
                }
            );
        }

        // Every file to remove must still be in the manifest; a missing file is
        // reported as `RegionTruncated`.
        if !edit.files_to_remove.is_empty() {
            for file in &edit.files_to_remove {
                ensure!(
                    manifest.files.contains_key(&file.file_id),
                    RegionTruncatedSnafu {
                        region_id: manifest.metadata.region_id,
                    }
                );
            }
        }
    }

    let version = manager.update(action_list, is_staging).await.inspect_err(
        |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
    )?;

    // `set_role` does not take the manifest lock, so the region may have been demoted
    // while updating; only warn about it.
    if self.state.load() == RegionRoleState::Follower {
        warn!(
            "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
            manifest.metadata.region_id
        );
    }

    Ok(version)
}
925
926 pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
948 match next_role {
949 RegionRole::Follower => {
950 match self.state.fetch_update(|state| {
951 if !matches!(state, RegionRoleState::Follower) {
952 Some(RegionRoleState::Follower)
953 } else {
954 None
955 }
956 }) {
957 Ok(state) => info!(
958 "Convert region {} to follower, previous role state: {:?}",
959 region_id, state
960 ),
961 Err(state) => {
962 if state != RegionRoleState::Follower {
963 warn!(
964 "Failed to convert region {} to follower, current role state: {:?}",
965 region_id, state
966 )
967 }
968 }
969 }
970 }
971 RegionRole::Leader => {
972 match self.state.fetch_update(|state| {
973 if matches!(
974 state,
975 RegionRoleState::Follower
976 | RegionRoleState::Leader(RegionLeaderState::Downgrading)
977 ) {
978 Some(RegionRoleState::Leader(RegionLeaderState::Writable))
979 } else {
980 None
981 }
982 }) {
983 Ok(state) => info!(
984 "Convert region {} to leader, previous role state: {:?}",
985 region_id, state
986 ),
987 Err(state) => {
988 if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
989 warn!(
990 "Failed to convert region {} to leader, current role state: {:?}",
991 region_id, state
992 )
993 }
994 }
995 }
996 }
997 RegionRole::DowngradingLeader => {
998 match self.state.compare_exchange(
999 RegionRoleState::Leader(RegionLeaderState::Writable),
1000 RegionRoleState::Leader(RegionLeaderState::Downgrading),
1001 ) {
1002 Ok(state) => info!(
1003 "Convert region {} to downgrading region, previous role state: {:?}",
1004 region_id, state
1005 ),
1006 Err(state) => {
1007 if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1008 warn!(
1009 "Failed to convert region {} to downgrading leader, current role state: {:?}",
1010 region_id, state
1011 )
1012 }
1013 }
1014 }
1015 }
1016 }
1017 }
1018
1019 pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1021 self.manifest_manager.read().await.manifest()
1022 }
1023
1024 pub(crate) async fn staging_manifest(
1026 &self,
1027 ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1028 self.manifest_manager.read().await.staging_manifest()
1029 }
1030}
1031
1032pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1033
1034#[derive(Debug, Default)]
1036pub(crate) struct RegionMap {
1037 regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
1038}
1039
1040impl RegionMap {
1041 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1043 let regions = self.regions.read().unwrap();
1044 regions.contains_key(®ion_id)
1045 }
1046
1047 pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1049 let mut regions = self.regions.write().unwrap();
1050 regions.insert(region.region_id, region);
1051 }
1052
1053 pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1055 let regions = self.regions.read().unwrap();
1056 regions.get(®ion_id).cloned()
1057 }
1058
1059 pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1063 let region = self
1064 .get_region(region_id)
1065 .context(RegionNotFoundSnafu { region_id })?;
1066 ensure!(
1067 region.is_writable(),
1068 RegionStateSnafu {
1069 region_id,
1070 state: region.state(),
1071 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1072 }
1073 );
1074 Ok(region)
1075 }
1076
1077 pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1081 let region = self
1082 .get_region(region_id)
1083 .context(RegionNotFoundSnafu { region_id })?;
1084 ensure!(
1085 region.is_follower(),
1086 RegionStateSnafu {
1087 region_id,
1088 state: region.state(),
1089 expect: RegionRoleState::Follower,
1090 }
1091 );
1092
1093 Ok(region)
1094 }
1095
1096 pub(crate) fn get_region_or<F: OnFailure>(
1100 &self,
1101 region_id: RegionId,
1102 cb: &mut F,
1103 ) -> Option<MitoRegionRef> {
1104 match self
1105 .get_region(region_id)
1106 .context(RegionNotFoundSnafu { region_id })
1107 {
1108 Ok(region) => Some(region),
1109 Err(e) => {
1110 cb.on_failure(e);
1111 None
1112 }
1113 }
1114 }
1115
1116 pub(crate) fn writable_region_or<F: OnFailure>(
1120 &self,
1121 region_id: RegionId,
1122 cb: &mut F,
1123 ) -> Option<MitoRegionRef> {
1124 match self.writable_region(region_id) {
1125 Ok(region) => Some(region),
1126 Err(e) => {
1127 cb.on_failure(e);
1128 None
1129 }
1130 }
1131 }
1132
1133 pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1137 let region = self.writable_region(region_id)?;
1138 if region.is_staging() {
1139 return Err(crate::error::RegionStateSnafu {
1140 region_id,
1141 state: region.state(),
1142 expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1143 }
1144 .build());
1145 }
1146 Ok(region)
1147 }
1148
1149 pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1153 let region = self
1154 .get_region(region_id)
1155 .context(RegionNotFoundSnafu { region_id })?;
1156 ensure!(
1157 region.is_staging(),
1158 RegionStateSnafu {
1159 region_id,
1160 state: region.state(),
1161 expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1162 }
1163 );
1164 Ok(region)
1165 }
1166
1167 fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1171 let region = self
1172 .get_region(region_id)
1173 .context(RegionNotFoundSnafu { region_id })?;
1174 ensure!(
1175 region.is_flushable(),
1176 FlushableRegionStateSnafu {
1177 region_id,
1178 state: region.state(),
1179 }
1180 );
1181 Ok(region)
1182 }
1183
1184 pub(crate) fn flushable_region_or<F: OnFailure>(
1188 &self,
1189 region_id: RegionId,
1190 cb: &mut F,
1191 ) -> Option<MitoRegionRef> {
1192 match self.flushable_region(region_id) {
1193 Ok(region) => Some(region),
1194 Err(e) => {
1195 cb.on_failure(e);
1196 None
1197 }
1198 }
1199 }
1200
1201 pub(crate) fn remove_region(&self, region_id: RegionId) {
1203 let mut regions = self.regions.write().unwrap();
1204 regions.remove(®ion_id);
1205 }
1206
1207 pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1209 let regions = self.regions.read().unwrap();
1210 regions.values().cloned().collect()
1211 }
1212
1213 pub(crate) fn clear(&self) {
1215 self.regions.write().unwrap().clear();
1216 }
1217}
1218
1219pub(crate) type RegionMapRef = Arc<RegionMap>;
1220
1221#[derive(Debug, Default)]
1223pub(crate) struct OpeningRegions {
1224 regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1225}
1226
1227impl OpeningRegions {
1228 pub(crate) fn wait_for_opening_region(
1230 &self,
1231 region_id: RegionId,
1232 sender: OptionOutputTx,
1233 ) -> Option<OptionOutputTx> {
1234 let mut regions = self.regions.write().unwrap();
1235 match regions.entry(region_id) {
1236 Entry::Occupied(mut senders) => {
1237 senders.get_mut().push(sender);
1238 None
1239 }
1240 Entry::Vacant(_) => Some(sender),
1241 }
1242 }
1243
1244 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1246 let regions = self.regions.read().unwrap();
1247 regions.contains_key(®ion_id)
1248 }
1249
1250 pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1252 let mut regions = self.regions.write().unwrap();
1253 regions.insert(region, vec![sender]);
1254 }
1255
1256 pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1258 let mut regions = self.regions.write().unwrap();
1259 regions.remove(®ion_id).unwrap_or_default()
1260 }
1261
1262 #[cfg(test)]
1263 pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1264 let regions = self.regions.read().unwrap();
1265 if let Some(senders) = regions.get(®ion_id) {
1266 senders.len()
1267 } else {
1268 0
1269 }
1270 }
1271}
1272
1273pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1274
1275#[derive(Debug, Default)]
1277pub(crate) struct CatchupRegions {
1278 regions: RwLock<HashSet<RegionId>>,
1279}
1280
1281impl CatchupRegions {
1282 pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1284 let regions = self.regions.read().unwrap();
1285 regions.contains(®ion_id)
1286 }
1287
1288 pub(crate) fn insert_region(&self, region_id: RegionId) {
1290 let mut regions = self.regions.write().unwrap();
1291 regions.insert(region_id);
1292 }
1293
1294 pub(crate) fn remove_region(&self, region_id: RegionId) {
1296 let mut regions = self.regions.write().unwrap();
1297 regions.remove(®ion_id);
1298 }
1299}
1300
1301pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1302
/// Manifest statistics of a region.
///
/// Counters are behind `Arc`s, so clones share (and observe) the same values.
#[derive(Default, Debug, Clone)]
pub struct ManifestStats {
    pub(crate) total_manifest_size: Arc<AtomicU64>,
    pub(crate) manifest_version: Arc<AtomicU64>,
    pub(crate) file_removed_cnt: Arc<AtomicU64>,
}

impl ManifestStats {
    /// Reads a counter with relaxed ordering.
    fn load_relaxed(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    /// Returns the total manifest size.
    fn total_manifest_size(&self) -> u64 {
        Self::load_relaxed(&self.total_manifest_size)
    }

    /// Returns the manifest version.
    fn manifest_version(&self) -> u64 {
        Self::load_relaxed(&self.manifest_version)
    }

    /// Returns the removed file count.
    fn file_removed_cnt(&self) -> u64 {
        Self::load_relaxed(&self.file_removed_cnt)
    }
}
1324
1325pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1327 match partition_expr_str {
1328 None => Ok(None),
1329 Some("") => Ok(None),
1330 Some(json_str) => {
1331 let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1332 .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1333 Ok(expr)
1334 }
1335 }
1336}
1337
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;
    use std::sync::{Arc, Mutex};

    use common_datasource::compression::CompressionType;
    use common_test_util::temp_dir::create_temp_dir;
    use crossbeam_utils::atomic::AtomicCell;
    use object_store::ObjectStore;
    use object_store::services::Fs;
    use store_api::logstore::provider::Provider;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::PathType;
    use store_api::storage::RegionId;

    use crate::access_layer::AccessLayer;
    use crate::manifest::action::RegionMetaActionList;
    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
    use crate::region::{
        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
    };
    use crate::sst::FormatType;
    use crate::sst::index::intermediate::IntermediateManager;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::version_util::VersionControlBuilder;
    use crate::time_provider::StdTimeProvider;

    /// `RegionRoleState` is stored in an `AtomicCell`; this asserts the type is small enough
    /// for the cell to be lock-free, so state loads/stores do not fall back to a lock.
    #[test]
    fn test_region_state_lock_free() {
        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
    }

    /// Walks `ManifestContext::set_role` through the role transitions and checks the
    /// resulting `RegionRoleState` after each one.
    #[tokio::test]
    async fn test_set_region_state() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let manifest_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        let region_id = RegionId::new(1024, 0);
        // Leader -> Follower.
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader lands in the writable leader state.
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );

        // Writable leader -> downgrading leader.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading leader -> Follower.
        manifest_ctx.set_role(RegionRole::Follower, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // A follower asked to become a downgrading leader stays a follower.
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);

        // Follower -> Leader -> downgrading leader is accepted.
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // A downgrading leader can be promoted back to a writable leader.
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    /// `ManifestContext::current_state` reports the state the context was built with:
    /// `Staging` for an explicitly staged context, and `Writable` for the env's mock context.
    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Build a manifest context that starts directly in the staging leader state.
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // The scheduler env's mock context starts as a writable leader.
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

    /// Drives a `MitoRegion` through `set_staging` / `exit_staging`. It checks that:
    /// - leftover staging manifests are cleaned up on entering staging;
    /// - entering staging twice fails;
    /// - exiting staging twice fails.
    ///
    /// The manifest manager sits behind an async `RwLock`. Every guard is dropped explicitly
    /// before the lock is taken again, because acquiring a read guard while this same task
    /// still holds the write guard would never complete.
    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Filesystem-backed object store rooted in a temp dir. `temp_dir` is kept in scope for
        // the whole test (presumably it removes the directory when dropped — confirm).
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        // Index helpers share an `index_aux` subdirectory of the same temp dir.
        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

        // Assemble the region by hand so the test controls the manifest context directly.
        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_expr: Mutex::new(None),
        };

        // Initial state: writable leader, not staging.
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Writable -> Staging. `set_staging` takes the manifest write guard.
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Staging -> Writable.
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Leftover ("dirty") staging manifests must be removed when staging is entered again.
        {
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // The trailing `true` presumably marks these as staging manifests; the
            // `fetch_staging_manifests` assertion below depends on that.
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Entering staging again cleans up the leftovers written above.
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            region.exit_staging().unwrap();
        }

        // Entering staging while already staging is rejected.
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok());
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err());
        drop(manager);
        // Exiting staging when not staging is rejected.
        assert!(region.exit_staging().is_ok());
        assert!(region.exit_staging().is_err());
    }
}