diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index a8182353aa..e48866b275 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -33,7 +33,7 @@ use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTIO use crate::region::RegionRoleState; use crate::request::{ BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx, - RegionEditResult, WorkerRequest, WorkerRequestWithTime, + RegionEditResult, Waiters, WorkerRequest, WorkerRequestWithTime, }; use crate::sst::file::FileMeta; use crate::worker::WorkerListener; @@ -162,7 +162,7 @@ impl CompactionTaskImpl { region_id, notify: BackgroundNotify::RegionEdit(RegionEditResult { region_id, - sender: expire_delete_sender, + waiters: Waiters::one(expire_delete_sender), edit, result: Ok(()), update_region_state: false, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 57ea9b2986..ec38dec105 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -455,12 +455,7 @@ impl MitoEngine { ); let (tx, rx) = oneshot::channel(); - let request = WorkerRequest::EditRegion(RegionEditRequest { - region_id, - edit, - tx, - preload_sst_cache: true, - }); + let request = WorkerRequest::EditRegion(RegionEditRequest::new(region_id, edit, true, tx)); self.inner .workers .submit_to_worker(region_id, request) diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index 84bed66e38..e05e1a847a 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZeroU64; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -239,13 +240,8 @@ async fn test_write_during_region_editing_is_queued() { let edit_engine = engine.clone(); let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); - tokio::time::timeout(Duration::from_secs(3), async { - while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { - tokio::time::sleep(Duration::from_millis(10)).await; - } - }) - .await - .unwrap(); + wait_until_region_is_in_editing(®ion).await; + drain_worker_recv_events(&mut request_rx); let write_engine = engine.clone(); @@ -280,12 +276,7 @@ async fn test_write_during_region_editing_is_queued() { assert_eq!(1, output.affected_rows); second_edit_task.await.unwrap().unwrap(); - let second_file_sequence = region.version().ssts.levels()[0] - .files - .iter() - .find(|(file_id, _)| **file_id == second_file_id) - .and_then(|(_, file)| file.meta_ref().sequence) - .map(|sequence| sequence.get()); + let second_file_sequence = region_file_sequence(®ion, second_file_id); assert_eq!(Some(3), second_file_sequence); } @@ -310,13 +301,8 @@ async fn test_bulk_insert_during_region_editing_is_queued() { let edit_engine = engine.clone(); let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); - tokio::time::timeout(Duration::from_secs(3), async { - while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { - tokio::time::sleep(Duration::from_millis(10)).await; - } - }) - .await - .unwrap(); + wait_until_region_is_in_editing(®ion).await; + drain_worker_recv_events(&mut request_rx); let bulk_engine = engine.clone(); @@ -344,12 +330,7 @@ async fn test_bulk_insert_during_region_editing_is_queued() { assert_eq!(1, output.affected_rows); second_edit_task.await.unwrap().unwrap(); - let second_file_sequence = region.version().ssts.levels()[0] - .files - .iter() - .find(|(file_id, _)| **file_id == second_file_id) - .and_then(|(_, file)| file.meta_ref().sequence) - .map(|sequence| sequence.get()); + let second_file_sequence = region_file_sequence(®ion, second_file_id); assert_eq!(Some(3), second_file_sequence); } @@ -373,13 +354,7 @@ async fn test_stalled_write_fails_fast_if_region_closed_during_editing() { let edit_engine = engine.clone(); let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); - tokio::time::timeout(Duration::from_secs(3), async { - while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { - tokio::time::sleep(Duration::from_millis(10)).await; - } - }) - .await - .unwrap(); + wait_until_region_is_in_editing(®ion).await; drain_worker_recv_events(&mut request_rx); @@ -436,6 +411,142 @@ async fn test_stalled_write_fails_fast_if_region_closed_during_editing() { assert!(edit_task.await.unwrap().is_err()); } +#[tokio::test] +async fn test_raw_add_only_region_edits_are_merged() { + let mut env = TestEnv::new().await; + let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + // Hold the manifest lock so the first edit keeps the region in `Editing`. + // The following two edits will be queued in `RegionEditQueue`. + let manifest_guard = region.manifest_ctx.manifest_manager.write().await; + + let first_file_id = FileId::random(); + let first_edit = test_region_edit(region.region_id, first_file_id); + let first_edit_engine = engine.clone(); + let first_edit_task = + tokio::spawn(async move { first_edit_engine.edit_region(region_id, first_edit).await }); + + wait_until_region_is_in_editing(®ion).await; + + drain_worker_recv_events(&mut request_rx); + + // Both queued edits are raw add-only edits, so they should be merged into + // a single edit and share the same committed sequence. + let second_file_id = FileId::random(); + let second_edit = test_region_edit(region.region_id, second_file_id); + let second_edit_engine = engine.clone(); + let second_edit_task = + tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + let third_file_id = FileId::random(); + let third_edit = test_region_edit(region.region_id, third_file_id); + let third_edit_engine = engine.clone(); + let third_edit_task = + tokio::spawn(async move { third_edit_engine.edit_region(region_id, third_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + drop(manifest_guard); + + first_edit_task.await.unwrap().unwrap(); + second_edit_task.await.unwrap().unwrap(); + third_edit_task.await.unwrap().unwrap(); + + // The first edit gets sequence 1. The merged queued edit gets sequence 2, + // so both files from the second and third requests share sequence 2. + assert_eq!(Some(1), region_file_sequence(®ion, first_file_id)); + assert_eq!(Some(2), region_file_sequence(®ion, second_file_id)); + assert_eq!(Some(2), region_file_sequence(®ion, third_file_id)); +} + +#[tokio::test] +async fn test_region_edit_with_file_sequence_is_not_merged() { + let mut env = TestEnv::new().await; + let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + // Hold the manifest lock so the first edit keeps the region in `Editing`. + // The following two edits will be queued in `RegionEditQueue`. + let manifest_guard = region.manifest_ctx.manifest_manager.write().await; + + let first_file_id = FileId::random(); + let first_edit = test_region_edit(region.region_id, first_file_id); + let first_edit_engine = engine.clone(); + let first_edit_task = + tokio::spawn(async move { first_edit_engine.edit_region(region_id, first_edit).await }); + + wait_until_region_is_in_editing(®ion).await; + + drain_worker_recv_events(&mut request_rx); + + let second_file_id = FileId::random(); + let mut second_edit = test_region_edit(region.region_id, second_file_id); + // A file that already carries a sequence is not a raw add-only edit and + // must not be merged with the next queued edit. + second_edit.files_to_add[0].sequence = NonZeroU64::new(99); + let second_edit_engine = engine.clone(); + let second_edit_task = + tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + let third_file_id = FileId::random(); + let third_edit = test_region_edit(region.region_id, third_file_id); + let third_edit_engine = engine.clone(); + let third_edit_task = + tokio::spawn(async move { third_edit_engine.edit_region(region_id, third_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + drop(manifest_guard); + + first_edit_task.await.unwrap().unwrap(); + second_edit_task.await.unwrap().unwrap(); + third_edit_task.await.unwrap().unwrap(); + + // Because the second edit is not mergeable, the third edit runs separately + // and receives the next committed sequence. + assert_eq!(Some(1), region_file_sequence(®ion, first_file_id)); + assert_eq!(Some(2), region_file_sequence(®ion, second_file_id)); + assert_eq!(Some(3), region_file_sequence(®ion, third_file_id)); +} + +async fn wait_until_region_is_in_editing(region: &MitoRegionRef) { + tokio::time::timeout(Duration::from_secs(3), async { + while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); +} + +fn region_file_sequence(region: &MitoRegionRef, file_id: FileId) -> Option { + region.version().ssts.levels()[0] + .files + .iter() + .find(|(id, _)| **id == file_id) + .and_then(|(_, file)| file.meta_ref().sequence) + .map(|sequence| sequence.get()) +} + struct RecvRequestListener { tx: mpsc::UnboundedSender, } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 189354f1da..3571f7c0c4 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -542,6 +542,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to edit region {}", region_id))] + EditRegion { + region_id: RegionId, + source: Arc, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Failed to compat readers for region {}, reason: {}", region_id, @@ -1377,6 +1385,7 @@ impl ErrorExt for Error { RegionTruncated { .. } => StatusCode::Cancelled, RejectWrite { .. } => StatusCode::StorageUnavailable, CompactRegion { source, .. } => source.status_code(), + EditRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, InvalidRegionRequest { source, .. } => source.status_code(), RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady, diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 4c50d340e8..8ae05177da 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -1100,6 +1100,27 @@ pub(crate) struct CopyRegionFromFinished { pub(crate) sender: Sender>, } +#[derive(Debug, Default)] +pub(crate) struct Waiters(SmallVec<[Sender>; 1]>); + +impl Waiters { + pub(crate) fn one(waiter: Sender>) -> Self { + let mut waiters = SmallVec::new(); + waiters.push(waiter); + Self(waiters) + } + + pub(crate) fn reply_with Result<()>>(self, f: F) { + for tx in self.0 { + let _ = tx.send(f()); + } + } + + pub(crate) fn merge(&mut self, other: Self) { + self.0.extend(other.0); + } +} + /// Request to edit a region directly. #[derive(Debug)] pub(crate) struct RegionEditRequest { @@ -1107,8 +1128,24 @@ pub(crate) struct RegionEditRequest { pub(crate) edit: RegionEdit, /// Whether to preload SST files into the write cache. pub(crate) preload_sst_cache: bool, - /// The sender to notify the result to the region engine. - pub(crate) tx: Sender>, + /// The waiters that are waiting for this region edit's result. + pub(crate) waiters: Waiters, +} + +impl RegionEditRequest { + pub(crate) fn new( + region_id: RegionId, + edit: RegionEdit, + preload_sst_cache: bool, + waiter: Sender>, + ) -> Self { + Self { + region_id, + edit, + preload_sst_cache, + waiters: Waiters::one(waiter), + } + } } /// Notifies the regin the result of editing region. @@ -1116,12 +1153,12 @@ pub(crate) struct RegionEditRequest { pub(crate) struct RegionEditResult { /// Region id. pub(crate) region_id: RegionId, - /// Result sender. - pub(crate) sender: Sender>, + /// Result waiters. + pub(crate) waiters: Waiters, /// Region edit to apply. pub(crate) edit: RegionEdit, /// Result from the manifest manager. - pub(crate) result: Result<()>, + pub(crate) result: std::result::Result<(), Arc>, /// Whether region state need to be set to Writable after handling this request. pub(crate) update_region_state: bool, /// The region is in staging mode before handling this request. @@ -1214,6 +1251,55 @@ mod tests { } } + fn waiter() -> (Sender>, Receiver>) { + oneshot::channel() + } + + fn assert_waiter_ok(rx: &mut Receiver>) { + rx.try_recv().unwrap().unwrap(); + } + + #[test] + fn test_waiters_reply_with_single_waiter() { + let (tx, mut rx) = waiter(); + Waiters::one(tx).reply_with(|| Ok(())); + assert_waiter_ok(&mut rx); + } + + #[test] + fn test_waiters_reply_with_many_waiters() { + let (tx1, mut rx1) = waiter(); + let (tx2, mut rx2) = waiter(); + let (tx3, mut rx3) = waiter(); + + let waiters = Waiters(vec![tx1, tx2, tx3].into()); + waiters.reply_with(|| Ok(())); + + assert_waiter_ok(&mut rx1); + assert_waiter_ok(&mut rx2); + assert_waiter_ok(&mut rx3); + } + + #[test] + fn test_waiters_merge() { + let (tx1, mut rx1) = waiter(); + let (tx2, mut rx2) = waiter(); + let (tx3, mut rx3) = waiter(); + let (tx4, mut rx4) = waiter(); + + let mut waiters = Waiters::one(tx1); + waiters.merge(Waiters::one(tx2)); + waiters.merge(Waiters(vec![tx3, tx4].into())); + assert_eq!(4, waiters.0.len()); + + waiters.reply_with(|| Ok(())); + + assert_waiter_ok(&mut rx1); + assert_waiter_ok(&mut rx2); + assert_waiter_ok(&mut rx3); + assert_waiter_ok(&mut rx4); + } + #[test] fn test_write_request_duplicate_column() { let rows = Rows { diff --git a/src/mito2/src/worker/handle_apply_staging.rs b/src/mito2/src/worker/handle_apply_staging.rs index b183fed9c1..dee66752cf 100644 --- a/src/mito2/src/worker/handle_apply_staging.rs +++ b/src/mito2/src/worker/handle_apply_staging.rs @@ -136,13 +136,11 @@ impl RegionWorkerLoop { ); let _ = worker_sender .send(WorkerRequestWithTime::new(WorkerRequest::EditRegion( - RegionEditRequest { - region_id: region.region_id, - edit, + RegionEditRequest::new( + region_id, edit, // we don't need to preload sst cache during repartition, as it may cause extra network overhead. - preload_sst_cache: false, - tx, - }, + false, tx, + ), ))) .await; diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 306bdffee9..76b2649043 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -20,8 +20,9 @@ use std::collections::{HashMap, VecDeque}; use std::num::NonZeroU64; use std::sync::Arc; -use common_telemetry::{info, warn}; +use common_telemetry::{debug, info, warn}; use parquet::file::metadata::PageIndexPolicy; +use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -29,7 +30,7 @@ use store_api::storage::RegionId; use crate::cache::CacheManagerRef; use crate::cache::file_cache::{FileType, IndexKey}; use crate::config::IndexBuildMode; -use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result}; +use crate::error::{EditRegionSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, }; @@ -73,19 +74,60 @@ impl RegionEditQueue { fn enqueue(&mut self, request: RegionEditRequest) { if self.requests.len() > Self::QUEUE_MAX_LEN { - let _ = request.tx.send( + request.waiters.reply_with(|| { RegionBusySnafu { region_id: self.region_id, } - .fail(), - ); + .fail() + }); return; }; self.requests.push_back(request); } fn dequeue(&mut self) -> Option { - self.requests.pop_front() + fn can_merge(edit: &RegionEdit) -> bool { + // Only the `RegionEdit`: + // 1. contains the "raw" (file without a sequence) files to add, + // 2. and no `committed_sequence`, + // 3. and all other fields are empty, + // can it be merged. + // + // However, merging them means they will all share a same sequence, and if there are + // overlapping data in the files, the dedup is uncertain. This is a caution that must + // be noticed for editing region. + edit.files_to_add.iter().all(|f| f.sequence.is_none()) + && edit.files_to_remove.is_empty() + && edit.timestamp_ms.is_none() + && edit.compaction_time_window.is_none() + && edit.flushed_entry_id.is_none() + && edit.flushed_sequence.is_none() + && edit.committed_sequence.is_none() + } + + let mut merged = self.requests.pop_front()?; + if !can_merge(&merged.edit) { + return Some(merged); + } + + while let Some(request) = self + .requests + .pop_front_if(|request| can_merge(&request.edit)) + { + merged.edit.files_to_add.extend(request.edit.files_to_add); + merged.waiters.merge(request.waiters); + } + debug!( + "the files to add: [{}] are merged in one edit", + merged + .edit + .files_to_add + .iter() + .map(|x| x.file_id.to_string()) + .collect::>() + .join(", ") + ); + Some(merged) } fn is_empty(&self) -> bool { @@ -94,12 +136,12 @@ impl RegionEditQueue { fn reject_all_as_not_found(mut self) { while let Some(request) = self.requests.pop_front() { - let _ = request.tx.send( + request.waiters.reply_with(|| { RegionNotFoundSnafu { region_id: self.region_id, } - .fail(), - ); + .fail() + }); } } } @@ -243,7 +285,9 @@ impl RegionWorkerLoop { pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) { let region_id = request.region_id; let Some(region) = self.regions.get_region(region_id) else { - let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail()); + request + .waiters + .reply_with(|| RegionNotFoundSnafu { region_id }.fail()); return; }; @@ -254,7 +298,9 @@ impl RegionWorkerLoop { .or_insert_with(|| RegionEditQueue::new(region_id)) .enqueue(request); } else { - let _ = request.tx.send(RegionBusySnafu { region_id }.fail()); + request + .waiters + .reply_with(|| RegionBusySnafu { region_id }.fail()); } return; } @@ -262,7 +308,7 @@ impl RegionWorkerLoop { let RegionEditRequest { region_id: _, mut edit, - tx: sender, + waiters, preload_sst_cache, } = request; let file_sequence = region.version_control.committed_sequence() + 1; @@ -282,7 +328,8 @@ impl RegionWorkerLoop { }; // Marks the region as editing. if let Err(e) = region.set_editing(expect_state) { - let _ = sender.send(Err(e)); + let e = Arc::new(e); + waiters.reply_with(|| Err(e.clone()).context(EditRegionSnafu { region_id })); return; } @@ -300,12 +347,13 @@ impl RegionWorkerLoop { is_staging, preload_sst_cache, ) - .await; + .await + .map_err(Arc::new); let notify = WorkerRequest::Background { region_id, notify: BackgroundNotify::RegionEdit(RegionEditResult { region_id, - sender, + waiters, edit, result, // we always need to restore region state after region edit @@ -336,12 +384,13 @@ impl RegionWorkerLoop { // edit-completion notification reached the worker. self.fail_region_stalled_requests_as_not_found(&edit_result.region_id); self.reject_region_edit_queue_as_not_found(edit_result.region_id); - let _ = edit_result.sender.send( + + edit_result.waiters.reply_with(|| { RegionNotFoundSnafu { region_id: edit_result.region_id, } - .fail(), - ); + .fail() + }); return; } }; @@ -373,7 +422,14 @@ impl RegionWorkerLoop { need_compaction }; - let _ = edit_result.sender.send(edit_result.result); + edit_result + .waiters + .reply_with(|| match &edit_result.result { + Ok(()) => Ok(()), + Err(e) => Err(e.clone()).context(EditRegionSnafu { + region_id: edit_result.region_id, + }), + }); if edit_result.update_region_state { // Writes stalled specifically by this edit are handled before the next queued edit.