mito2/region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Mito region.
16
17pub mod catchup;
18pub mod opener;
19pub mod options;
20pub mod utils;
21pub(crate) mod version;
22
23use std::collections::hash_map::Entry;
24use std::collections::{HashMap, HashSet};
25use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
26use std::sync::{Arc, Mutex, RwLock};
27
28use common_telemetry::{error, info, warn};
29use crossbeam_utils::atomic::AtomicCell;
30use partition::expr::PartitionExpr;
31use snafu::{OptionExt, ResultExt, ensure};
32use store_api::ManifestVersion;
33use store_api::codec::PrimaryKeyEncoding;
34use store_api::logstore::provider::Provider;
35use store_api::metadata::RegionMetadataRef;
36use store_api::region_engine::{
37    RegionManifestInfo, RegionRole, RegionStatistic, SettableRegionRoleState,
38};
39use store_api::region_request::PathType;
40use store_api::sst_entry::ManifestSstEntry;
41use store_api::storage::{FileId, RegionId, SequenceNumber};
42use tokio::sync::RwLockWriteGuard;
43pub use utils::*;
44
45use crate::access_layer::AccessLayerRef;
46use crate::error::{
47    FlushableRegionStateSnafu, InvalidPartitionExprSnafu, RegionNotFoundSnafu, RegionStateSnafu,
48    RegionTruncatedSnafu, Result, UnexpectedSnafu, UpdateManifestSnafu,
49};
50use crate::manifest::action::{
51    RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
52};
53use crate::manifest::manager::RegionManifestManager;
54use crate::region::version::{VersionControlRef, VersionRef};
55use crate::request::{OnFailure, OptionOutputTx};
56use crate::sst::file::FileMeta;
57use crate::sst::file_purger::FilePurgerRef;
58use crate::sst::location::{index_file_path, sst_file_path};
59use crate::time_provider::TimeProviderRef;
60
/// The approximate factor used to estimate the WAL size from the memtable size.
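/// For example, a memtable of 100 MiB corresponds to an estimated WAL size of
/// roughly 42.8 MiB (100 MiB * 0.42825).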
62const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
63
/// Region usage, including the region id, WAL usage, SST usage and manifest usage.
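///
/// A rough usage sketch (illustrative only):
///
/// ```ignore
/// // The total on-disk footprint is the sum of the WAL, SST and manifest usage.
/// assert_eq!(
///     usage.disk_usage(),
///     usage.wal_usage + usage.sst_usage + usage.manifest_usage
/// );
/// ```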
65#[derive(Debug)]
66pub struct RegionUsage {
67    pub region_id: RegionId,
68    pub wal_usage: u64,
69    pub sst_usage: u64,
70    pub manifest_usage: u64,
71}
72
73impl RegionUsage {
74    pub fn disk_usage(&self) -> u64 {
75        self.wal_usage + self.sst_usage + self.manifest_usage
76    }
77}
78
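/// Leader-specific states of a region.
///
/// A rough sketch of the typical transitions driven by the `set_*` and `exit_*`
/// helpers on [`MitoRegion`] and [`ManifestContext`] (illustrative only):
///
/// ```text
/// Writable -> Altering | Dropping | Truncating | EnteringStaging | Staging | Downgrading
/// Staging  -> Writable   (via `exit_staging`)
/// ```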
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
80pub enum RegionLeaderState {
81    /// The region is opened and is writable.
82    Writable,
    /// The region is in staging mode: writable, but with no checkpointing or compaction.
    Staging,
    /// The region is entering staging mode; write requests will be stalled.
    EnteringStaging,
87    /// The region is altering.
88    Altering,
89    /// The region is dropping.
90    Dropping,
91    /// The region is truncating.
92    Truncating,
93    /// The region is handling a region edit.
94    Editing,
95    /// The region is stepping down.
96    Downgrading,
97}
98
99#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100pub enum RegionRoleState {
101    Leader(RegionLeaderState),
102    Follower,
103}
104
105impl RegionRoleState {
106    /// Converts the region role state to leader state if it is a leader state.
107    pub fn into_leader_state(self) -> Option<RegionLeaderState> {
108        match self {
109            RegionRoleState::Leader(leader_state) => Some(leader_state),
110            RegionRoleState::Follower => None,
111        }
112    }
113}
114
115/// Metadata and runtime status of a region.
116///
117/// Writing and reading a region follow a single-writer-multi-reader rule:
118/// - Only the region worker thread this region belongs to can modify the metadata.
119/// - Multiple reader threads are allowed to read a specific `version` of a region.
120#[derive(Debug)]
121pub struct MitoRegion {
122    /// Id of this region.
123    ///
124    /// Accessing region id from the version control is inconvenient so
125    /// we also store it here.
126    pub(crate) region_id: RegionId,
127
128    /// Version controller for this region.
129    ///
130    /// We MUST update the version control inside the write lock of the region manifest manager.
131    pub(crate) version_control: VersionControlRef,
132    /// SSTs accessor for this region.
133    pub(crate) access_layer: AccessLayerRef,
134    /// Context to maintain manifest for this region.
135    pub(crate) manifest_ctx: ManifestContextRef,
136    /// SST file purger.
137    pub(crate) file_purger: FilePurgerRef,
138    /// The provider of log store.
139    pub(crate) provider: Provider,
140    /// Last flush time in millis.
141    last_flush_millis: AtomicI64,
142    /// Last compaction time in millis.
143    last_compaction_millis: AtomicI64,
144    /// Provider to get current time.
145    time_provider: TimeProviderRef,
146    /// The topic's latest entry id since the region's last flushing.
147    /// **Only used for remote WAL pruning.**
148    ///
    /// The value is updated to the latest offset of the topic
    /// when the region receives a flush request or schedules a periodic flush task
    /// and the region's memtable is empty.
    ///
    /// There are no WAL entries in the range [flushed_entry_id, topic_latest_entry_id] for the current region,
    /// which means the WAL may be pruned up to `topic_latest_entry_id`.
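    ///
    /// For example, if `flushed_entry_id` is 100 and `topic_latest_entry_id` is 120,
    /// entries 100..=120 in the topic carry no data for this region, so the WAL may
    /// be pruned up to entry 120 for this region.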
155    pub(crate) topic_latest_entry_id: AtomicU64,
156    /// The total bytes written to the region.
157    pub(crate) written_bytes: Arc<AtomicU64>,
158    /// The partition expression of the region in staging mode.
159    ///
    /// While in staging mode, the region metadata in [`VersionControlRef`] is not updated,
    /// so we need to store the partition expression separately.
    /// TODO(weny):
    /// 1. Reload the staging partition expr during region open.
    /// 2. Reject requests with a mismatching partition expr.
165    pub(crate) staging_partition_expr: Mutex<Option<String>>,
    /// Manifest stats.
167    stats: ManifestStats,
168}
169
170pub type MitoRegionRef = Arc<MitoRegion>;
171
172impl MitoRegion {
173    /// Stop background managers for this region.
174    pub(crate) async fn stop(&self) {
175        self.manifest_ctx
176            .manifest_manager
177            .write()
178            .await
179            .stop()
180            .await;
181
182        info!(
183            "Stopped region manifest manager, region_id: {}",
184            self.region_id
185        );
186    }
187
188    /// Returns current metadata of the region.
189    pub(crate) fn metadata(&self) -> RegionMetadataRef {
190        let version_data = self.version_control.current();
191        version_data.version.metadata.clone()
192    }
193
194    /// Returns primary key encoding of the region.
195    pub(crate) fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
196        let version_data = self.version_control.current();
197        version_data.version.metadata.primary_key_encoding
198    }
199
200    /// Returns current version of the region.
201    pub(crate) fn version(&self) -> VersionRef {
202        let version_data = self.version_control.current();
203        version_data.version
204    }
205
206    /// Returns last flush timestamp in millis.
207    pub(crate) fn last_flush_millis(&self) -> i64 {
208        self.last_flush_millis.load(Ordering::Relaxed)
209    }
210
211    /// Update flush time to current time.
212    pub(crate) fn update_flush_millis(&self) {
213        let now = self.time_provider.current_time_millis();
214        self.last_flush_millis.store(now, Ordering::Relaxed);
215    }
216
217    /// Returns last compaction timestamp in millis.
218    pub(crate) fn last_compaction_millis(&self) -> i64 {
219        self.last_compaction_millis.load(Ordering::Relaxed)
220    }
221
222    /// Update compaction time to current time.
223    pub(crate) fn update_compaction_millis(&self) {
224        let now = self.time_provider.current_time_millis();
225        self.last_compaction_millis.store(now, Ordering::Relaxed);
226    }
227
228    /// Returns the table dir.
229    pub(crate) fn table_dir(&self) -> &str {
230        self.access_layer.table_dir()
231    }
232
233    /// Returns the path type of the region.
234    pub(crate) fn path_type(&self) -> PathType {
235        self.access_layer.path_type()
236    }
237
238    /// Returns whether the region is writable.
239    pub(crate) fn is_writable(&self) -> bool {
240        matches!(
241            self.manifest_ctx.state.load(),
242            RegionRoleState::Leader(RegionLeaderState::Writable)
243                | RegionRoleState::Leader(RegionLeaderState::Staging)
244        )
245    }
246
247    /// Returns whether the region is flushable.
248    pub(crate) fn is_flushable(&self) -> bool {
249        matches!(
250            self.manifest_ctx.state.load(),
251            RegionRoleState::Leader(RegionLeaderState::Writable)
252                | RegionRoleState::Leader(RegionLeaderState::Staging)
253                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
254        )
255    }
256
257    /// Returns whether the region should abort index building.
258    pub(crate) fn should_abort_index(&self) -> bool {
259        matches!(
260            self.manifest_ctx.state.load(),
261            RegionRoleState::Follower
262                | RegionRoleState::Leader(RegionLeaderState::Dropping)
263                | RegionRoleState::Leader(RegionLeaderState::Truncating)
264                | RegionRoleState::Leader(RegionLeaderState::Downgrading)
265                | RegionRoleState::Leader(RegionLeaderState::Staging)
266        )
267    }
268
269    /// Returns whether the region is downgrading.
270    pub(crate) fn is_downgrading(&self) -> bool {
271        matches!(
272            self.manifest_ctx.state.load(),
273            RegionRoleState::Leader(RegionLeaderState::Downgrading)
274        )
275    }
276
277    /// Returns whether the region is in staging mode.
278    #[allow(dead_code)]
279    pub(crate) fn is_staging(&self) -> bool {
280        self.manifest_ctx.state.load() == RegionRoleState::Leader(RegionLeaderState::Staging)
281    }
282
283    pub fn region_id(&self) -> RegionId {
284        self.region_id
285    }
286
287    pub fn find_committed_sequence(&self) -> SequenceNumber {
288        self.version_control.committed_sequence()
289    }
290
    /// Returns whether the region is a follower (readonly).
292    pub fn is_follower(&self) -> bool {
293        self.manifest_ctx.state.load() == RegionRoleState::Follower
294    }
295
296    /// Returns the state of the region.
297    pub(crate) fn state(&self) -> RegionRoleState {
298        self.manifest_ctx.state.load()
299    }
300
301    /// Sets the region role state.
302    pub(crate) fn set_role(&self, next_role: RegionRole) {
303        self.manifest_ctx.set_role(next_role, self.region_id);
304    }
305
306    /// Sets the altering state.
307    /// You should call this method in the worker loop.
308    pub(crate) fn set_altering(&self) -> Result<()> {
309        self.compare_exchange_state(
310            RegionLeaderState::Writable,
311            RegionRoleState::Leader(RegionLeaderState::Altering),
312        )
313    }
314
315    /// Sets the dropping state.
316    /// You should call this method in the worker loop.
317    pub(crate) fn set_dropping(&self) -> Result<()> {
318        self.compare_exchange_state(
319            RegionLeaderState::Writable,
320            RegionRoleState::Leader(RegionLeaderState::Dropping),
321        )
322    }
323
324    /// Sets the truncating state.
325    /// You should call this method in the worker loop.
326    pub(crate) fn set_truncating(&self) -> Result<()> {
327        self.compare_exchange_state(
328            RegionLeaderState::Writable,
329            RegionRoleState::Leader(RegionLeaderState::Truncating),
330        )
331    }
332
333    /// Sets the editing state.
334    /// You should call this method in the worker loop.
335    pub(crate) fn set_editing(&self, expect: RegionLeaderState) -> Result<()> {
336        self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Editing))
337    }
338
339    /// Sets the staging state.
340    ///
341    /// You should call this method in the worker loop.
342    /// Transitions from Writable to Staging state.
    /// Clears any existing staging manifests before entering staging mode.
344    pub(crate) async fn set_staging(
345        &self,
346        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
347    ) -> Result<()> {
348        manager.store().clear_staging_manifests().await?;
349
350        self.compare_exchange_state(
351            RegionLeaderState::Writable,
352            RegionRoleState::Leader(RegionLeaderState::Staging),
353        )
354    }
355
356    /// Sets the entering staging state.
357    pub(crate) fn set_entering_staging(&self) -> Result<()> {
358        self.compare_exchange_state(
359            RegionLeaderState::Writable,
360            RegionRoleState::Leader(RegionLeaderState::EnteringStaging),
361        )
362    }
363
364    /// Exits the staging state back to writable.
365    ///
366    /// You should call this method in the worker loop.
367    /// Transitions from Staging to Writable state.
368    pub fn exit_staging(&self) -> Result<()> {
369        *self.staging_partition_expr.lock().unwrap() = None;
370        self.compare_exchange_state(
371            RegionLeaderState::Staging,
372            RegionRoleState::Leader(RegionLeaderState::Writable),
373        )
374    }
375
376    /// Sets the region role state gracefully. This acquires the manifest write lock.
377    pub(crate) async fn set_role_state_gracefully(
378        &self,
379        state: SettableRegionRoleState,
380    ) -> Result<()> {
381        let mut manager: RwLockWriteGuard<'_, RegionManifestManager> =
382            self.manifest_ctx.manifest_manager.write().await;
383        let current_state = self.state();
384
385        match state {
386            SettableRegionRoleState::Leader => {
387                // Exit staging mode and return to normal writable leader
388                // Only allowed from staging state
389                match current_state {
390                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
391                        info!("Exiting staging mode for region {}", self.region_id);
392                        // Use the success exit path that merges all staged manifests
393                        self.exit_staging_on_success(&mut manager).await?;
394                    }
395                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
396                        // Already in desired state - no-op
397                        info!("Region {} already in normal leader mode", self.region_id);
398                    }
399                    _ => {
400                        // Only staging -> leader transition is allowed
401                        return Err(RegionStateSnafu {
402                            region_id: self.region_id,
403                            state: current_state,
404                            expect: RegionRoleState::Leader(RegionLeaderState::Staging),
405                        }
406                        .build());
407                    }
408                }
409            }
410
411            SettableRegionRoleState::StagingLeader => {
412                // Enter staging mode from normal writable leader
413                // Only allowed from writable leader state
414                match current_state {
415                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
416                        info!("Entering staging mode for region {}", self.region_id);
417                        self.set_staging(&mut manager).await?;
418                    }
419                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
420                        // Already in desired state - no-op
421                        info!("Region {} already in staging mode", self.region_id);
422                    }
423                    _ => {
424                        return Err(RegionStateSnafu {
425                            region_id: self.region_id,
426                            state: current_state,
427                            expect: RegionRoleState::Leader(RegionLeaderState::Writable),
428                        }
429                        .build());
430                    }
431                }
432            }
433
434            SettableRegionRoleState::Follower => {
435                // Make this region a follower
436                match current_state {
437                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
438                        info!(
439                            "Exiting staging and demoting region {} to follower",
440                            self.region_id
441                        );
442                        self.exit_staging()?;
443                        self.set_role(RegionRole::Follower);
444                    }
445                    RegionRoleState::Leader(_) => {
446                        info!("Demoting region {} from leader to follower", self.region_id);
447                        self.set_role(RegionRole::Follower);
448                    }
449                    RegionRoleState::Follower => {
450                        // Already in desired state - no-op
451                        info!("Region {} already in follower mode", self.region_id);
452                    }
453                }
454            }
455
456            SettableRegionRoleState::DowngradingLeader => {
                // Downgrade this region to a downgrading leader
458                match current_state {
459                    RegionRoleState::Leader(RegionLeaderState::Staging) => {
460                        info!(
461                            "Exiting staging and entering downgrade for region {}",
462                            self.region_id
463                        );
464                        self.exit_staging()?;
465                        self.set_role(RegionRole::DowngradingLeader);
466                    }
467                    RegionRoleState::Leader(RegionLeaderState::Writable) => {
468                        info!("Starting downgrade for region {}", self.region_id);
469                        self.set_role(RegionRole::DowngradingLeader);
470                    }
471                    RegionRoleState::Leader(RegionLeaderState::Downgrading) => {
472                        // Already in desired state - no-op
473                        info!("Region {} already in downgrading mode", self.region_id);
474                    }
475                    _ => {
476                        warn!(
477                            "Cannot start downgrade for region {} from state {:?}",
478                            self.region_id, current_state
479                        );
480                    }
481                }
482            }
483        }
484
485        // Hack(zhongzc): If we have just become leader (writable), persist any backfilled metadata.
486        if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
487            // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
488            let manifest_meta = &manager.manifest().metadata;
489            let current_version = self.version();
490            let current_meta = &current_version.metadata;
491            if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
492                let action = RegionMetaAction::Change(RegionChange {
493                    metadata: current_meta.clone(),
494                    sst_format: current_version.options.sst_format.unwrap_or_default(),
495                });
496                let result = manager
497                    .update(RegionMetaActionList::with_action(action), false)
498                    .await;
499
500                match result {
501                    Ok(version) => {
502                        info!(
503                            "Successfully persisted backfilled metadata for region {}, version: {}",
504                            self.region_id, version
505                        );
506                    }
507                    Err(e) => {
508                        warn!(e; "Failed to persist backfilled metadata for region {}", self.region_id);
509                    }
510                }
511            }
512        }
513
514        drop(manager);
515
516        Ok(())
517    }
518
519    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Writable)` if the current state is `expect`.
520    /// Otherwise, logs an error.
521    pub(crate) fn switch_state_to_writable(&self, expect: RegionLeaderState) {
522        if let Err(e) = self
523            .compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Writable))
524        {
525            error!(e; "failed to switch region state to writable, expect state is {:?}", expect);
526        }
527    }
528
529    /// Switches the region state to `RegionRoleState::Leader(RegionLeaderState::Staging)` if the current state is `expect`.
530    /// Otherwise, logs an error.
531    pub(crate) fn switch_state_to_staging(&self, expect: RegionLeaderState) {
532        if let Err(e) =
533            self.compare_exchange_state(expect, RegionRoleState::Leader(RegionLeaderState::Staging))
534        {
535            error!(e; "failed to switch region state to staging, expect state is {:?}", expect);
536        }
537    }
538
539    /// Returns the region statistic.
540    pub(crate) fn region_statistic(&self) -> RegionStatistic {
541        let version = self.version();
542        let memtables = &version.memtables;
543        let memtable_usage = (memtables.mutable_usage() + memtables.immutables_usage()) as u64;
544
545        let sst_usage = version.ssts.sst_usage();
546        let index_usage = version.ssts.index_usage();
547        let flushed_entry_id = version.flushed_entry_id;
548
549        let wal_usage = self.estimated_wal_usage(memtable_usage);
550        let manifest_usage = self.stats.total_manifest_size();
551        let num_rows = version.ssts.num_rows() + version.memtables.num_rows();
552        let num_files = version.ssts.num_files();
553        let manifest_version = self.stats.manifest_version();
554        let file_removed_cnt = self.stats.file_removed_cnt();
555
556        let topic_latest_entry_id = self.topic_latest_entry_id.load(Ordering::Relaxed);
557        let written_bytes = self.written_bytes.load(Ordering::Relaxed);
558
559        RegionStatistic {
560            num_rows,
561            memtable_size: memtable_usage,
562            wal_size: wal_usage,
563            manifest_size: manifest_usage,
564            sst_size: sst_usage,
565            sst_num: num_files,
566            index_size: index_usage,
567            manifest: RegionManifestInfo::Mito {
568                manifest_version,
569                flushed_entry_id,
570                file_removed_cnt,
571            },
572            data_topic_latest_entry_id: topic_latest_entry_id,
573            metadata_topic_latest_entry_id: topic_latest_entry_id,
574            written_bytes,
575        }
576    }
577
    /// Estimates the WAL size in bytes.
    ///
    /// Uses the memtable size to estimate the WAL size.
580    fn estimated_wal_usage(&self, memtable_usage: u64) -> u64 {
581        ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64
582    }
583
    /// Sets the state of the region to the given state if the current state equals
    /// the expected one.
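    ///
    /// A rough usage sketch (mirrors `set_altering` above; illustrative only):
    ///
    /// ```ignore
    /// // Transition Writable -> Altering; fails if the region is in any other state.
    /// region.compare_exchange_state(
    ///     RegionLeaderState::Writable,
    ///     RegionRoleState::Leader(RegionLeaderState::Altering),
    /// )?;
    /// ```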
586    fn compare_exchange_state(
587        &self,
588        expect: RegionLeaderState,
589        state: RegionRoleState,
590    ) -> Result<()> {
591        self.manifest_ctx
592            .state
593            .compare_exchange(RegionRoleState::Leader(expect), state)
594            .map_err(|actual| {
595                RegionStateSnafu {
596                    region_id: self.region_id,
597                    state: actual,
598                    expect: RegionRoleState::Leader(expect),
599                }
600                .build()
601            })?;
602        Ok(())
603    }
604
605    pub fn access_layer(&self) -> AccessLayerRef {
606        self.access_layer.clone()
607    }
608
609    /// Returns the SST entries of the region.
610    pub async fn manifest_sst_entries(&self) -> Vec<ManifestSstEntry> {
611        let table_dir = self.table_dir();
612        let path_type = self.access_layer.path_type();
613
614        let visible_ssts = self
615            .version()
616            .ssts
617            .levels()
618            .iter()
619            .flat_map(|level| level.files().map(|file| file.file_id().file_id()))
620            .collect::<HashSet<_>>();
621
622        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
623        let staging_files = self
624            .manifest_ctx
625            .staging_manifest()
626            .await
627            .map(|m| m.files.clone())
628            .unwrap_or_default();
629        let files = manifest_files
630            .into_iter()
631            .chain(staging_files.into_iter())
632            .collect::<HashMap<_, _>>();
633
634        files
635            .values()
636            .map(|meta| {
637                let region_id = self.region_id;
638                let origin_region_id = meta.region_id;
639                let (index_version, index_file_path, index_file_size) = if meta.index_file_size > 0
640                {
641                    let index_file_path = index_file_path(table_dir, meta.index_id(), path_type);
642                    (
643                        meta.index_version,
644                        Some(index_file_path),
645                        Some(meta.index_file_size),
646                    )
647                } else {
648                    (0, None, None)
649                };
650                let visible = visible_ssts.contains(&meta.file_id);
651                ManifestSstEntry {
652                    table_dir: table_dir.to_string(),
653                    region_id,
654                    table_id: region_id.table_id(),
655                    region_number: region_id.region_number(),
656                    region_group: region_id.region_group(),
657                    region_sequence: region_id.region_sequence(),
658                    file_id: meta.file_id.to_string(),
659                    index_version,
660                    level: meta.level,
661                    file_path: sst_file_path(table_dir, meta.file_id(), path_type),
662                    file_size: meta.file_size,
663                    index_file_path,
664                    index_file_size,
665                    num_rows: meta.num_rows,
666                    num_row_groups: meta.num_row_groups,
667                    num_series: Some(meta.num_series),
668                    min_ts: meta.time_range.0,
669                    max_ts: meta.time_range.1,
670                    sequence: meta.sequence.map(|s| s.get()),
671                    origin_region_id,
672                    node_id: None,
673                    visible,
674                }
675            })
676            .collect()
677    }
678
679    /// Returns the file metas of the region by file ids.
680    pub async fn file_metas(&self, file_ids: &[FileId]) -> Vec<Option<FileMeta>> {
681        let manifest_files = self.manifest_ctx.manifest().await.files.clone();
682
683        file_ids
684            .iter()
685            .map(|file_id| manifest_files.get(file_id).cloned())
686            .collect::<Vec<_>>()
687    }
688
    /// Exits staging mode on success by merging all staged manifests and making them visible.
690    pub(crate) async fn exit_staging_on_success(
691        &self,
692        manager: &mut RwLockWriteGuard<'_, RegionManifestManager>,
693    ) -> Result<()> {
694        let current_state = self.manifest_ctx.current_state();
695        ensure!(
696            current_state == RegionRoleState::Leader(RegionLeaderState::Staging),
697            RegionStateSnafu {
698                region_id: self.region_id,
699                state: current_state,
700                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
701            }
702        );
703
704        // Merge all staged manifest actions
705        let merged_actions = match manager.merge_staged_actions(current_state).await? {
706            Some(actions) => actions,
707            None => {
708                info!(
709                    "No staged manifests to merge for region {}, exiting staging mode without changes",
710                    self.region_id
711                );
712                // Even if no manifests to merge, we still need to exit staging mode
713                self.exit_staging()?;
714                return Ok(());
715            }
716        };
717        let expect_change = merged_actions.actions.iter().any(|a| a.is_change());
718        let expect_edit = merged_actions.actions.iter().any(|a| a.is_edit());
719        ensure!(
720            expect_change,
721            UnexpectedSnafu {
722                reason: "expect a change action in merged actions"
723            }
724        );
725        ensure!(
726            expect_edit,
727            UnexpectedSnafu {
728                reason: "expect an edit action in merged actions"
729            }
730        );
731
732        // Submit merged actions using the manifest manager's update method
        // Pass `false` so the merged actions are saved to the normal manifest directory, not the staging one
734        let new_version = manager.update(merged_actions.clone(), false).await?;
735
736        info!(
737            "Successfully submitted merged staged manifests for region {}, new version: {}",
738            self.region_id, new_version
739        );
740
741        // Apply the merged changes to in-memory version control
742        let (merged_change, merged_edit) = merged_actions.split_region_change_and_edit();
743        // Safety: we have already ensured that there is a change action in the merged actions.
744        let new_metadata = merged_change.as_ref().unwrap().metadata.clone();
745        self.version_control.alter_schema(new_metadata);
746        self.version_control
747            .apply_edit(Some(merged_edit), &[], self.file_purger.clone());
748
        // Clear all staging manifests and transition the state
750        if let Err(e) = manager.clear_staging_manifest_and_dir().await {
751            error!(e; "Failed to clear staging manifest dir for region {}", self.region_id);
752        }
753        self.exit_staging()?;
754
755        Ok(())
756    }
757
758    /// Returns the partition expression string for this region.
759    ///
760    /// If the region is currently in staging state, this returns the partition expression held in
761    /// the staging partition field. Otherwise, it returns the partition expression from the primary
762    /// region metadata (current committed version).
763    pub fn maybe_staging_partition_expr_str(&self) -> Option<String> {
764        let is_staging = self.is_staging();
765        if is_staging {
766            let staging_partition_expr = self.staging_partition_expr.lock().unwrap();
767            if staging_partition_expr.is_none() {
768                warn!(
769                    "Staging partition expr is none for region {} in staging state",
770                    self.region_id
771                );
772            }
773            staging_partition_expr.clone()
774        } else {
775            let version = self.version();
776            version.metadata.partition_expr.clone()
777        }
778    }
779}
780
781/// Context to update the region manifest.
782#[derive(Debug)]
783pub(crate) struct ManifestContext {
784    /// Manager to maintain manifest for this region.
785    pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
    /// The state of the region. The region checks the state before updating the
    /// manifest.
788    state: AtomicCell<RegionRoleState>,
789}
790
791impl ManifestContext {
792    pub(crate) fn new(manager: RegionManifestManager, state: RegionRoleState) -> Self {
793        ManifestContext {
794            manifest_manager: tokio::sync::RwLock::new(manager),
795            state: AtomicCell::new(state),
796        }
797    }
798
799    pub(crate) async fn manifest_version(&self) -> ManifestVersion {
800        self.manifest_manager
801            .read()
802            .await
803            .manifest()
804            .manifest_version
805    }
806
807    pub(crate) async fn has_update(&self) -> Result<bool> {
808        self.manifest_manager.read().await.has_update().await
809    }
810
811    /// Returns the current region role state.
812    pub(crate) fn current_state(&self) -> RegionRoleState {
813        self.state.load()
814    }
815
816    /// Installs the manifest changes from the current version to the target version (inclusive).
817    ///
    /// Returns the installed [RegionManifest].
819    /// **Note**: This function is not guaranteed to install the target version strictly.
820    /// The installed version may be greater than the target version.
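    ///
    /// For example, a request to install up to version 10 may end up installing
    /// version 12 if later manifest actions have already been persisted.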
821    pub(crate) async fn install_manifest_to(
822        &self,
823        version: ManifestVersion,
824    ) -> Result<Arc<RegionManifest>> {
825        let mut manager = self.manifest_manager.write().await;
826        manager.install_manifest_to(version).await?;
827
828        Ok(manager.manifest())
829    }
830
831    /// Updates the manifest if current state is `expect_state`.
832    pub(crate) async fn update_manifest(
833        &self,
834        expect_state: RegionLeaderState,
835        action_list: RegionMetaActionList,
836        is_staging: bool,
837    ) -> Result<ManifestVersion> {
838        // Acquires the write lock of the manifest manager.
839        let mut manager = self.manifest_manager.write().await;
840        // Gets current manifest.
841        let manifest = manager.manifest();
        // Checks the state inside the lock. This ensures that we won't update the manifest
        // after `set_role_state_gracefully()` is called.
844        let current_state = self.state.load();
845
846        // If expect_state is not downgrading, the current state must be either `expect_state` or downgrading.
847        //
848        // A downgrading leader rejects user writes but still allows
849        // flushing the memtable and updating the manifest.
850        if expect_state != RegionLeaderState::Downgrading {
851            if current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading) {
852                info!(
853                    "Region {} is in downgrading leader state, updating manifest. state is {:?}",
854                    manifest.metadata.region_id, expect_state
855                );
856            }
857            ensure!(
858                current_state == RegionRoleState::Leader(expect_state)
859                    || current_state == RegionRoleState::Leader(RegionLeaderState::Downgrading),
860                UpdateManifestSnafu {
861                    region_id: manifest.metadata.region_id,
862                    state: current_state,
863                }
864            );
865        } else {
866            ensure!(
867                current_state == RegionRoleState::Leader(expect_state),
868                RegionStateSnafu {
869                    region_id: manifest.metadata.region_id,
870                    state: current_state,
871                    expect: RegionRoleState::Leader(expect_state),
872                }
873            );
874        }
875
876        for action in &action_list.actions {
877            // Checks whether the edit is still applicable.
878            let RegionMetaAction::Edit(edit) = &action else {
879                continue;
880            };
881
882            // Checks whether the region is truncated.
883            let Some(truncated_entry_id) = manifest.truncated_entry_id else {
884                continue;
885            };
886
887            // This is an edit from flush.
888            if let Some(flushed_entry_id) = edit.flushed_entry_id {
889                ensure!(
890                    truncated_entry_id < flushed_entry_id,
891                    RegionTruncatedSnafu {
892                        region_id: manifest.metadata.region_id,
893                    }
894                );
895            }
896
897            // This is an edit from compaction.
898            if !edit.files_to_remove.is_empty() {
                // Return an error if input files of the compaction task have been truncated.
900                for file in &edit.files_to_remove {
901                    ensure!(
902                        manifest.files.contains_key(&file.file_id),
903                        RegionTruncatedSnafu {
904                            region_id: manifest.metadata.region_id,
905                        }
906                    );
907                }
908            }
909        }
910
911        // Now we can update the manifest.
912        let version = manager.update(action_list, is_staging).await.inspect_err(
913            |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
914        )?;
915
916        if self.state.load() == RegionRoleState::Follower {
917            warn!(
918                "Region {} becomes follower while updating manifest which may cause inconsistency, manifest version: {version}",
919                manifest.metadata.region_id
920            );
921        }
922
923        Ok(version)
924    }
925
926    /// Sets the [`RegionRole`].
927    ///
928    /// ```text
929    ///     +------------------------------------------+
930    ///     |                      +-----------------+ |
931    ///     |                      |                 | |
932    /// +---+------+       +-------+-----+        +--v-v---+
933    /// | Follower |       | Downgrading |        | Leader |
934    /// +---^-^----+       +-----+-^-----+        +--+-+---+
935    ///     | |                  | |                 | |
936    ///     | +------------------+ +-----------------+ |
937    ///     +------------------------------------------+
938    ///
939    /// Transition:
940    /// - Follower -> Leader
941    /// - Downgrading Leader -> Leader
942    /// - Leader -> Follower
943    /// - Downgrading Leader -> Follower
944    /// - Leader -> Downgrading Leader
945    ///
946    /// ```
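    ///
    /// A rough usage sketch of these transitions (illustrative only; mirrors the
    /// unit test at the bottom of this file):
    ///
    /// ```ignore
    /// // Leader -> Follower
    /// ctx.set_role(RegionRole::Follower, region_id);
    /// // Follower -> Leader (writable)
    /// ctx.set_role(RegionRole::Leader, region_id);
    /// // Leader -> Downgrading Leader
    /// ctx.set_role(RegionRole::DowngradingLeader, region_id);
    /// // Downgrading Leader -> Leader
    /// ctx.set_role(RegionRole::Leader, region_id);
    /// ```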
947    pub(crate) fn set_role(&self, next_role: RegionRole, region_id: RegionId) {
948        match next_role {
949            RegionRole::Follower => {
950                match self.state.fetch_update(|state| {
951                    if !matches!(state, RegionRoleState::Follower) {
952                        Some(RegionRoleState::Follower)
953                    } else {
954                        None
955                    }
956                }) {
957                    Ok(state) => info!(
958                        "Convert region {} to follower, previous role state: {:?}",
959                        region_id, state
960                    ),
961                    Err(state) => {
962                        if state != RegionRoleState::Follower {
963                            warn!(
964                                "Failed to convert region {} to follower, current role state: {:?}",
965                                region_id, state
966                            )
967                        }
968                    }
969                }
970            }
971            RegionRole::Leader => {
972                match self.state.fetch_update(|state| {
973                    if matches!(
974                        state,
975                        RegionRoleState::Follower
976                            | RegionRoleState::Leader(RegionLeaderState::Downgrading)
977                    ) {
978                        Some(RegionRoleState::Leader(RegionLeaderState::Writable))
979                    } else {
980                        None
981                    }
982                }) {
983                    Ok(state) => info!(
984                        "Convert region {} to leader, previous role state: {:?}",
985                        region_id, state
986                    ),
987                    Err(state) => {
988                        if state != RegionRoleState::Leader(RegionLeaderState::Writable) {
989                            warn!(
990                                "Failed to convert region {} to leader, current role state: {:?}",
991                                region_id, state
992                            )
993                        }
994                    }
995                }
996            }
997            RegionRole::DowngradingLeader => {
998                match self.state.compare_exchange(
999                    RegionRoleState::Leader(RegionLeaderState::Writable),
1000                    RegionRoleState::Leader(RegionLeaderState::Downgrading),
1001                ) {
1002                    Ok(state) => info!(
1003                        "Convert region {} to downgrading region, previous role state: {:?}",
1004                        region_id, state
1005                    ),
1006                    Err(state) => {
1007                        if state != RegionRoleState::Leader(RegionLeaderState::Downgrading) {
1008                            warn!(
1009                                "Failed to convert region {} to downgrading leader, current role state: {:?}",
1010                                region_id, state
1011                            )
1012                        }
1013                    }
1014                }
1015            }
1016        }
1017    }
1018
1019    /// Returns the normal manifest of the region.
1020    pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
1021        self.manifest_manager.read().await.manifest()
1022    }
1023
1024    /// Returns the staging manifest of the region.
1025    pub(crate) async fn staging_manifest(
1026        &self,
1027    ) -> Option<Arc<crate::manifest::action::RegionManifest>> {
1028        self.manifest_manager.read().await.staging_manifest()
1029    }
1030}
1031
1032pub(crate) type ManifestContextRef = Arc<ManifestContext>;
1033
1034/// Regions indexed by ids.
1035#[derive(Debug, Default)]
1036pub(crate) struct RegionMap {
1037    regions: RwLock<HashMap<RegionId, MitoRegionRef>>,
1038}
1039
1040impl RegionMap {
1041    /// Returns true if the region exists.
1042    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1043        let regions = self.regions.read().unwrap();
1044        regions.contains_key(&region_id)
1045    }
1046
1047    /// Inserts a new region into the map.
1048    pub(crate) fn insert_region(&self, region: MitoRegionRef) {
1049        let mut regions = self.regions.write().unwrap();
1050        regions.insert(region.region_id, region);
1051    }
1052
1053    /// Gets region by region id.
1054    pub(crate) fn get_region(&self, region_id: RegionId) -> Option<MitoRegionRef> {
1055        let regions = self.regions.read().unwrap();
1056        regions.get(&region_id).cloned()
1057    }
1058
1059    /// Gets writable region by region id.
1060    ///
1061    /// Returns error if the region does not exist or is readonly.
1062    pub(crate) fn writable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1063        let region = self
1064            .get_region(region_id)
1065            .context(RegionNotFoundSnafu { region_id })?;
1066        ensure!(
1067            region.is_writable(),
1068            RegionStateSnafu {
1069                region_id,
1070                state: region.state(),
1071                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1072            }
1073        );
1074        Ok(region)
1075    }
1076
1077    /// Gets readonly region by region id.
1078    ///
1079    /// Returns error if the region does not exist or is writable.
1080    pub(crate) fn follower_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1081        let region = self
1082            .get_region(region_id)
1083            .context(RegionNotFoundSnafu { region_id })?;
1084        ensure!(
1085            region.is_follower(),
1086            RegionStateSnafu {
1087                region_id,
1088                state: region.state(),
1089                expect: RegionRoleState::Follower,
1090            }
1091        );
1092
1093        Ok(region)
1094    }
1095
1096    /// Gets region by region id.
1097    ///
1098    /// Calls the callback if the region does not exist.
1099    pub(crate) fn get_region_or<F: OnFailure>(
1100        &self,
1101        region_id: RegionId,
1102        cb: &mut F,
1103    ) -> Option<MitoRegionRef> {
1104        match self
1105            .get_region(region_id)
1106            .context(RegionNotFoundSnafu { region_id })
1107        {
1108            Ok(region) => Some(region),
1109            Err(e) => {
1110                cb.on_failure(e);
1111                None
1112            }
1113        }
1114    }
1115
1116    /// Gets writable region by region id.
1117    ///
1118    /// Calls the callback if the region does not exist or is readonly.
1119    pub(crate) fn writable_region_or<F: OnFailure>(
1120        &self,
1121        region_id: RegionId,
1122        cb: &mut F,
1123    ) -> Option<MitoRegionRef> {
1124        match self.writable_region(region_id) {
1125            Ok(region) => Some(region),
1126            Err(e) => {
1127                cb.on_failure(e);
1128                None
1129            }
1130        }
1131    }
1132
1133    /// Gets writable non-staging region by region id.
1134    ///
1135    /// Returns error if the region does not exist, is readonly, or is in staging mode.
1136    pub(crate) fn writable_non_staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1137        let region = self.writable_region(region_id)?;
1138        if region.is_staging() {
1139            return Err(crate::error::RegionStateSnafu {
1140                region_id,
1141                state: region.state(),
1142                expect: RegionRoleState::Leader(RegionLeaderState::Writable),
1143            }
1144            .build());
1145        }
1146        Ok(region)
1147    }
1148
1149    /// Gets staging region by region id.
1150    ///
1151    /// Returns error if the region does not exist or is not in staging state.
1152    pub(crate) fn staging_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1153        let region = self
1154            .get_region(region_id)
1155            .context(RegionNotFoundSnafu { region_id })?;
1156        ensure!(
1157            region.is_staging(),
1158            RegionStateSnafu {
1159                region_id,
1160                state: region.state(),
1161                expect: RegionRoleState::Leader(RegionLeaderState::Staging),
1162            }
1163        );
1164        Ok(region)
1165    }
1166
1167    /// Gets flushable region by region id.
1168    ///
1169    /// Returns error if the region does not exist or is not operable.
1170    fn flushable_region(&self, region_id: RegionId) -> Result<MitoRegionRef> {
1171        let region = self
1172            .get_region(region_id)
1173            .context(RegionNotFoundSnafu { region_id })?;
1174        ensure!(
1175            region.is_flushable(),
1176            FlushableRegionStateSnafu {
1177                region_id,
1178                state: region.state(),
1179            }
1180        );
1181        Ok(region)
1182    }
1183
1184    /// Gets flushable region by region id.
1185    ///
1186    /// Calls the callback if the region does not exist or is not operable.
1187    pub(crate) fn flushable_region_or<F: OnFailure>(
1188        &self,
1189        region_id: RegionId,
1190        cb: &mut F,
1191    ) -> Option<MitoRegionRef> {
1192        match self.flushable_region(region_id) {
1193            Ok(region) => Some(region),
1194            Err(e) => {
1195                cb.on_failure(e);
1196                None
1197            }
1198        }
1199    }
1200
1201    /// Remove region by id.
1202    pub(crate) fn remove_region(&self, region_id: RegionId) {
1203        let mut regions = self.regions.write().unwrap();
1204        regions.remove(&region_id);
1205    }
1206
1207    /// List all regions.
1208    pub(crate) fn list_regions(&self) -> Vec<MitoRegionRef> {
1209        let regions = self.regions.read().unwrap();
1210        regions.values().cloned().collect()
1211    }
1212
1213    /// Clear the map.
1214    pub(crate) fn clear(&self) {
1215        self.regions.write().unwrap().clear();
1216    }
1217}
1218
1219pub(crate) type RegionMapRef = Arc<RegionMap>;
1220
1221/// Opening regions
1222#[derive(Debug, Default)]
1223pub(crate) struct OpeningRegions {
1224    regions: RwLock<HashMap<RegionId, Vec<OptionOutputTx>>>,
1225}
1226
1227impl OpeningRegions {
    /// Registers `sender` for a region that is already being opened and returns `None`;
    /// otherwise, returns the `sender` back to the caller.
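    ///
    /// A rough usage sketch (illustrative only; `open_region` is a hypothetical helper):
    ///
    /// ```ignore
    /// match opening_regions.wait_for_opening_region(region_id, sender) {
    ///     // No open is in progress: the caller keeps the sender and starts opening the region.
    ///     Some(sender) => open_region(region_id, sender),
    ///     // An open is already in progress: the sender has been queued and will be
    ///     // notified once that open finishes.
    ///     None => {}
    /// }
    /// ```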
1229    pub(crate) fn wait_for_opening_region(
1230        &self,
1231        region_id: RegionId,
1232        sender: OptionOutputTx,
1233    ) -> Option<OptionOutputTx> {
1234        let mut regions = self.regions.write().unwrap();
1235        match regions.entry(region_id) {
1236            Entry::Occupied(mut senders) => {
1237                senders.get_mut().push(sender);
1238                None
1239            }
1240            Entry::Vacant(_) => Some(sender),
1241        }
1242    }
1243
1244    /// Returns true if the region exists.
1245    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1246        let regions = self.regions.read().unwrap();
1247        regions.contains_key(&region_id)
1248    }
1249
    /// Inserts a new region with its first waiting sender into the map.
1251    pub(crate) fn insert_sender(&self, region: RegionId, sender: OptionOutputTx) {
1252        let mut regions = self.regions.write().unwrap();
1253        regions.insert(region, vec![sender]);
1254    }
1255
    /// Removes a region by id and returns its waiting senders.
1257    pub(crate) fn remove_sender(&self, region_id: RegionId) -> Vec<OptionOutputTx> {
1258        let mut regions = self.regions.write().unwrap();
1259        regions.remove(&region_id).unwrap_or_default()
1260    }
1261
1262    #[cfg(test)]
1263    pub(crate) fn sender_len(&self, region_id: RegionId) -> usize {
1264        let regions = self.regions.read().unwrap();
1265        if let Some(senders) = regions.get(&region_id) {
1266            senders.len()
1267        } else {
1268            0
1269        }
1270    }
1271}
1272
1273pub(crate) type OpeningRegionsRef = Arc<OpeningRegions>;
1274
1275/// The regions that are catching up.
1276#[derive(Debug, Default)]
1277pub(crate) struct CatchupRegions {
1278    regions: RwLock<HashSet<RegionId>>,
1279}
1280
1281impl CatchupRegions {
1282    /// Returns true if the region exists.
1283    pub(crate) fn is_region_exists(&self, region_id: RegionId) -> bool {
1284        let regions = self.regions.read().unwrap();
1285        regions.contains(&region_id)
1286    }
1287
1288    /// Inserts a new region into the set.
1289    pub(crate) fn insert_region(&self, region_id: RegionId) {
1290        let mut regions = self.regions.write().unwrap();
1291        regions.insert(region_id);
1292    }
1293
1294    /// Remove region by id.
1295    pub(crate) fn remove_region(&self, region_id: RegionId) {
1296        let mut regions = self.regions.write().unwrap();
1297        regions.remove(&region_id);
1298    }
1299}
1300
1301pub(crate) type CatchupRegionsRef = Arc<CatchupRegions>;
1302
1303/// Manifest stats.
1304#[derive(Default, Debug, Clone)]
1305pub struct ManifestStats {
1306    pub(crate) total_manifest_size: Arc<AtomicU64>,
1307    pub(crate) manifest_version: Arc<AtomicU64>,
1308    pub(crate) file_removed_cnt: Arc<AtomicU64>,
1309}
1310
1311impl ManifestStats {
1312    fn total_manifest_size(&self) -> u64 {
1313        self.total_manifest_size.load(Ordering::Relaxed)
1314    }
1315
1316    fn manifest_version(&self) -> u64 {
1317        self.manifest_version.load(Ordering::Relaxed)
1318    }
1319
1320    fn file_removed_cnt(&self) -> u64 {
1321        self.file_removed_cnt.load(Ordering::Relaxed)
1322    }
1323}
1324
1325/// Parses the partition expression from a JSON string.
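///
/// A rough usage sketch (illustrative only; `json_str` stands for a JSON-encoded
/// partition expression):
///
/// ```ignore
/// // Missing or empty expression strings yield `None`.
/// assert!(parse_partition_expr(None)?.is_none());
/// assert!(parse_partition_expr(Some(""))?.is_none());
/// // Other strings are parsed; a valid JSON expression yields `Some(PartitionExpr)`.
/// let expr = parse_partition_expr(Some(json_str))?;
/// ```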
1326pub fn parse_partition_expr(partition_expr_str: Option<&str>) -> Result<Option<PartitionExpr>> {
1327    match partition_expr_str {
1328        None => Ok(None),
1329        Some("") => Ok(None),
1330        Some(json_str) => {
1331            let expr = partition::expr::PartitionExpr::from_json_str(json_str)
1332                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?;
1333            Ok(expr)
1334        }
1335    }
1336}
1337
1338#[cfg(test)]
1339mod tests {
1340    use std::sync::atomic::AtomicU64;
1341    use std::sync::{Arc, Mutex};
1342
1343    use common_datasource::compression::CompressionType;
1344    use common_test_util::temp_dir::create_temp_dir;
1345    use crossbeam_utils::atomic::AtomicCell;
1346    use object_store::ObjectStore;
1347    use object_store::services::Fs;
1348    use store_api::logstore::provider::Provider;
1349    use store_api::region_engine::RegionRole;
1350    use store_api::region_request::PathType;
1351    use store_api::storage::RegionId;
1352
1353    use crate::access_layer::AccessLayer;
1354    use crate::manifest::action::RegionMetaActionList;
1355    use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
1356    use crate::region::{
1357        ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
1358    };
1359    use crate::sst::FormatType;
1360    use crate::sst::index::intermediate::IntermediateManager;
1361    use crate::sst::index::puffin_manager::PuffinManagerFactory;
1362    use crate::test_util::scheduler_util::SchedulerEnv;
1363    use crate::test_util::version_util::VersionControlBuilder;
1364    use crate::time_provider::StdTimeProvider;
1365
1366    #[test]
1367    fn test_region_state_lock_free() {
1368        assert!(AtomicCell::<RegionRoleState>::is_lock_free());
1369    }
1370
1371    #[tokio::test]
1372    async fn test_set_region_state() {
1373        let env = SchedulerEnv::new().await;
1374        let builder = VersionControlBuilder::new();
1375        let version_control = Arc::new(builder.build());
1376        let manifest_ctx = env
1377            .mock_manifest_context(version_control.current().version.metadata.clone())
1378            .await;
1379
1380        let region_id = RegionId::new(1024, 0);
1381        // Leader -> Follower
1382        manifest_ctx.set_role(RegionRole::Follower, region_id);
1383        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1384
1385        // Follower -> Leader
1386        manifest_ctx.set_role(RegionRole::Leader, region_id);
1387        assert_eq!(
1388            manifest_ctx.state.load(),
1389            RegionRoleState::Leader(RegionLeaderState::Writable)
1390        );
1391
1392        // Leader -> Downgrading Leader
1393        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1394        assert_eq!(
1395            manifest_ctx.state.load(),
1396            RegionRoleState::Leader(RegionLeaderState::Downgrading)
1397        );
1398
1399        // Downgrading Leader -> Follower
1400        manifest_ctx.set_role(RegionRole::Follower, region_id);
1401        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1402
1403        // Can't downgrade from follower (Follower -> Downgrading Leader)
1404        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
1405        assert_eq!(manifest_ctx.state.load(), RegionRoleState::Follower);
1406
        // Set region role to Downgrading Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        manifest_ctx.set_role(RegionRole::DowngradingLeader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Downgrading)
        );

        // Downgrading Leader -> Leader
        manifest_ctx.set_role(RegionRole::Leader, region_id);
        assert_eq!(
            manifest_ctx.state.load(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

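    // A `ManifestContext` created directly in the Staging state should report it,
    // while the default mock context starts out Writable.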
    #[tokio::test]
    async fn test_staging_state_validation() {
        let env = SchedulerEnv::new().await;
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());

        // Build a manifest context in the staging state, mirroring the setup SchedulerEnv uses
        let staging_ctx = {
            let manager = RegionManifestManager::new(
                version_control.current().version.metadata.clone(),
                0,
                RegionManifestOptions {
                    manifest_dir: "".to_string(),
                    object_store: env.access_layer.object_store().clone(),
                    compress_type: CompressionType::Uncompressed,
                    checkpoint_distance: 10,
                    remove_file_options: Default::default(),
                    manifest_cache: None,
                },
                FormatType::PrimaryKey,
                &Default::default(),
            )
            .await
            .unwrap();
            Arc::new(ManifestContext::new(
                manager,
                RegionRoleState::Leader(RegionLeaderState::Staging),
            ))
        };

        // Test staging state behavior
        assert_eq!(
            staging_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );

        // Test writable context for comparison
        let writable_ctx = env
            .mock_manifest_context(version_control.current().version.metadata.clone())
            .await;

        assert_eq!(
            writable_ctx.current_state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
    }

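    // Builds a full `MitoRegion` backed by a temp-dir object store and walks it through
    // Writable <-> Staging transitions, including cleanup of leftover staging manifests
    // and rejection of invalid transitions.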
    #[tokio::test]
    async fn test_staging_state_transitions() {
        let builder = VersionControlBuilder::new();
        let version_control = Arc::new(builder.build());
        let metadata = version_control.current().version.metadata.clone();

        // Create MitoRegion for testing state transitions
        let temp_dir = create_temp_dir("");
        let path_str = temp_dir.path().display().to_string();
        let fs_builder = Fs::default().root(&path_str);
        let object_store = ObjectStore::new(fs_builder).unwrap().finish();

        let index_aux_path = temp_dir.path().join("index_aux");
        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
            .await
            .unwrap();
        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
            .await
            .unwrap();

        let access_layer = Arc::new(AccessLayer::new(
            "",
            PathType::Bare,
            object_store,
            puffin_mgr,
            intm_mgr,
        ));

        let manager = RegionManifestManager::new(
            metadata.clone(),
            0,
            RegionManifestOptions {
                manifest_dir: "".to_string(),
                object_store: access_layer.object_store().clone(),
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),
        )
        .await
        .unwrap();

        let manifest_ctx = Arc::new(ManifestContext::new(
            manager,
            RegionRoleState::Leader(RegionLeaderState::Writable),
        ));

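        // Assemble the region over the manifest context; the remaining fields use
        // no-op or default test stand-ins.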
        let region = MitoRegion {
            region_id: metadata.region_id,
            version_control,
            access_layer,
            manifest_ctx: manifest_ctx.clone(),
            file_purger: crate::test_util::new_noop_file_purger(),
            provider: Provider::noop_provider(),
            last_flush_millis: Default::default(),
            last_compaction_millis: Default::default(),
            time_provider: Arc::new(StdTimeProvider),
            topic_latest_entry_id: Default::default(),
            written_bytes: Arc::new(AtomicU64::new(0)),
            stats: ManifestStats::default(),
            staging_partition_expr: Mutex::new(None),
        };

        // Test initial state
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test transition to staging
        let mut manager = manifest_ctx.manifest_manager.write().await;
        region.set_staging(&mut manager).await.unwrap();
        drop(manager);
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Staging)
        );
        assert!(region.is_staging());

        // Test transition back to writable
        region.exit_staging().unwrap();
        assert_eq!(
            region.state(),
            RegionRoleState::Leader(RegionLeaderState::Writable)
        );
        assert!(!region.is_staging());

        // Test staging directory cleanup: Create dirty staging files before entering staging mode
        {
            // Create some dummy staging manifest files to simulate an interrupted session
            let manager = manifest_ctx.manifest_manager.write().await;
            let dummy_actions = RegionMetaActionList::new(vec![]);
            let dummy_bytes = dummy_actions.encode().unwrap();

            // Create dirty staging files with versions 100 and 101
            manager.store().save(100, &dummy_bytes, true).await.unwrap();
            manager.store().save(101, &dummy_bytes, true).await.unwrap();
            drop(manager);

            // Verify dirty files exist before entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let dirty_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                dirty_manifests.len(),
                2,
                "Should have 2 dirty staging files"
            );
            drop(manager);

            // Enter staging mode - this should clean up the dirty files
            let mut manager = manifest_ctx.manifest_manager.write().await;
            region.set_staging(&mut manager).await.unwrap();
            drop(manager);

            // Verify dirty files are cleaned up after entering staging
            let manager = manifest_ctx.manifest_manager.read().await;
            let cleaned_manifests = manager.store().fetch_staging_manifests().await.unwrap();
            assert_eq!(
                cleaned_manifests.len(),
                0,
                "Dirty staging files should be cleaned up"
            );
            drop(manager);

            // Exit staging to restore normal state for remaining tests
            region.exit_staging().unwrap();
        }

        // Test invalid transitions
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_ok()); // Writable -> Staging should work
        drop(manager);
        let mut manager = manifest_ctx.manifest_manager.write().await;
        assert!(region.set_staging(&mut manager).await.is_err()); // Staging -> Staging should fail
        drop(manager);
        assert!(region.exit_staging().is_ok()); // Staging -> Writable should work
        assert!(region.exit_staging().is_err()); // Writable -> Writable should fail
    }
}