feat(mito): Checks whether a region should flush periodically (#3459)

* feat: handle flush periodically

* chore: call periodical method in loop

* feat: check periodical tasks on channel timeout

* refactor: use time provider to get time

Mock a time provider to test auto flush

* chore: fix typos

* refactor: rename mock time provider

* style: fix clippy

* chore: address comment
This commit is contained in:
Yingwen
2024-03-15 14:41:28 +08:00
committed by GitHub
parent a52aedec5b
commit 74862f8c3f
11 changed files with 314 additions and 20 deletions

View File

@@ -56,6 +56,7 @@ pin-project.workspace = true
prometheus.workspace = true
prost.workspace = true
puffin.workspace = true
rand.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
@@ -75,7 +76,6 @@ common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"
log-store.workspace = true
rand.workspace = true
toml.workspace = true
[[bench]]

View File

@@ -373,6 +373,7 @@ impl MitoEngine {
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
time_provider: crate::time_provider::TimeProviderRef,
) -> Result<MitoEngine> {
config.sanitize(data_home)?;
@@ -385,6 +386,7 @@ impl MitoEngine {
object_store_manager,
write_buffer_manager,
listener,
time_provider,
)
.await?,
config,

View File

@@ -14,10 +14,13 @@
//! Flush tests for mito engine.
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use common_time::util::current_time_millis;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
@@ -28,6 +31,8 @@ use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema,
CreateRequestBuilder, MockWriteBufferManager, TestEnv,
};
use crate::time_provider::TimeProvider;
use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
#[tokio::test]
async fn test_manual_flush() {
@@ -272,3 +277,101 @@ async fn test_flush_reopen_region() {
assert_eq!(2, version_data.last_entry_id);
assert_eq!(5, version_data.committed_sequence);
}
/// [TimeProvider] for tests whose readings are set explicitly instead of
/// being taken from the system clock.
#[derive(Debug)]
struct MockTimeProvider {
/// Value reported by `current_time_millis()`.
now: AtomicI64,
/// Value reported by `elapsed_since()`, regardless of its argument.
elapsed: AtomicI64,
}
impl TimeProvider for MockTimeProvider {
    /// Reports the time last stored in `now`.
    fn current_time_millis(&self) -> i64 {
        self.now.load(Ordering::Relaxed)
    }

    /// Reports the value last stored in `elapsed`; the argument is ignored.
    fn elapsed_since(&self, _current_millis: i64) -> i64 {
        self.elapsed.load(Ordering::Relaxed)
    }

    /// Always asks the caller to wait for the same short, fixed duration,
    /// whatever duration was requested.
    fn wait_duration(&self, _duration: Duration) -> Duration {
        const MOCK_WAIT: Duration = Duration::from_millis(20);
        MOCK_WAIT
    }
}
impl MockTimeProvider {
    /// Creates a provider that reports `now` as the current time and zero
    /// as the elapsed time.
    fn new(now: i64) -> Self {
        let elapsed = AtomicI64::new(0);
        let now = AtomicI64::new(now);
        Self { now, elapsed }
    }

    /// Replaces the value reported as the current time.
    fn set_now(&self, now: i64) {
        self.now.store(now, Ordering::Relaxed);
    }

    /// Replaces the value reported as the elapsed time.
    fn set_elapsed(&self, elapsed: i64) {
        self.elapsed.store(elapsed, Ordering::Relaxed);
    }
}
/// Checks that a region is flushed by the periodical check once the mocked
/// clock says its last flush is older than `auto_flush_interval`.
#[tokio::test]
async fn test_auto_flush_engine() {
let mut env = TestEnv::new();
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::default());
// The mock clock starts at the real current time and reports zero elapsed time.
let now = current_time_millis();
let time_provider = Arc::new(MockTimeProvider::new(now));
let engine = env
.create_engine_with_time(
MitoConfig {
auto_flush_interval: Duration::from_secs(60 * 5),
..Default::default()
},
Some(write_buffer_manager.clone()),
Some(listener.clone()),
time_provider.clone(),
)
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Prepares rows for flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
// Sets current time to now + auto_flush_interval * 2, so the region's last
// flush time is earlier than `now - auto_flush_interval`.
time_provider.set_now(now + (60 * 5 * 2) * 1000);
// Sets elapsed time to MAX_INITIAL_CHECK_DELAY_SECS + 1 (in millis), which is
// larger than the check interval, so the periodical check is not skipped.
time_provider.set_elapsed((MAX_INITIAL_CHECK_DELAY_SECS as i64 + 1) * 1000);
// Wait until flush is finished. Fails the test if it takes more than 3 seconds.
tokio::time::timeout(Duration::from_secs(3), listener.wait())
.await
.unwrap();
// After the flush the scanner should see one file and no memtables.
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
// The flushed data must still contain both written rows.
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| a | 0.0 | 1970-01-01T00:00:00 |
| a | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -176,6 +176,8 @@ pub enum FlushReason {
Manual,
/// Flush to alter table.
Alter,
/// Flush periodically.
Periodically,
}
impl FlushReason {
@@ -432,18 +434,19 @@ impl FlushScheduler {
) -> Result<()> {
debug_assert_eq!(region_id, task.region_id);
FLUSH_REQUESTS_TOTAL
.with_label_values(&[task.reason.as_str()])
.inc();
let version = version_control.current().version;
if version.memtables.mutable.is_empty() && version.memtables.immutables().is_empty() {
if version.memtables.is_empty() {
debug_assert!(!self.region_status.contains_key(&region_id));
// The region has nothing to flush.
task.on_success();
return Ok(());
}
// Don't increase the counter if a region has nothing to flush.
FLUSH_REQUESTS_TOTAL
.with_label_values(&[task.reason.as_str()])
.inc();
// Add this region to status map.
let flush_status = self
.region_status

View File

@@ -40,6 +40,7 @@ pub mod request;
pub mod row_converter;
pub(crate) mod schedule;
pub mod sst;
mod time_provider;
pub mod wal;
mod worker;

View File

@@ -23,7 +23,6 @@ use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, RwLock};
use common_telemetry::info;
use common_time::util::current_time_millis;
use common_wal::options::WalOptions;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
@@ -37,6 +36,7 @@ use crate::memtable::MemtableId;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::OnFailure;
use crate::sst::file_purger::FilePurgerRef;
use crate::time_provider::TimeProviderRef;
/// This is the approximate factor to estimate the size of wal.
const ESTIMATED_WAL_FACTOR: f32 = 0.42825;
@@ -83,6 +83,9 @@ pub(crate) struct MitoRegion {
last_flush_millis: AtomicI64,
/// Whether the region is writable.
writable: AtomicBool,
/// Provider to get current time.
time_provider: TimeProviderRef,
}
pub(crate) type MitoRegionRef = Arc<MitoRegion>;
@@ -119,7 +122,7 @@ impl MitoRegion {
/// Update flush time to current time.
pub(crate) fn update_flush_millis(&self) {
let now = current_time_millis();
let now = self.time_provider.current_time_millis();
self.last_flush_millis.store(now, Ordering::Relaxed);
}

View File

@@ -19,7 +19,6 @@ use std::sync::atomic::{AtomicBool, AtomicI64};
use std::sync::Arc;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use common_wal::options::WalOptions;
use futures::StreamExt;
use object_store::manager::ObjectStoreManagerRef;
@@ -47,6 +46,7 @@ use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::{EntryId, Wal};
/// Builder to create a new [MitoRegion] or open an existing one.
@@ -61,6 +61,7 @@ pub(crate) struct RegionOpener {
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
intermediate_manager: IntermediateManager,
time_provider: Option<TimeProviderRef>,
}
impl RegionOpener {
@@ -84,6 +85,7 @@ impl RegionOpener {
cache_manager: None,
skip_wal_replay: false,
intermediate_manager,
time_provider: None,
}
}
@@ -189,6 +191,9 @@ impl RegionOpener {
object_store,
self.intermediate_manager,
));
let time_provider = self
.time_provider
.unwrap_or_else(|| Arc::new(StdTimeProvider));
Ok(MitoRegion {
region_id,
@@ -201,9 +206,10 @@ impl RegionOpener {
self.cache_manager,
)),
wal_options,
last_flush_millis: AtomicI64::new(current_time_millis()),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
// Region is writable after it is created.
writable: AtomicBool::new(true),
time_provider,
})
}
@@ -307,6 +313,10 @@ impl RegionOpener {
} else {
info!("Skip the WAL replay for region: {}", region_id);
}
let time_provider = self
.time_provider
.clone()
.unwrap_or_else(|| Arc::new(StdTimeProvider));
let region = MitoRegion {
region_id: self.region_id,
@@ -315,9 +325,10 @@ impl RegionOpener {
manifest_manager,
file_purger,
wal_options,
last_flush_millis: AtomicI64::new(current_time_millis()),
last_flush_millis: AtomicI64::new(time_provider.current_time_millis()),
// Region is always opened in read only mode.
writable: AtomicBool::new(false),
time_provider,
};
Ok(Some(region))
}

View File

@@ -59,6 +59,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::worker::WorkerGroup;
#[derive(Debug)]
@@ -179,6 +180,7 @@ impl TestEnv {
object_store_manager,
manager,
listener,
Arc::new(StdTimeProvider),
)
.await
.unwrap()
@@ -219,6 +221,37 @@ impl TestEnv {
object_store_manager,
manager,
listener,
Arc::new(StdTimeProvider),
)
.await
.unwrap()
}
/// Creates a new engine with specific config and manager/listener/time provider under this env.
pub async fn create_engine_with_time(
&mut self,
config: MitoConfig,
manager: Option<WriteBufferManagerRef>,
listener: Option<EventListenerRef>,
time_provider: TimeProviderRef,
) -> MitoEngine {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
let logstore = Arc::new(log_store);
let object_store_manager = Arc::new(object_store_manager);
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());
let data_home = self.data_home().display().to_string();
MitoEngine::new_for_test(
&data_home,
config,
logstore,
object_store_manager,
manager,
listener,
time_provider.clone(),
)
.await
.unwrap()

View File

@@ -0,0 +1,52 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Abstraction to get current time.
use std::sync::Arc;
use std::time::Duration;
use common_time::util::current_time_millis;
/// Trait to get the current time and deal with durations.
///
/// We define the trait to simplify time related tests: tests can supply
/// their own implementation instead of relying on the system clock.
pub trait TimeProvider: std::fmt::Debug + Send + Sync {
/// Returns the current time in milliseconds.
fn current_time_millis(&self) -> i64;
/// Returns the milliseconds elapsed since the specified time.
///
/// `current_millis` is a timestamp in milliseconds, on the same clock as
/// [TimeProvider::current_time_millis].
fn elapsed_since(&self, current_millis: i64) -> i64;
/// Computes the actual duration to wait from an expected one.
///
/// The default implementation returns `duration` unchanged.
fn wait_duration(&self, duration: Duration) -> Duration {
duration
}
}
/// Shared, reference-counted handle to a [TimeProvider].
pub type TimeProviderRef = Arc<dyn TimeProvider>;
/// Default [TimeProvider], backed by `common_time::util::current_time_millis`.
#[derive(Debug)]
pub struct StdTimeProvider;

impl TimeProvider for StdTimeProvider {
    /// Returns the current time in milliseconds.
    fn current_time_millis(&self) -> i64 {
        current_time_millis()
    }

    /// Returns the difference between the current time and `current_millis`.
    fn elapsed_since(&self, current_millis: i64) -> i64 {
        let now = self.current_time_millis();
        now - current_millis
    }
}

View File

@@ -33,6 +33,7 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use rand::{thread_rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::region_engine::SetReadonlyResponse;
@@ -56,6 +57,7 @@ use crate::request::{
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::wal::Wal;
/// Identifier for a worker.
@@ -63,6 +65,11 @@ pub(crate) type WorkerId = u32;
pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";
/// Interval to check whether regions should flush.
///
/// Also the maximum time the worker loop waits for a request before it
/// checks periodical tasks (subject to `TimeProvider::wait_duration`).
pub(crate) const CHECK_REGION_INTERVAL: Duration = Duration::from_secs(60);
/// Exclusive upper bound, in seconds, of the random initial delay a worker
/// adds before checking region periodical tasks.
pub(crate) const MAX_INITIAL_CHECK_DELAY_SECS: u64 = 60 * 3;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A fixed size group of [RegionWorkers](RegionWorker).
///
@@ -140,6 +147,7 @@ impl WorkerGroup {
.write_cache(write_cache)
.build(),
);
let time_provider = Arc::new(StdTimeProvider);
let workers = (0..config.num_workers)
.map(|id| {
@@ -153,6 +161,7 @@ impl WorkerGroup {
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
}
.start()
})
@@ -223,6 +232,7 @@ impl WorkerGroup {
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
time_provider: TimeProviderRef,
) -> Result<WorkerGroup> {
let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
Arc::new(WriteBufferManagerImpl::new(
@@ -260,6 +270,7 @@ impl WorkerGroup {
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
intermediate_manager: intermediate_manager.clone(),
time_provider: time_provider.clone(),
}
.start()
})
@@ -300,6 +311,12 @@ async fn write_cache_from_config(
Ok(Some(Arc::new(cache)))
}
/// Computes an initial check delay for a worker.
///
/// The delay is a random whole number of seconds in
/// `[0, MAX_INITIAL_CHECK_DELAY_SECS)`.
pub(crate) fn worker_init_check_delay() -> Duration {
    Duration::from_secs(thread_rng().gen_range(0..MAX_INITIAL_CHECK_DELAY_SECS))
}
/// Worker start config.
struct WorkerStarter<S> {
id: WorkerId,
@@ -311,6 +328,7 @@ struct WorkerStarter<S> {
listener: WorkerListener,
cache_manager: CacheManagerRef,
intermediate_manager: IntermediateManager,
time_provider: TimeProviderRef,
}
impl<S: LogStore> WorkerStarter<S> {
@@ -320,7 +338,6 @@ impl<S: LogStore> WorkerStarter<S> {
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);
let running = Arc::new(AtomicBool::new(true));
let memtable_builder = match &self.config.memtable {
MemtableConfig::Experimental(merge_tree) => Arc::new(MergeTreeMemtableBuilder::new(
merge_tree.clone(),
@@ -330,6 +347,7 @@ impl<S: LogStore> WorkerStarter<S> {
self.write_buffer_manager.clone(),
))) as _,
};
let now = self.time_provider.current_time_millis();
let mut worker_thread = RegionWorkerLoop {
id: self.id,
config: self.config,
@@ -353,6 +371,8 @@ impl<S: LogStore> WorkerStarter<S> {
listener: self.listener,
cache_manager: self.cache_manager,
intermediate_manager: self.intermediate_manager,
time_provider: self.time_provider,
last_periodical_check_millis: now,
};
let handle = common_runtime::spawn_write(async move {
worker_thread.run().await;
@@ -511,12 +531,21 @@ struct RegionWorkerLoop<S> {
cache_manager: CacheManagerRef,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
/// Provider to get current time.
time_provider: TimeProviderRef,
/// Last time to check regions periodically.
last_periodical_check_millis: i64,
}
impl<S: LogStore> RegionWorkerLoop<S> {
/// Starts the worker loop.
async fn run(&mut self) {
info!("Start region worker thread {}", self.id);
let init_check_delay = worker_init_check_delay();
info!(
"Start region worker thread {}, init_check_delay: {:?}",
self.id, init_check_delay
);
self.last_periodical_check_millis += init_check_delay.as_millis() as i64;
// Buffer to retrieve requests from receiver.
let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size);
@@ -525,9 +554,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Clear the buffer before handling next batch of requests.
buffer.clear();
match self.receiver.recv().await {
Some(request) => buffer.push(request),
None => break,
let max_wait_time = self.time_provider.wait_duration(CHECK_REGION_INTERVAL);
match tokio::time::timeout(max_wait_time, self.receiver.recv()).await {
Ok(Some(request)) => buffer.push(request),
// The channel is disconnected.
Ok(None) => break,
Err(_) => {
// Timeout. Checks periodical tasks.
self.handle_periodical_tasks();
continue;
}
}
// Try to recv more requests from the channel.
@@ -541,6 +577,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
self.handle_requests(&mut buffer).await;
self.handle_periodical_tasks();
}
self.clean().await;
@@ -629,6 +667,24 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
/// Handles periodical tasks such as region auto flush.
///
/// Does nothing unless at least `CHECK_REGION_INTERVAL` has elapsed since the
/// last check. A failure to flush is logged and not propagated.
fn handle_periodical_tasks(&mut self) {
    let check_interval_millis = CHECK_REGION_INTERVAL.as_millis() as i64;
    let elapsed_millis = self
        .time_provider
        .elapsed_since(self.last_periodical_check_millis);

    if elapsed_millis >= check_interval_millis {
        // Record the check time before running the tasks.
        self.last_periodical_check_millis = self.time_provider.current_time_millis();

        if let Err(e) = self.flush_periodically() {
            error!(e; "Failed to flush regions periodically");
        }
    }
}
/// Handles region background request
async fn handle_background_notify(&mut self, region_id: RegionId, notify: BackgroundNotify) {
match notify {

View File

@@ -17,7 +17,6 @@
use std::sync::Arc;
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;
@@ -77,10 +76,10 @@ impl<S> RegionWorkerLoop<S> {
}
}
/// Find some regions to flush to reduce write buffer usage.
/// Finds some regions to flush to reduce write buffer usage.
fn flush_regions_on_engine_full(&mut self) -> Result<()> {
let regions = self.regions.list_regions();
let now = current_time_millis();
let now = self.time_provider.current_time_millis();
let min_last_flush_time = now - self.config.auto_flush_interval.as_millis() as i64;
let mut max_mutable_size = 0;
// Region with max mutable memtable size.
@@ -129,7 +128,38 @@ impl<S> RegionWorkerLoop<S> {
Ok(())
}
/// Create a flush task with specific `reason` for the `region`.
/// Schedules a periodical flush for every region whose last flush is older
/// than `auto_flush_interval`.
///
/// Regions that already have a flush requested are skipped. Returns the first
/// error from the flush scheduler; regions after the failing one are not visited.
pub(crate) fn flush_periodically(&mut self) -> Result<()> {
    let regions = self.regions.list_regions();
    let now = self.time_provider.current_time_millis();
    let flush_interval_millis = self.config.auto_flush_interval.as_millis() as i64;
    let min_last_flush_time = now - flush_interval_millis;

    for region in &regions {
        let region_id = region.region_id;
        // A flush is already requested for this region.
        if self.flush_scheduler.is_flush_requested(region_id) {
            continue;
        }
        // The region was flushed recently enough.
        if region.last_flush_millis() >= min_last_flush_time {
            continue;
        }

        let task = self.new_flush_task(
            region,
            FlushReason::Periodically,
            None,
            self.config.clone(),
        );
        self.flush_scheduler
            .schedule_flush(region_id, &region.version_control, task)?;
    }

    Ok(())
}
/// Creates a flush task with specific `reason` for the `region`.
pub(crate) fn new_flush_task(
&self,
region: &MitoRegionRef,