feat(puffin): implement MokaCacheManager (#4211)

* feat(puffin): implement MokaCacheManager

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: +1s

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: corner case to get a blob

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: keep dir in used

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: add more tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add doc comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: toml format

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: rename unreleased_dirs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: refine some comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: handle more cornor cases

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: refine

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: simplify

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: more explanation

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: use recycle bin

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove instead

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: address comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: remove unnecessary removing

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-07-01 21:10:13 +08:00
committed by GitHub
parent f035a7c79c
commit b69b24a237
8 changed files with 1053 additions and 35 deletions

6
Cargo.lock generated
View File

@@ -8467,15 +8467,21 @@ dependencies = [
"async-compression 0.4.11",
"async-trait",
"async-walkdir",
"base64 0.21.7",
"bitflags 2.5.0",
"common-error",
"common-macro",
"common-runtime",
"common-telemetry",
"common-test-util",
"derive_builder 0.12.0",
"futures",
"lz4_flex 0.11.3",
"moka",
"pin-project",
"serde",
"serde_json",
"sha2",
"snafu 0.8.3",
"tokio",
"tokio-util",

View File

@@ -11,16 +11,24 @@ workspace = true
async-compression = "0.4.11"
async-trait.workspace = true
async-walkdir = "2.0.0"
base64.workspace = true
bitflags.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
derive_builder.workspace = true
futures.workspace = true
lz4_flex = "0.11"
moka.workspace = true
pin-project.workspace = true
serde.workspace = true
serde_json.workspace = true
sha2 = "0.10.8"
snafu.workspace = true
tokio.workspace = true
tokio-util.workspace = true
uuid.workspace = true
[dev-dependencies]
common-test-util.workspace = true

View File

@@ -14,6 +14,7 @@
use std::any::Any;
use std::io::Error as IoError;
use std::sync::Arc;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -80,6 +81,30 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to create"))]
Create {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to rename"))]
Rename {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to remove"))]
Remove {
#[snafu(source)]
error: IoError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error while walking directory"))]
WalkDirError {
#[snafu(source)]
@@ -220,6 +245,9 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Get value from cache"))]
CacheGet { source: Arc<Error> },
}
impl ErrorExt for Error {
@@ -235,6 +263,9 @@ impl ErrorExt for Error {
| Close { .. }
| Open { .. }
| Metadata { .. }
| Create { .. }
| Remove { .. }
| Rename { .. }
| SerializeJson { .. }
| BytesToInteger { .. }
| ParseStageNotMatch { .. }
@@ -254,6 +285,8 @@ impl ErrorExt for Error {
}
DuplicateBlob { .. } => StatusCode::InvalidArguments,
CacheGet { source } => source.status_code(),
}
}

View File

@@ -19,6 +19,7 @@ pub mod file_accessor;
use std::path::PathBuf;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek};
use crate::blob_metadata::CompressionCodec;
@@ -69,12 +70,31 @@ pub struct PutOptions {
/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
pub trait PuffinReader {
type Reader: AsyncRead + AsyncSeek;
type Blob: BlobGuard;
type Dir: DirGuard;
/// Reads a blob from the Puffin file.
async fn blob(&self, key: &str) -> Result<Self::Reader>;
///
/// The returned `BlobGuard` is used to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
async fn blob(&self, key: &str) -> Result<Self::Blob>;
/// Reads a directory from the Puffin file.
/// The returned `PathBuf` is used to access the directory in the filesystem.
async fn dir(&self, key: &str) -> Result<PathBuf>;
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn dir(&self, key: &str) -> Result<Self::Dir>;
}
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
pub trait BlobGuard {
type Reader: AsyncRead + AsyncSeek;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
}
/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
/// Users should hold the `DirGuard` until they are done with the directory.
pub trait DirGuard {
fn path(&self) -> &PathBuf;
}

View File

@@ -12,14 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod moka_cache_manager;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use futures::AsyncWrite;
use crate::error::Result;
use crate::puffin_manager::{BlobGuard, DirGuard};
pub type BoxWriter = Box<dyn AsyncWrite + Unpin + Send>;
@@ -39,34 +42,41 @@ pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
///
/// `CacheManager` will provide a `BoxWriter` that the caller of `get_blob`
/// can use to write the blob into the cache.
pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult;
/// Function that initializes a directory.
///
/// `CacheManager` will provide a `DirWriterProvider` that the caller of `get_dir`
/// can use to write files inside the directory into the cache.
pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;
/// `CacheManager` manages the cache for the puffin files.
#[async_trait]
pub trait CacheManager {
type Reader: AsyncRead + AsyncSeek;
type Blob: BlobGuard;
type Dir: DirGuard;
/// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
///
/// The returned `BlobGuard` is used to access the blob reader.
/// The caller is responsible for holding the `BlobGuard` until they are done with the blob.
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_factory: Box<dyn InitBlobFn + Send + 'a>,
) -> Result<Self::Reader>;
init_factory: Box<dyn InitBlobFn + Send + Sync + 'a>,
) -> Result<Self::Blob>;
/// Retrieves a directory, initializing it if necessary using the provided `init_fn`.
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_fn: Box<dyn InitDirFn + Send + 'a>,
) -> Result<PathBuf>;
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;
/// Stores a directory in the cache.
async fn put_dir(
@@ -78,4 +88,4 @@ pub trait CacheManager {
) -> Result<()>;
}
pub type CacheManagerRef<R> = Arc<dyn CacheManager<Reader = R> + Send + Sync>;
pub type CacheManagerRef<B, D> = Arc<dyn CacheManager<Blob = B, Dir = D> + Send + Sync>;

View File

@@ -0,0 +1,939 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use async_walkdir::{Filtering, WalkDir};
use base64::prelude::BASE64_URL_SAFE;
use base64::Engine;
use common_telemetry::{info, warn};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use moka::future::Cache;
use sha2::{Digest, Sha256};
use snafu::ResultExt;
use tokio::fs;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::error::{
CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
Result, WalkDirSnafu,
};
use crate::puffin_manager::cache_manager::{
BoxWriter, CacheManager, DirWriterProvider, InitBlobFn, InitDirFn,
};
use crate::puffin_manager::{BlobGuard, DirGuard};
const DELETE_QUEUE_SIZE: usize = 10240;
const TMP_EXTENSION: &str = "tmp";
const DELETED_EXTENSION: &str = "deleted";
/// `MokaCacheManager` is a `CacheManager` that uses `moka` to manage cache.
pub struct MokaCacheManager {
/// The base directory of the cache.
base_dir: PathBuf,
/// The cache maintaining the cache key to the size of the file or directory.
cache: Cache<String, CacheValue>,
/// The recycle bin for the deleted files and directories.
recycle_bin: Cache<String, CacheValue>,
/// The delete queue for the cleanup task.
///
/// The lifetime of a guard is:
/// 1. initially inserted into the cache
/// 2. moved to the recycle bin when evicted
/// 2.1 moved back to the cache when accessed
/// 2.2 deleted from the recycle bin after a certain period
/// 3. sent the delete task to the delete queue on drop
/// 4. background routine removes the file or directory
delete_queue: Sender<DeleteTask>,
}
impl MokaCacheManager {
#[allow(unused)]
pub async fn new(base_dir: PathBuf, max_size: u64) -> Result<Self> {
let recycle_bin = Cache::builder()
.time_to_live(Duration::from_secs(60))
.build();
let recycle_bin_cloned = recycle_bin.clone();
let cache = Cache::builder()
.max_capacity(max_size)
.weigher(|_: &String, v: &CacheValue| v.weight())
.async_eviction_listener(move |k, v, _| {
let recycle_bin = recycle_bin_cloned.clone();
async move {
recycle_bin.insert(k.as_str().to_string(), v).await;
}
.boxed()
})
.build();
let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
common_runtime::bg_runtime().spawn(Self::delete_routine(rx));
let manager = Self {
cache,
base_dir,
delete_queue,
recycle_bin,
};
manager.recover().await?;
Ok(manager)
}
}
#[async_trait]
impl CacheManager for MokaCacheManager {
type Blob = Arc<FsBlobGuard>;
type Dir = Arc<FsDirGuard>;
async fn get_blob<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
) -> Result<Self::Blob> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let v = self
.cache
.try_get_with(cache_key.clone(), async {
if let Some(v) = self.recycle_bin.remove(&cache_key).await {
return Ok(v);
}
let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
let path = self.base_dir.join(&file_name);
let size = Self::write_blob(&path, &init_fn).await?;
let guard = Arc::new(FsBlobGuard {
path,
delete_queue: self.delete_queue.clone(),
});
Ok(CacheValue::File { guard, size })
})
.await
.context(CacheGetSnafu)?;
match v {
CacheValue::File { guard, .. } => Ok(guard),
_ => unreachable!(),
}
}
async fn get_dir<'a>(
&self,
puffin_file_name: &str,
key: &str,
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let v = self
.cache
.try_get_with(cache_key.clone(), async {
if let Some(v) = self.recycle_bin.remove(&cache_key).await {
return Ok(v);
}
let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
let path = self.base_dir.join(&dir_name);
let size = Self::write_dir(&path, &init_fn).await?;
let guard = Arc::new(FsDirGuard {
path,
delete_queue: self.delete_queue.clone(),
});
Ok(CacheValue::Dir { guard, size })
})
.await
.context(CacheGetSnafu)?;
match v {
CacheValue::Dir { guard, .. } => Ok(guard),
_ => unreachable!(),
}
}
async fn put_dir(
&self,
puffin_file_name: &str,
key: &str,
dir_path: PathBuf,
size: u64,
) -> Result<()> {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
self.cache
.try_get_with(cache_key.clone(), async move {
if let Some(v) = self.recycle_bin.remove(&cache_key).await {
return Ok(v);
}
let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
let path = self.base_dir.join(&dir_name);
fs::rename(&dir_path, &path).await.context(RenameSnafu)?;
let guard = Arc::new(FsDirGuard {
path,
delete_queue: self.delete_queue.clone(),
});
Ok(CacheValue::Dir { guard, size })
})
.await
.map(|_| ())
.context(CacheGetSnafu)
}
}
impl MokaCacheManager {
fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(puffin_file_name);
hasher.update(key);
hasher.update(puffin_file_name);
let hash = hasher.finalize();
BASE64_URL_SAFE.encode(hash)
}
async fn write_blob(
target_path: &PathBuf,
init_fn: &(dyn InitBlobFn + Send + Sync + '_),
) -> Result<u64> {
// To guarantee the atomicity of writing the file, we need to write
// the file to a temporary file first...
let tmp_path = target_path.with_extension(TMP_EXTENSION);
let writer = Box::new(
fs::File::create(&tmp_path)
.await
.context(CreateSnafu)?
.compat_write(),
);
let size = init_fn(writer).await?;
// ...then rename the temporary file to the target path
fs::rename(tmp_path, target_path)
.await
.context(RenameSnafu)?;
Ok(size)
}
async fn write_dir(
target_path: &PathBuf,
init_fn: &(dyn InitDirFn + Send + Sync + '_),
) -> Result<u64> {
// To guarantee the atomicity of writing the directory, we need to write
// the directory to a temporary directory first...
let tmp_base = target_path.with_extension(TMP_EXTENSION);
let writer_provider = Box::new(MokaDirWriterProvider(tmp_base.clone()));
let size = init_fn(writer_provider).await?;
// ...then rename the temporary directory to the target path
fs::rename(&tmp_base, target_path)
.await
.context(RenameSnafu)?;
Ok(size)
}
/// Recovers the cache by iterating through the cache directory.
async fn recover(&self) -> Result<()> {
let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
let mut elems = HashMap::new();
while let Some(entry) = read_dir.next_entry().await.context(ReadSnafu)? {
let path = entry.path();
if path.extension() == Some(TMP_EXTENSION.as_ref())
|| path.extension() == Some(DELETED_EXTENSION.as_ref())
{
// Remove temporary or deleted files and directories
if entry.metadata().await.context(MetadataSnafu)?.is_dir() {
fs::remove_dir_all(path).await.context(RemoveSnafu)?;
} else {
fs::remove_file(path).await.context(RemoveSnafu)?;
}
} else {
// Insert the size of the file or directory to the cache
let meta = entry.metadata().await.context(MetadataSnafu)?;
let file_path = path.file_name().unwrap().to_string_lossy().into_owned();
// <key>.<uuid>
let key = match file_path.split('.').next() {
Some(key) => key.to_string(),
None => {
warn!(
"Invalid cache file name: {}, expected format: <key>.<uuid>",
file_path
);
continue;
}
};
if meta.is_dir() {
let size = Self::get_dir_size(&path).await?;
let v = CacheValue::Dir {
guard: Arc::new(FsDirGuard {
path,
delete_queue: self.delete_queue.clone(),
}),
size,
};
// A duplicate dir will be moved to the delete queue.
let _dup_dir = elems.insert(key, v);
} else {
let v = CacheValue::File {
guard: Arc::new(FsBlobGuard {
path,
delete_queue: self.delete_queue.clone(),
}),
size: meta.len(),
};
// A duplicate file will be moved to the delete queue.
let _dup_file = elems.insert(key, v);
}
}
}
for (key, value) in elems {
self.cache.insert(key, value).await;
}
Ok(())
}
/// Walks through the directory and calculate the total size of all files in the directory.
async fn get_dir_size(path: &PathBuf) -> Result<u64> {
let mut size = 0;
let mut wd = WalkDir::new(path).filter(|entry| async move {
match entry.file_type().await {
Ok(ft) if ft.is_dir() => Filtering::Ignore,
_ => Filtering::Continue,
}
});
while let Some(entry) = wd.next().await {
let entry = entry.context(WalkDirSnafu)?;
size += entry.metadata().await.context(MetadataSnafu)?.len();
}
Ok(size)
}
async fn delete_routine(mut receiver: Receiver<DeleteTask>) {
while let Some(task) = receiver.recv().await {
match task {
DeleteTask::File(path) => {
if let Err(err) = fs::remove_file(&path).await {
if err.kind() == std::io::ErrorKind::NotFound {
continue;
}
warn!(err; "Failed to remove the file.");
}
}
DeleteTask::Dir(path) => {
let deleted_path = path.with_extension(DELETED_EXTENSION);
if let Err(err) = fs::rename(&path, &deleted_path).await {
if err.kind() == std::io::ErrorKind::NotFound {
continue;
}
// Remove the deleted directory if the rename fails and retry
let _ = fs::remove_dir_all(&deleted_path).await;
if let Err(err) = fs::rename(&path, &deleted_path).await {
warn!(err; "Failed to rename the dangling directory to deleted path.");
continue;
}
}
if let Err(err) = fs::remove_dir_all(&deleted_path).await {
warn!(err; "Failed to remove the dangling directory.");
}
}
DeleteTask::Terminate => {
break;
}
}
}
info!("The delete routine for moka cache manager is terminated.");
}
}
impl Drop for MokaCacheManager {
fn drop(&mut self) {
let _ = self.delete_queue.try_send(DeleteTask::Terminate);
}
}
#[derive(Debug, Clone)]
enum CacheValue {
File { guard: Arc<FsBlobGuard>, size: u64 },
Dir { guard: Arc<FsDirGuard>, size: u64 },
}
impl CacheValue {
fn size(&self) -> u64 {
match self {
CacheValue::File { size, .. } => *size,
CacheValue::Dir { size, .. } => *size,
}
}
fn weight(&self) -> u32 {
self.size().try_into().unwrap_or(u32::MAX)
}
}
enum DeleteTask {
File(PathBuf),
Dir(PathBuf),
Terminate,
}
/// `FsBlobGuard` is a `BlobGuard` for accessing the blob and
/// automatically deleting the file on drop.
#[derive(Debug)]
pub struct FsBlobGuard {
path: PathBuf,
delete_queue: Sender<DeleteTask>,
}
impl BlobGuard for Arc<FsBlobGuard> {
type Reader = Compat<fs::File>;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>> {
let path = self.path.clone();
async move {
let file = fs::File::open(&path).await.context(OpenSnafu)?;
Ok(file.compat())
}
.boxed()
}
}
impl Drop for FsBlobGuard {
fn drop(&mut self) {
if let Err(err) = self
.delete_queue
.try_send(DeleteTask::File(self.path.clone()))
{
if matches!(err, TrySendError::Closed(_)) {
return;
}
warn!(err; "Failed to send the delete task for the file.");
}
}
}
/// `FsDirGuard` is a `DirGuard` for accessing the directory and
/// automatically deleting the directory on drop.
#[derive(Debug)]
pub struct FsDirGuard {
path: PathBuf,
delete_queue: Sender<DeleteTask>,
}
impl DirGuard for Arc<FsDirGuard> {
fn path(&self) -> &PathBuf {
&self.path
}
}
impl Drop for FsDirGuard {
fn drop(&mut self) {
if let Err(err) = self
.delete_queue
.try_send(DeleteTask::Dir(self.path.clone()))
{
if matches!(err, TrySendError::Closed(_)) {
return;
}
warn!(err; "Failed to send the delete task for the directory.");
}
}
}
/// `MokaDirWriterProvider` implements `DirWriterProvider` for initializing a directory.
struct MokaDirWriterProvider(PathBuf);
#[async_trait]
impl DirWriterProvider for MokaDirWriterProvider {
async fn writer(&self, rel_path: &str) -> Result<BoxWriter> {
let full_path = self.0.join(rel_path);
if let Some(parent) = full_path.parent() {
fs::create_dir_all(parent).await.context(CreateSnafu)?;
}
Ok(Box::new(
fs::File::create(full_path)
.await
.context(CreateSnafu)?
.compat_write(),
) as BoxWriter)
}
}
#[cfg(test)]
impl MokaCacheManager {
pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let value = self.cache.get(&cache_key).await.unwrap();
let path = match &value {
CacheValue::File { guard, .. } => &guard.path,
_ => panic!("Expected a file, but got a directory."),
};
fs::File::open(path).await.unwrap()
}
pub async fn must_get_dir(&self, puffin_file_name: &str, key: &str) -> PathBuf {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let value = self.cache.get(&cache_key).await.unwrap();
let path = match &value {
CacheValue::Dir { guard, .. } => &guard.path,
_ => panic!("Expected a directory, but got a file."),
};
path.clone()
}
pub fn in_cache(&self, puffin_file_name: &str, key: &str) -> bool {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
self.cache.contains_key(&cache_key)
}
}
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncReadExt as _;
use super::*;
use crate::error::BlobNotFoundSnafu;
use crate::puffin_manager::cache_manager::CacheManager;
#[tokio::test]
async fn test_get_blob() {
let tempdir = create_temp_dir("test_get_blob_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let puffin_file_name = "test_get_blob";
let key = "key";
let mut reader = manager
.get_blob(
puffin_file_name,
key,
Box::new(|mut writer| {
Box::pin(async move {
writer.write_all(b"hello world").await.unwrap();
Ok(11)
})
}),
)
.await
.unwrap()
.reader()
.await
.unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
let mut file = manager.must_get_file(puffin_file_name, key).await;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
}
#[tokio::test]
async fn test_get_dir() {
let tempdir = create_temp_dir("test_get_dir_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let files_in_dir = [
("file_a", "Hello, world!".as_bytes()),
("file_b", "Hello, Rust!".as_bytes()),
("file_c", "你好,世界!".as_bytes()),
("subdir/file_d", "Hello, Puffin!".as_bytes()),
("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
];
let puffin_file_name = "test_get_dir";
let key = "key";
let dir_path = manager
.get_dir(
puffin_file_name,
key,
Box::new(|writer_provider| {
Box::pin(async move {
for (rel_path, content) in &files_in_dir {
let mut writer = writer_provider.writer(rel_path).await.unwrap();
writer.write_all(content).await.unwrap();
}
Ok(0)
})
}),
)
.await
.unwrap();
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
let dir_path = manager.must_get_dir(puffin_file_name, key).await;
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
}
#[tokio::test]
async fn test_recover() {
let tempdir = create_temp_dir("test_recover_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
// initialize cache
let puffin_file_name = "test_recover";
let blob_key = "blob_key";
let guard = manager
.get_blob(
puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
writer.write_all(b"hello world").await.unwrap();
Ok(11)
})
}),
)
.await
.unwrap();
drop(guard);
let files_in_dir = [
("file_a", "Hello, world!".as_bytes()),
("file_b", "Hello, Rust!".as_bytes()),
("file_c", "你好,世界!".as_bytes()),
("subdir/file_d", "Hello, Puffin!".as_bytes()),
("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
];
let dir_key = "dir_key";
let guard = manager
.get_dir(
puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
for (rel_path, content) in &files_in_dir {
let mut writer = writer_provider.writer(rel_path).await.unwrap();
writer.write_all(content).await.unwrap();
}
Ok(0)
})
}),
)
.await
.unwrap();
drop(guard);
// recover cache
drop(manager);
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let mut reader = manager
.get_blob(
puffin_file_name,
blob_key,
Box::new(|_| Box::pin(async { Ok(0) })),
)
.await
.unwrap()
.reader()
.await
.unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
let dir_path = manager
.get_dir(
puffin_file_name,
dir_key,
Box::new(|_| Box::pin(async { Ok(0) })),
)
.await
.unwrap();
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
}
#[tokio::test]
async fn test_eviction() {
let tempdir = create_temp_dir("test_eviction_");
let manager = MokaCacheManager::new(
tempdir.path().to_path_buf(),
1, /* extremely small size */
)
.await
.unwrap();
let puffin_file_name = "test_eviction";
let blob_key = "blob_key";
// First time to get the blob
let mut reader = manager
.get_blob(
puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
writer.write_all(b"Hello world").await.unwrap();
Ok(11)
})
}),
)
.await
.unwrap()
.reader()
.await
.unwrap();
// The blob should be evicted
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, blob_key));
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"Hello world");
// Second time to get the blob, get from recycle bin
let mut reader = manager
.get_blob(
puffin_file_name,
blob_key,
Box::new(|_| async { Ok(0) }.boxed()),
)
.await
.unwrap()
.reader()
.await
.unwrap();
// The blob should be evicted
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, blob_key));
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"Hello world");
let dir_key = "dir_key";
let files_in_dir = [
("file_a", "Hello, world!".as_bytes()),
("file_b", "Hello, Rust!".as_bytes()),
("file_c", "你好,世界!".as_bytes()),
("subdir/file_d", "Hello, Puffin!".as_bytes()),
("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
];
// First time to get the directory
let guard_0 = manager
.get_dir(
puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
let mut size = 0;
for (rel_path, content) in &files_in_dir {
let mut writer = writer_provider.writer(rel_path).await.unwrap();
writer.write_all(content).await.unwrap();
size += content.len() as u64;
}
Ok(size)
})
}),
)
.await
.unwrap();
for (rel_path, content) in &files_in_dir {
let file_path = guard_0.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
// The directory should be evicted
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, dir_key));
// Second time to get the directory
let guard_1 = manager
.get_dir(
puffin_file_name,
dir_key,
Box::new(|_| async { Ok(0) }.boxed()),
)
.await
.unwrap();
for (rel_path, content) in &files_in_dir {
let file_path = guard_1.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
// Still hold the guard
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, dir_key));
// Third time to get the directory and all guards are dropped
drop(guard_0);
drop(guard_1);
let guard_2 = manager
.get_dir(
puffin_file_name,
dir_key,
Box::new(|_| Box::pin(async move { Ok(0) })),
)
.await
.unwrap();
// Still hold the guard, so the directory should not be removed even if it's evicted
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, blob_key));
for (rel_path, content) in &files_in_dir {
let file_path = guard_2.path().join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, *content);
}
}
#[tokio::test]
async fn test_get_blob_concurrency_on_fail() {
let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let puffin_file_name = "test_get_blob_concurrency_on_fail";
let key = "key";
let manager = Arc::new(manager);
let handles = (0..10)
.map(|_| {
let manager = manager.clone();
let task = async move {
let failed_init = Box::new(|_| {
async {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
BlobNotFoundSnafu { blob: "whatever" }.fail()
}
.boxed()
});
manager.get_blob(puffin_file_name, key, failed_init).await
};
tokio::spawn(task)
})
.collect::<Vec<_>>();
for handle in handles {
let r = handle.await.unwrap();
assert!(r.is_err());
}
assert!(!manager.in_cache(puffin_file_name, key));
}
#[tokio::test]
async fn test_get_dir_concurrency_on_fail() {
let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let puffin_file_name = "test_get_dir_concurrency_on_fail";
let key = "key";
let manager = Arc::new(manager);
let handles = (0..10)
.map(|_| {
let manager = manager.clone();
let task = async move {
let failed_init = Box::new(|_| {
async {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
BlobNotFoundSnafu { blob: "whatever" }.fail()
}
.boxed()
});
manager.get_dir(puffin_file_name, key, failed_init).await
};
tokio::spawn(task)
})
.collect::<Vec<_>>();
for handle in handles {
let r = handle.await.unwrap();
assert!(r.is_err());
}
assert!(!manager.in_cache(puffin_file_name, key));
}
}

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use async_compression::futures::bufread::ZstdDecoder;
use async_trait::async_trait;
use futures::future::BoxFuture;
@@ -30,25 +28,25 @@ use crate::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use crate::puffin_manager::cache_manager::{BoxWriter, CacheManagerRef, DirWriterProviderRef};
use crate::puffin_manager::cached_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::PuffinReader;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinReader};
/// `CachedPuffinReader` is a `PuffinReader` that provides cached readers for puffin files.
pub struct CachedPuffinReader<CR, AR, AW> {
pub struct CachedPuffinReader<B, G, AR, AW> {
/// The name of the puffin file.
puffin_file_name: String,
/// The cache manager.
cache_manager: CacheManagerRef<CR>,
cache_manager: CacheManagerRef<B, G>,
/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
}
impl<CR, AR, AW> CachedPuffinReader<CR, AR, AW> {
impl<B, D, AR, AW> CachedPuffinReader<B, D, AR, AW> {
#[allow(unused)]
pub(crate) fn new(
puffin_file_name: String,
cache_manager: CacheManagerRef<CR>,
cache_manager: CacheManagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
Self {
@@ -60,15 +58,17 @@ impl<CR, AR, AW> CachedPuffinReader<CR, AR, AW> {
}
#[async_trait]
impl<CR, AR, AW> PuffinReader for CachedPuffinReader<CR, AR, AW>
impl<B, D, AR, AW> PuffinReader for CachedPuffinReader<B, D, AR, AW>
where
B: BlobGuard,
D: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
CR: AsyncRead + AsyncSeek,
{
type Reader = CR;
type Blob = B;
type Dir = D;
async fn blob(&self, key: &str) -> Result<Self::Reader> {
async fn blob(&self, key: &str) -> Result<Self::Blob> {
self.cache_manager
.get_blob(
self.puffin_file_name.as_str(),
@@ -83,7 +83,7 @@ where
.await
}
async fn dir(&self, key: &str) -> Result<PathBuf> {
async fn dir(&self, key: &str) -> Result<Self::Dir> {
self.cache_manager
.get_dir(
self.puffin_file_name.as_str(),
@@ -99,11 +99,12 @@ where
}
}
impl<CR, AR, AW> CachedPuffinReader<CR, AR, AW>
impl<B, G, AR, AW> CachedPuffinReader<B, G, AR, AW>
where
B: BlobGuard,
G: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
CR: AsyncRead + AsyncSeek,
{
fn init_blob_to_cache(
puffin_file_name: String,

View File

@@ -18,7 +18,7 @@ use std::path::PathBuf;
use async_compression::futures::bufread::ZstdEncoder;
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncRead, AsyncSeek, AsyncWrite, StreamExt};
use futures::{AsyncRead, AsyncWrite, StreamExt};
use snafu::{ensure, ResultExt};
use tokio_util::compat::TokioAsyncReadCompatExt;
use uuid::Uuid;
@@ -31,15 +31,15 @@ use crate::error::{
use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use crate::puffin_manager::cache_manager::CacheManagerRef;
use crate::puffin_manager::cached_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::{PuffinWriter, PutOptions};
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinWriter, PutOptions};
/// `CachedPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct CachedPuffinWriter<CR, W> {
pub struct CachedPuffinWriter<B, D, W> {
/// The name of the puffin file.
puffin_file_name: String,
/// The cache manager.
cache_manager: CacheManagerRef<CR>,
cache_manager: CacheManagerRef<B, D>,
/// The underlying `PuffinFileWriter`.
puffin_file_writer: PuffinFileWriter<W>,
@@ -48,11 +48,11 @@ pub struct CachedPuffinWriter<CR, W> {
blob_keys: HashSet<String>,
}
impl<CR, W> CachedPuffinWriter<CR, W> {
impl<B, D, W> CachedPuffinWriter<B, D, W> {
#[allow(unused)]
pub(crate) fn new(
puffin_file_name: String,
cache_manager: CacheManagerRef<CR>,
cache_manager: CacheManagerRef<B, D>,
writer: W,
) -> Self {
Self {
@@ -65,9 +65,10 @@ impl<CR, W> CachedPuffinWriter<CR, W> {
}
#[async_trait]
impl<CR, W> PuffinWriter for CachedPuffinWriter<CR, W>
impl<B, D, W> PuffinWriter for CachedPuffinWriter<B, D, W>
where
CR: AsyncRead + AsyncSeek,
B: BlobGuard,
D: DirGuard,
W: AsyncWrite + Unpin + Send,
{
async fn put_blob<R>(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result<u64>