feat: loop up cache with range calculation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-05-16 11:14:02 +08:00
parent 3cbd15c1a9
commit 8eebf15ea7
3 changed files with 523 additions and 148 deletions

View File

@@ -23,13 +23,15 @@ pub(crate) mod manifest_cache;
pub(crate) mod test_util;
pub(crate) mod write_cache;
use std::collections::BTreeMap;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use dashmap::DashMap;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
@@ -358,20 +360,33 @@ impl CacheStrategy {
}
}
/// Calls [CacheManager::get_pages()].
/// Calls [CacheManager::get_page_ranges()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
pub fn get_page_ranges(
&self,
file_id: FileId,
row_group_idx: usize,
ranges: &[Range<u64>],
) -> Option<PageRangeLookup> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_page_ranges(file_id, row_group_idx, ranges)
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::put_pages()].
/// Calls [CacheManager::put_page_ranges()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
pub fn put_page_ranges(
&self,
file_id: FileId,
row_group_idx: usize,
ranges: &[Range<u64>],
pages: &[Bytes],
) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_pages(page_key, pages);
cache_manager.put_page_ranges(file_id, row_group_idx, ranges, pages);
}
}
@@ -551,8 +566,8 @@ pub struct CacheManager {
sst_meta_cache: Option<SstMetaCache>,
/// Cache for vectors.
vector_cache: Option<VectorCache>,
/// Cache for SST pages.
page_cache: Option<PageCache>,
/// Cache for SST byte ranges.
page_cache: Option<Arc<PageRangeCache>>,
/// A Cache for writing files to object stores.
write_cache: Option<WriteCacheRef>,
/// Cache for inverted index.
@@ -730,21 +745,35 @@ impl CacheManager {
}
}
/// Gets pages for the row group.
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
self.page_cache.as_ref().and_then(|page_cache| {
let value = page_cache.get(page_key);
update_hit_miss(value, PAGE_TYPE)
/// Gets cached byte fragments for the requested ranges.
pub fn get_page_ranges(
&self,
file_id: FileId,
row_group_idx: usize,
ranges: &[Range<u64>],
) -> Option<PageRangeLookup> {
self.page_cache.as_ref().map(|page_cache| {
let lookup = page_cache.lookup(file_id, row_group_idx, ranges);
if lookup.cached_bytes > 0 {
CACHE_HIT.with_label_values(&[PAGE_TYPE]).inc();
}
if !lookup.missing_ranges.is_empty() {
CACHE_MISS.with_label_values(&[PAGE_TYPE]).inc();
}
lookup
})
}
/// Puts pages of the row group into the cache.
pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
/// Puts byte fragments into the page cache.
pub fn put_page_ranges(
&self,
file_id: FileId,
row_group_idx: usize,
ranges: &[Range<u64>],
pages: &[Bytes],
) {
if let Some(cache) = &self.page_cache {
CACHE_BYTES
.with_label_values(&[PAGE_TYPE])
.add(page_cache_weight(&page_key, &pages).into());
cache.insert(page_key, pages);
cache.insert_ranges(file_id, row_group_idx, ranges, pages);
}
}
@@ -966,15 +995,6 @@ impl CacheManagerBuilder {
/// Builds the [CacheManager].
pub fn build(self) -> CacheManager {
fn to_str(cause: RemovalCause) -> &'static str {
match cause {
RemovalCause::Expired => "expired",
RemovalCause::Explicit => "explicit",
RemovalCause::Replaced => "replaced",
RemovalCause::Size => "size",
}
}
let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.sst_meta_cache_size)
@@ -985,7 +1005,7 @@ impl CacheManagerBuilder {
.with_label_values(&[SST_META_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[SST_META_TYPE, to_str(cause)])
.with_label_values(&[SST_META_TYPE, removal_cause_to_str(cause)])
.inc();
})
.build()
@@ -1000,24 +1020,13 @@ impl CacheManagerBuilder {
.with_label_values(&[VECTOR_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[VECTOR_TYPE, to_str(cause)])
.inc();
})
.build()
});
let page_cache = (self.page_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.page_cache_size)
.weigher(page_cache_weight)
.eviction_listener(|k, v, cause| {
let size = page_cache_weight(&k, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
CACHE_EVICTION
.with_label_values(&[PAGE_TYPE, to_str(cause)])
.with_label_values(&[VECTOR_TYPE, removal_cause_to_str(cause)])
.inc();
})
.build()
});
let page_cache =
(self.page_cache_size != 0).then(|| PageRangeCache::new(self.page_cache_size));
let inverted_index_cache = InvertedIndexCache::new(
self.index_metadata_size,
self.index_content_size,
@@ -1046,7 +1055,7 @@ impl CacheManagerBuilder {
.with_label_values(&[SELECTOR_RESULT_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
.with_label_values(&[SELECTOR_RESULT_TYPE, removal_cause_to_str(cause)])
.inc();
})
.build()
@@ -1061,7 +1070,7 @@ impl CacheManagerBuilder {
.with_label_values(&[RANGE_RESULT_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
.with_label_values(&[RANGE_RESULT_TYPE, removal_cause_to_str(cause)])
.inc();
})
.build()
@@ -1098,8 +1107,8 @@ fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
(mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
}
fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
(k.estimated_size() + v.estimated_size()) as u32
fn page_cache_weight(k: &PageFragmentKey, v: &Bytes) -> u32 {
(k.estimated_size() + mem::size_of::<Bytes>() + v.len()) as u32
}
fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
@@ -1110,6 +1119,15 @@ fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>
(k.estimated_size() + v.estimated_size()) as u32
}
fn removal_cause_to_str(cause: RemovalCause) -> &'static str {
match cause {
RemovalCause::Expired => "expired",
RemovalCause::Explicit => "explicit",
RemovalCause::Replaced => "replaced",
RemovalCause::Size => "size",
}
}
/// Updates cache hit/miss metrics.
fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
if value.is_some() {
@@ -1131,73 +1149,226 @@ impl SstMetaKey {
}
}
/// Path to column pages in the SST file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
/// Region id of the SST file to cache.
region_id: RegionId,
/// Id of the SST file to cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct PageFragmentGroupKey {
file_id: FileId,
row_group_idx: usize,
}
/// Cache key for one byte fragment in an SST row group.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageFragmentKey {
/// Id of the SST file.
file_id: FileId,
/// Index of the row group.
row_group_idx: usize,
/// Index of the column in the row group.
column_idx: usize,
/// Start offset of the cached byte fragment.
start: u64,
/// End offset of the cached byte fragment.
end: u64,
}
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
/// Id of the SST file to cache.
file_id: FileId,
/// Index of the row group.
row_group_idx: usize,
/// Byte ranges of the pages to cache.
ranges: Vec<Range<u64>>,
}
impl PageKey {
/// Creates a key for a list of pages.
pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
PageKey {
impl PageFragmentKey {
fn new(file_id: FileId, row_group_idx: usize, range: &Range<u64>) -> PageFragmentKey {
PageFragmentKey {
file_id,
row_group_idx,
ranges,
start: range.start,
end: range.end,
}
}
fn group_key(&self) -> PageFragmentGroupKey {
PageFragmentGroupKey {
file_id: self.file_id,
row_group_idx: self.row_group_idx,
}
}
/// Returns memory used by the key (estimated).
fn estimated_size(&self) -> usize {
mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
mem::size_of::<Self>()
}
}
/// Cached row group pages for a column.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
/// Compressed page in the row group.
pub compressed: Vec<Bytes>,
/// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
pub page_size: u64,
/// One cached byte fragment that overlaps a requested range.
#[derive(Clone)]
pub struct PageRangePart {
/// Range covered by `bytes`.
pub range: Range<u64>,
/// Bytes for `range`.
pub bytes: Bytes,
}
impl PageValue {
/// Creates a new value from a range of compressed pages.
pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
PageValue {
compressed: bytes,
page_size,
/// Result of looking up request ranges in the page range cache.
pub struct PageRangeLookup {
/// Cached fragments grouped by the original requested range index.
pub cached_parts: Vec<Vec<PageRangePart>>,
/// Ranges that are not covered by cached fragments and need fetching.
pub missing_ranges: Vec<Range<u64>>,
/// Number of cached fragments used.
pub cached_range_count: usize,
/// Number of requested bytes served from cached fragments.
pub cached_bytes: u64,
}
impl PageRangeLookup {
pub fn is_fully_cached(&self) -> bool {
self.missing_ranges.is_empty()
}
}
type PageFragmentRangeMap = RwLock<BTreeMap<(u64, u64), PageFragmentKey>>;
type PageFragmentIndex = DashMap<PageFragmentGroupKey, Arc<PageFragmentRangeMap>>;
/// Byte-fragment cache for Parquet row-group reads.
///
/// Moka owns capacity and eviction. The side index only makes overlap lookup possible.
pub struct PageRangeCache {
cache: Cache<PageFragmentKey, Bytes>,
index: Arc<PageFragmentIndex>,
}
impl PageRangeCache {
fn new(capacity: u64) -> Arc<PageRangeCache> {
let index: Arc<PageFragmentIndex> = Arc::new(DashMap::new());
let eviction_index = index.clone();
let cache = Cache::builder()
.max_capacity(capacity)
.weigher(page_cache_weight)
.eviction_listener(move |k: Arc<PageFragmentKey>, v: Bytes, cause| {
let key = *k;
let size = page_cache_weight(&key, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
CACHE_EVICTION
.with_label_values(&[PAGE_TYPE, removal_cause_to_str(cause)])
.inc();
if let Some(group) = eviction_index.get(&key.group_key()) {
group.write().unwrap().remove(&(key.start, key.end));
}
})
.build();
Arc::new(PageRangeCache { cache, index })
}
fn lookup(
&self,
file_id: FileId,
row_group_idx: usize,
ranges: &[Range<u64>],
) -> PageRangeLookup {
let mut cached_parts = Vec::with_capacity(ranges.len());
let mut missing_ranges = Vec::new();
let mut cached_range_count = 0;
let mut cached_bytes = 0;
let group_key = PageFragmentGroupKey {
file_id,
row_group_idx,
};
let group = self.index.get(&group_key);
for range in ranges {
if range.start >= range.end {
cached_parts.push(Vec::new());
continue;
}
let mut parts = Vec::new();
if let Some(group) = group.as_ref() {
let index = group.read().unwrap();
// A simple first-stage interval lookup: inspect fragments whose start is before
// the requested end and keep those that overlap the requested range.
for (_, fragment_key) in index.range(..(range.end, 0)) {
if fragment_key.end <= range.start {
continue;
}
if let Some(bytes) = self.cache.get(fragment_key) {
let start = range.start.max(fragment_key.start);
let end = range.end.min(fragment_key.end);
if start < end {
let slice_start = (start - fragment_key.start) as usize;
let slice_end = (end - fragment_key.start) as usize;
parts.push(PageRangePart {
range: start..end,
bytes: bytes.slice(slice_start..slice_end),
});
}
}
}
}
parts.sort_unstable_by_key(|part| part.range.start);
let mut cursor = range.start;
let mut compacted_parts: Vec<PageRangePart> = Vec::with_capacity(parts.len());
for part in parts {
if part.range.end <= cursor {
continue;
}
let part = if part.range.start < cursor {
let offset = (cursor - part.range.start) as usize;
PageRangePart {
range: cursor..part.range.end,
bytes: part.bytes.slice(offset..),
}
} else {
part
};
if cursor < part.range.start {
missing_ranges.push(cursor..part.range.start);
}
cached_bytes += part.range.end - part.range.start;
cached_range_count += 1;
cursor = part.range.end;
compacted_parts.push(part);
if cursor >= range.end {
break;
}
}
if cursor < range.end {
missing_ranges.push(cursor..range.end);
}
cached_parts.push(compacted_parts);
}
PageRangeLookup {
cached_parts,
missing_ranges,
cached_range_count,
cached_bytes,
}
}
/// Returns memory used by the value (estimated).
fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
+ self.page_size as usize
+ self.compressed.iter().map(mem::size_of_val).sum::<usize>()
fn insert_ranges(
&self,
file_id: FileId,
row_group_idx: usize,
ranges: &[Range<u64>],
pages: &[Bytes],
) {
for (range, bytes) in ranges.iter().zip(pages) {
if range.start >= range.end || bytes.len() as u64 != range.end - range.start {
continue;
}
let key = PageFragmentKey::new(file_id, row_group_idx, range);
let size = page_cache_weight(&key, bytes);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into());
self.cache.insert(key, bytes.clone());
let group = self
.index
.entry(key.group_key())
.or_insert_with(|| Arc::new(PageFragmentRangeMap::new(BTreeMap::new())))
.clone();
group.write().unwrap().insert((key.start, key.end), key);
}
}
}
@@ -1265,8 +1436,6 @@ type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (region, file, row group, column) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
@@ -1324,10 +1493,17 @@ mod tests {
.is_none()
);
let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_none());
cache.put_page_ranges(
file_id.file_id(),
1,
&[Range { start: 0, end: 5 }],
&[Bytes::from_static(b"abcde")],
);
assert!(
cache
.get_page_ranges(file_id.file_id(), 1, &[Range { start: 0, end: 5 }])
.is_none()
);
assert!(cache.write_cache().is_none());
}
@@ -1428,11 +1604,41 @@ mod tests {
fn test_page_cache() {
let cache = CacheManager::builder().page_cache_size(1000).build();
let file_id = FileId::random();
let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
assert!(cache.get_pages(&key).is_none());
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_some());
let uncached = 0..10;
assert_eq!(
vec![0..10],
cache
.get_page_ranges(file_id, 0, std::slice::from_ref(&uncached))
.unwrap()
.missing_ranges
);
let cached = 100..500;
cache.put_page_ranges(
file_id,
0,
std::slice::from_ref(&cached),
&[Bytes::from(vec![7; 400])],
);
let subrange = 200..300;
let lookup = cache
.get_page_ranges(file_id, 0, std::slice::from_ref(&subrange))
.unwrap();
assert!(lookup.is_fully_cached());
assert_eq!(100, lookup.cached_bytes);
assert_eq!(1, lookup.cached_parts.len());
assert_eq!(200..300, lookup.cached_parts[0][0].range);
assert_eq!(100, lookup.cached_parts[0][0].bytes.len());
let overlapping = 400..600;
let lookup = cache
.get_page_ranges(file_id, 0, std::slice::from_ref(&overlapping))
.unwrap();
assert!(!lookup.is_fully_cached());
assert_eq!(100, lookup.cached_bytes);
assert_eq!(vec![500..600], lookup.missing_ranges);
assert_eq!(400..500, lookup.cached_parts[0][0].range);
}
#[test]

View File

@@ -139,7 +139,7 @@ mod tests {
use super::*;
use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
use crate::cache::test_util::assert_parquet_metadata_equal;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::cache::{CacheManager, CacheStrategy};
use crate::config::IndexConfig;
use crate::read::FlatSource;
use crate::region::options::{IndexOptions, InvertedIndexOptions};
@@ -339,11 +339,20 @@ mod tests {
// Cache 4 row groups.
for i in 0..4 {
let page_key = PageKey::new(handle.file_id().file_id(), i, get_ranges(i));
assert!(cache.get_pages(&page_key).is_some());
let lookup = cache
.get_page_ranges(handle.file_id().file_id(), i, &get_ranges(i))
.unwrap();
assert!(lookup.is_fully_cached());
}
let page_key = PageKey::new(handle.file_id().file_id(), 5, vec![]);
assert!(cache.get_pages(&page_key).is_none());
let missing_range = 0..10;
let lookup = cache
.get_page_ranges(
handle.file_id().file_id(),
5,
std::slice::from_ref(&missing_range),
)
.unwrap();
assert_eq!(vec![0..10], lookup.missing_ranges);
}
#[tokio::test]

View File

@@ -15,9 +15,8 @@
//! Push decoder stream implementation for SST parquet files.
use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use datatypes::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use futures::stream::BoxStream;
@@ -26,11 +25,11 @@ use parquet::DecodeResult;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, RowSelection};
use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
use snafu::ResultExt;
use snafu::{ResultExt, ensure};
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result};
use crate::cache::{CacheStrategy, PageRangePart};
use crate::error::{OpenDalSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::file::RegionFileId;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
@@ -85,30 +84,44 @@ impl SstParquetRangeFetcher {
.map(|_| std::time::Instant::now());
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
let page_key = PageKey::new(
let mut page_lookup = self.cache_strategy.get_page_ranges(
self.region_file_id.file_id(),
self.row_group_idx,
ranges.clone(),
&ranges,
);
// Check page cache first.
if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
if let Some(metrics) = &self.fetch_metrics {
let total_size: u64 = ranges.iter().map(|r| r.end - r.start).sum();
let mut metrics_data = metrics.data.lock().unwrap();
metrics_data.page_cache_hit += 1;
metrics_data.pages_to_fetch_mem += ranges.len();
metrics_data.page_size_to_fetch_mem += total_size;
metrics_data.page_size_needed += total_size;
if let Some(start) = fetch_start {
metrics_data.total_fetch_elapsed += start.elapsed();
}
}
return Ok(pages.compressed.clone());
if let Some(lookup) = &page_lookup
&& lookup.cached_bytes > 0
&& let Some(metrics) = &self.fetch_metrics
{
let mut metrics_data = metrics.data.lock().unwrap();
metrics_data.page_cache_hit += 1;
metrics_data.pages_to_fetch_mem += lookup.cached_range_count;
metrics_data.page_size_to_fetch_mem += lookup.cached_bytes;
metrics_data.page_size_needed += lookup.cached_bytes;
}
// Fast path: all requested ranges can be assembled from cached fragments.
if page_lookup
.as_ref()
.map(|lookup| lookup.is_fully_cached())
.unwrap_or(false)
{
let lookup = page_lookup.take().unwrap();
if let Some(metrics) = &self.fetch_metrics
&& let Some(start) = fetch_start
{
metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
}
return assemble_ranges(&ranges, lookup.cached_parts, &[]);
}
let missing_ranges = page_lookup
.as_ref()
.map(|lookup| lookup.missing_ranges.clone())
.unwrap_or_else(|| ranges.clone());
// Calculate total range size for metrics.
let (total_range_size, unaligned_size) = compute_total_range_size(&ranges);
let (_, unaligned_size) = compute_total_range_size(&missing_ranges);
// Check write cache.
let key = IndexKey::new(
@@ -121,21 +134,22 @@ impl SstParquetRangeFetcher {
.as_ref()
.map(|_| std::time::Instant::now());
let write_cache_result = match self.cache_strategy.write_cache() {
Some(cache) => cache.file_cache().read_ranges(key, &ranges).await,
Some(cache) => cache.file_cache().read_ranges(key, &missing_ranges).await,
None => None,
};
let pages = match write_cache_result {
let fetched_pages = match write_cache_result {
Some(data) => {
if let Some(metrics) = &self.fetch_metrics {
let elapsed = fetch_write_cache_start
.map(|start| start.elapsed())
.unwrap_or_default();
let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
let range_size_needed: u64 =
missing_ranges.iter().map(|r| r.end - r.start).sum();
let mut metrics_data = metrics.data.lock().unwrap();
metrics_data.write_cache_fetch_elapsed += elapsed;
metrics_data.write_cache_hit += 1;
metrics_data.pages_to_fetch_write_cache += ranges.len();
metrics_data.pages_to_fetch_write_cache += missing_ranges.len();
metrics_data.page_size_to_fetch_write_cache += unaligned_size;
metrics_data.page_size_needed += range_size_needed;
}
@@ -151,17 +165,19 @@ impl SstParquetRangeFetcher {
.fetch_metrics
.as_ref()
.map(|_| std::time::Instant::now());
let data = fetch_byte_ranges(&self.file_path, self.object_store.clone(), &ranges)
.await
.context(OpenDalSnafu)?;
let data =
fetch_byte_ranges(&self.file_path, self.object_store.clone(), &missing_ranges)
.await
.context(OpenDalSnafu)?;
if let Some(metrics) = &self.fetch_metrics {
let elapsed = start.map(|start| start.elapsed()).unwrap_or_default();
let range_size_needed: u64 = ranges.iter().map(|r| r.end - r.start).sum();
let range_size_needed: u64 =
missing_ranges.iter().map(|r| r.end - r.start).sum();
let mut metrics_data = metrics.data.lock().unwrap();
metrics_data.store_fetch_elapsed += elapsed;
metrics_data.cache_miss += 1;
metrics_data.pages_to_fetch_store += ranges.len();
metrics_data.pages_to_fetch_store += missing_ranges.len();
metrics_data.page_size_to_fetch_store += unaligned_size;
metrics_data.page_size_needed += range_size_needed;
}
@@ -169,19 +185,122 @@ impl SstParquetRangeFetcher {
}
};
// Put pages back to the cache.
let page_value = PageValue::new(pages.clone(), total_range_size);
self.cache_strategy
.put_pages(page_key, Arc::new(page_value));
self.cache_strategy.put_page_ranges(
self.region_file_id.file_id(),
self.row_group_idx,
&missing_ranges,
&fetched_pages,
);
if let (Some(metrics), Some(start)) = (&self.fetch_metrics, fetch_start) {
metrics.data.lock().unwrap().total_fetch_elapsed += start.elapsed();
}
Ok(pages)
if let Some(lookup) = page_lookup {
let fetched_parts = missing_ranges
.into_iter()
.zip(fetched_pages)
.map(|(range, bytes)| PageRangePart { range, bytes })
.collect::<Vec<_>>();
return assemble_ranges(&ranges, lookup.cached_parts, &fetched_parts);
}
Ok(fetched_pages)
}
}
fn assemble_ranges(
ranges: &[Range<u64>],
cached_parts: Vec<Vec<PageRangePart>>,
fetched_parts: &[PageRangePart],
) -> Result<Vec<Bytes>> {
ensure!(
ranges.len() == cached_parts.len(),
UnexpectedSnafu {
reason: format!(
"Invalid parquet range assembly: {} requested ranges but {} cached part groups",
ranges.len(),
cached_parts.len()
),
}
);
ranges
.iter()
.zip(cached_parts)
.map(|(range, mut parts)| {
parts.extend(
fetched_parts
.iter()
.filter_map(|part| overlapping_part(range, part)),
);
assemble_range(range, parts)
})
.collect()
}
fn overlapping_part(range: &Range<u64>, part: &PageRangePart) -> Option<PageRangePart> {
let start = range.start.max(part.range.start);
let end = range.end.min(part.range.end);
if start >= end {
return None;
}
let slice_start = (start - part.range.start) as usize;
let slice_end = (end - part.range.start) as usize;
Some(PageRangePart {
range: start..end,
bytes: part.bytes.slice(slice_start..slice_end),
})
}
fn assemble_range(range: &Range<u64>, mut parts: Vec<PageRangePart>) -> Result<Bytes> {
if range.start >= range.end {
return Ok(Bytes::new());
}
parts.sort_unstable_by_key(|part| part.range.start);
if parts.len() == 1 && parts[0].range == *range {
return Ok(parts.pop().unwrap().bytes);
}
let mut cursor = range.start;
let mut output = BytesMut::with_capacity((range.end - range.start) as usize);
for part in parts {
ensure!(
part.range.start <= cursor,
UnexpectedSnafu {
reason: format!(
"Missing cached parquet bytes for range {}..{}, next part starts at {}",
range.start, range.end, part.range.start
),
}
);
if part.range.end <= cursor {
continue;
}
let slice_start = (cursor - part.range.start) as usize;
output.extend_from_slice(&part.bytes.slice(slice_start..));
cursor = part.range.end.min(range.end);
if cursor >= range.end {
break;
}
}
ensure!(
cursor == range.end,
UnexpectedSnafu {
reason: format!(
"Missing cached parquet bytes for range {}..{}, assembled through {}",
range.start, range.end, cursor
),
}
);
Ok(output.freeze())
}
/// Builds a parquet record batch stream driven directly by [ParquetPushDecoderBuilder].
pub(crate) fn build_sst_parquet_record_batch_stream(
arrow_metadata: ArrowReaderMetadata,
@@ -220,3 +339,44 @@ pub(crate) fn build_sst_parquet_record_batch_stream(
}
.boxed())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_assemble_range_from_cached_subrange_and_fetched_tail() {
let cached_parts = vec![vec![PageRangePart {
range: 400..500,
bytes: Bytes::from(vec![1; 100]),
}]];
let fetched_parts = vec![PageRangePart {
range: 500..600,
bytes: Bytes::from(vec![2; 100]),
}];
let requested = 400..600;
let output = assemble_ranges(
std::slice::from_ref(&requested),
cached_parts,
&fetched_parts,
)
.unwrap();
assert_eq!(1, output.len());
assert_eq!(vec![1; 100].as_slice(), &output[0][..100]);
assert_eq!(vec![2; 100].as_slice(), &output[0][100..]);
}
#[test]
fn test_assemble_range_returns_single_covering_part_without_copy() {
let bytes = Bytes::from_static(b"abcdef");
let cached_parts = vec![vec![PageRangePart {
range: 10..16,
bytes: bytes.clone(),
}]];
let requested = 10..16;
let output = assemble_ranges(std::slice::from_ref(&requested), cached_parts, &[]).unwrap();
assert_eq!(bytes, output[0]);
}
}