diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 8a93e20b57..cf1a8533f1 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -56,6 +56,7 @@ pin-project.workspace = true prometheus.workspace = true prost.workspace = true puffin.workspace = true +rand.workspace = true regex = "1.5" serde = { version = "1.0", features = ["derive"] } serde_json.workspace = true @@ -75,7 +76,6 @@ common-procedure-test.workspace = true common-test-util.workspace = true criterion = "0.4" log-store.workspace = true -rand.workspace = true toml.workspace = true [[bench]] diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 971edad74b..16a3bca5dc 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -373,6 +373,7 @@ impl MitoEngine { object_store_manager: ObjectStoreManagerRef, write_buffer_manager: Option, listener: Option, + time_provider: crate::time_provider::TimeProviderRef, ) -> Result { config.sanitize(data_home)?; @@ -385,6 +386,7 @@ impl MitoEngine { object_store_manager, write_buffer_manager, listener, + time_provider, ) .await?, config, diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index efd8a727d2..9c348102f2 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -14,10 +14,13 @@ //! Flush tests for mito engine. 
+use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; +use std::time::Duration; use api::v1::Rows; use common_recordbatch::RecordBatches; +use common_time::util::current_time_millis; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; @@ -28,6 +31,8 @@ use crate::test_util::{ build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, MockWriteBufferManager, TestEnv, }; +use crate::time_provider::TimeProvider; +use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS; #[tokio::test] async fn test_manual_flush() { @@ -272,3 +277,101 @@ async fn test_flush_reopen_region() { assert_eq!(2, version_data.last_entry_id); assert_eq!(5, version_data.committed_sequence); } + +#[derive(Debug)] +struct MockTimeProvider { + now: AtomicI64, + elapsed: AtomicI64, +} + +impl TimeProvider for MockTimeProvider { + fn current_time_millis(&self) -> i64 { + self.now.load(Ordering::Relaxed) + } + + fn elapsed_since(&self, _current_millis: i64) -> i64 { + self.elapsed.load(Ordering::Relaxed) + } + + fn wait_duration(&self, _duration: Duration) -> Duration { + Duration::from_millis(20) + } +} + +impl MockTimeProvider { + fn new(now: i64) -> Self { + Self { + now: AtomicI64::new(now), + elapsed: AtomicI64::new(0), + } + } + + fn set_now(&self, now: i64) { + self.now.store(now, Ordering::Relaxed); + } + + fn set_elapsed(&self, elapsed: i64) { + self.elapsed.store(elapsed, Ordering::Relaxed); + } +} + +#[tokio::test] +async fn test_auto_flush_engine() { + let mut env = TestEnv::new(); + let write_buffer_manager = Arc::new(MockWriteBufferManager::default()); + let listener = Arc::new(FlushListener::default()); + let now = current_time_millis(); + let time_provider = Arc::new(MockTimeProvider::new(now)); + let engine = env + .create_engine_with_time( + MitoConfig { + auto_flush_interval: Duration::from_secs(60 * 5), + ..Default::default() + }, + 
Some(write_buffer_manager.clone()), + Some(listener.clone()), + time_provider.clone(), + ) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Prepares rows for flush. + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 2, 0), + }; + put_rows(&engine, region_id, rows).await; + + // Sets current time to now + auto_flush_interval * 2. + time_provider.set_now(now + (60 * 5 * 2) * 1000); + // Sets elapsed time to MAX_INITIAL_CHECK_DELAY_SECS + 1. + time_provider.set_elapsed((MAX_INITIAL_CHECK_DELAY_SECS as i64 + 1) * 1000); + + // Wait until flush is finished. + tokio::time::timeout(Duration::from_secs(3), listener.wait()) + .await + .unwrap(); + + let request = ScanRequest::default(); + let scanner = engine.scanner(region_id, request).unwrap(); + assert_eq!(0, scanner.num_memtables()); + assert_eq!(1, scanner.num_files()); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 0.0 | 1970-01-01T00:00:00 | +| a | 1.0 | 1970-01-01T00:00:01 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 0eea330537..f3890cafda 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -176,6 +176,8 @@ pub enum FlushReason { Manual, /// Flush to alter table. Alter, + /// Flush periodically. 
+ Periodically, } impl FlushReason { @@ -432,18 +434,19 @@ impl FlushScheduler { ) -> Result<()> { debug_assert_eq!(region_id, task.region_id); - FLUSH_REQUESTS_TOTAL - .with_label_values(&[task.reason.as_str()]) - .inc(); - let version = version_control.current().version; - if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() { + if version.memtables.is_empty() { debug_assert!(!self.region_status.contains_key(&region_id)); // The region has nothing to flush. task.on_success(); return Ok(()); } + // Don't increase the counter if a region has nothing to flush. + FLUSH_REQUESTS_TOTAL + .with_label_values(&[task.reason.as_str()]) + .inc(); + // Add this region to status map. let flush_status = self .region_status diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index be785e81c3..acf65608ce 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -40,6 +40,7 @@ pub mod request; pub mod row_converter; pub(crate) mod schedule; pub mod sst; +mod time_provider; pub mod wal; mod worker; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 2306c663cd..c32e45b87a 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -23,7 +23,6 @@ use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; use common_telemetry::info; -use common_time::util::current_time_millis; use common_wal::options::WalOptions; use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; @@ -37,6 +36,7 @@ use crate::memtable::MemtableId; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::OnFailure; use crate::sst::file_purger::FilePurgerRef; +use crate::time_provider::TimeProviderRef; /// This is the approximate factor to estimate the size of wal. const ESTIMATED_WAL_FACTOR: f32 = 0.42825; @@ -83,6 +83,9 @@ pub(crate) struct MitoRegion { last_flush_millis: AtomicI64, /// Whether the region is writable. 
writable: AtomicBool, + + /// Provider to get current time. + time_provider: TimeProviderRef, } pub(crate) type MitoRegionRef = Arc; @@ -119,7 +122,7 @@ impl MitoRegion { /// Update flush time to current time. pub(crate) fn update_flush_millis(&self) { - let now = current_time_millis(); + let now = self.time_provider.current_time_millis(); self.last_flush_millis.store(now, Ordering::Relaxed); } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 5192c55469..d0ac1a5530 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicBool, AtomicI64}; use std::sync::Arc; use common_telemetry::{debug, error, info, warn}; -use common_time::util::current_time_millis; use common_wal::options::WalOptions; use futures::StreamExt; use object_store::manager::ObjectStoreManagerRef; @@ -47,6 +46,7 @@ use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; +use crate::time_provider::{StdTimeProvider, TimeProviderRef}; use crate::wal::{EntryId, Wal}; /// Builder to create a new [MitoRegion] or open an existing one. 
@@ -61,6 +61,7 @@ pub(crate) struct RegionOpener { cache_manager: Option, skip_wal_replay: bool, intermediate_manager: IntermediateManager, + time_provider: Option, } impl RegionOpener { @@ -84,6 +85,7 @@ impl RegionOpener { cache_manager: None, skip_wal_replay: false, intermediate_manager, + time_provider: None, } } @@ -189,6 +191,9 @@ impl RegionOpener { object_store, self.intermediate_manager, )); + let time_provider = self + .time_provider + .unwrap_or_else(|| Arc::new(StdTimeProvider)); Ok(MitoRegion { region_id, @@ -201,9 +206,10 @@ impl RegionOpener { self.cache_manager, )), wal_options, - last_flush_millis: AtomicI64::new(current_time_millis()), + last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), // Region is writable after it is created. writable: AtomicBool::new(true), + time_provider, }) } @@ -307,6 +313,10 @@ impl RegionOpener { } else { info!("Skip the WAL replay for region: {}", region_id); } + let time_provider = self + .time_provider + .clone() + .unwrap_or_else(|| Arc::new(StdTimeProvider)); let region = MitoRegion { region_id: self.region_id, @@ -315,9 +325,10 @@ impl RegionOpener { manifest_manager, file_purger, wal_options, - last_flush_millis: AtomicI64::new(current_time_millis()), + last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), // Region is always opened in read only mode. 
writable: AtomicBool::new(false), + time_provider, }; Ok(Some(region)) } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 84316dbcc1..7339949490 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -59,6 +59,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::read::{Batch, BatchBuilder, BatchReader}; use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest}; use crate::sst::index::intermediate::IntermediateManager; +use crate::time_provider::{StdTimeProvider, TimeProviderRef}; use crate::worker::WorkerGroup; #[derive(Debug)] @@ -179,6 +180,7 @@ impl TestEnv { object_store_manager, manager, listener, + Arc::new(StdTimeProvider), ) .await .unwrap() @@ -219,6 +221,37 @@ impl TestEnv { object_store_manager, manager, listener, + Arc::new(StdTimeProvider), + ) + .await + .unwrap() + } + + /// Creates a new engine with specific config and manager/listener/time provider under this env. + pub async fn create_engine_with_time( + &mut self, + config: MitoConfig, + manager: Option, + listener: Option, + time_provider: TimeProviderRef, + ) -> MitoEngine { + let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; + + let logstore = Arc::new(log_store); + let object_store_manager = Arc::new(object_store_manager); + self.logstore = Some(logstore.clone()); + self.object_store_manager = Some(object_store_manager.clone()); + + let data_home = self.data_home().display().to_string(); + + MitoEngine::new_for_test( + &data_home, + config, + logstore, + object_store_manager, + manager, + listener, + time_provider.clone(), ) .await .unwrap() diff --git a/src/mito2/src/time_provider.rs b/src/mito2/src/time_provider.rs new file mode 100644 index 0000000000..37cfd59e41 --- /dev/null +++ b/src/mito2/src/time_provider.rs @@ -0,0 +1,52 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not 
use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Abstraction to get current time. + +use std::sync::Arc; +use std::time::Duration; + +use common_time::util::current_time_millis; + +/// Trait to get current time and deal with durations. +/// +/// We define the trait to simplify time related tests. +pub trait TimeProvider: std::fmt::Debug + Send + Sync { + /// Returns current time in millis. + fn current_time_millis(&self) -> i64; + + /// Returns millis elapsed since the specified time. + fn elapsed_since(&self, current_millis: i64) -> i64; + + /// Computes the actual duration to wait from an expected one. + fn wait_duration(&self, duration: Duration) -> Duration { + duration + } +} + +pub type TimeProviderRef = Arc<dyn TimeProvider>; + +/// Default implementation of the time provider based on std. 
+#[derive(Debug)] +pub struct StdTimeProvider; + +impl TimeProvider for StdTimeProvider { + fn current_time_millis(&self) -> i64 { + current_time_millis() + } + + fn elapsed_since(&self, current_millis: i64) -> i64 { + current_time_millis() - current_millis + } +} diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 08db2002ac..a8fe38e87d 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -33,6 +33,7 @@ use common_runtime::JoinHandle; use common_telemetry::{error, info, warn}; use futures::future::try_join_all; use object_store::manager::ObjectStoreManagerRef; +use rand::{thread_rng, Rng}; use snafu::{ensure, ResultExt}; use store_api::logstore::LogStore; use store_api::region_engine::SetReadonlyResponse; @@ -56,6 +57,7 @@ use crate::request::{ }; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; use crate::sst::index::intermediate::IntermediateManager; +use crate::time_provider::{StdTimeProvider, TimeProviderRef}; use crate::wal::Wal; /// Identifier for a worker. @@ -63,6 +65,11 @@ pub(crate) type WorkerId = u32; pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping"; +/// Interval to check whether regions should flush. +pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60); +/// Max delay to check region periodical tasks. +pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3; + #[cfg_attr(doc, aquamarine::aquamarine)] /// A fixed size group of [RegionWorkers](RegionWorker). 
/// @@ -140,6 +147,7 @@ impl WorkerGroup { .write_cache(write_cache) .build(), ); + let time_provider = Arc::new(StdTimeProvider); let workers = (0..config.num_workers) .map(|id| { @@ -153,6 +161,7 @@ impl WorkerGroup { listener: WorkerListener::default(), cache_manager: cache_manager.clone(), intermediate_manager: intermediate_manager.clone(), + time_provider: time_provider.clone(), } .start() }) @@ -223,6 +232,7 @@ impl WorkerGroup { object_store_manager: ObjectStoreManagerRef, write_buffer_manager: Option, listener: Option, + time_provider: TimeProviderRef, ) -> Result { let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| { Arc::new(WriteBufferManagerImpl::new( @@ -260,6 +270,7 @@ impl WorkerGroup { listener: WorkerListener::new(listener.clone()), cache_manager: cache_manager.clone(), intermediate_manager: intermediate_manager.clone(), + time_provider: time_provider.clone(), } .start() }) @@ -300,6 +311,12 @@ async fn write_cache_from_config( Ok(Some(Arc::new(cache))) } +/// Computes an initial check delay for a worker. +pub(crate) fn worker_init_check_delay() -> Duration { + let init_check_delay = thread_rng().gen_range(0..MAX_INITIAL_CHECK_DELAY_SECS); + Duration::from_secs(init_check_delay) +} + /// Worker start config. 
struct WorkerStarter { id: WorkerId, @@ -311,6 +328,7 @@ struct WorkerStarter { listener: WorkerListener, cache_manager: CacheManagerRef, intermediate_manager: IntermediateManager, + time_provider: TimeProviderRef, } impl WorkerStarter { @@ -320,7 +338,6 @@ impl WorkerStarter { let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); let running = Arc::new(AtomicBool::new(true)); - let memtable_builder = match &self.config.memtable { MemtableConfig::Experimental(merge_tree) => Arc::new(MergeTreeMemtableBuilder::new( merge_tree.clone(), @@ -330,6 +347,7 @@ impl WorkerStarter { self.write_buffer_manager.clone(), ))) as _, }; + let now = self.time_provider.current_time_millis(); let mut worker_thread = RegionWorkerLoop { id: self.id, config: self.config, @@ -353,6 +371,8 @@ impl WorkerStarter { listener: self.listener, cache_manager: self.cache_manager, intermediate_manager: self.intermediate_manager, + time_provider: self.time_provider, + last_periodical_check_millis: now, }; let handle = common_runtime::spawn_write(async move { worker_thread.run().await; @@ -511,12 +531,21 @@ struct RegionWorkerLoop { cache_manager: CacheManagerRef, /// Intermediate manager for inverted index. intermediate_manager: IntermediateManager, + /// Provider to get current time. + time_provider: TimeProviderRef, + /// Last time to check regions periodically. + last_periodical_check_millis: i64, } impl RegionWorkerLoop { /// Starts the worker loop. async fn run(&mut self) { - info!("Start region worker thread {}", self.id); + let init_check_delay = worker_init_check_delay(); + info!( + "Start region worker thread {}, init_check_delay: {:?}", + self.id, init_check_delay + ); + self.last_periodical_check_millis += init_check_delay.as_millis() as i64; // Buffer to retrieve requests from receiver. 
let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size); @@ -525,9 +554,16 @@ impl RegionWorkerLoop { // Clear the buffer before handling next batch of requests. buffer.clear(); - match self.receiver.recv().await { - Some(request) => buffer.push(request), - None => break, + let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL); + match tokio::time::timeout(max_wait_time, self.receiver.recv()).await { + Ok(Some(request)) => buffer.push(request), + // The channel is disconnected. + Ok(None) => break, + Err(_) => { + // Timeout. Checks periodical tasks. + self.handle_periodical_tasks(); + continue; + } } // Try to recv more requests from the channel. @@ -541,6 +577,8 @@ impl RegionWorkerLoop { } self.handle_requests(&mut buffer).await; + + self.handle_periodical_tasks(); } self.clean().await; @@ -629,6 +667,24 @@ impl RegionWorkerLoop { } } + /// Handle periodical tasks such as region auto flush. + fn handle_periodical_tasks(&mut self) { + let interval = CHECK_REGION_INTERVAL.as_millis() as i64; + if self + .time_provider + .elapsed_since(self.last_periodical_check_millis) + < interval + { + return; + } + + self.last_periodical_check_millis = self.time_provider.current_time_millis(); + + if let Err(e) = self.flush_periodically() { + error!(e; "Failed to flush regions periodically"); + } + } + /// Handles region background request async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) { match notify { diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index e3ce1f2f63..a4b0ebccd0 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use common_telemetry::{error, info, warn}; -use common_time::util::current_time_millis; use store_api::logstore::LogStore; use store_api::region_request::RegionFlushRequest; use store_api::storage::RegionId; @@ -77,10 +76,10 @@ impl 
RegionWorkerLoop { } } - /// Find some regions to flush to reduce write buffer usage. + /// Finds some regions to flush to reduce write buffer usage. fn flush_regions_on_engine_full(&mut self) -> Result<()> { let regions = self.regions.list_regions(); - let now = current_time_millis(); + let now = self.time_provider.current_time_millis(); let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64; let mut max_mutable_size = 0; // Region with max mutable memtable size. @@ -129,7 +128,38 @@ impl RegionWorkerLoop { Ok(()) } - /// Create a flush task with specific `reason` for the `region`. + /// Flushes regions periodically. + pub(crate) fn flush_periodically(&mut self) -> Result<()> { + let regions = self.regions.list_regions(); + let now = self.time_provider.current_time_millis(); + let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64; + + for region in &regions { + if self.flush_scheduler.is_flush_requested(region.region_id) { + // Already flushing. + continue; + } + + if region.last_flush_millis() < min_last_flush_time { + // If flush time of this region is earlier than `min_last_flush_time`, we can flush this region. + let task = self.new_flush_task( + region, + FlushReason::Periodically, + None, + self.config.clone(), + ); + self.flush_scheduler.schedule_flush( + region.region_id, + &region.version_control, + task, + )?; + } + } + + Ok(()) + } + + /// Creates a flush task with specific `reason` for the `region`. pub(crate) fn new_flush_task( &self, region: &MitoRegionRef,