fix: use actual buf size as cache page value size (#6829)

* feat: cache the cloned page bytes

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: cache the whole row group pages

The opendal reader may merge IO requests so the pages of different
columns can share the same Bytes.
When we use a per-column page cache, the page cache may still be referencing
the whole Bytes after eviction if there are other columns in the cache that
share the same Bytes.

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: check possible max byte range and copy pages if needed

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: always copy pages

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: returns the copied pages

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: compute cache size by MERGE_GAP

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: align to buf size

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: align to 2MB

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove unused code

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix typo

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: fix parquet read with cache test

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-08-28 11:37:11 +08:00
parent beb3447938
commit 02d9245516
5 changed files with 114 additions and 286 deletions

View File

@@ -23,6 +23,7 @@ pub(crate) mod test_util;
pub(crate) mod write_cache;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
@@ -32,7 +33,6 @@ use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef}
use index::result_cache::IndexResultCache;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
@@ -674,49 +674,33 @@ pub struct ColumnPagePath {
column_idx: usize,
}
/// Cache key for pages of a SST row group.
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PageKey {
/// Cache key for a compressed page in a row group.
Compressed(ColumnPagePath),
/// Cache key for all uncompressed pages in a row group.
Uncompressed(ColumnPagePath),
pub struct PageKey {
/// Id of the SST file to cache.
file_id: FileId,
/// Index of the row group.
row_group_idx: usize,
/// Byte ranges of the pages to cache.
ranges: Vec<Range<u64>>,
}
impl PageKey {
/// Creates a key for a compressed page.
pub fn new_compressed(
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
column_idx: usize,
) -> PageKey {
PageKey::Compressed(ColumnPagePath {
region_id,
/// Creates a key for a list of pages.
pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
PageKey {
file_id,
row_group_idx,
column_idx,
})
}
/// Creates a key for all uncompressed pages in a row group.
pub fn new_uncompressed(
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
column_idx: usize,
) -> PageKey {
PageKey::Uncompressed(ColumnPagePath {
region_id,
file_id,
row_group_idx,
column_idx,
})
ranges,
}
}
/// Returns memory used by the key (estimated).
fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
}
}
@@ -724,38 +708,26 @@ impl PageKey {
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
/// Compressed page of the column in the row group.
pub compressed: Bytes,
/// All pages of the column in the row group.
pub row_group: Vec<Page>,
/// Compressed page in the row group.
pub compressed: Vec<Bytes>,
/// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
pub page_size: u64,
}
impl PageValue {
/// Creates a new value from a compressed page.
pub fn new_compressed(bytes: Bytes) -> PageValue {
/// Creates a new value from a range of compressed pages.
pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
PageValue {
compressed: bytes,
row_group: vec![],
}
}
/// Creates a new value from all pages in a row group.
pub fn new_row_group(pages: Vec<Page>) -> PageValue {
PageValue {
compressed: Bytes::new(),
row_group: pages,
page_size,
}
}
/// Returns memory used by the value (estimated).
fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
+ self.compressed.len()
+ self
.row_group
.iter()
.map(|page| page.buffer().len())
.sum::<usize>()
+ self.page_size as usize
+ self.compressed.iter().map(mem::size_of_val).sum::<usize>()
}
}
@@ -834,7 +806,7 @@ mod tests {
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.is_none());
let key = PageKey::new_uncompressed(region_id, file_id, 0, 0);
let key = PageKey::new(file_id, 1, vec![Range { start: 0, end: 5 }]);
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_none());
@@ -882,9 +854,8 @@ mod tests {
#[test]
fn test_page_cache() {
let cache = CacheManager::builder().page_cache_size(1000).build();
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let key = PageKey::new_compressed(region_id, file_id, 0, 0);
let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
assert!(cache.get_pages(&key).is_none());
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);

View File

@@ -27,7 +27,6 @@ pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod plain_format;
pub mod reader;
pub mod row_group;
@@ -236,7 +235,7 @@ mod tests {
)
.await;
writer
let sst_info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
@@ -265,16 +264,24 @@ mod tests {
.await;
}
// Doesn't have compressed page cached.
let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
assert!(cache.get_pages(&page_key).is_none());
let parquet_meta = sst_info.file_metadata.unwrap();
let get_ranges = |row_group_idx: usize| {
let row_group = parquet_meta.row_group(row_group_idx);
let mut ranges = Vec::with_capacity(row_group.num_columns());
for i in 0..row_group.num_columns() {
let (start, length) = row_group.column(i).byte_range();
ranges.push(start..start + length);
}
ranges
};
// Cache 4 row groups.
for i in 0..4 {
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
let page_key = PageKey::new(handle.file_id(), i, get_ranges(i));
assert!(cache.get_pages(&page_key).is_some());
}
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
let page_key = PageKey::new(handle.file_id(), 5, vec![]);
assert!(cache.get_pages(&page_key).is_none());
}

