feat: Trigger flush based on global write buffer size (#1585)

* feat(storage): Add AllocTracker

* feat(storage): flush request wip

* feat(storage): support global write buffer size

* fix(storage): Test and fix size based strategy

* test(storage): Test AllocTracker

* test(storage): Test pick_by_write_buffer_full

* docs: Add flush config example

* test(storage): Test schedule_engine_flush

* feat(storage): Add metrics for write buffer size

* chore(flush): Add log when triggering flush by global buffer

* chore(storage): track allocation in update_stats
Author: Yingwen
Date: 2023-05-22 19:00:30 +08:00
Committed by: GitHub
parent 067c5ee7ce
commit 6dbceb1ad5
20 changed files with 813 additions and 213 deletions

View File

@@ -58,8 +58,12 @@ checkpoint_on_startup = false
max_flush_tasks = 8
# Default write buffer size for a region.
region_write_buffer_size = "32MB"
# Interval to check whether a region needs flush.
picker_schedule_interval = "5m"
# Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
# Global write buffer size for all regions.
global_write_buffer_size = "1GB"
# Procedure storage options, see `standalone.example.toml`.
[procedure]
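
For orientation (not from the patch itself): the new global limit caps the memory of all memtables together, while `region_write_buffer_size` only bounds a single region. A rough check of how the two defaults relate, assuming the readable sizes above parse to plain byte counts:

```rust
fn main() {
    let region_write_buffer_size: usize = 32 * 1024 * 1024; // "32MB"
    let global_write_buffer_size: usize = 1024 * 1024 * 1024; // "1GB"

    // Roughly how many regions can fill their write buffer before the global
    // limit starts forcing engine-level flushes.
    let regions_before_global_flush = global_write_buffer_size / region_write_buffer_size;
    assert_eq!(32, regions_before_global_flush);
}
```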

View File

@@ -123,8 +123,12 @@ checkpoint_on_startup = false
max_flush_tasks = 8
# Default write buffer size for a region.
region_write_buffer_size = "32MB"
# Interval to check whether a region needs flush.
picker_schedule_interval = "5m"
# Interval to auto flush a region if it has not flushed yet.
auto_flush_interval = "1h"
# Global write buffer size for all regions.
global_write_buffer_size = "1GB"
# Procedure storage options.
[procedure]

View File

@@ -223,6 +223,8 @@ pub struct FlushConfig {
/// Interval to auto flush a region if it has not flushed yet.
#[serde(with = "humantime_serde")]
pub auto_flush_interval: Duration,
/// Global write buffer size for all regions.
pub global_write_buffer_size: Option<ReadableSize>,
}
impl Default for FlushConfig {
@@ -234,6 +236,7 @@ impl Default for FlushConfig {
DEFAULT_PICKER_SCHEDULE_INTERVAL.into(),
),
auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
global_write_buffer_size: None,
}
}
}
@@ -260,6 +263,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
region_write_buffer_size: value.storage.flush.region_write_buffer_size,
picker_schedule_interval: value.storage.flush.picker_schedule_interval,
auto_flush_interval: value.storage.flush.auto_flush_interval,
global_write_buffer_size: value.storage.flush.global_write_buffer_size,
}
}
}

View File

@@ -44,6 +44,8 @@ pub struct EngineConfig {
pub picker_schedule_interval: Duration,
/// Interval to auto flush a region if it has not flushed yet.
pub auto_flush_interval: Duration,
/// Limit for global write buffer size. Disabled by default.
pub global_write_buffer_size: Option<ReadableSize>,
}
impl Default for EngineConfig {
@@ -62,6 +64,7 @@ impl Default for EngineConfig {
DEFAULT_PICKER_SCHEDULE_INTERVAL.into(),
),
auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()),
global_write_buffer_size: None,
}
}
}

View File

@@ -27,7 +27,7 @@ use store_api::storage::{
};
use crate::compaction::CompactionSchedulerRef;
use crate::config::EngineConfig;
use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
use crate::error::{self, Error, Result};
use crate::file_purger::{FilePurgeHandler, FilePurgerRef};
use crate::flush::{
@@ -288,7 +288,7 @@ impl<S: LogStore> RegionMap<S> {
}
/// Clear the region map.
fn clear(&self) {
pub(crate) fn clear(&self) {
self.0.write().unwrap().clear();
}
}
@@ -337,15 +337,25 @@ impl<S: LogStore> EngineInner<S> {
},
FilePurgeHandler,
));
let flush_strategy = Arc::new(SizeBasedStrategy::new(
config
.global_write_buffer_size
.map(|size| size.as_bytes() as usize),
));
let memtable_builder = if config.global_write_buffer_size.is_some() {
// If a global write buffer size is provided, pass the flush strategy to
// the memtable builder so memtables can track global memory usage.
DefaultMemtableBuilder::with_flush_strategy(Some(flush_strategy.clone()))
} else {
DefaultMemtableBuilder::default()
};
Ok(Self {
object_store,
log_store,
regions,
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
memtable_builder: Arc::new(memtable_builder),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::new(
config.region_write_buffer_size.as_bytes() as usize,
)),
flush_strategy,
compaction_scheduler,
file_purger,
config: Arc::new(config),
@@ -461,10 +471,7 @@ impl<S: LogStore> EngineInner<S> {
config.manifest_gc_duration,
);
manifest.start().await?;
let flush_strategy = write_buffer_size
.map(|size| Arc::new(SizeBasedStrategy::new(size)) as Arc<_>)
.unwrap_or_else(|| self.flush_strategy.clone());
let flush_strategy = self.flush_strategy.clone();
Ok(StoreConfig {
log_store: self.log_store.clone(),
@@ -478,6 +485,8 @@ impl<S: LogStore> EngineInner<S> {
file_purger: self.file_purger.clone(),
ttl,
compaction_time_window,
write_buffer_size: write_buffer_size
.unwrap_or(DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize),
})
}
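
A minimal sketch of the wiring above, using simplified stand-ins for the engine's types: when a global limit is configured, one `SizeBasedStrategy` is shared between the flush path and the memtable builder so allocations feed the global accounting; otherwise the default builder is used and nothing is tracked globally.

```rust
use std::sync::Arc;

// Simplified stand-ins; the real types live in the storage crate.
#[derive(Debug)]
struct SizeBasedStrategy {
    global_write_buffer_size: Option<usize>,
}

#[derive(Debug, Default)]
struct DefaultMemtableBuilder {
    flush_strategy: Option<Arc<SizeBasedStrategy>>,
}

fn build(global_write_buffer_size: Option<usize>) -> (Arc<SizeBasedStrategy>, DefaultMemtableBuilder) {
    let flush_strategy = Arc::new(SizeBasedStrategy { global_write_buffer_size });
    let memtable_builder = if global_write_buffer_size.is_some() {
        // Memtables report their allocations to the shared strategy.
        DefaultMemtableBuilder { flush_strategy: Some(flush_strategy.clone()) }
    } else {
        DefaultMemtableBuilder::default()
    };
    (flush_strategy, memtable_builder)
}

fn main() {
    let (_, with_tracking) = build(Some(1 << 30));
    assert!(with_tracking.flush_strategy.is_some());
    let (_, without_tracking) = build(None);
    assert!(without_tracking.flush_strategy.is_none());
}
```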

View File

@@ -15,16 +15,19 @@
mod picker;
mod scheduler;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use common_telemetry::{logging, timer};
pub use picker::{FlushPicker, PickerConfig};
pub use scheduler::{FlushHandle, FlushRequest, FlushScheduler, FlushSchedulerRef};
pub use scheduler::{
FlushHandle, FlushRegionRequest, FlushRequest, FlushScheduler, FlushSchedulerRef,
};
use store_api::logstore::LogStore;
use store_api::storage::consts::WRITE_ROW_GROUP_SIZE;
use store_api::storage::SequenceNumber;
use store_api::storage::{RegionId, SequenceNumber};
use crate::config::{EngineConfig, DEFAULT_REGION_WRITE_BUFFER_SIZE};
use crate::config::EngineConfig;
use crate::error::Result;
use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
@@ -34,95 +37,177 @@ use crate::region::{RegionWriterRef, SharedDataRef};
use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions};
use crate::wal::Wal;
/// Current flush-related status of a region.
#[derive(Debug, Clone, Copy)]
pub struct RegionStatus {
/// Id of the region this status belongs to.
pub region_id: RegionId,
/// Size of the mutable memtable.
pub bytes_mutable: usize,
/// Write buffer size of the region.
pub write_buffer_size: usize,
}
/// Type of flush request to send.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushType {
/// Flush current region.
Region,
/// Engine level flush. Find regions to flush globally.
Engine,
}
/// Strategy to control whether to flush a region before writing to the region.
pub trait FlushStrategy: Send + Sync + std::fmt::Debug {
fn should_flush(
&self,
shared: &SharedDataRef,
bytes_mutable: usize,
bytes_total: usize,
) -> bool;
/// Returns whether to trigger a flush operation.
fn should_flush(&self, status: RegionStatus) -> Option<FlushType>;
/// Reserves `mem` bytes.
fn reserve_mem(&self, mem: usize);
/// Tells the strategy we are freeing `mem` bytes.
///
/// We are in the process of freeing `mem` bytes, so it is not considered
/// when checking the soft limit.
fn schedule_free_mem(&self, mem: usize);
/// We have freed `mem` bytes.
fn free_mem(&self, mem: usize);
}
pub type FlushStrategyRef = Arc<dyn FlushStrategy>;
/// Flush strategy based on memory usage.
#[derive(Debug)]
pub struct SizeBasedStrategy {
/// Write buffer size of memtable.
max_write_buffer_size: usize,
/// Mutable memtable memory size limitation
/// Write buffer size for all memtables.
global_write_buffer_size: Option<usize>,
/// Mutable memtable memory size limitation, only valid when `global_write_buffer_size`
/// is `Some`.
mutable_limitation: usize,
/// Memory in used (e.g. used by mutable and immutable memtables).
memory_used: AtomicUsize,
/// Memory that hasn't been scheduled to free (e.g. used by mutable memtables).
memory_active: AtomicUsize,
}
impl SizeBasedStrategy {
pub fn new(max_write_buffer_size: usize) -> Self {
/// Returns a new [SizeBasedStrategy] with specific `global_write_buffer_size`.
pub fn new(global_write_buffer_size: Option<usize>) -> Self {
Self {
max_write_buffer_size,
mutable_limitation: get_mutable_limitation(max_write_buffer_size),
global_write_buffer_size,
mutable_limitation: get_mutable_limitation(global_write_buffer_size),
memory_used: AtomicUsize::new(0),
memory_active: AtomicUsize::new(0),
}
}
/// Returns whether to trigger an engine level flush.
///
/// Inspired by RocksDB's WriteBufferManager.
/// https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94
fn should_flush_engine(&self) -> bool {
// We only check global limit when it is Some.
let Some(global_write_buffer_size) = self.global_write_buffer_size else {
return false;
};
let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
if mutable_memtable_memory_usage > self.mutable_limitation {
logging::info!(
"Engine should flush (over mutable limit), mutable_usage: {}, mutable_limitation: {}.",
mutable_memtable_memory_usage,
self.mutable_limitation,
);
return true;
}
let memory_usage = self.memory_used.load(Ordering::Relaxed);
// If the memory usage exceeds the global buffer size, we trigger a more
// aggressive flush. But if more than half of the memory is already being
// flushed, triggering more flushes may not help, so we hold off instead.
if memory_usage >= global_write_buffer_size
&& mutable_memtable_memory_usage >= global_write_buffer_size / 2
{
logging::info!(
"Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
mutable_usage: {}.",
memory_usage,
global_write_buffer_size,
mutable_memtable_memory_usage,
);
return true;
}
false
}
/// Returns true if the global memory limitation is enabled.
#[inline]
fn is_global_limit_enabled(&self) -> bool {
self.global_write_buffer_size.is_some()
}
}
#[inline]
fn get_mutable_limitation(max_write_buffer_size: usize) -> usize {
// Inspired by RocksDB
fn get_mutable_limitation(global_write_buffer_size: Option<usize>) -> usize {
// Inspired by RocksDB.
// https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L86
max_write_buffer_size * 7 / 8
global_write_buffer_size
.map(|size| size * 7 / 8)
.unwrap_or(0)
}
impl Default for SizeBasedStrategy {
fn default() -> Self {
let max_write_buffer_size = DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize;
Self {
max_write_buffer_size,
mutable_limitation: get_mutable_limitation(max_write_buffer_size),
global_write_buffer_size: None,
mutable_limitation: 0,
memory_used: AtomicUsize::new(0),
memory_active: AtomicUsize::new(0),
}
}
}
impl FlushStrategy for SizeBasedStrategy {
fn should_flush(
&self,
shared: &SharedDataRef,
bytes_mutable: usize,
bytes_total: usize,
) -> bool {
// Insipired by RocksDB flush strategy
// https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h#L94
if bytes_mutable > self.mutable_limitation {
logging::info!(
"Region should flush, region: {}, bytes_mutable: {}, mutable_limitation: {}, \
bytes_total: {}, max_write_buffer_size: {} .",
shared.name(),
bytes_mutable,
self.mutable_limitation,
bytes_total,
self.max_write_buffer_size
fn should_flush(&self, status: RegionStatus) -> Option<FlushType> {
if status.bytes_mutable >= status.write_buffer_size {
// If the mutable memtable is full, we should freeze it and flush it.
logging::debug!(
"Region should flush as mutable memtable is full, region: {}, bytes_mutable: {}, \
write_buffer_size: {}.",
status.region_id,
status.bytes_mutable,
status.write_buffer_size,
);
return true;
return Some(FlushType::Region);
}
let buffer_size = self.max_write_buffer_size;
// If the memory exceeds the buffer size, we trigger more aggressive
// flush. But if already more than half memory is being flushed,
// triggering more flush may not help. We will hold it instead.
let should_flush = bytes_total >= buffer_size && bytes_mutable >= buffer_size / 2;
if should_flush {
logging::info!(
"Region should flush, region: {}, bytes_mutable: {}, mutable_limitation: {}, \
bytes_total: {}, max_write_buffer_size: {} .",
shared.name(),
bytes_mutable,
self.mutable_limitation,
bytes_total,
buffer_size
);
if self.should_flush_engine() {
return Some(FlushType::Engine);
}
should_flush
None
}
fn reserve_mem(&self, mem: usize) {
if self.is_global_limit_enabled() {
self.memory_used.fetch_add(mem, Ordering::Relaxed);
self.memory_active.fetch_add(mem, Ordering::Relaxed);
}
}
fn schedule_free_mem(&self, mem: usize) {
if self.is_global_limit_enabled() {
self.memory_active.fetch_sub(mem, Ordering::Relaxed);
}
}
fn free_mem(&self, mem: usize) {
if self.is_global_limit_enabled() {
self.memory_used.fetch_sub(mem, Ordering::Relaxed);
}
}
}
@@ -236,11 +321,164 @@ impl<S: LogStore> FlushJob<S> {
#[cfg(test)]
mod tests {
use super::*;
use crate::memtable::AllocTracker;
#[test]
fn test_get_mutable_limitation() {
assert_eq!(7, get_mutable_limitation(8));
assert_eq!(8, get_mutable_limitation(10));
assert_eq!(56, get_mutable_limitation(64));
assert_eq!(7, get_mutable_limitation(Some(8)));
assert_eq!(8, get_mutable_limitation(Some(10)));
assert_eq!(56, get_mutable_limitation(Some(64)));
assert_eq!(0, get_mutable_limitation(None));
}
#[test]
fn test_strategy_global_disabled() {
let strategy = SizeBasedStrategy::new(None);
strategy.reserve_mem(1000);
assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed));
strategy.schedule_free_mem(1000);
assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed));
strategy.free_mem(1000);
assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed));
let status = RegionStatus {
region_id: 1,
bytes_mutable: 400,
write_buffer_size: 300,
};
assert_eq!(Some(FlushType::Region), strategy.should_flush(status));
let status = RegionStatus {
region_id: 1,
bytes_mutable: 100,
write_buffer_size: 300,
};
assert_eq!(None, strategy.should_flush(status));
}
#[test]
fn test_strategy_over_mutable_limit() {
let strategy = SizeBasedStrategy::new(Some(1000));
strategy.reserve_mem(500);
let status = RegionStatus {
region_id: 1,
bytes_mutable: 300,
write_buffer_size: 500,
};
assert_eq!(None, strategy.should_flush(status));
strategy.reserve_mem(400);
// Flush region.
let status = RegionStatus {
region_id: 1,
bytes_mutable: 400,
write_buffer_size: 300,
};
assert_eq!(Some(FlushType::Region), strategy.should_flush(status));
// More than the mutable limitation, flush the engine.
let status = RegionStatus {
region_id: 1,
bytes_mutable: 100,
write_buffer_size: 300,
};
assert_eq!(Some(FlushType::Engine), strategy.should_flush(status));
strategy.schedule_free_mem(500);
assert_eq!(None, strategy.should_flush(status));
assert_eq!(900, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(400, strategy.memory_active.load(Ordering::Relaxed));
strategy.free_mem(500);
assert_eq!(400, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(400, strategy.memory_active.load(Ordering::Relaxed));
}
#[test]
fn test_strategy_over_global() {
common_telemetry::init_default_ut_logging();
let strategy = SizeBasedStrategy::new(Some(1000));
strategy.reserve_mem(1100);
strategy.schedule_free_mem(200);
// More than global limit.
let status = RegionStatus {
region_id: 1,
bytes_mutable: 100,
write_buffer_size: 300,
};
assert_eq!(Some(FlushType::Engine), strategy.should_flush(status));
// More than global limit, but mutable not enough (< 500).
strategy.schedule_free_mem(450);
let status = RegionStatus {
region_id: 1,
bytes_mutable: 100,
write_buffer_size: 300,
};
assert_eq!(None, strategy.should_flush(status));
strategy.schedule_free_mem(100);
assert_eq!(None, strategy.should_flush(status));
// Now mutable is enough.
strategy.reserve_mem(150);
// We can flush again.
assert_eq!(Some(FlushType::Engine), strategy.should_flush(status));
strategy.reserve_mem(100);
assert_eq!(Some(FlushType::Engine), strategy.should_flush(status));
}
#[test]
fn test_alloc_tracker_without_strategy() {
let tracker = AllocTracker::new(None);
assert_eq!(0, tracker.bytes_allocated());
tracker.on_allocate(100);
assert_eq!(100, tracker.bytes_allocated());
tracker.on_allocate(200);
assert_eq!(300, tracker.bytes_allocated());
tracker.done_allocating();
assert_eq!(300, tracker.bytes_allocated());
}
#[test]
fn test_alloc_tracker_with_strategy() {
let strategy = Arc::new(SizeBasedStrategy::new(Some(1000)));
{
let tracker = AllocTracker::new(Some(strategy.clone() as FlushStrategyRef));
tracker.on_allocate(100);
assert_eq!(100, tracker.bytes_allocated());
assert_eq!(100, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(100, strategy.memory_active.load(Ordering::Relaxed));
for _ in 0..2 {
// Calling done_allocating() repeatedly won't free the same memory twice.
tracker.done_allocating();
assert_eq!(100, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed));
}
}
assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed));
}
#[test]
fn test_alloc_tracker_without_done_allocating() {
let strategy = Arc::new(SizeBasedStrategy::new(Some(1000)));
{
let tracker = AllocTracker::new(Some(strategy.clone() as FlushStrategyRef));
tracker.on_allocate(100);
assert_eq!(100, tracker.bytes_allocated());
assert_eq!(100, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(100, strategy.memory_active.load(Ordering::Relaxed));
}
assert_eq!(0, strategy.memory_used.load(Ordering::Relaxed));
assert_eq!(0, strategy.memory_active.load(Ordering::Relaxed));
}
}
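
Pulling the two checks together, here is a self-contained sketch of the decision order (plain integers stand in for the strategy's atomic counters and the `RegionStatus` fields): the per-region rule is checked first, and the engine-level rule only applies when a global limit is configured.

```rust
#[derive(Debug, PartialEq)]
enum FlushType {
    Region,
    Engine,
}

fn should_flush(
    bytes_mutable: usize,
    write_buffer_size: usize,
    global_limit: Option<usize>,
    memory_used: usize,   // reserved and not yet freed
    memory_active: usize, // reserved and not yet scheduled to free
) -> Option<FlushType> {
    // 1. Per-region check: the mutable memtable reached the region's buffer size.
    if bytes_mutable >= write_buffer_size {
        return Some(FlushType::Region);
    }
    // 2. Global check, only when a global limit is configured.
    let limit = global_limit?;
    let mutable_limitation = limit * 7 / 8;
    if memory_active > mutable_limitation {
        return Some(FlushType::Engine);
    }
    // Over the total limit, but only flush if at least half of it is still
    // mutable; otherwise enough flushes are already in flight.
    if memory_used >= limit && memory_active >= limit / 2 {
        return Some(FlushType::Engine);
    }
    None
}

fn main() {
    // Region flush: the mutable memtable is full.
    assert_eq!(Some(FlushType::Region), should_flush(400, 300, Some(1000), 0, 0));
    // Engine flush: active memory exceeds 7/8 of the 1000-byte global limit.
    assert_eq!(Some(FlushType::Engine), should_flush(100, 300, Some(1000), 900, 900));
    // No global limit configured: only the per-region rule applies.
    assert_eq!(None, should_flush(100, 300, None, 900, 900));
}
```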

View File

@@ -52,6 +52,7 @@ impl Default for PickerConfig {
}
/// Flush task picker.
#[derive(Debug, Clone)]
pub struct FlushPicker {
/// Interval to flush a region automatically.
auto_flush_interval_millis: i64,
@@ -65,7 +66,7 @@ impl FlushPicker {
}
}
/// Pick regions and flush them by interval.
/// Picks regions and flushes them by interval.
///
/// Returns the number of flushed regions.
pub async fn pick_by_interval<T: FlushItem>(&self, regions: &[T]) -> usize {
@@ -77,6 +78,25 @@ impl FlushPicker {
0
}
}
/// Picks and flushes regions when the write buffer is full.
pub async fn pick_by_write_buffer_full<T: FlushItem>(&self, regions: &[T]) {
// In this case, we pick the region that flushed longest ago. If this is not
// enough, the region writer will trigger the picker again on the next write,
// and we can pick another region to flush. The total memory usage will go
// down eventually.
let target = regions
.iter()
.filter(|region| region.mutable_memtable_usage() > 0)
.min_by_key(|region| region.last_flush_time());
if let Some(region) = target {
logging::debug!(
"Request flush for region {} due to global buffer is full",
region.item_id()
);
region.request_flush(FlushReason::GlobalBufferFull).await;
}
}
}
/// Item for picker to flush.
@@ -88,6 +108,9 @@ pub trait FlushItem {
/// Last flush time in millis.
fn last_flush_time(&self) -> i64;
/// Mutable memtable usage.
fn mutable_memtable_usage(&self) -> usize;
/// Requests the item to schedule a flush for specific `reason`.
///
/// The flush job itself should run in background.
@@ -104,11 +127,18 @@ impl<S: LogStore> FlushItem for RegionImpl<S> {
self.last_flush_millis()
}
fn mutable_memtable_usage(&self) -> usize {
let current = self.version_control().current();
let memtables = current.memtables();
memtables.mutable_bytes_allocated()
}
async fn request_flush(&self, reason: FlushReason) {
let ctx = FlushContext {
wait: false,
reason,
};
if let Err(e) = self.flush(&ctx).await {
logging::error!(e; "Failed to flush region {}", self.id());
}
@@ -149,14 +179,16 @@ mod tests {
struct MockItem {
id: u64,
last_flush_time: i64,
usage: usize,
flush_reason: Mutex<Option<FlushReason>>,
}
impl MockItem {
fn new(id: u64, last_flush_time: i64) -> MockItem {
fn new(id: u64, last_flush_time: i64, usage: usize) -> MockItem {
MockItem {
id,
last_flush_time,
usage,
flush_reason: Mutex::new(None),
}
}
@@ -176,6 +208,10 @@ mod tests {
self.last_flush_time
}
fn mutable_memtable_usage(&self) -> usize {
self.usage
}
async fn request_flush(&self, reason: FlushReason) {
let mut flush_reason = self.flush_reason.lock().unwrap();
*flush_reason = Some(reason);
@@ -185,8 +221,8 @@ mod tests {
#[tokio::test]
async fn test_pick_by_interval() {
let regions = [
MockItem::new(0, util::current_time_millis()),
MockItem::new(1, util::current_time_millis() - 60 * 1000),
MockItem::new(0, util::current_time_millis(), 1),
MockItem::new(1, util::current_time_millis() - 60 * 1000, 1),
];
let picker = FlushPicker::new(PickerConfig {
// schedule_interval is unused in this test.
@@ -198,4 +234,29 @@ mod tests {
assert!(regions[0].flush_reason().is_none());
assert_eq!(Some(FlushReason::Periodically), regions[1].flush_reason());
}
#[tokio::test]
async fn test_pick_by_buffer_full() {
let regions = [
MockItem::new(0, util::current_time_millis(), 10),
MockItem::new(1, util::current_time_millis() - 60 * 1000, 0),
MockItem::new(1, util::current_time_millis() - 60 * 1000, 10),
];
let picker = FlushPicker::new(PickerConfig {
schedule_interval: Duration::from_millis(10),
auto_flush_interval: Duration::from_millis(30 * 1000),
});
picker.pick_by_write_buffer_full(&regions).await;
assert!(regions[0].flush_reason().is_none());
assert!(regions[1].flush_reason().is_none());
assert_eq!(
Some(FlushReason::GlobalBufferFull),
regions[2].flush_reason()
);
// No target.
let regions = [MockItem::new(1, util::current_time_millis(), 0)];
picker.pick_by_write_buffer_full(&regions).await;
assert!(regions[0].flush_reason().is_none());
}
}
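
The selection rule in `pick_by_write_buffer_full` can be illustrated with a minimal sketch that uses a plain struct instead of the `FlushItem` trait: among regions that still hold mutable data, the one that flushed longest ago is picked.

```rust
#[derive(Debug)]
struct Region {
    id: u64,
    last_flush_time: i64,
    mutable_usage: usize,
}

fn pick_for_global_flush(regions: &[Region]) -> Option<&Region> {
    regions
        .iter()
        .filter(|r| r.mutable_usage > 0)
        .min_by_key(|r| r.last_flush_time)
}

fn main() {
    let regions = [
        Region { id: 0, last_flush_time: 2_000, mutable_usage: 10 },
        Region { id: 1, last_flush_time: 1_000, mutable_usage: 0 },  // empty, skipped
        Region { id: 2, last_flush_time: 1_000, mutable_usage: 10 }, // oldest with data
    ];
    assert_eq!(Some(2), pick_for_global_flush(&regions).map(|r| r.id));
}
```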

View File

@@ -42,12 +42,48 @@ use crate::scheduler::{Handler, LocalScheduler, Request, Scheduler, SchedulerCon
use crate::sst::AccessLayerRef;
use crate::wal::Wal;
/// Key for [FlushRequest], consist of a region id and the flush
/// sequence.
type FlushKey = (RegionId, SequenceNumber);
/// Key for [FlushRequest].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum FlushKey {
Engine,
Region(RegionId, SequenceNumber),
}
/// Flush request.
pub enum FlushRequest<S: LogStore> {
/// Flush the engine.
Engine,
/// Flush a region.
Region {
/// Region flush request.
req: FlushRegionRequest<S>,
/// Flush result sender.
sender: Sender<Result<()>>,
},
}
impl<S: LogStore> Request for FlushRequest<S> {
type Key = FlushKey;
#[inline]
fn key(&self) -> FlushKey {
match &self {
FlushRequest::Engine => FlushKey::Engine,
FlushRequest::Region { req, .. } => {
FlushKey::Region(req.shared.id(), req.flush_sequence)
}
}
}
fn complete(self, result: Result<()>) {
if let FlushRequest::Region { sender, .. } = self {
let _ = sender.send(result);
}
}
}
/// Region flush request.
pub struct FlushRequest<S: LogStore> {
pub struct FlushRegionRequest<S: LogStore> {
/// Max memtable id in these memtables,
/// used to remove immutable memtables in current version.
pub max_memtable_id: MemtableId,
@@ -67,8 +103,6 @@ pub struct FlushRequest<S: LogStore> {
pub manifest: RegionManifest,
/// Storage engine config
pub engine_config: Arc<EngineConfig>,
/// Flush result sender. Callers should set the sender to None.
pub sender: Option<Sender<Result<()>>>,
// Compaction related options:
/// TTL of the region.
@@ -77,30 +111,15 @@ pub struct FlushRequest<S: LogStore> {
pub compaction_time_window: Option<i64>,
}
impl<S: LogStore> FlushRequest<S> {
impl<S: LogStore> FlushRegionRequest<S> {
#[inline]
fn region_id(&self) -> RegionId {
self.shared.id()
}
}
impl<S: LogStore> Request for FlushRequest<S> {
type Key = FlushKey;
#[inline]
fn key(&self) -> FlushKey {
(self.shared.id(), self.flush_sequence)
}
fn complete(self, result: Result<()>) {
if let Some(sender) = self.sender {
let _ = sender.send(result);
}
}
}
impl<S: LogStore> From<&FlushRequest<S>> for FlushJob<S> {
fn from(req: &FlushRequest<S>) -> FlushJob<S> {
impl<S: LogStore> From<&FlushRegionRequest<S>> for FlushJob<S> {
fn from(req: &FlushRegionRequest<S>) -> FlushJob<S> {
FlushJob {
max_memtable_id: req.max_memtable_id,
memtables: req.memtables.clone(),
@@ -115,8 +134,8 @@ impl<S: LogStore> From<&FlushRequest<S>> for FlushJob<S> {
}
}
impl<S: LogStore> From<&FlushRequest<S>> for CompactionRequestImpl<S> {
fn from(req: &FlushRequest<S>) -> CompactionRequestImpl<S> {
impl<S: LogStore> From<&FlushRegionRequest<S>> for CompactionRequestImpl<S> {
fn from(req: &FlushRegionRequest<S>) -> CompactionRequestImpl<S> {
CompactionRequestImpl {
region_id: req.region_id(),
sst_layer: req.sst_layer.clone(),
@@ -166,16 +185,23 @@ impl<S: LogStore> FlushScheduler<S> {
regions: Arc<RegionMap<S>>,
picker_config: PickerConfig,
) -> Result<Self> {
let handler = FlushHandler {
compaction_scheduler,
};
let task_interval = picker_config.schedule_interval;
let picker = FlushPicker::new(picker_config);
let task_fn = AutoFlushFunction { regions, picker };
// We just clone the picker here since it doesn't need to share state and
// cloning it is cheap.
let task_fn = AutoFlushFunction {
regions: regions.clone(),
picker: picker.clone(),
};
let auto_flush_task = RepeatedTask::new(task_interval, Box::new(task_fn));
auto_flush_task
.start(common_runtime::bg_runtime())
.context(StartPickTaskSnafu)?;
let handler = FlushHandler {
compaction_scheduler,
regions,
picker,
};
Ok(Self {
scheduler: LocalScheduler::new(config, handler),
@@ -183,19 +209,15 @@ impl<S: LogStore> FlushScheduler<S> {
})
}
/// Schedules a flush request and return the handle to the flush task.
///
/// # Panics
/// Panics if `sender` of the `req` is not `None`.
pub fn schedule_flush(&self, mut req: FlushRequest<S>) -> Result<FlushHandle> {
assert!(req.sender.is_none());
/// Schedules a region flush request and returns the handle to the flush task.
pub fn schedule_region_flush(&self, req: FlushRegionRequest<S>) -> Result<FlushHandle> {
let region_id = req.region_id();
let sequence = req.flush_sequence;
let (sender, receiver) = oneshot::channel();
req.sender = Some(sender);
let scheduled = self.scheduler.schedule(req)?;
let scheduled = self
.scheduler
.schedule(FlushRequest::Region { req, sender })?;
// Normally we should not have duplicate flush request.
ensure!(
scheduled,
@@ -211,6 +233,12 @@ impl<S: LogStore> FlushScheduler<S> {
})
}
/// Schedules an engine flush request.
pub fn schedule_engine_flush(&self) -> Result<()> {
self.scheduler.schedule(FlushRequest::Engine)?;
Ok(())
}
/// Stop the scheduler.
pub async fn stop(&self) -> Result<()> {
self.auto_flush_task
@@ -225,6 +253,8 @@ impl<S: LogStore> FlushScheduler<S> {
struct FlushHandler<S: LogStore> {
compaction_scheduler: CompactionSchedulerRef<S>,
regions: Arc<RegionMap<S>>,
picker: FlushPicker,
}
#[async_trait::async_trait]
@@ -238,8 +268,18 @@ impl<S: LogStore> Handler for FlushHandler<S> {
finish_notifier: Arc<Notify>,
) -> Result<()> {
let compaction_scheduler = self.compaction_scheduler.clone();
let region_map = self.regions.clone();
let picker = self.picker.clone();
common_runtime::spawn_bg(async move {
execute_flush(req, compaction_scheduler).await;
match req {
FlushRequest::Engine => {
let regions = region_map.list_regions();
picker.pick_by_write_buffer_full(&regions).await;
}
FlushRequest::Region { req, sender } => {
execute_flush_region(req, sender, compaction_scheduler).await;
}
}
// releases rate limit token
token.try_release();
@@ -251,8 +291,9 @@ impl<S: LogStore> Handler for FlushHandler<S> {
}
}
async fn execute_flush<S: LogStore>(
req: FlushRequest<S>,
async fn execute_flush_region<S: LogStore>(
req: FlushRegionRequest<S>,
sender: Sender<Result<()>>,
compaction_scheduler: CompactionSchedulerRef<S>,
) {
let mut flush_job = FlushJob::from(&req);
@@ -262,7 +303,7 @@ async fn execute_flush<S: LogStore>(
increment_counter!(FLUSH_ERRORS_TOTAL);
req.complete(Err(e));
FlushRequest::Region { req, sender }.complete(Err(e));
} else {
logging::debug!("Successfully flush region: {}", req.region_id());
@@ -282,7 +323,7 @@ async fn execute_flush<S: LogStore>(
);
// Complete the request.
req.complete(Ok(()));
FlushRequest::Region { req, sender }.complete(Ok(()));
}
}
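
One consequence of the new `FlushKey`, sketched below with a toy keyed queue rather than the scheduler's real `LocalScheduler`: region flushes are deduplicated per `(region, sequence)` pair, while every engine-level request shares the single `Engine` key, so at most one engine flush can be queued at a time.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
enum FlushKey {
    Engine,
    Region(u64 /* region id */, u64 /* sequence */),
}

#[derive(Default)]
struct KeyedQueue {
    keys: HashSet<FlushKey>,
}

impl KeyedQueue {
    /// Returns false if an identical request is already queued.
    fn schedule(&mut self, key: FlushKey) -> bool {
        self.keys.insert(key)
    }
}

fn main() {
    let mut queue = KeyedQueue::default();
    assert!(queue.schedule(FlushKey::Region(1, 10)));
    assert!(queue.schedule(FlushKey::Engine));
    // A second engine-level request collapses into the pending one.
    assert!(!queue.schedule(FlushKey::Engine));
    // A later flush of the same region with a new sequence is a distinct key.
    assert!(queue.schedule(FlushKey::Region(1, 11)));
}
```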

View File

@@ -18,17 +18,21 @@ mod inserter;
pub mod tests;
mod version;
use std::sync::atomic::{AtomicI64, AtomicU32, AtomicUsize, Ordering};
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use common_time::range::TimestampRange;
use datatypes::vectors::VectorRef;
use metrics::{decrement_gauge, increment_gauge};
use store_api::storage::{consts, OpType, SequenceNumber};
use crate::error::Result;
use crate::flush::FlushStrategyRef;
use crate::memtable::btree::BTreeMemtable;
pub use crate::memtable::inserter::Inserter;
pub use crate::memtable::version::MemtableVersion;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;
use crate::schema::{ProjectedSchemaRef, RegionSchemaRef};
@@ -40,32 +44,21 @@ pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap. Result
/// of this method may be larger than the estimated based on [`num_rows`] because
/// of the implementor's pre-alloc behavior.
estimated_bytes: AtomicUsize,
estimated_bytes: usize,
/// The max timestamp that this memtable contains.
max_timestamp: AtomicI64,
pub max_timestamp: i64,
/// The min timestamp that this memtable contains.
min_timestamp: AtomicI64,
pub min_timestamp: i64,
}
impl MemtableStats {
pub fn bytes_allocated(&self) -> usize {
self.estimated_bytes
.load(std::sync::atomic::Ordering::Relaxed)
}
}
impl Default for MemtableStats {
fn default() -> Self {
Self {
estimated_bytes: AtomicUsize::default(),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
}
}
}
/// In memory storage.
pub trait Memtable: Send + Sync + std::fmt::Debug {
pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns id of this memtable.
fn id(&self) -> MemtableId;
@@ -87,7 +80,12 @@ pub trait Memtable: Send + Sync + std::fmt::Debug {
fn num_rows(&self) -> usize;
/// Returns stats of this memtable.
fn stats(&self) -> &MemtableStats;
fn stats(&self) -> MemtableStats;
/// Mark the memtable is immutable.
///
/// The region MUST call this inside the region writer's write lock.
fn mark_immutable(&self);
}
pub type MemtableRef = Arc<dyn Memtable>;
@@ -153,7 +151,7 @@ pub trait BatchIterator: Iterator<Item = Result<Batch>> + Send + Sync {
pub type BoxedBatchIterator = Box<dyn BatchIterator>;
pub trait MemtableBuilder: Send + Sync + std::fmt::Debug {
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
fn build(&self, schema: RegionSchemaRef) -> MemtableRef;
}
@@ -200,14 +198,104 @@ impl KeyValues {
}
}
/// Memtable memory allocation tracker.
pub struct AllocTracker {
flush_strategy: Option<FlushStrategyRef>,
/// Bytes allocated by the tracker.
bytes_allocated: AtomicUsize,
/// Whether allocating is done.
is_done_allocating: AtomicBool,
}
impl fmt::Debug for AllocTracker {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("AllocTracker")
.field("bytes_allocated", &self.bytes_allocated)
.field("is_done_allocating", &self.is_done_allocating)
.finish()
}
}
impl AllocTracker {
/// Returns a new [AllocTracker].
pub fn new(flush_strategy: Option<FlushStrategyRef>) -> AllocTracker {
AllocTracker {
flush_strategy,
bytes_allocated: AtomicUsize::new(0),
is_done_allocating: AtomicBool::new(false),
}
}
/// Tracks that `bytes` of memory have been allocated.
pub(crate) fn on_allocate(&self, bytes: usize) {
self.bytes_allocated.fetch_add(bytes, Ordering::Relaxed);
increment_gauge!(WRITE_BUFFER_BYTES, bytes as f64);
if let Some(flush_strategy) = &self.flush_strategy {
flush_strategy.reserve_mem(bytes);
}
}
/// Marks that we have finished allocating memory so it can be subtracted from
/// the write buffer's soft limit.
///
/// The region MUST ensure that it calls this method inside the region writer's write lock.
pub(crate) fn done_allocating(&self) {
if let Some(flush_strategy) = &self.flush_strategy {
if self
.is_done_allocating
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
flush_strategy.schedule_free_mem(self.bytes_allocated.load(Ordering::Relaxed));
}
}
}
/// Returns bytes allocated.
pub(crate) fn bytes_allocated(&self) -> usize {
self.bytes_allocated.load(Ordering::Relaxed)
}
}
impl Drop for AllocTracker {
fn drop(&mut self) {
if !self.is_done_allocating.load(Ordering::Relaxed) {
self.done_allocating();
}
let bytes_allocated = self.bytes_allocated.load(Ordering::Relaxed);
decrement_gauge!(WRITE_BUFFER_BYTES, bytes_allocated as f64);
// Memory tracked by this tracker is freed.
if let Some(flush_strategy) = &self.flush_strategy {
flush_strategy.free_mem(bytes_allocated);
}
}
}
/// Default memtable builder that builds [BTreeMemtable].
#[derive(Debug, Default)]
pub struct DefaultMemtableBuilder {
memtable_id: AtomicU32,
flush_strategy: Option<FlushStrategyRef>,
}
impl DefaultMemtableBuilder {
/// Returns a new [DefaultMemtableBuilder] with specific `flush_strategy`.
///
/// If `flush_strategy` is `Some`, the memtable will report its memory usage
/// to the `flush_strategy`.
pub fn with_flush_strategy(flush_strategy: Option<FlushStrategyRef>) -> Self {
Self {
memtable_id: AtomicU32::new(0),
flush_strategy,
}
}
}
impl MemtableBuilder for DefaultMemtableBuilder {
fn build(&self, schema: RegionSchemaRef) -> MemtableRef {
let id = self.memtable_id.fetch_add(1, Ordering::Relaxed);
Arc::new(BTreeMemtable::new(id, schema))
Arc::new(BTreeMemtable::new(id, schema, self.flush_strategy.clone()))
}
}
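
The tracker's lifecycle condenses to the sketch below (hypothetical `Counters`/`Tracker` types, metrics gauge omitted): memory is reserved as rows are written, scheduled to be freed once the memtable is marked immutable, and fully released when the tracker is dropped after the flush.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Debug, Default)]
struct Counters {
    memory_used: AtomicUsize,   // reserved and not yet freed
    memory_active: AtomicUsize, // reserved and not yet scheduled to free
}

struct Tracker {
    counters: Arc<Counters>,
    bytes: usize,
    done: bool,
}

impl Tracker {
    fn on_allocate(&mut self, bytes: usize) {
        self.bytes += bytes;
        self.counters.memory_used.fetch_add(bytes, Ordering::Relaxed);
        self.counters.memory_active.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Called when the memtable becomes immutable; idempotent.
    fn done_allocating(&mut self) {
        if !self.done {
            self.done = true;
            self.counters.memory_active.fetch_sub(self.bytes, Ordering::Relaxed);
        }
    }
}

impl Drop for Tracker {
    fn drop(&mut self) {
        self.done_allocating();
        self.counters.memory_used.fetch_sub(self.bytes, Ordering::Relaxed);
    }
}

fn main() {
    let counters = Arc::new(Counters::default());
    {
        let mut tracker = Tracker { counters: counters.clone(), bytes: 0, done: false };
        tracker.on_allocate(100);
        tracker.done_allocating(); // the memtable was frozen
        assert_eq!(0, counters.memory_active.load(Ordering::Relaxed));
        assert_eq!(100, counters.memory_used.load(Ordering::Relaxed));
    } // dropping the tracker releases the remaining 100 bytes
    assert_eq!(0, counters.memory_used.load(Ordering::Relaxed));
}
```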

View File

@@ -16,7 +16,7 @@ use std::cmp::Ordering;
use std::collections::{btree_map, BTreeMap};
use std::fmt;
use std::ops::Bound;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, RwLock};
use common_time::range::TimestampRange;
@@ -27,9 +27,10 @@ use datatypes::vectors::{UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8Ve
use store_api::storage::{OpType, SequenceNumber};
use crate::error::Result;
use crate::flush::FlushStrategyRef;
use crate::memtable::{
BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId, MemtableStats,
RowOrdering,
AllocTracker, BatchIterator, BoxedBatchIterator, IterContext, KeyValues, Memtable, MemtableId,
MemtableStats, RowOrdering,
};
use crate::read::Batch;
use crate::schema::compat::ReadAdapter;
@@ -44,45 +45,51 @@ pub struct BTreeMemtable {
id: MemtableId,
schema: RegionSchemaRef,
map: Arc<RwLockMap>,
stats: MemtableStats,
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
}
impl BTreeMemtable {
pub fn new(id: MemtableId, schema: RegionSchemaRef) -> BTreeMemtable {
pub fn new(
id: MemtableId,
schema: RegionSchemaRef,
flush_strategy: Option<FlushStrategyRef>,
) -> BTreeMemtable {
BTreeMemtable {
id,
schema,
map: Arc::new(RwLock::new(BTreeMap::new())),
stats: Default::default(),
alloc_tracker: AllocTracker::new(flush_strategy),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
}
}
/// Updates memtable stats.
/// This function is guarded by `BTreeMemtable::map` so that store-after-load is safe.
fn update_stats(&self, min: Option<Value>, max: Option<Value>) {
fn update_stats(&self, request_size: usize, min: Option<Value>, max: Option<Value>) {
self.alloc_tracker.on_allocate(request_size);
if let Some(min) = min {
let min_val = min
.as_timestamp()
.expect("Min timestamp must be a valid timestamp value")
.value();
let cur_min = self.stats.min_timestamp.load(AtomicOrdering::Relaxed);
let cur_min = self.min_timestamp.load(AtomicOrdering::Relaxed);
if min_val < cur_min {
self.stats
.min_timestamp
.store(min_val, AtomicOrdering::Relaxed);
self.min_timestamp.store(min_val, AtomicOrdering::Relaxed);
}
}
if let Some(max) = max {
let cur_max = self.stats.max_timestamp.load(AtomicOrdering::Relaxed);
let cur_max = self.max_timestamp.load(AtomicOrdering::Relaxed);
let max_val = max
.as_timestamp()
.expect("Max timestamp must be a valid timestamp value")
.value();
if max_val > cur_max {
self.stats
.max_timestamp
.store(max_val, AtomicOrdering::Relaxed);
self.max_timestamp.store(max_val, AtomicOrdering::Relaxed);
}
}
}
@@ -97,7 +104,9 @@ impl fmt::Debug for BTreeMemtable {
// Only show StoreSchema
.field("schema", &self.schema)
.field("rows", &len)
.field("stats", &self.stats)
.field("alloc_tracker", &self.alloc_tracker)
.field("max_timestamp", &self.max_timestamp)
.field("min_timestamp", &self.min_timestamp)
.finish()
}
}
@@ -113,10 +122,6 @@ impl Memtable for BTreeMemtable {
fn write(&self, kvs: &KeyValues) -> Result<()> {
debug_assert!(kvs.timestamp.is_some());
self.stats
.estimated_bytes
.fetch_add(kvs.estimated_memory_size(), AtomicOrdering::Relaxed);
let iter_row = IterRow::new(kvs);
let mut map = self.map.write().unwrap();
@@ -135,7 +140,8 @@ impl Memtable for BTreeMemtable {
map.insert(inner_key, row_value);
}
self.update_stats(min_ts, max_ts);
self.update_stats(kvs.estimated_memory_size(), min_ts, max_ts);
Ok(())
}
@@ -151,8 +157,16 @@ impl Memtable for BTreeMemtable {
self.map.read().unwrap().len()
}
fn stats(&self) -> &MemtableStats {
&self.stats
fn stats(&self) -> MemtableStats {
MemtableStats {
estimated_bytes: self.alloc_tracker.bytes_allocated(),
max_timestamp: self.max_timestamp.load(AtomicOrdering::Relaxed),
min_timestamp: self.min_timestamp.load(AtomicOrdering::Relaxed),
}
}
fn mark_immutable(&self) {
self.alloc_tracker.done_allocating();
}
}

View File

@@ -122,7 +122,6 @@ struct SliceIndex {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use common_time::timestamp::Timestamp;
@@ -178,8 +177,8 @@ mod tests {
min_ts: i64,
) {
let iter = mem.iter(&IterContext::default()).unwrap();
assert_eq!(min_ts, mem.stats().min_timestamp.load(Ordering::Relaxed));
assert_eq!(max_ts, mem.stats().max_timestamp.load(Ordering::Relaxed));
assert_eq!(min_ts, mem.stats().min_timestamp);
assert_eq!(max_ts, mem.stats().max_timestamp);
let mut index = 0;
for batch in iter {

View File

@@ -53,8 +53,14 @@ impl MemtableVersion {
/// Clone current memtable version and freeze its mutable memtables, which moves
/// all mutable memtables to immutable memtable list.
///
/// This method also calls [Memtable::mark_immutable()](crate::memtable::Memtable::mark_immutable()) to
/// mark the mutable memtable as immutable.
pub fn freeze_mutable(&self, new_mutable: MemtableRef) -> MemtableVersion {
let mut immutables = self.immutables.clone();
// Mark the mutable memtable as immutable so its memory usage can be released
// from our soft limit.
self.mutable.mark_immutable();
immutables.push(self.mutable.clone());
MemtableVersion {

View File

@@ -30,3 +30,5 @@ pub const REGION_COUNT: &str = "storage.region_count";
pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed";
/// Elapsed time of a compact job.
pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed";
/// Global write buffer size in bytes.
pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";

View File

@@ -166,6 +166,7 @@ pub struct StoreConfig<S: LogStore> {
pub file_purger: FilePurgerRef,
pub ttl: Option<Duration>,
pub compaction_time_window: Option<i64>,
pub write_buffer_size: usize,
}
pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
@@ -249,6 +250,7 @@ impl<S: LogStore> RegionImpl<S> {
store_config.engine_config.clone(),
store_config.ttl,
store_config.compaction_time_window,
store_config.write_buffer_size,
)),
wal,
flush_strategy: store_config.flush_strategy,
@@ -339,6 +341,7 @@ impl<S: LogStore> RegionImpl<S> {
store_config.engine_config.clone(),
store_config.ttl,
compaction_time_window,
store_config.write_buffer_size,
));
let writer_ctx = WriterContext {
shared: &shared,
@@ -382,10 +385,15 @@ impl<S: LogStore> RegionImpl<S> {
}
/// Returns last flush timestamp in millis.
pub fn last_flush_millis(&self) -> i64 {
pub(crate) fn last_flush_millis(&self) -> i64 {
self.inner.shared.last_flush_millis()
}
/// Returns the [VersionControl] of the region.
pub(crate) fn version_control(&self) -> &VersionControl {
self.inner.version_control()
}
fn create_version_with_checkpoint(
checkpoint: RegionCheckpoint,
memtable_builder: &MemtableBuilderRef,

View File

@@ -79,7 +79,7 @@ async fn create_region_for_compaction<
let object_store = new_object_store(store_dir, s3_bucket);
let mut store_config = config_util::new_store_config_with_object_store(
let (mut store_config, _) = config_util::new_store_config_with_object_store(
REGION_NAME,
store_dir,
object_store.clone(),

View File

@@ -15,13 +15,14 @@
//! Region flush tests.
use std::sync::Arc;
use std::time::Duration;
use common_test_util::temp_dir::create_temp_dir;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse};
use crate::engine;
use crate::flush::FlushStrategyRef;
use crate::engine::{self, RegionMap};
use crate::flush::{FlushStrategyRef, FlushType};
use crate::region::tests::{self, FileTesterBase};
use crate::region::RegionImpl;
use crate::test_util::config_util;
@@ -33,13 +34,20 @@ const REGION_NAME: &str = "region-flush-0";
async fn create_region_for_flush(
store_dir: &str,
flush_strategy: FlushStrategyRef,
) -> RegionImpl<RaftEngineLogStore> {
) -> (
RegionImpl<RaftEngineLogStore>,
Arc<RegionMap<RaftEngineLogStore>>,
) {
let metadata = tests::new_metadata(REGION_NAME);
let mut store_config = config_util::new_store_config(REGION_NAME, store_dir).await;
let (mut store_config, regions) =
config_util::new_store_config_and_region_map(REGION_NAME, store_dir).await;
store_config.flush_strategy = flush_strategy;
RegionImpl::create(metadata, store_config).await.unwrap()
(
RegionImpl::create(metadata, store_config).await.unwrap(),
regions,
)
}
/// Tester for region flush.
@@ -47,20 +55,23 @@ struct FlushTester {
base: Option<FileTesterBase>,
store_dir: String,
flush_strategy: FlushStrategyRef,
regions: Arc<RegionMap<RaftEngineLogStore>>,
}
impl FlushTester {
async fn new(store_dir: &str, flush_strategy: FlushStrategyRef) -> FlushTester {
let region = create_region_for_flush(store_dir, flush_strategy.clone()).await;
let (region, regions) = create_region_for_flush(store_dir, flush_strategy.clone()).await;
FlushTester {
base: Some(FileTesterBase::with_region(region)),
store_dir: store_dir.to_string(),
flush_strategy: flush_strategy.clone(),
regions,
}
}
async fn reopen(&mut self) {
self.regions.clear();
// Close the old region.
if let Some(base) = self.base.as_ref() {
base.close().await;
@@ -101,6 +112,12 @@ impl FlushTester {
}
}
impl Drop for FlushTester {
fn drop(&mut self) {
self.regions.clear();
}
}
#[tokio::test]
async fn test_flush_and_stall() {
common_telemetry::init_default_ut_logging();
@@ -279,3 +296,41 @@ async fn test_merge_read_after_flush() {
let output = tester.full_scan().await;
assert_eq!(expect, output);
}
#[tokio::test]
async fn test_schedule_engine_flush() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("engine-flush");
let store_dir = dir.path().to_str().unwrap();
let flush_switch = Arc::new(FlushSwitch::default());
let tester = FlushTester::new(store_dir, flush_switch.clone()).await;
assert_eq!(0, tester.base().region.last_flush_millis());
// Insert the region into the region map.
tester.regions.get_or_occupy_slot(
REGION_NAME,
engine::RegionSlot::Ready(tester.base().region.clone()),
);
// Put elements so we have content to flush.
tester.put(&[(1000, Some(100))]).await;
tester.put(&[(2000, Some(200))]).await;
flush_switch.set_flush_type(FlushType::Engine);
// Put element and trigger an engine level flush.
tester.put(&[(3000, Some(300))]).await;
// Wait for flush.
let mut count = 0;
while tester.base().region.last_flush_millis() == 0 && count < 50 {
tokio::time::sleep(Duration::from_millis(100)).await;
count += 1;
}
// Check parquet files.
let sst_dir = format!("{}/{}", store_dir, engine::region_sst_dir("", REGION_NAME));
assert!(has_parquet_file(&sst_dir));
}

View File

@@ -30,7 +30,9 @@ use tokio::sync::{oneshot, Mutex};
use crate::compaction::{CompactionRequestImpl, CompactionSchedulerRef};
use crate::config::EngineConfig;
use crate::error::{self, Result};
use crate::flush::{FlushHandle, FlushRequest, FlushSchedulerRef, FlushStrategyRef};
use crate::flush::{
FlushHandle, FlushRegionRequest, FlushSchedulerRef, FlushStrategyRef, FlushType, RegionStatus,
};
use crate::manifest::action::{
RawRegionMetadata, RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList,
RegionRemove,
@@ -71,6 +73,7 @@ impl RegionWriter {
config: Arc<EngineConfig>,
ttl: Option<Duration>,
compaction_time_window: Option<i64>,
write_buffer_size: usize,
) -> RegionWriter {
RegionWriter {
inner: Mutex::new(WriterInner::new(
@@ -78,6 +81,7 @@ impl RegionWriter {
config,
ttl,
compaction_time_window,
write_buffer_size,
)),
version_mutex: Mutex::new(()),
}
@@ -443,6 +447,8 @@ struct WriterInner {
engine_config: Arc<EngineConfig>,
ttl: Option<Duration>,
compaction_time_window: Option<i64>,
/// Size in bytes at which the mutable memtable is frozen.
write_buffer_size: usize,
}
impl WriterInner {
@@ -451,6 +457,7 @@ impl WriterInner {
engine_config: Arc<EngineConfig>,
ttl: Option<Duration>,
compaction_time_window: Option<i64>,
write_buffer_size: usize,
) -> WriterInner {
WriterInner {
memtable_builder,
@@ -459,6 +466,7 @@ impl WriterInner {
closed: false,
ttl,
compaction_time_window,
write_buffer_size,
}
}
@@ -663,13 +671,24 @@ impl WriterInner {
let version_control = writer_ctx.version_control();
// Check whether memtable is full or flush should be triggered. We need to do this first since
// switching memtables will clear all mutable memtables.
if self.should_flush(
if let Some(flush_type) = self.should_flush(
writer_ctx.shared,
version_control,
writer_ctx.flush_strategy,
) {
self.trigger_flush(writer_ctx, FlushReason::MemtableFull)
.await?;
// Trigger flush according to the flush type.
match flush_type {
FlushType::Region => {
// Trigger flush for current region.
self.trigger_flush(writer_ctx, FlushReason::MemtableFull)
.await?;
}
FlushType::Engine => {
// Trigger an engine-level flush. This wakes up the flush handler
// to pick a region to flush.
writer_ctx.flush_scheduler.schedule_engine_flush()?;
}
}
}
Ok(())
@@ -686,12 +705,15 @@ impl WriterInner {
shared: &SharedDataRef,
version_control: &VersionControlRef,
flush_strategy: &FlushStrategyRef,
) -> bool {
) -> Option<FlushType> {
let current = version_control.current();
let memtables = current.memtables();
let mutable_bytes_allocated = memtables.mutable_bytes_allocated();
let total_bytes_allocated = memtables.total_bytes_allocated();
flush_strategy.should_flush(shared, mutable_bytes_allocated, total_bytes_allocated)
let status = RegionStatus {
region_id: shared.id(),
bytes_mutable: memtables.mutable_bytes_allocated(),
write_buffer_size: self.write_buffer_size,
};
flush_strategy.should_flush(status)
}
async fn trigger_flush<S: LogStore>(
@@ -723,11 +745,14 @@ impl WriterInner {
let (max_memtable_id, mem_to_flush) = current_version.memtables().memtables_to_flush();
if max_memtable_id.is_none() {
// We still update the flush time to prevent the picker from picking this region again.
ctx.shared.update_flush_millis();
logging::info!("No memtables to flush in region: {}", ctx.shared.name);
return Ok(());
}
let flush_req = FlushRequest {
let flush_req = FlushRegionRequest {
max_memtable_id: max_memtable_id.unwrap(),
memtables: mem_to_flush,
// In write thread, safe to use current committed sequence.
@@ -738,15 +763,17 @@ impl WriterInner {
wal: ctx.wal.clone(),
manifest: ctx.manifest.clone(),
engine_config: self.engine_config.clone(),
sender: None,
ttl: self.ttl,
compaction_time_window: self.compaction_time_window,
};
let flush_handle = ctx.flush_scheduler.schedule_flush(flush_req).map_err(|e| {
logging::error!(e; "Failed to schedule flush request");
e
})?;
let flush_handle = ctx
.flush_scheduler
.schedule_region_flush(flush_req)
.map_err(|e| {
logging::error!(e; "Failed to schedule flush request");
e
})?;
self.flush_handle = Some(flush_handle);
Ok(())
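
On the write path this becomes a small dispatch, sketched with hypothetical enums below: a `Region` decision flushes the current region with reason `MemtableFull`, while an `Engine` decision only schedules an engine flush and lets the picker choose which region to flush.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum FlushType {
    Region,
    Engine,
}

#[derive(Debug, PartialEq)]
enum Action {
    FlushThisRegion,     // freeze and flush the current region's memtable
    ScheduleEngineFlush, // wake the handler that runs pick_by_write_buffer_full
    WriteThrough,        // no flush needed
}

fn handle_write(decision: Option<FlushType>) -> Action {
    match decision {
        Some(FlushType::Region) => Action::FlushThisRegion,
        Some(FlushType::Engine) => Action::ScheduleEngineFlush,
        None => Action::WriteThrough,
    }
}

fn main() {
    assert_eq!(Action::FlushThisRegion, handle_write(Some(FlushType::Region)));
    assert_eq!(Action::ScheduleEngineFlush, handle_write(Some(FlushType::Engine)));
    assert_eq!(Action::WriteThrough, handle_write(None));
}
```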

View File

@@ -22,6 +22,7 @@ use object_store::ObjectStore;
use store_api::manifest::Manifest;
use crate::compaction::noop::NoopCompactionScheduler;
use crate::config::DEFAULT_REGION_WRITE_BUFFER_SIZE;
use crate::engine::{self, RegionMap};
use crate::file_purger::noop::NoopFilePurgeHandler;
use crate::flush::{FlushScheduler, PickerConfig, SizeBasedStrategy};
@@ -44,6 +45,23 @@ pub async fn new_store_config(
builder.root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
new_store_config_with_object_store(region_name, store_dir, object_store)
.await
.0
}
/// Create a new StoreConfig and region map for test.
pub async fn new_store_config_and_region_map(
region_name: &str,
store_dir: &str,
) -> (
StoreConfig<RaftEngineLogStore>,
Arc<RegionMap<RaftEngineLogStore>>,
) {
let mut builder = Fs::default();
builder.root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
new_store_config_with_object_store(region_name, store_dir, object_store).await
}
@@ -52,7 +70,10 @@ pub async fn new_store_config_with_object_store(
region_name: &str,
store_dir: &str,
object_store: ObjectStore,
) -> StoreConfig<RaftEngineLogStore> {
) -> (
StoreConfig<RaftEngineLogStore>,
Arc<RegionMap<RaftEngineLogStore>>,
) {
let parent_dir = "";
let sst_dir = engine::region_sst_dir(parent_dir, region_name);
let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
@@ -73,11 +94,12 @@ pub async fn new_store_config_with_object_store(
let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap());
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
// We use an empty region map, so the picker's background worker is effectively disabled.
let regions = Arc::new(RegionMap::new());
let flush_scheduler = Arc::new(
FlushScheduler::new(
SchedulerConfig::default(),
compaction_scheduler.clone(),
Arc::new(RegionMap::new()),
regions.clone(),
PickerConfig::default(),
)
.unwrap(),
@@ -86,17 +108,21 @@ pub async fn new_store_config_with_object_store(
SchedulerConfig::default(),
NoopFilePurgeHandler,
));
StoreConfig {
log_store,
sst_layer,
manifest,
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
compaction_scheduler,
engine_config: Default::default(),
file_purger,
ttl: None,
compaction_time_window: None,
}
(
StoreConfig {
log_store,
sst_layer,
manifest,
memtable_builder: Arc::new(DefaultMemtableBuilder::default()),
flush_scheduler,
flush_strategy: Arc::new(SizeBasedStrategy::default()),
compaction_scheduler,
engine_config: Default::default(),
file_purger,
ttl: None,
compaction_time_window: None,
write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE.as_bytes() as usize,
},
regions,
)
}

View File

@@ -12,33 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use crate::flush::FlushStrategy;
use crate::region::SharedDataRef;
use crate::flush::{FlushStrategy, FlushType, RegionStatus};
/// Controls whether to flush a region while writing the region.
/// Disable flush by default.
#[derive(Debug, Default)]
pub struct FlushSwitch {
should_flush: AtomicBool,
flush_type: Mutex<Option<FlushType>>,
}
impl FlushSwitch {
pub fn set_should_flush(&self, should_flush: bool) {
self.should_flush.store(should_flush, Ordering::Relaxed);
if should_flush {
*self.flush_type.lock().unwrap() = Some(FlushType::Region);
} else {
*self.flush_type.lock().unwrap() = None;
}
}
pub fn set_flush_type(&self, flush_type: FlushType) {
*self.flush_type.lock().unwrap() = Some(flush_type);
}
}
impl FlushStrategy for FlushSwitch {
fn should_flush(
&self,
_shared: &SharedDataRef,
_bytes_mutable: usize,
_bytes_total: usize,
) -> bool {
self.should_flush.load(Ordering::Relaxed)
fn should_flush(&self, _status: RegionStatus) -> Option<FlushType> {
*self.flush_type.lock().unwrap()
}
fn reserve_mem(&self, _mem: usize) {}
fn schedule_free_mem(&self, _mem: usize) {}
fn free_mem(&self, _mem: usize) {}
}
pub fn has_parquet_file(sst_dir: &str) -> bool {

View File

@@ -136,6 +136,8 @@ pub enum FlushReason {
Manually,
/// Auto flush periodically.
Periodically,
/// Global write buffer is full.
GlobalBufferFull,
}
impl FlushReason {
@@ -146,6 +148,7 @@ impl FlushReason {
FlushReason::MemtableFull => "memtable_full",
FlushReason::Manually => "manually",
FlushReason::Periodically => "periodically",
FlushReason::GlobalBufferFull => "global_buffer_full",
}
}
}