diff --git a/config/datanode.example.toml b/config/datanode.example.toml index d8c300d45f..5a9b2a08bc 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -58,8 +58,12 @@ checkpoint_on_startup = false max_flush_tasks = 8 # Default write buffer size for a region. region_write_buffer_size = "32MB" +# Interval to check whether a region needs flush. +picker_schedule_interval = "5m" # Interval to auto flush a region if it has not flushed yet. auto_flush_interval = "1h" +# Global write buffer size for all regions. +global_write_buffer_size = "1GB" # Procedure storage options, see `standalone.example.toml`. [procedure] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 25ec6867c0..edefbc7296 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -123,8 +123,12 @@ checkpoint_on_startup = false max_flush_tasks = 8 # Default write buffer size for a region. region_write_buffer_size = "32MB" +# Interval to check whether a region needs flush. +picker_schedule_interval = "5m" # Interval to auto flush a region if it has not flushed yet. auto_flush_interval = "1h" +# Global write buffer size for all regions. +global_write_buffer_size = "1GB" # Procedure storage options. [procedure] diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index b9d706c950..2242fea494 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -223,6 +223,8 @@ pub struct FlushConfig { /// Interval to auto flush a region if it has not flushed yet. #[serde(with = "humantime_serde")] pub auto_flush_interval: Duration, + /// Global write buffer size for all regions. + pub global_write_buffer_size: Option, } impl Default for FlushConfig { @@ -234,6 +236,7 @@ impl Default for FlushConfig { DEFAULT_PICKER_SCHEDULE_INTERVAL.into(), ), auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()), + global_write_buffer_size: None, } } } @@ -260,6 +263,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig { region_write_buffer_size: value.storage.flush.region_write_buffer_size, picker_schedule_interval: value.storage.flush.picker_schedule_interval, auto_flush_interval: value.storage.flush.auto_flush_interval, + global_write_buffer_size: value.storage.flush.global_write_buffer_size, } } } diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs index a983dda41c..324d45b0b6 100644 --- a/src/storage/src/config.rs +++ b/src/storage/src/config.rs @@ -44,6 +44,8 @@ pub struct EngineConfig { pub picker_schedule_interval: Duration, /// Interval to auto flush a region if it has not flushed yet. pub auto_flush_interval: Duration, + /// Limit for global write buffer size. Disabled by default. 
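The new `global_write_buffer_size` option above is optional on purpose: leaving it unset keeps the global limit disabled, while a human-readable size such as `"1GB"` enables it. The std-only sketch below illustrates that shape with a hypothetical `parse_size` helper and `SketchFlushConfig` struct; the actual field appears to hold an optional `ReadableSize`-style value (note the `.as_bytes()` call in `engine.rs` further down), so this hand-rolled parser is only a stand-in.

```rust
/// Illustrative stand-in for a human-readable size; not the real config parsing.
fn parse_size(s: &str) -> Option<u64> {
    let s = s.trim();
    let (num, mult) = if let Some(n) = s.strip_suffix("GB") {
        (n, 1024 * 1024 * 1024)
    } else if let Some(n) = s.strip_suffix("MB") {
        (n, 1024 * 1024)
    } else {
        (s, 1)
    };
    num.trim().parse::<u64>().ok().map(|v| v * mult)
}

/// Hypothetical, simplified view of the flush options shown in the TOML above.
#[derive(Debug, Default)]
struct SketchFlushConfig {
    /// `None` keeps the global write buffer limit disabled, matching the default.
    global_write_buffer_size: Option<u64>,
}

fn main() {
    let cfg = SketchFlushConfig {
        global_write_buffer_size: parse_size("1GB"),
    };
    // 1 GB expressed in bytes; a missing entry would leave the field as `None`.
    assert_eq!(cfg.global_write_buffer_size, Some(1u64 << 30));
}
```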
+ pub global_write_buffer_size: Option, } impl Default for EngineConfig { @@ -62,6 +64,7 @@ impl Default for EngineConfig { DEFAULT_PICKER_SCHEDULE_INTERVAL.into(), ), auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()), + global_write_buffer_size: None, } } } diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index f57859b68b..68c222b7b6 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -27,7 +27,7 @@ use store_api::storage::{ }; use crate::compaction::CompactionSchedulerRef; -use crate::config::EngineConfig; +use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE}; use crate::error::{self, Error, Result}; use crate::file_purger::{FilePurgeHandler, FilePurgerRef}; use crate::flush::{ @@ -288,7 +288,7 @@ impl RegionMap { } /// Clear the region map. - fn clear(&self) { + pub(crate) fn clear(&self) { self.0.write().unwrap().clear(); } } @@ -337,15 +337,25 @@ impl EngineInner { }, FilePurgeHandler, )); + let flush_strategy = Arc::new(SizeBasedStrategy::new( + config + .global_write_buffer_size + .map(|size| size.as_bytes() as usize), + )); + let memtable_builder = if config.global_write_buffer_size.is_some() { + // If global write buffer size is provided, we set the flush strategy + // to the memtable to track global memtable usage. + DefaultMemtableBuilder::with_flush_strategy(Some(flush_strategy.clone())) + } else { + DefaultMemtableBuilder::default() + }; Ok(Self { object_store, log_store, regions, - memtable_builder: Arc::new(DefaultMemtableBuilder::default()), + memtable_builder: Arc::new(memtable_builder), flush_scheduler, - flush_strategy: Arc::new(SizeBasedStrategy::new( - config.region_write_buffer_size.as_bytes() as usize, - )), + flush_strategy, compaction_scheduler, file_purger, config: Arc::new(config), @@ -461,10 +471,7 @@ impl EngineInner { config.manifest_gc_duration, ); manifest.start().await?; - - let flush_strategy = write_buffer_size - .map(|size| Arc::new(SizeBasedStrategy::new(size)) as Arc<_>) - .unwrap_or_else(|| self.flush_strategy.clone()); + let flush_strategy = self.flush_strategy.clone(); Ok(StoreConfig { log_store: self.log_store.clone(), @@ -478,6 +485,8 @@ impl EngineInner { file_purger: self.file_purger.clone(), ttl, compaction_time_window, + write_buffer_size: write_buffer_size + .unwrap_or(DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize), }) } diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index 20feecdcaa..6501537c62 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -15,16 +15,19 @@ mod picker; mod scheduler; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use common_telemetry::{logging, timer}; pub use picker::{FlushPicker, PickerConfig}; -pub use scheduler::{FlushHandle, FlushRequest, FlushScheduler, FlushSchedulerRef}; +pub use scheduler::{ + FlushHandle, FlushRegionRequest, FlushRequest, FlushScheduler, FlushSchedulerRef, +}; use store_api::logstore::LogStore; use store_api::storage::consts::WRITE_ROW_GROUP_SIZE; -use store_api::storage::SequenceNumber; +use store_api::storage::{RegionId, SequenceNumber}; -use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE}; +use crate::config::EngineConfig; use crate::error::Result; use crate::manifest::action::*; use crate::manifest::region::RegionManifest; @@ -34,95 +37,177 @@ use crate::region::{RegionWriterRef, SharedDataRef}; use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions}; use crate::wal::Wal; +/// Current flush-related 
status of a region. +#[derive(Debug, Clone, Copy)] +pub struct RegionStatus { + /// Id of the region this status belongs to. + pub region_id: RegionId, + /// Size of the mutable memtable. + pub bytes_mutable: usize, + /// Write buffer size of the region. + pub write_buffer_size: usize, +} + +/// Type of flush request to send. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FlushType { + /// Flush current region. + Region, + /// Engine level flush. Find regions to flush globally. + Engine, +} + +/// Strategy to control whether to flush a region before writing to the region. pub trait FlushStrategy: Send + Sync + std::fmt::Debug { - fn should_flush( - &self, - shared: &SharedDataRef, - bytes_mutable: usize, - bytes_total: usize, - ) -> bool; + /// Returns whether to trigger a flush operation. + fn should_flush(&self, status: RegionStatus) -> Option; + + /// Reserves `mem` bytes. + fn reserve_mem(&self, mem: usize); + + /// Tells the strategy we are freeing `mem` bytes. + /// + /// We are in the process of freeing `mem` bytes, so it is not considered + /// when checking the soft limit. + fn schedule_free_mem(&self, mem: usize); + + /// We have freed `mem` bytes. + fn free_mem(&self, mem: usize); } pub type FlushStrategyRef = Arc; +/// Flush strategy based on memory usage. #[derive(Debug)] pub struct SizeBasedStrategy { - /// Write buffer size of memtable. - max_write_buffer_size: usize, - /// Mutable memtable memory size limitation + /// Write buffer size for all memtables. + global_write_buffer_size: Option, + /// Mutable memtable memory size limitation, only valid when `global_write_buffer_size` + /// is `Some`. mutable_limitation: usize, + /// Memory in used (e.g. used by mutable and immutable memtables). + memory_used: AtomicUsize, + /// Memory that hasn't been scheduled to free (e.g. used by mutable memtables). + memory_active: AtomicUsize, } impl SizeBasedStrategy { - pub fn new(max_write_buffer_size: usize) -> Self { + /// Returns a new [SizeBasedStrategy] with specific `global_write_buffer_size`. + pub fn new(global_write_buffer_size: Option) -> Self { Self { - max_write_buffer_size, - mutable_limitation: get_mutable_limitation(max_write_buffer_size), + global_write_buffer_size, + mutable_limitation: get_mutable_limitation(global_write_buffer_size), + memory_used: AtomicUsize::new(0), + memory_active: AtomicUsize::new(0), } } + + /// Returns whether to trigger an engine level flush. + /// + /// Inspired by RocksDB's WriteBufferManager. + /// https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94 + fn should_flush_engine(&self) -> bool { + // We only check global limit when it is Some. + let Some(global_write_buffer_size) = self.global_write_buffer_size else { + return false; + }; + + let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed); + if mutable_memtable_memory_usage > self.mutable_limitation { + logging::info!( + "Engine should flush (over mutable limit), mutable_usage: {}, mutable_limitation: {}.", + mutable_memtable_memory_usage, + self.mutable_limitation, + ); + return true; + } + + let memory_usage = self.memory_used.load(Ordering::Relaxed); + // If the memory exceeds the buffer size, we trigger more aggressive + // flush. But if already more than half memory is being flushed, + // triggering more flush may not help. We will hold it instead. 
+ if memory_usage >= global_write_buffer_size + && mutable_memtable_memory_usage >= global_write_buffer_size / 2 + { + logging::info!( + "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \ + mutable_usage: {}.", + memory_usage, + global_write_buffer_size, + mutable_memtable_memory_usage, + ); + return true; + } + + false + } + + /// Returns true if the global memory limitation is enabled. + #[inline] + fn is_global_limit_enabled(&self) -> bool { + self.global_write_buffer_size.is_some() + } } #[inline] -fn get_mutable_limitation(max_write_buffer_size: usize) -> usize { - // Inspired by RocksDB +fn get_mutable_limitation(global_write_buffer_size: Option) -> usize { + // Inspired by RocksDB. // https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L86 - max_write_buffer_size * 7 / 8 + global_write_buffer_size + .map(|size| size * 7 / 8) + .unwrap_or(0) } impl Default for SizeBasedStrategy { fn default() -> Self { - let max_write_buffer_size = DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize; Self { - max_write_buffer_size, - mutable_limitation: get_mutable_limitation(max_write_buffer_size), + global_write_buffer_size: None, + mutable_limitation: 0, + memory_used: AtomicUsize::new(0), + memory_active: AtomicUsize::new(0), } } } impl FlushStrategy for SizeBasedStrategy { - fn should_flush( - &self, - shared: &SharedDataRef, - bytes_mutable: usize, - bytes_total: usize, - ) -> bool { - // Insipired by RocksDB flush strategy - // https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94 - - if bytes_mutable > self.mutable_limitation { - logging::info!( - "Region should flush, region: {}, bytes_mutable: {}, mutable_limitation: {}, \ - bytes_total: {}, max_write_buffer_size: {} .", - shared.name(), - bytes_mutable, - self.mutable_limitation, - bytes_total, - self.max_write_buffer_size + fn should_flush(&self, status: RegionStatus) -> Option { + if status.bytes_mutable >= status.write_buffer_size { + // If the mutable memtable is full, we should freeze it and flush it. + logging::debug!( + "Region should flush as mutable memtable is full, region: {}, bytes_mutable: {}, \ + write_buffer_size: {}.", + status.region_id, + status.bytes_mutable, + status.write_buffer_size, ); - return true; + return Some(FlushType::Region); } - let buffer_size = self.max_write_buffer_size; - - // If the memory exceeds the buffer size, we trigger more aggressive - // flush. But if already more than half memory is being flushed, - // triggering more flush may not help. We will hold it instead. 
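To make the engine-level check above concrete: with a 1 GB global buffer, `get_mutable_limitation` yields 7/8 of it (896 MiB), and the second, more aggressive trigger fires only when total usage reaches the full budget while at least half of that budget is still mutable; otherwise enough data is already on its way out and another flush would not help. The function below is a simplified, std-only restatement for illustration, not the engine code itself.

```rust
/// Simplified restatement of the engine-level flush check, for illustration only.
fn should_flush_engine(
    global_write_buffer_size: Option<usize>,
    memory_active: usize, // mutable memtable bytes not yet scheduled for flush
    memory_used: usize,   // mutable + immutable memtable bytes still in memory
) -> bool {
    let Some(global) = global_write_buffer_size else {
        // No global limit configured: the engine-level check is disabled.
        return false;
    };
    let mutable_limitation = global * 7 / 8;
    memory_active > mutable_limitation
        || (memory_used >= global && memory_active >= global / 2)
}

fn main() {
    let one_gb = 1024 * 1024 * 1024;
    // 900 MiB of mutable data exceeds the 896 MiB (7/8) mutable limitation.
    assert!(should_flush_engine(Some(one_gb), 900 << 20, 900 << 20));
    // Total at the limit, but most of it is already being flushed: hold instead.
    assert!(!should_flush_engine(Some(one_gb), 100 << 20, one_gb));
    // Without a global limit the check never triggers.
    assert!(!should_flush_engine(None, usize::MAX / 2, usize::MAX / 2));
}
```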
- let should_flush = bytes_total >= buffer_size && bytes_mutable >= buffer_size / 2; - - if should_flush { - logging::info!( - "Region should flush, region: {}, bytes_mutable: {}, mutable_limitation: {}, \ - bytes_total: {}, max_write_buffer_size: {} .", - shared.name(), - bytes_mutable, - self.mutable_limitation, - bytes_total, - buffer_size - ); + if self.should_flush_engine() { + return Some(FlushType::Engine); } - should_flush + None + } + + fn reserve_mem(&self, mem: usize) { + if self.is_global_limit_enabled() { + self.memory_used.fetch_add(mem, Ordering::Relaxed); + self.memory_active.fetch_add(mem, Ordering::Relaxed); + } + } + + fn schedule_free_mem(&self, mem: usize) { + if self.is_global_limit_enabled() { + self.memory_active.fetch_sub(mem, Ordering::Relaxed); + } + } + + fn free_mem(&self, mem: usize) { + if self.is_global_limit_enabled() { + self.memory_used.fetch_sub(mem, Ordering::Relaxed); + } } } @@ -236,11 +321,164 @@ impl FlushJob { #[cfg(test)] mod tests { use super::*; + use crate::memtable::AllocTracker; #[test] fn test_get_mutable_limitation() { - assert_eq!(7, get_mutable_limitation(8)); - assert_eq!(8, get_mutable_limitation(10)); - assert_eq!(56, get_mutable_limitation(64)); + assert_eq!(7, get_mutable_limitation(Some(8))); + assert_eq!(8, get_mutable_limitation(Some(10))); + assert_eq!(56, get_mutable_limitation(Some(64))); + assert_eq!(0, get_mutable_limitation(None)); + } + + #[test] + fn test_strategy_global_disabled() { + let strategy = SizeBasedStrategy::new(None); + strategy.reserve_mem(1000); + assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed)); + strategy.schedule_free_mem(1000); + assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed)); + strategy.free_mem(1000); + assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed)); + + let status = RegionStatus { + region_id: 1, + bytes_mutable: 400, + write_buffer_size: 300, + }; + assert_eq!(Some(FlushType::Region), strategy.should_flush(status)); + let status = RegionStatus { + region_id: 1, + bytes_mutable: 100, + write_buffer_size: 300, + }; + assert_eq!(None, strategy.should_flush(status)); + } + + #[test] + fn test_strategy_over_mutable_limit() { + let strategy = SizeBasedStrategy::new(Some(1000)); + strategy.reserve_mem(500); + let status = RegionStatus { + region_id: 1, + bytes_mutable: 300, + write_buffer_size: 500, + }; + assert_eq!(None, strategy.should_flush(status)); + strategy.reserve_mem(400); + + // Flush region. + let status = RegionStatus { + region_id: 1, + bytes_mutable: 400, + write_buffer_size: 300, + }; + assert_eq!(Some(FlushType::Region), strategy.should_flush(status)); + + // More than mutable limitation, Flush global. 
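The `reserve_mem`/`schedule_free_mem`/`free_mem` hooks above maintain two counters with different lifetimes: `memory_used` covers every byte still resident in memtables, while `memory_active` covers only bytes not yet scheduled for flushing and is what the mutable limitation is checked against. A minimal sketch of that lifecycle with illustrative names, assuming the same `Relaxed` ordering as the strategy:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Illustrative counters mirroring `SizeBasedStrategy`'s bookkeeping.
#[derive(Default)]
struct WriteBufferCounters {
    memory_used: AtomicUsize,   // reserved and not yet fully flushed
    memory_active: AtomicUsize, // reserved and not yet scheduled for flush
}

impl WriteBufferCounters {
    fn reserve_mem(&self, mem: usize) {
        self.memory_used.fetch_add(mem, Ordering::Relaxed);
        self.memory_active.fetch_add(mem, Ordering::Relaxed);
    }
    /// Called when a mutable memtable is frozen: its bytes no longer count
    /// toward the "active" (soft) limit, but are still held in memory.
    fn schedule_free_mem(&self, mem: usize) {
        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
    }
    /// Called once the flushed memtable is dropped and its memory is gone.
    fn free_mem(&self, mem: usize) {
        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
    }
}

fn main() {
    let c = WriteBufferCounters::default();
    c.reserve_mem(64);       // a write lands in a mutable memtable
    c.schedule_free_mem(64); // the memtable is frozen and queued for flush
    assert_eq!(64, c.memory_used.load(Ordering::Relaxed));
    assert_eq!(0, c.memory_active.load(Ordering::Relaxed));
    c.free_mem(64);          // the flush finished and the memtable was dropped
    assert_eq!(0, c.memory_used.load(Ordering::Relaxed));
}
```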
+ let status = RegionStatus { + region_id: 1, + bytes_mutable: 100, + write_buffer_size: 300, + }; + assert_eq!(Some(FlushType::Engine), strategy.should_flush(status)); + + strategy.schedule_free_mem(500); + assert_eq!(None, strategy.should_flush(status)); + assert_eq!(900, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(400, strategy.memory_active.load(Ordering::Relaxed)); + + strategy.free_mem(500); + assert_eq!(400, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(400, strategy.memory_active.load(Ordering::Relaxed)); + } + + #[test] + fn test_strategy_over_global() { + common_telemetry::init_default_ut_logging(); + + let strategy = SizeBasedStrategy::new(Some(1000)); + strategy.reserve_mem(1100); + strategy.schedule_free_mem(200); + // More than global limit. + let status = RegionStatus { + region_id: 1, + bytes_mutable: 100, + write_buffer_size: 300, + }; + assert_eq!(Some(FlushType::Engine), strategy.should_flush(status)); + + // More than global limit, but mutable not enough (< 500). + strategy.schedule_free_mem(450); + let status = RegionStatus { + region_id: 1, + bytes_mutable: 100, + write_buffer_size: 300, + }; + assert_eq!(None, strategy.should_flush(status)); + strategy.schedule_free_mem(100); + assert_eq!(None, strategy.should_flush(status)); + + // Now mutable is enough. + strategy.reserve_mem(150); + // We can flush again. + assert_eq!(Some(FlushType::Engine), strategy.should_flush(status)); + strategy.reserve_mem(100); + assert_eq!(Some(FlushType::Engine), strategy.should_flush(status)); + } + + #[test] + fn test_alloc_tracker_without_strategy() { + let tracker = AllocTracker::new(None); + assert_eq!(0, tracker.bytes_allocated()); + tracker.on_allocate(100); + assert_eq!(100, tracker.bytes_allocated()); + tracker.on_allocate(200); + assert_eq!(300, tracker.bytes_allocated()); + + tracker.done_allocating(); + assert_eq!(300, tracker.bytes_allocated()); + } + + #[test] + fn test_alloc_tracker_with_strategy() { + let strategy = Arc::new(SizeBasedStrategy::new(Some(1000))); + { + let tracker = AllocTracker::new(Some(strategy.clone() as FlushStrategyRef)); + + tracker.on_allocate(100); + assert_eq!(100, tracker.bytes_allocated()); + assert_eq!(100, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(100, strategy.memory_active.load(Ordering::Relaxed)); + + for _ in 0..2 { + // Done allocating won't free the same memory multiple times. + tracker.done_allocating(); + assert_eq!(100, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed)); + } + } + + assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed)); + } + + #[test] + fn test_alloc_tracker_without_done_allocating() { + let strategy = Arc::new(SizeBasedStrategy::new(Some(1000))); + { + let tracker = AllocTracker::new(Some(strategy.clone() as FlushStrategyRef)); + + tracker.on_allocate(100); + assert_eq!(100, tracker.bytes_allocated()); + assert_eq!(100, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(100, strategy.memory_active.load(Ordering::Relaxed)); + } + + assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed)); + assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed)); } } diff --git a/src/storage/src/flush/picker.rs b/src/storage/src/flush/picker.rs index 7c0726f403..9e3e782dc4 100644 --- a/src/storage/src/flush/picker.rs +++ b/src/storage/src/flush/picker.rs @@ -52,6 +52,7 @@ impl Default for PickerConfig { } /// Flush task picker. 
+#[derive(Debug, Clone)] pub struct FlushPicker { /// Interval to flush a region automatically. auto_flush_interval_millis: i64, @@ -65,7 +66,7 @@ impl FlushPicker { } } - /// Pick regions and flush them by interval. + /// Picks regions and flushes them by interval. /// /// Returns the number of flushed regions. pub async fn pick_by_interval(&self, regions: &[T]) -> usize { @@ -77,6 +78,25 @@ impl FlushPicker { 0 } } + + /// Picks and flushes regions when the write buffer is full. + pub async fn pick_by_write_buffer_full(&self, regions: &[T]) { + // In such case, we pick the oldest region to flush. If this is not enough, + // the next time the region writer will trigger the picker again. Then we + // can pick another region to flush. The total memory will go down eventually. + let target = regions + .iter() + .filter(|region| region.mutable_memtable_usage() > 0) + .min_by_key(|region| region.last_flush_time()); + if let Some(region) = target { + logging::debug!( + "Request flush for region {} due to global buffer is full", + region.item_id() + ); + + region.request_flush(FlushReason::GlobalBufferFull).await; + } + } } /// Item for picker to flush. @@ -88,6 +108,9 @@ pub trait FlushItem { /// Last flush time in millis. fn last_flush_time(&self) -> i64; + /// Mutable memtable usage. + fn mutable_memtable_usage(&self) -> usize; + /// Requests the item to schedule a flush for specific `reason`. /// /// The flush job itself should run in background. @@ -104,11 +127,18 @@ impl FlushItem for RegionImpl { self.last_flush_millis() } + fn mutable_memtable_usage(&self) -> usize { + let current = self.version_control().current(); + let memtables = current.memtables(); + memtables.mutable_bytes_allocated() + } + async fn request_flush(&self, reason: FlushReason) { let ctx = FlushContext { wait: false, reason, }; + if let Err(e) = self.flush(&ctx).await { logging::error!(e; "Failed to flush region {}", self.id()); } @@ -149,14 +179,16 @@ mod tests { struct MockItem { id: u64, last_flush_time: i64, + usage: usize, flush_reason: Mutex>, } impl MockItem { - fn new(id: u64, last_flush_time: i64) -> MockItem { + fn new(id: u64, last_flush_time: i64, usage: usize) -> MockItem { MockItem { id, last_flush_time, + usage, flush_reason: Mutex::new(None), } } @@ -176,6 +208,10 @@ mod tests { self.last_flush_time } + fn mutable_memtable_usage(&self) -> usize { + self.usage + } + async fn request_flush(&self, reason: FlushReason) { let mut flush_reason = self.flush_reason.lock().unwrap(); *flush_reason = Some(reason); @@ -185,8 +221,8 @@ mod tests { #[tokio::test] async fn test_pick_by_interval() { let regions = [ - MockItem::new(0, util::current_time_millis()), - MockItem::new(1, util::current_time_millis() - 60 * 1000), + MockItem::new(0, util::current_time_millis(), 1), + MockItem::new(1, util::current_time_millis() - 60 * 1000, 1), ]; let picker = FlushPicker::new(PickerConfig { // schedule_interval is unused in this test. 
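`pick_by_write_buffer_full` above deliberately flushes one region at a time: it skips regions whose mutable memtables are empty and picks the one that has gone longest without a flush, relying on the region writer to trigger the picker again if memory is still high. A std-only sketch of that selection rule, with an illustrative `RegionInfo` type in place of `FlushItem` implementors:

```rust
/// Illustrative region summary; the real picker works with `FlushItem` implementors.
struct RegionInfo {
    id: u64,
    last_flush_time: i64,
    mutable_memtable_usage: usize,
}

/// Pick the region to flush when the global buffer is full: the one that has
/// gone longest without a flush, skipping regions with empty mutable memtables.
fn pick_for_global_flush(regions: &[RegionInfo]) -> Option<&RegionInfo> {
    regions
        .iter()
        .filter(|r| r.mutable_memtable_usage > 0)
        .min_by_key(|r| r.last_flush_time)
}

fn main() {
    let regions = [
        RegionInfo { id: 0, last_flush_time: 1_000, mutable_memtable_usage: 10 },
        RegionInfo { id: 1, last_flush_time: 10, mutable_memtable_usage: 0 },
        RegionInfo { id: 2, last_flush_time: 100, mutable_memtable_usage: 10 },
    ];
    // Region 1 is older but empty, so region 2 is picked.
    assert_eq!(Some(2), pick_for_global_flush(&regions).map(|r| r.id));
}
```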
@@ -198,4 +234,29 @@ mod tests { assert!(regions[0].flush_reason().is_none()); assert_eq!(Some(FlushReason::Periodically), regions[1].flush_reason()); } + + #[tokio::test] + async fn test_pick_by_buffer_full() { + let regions = [ + MockItem::new(0, util::current_time_millis(), 10), + MockItem::new(1, util::current_time_millis() - 60 * 1000, 0), + MockItem::new(1, util::current_time_millis() - 60 * 1000, 10), + ]; + let picker = FlushPicker::new(PickerConfig { + schedule_interval: Duration::from_millis(10), + auto_flush_interval: Duration::from_millis(30 * 1000), + }); + picker.pick_by_write_buffer_full(®ions).await; + assert!(regions[0].flush_reason().is_none()); + assert!(regions[1].flush_reason().is_none()); + assert_eq!( + Some(FlushReason::GlobalBufferFull), + regions[2].flush_reason() + ); + + // No target. + let regions = [MockItem::new(1, util::current_time_millis(), 0)]; + picker.pick_by_write_buffer_full(®ions).await; + assert!(regions[0].flush_reason().is_none()); + } } diff --git a/src/storage/src/flush/scheduler.rs b/src/storage/src/flush/scheduler.rs index cfe08cc4e2..fc3c219390 100644 --- a/src/storage/src/flush/scheduler.rs +++ b/src/storage/src/flush/scheduler.rs @@ -42,12 +42,48 @@ use crate::scheduler::{Handler, LocalScheduler, Request, Scheduler, SchedulerCon use crate::sst::AccessLayerRef; use crate::wal::Wal; -/// Key for [FlushRequest], consist of a region id and the flush -/// sequence. -type FlushKey = (RegionId, SequenceNumber); +/// Key for [FlushRequest]. +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum FlushKey { + Engine, + Region(RegionId, SequenceNumber), +} + +/// Flush request. +pub enum FlushRequest { + /// Flush the engine. + Engine, + /// Flush a region. + Region { + /// Region flush request. + req: FlushRegionRequest, + /// Flush result sender. + sender: Sender>, + }, +} + +impl Request for FlushRequest { + type Key = FlushKey; + + #[inline] + fn key(&self) -> FlushKey { + match &self { + FlushRequest::Engine => FlushKey::Engine, + FlushRequest::Region { req, .. } => { + FlushKey::Region(req.shared.id(), req.flush_sequence) + } + } + } + + fn complete(self, result: Result<()>) { + if let FlushRequest::Region { sender, .. } = self { + let _ = sender.send(result); + } + } +} /// Region flush request. -pub struct FlushRequest { +pub struct FlushRegionRequest { /// Max memtable id in these memtables, /// used to remove immutable memtables in current version. pub max_memtable_id: MemtableId, @@ -67,8 +103,6 @@ pub struct FlushRequest { pub manifest: RegionManifest, /// Storage engine config pub engine_config: Arc, - /// Flush result sender. Callers should set the sender to None. - pub sender: Option>>, // Compaction related options: /// TTL of the region. 
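Turning `FlushKey` into an enum is what lets engine-level requests deduplicate: every `FlushRequest::Engine` maps to the same key, so while one is queued further ones are dropped, whereas region flushes stay distinct per `(region_id, sequence)`. A small sketch of that keying, using a `HashSet` to stand in for the scheduler's pending set:

```rust
use std::collections::HashSet;

/// Simplified mirror of the scheduler key: all engine-level flush requests share
/// one key, so duplicates collapse, while region flushes are keyed per sequence.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum Key {
    Engine,
    Region(u64 /* region id */, u64 /* flush sequence */),
}

fn main() {
    let mut pending = HashSet::new();
    // A second engine-level request while one is already queued is dropped.
    assert!(pending.insert(Key::Engine));
    assert!(!pending.insert(Key::Engine));
    // Region flushes for different sequences are distinct entries.
    assert!(pending.insert(Key::Region(1, 7)));
    assert!(pending.insert(Key::Region(1, 8)));
    assert_eq!(3, pending.len());
}
```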
@@ -77,30 +111,15 @@ pub struct FlushRequest { pub compaction_time_window: Option, } -impl FlushRequest { +impl FlushRegionRequest { #[inline] fn region_id(&self) -> RegionId { self.shared.id() } } -impl Request for FlushRequest { - type Key = FlushKey; - - #[inline] - fn key(&self) -> FlushKey { - (self.shared.id(), self.flush_sequence) - } - - fn complete(self, result: Result<()>) { - if let Some(sender) = self.sender { - let _ = sender.send(result); - } - } -} - -impl From<&FlushRequest> for FlushJob { - fn from(req: &FlushRequest) -> FlushJob { +impl From<&FlushRegionRequest> for FlushJob { + fn from(req: &FlushRegionRequest) -> FlushJob { FlushJob { max_memtable_id: req.max_memtable_id, memtables: req.memtables.clone(), @@ -115,8 +134,8 @@ impl From<&FlushRequest> for FlushJob { } } -impl From<&FlushRequest> for CompactionRequestImpl { - fn from(req: &FlushRequest) -> CompactionRequestImpl { +impl From<&FlushRegionRequest> for CompactionRequestImpl { + fn from(req: &FlushRegionRequest) -> CompactionRequestImpl { CompactionRequestImpl { region_id: req.region_id(), sst_layer: req.sst_layer.clone(), @@ -166,16 +185,23 @@ impl FlushScheduler { regions: Arc>, picker_config: PickerConfig, ) -> Result { - let handler = FlushHandler { - compaction_scheduler, - }; let task_interval = picker_config.schedule_interval; let picker = FlushPicker::new(picker_config); - let task_fn = AutoFlushFunction { regions, picker }; + // Now we just clone the picker since we don't need to share states and + // the clone of picker is cheap. + let task_fn = AutoFlushFunction { + regions: regions.clone(), + picker: picker.clone(), + }; let auto_flush_task = RepeatedTask::new(task_interval, Box::new(task_fn)); auto_flush_task .start(common_runtime::bg_runtime()) .context(StartPickTaskSnafu)?; + let handler = FlushHandler { + compaction_scheduler, + regions, + picker, + }; Ok(Self { scheduler: LocalScheduler::new(config, handler), @@ -183,19 +209,15 @@ impl FlushScheduler { }) } - /// Schedules a flush request and return the handle to the flush task. - /// - /// # Panics - /// Panics if `sender` of the `req` is not `None`. - pub fn schedule_flush(&self, mut req: FlushRequest) -> Result { - assert!(req.sender.is_none()); - + /// Schedules a region flush request and return the handle to the flush task. + pub fn schedule_region_flush(&self, req: FlushRegionRequest) -> Result { let region_id = req.region_id(); let sequence = req.flush_sequence; let (sender, receiver) = oneshot::channel(); - req.sender = Some(sender); - let scheduled = self.scheduler.schedule(req)?; + let scheduled = self + .scheduler + .schedule(FlushRequest::Region { req, sender })?; // Normally we should not have duplicate flush request. ensure!( scheduled, @@ -211,6 +233,12 @@ impl FlushScheduler { }) } + /// Schedules a engine flush request. + pub fn schedule_engine_flush(&self) -> Result<()> { + self.scheduler.schedule(FlushRequest::Engine)?; + Ok(()) + } + /// Stop the scheduler. 
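`schedule_region_flush` above keeps the old handle-based contract: the scheduler stores the sender inside `FlushRequest::Region`, and the caller keeps a receiver wrapped in a `FlushHandle` it can wait on. The sketch below mirrors that pattern with `std::sync::mpsc` purely to stay dependency-free; the real code uses a oneshot channel, and `SketchFlushHandle` is an illustrative name.

```rust
use std::sync::mpsc;

/// Sketch of the handle pattern: the scheduler keeps the sender with the
/// request, the caller keeps the receiver.
struct SketchFlushHandle {
    receiver: mpsc::Receiver<Result<(), String>>,
}

impl SketchFlushHandle {
    /// Blocks until the background flush reports its result.
    fn wait(self) -> Result<(), String> {
        self.receiver
            .recv()
            .unwrap_or(Err("flush task dropped".to_string()))
    }
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    let handle = SketchFlushHandle { receiver };

    // In the real scheduler this send happens from the flush handler once the
    // region flush finishes (successfully here).
    sender.send(Ok(())).unwrap();

    assert!(handle.wait().is_ok());
}
```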
pub async fn stop(&self) -> Result<()> { self.auto_flush_task @@ -225,6 +253,8 @@ impl FlushScheduler { struct FlushHandler { compaction_scheduler: CompactionSchedulerRef, + regions: Arc>, + picker: FlushPicker, } #[async_trait::async_trait] @@ -238,8 +268,18 @@ impl Handler for FlushHandler { finish_notifier: Arc, ) -> Result<()> { let compaction_scheduler = self.compaction_scheduler.clone(); + let region_map = self.regions.clone(); + let picker = self.picker.clone(); common_runtime::spawn_bg(async move { - execute_flush(req, compaction_scheduler).await; + match req { + FlushRequest::Engine => { + let regions = region_map.list_regions(); + picker.pick_by_write_buffer_full(®ions).await; + } + FlushRequest::Region { req, sender } => { + execute_flush_region(req, sender, compaction_scheduler).await; + } + } // releases rate limit token token.try_release(); @@ -251,8 +291,9 @@ impl Handler for FlushHandler { } } -async fn execute_flush( - req: FlushRequest, +async fn execute_flush_region( + req: FlushRegionRequest, + sender: Sender>, compaction_scheduler: CompactionSchedulerRef, ) { let mut flush_job = FlushJob::from(&req); @@ -262,7 +303,7 @@ async fn execute_flush( increment_counter!(FLUSH_ERRORS_TOTAL); - req.complete(Err(e)); + FlushRequest::Region { req, sender }.complete(Err(e)); } else { logging::debug!("Successfully flush region: {}", req.region_id()); @@ -282,7 +323,7 @@ async fn execute_flush( ); // Complete the request. - req.complete(Ok(())); + FlushRequest::Region { req, sender }.complete(Ok(())); } } diff --git a/src/storage/src/memtable.rs b/src/storage/src/memtable.rs index 8f333aa6f7..71f9e59e0c 100644 --- a/src/storage/src/memtable.rs +++ b/src/storage/src/memtable.rs @@ -18,17 +18,21 @@ mod inserter; pub mod tests; mod version; -use std::sync::atomic::{AtomicI64, AtomicU32, AtomicUsize, Ordering}; +use std::fmt; +use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; use common_time::range::TimestampRange; use datatypes::vectors::VectorRef; +use metrics::{decrement_gauge, increment_gauge}; use store_api::storage::{consts, OpType, SequenceNumber}; use crate::error::Result; +use crate::flush::FlushStrategyRef; use crate::memtable::btree::BTreeMemtable; pub use crate::memtable::inserter::Inserter; pub use crate::memtable::version::MemtableVersion; +use crate::metrics::WRITE_BUFFER_BYTES; use crate::read::Batch; use crate::schema::{ProjectedSchemaRef, RegionSchemaRef}; @@ -40,32 +44,21 @@ pub struct MemtableStats { /// The estimated bytes allocated by this memtable from heap. Result /// of this method may be larger than the estimated based on [`num_rows`] because /// of the implementor's pre-alloc behavior. - estimated_bytes: AtomicUsize, + estimated_bytes: usize, /// The max timestamp that this memtable contains. - max_timestamp: AtomicI64, + pub max_timestamp: i64, /// The min timestamp that this memtable contains. - min_timestamp: AtomicI64, + pub min_timestamp: i64, } impl MemtableStats { pub fn bytes_allocated(&self) -> usize { self.estimated_bytes - .load(std::sync::atomic::Ordering::Relaxed) - } -} - -impl Default for MemtableStats { - fn default() -> Self { - Self { - estimated_bytes: AtomicUsize::default(), - max_timestamp: AtomicI64::new(i64::MIN), - min_timestamp: AtomicI64::new(i64::MAX), - } } } /// In memory storage. -pub trait Memtable: Send + Sync + std::fmt::Debug { +pub trait Memtable: Send + Sync + fmt::Debug { /// Returns id of this memtable. 
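With the atomics moved out of it, `MemtableStats` above becomes a plain `Copy` snapshot that `stats()` can return by value (as the trait change further down shows), so readers no longer hold a reference into the memtable's internals. A minimal sketch of that snapshot pattern with illustrative names:

```rust
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};

/// Plain-value stats snapshot, returned by value instead of by reference.
#[derive(Debug, Clone, Copy)]
struct StatsSnapshot {
    estimated_bytes: usize,
    max_timestamp: i64,
    min_timestamp: i64,
}

/// Illustrative memtable-like holder of the live counters.
struct Counters {
    bytes: AtomicUsize,
    max_ts: AtomicI64,
    min_ts: AtomicI64,
}

impl Counters {
    /// Readers get a detached copy; writers keep mutating the atomics.
    fn stats(&self) -> StatsSnapshot {
        StatsSnapshot {
            estimated_bytes: self.bytes.load(Ordering::Relaxed),
            max_timestamp: self.max_ts.load(Ordering::Relaxed),
            min_timestamp: self.min_ts.load(Ordering::Relaxed),
        }
    }
}

fn main() {
    let c = Counters {
        bytes: AtomicUsize::new(128),
        max_ts: AtomicI64::new(2_000),
        min_ts: AtomicI64::new(1_000),
    };
    let snapshot = c.stats();
    // The snapshot is detached: later writes don't change it.
    c.bytes.fetch_add(64, Ordering::Relaxed);
    assert_eq!(128, snapshot.estimated_bytes);
    assert_eq!(1_000, snapshot.min_timestamp);
    assert_eq!(2_000, snapshot.max_timestamp);
}
```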
fn id(&self) -> MemtableId; @@ -87,7 +80,12 @@ pub trait Memtable: Send + Sync + std::fmt::Debug { fn num_rows(&self) -> usize; /// Returns stats of this memtable. - fn stats(&self) -> &MemtableStats; + fn stats(&self) -> MemtableStats; + + /// Mark the memtable is immutable. + /// + /// The region MUST call this inside the region writer's write lock. + fn mark_immutable(&self); } pub type MemtableRef = Arc; @@ -153,7 +151,7 @@ pub trait BatchIterator: Iterator> + Send + Sync { pub type BoxedBatchIterator = Box; -pub trait MemtableBuilder: Send + Sync + std::fmt::Debug { +pub trait MemtableBuilder: Send + Sync + fmt::Debug { fn build(&self, schema: RegionSchemaRef) -> MemtableRef; } @@ -200,14 +198,104 @@ impl KeyValues { } } +/// Memtable memory allocation tracker. +pub struct AllocTracker { + flush_strategy: Option, + /// Bytes allocated by the tracker. + bytes_allocated: AtomicUsize, + /// Whether allocating is done. + is_done_allocating: AtomicBool, +} + +impl fmt::Debug for AllocTracker { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("AllocTracker") + .field("bytes_allocated", &self.bytes_allocated) + .field("is_done_allocating", &self.is_done_allocating) + .finish() + } +} + +impl AllocTracker { + /// Returns a new [AllocTracker]. + pub fn new(flush_strategy: Option) -> AllocTracker { + AllocTracker { + flush_strategy, + bytes_allocated: AtomicUsize::new(0), + is_done_allocating: AtomicBool::new(false), + } + } + + /// Tracks `bytes` memory is allocated. + pub(crate) fn on_allocate(&self, bytes: usize) { + self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed); + increment_gauge!(WRITE_BUFFER_BYTES, bytes as f64); + if let Some(flush_strategy) = &self.flush_strategy { + flush_strategy.reserve_mem(bytes); + } + } + + /// Marks we have finished allocating memory so we can free it from + /// the write buffer's limit. + /// + /// The region MUST ensure that it calls this method inside the region writer's write lock. + pub(crate) fn done_allocating(&self) { + if let Some(flush_strategy) = &self.flush_strategy { + if self + .is_done_allocating + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + flush_strategy.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed)); + } + } + } + + /// Returns bytes allocated. + pub(crate) fn bytes_allocated(&self) -> usize { + self.bytes_allocated.load(Ordering::Relaxed) + } +} + +impl Drop for AllocTracker { + fn drop(&mut self) { + if !self.is_done_allocating.load(Ordering::Relaxed) { + self.done_allocating(); + } + + let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed); + decrement_gauge!(WRITE_BUFFER_BYTES, bytes_allocated as f64); + + // Memory tracked by this tracker is freed. + if let Some(flush_strategy) = &self.flush_strategy { + flush_strategy.free_mem(bytes_allocated); + } + } +} + +/// Default memtable builder that builds [BTreeMemtable]. #[derive(Debug, Default)] pub struct DefaultMemtableBuilder { memtable_id: AtomicU32, + flush_strategy: Option, +} + +impl DefaultMemtableBuilder { + /// Returns a new [DefaultMemtableBuilder] with specific `flush_strategy`. + /// + /// If `flush_strategy` is `Some`, the memtable will report its memory usage + /// to the `flush_strategy`. 
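The `AllocTracker` introduced above gives two guarantees that the flush accounting depends on: `done_allocating` applies its side effect at most once (guarded by a compare-exchange), and `Drop` acts as a safety net so memory is released from the limit even if a memtable is never explicitly frozen. A stripped-down sketch of just those two guarantees:

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

/// Stripped-down allocation tracker, illustrative only.
struct SketchTracker {
    bytes_allocated: AtomicUsize,
    is_done_allocating: AtomicBool,
}

impl SketchTracker {
    fn new() -> Self {
        Self {
            bytes_allocated: AtomicUsize::new(0),
            is_done_allocating: AtomicBool::new(false),
        }
    }

    fn on_allocate(&self, bytes: usize) {
        self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Only the first caller wins the compare-exchange, so the "scheduled to
    /// free" side effect cannot be applied twice.
    fn done_allocating(&self) -> bool {
        self.is_done_allocating
            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }
}

impl Drop for SketchTracker {
    fn drop(&mut self) {
        // Safety net: if nobody froze the memtable explicitly, do it now so the
        // global accounting never leaks.
        self.done_allocating();
    }
}

fn main() {
    let tracker = SketchTracker::new();
    tracker.on_allocate(100);
    assert!(tracker.done_allocating());
    assert!(!tracker.done_allocating()); // the second call is a no-op
}
```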
+ pub fn with_flush_strategy(flush_strategy: Option) -> Self { + Self { + memtable_id: AtomicU32::new(0), + flush_strategy, + } + } } impl MemtableBuilder for DefaultMemtableBuilder { fn build(&self, schema: RegionSchemaRef) -> MemtableRef { let id = self.memtable_id.fetch_add(1, Ordering::Relaxed); - Arc::new(BTreeMemtable::new(id, schema)) + Arc::new(BTreeMemtable::new(id, schema, self.flush_strategy.clone())) } } diff --git a/src/storage/src/memtable/btree.rs b/src/storage/src/memtable/btree.rs index 560fd8d748..c0a0852a32 100644 --- a/src/storage/src/memtable/btree.rs +++ b/src/storage/src/memtable/btree.rs @@ -16,7 +16,7 @@ use std::cmp::Ordering; use std::collections::{btree_map, BTreeMap}; use std::fmt; use std::ops::Bound; -use std::sync::atomic::Ordering as AtomicOrdering; +use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; use std::sync::{Arc, RwLock}; use common_time::range::TimestampRange; @@ -27,9 +27,10 @@ use datatypes::vectors::{UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8Ve use store_api::storage::{OpType, SequenceNumber}; use crate::error::Result; +use crate::flush::FlushStrategyRef; use crate::memtable::{ - BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, MemtableStats, - RowOrdering, + AllocTracker, BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, + MemtableStats, RowOrdering, }; use crate::read::Batch; use crate::schema::compat::ReadAdapter; @@ -44,45 +45,51 @@ pub struct BTreeMemtable { id: MemtableId, schema: RegionSchemaRef, map: Arc, - stats: MemtableStats, + alloc_tracker: AllocTracker, + max_timestamp: AtomicI64, + min_timestamp: AtomicI64, } impl BTreeMemtable { - pub fn new(id: MemtableId, schema: RegionSchemaRef) -> BTreeMemtable { + pub fn new( + id: MemtableId, + schema: RegionSchemaRef, + flush_strategy: Option, + ) -> BTreeMemtable { BTreeMemtable { id, schema, map: Arc::new(RwLock::new(BTreeMap::new())), - stats: Default::default(), + alloc_tracker: AllocTracker::new(flush_strategy), + max_timestamp: AtomicI64::new(i64::MIN), + min_timestamp: AtomicI64::new(i64::MAX), } } /// Updates memtable stats. /// This function is guarded by `BTreeMemtable::map` so that store-after-load is safe. 
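`DefaultMemtableBuilder::with_flush_strategy` above simply clones the shared strategy into every memtable it builds, which is how each memtable's allocations end up in one global total. A std-only sketch of that wiring, with `SharedUsage` standing in for the flush strategy:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Illustrative stand-in for the flush strategy: one shared counter that every
/// memtable built by the same builder reports into.
#[derive(Default)]
struct SharedUsage(AtomicUsize);

struct SketchBuilder {
    shared: Option<Arc<SharedUsage>>,
}

struct SketchMemtable {
    shared: Option<Arc<SharedUsage>>,
}

impl SketchBuilder {
    /// Each new memtable gets a clone of the optional shared tracker.
    fn build(&self) -> SketchMemtable {
        SketchMemtable { shared: self.shared.clone() }
    }
}

impl SketchMemtable {
    fn write(&self, bytes: usize) {
        if let Some(shared) = &self.shared {
            shared.0.fetch_add(bytes, Ordering::Relaxed);
        }
    }
}

fn main() {
    let shared = Arc::new(SharedUsage::default());
    let builder = SketchBuilder { shared: Some(shared.clone()) };
    // Two memtables from the same builder contribute to the same global total.
    builder.build().write(10);
    builder.build().write(20);
    assert_eq!(30, shared.0.load(Ordering::Relaxed));
}
```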
- fn update_stats(&self, min: Option, max: Option) { + fn update_stats(&self, request_size: usize, min: Option, max: Option) { + self.alloc_tracker.on_allocate(request_size); + if let Some(min) = min { let min_val = min .as_timestamp() .expect("Min timestamp must be a valid timestamp value") .value(); - let cur_min = self.stats.min_timestamp.load(AtomicOrdering::Relaxed); + let cur_min = self.min_timestamp.load(AtomicOrdering::Relaxed); if min_val < cur_min { - self.stats - .min_timestamp - .store(min_val, AtomicOrdering::Relaxed); + self.min_timestamp.store(min_val, AtomicOrdering::Relaxed); } } if let Some(max) = max { - let cur_max = self.stats.max_timestamp.load(AtomicOrdering::Relaxed); + let cur_max = self.max_timestamp.load(AtomicOrdering::Relaxed); let max_val = max .as_timestamp() .expect("Max timestamp must be a valid timestamp value") .value(); if max_val > cur_max { - self.stats - .max_timestamp - .store(max_val, AtomicOrdering::Relaxed); + self.max_timestamp.store(max_val, AtomicOrdering::Relaxed); } } } @@ -97,7 +104,9 @@ impl fmt::Debug for BTreeMemtable { // Only show StoreSchema .field("schema", &self.schema) .field("rows", &len) - .field("stats", &self.stats) + .field("alloc_tracker", &self.alloc_tracker) + .field("max_timestamp", &self.max_timestamp) + .field("min_timestamp", &self.min_timestamp) .finish() } } @@ -113,10 +122,6 @@ impl Memtable for BTreeMemtable { fn write(&self, kvs: &KeyValues) -> Result<()> { debug_assert!(kvs.timestamp.is_some()); - self.stats - .estimated_bytes - .fetch_add(kvs.estimated_memory_size(), AtomicOrdering::Relaxed); - let iter_row = IterRow::new(kvs); let mut map = self.map.write().unwrap(); @@ -135,7 +140,8 @@ impl Memtable for BTreeMemtable { map.insert(inner_key, row_value); } - self.update_stats(min_ts, max_ts); + self.update_stats(kvs.estimated_memory_size(), min_ts, max_ts); + Ok(()) } @@ -151,8 +157,16 @@ impl Memtable for BTreeMemtable { self.map.read().unwrap().len() } - fn stats(&self) -> &MemtableStats { - &self.stats + fn stats(&self) -> MemtableStats { + MemtableStats { + estimated_bytes: self.alloc_tracker.bytes_allocated(), + max_timestamp: self.max_timestamp.load(AtomicOrdering::Relaxed), + min_timestamp: self.min_timestamp.load(AtomicOrdering::Relaxed), + } + } + + fn mark_immutable(&self) { + self.alloc_tracker.done_allocating(); } } diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 17c663cd2a..50b20ad235 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -122,7 +122,6 @@ struct SliceIndex { #[cfg(test)] mod tests { use std::collections::HashMap; - use std::sync::atomic::Ordering; use std::sync::Arc; use common_time::timestamp::Timestamp; @@ -178,8 +177,8 @@ mod tests { min_ts: i64, ) { let iter = mem.iter(&IterContext::default()).unwrap(); - assert_eq!(min_ts, mem.stats().min_timestamp.load(Ordering::Relaxed)); - assert_eq!(max_ts, mem.stats().max_timestamp.load(Ordering::Relaxed)); + assert_eq!(min_ts, mem.stats().min_timestamp); + assert_eq!(max_ts, mem.stats().max_timestamp); let mut index = 0; for batch in iter { diff --git a/src/storage/src/memtable/version.rs b/src/storage/src/memtable/version.rs index ef66f758e6..0efac7d627 100644 --- a/src/storage/src/memtable/version.rs +++ b/src/storage/src/memtable/version.rs @@ -53,8 +53,14 @@ impl MemtableVersion { /// Clone current memtable version and freeze its mutable memtables, which moves /// all mutable memtables to immutable memtable list. 
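`update_stats` above keeps running min/max timestamps with a load-compare-store that is safe only because it runs under the map's write lock, as its doc comment notes. As an aside, the same bounds could also be maintained lock-free with `fetch_min`/`fetch_max`; the sketch below shows that alternative and is not what the patch itself does.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Lock-free running time bounds, shown as an alternative to the
/// load-compare-store done under the map lock in the patch.
struct TimeBounds {
    min_ts: AtomicI64,
    max_ts: AtomicI64,
}

impl TimeBounds {
    fn new() -> Self {
        Self {
            min_ts: AtomicI64::new(i64::MAX),
            max_ts: AtomicI64::new(i64::MIN),
        }
    }

    fn observe(&self, ts: i64) {
        self.min_ts.fetch_min(ts, Ordering::Relaxed);
        self.max_ts.fetch_max(ts, Ordering::Relaxed);
    }
}

fn main() {
    let bounds = TimeBounds::new();
    for ts in [2_000, 1_000, 1_500] {
        bounds.observe(ts);
    }
    assert_eq!(1_000, bounds.min_ts.load(Ordering::Relaxed));
    assert_eq!(2_000, bounds.max_ts.load(Ordering::Relaxed));
}
```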
+ /// + /// This method also calls [Memtable::mark_immutable()](crate::memtable::Memtable::mark_immutable()) to + /// mark the mutable memtable as immutable. pub fn freeze_mutable(&self, new_mutable: MemtableRef) -> MemtableVersion { let mut immutables = self.immutables.clone(); + // Marks the mutable memtable as immutable so it can free the memory usage from our + // soft limit. + self.mutable.mark_immutable(); immutables.push(self.mutable.clone()); MemtableVersion { diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs index 9a39265823..4667eb64fb 100644 --- a/src/storage/src/metrics.rs +++ b/src/storage/src/metrics.rs @@ -30,3 +30,5 @@ pub const REGION_COUNT: &str = "storage.region_count"; pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed"; /// Elapsed time of a compact job. pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed"; +/// Global write buffer size in bytes. +pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes"; diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs index c96e71c528..f8f6f7d0b8 100644 --- a/src/storage/src/region.rs +++ b/src/storage/src/region.rs @@ -166,6 +166,7 @@ pub struct StoreConfig { pub file_purger: FilePurgerRef, pub ttl: Option, pub compaction_time_window: Option, + pub write_buffer_size: usize, } pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata)); @@ -249,6 +250,7 @@ impl RegionImpl { store_config.engine_config.clone(), store_config.ttl, store_config.compaction_time_window, + store_config.write_buffer_size, )), wal, flush_strategy: store_config.flush_strategy, @@ -339,6 +341,7 @@ impl RegionImpl { store_config.engine_config.clone(), store_config.ttl, compaction_time_window, + store_config.write_buffer_size, )); let writer_ctx = WriterContext { shared: &shared, @@ -382,10 +385,15 @@ impl RegionImpl { } /// Returns last flush timestamp in millis. - pub fn last_flush_millis(&self) -> i64 { + pub(crate) fn last_flush_millis(&self) -> i64 { self.inner.shared.last_flush_millis() } + /// Returns the [VersionControl] of the region. + pub(crate) fn version_control(&self) -> &VersionControl { + self.inner.version_control() + } + fn create_version_with_checkpoint( checkpoint: RegionCheckpoint, memtable_builder: &MemtableBuilderRef, diff --git a/src/storage/src/region/tests/compact.rs b/src/storage/src/region/tests/compact.rs index a7416f7aa6..022d0d095c 100644 --- a/src/storage/src/region/tests/compact.rs +++ b/src/storage/src/region/tests/compact.rs @@ -79,7 +79,7 @@ async fn create_region_for_compaction< let object_store = new_object_store(store_dir, s3_bucket); - let mut store_config = config_util::new_store_config_with_object_store( + let (mut store_config, _) = config_util::new_store_config_with_object_store( REGION_NAME, store_dir, object_store.clone(), diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs index 860b81f278..52bf6255a7 100644 --- a/src/storage/src/region/tests/flush.rs +++ b/src/storage/src/region/tests/flush.rs @@ -15,13 +15,14 @@ //! Region flush tests. 
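The `freeze_mutable` change above is ordering-sensitive: the mutable memtable must be marked immutable (releasing its bytes from the soft limit exactly once) before it is pushed onto the immutable list, and the whole step must run under the region writer's write lock. A minimal sketch of that step with illustrative types:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Illustrative memtable: the only behavior needed here is "can be frozen".
#[derive(Clone)]
struct SketchMemtable {
    frozen: Arc<AtomicBool>,
}

impl SketchMemtable {
    fn new() -> Self {
        Self { frozen: Arc::new(AtomicBool::new(false)) }
    }
    /// Stands in for `Memtable::mark_immutable`, which calls
    /// `AllocTracker::done_allocating` in the real code.
    fn mark_immutable(&self) {
        self.frozen.store(true, Ordering::Relaxed);
    }
}

/// Sketch of the freeze step: mark the mutable memtable as immutable *before*
/// moving it onto the immutable list, while holding the writer's write lock.
struct SketchVersion {
    mutable: SketchMemtable,
    immutables: Vec<SketchMemtable>,
}

impl SketchVersion {
    fn freeze_mutable(&self, new_mutable: SketchMemtable) -> SketchVersion {
        let mut immutables = self.immutables.clone();
        self.mutable.mark_immutable();
        immutables.push(self.mutable.clone());
        SketchVersion { mutable: new_mutable, immutables }
    }
}

fn main() {
    let v1 = SketchVersion { mutable: SketchMemtable::new(), immutables: vec![] };
    let v2 = v1.freeze_mutable(SketchMemtable::new());
    // The frozen memtable carried over to the immutable list is marked.
    assert!(v2.immutables[0].frozen.load(Ordering::Relaxed));
    assert!(!v2.mutable.frozen.load(Ordering::Relaxed));
}
```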
use std::sync::Arc; +use std::time::Duration; use common_test_util::temp_dir::create_temp_dir; use log_store::raft_engine::log_store::RaftEngineLogStore; use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse}; -use crate::engine; -use crate::flush::FlushStrategyRef; +use crate::engine::{self, RegionMap}; +use crate::flush::{FlushStrategyRef, FlushType}; use crate::region::tests::{self, FileTesterBase}; use crate::region::RegionImpl; use crate::test_util::config_util; @@ -33,13 +34,20 @@ const REGION_NAME: &str = "region-flush-0"; async fn create_region_for_flush( store_dir: &str, flush_strategy: FlushStrategyRef, -) -> RegionImpl { +) -> ( + RegionImpl, + Arc>, +) { let metadata = tests::new_metadata(REGION_NAME); - let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await; + let (mut store_config, regions) = + config_util::new_store_config_and_region_map(REGION_NAME, store_dir).await; store_config.flush_strategy = flush_strategy; - RegionImpl::create(metadata, store_config).await.unwrap() + ( + RegionImpl::create(metadata, store_config).await.unwrap(), + regions, + ) } /// Tester for region flush. @@ -47,20 +55,23 @@ struct FlushTester { base: Option, store_dir: String, flush_strategy: FlushStrategyRef, + regions: Arc>, } impl FlushTester { async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> FlushTester { - let region = create_region_for_flush(store_dir, flush_strategy.clone()).await; + let (region, regions) = create_region_for_flush(store_dir, flush_strategy.clone()).await; FlushTester { base: Some(FileTesterBase::with_region(region)), store_dir: store_dir.to_string(), flush_strategy: flush_strategy.clone(), + regions, } } async fn reopen(&mut self) { + self.regions.clear(); // Close the old region. if let Some(base) = self.base.as_ref() { base.close().await; @@ -101,6 +112,12 @@ impl FlushTester { } } +impl Drop for FlushTester { + fn drop(&mut self) { + self.regions.clear(); + } +} + #[tokio::test] async fn test_flush_and_stall() { common_telemetry::init_default_ut_logging(); @@ -279,3 +296,41 @@ async fn test_merge_read_after_flush() { let output = tester.full_scan().await; assert_eq!(expect, output); } + +#[tokio::test] +async fn test_schedule_engine_flush() { + common_telemetry::init_default_ut_logging(); + + let dir = create_temp_dir("engine-flush"); + let store_dir = dir.path().to_str().unwrap(); + + let flush_switch = Arc::new(FlushSwitch::default()); + let tester = FlushTester::new(store_dir, flush_switch.clone()).await; + assert_eq!(0, tester.base().region.last_flush_millis()); + + // Insert the region to the region map. + tester.regions.get_or_occupy_slot( + REGION_NAME, + engine::RegionSlot::Ready(tester.base().region.clone()), + ); + + // Put elements so we have content to flush. + tester.put(&[(1000, Some(100))]).await; + tester.put(&[(2000, Some(200))]).await; + + flush_switch.set_flush_type(FlushType::Engine); + + // Put element and trigger an engine level flush. + tester.put(&[(3000, Some(300))]).await; + + // Wait for flush. + let mut count = 0; + while tester.base().region.last_flush_millis() == 0 && count < 50 { + tokio::time::sleep(Duration::from_millis(100)).await; + count += 1; + } + + // Check parquet files. 
+ let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME)); + assert!(has_parquet_file(&sst_dir)); +} diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 52acbe3ce0..37b71fd239 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -30,7 +30,9 @@ use tokio::sync::{oneshot, Mutex}; use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef}; use crate::config::EngineConfig; use crate::error::{self, Result}; -use crate::flush::{FlushHandle, FlushRequest, FlushSchedulerRef, FlushStrategyRef}; +use crate::flush::{ + FlushHandle, FlushRegionRequest, FlushSchedulerRef, FlushStrategyRef, FlushType, RegionStatus, +}; use crate::manifest::action::{ RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionRemove, @@ -71,6 +73,7 @@ impl RegionWriter { config: Arc, ttl: Option, compaction_time_window: Option, + write_buffer_size: usize, ) -> RegionWriter { RegionWriter { inner: Mutex::new(WriterInner::new( @@ -78,6 +81,7 @@ impl RegionWriter { config, ttl, compaction_time_window, + write_buffer_size, )), version_mutex: Mutex::new(()), } @@ -443,6 +447,8 @@ struct WriterInner { engine_config: Arc, ttl: Option, compaction_time_window: Option, + /// Size in bytes to freeze the mutable memtable. + write_buffer_size: usize, } impl WriterInner { @@ -451,6 +457,7 @@ impl WriterInner { engine_config: Arc, ttl: Option, compaction_time_window: Option, + write_buffer_size: usize, ) -> WriterInner { WriterInner { memtable_builder, @@ -459,6 +466,7 @@ impl WriterInner { closed: false, ttl, compaction_time_window, + write_buffer_size, } } @@ -663,13 +671,24 @@ impl WriterInner { let version_control = writer_ctx.version_control(); // Check whether memtable is full or flush should be triggered. We need to do this first since // switching memtables will clear all mutable memtables. - if self.should_flush( + if let Some(flush_type) = self.should_flush( writer_ctx.shared, version_control, writer_ctx.flush_strategy, ) { - self.trigger_flush(writer_ctx, FlushReason::MemtableFull) - .await?; + // Trigger flush according to the flush type. + match flush_type { + FlushType::Region => { + // Trigger flush for current region. + self.trigger_flush(writer_ctx, FlushReason::MemtableFull) + .await?; + } + FlushType::Engine => { + // Trigger engine level flush. This wakeup the flush handler + // to pick region to flush. + writer_ctx.flush_scheduler.schedule_engine_flush()?; + } + } } Ok(()) @@ -686,12 +705,15 @@ impl WriterInner { shared: &SharedDataRef, version_control: &VersionControlRef, flush_strategy: &FlushStrategyRef, - ) -> bool { + ) -> Option { let current = version_control.current(); let memtables = current.memtables(); - let mutable_bytes_allocated = memtables.mutable_bytes_allocated(); - let total_bytes_allocated = memtables.total_bytes_allocated(); - flush_strategy.should_flush(shared, mutable_bytes_allocated, total_bytes_allocated) + let status = RegionStatus { + region_id: shared.id(), + bytes_mutable: memtables.mutable_bytes_allocated(), + write_buffer_size: self.write_buffer_size, + }; + flush_strategy.should_flush(status) } async fn trigger_flush( @@ -723,11 +745,14 @@ impl WriterInner { let (max_memtable_id, mem_to_flush) = current_version.memtables().memtables_to_flush(); if max_memtable_id.is_none() { + // We still update the flush time to avoid the picker picks this region again. 
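On the write path shown above, the writer builds a `RegionStatus` from its own mutable usage and per-region `write_buffer_size`, then branches on the returned flush type: a full region memtable flushes that region, while a global overrun only wakes the engine-level picker. A simplified, std-only restatement of that branch (the real decision lives in `SizeBasedStrategy::should_flush`):

```rust
/// Simplified mirror of the write-path flush decision.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Flush {
    Region,
    Engine,
}

fn decide(bytes_mutable: usize, write_buffer_size: usize, engine_over_limit: bool) -> Option<Flush> {
    if bytes_mutable >= write_buffer_size {
        // The region's own mutable memtable is full: flush this region.
        Some(Flush::Region)
    } else if engine_over_limit {
        // The global write buffer is over its limit: wake the engine-level
        // picker instead of flushing this (possibly small) region.
        Some(Flush::Engine)
    } else {
        None
    }
}

fn main() {
    assert_eq!(Some(Flush::Region), decide(40 << 20, 32 << 20, false));
    assert_eq!(Some(Flush::Engine), decide(1 << 20, 32 << 20, true));
    assert_eq!(None, decide(1 << 20, 32 << 20, false));
}
```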
+ ctx.shared.update_flush_millis(); + logging::info!("No memtables to flush in region: {}", ctx.shared.name); return Ok(()); } - let flush_req = FlushRequest { + let flush_req = FlushRegionRequest { max_memtable_id: max_memtable_id.unwrap(), memtables: mem_to_flush, // In write thread, safe to use current committed sequence. @@ -738,15 +763,17 @@ impl WriterInner { wal: ctx.wal.clone(), manifest: ctx.manifest.clone(), engine_config: self.engine_config.clone(), - sender: None, ttl: self.ttl, compaction_time_window: self.compaction_time_window, }; - let flush_handle = ctx.flush_scheduler.schedule_flush(flush_req).map_err(|e| { - logging::error!(e; "Failed to schedule flush request"); - e - })?; + let flush_handle = ctx + .flush_scheduler + .schedule_region_flush(flush_req) + .map_err(|e| { + logging::error!(e; "Failed to schedule flush request"); + e + })?; self.flush_handle = Some(flush_handle); Ok(()) diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index f6f1803e64..342fe1804f 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -22,6 +22,7 @@ use object_store::ObjectStore; use store_api::manifest::Manifest; use crate::compaction::noop::NoopCompactionScheduler; +use crate::config::DEFAULT_REGION_WRITE_BUFFER_SIZE; use crate::engine::{self, RegionMap}; use crate::file_purger::noop::NoopFilePurgeHandler; use crate::flush::{FlushScheduler, PickerConfig, SizeBasedStrategy}; @@ -44,6 +45,23 @@ pub async fn new_store_config( builder.root(store_dir); let object_store = ObjectStore::new(builder).unwrap().finish(); + new_store_config_with_object_store(region_name, store_dir, object_store) + .await + .0 +} + +/// Create a new StoreConfig and region map for test. +pub async fn new_store_config_and_region_map( + region_name: &str, + store_dir: &str, +) -> ( + StoreConfig, + Arc>, +) { + let mut builder = Fs::default(); + builder.root(store_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); + new_store_config_with_object_store(region_name, store_dir, object_store).await } @@ -52,7 +70,10 @@ pub async fn new_store_config_with_object_store( region_name: &str, store_dir: &str, object_store: ObjectStore, -) -> StoreConfig { +) -> ( + StoreConfig, + Arc>, +) { let parent_dir = ""; let sst_dir = engine::region_sst_dir(parent_dir, region_name); let manifest_dir = engine::region_manifest_dir(parent_dir, region_name); @@ -73,11 +94,12 @@ pub async fn new_store_config_with_object_store( let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap()); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); // We use an empty region map so actually the background worker of the picker is disabled. 
+ let regions = Arc::new(RegionMap::new()); let flush_scheduler = Arc::new( FlushScheduler::new( SchedulerConfig::default(), compaction_scheduler.clone(), - Arc::new(RegionMap::new()), + regions.clone(), PickerConfig::default(), ) .unwrap(), @@ -86,17 +108,21 @@ pub async fn new_store_config_with_object_store( SchedulerConfig::default(), NoopFilePurgeHandler, )); - StoreConfig { - log_store, - sst_layer, - manifest, - memtable_builder: Arc::new(DefaultMemtableBuilder::default()), - flush_scheduler, - flush_strategy: Arc::new(SizeBasedStrategy::default()), - compaction_scheduler, - engine_config: Default::default(), - file_purger, - ttl: None, - compaction_time_window: None, - } + ( + StoreConfig { + log_store, + sst_layer, + manifest, + memtable_builder: Arc::new(DefaultMemtableBuilder::default()), + flush_scheduler, + flush_strategy: Arc::new(SizeBasedStrategy::default()), + compaction_scheduler, + engine_config: Default::default(), + file_purger, + ttl: None, + compaction_time_window: None, + write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize, + }, + regions, + ) } diff --git a/src/storage/src/test_util/flush_switch.rs b/src/storage/src/test_util/flush_switch.rs index 3b5b2e10c1..58124fa896 100644 --- a/src/storage/src/test_util/flush_switch.rs +++ b/src/storage/src/test_util/flush_switch.rs @@ -12,33 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; -use crate::flush::FlushStrategy; -use crate::region::SharedDataRef; +use crate::flush::{FlushStrategy, FlushType, RegionStatus}; /// Controls whether to flush a region while writing the region. /// Disable flush by default. #[derive(Debug, Default)] pub struct FlushSwitch { - should_flush: AtomicBool, + flush_type: Mutex>, } impl FlushSwitch { pub fn set_should_flush(&self, should_flush: bool) { - self.should_flush.store(should_flush, Ordering::Relaxed); + if should_flush { + *self.flush_type.lock().unwrap() = Some(FlushType::Region); + } else { + *self.flush_type.lock().unwrap() = None; + } + } + + pub fn set_flush_type(&self, flush_type: FlushType) { + *self.flush_type.lock().unwrap() = Some(flush_type); } } impl FlushStrategy for FlushSwitch { - fn should_flush( - &self, - _shared: &SharedDataRef, - _bytes_mutable: usize, - _bytes_total: usize, - ) -> bool { - self.should_flush.load(Ordering::Relaxed) + fn should_flush(&self, _status: RegionStatus) -> Option { + *self.flush_type.lock().unwrap() } + + fn reserve_mem(&self, _mem: usize) {} + + fn schedule_free_mem(&self, _mem: usize) {} + + fn free_mem(&self, _mem: usize) {} } pub fn has_parquet_file(sst_dir: &str) -> bool { diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs index 9968158feb..61c05f750b 100644 --- a/src/store-api/src/storage/region.rs +++ b/src/store-api/src/storage/region.rs @@ -136,6 +136,8 @@ pub enum FlushReason { Manually, /// Auto flush periodically. Periodically, + /// Global write buffer is full. + GlobalBufferFull, } impl FlushReason { @@ -146,6 +148,7 @@ impl FlushReason { FlushReason::MemtableFull => "memtable_full", FlushReason::Manually => "manually", FlushReason::Periodically => "periodically", + FlushReason::GlobalBufferFull => "global_buffer_full", } } }
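Finally, the reshaped `FlushStrategy` trait makes test doubles like `FlushSwitch` above straightforward: a stub can preset the answer, including the new engine-level flush, without manufacturing byte counts that cross real thresholds. A reduced sketch of that pattern with illustrative names:

```rust
use std::sync::Mutex;

/// Reduced form of the strategy interface after this change, plus a test
/// double in the spirit of `FlushSwitch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushKind {
    Region,
    Engine,
}

trait Strategy: Send + Sync {
    fn should_flush(&self, bytes_mutable: usize) -> Option<FlushKind>;
    // The memory hooks default to no-ops, so simple stubs can ignore them.
    fn reserve_mem(&self, _mem: usize) {}
    fn schedule_free_mem(&self, _mem: usize) {}
    fn free_mem(&self, _mem: usize) {}
}

#[derive(Default)]
struct Switch {
    preset: Mutex<Option<FlushKind>>,
}

impl Strategy for Switch {
    fn should_flush(&self, _bytes_mutable: usize) -> Option<FlushKind> {
        *self.preset.lock().unwrap()
    }
}

fn main() {
    let switch = Switch::default();
    assert_eq!(None, switch.should_flush(0));
    *switch.preset.lock().unwrap() = Some(FlushKind::Engine);
    // A test can now force an engine-level flush on the next write.
    assert_eq!(Some(FlushKind::Engine), switch.should_flush(0));
}
```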