diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 607d2fac9b..d8c300d45f 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -52,6 +52,15 @@ gc_duration = '30s'
 # Whether to try creating a manifest checkpoint on region opening
 checkpoint_on_startup = false
 
+# Storage flush options
+[storage.flush]
+# Max inflight flush tasks.
+max_flush_tasks = 8
+# Default write buffer size for a region.
+region_write_buffer_size = "32MB"
+# Interval to auto flush a region if it has not flushed yet.
+auto_flush_interval = "1h"
+
 # Procedure storage options, see `standalone.example.toml`.
 [procedure]
 max_retry_times = 3
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index b00ac213e6..25ec6867c0 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -117,6 +117,15 @@ gc_duration = '30s'
 # Whether to try creating a manifest checkpoint on region opening
 checkpoint_on_startup = false
 
+# Storage flush options
+[storage.flush]
+# Max inflight flush tasks.
+max_flush_tasks = 8
+# Default write buffer size for a region.
+region_write_buffer_size = "32MB"
+# Interval to auto flush a region if it has not flushed yet.
+auto_flush_interval = "1h"
+
 # Procedure storage options.
 [procedure]
 # Procedure max retry time.
 max_retry_times = 3
diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs
index 12a7b81c02..1d8c964935 100644
--- a/src/catalog/src/system.rs
+++ b/src/catalog/src/system.rs
@@ -509,7 +509,8 @@ mod tests {
                 Arc::new(NoopLogStore::default()),
                 object_store.clone(),
                 noop_compaction_scheduler,
-            ),
+            )
+            .unwrap(),
             object_store,
         ));
         (dir, table_engine)
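The new `[storage.flush]` table deserializes into the `FlushConfig` struct added to `src/datanode/src/datanode.rs` below. A minimal standalone sketch of that mapping, assuming the `serde`, `toml`, and `humantime_serde` crates; the `StorageFlush` name is illustrative, not from this patch:

use std::time::Duration;

use serde::Deserialize;

// Illustrative stand-in for the patch's `FlushConfig`; field names match the TOML keys.
#[derive(Debug, Deserialize)]
struct StorageFlush {
    max_flush_tasks: usize,
    // Parsed further into a `ReadableSize` in the real code.
    region_write_buffer_size: String,
    #[serde(with = "humantime_serde")]
    auto_flush_interval: Duration,
}

fn main() {
    let toml_str = r#"
        max_flush_tasks = 8
        region_write_buffer_size = "32MB"
        auto_flush_interval = "1h"
    "#;
    let flush: StorageFlush = toml::from_str(toml_str).unwrap();
    assert_eq!(flush.auto_flush_interval, Duration::from_secs(3600));
    println!("{flush:?}");
}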
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 0f9d4e087c..e0aa1ed215 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -23,7 +23,10 @@ use secrecy::SecretString;
 use serde::{Deserialize, Serialize};
 use servers::http::HttpOptions;
 use servers::Mode;
-use storage::config::EngineConfig as StorageEngineConfig;
+use storage::config::{
+    EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS,
+    DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE,
+};
 use storage::scheduler::SchedulerConfig;
 
 use crate::error::Result;
@@ -49,6 +52,7 @@ pub struct StorageConfig {
     pub store: ObjectStoreConfig,
     pub compaction: CompactionConfig,
     pub manifest: RegionManifestConfig,
+    pub flush: FlushConfig,
 }
 
 #[derive(Debug, Clone, Serialize, Default, Deserialize)]
@@ -203,6 +207,34 @@ impl Default for CompactionConfig {
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
+#[serde(default)]
+pub struct FlushConfig {
+    /// Max inflight flush tasks.
+    pub max_flush_tasks: usize,
+    /// Default write buffer size for a region.
+    pub region_write_buffer_size: ReadableSize,
+    /// Interval to schedule the auto flush picker to find regions to flush.
+    #[serde(with = "humantime_serde")]
+    pub picker_schedule_interval: Duration,
+    /// Interval to auto flush a region if it has not flushed yet.
+    #[serde(with = "humantime_serde")]
+    pub auto_flush_interval: Duration,
+}
+
+impl Default for FlushConfig {
+    fn default() -> Self {
+        Self {
+            max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS,
+            region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE,
+            picker_schedule_interval: Duration::from_millis(
+                DEFAULT_PICKER_SCHEDULE_INTERVAL.into(),
+            ),
+            auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
+        }
+    }
+}
+
 impl From<&DatanodeOptions> for SchedulerConfig {
     fn from(value: &DatanodeOptions) -> Self {
         Self {
@@ -220,6 +252,10 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
             max_files_in_l0: value.storage.compaction.max_files_in_level0,
             max_purge_tasks: value.storage.compaction.max_purge_tasks,
             sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
+            max_flush_tasks: value.storage.flush.max_flush_tasks,
+            region_write_buffer_size: value.storage.flush.region_write_buffer_size,
+            picker_schedule_interval: value.storage.flush.picker_schedule_interval,
+            auto_flush_interval: value.storage.flush.auto_flush_interval,
         }
     }
 }
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 4433c2193b..707d8c3232 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -120,7 +120,8 @@ impl Instance {
                 log_store.clone(),
                 object_store.clone(),
                 compaction_scheduler,
-            ),
+            )
+            .unwrap(),
             object_store.clone(),
         ));
 
diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index d644d73f6b..b163cf010a 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -669,6 +669,12 @@ impl<S: StorageEngine> MitoEngineInner<S> {
             .map_err(BoxedError::new)
             .context(table_error::TableOperationSnafu)?;
 
+        self.storage_engine
+            .close(&StorageEngineContext::default())
+            .await
+            .map_err(BoxedError::new)
+            .context(table_error::TableOperationSnafu)?;
+
         Ok(())
     }
 }
diff --git a/src/mito/src/engine/procedure.rs b/src/mito/src/engine/procedure.rs
index 4d3a8b24d0..a8627b6164 100644
--- a/src/mito/src/engine/procedure.rs
+++ b/src/mito/src/engine/procedure.rs
@@ -66,7 +66,8 @@ mod procedure_test_util {
             Arc::new(NoopLogStore::default()),
             object_store.clone(),
             compaction_scheduler,
-        );
+        )
+        .unwrap();
         let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store);
         TestEnv { table_engine, dir }
diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs
index 7ff3fa3223..72bdf0ccda 100644
--- a/src/mito/src/engine/tests.rs
+++ b/src/mito/src/engine/tests.rs
@@ -83,7 +83,8 @@ async fn setup_table_with_column_default_constraint() -> (TempDir, String, Table
             Arc::new(NoopLogStore::default()),
             object_store.clone(),
             compaction_scheduler,
-        ),
+        )
+        .unwrap(),
         object_store,
     );
 
diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs
index bd655ea029..19d85eaf7c 100644
--- a/src/mito/src/table.rs
+++ b/src/mito/src/table.rs
@@ -35,8 +35,8 @@ use object_store::ObjectStore;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
 use store_api::storage::{
-    AddColumn, AlterOperation, AlterRequest, ChunkReader, FlushContext, ReadContext, Region,
-    RegionMeta, RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
+    AddColumn, AlterOperation, AlterRequest, ChunkReader, FlushContext, FlushReason, ReadContext,
+    Region, RegionMeta, RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
 };
 use table::error::{
     InvalidTableSnafu, RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu,
@@ -294,7 +294,12 @@ impl<R: Region> Table for MitoTable<R> {
         region_number: Option<RegionNumber>,
         wait: Option<bool>,
     ) -> TableResult<()> {
-        let flush_ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
+        let flush_ctx = wait
+            .map(|wait| FlushContext {
+                wait,
+                reason: FlushReason::Manually,
+            })
+            .unwrap_or_default();
         if let Some(region_number) = region_number {
             if let Some(region) = self.regions.get(&region_number) {
                 region
diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs
index dcde46ba03..adf0196302 100644
--- a/src/mito/src/table/test_util.rs
+++ b/src/mito/src/table/test_util.rs
@@ -155,7 +155,8 @@ pub async fn setup_test_engine_and_table() -> TestEngineComponents {
         Arc::new(NoopLogStore::default()),
         object_store.clone(),
         compaction_scheduler,
-    );
+    )
+    .unwrap();
     let table_engine = MitoEngine::new(
         EngineConfig::default(),
         storage_engine.clone(),
diff --git a/src/mito/src/table/test_util/mock_engine.rs b/src/mito/src/table/test_util/mock_engine.rs
index 3b0b7f8ebe..a1dcedd7a2 100644
--- a/src/mito/src/table/test_util/mock_engine.rs
+++ b/src/mito/src/table/test_util/mock_engine.rs
@@ -326,4 +326,8 @@ impl StorageEngine for MockEngine {
         let regions = self.regions.lock().unwrap();
         Ok(regions.opened_regions.get(name).cloned())
     }
+
+    async fn close(&self, _ctx: &EngineContext) -> Result<()> {
+        Ok(())
+    }
 }
diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs
index fdb2f2f177..771fd295f6 100644
--- a/src/script/src/manager.rs
+++ b/src/script/src/manager.rs
@@ -160,7 +160,8 @@ mod tests {
                 Arc::new(log_store),
                 object_store.clone(),
                 compaction_scheduler,
-            ),
+            )
+            .unwrap(),
             object_store,
         ));
         let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone()));
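A side note on the `Table::flush` change above: `Some(wait)` now produces a manual-flush context, while `None` falls back to the default. A standalone sketch, with the types reproduced locally (they are added to `store-api` at the end of this patch):

// Mirrors the `FlushContext`/`FlushReason` definitions added to store-api in
// this patch, reproduced locally so the sketch stands alone.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushReason {
    Others,
    MemtableFull,
    Manually,
    Periodically,
}

struct FlushContext {
    wait: bool,
    reason: FlushReason,
}

impl Default for FlushContext {
    fn default() -> Self {
        FlushContext { wait: true, reason: FlushReason::Others }
    }
}

fn main() {
    // `Table::flush(region_number, wait)` maps `Some(wait)` to a manual flush
    // and uses the default context when `wait` is `None`.
    let wait: Option<bool> = Some(false);
    let ctx = wait
        .map(|wait| FlushContext { wait, reason: FlushReason::Manually })
        .unwrap_or_default();
    assert!(!ctx.wait);
    assert!(matches!(ctx.reason, FlushReason::Manually));
}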
diff --git a/src/storage/src/config.rs b/src/storage/src/config.rs
index bffcc4ffba..f1bc18164a 100644
--- a/src/storage/src/config.rs
+++ b/src/storage/src/config.rs
@@ -18,6 +18,15 @@ use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
 
+/// Default max flush tasks.
+pub const DEFAULT_MAX_FLUSH_TASKS: usize = 8;
+/// Default region write buffer size.
+pub const DEFAULT_REGION_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);
+/// Default interval to trigger auto flush, in millis.
+pub const DEFAULT_AUTO_FLUSH_INTERVAL: u32 = 60 * 60 * 1000;
+/// Default interval to schedule the auto flush picker, in millis.
+pub const DEFAULT_PICKER_SCHEDULE_INTERVAL: u32 = 5 * 60 * 1000;
+
 #[derive(Debug, Clone)]
 pub struct EngineConfig {
     pub manifest_checkpoint_on_startup: bool,
@@ -26,6 +35,14 @@ pub struct EngineConfig {
     pub max_files_in_l0: usize,
     pub max_purge_tasks: usize,
     pub sst_write_buffer_size: ReadableSize,
+    /// Max inflight flush tasks.
+    pub max_flush_tasks: usize,
+    /// Default write buffer size for a region.
+    pub region_write_buffer_size: ReadableSize,
+    /// Interval to schedule the auto flush picker.
+    pub picker_schedule_interval: Duration,
+    /// Interval to auto flush a region if it has not flushed yet.
+    pub auto_flush_interval: Duration,
 }
 
 impl Default for EngineConfig {
@@ -37,6 +54,12 @@
             max_files_in_l0: 8,
             max_purge_tasks: 32,
             sst_write_buffer_size: ReadableSize::mb(8),
+            max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS,
+            region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE,
+            picker_schedule_interval: Duration::from_millis(
+                DEFAULT_PICKER_SCHEDULE_INTERVAL.into(),
+            ),
+            auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
         }
     }
 }
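A quick sanity check of the millisecond constants above, using only std; this mirrors the `Duration::from_millis(... .into())` conversions in the `Default` impl:

use std::time::Duration;

const DEFAULT_AUTO_FLUSH_INTERVAL: u32 = 60 * 60 * 1000; // 1h in millis
const DEFAULT_PICKER_SCHEDULE_INTERVAL: u32 = 5 * 60 * 1000; // 5m in millis

fn main() {
    // `u32 -> u64` is lossless, hence the `.into()` calls in the patch.
    let auto = Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into());
    let picker = Duration::from_millis(DEFAULT_PICKER_SCHEDULE_INTERVAL.into());
    assert_eq!(auto, Duration::from_secs(3600));
    assert_eq!(picker, Duration::from_secs(300));
}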
diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs
index c85984500f..9d9e81bedd 100644
--- a/src/storage/src/engine.rs
+++ b/src/storage/src/engine.rs
@@ -17,7 +17,7 @@ use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
 use async_trait::async_trait;
-use common_telemetry::logging::debug;
+use common_telemetry::logging::{self, debug};
 use object_store::{util, ObjectStore};
 use snafu::ResultExt;
 use store_api::logstore::LogStore;
@@ -30,12 +30,14 @@ use crate::compaction::CompactionSchedulerRef;
 use crate::config::EngineConfig;
 use crate::error::{self, Error, Result};
 use crate::file_purger::{FilePurgeHandler, FilePurgerRef};
-use crate::flush::{FlushScheduler, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
+use crate::flush::{
+    FlushScheduler, FlushSchedulerRef, FlushStrategyRef, PickerConfig, SizeBasedStrategy,
+};
 use crate::manifest::region::RegionManifest;
 use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
 use crate::metadata::RegionMetadata;
 use crate::region::{RegionImpl, StoreConfig};
-use crate::scheduler::{LocalScheduler, SchedulerConfig};
+use crate::scheduler::{LocalScheduler, Scheduler, SchedulerConfig};
 use crate::sst::FsAccessLayer;
 
 /// [StorageEngine] implementation.
@@ -85,6 +87,16 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
     fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result<Option<Self::Region>> {
         Ok(self.inner.get_region(name))
     }
+
+    async fn close(&self, _ctx: &EngineContext) -> Result<()> {
+        logging::info!("Stopping storage engine");
+
+        self.inner.close().await?;
+
+        logging::info!("Storage engine stopped");
+
+        Ok(())
+    }
 }
 
 impl<S: LogStore> EngineImpl<S> {
@@ -93,15 +105,15 @@ pub fn new(
         log_store: Arc<S>,
         object_store: ObjectStore,
         compaction_scheduler: CompactionSchedulerRef,
-    ) -> Self {
-        Self {
+    ) -> Result<Self> {
+        Ok(Self {
             inner: Arc::new(EngineInner::new(
                 config,
                 log_store,
                 object_store,
                 compaction_scheduler,
-            )),
-        }
+            )?),
+        })
     }
 }
@@ -124,7 +136,7 @@ pub fn region_manifest_dir(parent_dir: &str, region_name: &str) -> String {
 /// Also used as a placeholder in the region map when the region isn't ready, e.g. during
 /// creating/opening.
 #[derive(Debug)]
-enum RegionSlot<S: LogStore> {
+pub(crate) enum RegionSlot<S: LogStore> {
     /// The region is being created.
     Creating,
    /// The region is being opened.
@@ -208,17 +220,21 @@ impl<'a, S: LogStore> Drop for SlotGuard<'a, S> {
 }
 
 /// Region slot map.
-struct RegionMap<S: LogStore>(RwLock<HashMap<String, RegionSlot<S>>>);
+pub struct RegionMap<S: LogStore>(RwLock<HashMap<String, RegionSlot<S>>>);
 
 impl<S: LogStore> RegionMap<S> {
     /// Returns a new region map.
-    fn new() -> RegionMap<S> {
+    pub fn new() -> RegionMap<S> {
        RegionMap(RwLock::new(HashMap::new()))
     }
 
     /// Returns `Some(slot)` if there is an existing slot with the given `name`, or inserts
     /// the given `slot` and returns `None`.
-    fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot<S>) -> Option<RegionSlot<S>> {
+    pub(crate) fn get_or_occupy_slot(
+        &self,
+        name: &str,
+        slot: RegionSlot<S>,
+    ) -> Option<RegionSlot<S>> {
         {
             // Try to get the region under read lock.
             let regions = self.0.read().unwrap();
@@ -258,6 +274,26 @@ impl<S: LogStore> RegionMap<S> {
         let mut regions = self.0.write().unwrap();
         regions.remove(name);
     }
+
+    /// Collects regions.
+    pub(crate) fn list_regions(&self) -> Vec<RegionImpl<S>> {
+        let regions = self.0.read().unwrap();
+        regions
+            .values()
+            .filter_map(|slot| slot.get_ready_region())
+            .collect()
+    }
+
+    /// Clears the region map.
+    fn clear(&self) {
+        self.0.write().unwrap().clear();
+    }
+}
+
+impl<S: LogStore> Default for RegionMap<S> {
+    fn default() -> Self {
+        Self::new()
+    }
 }
 
 struct EngineInner<S: LogStore> {
@@ -278,12 +314,19 @@ impl<S: LogStore> EngineInner<S> {
         log_store: Arc<S>,
         object_store: ObjectStore,
         compaction_scheduler: CompactionSchedulerRef,
-    ) -> Self {
-        // TODO(yingwen): max inflight flush tasks.
+    ) -> Result<Self> {
+        let regions = Arc::new(RegionMap::new());
         let flush_scheduler = Arc::new(FlushScheduler::new(
-            SchedulerConfig::default(),
+            SchedulerConfig {
+                max_inflight_tasks: config.max_flush_tasks,
+            },
             compaction_scheduler.clone(),
-        ));
+            regions.clone(),
+            PickerConfig {
+                schedule_interval: config.picker_schedule_interval,
+                auto_flush_interval: config.auto_flush_interval,
+            },
+        )?);
 
         let file_purger = Arc::new(LocalScheduler::new(
             SchedulerConfig {
@@ -291,17 +334,19 @@ impl<S: LogStore> EngineInner<S> {
             },
             FilePurgeHandler,
         ));
-        Self {
+        Ok(Self {
             object_store,
             log_store,
-            regions: Arc::new(RegionMap::new()),
+            regions,
             memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
             flush_scheduler,
-            flush_strategy: Arc::new(SizeBasedStrategy::default()),
+            flush_strategy: Arc::new(SizeBasedStrategy::new(
+                config.region_write_buffer_size.as_bytes() as usize,
+            )),
             compaction_scheduler,
             file_purger,
             config: Arc::new(config),
-        }
+        })
     }
 
     async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result<Option<RegionImpl<S>>> {
@@ -427,6 +472,22 @@
             compaction_time_window,
         })
     }
+
+    async fn close(&self) -> Result<()> {
+        let regions = self.regions.list_regions();
+        for region in regions {
+            // Tolerate failure when closing regions.
+            if let Err(e) = region.close().await {
+                logging::error!(e; "Failed to close region {}", region.id());
+            }
+        }
+        // Clear regions to release references to regions in the region map.
+        self.regions.clear();
+
+        self.compaction_scheduler.stop(true).await?;
+        self.flush_scheduler.stop().await?;
+        self.file_purger.stop(true).await
+    }
 }
 
 #[cfg(test)]
@@ -462,7 +523,8 @@ mod tests {
             Arc::new(log_store),
             object_store,
             compaction_scheduler,
-        );
+        )
+        .unwrap();
 
         let region_name = "region-0";
         let desc = RegionDescBuilder::new(region_name)
diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs
index 692b6bf346..786de859bb 100644
--- a/src/storage/src/error.rs
+++ b/src/storage/src/error.rs
@@ -487,6 +487,18 @@ pub enum Error {
         sequence: SequenceNumber,
         location: Location,
     },
+
+    #[snafu(display("Failed to start picking task for flush: {}", source))]
+    StartPickTask {
+        #[snafu(backtrace)]
+        source: RuntimeError,
+    },
+
+    #[snafu(display("Failed to stop picking task for flush: {}", source))]
+    StopPickTask {
+        #[snafu(backtrace)]
+        source: RuntimeError,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -579,7 +591,9 @@ impl ErrorExt for Error {
             StartManifestGcTask { .. }
             | StopManifestGcTask { .. }
             | IllegalSchedulerState { .. }
-            | DuplicateFlush { .. } => StatusCode::Unexpected,
+            | DuplicateFlush { .. }
+            | StartPickTask { .. }
+            | StopPickTask { .. } => StatusCode::Unexpected,
 
             TtlCalculation { source, .. } => source.status_code(),
         }
diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs
index 2a212fe430..20feecdcaa 100644
--- a/src/storage/src/flush.rs
+++ b/src/storage/src/flush.rs
@@ -12,17 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod picker;
 mod scheduler;
 
 use std::sync::Arc;
 
 use common_telemetry::{logging, timer};
+pub use picker::{FlushPicker, PickerConfig};
 pub use scheduler::{FlushHandle, FlushRequest, FlushScheduler, FlushSchedulerRef};
 use store_api::logstore::LogStore;
 use store_api::storage::consts::WRITE_ROW_GROUP_SIZE;
 use store_api::storage::SequenceNumber;
 
-use crate::config::EngineConfig;
+use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
 use crate::error::Result;
 use crate::manifest::action::*;
 use crate::manifest::region::RegionManifest;
@@ -32,9 +34,6 @@ use crate::region::{RegionWriterRef, SharedDataRef};
 use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions};
 use crate::wal::Wal;
 
-/// Default write buffer size (32M).
-const DEFAULT_WRITE_BUFFER_SIZE: usize = 32 * 1024 * 1024;
-
 pub trait FlushStrategy: Send + Sync + std::fmt::Debug {
     fn should_flush(
         &self,
@@ -72,7 +71,7 @@ fn get_mutable_limitation(max_write_buffer_size: usize) -> usize {
 impl Default for SizeBasedStrategy {
     fn default() -> Self {
-        let max_write_buffer_size = DEFAULT_WRITE_BUFFER_SIZE;
+        let max_write_buffer_size = DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize;
         Self {
             max_write_buffer_size,
             mutable_limitation: get_mutable_limitation(max_write_buffer_size),
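For context on `SizeBasedStrategy`: the mutable limitation derived from `max_write_buffer_size` comes from `get_mutable_limitation`, whose body is not part of this diff. A hedged sketch — the 7/8 ratio here is an assumption, not taken from this patch:

/// Illustrative stand-in for `get_mutable_limitation`: cap mutable memtables
/// below the total write buffer budget so there is headroom to freeze and
/// flush them. (The real ratio is not visible in this hunk; 7/8 is assumed.)
fn get_mutable_limitation(max_write_buffer_size: usize) -> usize {
    max_write_buffer_size * 7 / 8
}

fn main() {
    let max = 32 * 1024 * 1024; // the 32MB default region write buffer
    assert_eq!(29360128, get_mutable_limitation(max));
}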
diff --git a/src/storage/src/flush/picker.rs b/src/storage/src/flush/picker.rs
new file mode 100644
index 0000000000..7c0726f403
--- /dev/null
+++ b/src/storage/src/flush/picker.rs
@@ -0,0 +1,201 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use async_trait::async_trait;
+use common_telemetry::logging;
+use common_time::util;
+use store_api::logstore::LogStore;
+use store_api::storage::{FlushContext, FlushReason, Region};
+
+use crate::config::{DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_PICKER_SCHEDULE_INTERVAL};
+use crate::region::RegionImpl;
+
+/// Config for [FlushPicker].
+pub struct PickerConfig {
+    /// Interval to schedule the picker.
+    pub schedule_interval: Duration,
+    /// Interval to auto flush a region if it has not flushed yet.
+    pub auto_flush_interval: Duration,
+}
+
+impl PickerConfig {
+    /// Returns the auto flush interval in millis, or a default value
+    /// if overflow occurs.
+    fn auto_flush_interval_millis(&self) -> i64 {
+        self.auto_flush_interval
+            .as_millis()
+            .try_into()
+            .unwrap_or(DEFAULT_AUTO_FLUSH_INTERVAL.into())
+    }
+}
+
+impl Default for PickerConfig {
+    fn default() -> Self {
+        PickerConfig {
+            schedule_interval: Duration::from_millis(DEFAULT_PICKER_SCHEDULE_INTERVAL.into()),
+            auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
+        }
+    }
+}
+
+/// Flush task picker.
+pub struct FlushPicker {
+    /// Interval to flush a region automatically.
+    auto_flush_interval_millis: i64,
+}
+
+impl FlushPicker {
+    /// Returns a new FlushPicker.
+    pub fn new(config: PickerConfig) -> FlushPicker {
+        FlushPicker {
+            auto_flush_interval_millis: config.auto_flush_interval_millis(),
+        }
+    }
+
+    /// Picks regions and flushes them by interval.
+    ///
+    /// Returns the number of flushed regions.
+    pub async fn pick_by_interval<T: FlushItem>(&self, regions: &[T]) -> usize {
+        let now = util::current_time_millis();
+        // Flush regions by interval.
+        if let Some(earliest_flush_millis) = now.checked_sub(self.auto_flush_interval_millis) {
+            flush_regions_by_interval(regions, earliest_flush_millis).await
+        } else {
+            0
+        }
+    }
+}
+
+/// Item for the picker to flush.
+#[async_trait]
+pub trait FlushItem {
+    /// Id of the item.
+    fn item_id(&self) -> u64;
+
+    /// Last flush time in millis.
+    fn last_flush_time(&self) -> i64;
+
+    /// Requests the item to schedule a flush for the given `reason`.
+    ///
+    /// The flush job itself should run in the background.
+    async fn request_flush(&self, reason: FlushReason);
+}
+
+#[async_trait]
+impl<S: LogStore> FlushItem for RegionImpl<S> {
+    fn item_id(&self) -> u64 {
+        self.id()
+    }
+
+    fn last_flush_time(&self) -> i64 {
+        self.last_flush_millis()
+    }
+
+    async fn request_flush(&self, reason: FlushReason) {
+        let ctx = FlushContext {
+            wait: false,
+            reason,
+        };
+
+        if let Err(e) = self.flush(&ctx).await {
+            logging::error!(e; "Failed to flush region {}", self.id());
+        }
+    }
+}
+
+/// Automatically flushes regions based on their last flush time.
+///
+/// Returns the number of flushed regions.
+async fn flush_regions_by_interval<T: FlushItem>(
+    regions: &[T],
+    earliest_flush_millis: i64,
+) -> usize {
+    let mut flushed = 0;
+    for region in regions {
+        if region.last_flush_time() < earliest_flush_millis {
+            logging::debug!(
+                "Auto flush region {} due to last flush time ({} < {})",
+                region.item_id(),
+                region.last_flush_time(),
+                earliest_flush_millis,
+            );
+
+            flushed += 1;
+            region.request_flush(FlushReason::Periodically).await;
+        }
+    }
+
+    flushed
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Mutex;
+
+    use super::*;
+
+    struct MockItem {
+        id: u64,
+        last_flush_time: i64,
+        flush_reason: Mutex<Option<FlushReason>>,
+    }
+
+    impl MockItem {
+        fn new(id: u64, last_flush_time: i64) -> MockItem {
+            MockItem {
+                id,
+                last_flush_time,
+                flush_reason: Mutex::new(None),
+            }
+        }
+
+        fn flush_reason(&self) -> Option<FlushReason> {
+            *self.flush_reason.lock().unwrap()
+        }
+    }
+
+    #[async_trait]
+    impl FlushItem for MockItem {
+        fn item_id(&self) -> u64 {
+            self.id
+        }
+
+        fn last_flush_time(&self) -> i64 {
+            self.last_flush_time
+        }
+
+        async fn request_flush(&self, reason: FlushReason) {
+            let mut flush_reason = self.flush_reason.lock().unwrap();
+            *flush_reason = Some(reason);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_pick_by_interval() {
+        let regions = [
+            MockItem::new(0, util::current_time_millis()),
+            MockItem::new(1, util::current_time_millis() - 60 * 1000),
+        ];
+        let picker = FlushPicker::new(PickerConfig {
+            // schedule_interval is unused in this test.
+            schedule_interval: Duration::from_millis(10),
+            auto_flush_interval: Duration::from_millis(30 * 1000),
+        });
+        let flushed = picker.pick_by_interval(&regions).await;
+        assert_eq!(1, flushed);
+        assert!(regions[0].flush_reason().is_none());
+        assert_eq!(Some(FlushReason::Periodically), regions[1].flush_reason());
+    }
+}
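The `checked_sub` in `pick_by_interval` guards the watermark computation against `i64` underflow; a tiny standalone illustration (`earliest_allowed` is a hypothetical helper, not from the patch):

fn earliest_allowed(now_millis: i64, auto_flush_interval_millis: i64) -> Option<i64> {
    // Regions whose last flush is older than the returned watermark get flushed;
    // `None` (arithmetic underflow) means "skip this round" instead of panicking.
    now_millis.checked_sub(auto_flush_interval_millis)
}

fn main() {
    assert_eq!(Some(1_000), earliest_allowed(61_000, 60_000));
    // Only a pathological clock/interval combination underflows `i64`:
    assert_eq!(None, earliest_allowed(-2, i64::MAX));
}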
diff --git a/src/storage/src/flush/scheduler.rs b/src/storage/src/flush/scheduler.rs
index 9eadd63333..cfe08cc4e2 100644
--- a/src/storage/src/flush/scheduler.rs
+++ b/src/storage/src/flush/scheduler.rs
@@ -15,6 +15,8 @@
 use std::sync::Arc;
 use std::time::Duration;
 
+use async_trait::async_trait;
+use common_runtime::{RepeatedTask, TaskFunction};
 use common_telemetry::logging;
 use metrics::increment_counter;
 use snafu::{ensure, ResultExt};
@@ -25,11 +27,14 @@ use tokio::sync::{oneshot, Notify};
 
 use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
 use crate::config::EngineConfig;
-use crate::error::{DuplicateFlushSnafu, Result, WaitFlushSnafu};
-use crate::flush::FlushJob;
+use crate::engine::RegionMap;
+use crate::error::{
+    DuplicateFlushSnafu, Error, Result, StartPickTaskSnafu, StopPickTaskSnafu, WaitFlushSnafu,
+};
+use crate::flush::{FlushJob, FlushPicker, PickerConfig};
 use crate::manifest::region::RegionManifest;
 use crate::memtable::{MemtableId, MemtableRef};
-use crate::metrics::{FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
+use crate::metrics::FLUSH_ERRORS_TOTAL;
 use crate::region;
 use crate::region::{RegionWriterRef, SharedDataRef};
 use crate::scheduler::rate_limit::BoxedRateLimitToken;
@@ -145,20 +150,37 @@
 /// Flush scheduler.
 pub struct FlushScheduler<S: LogStore> {
+    /// Flush task scheduler.
     scheduler: LocalScheduler<FlushRequest<S>>,
+    /// Auto flush task.
+    auto_flush_task: RepeatedTask<Error>,
 }
 
 pub type FlushSchedulerRef<S> = Arc<FlushScheduler<S>>;
 
 impl<S: LogStore> FlushScheduler<S> {
     /// Returns a new [FlushScheduler].
-    pub fn new(config: SchedulerConfig, compaction_scheduler: CompactionSchedulerRef) -> Self {
+    pub fn new(
+        config: SchedulerConfig,
+        compaction_scheduler: CompactionSchedulerRef,
+        regions: Arc<RegionMap<S>>,
+        picker_config: PickerConfig,
+    ) -> Result<Self> {
         let handler = FlushHandler {
             compaction_scheduler,
         };
-        Self {
+        let task_interval = picker_config.schedule_interval;
+        let picker = FlushPicker::new(picker_config);
+        let task_fn = AutoFlushFunction { regions, picker };
+        let auto_flush_task = RepeatedTask::new(task_interval, Box::new(task_fn));
+        auto_flush_task
+            .start(common_runtime::bg_runtime())
+            .context(StartPickTaskSnafu)?;
+
+        Ok(Self {
             scheduler: LocalScheduler::new(config, handler),
-        }
+            auto_flush_task,
+        })
     }
 
     /// Schedules a flush request and return the handle to the flush task.
@@ -183,13 +205,22 @@
             }
         );
 
-        increment_counter!(FLUSH_REQUESTS_TOTAL);
-
         Ok(FlushHandle {
             region_id,
             receiver,
        })
     }
+
+    /// Stop the scheduler.
+    pub async fn stop(&self) -> Result<()> {
+        self.auto_flush_task
+            .stop()
+            .await
+            .context(StopPickTaskSnafu)?;
+
+        self.scheduler.stop(true).await?;
+
+        Ok(())
+    }
 }
 
 struct FlushHandler<S: LogStore> {
@@ -235,6 +266,9 @@
     } else {
         logging::debug!("Successfully flushed region: {}", req.region_id());
 
+        // Update last flush time.
+        req.shared.update_flush_millis();
+
         let compaction_request = CompactionRequestImpl::from(&req);
         let max_files_in_l0 = req.engine_config.max_files_in_l0;
         let shared_data = req.shared.clone();
@@ -247,6 +281,29 @@
             max_files_in_l0,
         );
 
+        // Complete the request.
         req.complete(Ok(()));
     }
 }
+
+/// Task function to pick regions to flush.
+struct AutoFlushFunction<S: LogStore> {
+    /// Regions of the engine.
+    regions: Arc<RegionMap<S>>,
+    picker: FlushPicker,
+}
+
+#[async_trait]
+impl<S: LogStore> TaskFunction<Error> for AutoFlushFunction<S> {
+    async fn call(&mut self) -> Result<()> {
+        // Get all regions.
+        let regions = self.regions.list_regions();
+        self.picker.pick_by_interval(&regions).await;
+
+        Ok(())
+    }
+
+    fn name(&self) -> &str {
+        "FlushPicker-pick-task"
+    }
+}
diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs
index 640c6b5914..9a39265823 100644
--- a/src/storage/src/metrics.rs
+++ b/src/storage/src/metrics.rs
@@ -22,6 +22,8 @@ pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total";
 pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total";
 /// Elapsed time of a flush job.
 pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed";
+/// Reason to flush.
+pub const FLUSH_REASON: &str = "reason";
 /// Gauge for open regions
 pub const REGION_COUNT: &str = "storage.region_count";
 /// Timer for logstore write
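With the new `FLUSH_REASON` key used as a metric label, each flush trigger becomes its own counter series. A minimal sketch of the labeled increment, matching the call this patch adds to `region/writer.rs` below:

use metrics::increment_counter;

const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total";
const FLUSH_REASON: &str = "reason";

fn main() {
    // Same shape as the call in `WriterInner::trigger_flush`; with no metrics
    // recorder installed this is a no-op, so it is safe to run standalone.
    increment_counter!(FLUSH_REQUESTS_TOTAL, FLUSH_REASON => "memtable_full");
}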
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs
index 69ba0dfd8f..461ec3a0c6 100644
--- a/src/storage/src/region.rs
+++ b/src/storage/src/region.rs
@@ -18,11 +18,13 @@ mod writer;
 
 use std::collections::BTreeMap;
 use std::fmt;
+use std::sync::atomic::{AtomicI64, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
 use async_trait::async_trait;
 use common_telemetry::logging;
+use common_time::util;
 use metrics::{decrement_gauge, increment_gauge};
 use snafu::ResultExt;
 use store_api::logstore::LogStore;
@@ -235,6 +237,7 @@ impl<S: LogStore> RegionImpl<S> {
                 id,
                 name,
                 version_control: Arc::new(version_control),
+                last_flush_millis: AtomicI64::new(0),
             }),
             writer: Arc::new(RegionWriter::new(
                 store_config.memtable_builder,
@@ -315,6 +318,7 @@ impl<S: LogStore> RegionImpl<S> {
             id: metadata.id(),
             name,
             version_control,
+            last_flush_millis: AtomicI64::new(0),
         });
         let compaction_time_window = store_config
             .compaction_time_window
@@ -366,6 +370,11 @@ impl<S: LogStore> RegionImpl<S> {
         self.inner.shared.id()
     }
 
+    /// Returns last flush timestamp in millis.
+    pub fn last_flush_millis(&self) -> i64 {
+        self.inner.shared.last_flush_millis()
+    }
+
     fn create_version_with_checkpoint(
         checkpoint: RegionCheckpoint,
         memtable_builder: &MemtableBuilderRef,
@@ -558,6 +567,9 @@ pub struct SharedData {
     name: String,
     // TODO(yingwen): Maybe no need to use Arc for version control.
     pub version_control: VersionControlRef,
+
+    /// Last flush time in millis.
+    last_flush_millis: AtomicI64,
 }
 
 impl SharedData {
@@ -570,6 +582,17 @@ impl SharedData {
     pub fn name(&self) -> &str {
         &self.name
     }
+
+    /// Updates the last flush time to the current time.
+    pub(crate) fn update_flush_millis(&self) {
+        let now = util::current_time_millis();
+        self.last_flush_millis.store(now, Ordering::Relaxed);
+    }
+
+    /// Returns last flush timestamp in millis.
+    fn last_flush_millis(&self) -> i64 {
+        self.last_flush_millis.load(Ordering::Relaxed)
+    }
 }
 
 pub type SharedDataRef = Arc<SharedData>;
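`Ordering::Relaxed` is enough for `last_flush_millis` because the timestamp is only an advisory input to the picker's staleness check, not a synchronization point. The same pattern, self-contained:

use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for `SharedData::last_flush_millis`; a single atomic needs no lock.
struct LastFlush(AtomicI64);

impl LastFlush {
    fn update(&self) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock before epoch")
            .as_millis() as i64;
        self.0.store(now, Ordering::Relaxed);
    }

    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let ts = LastFlush(AtomicI64::new(0));
    ts.update();
    assert!(ts.get() > 0);
}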
diff --git a/src/storage/src/region/tests/alter.rs b/src/storage/src/region/tests/alter.rs
index 90c483bbf1..cfaab4e9e6 100644
--- a/src/storage/src/region/tests/alter.rs
+++ b/src/storage/src/region/tests/alter.rs
@@ -22,8 +22,8 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
 use log_store::raft_engine::log_store::RaftEngineLogStore;
 use store_api::storage::{
     AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor,
-    ColumnDescriptorBuilder, ColumnId, FlushContext, Region, RegionMeta, ScanRequest, SchemaRef,
-    Snapshot, WriteRequest, WriteResponse,
+    ColumnDescriptorBuilder, ColumnId, FlushContext, FlushReason, Region, RegionMeta, ScanRequest,
+    SchemaRef, Snapshot, WriteRequest, WriteResponse,
 };
 
 use crate::region::tests::{self, FileTesterBase};
@@ -118,7 +118,12 @@ impl AlterTester {
     }
 
     async fn flush(&self, wait: Option<bool>) {
-        let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
+        let ctx = wait
+            .map(|wait| FlushContext {
+                wait,
+                reason: FlushReason::Manually,
+            })
+            .unwrap_or_default();
         self.base().region.flush(&ctx).await.unwrap();
     }
 
diff --git a/src/storage/src/region/tests/compact.rs b/src/storage/src/region/tests/compact.rs
index a283b3d32c..a7416f7aa6 100644
--- a/src/storage/src/region/tests/compact.rs
+++ b/src/storage/src/region/tests/compact.rs
@@ -23,7 +23,7 @@ use common_test_util::temp_dir::create_temp_dir;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
 use object_store::services::{Fs, S3};
 use object_store::ObjectStore;
-use store_api::storage::{FlushContext, Region, WriteResponse};
+use store_api::storage::{FlushContext, FlushReason, Region, WriteResponse};
 use tokio::sync::Notify;
 
 use crate::compaction::{CompactionHandler, SimplePicker};
@@ -191,7 +191,12 @@ impl CompactionTester {
     }
 
     async fn flush(&self, wait: Option<bool>) {
-        let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
+        let ctx = wait
+            .map(|wait| FlushContext {
+                wait,
+                reason: FlushReason::Manually,
+            })
+            .unwrap_or_default();
         self.base().region.flush(&ctx).await.unwrap();
     }
 
diff --git a/src/storage/src/region/tests/flush.rs b/src/storage/src/region/tests/flush.rs
index 75141e361f..4033b3d538 100644
--- a/src/storage/src/region/tests/flush.rs
+++ b/src/storage/src/region/tests/flush.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
 
 use common_test_util::temp_dir::create_temp_dir;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
-use store_api::storage::{FlushContext, OpenOptions, Region, WriteResponse};
+use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse};
 
 use crate::engine;
 use crate::flush::FlushStrategyRef;
@@ -91,7 +91,12 @@ impl FlushTester {
     }
 
     async fn flush(&self, wait: Option<bool>) {
-        let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
+        let ctx = wait
+            .map(|wait| FlushContext {
+                wait,
+                reason: FlushReason::Manually,
+            })
+            .unwrap_or_default();
         self.base().region.flush(&ctx).await.unwrap();
     }
 }
diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs
index 740c11305a..991a11a571 100644
--- a/src/storage/src/region/writer.rs
+++ b/src/storage/src/region/writer.rs
@@ -18,10 +18,13 @@ use std::time::Duration;
 use common_base::readable_size::ReadableSize;
 use common_telemetry::logging;
 use futures::TryStreamExt;
+use metrics::increment_counter;
 use snafu::{ensure, ResultExt};
 use store_api::logstore::LogStore;
 use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
-use store_api::storage::{AlterRequest, FlushContext, SequenceNumber, WriteContext, WriteResponse};
+use store_api::storage::{
+    AlterRequest, FlushContext, FlushReason, SequenceNumber, WriteContext, WriteResponse,
+};
 use tokio::sync::{oneshot, Mutex};
 
 use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
@@ -33,6 +36,7 @@ use crate::manifest::action::{
 };
 use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
 use crate::metadata::RegionMetadataRef;
+use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL};
 use crate::proto::wal::WalHeader;
 use crate::region::{
     CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
@@ -281,7 +285,7 @@ impl<S: LogStore> RegionWriter<S> {
 
         ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
 
-        inner.manual_flush(writer_ctx).await?;
+        inner.manual_flush(writer_ctx, ctx.reason).await?;
 
         if ctx.wait {
             if let Some(handle) = inner.flush_handle.take() {
@@ -587,7 +591,8 @@ impl<S: LogStore> WriterInner<S> {
             version_control,
             writer_ctx.flush_strategy,
         ) {
-            self.trigger_flush(writer_ctx).await?;
+            self.trigger_flush(writer_ctx, FlushReason::MemtableFull)
+                .await?;
         }
 
         Ok(())
@@ -612,12 +617,18 @@ impl<S: LogStore> WriterInner<S> {
         flush_strategy.should_flush(shared, mutable_bytes_allocated, total_bytes_allocated)
     }
 
-    async fn trigger_flush(&mut self, ctx: &WriterContext<'_, S>) -> Result<()> {
+    async fn trigger_flush(
+        &mut self,
+        ctx: &WriterContext<'_, S>,
+        reason: FlushReason,
+    ) -> Result<()> {
         let version_control = &ctx.shared.version_control;
         let new_mutable = self.alloc_memtable(version_control);
         // Freeze all mutable memtables so we can flush them later.
         version_control.freeze_mutable(new_mutable);
 
+        increment_counter!(FLUSH_REQUESTS_TOTAL, FLUSH_REASON => reason.as_str());
+
         if let Some(flush_handle) = self.flush_handle.take() {
             // Previous flush job is incomplete, wait until it is finished.
             // However the last flush job may fail, in which case, we just return error
@@ -719,8 +730,12 @@ impl<S: LogStore> WriterInner<S> {
         Ok(())
     }
 
-    async fn manual_flush(&mut self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
-        self.trigger_flush(&writer_ctx).await?;
+    async fn manual_flush(
+        &mut self,
+        writer_ctx: WriterContext<'_, S>,
+        reason: FlushReason,
+    ) -> Result<()> {
+        self.trigger_flush(&writer_ctx, reason).await?;
 
         Ok(())
     }
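One detail worth calling out in `trigger_flush` above: taking the previous `FlushHandle` before scheduling keeps at most one flush inflight per region. A simplified tokio model of that pattern (names and types are illustrative, not the patch's own):

use tokio::task::JoinHandle;

/// Stand-in for the writer's `flush_handle` slot: at most one inflight flush;
/// a new trigger first drains the old one.
struct Writer {
    flush_handle: Option<JoinHandle<()>>,
}

impl Writer {
    async fn trigger_flush(&mut self) {
        // Wait for the previous flush job, if any, before starting a new one.
        if let Some(handle) = self.flush_handle.take() {
            handle.await.expect("previous flush job panicked");
        }
        self.flush_handle = Some(tokio::spawn(async {
            // ... write frozen memtables to SST files here ...
        }));
    }
}

#[tokio::main]
async fn main() {
    let mut w = Writer { flush_handle: None };
    w.trigger_flush().await;
    w.trigger_flush().await; // drains the first job before spawning the second
}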
diff --git a/src/storage/src/scheduler.rs b/src/storage/src/scheduler.rs
index 33249fbe28..2c8d1c162a 100644
--- a/src/storage/src/scheduler.rs
+++ b/src/storage/src/scheduler.rs
@@ -124,6 +124,9 @@ where
         self.state.store(STATE_STOP, Ordering::Relaxed);
 
         self.cancel_token.cancel();
+
+        // Clear all requests.
+        self.request_queue.write().unwrap().clear();
     }
 }
diff --git a/src/storage/src/scheduler/dedup_deque.rs b/src/storage/src/scheduler/dedup_deque.rs
index e04c16dcc5..91cef075ae 100644
--- a/src/storage/src/scheduler/dedup_deque.rs
+++ b/src/storage/src/scheduler/dedup_deque.rs
@@ -76,6 +76,12 @@
     pub fn is_empty(&self) -> bool {
         self.deque.is_empty()
     }
+
+    #[inline]
+    pub fn clear(&mut self) {
+        self.deque.clear();
+        self.existing.clear();
+    }
 }
 
 impl<K, V> Debug for DedupDeque<K, V>
 where
@@ -111,5 +117,8 @@
         assert!(deque.push_back(1, "hello".to_string()));
         assert!(!deque.push_back(1, "world".to_string()));
         assert_eq!((1, "hello".to_string()), deque.pop_front().unwrap());
+
+        deque.clear();
+        assert!(deque.is_empty());
     }
 }
diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs
index b969fe4608..ae418b56ee 100644
--- a/src/storage/src/test_util/config_util.rs
+++ b/src/storage/src/test_util/config_util.rs
@@ -21,9 +21,9 @@ use object_store::ObjectStore;
 use store_api::manifest::Manifest;
 
 use crate::compaction::noop::NoopCompactionScheduler;
-use crate::engine;
+use crate::engine::{self, RegionMap};
 use crate::file_purger::noop::NoopFilePurgeHandler;
-use crate::flush::{FlushScheduler, SizeBasedStrategy};
+use crate::flush::{FlushScheduler, PickerConfig, SizeBasedStrategy};
 use crate::manifest::region::RegionManifest;
 use crate::memtable::DefaultMemtableBuilder;
 use crate::region::StoreConfig;
@@ -65,10 +65,16 @@ pub async fn new_store_config_with_object_store(
     };
     let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
     let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
-    let flush_scheduler = Arc::new(FlushScheduler::new(
-        SchedulerConfig::default(),
-        compaction_scheduler.clone(),
-    ));
+    // We use an empty region map, so the picker's background task is effectively disabled.
+    let flush_scheduler = Arc::new(
+        FlushScheduler::new(
+            SchedulerConfig::default(),
+            compaction_scheduler.clone(),
+            Arc::new(RegionMap::new()),
+            PickerConfig::default(),
+        )
+        .unwrap(),
+    );
     let file_purger = Arc::new(LocalScheduler::new(
         SchedulerConfig::default(),
         NoopFilePurgeHandler,
diff --git a/src/storage/src/test_util/flush_switch.rs b/src/storage/src/test_util/flush_switch.rs
index 6d1b7df9cc..3b5b2e10c1 100644
--- a/src/storage/src/test_util/flush_switch.rs
+++ b/src/storage/src/test_util/flush_switch.rs
@@ -17,6 +17,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use crate::flush::FlushStrategy;
 use crate::region::SharedDataRef;
 
+/// Controls whether to flush a region while writing to the region.
+/// Flush is disabled by default.
 #[derive(Debug, Default)]
 pub struct FlushSwitch {
     should_flush: AtomicBool,
 }
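`FlushSwitch` is the test-only `FlushStrategy` gating flushes behind an `AtomicBool`. A simplified standalone model (the real trait method also receives region sizes, omitted here):

use std::sync::atomic::{AtomicBool, Ordering};

/// Simplified model of the test-only switch: flushes stay off until a test
/// explicitly turns them on.
#[derive(Debug, Default)]
struct FlushSwitch {
    should_flush: AtomicBool,
}

impl FlushSwitch {
    fn set_should_flush(&self, should_flush: bool) {
        self.should_flush.store(should_flush, Ordering::Relaxed);
    }

    fn should_flush(&self) -> bool {
        self.should_flush.load(Ordering::Relaxed)
    }
}

fn main() {
    let switch = FlushSwitch::default();
    assert!(!switch.should_flush()); // disabled by default
    switch.set_should_flush(true);
    assert!(switch.should_flush());
}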
diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs
index aba97a3e3b..d8978db21f 100644
--- a/src/store-api/src/storage.rs
+++ b/src/store-api/src/storage.rs
@@ -34,7 +34,7 @@ pub use self::chunk::{Chunk, ChunkReader};
 pub use self::descriptors::*;
 pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine};
 pub use self::metadata::RegionMeta;
-pub use self::region::{FlushContext, Region, RegionStat, WriteContext};
+pub use self::region::{FlushContext, FlushReason, Region, RegionStat, WriteContext};
 pub use self::requests::{
     AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest,
 };
diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs
index 78bc5fc631..fb055fa5b5 100644
--- a/src/store-api/src/storage/engine.rs
+++ b/src/store-api/src/storage/engine.rs
@@ -73,6 +73,9 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
         ctx: &EngineContext,
         name: &str,
     ) -> Result<Option<Self::Region>, Self::Error>;
+
+    /// Closes the engine.
+    async fn close(&self, ctx: &EngineContext) -> Result<(), Self::Error>;
 }
 
 /// Storage engine context.
diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs
index de3904bd2e..648b15a876 100644
--- a/src/store-api/src/storage/region.rs
+++ b/src/store-api/src/storage/region.rs
@@ -110,10 +110,40 @@ pub struct FlushContext {
     /// If true, wait until the flush is done.
     /// Default: true
     pub wait: bool,
+    /// Flush reason.
+    pub reason: FlushReason,
 }
 
 impl Default for FlushContext {
     fn default() -> FlushContext {
-        FlushContext { wait: true }
+        FlushContext {
+            wait: true,
+            reason: FlushReason::Others,
+        }
+    }
+}
+
+/// Reason for a flush operation.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum FlushReason {
+    /// Other reasons.
+    Others,
+    /// Memtable is full.
+    MemtableFull,
+    /// Flush manually.
+    Manually,
+    /// Auto flush periodically.
+    Periodically,
+}
+
+impl FlushReason {
+    /// Returns the reason as a `str`.
+    pub fn as_str(&self) -> &'static str {
+        match self {
+            FlushReason::Others => "others",
+            FlushReason::MemtableFull => "memtable_full",
+            FlushReason::Manually => "manually",
+            FlushReason::Periodically => "periodically",
+        }
+    }
+}
diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs
index 59615a1a78..63aee29281 100644
--- a/src/table-procedure/src/test_util.rs
+++ b/src/table-procedure/src/test_util.rs
@@ -57,7 +57,8 @@ impl TestEnv {
             Arc::new(NoopLogStore::default()),
             object_store.clone(),
             compaction_scheduler,
-        );
+        )
+        .unwrap();
         let table_engine = Arc::new(MitoEngine::new(
             EngineConfig::default(),
             storage_engine,
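Finally, a condensed model of the `close` contract this patch adds to `StorageEngine`: close every region, then stop the background tasks (flush picker, compaction scheduler, file purger). The trait here is simplified to a non-generic sketch and is not the `store-api` definition:

use async_trait::async_trait;

// Non-generic sketch of the `close` contract added to `StorageEngine`.
#[async_trait]
trait StorageEngine: Send + Sync {
    async fn close(&self) -> Result<(), String>;
}

struct MockEngine;

#[async_trait]
impl StorageEngine for MockEngine {
    async fn close(&self) -> Result<(), String> {
        // A real engine closes its regions, clears the region map, and stops
        // the flush/compaction/purge schedulers here.
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    MockEngine.close().await.unwrap();
}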