fix: fix write stall that never recovers due to flush logic issues (#7322)

* fix: fix write stall that never recovers due to flush logic issues

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit test

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: flush multiple regions when engine is full

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: refine fn name

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: simplify flush scheduler by removing flushing state

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-12-02 20:48:41 +08:00
committed by GitHub
parent 931556dbd3
commit 0177f244e9
5 changed files with 249 additions and 144 deletions

View File

@@ -515,6 +515,7 @@ async fn test_flush_workers() {
}
async fn test_flush_workers_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
@@ -574,7 +575,7 @@ async fn test_flush_workers_with_format(flat_format: bool) {
put_rows(&engine, region_id0, rows).await;
// Waits until flush is finished.
while listener.success_count() < 2 {
while listener.success_count() < 3 {
listener.wait().await;
}

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use common_telemetry::{debug, error, info, trace};
use common_telemetry::{debug, error, info};
use datatypes::arrow::datatypes::SchemaRef;
use either::Either;
use partition::expr::PartitionExpr;
@@ -89,6 +89,12 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
/// Returns the total memory used by memtables.
fn memory_usage(&self) -> usize;
/// Returns the mutable memtable memory limit.
///
/// The write buffer manager should flush memtables when the mutable memory usage
/// exceeds this limit.
fn flush_limit(&self) -> usize;
}
pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
@@ -145,7 +151,7 @@ impl WriteBufferManagerImpl {
impl WriteBufferManager for WriteBufferManagerImpl {
fn should_flush_engine(&self) -> bool {
let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
if mutable_memtable_memory_usage > self.mutable_limit {
if mutable_memtable_memory_usage >= self.mutable_limit {
debug!(
"Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
mutable_memtable_memory_usage,
@@ -157,23 +163,8 @@ impl WriteBufferManager for WriteBufferManagerImpl {
}
let memory_usage = self.memory_used.load(Ordering::Relaxed);
// If the memory exceeds the buffer size, we trigger more aggressive
// flush. But if already more than half memory is being flushed,
// triggering more flush may not help. We will hold it instead.
if memory_usage >= self.global_write_buffer_size {
if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
debug!(
"Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
mutable_usage: {}.",
memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
);
return true;
} else {
trace!(
"Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
);
}
return true;
}
false
@@ -205,6 +196,10 @@ impl WriteBufferManager for WriteBufferManagerImpl {
fn memory_usage(&self) -> usize {
self.memory_used.load(Ordering::Relaxed)
}
fn flush_limit(&self) -> usize {
self.mutable_limit
}
}
/// Reason of a flush task.
@@ -888,6 +883,31 @@ impl FlushScheduler {
self.region_status.contains_key(&region_id)
}
fn schedule_flush_task(
&mut self,
version_control: &VersionControlRef,
task: RegionFlushTask,
) -> Result<()> {
let region_id = task.region_id;
// If current region doesn't have flush status, we can flush the region directly.
if let Err(e) = version_control.freeze_mutable() {
error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
return Err(e);
}
// Submit a flush job.
let job = task.into_flush_job(version_control);
if let Err(e) = self.scheduler.schedule(job) {
// If scheduler returns error, senders in the job will be dropped and waiters
// can get recv errors.
error!(e; "Failed to schedule flush job for region {}", region_id);
return Err(e);
}
Ok(())
}
/// Schedules a flush `task` for specific `region`.
pub(crate) fn schedule_flush(
&mut self,
@@ -910,46 +930,21 @@ impl FlushScheduler {
.with_label_values(&[task.reason.as_str()])
.inc();
// If current region has flush status, merge the task.
if let Some(flush_status) = self.region_status.get_mut(&region_id) {
// Checks whether we can flush the region now.
debug!("Merging flush task for region {}", region_id);
flush_status.merge_task(task);
return Ok(());
}
self.schedule_flush_task(version_control, task)?;
// Add this region to status map.
let flush_status = self
.region_status
.entry(region_id)
.or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
// Checks whether we can flush the region now.
if flush_status.flushing {
// There is already a flush job running.
flush_status.merge_task(task);
return Ok(());
}
// TODO(yingwen): We can merge with pending and execute directly.
// If there are pending tasks, then we should push it to pending list.
if flush_status.pending_task.is_some() {
flush_status.merge_task(task);
return Ok(());
}
// Now we can flush the region directly.
if let Err(e) = version_control.freeze_mutable() {
error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
// Remove from region status if we can't freeze the mutable memtable.
self.region_status.remove(&region_id);
return Err(e);
}
// Submit a flush job.
let job = task.into_flush_job(version_control);
if let Err(e) = self.scheduler.schedule(job) {
// If scheduler returns error, senders in the job will be dropped and waiters
// can get recv errors.
error!(e; "Failed to schedule flush job for region {}", region_id);
// Remove from region status if we can't submit the task.
self.region_status.remove(&region_id);
return Err(e);
}
flush_status.flushing = true;
let _ = self.region_status.insert(
region_id,
FlushStatus::new(region_id, version_control.clone()),
);
Ok(())
}
@@ -966,48 +961,56 @@ impl FlushScheduler {
Vec<SenderBulkRequest>,
)> {
let flush_status = self.region_status.get_mut(&region_id)?;
// This region doesn't have running flush job.
flush_status.flushing = false;
let pending_requests = if flush_status.pending_task.is_none() {
// If region doesn't have any pending flush task, we need to remove it from the status.
if flush_status.pending_task.is_none() {
// The region doesn't have any pending flush task.
// Safety: The flush status must exist.
debug!(
"Region {} doesn't have any pending flush task, removing it from the status",
region_id
);
let flush_status = self.region_status.remove(&region_id).unwrap();
Some((
return Some((
flush_status.pending_ddls,
flush_status.pending_writes,
flush_status.pending_bulk_writes,
))
} else {
let version_data = flush_status.version_control.current();
if version_data.version.memtables.is_empty() {
// The region has nothing to flush, we also need to remove it from the status.
// Safety: The pending task is not None.
let task = flush_status.pending_task.take().unwrap();
// The region has nothing to flush. We can notify pending task.
task.on_success();
// `schedule_next_flush()` may pick up the same region to flush, so we must remove
// it from the status to avoid leaking pending requests.
// Safety: The flush status must exist.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some((
flush_status.pending_ddls,
flush_status.pending_writes,
flush_status.pending_bulk_writes,
))
} else {
// We can flush the region again, keep it in the region status.
None
}
};
// Schedule next flush job.
if let Err(e) = self.schedule_next_flush() {
error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
));
}
pending_requests
// If region has pending task, but has nothing to flush, we need to remove it from the status.
let version_data = flush_status.version_control.current();
if version_data.version.memtables.is_empty() {
// The region has nothing to flush, we also need to remove it from the status.
// Safety: The pending task is not None.
let task = flush_status.pending_task.take().unwrap();
// The region has nothing to flush. We can notify pending task.
task.on_success();
debug!(
"Region {} has nothing to flush, removing it from the status",
region_id
);
// Safety: The flush status must exist.
let flush_status = self.region_status.remove(&region_id).unwrap();
return Some((
flush_status.pending_ddls,
flush_status.pending_writes,
flush_status.pending_bulk_writes,
));
}
// If region has pending task and has something to flush, we need to schedule it.
debug!("Scheduling pending flush task for region {}", region_id);
// Safety: The flush status must exist.
let task = flush_status.pending_task.take().unwrap();
let version_control = flush_status.version_control.clone();
if let Err(err) = self.schedule_flush_task(&version_control, task) {
error!(
err;
"Flush succeeded for region {region_id}, but failed to schedule next flush for it."
);
}
// We can flush the region again, keep it in the region status.
None
}
/// Notifies the scheduler that the flush job is failed.
@@ -1023,11 +1026,6 @@ impl FlushScheduler {
// Fast fail: cancels all pending tasks and sends error to their waiters.
flush_status.on_failure(err);
// Still tries to schedule a new flush.
if let Err(e) = self.schedule_next_flush() {
error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
}
}
/// Notifies the scheduler that the region is dropped.
@@ -1098,30 +1096,6 @@ impl FlushScheduler {
.map(|status| !status.pending_ddls.is_empty())
.unwrap_or(false)
}
/// Schedules a new flush task when the scheduler can submit next task.
pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
debug_assert!(
self.region_status
.values()
.all(|status| status.flushing || status.pending_task.is_some())
);
// Get the first region from status map.
let Some(flush_status) = self
.region_status
.values_mut()
.find(|status| status.pending_task.is_some())
else {
return Ok(());
};
debug_assert!(!flush_status.flushing);
let task = flush_status.pending_task.take().unwrap();
let region_id = flush_status.region_id;
let version_control = flush_status.version_control.clone();
self.schedule_flush(region_id, &version_control, task)
}
}
impl Drop for FlushScheduler {
@@ -1141,11 +1115,6 @@ struct FlushStatus {
region_id: RegionId,
/// Version control of the region.
version_control: VersionControlRef,
/// There is a flush task running.
///
/// It is possible that a region is not flushing but has pending task if the scheduler
/// doesn't schedules this region.
flushing: bool,
/// Task waiting for next flush.
pending_task: Option<RegionFlushTask>,
/// Pending ddl requests.
@@ -1161,7 +1130,6 @@ impl FlushStatus {
FlushStatus {
region_id,
version_control,
flushing: false,
pending_task: None,
pending_ddls: Vec::new(),
pending_writes: Vec::new(),
@@ -1253,10 +1221,12 @@ mod tests {
// Global usage is still 1100.
manager.schedule_free_mem(200);
assert!(manager.should_flush_engine());
assert!(manager.should_stall());
// More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
// More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
manager.schedule_free_mem(450);
assert!(!manager.should_flush_engine());
assert!(manager.should_flush_engine());
assert!(manager.should_stall());
// Now mutable is enough.
manager.reserve_mem(50);
@@ -1503,4 +1473,92 @@ mod tests {
assert_eq!(2, total_rows, "append_mode should preserve duplicates");
}
}
#[tokio::test]
async fn test_schedule_pending_request_on_flush_success() {
common_telemetry::init_default_ut_logging();
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_flush_scheduler();
let mut builder = VersionControlBuilder::new();
// Overwrites the empty memtable builder.
builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
let version_control = Arc::new(builder.build());
// Writes data to the memtable so it is not empty.
let version_data = version_control.current();
write_rows_to_version(&version_data.version, "host0", 0, 10);
let manifest_ctx = env
.mock_manifest_context(version_data.version.metadata.clone())
.await;
// Creates 2 tasks.
let mut tasks: Vec<_> = (0..2)
.map(|_| RegionFlushTask {
region_id: builder.region_id(),
reason: FlushReason::Others,
senders: Vec::new(),
request_sender: tx.clone(),
access_layer: env.access_layer.clone(),
listener: WorkerListener::default(),
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
cache_manager: Arc::new(CacheManager::default()),
manifest_ctx: manifest_ctx.clone(),
index_options: IndexOptions::default(),
flush_semaphore: Arc::new(Semaphore::new(2)),
is_staging: false,
})
.collect();
// Schedule first task.
let task = tasks.pop().unwrap();
scheduler
.schedule_flush(builder.region_id(), &version_control, task)
.unwrap();
// Should schedule 1 flush.
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
// Schedule second task.
let task = tasks.pop().unwrap();
scheduler
.schedule_flush(builder.region_id(), &version_control, task)
.unwrap();
assert!(
scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.pending_task
.is_some()
);
// Check the new version.
let version_data = version_control.current();
assert_eq!(0, version_data.version.memtables.immutables()[0].id());
// Assumes the flush job is finished.
version_control.apply_edit(
Some(RegionEdit {
files_to_add: Vec::new(),
files_to_remove: Vec::new(),
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
committed_sequence: None,
}),
&[0],
builder.file_purger(),
);
write_rows_to_version(&version_data.version, "host1", 0, 10);
scheduler.on_flush_success(builder.region_id());
assert_eq!(2, job_scheduler.num_jobs());
// The pending task is cleared.
assert!(
scheduler
.region_status
.get(&builder.region_id())
.unwrap()
.pending_task
.is_none()
);
}
}

View File

@@ -1020,9 +1020,15 @@ pub struct MockWriteBufferManager {
should_stall: AtomicBool,
memory_used: AtomicUsize,
memory_active: AtomicUsize,
flush_limit: usize,
}
impl MockWriteBufferManager {
/// Set flush limit.
pub fn set_flush_limit(&mut self, flush_limit: usize) {
self.flush_limit = flush_limit;
}
/// Set whether to flush the engine.
pub fn set_should_flush(&self, value: bool) {
self.should_flush.store(value, Ordering::Relaxed);
@@ -1064,6 +1070,10 @@ impl WriteBufferManager for MockWriteBufferManager {
fn memory_usage(&self) -> usize {
self.memory_used.load(Ordering::Relaxed)
}
fn flush_limit(&self) -> usize {
self.flush_limit
}
}
pub fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema {

View File

@@ -22,6 +22,7 @@ use common_telemetry::info;
use common_telemetry::tracing::warn;
use humantime_serde::re::humantime;
use snafu::{ResultExt, ensure};
use store_api::logstore::LogStore;
use store_api::metadata::{
InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
RegionMetadataRef,
@@ -41,7 +42,7 @@ use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
use crate::sst::FormatType;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_alter_request(
&mut self,
region_id: RegionId,

View File

@@ -30,16 +30,26 @@ use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, O
use crate::sst::index::IndexBuildType;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
impl<S: LogStore> RegionWorkerLoop<S> {
/// On region flush job failed.
pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
self.flush_scheduler.on_flush_failed(region_id, request.err);
debug!(
"Flush failed for region {}, handling stalled requests",
region_id
);
// Maybe flush worker again.
self.maybe_flush_worker();
// Handle stalled requests.
self.handle_stalled_requests().await;
}
/// Checks whether the engine reaches flush threshold. If so, finds regions in this
/// worker to flush.
pub(crate) fn maybe_flush_worker(&mut self) {
if !self.write_buffer_manager.should_flush_engine() {
debug!("No need to flush worker");
// No need to flush worker.
return;
}
@@ -56,9 +66,7 @@ impl<S> RegionWorkerLoop<S> {
let regions = self.regions.list_regions();
let now = self.time_provider.current_time_millis();
let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
let mut max_mutable_size = 0;
// Region with max mutable memtable size.
let mut max_mem_region = None;
let mut pending_regions = vec![];
for region in &regions {
if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
@@ -67,12 +75,8 @@ impl<S> RegionWorkerLoop<S> {
}
let version = region.version();
let region_mutable_size = version.memtables.mutable_usage();
// Tracks region with max mutable memtable size.
if region_mutable_size > max_mutable_size {
max_mem_region = Some(region);
max_mutable_size = region_mutable_size;
}
let region_memtable_size =
version.memtables.mutable_usage() + version.memtables.immutables_usage();
if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
@@ -88,14 +92,38 @@ impl<S> RegionWorkerLoop<S> {
&region.version_control,
task,
)?;
} else if region_memtable_size > 0 {
// We should only consider regions with memtable size > 0 to flush.
pending_regions.push((region, region_memtable_size));
}
}
pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
// The flush target is the mutable memtable limit (half of the global buffer).
// When memory is full, we aggressively flush regions until usage drops below this target,
// not just below the full limit.
let target_memory_usage = self.write_buffer_manager.flush_limit();
let mut memory_usage = self.write_buffer_manager.memory_usage();
// Flush memtable with max mutable memtable.
// TODO(yingwen): Maybe flush more tables to reduce write buffer size.
if let Some(region) = max_mem_region
&& !self.flush_scheduler.is_flush_requested(region.region_id)
#[cfg(test)]
{
debug!(
"Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
target_memory_usage,
memory_usage,
pending_regions
.iter()
.map(|(region, mem_size)| (region.region_id, mem_size))
.collect::<Vec<_>>()
);
}
// Iterate over pending regions in descending order of their memory size and schedule flush tasks
// for each region until the overall memory usage drops below the flush limit.
for (region, region_mem_size) in pending_regions.into_iter() {
// Make sure the first region is always flushed.
if memory_usage < target_memory_usage {
// Stop flushing regions if memory usage is already below the flush limit
break;
}
let task = self.new_flush_task(
region,
FlushReason::EngineFull,
@@ -103,8 +131,12 @@ impl<S> RegionWorkerLoop<S> {
self.config.clone(),
region.is_staging(),
);
debug!("Scheduling flush task for region {}", region.region_id);
// Schedule a flush task for the current region
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)?;
// Reduce memory usage by the region's size, ensuring it doesn't go negative
memory_usage = memory_usage.saturating_sub(region_mem_size);
}
Ok(())
@@ -291,6 +323,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
}
// Maybe flush worker again.
self.maybe_flush_worker();
// Handle stalled requests.
self.handle_stalled_requests().await;