feat: merge files to add in one region edit (#8141)

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2026-05-21 18:55:10 +08:00
committed by GitHub
parent 13fe5bc8a3
commit 4668dd43bd
7 changed files with 326 additions and 71 deletions

View File

@@ -33,7 +33,7 @@ use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTIO
use crate::region::RegionRoleState;
use crate::request::{
BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx,
RegionEditResult, WorkerRequest, WorkerRequestWithTime,
RegionEditResult, Waiters, WorkerRequest, WorkerRequestWithTime,
};
use crate::sst::file::FileMeta;
use crate::worker::WorkerListener;
@@ -162,7 +162,7 @@ impl CompactionTaskImpl {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
region_id,
sender: expire_delete_sender,
waiters: Waiters::one(expire_delete_sender),
edit,
result: Ok(()),
update_region_state: false,

View File

@@ -455,12 +455,7 @@ impl MitoEngine {
);
let (tx, rx) = oneshot::channel();
let request = WorkerRequest::EditRegion(RegionEditRequest {
region_id,
edit,
tx,
preload_sst_cache: true,
});
let request = WorkerRequest::EditRegion(RegionEditRequest::new(region_id, edit, true, tx));
self.inner
.workers
.submit_to_worker(region_id, request)

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroU64;
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -239,13 +240,8 @@ async fn test_write_during_region_editing_is_queued() {
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
wait_until_region_is_in_editing(&region).await;
drain_worker_recv_events(&mut request_rx);
let write_engine = engine.clone();
@@ -280,12 +276,7 @@ async fn test_write_during_region_editing_is_queued() {
assert_eq!(1, output.affected_rows);
second_edit_task.await.unwrap().unwrap();
let second_file_sequence = region.version().ssts.levels()[0]
.files
.iter()
.find(|(file_id, _)| **file_id == second_file_id)
.and_then(|(_, file)| file.meta_ref().sequence)
.map(|sequence| sequence.get());
let second_file_sequence = region_file_sequence(&region, second_file_id);
assert_eq!(Some(3), second_file_sequence);
}
@@ -310,13 +301,8 @@ async fn test_bulk_insert_during_region_editing_is_queued() {
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
wait_until_region_is_in_editing(&region).await;
drain_worker_recv_events(&mut request_rx);
let bulk_engine = engine.clone();
@@ -344,12 +330,7 @@ async fn test_bulk_insert_during_region_editing_is_queued() {
assert_eq!(1, output.affected_rows);
second_edit_task.await.unwrap().unwrap();
let second_file_sequence = region.version().ssts.levels()[0]
.files
.iter()
.find(|(file_id, _)| **file_id == second_file_id)
.and_then(|(_, file)| file.meta_ref().sequence)
.map(|sequence| sequence.get());
let second_file_sequence = region_file_sequence(&region, second_file_id);
assert_eq!(Some(3), second_file_sequence);
}
@@ -373,13 +354,7 @@ async fn test_stalled_write_fails_fast_if_region_closed_during_editing() {
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
wait_until_region_is_in_editing(&region).await;
drain_worker_recv_events(&mut request_rx);
@@ -436,6 +411,142 @@ async fn test_stalled_write_fails_fast_if_region_closed_during_editing() {
assert!(edit_task.await.unwrap().is_err());
}
/// Checks that raw add-only region edits queued behind an in-flight edit are merged:
/// the files of the second and third requests end up with the same sequence.
#[tokio::test]
async fn test_raw_add_only_region_edits_are_merged() {
let mut env = TestEnv::new().await;
let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await;
let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
// Hold the manifest lock so the first edit keeps the region in `Editing`.
// The following two edits will be queued in `RegionEditQueue`.
let manifest_guard = region.manifest_ctx.manifest_manager.write().await;
let first_file_id = FileId::random();
let first_edit = test_region_edit(region.region_id, first_file_id);
let first_edit_engine = engine.clone();
let first_edit_task =
tokio::spawn(async move { first_edit_engine.edit_region(region_id, first_edit).await });
wait_until_region_is_in_editing(&region).await;
// NOTE(review): presumably discards the receive events emitted so far, so the waits
// below only observe the requests submitted after this point — confirm against the helper.
drain_worker_recv_events(&mut request_rx);
// Both queued edits are raw add-only edits, so they should be merged into
// a single edit and share the same committed sequence.
let second_file_id = FileId::random();
let second_edit = test_region_edit(region.region_id, second_file_id);
let second_edit_engine = engine.clone();
let second_edit_task =
tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await });
// NOTE(review): presumably waits until the worker has received the second edit, which
// fixes the queue order before the third edit is submitted — confirm against the helper.
wait_worker_recv_event(&mut request_rx).await;
let third_file_id = FileId::random();
let third_edit = test_region_edit(region.region_id, third_file_id);
let third_edit_engine = engine.clone();
let third_edit_task =
tokio::spawn(async move { third_edit_engine.edit_region(region_id, third_edit).await });
wait_worker_recv_event(&mut request_rx).await;
// Release the manifest lock so the first edit can finish and the queued edits can run.
drop(manifest_guard);
first_edit_task.await.unwrap().unwrap();
second_edit_task.await.unwrap().unwrap();
third_edit_task.await.unwrap().unwrap();
// The first edit gets sequence 1. The merged queued edit gets sequence 2,
// so both files from the second and third requests share sequence 2.
assert_eq!(Some(1), region_file_sequence(&region, first_file_id));
assert_eq!(Some(2), region_file_sequence(&region, second_file_id));
assert_eq!(Some(2), region_file_sequence(&region, third_file_id));
}
/// Checks that a queued region edit whose file already carries a sequence is not merged
/// with the edit queued after it: the second and third files get different sequences.
#[tokio::test]
async fn test_region_edit_with_file_sequence_is_not_merged() {
let mut env = TestEnv::new().await;
let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await;
let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
// Hold the manifest lock so the first edit keeps the region in `Editing`.
// The following two edits will be queued in `RegionEditQueue`.
let manifest_guard = region.manifest_ctx.manifest_manager.write().await;
let first_file_id = FileId::random();
let first_edit = test_region_edit(region.region_id, first_file_id);
let first_edit_engine = engine.clone();
let first_edit_task =
tokio::spawn(async move { first_edit_engine.edit_region(region_id, first_edit).await });
wait_until_region_is_in_editing(&region).await;
// NOTE(review): presumably discards the receive events emitted so far, so the waits
// below only observe the requests submitted after this point — confirm against the helper.
drain_worker_recv_events(&mut request_rx);
let second_file_id = FileId::random();
let mut second_edit = test_region_edit(region.region_id, second_file_id);
// A file that already carries a sequence is not a raw add-only edit and
// must not be merged with the next queued edit.
second_edit.files_to_add[0].sequence = NonZeroU64::new(99);
let second_edit_engine = engine.clone();
let second_edit_task =
tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await });
// NOTE(review): presumably waits until the worker has received the second edit, which
// fixes the queue order before the third edit is submitted — confirm against the helper.
wait_worker_recv_event(&mut request_rx).await;
let third_file_id = FileId::random();
let third_edit = test_region_edit(region.region_id, third_file_id);
let third_edit_engine = engine.clone();
let third_edit_task =
tokio::spawn(async move { third_edit_engine.edit_region(region_id, third_edit).await });
wait_worker_recv_event(&mut request_rx).await;
// Release the manifest lock so the first edit can finish and the queued edits can run.
drop(manifest_guard);
first_edit_task.await.unwrap().unwrap();
second_edit_task.await.unwrap().unwrap();
third_edit_task.await.unwrap().unwrap();
// Because the second edit is not mergeable, the third edit runs separately
// and receives the next committed sequence.
assert_eq!(Some(1), region_file_sequence(&region, first_file_id));
assert_eq!(Some(2), region_file_sequence(&region, second_file_id));
assert_eq!(Some(3), region_file_sequence(&region, third_file_id));
}
/// Waits until `region` reports the leader `Editing` state, polling every 10 milliseconds.
///
/// # Panics
///
/// Panics if the region has not entered the `Editing` state within 3 seconds.
async fn wait_until_region_is_in_editing(region: &MitoRegionRef) {
    tokio::time::timeout(Duration::from_secs(3), async {
        while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    // State the violated expectation so a timeout is easy to diagnose in test output.
    .expect("region did not enter the `Editing` state within 3 seconds");
}
/// Looks up `file_id` among the level 0 SST files of `region`'s current version and
/// returns the sequence recorded in its file meta.
///
/// Returns `None` when the file is not found in level 0 or when it has no sequence.
fn region_file_sequence(region: &MitoRegionRef, file_id: FileId) -> Option<u64> {
    let version = region.version();
    let level0 = &version.ssts.levels()[0];
    level0
        .files
        .iter()
        // Stop at the first entry with a matching id, even if it carries no sequence.
        .find_map(|(id, file)| (*id == file_id).then(|| file.meta_ref().sequence))
        .flatten()
        .map(|seq| seq.get())
}
struct RecvRequestListener {
tx: mpsc::UnboundedSender<usize>,
}

View File

@@ -542,6 +542,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to edit region {}", region_id))]
EditRegion {
region_id: RegionId,
source: Arc<Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to compat readers for region {}, reason: {}",
region_id,
@@ -1377,6 +1385,7 @@ impl ErrorExt for Error {
RegionTruncated { .. } => StatusCode::Cancelled,
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompactRegion { source, .. } => source.status_code(),
EditRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady,

View File

@@ -1100,6 +1100,27 @@ pub(crate) struct CopyRegionFromFinished {
pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
/// A collection of senders that are all waiting for the same `Result<()>`.
#[derive(Debug, Default)]
pub(crate) struct Waiters(SmallVec<[Sender<Result<()>>; 1]>);

impl Waiters {
    /// Creates a collection holding exactly one `waiter`.
    pub(crate) fn one(waiter: Sender<Result<()>>) -> Self {
        Self(std::iter::once(waiter).collect())
    }

    /// Consumes the collection and sends every waiter a fresh result produced by `f`.
    ///
    /// `f` is called once per waiter. A failed send is ignored.
    pub(crate) fn reply_with<F: Fn() -> Result<()>>(self, f: F) {
        let Self(senders) = self;
        senders.into_iter().for_each(|sender| {
            let _ = sender.send(f());
        });
    }

    /// Appends all waiters of `other` after the waiters already held by `self`.
    pub(crate) fn merge(&mut self, other: Self) {
        let Self(incoming) = other;
        self.0.extend(incoming);
    }
}
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
@@ -1107,8 +1128,24 @@ pub(crate) struct RegionEditRequest {
pub(crate) edit: RegionEdit,
/// Whether to preload SST files into the write cache.
pub(crate) preload_sst_cache: bool,
/// The sender to notify the result to the region engine.
pub(crate) tx: Sender<Result<()>>,
/// The waiters that are waiting for this region edit's result.
pub(crate) waiters: Waiters,
}
impl RegionEditRequest {
    /// Creates a request to apply `edit` to the region `region_id`, with `waiter` as the
    /// single waiter for the result.
    ///
    /// `preload_sst_cache` is stored as given; it controls whether SST files are
    /// preloaded into the write cache.
    pub(crate) fn new(
        region_id: RegionId,
        edit: RegionEdit,
        preload_sst_cache: bool,
        waiter: Sender<Result<()>>,
    ) -> Self {
        let waiters = Waiters::one(waiter);
        Self {
            region_id,
            edit,
            preload_sst_cache,
            waiters,
        }
    }
}
/// Notifies the region of the result of a region edit.
@@ -1116,12 +1153,12 @@ pub(crate) struct RegionEditRequest {
pub(crate) struct RegionEditResult {
/// Region id.
pub(crate) region_id: RegionId,
/// Result sender.
pub(crate) sender: Sender<Result<()>>,
/// Result waiters.
pub(crate) waiters: Waiters,
/// Region edit to apply.
pub(crate) edit: RegionEdit,
/// Result from the manifest manager.
pub(crate) result: Result<()>,
pub(crate) result: std::result::Result<(), Arc<Error>>,
/// Whether region state need to be set to Writable after handling this request.
pub(crate) update_region_state: bool,
/// The region is in staging mode before handling this request.
@@ -1214,6 +1251,55 @@ mod tests {
}
}
/// Creates the sender/receiver pair of a oneshot channel carrying a `Result<()>`.
fn waiter() -> (Sender<Result<()>>, Receiver<Result<()>>) {
    let (tx, rx) = oneshot::channel();
    (tx, rx)
}
/// Asserts that `rx` already holds a reply and that the reply is `Ok(())`.
///
/// Panics if no reply is available yet or if the reply is an error.
fn assert_waiter_ok(rx: &mut Receiver<Result<()>>) {
    let reply = rx.try_recv().unwrap();
    reply.unwrap();
}
/// A single waiter receives the result produced by the reply closure.
#[test]
fn test_waiters_reply_with_single_waiter() {
    let (sender, mut receiver) = waiter();
    let waiters = Waiters::one(sender);
    waiters.reply_with(|| Ok(()));
    assert_waiter_ok(&mut receiver);
}
/// Every waiter in the collection receives its own reply.
#[test]
fn test_waiters_reply_with_many_waiters() {
    let (tx1, mut rx1) = waiter();
    let (tx2, mut rx2) = waiter();
    let (tx3, mut rx3) = waiter();

    let senders = vec![tx1, tx2, tx3];
    let waiters = Waiters(senders.into());
    waiters.reply_with(|| Ok(()));

    for receiver in [&mut rx1, &mut rx2, &mut rx3] {
        assert_waiter_ok(receiver);
    }
}
/// Merging keeps all waiters, and each of them is replied to afterwards.
#[test]
fn test_waiters_merge() {
    let (tx1, mut rx1) = waiter();
    let (tx2, mut rx2) = waiter();
    let (tx3, mut rx3) = waiter();
    let (tx4, mut rx4) = waiter();

    let mut merged = Waiters::one(tx1);
    let single = Waiters::one(tx2);
    merged.merge(single);
    let pair = Waiters(vec![tx3, tx4].into());
    merged.merge(pair);
    assert_eq!(4, merged.0.len());

    merged.reply_with(|| Ok(()));
    for receiver in [&mut rx1, &mut rx2, &mut rx3, &mut rx4] {
        assert_waiter_ok(receiver);
    }
}
#[test]
fn test_write_request_duplicate_column() {
let rows = Rows {

View File

@@ -136,13 +136,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
let _ = worker_sender
.send(WorkerRequestWithTime::new(WorkerRequest::EditRegion(
RegionEditRequest {
region_id: region.region_id,
edit,
RegionEditRequest::new(
region_id, edit,
// we don't need to preload sst cache during repartition, as it may cause extra network overhead.
preload_sst_cache: false,
tx,
},
false, tx,
),
)))
.await;

View File

@@ -20,8 +20,9 @@ use std::collections::{HashMap, VecDeque};
use std::num::NonZeroU64;
use std::sync::Arc;
use common_telemetry::{info, warn};
use common_telemetry::{debug, info, warn};
use parquet::file::metadata::PageIndexPolicy;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
@@ -29,7 +30,7 @@ use store_api::storage::RegionId;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::config::IndexBuildMode;
use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::error::{EditRegionSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
};
@@ -73,19 +74,60 @@ impl RegionEditQueue {
fn enqueue(&mut self, request: RegionEditRequest) {
if self.requests.len() > Self::QUEUE_MAX_LEN {
let _ = request.tx.send(
request.waiters.reply_with(|| {
RegionBusySnafu {
region_id: self.region_id,
}
.fail(),
);
.fail()
});
return;
};
self.requests.push_back(request);
}
fn dequeue(&mut self) -> Option<RegionEditRequest> {
self.requests.pop_front()
/// Returns whether `edit` may be merged with other queued edits.
fn can_merge(edit: &RegionEdit) -> bool {
    // A `RegionEdit` can be merged only if:
    // 1. every file to add is "raw", i.e. it has no sequence yet,
    // 2. it has no `committed_sequence`,
    // 3. and all of its other fields are empty.
    //
    // Caution: merged edits all share the same sequence, so if their files contain
    // overlapping data, the outcome of dedup is uncertain. Callers that edit a region
    // must keep this in mind.
    let adds_only_raw_files = edit.files_to_add.iter().all(|file| file.sequence.is_none());
    let removes_nothing = edit.files_to_remove.is_empty();
    let has_no_flush_info = edit.flushed_entry_id.is_none() && edit.flushed_sequence.is_none();
    let has_no_other_metadata = edit.timestamp_ms.is_none()
        && edit.compaction_time_window.is_none()
        && edit.committed_sequence.is_none();

    adds_only_raw_files && removes_nothing && has_no_flush_info && has_no_other_metadata
}
let mut merged = self.requests.pop_front()?;
if !can_merge(&merged.edit) {
return Some(merged);
}
while let Some(request) = self
.requests
.pop_front_if(|request| can_merge(&request.edit))
{
merged.edit.files_to_add.extend(request.edit.files_to_add);
merged.waiters.merge(request.waiters);
}
debug!(
"the files to add: [{}] are merged in one edit",
merged
.edit
.files_to_add
.iter()
.map(|x| x.file_id.to_string())
.collect::<Vec<_>>()
.join(", ")
);
Some(merged)
}
fn is_empty(&self) -> bool {
@@ -94,12 +136,12 @@ impl RegionEditQueue {
fn reject_all_as_not_found(mut self) {
while let Some(request) = self.requests.pop_front() {
let _ = request.tx.send(
request.waiters.reply_with(|| {
RegionNotFoundSnafu {
region_id: self.region_id,
}
.fail(),
);
.fail()
});
}
}
}
@@ -243,7 +285,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
let region_id = request.region_id;
let Some(region) = self.regions.get_region(region_id) else {
let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
request
.waiters
.reply_with(|| RegionNotFoundSnafu { region_id }.fail());
return;
};
@@ -254,7 +298,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.or_insert_with(|| RegionEditQueue::new(region_id))
.enqueue(request);
} else {
let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
request
.waiters
.reply_with(|| RegionBusySnafu { region_id }.fail());
}
return;
}
@@ -262,7 +308,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let RegionEditRequest {
region_id: _,
mut edit,
tx: sender,
waiters,
preload_sst_cache,
} = request;
let file_sequence = region.version_control.committed_sequence() + 1;
@@ -282,7 +328,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};
// Marks the region as editing.
if let Err(e) = region.set_editing(expect_state) {
let _ = sender.send(Err(e));
let e = Arc::new(e);
waiters.reply_with(|| Err(e.clone()).context(EditRegionSnafu { region_id }));
return;
}
@@ -300,12 +347,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
is_staging,
preload_sst_cache,
)
.await;
.await
.map_err(Arc::new);
let notify = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
region_id,
sender,
waiters,
edit,
result,
// we always need to restore region state after region edit
@@ -336,12 +384,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// edit-completion notification reached the worker.
self.fail_region_stalled_requests_as_not_found(&edit_result.region_id);
self.reject_region_edit_queue_as_not_found(edit_result.region_id);
let _ = edit_result.sender.send(
edit_result.waiters.reply_with(|| {
RegionNotFoundSnafu {
region_id: edit_result.region_id,
}
.fail(),
);
.fail()
});
return;
}
};
@@ -373,7 +422,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
need_compaction
};
let _ = edit_result.sender.send(edit_result.result);
edit_result
.waiters
.reply_with(|| match &edit_result.result {
Ok(()) => Ok(()),
Err(e) => Err(e.clone()).context(EditRegionSnafu {
region_id: edit_result.region_id,
}),
});
if edit_result.update_region_state {
// Writes stalled specifically by this edit are handled before the next queued edit.