View File

@@ -89,7 +89,7 @@ fn parse_column_orders(
}
const FETCH_PARALLELISM: usize = 8;
const MERGE_GAP: usize = 512 * 1024;
pub(crate) const MERGE_GAP: usize = 512 * 1024;
/// Asynchronously fetches byte ranges from an object store.
///

View File

@@ -1,91 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Parquet page reader.
use std::collections::VecDeque;
use parquet::column::page::{Page, PageMetadata, PageReader};
use parquet::errors::Result;
/// A page reader backed by an in-memory queue of already cached pages.
pub(crate) struct RowGroupCachedReader {
    /// Cached pages still to be returned, in read order (front is next).
    pages: VecDeque<Page>,
}

impl RowGroupCachedReader {
    /// Builds a reader over the pages of a column in a row group.
    ///
    /// Every page in `pages` is cloned into the reader's own queue, so the
    /// caller's slice is left untouched.
    pub(crate) fn new(pages: &[Page]) -> Self {
        let mut queue = VecDeque::with_capacity(pages.len());
        queue.extend(pages.iter().cloned());
        Self { pages: queue }
    }
}
impl PageReader for RowGroupCachedReader {
    /// Removes and returns the page at the front of the queue, or `None`
    /// when no page is left.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        let next = self.pages.pop_front();
        Ok(next)
    }

    /// Returns the metadata of the page at the front of the queue without
    /// removing it, or `None` when no page is left.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        let front = self.pages.front();
        Ok(front.map(page_to_page_meta))
    }

    /// Discards the page at the front of the queue, if any.
    fn skip_next_page(&mut self) -> Result<()> {
        // A `SerializedPageReader` in the `SerializedPageReaderState::Pages` state never pops
        // the dictionary page, so it always returns the dictionary page as the first page. See:
        // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770
        // However, the `GenericColumnReader` reads the dictionary page before it skips records,
        // so it never asks us to skip a dictionary page and no special handling is needed here.
        // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331
        let _skipped = self.pages.pop_front();
        Ok(())
    }
}
impl Iterator for RowGroupCachedReader {
    type Item = Result<Page>;

