feat: Add FlushPicker to flush regions periodically (#1559)

* feat: Add FlushPicker

* feat(storage): Add close to StorageEngine

* style(storage): fix clippy

* feat(storage): Close regions in StorageEngine::close

* chore(storage): Clear requests on scheduler stop

* test(storage): Test flush picker

* feat(storage): Add metrics for auto flush

* feat(storage): Add flush reason and record it in metrics

* feat: Expose flush config

docs(config): Update config example

* refactor(storage): Run auto flush task in FlushScheduler

* refactor(storage): Add FlushItem trait to make FlushPicker easy to test
This commit is contained in:
Yingwen
2023-05-15 17:29:28 +08:00
committed by GitHub
parent 497b1f9dc9
commit 8d54d40b21
32 changed files with 606 additions and 66 deletions

View File

@@ -52,6 +52,15 @@ gc_duration = '30s'
# Whether to try creating a manifest checkpoint on region opening
checkpoint_on_startup = false
# Storage flush options
[storage.flush]
# Max inflight flush tasks.
max_flush_tasks = 8
# Default write buffer size for a region.
region_write_buffer_size = "32MB"
# Interval after which a region is flushed automatically if it has not been flushed within that period.
auto_flush_interval = "1h"
# Procedure storage options, see `standalone.example.toml`.
[procedure]
max_retry_times = 3

View File

@@ -117,6 +117,15 @@ gc_duration = '30s'
# Whether to try creating a manifest checkpoint on region opening
checkpoint_on_startup = false
# Storage flush options
[storage.flush]
# Max inflight flush tasks.
max_flush_tasks = 8
# Default write buffer size for a region.
region_write_buffer_size = "32MB"
# Interval after which a region is flushed automatically if it has not been flushed within that period.
auto_flush_interval = "1h"
# Procedure storage options.
[procedure]
# Procedure max retry time.

View File

@@ -509,7 +509,8 @@ mod tests {
Arc::new(NoopLogStore::default()),
object_store.clone(),
noop_compaction_scheduler,
),
)
.unwrap(),
object_store,
));
(dir, table_engine)

View File

@@ -23,7 +23,10 @@ use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::config::{
EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS,
DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE,
};
use storage::scheduler::SchedulerConfig;
use crate::error::Result;
@@ -49,6 +52,7 @@ pub struct StorageConfig {
pub store: ObjectStoreConfig,
pub compaction: CompactionConfig,
pub manifest: RegionManifestConfig,
pub flush: FlushConfig,
}
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
@@ -203,6 +207,34 @@ impl Default for CompactionConfig {
}
}
/// Flush options of the storage engine.
///
/// `#[serde(default)]` makes every field optional when deserializing: a missing
/// field takes its value from the `Default` implementation of this struct.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FlushConfig {
/// Max inflight flush tasks.
pub max_flush_tasks: usize,
/// Default write buffer size for a region.
pub region_write_buffer_size: ReadableSize,
/// Interval to schedule auto flush picker to find region to flush.
///
/// (De)serialized through `humantime_serde`.
#[serde(with = "humantime_serde")]
pub picker_schedule_interval: Duration,
/// Interval to auto flush a region if it has not flushed yet.
///
/// (De)serialized through `humantime_serde`.
#[serde(with = "humantime_serde")]
pub auto_flush_interval: Duration,
}
impl Default for FlushConfig {
fn default() -> Self {
Self {
max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS,
region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE,
picker_schedule_interval: Duration::from_millis(
DEFAULT_PICKER_SCHEDULE_INTERVAL.into(),
),
auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
}
}
}
impl From<&DatanodeOptions> for SchedulerConfig {
fn from(value: &DatanodeOptions) -> Self {
Self {
@@ -220,6 +252,10 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
max_files_in_l0: value.storage.compaction.max_files_in_level0,
max_purge_tasks: value.storage.compaction.max_purge_tasks,
sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size,
max_flush_tasks: value.storage.flush.max_flush_tasks,
region_write_buffer_size: value.storage.flush.region_write_buffer_size,
picker_schedule_interval: value.storage.flush.picker_schedule_interval,
auto_flush_interval: value.storage.flush.auto_flush_interval,
}
}
}

View File

@@ -120,7 +120,8 @@ impl Instance {
log_store.clone(),
object_store.clone(),
compaction_scheduler,
),
)
.unwrap(),
object_store.clone(),
));

View File

@@ -669,6 +669,12 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
self.storage_engine
.close(&StorageEngineContext::default())
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(())
}
}

View File

@@ -66,7 +66,8 @@ mod procedure_test_util {
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
);
)
.unwrap();
let table_engine = MitoEngine::new(EngineConfig::default(), storage_engine, object_store);
TestEnv { table_engine, dir }

View File

@@ -83,7 +83,8 @@ async fn setup_table_with_column_default_constraint() -> (TempDir, String, Table
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
),
)
.unwrap(),
object_store,
);

View File

