diff --git a/Cargo.lock b/Cargo.lock index 6dd85f06ec..6875f1e881 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8615,6 +8615,7 @@ dependencies = [ "async-compression 0.4.11", "async-trait", "async-walkdir", + "auto_impl", "base64 0.21.7", "bitflags 2.5.0", "common-error", diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index 85cfbfd6b7..f953ca5dd4 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -21,7 +21,7 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore}; use puffin::error::{self as puffin_error, Result as PuffinResult}; use puffin::puffin_manager::file_accessor::PuffinFileAccessor; use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager; -use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard, FsDirGuard}; +use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard}; use puffin::puffin_manager::BlobGuard; use snafu::ResultExt; @@ -36,12 +36,8 @@ type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncR type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>; pub(crate) type BlobReader = as BlobGuard>::Reader; -pub(crate) type SstPuffinManager = FsPuffinManager< - Arc, - Arc, - InstrumentedAsyncRead, - InstrumentedAsyncWrite, ->; +pub(crate) type SstPuffinManager = + FsPuffinManager, ObjectStorePuffinFileAccessor>; const STAGING_DIR: &str = "staging"; @@ -75,12 +71,12 @@ impl PuffinManagerFactory { pub(crate) fn build(&self, store: ObjectStore) -> SstPuffinManager { let store = InstrumentedStore::new(store).with_write_buffer_size(self.write_buffer_size); let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store); - SstPuffinManager::new(self.stager.clone(), Arc::new(puffin_file_accessor)) + SstPuffinManager::new(self.stager.clone(), puffin_file_accessor) } } +#[cfg(test)] impl PuffinManagerFactory { - #[cfg(test)] pub(crate) async fn new_for_test_async( prefix: &str, ) -> (common_test_util::temp_dir::TempDir, Self) { @@ -91,7 +87,6 @@ impl PuffinManagerFactory { (tempdir, factory) } - #[cfg(test)] pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) { let tempdir = common_test_util::temp_dir::create_temp_dir(prefix); @@ -103,6 +98,7 @@ impl PuffinManagerFactory { } /// A `PuffinFileAccessor` implementation that uses an object store as the underlying storage. +#[derive(Clone)] pub(crate) struct ObjectStorePuffinFileAccessor { object_store: InstrumentedStore, } @@ -152,9 +148,7 @@ mod tests { use futures::AsyncReadExt; use object_store::services::Memory; use puffin::blob_metadata::CompressionCodec; - use puffin::puffin_manager::{ - BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions, - }; + use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions}; use super::*; diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index ab1acd8983..22c429cd84 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -11,6 +11,7 @@ workspace = true async-compression = { version = "0.4", features = ["futures-io", "zstd"] } async-trait.workspace = true async-walkdir = "2.0.0" +auto_impl = "1.2.0" base64.workspace = true bitflags.workspace = true common-error.workspace = true diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 7b8df56f1c..339b266c74 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -72,6 +72,7 @@ pub struct PutOptions { /// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file. #[async_trait] +#[auto_impl::auto_impl(Arc)] pub trait PuffinReader { type Blob: BlobGuard; type Dir: DirGuard; @@ -91,6 +92,7 @@ pub trait PuffinReader { /// `BlobGuard` is provided by the `PuffinReader` to access the blob data. /// Users should hold the `BlobGuard` until they are done with the blob data. +#[auto_impl::auto_impl(Arc)] pub trait BlobGuard { type Reader: AsyncRead + AsyncSeek + Unpin; fn reader(&self) -> BoxFuture<'static, Result>; @@ -98,6 +100,7 @@ pub trait BlobGuard { /// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem. /// Users should hold the `DirGuard` until they are done with the directory. +#[auto_impl::auto_impl(Arc)] pub trait DirGuard { fn path(&self) -> &PathBuf; } diff --git a/src/puffin/src/puffin_manager/file_accessor.rs b/src/puffin/src/puffin_manager/file_accessor.rs index 43d2b248fe..46deb198cc 100644 --- a/src/puffin/src/puffin_manager/file_accessor.rs +++ b/src/puffin/src/puffin_manager/file_accessor.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use async_trait::async_trait; use futures::{AsyncRead, AsyncSeek, AsyncWrite}; @@ -21,9 +19,10 @@ use crate::error::Result; /// `PuffinFileAccessor` is for opening readers and writers for puffin files. #[async_trait] -pub trait PuffinFileAccessor { - type Reader: AsyncRead + AsyncSeek; - type Writer: AsyncWrite; +#[auto_impl::auto_impl(Arc)] +pub trait PuffinFileAccessor: Send + Sync + 'static { + type Reader: AsyncRead + AsyncSeek + Unpin + Send; + type Writer: AsyncWrite + Unpin + Send; /// Opens a reader for the given puffin file. async fn reader(&self, puffin_file_name: &str) -> Result; @@ -31,6 +30,3 @@ pub trait PuffinFileAccessor { /// Creates a writer for the given puffin file. async fn writer(&self, puffin_file_name: &str) -> Result; } - -pub type PuffinFileAccessorRef = - Arc + Send + Sync>; diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs index 99c23459a8..7c95532dc6 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs @@ -17,30 +17,25 @@ mod reader; mod writer; use async_trait::async_trait; -use futures::{AsyncRead, AsyncSeek, AsyncWrite}; pub use reader::FsPuffinReader; pub use writer::FsPuffinWriter; +use super::file_accessor::PuffinFileAccessor; use crate::error::Result; -use crate::puffin_manager::file_accessor::PuffinFileAccessorRef; -use crate::puffin_manager::stager::StagerRef; -use crate::puffin_manager::{BlobGuard, DirGuard, PuffinManager}; +use crate::puffin_manager::stager::Stager; +use crate::puffin_manager::PuffinManager; /// `FsPuffinManager` is a `PuffinManager` that provides readers and writers for puffin data in filesystem. -pub struct FsPuffinManager { +pub struct FsPuffinManager { /// The stager. - stager: StagerRef, - + stager: S, /// The puffin file accessor. - puffin_file_accessor: PuffinFileAccessorRef, + puffin_file_accessor: F, } -impl FsPuffinManager { +impl FsPuffinManager { /// Creates a new `FsPuffinManager` with the specified `stager` and `puffin_file_accessor`. - pub fn new( - stager: StagerRef, - puffin_file_accessor: PuffinFileAccessorRef, - ) -> Self { + pub fn new(stager: S, puffin_file_accessor: F) -> Self { Self { stager, puffin_file_accessor, @@ -49,15 +44,13 @@ impl FsPuffinManager { } #[async_trait] -impl PuffinManager for FsPuffinManager +impl PuffinManager for FsPuffinManager where - B: BlobGuard, - D: DirGuard, - AR: AsyncRead + AsyncSeek + Send + Unpin + 'static, - AW: AsyncWrite + Send + Unpin + 'static, + S: Stager + Clone, + F: PuffinFileAccessor + Clone, { - type Reader = FsPuffinReader; - type Writer = FsPuffinWriter; + type Reader = FsPuffinReader; + type Writer = FsPuffinWriter; async fn reader(&self, puffin_file_name: &str) -> Result { Ok(FsPuffinReader::new( diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 2ce79857a9..a1b8d3a8ea 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -16,7 +16,7 @@ use async_compression::futures::bufread::ZstdDecoder; use async_trait::async_trait; use futures::future::BoxFuture; use futures::io::BufReader; -use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite}; +use futures::{AsyncRead, AsyncReadExt, AsyncWrite}; use snafu::{ensure, OptionExt, ResultExt}; use crate::blob_metadata::CompressionCodec; @@ -25,29 +25,25 @@ use crate::error::{ ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu, }; use crate::file_format::reader::{AsyncReader, PuffinFileReader}; -use crate::puffin_manager::file_accessor::PuffinFileAccessorRef; +use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata; -use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, StagerRef}; -use crate::puffin_manager::{BlobGuard, DirGuard, PuffinReader}; +use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager}; +use crate::puffin_manager::PuffinReader; /// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files. -pub struct FsPuffinReader { +pub struct FsPuffinReader { /// The name of the puffin file. puffin_file_name: String, /// The stager. - stager: StagerRef, + stager: S, /// The puffin file accessor. - puffin_file_accessor: PuffinFileAccessorRef, + puffin_file_accessor: F, } -impl FsPuffinReader { - pub(crate) fn new( - puffin_file_name: String, - stager: StagerRef, - puffin_file_accessor: PuffinFileAccessorRef, - ) -> Self { +impl FsPuffinReader { + pub(crate) fn new(puffin_file_name: String, stager: S, puffin_file_accessor: F) -> Self { Self { puffin_file_name, stager, @@ -57,15 +53,13 @@ impl FsPuffinReader { } #[async_trait] -impl PuffinReader for FsPuffinReader +impl PuffinReader for FsPuffinReader where - B: BlobGuard, - D: DirGuard, - AR: AsyncRead + AsyncSeek + Send + Unpin + 'static, - AW: AsyncWrite + 'static, + S: Stager, + F: PuffinFileAccessor + Clone, { - type Blob = B; - type Dir = D; + type Blob = S::Blob; + type Dir = S::Dir; async fn blob(&self, key: &str) -> Result { self.stager @@ -98,18 +92,16 @@ where } } -impl FsPuffinReader +impl FsPuffinReader where - B: BlobGuard, - G: DirGuard, - AR: AsyncRead + AsyncSeek + Send + Unpin + 'static, - AW: AsyncWrite + 'static, + S: Stager, + F: PuffinFileAccessor, { fn init_blob_to_cache( puffin_file_name: String, key: String, mut writer: BoxWriter, - accessor: PuffinFileAccessorRef, + accessor: F, ) -> BoxFuture<'static, Result> { Box::pin(async move { let reader = accessor.reader(&puffin_file_name).await?; @@ -134,7 +126,7 @@ where puffin_file_name: String, key: String, writer_provider: DirWriterProviderRef, - accessor: PuffinFileAccessorRef, + accessor: F, ) -> BoxFuture<'static, Result> { Box::pin(async move { let reader = accessor.reader(&puffin_file_name).await?; diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs index 10338aad74..ab7227606d 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs @@ -30,16 +30,16 @@ use crate::error::{ }; use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter}; use crate::puffin_manager::fs_puffin_manager::dir_meta::{DirFileMetadata, DirMetadata}; -use crate::puffin_manager::stager::StagerRef; -use crate::puffin_manager::{BlobGuard, DirGuard, PuffinWriter, PutOptions}; +use crate::puffin_manager::stager::Stager; +use crate::puffin_manager::{PuffinWriter, PutOptions}; /// `FsPuffinWriter` is a `PuffinWriter` that writes blobs and directories to a puffin file. -pub struct FsPuffinWriter { +pub struct FsPuffinWriter { /// The name of the puffin file. puffin_file_name: String, /// The stager. - stager: StagerRef, + stager: S, /// The underlying `PuffinFileWriter`. puffin_file_writer: PuffinFileWriter, @@ -48,8 +48,8 @@ pub struct FsPuffinWriter { blob_keys: HashSet, } -impl FsPuffinWriter { - pub(crate) fn new(puffin_file_name: String, stager: StagerRef, writer: W) -> Self { +impl FsPuffinWriter { + pub(crate) fn new(puffin_file_name: String, stager: S, writer: W) -> Self { Self { puffin_file_name, stager, @@ -60,10 +60,9 @@ impl FsPuffinWriter { } #[async_trait] -impl PuffinWriter for FsPuffinWriter +impl PuffinWriter for FsPuffinWriter where - B: BlobGuard, - D: DirGuard, + S: Stager, W: AsyncWrite + Unpin + Send, { async fn put_blob(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result @@ -164,10 +163,9 @@ where } } -impl FsPuffinWriter +impl FsPuffinWriter where - B: BlobGuard, - G: DirGuard, + S: Stager, W: AsyncWrite + Unpin + Send, { /// Compresses the raw data and writes it to the puffin file. diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs index 47d2eb8eb0..396dd69ba2 100644 --- a/src/puffin/src/puffin_manager/stager.rs +++ b/src/puffin/src/puffin_manager/stager.rs @@ -15,7 +15,6 @@ mod bounded_stager; use std::path::PathBuf; -use std::sync::Arc; use async_trait::async_trait; pub use bounded_stager::{BoundedStager, FsBlobGuard, FsDirGuard}; @@ -53,7 +52,8 @@ pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult; /// `Stager` manages the staging area for the puffin files. #[async_trait] -pub trait Stager { +#[auto_impl::auto_impl(Arc)] +pub trait Stager: Send + Sync { type Blob: BlobGuard; type Dir: DirGuard; @@ -88,5 +88,3 @@ pub trait Stager { dir_size: u64, ) -> Result<()>; } - -pub type StagerRef = Arc + Send + Sync>; diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index e63d4f5524..1f1aaa2c01 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -425,7 +425,7 @@ pub struct FsBlobGuard { delete_queue: Sender, } -impl BlobGuard for Arc { +impl BlobGuard for FsBlobGuard { type Reader = Compat; fn reader(&self) -> BoxFuture<'static, Result> { @@ -460,7 +460,7 @@ pub struct FsDirGuard { delete_queue: Sender, } -impl DirGuard for Arc { +impl DirGuard for FsDirGuard { fn path(&self) -> &PathBuf { &self.path }