diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index 4a92d3494f..84bed66e38 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -15,21 +15,30 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; +use api::v1::{ArrowIpc, Rows}; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_recordbatch::DfRecordBatch; +use common_test_util::flight::encode_to_flight_data; use common_time::util::current_time_millis; +use datatypes::arrow::array::{ArrayRef, Float64Array, StringArray, TimestampMillisecondArray}; +use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use object_store::ObjectStore; use store_api::region_engine::RegionEngine; -use store_api::region_request::RegionRequest; +use store_api::region_request::{ + RegionBulkInsertsRequest, RegionCloseRequest, RegionPutRequest, RegionRequest, +}; use store_api::storage::{FileId, RegionId}; -use tokio::sync::{Barrier, oneshot}; +use tokio::sync::{Barrier, mpsc, oneshot}; use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::engine::flush_test::MockTimeProvider; use crate::engine::listener::EventListener; use crate::manifest::action::RegionEdit; -use crate::region::MitoRegionRef; +use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState}; use crate::sst::file::FileMeta; -use crate::test_util::{CreateRequestBuilder, TestEnv}; +use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, rows_schema}; #[tokio::test] async fn test_edit_region_schedule_compaction() { @@ -210,6 +219,314 @@ async fn test_edit_region_fill_cache_with_format(flat_format: bool) { assert_eq!(file_id, actual); } +#[tokio::test] +async fn test_write_during_region_editing_is_queued() { + let mut env = TestEnv::new().await; + let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await; + + let region_id = RegionId::new(1, 1); + let 
create_request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&create_request); + engine + .handle_request(region_id, RegionRequest::Create(create_request)) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + let manifest_guard = region.manifest_ctx.manifest_manager.write().await; + let edit = test_region_edit(region.region_id, FileId::random()); + + let edit_engine = engine.clone(); + let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); + + tokio::time::timeout(Duration::from_secs(3), async { + while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + drain_worker_recv_events(&mut request_rx); + + let write_engine = engine.clone(); + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 1), + }; + let write_task = tokio::spawn(async move { + write_engine + .handle_request( + region_id, + RegionRequest::Put(RegionPutRequest { + rows, + hint: None, + partition_expr_version: None, + }), + ) + .await + }); + wait_worker_recv_event(&mut request_rx).await; + + let second_file_id = FileId::random(); + let second_edit = test_region_edit(region.region_id, second_file_id); + let second_edit_engine = engine.clone(); + let second_edit_task = + tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + drop(manifest_guard); + edit_task.await.unwrap().unwrap(); + let output = write_task.await.unwrap().unwrap(); + assert_eq!(1, output.affected_rows); + second_edit_task.await.unwrap().unwrap(); + + let second_file_sequence = region.version().ssts.levels()[0] + .files + .iter() + .find(|(file_id, _)| **file_id == second_file_id) + .and_then(|(_, file)| file.meta_ref().sequence) + .map(|sequence| sequence.get()); + assert_eq!(Some(3), second_file_sequence); +} + +#[tokio::test] +async fn 
test_bulk_insert_during_region_editing_is_queued() { + let mut env = TestEnv::new().await; + let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await; + + let region_id = RegionId::new(1, 1); + engine + .handle_request( + region_id, + RegionRequest::Create(CreateRequestBuilder::new().build()), + ) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + let manifest_guard = region.manifest_ctx.manifest_manager.write().await; + let edit = test_region_edit(region.region_id, FileId::random()); + + let edit_engine = engine.clone(); + let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); + + tokio::time::timeout(Duration::from_secs(3), async { + while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + drain_worker_recv_events(&mut request_rx); + + let bulk_engine = engine.clone(); + let bulk_task = tokio::spawn(async move { + bulk_engine + .handle_request( + region_id, + RegionRequest::BulkInserts(build_bulk_insert_request(region_id, 0, 1)), + ) + .await + }); + + wait_worker_recv_event(&mut request_rx).await; + + let second_file_id = FileId::random(); + let second_edit = test_region_edit(region.region_id, second_file_id); + let second_edit_engine = engine.clone(); + let second_edit_task = + tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + drop(manifest_guard); + edit_task.await.unwrap().unwrap(); + let output = bulk_task.await.unwrap().unwrap(); + assert_eq!(1, output.affected_rows); + second_edit_task.await.unwrap().unwrap(); + + let second_file_sequence = region.version().ssts.levels()[0] + .files + .iter() + .find(|(file_id, _)| **file_id == second_file_id) + .and_then(|(_, file)| file.meta_ref().sequence) + .map(|sequence| sequence.get()); + assert_eq!(Some(3), 
second_file_sequence); +} + +#[tokio::test] +async fn test_stalled_write_fails_fast_if_region_closed_during_editing() { + let mut env = TestEnv::new().await; + let (engine, mut request_rx) = create_engine_with_request_listener(&mut env).await; + + let region_id = RegionId::new(1, 1); + let create_request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&create_request); + engine + .handle_request(region_id, RegionRequest::Create(create_request)) + .await + .unwrap(); + let region = engine.get_region(region_id).unwrap(); + + let manifest_guard = region.manifest_ctx.manifest_manager.write().await; + let edit = test_region_edit(region.region_id, FileId::random()); + + let edit_engine = engine.clone(); + let edit_task = tokio::spawn(async move { edit_engine.edit_region(region_id, edit).await }); + + tokio::time::timeout(Duration::from_secs(3), async { + while region.state() != RegionRoleState::Leader(RegionLeaderState::Editing) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + + drain_worker_recv_events(&mut request_rx); + + let write_engine = engine.clone(); + let rows = Rows { + schema: column_schemas, + rows: build_rows(0, 1), + }; + let write_task = tokio::spawn(async move { + write_engine + .handle_request( + region_id, + RegionRequest::Put(RegionPutRequest { + rows, + hint: None, + partition_expr_version: None, + }), + ) + .await + }); + + wait_worker_recv_event(&mut request_rx).await; + + let second_edit_engine = engine.clone(); + let second_edit = test_region_edit(region.region_id, FileId::random()); + let second_edit_task = + tokio::spawn(async move { second_edit_engine.edit_region(region_id, second_edit).await }); + wait_worker_recv_event(&mut request_rx).await; + + let close_engine = engine.clone(); + let close_task = tokio::spawn(async move { + close_engine + .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {})) + .await + }); + + wait_worker_recv_event(&mut 
request_rx).await; + + drop(manifest_guard); + close_task.await.unwrap().unwrap(); + + let write_result = tokio::time::timeout(Duration::from_secs(3), write_task) + .await + .expect("stalled write should fail after region is closed") + .unwrap(); + assert_eq!( + StatusCode::RegionNotFound, + write_result.unwrap_err().status_code() + ); + assert_eq!( + StatusCode::RegionNotFound, + second_edit_task.await.unwrap().unwrap_err().status_code() + ); + assert!(edit_task.await.unwrap().is_err()); +} + +struct RecvRequestListener { + tx: mpsc::UnboundedSender, +} + +impl EventListener for RecvRequestListener { + fn on_recv_requests(&self, request_num: usize) { + let _ = self.tx.send(request_num); + } +} + +async fn create_engine_with_request_listener( + env: &mut TestEnv, +) -> (MitoEngine, mpsc::UnboundedReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + let engine = env + .create_engine_with( + MitoConfig::default(), + None, + Some(Arc::new(RecvRequestListener { tx })), + None, + ) + .await; + (engine, rx) +} + +fn drain_worker_recv_events(rx: &mut mpsc::UnboundedReceiver) { + while rx.try_recv().is_ok() {} +} + +async fn wait_worker_recv_event(rx: &mut mpsc::UnboundedReceiver) { + tokio::time::timeout(Duration::from_secs(3), rx.recv()) + .await + .unwrap() + .unwrap(); +} + +fn test_region_edit(region_id: RegionId, file_id: FileId) -> RegionEdit { + RegionEdit { + files_to_add: vec![FileMeta { + region_id, + file_id, + level: 0, + ..Default::default() + }], + files_to_remove: vec![], + timestamp_ms: None, + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + committed_sequence: None, + } +} + +fn build_bulk_insert_request( + region_id: RegionId, + start: usize, + end: usize, +) -> RegionBulkInsertsRequest { + let schema = Arc::new(Schema::new(vec![ + Field::new("tag_0", DataType::Utf8, true), + Field::new("field_0", DataType::Float64, true), + Field::new( + "ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + 
])); + let tag = Arc::new(StringArray::from_iter_values( + (start..end).map(|value| value.to_string()), + )) as ArrayRef; + let field = Arc::new(Float64Array::from_iter_values( + (start..end).map(|value| value as f64), + )) as ArrayRef; + let ts = Arc::new(TimestampMillisecondArray::from_iter_values( + (start..end).map(|value| value as i64 * 1000), + )) as ArrayRef; + let payload = DfRecordBatch::try_new(schema, vec![tag, field, ts]).unwrap(); + let (schema, record_batch) = encode_to_flight_data(payload.clone()); + + RegionBulkInsertsRequest { + region_id, + payload, + raw_data: ArrowIpc { + schema: schema.data_header, + data_header: record_batch.data_header, + payload: record_batch.data_body, + }, + partition_expr_version: None, + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_edit_region_concurrently() { test_edit_region_concurrently_with_format(false).await; diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index cc965f48df..fefac39e6a 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -556,6 +556,13 @@ pub(crate) struct SenderBulkRequest { pub(crate) partition_expr_version: Option, } +#[derive(Debug)] +pub(crate) struct BulkInsertRequest { + pub(crate) metadata: Option, + pub(crate) request: RegionBulkInsertsRequest, + pub(crate) sender: OptionOutputTx, +} + /// Request sent to a worker with timestamp #[derive(Debug)] pub(crate) struct WorkerRequestWithTime { @@ -609,11 +616,7 @@ pub(crate) enum WorkerRequest { SyncRegion(RegionSyncRequest), /// Bulk inserts request and region metadata. - BulkInserts { - metadata: Option, - request: RegionBulkInsertsRequest, - sender: OptionOutputTx, - }, + BulkInserts(BulkInsertRequest), /// Remap manifests request. 
RemapManifests(RemapManifestsRequest), @@ -748,11 +751,13 @@ impl WorkerRequest { sender: sender.into(), request: DdlRequest::EnterStaging(v), }), - RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts { - metadata: region_metadata, - sender: sender.into(), - request: region_bulk_inserts_request, - }, + RegionRequest::BulkInserts(region_bulk_inserts_request) => { + WorkerRequest::BulkInserts(BulkInsertRequest { + metadata: region_metadata, + sender: sender.into(), + request: region_bulk_inserts_request, + }) + } RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, sender: sender.into(), diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 3fb6a63184..51b3594b95 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -73,8 +73,8 @@ use crate::region::{ RegionMapRef, }; use crate::request::{ - BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest, - WorkerRequest, WorkerRequestWithTime, + BackgroundNotify, BulkInsertRequest, DdlRequest, SenderBulkRequest, SenderDdlRequest, + SenderWriteRequest, WorkerRequest, WorkerRequestWithTime, }; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; use crate::sst::file::RegionFileId; @@ -919,6 +919,7 @@ impl RegionWorkerLoop { write_req_buffer.clear(); ddl_req_buffer.clear(); general_req_buffer.clear(); + let mut bulk_insert_req_num = 0; let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL); let sleep = tokio::time::sleep(max_wait_time); @@ -935,6 +936,11 @@ impl RegionWorkerLoop { match request_with_time.request { WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req), WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req), + WorkerRequest::BulkInserts(bulk_insert) => { + bulk_insert_req_num += 1; + self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer) + .await; + } req => general_req_buffer.push(req), } }, @@ -983,6 +989,11 
@@ impl RegionWorkerLoop { match request_with_time.request { WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req), WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req), + WorkerRequest::BulkInserts(bulk_insert) => { + bulk_insert_req_num += 1; + self.buffer_bulk_insert_request(bulk_insert, &mut bulk_req_buffer) + .await + } req => general_req_buffer.push(req), } } @@ -992,7 +1003,10 @@ impl RegionWorkerLoop { } self.listener.on_recv_requests( - write_req_buffer.len() + ddl_req_buffer.len() + general_req_buffer.len(), + write_req_buffer.len() + + ddl_req_buffer.len() + + general_req_buffer.len() + + bulk_insert_req_num, ); self.handle_requests( @@ -1011,9 +1025,34 @@ impl RegionWorkerLoop { info!("Exit region worker thread {}", self.id); } + async fn buffer_bulk_insert_request( + &mut self, + bulk_insert: BulkInsertRequest, + bulk_requests: &mut Vec, + ) { + let BulkInsertRequest { + metadata, + request, + sender, + } = bulk_insert; + + if let Some(region_metadata) = metadata { + self.handle_bulk_insert_batch(region_metadata, request, bulk_requests, sender) + .await; + } else { + error!("Cannot find region metadata for {}", request.region_id); + sender.send( + error::RegionNotFoundSnafu { + region_id: request.region_id, + } + .fail(), + ); + } + } + /// Dispatches and processes requests. /// - /// `buffer` should be empty. + /// `general_requests` should not contain categorized write, ddl, or bulk insert requests. async fn handle_requests( &mut self, write_requests: &mut Vec, @@ -1024,10 +1063,31 @@ impl RegionWorkerLoop { for worker_req in general_requests.drain(..) { match worker_req { WorkerRequest::Write(_) | WorkerRequest::Ddl(_) => { - // These requests are categorized into write_requests and ddl_requests. + // These requests are categorized before dispatching general requests. 
continue; } + WorkerRequest::BulkInserts(_) => unreachable!("bulk inserts are buffered"), WorkerRequest::Background { region_id, notify } => { + if matches!( + &notify, + BackgroundNotify::RegionEdit(edit_result) + if edit_result.update_region_state + ) { + // Region state must be Editing when execution reaches here. + // This call only moves write/bulk write requests into the stall queue. When the region edit result + // is processed inside handle_background_notify and the region state is switched back to Writable, + // stalled requests will be processed immediately in handle_region_edit_result, before the next + // region edit is dequeued from RegionEditQueue. This not only ensures pending writes + // are processed in time, but also prevents them from starving. + // TODO(hl): maybe we need to merge those queues for pending requests like pending_ddl, + // region edits and stalled requests, so we can simplify the coordination between these queues. + self.handle_buffered_region_write_requests( + &region_id, + write_requests, + bulk_requests, + ) + .await; + } // For background notify, we handle it directly.
self.handle_background_notify(region_id, notify).await; } @@ -1048,29 +1108,6 @@ impl RegionWorkerLoop { WorkerRequest::SyncRegion(req) => { self.handle_region_sync(req).await; } - WorkerRequest::BulkInserts { - metadata, - request, - sender, - } => { - if let Some(region_metadata) = metadata { - self.handle_bulk_insert_batch( - region_metadata, - request, - bulk_requests, - sender, - ) - .await; - } else { - error!("Cannot find region metadata for {}", request.region_id); - sender.send( - error::RegionNotFoundSnafu { - region_id: request.region_id, - } - .fail(), - ); - } - } WorkerRequest::RemapManifests(req) => { self.handle_remap_manifests_request(req); } diff --git a/src/mito2/src/worker/handle_close.rs b/src/mito2/src/worker/handle_close.rs index 52fe068f15..1abd772f9f 100644 --- a/src/mito2/src/worker/handle_close.rs +++ b/src/mito2/src/worker/handle_close.rs @@ -69,6 +69,8 @@ impl RegionWorkerLoop { return; }; region.stop().await; + self.fail_region_stalled_requests_as_not_found(&region_id); + self.reject_region_edit_queue_as_not_found(region_id); // Clean flush status. self.flush_scheduler.on_region_closed(region_id); // Clean compaction status. diff --git a/src/mito2/src/worker/handle_enter_staging.rs b/src/mito2/src/worker/handle_enter_staging.rs index 75e7071468..54bca8e4d8 100644 --- a/src/mito2/src/worker/handle_enter_staging.rs +++ b/src/mito2/src/worker/handle_enter_staging.rs @@ -278,7 +278,7 @@ impl RegionWorkerLoop { .sender .send(enter_staging_result.result.map(|_| 0)); // Handles the stalled requests.
- self.handle_region_stalled_requests(&enter_staging_result.region_id) + self.handle_region_stalled_requests(&enter_staging_result.region_id, true) .await; } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 6778ef6cf2..fce995134d 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -293,9 +293,6 @@ impl RegionWorkerLoop { let flush_on_close = request.flush_reason == FlushReason::Closing; let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add); - // Notifies waiters and observes the flush timer. - request.on_success(); - // In async mode, create indexes after flush. if self.config.index.build_mode == IndexBuildMode::Async { self.handle_rebuild_index( @@ -314,7 +311,10 @@ impl RegionWorkerLoop { // no need to handle requests and schedule compactions. self.remove_region(region_id).await; info!("Region {} closed after flush", region_id); + request.on_success(); } else { + // Notifies waiters and observes the flush timer. + request.on_success(); // Handle pending requests for the region. if let Some((mut ddl_requests, mut write_requests, mut bulk_writes)) = self.flush_scheduler.on_flush_success(region_id) diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 1be6586430..40366f9ccd 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -49,9 +49,12 @@ use crate::worker::{RegionWorkerLoop, WorkerListener}; pub(crate) type RegionEditQueues = HashMap; -/// A queue for temporary store region edit requests, if the region is in the "Editing" state. -/// When the current region edit request is completed, the next (if there exists) request in the -/// queue will be processed. +/// A queue for region edit requests received while the region is already `Editing`. +/// +/// Normal writes and bulk inserts that arrive during `Editing` use the stalled-write queue instead. 
+/// When an edit completes, those writes are handled before the next queued edit starts, preserving +/// sequence ordering between direct SST edits and WAL/memtable writes unless global reject +/// backpressure rejects them first. /// Everything is done in the region worker loop. pub(crate) struct RegionEditQueue { region_id: RegionId, @@ -84,9 +87,31 @@ impl RegionEditQueue { fn dequeue(&mut self) -> Option { self.requests.pop_front() } + + fn is_empty(&self) -> bool { + self.requests.is_empty() + } + + fn reject_all_as_not_found(mut self) { + while let Some(request) = self.requests.pop_front() { + let _ = request.tx.send( + RegionNotFoundSnafu { + region_id: self.region_id, + } + .fail(), + ); + } + } } impl RegionWorkerLoop { + /// Rejects queued region edit requests as region not found. + pub(crate) fn reject_region_edit_queue_as_not_found(&mut self, region_id: RegionId) { + if let Some(edit_queue) = self.region_edit_queues.remove(&region_id) { + edit_queue.reject_all_as_not_found(); + } + } + /// Handles region change result. pub(crate) async fn handle_manifest_region_change_result( &mut self, @@ -134,7 +159,7 @@ impl RegionWorkerLoop { .await; } // Handles the stalled requests. - self.handle_region_stalled_requests(&change_result.region_id) + self.handle_region_stalled_requests(&change_result.region_id, true) .await; } @@ -213,7 +238,7 @@ impl RegionWorkerLoop { } } -impl RegionWorkerLoop { +impl RegionWorkerLoop { /// Handles region edit request. pub(crate) fn handle_region_edit(&mut self, request: RegionEditRequest) { let region_id = request.region_id; @@ -299,6 +324,10 @@ impl RegionWorkerLoop { let region = match self.regions.get_region(edit_result.region_id) { Some(region) => region, None => { + // Fail writes stalled behind this edit if the region was removed before the + // edit-completion notification reached the worker.
+ self.fail_region_stalled_requests_as_not_found(&edit_result.region_id); + self.reject_region_edit_queue_as_not_found(edit_result.region_id); let _ = edit_result.sender.send( RegionNotFoundSnafu { region_id: edit_result.region_id, @@ -338,9 +367,24 @@ impl RegionWorkerLoop { let _ = edit_result.sender.send(edit_result.result); - if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) - && let Some(request) = edit_queue.dequeue() - { + if edit_result.update_region_state { + // Writes stalled specifically by this edit are handled before the next queued edit. + // Otherwise the next edit could reserve a committed sequence before those writes. + self.handle_region_stalled_requests(&edit_result.region_id, false) + .await; + } + + let next_request = + if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) { + let request = edit_queue.dequeue(); + if edit_queue.is_empty() { + self.region_edit_queues.remove(&edit_result.region_id); + } + request + } else { + None + }; + if let Some(request) = next_request { self.handle_region_edit(request); } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 802073b816..721415411e 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -26,8 +26,8 @@ use store_api::logstore::LogStore; use store_api::storage::RegionId; use crate::error::{ - InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionStateSnafu, RejectWriteSnafu, - Result, + InvalidRequestSnafu, PartitionExprVersionMismatchSnafu, RegionNotFoundSnafu, RegionStateSnafu, + RejectWriteSnafu, Result, }; use crate::metrics; use crate::metrics::{ @@ -193,15 +193,79 @@ impl RegionWorkerLoop { reject_write_requests(&mut requests, &mut bulk); } + /// Fails a specific region's stalled requests if the region no longer exists. 
+ pub(crate) fn fail_region_stalled_requests_as_not_found(&mut self, region_id: &RegionId) { + debug!( + "Fails stalled requests for region {} as region not found", + region_id + ); + let (requests, bulk) = self.stalled_requests.remove(region_id); + self.stalling_count + .sub((requests.len() + bulk.len()) as i64); + + for req in requests { + req.sender.send( + RegionNotFoundSnafu { + region_id: req.request.region_id, + } + .fail(), + ); + } + for req in bulk { + req.sender.send( + RegionNotFoundSnafu { + region_id: req.region_id, + } + .fail(), + ); + } + } + /// Handles a specific region's stalled requests. - pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) { + /// + /// `allow_stall` should be false for backpressure retry paths to avoid stalling the same + /// requests again, and for region edit completion so that writes stalled by the edit are + /// handled before the next queued edit. It should remain true for retries after alter and + /// staging. Global reject backpressure still applies before the stall check. + pub(crate) async fn handle_region_stalled_requests( + &mut self, + region_id: &RegionId, + allow_stall: bool, + ) { debug!("Handles stalled requests for region {}", region_id); let (mut requests, mut bulk) = self.stalled_requests.remove(region_id); self.stalling_count .sub((requests.len() + bulk.len()) as i64); - self.handle_write_requests(&mut requests, &mut bulk, true) + self.handle_write_requests(&mut requests, &mut bulk, allow_stall) .await; } + + /// Processes same-batch writes for a region before handling its edit-completion notification. + /// + /// The worker dispatch loop handles background notifications before the current batch's write + /// buffer. Without this step, writes that arrived during edit N could be classified only after + /// edit N+1 is started, placing them behind that next edit.
+ pub(crate) async fn handle_buffered_region_write_requests( + &mut self, + region_id: &RegionId, + write_requests: &mut Vec, + bulk_requests: &mut Vec, + ) { + let mut current_region_write_requests = write_requests + .extract_if(.., |r| r.request.region_id == *region_id) + .collect::>(); + + let mut current_region_bulk_requests = bulk_requests + .extract_if(.., |r| r.region_id == *region_id) + .collect::>(); + + self.handle_write_requests( + &mut current_region_write_requests, + &mut current_region_bulk_requests, + true, + ) + .await; + } } impl RegionWorkerLoop { @@ -269,10 +333,14 @@ impl RegionWorkerLoop { e.insert(region_ctx); } - RegionRoleState::Leader(RegionLeaderState::Altering) => { + RegionRoleState::Leader(RegionLeaderState::Altering) + | RegionRoleState::Leader(RegionLeaderState::Editing) => { + // Editing is transient: queue the write so edit completion can drain it + // before starting the next queued edit. debug!( - "Region {} is altering, add request to pending writes", - region.region_id + "Region {} is {:?}, add request to pending writes", + region.region_id, + region.state() ); self.stalling_count.add(1); WRITE_STALL_TOTAL.inc(); @@ -408,10 +476,14 @@ impl RegionWorkerLoop { e.insert(region_ctx); } - RegionRoleState::Leader(RegionLeaderState::Altering) => { + RegionRoleState::Leader(RegionLeaderState::Altering) + | RegionRoleState::Leader(RegionLeaderState::Editing) => { + // Editing is transient: queue the bulk write so edit completion can drain + // it before starting the next queued edit. debug!( - "Region {} is altering, add request to pending writes", - region.region_id + "Region {} is {:?}, add request to pending writes", + region.region_id, + region.state() ); self.stalling_count.add(1); WRITE_STALL_TOTAL.inc();