feat: refine region state checks and handle stalled requests (#4971)

* feat: refine region state checks and handle stalled requests

* test: add tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: add comments
Weny Xu
2024-11-13 10:51:27 +08:00
committed by GitHub
parent 0b6d78a527
commit c47ad548a4
6 changed files with 267 additions and 58 deletions

View File

@@ -31,7 +31,7 @@ use store_api::region_request::{
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::AlterFlushListener;
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
@@ -572,3 +572,70 @@ async fn test_alter_column_fulltext_options() {
check_fulltext_options(&engine, &expect_fulltext_options);
check_region_version(&engine, region_id, 1, 3, 1, 3);
}
#[tokio::test]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let listener = Arc::new(NotifyRegionChangeResultListener::default());
let engine = env
.create_engine_with(MitoConfig::default(), None, Some(listener.clone()))
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
)
.await;
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let engine_cloned = engine.clone();
let alter_job = tokio::spawn(async move {
let request = add_tag1();
engine_cloned
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();
});
let column_schemas_cloned = column_schemas.clone();
let engine_cloned = engine.clone();
let put_job = tokio::spawn(async move {
let rows = Rows {
schema: column_schemas_cloned,
rows: build_rows(0, 3),
};
put_rows(&engine_cloned, region_id, rows).await;
});
listener.wake_notify();
alter_job.await.unwrap();
put_job.await.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 0 | 0.0 | 1970-01-01T00:00:00 |
| | 1 | 1.0 | 1970-01-01T00:00:01 |
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -70,6 +70,9 @@ pub trait EventListener: Send + Sync {
/// Notifies the listener that the compaction is scheduled.
fn on_compaction_scheduled(&self, _region_id: RegionId) {}
/// Notifies the listener that the region starts to send a region change result to the worker.
async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {}
}
pub type EventListenerRef = Arc<dyn EventListener>;
@@ -274,3 +277,26 @@ impl EventListener for AlterFlushListener {
self.request_begin_notify.notify_one();
}
}
#[derive(Default)]
pub struct NotifyRegionChangeResultListener {
notify: Notify,
}
impl NotifyRegionChangeResultListener {
/// Continues sending the region change result.
pub fn wake_notify(&self) {
self.notify.notify_one();
}
}
#[async_trait]
impl EventListener for NotifyRegionChangeResultListener {
async fn on_notify_region_change_result_begin(&self, region_id: RegionId) {
info!(
"Wait on notify to start notify region change result for region {}",
region_id
);
self.notify.notified().await;
}
}
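
The test-only coordination above boils down to a tokio::sync::Notify handshake: on_notify_region_change_result_begin parks the background alter task on notified(), and the test's wake_notify() releases it. A minimal, self-contained sketch of that handshake (illustrative only, not part of this change; it assumes a tokio dependency with the sync, rt, and macros features):

use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let waiter = notify.clone();

    // Stands in for the spawned alter task that awaits
    // `on_notify_region_change_result_begin` before sending its result.
    let blocked = tokio::spawn(async move {
        waiter.notified().await;
        println!("woken: the region change result may now be sent");
    });

    // Stands in for the test calling `wake_notify()`.
    notify.notify_one();
    blocked.await.unwrap();
}

Because notify_one() stores a permit when no task is currently waiting, the wake cannot be lost even if wake_notify() runs before the alter task reaches notified().await.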

View File

@@ -550,6 +550,26 @@ impl RegionMap {
Ok(region)
}
/// Gets region by region id.
///
/// Calls the callback if the region does not exist.
pub(crate) fn get_region_or<F: OnFailure>(
&self,
region_id: RegionId,
cb: &mut F,
) -> Option<MitoRegionRef> {
match self
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })
{
Ok(region) => Some(region),
Err(e) => {
cb.on_failure(e);
None
}
}
}
/// Gets writable region by region id.
///
/// Calls the callback if the region does not exist or is readonly.
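
As a usage-shaped sketch of the callback pattern behind get_region_or (all names below are illustrative stand-ins, not the crate's real types): the lookup reports a missing region through the caller-supplied OnFailure callback, so the call site can simply skip the request instead of unwrapping or propagating an error by hand.

#[derive(Debug)]
struct RegionNotFound(u64);

trait OnFailure {
    fn on_failure(&mut self, err: RegionNotFound);
}

// Stands in for the reply sender that forwards the error to the waiting client.
#[derive(Default)]
struct RecordingSender {
    failures: Vec<RegionNotFound>,
}

impl OnFailure for RecordingSender {
    fn on_failure(&mut self, err: RegionNotFound) {
        self.failures.push(err);
    }
}

fn get_region_or<F: OnFailure>(known: &[u64], region_id: u64, cb: &mut F) -> Option<u64> {
    if known.contains(&region_id) {
        Some(region_id)
    } else {
        // Report the failure through the callback instead of returning a Result.
        cb.on_failure(RegionNotFound(region_id));
        None
    }
}

fn main() {
    let mut sender = RecordingSender::default();
    assert!(get_region_or(&[1, 2], 3, &mut sender).is_none());
    assert_eq!(sender.failures[0].0, 3);
    assert!(get_region_or(&[1, 2], 1, &mut sender).is_some());
}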

View File

@@ -26,6 +26,7 @@ mod handle_open;
mod handle_truncate;
mod handle_write;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -579,7 +580,10 @@ type RequestBuffer = Vec<WorkerRequest>;
#[derive(Default)]
pub(crate) struct StalledRequests {
/// Stalled requests.
pub(crate) requests: Vec<SenderWriteRequest>,
///
/// Key: RegionId
/// Value: (estimated size, stalled requests)
pub(crate) requests: HashMap<RegionId, (usize, Vec<SenderWriteRequest>)>,
/// Estimated size of all stalled requests.
pub(crate) estimated_size: usize,
}
@@ -587,12 +591,28 @@ pub(crate) struct StalledRequests {
impl StalledRequests {
/// Appends stalled requests.
pub(crate) fn append(&mut self, requests: &mut Vec<SenderWriteRequest>) {
let size: usize = requests
.iter()
.map(|req| req.request.estimated_size())
.sum();
self.requests.append(requests);
self.estimated_size += size;
for req in requests.drain(..) {
self.push(req);
}
}
/// Pushes a stalled request to the buffer.
pub(crate) fn push(&mut self, req: SenderWriteRequest) {
let (size, requests) = self.requests.entry(req.request.region_id).or_default();
let req_size = req.request.estimated_size();
*size += req_size;
self.estimated_size += req_size;
requests.push(req);
}
/// Removes the stalled requests of a specific region.
pub(crate) fn remove(&mut self, region_id: &RegionId) -> Vec<SenderWriteRequest> {
if let Some((size, requests)) = self.requests.remove(region_id) {
self.estimated_size -= size;
requests
} else {
vec![]
}
}
}
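
To make the new per-region bookkeeping concrete, here is a minimal, self-contained sketch (the request type is an illustrative stand-in, not the crate's SenderWriteRequest) showing how push and remove keep the per-region and global estimated sizes consistent, which is what allows the worker to replay or reject a single region's backlog later in this change:

use std::collections::HashMap;

type RegionId = u64;

// Stand-in for the real write request; only the fields the bookkeeping needs.
struct FakeRequest {
    region_id: RegionId,
    estimated_size: usize,
}

#[derive(Default)]
struct Stalled {
    // Key: region id, Value: (estimated size, stalled requests).
    requests: HashMap<RegionId, (usize, Vec<FakeRequest>)>,
    // Estimated size of all stalled requests.
    estimated_size: usize,
}

impl Stalled {
    fn push(&mut self, req: FakeRequest) {
        let (size, reqs) = self.requests.entry(req.region_id).or_default();
        *size += req.estimated_size;
        self.estimated_size += req.estimated_size;
        reqs.push(req);
    }

    // Removes one region's backlog and keeps the global size consistent.
    fn remove(&mut self, region_id: RegionId) -> Vec<FakeRequest> {
        if let Some((size, reqs)) = self.requests.remove(&region_id) {
            self.estimated_size -= size;
            reqs
        } else {
            vec![]
        }
    }
}

fn main() {
    let mut stalled = Stalled::default();
    stalled.push(FakeRequest { region_id: 1, estimated_size: 10 });
    stalled.push(FakeRequest { region_id: 2, estimated_size: 5 });
    assert_eq!(stalled.estimated_size, 15);
    // Replaying only region 1's backlog leaves region 2 untouched.
    assert_eq!(stalled.remove(1).len(), 1);
    assert_eq!(stalled.estimated_size, 5);
}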
@@ -854,7 +874,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req),
BackgroundNotify::RegionChange(req) => {
self.handle_manifest_region_change_result(req).await
}
BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
}
}
@@ -996,6 +1018,15 @@ impl WorkerListener {
listener.on_compaction_scheduled(_region_id);
}
}
pub(crate) async fn on_notify_region_change_result_begin(&self, _region_id: RegionId) {
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener
.on_notify_region_change_result_begin(_region_id)
.await;
}
}
}
#[cfg(test)]

View File

@@ -19,6 +19,7 @@
use std::collections::{HashMap, VecDeque};
use common_telemetry::{info, warn};
use store_api::logstore::LogStore;
use store_api::storage::RegionId;
use crate::cache::file_cache::{FileType, IndexKey};
@@ -74,6 +75,50 @@ impl RegionEditQueue {
}
}
impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles region change result.
pub(crate) async fn handle_manifest_region_change_result(
&mut self,
change_result: RegionChangeResult,
) {
let region = match self.regions.get_region(change_result.region_id) {
Some(region) => region,
None => {
self.reject_region_stalled_requests(&change_result.region_id);
change_result.sender.send(
RegionNotFoundSnafu {
region_id: change_result.region_id,
}
.fail(),
);
return;
}
};
if change_result.result.is_ok() {
// Apply the metadata to region's version.
region
.version_control
.alter_schema(change_result.new_meta, &region.memtable_builder);
info!(
"Region {} is altered, schema version is {}",
region.region_id,
region.metadata().schema_version
);
}
// Sets the region as writable.
region.switch_state_to_writable(RegionLeaderState::Altering);
// Sends the result.
change_result.sender.send(change_result.result.map(|_| 0));
// Handles the stalled requests.
self.handle_region_stalled_requests(&change_result.region_id)
.await;
}
}
impl<S> RegionWorkerLoop<S> {
/// Handles region edit request.
pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
@@ -233,7 +278,7 @@ impl<S> RegionWorkerLoop<S> {
sender.send(Err(e));
return;
}
let listener = self.listener.clone();
let request_sender = self.sender.clone();
// Now the region is in altering state.
common_runtime::spawn_global(async move {
@@ -254,6 +299,9 @@ impl<S> RegionWorkerLoop<S> {
new_meta,
}),
};
listener
.on_notify_region_change_result_begin(region.region_id)
.await;
if let Err(res) = request_sender.send(notify).await {
warn!(
@@ -263,40 +311,6 @@ impl<S> RegionWorkerLoop<S> {
}
});
}
/// Handles region change result.
pub(crate) fn handle_manifest_region_change_result(&self, change_result: RegionChangeResult) {
let region = match self.regions.get_region(change_result.region_id) {
Some(region) => region,
None => {
change_result.sender.send(
RegionNotFoundSnafu {
region_id: change_result.region_id,
}
.fail(),
);
return;
}
};
if change_result.result.is_ok() {
// Apply the metadata to region's version.
region
.version_control
.alter_schema(change_result.new_meta, &region.memtable_builder);
info!(
"Region {} is altered, schema version is {}",
region.region_id,
region.metadata().schema_version
);
}
// Sets the region as writable.
region.switch_state_to_writable(RegionLeaderState::Altering);
change_result.sender.send(change_result.result.map(|_| 0));
}
}
/// Checks the edit, writes and applies it.

View File

@@ -18,13 +18,15 @@ use std::collections::{hash_map, HashMap};
use std::sync::Arc;
use api::v1::OpType;
use common_telemetry::debug;
use snafu::ensure;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result};
use crate::error::{InvalidRequestSnafu, RegionLeaderStateSnafu, RejectWriteSnafu, Result};
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
@@ -47,9 +49,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// The memory pressure is still too high, reject write requests.
reject_write_requests(write_requests);
// Also reject all stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
reject_write_requests(stalled.requests);
self.reject_stalled_requests();
return;
}
@@ -124,7 +124,34 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
for (_, (_, requests)) in stalled.requests {
self.handle_write_requests(requests, false).await;
}
}
/// Rejects all stalled requests.
pub(crate) fn reject_stalled_requests(&mut self) {
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
for (_, (_, requests)) in stalled.requests {
reject_write_requests(requests);
}
}
/// Rejects a specific region's stalled requests.
pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Rejects stalled requests for region {}", region_id);
let requests = self.stalled_requests.remove(region_id);
self.stalled_count.sub(requests.len() as i64);
reject_write_requests(requests);
}
/// Handles a specific region's stalled requests.
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Handles stalled requests for region {}", region_id);
let requests = self.stalled_requests.remove(region_id);
self.stalled_count.sub(requests.len() as i64);
self.handle_write_requests(requests, true).await;
}
}
@@ -152,19 +179,43 @@ impl<S> RegionWorkerLoop<S> {
if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) {
let Some(region) = self
.regions
.writable_region_or(region_id, &mut sender_req.sender)
.get_region_or(region_id, &mut sender_req.sender)
else {
// No such region or the region is read only.
// No such region.
continue;
};
match region.state() {
RegionRoleState::Leader(RegionLeaderState::Writable) => {
let region_ctx = RegionWriteCtx::new(
region.region_id,
&region.version_control,
region.provider.clone(),
);
let region_ctx = RegionWriteCtx::new(
region.region_id,
&region.version_control,
region.provider.clone(),
);
e.insert(region_ctx);
e.insert(region_ctx);
}
RegionRoleState::Leader(RegionLeaderState::Altering) => {
debug!(
"Region {} is altering, add request to pending writes",
region.region_id
);
self.stalled_count.add(1);
self.stalled_requests.push(sender_req);
continue;
}
state => {
// The region is not writable.
sender_req.sender.send(
RegionLeaderStateSnafu {
region_id,
state,
expect: RegionLeaderState::Writable,
}
.fail(),
);
continue;
}
}
}
// Safety: Now we ensure the region exists.
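
Taken together, the write path above now makes a three-way decision per request. A minimal sketch of that decision (the enum values are illustrative stand-ins for the real RegionRoleState / RegionLeaderState variants): write immediately when the leader is writable, stall while it is altering, and reject with a state error otherwise.

#[derive(Debug, PartialEq)]
enum LeaderState {
    Writable,
    Altering,
    Dropping,
}

#[derive(Debug, PartialEq)]
enum Decision {
    Write,
    Stall,
    Reject,
}

fn decide(state: &LeaderState) -> Decision {
    match state {
        LeaderState::Writable => Decision::Write,
        // An alteration is expected to finish soon, so buffer instead of failing;
        // the buffered requests are replayed once the region change result arrives.
        LeaderState::Altering => Decision::Stall,
        // Any other leader state is not safe to write to; report the state error.
        _ => Decision::Reject,
    }
}

fn main() {
    assert_eq!(decide(&LeaderState::Writable), Decision::Write);
    assert_eq!(decide(&LeaderState::Altering), Decision::Stall);
    assert_eq!(decide(&LeaderState::Dropping), Decision::Reject);
}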