diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs
index da06361568..a823de56c8 100644
--- a/src/mito2/src/sst/index/applier.rs
+++ b/src/mito2/src/sst/index/applier.rs
@@ -22,7 +22,7 @@ use index::inverted_index::search::index_apply::{
     ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
 };
 use object_store::ObjectStore;
-use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
+use puffin::file_format::reader::{AsyncReader, PuffinFileReader};
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
 
@@ -194,7 +194,7 @@ mod tests {
     use futures::io::Cursor;
     use index::inverted_index::search::index_apply::MockIndexApplier;
     use object_store::services::Memory;
-    use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
+    use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
 
     use super::*;
     use crate::error::Error;
diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs
index bdad03c5d3..548f1f9349 100644
--- a/src/mito2/src/sst/index/creator.rs
+++ b/src/mito2/src/sst/index/creator.rs
@@ -26,7 +26,7 @@ use index::inverted_index::create::sort_create::SortIndexCreator;
 use index::inverted_index::create::InvertedIndexCreator;
 use index::inverted_index::format::writer::InvertedIndexBlobWriter;
 use object_store::ObjectStore;
-use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
+use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
 use snafu::{ensure, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use tokio::io::duplex;
diff --git a/src/puffin/src/blob_metadata.rs b/src/puffin/src/blob_metadata.rs
index 1cdadb592f..bb2475bfa3 100644
--- a/src/puffin/src/blob_metadata.rs
+++ b/src/puffin/src/blob_metadata.rs
@@ -56,7 +56,7 @@ pub struct BlobMetadata {
     pub length: i64,
 
     /// See [`CompressionCodec`]. If omitted, the data is assumed to be uncompressed.
-    #[builder(default, setter(strip_option))]
+    #[builder(default)]
     #[serde(default)]
     #[serde(skip_serializing_if = "Option::is_none")]
     pub compression_codec: Option<CompressionCodec>,
@@ -107,7 +107,7 @@ mod tests {
            .sequence_number(200)
            .offset(300)
            .length(400)
-           .compression_codec(CompressionCodec::Lz4)
+           .compression_codec(Some(CompressionCodec::Lz4))
            .properties(properties)
            .build()
            .unwrap();
diff --git a/src/puffin/src/file_format/reader.rs b/src/puffin/src/file_format/reader.rs
index 8d51c8cb69..5b8ba3c147 100644
--- a/src/puffin/src/file_format/reader.rs
+++ b/src/puffin/src/file_format/reader.rs
@@ -22,25 +22,29 @@ use crate::error::Result;
 pub use crate::file_format::reader::file::PuffinFileReader;
 use crate::file_metadata::FileMetadata;
 
-/// `PuffinSyncReader` defines a synchronous reader for puffin data.
-pub trait PuffinSyncReader<'a> {
+/// `SyncReader` defines a synchronous reader for puffin data.
+pub trait SyncReader<'a> {
     type Reader: std::io::Read + std::io::Seek;
 
-    /// fetch the FileMetadata
+    /// Fetches the FileMetadata.
     fn metadata(&'a mut self) -> Result<FileMetadata>;
 
-    /// read particular blob data based on given metadata
+    /// Reads particular blob data based on given metadata.
+    ///
+    /// Data read from the reader is compressed, leaving the caller to decompress it.
     fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
 }
 
-/// `PuffinAsyncReader` defines an asynchronous reader for puffin data.
+/// `AsyncReader` defines an asynchronous reader for puffin data.
 #[async_trait]
-pub trait PuffinAsyncReader<'a> {
+pub trait AsyncReader<'a> {
     type Reader: futures::AsyncRead + futures::AsyncSeek;
 
-    /// fetch the FileMetadata
+    /// Fetches the FileMetadata.
     async fn metadata(&'a mut self) -> Result<FileMetadata>;
 
-    /// read particular blob data based on given metadata
+    /// Reads particular blob data based on given metadata.
+    ///
+    /// Data read from the reader is compressed, leaving the caller to decompress it.
     fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader>;
 }
diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs
index b6b2df32f6..f1435bd0e4 100644
--- a/src/puffin/src/file_format/reader/file.rs
+++ b/src/puffin/src/file_format/reader/file.rs
@@ -24,7 +24,7 @@ use crate::error::{
     UnsupportedDecompressionSnafu,
 };
 use crate::file_format::reader::footer::FooterParser;
-use crate::file_format::reader::{PuffinAsyncReader, PuffinSyncReader};
+use crate::file_format::reader::{AsyncReader, SyncReader};
 use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE};
 use crate::file_metadata::FileMetadata;
 use crate::partial_reader::PartialReader;
@@ -63,7 +63,7 @@ impl PuffinFileReader {
     }
 }
 
-impl<'a, R: io::Read + io::Seek + 'a> PuffinSyncReader<'a> for PuffinFileReader<R> {
+impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
     type Reader = PartialReader<&'a mut R>;
 
     fn metadata(&mut self) -> Result<FileMetadata> {
@@ -103,9 +103,7 @@ impl<'a, R: io::Read + io::Seek + 'a> PuffinSyncReader<'a> for PuffinFileReader<R> {
 }
 
 #[async_trait]
-impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> PuffinAsyncReader<'a>
-    for PuffinFileReader<R>
-{
+impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> AsyncReader<'a> for PuffinFileReader<R> {
     type Reader = PartialReader<&'a mut R>;
 
     async fn metadata(&'a mut self) -> Result<FileMetadata> {
@@ -132,15 +130,6 @@ impl<'a, R: AsyncRead + AsyncSeek + Unpin + Send + 'a> PuffinAsyncReader<'a>
     }
 
     fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result<Self::Reader> {
-        // TODO(zhongzc): support decompression
-        let compression = blob_metadata.compression_codec.as_ref();
-        ensure!(
-            compression.is_none(),
-            UnsupportedDecompressionSnafu {
-                decompression: compression.unwrap().to_string()
-            }
-        );
-
         Ok(PartialReader::new(
             &mut self.source,
             blob_metadata.offset as _,
diff --git a/src/puffin/src/file_format/writer.rs b/src/puffin/src/file_format/writer.rs
index bfe717ae49..def8eb3e6c 100644
--- a/src/puffin/src/file_format/writer.rs
+++ b/src/puffin/src/file_format/writer.rs
@@ -40,8 +40,8 @@ pub struct Blob {
     pub properties: HashMap<String, String>,
 }
 
-/// The trait for writing Puffin files synchronously
-pub trait PuffinSyncWriter {
+/// `SyncWriter` defines a synchronous writer for puffin data.
+pub trait SyncWriter {
     /// Set the properties of the Puffin file
     fn set_properties(&mut self, properties: HashMap<String, String>);
 
     /// Add a blob to the Puffin file
     fn add_blob(&mut self, blob: Blob) -> Result;
 
     /// Finish writing the Puffin file, returns the number of bytes written
     fn finish(&mut self) -> Result;
 }
 
-/// The trait for writing Puffin files asynchronously
+/// `AsyncWriter` defines an asynchronous writer for puffin data.
 #[async_trait]
-pub trait PuffinAsyncWriter {
+pub trait AsyncWriter {
     /// Set the properties of the Puffin file
     fn set_properties(&mut self, properties: HashMap<String, String>);
diff --git a/src/puffin/src/file_format/writer/file.rs b/src/puffin/src/file_format/writer/file.rs
index 6237453dc6..22e5cc0bdd 100644
--- a/src/puffin/src/file_format/writer/file.rs
+++ b/src/puffin/src/file_format/writer/file.rs
@@ -19,10 +19,10 @@ use async_trait::async_trait;
 use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
 use snafu::ResultExt;
 
-use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder};
+use crate::blob_metadata::{BlobMetadata, BlobMetadataBuilder, CompressionCodec};
 use crate::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
 use crate::file_format::writer::footer::FooterWriter;
-use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinSyncWriter};
+use crate::file_format::writer::{AsyncWriter, Blob, SyncWriter};
 use crate::file_format::MAGIC;
 
 /// Puffin file writer, implements both [`PuffinSyncWriter`] and [`PuffinAsyncWriter`]
@@ -57,12 +57,14 @@ impl PuffinFileWriter {
     fn create_blob_metadata(
         &self,
         typ: String,
+        compression_codec: Option<CompressionCodec>,
         properties: HashMap<String, String>,
         size: u64,
     ) -> BlobMetadata {
         BlobMetadataBuilder::default()
             .blob_type(typ)
             .properties(properties)
+            .compression_codec(compression_codec)
             .offset(self.written_bytes as _)
             .length(size as _)
             .build()
@@ -70,7 +72,7 @@ impl PuffinFileWriter {
     }
 }
 
-impl PuffinSyncWriter for PuffinFileWriter {
+impl SyncWriter for PuffinFileWriter {
     fn set_properties(&mut self, properties: HashMap<String, String>) {
         self.properties = properties;
     }
@@ -80,7 +82,12 @@ impl PuffinSyncWriter for PuffinFileWriter {
         let size = io::copy(&mut blob.compressed_data, &mut self.writer).context(WriteSnafu)?;
 
-        let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
+        let blob_metadata = self.create_blob_metadata(
+            blob.blob_type,
+            blob.compression_codec,
+            blob.properties,
+            size,
+        );
         self.blob_metadata.push(blob_metadata);
         self.written_bytes += size;
 
@@ -101,7 +108,7 @@ impl PuffinSyncWriter for PuffinFileWriter {
 }
 
 #[async_trait]
-impl PuffinAsyncWriter for PuffinFileWriter {
+impl AsyncWriter for PuffinFileWriter {
     fn set_properties(&mut self, properties: HashMap<String, String>) {
         self.properties = properties;
     }
@@ -113,7 +120,12 @@ impl PuffinAsyncWriter for PuffinFileWriter {
             .await
             .context(WriteSnafu)?;
 
-        let blob_metadata = self.create_blob_metadata(blob.blob_type, blob.properties, size);
+        let blob_metadata = self.create_blob_metadata(
+            blob.blob_type,
+            blob.compression_codec,
+            blob.properties,
+            size,
+        );
         self.blob_metadata.push(blob_metadata);
         self.written_bytes += size;
 
diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs
index 218f3d3c47..7b8df56f1c 100644
--- a/src/puffin/src/puffin_manager.rs
+++ b/src/puffin/src/puffin_manager.rs
@@ -12,9 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub mod cache_manager;
-pub mod cached_puffin_manager;
 pub mod file_accessor;
+pub mod fs_puffin_manager;
+pub mod stager;
+
+#[cfg(test)]
+mod tests;
 
 use std::path::PathBuf;
 
@@ -89,7 +92,7 @@ pub trait PuffinReader {
 /// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
 /// Users should hold the `BlobGuard` until they are done with the blob data.
 pub trait BlobGuard {
-    type Reader: AsyncRead + AsyncSeek;
+    type Reader: AsyncRead + AsyncSeek + Unpin;
 
     fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
 }
diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager.rs b/src/puffin/src/puffin_manager/cached_puffin_manager.rs
deleted file mode 100644
index a9edb01169..0000000000
--- a/src/puffin/src/puffin_manager/cached_puffin_manager.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-mod dir_meta;
-mod reader;
-mod writer;
-
-pub use reader::CachedPuffinReader;
-pub use writer::CachedPuffinWriter;
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
new file mode 100644
index 0000000000..99c23459a8
--- /dev/null
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
@@ -0,0 +1,78 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+mod dir_meta;
+mod reader;
+mod writer;
+
+use async_trait::async_trait;
+use futures::{AsyncRead, AsyncSeek, AsyncWrite};
+pub use reader::FsPuffinReader;
+pub use writer::FsPuffinWriter;
+
+use crate::error::Result;
+use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
+use crate::puffin_manager::stager::StagerRef;
+use crate::puffin_manager::{BlobGuard, DirGuard, PuffinManager};
+
+/// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in the filesystem.
+pub struct FsPuffinManager {
+    /// The stager.
+    stager: StagerRef,
+
+    /// The puffin file accessor.
+    puffin_file_accessor: PuffinFileAccessorRef,
+}
+
+impl FsPuffinManager {
+    /// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`.
+ pub fn new( + stager: StagerRef, + puffin_file_accessor: PuffinFileAccessorRef, + ) -> Self { + Self { + stager, + puffin_file_accessor, + } + } +} + +#[async_trait] +impl PuffinManager for FsPuffinManager +where + B: BlobGuard, + D: DirGuard, + AR: AsyncRead + AsyncSeek + Send + Unpin + 'static, + AW: AsyncWrite + Send + Unpin + 'static, +{ + type Reader = FsPuffinReader; + type Writer = FsPuffinWriter; + + async fn reader(&self, puffin_file_name: &str) -> Result { + Ok(FsPuffinReader::new( + puffin_file_name.to_string(), + self.stager.clone(), + self.puffin_file_accessor.clone(), + )) + } + + async fn writer(&self, puffin_file_name: &str) -> Result { + let writer = self.puffin_file_accessor.writer(puffin_file_name).await?; + Ok(FsPuffinWriter::new( + puffin_file_name.to_string(), + self.stager.clone(), + writer, + )) + } +} diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager/dir_meta.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/dir_meta.rs similarity index 100% rename from src/puffin/src/puffin_manager/cached_puffin_manager/dir_meta.rs rename to src/puffin/src/puffin_manager/fs_puffin_manager/dir_meta.rs diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs similarity index 89% rename from src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs rename to src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 0aad8f2576..2ce79857a9 100644 --- a/src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -24,41 +24,40 @@ use crate::error::{ BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu, ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu, }; -use crate::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; -use crate::puffin_manager::cache_manager::{BoxWriter, CacheManagerRef, DirWriterProviderRef}; -use crate::puffin_manager::cached_puffin_manager::dir_meta::DirMetadata; +use crate::file_format::reader::{AsyncReader, PuffinFileReader}; use crate::puffin_manager::file_accessor::PuffinFileAccessorRef; +use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata; +use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, StagerRef}; use crate::puffin_manager::{BlobGuard, DirGuard, PuffinReader}; -/// `CachedPuffinReader` is a `PuffinReader` that provides cached readers for puffin files. -pub struct CachedPuffinReader { +/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files. +pub struct FsPuffinReader { /// The name of the puffin file. puffin_file_name: String, - /// The cache manager. - cache_manager: CacheManagerRef, + /// The stager. + stager: StagerRef, /// The puffin file accessor. 
puffin_file_accessor: PuffinFileAccessorRef, } -impl CachedPuffinReader { - #[allow(unused)] +impl FsPuffinReader { pub(crate) fn new( puffin_file_name: String, - cache_manager: CacheManagerRef, + stager: StagerRef, puffin_file_accessor: PuffinFileAccessorRef, ) -> Self { Self { puffin_file_name, - cache_manager, + stager, puffin_file_accessor, } } } #[async_trait] -impl PuffinReader for CachedPuffinReader +impl PuffinReader for FsPuffinReader where B: BlobGuard, D: DirGuard, @@ -69,7 +68,7 @@ where type Dir = D; async fn blob(&self, key: &str) -> Result { - self.cache_manager + self.stager .get_blob( self.puffin_file_name.as_str(), key, @@ -84,7 +83,7 @@ where } async fn dir(&self, key: &str) -> Result { - self.cache_manager + self.stager .get_dir( self.puffin_file_name.as_str(), key, @@ -99,7 +98,7 @@ where } } -impl CachedPuffinReader +impl FsPuffinReader where B: BlobGuard, G: DirGuard, diff --git a/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs similarity index 62% rename from src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs rename to src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs index 2b26e4a355..10338aad74 100644 --- a/src/puffin/src/puffin_manager/cached_puffin_manager/writer.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs @@ -28,18 +28,18 @@ use crate::error::{ DuplicateBlobSnafu, MetadataSnafu, OpenSnafu, Result, SerializeJsonSnafu, UnsupportedCompressionSnafu, WalkDirSnafu, }; -use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter}; -use crate::puffin_manager::cache_manager::CacheManagerRef; -use crate::puffin_manager::cached_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata}; +use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter}; +use crate::puffin_manager::fs_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata}; +use crate::puffin_manager::stager::StagerRef; use crate::puffin_manager::{BlobGuard, DirGuard, PuffinWriter, PutOptions}; -/// `CachedPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file. -pub struct CachedPuffinWriter { +/// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file. +pub struct FsPuffinWriter { /// The name of the puffin file. puffin_file_name: String, - /// The cache manager. - cache_manager: CacheManagerRef, + /// The stager. + stager: StagerRef, /// The underlying `PuffinFileWriter`. 
puffin_file_writer: PuffinFileWriter, @@ -48,16 +48,11 @@ pub struct CachedPuffinWriter { blob_keys: HashSet, } -impl CachedPuffinWriter { - #[allow(unused)] - pub(crate) fn new( - puffin_file_name: String, - cache_manager: CacheManagerRef, - writer: W, - ) -> Self { +impl FsPuffinWriter { + pub(crate) fn new(puffin_file_name: String, stager: StagerRef, writer: W) -> Self { Self { puffin_file_name, - cache_manager, + stager, puffin_file_writer: PuffinFileWriter::new(writer), blob_keys: HashSet::new(), } @@ -65,7 +60,7 @@ impl CachedPuffinWriter { } #[async_trait] -impl PuffinWriter for CachedPuffinWriter +impl PuffinWriter for FsPuffinWriter where B: BlobGuard, D: DirGuard, @@ -79,32 +74,10 @@ where !self.blob_keys.contains(key), DuplicateBlobSnafu { blob: key } ); - ensure!( - !matches!(options.compression, Some(CompressionCodec::Lz4)), - UnsupportedCompressionSnafu { codec: "lz4" } - ); - let written_bytes = match options.compression { - Some(CompressionCodec::Lz4) => unreachable!("checked above"), - Some(CompressionCodec::Zstd) => { - let blob = Blob { - blob_type: key.to_string(), - compressed_data: ZstdEncoder::new(BufReader::new(raw_data)), - compression_codec: options.compression, - properties: Default::default(), - }; - self.puffin_file_writer.add_blob(blob).await? - } - None => { - let blob = Blob { - blob_type: key.to_string(), - compressed_data: raw_data, - compression_codec: options.compression, - properties: Default::default(), - }; - self.puffin_file_writer.add_blob(blob).await? - } - }; + let written_bytes = self + .handle_compress(key.to_string(), raw_data, options.compression) + .await?; self.blob_keys.insert(key.to_string()); Ok(written_bytes) @@ -115,10 +88,6 @@ where !self.blob_keys.contains(key), DuplicateBlobSnafu { blob: key } ); - ensure!( - !matches!(options.compression, Some(CompressionCodec::Lz4)), - UnsupportedCompressionSnafu { codec: "lz4" } - ); // Walk the directory and add all files to the puffin file. 
let mut wd = async_walkdir::WalkDir::new(&dir_path).filter(|entry| async move { @@ -142,37 +111,23 @@ where .compat(); let file_key = Uuid::new_v4().to_string(); - match options.compression { - Some(CompressionCodec::Lz4) => unreachable!("checked above"), - Some(CompressionCodec::Zstd) => { - let blob = Blob { - blob_type: file_key.clone(), - compressed_data: ZstdEncoder::new(BufReader::new(reader)), - compression_codec: options.compression, - properties: Default::default(), - }; - written_bytes += self.puffin_file_writer.add_blob(blob).await?; - } - None => { - let blob = Blob { - blob_type: file_key.clone(), - compressed_data: reader, - compression_codec: options.compression, - properties: Default::default(), - }; - written_bytes += self.puffin_file_writer.add_blob(blob).await?; - } - } + written_bytes += self + .handle_compress(file_key.clone(), reader, options.compression) + .await?; - let relative_path = entry - .path() + let path = entry.path(); + let relative_path = path .strip_prefix(&dir_path) - .expect("entry path is under dir path") - .to_string_lossy() - .into_owned(); + .expect("entry path is under dir path"); + + let unified_rel_path = if cfg!(windows) { + relative_path.to_string_lossy().replace('\\', "/") + } else { + relative_path.to_string_lossy().to_string() + }; files.push(DirFileMetadata { - relative_path, + relative_path: unified_rel_path, key: file_key.clone(), blob_index: self.blob_keys.len(), }); @@ -191,8 +146,8 @@ where written_bytes += self.puffin_file_writer.add_blob(dir_meta_blob).await?; self.blob_keys.insert(key.to_string()); - // Move the directory into the cache. - self.cache_manager + // Move the directory into the stager. + self.stager .put_dir(&self.puffin_file_name, key, dir_path, dir_size) .await?; Ok(written_bytes) @@ -208,3 +163,40 @@ where Ok(size) } } + +impl FsPuffinWriter +where + B: BlobGuard, + G: DirGuard, + W: AsyncWrite + Unpin + Send, +{ + /// Compresses the raw data and writes it to the puffin file. + async fn handle_compress( + &mut self, + key: String, + raw_data: impl AsyncRead + Send, + compression: Option, + ) -> Result { + match compression { + Some(CompressionCodec::Lz4) => UnsupportedCompressionSnafu { codec: "lz4" }.fail(), + Some(CompressionCodec::Zstd) => { + let blob = Blob { + blob_type: key, + compressed_data: ZstdEncoder::new(BufReader::new(raw_data)), + compression_codec: compression, + properties: Default::default(), + }; + self.puffin_file_writer.add_blob(blob).await + } + None => { + let blob = Blob { + blob_type: key, + compressed_data: raw_data, + compression_codec: compression, + properties: Default::default(), + }; + self.puffin_file_writer.add_blob(blob).await + } + } + } +} diff --git a/src/puffin/src/puffin_manager/cache_manager.rs b/src/puffin/src/puffin_manager/stager.rs similarity index 83% rename from src/puffin/src/puffin_manager/cache_manager.rs rename to src/puffin/src/puffin_manager/stager.rs index 1f471236c0..c390e9910a 100644 --- a/src/puffin/src/puffin_manager/cache_manager.rs +++ b/src/puffin/src/puffin_manager/stager.rs @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod moka_cache_manager; +mod bounded_stager; use std::path::PathBuf; use std::sync::Arc; use async_trait::async_trait; +pub use bounded_stager::BoundedStager; use futures::future::BoxFuture; use futures::AsyncWrite; @@ -40,19 +41,19 @@ pub type DirWriterProviderRef = Box; /// Function that initializes a blob. 
/// -/// `CacheManager` will provide a `BoxWriter` that the caller of `get_blob` -/// can use to write the blob into the cache. +/// `Stager` will provide a `BoxWriter` that the caller of `get_blob` +/// can use to write the blob into the staging area. pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult; /// Function that initializes a directory. /// -/// `CacheManager` will provide a `DirWriterProvider` that the caller of `get_dir` -/// can use to write files inside the directory into the cache. +/// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir` +/// can use to write files inside the directory into the staging area. pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult; -/// `CacheManager` manages the cache for the puffin files. +/// `Stager` manages the staging area for the puffin files. #[async_trait] -pub trait CacheManager { +pub trait Stager { type Blob: BlobGuard; type Dir: DirGuard; @@ -78,7 +79,7 @@ pub trait CacheManager { init_fn: Box, ) -> Result; - /// Stores a directory in the cache. + /// Stores a directory in the staging area. async fn put_dir( &self, puffin_file_name: &str, @@ -88,4 +89,4 @@ pub trait CacheManager { ) -> Result<()>; } -pub type CacheManagerRef = Arc + Send + Sync>; +pub type StagerRef = Arc + Send + Sync>; diff --git a/src/puffin/src/puffin_manager/cache_manager/moka_cache_manager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs similarity index 89% rename from src/puffin/src/puffin_manager/cache_manager/moka_cache_manager.rs rename to src/puffin/src/puffin_manager/stager/bounded_stager.rs index b2d4ed3baf..e0179a50c0 100644 --- a/src/puffin/src/puffin_manager/cache_manager/moka_cache_manager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -36,19 +36,16 @@ use crate::error::{ CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu, Result, WalkDirSnafu, }; -use crate::puffin_manager::cache_manager::{ - BoxWriter, CacheManager, DirWriterProvider, InitBlobFn, InitDirFn, -}; +use crate::puffin_manager::stager::{BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager}; use crate::puffin_manager::{BlobGuard, DirGuard}; const DELETE_QUEUE_SIZE: usize = 10240; - const TMP_EXTENSION: &str = "tmp"; const DELETED_EXTENSION: &str = "deleted"; -/// `MokaCacheManager` is a `CacheManager` that uses `moka` to manage cache. -pub struct MokaCacheManager { - /// The base directory of the cache. +/// `BoundedStager` is a `Stager` that uses `moka` to manage staging area. +pub struct BoundedStager { + /// The base directory of the staging area. base_dir: PathBuf, /// The cache maintaining the cache key to the size of the file or directory. 
@@ -69,16 +66,15 @@ pub struct MokaCacheManager { delete_queue: Sender, } -impl MokaCacheManager { - #[allow(unused)] - pub async fn new(base_dir: PathBuf, max_size: u64) -> Result { +impl BoundedStager { + pub async fn new(base_dir: PathBuf, capicity: u64) -> Result { let recycle_bin = Cache::builder() .time_to_live(Duration::from_secs(60)) .build(); let recycle_bin_cloned = recycle_bin.clone(); let cache = Cache::builder() - .max_capacity(max_size) + .max_capacity(capicity) .weigher(|_: &String, v: &CacheValue| v.weight()) .async_eviction_listener(move |k, v, _| { let recycle_bin = recycle_bin_cloned.clone(); @@ -92,21 +88,21 @@ impl MokaCacheManager { let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE); common_runtime::bg_runtime().spawn(Self::delete_routine(rx)); - let manager = Self { + let stager = Self { cache, base_dir, delete_queue, recycle_bin, }; - manager.recover().await?; + stager.recover().await?; - Ok(manager) + Ok(stager) } } #[async_trait] -impl CacheManager for MokaCacheManager { +impl Stager for BoundedStager { type Blob = Arc; type Dir = Arc; @@ -212,7 +208,7 @@ impl CacheManager for MokaCacheManager { } } -impl MokaCacheManager { +impl BoundedStager { fn encode_cache_key(puffin_file_name: &str, key: &str) -> String { let mut hasher = Sha256::new(); hasher.update(puffin_file_name); @@ -262,7 +258,7 @@ impl MokaCacheManager { Ok(size) } - /// Recovers the cache by iterating through the cache directory. + /// Recovers the staging area by iterating through the staging directory. async fn recover(&self) -> Result<()> { let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?; @@ -280,7 +276,7 @@ impl MokaCacheManager { fs::remove_file(path).await.context(RemoveSnafu)?; } } else { - // Insert the size of the file or directory to the cache + // Insert the guard of the file or directory to the cache let meta = entry.metadata().await.context(MetadataSnafu)?; let file_path = path.file_name().unwrap().to_string_lossy().into_owned(); @@ -289,7 +285,7 @@ impl MokaCacheManager { Some(key) => key.to_string(), None => { warn!( - "Invalid cache file name: {}, expected format: .", + "Invalid staging file name: {}, expected format: .", file_path ); continue; @@ -382,11 +378,11 @@ impl MokaCacheManager { } } - info!("The delete routine for moka cache manager is terminated."); + info!("The delete routine for the bounded stager is terminated."); } } -impl Drop for MokaCacheManager { +impl Drop for BoundedStager { fn drop(&mut self) { let _ = self.delete_queue.try_send(DeleteTask::Terminate); } @@ -486,7 +482,11 @@ struct MokaDirWriterProvider(PathBuf); #[async_trait] impl DirWriterProvider for MokaDirWriterProvider { async fn writer(&self, rel_path: &str) -> Result { - let full_path = self.0.join(rel_path); + let full_path = if cfg!(windows) { + self.0.join(rel_path.replace('/', "\\")) + } else { + self.0.join(rel_path) + }; if let Some(parent) = full_path.parent() { fs::create_dir_all(parent).await.context(CreateSnafu)?; } @@ -500,7 +500,7 @@ impl DirWriterProvider for MokaDirWriterProvider { } #[cfg(test)] -impl MokaCacheManager { +impl BoundedStager { pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File { let cache_key = Self::encode_cache_key(puffin_file_name, key); let value = self.cache.get(&cache_key).await.unwrap(); @@ -535,18 +535,18 @@ mod tests { use super::*; use crate::error::BlobNotFoundSnafu; - use crate::puffin_manager::cache_manager::CacheManager; + use crate::puffin_manager::stager::Stager; #[tokio::test] 
async fn test_get_blob() { let tempdir = create_temp_dir("test_get_blob_"); - let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) .await .unwrap(); let puffin_file_name = "test_get_blob"; let key = "key"; - let mut reader = manager + let mut reader = stager .get_blob( puffin_file_name, key, @@ -567,7 +567,7 @@ mod tests { reader.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, b"hello world"); - let mut file = manager.must_get_file(puffin_file_name, key).await; + let mut file = stager.must_get_file(puffin_file_name, key).await; let mut buf = Vec::new(); file.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, b"hello world"); @@ -576,7 +576,7 @@ mod tests { #[tokio::test] async fn test_get_dir() { let tempdir = create_temp_dir("test_get_dir_"); - let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) .await .unwrap(); @@ -590,7 +590,7 @@ mod tests { let puffin_file_name = "test_get_dir"; let key = "key"; - let dir_path = manager + let dir_path = stager .get_dir( puffin_file_name, key, @@ -615,7 +615,7 @@ mod tests { assert_eq!(buf, *content); } - let dir_path = manager.must_get_dir(puffin_file_name, key).await; + let dir_path = stager.must_get_dir(puffin_file_name, key).await; for (rel_path, content) in &files_in_dir { let file_path = dir_path.join(rel_path); let mut file = tokio::fs::File::open(&file_path).await.unwrap(); @@ -628,14 +628,14 @@ mod tests { #[tokio::test] async fn test_recover() { let tempdir = create_temp_dir("test_recover_"); - let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) .await .unwrap(); - // initialize cache + // initialize stager let puffin_file_name = "test_recover"; let blob_key = "blob_key"; - let guard = manager + let guard = stager .get_blob( puffin_file_name, blob_key, @@ -659,7 +659,7 @@ mod tests { ]; let dir_key = "dir_key"; - let guard = manager + let guard = stager .get_dir( puffin_file_name, dir_key, @@ -677,13 +677,13 @@ mod tests { .unwrap(); drop(guard); - // recover cache - drop(manager); - let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + // recover stager + drop(stager); + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) .await .unwrap(); - let mut reader = manager + let mut reader = stager .get_blob( puffin_file_name, blob_key, @@ -698,7 +698,7 @@ mod tests { reader.read_to_end(&mut buf).await.unwrap(); assert_eq!(buf, b"hello world"); - let dir_path = manager + let dir_path = stager .get_dir( puffin_file_name, dir_key, @@ -718,7 +718,7 @@ mod tests { #[tokio::test] async fn test_eviction() { let tempdir = create_temp_dir("test_eviction_"); - let manager = MokaCacheManager::new( + let stager = BoundedStager::new( tempdir.path().to_path_buf(), 1, /* extremely small size */ ) @@ -729,7 +729,7 @@ mod tests { let blob_key = "blob_key"; // First time to get the blob - let mut reader = manager + let mut reader = stager .get_blob( puffin_file_name, blob_key, @@ -747,15 +747,15 @@ mod tests { .unwrap(); // The blob should be evicted - manager.cache.run_pending_tasks().await; - assert!(!manager.in_cache(puffin_file_name, blob_key)); + stager.cache.run_pending_tasks().await; + assert!(!stager.in_cache(puffin_file_name, blob_key)); let mut buf = Vec::new(); reader.read_to_end(&mut buf).await.unwrap(); 
assert_eq!(buf, b"Hello world"); // Second time to get the blob, get from recycle bin - let mut reader = manager + let mut reader = stager .get_blob( puffin_file_name, blob_key, @@ -768,8 +768,8 @@ mod tests { .unwrap(); // The blob should be evicted - manager.cache.run_pending_tasks().await; - assert!(!manager.in_cache(puffin_file_name, blob_key)); + stager.cache.run_pending_tasks().await; + assert!(!stager.in_cache(puffin_file_name, blob_key)); let mut buf = Vec::new(); reader.read_to_end(&mut buf).await.unwrap(); @@ -785,7 +785,7 @@ mod tests { ]; // First time to get the directory - let guard_0 = manager + let guard_0 = stager .get_dir( puffin_file_name, dir_key, @@ -813,11 +813,11 @@ mod tests { } // The directory should be evicted - manager.cache.run_pending_tasks().await; - assert!(!manager.in_cache(puffin_file_name, dir_key)); + stager.cache.run_pending_tasks().await; + assert!(!stager.in_cache(puffin_file_name, dir_key)); // Second time to get the directory - let guard_1 = manager + let guard_1 = stager .get_dir( puffin_file_name, dir_key, @@ -835,13 +835,13 @@ mod tests { } // Still hold the guard - manager.cache.run_pending_tasks().await; - assert!(!manager.in_cache(puffin_file_name, dir_key)); + stager.cache.run_pending_tasks().await; + assert!(!stager.in_cache(puffin_file_name, dir_key)); // Third time to get the directory and all guards are dropped drop(guard_0); drop(guard_1); - let guard_2 = manager + let guard_2 = stager .get_dir( puffin_file_name, dir_key, @@ -851,8 +851,8 @@ mod tests { .unwrap(); // Still hold the guard, so the directory should not be removed even if it's evicted - manager.cache.run_pending_tasks().await; - assert!(!manager.in_cache(puffin_file_name, blob_key)); + stager.cache.run_pending_tasks().await; + assert!(!stager.in_cache(puffin_file_name, blob_key)); for (rel_path, content) in &files_in_dir { let file_path = guard_2.path().join(rel_path); @@ -866,17 +866,17 @@ mod tests { #[tokio::test] async fn test_get_blob_concurrency_on_fail() { let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_"); - let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) .await .unwrap(); let puffin_file_name = "test_get_blob_concurrency_on_fail"; let key = "key"; - let manager = Arc::new(manager); + let stager = Arc::new(stager); let handles = (0..10) .map(|_| { - let manager = manager.clone(); + let stager = stager.clone(); let task = async move { let failed_init = Box::new(|_| { async { @@ -885,7 +885,7 @@ mod tests { } .boxed() }); - manager.get_blob(puffin_file_name, key, failed_init).await + stager.get_blob(puffin_file_name, key, failed_init).await }; tokio::spawn(task) @@ -897,23 +897,23 @@ mod tests { assert!(r.is_err()); } - assert!(!manager.in_cache(puffin_file_name, key)); + assert!(!stager.in_cache(puffin_file_name, key)); } #[tokio::test] async fn test_get_dir_concurrency_on_fail() { let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_"); - let manager = MokaCacheManager::new(tempdir.path().to_path_buf(), u64::MAX) + let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX) .await .unwrap(); let puffin_file_name = "test_get_dir_concurrency_on_fail"; let key = "key"; - let manager = Arc::new(manager); + let stager = Arc::new(stager); let handles = (0..10) .map(|_| { - let manager = manager.clone(); + let stager = stager.clone(); let task = async move { let failed_init = Box::new(|_| { async { @@ -922,7 +922,7 @@ mod tests { } 
.boxed() }); - manager.get_dir(puffin_file_name, key, failed_init).await + stager.get_dir(puffin_file_name, key, failed_init).await }; tokio::spawn(task) @@ -934,6 +934,6 @@ mod tests { assert!(r.is_err()); } - assert!(!manager.in_cache(puffin_file_name, key)); + assert!(!stager.in_cache(puffin_file_name, key)); } } diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs new file mode 100644 index 0000000000..0efa826b27 --- /dev/null +++ b/src/puffin/src/puffin_manager/tests.rs @@ -0,0 +1,359 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use futures::AsyncReadExt as _; +use tokio::fs::File; +use tokio::io::AsyncReadExt as _; +use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; + +use crate::blob_metadata::CompressionCodec; +use crate::error::Result; +use crate::puffin_manager::file_accessor::PuffinFileAccessor; +use crate::puffin_manager::fs_puffin_manager::FsPuffinManager; +use crate::puffin_manager::stager::BoundedStager; +use crate::puffin_manager::{ + BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions, +}; + +async fn new_bounded_stager(prefix: &str, capicity: u64) -> (TempDir, Arc) { + let staging_dir = create_temp_dir(prefix); + let path = staging_dir.path().to_path_buf(); + ( + staging_dir, + Arc::new(BoundedStager::new(path, capicity).await.unwrap()), + ) +} + +#[tokio::test] +async fn test_put_get_file() { + let capicities = [1, 16, u64::MAX]; + let compression_codecs = [None, Some(CompressionCodec::Zstd)]; + + for capicity in capicities { + for compression_codec in compression_codecs { + let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capicity).await; + let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_file_")); + + let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); + + let puffin_file_name = "puffin_file"; + let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap(); + + let key = "blob_a"; + let raw_data = "Hello, world!".as_bytes(); + put_blob(key, raw_data, compression_codec, &mut writer).await; + + writer.finish().await.unwrap(); + + let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + check_blob(puffin_file_name, key, raw_data, &stager, &reader).await; + + // renew cache manager + let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capicity).await; + let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); + + let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + check_blob(puffin_file_name, key, raw_data, &stager, &reader).await; + } + } +} + +#[tokio::test] +async fn test_put_get_files() { + let capicities = [1, 16, u64::MAX]; + + for capicity in capicities { + let compression_codecs = [None, Some(CompressionCodec::Zstd)]; + + for compression_codec in 
compression_codecs { + let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capicity).await; + let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_files_")); + + let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); + + let puffin_file_name = "puffin_file"; + let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap(); + + let blobs = [ + ("blob_a", "Hello, world!".as_bytes()), + ("blob_b", "Hello, Rust!".as_bytes()), + ("blob_c", "你好,世界!".as_bytes()), + ] + .into_iter() + .collect::>(); + + for (key, raw_data) in &blobs { + put_blob(key, raw_data, compression_codec, &mut writer).await; + } + + writer.finish().await.unwrap(); + + let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + for (key, raw_data) in &blobs { + check_blob(puffin_file_name, key, raw_data, &stager, &reader).await; + } + + // renew cache manager + let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capicity).await; + let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); + let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + for (key, raw_data) in &blobs { + check_blob(puffin_file_name, key, raw_data, &stager, &reader).await; + } + } + } +} + +#[tokio::test] +async fn test_put_get_dir() { + let capicities = [1, 64, u64::MAX]; + + let compression_codecs = [None, Some(CompressionCodec::Zstd)]; + + for capicity in capicities { + for compression_codec in compression_codecs { + let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capicity).await; + let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_dir_")); + + let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); + + let puffin_file_name = "puffin_file"; + let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap(); + + let key = "dir_a"; + + let files_in_dir = vec![ + ("file_a", "Hello, world!".as_bytes()), + ("file_b", "Hello, Rust!".as_bytes()), + ("file_c", "你好,世界!".as_bytes()), + ("subdir/file_d", "Hello, Puffin!".as_bytes()), + ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()), + ]; + + put_dir(key, &files_in_dir, compression_codec, &mut writer).await; + + writer.finish().await.unwrap(); + + let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await; + + // renew cache manager + let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capicity).await; + let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); + + let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await; + } + } +} + +#[tokio::test] +async fn test_put_get_mix_file_dir() { + let capicities = [1, 64, u64::MAX]; + let compression_codecs = [None, Some(CompressionCodec::Zstd)]; + + for capicity in capicities { + for compression_codec in compression_codecs { + let (_staging_dir, stager) = + new_bounded_stager("test_put_get_mix_file_dir_", capicity).await; + let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_mix_file_dir_")); + + let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); + + let puffin_file_name = "puffin_file"; + let mut writer = puffin_manager.writer(puffin_file_name).await.unwrap(); + + let blobs = [ + ("blob_a", "Hello, world!".as_bytes()), + ("blob_b", "Hello, Rust!".as_bytes()), + ("blob_c", "你好,世界!".as_bytes()), + ] + .into_iter() + 
.collect::>(); + + let dir_key = "dir_a"; + let files_in_dir = [ + ("file_a", "Hello, world!".as_bytes()), + ("file_b", "Hello, Rust!".as_bytes()), + ("file_c", "你好,世界!".as_bytes()), + ("subdir/file_d", "Hello, Puffin!".as_bytes()), + ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()), + ]; + + for (key, raw_data) in &blobs { + put_blob(key, raw_data, compression_codec, &mut writer).await; + } + put_dir(dir_key, &files_in_dir, compression_codec, &mut writer).await; + + writer.finish().await.unwrap(); + + let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + for (key, raw_data) in &blobs { + check_blob(puffin_file_name, key, raw_data, &stager, &reader).await; + } + check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await; + + // renew cache manager + let (_staging_dir, stager) = + new_bounded_stager("test_put_get_mix_file_dir_", capicity).await; + let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); + + let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); + for (key, raw_data) in &blobs { + check_blob(puffin_file_name, key, raw_data, &stager, &reader).await; + } + check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await; + } + } +} + +async fn put_blob( + key: &str, + raw_data: &[u8], + compression_codec: Option, + puffin_writer: &mut impl PuffinWriter, +) { + puffin_writer + .put_blob( + key, + raw_data, + PutOptions { + compression: compression_codec, + }, + ) + .await + .unwrap(); +} + +async fn check_blob( + puffin_file_name: &str, + key: &str, + raw_data: &[u8], + stager: &BoundedStager, + puffin_reader: &R, +) where + R: PuffinReader, +{ + let blob = puffin_reader.blob(key).await.unwrap(); + let mut reader = blob.reader().await.unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, raw_data); + + let mut cached_file = stager.must_get_file(puffin_file_name, key).await; + let mut buf = Vec::new(); + cached_file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, raw_data); +} + +async fn put_dir( + key: &str, + files_in_dir: &[(&str, &[u8])], + compression_codec: Option, + puffin_writer: &mut impl PuffinWriter, +) { + let dir = create_temp_dir("dir_in_puffin_"); + for (file_key, raw_data) in files_in_dir.iter() { + let file_path = if cfg!(windows) { + dir.path().join(file_key.replace('/', "\\")) + } else { + dir.path().join(file_key) + }; + std::fs::create_dir_all(file_path.parent().unwrap()).unwrap(); + std::fs::write(&file_path, raw_data).unwrap(); + } + + puffin_writer + .put_dir( + key, + dir.path().to_path_buf(), + PutOptions { + compression: compression_codec, + }, + ) + .await + .unwrap(); +} + +async fn check_dir( + puffin_file_name: &str, + key: &str, + files_in_dir: &[(&str, &[u8])], + stager: &BoundedStager, + puffin_reader: &R, +) where + R: PuffinReader, +{ + let res_dir = puffin_reader.dir(key).await.unwrap(); + for (file_name, raw_data) in files_in_dir { + let file_path = if cfg!(windows) { + res_dir.path().join(file_name.replace('/', "\\")) + } else { + res_dir.path().join(file_name) + }; + let buf = std::fs::read(file_path).unwrap(); + assert_eq!(buf, *raw_data); + } + + let cached_dir = stager.must_get_dir(puffin_file_name, key).await; + for (file_name, raw_data) in files_in_dir { + let file_path = if cfg!(windows) { + cached_dir.as_path().join(file_name.replace('/', "\\")) + } else { + cached_dir.as_path().join(file_name) + }; + let buf = std::fs::read(file_path).unwrap(); + assert_eq!(buf, *raw_data); + } +} + +pub struct 
MockFileAccessor { + tempdir: TempDir, +} + +impl MockFileAccessor { + pub fn new(prefix: &str) -> Self { + let tempdir = create_temp_dir(prefix); + Self { tempdir } + } +} + +#[async_trait] +impl PuffinFileAccessor for MockFileAccessor { + type Reader = Compat; + type Writer = Compat; + + async fn reader(&self, puffin_file_name: &str) -> Result { + let f = tokio::fs::File::open(self.tempdir.path().join(puffin_file_name)) + .await + .unwrap(); + Ok(f.compat()) + } + + async fn writer(&self, puffin_file_name: &str) -> Result { + let p = self.tempdir.path().join(puffin_file_name); + if let Some(p) = p.parent() { + if !tokio::fs::try_exists(p).await.unwrap() { + tokio::fs::create_dir_all(p).await.unwrap(); + } + } + let f = tokio::fs::File::create(p).await.unwrap(); + Ok(f.compat()) + } +} diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs index 5698846f48..bbde67067f 100644 --- a/src/puffin/src/tests.rs +++ b/src/puffin/src/tests.rs @@ -20,8 +20,8 @@ use futures::io::Cursor as AsyncCursor; use futures::AsyncReadExt; use tokio_util::compat::TokioAsyncReadCompatExt; -use crate::file_format::reader::{PuffinAsyncReader, PuffinFileReader, PuffinSyncReader}; -use crate::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter, PuffinSyncWriter}; +use crate::file_format::reader::{AsyncReader, PuffinFileReader, SyncReader}; +use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter}; #[test] fn test_read_empty_puffin_sync() {
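
The sketch below summarizes how the renamed pieces fit together after this change. It is a minimal usage example distilled from the `test_put_get_file` case in the new `src/puffin/src/puffin_manager/tests.rs`; the `MockFileAccessor` helper comes from that test module, and the staging directory, capacity, file name, and blob key are illustrative assumptions rather than fixed values.

```rust
use std::path::PathBuf;
use std::sync::Arc;

use futures::AsyncReadExt;

use crate::blob_metadata::CompressionCodec;
use crate::error::Result;
use crate::puffin_manager::fs_puffin_manager::FsPuffinManager;
use crate::puffin_manager::stager::BoundedStager;
use crate::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};

async fn roundtrip_blob(staging_dir: PathBuf) -> Result<()> {
    // `BoundedStager` replaces the former `MokaCacheManager`: a size-bounded
    // staging area on the local filesystem.
    let stager = Arc::new(BoundedStager::new(staging_dir, 64 * 1024 * 1024).await?);

    // `MockFileAccessor` is the test helper defined in `puffin_manager/tests.rs`;
    // any `PuffinFileAccessor` implementation works here.
    let file_accessor = Arc::new(MockFileAccessor::new("fs_puffin_manager_sketch_"));

    // `FsPuffinManager` replaces the former `CachedPuffinManager`.
    let manager = FsPuffinManager::new(stager, file_accessor);

    // Write a blob; the chosen codec is now recorded in the blob metadata
    // via `create_blob_metadata`.
    let mut writer = manager.writer("example.puffin").await?;
    writer
        .put_blob(
            "blob_a",
            "Hello, world!".as_bytes(),
            PutOptions {
                compression: Some(CompressionCodec::Zstd),
            },
        )
        .await?;
    writer.finish().await?;

    // Read the blob back; the stager stages (and decompresses) it on first access.
    let reader = manager.reader("example.puffin").await?;
    let blob = reader.blob("blob_a").await?;
    let mut data = Vec::new();
    blob.reader().await?.read_to_end(&mut data).await?;
    assert_eq!(data, b"Hello, world!");
    Ok(())
}
```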