diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index a8182353aa..29d836385a 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -33,7 +33,7 @@ use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTIO use crate::region::RegionRoleState; use crate::request::{ BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx, - RegionEditResult, WorkerRequest, WorkerRequestWithTime, + RegionEditResult, Waiters, WorkerRequest, WorkerRequestWithTime, }; use crate::sst::file::FileMeta; use crate::worker::WorkerListener; @@ -162,7 +162,7 @@ impl CompactionTaskImpl { region_id, notify: BackgroundNotify::RegionEdit(RegionEditResult { region_id, - sender: expire_delete_sender, + waiters: Waiters::One(expire_delete_sender), edit, result: Ok(()), update_region_state: false, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index d4c52c7162..d1f322980c 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -455,11 +455,7 @@ impl MitoEngine { ); let (tx, rx) = oneshot::channel(); - let request = WorkerRequest::EditRegion(RegionEditRequest { - region_id, - edit, - tx, - }); + let request = WorkerRequest::EditRegion(RegionEditRequest::new(region_id, edit, tx)); self.inner .workers .submit_to_worker(region_id, request) diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index 84bed66e38..e05e1a847a 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::num::NonZeroU64; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -239,13 +240,8 @@ async fn test_write_during_region_editing_is_queued() { let edit_engine = engine.clone(); let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); - tokio::time::timeout(Duration::from_secs(3), async { - while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { - tokio::time::sleep(Duration::from_millis(10)).await; - } - }) - .await - .unwrap(); + wait_until_region_is_in_editing(®ion).await; + drain_worker_recv_events(&mut request_rx); let write_engine = engine.clone(); @@ -280,12 +276,7 @@ async fn test_write_during_region_editing_is_queued() { assert_eq!(1, output.affected_rows); second_edit_task.await.unwrap().unwrap(); - let second_file_sequence = region.version().ssts.levels()[0] - .files - .iter() - .find(|(file_id, _)| **file_id == second_file_id) - .and_then(|(_, file)| file.meta_ref().sequence) - .map(|sequence| sequence.get()); + let second_file_sequence = region_file_sequence(®ion, second_file_id); assert_eq!(Some(3), second_file_sequence); } @@ -310,13 +301,8 @@ async fn test_bulk_insert_during_region_editing_is_queued() { let edit_engine = engine.clone(); let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); - tokio::time::timeout(Duration::from_secs(3), async { - while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { - tokio::time::sleep(Duration::from_millis(10)).await; - } - }) - .await - .unwrap(); + wait_until_region_is_in_editing(®ion).await; + drain_worker_recv_events(&mut request_rx); let bulk_engine = engine.clone(); @@ -344,12 +330,7 @@ async fn test_bulk_insert_during_region_editing_is_queued() { assert_eq!(1, output.affected_rows); second_edit_task.await.unwrap().unwrap(); - let second_file_sequence = region.version().ssts.levels()[0] - .files - .iter() - .find(|(file_id, _)| **file_id == second_file_id) - .and_then(|(_, file)| file.meta_ref().sequence) - .map(|sequence| sequence.get()); + let second_file_sequence = region_file_sequence(®ion, second_file_id); assert_eq!(Some(3), second_file_sequence); } @@ -373,13 +354,7 @@ async fn test_stalled_write_fails_fast_if_region_closed_during_editing() { let edit_engine = engine.clone(); let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); - tokio::time::timeout(Duration::from_secs(3), async { - while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { - tokio::time::sleep(Duration::from_millis(10)).await; - } - }) - .await - .unwrap(); + wait_until_region_is_in_editing(®ion).await; drain_worker_recv_events(&mut request_rx); @@ -436,6 +411,142 @@ async fn test_stalled_write_fails_fast_if_region_closed_during_editing() { assert!(edit_task.await.unwrap().is_err()); } +#[tokio::test] +async fn test_raw_add_only_region_edits_are_merged() { + let mut env = TestEnv::new().await; + let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + // Hold the manifest lock so the first edit keeps the region in `Editing`. + // The following two edits will be queued in `RegionEditQueue`. + let manifest_guard = region.manifest_ctx.manifest_manager.write().await; + + let first_file_id = FileId::random(); + let first_edit = test_region_edit(region.region_id, first_file_id); + let first_edit_engine = engine.clone(); + let first_edit_task = + tokio::spawn(async move { first_edit_engine.edit_region(region_id, first_edit).await }); + + wait_until_region_is_in_editing(®ion).await; + + drain_worker_recv_events(&mut request_rx); + + // Both queued edits are raw add-only edits, so they should be merged into + // a single edit and share the same committed sequence. + let second_file_id = FileId::random(); + let second_edit = test_region_edit(region.region_id, second_file_id); + let second_edit_engine = engine.clone(); + let second_edit_task = + tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + let third_file_id = FileId::random(); + let third_edit = test_region_edit(region.region_id, third_file_id); + let third_edit_engine = engine.clone(); + let third_edit_task = + tokio::spawn(async move { third_edit_engine.edit_region(region_id, third_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + drop(manifest_guard); + + first_edit_task.await.unwrap().unwrap(); + second_edit_task.await.unwrap().unwrap(); + third_edit_task.await.unwrap().unwrap(); + + // The first edit gets sequence 1. The merged queued edit gets sequence 2, + // so both files from the second and third requests share sequence 2. + assert_eq!(Some(1), region_file_sequence(®ion, first_file_id)); + assert_eq!(Some(2), region_file_sequence(®ion, second_file_id)); + assert_eq!(Some(2), region_file_sequence(®ion, third_file_id)); +} + +#[tokio::test] +async fn test_region_edit_with_file_sequence_is_not_merged() { + let mut env = TestEnv::new().await; + let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + // Hold the manifest lock so the first edit keeps the region in `Editing`. + // The following two edits will be queued in `RegionEditQueue`. + let manifest_guard = region.manifest_ctx.manifest_manager.write().await; + + let first_file_id = FileId::random(); + let first_edit = test_region_edit(region.region_id, first_file_id); + let first_edit_engine = engine.clone(); + let first_edit_task = + tokio::spawn(async move { first_edit_engine.edit_region(region_id, first_edit).await }); + + wait_until_region_is_in_editing(®ion).await; + + drain_worker_recv_events(&mut request_rx); + + let second_file_id = FileId::random(); + let mut second_edit = test_region_edit(region.region_id, second_file_id); + // A file that already carries a sequence is not a raw add-only edit and + // must not be merged with the next queued edit. + second_edit.files_to_add[0].sequence = NonZeroU64::new(99); + let second_edit_engine = engine.clone(); + let second_edit_task = + tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + let third_file_id = FileId::random(); + let third_edit = test_region_edit(region.region_id, third_file_id); + let third_edit_engine = engine.clone(); + let third_edit_task = + tokio::spawn(async move { third_edit_engine.edit_region(region_id, third_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + drop(manifest_guard); + + first_edit_task.await.unwrap().unwrap(); + second_edit_task.await.unwrap().unwrap(); + third_edit_task.await.unwrap().unwrap(); + + // Because the second edit is not mergeable, the third edit runs separately + // and receives the next committed sequence. + assert_eq!(Some(1), region_file_sequence(®ion, first_file_id)); + assert_eq!(Some(2), region_file_sequence(®ion, second_file_id)); + assert_eq!(Some(3), region_file_sequence(®ion, third_file_id)); +} + +async fn wait_until_region_is_in_editing(region: &MitoRegionRef) { + tokio::time::timeout(Duration::from_secs(3), async { + while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); +} + +fn region_file_sequence(region: &MitoRegionRef, file_id: FileId) -> Option { + region.version().ssts.levels()[0] + .files + .iter() + .find(|(id, _)| **id == file_id) + .and_then(|(_, file)| file.meta_ref().sequence) + .map(|sequence| sequence.get()) +} + struct RecvRequestListener { tx: mpsc::UnboundedSender, } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 9ae748205e..1c17a22a7a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -542,6 +542,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to edit region {}", region_id))] + EditRegion { + region_id: RegionId, + source: Arc, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Failed to compat readers for region {}, reason: {}", region_id, @@ -1355,6 +1363,7 @@ impl ErrorExt for Error { RegionTruncated { .. } => StatusCode::Cancelled, RejectWrite { .. } => StatusCode::StorageUnavailable, CompactRegion { source, .. } => source.status_code(), + EditRegion { source, .. } => source.status_code(), CompatReader { .. } => StatusCode::Unexpected, InvalidRegionRequest { source, .. } => source.status_code(), RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady, diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index fefac39e6a..f6388f8b3a 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -1100,13 +1100,67 @@ pub(crate) struct CopyRegionFromFinished { pub(crate) sender: Sender>, } +#[derive(Debug, Default)] +pub(crate) enum Waiters { + #[default] + None, + One(Sender>), + Many(Vec>>), +} + +impl Waiters { + pub(crate) fn reply_with Result<()>>(self, f: F) { + match self { + Self::None => {} + Self::One(tx) => { + let _ = tx.send(f()); + } + Self::Many(txs) => { + for tx in txs { + let _ = tx.send(f()); + } + } + } + } + + pub(crate) fn merge(&mut self, other: Self) { + let zelf = std::mem::take(self); + *self = match (zelf, other) { + (Waiters::None, x) | (x, Waiters::None) => x, + (Waiters::One(x), Waiters::One(y)) => Waiters::Many(vec![x, y]), + (Waiters::One(x), Waiters::Many(mut ys)) => { + ys.insert(0, x); + Waiters::Many(ys) + } + (Waiters::Many(mut xs), Waiters::One(y)) => { + xs.push(y); + Waiters::Many(xs) + } + (Waiters::Many(mut xs), Waiters::Many(ys)) => { + xs.extend(ys); + Waiters::Many(xs) + } + }; + } +} + /// Request to edit a region directly. #[derive(Debug)] pub(crate) struct RegionEditRequest { pub(crate) region_id: RegionId, pub(crate) edit: RegionEdit, - /// The sender to notify the result to the region engine. - pub(crate) tx: Sender>, + /// The waiters that are waiting for this region edit's result. + pub(crate) waiters: Waiters, +} + +impl RegionEditRequest { + pub(crate) fn new(region_id: RegionId, edit: RegionEdit, waiter: Sender>) -> Self { + Self { + region_id, + edit, + waiters: Waiters::One(waiter), + } + } } /// Notifies the regin the result of editing region. @@ -1114,12 +1168,12 @@ pub(crate) struct RegionEditRequest { pub(crate) struct RegionEditResult { /// Region id. pub(crate) region_id: RegionId, - /// Result sender. - pub(crate) sender: Sender>, + /// Result waiters. + pub(crate) waiters: Waiters, /// Region edit to apply. pub(crate) edit: RegionEdit, /// Result from the manifest manager. - pub(crate) result: Result<()>, + pub(crate) result: std::result::Result<(), Arc>, /// Whether region state need to be set to Writable after handling this request. pub(crate) update_region_state: bool, /// The region is in staging mode before handling this request. @@ -1212,6 +1266,58 @@ mod tests { } } + fn waiter() -> (Sender>, Receiver>) { + oneshot::channel() + } + + fn assert_waiter_ok(rx: &mut Receiver>) { + rx.try_recv().unwrap().unwrap(); + } + + #[test] + fn test_waiters_reply_with_single_waiter() { + let (tx, mut rx) = waiter(); + Waiters::One(tx).reply_with(|| Ok(())); + assert_waiter_ok(&mut rx); + } + + #[test] + fn test_waiters_reply_with_many_waiters() { + let (tx1, mut rx1) = waiter(); + let (tx2, mut rx2) = waiter(); + let (tx3, mut rx3) = waiter(); + + Waiters::Many(vec![tx1, tx2, tx3]).reply_with(|| Ok(())); + + assert_waiter_ok(&mut rx1); + assert_waiter_ok(&mut rx2); + assert_waiter_ok(&mut rx3); + } + + #[test] + fn test_waiters_merge() { + let (tx1, mut rx1) = waiter(); + let (tx2, mut rx2) = waiter(); + let (tx3, mut rx3) = waiter(); + let (tx4, mut rx4) = waiter(); + + let mut waiters = Waiters::One(tx1); + waiters.merge(Waiters::One(tx2)); + waiters.merge(Waiters::Many(vec![tx3, tx4])); + + let Waiters::Many(txs) = &waiters else { + panic!("expected many waiters after merging"); + }; + assert_eq!(4, txs.len()); + + waiters.reply_with(|| Ok(())); + + assert_waiter_ok(&mut rx1); + assert_waiter_ok(&mut rx2); + assert_waiter_ok(&mut rx3); + assert_waiter_ok(&mut rx4); + } + #[test] fn test_write_request_duplicate_column() { let rows = Rows { diff --git a/src/mito2/src/worker/handle_apply_staging.rs b/src/mito2/src/worker/handle_apply_staging.rs index 876d5c3c31..1150542be4 100644 --- a/src/mito2/src/worker/handle_apply_staging.rs +++ b/src/mito2/src/worker/handle_apply_staging.rs @@ -136,11 +136,7 @@ impl RegionWorkerLoop { ); let _ = worker_sender .send(WorkerRequestWithTime::new(WorkerRequest::EditRegion( - RegionEditRequest { - region_id: region.region_id, - edit, - tx, - }, + RegionEditRequest::new(region_id, edit, tx), ))) .await; diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 40366f9ccd..6215e52b07 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -20,8 +20,9 @@ use std::collections::{HashMap, VecDeque}; use std::num::NonZeroU64; use std::sync::Arc; -use common_telemetry::{info, warn}; +use common_telemetry::{debug, info, warn}; use parquet::file::metadata::PageIndexPolicy; +use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; @@ -29,7 +30,7 @@ use store_api::storage::RegionId; use crate::cache::CacheManagerRef; use crate::cache::file_cache::{FileType, IndexKey}; use crate::config::IndexBuildMode; -use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result}; +use crate::error::{EditRegionSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, }; @@ -73,19 +74,64 @@ impl RegionEditQueue { fn enqueue(&mut self, request: RegionEditRequest) { if self.requests.len() > Self::QUEUE_MAX_LEN { - let _ = request.tx.send( + request.waiters.reply_with(|| { RegionBusySnafu { region_id: self.region_id, } - .fail(), - ); + .fail() + }); return; }; self.requests.push_back(request); } - fn dequeue(&mut self) -> Option { - self.requests.pop_front() + fn dequeue(&mut self, allow_merge_edits: bool) -> Option { + if !allow_merge_edits { + return self.requests.pop_front(); + } + + fn can_merge(edit: &RegionEdit) -> bool { + // Only the `RegionEdit`: + // 1. contains the "raw" (file without a sequence) files to add, + // 2. and no `committed_sequence`, + // 3. and all other fields are empty, + // can it be merged. + // + // However, merging them means they will all share a same sequence, and if there are + // overlapping data in the files, the dedup is uncertain. This is a caution that must + // be noticed for editing region. + edit.files_to_add.iter().all(|f| f.sequence.is_none()) + && edit.files_to_remove.is_empty() + && edit.timestamp_ms.is_none() + && edit.compaction_time_window.is_none() + && edit.flushed_entry_id.is_none() + && edit.flushed_sequence.is_none() + && edit.committed_sequence.is_none() + } + + let mut merged = self.requests.pop_front()?; + if !can_merge(&merged.edit) { + return Some(merged); + } + + while let Some(request) = self + .requests + .pop_front_if(|request| can_merge(&request.edit)) + { + merged.edit.files_to_add.extend(request.edit.files_to_add); + merged.waiters.merge(request.waiters); + } + debug!( + "the files to add: [{}] are merged in one edit", + merged + .edit + .files_to_add + .iter() + .map(|x| x.file_id.to_string()) + .collect::>() + .join(", ") + ); + Some(merged) } fn is_empty(&self) -> bool { @@ -94,12 +140,12 @@ impl RegionEditQueue { fn reject_all_as_not_found(mut self) { while let Some(request) = self.requests.pop_front() { - let _ = request.tx.send( + request.waiters.reply_with(|| { RegionNotFoundSnafu { region_id: self.region_id, } - .fail(), - ); + .fail() + }); } } } @@ -243,7 +289,9 @@ impl RegionWorkerLoop { pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) { let region_id = request.region_id; let Some(region) = self.regions.get_region(region_id) else { - let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail()); + request + .waiters + .reply_with(|| RegionNotFoundSnafu { region_id }.fail()); return; }; @@ -254,7 +302,9 @@ impl RegionWorkerLoop { .or_insert_with(|| RegionEditQueue::new(region_id)) .enqueue(request); } else { - let _ = request.tx.send(RegionBusySnafu { region_id }.fail()); + request + .waiters + .reply_with(|| RegionBusySnafu { region_id }.fail()); } return; } @@ -262,7 +312,7 @@ impl RegionWorkerLoop { let RegionEditRequest { region_id: _, mut edit, - tx: sender, + waiters, } = request; let file_sequence = region.version_control.committed_sequence() + 1; edit.committed_sequence = Some(file_sequence); @@ -281,7 +331,8 @@ impl RegionWorkerLoop { }; // Marks the region as editing. if let Err(e) = region.set_editing(expect_state) { - let _ = sender.send(Err(e)); + let e = Arc::new(e); + waiters.reply_with(|| Err(e.clone()).context(EditRegionSnafu { region_id })); return; } @@ -291,13 +342,14 @@ impl RegionWorkerLoop { // Now the region is in editing state. // Updates manifest in background. common_runtime::spawn_global(async move { - let result = - edit_region(®ion, edit.clone(), cache_manager, listener, is_staging).await; + let result = edit_region(®ion, edit.clone(), cache_manager, listener, is_staging) + .await + .map_err(Arc::new); let notify = WorkerRequest::Background { region_id, notify: BackgroundNotify::RegionEdit(RegionEditResult { region_id, - sender, + waiters, edit, result, // we always need to restore region state after region edit @@ -328,12 +380,13 @@ impl RegionWorkerLoop { // edit-completion notification reached the worker. self.fail_region_stalled_requests_as_not_found(&edit_result.region_id); self.reject_region_edit_queue_as_not_found(edit_result.region_id); - let _ = edit_result.sender.send( + + edit_result.waiters.reply_with(|| { RegionNotFoundSnafu { region_id: edit_result.region_id, } - .fail(), - ); + .fail() + }); return; } }; @@ -365,7 +418,14 @@ impl RegionWorkerLoop { need_compaction }; - let _ = edit_result.sender.send(edit_result.result); + edit_result + .waiters + .reply_with(|| match &edit_result.result { + Ok(()) => Ok(()), + Err(e) => Err(e.clone()).context(EditRegionSnafu { + region_id: edit_result.region_id, + }), + }); if edit_result.update_region_state { // Writes stalled specifically by this edit are handled before the next queued edit. @@ -374,9 +434,14 @@ impl RegionWorkerLoop { .await; } + let has_pending_stalled_writes = self + .stalled_requests + .requests + .contains_key(&edit_result.region_id); + let allow_merge_edits = !has_pending_stalled_writes && !region.is_staging(); let next_request = if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) { - let request = edit_queue.dequeue(); + let request = edit_queue.dequeue(allow_merge_edits); if edit_queue.is_empty() { self.region_edit_queues.remove(&edit_result.region_id); }