refactor(puffin): adjust generic parameters

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-07-04 06:58:18 +00:00
parent 03c933c006
commit 81ea1b6ee4
10 changed files with 62 additions and 86 deletions

1
Cargo.lock generated
View File

@@ -8615,6 +8615,7 @@ dependencies = [
"async-compression 0.4.11",
"async-trait",
"async-walkdir",
"auto_impl",
"base64 0.21.7",
"bitflags 2.5.0",
"common-error",

View File

@@ -21,7 +21,7 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard, FsDirGuard};
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
use puffin::puffin_manager::BlobGuard;
use snafu::ResultExt;
@@ -36,12 +36,8 @@ type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncR
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
pub(crate) type SstPuffinManager = FsPuffinManager<
Arc<FsBlobGuard>,
Arc<FsDirGuard>,
InstrumentedAsyncRead,
InstrumentedAsyncWrite,
>;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
const STAGING_DIR: &str = "staging";
@@ -75,12 +71,12 @@ impl PuffinManagerFactory {
pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager {
let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size);
let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store);
SstPuffinManager::new(self.stager.clone(), Arc::new(puffin_file_accessor))
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
}
}
#[cfg(test)]
impl PuffinManagerFactory {
#[cfg(test)]
pub(crate) async fn new_for_test_async(
prefix: &str,
) -> (common_test_util::temp_dir::TempDir, Self) {
@@ -91,7 +87,6 @@ impl PuffinManagerFactory {
(tempdir, factory)
}
#[cfg(test)]
pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
@@ -103,6 +98,7 @@ impl PuffinManagerFactory {
}
/// A `PuffinFileAccessor` implementation that uses an object store as the underlying storage.
#[derive(Clone)]
pub(crate) struct ObjectStorePuffinFileAccessor {
object_store: InstrumentedStore,
}
@@ -152,9 +148,7 @@ mod tests {
use futures::AsyncReadExt;
use object_store::services::Memory;
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{
BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions,
};
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
use super::*;

View File

@@ -11,6 +11,7 @@ workspace = true
async-compression = { version = "0.4", features = ["futures-io", "zstd"] }
async-trait.workspace = true
async-walkdir = "2.0.0"
auto_impl = "1.2.0"
base64.workspace = true
bitflags.workspace = true
common-error.workspace = true

View File

@@ -72,6 +72,7 @@ pub struct PutOptions {
/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
#[auto_impl::auto_impl(Box, Rc, Arc)]
pub trait PuffinReader {
type Blob: BlobGuard;
type Dir: DirGuard;
@@ -91,6 +92,7 @@ pub trait PuffinReader {
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
#[auto_impl::auto_impl(Box, Rc, Arc)]
pub trait BlobGuard {
type Reader: AsyncRead + AsyncSeek + Unpin;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
@@ -98,6 +100,7 @@ pub trait BlobGuard {
/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
/// Users should hold the `DirGuard` until they are done with the directory.
#[auto_impl::auto_impl(Box, Rc, Arc)]
pub trait DirGuard {
fn path(&self) -> &PathBuf;
}

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
@@ -21,9 +19,10 @@ use crate::error::Result;
/// `PuffinFileAccessor` is for opening readers and writers for puffin files.
#[async_trait]
pub trait PuffinFileAccessor {
type Reader: AsyncRead + AsyncSeek;
type Writer: AsyncWrite;
#[auto_impl::auto_impl(Box, Rc, Arc)]
pub trait PuffinFileAccessor: Send + Sync + 'static {
type Reader: AsyncRead + AsyncSeek + Unpin + Send;
type Writer: AsyncWrite + Unpin + Send;
/// Opens a reader for the given puffin file.
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader>;
@@ -31,6 +30,3 @@ pub trait PuffinFileAccessor {
/// Creates a writer for the given puffin file.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
}
pub type PuffinFileAccessorRef<R, W> =
Arc<dyn PuffinFileAccessor<Reader = R, Writer = W> + Send + Sync>;

View File

@@ -17,30 +17,25 @@ mod reader;
mod writer;
use async_trait::async_trait;
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
pub use reader::FsPuffinReader;
pub use writer::FsPuffinWriter;
use super::file_accessor::PuffinFileAccessor;
use crate::error::Result;
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::stager::StagerRef;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinManager};
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::PuffinManager;
/// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem.
pub struct FsPuffinManager<B, D, AR, AW> {
pub struct FsPuffinManager<S, F> {
/// The stager.
stager: StagerRef<B, D>,
stager: S,
/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
puffin_file_accessor: F,
}
impl<B, D, AR, AW> FsPuffinManager<B, D, AR, AW> {
impl<S, F> FsPuffinManager<S, F> {
/// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`.
pub fn new(
stager: StagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
pub fn new(stager: S, puffin_file_accessor: F) -> Self {
Self {
stager,
puffin_file_accessor,
@@ -49,15 +44,13 @@ impl<B, D, AR, AW> FsPuffinManager<B, D, AR, AW> {
}
#[async_trait]
impl<B, D, AR, AW> PuffinManager for FsPuffinManager<B, D, AR, AW>
impl<S, F> PuffinManager for FsPuffinManager<S, F>
where
B: BlobGuard,
D: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + Send + Unpin + 'static,
S: Stager + Clone,
F: PuffinFileAccessor + Clone,
{
type Reader = FsPuffinReader<B, D, AR, AW>;
type Writer = FsPuffinWriter<B, D, AW>;
type Reader = FsPuffinReader<S, F>;
type Writer = FsPuffinWriter<S, F::Writer>;
async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
Ok(FsPuffinReader::new(

View File

@@ -16,7 +16,7 @@ use async_compression::futures::bufread::ZstdDecoder;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::io::BufReader;
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite};
use futures::{AsyncRead, AsyncReadExt, AsyncWrite};
use snafu::{ensure, OptionExt, ResultExt};
use crate::blob_metadata::CompressionCodec;
@@ -25,29 +25,25 @@ use crate::error::{
ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
};
use crate::file_format::reader::{AsyncReader, PuffinFileReader};
use crate::puffin_manager::file_accessor::PuffinFileAccessorRef;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, StagerRef};
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinReader};
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::PuffinReader;
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<B, G, AR, AW> {
pub struct FsPuffinReader<S, F> {
/// The name of the puffin file.
puffin_file_name: String,
/// The stager.
stager: StagerRef<B, G>,
stager: S,
/// The puffin file accessor.
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
puffin_file_accessor: F,
}
impl<B, D, AR, AW> FsPuffinReader<B, D, AR, AW> {
pub(crate) fn new(
puffin_file_name: String,
stager: StagerRef<B, D>,
puffin_file_accessor: PuffinFileAccessorRef<AR, AW>,
) -> Self {
impl<S, F> FsPuffinReader<S, F> {
pub(crate) fn new(puffin_file_name: String, stager: S, puffin_file_accessor: F) -> Self {
Self {
puffin_file_name,
stager,
@@ -57,15 +53,13 @@ impl<B, D, AR, AW> FsPuffinReader<B, D, AR, AW> {
}
#[async_trait]
impl<B, D, AR, AW> PuffinReader for FsPuffinReader<B, D, AR, AW>
impl<S, F> PuffinReader for FsPuffinReader<S, F>
where
B: BlobGuard,
D: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
S: Stager,
F: PuffinFileAccessor + Clone,
{
type Blob = B;
type Dir = D;
type Blob = S::Blob;
type Dir = S::Dir;
async fn blob(&self, key: &str) -> Result<Self::Blob> {
self.stager
@@ -98,18 +92,16 @@ where
}
}
impl<B, G, AR, AW> FsPuffinReader<B, G, AR, AW>
impl<S, F> FsPuffinReader<S, F>
where
B: BlobGuard,
G: DirGuard,
AR: AsyncRead + AsyncSeek + Send + Unpin + 'static,
AW: AsyncWrite + 'static,
S: Stager,
F: PuffinFileAccessor,
{
fn init_blob_to_cache(
puffin_file_name: String,
key: String,
mut writer: BoxWriter,
accessor: PuffinFileAccessorRef<AR, AW>,
accessor: F,
) -> BoxFuture<'static, Result<u64>> {
Box::pin(async move {
let reader = accessor.reader(&puffin_file_name).await?;
@@ -134,7 +126,7 @@ where
puffin_file_name: String,
key: String,
writer_provider: DirWriterProviderRef,
accessor: PuffinFileAccessorRef<AR, AW>,
accessor: F,
) -> BoxFuture<'static, Result<u64>> {
Box::pin(async move {
let reader = accessor.reader(&puffin_file_name).await?;

View File

@@ -30,16 +30,16 @@ use crate::error::{
};
use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use crate::puffin_manager::fs_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata};
use crate::puffin_manager::stager::StagerRef;
use crate::puffin_manager::{BlobGuard, DirGuard, PuffinWriter, PutOptions};
use crate::puffin_manager::stager::Stager;
use crate::puffin_manager::{PuffinWriter, PutOptions};
/// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file.
pub struct FsPuffinWriter<B, D, W> {
pub struct FsPuffinWriter<S, W> {
/// The name of the puffin file.
puffin_file_name: String,
/// The stager.
stager: StagerRef<B, D>,
stager: S,
/// The underlying `PuffinFileWriter`.
puffin_file_writer: PuffinFileWriter<W>,
@@ -48,8 +48,8 @@ pub struct FsPuffinWriter<B, D, W> {
blob_keys: HashSet<String>,
}
impl<B, D, W> FsPuffinWriter<B, D, W> {
pub(crate) fn new(puffin_file_name: String, stager: StagerRef<B, D>, writer: W) -> Self {
impl<S, W> FsPuffinWriter<S, W> {
pub(crate) fn new(puffin_file_name: String, stager: S, writer: W) -> Self {
Self {
puffin_file_name,
stager,
@@ -60,10 +60,9 @@ impl<B, D, W> FsPuffinWriter<B, D, W> {
}
#[async_trait]
impl<B, D, W> PuffinWriter for FsPuffinWriter<B, D, W>
impl<S, W> PuffinWriter for FsPuffinWriter<S, W>
where
B: BlobGuard,
D: DirGuard,
S: Stager,
W: AsyncWrite + Unpin + Send,
{
async fn put_blob<R>(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result<u64>
@@ -164,10 +163,9 @@ where
}
}
impl<B, G, W> FsPuffinWriter<B, G, W>
impl<S, W> FsPuffinWriter<S, W>
where
B: BlobGuard,
G: DirGuard,
S: Stager,
W: AsyncWrite + Unpin + Send,
{
/// Compresses the raw data and writes it to the puffin file.

View File

@@ -15,7 +15,6 @@
mod bounded_stager;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard};
@@ -53,7 +52,8 @@ pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;
/// `Stager` manages the staging area for the puffin files.
#[async_trait]
pub trait Stager {
#[auto_impl::auto_impl(Box, Rc, Arc)]
pub trait Stager: Send + Sync {
type Blob: BlobGuard;
type Dir: DirGuard;
@@ -88,5 +88,3 @@ pub trait Stager {
dir_size: u64,
) -> Result<()>;
}
pub type StagerRef<B, D> = Arc<dyn Stager<Blob = B, Dir = D> + Send + Sync>;

View File

@@ -425,7 +425,7 @@ pub struct FsBlobGuard {
delete_queue: Sender<DeleteTask>,
}
impl BlobGuard for Arc<FsBlobGuard> {
impl BlobGuard for FsBlobGuard {
type Reader = Compat<fs::File>;
fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>> {
@@ -460,7 +460,7 @@ pub struct FsDirGuard {
delete_queue: Sender<DeleteTask>,
}
impl DirGuard for Arc<FsDirGuard> {
impl DirGuard for FsDirGuard {
fn path(&self) -> &PathBuf {
&self.path
}