diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 48881e5f05..758b352804 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -23,13 +23,15 @@ pub(crate) mod manifest_cache; pub(crate) mod test_util; pub(crate) mod write_cache; +use std::collections::BTreeMap; use std::mem; use std::ops::Range; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use bytes::Bytes; use common_base::readable_size::ReadableSize; use common_telemetry::warn; +use dashmap::DashMap; use datatypes::arrow::record_batch::RecordBatch; use datatypes::value::Value; use datatypes::vectors::VectorRef; @@ -358,20 +360,33 @@ impl CacheStrategy { } } - /// Calls [CacheManager::get_pages()]. + /// Calls [CacheManager::get_page_ranges()]. /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. - pub fn get_pages(&self, page_key: &PageKey) -> Option> { + pub fn get_page_ranges( + &self, + file_id: FileId, + row_group_idx: usize, + ranges: &[Range], + ) -> Option { match self { - CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key), + CacheStrategy::EnableAll(cache_manager) => { + cache_manager.get_page_ranges(file_id, row_group_idx, ranges) + } CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } - /// Calls [CacheManager::put_pages()]. + /// Calls [CacheManager::put_page_ranges()]. /// It does nothing if the strategy isn't [CacheStrategy::EnableAll]. - pub fn put_pages(&self, page_key: PageKey, pages: Arc) { + pub fn put_page_ranges( + &self, + file_id: FileId, + row_group_idx: usize, + ranges: &[Range], + pages: &[Bytes], + ) { if let CacheStrategy::EnableAll(cache_manager) = self { - cache_manager.put_pages(page_key, pages); + cache_manager.put_page_ranges(file_id, row_group_idx, ranges, pages); } } @@ -551,8 +566,8 @@ pub struct CacheManager { sst_meta_cache: Option, /// Cache for vectors. vector_cache: Option, - /// Cache for SST pages. - page_cache: Option, + /// Cache for SST byte ranges. + page_cache: Option>, /// A Cache for writing files to object stores. write_cache: Option, /// Cache for inverted index. @@ -730,21 +745,35 @@ impl CacheManager { } } - /// Gets pages for the row group. - pub fn get_pages(&self, page_key: &PageKey) -> Option> { - self.page_cache.as_ref().and_then(|page_cache| { - let value = page_cache.get(page_key); - update_hit_miss(value, PAGE_TYPE) + /// Gets cached byte fragments for the requested ranges. + pub fn get_page_ranges( + &self, + file_id: FileId, + row_group_idx: usize, + ranges: &[Range], + ) -> Option { + self.page_cache.as_ref().map(|page_cache| { + let lookup = page_cache.lookup(file_id, row_group_idx, ranges); + if lookup.cached_bytes > 0 { + CACHE_HIT.with_label_values(&[PAGE_TYPE]).inc(); + } + if !lookup.missing_ranges.is_empty() { + CACHE_MISS.with_label_values(&[PAGE_TYPE]).inc(); + } + lookup }) } - /// Puts pages of the row group into the cache. - pub fn put_pages(&self, page_key: PageKey, pages: Arc) { + /// Puts byte fragments into the page cache. + pub fn put_page_ranges( + &self, + file_id: FileId, + row_group_idx: usize, + ranges: &[Range], + pages: &[Bytes], + ) { if let Some(cache) = &self.page_cache { - CACHE_BYTES - .with_label_values(&[PAGE_TYPE]) - .add(page_cache_weight(&page_key, &pages).into()); - cache.insert(page_key, pages); + cache.insert_ranges(file_id, row_group_idx, ranges, pages); } } @@ -966,15 +995,6 @@ impl CacheManagerBuilder { /// Builds the [CacheManager]. pub fn build(self) -> CacheManager { - fn to_str(cause: RemovalCause) -> &'static str { - match cause { - RemovalCause::Expired => "expired", - RemovalCause::Explicit => "explicit", - RemovalCause::Replaced => "replaced", - RemovalCause::Size => "size", - } - } - let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| { Cache::builder() .max_capacity(self.sst_meta_cache_size) @@ -985,7 +1005,7 @@ impl CacheManagerBuilder { .with_label_values(&[SST_META_TYPE]) .sub(size.into()); CACHE_EVICTION - .with_label_values(&[SST_META_TYPE, to_str(cause)]) + .with_label_values(&[SST_META_TYPE, removal_cause_to_str(cause)]) .inc(); }) .build() @@ -1000,24 +1020,13 @@ impl CacheManagerBuilder { .with_label_values(&[VECTOR_TYPE]) .sub(size.into()); CACHE_EVICTION - .with_label_values(&[VECTOR_TYPE, to_str(cause)]) - .inc(); - }) - .build() - }); - let page_cache = (self.page_cache_size != 0).then(|| { - Cache::builder() - .max_capacity(self.page_cache_size) - .weigher(page_cache_weight) - .eviction_listener(|k, v, cause| { - let size = page_cache_weight(&k, &v); - CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); - CACHE_EVICTION - .with_label_values(&[PAGE_TYPE, to_str(cause)]) + .with_label_values(&[VECTOR_TYPE, removal_cause_to_str(cause)]) .inc(); }) .build() }); + let page_cache = + (self.page_cache_size != 0).then(|| PageRangeCache::new(self.page_cache_size)); let inverted_index_cache = InvertedIndexCache::new( self.index_metadata_size, self.index_content_size, @@ -1046,7 +1055,7 @@ impl CacheManagerBuilder { .with_label_values(&[SELECTOR_RESULT_TYPE]) .sub(size.into()); CACHE_EVICTION - .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)]) + .with_label_values(&[SELECTOR_RESULT_TYPE, removal_cause_to_str(cause)]) .inc(); }) .build() @@ -1061,7 +1070,7 @@ impl CacheManagerBuilder { .with_label_values(&[RANGE_RESULT_TYPE]) .sub(size.into()); CACHE_EVICTION - .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)]) + .with_label_values(&[RANGE_RESULT_TYPE, removal_cause_to_str(cause)]) .inc(); }) .build() @@ -1098,8 +1107,8 @@ fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 { (mem::size_of::() + mem::size_of::() + v.memory_size()) as u32 } -fn page_cache_weight(k: &PageKey, v: &Arc) -> u32 { - (k.estimated_size() + v.estimated_size()) as u32 +fn page_cache_weight(k: &PageFragmentKey, v: &Bytes) -> u32 { + (k.estimated_size() + mem::size_of::() + v.len()) as u32 } fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc) -> u32 { @@ -1110,6 +1119,15 @@ fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc (k.estimated_size() + v.estimated_size()) as u32 } +fn removal_cause_to_str(cause: RemovalCause) -> &'static str { + match cause { + RemovalCause::Expired => "expired", + RemovalCause::Explicit => "explicit", + RemovalCause::Replaced => "replaced", + RemovalCause::Size => "size", + } +} + /// Updates cache hit/miss metrics. fn update_hit_miss(value: Option, cache_type: &str) -> Option { if value.is_some() { @@ -1131,73 +1149,226 @@ impl SstMetaKey { } } -/// Path to column pages in the SST file. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ColumnPagePath { - /// Region id of the SST file to cache. - region_id: RegionId, - /// Id of the SST file to cache. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +struct PageFragmentGroupKey { + file_id: FileId, + row_group_idx: usize, +} + +/// Cache key for one byte fragment in an SST row group. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct PageFragmentKey { + /// Id of the SST file. file_id: FileId, /// Index of the row group. row_group_idx: usize, - /// Index of the column in the row group. - column_idx: usize, + /// Start offset of the cached byte fragment. + start: u64, + /// End offset of the cached byte fragment. + end: u64, } -/// Cache key to pages in a row group (after projection). -/// -/// Different projections will have different cache keys. -/// We cache all ranges together because they may refer to the same `Bytes`. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PageKey { - /// Id of the SST file to cache. - file_id: FileId, - /// Index of the row group. - row_group_idx: usize, - /// Byte ranges of the pages to cache. - ranges: Vec>, -} - -impl PageKey { - /// Creates a key for a list of pages. - pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec>) -> PageKey { - PageKey { +impl PageFragmentKey { + fn new(file_id: FileId, row_group_idx: usize, range: &Range) -> PageFragmentKey { + PageFragmentKey { file_id, row_group_idx, - ranges, + start: range.start, + end: range.end, + } + } + + fn group_key(&self) -> PageFragmentGroupKey { + PageFragmentGroupKey { + file_id: self.file_id, + row_group_idx: self.row_group_idx, } } /// Returns memory used by the key (estimated). fn estimated_size(&self) -> usize { - mem::size_of::() + mem::size_of_val(self.ranges.as_slice()) + mem::size_of::() } } -/// Cached row group pages for a column. -// We don't use enum here to make it easier to mock and use the struct. -#[derive(Default)] -pub struct PageValue { - /// Compressed page in the row group. - pub compressed: Vec, - /// Total size of the pages (may be larger than sum of compressed bytes due to gaps). - pub page_size: u64, +/// One cached byte fragment that overlaps a requested range. +#[derive(Clone)] +pub struct PageRangePart { + /// Range covered by `bytes`. + pub range: Range, + /// Bytes for `range`. + pub bytes: Bytes, } -impl PageValue { - /// Creates a new value from a range of compressed pages. - pub fn new(bytes: Vec, page_size: u64) -> PageValue { - PageValue { - compressed: bytes, - page_size, +/// Result of looking up request ranges in the page range cache. +pub struct PageRangeLookup { + /// Cached fragments grouped by the original requested range index. + pub cached_parts: Vec>, + /// Ranges that are not covered by cached fragments and need fetching. + pub missing_ranges: Vec>, + /// Number of cached fragments used. + pub cached_range_count: usize, + /// Number of requested bytes served from cached fragments. + pub cached_bytes: u64, +} + +impl PageRangeLookup { + pub fn is_fully_cached(&self) -> bool { + self.missing_ranges.is_empty() + } +} + +type PageFragmentRangeMap = RwLock>; +type PageFragmentIndex = DashMap>; + +/// Byte-fragment cache for Parquet row-group reads. +/// +/// Moka owns capacity and eviction. The side index only makes overlap lookup possible. +pub struct PageRangeCache { + cache: Cache, + index: Arc, +} + +impl PageRangeCache { + fn new(capacity: u64) -> Arc { + let index: Arc = Arc::new(DashMap::new()); + let eviction_index = index.clone(); + let cache = Cache::builder() + .max_capacity(capacity) + .weigher(page_cache_weight) + .eviction_listener(move |k: Arc, v: Bytes, cause| { + let key = *k; + let size = page_cache_weight(&key, &v); + CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); + CACHE_EVICTION + .with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)]) + .inc(); + + if let Some(group) = eviction_index.get(&key.group_key()) { + group.write().unwrap().remove(&(key.start, key.end)); + } + }) + .build(); + + Arc::new(PageRangeCache { cache, index }) + } + + fn lookup( + &self, + file_id: FileId, + row_group_idx: usize, + ranges: &[Range], + ) -> PageRangeLookup { + let mut cached_parts = Vec::with_capacity(ranges.len()); + let mut missing_ranges = Vec::new(); + let mut cached_range_count = 0; + let mut cached_bytes = 0; + let group_key = PageFragmentGroupKey { + file_id, + row_group_idx, + }; + + let group = self.index.get(&group_key); + for range in ranges { + if range.start >= range.end { + cached_parts.push(Vec::new()); + continue; + } + + let mut parts = Vec::new(); + if let Some(group) = group.as_ref() { + let index = group.read().unwrap(); + // A simple first-stage interval lookup: inspect fragments whose start is before + // the requested end and keep those that overlap the requested range. + for (_, fragment_key) in index.range(..(range.end, 0)) { + if fragment_key.end <= range.start { + continue; + } + + if let Some(bytes) = self.cache.get(fragment_key) { + let start = range.start.max(fragment_key.start); + let end = range.end.min(fragment_key.end); + if start < end { + let slice_start = (start - fragment_key.start) as usize; + let slice_end = (end - fragment_key.start) as usize; + parts.push(PageRangePart { + range: start..end, + bytes: bytes.slice(slice_start..slice_end), + }); + } + } + } + } + + parts.sort_unstable_by_key(|part| part.range.start); + let mut cursor = range.start; + let mut compacted_parts: Vec = Vec::with_capacity(parts.len()); + for part in parts { + if part.range.end <= cursor { + continue; + } + + let part = if part.range.start < cursor { + let offset = (cursor - part.range.start) as usize; + PageRangePart { + range: cursor..part.range.end, + bytes: part.bytes.slice(offset..), + } + } else { + part + }; + + if cursor < part.range.start { + missing_ranges.push(cursor..part.range.start); + } + cached_bytes += part.range.end - part.range.start; + cached_range_count += 1; + cursor = part.range.end; + compacted_parts.push(part); + + if cursor >= range.end { + break; + } + } + + if cursor < range.end { + missing_ranges.push(cursor..range.end); + } + cached_parts.push(compacted_parts); + } + + PageRangeLookup { + cached_parts, + missing_ranges, + cached_range_count, + cached_bytes, } } - /// Returns memory used by the value (estimated). - fn estimated_size(&self) -> usize { - mem::size_of::() - + self.page_size as usize - + self.compressed.iter().map(mem::size_of_val).sum::() + fn insert_ranges( + &self, + file_id: FileId, + row_group_idx: usize, + ranges: &[Range], + pages: &[Bytes], + ) { + for (range, bytes) in ranges.iter().zip(pages) { + if range.start >= range.end || bytes.len() as u64 != range.end - range.start { + continue; + } + + let key = PageFragmentKey::new(file_id, row_group_idx, range); + let size = page_cache_weight(&key, bytes); + CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into()); + self.cache.insert(key, bytes.clone()); + + let group = self + .index + .entry(key.group_key()) + .or_insert_with(|| Arc::new(PageFragmentRangeMap::new(BTreeMap::new()))) + .clone(); + group.write().unwrap().insert((key.start, key.end), key); + } } } @@ -1265,8 +1436,6 @@ type SstMetaCache = Cache>; /// /// e.g. `"hello" => ["hello", "hello", "hello"]` type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>; -/// Maps (region, file, row group, column) to [PageValue]. -type PageCache = Cache>; /// Maps (file id, row group id, time series row selector) to [SelectorResultValue]. type SelectorResultCache = Cache>; /// Maps partition-range scan key to cached flat batches. @@ -1324,10 +1493,17 @@ mod tests { .is_none() ); - let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]); - let pages = Arc::new(PageValue::default()); - cache.put_pages(key.clone(), pages); - assert!(cache.get_pages(&key).is_none()); + cache.put_page_ranges( + file_id.file_id(), + 1, + &[Range { start: 0, end: 5 }], + &[Bytes::from_static(b"abcde")], + ); + assert!( + cache + .get_page_ranges(file_id.file_id(), 1, &[Range { start: 0, end: 5 }]) + .is_none() + ); assert!(cache.write_cache().is_none()); } @@ -1428,11 +1604,41 @@ mod tests { fn test_page_cache() { let cache = CacheManager::builder().page_cache_size(1000).build(); let file_id = FileId::random(); - let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]); - assert!(cache.get_pages(&key).is_none()); - let pages = Arc::new(PageValue::default()); - cache.put_pages(key.clone(), pages); - assert!(cache.get_pages(&key).is_some()); + let uncached = 0..10; + assert_eq!( + vec![0..10], + cache + .get_page_ranges(file_id, 0, std::slice::from_ref(&uncached)) + .unwrap() + .missing_ranges + ); + + let cached = 100..500; + cache.put_page_ranges( + file_id, + 0, + std::slice::from_ref(&cached), + &[Bytes::from(vec![7; 400])], + ); + + let subrange = 200..300; + let lookup = cache + .get_page_ranges(file_id, 0, std::slice::from_ref(&subrange)) + .unwrap(); + assert!(lookup.is_fully_cached()); + assert_eq!(100, lookup.cached_bytes); + assert_eq!(1, lookup.cached_parts.len()); + assert_eq!(200..300, lookup.cached_parts[0][0].range); + assert_eq!(100, lookup.cached_parts[0][0].bytes.len()); + + let overlapping = 400..600; + let lookup = cache + .get_page_ranges(file_id, 0, std::slice::from_ref(&overlapping)) + .unwrap(); + assert!(!lookup.is_fully_cached()); + assert_eq!(100, lookup.cached_bytes); + assert_eq!(vec![500..600], lookup.missing_ranges); + assert_eq!(400..500, lookup.cached_parts[0][0].range); } #[test] diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index a3fe32cbe3..88f4162b7d 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -139,7 +139,7 @@ mod tests { use super::*; use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType}; use crate::cache::test_util::assert_parquet_metadata_equal; - use crate::cache::{CacheManager, CacheStrategy, PageKey}; + use crate::cache::{CacheManager, CacheStrategy}; use crate::config::IndexConfig; use crate::read::FlatSource; use crate::region::options::{IndexOptions, InvertedIndexOptions}; @@ -339,11 +339,20 @@ mod tests { // Cache 4 row groups. for i in 0..4 { - let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i)); - assert!(cache.get_pages(&page_key).is_some()); + let lookup = cache + .get_page_ranges(handle.file_id().file_id(), i, &get_ranges(i)) + .unwrap(); + assert!(lookup.is_fully_cached()); } - let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]); - assert!(cache.get_pages(&page_key).is_none()); + let missing_range = 0..10; + let lookup = cache + .get_page_ranges( + handle.file_id().file_id(), + 5, + std::slice::from_ref(&missing_range), + ) + .unwrap(); + assert_eq!(vec![0..10], lookup.missing_ranges); } #[tokio::test] diff --git a/src/mito2/src/sst/parquet/push_decoder.rs b/src/mito2/src/sst/parquet/push_decoder.rs index c44813a5ab..2b166dfab1 100644 --- a/src/mito2/src/sst/parquet/push_decoder.rs +++ b/src/mito2/src/sst/parquet/push_decoder.rs @@ -15,9 +15,8 @@ //! Push decoder stream implementation for SST parquet files. use std::ops::Range; -use std::sync::Arc; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use datatypes::arrow::record_batch::RecordBatch; use futures::StreamExt; use futures::stream::BoxStream; @@ -26,11 +25,11 @@ use parquet::DecodeResult; use parquet::arrow::ProjectionMask; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelection}; use parquet::arrow::push_decoder::ParquetPushDecoderBuilder; -use snafu::ResultExt; +use snafu::{ResultExt, ensure}; use crate::cache::file_cache::{FileType, IndexKey}; -use crate::cache::{CacheStrategy, PageKey, PageValue}; -use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result}; +use crate::cache::{CacheStrategy, PageRangePart}; +use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result, UnexpectedSnafu}; use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; use crate::sst::file::RegionFileId; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; @@ -85,30 +84,44 @@ impl SstParquetRangeFetcher { .map(|_| std::time::Instant::now()); let _timer = READ_STAGE_FETCH_PAGES.start_timer(); - let page_key = PageKey::new( + let mut page_lookup = self.cache_strategy.get_page_ranges( self.region_file_id.file_id(), self.row_group_idx, - ranges.clone(), + &ranges, ); - - // Check page cache first. - if let Some(pages) = self.cache_strategy.get_pages(&page_key) { - if let Some(metrics) = &self.fetch_metrics { - let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum(); - let mut metrics_data = metrics.data.lock().unwrap(); - metrics_data.page_cache_hit += 1; - metrics_data.pages_to_fetch_mem += ranges.len(); - metrics_data.page_size_to_fetch_mem += total_size; - metrics_data.page_size_needed += total_size; - if let Some(start) = fetch_start { - metrics_data.total_fetch_elapsed += start.elapsed(); - } - } - return Ok(pages.compressed.clone()); + if let Some(lookup) = &page_lookup + && lookup.cached_bytes > 0 + && let Some(metrics) = &self.fetch_metrics + { + let mut metrics_data = metrics.data.lock().unwrap(); + metrics_data.page_cache_hit += 1; + metrics_data.pages_to_fetch_mem += lookup.cached_range_count; + metrics_data.page_size_to_fetch_mem += lookup.cached_bytes; + metrics_data.page_size_needed += lookup.cached_bytes; } + // Fast path: all requested ranges can be assembled from cached fragments. + if page_lookup + .as_ref() + .map(|lookup| lookup.is_fully_cached()) + .unwrap_or(false) + { + let lookup = page_lookup.take().unwrap(); + if let Some(metrics) = &self.fetch_metrics + && let Some(start) = fetch_start + { + metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed(); + } + return assemble_ranges(&ranges, lookup.cached_parts, &[]); + } + + let missing_ranges = page_lookup + .as_ref() + .map(|lookup| lookup.missing_ranges.clone()) + .unwrap_or_else(|| ranges.clone()); + // Calculate total range size for metrics. - let (total_range_size, unaligned_size) = compute_total_range_size(&ranges); + let (_, unaligned_size) = compute_total_range_size(&missing_ranges); // Check write cache. let key = IndexKey::new( @@ -121,21 +134,22 @@ impl SstParquetRangeFetcher { .as_ref() .map(|_| std::time::Instant::now()); let write_cache_result = match self.cache_strategy.write_cache() { - Some(cache) => cache.file_cache().read_ranges(key, &ranges).await, + Some(cache) => cache.file_cache().read_ranges(key, &missing_ranges).await, None => None, }; - let pages = match write_cache_result { + let fetched_pages = match write_cache_result { Some(data) => { if let Some(metrics) = &self.fetch_metrics { let elapsed = fetch_write_cache_start .map(|start| start.elapsed()) .unwrap_or_default(); - let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let range_size_needed: u64 = + missing_ranges.iter().map(|r| r.end - r.start).sum(); let mut metrics_data = metrics.data.lock().unwrap(); metrics_data.write_cache_fetch_elapsed += elapsed; metrics_data.write_cache_hit += 1; - metrics_data.pages_to_fetch_write_cache += ranges.len(); + metrics_data.pages_to_fetch_write_cache += missing_ranges.len(); metrics_data.page_size_to_fetch_write_cache += unaligned_size; metrics_data.page_size_needed += range_size_needed; } @@ -151,17 +165,19 @@ impl SstParquetRangeFetcher { .fetch_metrics .as_ref() .map(|_| std::time::Instant::now()); - let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges) - .await - .context(OpenDalSnafu)?; + let data = + fetch_byte_ranges(&self.file_path, self.object_store.clone(), &missing_ranges) + .await + .context(OpenDalSnafu)?; if let Some(metrics) = &self.fetch_metrics { let elapsed = start.map(|start| start.elapsed()).unwrap_or_default(); - let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum(); + let range_size_needed: u64 = + missing_ranges.iter().map(|r| r.end - r.start).sum(); let mut metrics_data = metrics.data.lock().unwrap(); metrics_data.store_fetch_elapsed += elapsed; metrics_data.cache_miss += 1; - metrics_data.pages_to_fetch_store += ranges.len(); + metrics_data.pages_to_fetch_store += missing_ranges.len(); metrics_data.page_size_to_fetch_store += unaligned_size; metrics_data.page_size_needed += range_size_needed; } @@ -169,19 +185,122 @@ impl SstParquetRangeFetcher { } }; - // Put pages back to the cache. - let page_value = PageValue::new(pages.clone(), total_range_size); - self.cache_strategy - .put_pages(page_key, Arc::new(page_value)); + self.cache_strategy.put_page_ranges( + self.region_file_id.file_id(), + self.row_group_idx, + &missing_ranges, + &fetched_pages, + ); if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) { metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed(); } - Ok(pages) + if let Some(lookup) = page_lookup { + let fetched_parts = missing_ranges + .into_iter() + .zip(fetched_pages) + .map(|(range, bytes)| PageRangePart { range, bytes }) + .collect::>(); + return assemble_ranges(&ranges, lookup.cached_parts, &fetched_parts); + } + + Ok(fetched_pages) } } +fn assemble_ranges( + ranges: &[Range], + cached_parts: Vec>, + fetched_parts: &[PageRangePart], +) -> Result> { + ensure!( + ranges.len() == cached_parts.len(), + UnexpectedSnafu { + reason: format!( + "Invalid parquet range assembly: {} requested ranges but {} cached part groups", + ranges.len(), + cached_parts.len() + ), + } + ); + + ranges + .iter() + .zip(cached_parts) + .map(|(range, mut parts)| { + parts.extend( + fetched_parts + .iter() + .filter_map(|part| overlapping_part(range, part)), + ); + assemble_range(range, parts) + }) + .collect() +} + +fn overlapping_part(range: &Range, part: &PageRangePart) -> Option { + let start = range.start.max(part.range.start); + let end = range.end.min(part.range.end); + if start >= end { + return None; + } + + let slice_start = (start - part.range.start) as usize; + let slice_end = (end - part.range.start) as usize; + Some(PageRangePart { + range: start..end, + bytes: part.bytes.slice(slice_start..slice_end), + }) +} + +fn assemble_range(range: &Range, mut parts: Vec) -> Result { + if range.start >= range.end { + return Ok(Bytes::new()); + } + + parts.sort_unstable_by_key(|part| part.range.start); + if parts.len() == 1 && parts[0].range == *range { + return Ok(parts.pop().unwrap().bytes); + } + + let mut cursor = range.start; + let mut output = BytesMut::with_capacity((range.end - range.start) as usize); + for part in parts { + ensure!( + part.range.start <= cursor, + UnexpectedSnafu { + reason: format!( + "Missing cached parquet bytes for range {}..{}, next part starts at {}", + range.start, range.end, part.range.start + ), + } + ); + if part.range.end <= cursor { + continue; + } + + let slice_start = (cursor - part.range.start) as usize; + output.extend_from_slice(&part.bytes.slice(slice_start..)); + cursor = part.range.end.min(range.end); + if cursor >= range.end { + break; + } + } + + ensure!( + cursor == range.end, + UnexpectedSnafu { + reason: format!( + "Missing cached parquet bytes for range {}..{}, assembled through {}", + range.start, range.end, cursor + ), + } + ); + + Ok(output.freeze()) +} + /// Builds a parquet record batch stream driven directly by [ParquetPushDecoderBuilder]. pub(crate) fn build_sst_parquet_record_batch_stream( arrow_metadata: ArrowReaderMetadata, @@ -220,3 +339,44 @@ pub(crate) fn build_sst_parquet_record_batch_stream( } .boxed()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_assemble_range_from_cached_subrange_and_fetched_tail() { + let cached_parts = vec![vec![PageRangePart { + range: 400..500, + bytes: Bytes::from(vec![1; 100]), + }]]; + let fetched_parts = vec![PageRangePart { + range: 500..600, + bytes: Bytes::from(vec![2; 100]), + }]; + + let requested = 400..600; + let output = assemble_ranges( + std::slice::from_ref(&requested), + cached_parts, + &fetched_parts, + ) + .unwrap(); + assert_eq!(1, output.len()); + assert_eq!(vec![1; 100].as_slice(), &output[0][..100]); + assert_eq!(vec![2; 100].as_slice(), &output[0][100..]); + } + + #[test] + fn test_assemble_range_returns_single_covering_part_without_copy() { + let bytes = Bytes::from_static(b"abcdef"); + let cached_parts = vec![vec![PageRangePart { + range: 10..16, + bytes: bytes.clone(), + }]]; + + let requested = 10..16; + let output = assemble_ranges(std::slice::from_ref(&requested), cached_parts, &[]).unwrap(); + assert_eq!(bytes, output[0]); + } +}