feat(puffin): complete dir support (#4240)

* feat(puffin): implement CachedPuffinReader

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: next PR to introduce CachedPuffinManager

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: rename

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(puffin): implement MokaCacheManager

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: +1s

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat(puffin): implement CachedPuffinManager and add tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: corner case when getting a blob

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: keep dir in use

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: add more tests

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add doc comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: toml format

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: rename unreleased_dirs

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: refine some comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: handle more corner cases

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: refine

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: simplify

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: more explanation

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: comment compressed

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: address comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: Cached* -> Fs*

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: CacheManager -> Stager

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: rename Puffin(A)sync* -> (A)sync*

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-07-02 13:43:06 +08:00
committed by GitHub
parent ea081c95bf
commit db5d1162f0
17 changed files with 642 additions and 225 deletions

View File

@@ -22,7 +22,7 @@ use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use puffin::file_format::reader::{AsyncReader, PuffinFileReader};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -194,7 +194,7 @@ mod tests {
use futures::io::Cursor;
use index::inverted_index::search::index_apply::MockIndexApplier;
use object_store::services::Memory;
use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use super::*;
use crate::error::Error;

View File

@@ -26,7 +26,7 @@ use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use object_store::ObjectStore;
use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use tokio::io::duplex;

View File

@@ -56,7 +56,7 @@ pub struct BlobMetadata {
pub length: i64,
/// See [`CompressionCodec`]. If omitted, the data is assumed to be uncompressed.
#[builder(default, setter(strip_option))]
#[builder(default)]
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub compression_codec: Option<CompressionCodec>,
@@ -107,7 +107,7 @@ mod tests {
.sequence_number(200)
.offset(300)
.length(400)
.compression_codec(CompressionCodec::Lz4)
.compression_codec(Some(CompressionCodec::Lz4))
.properties(properties)
.build()
.unwrap();
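With `setter(strip_option)` dropped, the `compression_codec` setter now takes the `Option` directly, so callers can forward an `Option<CompressionCodec>` without unwrapping it first. A minimal sketch of the new builder call, mirroring the test in the hunk above (public module paths are assumed):

```rust
use puffin::blob_metadata::{BlobMetadataBuilder, CompressionCodec};

fn main() {
    // The setter accepts the Option itself now; a `None` coming from
    // PutOptions-style config can be passed straight through.
    let metadata = BlobMetadataBuilder::default()
        .blob_type("some_blob".to_string())
        .sequence_number(200)
        .offset(300)
        .length(400)
        .compression_codec(Some(CompressionCodec::Lz4))
        .properties(Default::default())
        .build()
        .unwrap();
    assert_eq!(metadata.length, 400);
}
```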

View File

@@ -22,25 +22,29 @@ use crate::error::Result;
pub use crate::file_format::reader::file::PuffinFileReader;
use crate::file_metadata::FileMetadata;
/// `PuffinSyncReader` defines a synchronous reader for puffin data.
pub trait PuffinSyncReader<'a> {
/// `SyncReader` defines a synchronous reader for puffin data.
pub trait SyncReader<'a> {
type Reader: std::io::Read + std::io::Seek;
/// fetch the FileMetadata
/// Fetches the FileMetadata.
fn metadata(&'a mut self) -> Result<FileMetadata>;
/// read particular blob data based on given metadata
/// Reads particular blob data based on given metadata.
///
/// Data read from the reader is compressed, leaving the caller to decompress the data.
fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
}
/// `PuffinAsyncReader` defines an asynchronous reader for puffin data.
/// `AsyncReader` defines an asynchronous reader for puffin data.
#[async_trait]
pub trait PuffinAsyncReader<'a> {
pub trait AsyncReader<'a> {
type Reader: futures::AsyncRead + futures::AsyncSeek;
/// fetch the FileMetadata
/// Fetches the FileMetadata.
async fn metadata(&'a mut self) -> Result<FileMetadata>;
/// read particular blob data based on given metadata
/// Reads particular blob data based on given metadata.
///
/// Data read from the reader is compressed, leaving the caller to decompress the data.
fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
}
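The renamed traits keep the same shape, so call sites only swap the trait import. A minimal read-path sketch, assuming a `PuffinFileReader::new` constructor and a `blobs` list on `FileMetadata` (neither appears in these hunks):

```rust
use futures::io::Cursor;
use futures::AsyncReadExt as _;
use puffin::file_format::reader::{AsyncReader, PuffinFileReader};

// `bytes` holds a complete puffin file.
async fn read_first_blob(bytes: Vec<u8>) -> Vec<u8> {
    let mut reader = PuffinFileReader::new(Cursor::new(bytes));
    // Fetches the FileMetadata.
    let metadata = reader.metadata().await.unwrap();
    // The blob reader yields compressed bytes; per the trait docs, the
    // caller is responsible for decompressing them.
    let mut blob_reader = reader.blob_reader(&metadata.blobs[0]).unwrap();
    let mut buf = Vec::new();
    blob_reader.read_to_end(&mut buf).await.unwrap();
    buf
}
```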

View File

@@ -24,7 +24,7 @@ use crate::error::{
UnsupportedDecompressionSnafu,
};
use crate::file_format::reader::footer::FooterParser;
use crate::file_format::reader::{PuffinAsyncReader, PuffinSyncReader};
use crate::file_format::reader::{AsyncReader, SyncReader};
use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE};
use crate::file_metadata::FileMetadata;
use crate::partial_reader::PartialReader;
@@ -63,7 +63,7 @@ impl<R> PuffinFileReader<R> {
}
}
impl<'a, R: io::Read + io::Seek + 'a> PuffinSyncReader<'a> for PuffinFileReader<R> {
impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
type Reader = PartialReader<&'a mut R>;
fn metadata(&mut self) -> Result<FileMetadata> {
@@ -103,9 +103,7 @@ impl<'a, R: io::Read + io::Seek + 'a> PuffinSyncReader<'a> for PuffinFileReader<
}
#[async_trait]
impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> PuffinAsyncReader<'a>
for PuffinFileReader<R>
{
impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> AsyncReader<'a> for PuffinFileReader<R> {
type Reader = PartialReader<&'a mut R>;
async fn metadata(&'a mut self) -> Result<FileMetadata> {
@@ -132,15 +130,6 @@ impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> PuffinAsyncReader<'a>
}
fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader> {
// TODO(zhongzc): support decompression
let compression = blob_metadata.compression_codec.as_ref();
ensure!(
compression.is_none(),
UnsupportedDecompressionSnafu {
decompression: compression.unwrap().to_string()
}
);
Ok(PartialReader::new(
&mut self.source,
blob_metadata.offset as _,

View File

@@ -40,8 +40,8 @@ pub struct Blob<R> {
pub properties: HashMap<String, String>,
}
/// The trait for writing Puffin files synchronously
pub trait PuffinSyncWriter {
/// `SyncWriter` defines a synchronous writer for puffin data.
pub trait SyncWriter {
/// Set the properties of the Puffin file
fn set_properties(&mut self, properties: HashMap<String, String>);
@@ -55,9 +55,9 @@ pub trait PuffinSyncWriter {
fn finish(&mut self) -> Result<u64>;
}
/// The trait for writing Puffin files asynchronously
/// `AsyncWriter` defines an asynchronous writer for puffin data.
#[async_trait]
pub trait PuffinAsyncWriter {
pub trait AsyncWriter {
/// Set the properties of the Puffin file
fn set_properties(&mut self, properties: HashMap<String, String>);
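For symmetry, a minimal write-path sketch against the renamed `AsyncWriter`; `PuffinFileWriter::new` and the `Blob` fields are taken from the next file's hunks, and the in-memory sink is an assumption of this sketch:

```rust
use futures::io::Cursor;
use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};

// Writes a single uncompressed blob into an in-memory puffin file and
// returns the encoded bytes.
async fn write_one_blob() -> Vec<u8> {
    let mut buf = Vec::new();
    let mut writer = PuffinFileWriter::new(Cursor::new(&mut buf));
    let blob = Blob {
        blob_type: "greeting".to_string(),
        compressed_data: Cursor::new(&b"hello"[..]),
        compression_codec: None, // stored as-is; no compression applied
        properties: Default::default(),
    };
    writer.add_blob(blob).await.unwrap();
    writer.finish().await.unwrap(); // writes the footer; returns total file size
    drop(writer);
    buf
}
```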

View File

@@ -19,10 +19,10 @@ use async_trait::async_trait;
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
use snafu::ResultExt;
use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder};
use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder, CompressionCodec};
use crate::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
use crate::file_format::writer::footer::FooterWriter;
use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinSyncWriter};
use crate::file_format::writer::{AsyncWriter, Blob, SyncWriter};
use crate::file_format::MAGIC;
/// Puffin file writer, implements both [`SyncWriter`] and [`AsyncWriter`]
@@ -57,12 +57,14 @@ impl<W> PuffinFileWriter<W> {
fn create_blob_metadata(
&self,
typ: String,
compression_codec: Option<CompressionCodec>,
properties: HashMap<String, String>,
size: u64,
) -> BlobMetadata {
BlobMetadataBuilder::default()
.blob_type(typ)
.properties(properties)
.compression_codec(compression_codec)
.offset(self.written_bytes as _)
.length(size as _)
.build()
@@ -70,7 +72,7 @@ impl<W> PuffinFileWriter<W> {
}
}
impl<W: io::Write> PuffinSyncWriter for PuffinFileWriter<W> {
impl<W: io::Write> SyncWriter for PuffinFileWriter<W> {
fn set_properties(&mut self, properties: HashMap<String, String>) {
self.properties = properties;
}
@@ -80,7 +82,12 @@ impl<W: io::Write> PuffinSyncWriter for PuffinFileWriter<W> {
let size = io::copy(&mut blob.compressed_data, &mut self.writer).context(WriteSnafu)?;
let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
let blob_metadata = self.create_blob_metadata(
blob.blob_type,
blob.compression_codec,
blob.properties,
size,
);
self.blob_metadata.push(blob_metadata);
self.written_bytes += size;
@@ -101,7 +108,7 @@ impl<W: io::Write> PuffinSyncWriter for PuffinFileWriter<W> {
}
#[async_trait]
impl<W: AsyncWrite + Unpin + Send> PuffinAsyncWriter for PuffinFileWriter<W> {
impl<W: AsyncWrite + Unpin + Send> AsyncWriter for PuffinFileWriter<W> {
fn set_properties(&mut self, properties: HashMap<String, String>) {
self.properties = properties;
}
@@ -113,7 +120,12 @@ impl<W: AsyncWrite + Unpin + Send> PuffinAsyncWriter for PuffinFileWriter<W> {
.await
.context(WriteSnafu)?;
let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
let blob_metadata = self.create_blob_metadata(
blob.blob_type,
blob.compression_codec,
blob.properties,
size,
);
self.blob_metadata.push(blob_metadata);
self.written_bytes += size;

View File

@@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod cache_manager;
pub mod cached_puffin_manager;
pub mod file_accessor;
pub mod fs_puffin_manager;
pub mod stager;
#[cfg(test)]
mod tests;
use std::path::PathBuf;
@@ -89,7 +92,7 @@ pub trait PuffinReader {
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
pub trait BlobGuard {
type Reader: AsyncRead + AsyncSeek;
type Reader: AsyncRead + AsyncSeek + Unpin;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
}

View File

@@ -1,20 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod dir_meta;
mod reader;
mod writer;
pub use reader::CachedPuffinReader;
pub use writer::CachedPuffinWriter;

View File

@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod dir_meta;
mod reader;
mod writer;
use async_trait::async_trait;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
pub use reader::FsPuffinReader;
pub use writer::FsPuffinWriter;
use crate::error::Result;
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::stager::StagerRef;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinManager};
/// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in the filesystem.
pub struct FsPuffinManager<B, D, AR, AW> {
/// The stager.
stager: StagerRef<B, D>,
/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
}
impl<B, D, AR, AW> FsPuffinManager<B, D, AR, AW> {
/// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`.
pub fn new(
stager: StagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
Self {
stager,
puffin_file_accessor,
}
}
}
#[async_trait]
impl<B, D, AR, AW> PuffinManager for FsPuffinManager<B, D, AR, AW>
where
B: BlobGuard,
D: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + Send + Unpin + 'static,
{
type Reader = FsPuffinReader<B, D, AR, AW>;
type Writer = FsPuffinWriter<B, D, AW>;
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
Ok(FsPuffinReader::new(
puffin_file_name.to_string(),
self.stager.clone(),
self.puffin_file_accessor.clone(),
))
}
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer> {
let writer = self.puffin_file_accessor.writer(puffin_file_name).await?;
Ok(FsPuffinWriter::new(
puffin_file_name.to_string(),
self.stager.clone(),
writer,
))
}
}
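End to end, the manager wires a stager and a file accessor together. A condensed version of the round trip the test module below performs, reusing its `new_bounded_stager` and `MockFileAccessor` helpers (so this snippet belongs inside an async test):

```rust
use std::sync::Arc;

use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};

// Stage into a temp dir, write one blob, then read it back.
let (_staging_dir, stager) = new_bounded_stager("example_", u64::MAX).await;
let accessor = Arc::new(MockFileAccessor::new("example_"));
let manager = FsPuffinManager::new(stager.clone(), accessor);

let mut writer = manager.writer("puffin_file").await.unwrap();
writer
    .put_blob("blob_a", "Hello, world!".as_bytes(), PutOptions { compression: None })
    .await
    .unwrap();
writer.finish().await.unwrap();

let reader = manager.reader("puffin_file").await.unwrap();
let blob = reader.blob("blob_a").await.unwrap(); // staged on first access
```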

View File

@@ -24,41 +24,40 @@ use crate::error::{
BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
};
use crate::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use crate::puffin_manager::cache_manager::{BoxWriter, CacheManagerRef, DirWriterProviderRef};
use crate::puffin_manager::cached_puffin_manager::dir_meta::DirMetadata;
use crate::file_format::reader::{AsyncReader, PuffinFileReader};
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, StagerRef};
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinReader};
/// `CachedPuffinReader` is a `PuffinReader` that provides cached readers for puffin files.
pub struct CachedPuffinReader<B, G, AR, AW> {
/// `FsPuffinReader` is a `PuffinReader` that provides filesystem-backed readers for puffin files.
pub struct FsPuffinReader<B, G, AR, AW> {
/// The name of the puffin file.
puffin_file_name: String,
/// The cache manager.
cache_manager: CacheManagerRef<B, G>,
/// The stager.
stager: StagerRef<B, G>,
/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
}
impl<B, D, AR, AW> CachedPuffinReader<B, D, AR, AW> {
#[allow(unused)]
impl<B, D, AR, AW> FsPuffinReader<B, D, AR, AW> {
pub(crate) fn new(
puffin_file_name: String,
cache_manager: CacheManagerRef<B, D>,
stager: StagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
Self {
puffin_file_name,
cache_manager,
stager,
puffin_file_accessor,
}
}
}
#[async_trait]
impl<B, D, AR, AW> PuffinReader for CachedPuffinReader<B, D, AR, AW>
impl<B, D, AR, AW> PuffinReader for FsPuffinReader<B, D, AR, AW>
where
B: BlobGuard,
D: DirGuard,
@@ -69,7 +68,7 @@ where
type Dir = D;
async fn blob(&self, key: &str) -> Result<Self::Blob> {
self.cache_manager
self.stager
.get_blob(
self.puffin_file_name.as_str(),
key,
@@ -84,7 +83,7 @@ where
}
async fn dir(&self, key: &str) -> Result<Self::Dir> {
self.cache_manager
self.stager
.get_dir(
self.puffin_file_name.as_str(),
key,
@@ -99,7 +98,7 @@ where
}
}
impl<B, G, AR, AW> CachedPuffinReader<B, G, AR, AW>
impl<B, G, AR, AW> FsPuffinReader<B, G, AR, AW>
where
B: BlobGuard,
G: DirGuard,

View File

@@ -28,18 +28,18 @@ use crate::error::{
DuplicateBlobSnafu, MetadataSnafu, OpenSnafu, Result, SerializeJsonSnafu,
UnsupportedCompressionSnafu, WalkDirSnafu,
};
use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
use crate::puffin_manager::cache_manager::CacheManagerRef;
use crate::puffin_manager::cached_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use crate::puffin_manager::fs_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::stager::StagerRef;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinWriter, PutOptions};
/// `CachedPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct CachedPuffinWriter<B, D, W> {
/// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct FsPuffinWriter<B, D, W> {
/// The name of the puffin file.
puffin_file_name: String,
/// The cache manager.
cache_manager: CacheManagerRef<B, D>,
/// The stager.
stager: StagerRef<B, D>,
/// The underlying `PuffinFileWriter`.
puffin_file_writer: PuffinFileWriter<W>,
@@ -48,16 +48,11 @@ pub struct CachedPuffinWriter<B, D, W> {
blob_keys: HashSet<String>,
}
impl<B, D, W> CachedPuffinWriter<B, D, W> {
#[allow(unused)]
pub(crate) fn new(
puffin_file_name: String,
cache_manager: CacheManagerRef<B, D>,
writer: W,
) -> Self {
impl<B, D, W> FsPuffinWriter<B, D, W> {
pub(crate) fn new(puffin_file_name: String, stager: StagerRef<B, D>, writer: W) -> Self {
Self {
puffin_file_name,
cache_manager,
stager,
puffin_file_writer: PuffinFileWriter::new(writer),
blob_keys: HashSet::new(),
}
@@ -65,7 +60,7 @@ impl<B, D, W> CachedPuffinWriter<B, D, W> {
}
#[async_trait]
impl<B, D, W> PuffinWriter for CachedPuffinWriter<B, D, W>
impl<B, D, W> PuffinWriter for FsPuffinWriter<B, D, W>
where
B: BlobGuard,
D: DirGuard,
@@ -79,32 +74,10 @@ where
!self.blob_keys.contains(key),
DuplicateBlobSnafu { blob: key }
);
ensure!(
!matches!(options.compression, Some(CompressionCodec::Lz4)),
UnsupportedCompressionSnafu { codec: "lz4" }
);
let written_bytes = match options.compression {
Some(CompressionCodec::Lz4) => unreachable!("checked above"),
Some(CompressionCodec::Zstd) => {
let blob = Blob {
blob_type: key.to_string(),
compressed_data: ZstdEncoder::new(BufReader::new(raw_data)),
compression_codec: options.compression,
properties: Default::default(),
};
self.puffin_file_writer.add_blob(blob).await?
}
None => {
let blob = Blob {
blob_type: key.to_string(),
compressed_data: raw_data,
compression_codec: options.compression,
properties: Default::default(),
};
self.puffin_file_writer.add_blob(blob).await?
}
};
let written_bytes = self
.handle_compress(key.to_string(), raw_data, options.compression)
.await?;
self.blob_keys.insert(key.to_string());
Ok(written_bytes)
@@ -115,10 +88,6 @@ where
!self.blob_keys.contains(key),
DuplicateBlobSnafu { blob: key }
);
ensure!(
!matches!(options.compression, Some(CompressionCodec::Lz4)),
UnsupportedCompressionSnafu { codec: "lz4" }
);
// Walk the directory and add all files to the puffin file.
let mut wd = async_walkdir::WalkDir::new(&dir_path).filter(|entry| async move {
@@ -142,37 +111,23 @@ where
.compat();
let file_key = Uuid::new_v4().to_string();
match options.compression {
Some(CompressionCodec::Lz4) => unreachable!("checked above"),
Some(CompressionCodec::Zstd) => {
let blob = Blob {
blob_type: file_key.clone(),
compressed_data: ZstdEncoder::new(BufReader::new(reader)),
compression_codec: options.compression,
properties: Default::default(),
};
written_bytes += self.puffin_file_writer.add_blob(blob).await?;
}
None => {
let blob = Blob {
blob_type: file_key.clone(),
compressed_data: reader,
compression_codec: options.compression,
properties: Default::default(),
};
written_bytes += self.puffin_file_writer.add_blob(blob).await?;
}
}
written_bytes += self
.handle_compress(file_key.clone(), reader, options.compression)
.await?;
let relative_path = entry
.path()
let path = entry.path();
let relative_path = path
.strip_prefix(&dir_path)
.expect("entry path is under dir path")
.to_string_lossy()
.into_owned();
.expect("entry path is under dir path");
let unified_rel_path = if cfg!(windows) {
relative_path.to_string_lossy().replace('\\', "/")
} else {
relative_path.to_string_lossy().to_string()
};
files.push(DirFileMetadata {
relative_path,
relative_path: unified_rel_path,
key: file_key.clone(),
blob_index: self.blob_keys.len(),
});
@@ -191,8 +146,8 @@ where
written_bytes += self.puffin_file_writer.add_blob(dir_meta_blob).await?;
self.blob_keys.insert(key.to_string());
// Move the directory into the cache.
self.cache_manager
// Move the directory into the stager.
self.stager
.put_dir(&self.puffin_file_name, key, dir_path, dir_size)
.await?;
Ok(written_bytes)
@@ -208,3 +163,40 @@ where
Ok(size)
}
}
impl<B, G, W> FsPuffinWriter<B, G, W>
where
B: BlobGuard,
G: DirGuard,
W: AsyncWrite + Unpin + Send,
{
/// Compresses the raw data and writes it to the puffin file.
async fn handle_compress(
&mut self,
key: String,
raw_data: impl AsyncRead + Send,
compression: Option<CompressionCodec>,
) -> Result<u64> {
match compression {
Some(CompressionCodec::Lz4) => UnsupportedCompressionSnafu { codec: "lz4" }.fail(),
Some(CompressionCodec::Zstd) => {
let blob = Blob {
blob_type: key,
compressed_data: ZstdEncoder::new(BufReader::new(raw_data)),
compression_codec: compression,
properties: Default::default(),
};
self.puffin_file_writer.add_blob(blob).await
}
None => {
let blob = Blob {
blob_type: key,
compressed_data: raw_data,
compression_codec: compression,
properties: Default::default(),
};
self.puffin_file_writer.add_blob(blob).await
}
}
}
}
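This refactor also changes how LZ4 is rejected: the up-front `ensure!` guards in `put_blob`/`put_dir` are gone, and `handle_compress` returns `UnsupportedCompressionSnafu` itself. A hedged sketch of the observable behavior, with the writer obtained as in the tests below:

```rust
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{PuffinWriter, PutOptions};

// LZ4 is still unsupported; the error now surfaces from handle_compress
// instead of an ensure! at the top of put_blob.
async fn lz4_is_rejected(writer: &mut impl PuffinWriter) {
    let err = writer
        .put_blob(
            "blob_lz4",
            "payload".as_bytes(),
            PutOptions { compression: Some(CompressionCodec::Lz4) },
        )
        .await
        .unwrap_err();
    println!("unsupported codec: {err}");
}
```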

View File

@@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod moka_cache_manager;
mod bounded_stager;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
pub use bounded_stager::BoundedStager;
use futures::future::BoxFuture;
use futures::AsyncWrite;
@@ -40,19 +41,19 @@ pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
/// Function that initializes a blob.
///
/// `CacheManager` will provide a `BoxWriter` that the caller of `get_blob`
/// can use to write the blob into the cache.
/// `Stager` will provide a `BoxWriter` that the caller of `get_blob`
/// can use to write the blob into the staging area.
pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult;
/// Function that initializes a directory.
///
/// `CacheManager` will provide a `DirWriterProvider` that the caller of `get_dir`
/// can use to write files inside the directory into the cache.
/// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir`
/// can use to write files inside the directory into the staging area.
pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;
/// `CacheManager` manages the cache for the puffin files.
/// `Stager` manages the staging area for the puffin files.
#[async_trait]
pub trait CacheManager {
pub trait Stager {
type Blob: BlobGuard;
type Dir: DirGuard;
@@ -78,7 +79,7 @@ pub trait CacheManager {
init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
) -> Result<Self::Dir>;
/// Stores a directory in the cache.
/// Stores a directory in the staging area.
async fn put_dir(
&self,
puffin_file_name: &str,
@@ -88,4 +89,4 @@ pub trait CacheManager {
) -> Result<()>;
}
pub type CacheManagerRef<B, D> = Arc<dyn CacheManager<Blob = B, Dir = D> + Send + Sync>;
pub type StagerRef<B, D> = Arc<dyn Stager<Blob = B, Dir = D> + Send + Sync>;
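The `get_*` methods keep a read-through contract: on a miss, the stager hands the init closure a writer into the staging area. A minimal sketch, assuming `WriteResult` is a boxed future yielding the number of bytes written (the size the weigher accounts against capacity):

```rust
use futures::{AsyncReadExt as _, AsyncWriteExt as _, FutureExt as _};
use puffin::puffin_manager::stager::{Stager, StagerRef};
use puffin::puffin_manager::{BlobGuard, DirGuard};

async fn read_through<B: BlobGuard, D: DirGuard>(stager: StagerRef<B, D>) {
    let blob = stager
        .get_blob(
            "puffin_file",
            "key",
            Box::new(|mut writer| {
                async move {
                    // Runs only on a staging miss: populate the entry...
                    writer.write_all(b"hello world").await.unwrap();
                    // ...and report its size for capacity accounting.
                    Ok(11)
                }
                .boxed()
            }),
        )
        .await
        .unwrap();

    // Hold the guard while reading; it keeps the staged file alive.
    let mut reader = blob.reader().await.unwrap();
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await.unwrap();
    assert_eq!(buf, b"hello world");
}
```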

View File

@@ -36,19 +36,16 @@ use crate::error::{
CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
Result, WalkDirSnafu,
};
use crate::puffin_manager::cache_manager::{
BoxWriter, CacheManager, DirWriterProvider, InitBlobFn, InitDirFn,
};
use crate::puffin_manager::stager::{BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager};
use crate::puffin_manager::{BlobGuard, DirGuard};
const DELETE_QUEUE_SIZE: usize = 10240;
const TMP_EXTENSION: &str = "tmp";
const DELETED_EXTENSION: &str = "deleted";
/// `MokaCacheManager` is a `CacheManager` that uses `moka` to manage cache.
pub struct MokaCacheManager {
/// The base directory of the cache.
/// `BoundedStager` is a `Stager` that uses `moka` to manage the staging area.
pub struct BoundedStager {
/// The base directory of the staging area.
base_dir: PathBuf,
/// The cache mapping the cache key to the size of the file or directory.
@@ -69,16 +66,15 @@ pub struct MokaCacheManager {
delete_queue: Sender<DeleteTask>,
}
impl MokaCacheManager {
#[allow(unused)]
pub async fn new(base_dir: PathBuf, max_size: u64) -> Result<Self> {
impl BoundedStager {
pub async fn new(base_dir: PathBuf, capacity: u64) -> Result<Self> {
let recycle_bin = Cache::builder()
.time_to_live(Duration::from_secs(60))
.build();
let recycle_bin_cloned = recycle_bin.clone();
let cache = Cache::builder()
.max_capacity(max_size)
.max_capacity(capacity)
.weigher(|_: &String, v: &CacheValue| v.weight())
.async_eviction_listener(move |k, v, _| {
let recycle_bin = recycle_bin_cloned.clone();
@@ -92,21 +88,21 @@ impl MokaCacheManager {
let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
common_runtime::bg_runtime().spawn(Self::delete_routine(rx));
let manager = Self {
let stager = Self {
cache,
base_dir,
delete_queue,
recycle_bin,
};
manager.recover().await?;
stager.recover().await?;
Ok(manager)
Ok(stager)
}
}
#[async_trait]
impl CacheManager for MokaCacheManager {
impl Stager for BoundedStager {
type Blob = Arc<FsBlobGuard>;
type Dir = Arc<FsDirGuard>;
@@ -212,7 +208,7 @@ impl CacheManager for MokaCacheManager {
}
}
impl MokaCacheManager {
impl BoundedStager {
fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(puffin_file_name);
@@ -262,7 +258,7 @@ impl MokaCacheManager {
Ok(size)
}
/// Recovers the cache by iterating through the cache directory.
/// Recovers the staging area by iterating through the staging directory.
async fn recover(&self) -> Result<()> {
let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
@@ -280,7 +276,7 @@ impl MokaCacheManager {
fs::remove_file(path).await.context(RemoveSnafu)?;
}
} else {
// Insert the size of the file or directory to the cache
// Insert the guard of the file or directory into the cache
let meta = entry.metadata().await.context(MetadataSnafu)?;
let file_path = path.file_name().unwrap().to_string_lossy().into_owned();
@@ -289,7 +285,7 @@ impl MokaCacheManager {
Some(key) => key.to_string(),
None => {
warn!(
"Invalid cache file name: {}, expected format: <key>.<uuid>",
"Invalid staging file name: {}, expected format: <key>.<uuid>",
file_path
);
continue;
@@ -382,11 +378,11 @@ impl MokaCacheManager {
}
}
info!("The delete routine for moka cache manager is terminated.");
info!("The delete routine for the bounded stager is terminated.");
}
}
impl Drop for MokaCacheManager {
impl Drop for BoundedStager {
fn drop(&mut self) {
let _ = self.delete_queue.try_send(DeleteTask::Terminate);
}
@@ -486,7 +482,11 @@ struct MokaDirWriterProvider(PathBuf);
#[async_trait]
impl DirWriterProvider for MokaDirWriterProvider {
async fn writer(&self, rel_path: &str) -> Result<BoxWriter> {
let full_path = self.0.join(rel_path);
let full_path = if cfg!(windows) {
self.0.join(rel_path.replace('/', "\\"))
} else {
self.0.join(rel_path)
};
if let Some(parent) = full_path.parent() {
fs::create_dir_all(parent).await.context(CreateSnafu)?;
}
@@ -500,7 +500,7 @@ impl DirWriterProvider for MokaDirWriterProvider {
}
#[cfg(test)]
impl MokaCacheManager {
impl BoundedStager {
pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
let cache_key = Self::encode_cache_key(puffin_file_name, key);
let value = self.cache.get(&cache_key).await.unwrap();
@@ -535,18 +535,18 @@ mod tests {
use super::*;
use crate::error::BlobNotFoundSnafu;
use crate::puffin_manager::cache_manager::CacheManager;
use crate::puffin_manager::stager::Stager;
#[tokio::test]
async fn test_get_blob() {
let tempdir = create_temp_dir("test_get_blob_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let puffin_file_name = "test_get_blob";
let key = "key";
let mut reader = manager
let mut reader = stager
.get_blob(
puffin_file_name,
key,
@@ -567,7 +567,7 @@ mod tests {
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
let mut file = manager.must_get_file(puffin_file_name, key).await;
let mut file = stager.must_get_file(puffin_file_name, key).await;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
@@ -576,7 +576,7 @@ mod tests {
#[tokio::test]
async fn test_get_dir() {
let tempdir = create_temp_dir("test_get_dir_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
@@ -590,7 +590,7 @@ mod tests {
let puffin_file_name = "test_get_dir";
let key = "key";
let dir_path = manager
let dir_path = stager
.get_dir(
puffin_file_name,
key,
@@ -615,7 +615,7 @@ mod tests {
assert_eq!(buf, *content);
}
let dir_path = manager.must_get_dir(puffin_file_name, key).await;
let dir_path = stager.must_get_dir(puffin_file_name, key).await;
for (rel_path, content) in &files_in_dir {
let file_path = dir_path.join(rel_path);
let mut file = tokio::fs::File::open(&file_path).await.unwrap();
@@ -628,14 +628,14 @@ mod tests {
#[tokio::test]
async fn test_recover() {
let tempdir = create_temp_dir("test_recover_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
// initialize cache
// initialize stager
let puffin_file_name = "test_recover";
let blob_key = "blob_key";
let guard = manager
let guard = stager
.get_blob(
puffin_file_name,
blob_key,
@@ -659,7 +659,7 @@ mod tests {
];
let dir_key = "dir_key";
let guard = manager
let guard = stager
.get_dir(
puffin_file_name,
dir_key,
@@ -677,13 +677,13 @@ mod tests {
.unwrap();
drop(guard);
// recover cache
drop(manager);
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
// recover stager
drop(stager);
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let mut reader = manager
let mut reader = stager
.get_blob(
puffin_file_name,
blob_key,
@@ -698,7 +698,7 @@ mod tests {
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello world");
let dir_path = manager
let dir_path = stager
.get_dir(
puffin_file_name,
dir_key,
@@ -718,7 +718,7 @@ mod tests {
#[tokio::test]
async fn test_eviction() {
let tempdir = create_temp_dir("test_eviction_");
let manager = MokaCacheManager::new(
let stager = BoundedStager::new(
tempdir.path().to_path_buf(),
1, /* extremely small size */
)
@@ -729,7 +729,7 @@ mod tests {
let blob_key = "blob_key";
// First time to get the blob
let mut reader = manager
let mut reader = stager
.get_blob(
puffin_file_name,
blob_key,
@@ -747,15 +747,15 @@ mod tests {
.unwrap();
// The blob should be evicted
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, blob_key));
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"Hello world");
// Second time to get the blob; it is served from the recycle bin
let mut reader = manager
let mut reader = stager
.get_blob(
puffin_file_name,
blob_key,
@@ -768,8 +768,8 @@ mod tests {
.unwrap();
// The blob should be evicted
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, blob_key));
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
@@ -785,7 +785,7 @@ mod tests {
];
// First time to get the directory
let guard_0 = manager
let guard_0 = stager
.get_dir(
puffin_file_name,
dir_key,
@@ -813,11 +813,11 @@ mod tests {
}
// The directory should be evicted
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, dir_key));
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, dir_key));
// Second time to get the directory
let guard_1 = manager
let guard_1 = stager
.get_dir(
puffin_file_name,
dir_key,
@@ -835,13 +835,13 @@ mod tests {
}
// Still hold the guard
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, dir_key));
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, dir_key));
// Third time to get the directory and all guards are dropped
drop(guard_0);
drop(guard_1);
let guard_2 = manager
let guard_2 = stager
.get_dir(
puffin_file_name,
dir_key,
@@ -851,8 +851,8 @@ mod tests {
.unwrap();
// Still hold the guard, so the directory should not be removed even if it's evicted
manager.cache.run_pending_tasks().await;
assert!(!manager.in_cache(puffin_file_name, blob_key));
stager.cache.run_pending_tasks().await;
assert!(!stager.in_cache(puffin_file_name, blob_key));
for (rel_path, content) in &files_in_dir {
let file_path = guard_2.path().join(rel_path);
@@ -866,17 +866,17 @@ mod tests {
#[tokio::test]
async fn test_get_blob_concurrency_on_fail() {
let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let puffin_file_name = "test_get_blob_concurrency_on_fail";
let key = "key";
let manager = Arc::new(manager);
let stager = Arc::new(stager);
let handles = (0..10)
.map(|_| {
let manager = manager.clone();
let stager = stager.clone();
let task = async move {
let failed_init = Box::new(|_| {
async {
@@ -885,7 +885,7 @@ mod tests {
}
.boxed()
});
manager.get_blob(puffin_file_name, key, failed_init).await
stager.get_blob(puffin_file_name, key, failed_init).await
};
tokio::spawn(task)
@@ -897,23 +897,23 @@ mod tests {
assert!(r.is_err());
}
assert!(!manager.in_cache(puffin_file_name, key));
assert!(!stager.in_cache(puffin_file_name, key));
}
#[tokio::test]
async fn test_get_dir_concurrency_on_fail() {
let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX)
.await
.unwrap();
let puffin_file_name = "test_get_dir_concurrency_on_fail";
let key = "key";
let manager = Arc::new(manager);
let stager = Arc::new(stager);
let handles = (0..10)
.map(|_| {
let manager = manager.clone();
let stager = stager.clone();
let task = async move {
let failed_init = Box::new(|_| {
async {
@@ -922,7 +922,7 @@ mod tests {
}
.boxed()
});
manager.get_dir(puffin_file_name, key, failed_init).await
stager.get_dir(puffin_file_name, key, failed_init).await
};
tokio::spawn(task)
@@ -934,6 +934,6 @@ mod tests {
assert!(r.is_err());
}
assert!(!manager.in_cache(puffin_file_name, key));
assert!(!stager.in_cache(puffin_file_name, key));
}
}

View File

@@ -0,0 +1,359 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use futures::AsyncReadExt as _;
use tokio::fs::File;
use tokio::io::AsyncReadExt as _;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use crate::blob_metadata::CompressionCodec;
use crate::error::Result;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::FsPuffinManager;
use crate::puffin_manager::stager::BoundedStager;
use crate::puffin_manager::{
BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions,
};
async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<BoundedStager>) {
let staging_dir = create_temp_dir(prefix);
let path = staging_dir.path().to_path_buf();
(
staging_dir,
Arc::new(BoundedStager::new(path, capacity).await.unwrap()),
)
}
#[tokio::test]
async fn test_put_get_file() {
let capacities = [1, 16, u64::MAX];
let compression_codecs = [None, Some(CompressionCodec::Zstd)];
for capacity in capacities {
for compression_codec in compression_codecs {
let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capicity).await;
let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_file_"));
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let key = "blob_a";
let raw_data = "Hello, world!".as_bytes();
put_blob(key, raw_data, compression_codec, &mut writer).await;
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
// renew the stager
let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
}
}
}
#[tokio::test]
async fn test_put_get_files() {
let capacities = [1, 16, u64::MAX];
for capacity in capacities {
let compression_codecs = [None, Some(CompressionCodec::Zstd)];
for compression_codec in compression_codecs {
let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capicity).await;
let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_files_"));
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let blobs = [
("blob_a", "Hello, world!".as_bytes()),
("blob_b", "Hello, Rust!".as_bytes()),
("blob_c", "你好,世界!".as_bytes()),
]
.into_iter()
.collect::<HashMap<_, _>>();
for (key, raw_data) in &blobs {
put_blob(key, raw_data, compression_codec, &mut writer).await;
}
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
}
// renew the stager
let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
}
}
}
}
#[tokio::test]
async fn test_put_get_dir() {
let capacities = [1, 64, u64::MAX];
let compression_codecs = [None, Some(CompressionCodec::Zstd)];
for capacity in capacities {
for compression_codec in compression_codecs {
let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capicity).await;
let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_dir_"));
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let key = "dir_a";
let files_in_dir = vec![
("file_a", "Hello, world!".as_bytes()),
("file_b", "Hello, Rust!".as_bytes()),
("file_c", "你好,世界!".as_bytes()),
("subdir/file_d", "Hello, Puffin!".as_bytes()),
("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
];
put_dir(key, &files_in_dir, compression_codec, &mut writer).await;
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await;
// renew the stager
let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await;
}
}
}
#[tokio::test]
async fn test_put_get_mix_file_dir() {
let capacities = [1, 64, u64::MAX];
let compression_codecs = [None, Some(CompressionCodec::Zstd)];
for capacity in capacities {
for compression_codec in compression_codecs {
let (_staging_dir, stager) =
new_bounded_stager("test_put_get_mix_file_dir_", capicity).await;
let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_mix_file_dir_"));
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone());
let puffin_file_name = "puffin_file";
let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap();
let blobs = [
("blob_a", "Hello, world!".as_bytes()),
("blob_b", "Hello, Rust!".as_bytes()),
("blob_c", "你好,世界!".as_bytes()),
]
.into_iter()
.collect::<HashMap<_, _>>();
let dir_key = "dir_a";
let files_in_dir = [
("file_a", "Hello, world!".as_bytes()),
("file_b", "Hello, Rust!".as_bytes()),
("file_c", "你好,世界!".as_bytes()),
("subdir/file_d", "Hello, Puffin!".as_bytes()),
("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
];
for (key, raw_data) in &blobs {
put_blob(key, raw_data, compression_codec, &mut writer).await;
}
put_dir(dir_key, &files_in_dir, compression_codec, &mut writer).await;
writer.finish().await.unwrap();
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
}
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
// renew the stager
let (_staging_dir, stager) =
new_bounded_stager("test_put_get_mix_file_dir_", capacity).await;
let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
for (key, raw_data) in &blobs {
check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
}
check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
}
}
}
async fn put_blob(
key: &str,
raw_data: &[u8],
compression_codec: Option<CompressionCodec>,
puffin_writer: &mut impl PuffinWriter,
) {
puffin_writer
.put_blob(
key,
raw_data,
PutOptions {
compression: compression_codec,
},
)
.await
.unwrap();
}
async fn check_blob<R>(
puffin_file_name: &str,
key: &str,
raw_data: &[u8],
stager: &BoundedStager,
puffin_reader: &R,
) where
R: PuffinReader,
{
let blob = puffin_reader.blob(key).await.unwrap();
let mut reader = blob.reader().await.unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, raw_data);
let mut cached_file = stager.must_get_file(puffin_file_name, key).await;
let mut buf = Vec::new();
cached_file.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, raw_data);
}
async fn put_dir(
key: &str,
files_in_dir: &[(&str, &[u8])],
compression_codec: Option<CompressionCodec>,
puffin_writer: &mut impl PuffinWriter,
) {
let dir = create_temp_dir("dir_in_puffin_");
for (file_key, raw_data) in files_in_dir.iter() {
let file_path = if cfg!(windows) {
dir.path().join(file_key.replace('/', "\\"))
} else {
dir.path().join(file_key)
};
std::fs::create_dir_all(file_path.parent().unwrap()).unwrap();
std::fs::write(&file_path, raw_data).unwrap();
}
puffin_writer
.put_dir(
key,
dir.path().to_path_buf(),
PutOptions {
compression: compression_codec,
},
)
.await
.unwrap();
}
async fn check_dir<R>(
puffin_file_name: &str,
key: &str,
files_in_dir: &[(&str, &[u8])],
stager: &BoundedStager,
puffin_reader: &R,
) where
R: PuffinReader,
{
let res_dir = puffin_reader.dir(key).await.unwrap();
for (file_name, raw_data) in files_in_dir {
let file_path = if cfg!(windows) {
res_dir.path().join(file_name.replace('/', "\\"))
} else {
res_dir.path().join(file_name)
};
let buf = std::fs::read(file_path).unwrap();
assert_eq!(buf, *raw_data);
}
let cached_dir = stager.must_get_dir(puffin_file_name, key).await;
for (file_name, raw_data) in files_in_dir {
let file_path = if cfg!(windows) {
cached_dir.as_path().join(file_name.replace('/', "\\"))
} else {
cached_dir.as_path().join(file_name)
};
let buf = std::fs::read(file_path).unwrap();
assert_eq!(buf, *raw_data);
}
}
pub struct MockFileAccessor {
tempdir: TempDir,
}
impl MockFileAccessor {
pub fn new(prefix: &str) -> Self {
let tempdir = create_temp_dir(prefix);
Self { tempdir }
}
}
#[async_trait]
impl PuffinFileAccessor for MockFileAccessor {
type Reader = Compat<File>;
type Writer = Compat<File>;
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
let f = tokio::fs::File::open(self.tempdir.path().join(puffin_file_name))
.await
.unwrap();
Ok(f.compat())
}
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer> {
let p = self.tempdir.path().join(puffin_file_name);
if let Some(p) = p.parent() {
if !tokio::fs::try_exists(p).await.unwrap() {
tokio::fs::create_dir_all(p).await.unwrap();
}
}
let f = tokio::fs::File::create(p).await.unwrap();
Ok(f.compat())
}
}

View File

@@ -20,8 +20,8 @@ use futures::io::Cursor as AsyncCursor;
use futures::AsyncReadExt;
use tokio_util::compat::TokioAsyncReadCompatExt;
use crate::file_format::reader::{PuffinAsyncReader, PuffinFileReader, PuffinSyncReader};
use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter, PuffinSyncWriter};
use crate::file_format::reader::{AsyncReader, PuffinFileReader, SyncReader};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter};
#[test]
fn test_read_empty_puffin_sync() {