fix: notifies all workers once a region is flushed (#4016)

* fix: notify workers to handle stalled requests when a flush is finished

* chore: change stalled count to gauge

* feat: process stalled requests eagerly
Yingwen
2024-05-23 20:45:00 +08:00
committed by GitHub
parent 0d055b6ee6
commit dfc1acbb2a
4 changed files with 86 additions and 18 deletions
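The mechanism behind the fix is a single tokio `watch` channel shared by the whole worker group: whichever worker finishes a background flush sends on it, and every other worker's loop observes the change and drains its stalled write requests right away, instead of waiting for its own next flush or timeout. Below is a minimal, self-contained sketch of that broadcast pattern; the worker count, sleep, and print statements are illustrative and none of the mito types appear.

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One sender shared by whoever finishes a flush; every worker clones a receiver.
    let (flush_sender, flush_receiver) = watch::channel(());

    let mut handles = Vec::new();
    for id in 0..4 {
        let mut rx = flush_receiver.clone();
        handles.push(tokio::spawn(async move {
            // Wait until some worker reports a finished flush, then handle
            // stalled requests (stubbed out as a print here).
            if rx.changed().await.is_ok() {
                println!("worker {id}: flush observed, handling stalled requests");
            }
        }));
    }

    // Any worker that finishes a flush notifies the whole group with one send.
    tokio::time::sleep(Duration::from_millis(50)).await;
    let _ = flush_sender.send(());

    for handle in handles {
        let _ = handle.await;
    }
}
```

A `watch` channel fits here because only the fact that some flush completed matters, not how many: later sends overwrite the value, so a busy worker wakes up once rather than queueing notifications.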


@@ -23,6 +23,8 @@ pub const TYPE_LABEL: &str = "type";
pub const FLUSH_REASON: &str = "reason";
/// File type label.
pub const FILE_TYPE_LABEL: &str = "file_type";
/// Region worker id label.
pub const WORKER_LABEL: &str = "worker";
lazy_static! {
/// Global write buffer size in bytes.
@@ -70,9 +72,12 @@ lazy_static! {
// ------ Write related metrics
/// Counter of stalled write requests.
pub static ref WRITE_STALL_TOTAL: IntCounter =
register_int_counter!("greptime_mito_write_stall_total", "mito write stall total").unwrap();
/// Number of stalled write requests in each worker.
pub static ref WRITE_STALL_TOTAL: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_write_stall_total",
"mito stalled write request in each worker",
&[WORKER_LABEL]
).unwrap();
/// Counter of rejected write requests.
pub static ref WRITE_REJECT_TOTAL: IntCounter =
register_int_counter!("greptime_mito_write_reject_total", "mito write reject total").unwrap();

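The metrics change above swaps a monotonically increasing `IntCounter` for an `IntGaugeVec` keyed by a `worker` label, so each worker can both raise and lower its own stalled-request count as requests stall and are drained. A small sketch of that prometheus pattern, using a placeholder metric name (`example_write_stall_total`) rather than the crate's registry:

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGauge, IntGaugeVec};

lazy_static! {
    // Same shape as the diff: one gauge family, one time series per worker.
    static ref STALLED_REQUESTS: IntGaugeVec = register_int_gauge_vec!(
        "example_write_stall_total",
        "stalled write requests in each worker",
        &["worker"]
    )
    .unwrap();
}

fn main() {
    // Each worker resolves its own child gauge once, at startup.
    let worker_gauge: IntGauge = STALLED_REQUESTS.with_label_values(&["0"]);

    // Stalling three requests raises the gauge; draining them lowers it again,
    // which a plain counter could not express.
    worker_gauge.add(3);
    worker_gauge.sub(3);
    assert_eq!(worker_gauge.get(), 0);
}
```

Resolving `with_label_values` once per worker, as the diff does in `WorkerStarter`, also avoids a label lookup on every write.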

@@ -34,13 +34,14 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::IntGauge;
use rand::{thread_rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::SetReadonlyResponse;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::sync::{mpsc, oneshot, watch, Mutex};
use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
@@ -49,6 +50,7 @@ use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::WRITE_STALL_TOTAL;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
@@ -151,6 +153,7 @@ impl WorkerGroup {
.build(),
);
let time_provider = Arc::new(StdTimeProvider);
let (flush_sender, flush_receiver) = watch::channel(());
let workers = (0..config.num_workers)
.map(|id| {
@@ -166,6 +169,8 @@ impl WorkerGroup {
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
flush_sender: flush_sender.clone(),
flush_receiver: flush_receiver.clone(),
}
.start()
})
@@ -266,6 +271,7 @@ impl WorkerGroup {
.write_cache(write_cache)
.build(),
);
let (flush_sender, flush_receiver) = watch::channel(());
let workers = (0..config.num_workers)
.map(|id| {
WorkerStarter {
@@ -280,6 +286,8 @@ impl WorkerGroup {
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
flush_sender: flush_sender.clone(),
flush_receiver: flush_receiver.clone(),
}
.start()
})
@@ -346,6 +354,10 @@ struct WorkerStarter<S> {
cache_manager: CacheManagerRef,
intermediate_manager: IntermediateManager,
time_provider: TimeProviderRef,
/// Watch channel sender to notify workers to handle stalled requests.
flush_sender: watch::Sender<()>,
/// Watch channel receiver to wait for background flush job.
flush_receiver: watch::Receiver<()>,
}
impl<S: LogStore> WorkerStarter<S> {
@@ -386,6 +398,9 @@ impl<S: LogStore> WorkerStarter<S> {
intermediate_manager: self.intermediate_manager,
time_provider: self.time_provider,
last_periodical_check_millis: now,
flush_sender: self.flush_sender,
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&self.id.to_string()]),
};
let handle = common_runtime::spawn_write(async move {
worker_thread.run().await;
@@ -548,6 +563,12 @@ struct RegionWorkerLoop<S> {
time_provider: TimeProviderRef,
/// Last time to check regions periodically.
last_periodical_check_millis: i64,
/// Watch channel sender to notify workers to handle stalled requests.
flush_sender: watch::Sender<()>,
/// Watch channel receiver to wait for background flush job.
flush_receiver: watch::Receiver<()>,
/// Gauge of stalled request count.
stalled_count: IntGauge,
}
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -568,17 +589,41 @@ impl<S: LogStore> RegionWorkerLoop<S> {
buffer.clear();
let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
match tokio::time::timeout(max_wait_time, self.receiver.recv()).await {
Ok(Some(request)) => buffer.push(request),
// The channel is disconnected.
Ok(None) => break,
Err(_) => {
let sleep = tokio::time::sleep(max_wait_time);
tokio::pin!(sleep);
tokio::select! {
request_opt = self.receiver.recv() => {
match request_opt {
Some(request) => buffer.push(request),
// The channel is disconnected.
None => break,
}
}
recv_res = self.flush_receiver.changed() => {
if recv_res.is_err() {
// The channel is disconnected.
break;
} else {
// A flush job has finished, handle stalled requests.
self.handle_stalled_requests().await;
continue;
}
}
_ = &mut sleep => {
// Timeout. Checks periodical tasks.
self.handle_periodical_tasks();
continue;
}
}
if self.flush_receiver.has_changed().unwrap_or(false) {
// Always check whether we can process stalled requests so a request
// doesn't hang for too long.
// If the channel is closed, do nothing.
self.handle_stalled_requests().await;
}
// Try to recv more requests from the channel.
for _ in 1..buffer.capacity() {
// We have received one request so we start from 1.
@@ -735,7 +780,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
impl<S> RegionWorkerLoop<S> {
// Clean up the worker.
/// Cleans up the worker.
async fn clean(&self) {
// Closes remaining regions.
let regions = self.regions.list_regions();
@@ -745,6 +790,15 @@ impl<S> RegionWorkerLoop<S> {
self.regions.clear();
}
/// Notifies the whole group that a flush job is finished so other
/// workers can handle stalled requests.
fn notify_group(&mut self) {
// Notifies all receivers.
let _ = self.flush_sender.send(());
// Marks the receiver in the current worker as seen so the loop won't be woken up immediately.
self.flush_receiver.borrow_and_update();
}
}
/// Wrapper that only calls event listener in tests.

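The heart of the worker change is the new run loop above: instead of a plain `timeout` on the request channel, the worker now `select!`s over three futures — the next request, a change on the shared flush `watch` receiver, and a periodic-check timer — and `notify_group` pairs the `send` with `borrow_and_update` so the flushing worker does not immediately wake itself. A simplified, runnable sketch of that loop shape follows; `Request`, the interval, and the handler bodies are placeholders, not mito's actual types.

```rust
use std::time::Duration;
use tokio::sync::{mpsc, watch};

/// Toy request type standing in for the worker's request enum.
#[derive(Debug)]
struct Request(u32);

async fn worker_loop(
    mut receiver: mpsc::Receiver<Request>,
    mut flush_receiver: watch::Receiver<()>,
) {
    loop {
        let sleep = tokio::time::sleep(Duration::from_millis(100));
        tokio::pin!(sleep);

        tokio::select! {
            request_opt = receiver.recv() => {
                match request_opt {
                    Some(request) => println!("got {request:?}"),
                    // The request channel is disconnected: stop the worker.
                    None => break,
                }
            }
            recv_res = flush_receiver.changed() => {
                if recv_res.is_err() {
                    // The watch channel is disconnected.
                    break;
                }
                // A flush finished somewhere in the group: drain stalled requests.
                println!("flush observed, handling stalled requests");
            }
            _ = &mut sleep => {
                // Timeout: run periodical checks.
                println!("running periodical tasks");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (request_tx, request_rx) = mpsc::channel(8);
    let (flush_tx, flush_rx) = watch::channel(());
    let worker = tokio::spawn(worker_loop(request_rx, flush_rx));

    request_tx.send(Request(1)).await.unwrap();
    // Equivalent of notify_group(): a single send wakes every cloned receiver.
    let _ = flush_tx.send(());

    // Dropping both senders lets the loop observe disconnection and exit.
    drop(request_tx);
    drop(flush_tx);
    let _ = worker.await;
}
```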

@@ -191,6 +191,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: FlushFinished,
) {
// Notifies other workers. Even if the remaining steps of this method fail, we still
// wake up other workers, as the flush has already released some memory.
self.notify_group();
let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
warn!(
"Unable to finish the flush task for a read only region {}",
@@ -230,9 +234,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
self.handle_stalled_requests().await;
// Schedules compaction.
if let Err(e) = self.compaction_scheduler.schedule_compaction(


@@ -24,9 +24,7 @@ use store_api::metadata::RegionMetadata;
use store_api::storage::RegionId;
use crate::error::{InvalidRequestSnafu, RejectWriteSnafu, Result};
use crate::metrics::{
WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
@@ -50,13 +48,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
reject_write_requests(write_requests);
// Also reject all stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
reject_write_requests(stalled.requests);
return;
}
if self.write_buffer_manager.should_stall() && allow_stall {
WRITE_STALL_TOTAL.inc_by(write_requests.len() as u64);
self.stalled_count.add(write_requests.len() as i64);
self.stalled_requests.append(&mut write_requests);
self.listener.on_write_stall();
return;
@@ -120,6 +118,15 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.with_label_values(&["delete"])
.inc_by(delete_rows as u64);
}
/// Handles all stalled write requests.
pub(crate) async fn handle_stalled_requests(&mut self) {
// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
// We already stalled these requests, don't stall them again.
self.handle_write_requests(stalled.requests, false).await;
}
}
impl<S> RegionWorkerLoop<S> {
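Finally, the gauge bookkeeping in the write-handling code is symmetric: stalling adds to the per-worker gauge, and both the reject path and `handle_stalled_requests` subtract exactly the number of requests taken back out, re-handling them with `allow_stall = false` so drained requests cannot stall a second time. A toy sketch of that accounting, with stand-in types (`Worker`, `WriteRequest`, `Stalled`) rather than the crate's structs:

```rust
use prometheus::{IntGauge, Opts};

/// Toy stand-in for a stalled write request.
struct WriteRequest(u32);

#[derive(Default)]
struct Stalled {
    requests: Vec<WriteRequest>,
}

struct Worker {
    stalled_requests: Stalled,
    stalled_count: IntGauge,
}

impl Worker {
    /// Stalling requests raises the per-worker gauge.
    fn stall(&mut self, mut requests: Vec<WriteRequest>) {
        self.stalled_count.add(requests.len() as i64);
        self.stalled_requests.requests.append(&mut requests);
    }

    /// Draining lowers the gauge by exactly the number of requests taken out;
    /// the caller re-handles them with stalling disabled so they cannot stall twice.
    fn take_stalled(&mut self) -> Vec<WriteRequest> {
        let stalled = std::mem::take(&mut self.stalled_requests);
        self.stalled_count.sub(stalled.requests.len() as i64);
        stalled.requests
    }
}

fn main() {
    let gauge = IntGauge::with_opts(Opts::new("stall_demo", "stalled write requests")).unwrap();
    let mut worker = Worker {
        stalled_requests: Stalled::default(),
        stalled_count: gauge,
    };

    worker.stall(vec![WriteRequest(1), WriteRequest(2)]);
    assert_eq!(worker.stalled_count.get(), 2);

    let drained = worker.take_stalled();
    assert_eq!(drained.len(), 2);
    assert_eq!(worker.stalled_count.get(), 0);
}
```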