diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs
index 961eaf05bb..6f6649022a 100644
--- a/src/mito2/src/engine/alter_test.rs
+++ b/src/mito2/src/engine/alter_test.rs
@@ -31,7 +31,7 @@ use store_api::region_request::{
 use store_api::storage::{RegionId, ScanRequest};
 
 use crate::config::MitoConfig;
-use crate::engine::listener::AlterFlushListener;
+use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
 use crate::engine::MitoEngine;
 use crate::test_util::{
     build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
@@ -572,3 +572,70 @@ async fn test_alter_column_fulltext_options() {
     check_fulltext_options(&engine, &expect_fulltext_options);
     check_region_version(&engine, region_id, 1, 3, 1, 3);
 }
+
+#[tokio::test]
+async fn test_write_stall_on_altering() {
+    common_telemetry::init_default_ut_logging();
+
+    let mut env = TestEnv::new();
+    let listener = Arc::new(NotifyRegionChangeResultListener::default());
+    let engine = env
+        .create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
+        .await;
+
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().build();
+
+    env.get_schema_metadata_manager()
+        .register_region_table_info(
+            region_id.table_id(),
+            "test_table",
+            "test_catalog",
+            "test_schema",
+            None,
+        )
+        .await;
+
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    let engine_cloned = engine.clone();
+    let alter_job = tokio::spawn(async move {
+        let request = add_tag1();
+        engine_cloned
+            .handle_request(region_id, RegionRequest::Alter(request))
+            .await
+            .unwrap();
+    });
+
+    let column_schemas_cloned = column_schemas.clone();
+    let engine_cloned = engine.clone();
+    let put_job = tokio::spawn(async move {
+        let rows = Rows {
+            schema: column_schemas_cloned,
+            rows: build_rows(0, 3),
+        };
+        put_rows(&engine_cloned, region_id, rows).await;
+    });
+
+    listener.wake_notify();
+    alter_job.await.unwrap();
+    put_job.await.unwrap();
+
+    let expected = "\
++-------+-------+---------+---------------------+
+| tag_1 | tag_0 | field_0 | ts                  |
++-------+-------+---------+---------------------+
+|       | 0     | 0.0     | 1970-01-01T00:00:00 |
+|       | 1     | 1.0     | 1970-01-01T00:00:01 |
+|       | 2     | 2.0     | 1970-01-01T00:00:02 |
++-------+-------+---------+---------------------+";
+    let request = ScanRequest::default();
+    let scanner = engine.scanner(region_id, request).unwrap();
+    let stream = scanner.scan().await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs
index 83679f96e4..f11ae35456 100644
--- a/src/mito2/src/engine/listener.rs
+++ b/src/mito2/src/engine/listener.rs
@@ -70,6 +70,13 @@ pub trait EventListener: Send + Sync {
     /// Notifies the listener that the compaction is scheduled.
     fn on_compaction_scheduled(&self, _region_id: RegionId) {}
+
+    /// Notifies the listener that the region starts to send a region change result to the worker.
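+    ///
+    /// A test listener may block in this hook to keep the region in the
+    /// `Altering` state while concurrent writes arrive; this is how
+    /// `test_write_stall_on_altering` above drives the stall path.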
+    async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}
 }
 
 pub type EventListenerRef = Arc<dyn EventListener>;
@@ -274,3 +277,26 @@ impl EventListener for AlterFlushListener {
         self.request_begin_notify.notify_one();
     }
 }
+
+#[derive(Default)]
+pub struct NotifyRegionChangeResultListener {
+    notify: Notify,
+}
+
+impl NotifyRegionChangeResultListener {
+    /// Continues to send the region change result.
+    pub fn wake_notify(&self) {
+        self.notify.notify_one();
+    }
+}
+
+#[async_trait]
+impl EventListener for NotifyRegionChangeResultListener {
+    async fn on_notify_region_change_result_begin(&self, region_id: RegionId) {
+        info!(
+            "Waiting on notify before sending region change result for region {}",
+            region_id
+        );
+        self.notify.notified().await;
+    }
+}
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index 4ce633e6a6..41fb239a58 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -550,6 +550,26 @@ impl RegionMap {
         Ok(region)
     }
 
+    /// Gets the region by region id.
+    ///
+    /// Calls the callback if the region does not exist.
+    pub(crate) fn get_region_or<F: OnFailure>(
+        &self,
+        region_id: RegionId,
+        cb: &mut F,
+    ) -> Option<MitoRegionRef> {
+        match self
+            .get_region(region_id)
+            .context(RegionNotFoundSnafu { region_id })
+        {
+            Ok(region) => Some(region),
+            Err(e) => {
+                cb.on_failure(e);
+                None
+            }
+        }
+    }
+
     /// Gets writable region by region id.
     ///
     /// Calls the callback if the region does not exist or is readonly.
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 7ebb12963b..e883f18338 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -26,6 +26,7 @@ mod handle_open;
 mod handle_truncate;
 mod handle_write;
 
+use std::collections::HashMap;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
@@ -579,7 +580,10 @@ type RequestBuffer = Vec<WorkerRequest>;
 
 #[derive(Default)]
 pub(crate) struct StalledRequests {
     /// Stalled requests.
-    pub(crate) requests: Vec<SenderWriteRequest>,
+    ///
+    /// Key: RegionId
+    /// Value: (estimated size, stalled requests)
+    pub(crate) requests: HashMap<RegionId, (usize, Vec<SenderWriteRequest>)>,
     /// Estimated size of all stalled requests.
     pub(crate) estimated_size: usize,
 }
@@ -587,12 +591,36 @@ impl StalledRequests {
     /// Appends stalled requests.
     pub(crate) fn append(&mut self, requests: &mut Vec<SenderWriteRequest>) {
-        let size: usize = requests
-            .iter()
-            .map(|req| req.request.estimated_size())
-            .sum();
-        self.requests.append(requests);
-        self.estimated_size += size;
+        for req in requests.drain(..) {
+            self.push(req);
+        }
+    }
+
+    /// Pushes a stalled request to the buffer.
+    pub(crate) fn push(&mut self, req: SenderWriteRequest) {
+        let (size, requests) = self.requests.entry(req.request.region_id).or_default();
+        let req_size = req.request.estimated_size();
+        *size += req_size;
+        self.estimated_size += req_size;
+        requests.push(req);
+    }
+
+    /// Removes stalled requests of a specific region.
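+    ///
+    /// Returns the drained requests for `region_id` (an empty `Vec` if the
+    /// region has none) and subtracts their size from `estimated_size`.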
+    pub(crate) fn remove(&mut self, region_id: &RegionId) -> Vec<SenderWriteRequest> {
+        if let Some((size, requests)) = self.requests.remove(region_id) {
+            self.estimated_size -= size;
+            requests
+        } else {
+            vec![]
+        }
+    }
+
+    /// Returns the total number of stalled requests across all regions.
+    pub(crate) fn stalled_count(&self) -> usize {
+        self.requests.values().map(|(_, requests)| requests.len()).sum()
+    }
 }
@@ -854,7 +874,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             }
             BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
             BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
-            BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req),
+            BackgroundNotify::RegionChange(req) => {
+                self.handle_manifest_region_change_result(req).await
+            }
             BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
         }
     }
@@ -996,6 +1018,15 @@ impl WorkerListener {
             listener.on_compaction_scheduled(_region_id);
         }
     }
+
+    pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
+        #[cfg(any(test, feature = "test"))]
+        if let Some(listener) = &self.listener {
+            listener
+                .on_notify_region_change_result_begin(_region_id)
+                .await;
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs
index e97b30afec..5a21c0de15 100644
--- a/src/mito2/src/worker/handle_manifest.rs
+++ b/src/mito2/src/worker/handle_manifest.rs
@@ -19,6 +19,7 @@ use std::collections::{HashMap, VecDeque};
 
 use common_telemetry::{info, warn};
+use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
 
 use crate::cache::file_cache::{FileType, IndexKey};
@@ -74,6 +75,50 @@ impl RegionEditQueue {
     }
 }
 
+impl<S: LogStore> RegionWorkerLoop<S> {
+    /// Handles region change result.
+    pub(crate) async fn handle_manifest_region_change_result(
+        &mut self,
+        change_result: RegionChangeResult,
+    ) {
+        let region = match self.regions.get_region(change_result.region_id) {
+            Some(region) => region,
+            None => {
+                self.reject_region_stalled_requests(&change_result.region_id);
+                change_result.sender.send(
+                    RegionNotFoundSnafu {
+                        region_id: change_result.region_id,
+                    }
+                    .fail(),
+                );
+                return;
+            }
+        };
+
+        if change_result.result.is_ok() {
+            // Apply the metadata to the region's version.
+            region
+                .version_control
+                .alter_schema(change_result.new_meta, &region.memtable_builder);
+
+            info!(
+                "Region {} is altered, schema version is {}",
+                region.region_id,
+                region.metadata().schema_version
+            );
+        }
+
+        // Sets the region as writable.
+        region.switch_state_to_writable(RegionLeaderState::Altering);
+        // Sends the result.
+        change_result.sender.send(change_result.result.map(|_| 0));
+
+        // Handles the stalled requests.
+        self.handle_region_stalled_requests(&change_result.region_id)
+            .await;
+    }
+}
+
 impl<S> RegionWorkerLoop<S> {
     /// Handles region edit request.
     pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
@@ -233,7 +278,7 @@ impl<S> RegionWorkerLoop<S> {
             sender.send(Err(e));
             return;
         }
-
+        let listener = self.listener.clone();
         let request_sender = self.sender.clone();
         // Now the region is in altering state.
         common_runtime::spawn_global(async move {
@@ -254,6 +299,9 @@
                     new_meta,
                 }),
             };
+            listener
+                .on_notify_region_change_result_begin(region.region_id)
+                .await;
 
             if let Err(res) = request_sender.send(notify).await {
                 warn!(
                     "Failed to send region change result back to the worker, region {}, res: {:?}",
                     region.region_id, res
                 );
             }
         });
     }
-
-    /// Handles region change result.
-    pub(crate) fn handle_manifest_region_change_result(&self, change_result: RegionChangeResult) {
-        let region = match self.regions.get_region(change_result.region_id) {
-            Some(region) => region,
-            None => {
-                change_result.sender.send(
-                    RegionNotFoundSnafu {
-                        region_id: change_result.region_id,
-                    }
-                    .fail(),
-                );
-                return;
-            }
-        };
-
-        if change_result.result.is_ok() {
-            // Apply the metadata to region's version.
-            region
-                .version_control
-                .alter_schema(change_result.new_meta, &region.memtable_builder);
-
-            info!(
-                "Region {} is altered, schema version is {}",
-                region.region_id,
-                region.metadata().schema_version
-            );
-        }
-
-        // Sets the region as writable.
-        region.switch_state_to_writable(RegionLeaderState::Altering);
-
-        change_result.sender.send(change_result.result.map(|_| 0));
-    }
 }
 
 /// Checks the edit, writes and applies it.
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index 85ce49f315..7dccb6952a 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -18,13 +18,15 @@ use std::collections::{hash_map, HashMap};
 use std::sync::Arc;
 
 use api::v1::OpType;
+use common_telemetry::debug;
 use snafu::ensure;
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadata;
 use store_api::storage::RegionId;
 
-use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result};
+use crate::error::{InvalidRequestSnafu, RegionLeaderStateSnafu, RejectWriteSnafu, Result};
 use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
+use crate::region::{RegionLeaderState, RegionRoleState};
 use crate::region_write_ctx::RegionWriteCtx;
 use crate::request::{SenderWriteRequest, WriteRequest};
 use crate::worker::RegionWorkerLoop;
@@ -47,9 +49,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             // The memory pressure is still too high, reject write requests.
             reject_write_requests(write_requests);
             // Also reject all stalled requests.
-            let stalled = std::mem::take(&mut self.stalled_requests);
-            self.stalled_count.sub(stalled.requests.len() as i64);
-            reject_write_requests(stalled.requests);
+            self.reject_stalled_requests();
             return;
         }
@@ -124,7 +124,37 @@
         let stalled = std::mem::take(&mut self.stalled_requests);
-        self.stalled_count.sub(stalled.requests.len() as i64);
+        self.stalled_count.sub(stalled.stalled_count() as i64);
         // We already stalled these requests, don't stall them again.
-        self.handle_write_requests(stalled.requests, false).await;
+        for (_, (_, requests)) in stalled.requests {
+            self.handle_write_requests(requests, false).await;
+        }
+    }
+
+    /// Rejects all stalled requests.
+    pub(crate) fn reject_stalled_requests(&mut self) {
+        let stalled = std::mem::take(&mut self.stalled_requests);
+        self.stalled_count.sub(stalled.stalled_count() as i64);
+        for (_, (_, requests)) in stalled.requests {
+            reject_write_requests(requests);
+        }
+    }
+
+    /// Rejects a specific region's stalled requests.
+    pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
+        debug!("Rejecting stalled requests for region {}", region_id);
+        let requests = self.stalled_requests.remove(region_id);
+        self.stalled_count.sub(requests.len() as i64);
+        reject_write_requests(requests);
+    }
+
+    /// Handles a specific region's stalled requests.
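+    ///
+    /// Invoked once an alteration finishes and the region becomes writable
+    /// again, so the writes parked while it was altering are replayed.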
+    pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
+        debug!("Handling stalled requests for region {}", region_id);
+        let requests = self.stalled_requests.remove(region_id);
+        self.stalled_count.sub(requests.len() as i64);
+        self.handle_write_requests(requests, true).await;
+    }
 }
 
@@ -152,19 +179,43 @@
         if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
             let Some(region) = self
                 .regions
-                .writable_region_or(region_id, &mut sender_req.sender)
+                .get_region_or(region_id, &mut sender_req.sender)
             else {
-                // No such region or the region is read only.
+                // No such region.
                 continue;
             };
+            match region.state() {
+                RegionRoleState::Leader(RegionLeaderState::Writable) => {
+                    let region_ctx = RegionWriteCtx::new(
+                        region.region_id,
+                        &region.version_control,
+                        region.provider.clone(),
+                    );
 
-            let region_ctx = RegionWriteCtx::new(
-                region.region_id,
-                &region.version_control,
-                region.provider.clone(),
-            );
-
-            e.insert(region_ctx);
+                    e.insert(region_ctx);
+                }
+                RegionRoleState::Leader(RegionLeaderState::Altering) => {
+                    debug!(
+                        "Region {} is altering, adding request to pending writes",
+                        region.region_id
+                    );
+                    self.stalled_count.add(1);
+                    self.stalled_requests.push(sender_req);
+                    continue;
+                }
+                state => {
+                    // The region is not writable.
+                    sender_req.sender.send(
+                        RegionLeaderStateSnafu {
+                            region_id,
+                            state,
+                            expect: RegionLeaderState::Writable,
+                        }
+                        .fail(),
+                    );
+                    continue;
+                }
+            }
         }
 
         // Safety: Now we ensure the region exists.
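
For reference, the per-region bookkeeping that the new `StalledRequests` performs
boils down to the pattern below. This is a minimal, self-contained sketch:
`Req` and the plain `u64` region ids are illustrative stand-ins for the real
`SenderWriteRequest` and `RegionId` types.

    use std::collections::HashMap;

    struct Req {
        region_id: u64,
        size: usize,
    }

    #[derive(Default)]
    struct Stalled {
        // Region id -> (estimated size of that region's requests, requests).
        requests: HashMap<u64, (usize, Vec<Req>)>,
        // Total estimated size across all regions.
        estimated_size: usize,
    }

    impl Stalled {
        // Mirrors `StalledRequests::push`: bump the per-region and global
        // size estimates, then park the request under its region.
        fn push(&mut self, req: Req) {
            let (size, reqs) = self.requests.entry(req.region_id).or_default();
            *size += req.size;
            self.estimated_size += req.size;
            reqs.push(req);
        }

        // Mirrors `StalledRequests::remove`: drain one region's requests
        // and keep the global estimate consistent.
        fn remove(&mut self, region_id: u64) -> Vec<Req> {
            if let Some((size, reqs)) = self.requests.remove(&region_id) {
                self.estimated_size -= size;
                reqs
            } else {
                Vec::new()
            }
        }
    }

    fn main() {
        let mut stalled = Stalled::default();
        stalled.push(Req { region_id: 1, size: 10 });
        stalled.push(Req { region_id: 1, size: 20 });
        stalled.push(Req { region_id: 2, size: 5 });
        assert_eq!(stalled.estimated_size, 35);

        // Finishing the alter on region 1 replays only region 1's writes;
        // region 2's requests stay parked.
        let replay = stalled.remove(1);
        assert_eq!(replay.len(), 2);
        assert_eq!(stalled.estimated_size, 5);
    }

Draining per region is what lets `handle_manifest_region_change_result` replay
or reject only the altered region's writes while stalled requests for other
regions stay parked.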