diff --git a/src/common/base/src/range_read.rs b/src/common/base/src/range_read.rs index 920b2e1f8c..4e7a48a8b7 100644 --- a/src/common/base/src/range_read.rs +++ b/src/common/base/src/range_read.rs @@ -60,21 +60,46 @@ pub trait RangeReader: Send + Unpin { } } -/// Implement `RangeReader` for a type that implements `AsyncRead + AsyncSeek`. +#[async_trait] +impl RangeReader for &mut R { + async fn metadata(&mut self) -> io::Result { + (*self).metadata().await + } + async fn read(&mut self, range: Range) -> io::Result { + (*self).read(range).await + } + async fn read_into( + &mut self, + range: Range, + buf: &mut (impl BufMut + Send), + ) -> io::Result<()> { + (*self).read_into(range, buf).await + } + async fn read_vec(&mut self, ranges: &[Range]) -> io::Result> { + (*self).read_vec(ranges).await + } +} + +/// `RangeReaderAdapter` bridges `RangeReader` and `AsyncRead + AsyncSeek`. +pub struct RangeReaderAdapter(pub R); + +/// Implements `RangeReader` for a type that implements `AsyncRead + AsyncSeek`. /// /// TODO(zhongzc): It's a temporary solution for porting the codebase from `AsyncRead + AsyncSeek` to `RangeReader`. /// Until the codebase is fully ported to `RangeReader`, remove this implementation. #[async_trait] -impl RangeReader for R { +impl RangeReader + for RangeReaderAdapter +{ async fn metadata(&mut self) -> io::Result { - let content_length = self.seek(io::SeekFrom::End(0)).await?; + let content_length = self.0.seek(io::SeekFrom::End(0)).await?; Ok(Metadata { content_length }) } async fn read(&mut self, range: Range) -> io::Result { let mut buf = vec![0; (range.end - range.start) as usize]; - self.seek(io::SeekFrom::Start(range.start)).await?; - self.read_exact(&mut buf).await?; + self.0.seek(io::SeekFrom::Start(range.start)).await?; + self.0.read_exact(&mut buf).await?; Ok(Bytes::from(buf)) } } diff --git a/src/index/src/inverted_index/format/reader/blob.rs b/src/index/src/inverted_index/format/reader/blob.rs index 5da70e3748..f79d651e79 100644 --- a/src/index/src/inverted_index/format/reader/blob.rs +++ b/src/index/src/inverted_index/format/reader/blob.rs @@ -80,6 +80,7 @@ impl InvertedIndexReader for InvertedIndexBlobReader { #[cfg(test)] mod tests { use common_base::bit_vec::prelude::*; + use common_base::range_read::RangeReaderAdapter; use fst::MapBuilder; use futures::io::Cursor; use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexMetas}; @@ -162,7 +163,8 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_reader_metadata() { let blob = create_inverted_index_blob(); - let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob)); + let cursor = RangeReaderAdapter(Cursor::new(blob)); + let mut blob_reader = InvertedIndexBlobReader::new(cursor); let metas = blob_reader.metadata().await.unwrap(); assert_eq!(metas.metas.len(), 2); @@ -189,7 +191,8 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_reader_fst() { let blob = create_inverted_index_blob(); - let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob)); + let cursor = RangeReaderAdapter(Cursor::new(blob)); + let mut blob_reader = InvertedIndexBlobReader::new(cursor); let metas = blob_reader.metadata().await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); @@ -221,7 +224,8 @@ mod tests { #[tokio::test] async fn test_inverted_index_blob_reader_bitmap() { let blob = create_inverted_index_blob(); - let mut blob_reader = InvertedIndexBlobReader::new(Cursor::new(blob)); + let cursor = RangeReaderAdapter(Cursor::new(blob)); + let mut blob_reader = InvertedIndexBlobReader::new(cursor); let metas = blob_reader.metadata().await.unwrap(); let meta = metas.metas.get("tag0").unwrap(); diff --git a/src/index/src/inverted_index/format/reader/footer.rs b/src/index/src/inverted_index/format/reader/footer.rs index 244973669b..ffcaf9d921 100644 --- a/src/index/src/inverted_index/format/reader/footer.rs +++ b/src/index/src/inverted_index/format/reader/footer.rs @@ -24,18 +24,18 @@ use crate::inverted_index::error::{ use crate::inverted_index::format::FOOTER_PAYLOAD_SIZE_SIZE; /// InvertedIndeFooterReader is for reading the footer section of the blob. -pub struct InvertedIndeFooterReader<'a, R> { - source: &'a mut R, +pub struct InvertedIndeFooterReader { + source: R, blob_size: u64, } -impl<'a, R> InvertedIndeFooterReader<'a, R> { - pub fn new(source: &'a mut R, blob_size: u64) -> Self { +impl InvertedIndeFooterReader { + pub fn new(source: R, blob_size: u64) -> Self { Self { source, blob_size } } } -impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> { +impl InvertedIndeFooterReader { pub async fn metadata(&mut self) -> Result { let payload_size = self.read_payload_size().await?; let metas = self.read_payload(payload_size).await?; @@ -113,6 +113,7 @@ impl<'a, R: RangeReader> InvertedIndeFooterReader<'a, R> { #[cfg(test)] mod tests { + use common_base::range_read::RangeReaderAdapter; use futures::io::Cursor; use prost::Message; @@ -142,8 +143,8 @@ mod tests { let payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let mut cursor = Cursor::new(payload_buf); - let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size); + let cursor = RangeReaderAdapter(Cursor::new(payload_buf)); + let mut reader = InvertedIndeFooterReader::new(cursor, blob_size); let payload_size = reader.read_payload_size().await.unwrap(); let metas = reader.read_payload(payload_size).await.unwrap(); @@ -163,8 +164,8 @@ mod tests { let mut payload_buf = create_test_payload(meta); payload_buf.push(0xff); // Add an extra byte to corrupt the footer let blob_size = payload_buf.len() as u64; - let mut cursor = Cursor::new(payload_buf); - let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size); + let cursor = RangeReaderAdapter(Cursor::new(payload_buf)); + let mut reader = InvertedIndeFooterReader::new(cursor, blob_size); let payload_size_result = reader.read_payload_size().await; assert!(payload_size_result.is_err()); @@ -181,8 +182,8 @@ mod tests { let payload_buf = create_test_payload(meta); let blob_size = payload_buf.len() as u64; - let mut cursor = Cursor::new(payload_buf); - let mut reader = InvertedIndeFooterReader::new(&mut cursor, blob_size); + let cursor = RangeReaderAdapter(Cursor::new(payload_buf)); + let mut reader = InvertedIndeFooterReader::new(cursor, blob_size); let payload_size = reader.read_payload_size().await.unwrap(); let payload_result = reader.read_payload(payload_size).await; diff --git a/src/index/src/inverted_index/format/writer/blob.rs b/src/index/src/inverted_index/format/writer/blob.rs index 767a7a3412..26d3fb26d9 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -99,6 +99,7 @@ impl InvertedIndexBlobWriter { #[cfg(test)] mod tests { + use common_base::range_read::RangeReaderAdapter; use futures::io::Cursor; use futures::stream; @@ -119,7 +120,7 @@ mod tests { .await .unwrap(); - let cursor = Cursor::new(blob); + let cursor = RangeReaderAdapter(Cursor::new(blob)); let mut reader = InvertedIndexBlobReader::new(cursor); let metadata = reader.metadata().await.unwrap(); assert_eq!(metadata.total_row_count, 8); @@ -160,7 +161,7 @@ mod tests { .await .unwrap(); - let cursor = Cursor::new(blob); + let cursor = RangeReaderAdapter(Cursor::new(blob)); let mut reader = InvertedIndexBlobReader::new(cursor); let metadata = reader.metadata().await.unwrap(); assert_eq!(metadata.total_row_count, 8); diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index cac3ffedd7..a3482cc075 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -16,6 +16,7 @@ pub mod builder; use std::sync::Arc; +use common_base::range_read::RangeReaderAdapter; use common_telemetry::warn; use index::inverted_index::format::reader::InvertedIndexBlobReader; use index::inverted_index::search::index_apply::{ @@ -108,6 +109,7 @@ impl InvertedIndexApplier { self.remote_blob_reader(file_id).await? } }; + let blob = RangeReaderAdapter(blob); if let Some(index_cache) = &self.inverted_index_cache { let mut index_reader = CachedInvertedIndexBlobReader::new(