diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index dfdb8a4aa9..d032c046d3 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -58,7 +58,6 @@ use crate::region::ManifestContextRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file::{FileHandle, FileId, Level}; -use crate::sst::file_purger::FilePurgerRef; use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; @@ -71,12 +70,10 @@ pub struct CompactionRequest { pub(crate) request_sender: mpsc::Sender, /// Waiters of the compaction request. pub(crate) waiters: Vec, - pub(crate) file_purger: FilePurgerRef, /// Start time of compaction task. pub(crate) start_time: Instant, pub(crate) cache_manager: CacheManagerRef, pub(crate) manifest_ctx: ManifestContextRef, - pub(crate) version_control: VersionControlRef, pub(crate) listener: WorkerListener, } @@ -142,7 +139,6 @@ impl CompactionScheduler { compact_options: compact_request::Options, version_control: &VersionControlRef, access_layer: &AccessLayerRef, - file_purger: &FilePurgerRef, waiter: OptionOutputTx, manifest_ctx: &ManifestContextRef, ) -> Result<()> { @@ -153,12 +149,8 @@ impl CompactionScheduler { } // The region can compact directly. - let mut status = CompactionStatus::new( - region_id, - version_control.clone(), - access_layer.clone(), - file_purger.clone(), - ); + let mut status = + CompactionStatus::new(region_id, version_control.clone(), access_layer.clone()); let request = status.new_compaction_request( self.request_sender.clone(), waiter, @@ -330,8 +322,6 @@ struct CompactionStatus { version_control: VersionControlRef, /// Access layer of the region. access_layer: AccessLayerRef, - /// File purger of the region. - file_purger: FilePurgerRef, /// Compaction pending to schedule. /// /// For simplicity, we merge all pending compaction requests into one. @@ -344,13 +334,11 @@ impl CompactionStatus { region_id: RegionId, version_control: VersionControlRef, access_layer: AccessLayerRef, - file_purger: FilePurgerRef, ) -> CompactionStatus { CompactionStatus { region_id, version_control, access_layer, - file_purger, pending_compaction: None, } } @@ -392,11 +380,9 @@ impl CompactionStatus { access_layer: self.access_layer.clone(), request_sender: request_sender.clone(), waiters: Vec::new(), - file_purger: self.file_purger.clone(), start_time, cache_manager, manifest_ctx: manifest_ctx.clone(), - version_control: self.version_control.clone(), listener, }; @@ -547,7 +533,6 @@ mod tests { let (tx, _rx) = mpsc::channel(4); let mut scheduler = env.mock_compaction_scheduler(tx); let mut builder = VersionControlBuilder::new(); - let purger = builder.file_purger(); // Nothing to compact. let version_control = Arc::new(builder.build()); @@ -562,7 +547,6 @@ mod tests { compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, - &purger, waiter, &manifest_ctx, ) @@ -581,7 +565,6 @@ mod tests { compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, - &purger, waiter, &manifest_ctx, ) @@ -644,7 +627,6 @@ mod tests { compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, - &purger, OptionOutputTx::none(), &manifest_ctx, ) @@ -673,7 +655,6 @@ mod tests { compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, - &purger, OptionOutputTx::none(), &manifest_ctx, ) @@ -705,7 +686,6 @@ mod tests { compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, - &purger, OptionOutputTx::none(), &manifest_ctx, ) diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index a675ceafb3..fe7e637a9e 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -34,13 +34,11 @@ use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; use crate::read::Source; use crate::region::options::IndexOptions; -use crate::region::version::VersionControlRef; use crate::region::{ManifestContextRef, RegionState}; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; use crate::sst::file::{FileHandle, FileMeta, IndexType}; -use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; use crate::worker::WorkerListener; @@ -54,7 +52,6 @@ pub(crate) struct CompactionTaskImpl { pub outputs: Vec, pub expired_ssts: Vec, pub compaction_time_window: Option, - pub file_purger: FilePurgerRef, /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, /// Senders that are used to notify waiters waiting for pending compaction tasks. @@ -70,8 +67,6 @@ pub(crate) struct CompactionTaskImpl { pub(crate) append_mode: bool, /// Manifest context. pub(crate) manifest_ctx: ManifestContextRef, - /// Version control to update. - pub(crate) version_control: VersionControlRef, /// Event listener. pub(crate) listener: WorkerListener, } @@ -216,7 +211,7 @@ impl CompactionTaskImpl { Ok((output_files, inputs)) } - async fn handle_compaction(&mut self) -> error::Result<()> { + async fn handle_compaction(&mut self) -> error::Result { self.mark_files_compacting(true); let merge_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["merge"]) @@ -260,11 +255,10 @@ impl CompactionTaskImpl { // We might leak files if we fail to update manifest. We can add a cleanup task to // remove them later. self.manifest_ctx - .update_manifest(RegionState::Writable, action_list, || { - self.version_control - .apply_edit(edit, &[], self.file_purger.clone()); - }) - .await + .update_manifest(RegionState::Writable, action_list) + .await?; + + Ok(edit) } /// Handles compaction failure, notifies all waiters. @@ -292,10 +286,11 @@ impl CompactionTaskImpl { impl CompactionTask for CompactionTaskImpl { async fn run(&mut self) { let notify = match self.handle_compaction().await { - Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished { + Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished { region_id: self.region_id, senders: std::mem::take(&mut self.waiters), start_time: self.start_time, + edit, }), Err(e) => { error!(e; "Failed to compact region, region id: {}", self.region_id); diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 7c6bf08275..eebbd1f48d 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -117,11 +117,9 @@ impl Picker for TwcsPicker { access_layer, request_sender, waiters, - file_purger, start_time, cache_manager, manifest_ctx, - version_control, listener, .. } = req; @@ -175,14 +173,12 @@ impl Picker for TwcsPicker { compaction_time_window: Some(time_window_size), request_sender, waiters, - file_purger, start_time, cache_manager, storage: current_version.options.storage.clone(), index_options: current_version.options.index_options.clone(), append_mode: current_version.options.append_mode, manifest_ctx, - version_control, listener, }; Some(Box::new(task)) diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index 2f0ff49c7f..a2b3f066ef 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -109,11 +109,9 @@ impl Picker for WindowedCompactionPicker { access_layer, request_sender, waiters, - file_purger, start_time, cache_manager, manifest_ctx, - version_control, listener, } = req; @@ -130,14 +128,12 @@ impl Picker for WindowedCompactionPicker { compaction_time_window: Some(time_window), request_sender, waiters, - file_purger, start_time, cache_manager, storage: current_version.options.storage.clone(), index_options: current_version.options.index_options.clone(), append_mode: current_version.options.append_mode, manifest_ctx, - version_control, listener, }; Some(Box::new(task)) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 991db08ebc..5bfec096cb 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -43,7 +43,6 @@ use crate::request::{ }; use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::{FileId, FileMeta, IndexType}; -use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; use crate::worker::WorkerListener; @@ -201,7 +200,6 @@ pub(crate) struct RegionFlushTask { pub(crate) request_sender: mpsc::Sender, pub(crate) access_layer: AccessLayerRef, - pub(crate) file_purger: FilePurgerRef, pub(crate) listener: WorkerListener, pub(crate) engine_config: Arc, pub(crate) row_group_size: Option, @@ -243,31 +241,34 @@ impl RegionFlushTask { // Get a version of this region before creating a job to get current // wal entry id, sequence and immutable memtables. let version_data = version_control.current(); - // This is used to update the version. - let version_control = version_control.clone(); Box::pin(async move { - self.do_flush(version_data, &version_control).await; + self.do_flush(version_data).await; }) } /// Runs the flush task. - async fn do_flush( - &mut self, - version_data: VersionControlData, - version_control: &VersionControlRef, - ) { + async fn do_flush(&mut self, version_data: VersionControlData) { let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer(); self.listener.on_flush_begin(self.region_id).await; - let worker_request = match self.flush_memtables(&version_data, version_control).await { - Ok(()) => { + let worker_request = match self.flush_memtables(&version_data).await { + Ok(edit) => { + let memtables_to_remove = version_data + .version + .memtables + .immutables() + .iter() + .map(|m| m.id()) + .collect(); let flush_finished = FlushFinished { region_id: self.region_id, // The last entry has been flushed. flushed_entry_id: version_data.last_entry_id, senders: std::mem::take(&mut self.senders), _timer: timer, + edit, + memtables_to_remove, }; WorkerRequest::Background { region_id: self.region_id, @@ -291,11 +292,10 @@ impl RegionFlushTask { } /// Flushes memtables to level 0 SSTs and updates the manifest. - async fn flush_memtables( - &self, - version_data: &VersionControlData, - version_control: &VersionControlRef, - ) -> Result<()> { + /// Returns the [RegionEdit] to apply. + async fn flush_memtables(&self, version_data: &VersionControlData) -> Result { + // We must use the immutable memtables list and entry ids from the `version_data` + // for consistency as others might already modify the version in the `version_control`. let version = &version_data.version; let timer = FLUSH_ELAPSED .with_label_values(&["flush_memtables"]) @@ -384,13 +384,6 @@ impl RegionFlushTask { timer.stop_and_record(), ); - let memtables_to_remove: SmallVec<[_; 2]> = version_data - .version - .memtables - .immutables() - .iter() - .map(|m| m.id()) - .collect(); let edit = RegionEdit { files_to_add: file_metas, files_to_remove: Vec::new(), @@ -405,10 +398,10 @@ impl RegionFlushTask { // We will leak files if the manifest update fails, but we ignore them for simplicity. We can // add a cleanup job to remove them later. self.manifest_ctx - .update_manifest(RegionState::Writable, action_list, || { - version_control.apply_edit(edit, &memtables_to_remove, self.file_purger.clone()); - }) - .await + .update_manifest(RegionState::Writable, action_list) + .await?; + + Ok(edit) } /// Notify flush job status. @@ -796,7 +789,6 @@ mod tests { senders: Vec::new(), request_sender: tx, access_layer: env.access_layer.clone(), - file_purger: builder.file_purger(), listener: WorkerListener::default(), engine_config: Arc::new(MitoConfig::default()), row_group_size: None, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index fc000f3e81..5f945390f9 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -301,13 +301,11 @@ impl ManifestContext { self.manifest_manager.read().await.has_update().await } - /// Updates the manifest if current state is `expect_state` and executes - /// the `applier` if the manifest is updated. + /// Updates the manifest if current state is `expect_state`. pub(crate) async fn update_manifest( &self, expect_state: RegionState, action_list: RegionMetaActionList, - applier: impl FnOnce(), ) -> Result<()> { // Acquires the write lock of the manifest manager. let mut manager = self.manifest_manager.write().await; @@ -365,9 +363,6 @@ impl ManifestContext { |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id), )?; - // Executes the applier. We MUST hold the write lock. - applier(); - if self.state.load() == RegionState::ReadOnly { warn!( "Region {} becomes read-only while updating manifest which may cause inconsistency", diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index efd796b44d..ece3a49d63 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -27,8 +27,9 @@ use common_telemetry::info; use datatypes::prelude::DataType; use prometheus::HistogramTimer; use prost::Message; +use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::metadata::{ColumnMetadata, RegionMetadata}; +use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::region_engine::SetReadonlyResponse; use store_api::region_request::{ AffectedRows, RegionAlterRequest, RegionCatchupRequest, RegionCloseRequest, @@ -43,6 +44,7 @@ use crate::error::{ FlushRegionSnafu, InvalidRequestSnafu, Result, }; use crate::manifest::action::RegionEdit; +use crate::memtable::MemtableId; use crate::metrics::COMPACTION_ELAPSED_TOTAL; use crate::wal::entry_distributor::WalEntryReceiver; use crate::wal::EntryId; @@ -635,6 +637,10 @@ pub(crate) enum BackgroundNotify { CompactionFailed(CompactionFailed), /// Truncate result. Truncate(TruncateResult), + /// Region change result. + RegionChange(RegionChangeResult), + /// Region edit result. + RegionEdit(RegionEditResult), } /// Notifies a flush job is finished. @@ -648,6 +654,10 @@ pub(crate) struct FlushFinished { pub(crate) senders: Vec, /// Flush timer. pub(crate) _timer: HistogramTimer, + /// Region edit to apply. + pub(crate) edit: RegionEdit, + /// Memtables to remove. + pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>, } impl FlushFinished { @@ -686,6 +696,8 @@ pub(crate) struct CompactionFinished { pub(crate) senders: Vec, /// Start time of compaction task. pub(crate) start_time: Instant, + /// Region edit to apply. + pub(crate) edit: RegionEdit, } impl CompactionFinished { @@ -735,6 +747,32 @@ pub(crate) struct TruncateResult { pub(crate) truncated_sequence: SequenceNumber, } +/// Notifies the region the result of writing region change action. +#[derive(Debug)] +pub(crate) struct RegionChangeResult { + /// Region id. + pub(crate) region_id: RegionId, + /// The new region metadata to apply. + pub(crate) new_meta: RegionMetadataRef, + /// Result sender. + pub(crate) sender: OptionOutputTx, + /// Result from the manifest manager. + pub(crate) result: Result<()>, +} + +/// Notifies the regin the result of editing region. +#[derive(Debug)] +pub(crate) struct RegionEditResult { + /// Region id. + pub(crate) region_id: RegionId, + /// Result sender. + pub(crate) sender: Sender>, + /// Region edit to apply. + pub(crate) edit: RegionEdit, + /// Result from the manifest manager. + pub(crate) result: Result<()>, +} + #[cfg(test)] mod tests { use api::v1::value::ValueData; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index b34d5b8566..0aa4dbb921 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -783,6 +783,8 @@ impl RegionWorkerLoop { } BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await, BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await, + BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req), + BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req), } } diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 57dd53c8c8..1c5d968383 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_telemetry::{error, info, warn}; +use common_telemetry::{error, info}; use store_api::logstore::LogStore; use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; +use crate::error::RegionNotFoundSnafu; use crate::metrics::COMPACTION_REQUEST_COUNT; -use crate::request::{CompactionFailed, CompactionFinished, OptionOutputTx}; +use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -38,7 +39,6 @@ impl RegionWorkerLoop { req.options, ®ion.version_control, ®ion.access_layer, - ®ion.file_purger, sender, ®ion.manifest_ctx, ) { @@ -57,14 +57,18 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: CompactionFinished, ) { - let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { - warn!( - "Unable to finish the compaction task for a read only region {}", - region_id - ); - return; + let region = match self.regions.get_region(region_id) { + Some(region) => region, + None => { + request.on_failure(RegionNotFoundSnafu { region_id }.build()); + return; + } }; + region + .version_control + .apply_edit(request.edit.clone(), &[], region.file_purger.clone()); + // compaction finished. request.on_success(); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 2d1c4b96ca..b5d27e57ed 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -23,7 +23,7 @@ use store_api::region_request::RegionFlushRequest; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::error::Result; +use crate::error::{RegionNotFoundSnafu, Result}; use crate::flush::{FlushReason, RegionFlushTask}; use crate::region::MitoRegionRef; use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; @@ -173,7 +173,6 @@ impl RegionWorkerLoop { senders: Vec::new(), request_sender: self.sender.clone(), access_layer: region.access_layer.clone(), - file_purger: region.file_purger.clone(), listener: self.listener.clone(), engine_config, row_group_size, @@ -195,14 +194,20 @@ impl RegionWorkerLoop { // wake up other workers as we have released some memory by flush. self.notify_group(); - let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { - warn!( - "Unable to finish the flush task for a read only region {}", - region_id - ); - return; + let region = match self.regions.get_region(region_id) { + Some(region) => region, + None => { + request.on_failure(RegionNotFoundSnafu { region_id }.build()); + return; + } }; + region.version_control.apply_edit( + request.edit.clone(), + &request.memtables_to_remove, + region.file_purger.clone(), + ); + region.update_flush_millis(); // Delete wal. @@ -242,7 +247,6 @@ impl RegionWorkerLoop { compact_request::Options::Regular(Default::default()), ®ion.version_control, ®ion.access_layer, - ®ion.file_purger, OptionOutputTx::none(), ®ion.manifest_ctx, ) { diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index ca785d4f86..60ace00cd5 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -21,12 +21,15 @@ use snafu::ensure; use store_api::storage::RegionId; use tokio::sync::oneshot::Sender; -use crate::error::{InvalidRequestSnafu, Result}; +use crate::error::{InvalidRequestSnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, }; use crate::region::{MitoRegionRef, RegionState}; -use crate::request::{BackgroundNotify, OptionOutputTx, TruncateResult, WorkerRequest}; +use crate::request::{ + BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditResult, TruncateResult, + WorkerRequest, +}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -51,24 +54,58 @@ impl RegionWorkerLoop { return; } + let request_sender = self.sender.clone(); // Now the region is in editing state. // Updates manifest in background. common_runtime::spawn_bg(async move { - let result = edit_region(®ion, edit).await; - - if let Err(res) = sender.send(result) { + let result = edit_region(®ion, edit.clone()).await; + let notify = WorkerRequest::Background { + region_id, + notify: BackgroundNotify::RegionEdit(RegionEditResult { + region_id, + sender, + edit, + result, + }), + }; + // We don't set state back as the worker loop is already exited. + if let Err(res) = request_sender.send(notify).await { warn!( - "Failed to send result back to the worker, region_id: {}, res: {:?}", + "Failed to send region edit result back to the worker, region_id: {}, res: {:?}", region_id, res ); } - - // Sets the region as writable. For simplicity, we don't send the result - // back to the worker. - region.switch_state_to_writable(RegionState::Editing); }); } + /// Handles region edit result. + pub(crate) fn handle_region_edit_result(&self, edit_result: RegionEditResult) { + let region = match self.regions.get_region(edit_result.region_id) { + Some(region) => region, + None => { + let _ = edit_result.sender.send( + RegionNotFoundSnafu { + region_id: edit_result.region_id, + } + .fail(), + ); + return; + } + }; + + if edit_result.result.is_ok() { + // Applies the edit to the region. + region + .version_control + .apply_edit(edit_result.edit, &[], region.file_purger.clone()); + } + + // Sets the region as writable. + region.switch_state_to_writable(RegionState::Editing); + + let _ = edit_result.sender.send(edit_result.result); + } + /// Writes truncate action to the manifest and then applies it to the region in background. pub(crate) fn handle_manifest_truncate_action( &self, @@ -86,8 +123,6 @@ impl RegionWorkerLoop { let request_sender = self.sender.clone(); let manifest_ctx = region.manifest_ctx.clone(); - let version_control = region.version_control.clone(); - let memtable_builder = region.memtable_builder.clone(); // Updates manifest in background. common_runtime::spawn_bg(async move { @@ -96,14 +131,7 @@ impl RegionWorkerLoop { RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); let result = manifest_ctx - .update_manifest(RegionState::Truncating, action_list, || { - // Applies the truncate action to the region. - version_control.truncate( - truncate.truncated_entry_id, - truncate.truncated_sequence, - &memtable_builder, - ); - }) + .update_manifest(RegionState::Truncating, action_list) .await; // Sends the result back to the request sender. @@ -137,6 +165,7 @@ impl RegionWorkerLoop { return; } + let request_sender = self.sender.clone(); // Now the region is in altering state. common_runtime::spawn_bg(async move { let new_meta = change.metadata.clone(); @@ -144,28 +173,60 @@ impl RegionWorkerLoop { let result = region .manifest_ctx - .update_manifest(RegionState::Altering, action_list, || { - // Apply the metadata to region's version. - region - .version_control - .alter_schema(new_meta, ®ion.memtable_builder); - }) + .update_manifest(RegionState::Altering, action_list) .await; + let notify = WorkerRequest::Background { + region_id: region.region_id, + notify: BackgroundNotify::RegionChange(RegionChangeResult { + region_id: region.region_id, + sender, + result, + new_meta, + }), + }; - // Sets the region as writable. - region.switch_state_to_writable(RegionState::Altering); - - if result.is_ok() { - info!( - "Region {} is altered, schema version is {}", - region.region_id, - region.metadata().schema_version + if let Err(res) = request_sender.send(notify).await { + warn!( + "Failed to send region change result back to the worker, region_id: {}, res: {:?}", + region.region_id, res ); } - - sender.send(result.map(|_| 0)); }); } + + /// Handles region change result. + pub(crate) fn handle_manifest_region_change_result(&self, change_result: RegionChangeResult) { + let region = match self.regions.get_region(change_result.region_id) { + Some(region) => region, + None => { + change_result.sender.send( + RegionNotFoundSnafu { + region_id: change_result.region_id, + } + .fail(), + ); + return; + } + }; + + if change_result.result.is_ok() { + // Apply the metadata to region's version. + region + .version_control + .alter_schema(change_result.new_meta, ®ion.memtable_builder); + + info!( + "Region {} is altered, schema version is {}", + region.region_id, + region.metadata().schema_version + ); + } + + // Sets the region as writable. + region.switch_state_to_writable(RegionState::Altering); + + change_result.sender.send(change_result.result.map(|_| 0)); + } } /// Checks the edit, writes and applies it. @@ -187,14 +248,9 @@ async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> { info!("Applying {edit:?} to region {}", region_id); - let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit)); region .manifest_ctx - .update_manifest(RegionState::Editing, action_list, || { - // Applies the edit to the region. - region - .version_control - .apply_edit(edit, &[], region.file_purger.clone()); - }) + .update_manifest(RegionState::Editing, action_list) .await } diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 70aca9f6ac..da5b74e511 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -65,10 +65,20 @@ impl RegionWorkerLoop { // We are already in the worker loop so we can set the state first. region.switch_state_to_writable(RegionState::Truncating); - if let Err(e) = truncate_result.result { - // Unable to truncate the region. - truncate_result.sender.send(Err(e)); - return; + match truncate_result.result { + Ok(()) => { + // Applies the truncate action to the region. + region.version_control.truncate( + truncate_result.truncated_entry_id, + truncate_result.truncated_sequence, + ®ion.memtable_builder, + ); + } + Err(e) => { + // Unable to truncate the region. + truncate_result.sender.send(Err(e)); + return; + } } // Notifies flush scheduler.