feat(index): add file_size_hint for remote blob reader (#5147)

feat(index): add file_size_hint for remote blob reader
This commit is contained in:
Weny Xu
2024-12-12 12:45:40 +08:00
committed by GitHub
parent 03ad6e2a8d
commit 2137c53274
9 changed files with 97 additions and 15 deletions

View File

@@ -36,6 +36,11 @@ pub struct Metadata {
/// `RangeReader` reads a range of bytes from a source.
#[async_trait]
pub trait RangeReader: Send + Unpin {
/// Sets the file size hint for the reader.
///
/// It's used to optimize the reading process by reducing the number of remote requests.
fn with_file_size_hint(&mut self, file_size_hint: u64);
/// Returns the metadata of the source.
async fn metadata(&mut self) -> io::Result<Metadata>;
@@ -70,6 +75,10 @@ pub trait RangeReader: Send + Unpin {
#[async_trait]
impl<R: ?Sized + RangeReader> RangeReader for &mut R {
fn with_file_size_hint(&mut self, file_size_hint: u64) {
(*self).with_file_size_hint(file_size_hint)
}
async fn metadata(&mut self) -> io::Result<Metadata> {
(*self).metadata().await
}
@@ -186,6 +195,10 @@ impl<R: RangeReader + 'static> AsyncRead for AsyncReadAdapter<R> {
#[async_trait]
impl RangeReader for Vec<u8> {
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// do nothing
}
async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.len() as u64,
@@ -222,6 +235,10 @@ impl FileReader {
#[async_trait]
impl RangeReader for FileReader {
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// do nothing
}
async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.content_length,

View File

@@ -146,13 +146,33 @@ pub enum IndexType {
}
impl FileMeta {
/// Returns true if the file has an inverted index
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}
/// Returns true if the file has a fulltext index
pub fn fulltext_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::FulltextIndex)
}
/// Returns the size of the inverted index file
pub fn inverted_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.inverted_index_available() {
Some(self.index_file_size)
} else {
None
}
}
/// Returns the size of the fulltext index file
pub fn fulltext_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.fulltext_index_available() {
Some(self.index_file_size)
} else {
None
}
}
}
/// Handle to a SST file.

View File

@@ -113,7 +113,7 @@ impl InvertedIndexApplier {
}
/// Applies predicates to the provided SST file id and returns the relevant row group ids
pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
pub async fn apply(&self, file_id: FileId, file_size_hint: Option<u64>) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.start_timer();
@@ -129,8 +129,7 @@ impl InvertedIndexApplier {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
self.remote_blob_reader(file_id).await?
self.remote_blob_reader(file_id, file_size_hint).await?
}
};
@@ -181,16 +180,22 @@ impl InvertedIndexApplier {
}
/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(&self, file_id: FileId) -> Result<BlobReader> {
async fn remote_blob_reader(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(INDEX_BLOB_TYPE)
.await
.context(PuffinReadBlobSnafu)?
@@ -250,7 +255,7 @@ mod tests {
Box::new(mock_index_applier),
puffin_manager_factory,
);
let output = sst_index_applier.apply(file_id).await.unwrap();
let output = sst_index_applier.apply(file_id, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
@@ -290,7 +295,7 @@ mod tests {
Box::new(mock_index_applier),
puffin_manager_factory,
);
let res = sst_index_applier.apply(file_id).await;
let res = sst_index_applier.apply(file_id, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}

View File

@@ -464,7 +464,7 @@ mod tests {
.unwrap();
Box::pin(async move {
applier
.apply(sst_file_id)
.apply(sst_file_id, None)
.await
.unwrap()
.matched_segment_ids

View File

@@ -68,6 +68,7 @@ impl InstrumentedStore {
path: path.to_string(),
read_byte_count,
read_count,
file_size_hint: None,
})
}
@@ -262,15 +263,27 @@ pub(crate) struct InstrumentedRangeReader<'a> {
path: String,
read_byte_count: &'a IntCounter,
read_count: &'a IntCounter,
file_size_hint: Option<u64>,
}
#[async_trait]
impl RangeReader for InstrumentedRangeReader<'_> {
fn with_file_size_hint(&mut self, file_size_hint: u64) {
self.file_size_hint = Some(file_size_hint);
}
async fn metadata(&mut self) -> io::Result<Metadata> {
let stat = self.store.stat(&self.path).await?;
Ok(Metadata {
content_length: stat.content_length(),
})
match self.file_size_hint {
Some(file_size_hint) => Ok(Metadata {
content_length: file_size_hint,
}),
None => {
let stat = self.store.stat(&self.path).await?;
Ok(Metadata {
content_length: stat.content_length(),
})
}
}
}
async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {

View File

@@ -475,8 +475,11 @@ impl ParquetReaderBuilder {
if !self.file_handle.meta_ref().inverted_index_available() {
return false;
}
let apply_output = match index_applier.apply(self.file_handle.file_id()).await {
let file_size_hint = self.file_handle.meta_ref().inverted_index_size();
let apply_output = match index_applier
.apply(self.file_handle.file_id(), file_size_hint)
.await
{
Ok(output) => output,
Err(err) => {
if cfg!(any(test, feature = "test")) {

View File

@@ -23,6 +23,10 @@ use crate::partial_reader::PartialReader;
#[async_trait]
impl<R: RangeReader> RangeReader for PartialReader<R> {
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// do nothing
}
async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.size,

View File

@@ -73,11 +73,12 @@ pub struct PutOptions {
/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PuffinReader {
type Blob: BlobGuard;
type Dir: DirGuard;
fn with_file_size_hint(self, file_size_hint: Option<u64>) -> Self;
/// Reads a blob from the Puffin file.
///
/// The returned `BlobGuard` is used to access the blob data.

View File

@@ -43,6 +43,9 @@ pub struct FsPuffinReader<S, F> {
/// The name of the puffin file.
puffin_file_name: String,
/// The file size hint.
file_size_hint: Option<u64>,
/// The stager.
stager: S,
@@ -62,6 +65,7 @@ impl<S, F> FsPuffinReader<S, F> {
) -> Self {
Self {
puffin_file_name,
file_size_hint: None,
stager,
puffin_file_accessor,
puffin_file_metadata_cache,
@@ -78,11 +82,19 @@ where
type Blob = Either<RandomReadBlob<F>, S::Blob>;
type Dir = S::Dir;
fn with_file_size_hint(mut self, file_size_hint: Option<u64>) -> Self {
self.file_size_hint = file_size_hint;
self
}
async fn blob(&self, key: &str) -> Result<Self::Blob> {
let reader = self
let mut reader = self
.puffin_file_accessor
.reader(&self.puffin_file_name)
.await?;
if let Some(file_size_hint) = self.file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
let mut file = PuffinFileReader::new(reader);
let metadata = self.get_puffin_file_metadata(&mut file).await?;
@@ -303,6 +315,13 @@ where
A: RangeReader,
B: RangeReader,
{
fn with_file_size_hint(&mut self, file_size_hint: u64) {
match self {
Either::L(a) => a.with_file_size_hint(file_size_hint),
Either::R(b) => b.with_file_size_hint(file_size_hint),
}
}
async fn metadata(&mut self) -> io::Result<Metadata> {
match self {
Either::L(a) => a.metadata().await,