diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 7919aeb4ca..1533694bc9 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -371,9 +371,6 @@ pub struct InvertedIndexConfig {
     /// Memory threshold for performing an external sort during index creation.
     pub mem_threshold_on_create: MemoryThreshold,
 
-    /// Whether to compress the index data.
-    pub compress: bool,
-
     #[deprecated = "use [IndexConfig::aux_path] instead"]
     #[serde(skip_serializing)]
     pub intermediate_path: String,
@@ -396,7 +393,6 @@ impl Default for InvertedIndexConfig {
             create_on_compaction: Mode::Auto,
             apply_on_query: Mode::Auto,
             mem_threshold_on_create: MemoryThreshold::Auto,
-            compress: true,
             write_buffer_size: ReadableSize::mb(8),
             intermediate_path: String::new(),
             metadata_cache_size: ReadableSize::mb(32),
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index 1d598efcb4..9179d8a074 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -580,7 +580,7 @@ async fn test_region_usage() {
     flush_region(&engine, region_id, None).await;
 
     let region_stat = region.region_usage();
-    assert_eq!(region_stat.sst_usage, 3026);
+    assert_eq!(region_stat.sst_usage, 3010);
 
     // region total usage
     // Some memtables may share items.
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index 2407a974c1..909bf481b4 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -220,7 +220,6 @@ impl<'a> IndexerBuilder<'a> {
             self.intermediate_manager.clone(),
             self.inverted_index_config.mem_threshold_on_create(),
             segment_row_count,
-            self.inverted_index_config.compress,
             &self.index_options.inverted_index.ignore_column_ids,
         );
 
diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs
index 380661d60d..c8bed65bd8 100644
--- a/src/mito2/src/sst/index/inverted_index/creator.rs
+++ b/src/mito2/src/sst/index/inverted_index/creator.rs
@@ -24,7 +24,6 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
 use index::inverted_index::create::sort_create::SortIndexCreator;
 use index::inverted_index::create::InvertedIndexCreator;
 use index::inverted_index::format::writer::InvertedIndexBlobWriter;
-use puffin::blob_metadata::CompressionCodec;
 use puffin::puffin_manager::{PuffinWriter, PutOptions};
 use snafu::{ensure, ResultExt};
 use store_api::metadata::RegionMetadataRef;
@@ -71,9 +70,6 @@ pub struct SstIndexCreator {
     /// The memory usage of the index creator.
     memory_usage: Arc<AtomicUsize>,
 
-    /// Whether to compress the index data.
-    compress: bool,
-
     /// Ids of indexed columns.
     column_ids: HashSet<ColumnId>,
 }
@@ -87,7 +83,6 @@ impl SstIndexCreator {
         intermediate_manager: IntermediateManager,
         memory_usage_threshold: Option<usize>,
         segment_row_count: NonZeroUsize,
-        compress: bool,
         ignore_column_ids: &[ColumnId],
     ) -> Self {
         let temp_file_provider = Arc::new(TempFileProvider::new(
@@ -122,7 +117,6 @@ impl SstIndexCreator {
             stats: Statistics::default(),
             aborted: false,
             memory_usage,
-            compress,
             column_ids,
         }
     }
@@ -242,12 +236,9 @@ impl SstIndexCreator {
         let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
 
         let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
-        let put_options = PutOptions {
-            compression: self.compress.then_some(CompressionCodec::Zstd),
-        };
         let (index_finish, puffin_add_blob) = futures::join!(
             self.index_creator.finish(&mut index_writer),
-            puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), put_options)
+            puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default())
         );
 
         match (
@@ -398,7 +389,6 @@ mod tests {
             intm_mgr,
             memory_threshold,
             NonZeroUsize::new(segment_row_count).unwrap(),
-            false,
             &[],
         );
 
diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs
index 7a9d246951..98a240547d 100644
--- a/src/mito2/src/sst/index/puffin_manager.rs
+++ b/src/mito2/src/sst/index/puffin_manager.rs
@@ -21,8 +21,8 @@ use object_store::{FuturesAsyncReader, FuturesAsyncWriter, ObjectStore};
 use puffin::error::{self as puffin_error, Result as PuffinResult};
 use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
 use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
-use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
-use puffin::puffin_manager::{BlobGuard, PuffinManager};
+use puffin::puffin_manager::stager::BoundedStager;
+use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
 use snafu::ResultExt;
 
 use crate::error::{PuffinInitStagerSnafu, Result};
@@ -35,10 +35,11 @@ use crate::sst::index::store::{self, InstrumentedStore};
 type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncReader>;
 type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
 
-pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
-pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
 pub(crate) type SstPuffinManager =
     FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
+pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
+pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
+pub(crate) type BlobReader = <<SstPuffinReader as PuffinReader>::Blob as BlobGuard>::Reader;
 
 const STAGING_DIR: &str = "staging";
 
diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs
index f1435bd0e4..9a87d70592 100644
--- a/src/puffin/src/file_format/reader/file.rs
+++ b/src/puffin/src/file_format/reader/file.rs
@@ -61,6 +61,15 @@ impl<R> PuffinFileReader<R> {
         );
         Ok(())
     }
+
+    /// Converts the reader into an owned blob reader.
+    pub fn into_blob_reader(self, blob_metadata: &BlobMetadata) -> PartialReader<R> {
+        PartialReader::new(
+            self.source,
+            blob_metadata.offset as _,
+            blob_metadata.length as _,
+        )
+    }
 }
 
 impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader<R> {
diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs
index 339b266c74..f77b79c007 100644
--- a/src/puffin/src/puffin_manager.rs
+++ b/src/puffin/src/puffin_manager.rs
@@ -22,7 +22,6 @@ mod tests;
 use std::path::PathBuf;
 
 use async_trait::async_trait;
-use futures::future::BoxFuture;
 use futures::{AsyncRead, AsyncSeek};
 
 use crate::blob_metadata::CompressionCodec;
@@ -92,10 +91,11 @@ pub trait PuffinReader {
 
 /// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
 /// Users should hold the `BlobGuard` until they are done with the blob data.
+#[async_trait]
 #[auto_impl::auto_impl(Arc)]
 pub trait BlobGuard {
     type Reader: AsyncRead + AsyncSeek + Unpin;
-    fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>>;
+    async fn reader(&self) -> Result<Self::Reader>;
 }
 
 /// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
diff --git a/src/puffin/src/puffin_manager/file_accessor.rs b/src/puffin/src/puffin_manager/file_accessor.rs
index 46deb198cc..89ef8cc451 100644
--- a/src/puffin/src/puffin_manager/file_accessor.rs
+++ b/src/puffin/src/puffin_manager/file_accessor.rs
@@ -21,7 +21,7 @@ use crate::error::Result;
 #[async_trait]
 #[auto_impl::auto_impl(Arc)]
 pub trait PuffinFileAccessor: Send + Sync + 'static {
-    type Reader: AsyncRead + AsyncSeek + Unpin + Send;
+    type Reader: AsyncRead + AsyncSeek + Unpin + Send + Sync;
     type Writer: AsyncWrite + Unpin + Send;
 
     /// Opens a reader for the given puffin file.
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager.rs b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
index 7c95532dc6..01b367a782 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager.rs
@@ -46,7 +46,7 @@ impl<S, F> FsPuffinManager<S, F> {
 #[async_trait]
 impl<S, F> PuffinManager for FsPuffinManager<S, F>
 where
-    S: Stager + Clone,
+    S: Stager + Clone + 'static,
     F: PuffinFileAccessor + Clone,
 {
     type Reader = FsPuffinReader<S, F>;
diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
index a1b8d3a8ea..ad0eccabe4 100644
--- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
+++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
@@ -12,23 +12,26 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
 use async_compression::futures::bufread::ZstdDecoder;
 use async_trait::async_trait;
-use futures::future::BoxFuture;
 use futures::io::BufReader;
-use futures::{AsyncRead, AsyncReadExt, AsyncWrite};
+use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite};
 use snafu::{ensure, OptionExt, ResultExt};
 
-use crate::blob_metadata::CompressionCodec;
+use crate::blob_metadata::{BlobMetadata, CompressionCodec};
 use crate::error::{
     BlobIndexOutOfBoundSnafu, BlobNotFoundSnafu, DeserializeJsonSnafu, FileKeyNotMatchSnafu,
     ReadSnafu, Result, UnsupportedDecompressionSnafu, WriteSnafu,
 };
 use crate::file_format::reader::{AsyncReader, PuffinFileReader};
+use crate::partial_reader::PartialReader;
 use crate::puffin_manager::file_accessor::PuffinFileAccessor;
 use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
 use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
-use crate::puffin_manager::PuffinReader;
+use crate::puffin_manager::{BlobGuard, PuffinReader};
 
 /// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
 pub struct FsPuffinReader<S, F> {
@@ -55,25 +58,51 @@ impl<S, F> FsPuffinReader<S, F> {
 #[async_trait]
 impl<S, F> PuffinReader for FsPuffinReader<S, F>
 where
-    S: Stager,
+    S: Stager + 'static,
     F: PuffinFileAccessor + Clone,
 {
-    type Blob = S::Blob;
+    type Blob = Either<RandomReadBlob<F>, S::Blob>;
     type Dir = S::Dir;
 
     async fn blob(&self, key: &str) -> Result<Self::Blob> {
-        self.stager
-            .get_blob(
-                self.puffin_file_name.as_str(),
-                key,
-                Box::new(move |writer| {
-                    let accessor = self.puffin_file_accessor.clone();
-                    let puffin_file_name = self.puffin_file_name.clone();
-                    let key = key.to_string();
-                    Self::init_blob_to_cache(puffin_file_name, key, writer, accessor)
-                }),
-            )
-            .await
+        let reader = self
+            .puffin_file_accessor
+            .reader(&self.puffin_file_name)
+            .await?;
+        let mut file = PuffinFileReader::new(reader);
+
+        // TODO(zhongzc): cache the metadata.
+        let metadata = file.metadata().await?;
+        let blob_metadata = metadata
+            .blobs
+            .into_iter()
+            .find(|m| m.blob_type == key)
+            .context(BlobNotFoundSnafu { blob: key })?;
+
+        let blob = if blob_metadata.compression_codec.is_none() {
+            // If the blob is not compressed, we can directly read it from the puffin file.
+            Either::L(RandomReadBlob {
+                file_name: self.puffin_file_name.clone(),
+                accessor: self.puffin_file_accessor.clone(),
+                blob_metadata,
+            })
+        } else {
+            // If the blob is compressed, we need to decompress it into staging space before reading.
+            let staged_blob = self
+                .stager
+                .get_blob(
+                    self.puffin_file_name.as_str(),
+                    key,
+                    Box::new(|writer| {
+                        Box::pin(Self::init_blob_to_stager(file, blob_metadata, writer))
+                    }),
+                )
+                .await?;
+
+            Either::R(staged_blob)
+        };
+
+        Ok(blob)
     }
 
     async fn dir(&self, key: &str) -> Result<Self::Dir> {
@@ -85,7 +114,12 @@ where
                     let accessor = self.puffin_file_accessor.clone();
                     let puffin_file_name = self.puffin_file_name.clone();
                     let key = key.to_string();
-                    Self::init_dir_to_cache(puffin_file_name, key, writer_provider, accessor)
+                    Box::pin(Self::init_dir_to_stager(
+                        puffin_file_name,
+                        key,
+                        writer_provider,
+                        accessor,
+                    ))
                 }),
             )
             .await
@@ -97,79 +131,63 @@ where
     S: Stager,
     F: PuffinFileAccessor,
 {
-    fn init_blob_to_cache(
-        puffin_file_name: String,
-        key: String,
+    async fn init_blob_to_stager(
+        mut reader: PuffinFileReader<F::Reader>,
+        blob_metadata: BlobMetadata,
         mut writer: BoxWriter,
-        accessor: F,
-    ) -> BoxFuture<'static, Result<u64>> {
-        Box::pin(async move {
-            let reader = accessor.reader(&puffin_file_name).await?;
-            let mut file = PuffinFileReader::new(reader);
-
-            let metadata = file.metadata().await?;
-            let blob_metadata = metadata
-                .blobs
-                .iter()
-                .find(|m| m.blob_type == key.as_str())
-                .context(BlobNotFoundSnafu { blob: key })?;
-            let reader = file.blob_reader(blob_metadata)?;
-
-            let compression = blob_metadata.compression_codec;
-            let size = Self::handle_decompress(reader, &mut writer, compression).await?;
-
-            Ok(size)
-        })
+    ) -> Result<u64> {
+        let reader = reader.blob_reader(&blob_metadata)?;
+        let compression = blob_metadata.compression_codec;
+        let size = Self::handle_decompress(reader, &mut writer, compression).await?;
+        Ok(size)
     }
 
-    fn init_dir_to_cache(
+    async fn init_dir_to_stager(
         puffin_file_name: String,
         key: String,
         writer_provider: DirWriterProviderRef,
         accessor: F,
-    ) -> BoxFuture<'static, Result<u64>> {
-        Box::pin(async move {
-            let reader = accessor.reader(&puffin_file_name).await?;
-            let mut file = PuffinFileReader::new(reader);
+    ) -> Result<u64> {
+        let reader = accessor.reader(&puffin_file_name).await?;
+        let mut file = PuffinFileReader::new(reader);
 
-            let puffin_metadata = file.metadata().await?;
-            let blob_metadata = puffin_metadata
-                .blobs
-                .iter()
-                .find(|m| m.blob_type == key.as_str())
-                .context(BlobNotFoundSnafu { blob: key })?;
+        let puffin_metadata = file.metadata().await?;
+        let blob_metadata = puffin_metadata
+            .blobs
+            .iter()
+            .find(|m| m.blob_type == key.as_str())
+            .context(BlobNotFoundSnafu { blob: key })?;
 
-            let mut reader = file.blob_reader(blob_metadata)?;
-            let mut buf = vec![];
-            reader.read_to_end(&mut buf).await.context(ReadSnafu)?;
-            let dir_meta: DirMetadata =
-                serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?;
+        let mut reader = file.blob_reader(blob_metadata)?;
+        let mut buf = vec![];
+        reader.read_to_end(&mut buf).await.context(ReadSnafu)?;
+        let dir_meta: DirMetadata =
+            serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?;
 
-            let mut size = 0;
-            for file_meta in dir_meta.files {
-                let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context(
-                    BlobIndexOutOfBoundSnafu {
-                        index: file_meta.blob_index,
-                        max_index: puffin_metadata.blobs.len(),
-                    },
-                )?;
-                ensure!(
-                    blob_meta.blob_type == file_meta.key,
-                    FileKeyNotMatchSnafu {
-                        expected: file_meta.key,
-                        actual: &blob_meta.blob_type,
-                    }
-                );
+        let mut size = 0;
+        for file_meta in dir_meta.files {
+            let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context(
+                BlobIndexOutOfBoundSnafu {
+                    index: file_meta.blob_index,
+                    max_index: puffin_metadata.blobs.len(),
+                },
+            )?;
+            ensure!(
+                blob_meta.blob_type == file_meta.key,
+                FileKeyNotMatchSnafu {
+                    expected: file_meta.key,
+                    actual: &blob_meta.blob_type,
+                }
+            );
 
-                let reader = file.blob_reader(blob_meta)?;
-                let writer = writer_provider.writer(&file_meta.relative_path).await?;
+            let reader = file.blob_reader(blob_meta)?;
+            let writer = writer_provider.writer(&file_meta.relative_path).await?;
 
-                let compression = blob_meta.compression_codec;
-                size += Self::handle_decompress(reader, writer, compression).await?;
-            }
+            let compression = blob_meta.compression_codec;
+            size += Self::handle_decompress(reader, writer, compression).await?;
+        }
 
-            Ok(size)
-        })
+        Ok(size)
     }
 
     /// Handles the decompression of the reader and writes the decompressed data to the writer.
@@ -196,3 +214,87 @@
         }
     }
 }
+
+/// `RandomReadBlob` is a `BlobGuard` that directly reads the blob from the puffin file.
+pub struct RandomReadBlob<F> {
+    file_name: String,
+    accessor: F,
+    blob_metadata: BlobMetadata,
+}
+
+#[async_trait]
+impl<F: PuffinFileAccessor> BlobGuard for RandomReadBlob<F> {
+    type Reader = PartialReader<F::Reader>;
+
+    async fn reader(&self) -> Result<Self::Reader> {
+        ensure!(
+            self.blob_metadata.compression_codec.is_none(),
+            UnsupportedDecompressionSnafu {
+                decompression: self.blob_metadata.compression_codec.unwrap().to_string()
+            }
+        );
+
+        let reader = self.accessor.reader(&self.file_name).await?;
+        let blob_reader = PuffinFileReader::new(reader).into_blob_reader(&self.blob_metadata);
+        Ok(blob_reader)
+    }
+}
+
+/// `Either` is a type that represents either `A` or `B`.
+///
+/// Used to:
+/// impl `AsyncRead + AsyncSeek` for `Either`,
+/// impl `BlobGuard` for `Either`.
+pub enum Either<A, B> {
+    L(A),
+    R(B),
+}
+
+impl<A, B> AsyncRead for Either<A, B>
+where
+    A: AsyncRead + Unpin,
+    B: AsyncRead + Unpin,
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<std::io::Result<usize>> {
+        match self.get_mut() {
+            Either::L(a) => Pin::new(a).poll_read(cx, buf),
+            Either::R(b) => Pin::new(b).poll_read(cx, buf),
+        }
+    }
+}
+
+impl<A, B> AsyncSeek for Either<A, B>
+where
+    A: AsyncSeek + Unpin,
+    B: AsyncSeek + Unpin,
+{
+    fn poll_seek(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        pos: std::io::SeekFrom,
+    ) -> Poll<std::io::Result<u64>> {
+        match self.get_mut() {
+            Either::L(a) => Pin::new(a).poll_seek(cx, pos),
+            Either::R(b) => Pin::new(b).poll_seek(cx, pos),
+        }
+    }
+}
+
+#[async_trait]
+impl<A, B> BlobGuard for Either<A, B>
+where
+    A: BlobGuard + Sync,
+    B: BlobGuard + Sync,
+{
+    type Reader = Either<A::Reader, B::Reader>;
+    async fn reader(&self) -> Result<Self::Reader> {
+        match self {
+            Either::L(a) => Ok(Either::L(a.reader().await?)),
+            Either::R(b) => Ok(Either::R(b.reader().await?)),
+        }
+    }
+}
diff --git a/src/puffin/src/puffin_manager/stager.rs b/src/puffin/src/puffin_manager/stager.rs
index 396dd69ba2..6e1581cddb 100644
--- a/src/puffin/src/puffin_manager/stager.rs
+++ b/src/puffin/src/puffin_manager/stager.rs
@@ -42,19 +42,19 @@ pub type DirWriterProviderRef = Box<dyn DirWriterProvider + Send>;
 ///
 /// `Stager` will provide a `BoxWriter` that the caller of `get_blob`
 /// can use to write the blob into the staging area.
-pub trait InitBlobFn = Fn(BoxWriter) -> WriteResult;
+pub trait InitBlobFn = FnOnce(BoxWriter) -> WriteResult;
 
 /// Function that initializes a directory.
 ///
 /// `Stager` will provide a `DirWriterProvider` that the caller of `get_dir`
 /// can use to write files inside the directory into the staging area.
-pub trait InitDirFn = Fn(DirWriterProviderRef) -> WriteResult;
+pub trait InitDirFn = FnOnce(DirWriterProviderRef) -> WriteResult;
 
 /// `Stager` manages the staging area for the puffin files.
 #[async_trait]
 #[auto_impl::auto_impl(Arc)]
 pub trait Stager: Send + Sync {
-    type Blob: BlobGuard;
+    type Blob: BlobGuard + Sync;
     type Dir: DirGuard;
 
     /// Retrieves a blob, initializing it if necessary using the provided `init_fn`.
diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs
index 1f1aaa2c01..9294497a06 100644
--- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs
+++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs
@@ -22,7 +22,6 @@ use async_walkdir::{Filtering, WalkDir};
 use base64::prelude::BASE64_URL_SAFE;
 use base64::Engine;
 use common_telemetry::{info, warn};
-use futures::future::BoxFuture;
 use futures::{FutureExt, StreamExt};
 use moka::future::Cache;
 use sha2::{Digest, Sha256};
@@ -128,7 +127,7 @@ impl Stager for BoundedStager {
             let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
             let path = self.base_dir.join(&file_name);
 
-            let size = Self::write_blob(&path, &init_fn).await?;
+            let size = Self::write_blob(&path, init_fn).await?;
 
             let guard = Arc::new(FsBlobGuard {
                 path,
@@ -163,7 +162,7 @@ impl Stager for BoundedStager {
             let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
             let path = self.base_dir.join(&dir_name);
 
-            let size = Self::write_dir(&path, &init_fn).await?;
+            let size = Self::write_dir(&path, init_fn).await?;
 
             let guard = Arc::new(FsDirGuard {
                 path,
@@ -225,7 +224,7 @@ impl BoundedStager {
 
     async fn write_blob(
         target_path: &PathBuf,
-        init_fn: &(dyn InitBlobFn + Send + Sync + '_),
+        init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
     ) -> Result<u64> {
         // To guarantee the atomicity of writing the file, we need to write
         // the file to a temporary file first...
@@ -247,7 +246,7 @@ impl BoundedStager {
 
     async fn write_dir(
        target_path: &PathBuf,
-        init_fn: &(dyn InitDirFn + Send + Sync + '_),
+        init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
     ) -> Result<u64> {
         // To guarantee the atomicity of writing the directory, we need to write
         // the directory to a temporary directory first...
@@ -425,16 +424,13 @@ pub struct FsBlobGuard {
     delete_queue: Sender<DeleteTask>,
 }
 
+#[async_trait]
 impl BlobGuard for FsBlobGuard {
     type Reader = Compat<fs::File>;
 
-    fn reader(&self) -> BoxFuture<'static, Result<Self::Reader>> {
-        let path = self.path.clone();
-        async move {
-            let file = fs::File::open(&path).await.context(OpenSnafu)?;
-            Ok(file.compat())
-        }
-        .boxed()
+    async fn reader(&self) -> Result<Self::Reader> {
+        let file = fs::File::open(&self.path).await.context(OpenSnafu)?;
+        Ok(file.compat())
     }
 }
 
diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs
index dc106d7461..02073522be 100644
--- a/src/puffin/src/puffin_manager/tests.rs
+++ b/src/puffin/src/puffin_manager/tests.rs
@@ -62,14 +62,30 @@ async fn test_put_get_file() {
             writer.finish().await.unwrap();
 
             let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
-            check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+            check_blob(
+                puffin_file_name,
+                key,
+                raw_data,
+                &stager,
+                &reader,
+                compression_codec.is_some(),
+            )
+            .await;
 
             // renew cache manager
             let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await;
             let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
 
             let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
-            check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+            check_blob(
+                puffin_file_name,
+                key,
+                raw_data,
+                &stager,
+                &reader,
+                compression_codec.is_some(),
+            )
+            .await;
         }
     }
 }
@@ -106,7 +122,15 @@
 
             let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
             for (key, raw_data) in &blobs {
-                check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+                check_blob(
+                    puffin_file_name,
+                    key,
+                    raw_data,
+                    &stager,
+                    &reader,
+                    compression_codec.is_some(),
+                )
+                .await;
             }
 
             // renew cache manager
@@ -114,7 +138,15 @@
             let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor);
             let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
             for (key, raw_data) in &blobs {
-                check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+                check_blob(
+                    puffin_file_name,
+                    key,
+                    raw_data,
+                    &stager,
+                    &reader,
+                    compression_codec.is_some(),
+                )
+                .await;
             }
         }
     }
@@ -205,7 +237,15 @@
 
             let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
             for (key, raw_data) in &blobs {
-                check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+                check_blob(
+                    puffin_file_name,
+                    key,
+                    raw_data,
+                    &stager,
+                    &reader,
+                    compression_codec.is_some(),
+                )
+                .await;
             }
             check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
 
@@ -216,7 +256,15 @@
 
             let reader = puffin_manager.reader(puffin_file_name).await.unwrap();
             for (key, raw_data) in &blobs {
-                check_blob(puffin_file_name, key, raw_data, &stager, &reader).await;
+                check_blob(
+                    puffin_file_name,
+                    key,
+                    raw_data,
+                    &stager,
+                    &reader,
+                    compression_codec.is_some(),
+                )
+                .await;
             }
             check_dir(puffin_file_name, dir_key, &files_in_dir, &stager, &reader).await;
         }
@@ -241,24 +289,28 @@ async fn put_blob(
         .unwrap();
 }
 
-async fn check_blob<R>(
+async fn check_blob(
     puffin_file_name: &str,
     key: &str,
     raw_data: &[u8],
     stager: &BoundedStager,
-    puffin_reader: &R,
-) where
-    R: PuffinReader,
-{
+    puffin_reader: &impl PuffinReader,
+    compressed: bool,
+) {
     let blob = puffin_reader.blob(key).await.unwrap();
     let mut reader = blob.reader().await.unwrap();
     let mut buf = Vec::new();
     reader.read_to_end(&mut buf).await.unwrap();
     assert_eq!(buf, raw_data);
 
-    let mut cached_file = stager.must_get_file(puffin_file_name, key).await;
+    if !compressed {
+        // If the blob is not compressed, it won't exist in the stager.
+        return;
+    }
+
+    let mut staged_file = stager.must_get_file(puffin_file_name, key).await;
     let mut buf = Vec::new();
-    cached_file.read_to_end(&mut buf).await.unwrap();
+    staged_file.read_to_end(&mut buf).await.unwrap();
     assert_eq!(buf, raw_data);
 }
 
@@ -291,15 +343,13 @@ async fn put_dir(
         .unwrap();
 }
 
-async fn check_dir<R>(
+async fn check_dir(
     puffin_file_name: &str,
     key: &str,
     files_in_dir: &[(&str, &[u8])],
     stager: &BoundedStager,
-    puffin_reader: &R,
-) where
-    R: PuffinReader,
-{
+    puffin_reader: &impl PuffinReader,
+) {
     let res_dir = puffin_reader.dir(key).await.unwrap();
     for (file_name, raw_data) in files_in_dir {
         let file_path = if cfg!(windows) {
@@ -311,12 +361,12 @@
         assert_eq!(buf, *raw_data);
     }
 
-    let cached_dir = stager.must_get_dir(puffin_file_name, key).await;
+    let staged_dir = stager.must_get_dir(puffin_file_name, key).await;
     for (file_name, raw_data) in files_in_dir {
         let file_path = if cfg!(windows) {
-            cached_dir.as_path().join(file_name.replace('/', "\\"))
+            staged_dir.as_path().join(file_name.replace('/', "\\"))
         } else {
-            cached_dir.as_path().join(file_name)
+            staged_dir.as_path().join(file_name)
         };
         let buf = std::fs::read(file_path).unwrap();
         assert_eq!(buf, *raw_data);
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 96d9316f55..9a7d982790 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -838,7 +838,6 @@ create_on_flush = "auto"
 create_on_compaction = "auto"
 apply_on_query = "auto"
 mem_threshold_on_create = "auto"
-compress = true
 metadata_cache_size = "32MiB"
 content_cache_size = "32MiB"
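
For reference, a minimal sketch (not part of the diff) of how a caller exercises the new read path, mirroring `check_blob` in `src/puffin/src/puffin_manager/tests.rs`. The function name and the way `stager`/`file_accessor` are obtained are illustrative assumptions; only the trait calls (`FsPuffinManager::new`, `reader`, `blob`, `BlobGuard::reader`) come from the code above.

// Sketch only: a helper that reads a blob back through the puffin manager.
// With this change, uncompressed blobs are served directly from the puffin file
// (`RandomReadBlob`); compressed blobs are still decompressed into the stager first.
use futures::AsyncReadExt as _;
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::Stager;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};

async fn read_blob_back<S, F>(stager: S, file_accessor: F, file_name: &str, key: &str) -> Vec<u8>
where
    S: Stager + Clone + 'static,
    F: PuffinFileAccessor + Clone,
{
    let manager = FsPuffinManager::new(stager, file_accessor);
    let reader = manager.reader(file_name).await.unwrap();

    // `blob()` now inspects the blob metadata and only falls back to the stager
    // when a compression codec is set.
    let blob = reader.blob(key).await.unwrap();
    let mut blob_reader = blob.reader().await.unwrap();

    let mut buf = Vec::new();
    blob_reader.read_to_end(&mut buf).await.unwrap();
    buf
}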