@@ -35,8 +35,8 @@ use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, ChunkReader, FlushContext, ReadContext, Region,
RegionMeta, RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
AddColumn, AlterOperation, AlterRequest, ChunkReader, FlushContext, FlushReason, ReadContext,
Region, RegionMeta, RegionNumber, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
};
use table::error::{
InvalidTableSnafu, RegionSchemaMismatchSnafu, Result as TableResult, TableOperationSnafu,
@@ -294,7 +294,12 @@ impl<R: Region> Table for MitoTable<R> {
region_number: Option<RegionNumber>,
wait: Option<bool>,
) -> TableResult<()> {
let flush_ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
let flush_ctx = wait
.map(|wait| FlushContext {
wait,
reason: FlushReason::Manually,
})
.unwrap_or_default();
if let Some(region_number) = region_number {
if let Some(region) = self.regions.get(&region_number) {
region

View File

@@ -155,7 +155,8 @@ pub async fn setup_test_engine_and_table() -> TestEngineComponents {
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
);
)
.unwrap();
let table_engine = MitoEngine::new(
EngineConfig::default(),
storage_engine.clone(),

View File

@@ -326,4 +326,8 @@ impl StorageEngine for MockEngine {
let regions = self.regions.lock().unwrap();
Ok(regions.opened_regions.get(name).cloned())
}
/// Closes the mock engine.
///
/// This is a no-op: the context is ignored and `Ok(())` is always returned.
async fn close(&self, _ctx: &EngineContext) -> Result<()> {
Ok(())
}
}

View File

@@ -160,7 +160,8 @@ mod tests {
Arc::new(log_store),
object_store.clone(),
compaction_scheduler,
),
)
.unwrap(),
object_store,
));
let engine_manager = Arc::new(MemoryTableEngineManager::new(mock_engine.clone()));

View File

@@ -18,6 +18,15 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
/// Default max number of inflight flush tasks.
pub const DEFAULT_MAX_FLUSH_TASKS: usize = 8;
/// Default region write buffer size (32 MB).
pub const DEFAULT_REGION_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32);
/// Default interval to trigger auto flush, in milliseconds (1 hour).
pub const DEFAULT_AUTO_FLUSH_INTERVAL: u32 = 60 * 60 * 1000;
/// Default interval to schedule the auto flush picker, in milliseconds (5 minutes).
pub const DEFAULT_PICKER_SCHEDULE_INTERVAL: u32 = 5 * 60 * 1000;
#[derive(Debug, Clone)]
pub struct EngineConfig {
pub manifest_checkpoint_on_startup: bool,
@@ -26,6 +35,14 @@ pub struct EngineConfig {
pub max_files_in_l0: usize,
pub max_purge_tasks: usize,
pub sst_write_buffer_size: ReadableSize,
/// Max inflight flush tasks.
pub max_flush_tasks: usize,
/// Default write buffer size for a region.
pub region_write_buffer_size: ReadableSize,
/// Interval to schedule the auto flush picker.
pub picker_schedule_interval: Duration,
/// Interval to auto flush a region if it has not flushed yet.
pub auto_flush_interval: Duration,
}
impl Default for EngineConfig {
@@ -37,6 +54,12 @@ impl Default for EngineConfig {
max_files_in_l0: 8,
max_purge_tasks: 32,
sst_write_buffer_size: ReadableSize::mb(8),
max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS,
region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE,
picker_schedule_interval: Duration::from_millis(
DEFAULT_PICKER_SCHEDULE_INTERVAL.into(),
),
auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
}
}
}

View File

