From 2f4726f7b59dffdcd34106d0efecff0dbfb5a8b3 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 15 Apr 2024 13:48:25 +0800 Subject: [PATCH] refactor: Move manifest manager lock to `MitoRegion` (#3689) * feat: remove manager inner wip * feat: put manifest lock in region * feat: don't update manifest if manager is stopped * chore: address CR comments --- src/mito2/src/engine/catchup_test.rs | 4 +- src/mito2/src/error.rs | 7 + src/mito2/src/manifest/manager.rs | 224 ++++++++------------- src/mito2/src/manifest/tests/checkpoint.rs | 28 +-- src/mito2/src/region.rs | 9 +- src/mito2/src/region/opener.rs | 7 +- src/mito2/src/worker/handle_alter.rs | 7 +- src/mito2/src/worker/handle_catchup.rs | 47 ++--- src/mito2/src/worker/handle_compaction.rs | 8 +- src/mito2/src/worker/handle_truncate.rs | 7 +- 10 files changed, 160 insertions(+), 188 deletions(-) diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index cc520d2357..b0dd75fe53 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -345,7 +345,7 @@ async fn test_catchup_with_manifest_update() { // Ensures the mutable is empty. assert!(region.version().memtables.mutable.is_empty()); - let manifest = region.manifest_manager.manifest().await; + let manifest = region.manifest_manager.read().await.manifest(); assert_eq!(manifest.manifest_version, 0); let resp = follower_engine @@ -361,7 +361,7 @@ async fn test_catchup_with_manifest_update() { // The inner region was replaced. We must get it again. let region = follower_engine.get_region(region_id).unwrap(); - let manifest = region.manifest_manager.manifest().await; + let manifest = region.manifest_manager.read().await.manifest(); assert_eq!(manifest.manifest_version, 2); assert!(!region.is_writable()); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 926b7638a8..03d0e60715 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -578,6 +578,12 @@ pub enum Error { #[snafu(display("checksum mismatch (actual: {}, expected: {})", actual, expected))] ChecksumMismatch { actual: u32, expected: u32 }, + + #[snafu(display("Region {} is stopped", region_id))] + RegionStopped { + region_id: RegionId, + location: Location, + }, } pub type Result = std::result::Result; @@ -684,6 +690,7 @@ impl ErrorExt for Error { BiError { .. } => StatusCode::Internal, EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal, ChecksumMismatch { .. } => StatusCode::Unexpected, + RegionStopped { .. } => StatusCode::RegionNotReady, } } diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index e2de8d0189..74aa1626d3 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -18,12 +18,11 @@ use common_datasource::compression::CompressionType; use common_telemetry::{debug, info}; use futures::TryStreamExt; use object_store::ObjectStore; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION}; use store_api::metadata::RegionMetadataRef; -use tokio::sync::RwLock; -use crate::error::{self, Result}; +use crate::error::{self, RegionStoppedSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, RegionMetaActionList, @@ -111,99 +110,18 @@ pub struct RegionManifestOptions { /// ``` #[derive(Debug)] pub struct RegionManifestManager { - inner: RwLock, -} - -impl RegionManifestManager { - /// Construct a region's manifest and persist it. - pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { - let inner = RegionManifestManagerInner::new(metadata, options).await?; - Ok(Self { - inner: RwLock::new(inner), - }) - } - - /// Open an existing manifest. - pub async fn open(options: RegionManifestOptions) -> Result> { - if let Some(inner) = RegionManifestManagerInner::open(options).await? { - Ok(Some(Self { - inner: RwLock::new(inner), - })) - } else { - Ok(None) - } - } - - /// Stop background tasks gracefully. - pub async fn stop(&self) -> Result<()> { - let mut inner = self.inner.write().await; - inner.stop().await - } - - /// Update the manifest. Return the current manifest version number. - pub async fn update(&self, action_list: RegionMetaActionList) -> Result { - let _t = MANIFEST_OP_ELAPSED - .with_label_values(&["update"]) - .start_timer(); - - let mut inner = self.inner.write().await; - inner.update(action_list).await - } - - /// Retrieve the current [RegionManifest]. - pub async fn manifest(&self) -> Arc { - let inner = self.inner.read().await; - inner.manifest.clone() - } - - #[cfg(test)] - pub async fn store(&self) -> ManifestObjectStore { - let inner = self.inner.read().await; - inner.store.clone() - } - - /// Returns total manifest size. - pub async fn manifest_usage(&self) -> u64 { - let inner = self.inner.read().await; - inner.total_manifest_size() - } - - /// Returns true if a newer version manifest file is found. - pub async fn has_update(&self) -> Result { - let inner = self.inner.read().await; - inner.has_update().await - } -} - -#[cfg(test)] -impl RegionManifestManager { - pub(crate) async fn validate_manifest( - &self, - expect: &RegionMetadataRef, - last_version: ManifestVersion, - ) { - let manifest = self.manifest().await; - assert_eq!(manifest.metadata, *expect); - - let inner = self.inner.read().await; - assert_eq!(inner.manifest.manifest_version, inner.last_version); - assert_eq!(last_version, inner.last_version); - } -} - -#[derive(Debug)] -pub(crate) struct RegionManifestManagerInner { store: ManifestObjectStore, options: RegionManifestOptions, last_version: ManifestVersion, /// The last version included in checkpoint file. last_checkpoint_version: ManifestVersion, manifest: Arc, + stopped: bool, } -impl RegionManifestManagerInner { - /// Creates a new manifest. - async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { +impl RegionManifestManager { + /// Constructs a region's manifest and persist it. + pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { // construct storage let mut store = ManifestObjectStore::new( &options.manifest_dir, @@ -243,13 +161,14 @@ impl RegionManifestManagerInner { last_version: version, last_checkpoint_version: MIN_VERSION, manifest: Arc::new(manifest), + stopped: false, }) } /// Opens an existing manifest. /// /// Returns `Ok(None)` if no such manifest. - async fn open(options: RegionManifestOptions) -> Result> { + pub async fn open(options: RegionManifestOptions) -> Result> { let _t = MANIFEST_OP_ELAPSED .with_label_values(&["open"]) .start_timer(); @@ -333,15 +252,29 @@ impl RegionManifestManagerInner { last_version: version, last_checkpoint_version, manifest: Arc::new(manifest), + stopped: false, })) } - async fn stop(&mut self) -> Result<()> { + /// Stops the manager. + pub async fn stop(&mut self) -> Result<()> { + self.stopped = true; Ok(()) } - /// Updates the manifest. Return the current manifest version number. - async fn update(&mut self, action_list: RegionMetaActionList) -> Result { + /// Updates the manifest. Returns the current manifest version number. + pub async fn update(&mut self, action_list: RegionMetaActionList) -> Result { + let _t = MANIFEST_OP_ELAPSED + .with_label_values(&["update"]) + .start_timer(); + + ensure!( + !self.stopped, + RegionStoppedSnafu { + region_id: self.manifest.metadata.region_id, + } + ); + let version = self.increase_version(); self.store.save(version, &action_list.encode()?).await?; @@ -373,20 +306,56 @@ impl RegionManifestManagerInner { Ok(version) } + /// Retrieves the current [RegionManifest]. + pub fn manifest(&self) -> Arc { + self.manifest.clone() + } + /// Returns total manifest size. - pub(crate) fn total_manifest_size(&self) -> u64 { + pub fn manifest_usage(&self) -> u64 { self.store.total_manifest_size() } -} -impl RegionManifestManagerInner { + /// Returns true if a newer version manifest file is found. + /// + /// It is typically used in read-only regions to catch up with manifest. + /// It doesn't lock the manifest directory in the object store so the result + /// may be inaccurate if there are concurrent writes. + pub async fn has_update(&self) -> Result { + let last_version = self.last_version; + + let streamer = + self.store + .manifest_lister() + .await? + .context(error::EmptyManifestDirSnafu { + manifest_dir: self.store.manifest_dir(), + })?; + + let need_update = streamer + .try_any(|entry| async move { + let file_name = entry.name(); + if is_delta_file(file_name) { + let version = file_version(file_name); + if version > last_version { + return true; + } + } + false + }) + .await + .context(error::OpenDalSnafu)?; + + Ok(need_update) + } + /// Increases last version and returns the increased version. fn increase_version(&mut self) -> ManifestVersion { self.last_version += 1; self.last_version } - pub(crate) async fn may_do_checkpoint(&mut self, version: ManifestVersion) -> Result<()> { + async fn may_do_checkpoint(&mut self, version: ManifestVersion) -> Result<()> { if version - self.last_checkpoint_version >= self.options.checkpoint_distance && self.options.checkpoint_distance != 0 { @@ -493,36 +462,19 @@ impl RegionManifestManagerInner { Ok(None) } } +} - /// Returns true if a newer version manifest file is found. - /// - /// It is typically used in read-only regions to catch up with manifest. - pub(crate) async fn has_update(&self) -> Result { - let last_version = self.last_version; +#[cfg(test)] +impl RegionManifestManager { + fn validate_manifest(&self, expect: &RegionMetadataRef, last_version: ManifestVersion) { + let manifest = self.manifest(); + assert_eq!(manifest.metadata, *expect); + assert_eq!(self.manifest.manifest_version, self.last_version); + assert_eq!(last_version, self.last_version); + } - let streamer = - self.store - .manifest_lister() - .await? - .context(error::EmptyManifestDirSnafu { - manifest_dir: self.store.manifest_dir(), - })?; - - let need_update = streamer - .try_any(|entry| async move { - let file_name = entry.name(); - if is_delta_file(file_name) { - let version = file_version(file_name); - if version > last_version { - return true; - } - } - false - }) - .await - .context(error::OpenDalSnafu)?; - - Ok(need_update) + pub fn store(&self) -> ManifestObjectStore { + self.store.clone() } } @@ -551,7 +503,7 @@ mod test { .unwrap() .unwrap(); - manager.validate_manifest(&metadata, 0).await; + manager.validate_manifest(&metadata, 0); } #[tokio::test] @@ -566,7 +518,7 @@ mod test { // Creates a manifest. let metadata = Arc::new(basic_region_metadata()); - let manager = env + let mut manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await .unwrap() @@ -581,14 +533,14 @@ mod test { .unwrap() .unwrap(); - manager.validate_manifest(&metadata, 0).await; + manager.validate_manifest(&metadata, 0); } #[tokio::test] async fn region_change_add_column() { let metadata = Arc::new(basic_region_metadata()); let env = TestEnv::new(); - let manager = env + let mut manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await .unwrap() @@ -609,7 +561,7 @@ mod test { let current_version = manager.update(action_list).await.unwrap(); assert_eq!(current_version, 1); - manager.validate_manifest(&new_metadata, 1).await; + manager.validate_manifest(&new_metadata, 1); // Reopen the manager. manager.stop().await.unwrap(); @@ -618,7 +570,7 @@ mod test { .await .unwrap() .unwrap(); - manager.validate_manifest(&new_metadata, 1).await; + manager.validate_manifest(&new_metadata, 1); } /// Just for test, refer to wal_dir_usage in src/store-api/src/logstore.rs. @@ -650,7 +602,7 @@ mod test { let manifest_dir = format!("{}/manifest", data_home_path); - let manager = env + let mut manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await .unwrap() @@ -671,10 +623,10 @@ mod test { let current_version = manager.update(action_list).await.unwrap(); assert_eq!(current_version, 1); - manager.validate_manifest(&new_metadata, 1).await; + manager.validate_manifest(&new_metadata, 1); // get manifest size - let manifest_size = manager.manifest_usage().await; + let manifest_size = manager.manifest_usage(); assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); // update 10 times nop_action to trigger checkpoint @@ -694,7 +646,7 @@ mod test { } // check manifest size again - let manifest_size = manager.manifest_usage().await; + let manifest_size = manager.manifest_usage(); assert_eq!(manifest_size, manifest_dir_usage(&manifest_dir).await); // Reopen the manager, @@ -705,10 +657,10 @@ mod test { .await .unwrap() .unwrap(); - manager.validate_manifest(&new_metadata, 11).await; + manager.validate_manifest(&new_metadata, 11); // get manifest size again - let manifest_size = manager.manifest_usage().await; + let manifest_size = manager.manifest_usage(); assert_eq!(manifest_size, 1312); } } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index cab8b43d0c..f712dc8b90 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -23,7 +23,7 @@ use crate::error::Error::ChecksumMismatch; use crate::manifest::action::{ RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList, }; -use crate::manifest::manager::{RegionManifestManager, RegionManifestManagerInner}; +use crate::manifest::manager::RegionManifestManager; use crate::manifest::tests::utils::basic_region_metadata; use crate::sst::file::{FileId, FileMeta}; use crate::test_util::TestEnv; @@ -66,7 +66,7 @@ fn nop_action() -> RegionMetaActionList { #[tokio::test] async fn manager_without_checkpoint() { - let (_env, manager) = build_manager(0, CompressionType::Uncompressed).await; + let (_env, mut manager) = build_manager(0, CompressionType::Uncompressed).await; // apply 10 actions for _ in 0..10 { @@ -76,7 +76,6 @@ async fn manager_without_checkpoint() { // no checkpoint assert!(manager .store() - .await .load_last_checkpoint() .await .unwrap() @@ -99,7 +98,6 @@ async fn manager_without_checkpoint() { expected.sort_unstable(); let mut paths = manager .store() - .await .get_paths(|e| Some(e.name().to_string())) .await .unwrap(); @@ -110,7 +108,7 @@ async fn manager_without_checkpoint() { #[tokio::test] async fn manager_with_checkpoint_distance_1() { common_telemetry::init_default_ut_logging(); - let (env, manager) = build_manager(1, CompressionType::Uncompressed).await; + let (env, mut manager) = build_manager(1, CompressionType::Uncompressed).await; // apply 10 actions for _ in 0..10 { @@ -120,7 +118,6 @@ async fn manager_with_checkpoint_distance_1() { // has checkpoint assert!(manager .store() - .await .load_last_checkpoint() .await .unwrap() @@ -137,7 +134,6 @@ async fn manager_with_checkpoint_distance_1() { expected.sort_unstable(); let mut paths = manager .store() - .await .get_paths(|e| Some(e.name().to_string())) .await .unwrap(); @@ -147,8 +143,7 @@ async fn manager_with_checkpoint_distance_1() { // check content in `_last_checkpoint` let raw_bytes = manager .store() - .await - .read_file(&manager.store().await.last_checkpoint_path()) + .read_file(&manager.store().last_checkpoint_path()) .await .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); @@ -159,14 +154,14 @@ async fn manager_with_checkpoint_distance_1() { // reopen the manager manager.stop().await.unwrap(); let manager = reopen_manager(&env, 1, CompressionType::Uncompressed).await; - assert_eq!(10, manager.manifest().await.manifest_version); + assert_eq!(10, manager.manifest().manifest_version); } #[tokio::test] async fn test_corrupted_data_causing_checksum_error() { // Initialize manager common_telemetry::init_default_ut_logging(); - let (_env, manager) = build_manager(1, CompressionType::Uncompressed).await; + let (_env, mut manager) = build_manager(1, CompressionType::Uncompressed).await; // Apply actions for _ in 0..10 { @@ -176,7 +171,6 @@ async fn test_corrupted_data_causing_checksum_error() { // Check if there is a checkpoint assert!(manager .store() - .await .load_last_checkpoint() .await .unwrap() @@ -185,8 +179,7 @@ async fn test_corrupted_data_causing_checksum_error() { // Corrupt the last checkpoint data let mut corrupted_bytes = manager .store() - .await - .read_file(&manager.store().await.last_checkpoint_path()) + .read_file(&manager.store().last_checkpoint_path()) .await .unwrap(); corrupted_bytes[0] ^= 1; @@ -194,13 +187,12 @@ async fn test_corrupted_data_causing_checksum_error() { // Overwrite the latest checkpoint data manager .store() - .await .write_last_checkpoint(9, &corrupted_bytes) .await .unwrap(); // Attempt to load the corrupted checkpoint - let load_corrupted_result = manager.store().await.load_last_checkpoint().await; + let load_corrupted_result = manager.store().load_last_checkpoint().await; // Check if the result is an error and if it's of type VerifyChecksum assert_matches!(load_corrupted_result, Err(ChecksumMismatch { .. })); @@ -245,13 +237,13 @@ async fn generate_checkpoint_with_compression_types( compress_type: CompressionType, actions: Vec, ) -> RegionCheckpoint { - let (_env, manager) = build_manager(1, compress_type).await; + let (_env, mut manager) = build_manager(1, compress_type).await; for action in actions { manager.update(action).await.unwrap(); } - RegionManifestManagerInner::last_checkpoint(&mut manager.store().await) + RegionManifestManager::last_checkpoint(&mut manager.store()) .await .unwrap() .unwrap() diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 1fa3fb7d4f..d29e9fc469 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -27,6 +27,7 @@ use common_wal::options::WalOptions; use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; +use tokio::sync::RwLock as TokioRwLock; use crate::access_layer::AccessLayerRef; use crate::error::{RegionNotFoundSnafu, RegionReadonlySnafu, Result}; @@ -74,7 +75,7 @@ pub(crate) struct MitoRegion { /// SSTs accessor for this region. pub(crate) access_layer: AccessLayerRef, /// Manager to maintain manifest for this region. - pub(crate) manifest_manager: RegionManifestManager, + pub(crate) manifest_manager: TokioRwLock, /// SST file purger. pub(crate) file_purger: FilePurgerRef, /// Wal options of this region. @@ -94,7 +95,7 @@ pub(crate) type MitoRegionRef = Arc; impl MitoRegion { /// Stop background managers for this region. pub(crate) async fn stop(&self) -> Result<()> { - self.manifest_manager.stop().await?; + self.manifest_manager.write().await.stop().await?; info!( "Stopped region manifest manager, region_id: {}", @@ -154,7 +155,7 @@ impl MitoRegion { let wal_usage = self.estimated_wal_usage(memtable_usage); - let manifest_usage = self.manifest_manager.manifest_usage().await; + let manifest_usage = self.manifest_manager.read().await.manifest_usage(); RegionUsage { region_id, @@ -178,6 +179,8 @@ impl MitoRegion { info!("Applying {edit:?} to region {}", self.region_id); self.manifest_manager + .write() + .await .update(RegionMetaActionList::with_action(RegionMetaAction::Edit( edit.clone(), ))) diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 5549e55c44..739ae30603 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -27,6 +27,7 @@ use snafu::{ensure, OptionExt}; use store_api::logstore::LogStore; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{ColumnId, RegionId}; +use tokio::sync::RwLock; use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; @@ -202,7 +203,7 @@ impl RegionOpener { region_id, version_control, access_layer: access_layer.clone(), - manifest_manager, + manifest_manager: RwLock::new(manifest_manager), file_purger: Arc::new(LocalFilePurger::new( self.purge_scheduler, access_layer, @@ -263,7 +264,7 @@ impl RegionOpener { return Ok(None); }; - let manifest = manifest_manager.manifest().await; + let manifest = manifest_manager.manifest(); let metadata = manifest.metadata.clone(); let region_id = self.region_id; @@ -330,7 +331,7 @@ impl RegionOpener { region_id: self.region_id, version_control, access_layer, - manifest_manager, + manifest_manager: RwLock::new(manifest_manager), file_purger, wal_options, last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index d0d6e51039..4c88358be2 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -138,7 +138,12 @@ async fn alter_region_schema( metadata: new_meta.clone(), }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change)); - region.manifest_manager.update(action_list).await?; + region + .manifest_manager + .write() + .await + .update(action_list) + .await?; // Apply the metadata to region's version. region diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index ed2c52c211..13c5902663 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -45,30 +45,31 @@ impl RegionWorkerLoop { let is_mutable_empty = region.version().memtables.mutable.is_empty(); // Utilizes the short circuit evaluation. - let region = if !is_mutable_empty || region.manifest_manager.has_update().await? { - info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}"); - let reopened_region = Arc::new( - RegionOpener::new( - region_id, - region.region_dir(), - self.memtable_builder_provider.clone(), - self.object_store_manager.clone(), - self.purge_scheduler.clone(), - self.intermediate_manager.clone(), - ) - .cache(Some(self.cache_manager.clone())) - .options(region.version().options.clone()) - .skip_wal_replay(true) - .open(&self.config, &self.wal) - .await?, - ); - debug_assert!(!reopened_region.is_writable()); - self.regions.insert_region(reopened_region.clone()); + let region = + if !is_mutable_empty || region.manifest_manager.read().await.has_update().await? { + info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}"); + let reopened_region = Arc::new( + RegionOpener::new( + region_id, + region.region_dir(), + self.memtable_builder_provider.clone(), + self.object_store_manager.clone(), + self.purge_scheduler.clone(), + self.intermediate_manager.clone(), + ) + .cache(Some(self.cache_manager.clone())) + .options(region.version().options.clone()) + .skip_wal_replay(true) + .open(&self.config, &self.wal) + .await?, + ); + debug_assert!(!reopened_region.is_writable()); + self.regions.insert_region(reopened_region.clone()); - reopened_region - } else { - region - }; + reopened_region + } else { + region + }; let flushed_entry_id = region.version_control.current().last_entry_id; info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}"); diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 5f797f894f..6fcf382035 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -79,7 +79,13 @@ impl RegionWorkerLoop { }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - if let Err(e) = region.manifest_manager.update(action_list).await { + if let Err(e) = region + .manifest_manager + .write() + .await + .update(action_list) + .await + { error!(e; "Failed to update manifest, region: {}", region_id); manifest_timer.stop_and_discard(); request.on_failure(e); diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index c853f5eb03..aa3c34b49b 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -44,7 +44,12 @@ impl RegionWorkerLoop { }; let action_list = RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); - region.manifest_manager.update(action_list).await?; + region + .manifest_manager + .write() + .await + .update(action_list) + .await?; // Notifies flush scheduler. self.flush_scheduler.on_region_truncated(region_id);