diff --git a/config/config.md b/config/config.md
index fa58b07464..b0c95f4e2a 100644
--- a/config/config.md
+++ b/config/config.md
@@ -102,6 +102,10 @@
| `region_engine.mito.sst_meta_cache_size` | String | `128MB` | Cache size for SST metadata. Setting it to 0 to disable the cache.
If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.vector_cache_size` | String | `512MB` | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | `512MB` | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
+| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. |
+| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
+| `region_engine.mito.experimental_write_cache_size` | String | `512MB` | Capacity for write cache. |
+| `region_engine.mito.experimental_write_cache_ttl` | String | `1h` | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: using the default value (1/4 of cpu cores).
- `1`: scan in current thread.
- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
@@ -350,6 +354,10 @@
| `region_engine.mito.sst_meta_cache_size` | String | `128MB` | Cache size for SST metadata. Setting it to 0 to disable the cache.
If not set, it's default to 1/32 of OS memory with a max limitation of 128MB. |
| `region_engine.mito.vector_cache_size` | String | `512MB` | Cache size for vectors and arrow arrays. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
| `region_engine.mito.page_cache_size` | String | `512MB` | Cache size for pages of SST row groups. Setting it to 0 to disable the cache.
If not set, it's default to 1/16 of OS memory with a max limitation of 512MB. |
+| `region_engine.mito.enable_experimental_write_cache` | Bool | `false` | Whether to enable the experimental write cache. |
+| `region_engine.mito.experimental_write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}/write_cache`. |
+| `region_engine.mito.experimental_write_cache_size` | String | `512MB` | Capacity for write cache. |
+| `region_engine.mito.experimental_write_cache_ttl` | String | `1h` | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).
- `0`: using the default value (1/4 of cpu cores).
- `1`: scan in current thread.
- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 55e0c36383..d184904877 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -324,6 +324,18 @@ vector_cache_size = "512MB"
## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
page_cache_size = "512MB"
+## Whether to enable the experimental write cache.
+enable_experimental_write_cache = false
+
+## File system path for write cache, defaults to `{data_home}/write_cache`.
+experimental_write_cache_path = ""
+
+## Capacity for write cache.
+experimental_write_cache_size = "512MB"
+
+## TTL for write cache.
+experimental_write_cache_ttl = "1h"
+
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 2ed47506b6..7f3daedb46 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -367,6 +367,18 @@ vector_cache_size = "512MB"
## If not set, it's default to 1/16 of OS memory with a max limitation of 512MB.
page_cache_size = "512MB"
+## Whether to enable the experimental write cache.
+enable_experimental_write_cache = false
+
+## File system path for write cache, defaults to `{data_home}/write_cache`.
+experimental_write_cache_path = ""
+
+## Capacity for write cache.
+experimental_write_cache_size = "512MB"
+
+## TTL for write cache.
+experimental_write_cache_ttl = "1h"
+
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index 890d0b7739..0afbf5b669 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -16,7 +16,7 @@
use std::ops::Range;
use std::sync::Arc;
-use std::time::Instant;
+use std::time::{Duration, Instant};
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
@@ -56,9 +56,13 @@ pub(crate) type FileCacheRef = Arc;
impl FileCache {
/// Creates a new file cache.
- pub(crate) fn new(local_store: ObjectStore, capacity: ReadableSize) -> FileCache {
+ pub(crate) fn new(
+ local_store: ObjectStore,
+ capacity: ReadableSize,
+ ttl: Option,
+ ) -> FileCache {
let cache_store = local_store.clone();
- let memory_index = Cache::builder()
+ let mut builder = Cache::builder()
.weigher(|_key, value: &IndexValue| -> u32 {
// We only measure space on local store.
value.file_size
@@ -87,8 +91,11 @@ impl FileCache {
}
}
.boxed()
- })
- .build();
+ });
+ if let Some(ttl) = ttl {
+ builder = builder.time_to_idle(ttl);
+ }
+ let memory_index = builder.build();
FileCache {
local_store,
memory_index,
@@ -376,12 +383,52 @@ mod tests {
ObjectStore::new(builder).unwrap().finish()
}
+ #[tokio::test]
+ async fn test_file_cache_ttl() {
+ let dir = create_temp_dir("");
+ let local_store = new_fs_store(dir.path().to_str().unwrap());
+
+ let cache = FileCache::new(
+ local_store.clone(),
+ ReadableSize::mb(10),
+ Some(Duration::from_millis(5)),
+ );
+ let region_id = RegionId::new(2000, 0);
+ let file_id = FileId::random();
+ let key = IndexKey::new(region_id, file_id, FileType::Parquet);
+ let file_path = cache.cache_file_path(key);
+
+ // Get an empty file.
+ assert!(cache.reader(key).await.is_none());
+
+ // Write a file.
+ local_store
+ .write(&file_path, b"hello".as_slice())
+ .await
+ .unwrap();
+
+ // Add to the cache.
+ cache
+ .put(
+ IndexKey::new(region_id, file_id, FileType::Parquet),
+ IndexValue { file_size: 5 },
+ )
+ .await;
+
+ let exist = cache.reader(key).await;
+ assert!(exist.is_some());
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ cache.memory_index.run_pending_tasks().await;
+ let non = cache.reader(key).await;
+ assert!(non.is_none());
+ }
+
#[tokio::test]
async fn test_file_cache_basic() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
- let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
+ let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -430,7 +477,7 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
- let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
+ let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -462,7 +509,7 @@ mod tests {
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
- let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
+ let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_type = FileType::Parquet;
@@ -488,7 +535,7 @@ mod tests {
}
// Recover the cache.
- let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
+ let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
// No entry before recovery.
assert!(cache
.reader(IndexKey::new(region_id, file_ids[0], file_type))
@@ -513,7 +560,7 @@ mod tests {
async fn test_file_cache_read_ranges() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
- let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
+ let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index c13e32e641..23a8419469 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -15,6 +15,7 @@
//! A write-through cache for remote object stores.
use std::sync::Arc;
+use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
@@ -55,9 +56,10 @@ impl WriteCache {
local_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
+ ttl: Option,
intermediate_manager: IntermediateManager,
) -> Result {
- let file_cache = FileCache::new(local_store, cache_capacity);
+ let file_cache = FileCache::new(local_store, cache_capacity, ttl);
file_cache.recover().await?;
Ok(Self {
@@ -72,6 +74,7 @@ impl WriteCache {
cache_dir: &str,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
+ ttl: Option,
intermediate_manager: IntermediateManager,
) -> Result {
info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
@@ -81,6 +84,7 @@ impl WriteCache {
local_store,
object_store_manager,
cache_capacity,
+ ttl,
intermediate_manager,
)
.await
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 60d079c5ab..7d633765a0 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -87,6 +87,9 @@ pub struct MitoConfig {
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub experimental_write_cache_size: ReadableSize,
+ /// TTL for write cache.
+ #[serde(with = "humantime_serde")]
+ pub experimental_write_cache_ttl: Option,
// Other configs:
/// Buffer size for SST writing.
@@ -126,6 +129,7 @@ impl Default for MitoConfig {
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::mb(512),
+ experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60)),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
@@ -228,10 +232,16 @@ impl MitoConfig {
/// Enable experimental write cache.
#[cfg(test)]
- pub fn enable_write_cache(mut self, path: String, size: ReadableSize) -> Self {
+ pub fn enable_write_cache(
+ mut self,
+ path: String,
+ size: ReadableSize,
+ ttl: Option,
+ ) -> Self {
self.enable_experimental_write_cache = true;
self.experimental_write_cache_path = path;
self.experimental_write_cache_size = size;
+ self.experimental_write_cache_ttl = ttl;
self
}
}
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index dbe33ff37f..6d3fac897e 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -565,7 +565,7 @@ async fn test_engine_with_write_cache() {
let mut env = TestEnv::new();
let path = env.data_home().to_str().unwrap().to_string();
- let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512));
+ let mito_config = MitoConfig::default().enable_write_cache(path, ReadableSize::mb(512), None);
let engine = env.create_engine(mito_config).await;
let region_id = RegionId::new(1, 1);
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 27a746f3ba..78dbd1c336 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -377,9 +377,10 @@ impl TestEnv {
.unwrap();
let object_store_manager = self.get_object_store_manager().unwrap();
- let write_cache = WriteCache::new(local_store, object_store_manager, capacity, intm_mgr)
- .await
- .unwrap();
+ let write_cache =
+ WriteCache::new(local_store, object_store_manager, capacity, None, intm_mgr)
+ .await
+ .unwrap();
Arc::new(write_cache)
}
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index ca6eaa6bfe..7483c73bad 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -320,6 +320,7 @@ async fn write_cache_from_config(
&config.experimental_write_cache_path,
object_store_manager,
config.experimental_write_cache_size,
+ config.experimental_write_cache_ttl,
intermediate_manager,
)
.await?;
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 14c3e8cac2..49132fbc7b 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -814,6 +814,7 @@ auto_flush_interval = "30m"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
experimental_write_cache_size = "512MiB"
+experimental_write_cache_ttl = "1h"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
allow_stale_entries = false