fix(mito): queue writes during region edit (#8079)

* fix(mito): queue writes during region edit

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor(mito): wrap bulk insert worker request

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix(mito): fail queued requests on region close

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* docs: add comment for the design considerations

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: remove redundant check for flushable region

skip ci

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: clippy

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Lei, HUANG
2026-05-08 21:08:35 +08:00
committed by GitHub
parent b5997c6797
commit 98033b6421
8 changed files with 541 additions and 64 deletions
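
At its core, the fix parks writes that arrive while a region is in the `Editing` state and replays them once the edit completes, before the next queued edit starts. A minimal sketch of that shape, using simplified, hypothetical types (`StalledWrites`, `PendingWrite`) rather than mito's actual `StalledRequests` bookkeeping:

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-ins for mito's stalled-request bookkeeping.
struct PendingWrite {
    region_id: u64,
    // ... payload and a reply channel would live here.
}

#[derive(Default)]
struct StalledWrites {
    by_region: HashMap<u64, Vec<PendingWrite>>,
}

impl StalledWrites {
    // Called when a write hits a region in the Editing state.
    fn park(&mut self, write: PendingWrite) {
        self.by_region.entry(write.region_id).or_default().push(write);
    }

    // Called when the edit completes: drain this region's writes so the caller
    // can replay them before starting the next queued edit.
    fn drain(&mut self, region_id: u64) -> Vec<PendingWrite> {
        self.by_region.remove(&region_id).unwrap_or_default()
    }
}
```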


@@ -15,21 +15,30 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;
use api::v1::{ArrowIpc, Rows};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::DfRecordBatch;
use common_test_util::flight::encode_to_flight_data;
use common_time::util::current_time_millis;
use datatypes::arrow::array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use object_store::ObjectStore;
use store_api::region_engine::RegionEngine;
-use store_api::region_request::RegionRequest;
+use store_api::region_request::{
+RegionBulkInsertsRequest, RegionCloseRequest, RegionPutRequest, RegionRequest,
+};
use store_api::storage::{FileId, RegionId};
-use tokio::sync::{Barrier, oneshot};
+use tokio::sync::{Barrier, mpsc, oneshot};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::engine::flush_test::MockTimeProvider;
use crate::engine::listener::EventListener;
use crate::manifest::action::RegionEdit;
-use crate::region::MitoRegionRef;
+use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
use crate::sst::file::FileMeta;
-use crate::test_util::{CreateRequestBuilder, TestEnv};
+use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, rows_schema};
#[tokio::test]
async fn test_edit_region_schedule_compaction() {
@@ -210,6 +219,314 @@ async fn test_edit_region_fill_cache_with_format(flat_format: bool) {
assert_eq!(file_id, actual);
}
#[tokio::test]
async fn test_write_during_region_editing_is_queued() {
let mut env = TestEnv::new().await;
let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await;
let region_id = RegionId::new(1, 1);
let create_request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&create_request);
engine
.handle_request(region_id, RegionRequest::Create(create_request))
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let manifest_guard = region.manifest_ctx.manifest_manager.write().await;
let edit = test_region_edit(region.region_id, FileId::random());
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
drain_worker_recv_events(&mut request_rx);
let write_engine = engine.clone();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 1),
};
let write_task = tokio::spawn(async move {
write_engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest {
rows,
hint: None,
partition_expr_version: None,
}),
)
.await
});
wait_worker_recv_event(&mut request_rx).await;
let second_file_id = FileId::random();
let second_edit = test_region_edit(region.region_id, second_file_id);
let second_edit_engine = engine.clone();
let second_edit_task =
tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await });
wait_worker_recv_event(&mut request_rx).await;
drop(manifest_guard);
edit_task.await.unwrap().unwrap();
let output = write_task.await.unwrap().unwrap();
assert_eq!(1, output.affected_rows);
second_edit_task.await.unwrap().unwrap();
let second_file_sequence = region.version().ssts.levels()[0]
.files
.iter()
.find(|(file_id, _)| **file_id == second_file_id)
.and_then(|(_, file)| file.meta_ref().sequence)
.map(|sequence| sequence.get());
assert_eq!(Some(3), second_file_sequence);
}
#[tokio::test]
async fn test_bulk_insert_during_region_editing_is_queued() {
let mut env = TestEnv::new().await;
let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await;
let region_id = RegionId::new(1, 1);
engine
.handle_request(
region_id,
RegionRequest::Create(CreateRequestBuilder::new().build()),
)
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let manifest_guard = region.manifest_ctx.manifest_manager.write().await;
let edit = test_region_edit(region.region_id, FileId::random());
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
drain_worker_recv_events(&mut request_rx);
let bulk_engine = engine.clone();
let bulk_task = tokio::spawn(async move {
bulk_engine
.handle_request(
region_id,
RegionRequest::BulkInserts(build_bulk_insert_request(region_id, 0, 1)),
)
.await
});
wait_worker_recv_event(&mut request_rx).await;
let second_file_id = FileId::random();
let second_edit = test_region_edit(region.region_id, second_file_id);
let second_edit_engine = engine.clone();
let second_edit_task =
tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await });
wait_worker_recv_event(&mut request_rx).await;
drop(manifest_guard);
edit_task.await.unwrap().unwrap();
let output = bulk_task.await.unwrap().unwrap();
assert_eq!(1, output.affected_rows);
second_edit_task.await.unwrap().unwrap();
let second_file_sequence = region.version().ssts.levels()[0]
.files
.iter()
.find(|(file_id, _)| **file_id == second_file_id)
.and_then(|(_, file)| file.meta_ref().sequence)
.map(|sequence| sequence.get());
assert_eq!(Some(3), second_file_sequence);
}
#[tokio::test]
async fn test_stalled_write_fails_fast_if_region_closed_during_editing() {
let mut env = TestEnv::new().await;
let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await;
let region_id = RegionId::new(1, 1);
let create_request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&create_request);
engine
.handle_request(region_id, RegionRequest::Create(create_request))
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let manifest_guard = region.manifest_ctx.manifest_manager.write().await;
let edit = test_region_edit(region.region_id, FileId::random());
let edit_engine = engine.clone();
let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await });
tokio::time::timeout(Duration::from_secs(3), async {
while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
drain_worker_recv_events(&mut request_rx);
let write_engine = engine.clone();
let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 1),
};
let write_task = tokio::spawn(async move {
write_engine
.handle_request(
region_id,
RegionRequest::Put(RegionPutRequest {
rows,
hint: None,
partition_expr_version: None,
}),
)
.await
});
wait_worker_recv_event(&mut request_rx).await;
let second_edit_engine = engine.clone();
let second_edit = test_region_edit(region.region_id, FileId::random());
let second_edit_task =
tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await });
wait_worker_recv_event(&mut request_rx).await;
let close_engine = engine.clone();
let close_task = tokio::spawn(async move {
close_engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
});
wait_worker_recv_event(&mut request_rx).await;
drop(manifest_guard);
close_task.await.unwrap().unwrap();
let write_result = tokio::time::timeout(Duration::from_secs(3), write_task)
.await
.expect("stalled write should fail after region is closed")
.unwrap();
assert_eq!(
StatusCode::RegionNotFound,
write_result.unwrap_err().status_code()
);
assert_eq!(
StatusCode::RegionNotFound,
second_edit_task.await.unwrap().unwrap_err().status_code()
);
assert!(edit_task.await.unwrap().is_err());
}
struct RecvRequestListener {
tx: mpsc::UnboundedSender<usize>,
}
impl EventListener for RecvRequestListener {
fn on_recv_requests(&self, request_num: usize) {
let _ = self.tx.send(request_num);
}
}
async fn create_engine_with_request_listener(
env: &mut TestEnv,
) -> (MitoEngine, mpsc::UnboundedReceiver<usize>) {
let (tx, rx) = mpsc::unbounded_channel();
let engine = env
.create_engine_with(
MitoConfig::default(),
None,
Some(Arc::new(RecvRequestListener { tx })),
None,
)
.await;
(engine, rx)
}
fn drain_worker_recv_events(rx: &mut mpsc::UnboundedReceiver<usize>) {
while rx.try_recv().is_ok() {}
}
async fn wait_worker_recv_event(rx: &mut mpsc::UnboundedReceiver<usize>) {
tokio::time::timeout(Duration::from_secs(3), rx.recv())
.await
.unwrap()
.unwrap();
}
fn test_region_edit(region_id: RegionId, file_id: FileId) -> RegionEdit {
RegionEdit {
files_to_add: vec![FileMeta {
region_id,
file_id,
level: 0,
..Default::default()
}],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
}
}
fn build_bulk_insert_request(
region_id: RegionId,
start: usize,
end: usize,
) -> RegionBulkInsertsRequest {
let schema = Arc::new(Schema::new(vec![
Field::new("tag_0", DataType::Utf8, true),
Field::new("field_0", DataType::Float64, true),
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
]));
let tag = Arc::new(StringArray::from_iter_values(
(start..end).map(|value| value.to_string()),
)) as ArrayRef;
let field = Arc::new(Float64Array::from_iter_values(
(start..end).map(|value| value as f64),
)) as ArrayRef;
let ts = Arc::new(TimestampMillisecondArray::from_iter_values(
(start..end).map(|value| value as i64 * 1000),
)) as ArrayRef;
let payload = DfRecordBatch::try_new(schema, vec![tag, field, ts]).unwrap();
let (schema, record_batch) = encode_to_flight_data(payload.clone());
RegionBulkInsertsRequest {
region_id,
payload,
raw_data: ArrowIpc {
schema: schema.data_header,
data_header: record_batch.data_header,
payload: record_batch.data_body,
},
partition_expr_version: None,
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_edit_region_concurrently() {
test_edit_region_concurrently_with_format(false).await;
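
The three new tests above each poll for `RegionRoleState::Leader(RegionLeaderState::Editing)` with the same timeout loop. A hypothetical helper of this shape (illustrative only, not part of the diff) would consolidate that pattern:

```rust
// Hypothetical test helper, not part of this change: poll until the region
// reaches `expected`, failing the test after a bounded wait.
async fn wait_for_region_state(region: &MitoRegionRef, expected: RegionRoleState) {
    tokio::time::timeout(Duration::from_secs(3), async {
        while region.state() != expected {
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("region did not reach the expected state in time");
}
```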


@@ -556,6 +556,13 @@ pub(crate) struct SenderBulkRequest {
pub(crate) partition_expr_version: Option<u64>,
}
#[derive(Debug)]
pub(crate) struct BulkInsertRequest {
pub(crate) metadata: Option<RegionMetadataRef>,
pub(crate) request: RegionBulkInsertsRequest,
pub(crate) sender: OptionOutputTx,
}
/// Request sent to a worker with timestamp
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
@@ -609,11 +616,7 @@ pub(crate) enum WorkerRequest {
SyncRegion(RegionSyncRequest),
/// Bulk inserts request and region metadata.
-BulkInserts {
-metadata: Option<RegionMetadataRef>,
-request: RegionBulkInsertsRequest,
-sender: OptionOutputTx,
-},
+BulkInserts(BulkInsertRequest),
/// Remap manifests request.
RemapManifests(RemapManifestsRequest),
@@ -748,11 +751,13 @@ impl WorkerRequest {
sender: sender.into(),
request: DdlRequest::EnterStaging(v),
}),
-RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
-metadata: region_metadata,
-sender: sender.into(),
-request: region_bulk_inserts_request,
-},
+RegionRequest::BulkInserts(region_bulk_inserts_request) => {
+WorkerRequest::BulkInserts(BulkInsertRequest {
+metadata: region_metadata,
+sender: sender.into(),
+request: region_bulk_inserts_request,
+})
+}
RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
region_id,
sender: sender.into(),
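
Replacing the inline `BulkInserts { .. }` variant with a named `BulkInsertRequest` struct lets the worker move the whole request into helpers such as `buffer_bulk_insert_request` as a single value. A toy illustration of the refactor, with placeholder field types standing in for the real ones:

```rust
// Before: an inline struct variant cannot be moved around as one value.
enum WorkerRequestBefore {
    BulkInserts {
        metadata: Option<String>, // placeholder for Option<RegionMetadataRef>
        payload: Vec<u8>,
    },
}

// After: the payload is a named struct that helpers can take whole.
struct BulkInsertRequest {
    metadata: Option<String>,
    payload: Vec<u8>,
}

enum WorkerRequestAfter {
    BulkInserts(BulkInsertRequest),
}

fn buffer_bulk_insert(req: BulkInsertRequest) {
    // Destructure once, at the point of use.
    let BulkInsertRequest { metadata, payload } = req;
    let _ = (metadata, payload);
}
```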


@@ -73,8 +73,8 @@ use crate::region::{
RegionMapRef,
};
use crate::request::{
-BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
-WorkerRequest, WorkerRequestWithTime,
+BackgroundNotify, BulkInsertRequest, DdlRequest, SenderBulkRequest, SenderDdlRequest,
+SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::RegionFileId;
@@ -919,6 +919,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
write_req_buffer.clear();
ddl_req_buffer.clear();
general_req_buffer.clear();
let mut bulk_insert_req_num = 0;
let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
let sleep = tokio::time::sleep(max_wait_time);
@@ -935,6 +936,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
match request_with_time.request {
WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
WorkerRequest::BulkInserts(bulk_insert) => {
bulk_insert_req_num += 1;
self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer)
.await;
}
req => general_req_buffer.push(req),
}
},
@@ -983,6 +989,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
match request_with_time.request {
WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
WorkerRequest::BulkInserts(bulk_insert) => {
bulk_insert_req_num += 1;
self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer)
.await
}
req => general_req_buffer.push(req),
}
}
@@ -992,7 +1003,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
self.listener.on_recv_requests(
-write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(),
+write_req_buffer.len()
++ ddl_req_buffer.len()
++ general_req_buffer.len()
++ bulk_insert_req_num,
);
self.handle_requests(
@@ -1011,9 +1025,34 @@ impl<S: LogStore> RegionWorkerLoop<S> {
info!("Exit region worker thread {}", self.id);
}
async fn buffer_bulk_insert_request(
&mut self,
bulk_insert: BulkInsertRequest,
bulk_requests: &mut Vec<SenderBulkRequest>,
) {
let BulkInsertRequest {
metadata,
request,
sender,
} = bulk_insert;
if let Some(region_metadata) = metadata {
self.handle_bulk_insert_batch(region_metadata, request, bulk_requests, sender)
.await;
} else {
error!("Cannot find region metadata for {}", request.region_id);
sender.send(
error::RegionNotFoundSnafu {
region_id: request.region_id,
}
.fail(),
);
}
}
/// Dispatches and processes requests.
///
-/// `buffer` should be empty.
+/// `general_requests` should not contain write, ddl, or bulk insert requests; those are
+/// categorized into their own buffers before dispatch.
async fn handle_requests(
&mut self,
write_requests: &mut Vec<SenderWriteRequest>,
@@ -1024,10 +1063,31 @@ impl<S: LogStore> RegionWorkerLoop<S> {
for worker_req in general_requests.drain(..) {
match worker_req {
WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => {
-// These requests are categorized into write_requests and ddl_requests.
+// These requests are categorized before dispatching general requests.
continue;
}
WorkerRequest::BulkInserts(_) => unreachable!("bulk inserts are buffered"),
WorkerRequest::Background { region_id, notify } => {
if matches!(
&notify,
BackgroundNotify::RegionEdit(edit_result)
if edit_result.update_region_state
) {
// Region state must be Editing when we reach this point.
// This call only moves write/bulk write requests into the stall queue. Once the edit
// result is processed inside handle_background_notify and the region state switches back
// to Writable, handle_region_edit_result replays the stalled requests before dequeuing
// the next region edit from the RegionEditQueue. This not only ensures pending writes are
// processed in time, but also prevents them from starving.
// TODO(hl): maybe we need to merge the queues for pending requests (pending_ddl,
// region edits and stalled requests) so we can simplify the coordination between them.
self.handle_buffered_region_write_requests(
&region_id,
write_requests,
bulk_requests,
)
.await;
}
// For background notify, we handle it directly.
self.handle_background_notify(region_id, notify).await;
}
@@ -1048,29 +1108,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::SyncRegion(req) => {
self.handle_region_sync(req).await;
}
-WorkerRequest::BulkInserts {
-metadata,
-request,
-sender,
-} => {
-if let Some(region_metadata) = metadata {
-self.handle_bulk_insert_batch(
-region_metadata,
-request,
-bulk_requests,
-sender,
-)
-.await;
-} else {
-error!("Cannot find region metadata for {}", request.region_id);
-sender.send(
-error::RegionNotFoundSnafu {
-region_id: request.region_id,
-}
-.fail(),
-);
-}
-}
WorkerRequest::RemapManifests(req) => {
self.handle_remap_manifests_request(req);
}
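
The dispatch loop sorts each incoming request into a typed buffer, and bulk inserts now take their own buffering path while a separate counter keeps the `on_recv_requests` listener total accurate. A condensed sketch of that categorization step, with simplified request types standing in for `WorkerRequest`:

```rust
// Simplified request types standing in for WorkerRequest and its buffers.
enum Req {
    Write(u64),
    Ddl(u64),
    BulkInsert(u64),
    Other(u64),
}

fn categorize(
    incoming: Vec<Req>,
    writes: &mut Vec<u64>,
    ddls: &mut Vec<u64>,
    bulk: &mut Vec<u64>,
    general: &mut Vec<Req>,
) -> usize {
    // Bulk inserts are buffered immediately, so they are counted separately
    // to keep the listener's batch total accurate.
    let mut bulk_count = 0;
    for req in incoming {
        match req {
            Req::Write(w) => writes.push(w),
            Req::Ddl(d) => ddls.push(d),
            Req::BulkInsert(b) => {
                bulk_count += 1;
                bulk.push(b);
            }
            other => general.push(other),
        }
    }
    writes.len() + ddls.len() + general.len() + bulk_count
}
```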


@@ -69,6 +69,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
};
region.stop().await;
self.fail_region_stalled_requests_as_not_found(&region_id);
self.reject_region_edit_queue_as_not_found(region_id);
// Clean flush status.
self.flush_scheduler.on_region_closed(region_id);
// Clean compaction status.


@@ -278,7 +278,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.sender
.send(enter_staging_result.result.map(|_| 0));
// Handles the stalled requests.
-self.handle_region_stalled_requests(&enter_staging_result.region_id)
+self.handle_region_stalled_requests(&enter_staging_result.region_id, true)
.await;
}


@@ -293,9 +293,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let flush_on_close = request.flush_reason == FlushReason::Closing;
let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
-// Notifies waiters and observes the flush timer.
-request.on_success();
// In async mode, create indexes after flush.
if self.config.index.build_mode == IndexBuildMode::Async {
self.handle_rebuild_index(
@@ -314,7 +311,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// no need to handle requests and schedule compactions.
self.remove_region(region_id).await;
info!("Region {} closed after flush", region_id);
+request.on_success();
} else {
+// Notifies waiters and observes the flush timer.
+request.on_success();
// Handle pending requests for the region.
if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) =
self.flush_scheduler.on_flush_success(region_id)


@@ -49,9 +49,12 @@ use crate::worker::{RegionWorkerLoop, WorkerListener};
pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
-/// A queue for temporary store region edit requests, if the region is in the "Editing" state.
-/// When the current region edit request is completed, the next (if there exists) request in the
-/// queue will be processed.
+/// A queue for region edit requests received while the region is already `Editing`.
+///
+/// Normal writes and bulk inserts that arrive during `Editing` use the stalled-write queue
+/// instead. When an edit completes, those writes are handled before the next queued edit
+/// starts, preserving sequence ordering between direct SST edits and WAL/memtable writes,
+/// unless global write backpressure rejects them first.
/// Everything is done in the region worker loop.
pub(crate) struct RegionEditQueue {
region_id: RegionId,
@@ -84,9 +87,31 @@ impl RegionEditQueue {
fn dequeue(&mut self) -> Option<RegionEditRequest> {
self.requests.pop_front()
}
fn is_empty(&self) -> bool {
self.requests.is_empty()
}
fn reject_all_as_not_found(mut self) {
while let Some(request) = self.requests.pop_front() {
let _ = request.tx.send(
RegionNotFoundSnafu {
region_id: self.region_id,
}
.fail(),
);
}
}
}
impl<S: LogStore> RegionWorkerLoop<S> {
/// Rejects queued region edit requests as region not found.
pub(crate) fn reject_region_edit_queue_as_not_found(&mut self, region_id: RegionId) {
if let Some(edit_queue) = self.region_edit_queues.remove(&region_id) {
edit_queue.reject_all_as_not_found();
}
}
/// Handles region change result.
pub(crate) async fn handle_manifest_region_change_result(
&mut self,
@@ -134,7 +159,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
}
// Handles the stalled requests.
-self.handle_region_stalled_requests(&change_result.region_id)
+self.handle_region_stalled_requests(&change_result.region_id, true)
.await;
}
@@ -213,7 +238,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles region edit request.
pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) {
let region_id = request.region_id;
@@ -299,6 +324,10 @@ impl<S> RegionWorkerLoop<S> {
let region = match self.regions.get_region(edit_result.region_id) {
Some(region) => region,
None => {
// Fail writes stalled behind this edit if the region was removed before the
// edit-completion notification reached the worker.
self.fail_region_stalled_requests_as_not_found(&edit_result.region_id);
self.reject_region_edit_queue_as_not_found(edit_result.region_id);
let _ = edit_result.sender.send(
RegionNotFoundSnafu {
region_id: edit_result.region_id,
@@ -338,9 +367,24 @@ impl<S> RegionWorkerLoop<S> {
let _ = edit_result.sender.send(edit_result.result);
-if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
-&& let Some(request) = edit_queue.dequeue()
-{
+if edit_result.update_region_state {
+// Writes stalled specifically by this edit are handled before the next queued edit.
+// Otherwise the next edit could reserve a committed sequence before those writes.
+self.handle_region_stalled_requests(&edit_result.region_id, false)
+.await;
+}
+let next_request =
+if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) {
+let request = edit_queue.dequeue();
+if edit_queue.is_empty() {
+self.region_edit_queues.remove(&edit_result.region_id);
+}
+request
+} else {
+None
+};
+if let Some(request) = next_request {
self.handle_region_edit(request);
}
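
`reject_all_as_not_found` consumes the queue and answers every waiting caller instead of letting their channels dangle. Reduced to std and tokio primitives (the `QueuedEdit`/`EditQueue` types here are illustrative, and a plain `String` error stands in for mito's `RegionNotFoundSnafu`):

```rust
use std::collections::VecDeque;
use tokio::sync::oneshot;

struct QueuedEdit {
    tx: oneshot::Sender<Result<(), String>>,
}

struct EditQueue {
    region_id: u64,
    requests: VecDeque<QueuedEdit>,
}

impl EditQueue {
    // Consume the queue and fail every pending edit. Ignoring the send result
    // is fine: a dropped receiver just means the caller has gone away.
    fn reject_all_as_not_found(mut self) {
        while let Some(request) = self.requests.pop_front() {
            let _ = request
                .tx
                .send(Err(format!("region {} not found", self.region_id)));
        }
    }
}
```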


@@ -26,8 +26,8 @@ use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::error::{
-InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionStateSnafu, RejectWriteSnafu,
-Result,
+InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionNotFoundSnafu, RegionStateSnafu,
+RejectWriteSnafu, Result,
};
use crate::metrics;
use crate::metrics::{
@@ -193,15 +193,79 @@ impl<S: LogStore> RegionWorkerLoop<S> {
reject_write_requests(&mut requests, &mut bulk);
}
/// Fails a specific region's stalled requests if the region no longer exists.
pub(crate) fn fail_region_stalled_requests_as_not_found(&mut self, region_id: &RegionId) {
debug!(
"Fails stalled requests for region {} as region not found",
region_id
);
let (requests, bulk) = self.stalled_requests.remove(region_id);
self.stalling_count
.sub((requests.len() + bulk.len()) as i64);
for req in requests {
req.sender.send(
RegionNotFoundSnafu {
region_id: req.request.region_id,
}
.fail(),
);
}
for req in bulk {
req.sender.send(
RegionNotFoundSnafu {
region_id: req.region_id,
}
.fail(),
);
}
}
/// Handles a specific region's stalled requests.
-pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
+///
+/// `allow_stall` should be false on backpressure retry paths to avoid stalling the same
+/// requests again. It should remain true for non-backpressure retries, such as requests
+/// stalled by alter, staging, and region editing. Global reject backpressure still applies
+/// before the stall check.
+pub(crate) async fn handle_region_stalled_requests(
+&mut self,
+region_id: &RegionId,
+allow_stall: bool,
+) {
debug!("Handles stalled requests for region {}", region_id);
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
self.stalling_count
.sub((requests.len() + bulk.len()) as i64);
-self.handle_write_requests(&mut requests, &mut bulk, true)
+self.handle_write_requests(&mut requests, &mut bulk, allow_stall)
.await;
}
/// Processes same-batch writes for a region before handling its edit-completion notification.
///
/// The worker dispatch loop handles background notifications before the current batch's write
/// buffer. Without this step, writes that arrived during edit N could be classified only after
/// edit N+1 is started, placing them behind that next edit.
pub(crate) async fn handle_buffered_region_write_requests(
&mut self,
region_id: &RegionId,
write_requests: &mut Vec<SenderWriteRequest>,
bulk_requests: &mut Vec<SenderBulkRequest>,
) {
let mut current_region_write_requests = write_requests
.extract_if(.., |r| r.request.region_id == *region_id)
.collect::<Vec<_>>();
let mut current_region_bulk_requests = bulk_requests
.extract_if(.., |r| r.region_id == *region_id)
.collect::<Vec<_>>();
self.handle_write_requests(
&mut current_region_write_requests,
&mut current_region_bulk_requests,
true,
)
.await;
}
}
impl<S> RegionWorkerLoop<S> {
@@ -269,10 +333,14 @@ impl<S> RegionWorkerLoop<S> {
e.insert(region_ctx);
}
-RegionRoleState::Leader(RegionLeaderState::Altering) => {
+RegionRoleState::Leader(RegionLeaderState::Altering)
+| RegionRoleState::Leader(RegionLeaderState::Editing) => {
+// Editing is transient: queue the write so edit completion can drain it
+// before starting the next queued edit.
debug!(
-"Region {} is altering, add request to pending writes",
-region.region_id
+"Region {} is {:?}, add request to pending writes",
+region.region_id,
+region.state()
);
self.stalling_count.add(1);
WRITE_STALL_TOTAL.inc();
@@ -408,10 +476,14 @@ impl<S> RegionWorkerLoop<S> {
e.insert(region_ctx);
}
-RegionRoleState::Leader(RegionLeaderState::Altering) => {
+RegionRoleState::Leader(RegionLeaderState::Altering)
+| RegionRoleState::Leader(RegionLeaderState::Editing) => {
+// Editing is transient: queue the bulk write so edit completion can drain
+// it before starting the next queued edit.
debug!(
-"Region {} is altering, add request to pending writes",
-region.region_id
+"Region {} is {:?}, add request to pending writes",
+region.region_id,
+region.state()
);
self.stalling_count.add(1);
WRITE_STALL_TOTAL.inc();
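
`handle_buffered_region_write_requests` leans on `Vec::extract_if` to pull only the edited region's writes out of the shared batch buffers while leaving other regions' requests untouched. A standalone illustration (the range-taking form of `Vec::extract_if` is stable since Rust 1.87):

```rust
struct Write {
    region_id: u64,
    rows: usize,
}

fn main() {
    let mut buffered = vec![
        Write { region_id: 1, rows: 10 },
        Write { region_id: 2, rows: 5 },
        Write { region_id: 1, rows: 7 },
    ];

    // Remove (and collect) only region 1's writes; region 2 stays buffered.
    let region_1: Vec<Write> = buffered.extract_if(.., |w| w.region_id == 1).collect();

    assert_eq!(region_1.iter().map(|w| w.rows).sum::<usize>(), 17);
    assert_eq!(buffered.len(), 1);
}
```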