    /// Yields the next cached page; an error from `get_next_page` is yielded
    /// as an `Err` item, and the iterator ends once no page is left.
    fn next(&mut self) -> Option<Self::Item> {
        match self.get_next_page() {
            Ok(Some(page)) => Some(Ok(page)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}
/// Builds the [PageMetadata] that describes `page`.
///
/// - Data pages (v1) report only the number of levels.
/// - Data pages (v2) report both the number of rows and the number of levels.
/// - Dictionary pages report neither and are flagged with `is_dict`.
///
/// The conversion is based on [decode_page()](https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L438-L481)
/// and [PageMetadata](https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/page.rs#L279-L301).
fn page_to_page_meta(page: &Page) -> PageMetadata {
    let (num_rows, num_levels, is_dict) = match page {
        Page::DataPage { num_values, .. } => (None, Some(*num_values as usize), false),
        Page::DataPageV2 {
            num_values,
            num_rows,
            ..
        } => (
            Some(*num_rows as usize),
            Some(*num_values as usize),
            false,
        ),
        Page::DictionaryPage { .. } => (None, None, true),
    };

    PageMetadata {
        num_rows,
        num_levels,
        is_dict,
    }
}

View File

@@ -23,9 +23,8 @@ use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::arrow::ProjectionMask;
use parquet::column::page::{PageIterator, PageReader};
use parquet::errors::{ParquetError, Result};
use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::properties::DEFAULT_PAGE_SIZE;
use parquet::file::reader::{ChunkReader, Length};
use parquet::file::serialized_reader::SerializedPageReader;
use store_api::storage::RegionId;
@@ -35,8 +34,7 @@ use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::file::FileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::page_reader::RowGroupCachedReader;
use crate::sst::parquet::helper::{fetch_byte_ranges, MERGE_GAP};
pub(crate) struct RowGroupBase<'a> {
metadata: &'a RowGroupMetaData,
@@ -44,11 +42,6 @@ pub(crate) struct RowGroupBase<'a> {
/// Compressed page of each column.
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
pub(crate) row_count: usize,
/// Row group level cached pages for each column.
///
/// These pages are uncompressed pages of a row group.
/// `column_uncompressed_pages.len()` equals to `column_chunks.len()`.
column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
}
impl<'a> RowGroupBase<'a> {
@@ -68,7 +61,6 @@ impl<'a> RowGroupBase<'a> {
offset_index,
column_chunks: vec![None; metadata.columns().len()],
row_count: metadata.num_rows() as usize,
column_uncompressed_pages: vec![None; metadata.columns().len()],
}
}
@@ -144,13 +136,9 @@ impl<'a> RowGroupBase<'a> {
pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
self.column_chunks
.iter()
.zip(&self.column_uncompressed_pages)
.enumerate()
.filter(|&(idx, (chunk, uncompressed_pages))| {
// Don't need to fetch column data if we already cache the column's pages.
chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none()
})
.map(|(idx, (_chunk, _pages))| {
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
.map(|(idx, _chunk)| {
let column = self.metadata.column(idx);
let (start, length) = column.byte_range();
start..(start + length)
@@ -158,23 +146,17 @@ impl<'a> RowGroupBase<'a> {
.collect::<Vec<_>>()
}
/// Assigns uncompressed chunk binary data to [RowGroupBase::column_chunks]
/// Assigns compressed chunk binary data to [RowGroupBase::column_chunks]
/// and returns the chunk offset and binary data assigned.
pub(crate) fn assign_dense_chunk(
&mut self,
projection: &ProjectionMask,
chunk_data: Vec<Bytes>,
) -> Vec<(usize, Bytes)> {
) {
let mut chunk_data = chunk_data.into_iter();
let mut res = vec![];
for (idx, (chunk, row_group_pages)) in self
.column_chunks
.iter_mut()
.zip(&self.column_uncompressed_pages)
.enumerate()
{
if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() {
for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
if chunk.is_some() || !projection.leaf_included(idx) {
continue;
}
@@ -184,13 +166,11 @@ impl<'a> RowGroupBase<'a> {
};
let column = self.metadata.column(idx);
res.push((idx, data.clone()));
*chunk = Some(Arc::new(ColumnChunkData::Dense {
offset: column.byte_range().0 as usize,
data,
}));
}
res
}
/// Create [PageReader] from [RowGroupBase::column_chunks]
@@ -219,7 +199,6 @@ impl<'a> RowGroupBase<'a> {
}
};
// This column don't cache uncompressed pages.
Ok(page_reader)
}
}
@@ -277,9 +256,6 @@ impl<'a> InMemoryRowGroup<'a> {
self.base
.assign_sparse_chunk(projection, chunk_data, page_start_offsets);
} else {
// Now we only use cache in dense chunk data.
self.fetch_pages_from_cache(projection);
// Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache`
// is a synchronous, CPU-bound operation.
yield_now().await;
@@ -296,75 +272,25 @@ impl<'a> InMemoryRowGroup<'a> {
let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
// Assigns fetched data to base.
let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data);
// Put fetched data to cache if necessary.
for (col_idx, data) in assigned_columns {
let column = self.base.metadata.column(col_idx);
if !cache_uncompressed_pages(column) {
// For columns that have multiple uncompressed pages, we only cache the compressed page
// to save memory.
let page_key = PageKey::new_compressed(
self.region_id,
self.file_id,
self.row_group_idx,
col_idx,
);
self.cache_strategy
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
}
}
self.base.assign_dense_chunk(projection, chunk_data);
}
Ok(())
}
/// Fetches pages for columns if cache is enabled.
/// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column.
fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
self.base
.column_chunks
.iter_mut()
.enumerate()
.filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
.for_each(|(idx, chunk)| {
let column = self.base.metadata.column(idx);
if cache_uncompressed_pages(column) {
// Fetches uncompressed pages for the row group.
let page_key = PageKey::new_uncompressed(
self.region_id,
self.file_id,
self.row_group_idx,
idx,
);
self.base.column_uncompressed_pages[idx] =
self.cache_strategy.get_pages(&page_key);
} else {
// Fetches the compressed page from the cache.
let page_key = PageKey::new_compressed(
self.region_id,
self.file_id,
self.row_group_idx,
idx,
);
*chunk = self.cache_strategy.get_pages(&page_key).map(|page_value| {
Arc::new(ColumnChunkData::Dense {
offset: column.byte_range().0 as usize,
data: page_value.compressed.clone(),
})
});
}
});
}
/// Try to fetch data from WriteCache,
/// Try to fetch data from the memory cache or the WriteCache,
/// if not in WriteCache, fetch data from object store directly.
async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
// Now fetch page timer includes the whole time to read pages.
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
return Ok(pages.compressed.clone());
}
let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
match self.fetch_ranges_from_write_cache(key, ranges).await {
Some(data) => Ok(data),
let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
Some(data) => data,
None => {
// Fetch data from object store.
let _timer = READ_STAGE_ELAPSED
@@ -373,9 +299,17 @@ impl<'a> InMemoryRowGroup<'a> {
let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
.await
.map_err(|e| ParquetError::External(Box::new(e)))?;
Ok(data)
data
}
}
};
// Put pages back to the cache.
let total_range_size = compute_total_range_size(ranges);
let page_value = PageValue::new(pages.clone(), total_range_size);
self.cache_strategy
.put_pages(page_key, Arc::new(page_value));
Ok(pages)
}
/// Fetches data from write cache.
@@ -390,40 +324,46 @@ impl<'a> InMemoryRowGroup<'a> {
}
None
}
/// Creates a page reader to read column at `i`.
fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
if let Some(cached_pages) = &self.base.column_uncompressed_pages[i] {
debug_assert!(!cached_pages.row_group.is_empty());
// Hits the row group level page cache.
return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group)));
}
let page_reader = self.base.column_reader(i)?;
let column = self.base.metadata.column(i);
if cache_uncompressed_pages(column) {
// This column use row group level page cache.
// We collect all pages and put them into the cache.
let pages = page_reader.collect::<Result<Vec<_>>>()?;
let page_value = Arc::new(PageValue::new_row_group(pages));
let page_key =
PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
self.cache_strategy.put_pages(page_key, page_value.clone());
return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
}
// This column don't cache uncompressed pages.
Ok(Box::new(page_reader))
}
}
/// Returns whether we cache uncompressed pages for the column.
fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool {
// If the row group only has a data page, cache the whole row group as
// it might be faster than caching a compressed page.
column.uncompressed_size() as usize <= DEFAULT_PAGE_SIZE
/// Estimates the largest total buffer size that reading `ranges` may occupy.
///
/// The ranges are sorted by start offset and then coalesced: a range is merged
/// into the current one when it overlaps it or starts at most `MERGE_GAP` bytes
/// after its end. The length of every coalesced range is rounded up by
/// `align_to_pooled_buf_size` and the rounded lengths are summed.
///
/// Returns 0 when `ranges` is empty.
// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
    let mut ordered = ranges.to_vec();
    ordered.sort_unstable_by_key(|range| range.start);

    let mut remaining = ordered.into_iter();
    let Some(mut merged) = remaining.next() else {
        return 0;
    };

    let max_gap = MERGE_GAP as u64;
    let mut total = 0;
    for next in remaining {
        if next.start > merged.end + max_gap {
            // Too far away to merge: account for the finished range and start a new one.
            total += align_to_pooled_buf_size(merged.end - merged.start);
            merged = next;
        } else {
            // Overlapping or close enough: extend the current range.
            merged.end = merged.end.max(next.end);
        }
    }

    // The range still being built has not been counted yet.
    total + align_to_pooled_buf_size(merged.end - merged.start)
}
/// Rounds `size` up to the nearest multiple of the pooled buffer size (2 MiB).
///
/// A `size` of 0 stays 0, and a `size` that is already a multiple is returned unchanged.
// See:
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/backend.rs#L178
// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/reader.rs#L36-L46
fn align_to_pooled_buf_size(size: u64) -> u64 {
    /// Size of one pooled buffer: 2 MiB.
    const POOLED_BUF_SIZE: u64 = 2 << 20;

    // Number of whole buffers needed to hold `size` bytes.
    let buffers = size.div_ceil(POOLED_BUF_SIZE);
    buffers * POOLED_BUF_SIZE
}
impl RowGroups for InMemoryRowGroup<'_> {
@@ -432,10 +372,11 @@ impl RowGroups for InMemoryRowGroup<'_> {
}
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
let page_reader = self.column_page_reader(i)?;
// Creates a page reader to read column at `i`.
let page_reader = self.base.column_reader(i)?;
Ok(Box::new(ColumnChunkIterator {
reader: Some(Ok(page_reader)),
reader: Some(Ok(Box::new(page_reader))),
}))
}
}