@@ -17,7 +17,7 @@ use std::sync::{Arc, RwLock};
use std::time::Duration;
use async_trait::async_trait;
use common_telemetry::logging::debug;
use common_telemetry::logging::{self, debug};
use object_store::{util, ObjectStore};
use snafu::ResultExt;
use store_api::logstore::LogStore;
@@ -30,12 +30,14 @@ use crate::compaction::CompactionSchedulerRef;
use crate::config::EngineConfig;
use crate::error::{self, Error, Result};
use crate::file_purger::{FilePurgeHandler, FilePurgerRef};
use crate::flush::{FlushScheduler, FlushSchedulerRef, FlushStrategyRef, SizeBasedStrategy};
use crate::flush::{
FlushScheduler, FlushSchedulerRef, FlushStrategyRef, PickerConfig, SizeBasedStrategy,
};
use crate::manifest::region::RegionManifest;
use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef};
use crate::metadata::RegionMetadata;
use crate::region::{RegionImpl, StoreConfig};
use crate::scheduler::{LocalScheduler, SchedulerConfig};
use crate::scheduler::{LocalScheduler, Scheduler, SchedulerConfig};
use crate::sst::FsAccessLayer;
/// [StorageEngine] implementation.
@@ -85,6 +87,16 @@ impl<S: LogStore> StorageEngine for EngineImpl<S> {
fn get_region(&self, _ctx: &EngineContext, name: &str) -> Result<Option<Self::Region>> {
Ok(self.inner.get_region(name))
}
/// Closes the storage engine by closing its inner state.
///
/// Logs before closing and, only when closing succeeds, logs again afterwards.
/// Any error from the inner close is returned to the caller unchanged.
async fn close(&self, _ctx: &EngineContext) -> Result<()> {
    logging::info!("Stopping storage engine");

    let outcome = self.inner.close().await;
    if outcome.is_ok() {
        logging::info!("Storage engine stopped");
    }

    outcome
}
}
impl<S: LogStore> EngineImpl<S> {
@@ -93,15 +105,15 @@ impl<S: LogStore> EngineImpl<S> {
log_store: Arc<S>,
object_store: ObjectStore,
compaction_scheduler: CompactionSchedulerRef<S>,
) -> Self {
Self {
) -> Result<Self> {
Ok(Self {
inner: Arc::new(EngineInner::new(
config,
log_store,
object_store,
compaction_scheduler,
)),
}
)?),
})
}
}
@@ -124,7 +136,7 @@ pub fn region_manifest_dir(parent_dir: &str, region_name: &str) -> String {
/// Also used as a placeholder in the region map when the region isn't ready, e.g. during
/// creating/opening.
#[derive(Debug)]
enum RegionSlot<S: LogStore> {
pub(crate) enum RegionSlot<S: LogStore> {
/// The region is during creation.
Creating,
/// The region is during opening.
@@ -208,17 +220,21 @@ impl<'a, S: LogStore> Drop for SlotGuard<'a, S> {
}
/// Region slot map.
struct RegionMap<S: LogStore>(RwLock<HashMap<String, RegionSlot<S>>>);
pub struct RegionMap<S: LogStore>(RwLock<HashMap<String, RegionSlot<S>>>);
impl<S: LogStore> RegionMap<S> {
/// Returns a new region map.
fn new() -> RegionMap<S> {
pub fn new() -> RegionMap<S> {
RegionMap(RwLock::new(HashMap::new()))
}
/// Returns the `Some(slot)` if there is existing slot with given `name`, or insert
/// given `slot` and returns `None`.
fn get_or_occupy_slot(&self, name: &str, slot: RegionSlot<S>) -> Option<RegionSlot<S>> {
pub(crate) fn get_or_occupy_slot(
&self,
name: &str,
slot: RegionSlot<S>,
) -> Option<RegionSlot<S>> {
{
// Try to get the region under read lock.
let regions = self.0.read().unwrap();
@@ -258,6 +274,26 @@ impl<S: LogStore> RegionMap<S> {
let mut regions = self.0.write().unwrap();
regions.remove(name);
}
/// Collects the regions whose slots are ready.
///
/// Takes the read lock on the map and keeps every slot for which
/// `get_ready_region()` returns `Some`; other slots are skipped.
pub(crate) fn list_regions(&self) -> Vec<RegionImpl<S>> {
    let slots = self.0.read().unwrap();

    let mut ready = Vec::new();
    for slot in slots.values() {
        if let Some(region) = slot.get_ready_region() {
            ready.push(region);
        }
    }
    ready
}
/// Clear the region map.
///
/// Takes the write lock and removes every slot from the underlying map.
fn clear(&self) {
self.0.write().unwrap().clear();
}
}
impl<S: LogStore> Default for RegionMap<S> {
fn default() -> Self {
Self::new()
}
}
struct EngineInner<S: LogStore> {
@@ -278,12 +314,19 @@ impl<S: LogStore> EngineInner<S> {
log_store: Arc<S>,
object_store: ObjectStore,
compaction_scheduler: CompactionSchedulerRef<S>,
) -> Self {
// TODO(yingwen): max inflight flush tasks.
) -> Result<Self> {
let regions = Arc::new(RegionMap::new());
let flush_scheduler = Arc::new(FlushScheduler::new(
SchedulerConfig::default(),
SchedulerConfig {
max_inflight_tasks: config.max_flush_tasks,
},
compaction_scheduler.clone(),
));
regions.clone(),
PickerConfig {
schedule_interval: config.picker_schedule_interval,
auto_flush_interval: config.auto_flush_interval,
},
)?);
let file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig {
@@ -291,17 +334,19 @@ impl<S: LogStore> EngineInner<S> {
},
FilePurgeHandler,
));
Self {
Ok(Self {
object_store,
log_store,
regions: Arc::new(RegionMap::new()),
regions,
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
flush_strategy: Arc::new(SizeBasedStrategy::new(
config.region_write_buffer_size.as_bytes() as usize,
)),
compaction_scheduler,
file_purger,
config: Arc::new(config),
}
})
}
async fn open_region(&self, name: &str, opts: &OpenOptions) -> Result<Option<RegionImpl<S>>> {
@@ -427,6 +472,22 @@ impl<S: LogStore> EngineInner<S> {
compaction_time_window,
})
}
/// Closes all regions and stops the background schedulers of the engine.
///
/// Failures while closing individual regions are logged and tolerated. All three
/// schedulers (compaction, flush, file purger) are always asked to stop, even if
/// an earlier one fails to stop, so a single failure cannot leave the remaining
/// background workers running.
///
/// # Errors
///
/// Returns the first error encountered while stopping the schedulers, in the
/// order compaction scheduler, flush scheduler, file purger.
async fn close(&self) -> Result<()> {
    for region in self.regions.list_regions() {
        // Tolerate failure during closing regions.
        if let Err(e) = region.close().await {
            logging::error!(e; "Failed to close region {}", region.id());
        }
    }
    // Clear regions to release references to regions in the region map.
    self.regions.clear();

    // Stop every scheduler before inspecting the results: returning early on the
    // first failure would skip stopping the schedulers that come after it.
    let compaction_stopped = self.compaction_scheduler.stop(true).await;
    let flush_stopped = self.flush_scheduler.stop().await;
    let purger_stopped = self.file_purger.stop(true).await;

    compaction_stopped?;
    flush_stopped?;
    purger_stopped
}
}
#[cfg(test)]
@@ -462,7 +523,8 @@ mod tests {
Arc::new(log_store),
object_store,
compaction_scheduler,
);
)
.unwrap();
let region_name = "region-0";
let desc = RegionDescBuilder::new(region_name)

View File

@@ -487,6 +487,18 @@ pub enum Error {
sequence: SequenceNumber,
location: Location,
},
#[snafu(display("Failed to start picking task for flush: {}", source))]
StartPickTask {
#[snafu(backtrace)]
source: RuntimeError,
},
#[snafu(display("Failed to stop picking task for flush: {}", source))]
StopPickTask {
#[snafu(backtrace)]
source: RuntimeError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -579,7 +591,9 @@ impl ErrorExt for Error {
StartManifestGcTask { .. }
| StopManifestGcTask { .. }
| IllegalSchedulerState { .. }
| DuplicateFlush { .. } => StatusCode::Unexpected,
| DuplicateFlush { .. }
| StartPickTask { .. }
| StopPickTask { .. } => StatusCode::Unexpected,
TtlCalculation { source, .. } => source.status_code(),
}

View File

@@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod picker;
mod scheduler;
use std::sync::Arc;
use common_telemetry::{logging, timer};
pub use picker::{FlushPicker, PickerConfig};
pub use scheduler::{FlushHandle, FlushRequest, FlushScheduler, FlushSchedulerRef};
use store_api::logstore::LogStore;
use store_api::storage::consts::WRITE_ROW_GROUP_SIZE;
use store_api::storage::SequenceNumber;
use crate::config::EngineConfig;
use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
use crate::error::Result;
use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
@@ -32,9 +34,6 @@ use crate::region::{RegionWriterRef, SharedDataRef};
use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions};
use crate::wal::Wal;
/// Default write buffer size (32M).
const DEFAULT_WRITE_BUFFER_SIZE: usize = 32 * 1024 * 1024;
pub trait FlushStrategy: Send + Sync + std::fmt::Debug {
fn should_flush(
&self,
@@ -72,7 +71,7 @@ fn get_mutable_limitation(max_write_buffer_size: usize) -> usize {
impl Default for SizeBasedStrategy {
fn default() -> Self {
let max_write_buffer_size = DEFAULT_WRITE_BUFFER_SIZE;
let max_write_buffer_size = DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize;
Self {
max_write_buffer_size,
mutable_limitation: get_mutable_limitation(max_write_buffer_size),

View File

@@ -0,0 +1,201 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use async_trait::async_trait;
use common_telemetry::logging;
use common_time::util;
use store_api::logstore::LogStore;
use store_api::storage::{FlushContext, FlushReason, Region};
use crate::config::{DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_PICKER_SCHEDULE_INTERVAL};
use crate::region::RegionImpl;
/// Config for [FlushPicker].
///
/// Derives `Debug` and `Clone` so the config can be logged and reused by callers.
#[derive(Debug, Clone)]
pub struct PickerConfig {
    /// Interval to schedule the picker.
    pub schedule_interval: Duration,
    /// Interval to auto flush a region if it has not flushed yet.
    pub auto_flush_interval: Duration,
}
impl PickerConfig {
/// Returns the auto flush interval in millis or a default value
/// if overflow occurs.
fn auto_flush_interval_millis(&self) -> i64 {
self.auto_flush_interval
.as_millis()
.try_into()
.unwrap_or(DEFAULT_AUTO_FLUSH_INTERVAL.into())
}
}
impl Default for PickerConfig {
    /// Uses `DEFAULT_PICKER_SCHEDULE_INTERVAL` and `DEFAULT_AUTO_FLUSH_INTERVAL`,
    /// both interpreted as milliseconds.
    fn default() -> Self {
        let schedule_millis = u64::from(DEFAULT_PICKER_SCHEDULE_INTERVAL);
        let auto_flush_millis = u64::from(DEFAULT_AUTO_FLUSH_INTERVAL);

        Self {
            schedule_interval: Duration::from_millis(schedule_millis),
            auto_flush_interval: Duration::from_millis(auto_flush_millis),
        }
    }
}
/// Picks items that are due for a flush.
pub struct FlushPicker {
    /// Auto flush interval in milliseconds.
    auto_flush_interval_millis: i64,
}

impl FlushPicker {
    /// Creates a picker from `config`.
    ///
    /// Only the auto flush interval is kept, converted to milliseconds.
    pub fn new(config: PickerConfig) -> FlushPicker {
        let auto_flush_interval_millis = config.auto_flush_interval_millis();
        Self {
            auto_flush_interval_millis,
        }
    }

    /// Requests a flush for every item in `regions` whose last flush time is
    /// older than the current time minus the auto flush interval.
    ///
    /// Returns the number of items a flush was requested for. If subtracting the
    /// interval from the current time overflows, nothing is flushed and 0 is
    /// returned.
    pub async fn pick_by_interval<T: FlushItem>(&self, regions: &[T]) -> usize {
        let now = util::current_time_millis();

        match now.checked_sub(self.auto_flush_interval_millis) {
            Some(earliest_flush_millis) => {
                flush_regions_by_interval(regions, earliest_flush_millis).await
            }
            None => 0,
        }
    }
}
/// Item for picker to flush.
///
/// Abstracts what the picker needs from a region so the picker can be tested
/// with mock items.
#[async_trait]
pub trait FlushItem {
/// Id of the item.
fn item_id(&self) -> u64;
/// Last flush time in millis.
///
/// The picker compares this value with "now minus the auto flush interval".
fn last_flush_time(&self) -> i64;
/// Requests the item to schedule a flush for specific `reason`.
///
/// The flush job itself should run in background.
///
/// The method returns nothing, so implementations have to handle (e.g. log)
/// failures themselves.
async fn request_flush(&self, reason: FlushReason);
}
#[async_trait]
impl<S: LogStore> FlushItem for RegionImpl<S> {
    /// The region id.
    fn item_id(&self) -> u64 {
        self.id()
    }

    /// The region's last flush timestamp in millis.
    fn last_flush_time(&self) -> i64 {
        self.last_flush_millis()
    }

    /// Flushes the region without waiting for the flush to finish.
    ///
    /// A failure is logged and not propagated.
    async fn request_flush(&self, reason: FlushReason) {
        let outcome = self
            .flush(&FlushContext {
                wait: false,
                reason,
            })
            .await;

        if let Err(e) = outcome {
            logging::error!(e; "Failed to flush region {}", self.id());
        }
    }
}
/// Auto flush regions based on last flush time.
///
/// Requests a [FlushReason::Periodically] flush for every item whose last flush
/// time is strictly earlier than `earliest_flush_millis`. Items are processed one
/// by one, in slice order.
///
/// Returns the number of items a flush was requested for.
async fn flush_regions_by_interval<T: FlushItem>(
    regions: &[T],
    earliest_flush_millis: i64,
) -> usize {
    let mut flushed = 0;
    for region in regions {
        // Read the last flush time once: it may be updated concurrently, and the
        // value written to the log must be the one the decision was based on.
        let last_flush_millis = region.last_flush_time();
        if last_flush_millis >= earliest_flush_millis {
            continue;
        }

        logging::debug!(
            "Auto flush region {} due to last flush time ({} < {})",
            region.item_id(),
            last_flush_millis,
            earliest_flush_millis,
        );

        flushed += 1;
        region.request_flush(FlushReason::Periodically).await;
    }

    flushed
}
#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use super::*;

    /// Mock [FlushItem] with a fixed last flush time that records the reason of
    /// the most recent flush request.
    struct MockItem {
        id: u64,
        last_flush_time: i64,
        flush_reason: Mutex<Option<FlushReason>>,
    }

    impl MockItem {
        fn new(id: u64, last_flush_time: i64) -> MockItem {
            Self {
                id,
                last_flush_time,
                flush_reason: Mutex::new(None),
            }
        }

        /// Reason of the last requested flush, or `None` if no flush was requested.
        fn flush_reason(&self) -> Option<FlushReason> {
            let recorded = self.flush_reason.lock().unwrap();
            *recorded
        }
    }

    #[async_trait]
    impl FlushItem for MockItem {
        fn item_id(&self) -> u64 {
            self.id
        }

        fn last_flush_time(&self) -> i64 {
            self.last_flush_time
        }

        async fn request_flush(&self, reason: FlushReason) {
            *self.flush_reason.lock().unwrap() = Some(reason);
        }
    }

    #[tokio::test]
    async fn test_pick_by_interval() {
        // Item 0 was flushed just now, item 1 was flushed a minute ago.
        let fresh = MockItem::new(0, util::current_time_millis());
        let stale = MockItem::new(1, util::current_time_millis() - 60 * 1000);
        let regions = [fresh, stale];

        let config = PickerConfig {
            // schedule_interval is unused in this test.
            schedule_interval: Duration::from_millis(10),
            auto_flush_interval: Duration::from_millis(30 * 1000),
        };
        let picker = FlushPicker::new(config);

        // Only the item older than the 30s interval is flushed.
        assert_eq!(1, picker.pick_by_interval(&regions).await);
        assert!(regions[0].flush_reason().is_none());
        assert_eq!(Some(FlushReason::Periodically), regions[1].flush_reason());
    }
}

View File

@@ -15,6 +15,8 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::logging;
use metrics::increment_counter;
use snafu::{ensure, ResultExt};
@@ -25,11 +27,14 @@ use tokio::sync::{oneshot, Notify};
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
use crate::config::EngineConfig;
use crate::error::{DuplicateFlushSnafu, Result, WaitFlushSnafu};
use crate::flush::FlushJob;
use crate::engine::RegionMap;
use crate::error::{
DuplicateFlushSnafu, Error, Result, StartPickTaskSnafu, StopPickTaskSnafu, WaitFlushSnafu,
};
use crate::flush::{FlushJob, FlushPicker, PickerConfig};
use crate::manifest::region::RegionManifest;
use crate::memtable::{MemtableId, MemtableRef};
use crate::metrics::{FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
use crate::metrics::FLUSH_ERRORS_TOTAL;
use crate::region;
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::scheduler::rate_limit::BoxedRateLimitToken;
@@ -145,20 +150,37 @@ impl FlushHandle {
/// Flush scheduler.
pub struct FlushScheduler<S: LogStore> {
/// Flush task scheduler.
scheduler: LocalScheduler<FlushRequest<S>>,
/// Auto flush task.
auto_flush_task: RepeatedTask<Error>,
}
pub type FlushSchedulerRef<S> = Arc<FlushScheduler<S>>;
impl<S: LogStore> FlushScheduler<S> {
/// Returns a new [FlushScheduler].
pub fn new(config: SchedulerConfig, compaction_scheduler: CompactionSchedulerRef<S>) -> Self {
pub fn new(
config: SchedulerConfig,
compaction_scheduler: CompactionSchedulerRef<S>,
regions: Arc<RegionMap<S>>,
picker_config: PickerConfig,
) -> Result<Self> {
let handler = FlushHandler {
compaction_scheduler,
};
Self {
let task_interval = picker_config.schedule_interval;
let picker = FlushPicker::new(picker_config);
let task_fn = AutoFlushFunction { regions, picker };
let auto_flush_task = RepeatedTask::new(task_interval, Box::new(task_fn));
auto_flush_task
.start(common_runtime::bg_runtime())
.context(StartPickTaskSnafu)?;
Ok(Self {
scheduler: LocalScheduler::new(config, handler),
}
auto_flush_task,
})
}
/// Schedules a flush request and return the handle to the flush task.
@@ -183,13 +205,22 @@ impl<S: LogStore> FlushScheduler<S> {
}
);
increment_counter!(FLUSH_REQUESTS_TOTAL);
Ok(FlushHandle {
region_id,
receiver,
})
}
/// Stops the auto flush task first and then the flush task scheduler.
///
/// Returns early with a `StopPickTask` error if the auto flush task fails to
/// stop; in that case the flush task scheduler is not stopped.
pub async fn stop(&self) -> Result<()> {
    let picker_stopped = self.auto_flush_task.stop().await;
    picker_stopped.context(StopPickTaskSnafu)?;

    self.scheduler.stop(true).await
}
}
struct FlushHandler<S: LogStore> {
@@ -235,6 +266,9 @@ async fn execute_flush<S: LogStore>(
} else {
logging::debug!("Successfully flush region: {}", req.region_id());
// Update last flush time.
req.shared.update_flush_millis();
let compaction_request = CompactionRequestImpl::from(&req);
let max_files_in_l0 = req.engine_config.max_files_in_l0;
let shared_data = req.shared.clone();
@@ -247,6 +281,29 @@ async fn execute_flush<S: LogStore>(
max_files_in_l0,
);
// Complete the request.
req.complete(Ok(()));
}
}
/// Repeated task body that asks the picker to flush regions by interval.
struct AutoFlushFunction<S: LogStore> {
    /// Region map of the engine.
    regions: Arc<RegionMap<S>>,
    /// Picker that decides which regions to flush.
    picker: FlushPicker,
}

#[async_trait]
impl<S: LogStore> TaskFunction<Error> for AutoFlushFunction<S> {
    /// Lists the regions currently in the map and lets the picker flush the ones
    /// that are due. The number of flushed regions is ignored; always returns `Ok`.
    async fn call(&mut self) -> Result<()> {
        let candidates = self.regions.list_regions();
        let _flushed = self.picker.pick_by_interval(&candidates).await;

        Ok(())
    }

    /// Name of the task.
    fn name(&self) -> &str {
        "FlushPicker-pick-task"
    }
}

View File

@@ -22,6 +22,8 @@ pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total";
pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total";
/// Elapsed time of a flush job.
pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed";
/// Reason to flush.
pub const FLUSH_REASON: &str = "reason";
/// Gauge for open regions
pub const REGION_COUNT: &str = "storage.region_count";
/// Timer for logstore write

View File

@@ -18,11 +18,13 @@ mod writer;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use common_telemetry::logging;
use common_time::util;
use metrics::{decrement_gauge, increment_gauge};
use snafu::ResultExt;
use store_api::logstore::LogStore;
@@ -235,6 +237,7 @@ impl<S: LogStore> RegionImpl<S> {
id,
name,
version_control: Arc::new(version_control),
last_flush_millis: AtomicI64::new(0),
}),
writer: Arc::new(RegionWriter::new(
store_config.memtable_builder,
@@ -315,6 +318,7 @@ impl<S: LogStore> RegionImpl<S> {
id: metadata.id(),
name,
version_control,
last_flush_millis: AtomicI64::new(0),
});
let compaction_time_window = store_config
.compaction_time_window
@@ -366,6 +370,11 @@ impl<S: LogStore> RegionImpl<S> {
self.inner.shared.id()
}
/// Returns last flush timestamp in millis.
///
/// Delegates to the region's shared data.
///
/// NOTE(review): the timestamp appears to be initialized to 0 when a region is
/// created or opened, so a region that has never flushed would look overdue to
/// the auto flush picker on its first run — confirm this is intended.
pub fn last_flush_millis(&self) -> i64 {
self.inner.shared.last_flush_millis()
}
fn create_version_with_checkpoint(
checkpoint: RegionCheckpoint,
memtable_builder: &MemtableBuilderRef,
@@ -558,6 +567,9 @@ pub struct SharedData {
name: String,
// TODO(yingwen): Maybe no need to use Arc for version control.
pub version_control: VersionControlRef,
/// Last flush time in millis.
last_flush_millis: AtomicI64,
}
impl SharedData {
@@ -570,6 +582,17 @@ impl SharedData {
pub fn name(&self) -> &str {
&self.name
}
/// Stores the current time in millis, as returned by
/// `util::current_time_millis()`, as the last flush time.
pub(crate) fn update_flush_millis(&self) {
    self.last_flush_millis
        .store(util::current_time_millis(), Ordering::Relaxed);
}
/// Returns last flush timestamp in millis.
///
/// Loads the atomic value with `Ordering::Relaxed`.
/// NOTE(review): relaxed ordering assumes no other data is synchronized through
/// this timestamp — confirm against its users.
fn last_flush_millis(&self) -> i64 {
self.last_flush_millis.load(Ordering::Relaxed)
}
}
pub type SharedDataRef = Arc<SharedData>;

View File

@@ -22,8 +22,8 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, Chunk, ChunkReader, ColumnDescriptor,
ColumnDescriptorBuilder, ColumnId, FlushContext, Region, RegionMeta, ScanRequest, SchemaRef,
Snapshot, WriteRequest, WriteResponse,
ColumnDescriptorBuilder, ColumnId, FlushContext, FlushReason, Region, RegionMeta, ScanRequest,
SchemaRef, Snapshot, WriteRequest, WriteResponse,
};
use crate::region::tests::{self, FileTesterBase};
@@ -118,7 +118,12 @@ impl AlterTester {
}
async fn flush(&self, wait: Option<bool>) {
let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
let ctx = wait
.map(|wait| FlushContext {
wait,
reason: FlushReason::Manually,
})
.unwrap_or_default();
self.base().region.flush(&ctx).await.unwrap();
}

View File

@@ -23,7 +23,7 @@ use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use store_api::storage::{FlushContext, Region, WriteResponse};
use store_api::storage::{FlushContext, FlushReason, Region, WriteResponse};
use tokio::sync::Notify;
use crate::compaction::{CompactionHandler, SimplePicker};
@@ -191,7 +191,12 @@ impl CompactionTester {
}
async fn flush(&self, wait: Option<bool>) {
let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
let ctx = wait
.map(|wait| FlushContext {
wait,
reason: FlushReason::Manually,
})
.unwrap_or_default();
self.base().region.flush(&ctx).await.unwrap();
}

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{FlushContext, OpenOptions, Region, WriteResponse};
use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse};
use crate::engine;
use crate::flush::FlushStrategyRef;
@@ -91,7 +91,12 @@ impl FlushTester {
}
async fn flush(&self, wait: Option<bool>) {
let ctx = wait.map(|wait| FlushContext { wait }).unwrap_or_default();
let ctx = wait
.map(|wait| FlushContext {
wait,
reason: FlushReason::Manually,
})
.unwrap_or_default();
self.base().region.flush(&ctx).await.unwrap();
}
}

View File

@@ -18,10 +18,13 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::logging;
use futures::TryStreamExt;
use metrics::increment_counter;
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
use store_api::manifest::{Manifest, ManifestVersion, MetaAction};
use store_api::storage::{AlterRequest, FlushContext, SequenceNumber, WriteContext, WriteResponse};
use store_api::storage::{
AlterRequest, FlushContext, FlushReason, SequenceNumber, WriteContext, WriteResponse,
};
use tokio::sync::{oneshot, Mutex};
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
@@ -33,6 +36,7 @@ use crate::manifest::action::{
};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::metadata::RegionMetadataRef;
use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL};
use crate::proto::wal::WalHeader;
use crate::region::{
CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
@@ -281,7 +285,7 @@ impl RegionWriter {
ensure!(!inner.is_closed(), error::ClosedRegionSnafu);
inner.manual_flush(writer_ctx).await?;
inner.manual_flush(writer_ctx, ctx.reason).await?;
if ctx.wait {
if let Some(handle) = inner.flush_handle.take() {
@@ -587,7 +591,8 @@ impl WriterInner {
version_control,
writer_ctx.flush_strategy,
) {
self.trigger_flush(writer_ctx).await?;
self.trigger_flush(writer_ctx, FlushReason::MemtableFull)
.await?;
}
Ok(())
@@ -612,12 +617,18 @@ impl WriterInner {
flush_strategy.should_flush(shared, mutable_bytes_allocated, total_bytes_allocated)
}
async fn trigger_flush<S: LogStore>(&mut self, ctx: &WriterContext<'_, S>) -> Result<()> {
async fn trigger_flush<S: LogStore>(
&mut self,
ctx: &WriterContext<'_, S>,
reason: FlushReason,
) -> Result<()> {
let version_control = &ctx.shared.version_control;
let new_mutable = self.alloc_memtable(version_control);
// Freeze all mutable memtables so we can flush them later.
version_control.freeze_mutable(new_mutable);
increment_counter!(FLUSH_REQUESTS_TOTAL, FLUSH_REASON => reason.as_str());
if let Some(flush_handle) = self.flush_handle.take() {
// Previous flush job is incomplete, wait util it is finished.
// However the last flush job may fail, in which case, we just return error
@@ -719,8 +730,12 @@ impl WriterInner {
Ok(())
}
async fn manual_flush<S: LogStore>(&mut self, writer_ctx: WriterContext<'_, S>) -> Result<()> {
self.trigger_flush(&writer_ctx).await?;
async fn manual_flush<S: LogStore>(
&mut self,
writer_ctx: WriterContext<'_, S>,
reason: FlushReason,
) -> Result<()> {
self.trigger_flush(&writer_ctx, reason).await?;
Ok(())
}

View File

@@ -124,6 +124,9 @@ where
self.state.store(STATE_STOP, Ordering::Relaxed);
self.cancel_token.cancel();
// Clear all requests
self.request_queue.write().unwrap().clear();
}
}

View File

@@ -76,6 +76,12 @@ impl<K: Eq + Hash + Clone, V> DedupDeque<K, V> {
pub fn is_empty(&self) -> bool {
self.deque.is_empty()
}
/// Removes all entries by clearing both `deque` and `existing`.
///
/// NOTE(review): `existing` is presumably the key set used for de-duplication;
/// clearing it together with `deque` keeps the two in sync — confirm against the
/// struct definition.
#[inline]
pub fn clear(&mut self) {
self.deque.clear();
self.existing.clear();
}
}
impl<K, V> Debug for DedupDeque<K, V>
@@ -111,5 +117,8 @@ mod tests {
assert!(deque.push_back(1, "hello".to_string()));
assert!(!deque.push_back(1, "world".to_string()));
assert_eq!((1, "hello".to_string()), deque.pop_front().unwrap());
deque.clear();
assert!(deque.is_empty());
}
}

View File

@@ -21,9 +21,9 @@ use object_store::ObjectStore;
use store_api::manifest::Manifest;
use crate::compaction::noop::NoopCompactionScheduler;
use crate::engine;
use crate::engine::{self, RegionMap};
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::flush::{FlushScheduler, SizeBasedStrategy};
use crate::flush::{FlushScheduler, PickerConfig, SizeBasedStrategy};
use crate::manifest::region::RegionManifest;
use crate::memtable::DefaultMemtableBuilder;
use crate::region::StoreConfig;
@@ -65,10 +65,16 @@ pub async fn new_store_config_with_object_store(
};
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let flush_scheduler = Arc::new(FlushScheduler::new(
SchedulerConfig::default(),
compaction_scheduler.clone(),
));
// We use an empty region map so actually the background worker of the picker is disabled.
let flush_scheduler = Arc::new(
FlushScheduler::new(
SchedulerConfig::default(),
compaction_scheduler.clone(),
Arc::new(RegionMap::new()),
PickerConfig::default(),
)
.unwrap(),
);
let file_purger = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
NoopFilePurgeHandler,

View File

@@ -17,6 +17,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use crate::flush::FlushStrategy;
use crate::region::SharedDataRef;
/// Controls whether to flush a region while writing the region.
/// Disable flush by default.
#[derive(Debug, Default)]
pub struct FlushSwitch {
should_flush: AtomicBool,

View File

@@ -34,7 +34,7 @@ pub use self::chunk::{Chunk, ChunkReader};
pub use self::descriptors::*;
pub use self::engine::{CreateOptions, EngineContext, OpenOptions, StorageEngine};
pub use self::metadata::RegionMeta;
pub use self::region::{FlushContext, Region, RegionStat, WriteContext};
pub use self::region::{FlushContext, FlushReason, Region, RegionStat, WriteContext};
pub use self::requests::{
AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest,
};

View File

@@ -73,6 +73,9 @@ pub trait StorageEngine: Send + Sync + Clone + 'static {
ctx: &EngineContext,
name: &str,
) -> Result<Option<Self::Region>, Self::Error>;
/// Close the engine.
async fn close(&self, ctx: &EngineContext) -> Result<(), Self::Error>;
}
/// Storage engine context.

View File

@@ -110,10 +110,40 @@ pub struct FlushContext {
/// If true, the flush will wait until the flush is done.
/// Default: true
pub wait: bool,
/// Flush reason.
pub reason: FlushReason,
}
impl Default for FlushContext {
fn default() -> FlushContext {
FlushContext { wait: true }
FlushContext {
wait: true,
reason: FlushReason::Others,
}
}
}
/// Why a flush was requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// Any reason not covered by the other variants.
    Others,
    /// The memtable is full.
    MemtableFull,
    /// The flush was requested manually.
    Manually,
    /// The flush was triggered by the periodic auto flush.
    Periodically,
}

impl FlushReason {
    /// Returns the lowercase, snake_case name of the reason as a static string.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Others => "others",
            Self::MemtableFull => "memtable_full",
            Self::Manually => "manually",
            Self::Periodically => "periodically",
        }
    }
}

View File

@@ -57,7 +57,8 @@ impl TestEnv {
Arc::new(NoopLogStore::default()),
object_store.clone(),
compaction_scheduler,
);
)
.unwrap();
let table_engine = Arc::new(MitoEngine::new(
EngineConfig::default(),
storage_engine,