feat: merge files to add in one region edit

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-05-20 20:53:36 +08:00
parent 28bed396e2
commit 2a07c0e829
7 changed files with 355 additions and 72 deletions

View File

@@ -33,7 +33,7 @@ use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTIO
use crate::region::RegionRoleState;
use crate::request::{
BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx,
RegionEditResult, WorkerRequest, WorkerRequestWithTime,
RegionEditResult, Waiters, WorkerRequest, WorkerRequestWithTime,
};
use crate::sst::file::FileMeta;
use crate::worker::WorkerListener;
@@ -162,7 +162,7 @@ impl CompactionTaskImpl {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
region_id,
sender: expire_delete_sender,
waiters: Waiters::One(expire_delete_sender),
edit,
result: Ok(()),
update_region_state: false,

View File

@@ -455,11 +455,7 @@ impl MitoEngine {
);
let (tx, rx) = oneshot::channel();
let request = WorkerRequest::EditRegion(RegionEditRequest {
region_id,
edit,
tx,
});
let request = WorkerRequest::EditRegion(RegionEditRequest::new(region_id, edit, tx));
self.inner
.workers
.submit_to_worker(region_id, request)

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroU64;
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -239,13 +240,8 @@ async fn test_write_during_region_editing_is_queued() {
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
wait_until_region_is_in_editing(&region).await;
drain_worker_recv_events(&mut request_rx);
let write_engine = engine.clone();
@@ -280,12 +276,7 @@ async fn test_write_during_region_editing_is_queued() {
assert_eq!(1, output.affected_rows);
second_edit_task.await.unwrap().unwrap();
let second_file_sequence = region.version().ssts.levels()[0]
.files
.iter()
.find(|(file_id, _)| **file_id == second_file_id)
.and_then(|(_, file)| file.meta_ref().sequence)
.map(|sequence| sequence.get());
let second_file_sequence = region_file_sequence(&region, second_file_id);
assert_eq!(Some(3), second_file_sequence);
}
@@ -310,13 +301,8 @@ async fn test_bulk_insert_during_region_editing_is_queued() {
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
wait_until_region_is_in_editing(&region).await;
drain_worker_recv_events(&mut request_rx);
let bulk_engine = engine.clone();
@@ -344,12 +330,7 @@ async fn test_bulk_insert_during_region_editing_is_queued() {
assert_eq!(1, output.affected_rows);
second_edit_task.await.unwrap().unwrap();
let second_file_sequence = region.version().ssts.levels()[0]
.files
.iter()
.find(|(file_id, _)| **file_id == second_file_id)
.and_then(|(_, file)| file.meta_ref().sequence)
.map(|sequence| sequence.get());
let second_file_sequence = region_file_sequence(&region, second_file_id);
assert_eq!(Some(3), second_file_sequence);
}
@@ -373,13 +354,7 @@ async fn test_stalled_write_fails_fast_if_region_closed_during_editing() {
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
wait_until_region_is_in_editing(&region).await;
drain_worker_recv_events(&mut request_rx);
@@ -436,6 +411,142 @@ async fn test_stalled_write_fails_fast_if_region_closed_during_editing() {
assert!(edit_task.await.unwrap().is_err());
}
#[tokio::test]
async fn test_raw_add_only_region_edits_are_merged() {
let mut env = TestEnv::new().await;
let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await;
let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
// Hold the manifest lock so the first edit keeps the region in `Editing`.
// The following two edits will be queued in `RegionEditQueue`.
let manifest_guard = region.manifest_ctx.manifest_manager.write().await;
let first_file_id = FileId::random();
let first_edit = test_region_edit(region.region_id, first_file_id);
let first_edit_engine = engine.clone();
let first_edit_task =
tokio::spawn(async move { first_edit_engine.edit_region(region_id, first_edit).await });
wait_until_region_is_in_editing(&region).await;
drain_worker_recv_events(&mut request_rx);
// Both queued edits are raw add-only edits, so they should be merged into
// a single edit and share the same committed sequence.
let second_file_id = FileId::random();
let second_edit = test_region_edit(region.region_id, second_file_id);
let second_edit_engine = engine.clone();
let second_edit_task =
tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await });
wait_worker_recv_event(&mut request_rx).await;
let third_file_id = FileId::random();
let third_edit = test_region_edit(region.region_id, third_file_id);
let third_edit_engine = engine.clone();
let third_edit_task =
tokio::spawn(async move { third_edit_engine.edit_region(region_id, third_edit).await });
wait_worker_recv_event(&mut request_rx).await;
drop(manifest_guard);
first_edit_task.await.unwrap().unwrap();
second_edit_task.await.unwrap().unwrap();
third_edit_task.await.unwrap().unwrap();
// The first edit gets sequence 1. The merged queued edit gets sequence 2,
// so both files from the second and third requests share sequence 2.
assert_eq!(Some(1), region_file_sequence(&region, first_file_id));
assert_eq!(Some(2), region_file_sequence(&region, second_file_id));
assert_eq!(Some(2), region_file_sequence(&region, third_file_id));
}
#[tokio::test]
async fn test_region_edit_with_file_sequence_is_not_merged() {
let mut env = TestEnv::new().await;
let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await;
let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
// Hold the manifest lock so the first edit keeps the region in `Editing`.
// The following two edits will be queued in `RegionEditQueue`.
let manifest_guard = region.manifest_ctx.manifest_manager.write().await;
let first_file_id = FileId::random();
let first_edit = test_region_edit(region.region_id, first_file_id);
let first_edit_engine = engine.clone();
let first_edit_task =
tokio::spawn(async move { first_edit_engine.edit_region(region_id, first_edit).await });
wait_until_region_is_in_editing(&region).await;
drain_worker_recv_events(&mut request_rx);
let second_file_id = FileId::random();
let mut second_edit = test_region_edit(region.region_id, second_file_id);
// A file that already carries a sequence is not a raw add-only edit and
// must not be merged with the next queued edit.
second_edit.files_to_add[0].sequence = NonZeroU64::new(99);
let second_edit_engine = engine.clone();
let second_edit_task =
tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await });
wait_worker_recv_event(&mut request_rx).await;
let third_file_id = FileId::random();
let third_edit = test_region_edit(region.region_id, third_file_id);
let third_edit_engine = engine.clone();
let third_edit_task =
tokio::spawn(async move { third_edit_engine.edit_region(region_id, third_edit).await });
wait_worker_recv_event(&mut request_rx).await;
drop(manifest_guard);
first_edit_task.await.unwrap().unwrap();
second_edit_task.await.unwrap().unwrap();
third_edit_task.await.unwrap().unwrap();
// Because the second edit is not mergeable, the third edit runs separately
// and receives the next committed sequence.
assert_eq!(Some(1), region_file_sequence(&region, first_file_id));
assert_eq!(Some(2), region_file_sequence(&region, second_file_id));
assert_eq!(Some(3), region_file_sequence(&region, third_file_id));
}
async fn wait_until_region_is_in_editing(region: &MitoRegionRef) {
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
}
fn region_file_sequence(region: &MitoRegionRef, file_id: FileId) -> Option<u64> {
region.version().ssts.levels()[0]
.files
.iter()
.find(|(id, _)| **id == file_id)
.and_then(|(_, file)| file.meta_ref().sequence)
.map(|sequence| sequence.get())
}
struct RecvRequestListener {
tx: mpsc::UnboundedSender<usize>,
}

