diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs
index 1224aac116..78bae2b461 100644
--- a/src/mito2/src/engine/flush_test.rs
+++ b/src/mito2/src/engine/flush_test.rs
@@ -515,6 +515,7 @@ async fn test_flush_workers() {
 }
 
 async fn test_flush_workers_with_format(flat_format: bool) {
+    common_telemetry::init_default_ut_logging();
     let mut env = TestEnv::new().await;
     let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
     let listener = Arc::new(FlushListener::default());
@@ -574,7 +575,7 @@ async fn test_flush_workers_with_format(flat_format: bool) {
     put_rows(&engine, region_id0, rows).await;
 
     // Waits until flush is finished.
-    while listener.success_count() < 2 {
+    while listener.success_count() < 3 {
         listener.wait().await;
     }
 
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 50bbf59941..137e703e39 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Instant;
 
-use common_telemetry::{debug, error, info, trace};
+use common_telemetry::{debug, error, info};
 use datatypes::arrow::datatypes::SchemaRef;
 use either::Either;
 use partition::expr::PartitionExpr;
@@ -89,6 +89,12 @@ pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
 
     /// Returns the total memory used by memtables.
     fn memory_usage(&self) -> usize;
+
+    /// Returns the mutable memtable memory limit.
+    ///
+    /// The write buffer manager should flush memtables once the mutable memory
+    /// usage reaches this limit.
+    fn flush_limit(&self) -> usize;
 }
 
 pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
@@ -145,7 +151,7 @@ impl WriteBufferManagerImpl {
 impl WriteBufferManager for WriteBufferManagerImpl {
     fn should_flush_engine(&self) -> bool {
         let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
-        if mutable_memtable_memory_usage > self.mutable_limit {
+        if mutable_memtable_memory_usage >= self.mutable_limit {
             debug!(
                 "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
                 mutable_memtable_memory_usage,
@@ -157,23 +163,8 @@ impl WriteBufferManager for WriteBufferManagerImpl {
         }
 
         let memory_usage = self.memory_used.load(Ordering::Relaxed);
-        // If the memory exceeds the buffer size, we trigger more aggressive
-        // flush. But if already more than half memory is being flushed,
-        // triggering more flush may not help. We will hold it instead.
         if memory_usage >= self.global_write_buffer_size {
-            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
-                debug!(
-                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
-                     mutable_usage: {}.",
-                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
-                );
-                return true;
-            } else {
-                trace!(
-                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
-                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
-                );
-            }
+            return true;
         }
 
         false
@@ -205,6 +196,10 @@ impl WriteBufferManager for WriteBufferManagerImpl {
     fn memory_usage(&self) -> usize {
         self.memory_used.load(Ordering::Relaxed)
     }
+
+    fn flush_limit(&self) -> usize {
+        self.mutable_limit
+    }
 }
 
 /// Reason of a flush task.
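
The policy above reduces to two comparisons: flush when the mutable usage reaches the mutable limit (now `>=`, not `>`), or when total usage reaches the global budget; the old "hold if less than half of the buffer is mutable" branch is gone, and `flush_limit` exposes the mutable limit (half of the global buffer) as the flush target. A minimal standalone sketch of that decision (the `Buffer` type here is hypothetical, not mito2's `WriteBufferManagerImpl`):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for `WriteBufferManagerImpl`: `global` is the total
/// write buffer size and `mutable_limit` is half of it, as in the PR.
struct Buffer {
    mutable: AtomicUsize,
    total: AtomicUsize,
    mutable_limit: usize,
    global: usize,
}

impl Buffer {
    fn new(global: usize) -> Self {
        Buffer {
            mutable: AtomicUsize::new(0),
            total: AtomicUsize::new(0),
            mutable_limit: global / 2,
            global,
        }
    }

    /// Mirrors the new `should_flush_engine`: flush when the mutable part
    /// reaches the mutable limit or the total usage reaches the global limit.
    fn should_flush_engine(&self) -> bool {
        self.mutable.load(Ordering::Relaxed) >= self.mutable_limit
            || self.total.load(Ordering::Relaxed) >= self.global
    }

    /// Mirrors the new `flush_limit` accessor.
    fn flush_limit(&self) -> usize {
        self.mutable_limit
    }
}

fn main() {
    let buf = Buffer::new(1000);
    buf.total.store(1000, Ordering::Relaxed);
    buf.mutable.store(400, Ordering::Relaxed);
    // The old code would hold here (mutable 400 < 500); the new code flushes
    // because the total usage reached the global limit.
    assert!(buf.should_flush_engine());
    assert_eq!(500, buf.flush_limit());
}
```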
@@ -888,6 +883,31 @@ impl FlushScheduler {
         self.region_status.contains_key(&region_id)
     }
 
+    fn schedule_flush_task(
+        &mut self,
+        version_control: &VersionControlRef,
+        task: RegionFlushTask,
+    ) -> Result<()> {
+        let region_id = task.region_id;
+
+        // The caller ensures this region has no other flush job, so we can freeze the mutable memtable directly.
+        if let Err(e) = version_control.freeze_mutable() {
+            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
+
+            return Err(e);
+        }
+        // Submit a flush job.
+        let job = task.into_flush_job(version_control);
+        if let Err(e) = self.scheduler.schedule(job) {
+            // If scheduler returns error, senders in the job will be dropped and waiters
+            // can get recv errors.
+            error!(e; "Failed to schedule flush job for region {}", region_id);
+
+            return Err(e);
+        }
+        Ok(())
+    }
+
     /// Schedules a flush `task` for specific `region`.
     pub(crate) fn schedule_flush(
         &mut self,
@@ -910,46 +930,21 @@ impl FlushScheduler {
             .with_label_values(&[task.reason.as_str()])
             .inc();
 
+        // If the region already has a flush status, merge the task.
+        if let Some(flush_status) = self.region_status.get_mut(&region_id) {
+            // A flush job is already running or pending for this region.
+            debug!("Merging flush task for region {}", region_id);
+            flush_status.merge_task(task);
+            return Ok(());
+        }
+
+        self.schedule_flush_task(version_control, task)?;
+
         // Add this region to status map.
-        let flush_status = self
-            .region_status
-            .entry(region_id)
-            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
-        // Checks whether we can flush the region now.
-        if flush_status.flushing {
-            // There is already a flush job running.
-            flush_status.merge_task(task);
-            return Ok(());
-        }
-
-        // TODO(yingwen): We can merge with pending and execute directly.
-        // If there are pending tasks, then we should push it to pending list.
-        if flush_status.pending_task.is_some() {
-            flush_status.merge_task(task);
-            return Ok(());
-        }
-
-        // Now we can flush the region directly.
-        if let Err(e) = version_control.freeze_mutable() {
-            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
-
-            // Remove from region status if we can't freeze the mutable memtable.
-            self.region_status.remove(&region_id);
-            return Err(e);
-        }
-        // Submit a flush job.
-        let job = task.into_flush_job(version_control);
-        if let Err(e) = self.scheduler.schedule(job) {
-            // If scheduler returns error, senders in the job will be dropped and waiters
-            // can get recv errors.
-            error!(e; "Failed to schedule flush job for region {}", region_id);
-
-            // Remove from region status if we can't submit the task.
-            self.region_status.remove(&region_id);
-            return Err(e);
-        }
-
-        flush_status.flushing = true;
+        let _ = self.region_status.insert(
+            region_id,
+            FlushStatus::new(region_id, version_control.clone()),
+        );
 
         Ok(())
     }
@@ -966,48 +961,56 @@ impl FlushScheduler {
         Vec<SenderBulkRequest>,
     )> {
         let flush_status = self.region_status.get_mut(&region_id)?;
-
-        // This region doesn't have running flush job.
-        flush_status.flushing = false;
-
-        let pending_requests = if flush_status.pending_task.is_none() {
+        // If the region doesn't have any pending flush task, remove it from the status.
+        if flush_status.pending_task.is_none() {
             // The region doesn't have any pending flush task.
             // Safety: The flush status must exist.
+            debug!(
+                "Region {} doesn't have any pending flush task, removing it from the status",
+                region_id
+            );
             let flush_status = self.region_status.remove(&region_id).unwrap();
-            Some((
+            return Some((
                 flush_status.pending_ddls,
                 flush_status.pending_writes,
                 flush_status.pending_bulk_writes,
-            ))
-        } else {
-            let version_data = flush_status.version_control.current();
-            if version_data.version.memtables.is_empty() {
-                // The region has nothing to flush, we also need to remove it from the status.
-                // Safety: The pending task is not None.
-                let task = flush_status.pending_task.take().unwrap();
-                // The region has nothing to flush. We can notify pending task.
-                task.on_success();
-                // `schedule_next_flush()` may pick up the same region to flush, so we must remove
-                // it from the status to avoid leaking pending requests.
-                // Safety: The flush status must exist.
-                let flush_status = self.region_status.remove(&region_id).unwrap();
-                Some((
-                    flush_status.pending_ddls,
-                    flush_status.pending_writes,
-                    flush_status.pending_bulk_writes,
-                ))
-            } else {
-                // We can flush the region again, keep it in the region status.
-                None
-            }
-        };
-
-        // Schedule next flush job.
-        if let Err(e) = self.schedule_next_flush() {
-            error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
+            ));
         }
 
-        pending_requests
+        // If the region has a pending task but nothing to flush, remove it from the status.
+        let version_data = flush_status.version_control.current();
+        if version_data.version.memtables.is_empty() {
+            // The region has nothing to flush, we also need to remove it from the status.
+            // Safety: The pending task is not None.
+            let task = flush_status.pending_task.take().unwrap();
+            // The region has nothing to flush. We can notify pending task.
+            task.on_success();
+            debug!(
+                "Region {} has nothing to flush, removing it from the status",
+                region_id
+            );
+            // Safety: The flush status must exist.
+            let flush_status = self.region_status.remove(&region_id).unwrap();
+            return Some((
+                flush_status.pending_ddls,
+                flush_status.pending_writes,
+                flush_status.pending_bulk_writes,
+            ));
+        }
+
+        // If the region has a pending task and something to flush, schedule it now.
+        debug!("Scheduling pending flush task for region {}", region_id);
+        // Safety: The pending task is not None.
+        let task = flush_status.pending_task.take().unwrap();
+        let version_control = flush_status.version_control.clone();
+        if let Err(err) = self.schedule_flush_task(&version_control, task) {
+            error!(
+                err;
+                "Flush succeeded for region {region_id}, but failed to schedule next flush for it."
+            );
+        }
+        // We can flush the region again, keep it in the region status.
+        None
     }
 
     /// Notifies the scheduler that the flush job is failed.
@@ -1023,11 +1026,6 @@ impl FlushScheduler {
 
         // Fast fail: cancels all pending tasks and sends error to their waiters.
         flush_status.on_failure(err);
-
-        // Still tries to schedule a new flush.
-        if let Err(e) = self.schedule_next_flush() {
-            error!(e; "Failed to schedule next flush after region {} flush is failed", region_id);
-        }
     }
 
     /// Notifies the scheduler that the region is dropped.
@@ -1098,30 +1096,6 @@ impl FlushScheduler {
             .map(|status| !status.pending_ddls.is_empty())
             .unwrap_or(false)
     }
-
-    /// Schedules a new flush task when the scheduler can submit next task.
-    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
-        debug_assert!(
-            self.region_status
-                .values()
-                .all(|status| status.flushing || status.pending_task.is_some())
-        );
-
-        // Get the first region from status map.
-        let Some(flush_status) = self
-            .region_status
-            .values_mut()
-            .find(|status| status.pending_task.is_some())
-        else {
-            return Ok(());
-        };
-        debug_assert!(!flush_status.flushing);
-        let task = flush_status.pending_task.take().unwrap();
-        let region_id = flush_status.region_id;
-        let version_control = flush_status.version_control.clone();
-
-        self.schedule_flush(region_id, &version_control, task)
-    }
 }
 
 impl Drop for FlushScheduler {
@@ -1141,11 +1115,6 @@ struct FlushStatus {
     region_id: RegionId,
     /// Version control of the region.
     version_control: VersionControlRef,
-    /// There is a flush task running.
-    ///
-    /// It is possible that a region is not flushing but has pending task if the scheduler
-    /// doesn't schedules this region.
-    flushing: bool,
     /// Task waiting for next flush.
     pending_task: Option<RegionFlushTask>,
     /// Pending ddl requests.
@@ -1161,7 +1130,6 @@ impl FlushStatus {
         FlushStatus {
             region_id,
             version_control,
-            flushing: false,
            pending_task: None,
            pending_ddls: Vec::new(),
            pending_writes: Vec::new(),
@@ -1253,10 +1221,12 @@ mod tests {
         // Global usage is still 1100.
         manager.schedule_free_mem(200);
         assert!(manager.should_flush_engine());
+        assert!(manager.should_stall());
 
-        // More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
+        // More than global limit, mutable (1100-200-450=450) is less than mutable limit (< 500).
         manager.schedule_free_mem(450);
-        assert!(!manager.should_flush_engine());
+        assert!(manager.should_flush_engine());
+        assert!(manager.should_stall());
 
         // Now mutable is enough.
         manager.reserve_mem(50);
@@ -1503,4 +1473,92 @@ mod tests {
             assert_eq!(2, total_rows, "append_mode should preserve duplicates");
         }
     }
+
+    #[tokio::test]
+    async fn test_schedule_pending_request_on_flush_success() {
+        common_telemetry::init_default_ut_logging();
+        let job_scheduler = Arc::new(VecScheduler::default());
+        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
+        let (tx, _rx) = mpsc::channel(4);
+        let mut scheduler = env.mock_flush_scheduler();
+        let mut builder = VersionControlBuilder::new();
+        // Overwrites the empty memtable builder.
+        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
+        let version_control = Arc::new(builder.build());
+        // Writes data to the memtable so it is not empty.
+        let version_data = version_control.current();
+        write_rows_to_version(&version_data.version, "host0", 0, 10);
+        let manifest_ctx = env
+            .mock_manifest_context(version_data.version.metadata.clone())
+            .await;
+        // Creates 2 tasks.
+        let mut tasks: Vec<_> = (0..2)
+            .map(|_| RegionFlushTask {
+                region_id: builder.region_id(),
+                reason: FlushReason::Others,
+                senders: Vec::new(),
+                request_sender: tx.clone(),
+                access_layer: env.access_layer.clone(),
+                listener: WorkerListener::default(),
+                engine_config: Arc::new(MitoConfig::default()),
+                row_group_size: None,
+                cache_manager: Arc::new(CacheManager::default()),
+                manifest_ctx: manifest_ctx.clone(),
+                index_options: IndexOptions::default(),
+                flush_semaphore: Arc::new(Semaphore::new(2)),
+                is_staging: false,
+            })
+            .collect();
+        // Schedule first task.
+        let task = tasks.pop().unwrap();
+        scheduler
+            .schedule_flush(builder.region_id(), &version_control, task)
+            .unwrap();
+        // Should schedule 1 flush.
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        // Schedule second task.
+        let task = tasks.pop().unwrap();
+        scheduler
+            .schedule_flush(builder.region_id(), &version_control, task)
+            .unwrap();
+        assert!(
+            scheduler
+                .region_status
+                .get(&builder.region_id())
+                .unwrap()
+                .pending_task
+                .is_some()
+        );
+
+        // Check the new version.
+        let version_data = version_control.current();
+        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
+        // Assumes the flush job is finished.
+        version_control.apply_edit(
+            Some(RegionEdit {
+                files_to_add: Vec::new(),
+                files_to_remove: Vec::new(),
+                timestamp_ms: None,
+                compaction_time_window: None,
+                flushed_entry_id: None,
+                flushed_sequence: None,
+                committed_sequence: None,
+            }),
+            &[0],
+            builder.file_purger(),
+        );
+        write_rows_to_version(&version_data.version, "host1", 0, 10);
+        scheduler.on_flush_success(builder.region_id());
+        assert_eq!(2, job_scheduler.num_jobs());
+        // The pending task is cleared.
+        assert!(
+            scheduler
+                .region_status
+                .get(&builder.region_id())
+                .unwrap()
+                .pending_task
+                .is_none()
+        );
+    }
 }
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index bcb1db4331..2825c9f87e 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -1020,9 +1020,15 @@ pub struct MockWriteBufferManager {
     should_stall: AtomicBool,
     memory_used: AtomicUsize,
     memory_active: AtomicUsize,
+    flush_limit: usize,
 }
 
 impl MockWriteBufferManager {
+    /// Set flush limit.
+    pub fn set_flush_limit(&mut self, flush_limit: usize) {
+        self.flush_limit = flush_limit;
+    }
+
     /// Set whether to flush the engine.
     pub fn set_should_flush(&self, value: bool) {
         self.should_flush.store(value, Ordering::Relaxed);
@@ -1064,6 +1070,10 @@ impl WriteBufferManager for MockWriteBufferManager {
     fn memory_usage(&self) -> usize {
         self.memory_used.load(Ordering::Relaxed)
     }
+
+    fn flush_limit(&self) -> usize {
+        self.flush_limit
+    }
 }
 
 pub fn column_metadata_to_column_schema(metadata: &ColumnMetadata) -> api::v1::ColumnSchema {
diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs
index a8a4a3f46c..37ca5de2ba 100644
--- a/src/mito2/src/worker/handle_alter.rs
+++ b/src/mito2/src/worker/handle_alter.rs
@@ -22,6 +22,7 @@ use common_telemetry::info;
 use common_telemetry::tracing::warn;
 use humantime_serde::re::humantime;
 use snafu::{ResultExt, ensure};
+use store_api::logstore::LogStore;
 use store_api::metadata::{
     InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
     RegionMetadataRef,
@@ -41,7 +42,7 @@ use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
 use crate::sst::FormatType;
 use crate::worker::RegionWorkerLoop;
 
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
     pub(crate) async fn handle_alter_request(
         &mut self,
         region_id: RegionId,
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 8b9e750ffb..a838fd91d8 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -30,16 +30,26 @@ use crate::request::{BuildIndexRequest, FlushFailed, FlushFinished, OnFailure, O
 use crate::sst::index::IndexBuildType;
 use crate::worker::RegionWorkerLoop;
 
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
     /// On region flush job failed.
     pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
         self.flush_scheduler.on_flush_failed(region_id, request.err);
+        debug!(
+            "Flush failed for region {}, handling stalled requests",
+            region_id
+        );
+        // Maybe flush worker again.
+        self.maybe_flush_worker();
+
+        // Handle stalled requests.
+        self.handle_stalled_requests().await;
     }
 
     /// Checks whether the engine reaches flush threshold. If so, finds regions in this
     /// worker to flush.
     pub(crate) fn maybe_flush_worker(&mut self) {
         if !self.write_buffer_manager.should_flush_engine() {
+            debug!("No need to flush worker");
             // No need to flush worker.
             return;
         }
@@ -56,9 +66,7 @@ impl<S> RegionWorkerLoop<S> {
         let regions = self.regions.list_regions();
         let now = self.time_provider.current_time_millis();
         let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
-        let mut max_mutable_size = 0;
-        // Region with max mutable memtable size.
-        let mut max_mem_region = None;
+        let mut pending_regions = vec![];
 
         for region in &regions {
             if self.flush_scheduler.is_flush_requested(region.region_id) || !region.is_writable() {
@@ -67,12 +75,8 @@ impl<S> RegionWorkerLoop<S> {
             }
 
             let version = region.version();
-            let region_mutable_size = version.memtables.mutable_usage();
-            // Tracks region with max mutable memtable size.
-            if region_mutable_size > max_mutable_size {
-                max_mem_region = Some(region);
-                max_mutable_size = region_mutable_size;
-            }
+            let region_memtable_size =
+                version.memtables.mutable_usage() + version.memtables.immutables_usage();
 
             if region.last_flush_millis() < min_last_flush_time {
                 // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
@@ -88,14 +92,38 @@ impl<S> RegionWorkerLoop<S> {
                     &region.version_control,
                     task,
                 )?;
+            } else if region_memtable_size > 0 {
+                // We should only consider regions with memtable size > 0 to flush.
+                pending_regions.push((region, region_memtable_size));
             }
         }
+        pending_regions.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
+        // The flush target is the mutable memtable limit (half of the global buffer).
+        // When memory is full, we aggressively flush regions until usage drops below this target,
+        // not just below the full limit.
+        let target_memory_usage = self.write_buffer_manager.flush_limit();
+        let mut memory_usage = self.write_buffer_manager.memory_usage();
 
-        // Flush memtable with max mutable memtable.
-        // TODO(yingwen): Maybe flush more tables to reduce write buffer size.
-        if let Some(region) = max_mem_region
-            && !self.flush_scheduler.is_flush_requested(region.region_id)
+        #[cfg(test)]
         {
+            debug!(
+                "Flushing regions on engine full, target memory usage: {}, memory usage: {}, pending regions: {:?}",
+                target_memory_usage,
+                memory_usage,
+                pending_regions
+                    .iter()
+                    .map(|(region, mem_size)| (region.region_id, mem_size))
+                    .collect::<Vec<_>>()
+            );
+        }
+        // Iterate over pending regions in descending order of their memory size and schedule flush tasks
+        // for each region until the overall memory usage drops below the flush limit.
+        for (region, region_mem_size) in pending_regions.into_iter() {
+            // Usage starts at or above the flush limit, so the largest region is always flushed.
+            if memory_usage < target_memory_usage {
+                // Stop flushing regions once memory usage drops below the flush target.
+                break;
+            }
             let task = self.new_flush_task(
                 region,
                 FlushReason::EngineFull,
                 self.config.clone(),
                 region.is_staging(),
             );
+            debug!("Scheduling flush task for region {}", region.region_id);
+            // Schedule a flush task for the current region.
             self.flush_scheduler
                 .schedule_flush(region.region_id, &region.version_control, task)?;
+            // Reduce memory usage by the region's size; saturating_sub keeps it from underflowing.
+            memory_usage = memory_usage.saturating_sub(region_mem_size);
         }
 
         Ok(())
@@ -291,6 +323,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                 .await;
         }
 
+        // Maybe flush worker again.
+        self.maybe_flush_worker();
+
         // Handle stalled requests.
         self.handle_stalled_requests().await;
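
The new selection loop in `flush_regions_on_engine_full` sorts candidate regions by memtable size and keeps scheduling flushes until the estimated usage falls below `flush_limit()`. A standalone sketch of that loop (plain `u64` ids and a free function stand in for mito2's `RegionId` and worker state):

```rust
/// Sketch of the selection loop: pick regions largest-first until the
/// estimated memory usage drops below the flush target.
fn pick_regions_to_flush(
    mut pending: Vec<(u64, usize)>, // (region id, memtable size)
    mut memory_usage: usize,
    target: usize,
) -> Vec<u64> {
    pending.sort_unstable_by_key(|(_, size)| std::cmp::Reverse(*size));
    let mut picked = Vec::new();
    for (region_id, size) in pending {
        // Usage starts at or above the limit when the engine is full, so the
        // largest region is always picked at least once.
        if memory_usage < target {
            break;
        }
        picked.push(region_id);
        // Assume the scheduled flush eventually releases this region's bytes.
        memory_usage = memory_usage.saturating_sub(size);
    }
    picked
}

fn main() {
    // Usage 1000, target 500: regions are drained largest-first until the
    // estimate falls below the target.
    let picked = pick_regions_to_flush(vec![(1, 300), (2, 500), (3, 100)], 1000, 500);
    assert_eq!(vec![2, 1], picked);
}
```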
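The scheduler rework replaces the `flushing` flag and the global `schedule_next_flush` sweep with a per-region invariant: an entry in `region_status` means a flush job is running, and at most one merged task waits behind it. A toy model of that state machine (illustrative types only; the real code also handles the nothing-to-flush and scheduling-error paths shown in the diff):

```rust
use std::collections::HashMap;

/// Per-region bookkeeping: at most one merged pending task.
#[derive(Default)]
struct Status {
    pending: Option<Vec<&'static str>>,
}

#[derive(Default)]
struct Scheduler {
    region_status: HashMap<u64, Status>,
}

impl Scheduler {
    /// Mirrors the new `schedule_flush`: merge into the existing status if the
    /// region already has one, otherwise submit a job and insert a fresh status.
    fn schedule_flush(&mut self, region: u64, task: &'static str) {
        if let Some(status) = self.region_status.get_mut(&region) {
            status.pending.get_or_insert_with(Vec::new).push(task);
            return;
        }
        // The real code freezes the memtable and submits the job here.
        self.region_status.insert(region, Status::default());
    }

    /// Mirrors the new `on_flush_success`: with no pending task the region is
    /// removed; otherwise the pending task is scheduled immediately instead of
    /// waiting for a global `schedule_next_flush` sweep.
    fn on_flush_success(&mut self, region: u64) {
        let Some(status) = self.region_status.get_mut(&region) else {
            return;
        };
        match status.pending.take() {
            None => {
                self.region_status.remove(&region);
            }
            Some(_merged) => {
                // Freeze memtables and resubmit the job; keep the status entry.
            }
        }
    }
}

fn main() {
    let mut s = Scheduler::default();
    s.schedule_flush(1, "a"); // runs immediately
    s.schedule_flush(1, "b"); // merged as pending
    s.on_flush_success(1); // pending task is rescheduled, entry kept
    assert!(s.region_status.contains_key(&1));
    s.on_flush_success(1); // nothing pending, entry removed
    assert!(!s.region_status.contains_key(&1));
}
```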