feat: utilize blob metadata properties (#5767)

* feat: utilize blob metadata properties

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Update src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-03-26 10:47:20 +08:00
committed by GitHub
parent e81213728b
commit 7bcb01d269
11 changed files with 105 additions and 27 deletions

View File

@@ -91,7 +91,8 @@ impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
let (index_finish, puffin_add_blob) = futures::join!(
creator.finish(tx.compat_write()),
puffin_writer.put_blob(blob_key, rx.compat(), put_options)
// TODO(zhongzc): add fulltext config properties
puffin_writer.put_blob(blob_key, rx.compat(), put_options, Default::default())
);
match (

View File

@@ -164,6 +164,8 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use common_test_util::temp_dir::create_temp_dir;
use futures::AsyncRead;
use tantivy::collector::DocSetCollector;
@@ -182,6 +184,7 @@ mod tests {
_key: &str,
_raw_data: R,
_options: PutOptions,
_properties: HashMap<String, String>,
) -> puffin::error::Result<u64>
where
R: AsyncRead + Send,

View File

@@ -24,7 +24,7 @@ use index::bloom_filter::applier::BloomFilterApplier;
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};

View File

@@ -304,7 +304,12 @@ impl BloomFilterIndexer {
let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
let (index_finish, puffin_add_blob) = futures::join!(
creator.finish(tx.compat_write()),
puffin_writer.put_blob(&blob_name, rx.compat(), PutOptions::default())
puffin_writer.put_blob(
&blob_name,
rx.compat(),
PutOptions::default(),
Default::default(),
)
);
match (
@@ -351,7 +356,7 @@ pub(crate) mod tests {
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;

View File

@@ -24,7 +24,7 @@ use index::inverted_index::search::index_apply::{
};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::RegionId;
@@ -250,7 +250,12 @@ mod tests {
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
writer
.put_blob(INDEX_BLOB_TYPE, Cursor::new(vec![]), Default::default())
.put_blob(
INDEX_BLOB_TYPE,
Cursor::new(vec![]),
Default::default(),
Default::default(),
)
.await
.unwrap();
writer.finish().await.unwrap();
@@ -298,7 +303,12 @@ mod tests {
);
let mut writer = puffin_manager.writer(&file_id).await.unwrap();
writer
.put_blob("invalid_blob_type", Cursor::new(vec![]), Default::default())
.put_blob(
"invalid_blob_type",
Cursor::new(vec![]),
Default::default(),
Default::default(),
)
.await
.unwrap();
writer.finish().await.unwrap();

View File

@@ -280,7 +280,12 @@ impl InvertedIndexer {
// TODO(zhongzc): config bitmap type
self.index_creator
.finish(&mut index_writer, index::bitmap::BitmapType::Roaring),
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), PutOptions::default())
puffin_writer.put_blob(
INDEX_BLOB_TYPE,
rx.compat(),
PutOptions::default(),
Default::default(),
)
);
match (

View File

@@ -210,7 +210,12 @@ mod tests {
let mut writer = manager.writer(&file_id).await.unwrap();
writer
.put_blob(blob_key, Cursor::new(raw_data), PutOptions::default())
.put_blob(
blob_key,
Cursor::new(raw_data),
PutOptions::default(),
Default::default(),
)
.await
.unwrap();
let dir_data = create_temp_dir("test_puffin_manager_factory_dir_data_");

View File

@@ -20,6 +20,7 @@ pub mod stager;
#[cfg(test)]
mod tests;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
@@ -27,7 +28,7 @@ use async_trait::async_trait;
use common_base::range_read::RangeReader;
use futures::AsyncRead;
use crate::blob_metadata::CompressionCodec;
use crate::blob_metadata::{BlobMetadata, CompressionCodec};
use crate::error::Result;
use crate::file_metadata::FileMetadata;
@@ -50,7 +51,13 @@ pub trait PuffinManager {
pub trait PuffinWriter {
/// Writes a blob associated with the specified `key` to the Puffin file.
/// Returns the number of bytes written.
async fn put_blob<R>(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result<u64>
async fn put_blob<R>(
&mut self,
key: &str,
raw_data: R,
options: PutOptions,
properties: HashMap<String, String>,
) -> Result<u64>
where
R: AsyncRead + Send;
@@ -87,9 +94,9 @@ pub trait PuffinReader {
/// Reads a blob from the Puffin file.
///
/// The returned `BlobGuard` is used to access the blob data.
/// Users should hold the `BlobGuard` until they are done with the blob data.
async fn blob(&self, key: &str) -> Result<Self::Blob>;
/// The returned `BlobWithMetadata` is used to access the blob data and its metadata.
/// Users should hold the `BlobWithMetadata` until they are done with the blob data.
async fn blob(&self, key: &str) -> Result<BlobWithMetadata<Self::Blob>>;
/// Reads a directory from the Puffin file.
///
@@ -107,6 +114,29 @@ pub trait BlobGuard {
async fn reader(&self) -> Result<Self::Reader>;
}
/// `BlobWithMetadata` provides access to the blob data and its metadata.
pub struct BlobWithMetadata<B> {
blob: B,
metadata: BlobMetadata,
}
impl<B: BlobGuard> BlobWithMetadata<B> {
/// Creates a new `BlobWithMetadata` instance.
pub fn new(blob: B, metadata: BlobMetadata) -> Self {
Self { blob, metadata }
}
/// Returns the reader for the blob data.
pub async fn reader(&self) -> Result<B::Reader> {
self.blob.reader().await
}
/// Returns the metadata of the blob.
pub fn metadata(&self) -> &BlobMetadata {
&self.metadata
}
}
/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
/// Users should hold the `DirGuard` until they are done with the directory.
#[auto_impl::auto_impl(Arc)]

View File

@@ -36,7 +36,7 @@ use crate::partial_reader::PartialReader;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::{BlobGuard, PuffinReader};
use crate::puffin_manager::{BlobGuard, BlobWithMetadata, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F>
@@ -101,7 +101,7 @@ where
self.get_puffin_file_metadata(&mut file).await
}
async fn blob(&self, key: &str) -> Result<Self::Blob> {
async fn blob(&self, key: &str) -> Result<BlobWithMetadata<Self::Blob>> {
let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
if let Some(file_size_hint) = self.file_size_hint {
reader.with_file_size_hint(file_size_hint);
@@ -121,10 +121,11 @@ where
Either::L(RandomReadBlob {
handle: self.handle.clone(),
accessor: self.puffin_file_accessor.clone(),
blob_metadata,
blob_metadata: blob_metadata.clone(),
})
} else {
// If the blob is compressed, we need to decompress it into staging space before reading.
let blob_metadata = blob_metadata.clone();
let staged_blob = self
.stager
.get_blob(
@@ -139,7 +140,7 @@ where
Either::R(staged_blob)
};
Ok(blob)
Ok(BlobWithMetadata::new(blob, blob_metadata))
}
async fn dir(&self, key: &str) -> Result<Self::Dir> {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use async_compression::futures::bufread::ZstdEncoder;
@@ -65,7 +65,13 @@ where
S: Stager,
W: AsyncWrite + Unpin + Send,
{
async fn put_blob<R>(&mut self, key: &str, raw_data: R, options: PutOptions) -> Result<u64>
async fn put_blob<R>(
&mut self,
key: &str,
raw_data: R,
options: PutOptions,
properties: HashMap<String, String>,
) -> Result<u64>
where
R: AsyncRead + Send,
{
@@ -75,7 +81,7 @@ where
);
let written_bytes = self
.handle_compress(key.to_string(), raw_data, options.compression)
.handle_compress(key.to_string(), raw_data, options.compression, properties)
.await?;
self.blob_keys.insert(key.to_string());
@@ -111,7 +117,12 @@ where
let file_key = Uuid::new_v4().to_string();
written_bytes += self
.handle_compress(file_key.clone(), reader, options.compression)
.handle_compress(
file_key.clone(),
reader,
options.compression,
Default::default(),
)
.await?;
let path = entry.path();
@@ -174,6 +185,7 @@ where
key: String,
raw_data: impl AsyncRead + Send,
compression: Option<CompressionCodec>,
properties: HashMap<String, String>,
) -> Result<u64> {
match compression {
Some(CompressionCodec::Lz4) => UnsupportedCompressionSnafu { codec: "lz4" }.fail(),
@@ -182,7 +194,7 @@ where
blob_type: key,
compressed_data: ZstdEncoder::new(BufReader::new(raw_data)),
compression_codec: compression,
properties: Default::default(),
properties,
};
self.puffin_file_writer.add_blob(blob).await
}
@@ -191,7 +203,7 @@ where
blob_type: key,
compressed_data: raw_data,
compression_codec: compression,
properties: Default::default(),
properties,
};
self.puffin_file_writer.add_blob(blob).await
}

View File

@@ -23,9 +23,7 @@ use crate::blob_metadata::CompressionCodec;
use crate::puffin_manager::file_accessor::MockFileAccessor;
use crate::puffin_manager::fs_puffin_manager::FsPuffinManager;
use crate::puffin_manager::stager::BoundedStager;
use crate::puffin_manager::{
BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions,
};
use crate::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<BoundedStager<String>>) {
let staging_dir = create_temp_dir(prefix);
@@ -283,6 +281,7 @@ async fn put_blob(
PutOptions {
compression: compression_codec,
},
HashMap::from_iter([("test_key".to_string(), "test_value".to_string())]),
)
.await
.unwrap();
@@ -297,6 +296,13 @@ async fn check_blob(
compressed: bool,
) {
let blob = puffin_reader.blob(key).await.unwrap();
let blob_metadata = blob.metadata();
assert_eq!(
blob_metadata.properties,
HashMap::from_iter([("test_key".to_string(), "test_value".to_string())])
);
let reader = blob.reader().await.unwrap();
let meta = reader.metadata().await.unwrap();
let bs = reader.read(0..meta.content_length).await.unwrap();