View File

@@ -542,6 +542,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to edit region {}", region_id))]
EditRegion {
region_id: RegionId,
source: Arc<Error>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Failed to compat readers for region {}, reason: {}",
region_id,
@@ -1355,6 +1363,7 @@ impl ErrorExt for Error {
RegionTruncated { .. } => StatusCode::Cancelled,
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompactRegion { source, .. } => source.status_code(),
EditRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
RegionState { .. } | UpdateManifest { .. } => StatusCode::RegionNotReady,

View File

@@ -1100,13 +1100,67 @@ pub(crate) struct CopyRegionFromFinished {
pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
}
#[derive(Debug, Default)]
pub(crate) enum Waiters {
#[default]
None,
One(Sender<Result<()>>),
Many(Vec<Sender<Result<()>>>),
}
impl Waiters {
pub(crate) fn reply_with<F: Fn() -> Result<()>>(self, f: F) {
match self {
Self::None => {}
Self::One(tx) => {
let _ = tx.send(f());
}
Self::Many(txs) => {
for tx in txs {
let _ = tx.send(f());
}
}
}
}
pub(crate) fn merge(&mut self, other: Self) {
let zelf = std::mem::take(self);
*self = match (zelf, other) {
(Waiters::None, x) | (x, Waiters::None) => x,
(Waiters::One(x), Waiters::One(y)) => Waiters::Many(vec![x, y]),
(Waiters::One(x), Waiters::Many(mut ys)) => {
ys.insert(0, x);
Waiters::Many(ys)
}
(Waiters::Many(mut xs), Waiters::One(y)) => {
xs.push(y);
Waiters::Many(xs)
}
(Waiters::Many(mut xs), Waiters::Many(ys)) => {
xs.extend(ys);
Waiters::Many(xs)
}
};
}
}
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
pub(crate) region_id: RegionId,
pub(crate) edit: RegionEdit,
/// The sender to notify the result to the region engine.
pub(crate) tx: Sender<Result<()>>,
/// The waiters that are waiting for this region edit's result.
pub(crate) waiters: Waiters,
}
impl RegionEditRequest {
pub(crate) fn new(region_id: RegionId, edit: RegionEdit, waiter: Sender<Result<()>>) -> Self {
Self {
region_id,
edit,
waiters: Waiters::One(waiter),
}
}
}
/// Notifies the regin the result of editing region.
@@ -1114,12 +1168,12 @@ pub(crate) struct RegionEditRequest {
pub(crate) struct RegionEditResult {
/// Region id.
pub(crate) region_id: RegionId,
/// Result sender.
pub(crate) sender: Sender<Result<()>>,
/// Result waiters.
pub(crate) waiters: Waiters,
/// Region edit to apply.
pub(crate) edit: RegionEdit,
/// Result from the manifest manager.
pub(crate) result: Result<()>,
pub(crate) result: std::result::Result<(), Arc<Error>>,
/// Whether region state need to be set to Writable after handling this request.
pub(crate) update_region_state: bool,
/// The region is in staging mode before handling this request.
@@ -1212,6 +1266,58 @@ mod tests {
}
}
fn waiter() -> (Sender<Result<()>>, Receiver<Result<()>>) {
oneshot::channel()
}
fn assert_waiter_ok(rx: &mut Receiver<Result<()>>) {
rx.try_recv().unwrap().unwrap();
}
#[test]
fn test_waiters_reply_with_single_waiter() {
let (tx, mut rx) = waiter();
Waiters::One(tx).reply_with(|| Ok(()));
assert_waiter_ok(&mut rx);
}
#[test]
fn test_waiters_reply_with_many_waiters() {
let (tx1, mut rx1) = waiter();
let (tx2, mut rx2) = waiter();
let (tx3, mut rx3) = waiter();
Waiters::Many(vec![tx1, tx2, tx3]).reply_with(|| Ok(()));
assert_waiter_ok(&mut rx1);
assert_waiter_ok(&mut rx2);
assert_waiter_ok(&mut rx3);
}
#[test]
fn test_waiters_merge() {
let (tx1, mut rx1) = waiter();
let (tx2, mut rx2) = waiter();
let (tx3, mut rx3) = waiter();
let (tx4, mut rx4) = waiter();
let mut waiters = Waiters::One(tx1);
waiters.merge(Waiters::One(tx2));
waiters.merge(Waiters::Many(vec![tx3, tx4]));
let Waiters::Many(txs) = &waiters else {
panic!("expected many waiters after merging");
};
assert_eq!(4, txs.len());
waiters.reply_with(|| Ok(()));
assert_waiter_ok(&mut rx1);
assert_waiter_ok(&mut rx2);
assert_waiter_ok(&mut rx3);
assert_waiter_ok(&mut rx4);
}
#[test]
fn test_write_request_duplicate_column() {
let rows = Rows {

View File

@@ -136,11 +136,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
let _ = worker_sender
.send(WorkerRequestWithTime::new(WorkerRequest::EditRegion(
RegionEditRequest {
region_id: region.region_id,
edit,
tx,
},
RegionEditRequest::new(region_id, edit, tx),
)))
.await;

View File

@@ -20,8 +20,9 @@ use std::collections::{HashMap, VecDeque};
use std::num::NonZeroU64;
use std::sync::Arc;
use common_telemetry::{info, warn};
use common_telemetry::{debug, info, warn};
use parquet::file::metadata::PageIndexPolicy;
use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
@@ -29,7 +30,7 @@ use store_api::storage::RegionId;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::config::IndexBuildMode;
use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::error::{EditRegionSnafu, RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
};
@@ -73,19 +74,64 @@ impl RegionEditQueue {
fn enqueue(&mut self, request: RegionEditRequest) {
if self.requests.len() > Self::QUEUE_MAX_LEN {
let _ = request.tx.send(
request.waiters.reply_with(|| {
RegionBusySnafu {
region_id: self.region_id,
}
.fail(),
);
.fail()
});
return;
};
self.requests.push_back(request);
}
fn dequeue(&mut self) -> Option<RegionEditRequest> {
self.requests.pop_front()
fn dequeue(&mut self, allow_merge_edits: bool) -> Option<RegionEditRequest> {
if !allow_merge_edits {
return self.requests.pop_front();
}
fn can_merge(edit: &RegionEdit) -> bool {
// Only the `RegionEdit`:
// 1. contains the "raw" (file without a sequence) files to add,
// 2. and no `committed_sequence`,
// 3. and all other fields are empty,
// can it be merged.
//
// However, merging them means they will all share a same sequence, and if there are
// overlapping data in the files, the dedup is uncertain. This is a caution that must
// be noticed for editing region.
edit.files_to_add.iter().all(|f| f.sequence.is_none())
&& edit.files_to_remove.is_empty()
&& edit.timestamp_ms.is_none()
&& edit.compaction_time_window.is_none()
&& edit.flushed_entry_id.is_none()
&& edit.flushed_sequence.is_none()
&& edit.committed_sequence.is_none()
}
let mut merged = self.requests.pop_front()?;
if !can_merge(&merged.edit) {
return Some(merged);
}
while let Some(request) = self
.requests
.pop_front_if(|request| can_merge(&request.edit))
{
merged.edit.files_to_add.extend(request.edit.files_to_add);
merged.waiters.merge(request.waiters);
}
debug!(
"the files to add: [{}] are merged in one edit",
merged
.edit
.files_to_add
.iter()
.map(|x| x.file_id.to_string())
.collect::<Vec<_>>()
.join(", ")
);
Some(merged)
}
fn is_empty(&self) -> bool {
@@ -94,12 +140,12 @@ impl RegionEditQueue {
fn reject_all_as_not_found(mut self) {
while let Some(request) = self.requests.pop_front() {
let _ = request.tx.send(
request.waiters.reply_with(|| {
RegionNotFoundSnafu {
region_id: self.region_id,
}
.fail(),
);
.fail()
});
}
}
}
@@ -243,7 +289,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
let region_id = request.region_id;
let Some(region) = self.regions.get_region(region_id) else {
let _ = request.tx.send(RegionNotFoundSnafu { region_id }.fail());
request
.waiters
.reply_with(|| RegionNotFoundSnafu { region_id }.fail());
return;
};
@@ -254,7 +302,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.or_insert_with(|| RegionEditQueue::new(region_id))
.enqueue(request);
} else {
let _ = request.tx.send(RegionBusySnafu { region_id }.fail());
request
.waiters
.reply_with(|| RegionBusySnafu { region_id }.fail());
}
return;
}
@@ -262,7 +312,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let RegionEditRequest {
region_id: _,
mut edit,
tx: sender,
waiters,
} = request;
let file_sequence = region.version_control.committed_sequence() + 1;
edit.committed_sequence = Some(file_sequence);
@@ -281,7 +331,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};
// Marks the region as editing.
if let Err(e) = region.set_editing(expect_state) {
let _ = sender.send(Err(e));
let e = Arc::new(e);
waiters.reply_with(|| Err(e.clone()).context(EditRegionSnafu { region_id }));
return;
}
@@ -291,13 +342,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Now the region is in editing state.
// Updates manifest in background.
common_runtime::spawn_global(async move {
let result =
edit_region(&region, edit.clone(), cache_manager, listener, is_staging).await;
let result = edit_region(&region, edit.clone(), cache_manager, listener, is_staging)
.await
.map_err(Arc::new);
let notify = WorkerRequest::Background {
region_id,
notify: BackgroundNotify::RegionEdit(RegionEditResult {
region_id,
sender,
waiters,
edit,
result,
// we always need to restore region state after region edit
@@ -328,12 +380,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// edit-completion notification reached the worker.
self.fail_region_stalled_requests_as_not_found(&edit_result.region_id);
self.reject_region_edit_queue_as_not_found(edit_result.region_id);
let _ = edit_result.sender.send(
edit_result.waiters.reply_with(|| {
RegionNotFoundSnafu {
region_id: edit_result.region_id,
}
.fail(),
);
.fail()
});
return;
}
};
@@ -365,7 +418,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
need_compaction
};
let _ = edit_result.sender.send(edit_result.result);
edit_result
.waiters
.reply_with(|| match &edit_result.result {
Ok(()) => Ok(()),
Err(e) => Err(e.clone()).context(EditRegionSnafu {
region_id: edit_result.region_id,
}),
});
if edit_result.update_region_state {
// Writes stalled specifically by this edit are handled before the next queued edit.
@@ -374,9 +434,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
}
let has_pending_stalled_writes = self
.stalled_requests
.requests
.contains_key(&edit_result.region_id);
let allow_merge_edits = !has_pending_stalled_writes && !region.is_staging();
let next_request =
if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
let request = edit_queue.dequeue();
let request = edit_queue.dequeue(allow_merge_edits);
if edit_queue.is_empty() {
self.region_edit_queues.remove(&edit_result.region_id);
}