chore: add ttl to write_cache (#4010)

* chore: add ttl to write_cache

* chore: update test & add example config

* chore: fix typo

* chore: fix typo

* chore: fix typo
This commit is contained in:
shuiyisong
2024-05-22 14:50:12 +08:00
committed by GitHub
parent 9800807fe5
commit 9e1af79637
10 changed files with 112 additions and 16 deletions

View File

@@ -102,6 +102,10 @@
| `region_engine.mito.sst_meta_cache_size` | String | `128MB` | Cache size for SST metadata. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.vector_cache_size` | String | `512MB` | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | `512MB` | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `512MB` | Capacity for write cache. |
| `region_engine.mito.experimental_write_cache_ttl` | String | `1h` | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
@@ -350,6 +354,10 @@
| `region_engine.mito.sst_meta_cache_size` | String | `128MB` | Cache size for SST metadata. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.vector_cache_size` | String | `512MB` | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | `512MB` | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.<br/>If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. |
| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
| `region_engine.mito.experimental_write_cache_size` | String | `512MB` | Capacity for write cache. |
| `region_engine.mito.experimental_write_cache_ttl` | String | `1h` | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |

View File

@@ -324,6 +324,18 @@ vector_cache_size = "512MB"
## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
page_cache_size = "512MB"
## Whether to enable the experimental write cache.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}/write_cache`.
experimental_write_cache_path = ""
## Capacity for write cache.
experimental_write_cache_size = "512MB"
## TTL for write cache.
experimental_write_cache_ttl = "1h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

View File

@@ -367,6 +367,18 @@ vector_cache_size = "512MB"
## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
page_cache_size = "512MB"
## Whether to enable the experimental write cache.
enable_experimental_write_cache = false
## File system path for write cache, defaults to `{data_home}/write_cache`.
experimental_write_cache_path = ""
## Capacity for write cache.
experimental_write_cache_size = "512MB"
## TTL for write cache.
experimental_write_cache_ttl = "1h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

View File

@@ -16,7 +16,7 @@
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
@@ -56,9 +56,13 @@ pub(crate) type FileCacheRef = Arc<FileCache>;
impl FileCache {
/// Creates a new file cache.
pub(crate) fn new(local_store: ObjectStore, capacity: ReadableSize) -> FileCache {
pub(crate) fn new(
local_store: ObjectStore,
capacity: ReadableSize,
ttl: Option<Duration>,
) -> FileCache {
let cache_store = local_store.clone();
let memory_index = Cache::builder()
let mut builder = Cache::builder()
.weigher(|_key, value: &IndexValue| -> u32 {
// We only measure space on local store.
value.file_size
@@ -87,8 +91,11 @@ impl FileCache {
}
}
.boxed()
})
.build();
});
if let Some(ttl) = ttl {
builder = builder.time_to_idle(ttl);
}
let memory_index = builder.build();
FileCache {
local_store,
memory_index,
@@ -376,12 +383,52 @@ mod tests {
ObjectStore::new(builder).unwrap().finish()
}
#[tokio::test]
async fn test_file_cache_ttl() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
Some(Duration::from_millis(5)),
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let file_path = cache.cache_file_path(key);
// Get an empty file.
assert!(cache.reader(key).await.is_none());
// Write a file.
local_store
.write(&file_path, b"hello".as_slice())
.await
.unwrap();
// Add to the cache.
cache
.put(
IndexKey::new(region_id, file_id, FileType::Parquet),
IndexValue { file_size: 5 },
)
.await;
let exist = cache.reader(key).await;
assert!(exist.is_some());
tokio::time::sleep(Duration::from_millis(10)).await;
cache.memory_index.run_pending_tasks().await;
let non = cache.reader(key).await;
assert!(non.is_none());
}
#[tokio::test]
async fn test_file_cache_basic() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -430,7 +477,7 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -462,7 +509,7 @@ mod tests {
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_type = FileType::Parquet;
@@ -488,7 +535,7 @@ mod tests {
}
// Recover the cache.
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
// No entry before recovery.
assert!(cache
.reader(IndexKey::new(region_id, file_ids[0], file_type))
@@ -513,7 +560,7 @@ mod tests {
async fn test_file_cache_read_ranges() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);

View File

@@ -15,6 +15,7 @@
//! A write-through cache for remote object stores.
use std::sync::Arc;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
@@ -55,9 +56,10 @@ impl WriteCache {
local_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = FileCache::new(local_store, cache_capacity);
let file_cache = FileCache::new(local_store, cache_capacity, ttl);
file_cache.recover().await?;
Ok(Self {
@@ -72,6 +74,7 @@ impl WriteCache {
cache_dir: &str,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
@@ -81,6 +84,7 @@ impl WriteCache {
local_store,
object_store_manager,
cache_capacity,
ttl,
intermediate_manager,
)
.await

View File

@@ -87,6 +87,9 @@ pub struct MitoConfig {
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub experimental_write_cache_size: ReadableSize,
/// TTL for write cache.
#[serde(with = "humantime_serde")]
pub experimental_write_cache_ttl: Option<Duration>,
// Other configs:
/// Buffer size for SST writing.
@@ -126,6 +129,7 @@ impl Default for MitoConfig {
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::mb(512),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60)),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
@@ -228,10 +232,16 @@ impl MitoConfig {
/// Enable experimental write cache.
#[cfg(test)]
pub fn enable_write_cache(mut self, path: String, size: ReadableSize) -> Self {
pub fn enable_write_cache(
mut self,
path: String,
size: ReadableSize,
ttl: Option<Duration>,
) -> Self {
self.enable_experimental_write_cache = true;
self.experimental_write_cache_path = path;
self.experimental_write_cache_size = size;
self.experimental_write_cache_ttl = ttl;
self
}
}

View File

@@ -565,7 +565,7 @@ async fn test_engine_with_write_cache() {
let mut env = TestEnv::new();
let path = env.data_home().to_str().unwrap().to_string();
let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512));
let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None);
let engine = env.create_engine(mito_config).await;
let region_id = RegionId::new(1, 1);

View File

@@ -377,9 +377,10 @@ impl TestEnv {
.unwrap();
let object_store_manager = self.get_object_store_manager().unwrap();
let write_cache = WriteCache::new(local_store, object_store_manager, capacity, intm_mgr)
.await
.unwrap();
let write_cache =
WriteCache::new(local_store, object_store_manager, capacity, None, intm_mgr)
.await
.unwrap();
Arc::new(write_cache)
}

View File

@@ -320,6 +320,7 @@ async fn write_cache_from_config(
&config.experimental_write_cache_path,
object_store_manager,
config.experimental_write_cache_size,
config.experimental_write_cache_ttl,
intermediate_manager,
)
.await?;

View File

@@ -814,6 +814,7 @@ auto_flush_interval = "30m"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
experimental_write_cache_size = "512MiB"
experimental_write_cache_ttl = "1h"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false