From 4685b59ef1e5c903aec8bfd7fdf40b518388a077 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 24 Apr 2024 11:09:48 +0800 Subject: [PATCH] feat: write manifests in background tasks (#3709) * chore: truncate wip * feat: truncate and edit write manifest in background * refactor: wrap in manifest context * feat: alter write manifest in background * chore: fix compiler errors * feat: flush update manifest in background * feat: compaction update manifest in background * feat: set dropping state * feat: reset drop state * feat: check state before updating manifest * test: fix compaction test * refactor: rename method * chore: update comment * chore: discard state guard * refactor: use atomic cell to store state enum * chore: fix clippy * chore: update toml * chore: remove unused type alias * feat: check state after writing manifest * chore: address CR comments * chore: change status code * chore: Update src/mito2/src/region.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> * fix: executes applier --------- Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- Cargo.lock | 1 + Cargo.toml | 1 + src/common/error/src/status_code.rs | 1 + src/mito2/Cargo.toml | 1 + src/mito2/src/compaction.rs | 46 +++- src/mito2/src/compaction/twcs.rs | 95 ++++--- src/mito2/src/engine/catchup_test.rs | 4 +- src/mito2/src/engine/listener.rs | 8 +- src/mito2/src/engine/open_test.rs | 2 +- src/mito2/src/engine/set_readonly_test.rs | 2 +- src/mito2/src/error.rs | 9 +- src/mito2/src/flush.rs | 69 +++-- src/mito2/src/manifest/manager.rs | 9 +- src/mito2/src/manifest/tests/checkpoint.rs | 2 +- src/mito2/src/region.rs | 285 ++++++++++++++++++--- src/mito2/src/region/opener.rs | 21 +- src/mito2/src/request.rs | 59 ++--- src/mito2/src/test_util/scheduler_util.rs | 35 ++- src/mito2/src/worker.rs | 58 ++--- src/mito2/src/worker/handle_alter.rs | 55 ++-- src/mito2/src/worker/handle_catchup.rs | 47 ++-- src/mito2/src/worker/handle_close.rs | 2 +- src/mito2/src/worker/handle_compaction.rs | 43 +--- src/mito2/src/worker/handle_drop.rs | 26 +- src/mito2/src/worker/handle_flush.rs | 29 +-- src/mito2/src/worker/handle_manifest.rs | 200 +++++++++++++++ src/mito2/src/worker/handle_truncate.rs | 73 ++++-- src/script/Cargo.toml | 2 +- 28 files changed, 818 insertions(+), 367 deletions(-) create mode 100644 src/mito2/src/worker/handle_manifest.rs diff --git a/Cargo.lock b/Cargo.lock index b273680cd3..4671cbf698 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5416,6 +5416,7 @@ dependencies = [ "common-wal", "crc32fast", "criterion", + "crossbeam-utils", "datafusion", "datafusion-common", "datafusion-expr", diff --git a/Cargo.toml b/Cargo.toml index 62faf6e308..a74099fa4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ bytemuck = "1.12" bytes = { version = "1.5", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.4", features = ["derive"] } +crossbeam-utils = "0.8" dashmap = "5.4" datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index e9602a184b..6f17221729 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -59,6 +59,7 @@ pub enum StatusCode { RegionNotFound = 4005, RegionAlreadyExists = 4006, RegionReadonly = 4007, + /// Region is not in a proper state to handle specific request. RegionNotReady = 4008, // If mutually exclusive operations are reached at the same time, // only one can be executed, another one will get region busy. diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index f3a66c9999..f9fdb5b574 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -32,6 +32,7 @@ common-test-util = { workspace = true, optional = true } common-time.workspace = true common-wal.workspace = true crc32fast = "1" +crossbeam-utils.workspace = true datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 000a6e2a88..9cde6a0aad 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -37,9 +37,11 @@ use crate::error::{ use crate::metrics::COMPACTION_STAGE_ELAPSED; use crate::region::options::CompactionOptions; use crate::region::version::{VersionControlRef, VersionRef}; +use crate::region::ManifestContextRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::FilePurgerRef; +use crate::worker::WorkerListener; /// Region compaction request. pub struct CompactionRequest { @@ -54,6 +56,9 @@ pub struct CompactionRequest { /// Start time of compaction task. pub(crate) start_time: Instant, pub(crate) cache_manager: CacheManagerRef, + pub(crate) manifest_ctx: ManifestContextRef, + pub(crate) version_control: VersionControlRef, + pub(crate) listener: WorkerListener, } impl CompactionRequest { @@ -88,6 +93,8 @@ pub(crate) struct CompactionScheduler { /// Request sender of the worker that this scheduler belongs to. request_sender: Sender, cache_manager: CacheManagerRef, + engine_config: Arc, + listener: WorkerListener, } impl CompactionScheduler { @@ -95,12 +102,16 @@ impl CompactionScheduler { scheduler: SchedulerRef, request_sender: Sender, cache_manager: CacheManagerRef, + engine_config: Arc, + listener: WorkerListener, ) -> Self { Self { scheduler, region_status: HashMap::new(), request_sender, cache_manager, + engine_config, + listener, } } @@ -112,7 +123,7 @@ impl CompactionScheduler { access_layer: &AccessLayerRef, file_purger: &FilePurgerRef, waiter: OptionOutputTx, - engine_config: Arc, + manifest_ctx: &ManifestContextRef, ) -> Result<()> { if let Some(status) = self.region_status.get_mut(®ion_id) { // Region is compacting. Add the waiter to pending list. @@ -130,8 +141,10 @@ impl CompactionScheduler { let request = status.new_compaction_request( self.request_sender.clone(), waiter, - engine_config, + self.engine_config.clone(), self.cache_manager.clone(), + manifest_ctx, + self.listener.clone(), ); self.region_status.insert(region_id, status); self.schedule_compaction_request(request) @@ -141,7 +154,7 @@ impl CompactionScheduler { pub(crate) fn on_compaction_finished( &mut self, region_id: RegionId, - engine_config: Arc, + manifest_ctx: &ManifestContextRef, ) { let Some(status) = self.region_status.get_mut(®ion_id) else { return; @@ -150,8 +163,10 @@ impl CompactionScheduler { let request = status.new_compaction_request( self.request_sender.clone(), OptionOutputTx::none(), - engine_config, + self.engine_config.clone(), self.cache_manager.clone(), + manifest_ctx, + self.listener.clone(), ); // Try to schedule next compaction task for this region. if let Err(e) = self.schedule_compaction_request(request) { @@ -325,6 +340,8 @@ impl CompactionStatus { waiter: OptionOutputTx, engine_config: Arc, cache_manager: CacheManagerRef, + manifest_ctx: &ManifestContextRef, + listener: WorkerListener, ) -> CompactionRequest { let current_version = self.version_control.current().version; let start_time = Instant::now(); @@ -337,6 +354,9 @@ impl CompactionStatus { file_purger: self.file_purger.clone(), start_time, cache_manager, + manifest_ctx: manifest_ctx.clone(), + version_control: self.version_control.clone(), + listener, }; if let Some(pending) = self.pending_compaction.take() { @@ -371,6 +391,9 @@ mod tests { let version_control = Arc::new(builder.build()); let (output_tx, output_rx) = oneshot::channel(); let waiter = OptionOutputTx::from(output_tx); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; scheduler .schedule_compaction( builder.region_id(), @@ -378,7 +401,7 @@ mod tests { &env.access_layer, &purger, waiter, - Arc::new(MitoConfig::default()), + &manifest_ctx, ) .unwrap(); let output = output_rx.await.unwrap().unwrap(); @@ -396,7 +419,7 @@ mod tests { &env.access_layer, &purger, waiter, - Arc::new(MitoConfig::default()), + &manifest_ctx, ) .unwrap(); let output = output_rx.await.unwrap().unwrap(); @@ -448,6 +471,9 @@ mod tests { .push_l0_file(90, end) .build(), ); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; scheduler .schedule_compaction( region_id, @@ -455,7 +481,7 @@ mod tests { &env.access_layer, &purger, OptionOutputTx::none(), - Arc::new(MitoConfig::default()), + &manifest_ctx, ) .unwrap(); // Should schedule 1 compaction. @@ -483,7 +509,7 @@ mod tests { &env.access_layer, &purger, OptionOutputTx::none(), - Arc::new(MitoConfig::default()), + &manifest_ctx, ) .unwrap(); assert_eq!(1, scheduler.region_status.len()); @@ -496,7 +522,7 @@ mod tests { .is_some()); // On compaction finished and schedule next compaction. - scheduler.on_compaction_finished(region_id, Arc::new(MitoConfig::default())); + scheduler.on_compaction_finished(region_id, &manifest_ctx); assert_eq!(1, scheduler.region_status.len()); assert_eq!(2, job_scheduler.num_jobs()); // 5 files for next compaction. @@ -514,7 +540,7 @@ mod tests { &env.access_layer, &purger, OptionOutputTx::none(), - Arc::new(MitoConfig::default()), + &manifest_ctx, ) .unwrap(); assert_eq!(2, job_scheduler.num_jobs()); diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 7aedf73515..996af733e3 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -34,12 +34,15 @@ use crate::compaction::picker::{CompactionTask, Picker}; use crate::compaction::CompactionRequest; use crate::config::MitoConfig; use crate::error::{self, CompactRegionSnafu}; +use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; use crate::read::projection::ProjectionMapper; use crate::read::scan_region::ScanInput; use crate::read::seq_scan::SeqScan; use crate::read::{BoxedBatchReader, Source}; use crate::region::options::IndexOptions; +use crate::region::version::VersionControlRef; +use crate::region::{ManifestContextRef, RegionState}; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; @@ -47,6 +50,7 @@ use crate::sst::file::{FileHandle, FileId, FileMeta, IndexType, Level}; use crate::sst::file_purger::FilePurgerRef; use crate::sst::parquet::WriteOptions; use crate::sst::version::LevelMeta; +use crate::worker::WorkerListener; const MAX_PARALLEL_COMPACTION: usize = 8; @@ -140,6 +144,9 @@ impl Picker for TwcsPicker { file_purger, start_time, cache_manager, + manifest_ctx, + version_control, + listener, } = req; let region_metadata = current_version.metadata.clone(); @@ -197,6 +204,9 @@ impl Picker for TwcsPicker { storage: current_version.options.storage.clone(), index_options: current_version.options.index_options.clone(), append_mode: current_version.options.append_mode, + manifest_ctx, + version_control, + listener, }; Some(Box::new(task)) } @@ -341,6 +351,12 @@ pub(crate) struct TwcsCompactionTask { pub(crate) index_options: IndexOptions, /// The region is using append mode. pub(crate) append_mode: bool, + /// Manifest context. + pub(crate) manifest_ctx: ManifestContextRef, + /// Version control to update. + pub(crate) version_control: VersionControlRef, + /// Event listener. + pub(crate) listener: WorkerListener, } impl Debug for TwcsCompactionTask { @@ -481,18 +497,55 @@ impl TwcsCompactionTask { Ok((output_files, inputs)) } - async fn handle_compaction(&mut self) -> error::Result<(Vec, Vec)> { + async fn handle_compaction(&mut self) -> error::Result<()> { self.mark_files_compacting(true); let merge_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["merge"]) .start_timer(); - let (output, mut compacted) = self.merge_ssts().await.map_err(|e| { - error!(e; "Failed to compact region: {}", self.region_id); - merge_timer.stop_and_discard(); - e - })?; - compacted.extend(self.expired_ssts.iter().map(FileHandle::meta)); - Ok((output, compacted)) + let (added, mut deleted) = match self.merge_ssts().await { + Ok(v) => v, + Err(e) => { + error!(e; "Failed to compact region: {}", self.region_id); + merge_timer.stop_and_discard(); + return Err(e); + } + }; + deleted.extend(self.expired_ssts.iter().map(FileHandle::meta)); + let merge_time = merge_timer.stop_and_record(); + info!( + "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", + self.region_id, + deleted, + added, + self.compaction_time_window, + self.waiters.len(), + merge_time, + ); + + self.listener.on_merge_ssts_finished(self.region_id).await; + + let _manifest_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["write_manifest"]) + .start_timer(); + // Write region edit to manifest. + let edit = RegionEdit { + files_to_add: added, + files_to_remove: deleted, + compaction_time_window: self + .compaction_time_window + .map(|seconds| Duration::from_secs(seconds as u64)), + flushed_entry_id: None, + flushed_sequence: None, + }; + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + // We might leak files if we fail to update manifest. We can add a cleanup task to + // remove them later. + self.manifest_ctx + .update_manifest(RegionState::Writable, action_list, || { + self.version_control + .apply_edit(edit, &[], self.file_purger.clone()); + }) + .await } /// Handles compaction failure, notifies all waiters. @@ -520,27 +573,11 @@ impl TwcsCompactionTask { impl CompactionTask for TwcsCompactionTask { async fn run(&mut self) { let notify = match self.handle_compaction().await { - Ok((added, deleted)) => { - info!( - "Compacted SST files, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}", - deleted, - added, - self.compaction_time_window, - self.waiters.len(), - ); - - BackgroundNotify::CompactionFinished(CompactionFinished { - region_id: self.region_id, - compaction_outputs: added, - compacted_files: deleted, - senders: std::mem::take(&mut self.waiters), - file_purger: self.file_purger.clone(), - compaction_time_window: self - .compaction_time_window - .map(|seconds| Duration::from_secs(seconds as u64)), - start_time: self.start_time, - }) - } + Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished { + region_id: self.region_id, + senders: std::mem::take(&mut self.waiters), + start_time: self.start_time, + }), Err(e) => { error!(e; "Failed to compact region, region id: {}", self.region_id); let err = Arc::new(e); diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index b0dd75fe53..b9779b2ea1 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -345,7 +345,7 @@ async fn test_catchup_with_manifest_update() { // Ensures the mutable is empty. assert!(region.version().memtables.mutable.is_empty()); - let manifest = region.manifest_manager.read().await.manifest(); + let manifest = region.manifest_ctx.manifest().await; assert_eq!(manifest.manifest_version, 0); let resp = follower_engine @@ -361,7 +361,7 @@ async fn test_catchup_with_manifest_update() { // The inner region was replaced. We must get it again. let region = follower_engine.get_region(region_id).unwrap(); - let manifest = region.manifest_manager.read().await.manifest(); + let manifest = region.manifest_ctx.manifest().await; assert_eq!(manifest.manifest_version, 2); assert!(!region.is_writable()); diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs index 95ef82f277..8a4c33c6a8 100644 --- a/src/mito2/src/engine/listener.rs +++ b/src/mito2/src/engine/listener.rs @@ -51,9 +51,9 @@ pub trait EventListener: Send + Sync { let _ = removed; } - /// Notifies the listener that the region is going to handle the compaction - /// finished request. - async fn on_handle_compaction_finished(&self, region_id: RegionId) { + /// Notifies the listener that ssts has been merged and the region + /// is going to update its manifest. + async fn on_merge_ssts_finished(&self, region_id: RegionId) { let _ = region_id; } } @@ -201,7 +201,7 @@ impl CompactionListener { #[async_trait] impl EventListener for CompactionListener { - async fn on_handle_compaction_finished(&self, region_id: RegionId) { + async fn on_merge_ssts_finished(&self, region_id: RegionId) { info!("Handle compaction finished request, region {region_id}"); self.handle_finished_notify.notify_one(); diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index b68082aeac..dc0590cdd0 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -127,7 +127,7 @@ async fn test_engine_open_readonly() { ) .await .unwrap_err(); - assert_eq!(StatusCode::RegionReadonly, err.status_code()); + assert_eq!(StatusCode::RegionNotReady, err.status_code()); assert_eq!(Some(RegionRole::Follower), engine.role(region_id)); // Set writable and write. diff --git a/src/mito2/src/engine/set_readonly_test.rs b/src/mito2/src/engine/set_readonly_test.rs index 9b92d7c23a..9de3f0a832 100644 --- a/src/mito2/src/engine/set_readonly_test.rs +++ b/src/mito2/src/engine/set_readonly_test.rs @@ -66,7 +66,7 @@ async fn test_set_readonly_gracefully() { .await .unwrap_err(); - assert_eq!(error.status_code(), StatusCode::RegionReadonly); + assert_eq!(error.status_code(), StatusCode::RegionNotReady); engine.set_writable(region_id, true).unwrap(); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 201717fc7e..605b5c6df3 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -29,6 +29,7 @@ use store_api::manifest::ManifestVersion; use store_api::storage::RegionId; use crate::cache::file_cache::FileType; +use crate::region::RegionState; use crate::sst::file::FileId; use crate::worker::WorkerId; @@ -395,9 +396,11 @@ pub enum Error { location: Location, }, - #[snafu(display("Region {} is read only", region_id))] - RegionReadonly { + #[snafu(display("Region {} is in {:?} state, expect: {:?}", region_id, state, expect))] + RegionState { region_id: RegionId, + state: RegionState, + expect: RegionState, location: Location, }, @@ -669,7 +672,7 @@ impl ErrorExt for Error { CompactRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, InvalidRegionRequest { source, .. } => source.status_code(), - RegionReadonly { .. } => StatusCode::RegionReadonly, + RegionState { .. } => StatusCode::RegionNotReady, JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index bdf27bdafd..0240e3274e 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -31,10 +31,12 @@ use crate::config::MitoConfig; use crate::error::{ Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, }; +use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL}; use crate::read::Source; use crate::region::options::IndexOptions; -use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; +use crate::region::version::{VersionControlData, VersionControlRef}; +use crate::region::{ManifestContextRef, RegionState}; use crate::request::{ BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -204,6 +206,7 @@ pub(crate) struct RegionFlushTask { pub(crate) engine_config: Arc, pub(crate) row_group_size: Option, pub(crate) cache_manager: CacheManagerRef, + pub(crate) manifest_ctx: ManifestContextRef, /// Index options for the region. pub(crate) index_options: IndexOptions, @@ -240,36 +243,30 @@ impl RegionFlushTask { // Get a version of this region before creating a job to get current // wal entry id, sequence and immutable memtables. let version_data = version_control.current(); + // This is used to update the version. + let version_control = version_control.clone(); Box::pin(async move { - self.do_flush(version_data).await; + self.do_flush(version_data, &version_control).await; }) } /// Runs the flush task. - async fn do_flush(&mut self, version_data: VersionControlData) { + async fn do_flush( + &mut self, + version_data: VersionControlData, + version_control: &VersionControlRef, + ) { let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer(); self.listener.on_flush_begin(self.region_id).await; - let worker_request = match self.flush_memtables(&version_data.version).await { - Ok(file_metas) => { - let memtables_to_remove = version_data - .version - .memtables - .immutables() - .iter() - .map(|m| m.id()) - .collect(); - + let worker_request = match self.flush_memtables(&version_data, version_control).await { + Ok(()) => { let flush_finished = FlushFinished { region_id: self.region_id, - file_metas, // The last entry has been flushed. flushed_entry_id: version_data.last_entry_id, - flushed_sequence: version_data.committed_sequence, - memtables_to_remove, senders: std::mem::take(&mut self.senders), - file_purger: self.file_purger.clone(), _timer: timer, }; WorkerRequest::Background { @@ -293,8 +290,13 @@ impl RegionFlushTask { self.send_worker_request(worker_request).await; } - /// Flushes memtables to level 0 SSTs. - async fn flush_memtables(&self, version: &VersionRef) -> Result> { + /// Flushes memtables to level 0 SSTs and updates the manifest. + async fn flush_memtables( + &self, + version_data: &VersionControlData, + version_control: &VersionControlRef, + ) -> Result<()> { + let version = &version_data.version; let timer = FLUSH_ELAPSED .with_label_values(&["flush_memtables"]) .start_timer(); @@ -382,7 +384,31 @@ impl RegionFlushTask { timer.stop_and_record(), ); - Ok(file_metas) + let memtables_to_remove: SmallVec<[_; 2]> = version_data + .version + .memtables + .immutables() + .iter() + .map(|m| m.id()) + .collect(); + let edit = RegionEdit { + files_to_add: file_metas, + files_to_remove: Vec::new(), + compaction_time_window: None, + // The last entry has been flushed. + flushed_entry_id: Some(version_data.last_entry_id), + flushed_sequence: Some(version_data.committed_sequence), + }; + info!("Applying {edit:?} to region {}", self.region_id); + + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + // We will leak files if the manifest update fails, but we ignore them for simplicity. We can + // add a cleanup job to remove them later. + self.manifest_ctx + .update_manifest(RegionState::Writable, action_list, || { + version_control.apply_edit(edit, &memtables_to_remove, self.file_purger.clone()); + }) + .await } /// Notify flush job status. @@ -775,6 +801,9 @@ mod tests { engine_config: Arc::new(MitoConfig::default()), row_group_size: None, cache_manager: Arc::new(CacheManager::default()), + manifest_ctx: env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await, index_options: IndexOptions::default(), }; task.push_sender(OptionOutputTx::from(output_tx)); diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 74aa1626d3..b121db9c48 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -257,9 +257,8 @@ impl RegionManifestManager { } /// Stops the manager. - pub async fn stop(&mut self) -> Result<()> { + pub async fn stop(&mut self) { self.stopped = true; - Ok(()) } /// Updates the manifest. Returns the current manifest version number. @@ -524,7 +523,7 @@ mod test { .unwrap() .unwrap(); // Stops it. - manager.stop().await.unwrap(); + manager.stop().await; // Open it. let manager = env @@ -564,7 +563,7 @@ mod test { manager.validate_manifest(&new_metadata, 1); // Reopen the manager. - manager.stop().await.unwrap(); + manager.stop().await; let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, None) .await @@ -651,7 +650,7 @@ mod test { // Reopen the manager, // we just calculate the size from the latest checkpoint file - manager.stop().await.unwrap(); + manager.stop().await; let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, None) .await diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index f712dc8b90..c79cfab43a 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -152,7 +152,7 @@ async fn manager_with_checkpoint_distance_1() { assert_eq!(expected_json, raw_json); // reopen the manager - manager.stop().await.unwrap(); + manager.stop().await; let manager = reopen_manager(&env, 1, CompressionType::Uncompressed).await; assert_eq!(10, manager.manifest().manifest_version); } diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index d29e9fc469..95068d55f4 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -19,21 +19,21 @@ pub mod options; pub(crate) mod version; use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; -use common_telemetry::info; +use common_telemetry::{error, info, warn}; use common_wal::options::WalOptions; +use crossbeam_utils::atomic::AtomicCell; use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; -use tokio::sync::RwLock as TokioRwLock; use crate::access_layer::AccessLayerRef; -use crate::error::{RegionNotFoundSnafu, RegionReadonlySnafu, Result}; -use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::error::{RegionNotFoundSnafu, RegionStateSnafu, RegionTruncatedSnafu, Result}; +use crate::manifest::action::{RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::RegionManifestManager; -use crate::memtable::{MemtableBuilderRef, MemtableId}; +use crate::memtable::MemtableBuilderRef; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::OnFailure; use crate::sst::file_purger::FilePurgerRef; @@ -57,6 +57,23 @@ impl RegionUsage { } } +/// State of the region. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RegionState { + /// The region is opened but is still read-only. + ReadOnly, + /// The region is opened and is writable. + Writable, + /// The region is altering. + Altering, + /// The region is dropping. + Dropping, + /// The region is truncating. + Truncating, + /// The region is handling a region edit. + Editing, +} + /// Metadata and runtime status of a region. /// /// Writing and reading a region follow a single-writer-multi-reader rule: @@ -71,19 +88,19 @@ pub(crate) struct MitoRegion { pub(crate) region_id: RegionId, /// Version controller for this region. + /// + /// We MUST update the version control inside the write lock of the region manifest manager. pub(crate) version_control: VersionControlRef, /// SSTs accessor for this region. pub(crate) access_layer: AccessLayerRef, - /// Manager to maintain manifest for this region. - pub(crate) manifest_manager: TokioRwLock, + /// Context to maintain manifest for this region. + pub(crate) manifest_ctx: ManifestContextRef, /// SST file purger. pub(crate) file_purger: FilePurgerRef, /// Wal options of this region. pub(crate) wal_options: WalOptions, /// Last flush time in millis. last_flush_millis: AtomicI64, - /// Whether the region is writable. - writable: AtomicBool, /// Provider to get current time. time_provider: TimeProviderRef, /// Memtable builder for the region. @@ -94,15 +111,18 @@ pub(crate) type MitoRegionRef = Arc; impl MitoRegion { /// Stop background managers for this region. - pub(crate) async fn stop(&self) -> Result<()> { - self.manifest_manager.write().await.stop().await?; + pub(crate) async fn stop(&self) { + self.manifest_ctx + .manifest_manager + .write() + .await + .stop() + .await; info!( "Stopped region manifest manager, region_id: {}", self.region_id ); - - Ok(()) } /// Returns current metadata of the region. @@ -128,19 +148,73 @@ impl MitoRegion { self.last_flush_millis.store(now, Ordering::Relaxed); } - /// Returns whether the region is writable. - pub(crate) fn is_writable(&self) -> bool { - self.writable.load(Ordering::Relaxed) - } - /// Returns the region dir. pub(crate) fn region_dir(&self) -> &str { self.access_layer.region_dir() } - /// Sets the writable flag. + /// Returns whether the region is writable. + pub(crate) fn is_writable(&self) -> bool { + self.manifest_ctx.state.load() == RegionState::Writable + } + + /// Returns the state of the region. + pub(crate) fn state(&self) -> RegionState { + self.manifest_ctx.state.load() + } + + /// Sets the writable state. pub(crate) fn set_writable(&self, writable: bool) { - self.writable.store(writable, Ordering::Relaxed); + if writable { + // Only sets the region to writable if it is read only. + // This prevents others updating the manifest. + let _ = self + .manifest_ctx + .state + .compare_exchange(RegionState::ReadOnly, RegionState::Writable); + } else { + self.manifest_ctx.state.store(RegionState::ReadOnly); + } + } + + /// Sets the altering state. + /// You should call this method in the worker loop. + pub(crate) fn set_altering(&self) -> Result<()> { + self.compare_exchange_state(RegionState::Writable, RegionState::Altering) + } + + /// Sets the dropping state. + /// You should call this method in the worker loop. + pub(crate) fn set_dropping(&self) -> Result<()> { + self.compare_exchange_state(RegionState::Writable, RegionState::Dropping) + } + + /// Sets the truncating state. + /// You should call this method in the worker loop. + pub(crate) fn set_truncating(&self) -> Result<()> { + self.compare_exchange_state(RegionState::Writable, RegionState::Truncating) + } + + /// Sets the editing state. + /// You should call this method in the worker loop. + pub(crate) fn set_editing(&self) -> Result<()> { + self.compare_exchange_state(RegionState::Writable, RegionState::Editing) + } + + /// Sets the region to readonly gracefully. This acquires the manifest write lock. + pub(crate) async fn set_readonly_gracefully(&self) { + let _manager = self.manifest_ctx.manifest_manager.write().await; + // We acquires the write lock of the manifest manager to ensure that no one is updating the manifest. + // Then we change the state. + self.set_writable(false); + } + + /// Switches the region state to `RegionState::Writable` if the current state is `expect`. + /// Otherwise, logs an error. + pub(crate) fn switch_state_to_writable(&self, expect: RegionState) { + if let Err(e) = self.compare_exchange_state(expect, RegionState::Writable) { + error!(e; "failed to switch region state to writable, expect state is {:?}", expect); + } } /// Returns the region usage in bytes. @@ -155,7 +229,12 @@ impl MitoRegion { let wal_usage = self.estimated_wal_usage(memtable_usage); - let manifest_usage = self.manifest_manager.read().await.manifest_usage(); + let manifest_usage = self + .manifest_ctx + .manifest_manager + .read() + .await + .manifest_usage(); RegionUsage { region_id, @@ -171,28 +250,133 @@ impl MitoRegion { ((memtable_usage as f32) * ESTIMATED_WAL_FACTOR) as u64 } - pub(crate) async fn apply_edit( - &self, - edit: RegionEdit, - memtables_to_remove: &[MemtableId], - ) -> Result<()> { - info!("Applying {edit:?} to region {}", self.region_id); - - self.manifest_manager - .write() - .await - .update(RegionMetaActionList::with_action(RegionMetaAction::Edit( - edit.clone(), - ))) - .await?; - - // Apply edit to region's version. - self.version_control - .apply_edit(edit, memtables_to_remove, self.file_purger.clone()); + /// Sets the state of the region to given state if the current state equals to + /// the expected. + fn compare_exchange_state(&self, expect: RegionState, state: RegionState) -> Result<()> { + self.manifest_ctx + .state + .compare_exchange(expect, state) + .map_err(|actual| { + RegionStateSnafu { + region_id: self.region_id, + state: actual, + expect, + } + .build() + })?; Ok(()) } } +/// Context to update the region manifest. +#[derive(Debug)] +pub(crate) struct ManifestContext { + /// Manager to maintain manifest for this region. + manifest_manager: tokio::sync::RwLock, + /// The state of the region. The region checks the state before updating + /// manifest. + state: AtomicCell, +} + +impl ManifestContext { + pub(crate) fn new(manager: RegionManifestManager, state: RegionState) -> Self { + ManifestContext { + manifest_manager: tokio::sync::RwLock::new(manager), + state: AtomicCell::new(state), + } + } + + pub(crate) async fn has_update(&self) -> Result { + self.manifest_manager.read().await.has_update().await + } + + /// Updates the manifest if current state is `expect_state` and executes + /// the `applier` if the manifest is updated. + pub(crate) async fn update_manifest( + &self, + expect_state: RegionState, + action_list: RegionMetaActionList, + applier: impl FnOnce(), + ) -> Result<()> { + // Acquires the write lock of the manifest manager. + let mut manager = self.manifest_manager.write().await; + // Gets current manifest. + let manifest = manager.manifest(); + // Checks state inside the lock. This is to ensure that we won't update the manifest + // after `set_readonly_gracefully()` is called. + let current_state = self.state.load(); + ensure!( + current_state == expect_state, + RegionStateSnafu { + region_id: manifest.metadata.region_id, + state: current_state, + expect: expect_state, + } + ); + + for action in &action_list.actions { + // Checks whether the edit is still applicable. + let RegionMetaAction::Edit(edit) = &action else { + continue; + }; + + // Checks whether the region is truncated. + let Some(truncated_entry_id) = manifest.truncated_entry_id else { + continue; + }; + + // This is an edit from flush. + if let Some(flushed_entry_id) = edit.flushed_entry_id { + ensure!( + truncated_entry_id < flushed_entry_id, + RegionTruncatedSnafu { + region_id: manifest.metadata.region_id, + } + ); + } + + // This is an edit from compaction. + if !edit.files_to_remove.is_empty() { + // Input files of the compaction task has been truncated. + for file in &edit.files_to_remove { + ensure!( + manifest.files.contains_key(&file.file_id), + RegionTruncatedSnafu { + region_id: manifest.metadata.region_id, + } + ); + } + } + } + + // Now we can update the manifest. + manager.update(action_list).await.inspect_err( + |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id), + )?; + + // Executes the applier. We MUST hold the write lock. + applier(); + + if self.state.load() == RegionState::ReadOnly { + warn!( + "Region {} becomes read-only while updating manifest which may cause inconsistency", + manifest.metadata.region_id + ); + } + + Ok(()) + } +} + +#[cfg(test)] +impl ManifestContext { + pub(crate) async fn manifest(&self) -> Arc { + self.manifest_manager.read().await.manifest() + } +} + +pub(crate) type ManifestContextRef = Arc; + /// Regions indexed by ids. #[derive(Debug, Default)] pub(crate) struct RegionMap { @@ -225,7 +409,14 @@ impl RegionMap { let region = self .get_region(region_id) .context(RegionNotFoundSnafu { region_id })?; - ensure!(region.is_writable(), RegionReadonlySnafu { region_id }); + ensure!( + region.is_writable(), + RegionStateSnafu { + region_id, + state: region.state(), + expect: RegionState::Writable, + } + ); Ok(region) } @@ -265,3 +456,15 @@ impl RegionMap { } pub(crate) type RegionMapRef = Arc; + +#[cfg(test)] +mod tests { + use crossbeam_utils::atomic::AtomicCell; + + use crate::region::RegionState; + + #[test] + fn test_region_state_lock_free() { + assert!(AtomicCell::::is_lock_free()); + } +} diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 739ae30603..0744c49202 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -15,7 +15,7 @@ //! Region opener. use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, AtomicI64}; +use std::sync::atomic::AtomicI64; use std::sync::Arc; use common_telemetry::{debug, error, info, warn}; @@ -27,7 +27,6 @@ use snafu::{ensure, OptionExt}; use store_api::logstore::LogStore; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::storage::{ColumnId, RegionId}; -use tokio::sync::RwLock; use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; @@ -41,7 +40,7 @@ use crate::memtable::time_partition::TimePartitions; use crate::memtable::MemtableBuilderProvider; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; -use crate::region::MitoRegion; +use crate::region::{ManifestContext, MitoRegion, RegionState}; use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; @@ -203,7 +202,11 @@ impl RegionOpener { region_id, version_control, access_layer: access_layer.clone(), - manifest_manager: RwLock::new(manifest_manager), + // Region is writable after it is created. + manifest_ctx: Arc::new(ManifestContext::new( + manifest_manager, + RegionState::Writable, + )), file_purger: Arc::new(LocalFilePurger::new( self.purge_scheduler, access_layer, @@ -211,8 +214,6 @@ impl RegionOpener { )), wal_options, last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), - // Region is writable after it is created. - writable: AtomicBool::new(true), time_provider, memtable_builder, }) @@ -331,12 +332,14 @@ impl RegionOpener { region_id: self.region_id, version_control, access_layer, - manifest_manager: RwLock::new(manifest_manager), + // Region is always opened in read only mode. + manifest_ctx: Arc::new(ManifestContext::new( + manifest_manager, + RegionState::ReadOnly, + )), file_purger, wal_options, last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), - // Region is always opened in read only mode. - writable: AtomicBool::new(false), time_provider, memtable_builder, }; diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 43a7b2eb57..b7eed93275 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -16,18 +16,17 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use api::helper::{ is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value, ColumnDataTypeWrapper, }; use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value}; -use common_telemetry::{info, warn}; +use common_telemetry::info; use datatypes::prelude::DataType; use prometheus::HistogramTimer; use prost::Message; -use smallvec::SmallVec; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadata}; use store_api::region_engine::SetReadonlyResponse; @@ -44,10 +43,7 @@ use crate::error::{ FlushRegionSnafu, InvalidRequestSnafu, Result, }; use crate::manifest::action::RegionEdit; -use crate::memtable::MemtableId; use crate::metrics::COMPACTION_ELAPSED_TOTAL; -use crate::sst::file::FileMeta; -use crate::sst::file_purger::{FilePurgerRef, PurgeRequest}; use crate::wal::EntryId; /// Request to write a region. @@ -620,6 +616,8 @@ pub(crate) enum BackgroundNotify { CompactionFinished(CompactionFinished), /// Compaction has failed. CompactionFailed(CompactionFailed), + /// Truncate result. + Truncate(TruncateResult), } /// Notifies a flush job is finished. @@ -627,18 +625,10 @@ pub(crate) enum BackgroundNotify { pub(crate) struct FlushFinished { /// Region id. pub(crate) region_id: RegionId, - /// Meta of the flushed SSTs. - pub(crate) file_metas: Vec, /// Entry id of flushed data. pub(crate) flushed_entry_id: EntryId, - /// Sequence of flushed data. - pub(crate) flushed_sequence: SequenceNumber, - /// Id of memtables to remove. - pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>, /// Flush result senders. pub(crate) senders: Vec, - /// File purger for cleaning files on failure. - pub(crate) file_purger: FilePurgerRef, /// Flush timer. pub(crate) _timer: HistogramTimer, } @@ -660,12 +650,6 @@ impl OnFailure for FlushFinished { region_id: self.region_id, })); } - // Clean flushed files. - for file in &self.file_metas { - self.file_purger.send_request(PurgeRequest { - file_meta: file.clone(), - }); - } } } @@ -681,16 +665,8 @@ pub(crate) struct FlushFailed { pub(crate) struct CompactionFinished { /// Region id. pub(crate) region_id: RegionId, - /// Compaction output files that are to be added to region version. - pub(crate) compaction_outputs: Vec, - /// Compacted files that are to be removed from region version. - pub(crate) compacted_files: Vec, /// Compaction result senders. pub(crate) senders: Vec, - /// File purger for cleaning files on failure. - pub(crate) file_purger: FilePurgerRef, - /// Inferred Compaction time window. - pub(crate) compaction_time_window: Option, /// Start time of compaction task. pub(crate) start_time: Instant, } @@ -708,8 +684,7 @@ impl CompactionFinished { } impl OnFailure for CompactionFinished { - /// Compaction succeeded but failed to update manifest or region's already been dropped, - /// clean compaction output files. + /// Compaction succeeded but failed to update manifest or region's already been dropped. fn on_failure(&mut self, err: Error) { let err = Arc::new(err); for sender in self.senders.drain(..) { @@ -717,15 +692,6 @@ impl OnFailure for CompactionFinished { region_id: self.region_id, })); } - for file in &self.compaction_outputs { - warn!( - "Cleaning region {} compaction output file: {}", - self.region_id, file.file_id - ); - self.file_purger.send_request(PurgeRequest { - file_meta: file.clone(), - }); - } } } @@ -737,6 +703,21 @@ pub(crate) struct CompactionFailed { pub(crate) err: Arc, } +/// Notifies the truncate result of a region. +#[derive(Debug)] +pub(crate) struct TruncateResult { + /// Region id. + pub(crate) region_id: RegionId, + /// Result sender. + pub(crate) sender: OptionOutputTx, + /// Truncate result. + pub(crate) result: Result<()>, + /// Truncated entry id. + pub(crate) truncated_entry_id: EntryId, + /// Truncated sequence. + pub(crate) truncated_sequence: SequenceNumber, +} + #[cfg(test)] mod tests { use api::v1::value::ValueData; diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 4a08099391..bfaf569123 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -16,19 +16,25 @@ use std::sync::Arc; +use common_datasource::compression::CompressionType; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use object_store::services::Fs; use object_store::util::join_dir; use object_store::ObjectStore; +use store_api::metadata::RegionMetadataRef; use tokio::sync::mpsc::Sender; use crate::access_layer::{AccessLayer, AccessLayerRef}; use crate::cache::CacheManager; use crate::compaction::CompactionScheduler; +use crate::config::MitoConfig; use crate::flush::FlushScheduler; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; +use crate::region::{ManifestContext, ManifestContextRef, RegionState}; use crate::request::WorkerRequest; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; use crate::sst::index::intermediate::IntermediateManager; +use crate::worker::WorkerListener; /// Scheduler mocker. pub(crate) struct SchedulerEnv { @@ -73,7 +79,13 @@ impl SchedulerEnv { ) -> CompactionScheduler { let scheduler = self.get_scheduler(); - CompactionScheduler::new(scheduler, request_sender, Arc::new(CacheManager::default())) + CompactionScheduler::new( + scheduler, + request_sender, + Arc::new(CacheManager::default()), + Arc::new(MitoConfig::default()), + WorkerListener::default(), + ) } /// Creates a new flush scheduler. @@ -83,6 +95,27 @@ impl SchedulerEnv { FlushScheduler::new(scheduler) } + /// Creates a new manifest context. + pub(crate) async fn mock_manifest_context( + &self, + metadata: RegionMetadataRef, + ) -> ManifestContextRef { + Arc::new(ManifestContext::new( + RegionManifestManager::new( + metadata, + RegionManifestOptions { + manifest_dir: "".to_string(), + object_store: self.access_layer.object_store().clone(), + compress_type: CompressionType::Uncompressed, + checkpoint_distance: 10, + }, + ) + .await + .unwrap(), + RegionState::Writable, + )) + } + fn get_scheduler(&self) -> SchedulerRef { self.scheduler .clone() diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 851928ec14..ca6eaa6bfe 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -21,6 +21,7 @@ mod handle_compaction; mod handle_create; mod handle_drop; mod handle_flush; +mod handle_manifest; mod handle_open; mod handle_truncate; mod handle_write; @@ -45,9 +46,8 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; -use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; +use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; -use crate::manifest::action::RegionEdit; use crate::memtable::MemtableBuilderProvider; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; use crate::request::{ @@ -367,7 +367,7 @@ impl WorkerStarter { running: running.clone(), memtable_builder_provider: MemtableBuilderProvider::new( Some(self.write_buffer_manager.clone()), - self.config, + self.config.clone(), ), purge_scheduler: self.purge_scheduler.clone(), write_buffer_manager: self.write_buffer_manager, @@ -376,6 +376,8 @@ impl WorkerStarter { self.scheduler, sender.clone(), self.cache_manager.clone(), + self.config, + self.listener.clone(), ), stalled_requests: StalledRequests::default(), listener: self.listener, @@ -622,10 +624,7 @@ impl RegionWorkerLoop { edit, tx, } => { - let result = self.edit_region(region_id, edit).await; - if let Err(Err(e)) = tx.send(result) { - warn!("Failed to send edit region error to caller, error: {e:?}"); - } + self.handle_region_edit(region_id, edit, tx).await; } // We receive a stop signal, but we still want to process remaining // requests. The worker thread will then check the running flag and @@ -669,7 +668,11 @@ impl RegionWorkerLoop { self.handle_compaction_request(ddl.region_id, ddl.sender); continue; } - DdlRequest::Truncate(_) => self.handle_truncate_request(ddl.region_id).await, + DdlRequest::Truncate(_) => { + self.handle_truncate_request(ddl.region_id, ddl.sender) + .await; + continue; + } DdlRequest::Catchup(req) => self.handle_catchup_request(ddl.region_id, req).await, }; @@ -706,6 +709,7 @@ impl RegionWorkerLoop { self.handle_compaction_finished(region_id, req).await } BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await, + BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await, } } @@ -716,35 +720,17 @@ impl RegionWorkerLoop { sender: oneshot::Sender, ) { if let Some(region) = self.regions.get_region(region_id) { - region.set_writable(false); + // We need to do this in background as we need the manifest lock. + common_runtime::spawn_bg(async move { + region.set_readonly_gracefully().await; - let last_entry_id = region.version_control.current().last_entry_id; - let _ = sender.send(SetReadonlyResponse::success(Some(last_entry_id))); + let last_entry_id = region.version_control.current().last_entry_id; + let _ = sender.send(SetReadonlyResponse::success(Some(last_entry_id))); + }); } else { let _ = sender.send(SetReadonlyResponse::NotFound); } } - - async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> { - let region = self.regions.writable_region(region_id)?; - - for file_meta in &edit.files_to_add { - let is_exist = region.access_layer.is_exist(file_meta).await?; - ensure!( - is_exist, - InvalidRequestSnafu { - region_id, - reason: format!( - "trying to add a not exist file '{}' when editing region", - file_meta.file_id - ) - } - ); - } - - // Applying region edit directly has nothing to do with memtables (at least for now). - region.apply_edit(edit, &[]).await - } } impl RegionWorkerLoop { @@ -753,9 +739,7 @@ impl RegionWorkerLoop { // Closes remaining regions. let regions = self.regions.list_regions(); for region in regions { - if let Err(e) = region.stop().await { - error!(e; "Failed to stop region {}", region.region_id); - } + region.stop().await; } self.regions.clear(); @@ -825,10 +809,10 @@ impl WorkerListener { let _ = removed; } - pub(crate) async fn on_handle_compaction_finished(&self, region_id: RegionId) { + pub(crate) async fn on_merge_ssts_finished(&self, region_id: RegionId) { #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { - listener.on_handle_compaction_finished(region_id).await; + listener.on_merge_ssts_finished(region_id).await; } // Avoid compiler warning. let _ = region_id; diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 4c88358be2..27aadbfc0d 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -16,7 +16,7 @@ use std::sync::Arc; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, info}; use snafu::ResultExt; use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; use store_api::region_request::RegionAlterRequest; @@ -26,9 +26,7 @@ use crate::error::{ InvalidMetadataSnafu, InvalidRegionRequestSchemaVersionSnafu, InvalidRegionRequestSnafu, Result, }; use crate::flush::FlushReason; -use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList}; -use crate::region::version::Version; -use crate::region::MitoRegionRef; +use crate::manifest::action::RegionChange; use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest}; use crate::worker::RegionWorkerLoop; @@ -107,51 +105,28 @@ impl RegionWorkerLoop { return; } - // Now we can alter the region directly. - if let Err(e) = alter_region_schema(®ion, &version, request).await { - error!(e; "Failed to alter region schema, region_id: {}", region_id); - sender.send(Err(e)); - return; - } - info!( - "Schema of region {} is altered from {} to {}", + "Try to alter region {} from version {} to {}", region_id, version.metadata.schema_version, region.metadata().schema_version ); - // Notifies waiters. - sender.send(Ok(0)); + let new_meta = match metadata_after_alteration(&version.metadata, request) { + Ok(new_meta) => new_meta, + Err(e) => { + sender.send(Err(e)); + return; + } + }; + // Persist the metadata to region's manifest. + let change = RegionChange { + metadata: new_meta.clone(), + }; + self.handle_manifest_region_change(region, change, sender) } } -/// Alter the schema of the region. -async fn alter_region_schema( - region: &MitoRegionRef, - version: &Version, - request: RegionAlterRequest, -) -> Result<()> { - let new_meta = metadata_after_alteration(&version.metadata, request)?; - // Persist the metadata to region's manifest. - let change = RegionChange { - metadata: new_meta.clone(), - }; - let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change)); - region - .manifest_manager - .write() - .await - .update(action_list) - .await?; - - // Apply the metadata to region's version. - region - .version_control - .alter_schema(new_meta, ®ion.memtable_builder); - Ok(()) -} - /// Creates a metadata after applying the alter `request` to the old `metadata`. /// /// Returns an error if the `request` is invalid. diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 13c5902663..f6d890dc8f 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -45,31 +45,30 @@ impl RegionWorkerLoop { let is_mutable_empty = region.version().memtables.mutable.is_empty(); // Utilizes the short circuit evaluation. - let region = - if !is_mutable_empty || region.manifest_manager.read().await.has_update().await? { - info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}"); - let reopened_region = Arc::new( - RegionOpener::new( - region_id, - region.region_dir(), - self.memtable_builder_provider.clone(), - self.object_store_manager.clone(), - self.purge_scheduler.clone(), - self.intermediate_manager.clone(), - ) - .cache(Some(self.cache_manager.clone())) - .options(region.version().options.clone()) - .skip_wal_replay(true) - .open(&self.config, &self.wal) - .await?, - ); - debug_assert!(!reopened_region.is_writable()); - self.regions.insert_region(reopened_region.clone()); + let region = if !is_mutable_empty || region.manifest_ctx.has_update().await? { + info!("Reopening the region: {region_id}, empty mutable: {is_mutable_empty}"); + let reopened_region = Arc::new( + RegionOpener::new( + region_id, + region.region_dir(), + self.memtable_builder_provider.clone(), + self.object_store_manager.clone(), + self.purge_scheduler.clone(), + self.intermediate_manager.clone(), + ) + .cache(Some(self.cache_manager.clone())) + .options(region.version().options.clone()) + .skip_wal_replay(true) + .open(&self.config, &self.wal) + .await?, + ); + debug_assert!(!reopened_region.is_writable()); + self.regions.insert_region(reopened_region.clone()); - reopened_region - } else { - region - }; + reopened_region + } else { + region + }; let flushed_entry_id = region.version_control.current().last_entry_id; info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}"); diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 80f1bfa632..26a6f9a34d 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -33,7 +33,7 @@ impl RegionWorkerLoop { info!("Try to close region {}", region_id); - region.stop().await?; + region.stop().await; self.regions.remove_region(region_id); // Clean flush status. self.flush_scheduler.on_region_closed(region_id); diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 6fcf382035..dd624c95e5 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -16,9 +16,8 @@ use common_telemetry::{error, info, warn}; use store_api::logstore::LogStore; use store_api::storage::RegionId; -use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::metrics::{COMPACTION_REQUEST_COUNT, COMPACTION_STAGE_ELAPSED}; -use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; +use crate::metrics::COMPACTION_REQUEST_COUNT; +use crate::request::{CompactionFailed, CompactionFinished, OptionOutputTx}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -38,7 +37,7 @@ impl RegionWorkerLoop { ®ion.access_layer, ®ion.file_purger, sender, - self.config.clone(), + ®ion.manifest_ctx, ) { error!(e; "Failed to schedule compaction task for region: {}", region_id); } else { @@ -55,8 +54,6 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: CompactionFinished, ) { - self.listener.on_handle_compaction_finished(region_id).await; - let Some(region) = self.regions.writable_region_or(region_id, &mut request) else { warn!( "Unable to finish the compaction task for a read only region {}", @@ -65,44 +62,12 @@ impl RegionWorkerLoop { return; }; - { - let manifest_timer = COMPACTION_STAGE_ELAPSED - .with_label_values(&["write_manifest"]) - .start_timer(); - // Write region edit to manifest. - let edit = RegionEdit { - files_to_add: std::mem::take(&mut request.compaction_outputs), - files_to_remove: std::mem::take(&mut request.compacted_files), - compaction_time_window: request.compaction_time_window, - flushed_entry_id: None, - flushed_sequence: None, - }; - let action_list = - RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - if let Err(e) = region - .manifest_manager - .write() - .await - .update(action_list) - .await - { - error!(e; "Failed to update manifest, region: {}", region_id); - manifest_timer.stop_and_discard(); - request.on_failure(e); - return; - } - - // Apply edit to region's version. - region - .version_control - .apply_edit(edit, &[], region.file_purger.clone()); - } // compaction finished. request.on_success(); // Schedule next compaction if necessary. self.compaction_scheduler - .on_compaction_finished(region_id, self.config.clone()); + .on_compaction_finished(region_id, ®ion.manifest_ctx); } /// When compaction fails, we simply log the error. diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 490fa432aa..814d48bd1b 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -16,7 +16,7 @@ use std::time::Duration; -use common_telemetry::{info, warn}; +use common_telemetry::{error, info, warn}; use futures::TryStreamExt; use object_store::util::join_path; use object_store::{EntryMode, ObjectStore}; @@ -27,7 +27,7 @@ use tokio::time::sleep; use crate::error::{OpenDalSnafu, Result}; use crate::metrics::REGION_COUNT; -use crate::region::RegionMapRef; +use crate::region::{RegionMapRef, RegionState}; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; const GC_TASK_INTERVAL_SEC: u64 = 5 * 60; // 5 minutes @@ -42,17 +42,27 @@ impl RegionWorkerLoop { info!("Try to drop region: {}", region_id); - // write dropping marker + // Marks the region as dropping. + region.set_dropping()?; + // Writes dropping marker + // We rarely drop a region so we still operate in the worker loop. let marker_path = join_path(region.access_layer.region_dir(), DROPPING_MARKER_FILE); region .access_layer .object_store() .write(&marker_path, vec![]) .await - .context(OpenDalSnafu)?; + .context(OpenDalSnafu) + .inspect_err(|e| { + error!(e; "Failed to write the drop marker file for region {}", region_id); - region.stop().await?; - // remove this region from region map to prevent other requests from accessing this region + // Sets the state back to writable. It's possible that the marker file has been written. + // We sets the state back to writable so we can retry the drop operation. + region.switch_state_to_writable(RegionState::Dropping); + })?; + + region.stop().await; + // Removes this region from region map to prevent other requests from accessing this region self.regions.remove_region(region_id); self.dropping_regions.insert_region(region.clone()); // Notifies flush scheduler. @@ -60,7 +70,7 @@ impl RegionWorkerLoop { // Notifies compaction scheduler. self.compaction_scheduler.on_region_dropped(region_id); - // mark region version as dropped + // Marks region version as dropped region .version_control .mark_dropped(®ion.memtable_builder); @@ -71,7 +81,7 @@ impl RegionWorkerLoop { REGION_COUNT.dec(); - // detach a background task to delete the region dir + // Detaches a background task to delete the region dir let region_dir = region.access_layer.region_dir().to_owned(); let object_store = region.access_layer.object_store().clone(); let dropping_regions = self.dropping_regions.clone(); diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 2f3e184bf7..5ac2447c4c 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -22,9 +22,8 @@ use store_api::region_request::RegionFlushRequest; use store_api::storage::RegionId; use crate::config::MitoConfig; -use crate::error::{RegionTruncatedSnafu, Result}; +use crate::error::Result; use crate::flush::{FlushReason, RegionFlushTask}; -use crate::manifest::action::RegionEdit; use crate::region::MitoRegionRef; use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; @@ -178,6 +177,7 @@ impl RegionWorkerLoop { engine_config, row_group_size, cache_manager: self.cache_manager.clone(), + manifest_ctx: region.manifest_ctx.clone(), index_options: region.version().options.index_options.clone(), } } @@ -198,29 +198,6 @@ impl RegionWorkerLoop { return; }; - // The flush task before truncating the region fails immediately. - let version_data = region.version_control.current(); - if let Some(truncated_entry_id) = version_data.version.truncated_entry_id { - if truncated_entry_id >= request.flushed_entry_id { - request.on_failure(RegionTruncatedSnafu { region_id }.build()); - return; - } - } - - // Write region edit to manifest. - let edit = RegionEdit { - files_to_add: std::mem::take(&mut request.file_metas), - files_to_remove: Vec::new(), - compaction_time_window: None, - flushed_entry_id: Some(request.flushed_entry_id), - flushed_sequence: Some(request.flushed_sequence), - }; - if let Err(e) = region.apply_edit(edit, &request.memtables_to_remove).await { - error!(e; "Failed to write manifest, region: {}", region_id); - request.on_failure(e); - return; - } - region.update_flush_millis(); // Delete wal. @@ -263,7 +240,7 @@ impl RegionWorkerLoop { ®ion.access_layer, ®ion.file_purger, OptionOutputTx::none(), - self.config.clone(), + ®ion.manifest_ctx, ) { warn!( "Failed to schedule compaction after flush, region: {}, err: {}", diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs new file mode 100644 index 0000000000..ca785d4f86 --- /dev/null +++ b/src/mito2/src/worker/handle_manifest.rs @@ -0,0 +1,200 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handles manifest. +//! +//! It updates the manifest and applies the changes to the region in background. + +use common_telemetry::{info, warn}; +use snafu::ensure; +use store_api::storage::RegionId; +use tokio::sync::oneshot::Sender; + +use crate::error::{InvalidRequestSnafu, Result}; +use crate::manifest::action::{ + RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, +}; +use crate::region::{MitoRegionRef, RegionState}; +use crate::request::{BackgroundNotify, OptionOutputTx, TruncateResult, WorkerRequest}; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + /// Handles region edit request. + pub(crate) async fn handle_region_edit( + &self, + region_id: RegionId, + edit: RegionEdit, + sender: Sender>, + ) { + let region = match self.regions.writable_region(region_id) { + Ok(region) => region, + Err(e) => { + let _ = sender.send(Err(e)); + return; + } + }; + + // Marks the region as editing. + if let Err(e) = region.set_editing() { + let _ = sender.send(Err(e)); + return; + } + + // Now the region is in editing state. + // Updates manifest in background. + common_runtime::spawn_bg(async move { + let result = edit_region(®ion, edit).await; + + if let Err(res) = sender.send(result) { + warn!( + "Failed to send result back to the worker, region_id: {}, res: {:?}", + region_id, res + ); + } + + // Sets the region as writable. For simplicity, we don't send the result + // back to the worker. + region.switch_state_to_writable(RegionState::Editing); + }); + } + + /// Writes truncate action to the manifest and then applies it to the region in background. + pub(crate) fn handle_manifest_truncate_action( + &self, + region: MitoRegionRef, + truncate: RegionTruncate, + sender: OptionOutputTx, + ) { + // Marks the region as truncating. + // This prevents the region from being accessed by other write requests. + if let Err(e) = region.set_truncating() { + sender.send(Err(e)); + return; + } + // Now the region is in truncating state. + + let request_sender = self.sender.clone(); + let manifest_ctx = region.manifest_ctx.clone(); + let version_control = region.version_control.clone(); + let memtable_builder = region.memtable_builder.clone(); + + // Updates manifest in background. + common_runtime::spawn_bg(async move { + // Write region truncated to manifest. + let action_list = + RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); + + let result = manifest_ctx + .update_manifest(RegionState::Truncating, action_list, || { + // Applies the truncate action to the region. + version_control.truncate( + truncate.truncated_entry_id, + truncate.truncated_sequence, + &memtable_builder, + ); + }) + .await; + + // Sends the result back to the request sender. + let truncate_result = TruncateResult { + region_id: truncate.region_id, + sender, + result, + truncated_entry_id: truncate.truncated_entry_id, + truncated_sequence: truncate.truncated_sequence, + }; + let _ = request_sender + .send(WorkerRequest::Background { + region_id: truncate.region_id, + notify: BackgroundNotify::Truncate(truncate_result), + }) + .await + .inspect_err(|_| warn!("failed to send truncate result")); + }); + } + + /// Writes region change action to the manifest and then applies it to the region in background. + pub(crate) fn handle_manifest_region_change( + &self, + region: MitoRegionRef, + change: RegionChange, + sender: OptionOutputTx, + ) { + // Marks the region as altering. + if let Err(e) = region.set_altering() { + sender.send(Err(e)); + return; + } + + // Now the region is in altering state. + common_runtime::spawn_bg(async move { + let new_meta = change.metadata.clone(); + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change)); + + let result = region + .manifest_ctx + .update_manifest(RegionState::Altering, action_list, || { + // Apply the metadata to region's version. + region + .version_control + .alter_schema(new_meta, ®ion.memtable_builder); + }) + .await; + + // Sets the region as writable. + region.switch_state_to_writable(RegionState::Altering); + + if result.is_ok() { + info!( + "Region {} is altered, schema version is {}", + region.region_id, + region.metadata().schema_version + ); + } + + sender.send(result.map(|_| 0)); + }); + } +} + +/// Checks the edit, writes and applies it. +async fn edit_region(region: &MitoRegionRef, edit: RegionEdit) -> Result<()> { + let region_id = region.region_id; + for file_meta in &edit.files_to_add { + let is_exist = region.access_layer.is_exist(file_meta).await?; + ensure!( + is_exist, + InvalidRequestSnafu { + region_id, + reason: format!( + "trying to add a not exist file '{}' when editing region", + file_meta.file_id + ) + } + ); + } + + info!("Applying {edit:?} to region {}", region_id); + + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + region + .manifest_ctx + .update_manifest(RegionState::Editing, action_list, || { + // Applies the edit to the region. + region + .version_control + .apply_edit(edit, &[], region.file_purger.clone()); + }) + .await +} diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index aa3c34b49b..f5598286a5 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -16,19 +16,23 @@ use common_telemetry::info; use store_api::logstore::LogStore; -use store_api::region_request::AffectedRows; use store_api::storage::RegionId; -use crate::error::Result; -use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionTruncate}; +use crate::error::RegionNotFoundSnafu; +use crate::manifest::action::RegionTruncate; +use crate::region::RegionState; +use crate::request::{OptionOutputTx, TruncateResult}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { pub(crate) async fn handle_truncate_request( &mut self, region_id: RegionId, - ) -> Result { - let region = self.regions.writable_region(region_id)?; + mut sender: OptionOutputTx, + ) { + let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { + return; + }; info!("Try to truncate region {}", region_id); @@ -42,36 +46,55 @@ impl RegionWorkerLoop { truncated_entry_id, truncated_sequence, }; - let action_list = - RegionMetaActionList::with_action(RegionMetaAction::Truncate(truncate.clone())); - region - .manifest_manager - .write() - .await - .update(action_list) - .await?; + self.handle_manifest_truncate_action(region, truncate, sender); + } + + /// Handles truncate result. + pub(crate) async fn handle_truncate_result(&mut self, truncate_result: TruncateResult) { + let region_id = truncate_result.region_id; + let Some(region) = self.regions.get_region(region_id) else { + truncate_result.sender.send( + RegionNotFoundSnafu { + region_id: truncate_result.region_id, + } + .fail(), + ); + return; + }; + + // We are already in the worker loop so we can set the state first. + region.switch_state_to_writable(RegionState::Truncating); + + if let Err(e) = truncate_result.result { + // Unable to truncate the region. + truncate_result.sender.send(Err(e)); + return; + } // Notifies flush scheduler. self.flush_scheduler.on_region_truncated(region_id); // Notifies compaction scheduler. self.compaction_scheduler.on_region_truncated(region_id); - // Reset region's version and mark all SSTs deleted. - region.version_control.truncate( - truncated_entry_id, - truncated_sequence, - ®ion.memtable_builder, - ); - // Make all data obsolete. - self.wal - .obsolete(region_id, truncated_entry_id, ®ion.wal_options) - .await?; + if let Err(e) = self + .wal + .obsolete( + region_id, + truncate_result.truncated_entry_id, + ®ion.wal_options, + ) + .await + { + truncate_result.sender.send(Err(e)); + return; + } + info!( "Complete truncating region: {}, entry id: {} and sequence: {}.", - region_id, truncated_entry_id, truncated_sequence + region_id, truncate_result.truncated_entry_id, truncate_result.truncated_sequence ); - Ok(0) + truncate_result.sender.send(Ok(0)); } } diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index 4602a334c9..88d10c9509 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -42,7 +42,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true console = "0.15" -crossbeam-utils = "0.8.14" +crossbeam-utils.workspace = true datafusion = { workspace = true, optional = true } datafusion-common = { workspace = true, optional = true } datafusion-expr = { workspace = true, optional = true }