feat: add stager nofitier to collect metrics (#5530)

* feat: add stager nofitier to collect metrics

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* apply prev commit

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* remove dup size

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add load cost

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-02-14 15:49:26 +08:00
committed by GitHub
parent 33a2485f54
commit 858dae7b23
5 changed files with 422 additions and 56 deletions

View File

@@ -30,7 +30,7 @@ async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager>) {
let path = staging_dir.path().to_path_buf();
(
staging_dir,
Arc::new(BoundedStager::new(path, 102400).await.unwrap()),
Arc::new(BoundedStager::new(path, 102400, None).await.unwrap()),
)
}

View File

@@ -63,7 +63,7 @@ impl PuffinManagerFactory {
write_buffer_size: Option<usize>,
) -> Result<Self> {
let staging_dir = aux_path.as_ref().join(STAGING_DIR);
let stager = BoundedStager::new(staging_dir, staging_capacity)
let stager = BoundedStager::new(staging_dir, staging_capacity, None)
.await
.context(PuffinInitStagerSnafu)?;
Ok(Self {

View File

@@ -15,6 +15,7 @@
mod bounded_stager;
use std::path::PathBuf;
use std::time::Duration;
use async_trait::async_trait;
pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
@@ -88,3 +89,40 @@ pub trait Stager: Send + Sync {
dir_size: u64,
) -> Result<()>;
}
/// `StagerNotifier` provides a way to notify the caller of the staging events.
pub trait StagerNotifier: Send + Sync {
/// Notifies the caller that a cache hit occurred.
/// `size` is the size of the content that was hit in the cache.
fn on_cache_hit(&self, size: u64);
/// Notifies the caller that a cache miss occurred.
/// `size` is the size of the content that was missed in the cache.
fn on_cache_miss(&self, size: u64);
/// Notifies the caller that a blob or directory was inserted into the cache.
/// `size` is the size of the content that was inserted into the cache.
///
/// Note: not only cache misses will trigger this event, but recoveries and recycles as well.
fn on_cache_insert(&self, size: u64);
/// Notifies the caller that a directory was inserted into the cache.
/// `duration` is the time it took to load the directory.
fn on_load_dir(&self, duration: Duration);
/// Notifies the caller that a blob was inserted into the cache.
/// `duration` is the time it took to load the blob.
fn on_load_blob(&self, duration: Duration);
/// Notifies the caller that a blob or directory was evicted from the cache.
/// `size` is the size of the content that was evicted from the cache.
fn on_cache_evict(&self, size: u64);
/// Notifies the caller that a blob or directory was dropped to the recycle bin.
/// `size` is the size of the content that was dropped to the recycle bin.
fn on_recycle_insert(&self, size: u64);
/// Notifies the caller that the recycle bin was cleared.
/// `size` is the size of the content that was cleared from the recycle bin.
fn on_recycle_clear(&self, size: u64);
}

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use async_walkdir::{Filtering, WalkDir};
@@ -34,6 +34,7 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_util::compat::TokioAsyncWriteCompatExt;
use super::StagerNotifier;
use crate::error::{
CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
Result, WalkDirSnafu,
@@ -67,10 +68,17 @@ pub struct BoundedStager {
/// 3. sent the delete task to the delete queue on drop
/// 4. background routine removes the file or directory
delete_queue: Sender<DeleteTask>,
/// Notifier for the stager.
notifier: Option<Arc<dyn StagerNotifier>>,
}
impl BoundedStager {
pub async fn new(base_dir: PathBuf, capacity: u64) -> Result<Self> {
pub async fn new(
base_dir: PathBuf,
capacity: u64,
notifier: Option<Arc<dyn StagerNotifier>>,
) -> Result<Self> {
tokio::fs::create_dir_all(&base_dir)
.await
.context(CreateSnafu)?;
@@ -78,12 +86,17 @@ impl BoundedStager {
let recycle_bin = Cache::builder().time_to_live(RECYCLE_BIN_TTL).build();
let recycle_bin_cloned = recycle_bin.clone();
let notifier_cloned = notifier.clone();
let cache = Cache::builder()
.max_capacity(capacity)
.weigher(|_: &String, v: &CacheValue| v.weight())
.eviction_policy(EvictionPolicy::lru())
.async_eviction_listener(move |k, v, _| {
let recycle_bin = recycle_bin_cloned.clone();
if let Some(notifier) = notifier_cloned.as_ref() {
notifier.on_cache_evict(v.size());
notifier.on_recycle_insert(v.size());
}
async move {
recycle_bin.insert(k.as_str().to_string(), v).await;
}
@@ -92,13 +105,18 @@ impl BoundedStager {
.build();
let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
common_runtime::global_runtime().spawn(Self::delete_routine(rx, recycle_bin.clone()));
let notifier_cloned = notifier.clone();
common_runtime::global_runtime().spawn(Self::delete_routine(
rx,
recycle_bin.clone(),
notifier_cloned,
));
let stager = Self {
cache,
base_dir,
delete_queue,
recycle_bin,
notifier,
};
stager.recover().await?;
@@ -120,29 +138,48 @@ impl Stager for BoundedStager {
) -> Result<Self::Blob> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let mut miss = false;
let v = self
.cache
.try_get_with(cache_key.clone(), async {
.try_get_with_by_ref(&cache_key, async {
if let Some(v) = self.recycle_bin.remove(&cache_key).await {
if let Some(notifier) = self.notifier.as_ref() {
let size = v.size();
notifier.on_cache_insert(size);
notifier.on_recycle_clear(size);
}
return Ok(v);
}
miss = true;
let timer = Instant::now();
let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
let path = self.base_dir.join(&file_name);
let size = Self::write_blob(&path, init_fn).await?;
if let Some(notifier) = self.notifier.as_ref() {
notifier.on_cache_insert(size);
notifier.on_load_blob(timer.elapsed());
}
let guard = Arc::new(FsBlobGuard {
path,
delete_queue: self.delete_queue.clone(),
size,
});
Ok(CacheValue::File { guard, size })
Ok(CacheValue::File(guard))
})
.await
.context(CacheGetSnafu)?;
if let Some(notifier) = self.notifier.as_ref() {
if miss {
notifier.on_cache_miss(v.size());
} else {
notifier.on_cache_hit(v.size());
}
}
match v {
CacheValue::File { guard, .. } => Ok(guard),
CacheValue::File(guard) => Ok(guard),
_ => unreachable!(),
}
}
@@ -155,29 +192,48 @@ impl Stager for BoundedStager {
) -> Result<Self::Dir> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let mut miss = false;
let v = self
.cache
.try_get_with(cache_key.clone(), async {
.try_get_with_by_ref(&cache_key, async {
if let Some(v) = self.recycle_bin.remove(&cache_key).await {
if let Some(notifier) = self.notifier.as_ref() {
let size = v.size();
notifier.on_cache_insert(size);
notifier.on_recycle_clear(size);
}
return Ok(v);
}
miss = true;
let timer = Instant::now();
let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
let path = self.base_dir.join(&dir_name);
let size = Self::write_dir(&path, init_fn).await?;
if let Some(notifier) = self.notifier.as_ref() {
notifier.on_cache_insert(size);
notifier.on_load_dir(timer.elapsed());
}
let guard = Arc::new(FsDirGuard {
path,
size,
delete_queue: self.delete_queue.clone(),
});
Ok(CacheValue::Dir { guard, size })
Ok(CacheValue::Dir(guard))
})
.await
.context(CacheGetSnafu)?;
if let Some(notifier) = self.notifier.as_ref() {
if miss {
notifier.on_cache_miss(v.size());
} else {
notifier.on_cache_hit(v.size());
}
}
match v {
CacheValue::Dir { guard, .. } => Ok(guard),
CacheValue::Dir(guard) => Ok(guard),
_ => unreachable!(),
}
}
@@ -194,6 +250,11 @@ impl Stager for BoundedStager {
self.cache
.try_get_with(cache_key.clone(), async move {
if let Some(v) = self.recycle_bin.remove(&cache_key).await {
if let Some(notifier) = self.notifier.as_ref() {
let size = v.size();
notifier.on_cache_insert(size);
notifier.on_recycle_clear(size);
}
return Ok(v);
}
@@ -201,12 +262,15 @@ impl Stager for BoundedStager {
let path = self.base_dir.join(&dir_name);
fs::rename(&dir_path, &path).await.context(RenameSnafu)?;
if let Some(notifier) = self.notifier.as_ref() {
notifier.on_cache_insert(size);
}
let guard = Arc::new(FsDirGuard {
path,
size,
delete_queue: self.delete_queue.clone(),
});
Ok(CacheValue::Dir { guard, size })
Ok(CacheValue::Dir(guard))
})
.await
.map(|_| ())
@@ -308,32 +372,35 @@ impl BoundedStager {
if meta.is_dir() {
let size = Self::get_dir_size(&path).await?;
let v = CacheValue::Dir {
guard: Arc::new(FsDirGuard {
path,
delete_queue: self.delete_queue.clone(),
}),
let v = CacheValue::Dir(Arc::new(FsDirGuard {
path,
size,
};
delete_queue: self.delete_queue.clone(),
}));
// A duplicate dir will be moved to the delete queue.
let _dup_dir = elems.insert(key, v);
} else {
let v = CacheValue::File {
guard: Arc::new(FsBlobGuard {
path,
delete_queue: self.delete_queue.clone(),
}),
size: meta.len(),
};
let size = meta.len();
let v = CacheValue::File(Arc::new(FsBlobGuard {
path,
size,
delete_queue: self.delete_queue.clone(),
}));
// A duplicate file will be moved to the delete queue.
let _dup_file = elems.insert(key, v);
}
}
}
let mut size = 0;
for (key, value) in elems {
size += value.size();
self.cache.insert(key, value).await;
}
if let Some(notifier) = self.notifier.as_ref() {
notifier.on_cache_insert(size);
}
self.cache.run_pending_tasks().await;
Ok(())
@@ -360,11 +427,12 @@ impl BoundedStager {
async fn delete_routine(
mut receiver: Receiver<DeleteTask>,
recycle_bin: Cache<String, CacheValue>,
notifier: Option<Arc<dyn StagerNotifier>>,
) {
loop {
match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await {
Ok(Some(task)) => match task {
DeleteTask::File(path) => {
DeleteTask::File(path, size) => {
if let Err(err) = fs::remove_file(&path).await {
if err.kind() == std::io::ErrorKind::NotFound {
continue;
@@ -372,8 +440,13 @@ impl BoundedStager {
warn!(err; "Failed to remove the file.");
}
if let Some(notifier) = notifier.as_ref() {
notifier.on_recycle_clear(size);
}
}
DeleteTask::Dir(path) => {
DeleteTask::Dir(path, size) => {
let deleted_path = path.with_extension(DELETED_EXTENSION);
if let Err(err) = fs::rename(&path, &deleted_path).await {
if err.kind() == std::io::ErrorKind::NotFound {
@@ -390,6 +463,9 @@ impl BoundedStager {
if let Err(err) = fs::remove_dir_all(&deleted_path).await {
warn!(err; "Failed to remove the dangling directory.");
}
if let Some(notifier) = notifier.as_ref() {
notifier.on_recycle_clear(size);
}
}
DeleteTask::Terminate => {
break;
@@ -415,15 +491,15 @@ impl Drop for BoundedStager {
#[derive(Debug, Clone)]
enum CacheValue {
File { guard: Arc<FsBlobGuard>, size: u64 },
Dir { guard: Arc<FsDirGuard>, size: u64 },
File(Arc<FsBlobGuard>),
Dir(Arc<FsDirGuard>),
}
impl CacheValue {
fn size(&self) -> u64 {
match self {
CacheValue::File { size, .. } => *size,
CacheValue::Dir { size, .. } => *size,
CacheValue::File(guard) => guard.size,
CacheValue::Dir(guard) => guard.size,
}
}
@@ -433,8 +509,8 @@ impl CacheValue {
}
enum DeleteTask {
File(PathBuf),
Dir(PathBuf),
File(PathBuf, u64),
Dir(PathBuf, u64),
Terminate,
}
@@ -443,6 +519,7 @@ enum DeleteTask {
#[derive(Debug)]
pub struct FsBlobGuard {
path: PathBuf,
size: u64,
delete_queue: Sender<DeleteTask>,
}
@@ -459,7 +536,7 @@ impl Drop for FsBlobGuard {
fn drop(&mut self) {
if let Err(err) = self
.delete_queue
.try_send(DeleteTask::File(self.path.clone()))
.try_send(DeleteTask::File(self.path.clone(), self.size))
{
if matches!(err, TrySendError::Closed(_)) {
return;
@@ -474,6 +551,7 @@ impl Drop for FsBlobGuard {
#[derive(Debug)]
pub struct FsDirGuard {
path: PathBuf,
size: u64,
delete_queue: Sender<DeleteTask>,
}
@@ -487,7 +565,7 @@ impl Drop for FsDirGuard {
fn drop(&mut self) {
if let Err(err) = self
.delete_queue
.try_send(DeleteTask::Dir(self.path.clone()))
.try_send(DeleteTask::Dir(self.path.clone(), self.size))
{
if matches!(err, TrySendError::Closed(_)) {
return;
@@ -526,7 +604,7 @@ impl BoundedStager {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let value = self.cache.get(&cache_key).await.unwrap();
let path = match &value {
CacheValue::File { guard, .. } => &guard.path,
CacheValue::File(guard) => &guard.path,
_ => panic!("Expected a file, but got a directory."),
};
fs::File::open(path).await.unwrap()
@@ -536,7 +614,7 @@ impl BoundedStager {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let value = self.cache.get(&cache_key).await.unwrap();
let path = match &value {
CacheValue::Dir { guard, .. } => &guard.path,
CacheValue::Dir(guard) => &guard.path,
_ => panic!("Expected a directory, but got a file."),
};
path.clone()
@@ -550,6 +628,8 @@ impl BoundedStager {
#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;
use common_base::range_read::RangeReader;
use common_test_util::temp_dir::create_temp_dir;
use futures::AsyncWriteExt;
@@ -559,12 +639,124 @@ mod tests {
use crate::error::BlobNotFoundSnafu;
use crate::puffin_manager::stager::Stager;
struct MockNotifier {
cache_insert_size: AtomicU64,
cache_evict_size: AtomicU64,
cache_hit_count: AtomicU64,
cache_hit_size: AtomicU64,
cache_miss_count: AtomicU64,
cache_miss_size: AtomicU64,
recycle_insert_size: AtomicU64,
recycle_clear_size: AtomicU64,
}
#[derive(Debug, PartialEq, Eq)]
struct Stats {
cache_insert_size: u64,
cache_evict_size: u64,
cache_hit_count: u64,
cache_hit_size: u64,
cache_miss_count: u64,
cache_miss_size: u64,
recycle_insert_size: u64,
recycle_clear_size: u64,
}
impl MockNotifier {
fn build() -> Arc<MockNotifier> {
Arc::new(Self {
cache_insert_size: AtomicU64::new(0),
cache_evict_size: AtomicU64::new(0),
cache_hit_count: AtomicU64::new(0),
cache_hit_size: AtomicU64::new(0),
cache_miss_count: AtomicU64::new(0),
cache_miss_size: AtomicU64::new(0),
recycle_insert_size: AtomicU64::new(0),
recycle_clear_size: AtomicU64::new(0),
})
}
fn stats(&self) -> Stats {
Stats {
cache_insert_size: self
.cache_insert_size
.load(std::sync::atomic::Ordering::Relaxed),
cache_evict_size: self
.cache_evict_size
.load(std::sync::atomic::Ordering::Relaxed),
cache_hit_count: self
.cache_hit_count
.load(std::sync::atomic::Ordering::Relaxed),
cache_hit_size: self
.cache_hit_size
.load(std::sync::atomic::Ordering::Relaxed),
cache_miss_count: self
.cache_miss_count
.load(std::sync::atomic::Ordering::Relaxed),
cache_miss_size: self
.cache_miss_size
.load(std::sync::atomic::Ordering::Relaxed),
recycle_insert_size: self
.recycle_insert_size
.load(std::sync::atomic::Ordering::Relaxed),
recycle_clear_size: self
.recycle_clear_size
.load(std::sync::atomic::Ordering::Relaxed),
}
}
}
impl StagerNotifier for MockNotifier {
fn on_cache_insert(&self, size: u64) {
self.cache_insert_size
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
}
fn on_cache_evict(&self, size: u64) {
self.cache_evict_size
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
}
fn on_cache_hit(&self, size: u64) {
self.cache_hit_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.cache_hit_size
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
}
fn on_cache_miss(&self, size: u64) {
self.cache_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.cache_miss_size
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
}
fn on_recycle_insert(&self, size: u64) {
self.recycle_insert_size
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
}
fn on_recycle_clear(&self, size: u64) {
self.recycle_clear_size
.fetch_add(size, std::sync::atomic::Ordering::Relaxed);
}
fn on_load_blob(&self, _duration: Duration) {}
fn on_load_dir(&self, _duration: Duration) {}
}
#[tokio::test]
async fn test_get_blob() {
let tempdir = create_temp_dir("test_get_blob_");
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let notifier = MockNotifier::build();
let stager = BoundedStager::new(
tempdir.path().to_path_buf(),
u64::MAX,
Some(notifier.clone()),
)
.await
.unwrap();
let puffin_file_name = "test_get_blob";
let key = "key";
@@ -593,14 +785,34 @@ mod tests {
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 11,
cache_evict_size: 0,
cache_hit_count: 0,
cache_hit_size: 0,
cache_miss_count: 1,
cache_miss_size: 11,
recycle_insert_size: 0,
recycle_clear_size: 0,
}
);
}
#[tokio::test]
async fn test_get_dir() {
let tempdir = create_temp_dir("test_get_dir_");
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let notifier = MockNotifier::build();
let stager = BoundedStager::new(
tempdir.path().to_path_buf(),
u64::MAX,
Some(notifier.clone()),
)
.await
.unwrap();
let files_in_dir = [
("file_a", "Hello, world!".as_bytes()),
@@ -618,11 +830,13 @@ mod tests {
key,
Box::new(|writer_provider| {
Box::pin(async move {
let mut size = 0;
for (rel_path, content) in &files_in_dir {
size += content.len();
let mut writer = writer_provider.writer(rel_path).await.unwrap();
writer.write_all(content).await.unwrap();
}
Ok(0)
Ok(size as _)
})
}),
)
@@ -645,14 +859,34 @@ mod tests {
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 70,
cache_evict_size: 0,
cache_hit_count: 0,
cache_hit_size: 0,
cache_miss_count: 1,
cache_miss_size: 70,
recycle_insert_size: 0,
recycle_clear_size: 0
}
);
}
#[tokio::test]
async fn test_recover() {
let tempdir = create_temp_dir("test_recover_");
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let notifier = MockNotifier::build();
let stager = BoundedStager::new(
tempdir.path().to_path_buf(),
u64::MAX,
Some(notifier.clone()),
)
.await
.unwrap();
// initialize stager
let puffin_file_name = "test_recover";
@@ -687,11 +921,13 @@ mod tests {
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
let mut size = 0;
for (rel_path, content) in &files_in_dir {
size += content.len();
let mut writer = writer_provider.writer(rel_path).await.unwrap();
writer.write_all(content).await.unwrap();
}
Ok(0)
Ok(size as _)
})
}),
)
@@ -701,7 +937,7 @@ mod tests {
// recover stager
drop(stager);
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None)
.await
.unwrap();
@@ -736,14 +972,31 @@ mod tests {
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 81,
cache_evict_size: 0,
cache_hit_count: 0,
cache_hit_size: 0,
cache_miss_count: 2,
cache_miss_size: 81,
recycle_insert_size: 0,
recycle_clear_size: 0
}
);
}
#[tokio::test]
async fn test_eviction() {
let tempdir = create_temp_dir("test_eviction_");
let notifier = MockNotifier::build();
let stager = BoundedStager::new(
tempdir.path().to_path_buf(),
1, /* extremely small size */
Some(notifier.clone()),
)
.await
.unwrap();
@@ -773,6 +1026,21 @@ mod tests {
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 11,
cache_evict_size: 11,
cache_hit_count: 0,
cache_hit_size: 0,
cache_miss_count: 1,
cache_miss_size: 11,
recycle_insert_size: 11,
recycle_clear_size: 0
}
);
let m = reader.metadata().await.unwrap();
let buf = reader.read(0..m.content_length).await.unwrap();
assert_eq!(&*buf, b"Hello world");
@@ -794,6 +1062,21 @@ mod tests {
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 22,
cache_evict_size: 22,
cache_hit_count: 1,
cache_hit_size: 11,
cache_miss_count: 1,
cache_miss_size: 11,
recycle_insert_size: 22,
recycle_clear_size: 11
}
);
let m = reader.metadata().await.unwrap();
let buf = reader.read(0..m.content_length).await.unwrap();
assert_eq!(&*buf, b"Hello world");
@@ -839,6 +1122,21 @@ mod tests {
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, dir_key));
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 92,
cache_evict_size: 92,
cache_hit_count: 1,
cache_hit_size: 11,
cache_miss_count: 2,
cache_miss_size: 81,
recycle_insert_size: 92,
recycle_clear_size: 11
}
);
// Second time to get the directory
let guard_1 = stager
.get_dir(
@@ -861,6 +1159,21 @@ mod tests {
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, dir_key));
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 162,
cache_evict_size: 162,
cache_hit_count: 2,
cache_hit_size: 81,
cache_miss_count: 2,
cache_miss_size: 81,
recycle_insert_size: 162,
recycle_clear_size: 81
}
);
// Third time to get the directory and all guards are dropped
drop(guard_0);
drop(guard_1);
@@ -884,12 +1197,27 @@ mod tests {
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 232,
cache_evict_size: 232,
cache_hit_count: 3,
cache_hit_size: 151,
cache_miss_count: 2,
cache_miss_size: 81,
recycle_insert_size: 232,
recycle_clear_size: 151
}
);
}
#[tokio::test]
async fn test_get_blob_concurrency_on_fail() {
let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None)
.await
.unwrap();
@@ -926,7 +1254,7 @@ mod tests {
#[tokio::test]
async fn test_get_dir_concurrency_on_fail() {
let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None)
.await
.unwrap();

View File

@@ -32,7 +32,7 @@ async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<Bounde
let path = staging_dir.path().to_path_buf();
(
staging_dir,
Arc::new(BoundedStager::new(path, capacity).await.unwrap()),
Arc::new(BoundedStager::new(path, capacity, None).await.unwrap()),
)
}