From 7bcb01d26983cabd60289a1a4171fcfa6c61b5f2 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 26 Mar 2025 10:47:20 +0800 Subject: [PATCH] feat: utilize blob metadata properties (#5767) * feat: utilize blob metadata properties Signed-off-by: Zhenchi * Update src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs --------- Signed-off-by: Zhenchi --- .../src/fulltext_index/create/bloom_filter.rs | 3 +- .../src/fulltext_index/create/tantivy.rs | 3 ++ .../src/sst/index/bloom_filter/applier.rs | 2 +- .../src/sst/index/bloom_filter/creator.rs | 9 ++++- .../src/sst/index/inverted_index/applier.rs | 16 ++++++-- .../src/sst/index/inverted_index/creator.rs | 7 +++- src/mito2/src/sst/index/puffin_manager.rs | 7 +++- src/puffin/src/puffin_manager.rs | 40 ++++++++++++++++--- .../fs_puffin_manager/reader.rs | 9 +++-- .../fs_puffin_manager/writer.rs | 24 ++++++++--- src/puffin/src/puffin_manager/tests.rs | 12 ++++-- 11 files changed, 105 insertions(+), 27 deletions(-) diff --git a/src/index/src/fulltext_index/create/bloom_filter.rs b/src/index/src/fulltext_index/create/bloom_filter.rs index ba6d4eceed..1e734be9e2 100644 --- a/src/index/src/fulltext_index/create/bloom_filter.rs +++ b/src/index/src/fulltext_index/create/bloom_filter.rs @@ -91,7 +91,8 @@ impl FulltextIndexCreator for BloomFilterFulltextIndexCreator { let (index_finish, puffin_add_blob) = futures::join!( creator.finish(tx.compat_write()), - puffin_writer.put_blob(blob_key, rx.compat(), put_options) + // TODO(zhongzc): add fulltext config properties + puffin_writer.put_blob(blob_key, rx.compat(), put_options, Default::default()) ); match ( diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs index 2ddc8299ae..6b09c1f0fb 100644 --- a/src/index/src/fulltext_index/create/tantivy.rs +++ b/src/index/src/fulltext_index/create/tantivy.rs @@ -164,6 +164,8 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator { #[cfg(test)] mod tests { + use 
std::collections::HashMap; + use common_test_util::temp_dir::create_temp_dir; use futures::AsyncRead; use tantivy::collector::DocSetCollector; @@ -182,6 +184,7 @@ mod tests { _key: &str, _raw_data: R, _options: PutOptions, + _properties: HashMap, ) -> puffin::error::Result where R: AsyncRead + Send, diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 2008d7cbfb..f7597773a1 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -24,7 +24,7 @@ use index::bloom_filter::applier::BloomFilterApplier; use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; -use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; +use puffin::puffin_manager::{PuffinManager, PuffinReader}; use snafu::ResultExt; use store_api::storage::{ColumnId, RegionId}; diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 59437961b5..5c00b1a19c 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -304,7 +304,12 @@ impl BloomFilterIndexer { let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id); let (index_finish, puffin_add_blob) = futures::join!( creator.finish(tx.compat_write()), - puffin_writer.put_blob(&blob_name, rx.compat(), PutOptions::default()) + puffin_writer.put_blob( + &blob_name, + rx.compat(), + PutOptions::default(), + Default::default(), + ) ); match ( @@ -351,7 +356,7 @@ pub(crate) mod tests { use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl}; use object_store::services::Memory; use object_store::ObjectStore; - use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; + use puffin::puffin_manager::{PuffinManager, PuffinReader}; use store_api::metadata::{ColumnMetadata, 
RegionMetadataBuilder}; use store_api::storage::RegionId; diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 5f0b267214..c1f2325130 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -24,7 +24,7 @@ use index::inverted_index::search::index_apply::{ }; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; -use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader}; +use puffin::puffin_manager::{PuffinManager, PuffinReader}; use snafu::ResultExt; use store_api::storage::RegionId; @@ -250,7 +250,12 @@ mod tests { ); let mut writer = puffin_manager.writer(&file_id).await.unwrap(); writer - .put_blob(INDEX_BLOB_TYPE, Cursor::new(vec![]), Default::default()) + .put_blob( + INDEX_BLOB_TYPE, + Cursor::new(vec![]), + Default::default(), + Default::default(), + ) .await .unwrap(); writer.finish().await.unwrap(); @@ -298,7 +303,12 @@ mod tests { ); let mut writer = puffin_manager.writer(&file_id).await.unwrap(); writer - .put_blob("invalid_blob_type", Cursor::new(vec![]), Default::default()) + .put_blob( + "invalid_blob_type", + Cursor::new(vec![]), + Default::default(), + Default::default(), + ) .await .unwrap(); writer.finish().await.unwrap(); diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index f67ab9aec0..8991b72aec 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -280,7 +280,12 @@ impl InvertedIndexer { // TODO(zhongzc): config bitmap type self.index_creator .finish(&mut index_writer, index::bitmap::BitmapType::Roaring), - puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default()) + puffin_writer.put_blob( + INDEX_BLOB_TYPE, + rx.compat(), + PutOptions::default(), + Default::default(), + ) ); match ( diff --git 
a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index 161a791d32..9f288ecd16 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -210,7 +210,12 @@ mod tests { let mut writer = manager.writer(&file_id).await.unwrap(); writer - .put_blob(blob_key, Cursor::new(raw_data), PutOptions::default()) + .put_blob( + blob_key, + Cursor::new(raw_data), + PutOptions::default(), + Default::default(), + ) .await .unwrap(); let dir_data = create_temp_dir("test_puffin_manager_factory_dir_data_"); diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 2ceccf2ce1..1dfec58f5b 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -20,6 +20,7 @@ pub mod stager; #[cfg(test)] mod tests; +use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; @@ -27,7 +28,7 @@ use async_trait::async_trait; use common_base::range_read::RangeReader; use futures::AsyncRead; -use crate::blob_metadata::CompressionCodec; +use crate::blob_metadata::{BlobMetadata, CompressionCodec}; use crate::error::Result; use crate::file_metadata::FileMetadata; @@ -50,7 +51,13 @@ pub trait PuffinManager { pub trait PuffinWriter { /// Writes a blob associated with the specified `key` to the Puffin file. /// Returns the number of bytes written. - async fn put_blob(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result + async fn put_blob( + &mut self, + key: &str, + raw_data: R, + options: PutOptions, + properties: HashMap, + ) -> Result where R: AsyncRead + Send; @@ -87,9 +94,9 @@ pub trait PuffinReader { /// Reads a blob from the Puffin file. /// - /// The returned `BlobGuard` is used to access the blob data. - /// Users should hold the `BlobGuard` until they are done with the blob data. - async fn blob(&self, key: &str) -> Result; + /// The returned `BlobWithMetadata` is used to access the blob data and its metadata. 
+ /// Users should hold the `BlobWithMetadata` until they are done with the blob data. + async fn blob(&self, key: &str) -> Result<BlobWithMetadata<Self::Blob>>; /// Reads a directory from the Puffin file. /// @@ -107,6 +114,29 @@ pub trait BlobGuard { async fn reader(&self) -> Result<Self::Reader>; } +/// `BlobWithMetadata` provides access to the blob data and its metadata. +pub struct BlobWithMetadata<B> { + blob: B, + metadata: BlobMetadata, +} + +impl<B: BlobGuard> BlobWithMetadata<B> { + /// Creates a new `BlobWithMetadata` instance. + pub fn new(blob: B, metadata: BlobMetadata) -> Self { + Self { blob, metadata } + } + + /// Returns the reader for the blob data. + pub async fn reader(&self) -> Result<B::Reader> { + self.blob.reader().await + } + + /// Returns the metadata of the blob. + pub fn metadata(&self) -> &BlobMetadata { + &self.metadata + } +} + /// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem. /// Users should hold the `DirGuard` until they are done with the directory. #[auto_impl::auto_impl(Arc)] diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 2d08cd81a0..2722990b84 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -36,7 +36,7 @@ use crate::partial_reader::PartialReader; use crate::puffin_manager::file_accessor::PuffinFileAccessor; use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata; use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager}; -use crate::puffin_manager::{BlobGuard, PuffinReader}; +use crate::puffin_manager::{BlobGuard, BlobWithMetadata, PuffinReader}; /// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files. 
pub struct FsPuffinReader @@ -101,7 +101,7 @@ where self.get_puffin_file_metadata(&mut file).await } - async fn blob(&self, key: &str) -> Result { + async fn blob(&self, key: &str) -> Result> { let mut reader = self.puffin_file_accessor.reader(&self.handle).await?; if let Some(file_size_hint) = self.file_size_hint { reader.with_file_size_hint(file_size_hint); @@ -121,10 +121,11 @@ where Either::L(RandomReadBlob { handle: self.handle.clone(), accessor: self.puffin_file_accessor.clone(), - blob_metadata, + blob_metadata: blob_metadata.clone(), }) } else { // If the blob is compressed, we need to decompress it into staging space before reading. + let blob_metadata = blob_metadata.clone(); let staged_blob = self .stager .get_blob( @@ -139,7 +140,7 @@ where Either::R(staged_blob) }; - Ok(blob) + Ok(BlobWithMetadata::new(blob, blob_metadata)) } async fn dir(&self, key: &str) -> Result { diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs index 924ff5f990..61d9df52f0 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/writer.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use async_compression::futures::bufread::ZstdEncoder; @@ -65,7 +65,13 @@ where S: Stager, W: AsyncWrite + Unpin + Send, { - async fn put_blob(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result + async fn put_blob( + &mut self, + key: &str, + raw_data: R, + options: PutOptions, + properties: HashMap, + ) -> Result where R: AsyncRead + Send, { @@ -75,7 +81,7 @@ where ); let written_bytes = self - .handle_compress(key.to_string(), raw_data, options.compression) + .handle_compress(key.to_string(), raw_data, options.compression, properties) .await?; self.blob_keys.insert(key.to_string()); @@ -111,7 +117,12 @@ where let file_key = Uuid::new_v4().to_string(); written_bytes += self - .handle_compress(file_key.clone(), reader, options.compression) + .handle_compress( + file_key.clone(), + reader, + options.compression, + Default::default(), + ) .await?; let path = entry.path(); @@ -174,6 +185,7 @@ where key: String, raw_data: impl AsyncRead + Send, compression: Option, + properties: HashMap, ) -> Result { match compression { Some(CompressionCodec::Lz4) => UnsupportedCompressionSnafu { codec: "lz4" }.fail(), @@ -182,7 +194,7 @@ where blob_type: key, compressed_data: ZstdEncoder::new(BufReader::new(raw_data)), compression_codec: compression, - properties: Default::default(), + properties, }; self.puffin_file_writer.add_blob(blob).await } @@ -191,7 +203,7 @@ where blob_type: key, compressed_data: raw_data, compression_codec: compression, - properties: Default::default(), + properties, }; self.puffin_file_writer.add_blob(blob).await } diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index 582e8864d8..e2f32e9498 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -23,9 +23,7 @@ use crate::blob_metadata::CompressionCodec; use 
crate::puffin_manager::file_accessor::MockFileAccessor; use crate::puffin_manager::fs_puffin_manager::FsPuffinManager; use crate::puffin_manager::stager::BoundedStager; -use crate::puffin_manager::{ - BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions, -}; +use crate::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions}; async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc>) { let staging_dir = create_temp_dir(prefix); @@ -283,6 +281,7 @@ async fn put_blob( PutOptions { compression: compression_codec, }, + HashMap::from_iter([("test_key".to_string(), "test_value".to_string())]), ) .await .unwrap(); @@ -297,6 +296,13 @@ async fn check_blob( compressed: bool, ) { let blob = puffin_reader.blob(key).await.unwrap(); + + let blob_metadata = blob.metadata(); + assert_eq!( + blob_metadata.properties, + HashMap::from_iter([("test_key".to_string(), "test_value".to_string())]) + ); + let reader = blob.reader().await.unwrap(); let meta = reader.metadata().await.unwrap(); let bs = reader.read(0..meta.content_length).await.unwrap();