feat: speed up read/write cache and stager eviction (#5531)

* feat: change cache policy for file cache

* feat: file cache run pending task after put

* feat: run pending task in put_dir

* feat: run pending task after stager recovered

* feat: purge recycle bin periodically

* feat: use lru policy for read cache
This commit is contained in:
Yingwen
2025-02-14 01:13:24 +08:00
committed by GitHub
parent 73c29bb482
commit b4d5393080
3 changed files with 62 additions and 32 deletions

View File

@@ -24,6 +24,7 @@ use common_telemetry::{error, info, warn};
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use moka::policy::EvictionPolicy;
use object_store::util::join_path;
use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
@@ -65,6 +66,7 @@ impl FileCache {
) -> FileCache {
let cache_store = local_store.clone();
let mut builder = Cache::builder()
.eviction_policy(EvictionPolicy::lru())
.weigher(|_key, value: &IndexValue| -> u32 {
// We only measure space on local store.
value.file_size
@@ -112,6 +114,9 @@ impl FileCache {
.with_label_values(&[FILE_TYPE])
.add(value.file_size.into());
self.memory_index.insert(key, value).await;
// Since files are large items, we run the pending tasks immediately.
self.memory_index.run_pending_tasks().await;
}
pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
@@ -224,10 +229,15 @@ impl FileCache {
// The metrics is a signed int gauge so we can updates it finally.
CACHE_BYTES.with_label_values(&[FILE_TYPE]).add(total_size);
// Run all pending tasks of the moka cache so that the cache size is updated
// and the eviction policy is applied.
self.memory_index.run_pending_tasks().await;
info!(
"Recovered file cache, num_keys: {}, num_bytes: {}, cost: {:?}",
"Recovered file cache, num_keys: {}, num_bytes: {}, total weight: {}, cost: {:?}",
total_keys,
total_size,
self.memory_index.weighted_size(),
now.elapsed()
);
Ok(())

View File

@@ -18,6 +18,7 @@ use common_telemetry::debug;
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use moka::policy::EvictionPolicy;
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
@@ -120,6 +121,7 @@ impl<C: Access> ReadCache<C> {
file_cache,
mem_cache: Cache::builder()
.max_capacity(capacity as u64)
.eviction_policy(EvictionPolicy::lru())
.weigher(|_key, value: &ReadResult| -> u32 {
// TODO(dennis): add key's length to weight?
value.size_bytes()

View File

@@ -44,6 +44,7 @@ use crate::puffin_manager::{BlobGuard, DirGuard};
const DELETE_QUEUE_SIZE: usize = 10240;
const TMP_EXTENSION: &str = "tmp";
const DELETED_EXTENSION: &str = "deleted";
const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60);
/// `BoundedStager` is a `Stager` that uses `moka` to manage staging area.
pub struct BoundedStager {
@@ -74,9 +75,7 @@ impl BoundedStager {
.await
.context(CreateSnafu)?;
let recycle_bin = Cache::builder()
.time_to_live(Duration::from_secs(60))
.build();
let recycle_bin = Cache::builder().time_to_live(RECYCLE_BIN_TTL).build();
let recycle_bin_cloned = recycle_bin.clone();
let cache = Cache::builder()
@@ -93,7 +92,7 @@ impl BoundedStager {
.build();
let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
common_runtime::global_runtime().spawn(Self::delete_routine(rx));
common_runtime::global_runtime().spawn(Self::delete_routine(rx, recycle_bin.clone()));
let stager = Self {
cache,
@@ -211,7 +210,15 @@ impl Stager for BoundedStager {
})
.await
.map(|_| ())
.context(CacheGetSnafu)
.context(CacheGetSnafu)?;
// Dir is usually large.
// Runs pending tasks of the cache and recycle bin to free up space
// more quickly.
self.cache.run_pending_tasks().await;
self.recycle_bin.run_pending_tasks().await;
Ok(())
}
}
@@ -327,6 +334,7 @@ impl BoundedStager {
for (key, value) in elems {
self.cache.insert(key, value).await;
}
self.cache.run_pending_tasks().await;
Ok(())
}
@@ -349,38 +357,48 @@ impl BoundedStager {
Ok(size)
}
async fn delete_routine(mut receiver: Receiver<DeleteTask>) {
while let Some(task) = receiver.recv().await {
match task {
DeleteTask::File(path) => {
if let Err(err) = fs::remove_file(&path).await {
if err.kind() == std::io::ErrorKind::NotFound {
continue;
}
async fn delete_routine(
mut receiver: Receiver<DeleteTask>,
recycle_bin: Cache<String, CacheValue>,
) {
loop {
match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await {
Ok(Some(task)) => match task {
DeleteTask::File(path) => {
if let Err(err) = fs::remove_file(&path).await {
if err.kind() == std::io::ErrorKind::NotFound {
continue;
}
warn!(err; "Failed to remove the file.");
warn!(err; "Failed to remove the file.");
}
}
}
DeleteTask::Dir(path) => {
let deleted_path = path.with_extension(DELETED_EXTENSION);
if let Err(err) = fs::rename(&path, &deleted_path).await {
if err.kind() == std::io::ErrorKind::NotFound {
continue;
}
// Remove the deleted directory if the rename fails and retry
let _ = fs::remove_dir_all(&deleted_path).await;
DeleteTask::Dir(path) => {
let deleted_path = path.with_extension(DELETED_EXTENSION);
if let Err(err) = fs::rename(&path, &deleted_path).await {
warn!(err; "Failed to rename the dangling directory to deleted path.");
continue;
if err.kind() == std::io::ErrorKind::NotFound {
continue;
}
// Remove the deleted directory if the rename fails and retry
let _ = fs::remove_dir_all(&deleted_path).await;
if let Err(err) = fs::rename(&path, &deleted_path).await {
warn!(err; "Failed to rename the dangling directory to deleted path.");
continue;
}
}
if let Err(err) = fs::remove_dir_all(&deleted_path).await {
warn!(err; "Failed to remove the dangling directory.");
}
}
if let Err(err) = fs::remove_dir_all(&deleted_path).await {
warn!(err; "Failed to remove the dangling directory.");
DeleteTask::Terminate => {
break;
}
}
DeleteTask::Terminate => {
break;
},
Ok(None) => break,
Err(_) => {
// Purge recycle bin periodically to reclaim the space quickly.
recycle_bin.run_pending_tasks().await;
}
}
}