diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index 0bbb8400a1..b8c5f337c1 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -23,6 +23,7 @@ pub(crate) mod test_util;
 pub(crate) mod write_cache;
 
 use std::mem;
+use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::Bytes;
@@ -32,7 +33,6 @@ use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
 use index::result_cache::IndexResultCache;
 use moka::notification::RemovalCause;
 use moka::sync::Cache;
-use parquet::column::page::Page;
 use parquet::file::metadata::ParquetMetaData;
 use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
 use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
@@ -674,49 +674,33 @@ pub struct ColumnPagePath {
     column_idx: usize,
 }
 
-/// Cache key for pages of a SST row group.
+/// Cache key for pages in a row group (after projection).
+///
+/// Different projections will have different cache keys.
+/// We cache all ranges together because they may refer to the same `Bytes`.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum PageKey {
-    /// Cache key for a compressed page in a row group.
-    Compressed(ColumnPagePath),
-    /// Cache key for all uncompressed pages in a row group.
-    Uncompressed(ColumnPagePath),
+pub struct PageKey {
+    /// Id of the SST file to cache.
+    file_id: FileId,
+    /// Index of the row group.
+    row_group_idx: usize,
+    /// Byte ranges of the pages to cache.
+    ranges: Vec<Range<u64>>,
 }
 
 impl PageKey {
-    /// Creates a key for a compressed page.
-    pub fn new_compressed(
-        region_id: RegionId,
-        file_id: FileId,
-        row_group_idx: usize,
-        column_idx: usize,
-    ) -> PageKey {
-        PageKey::Compressed(ColumnPagePath {
-            region_id,
+    /// Creates a key for a list of pages.
+    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
+        PageKey {
             file_id,
             row_group_idx,
-            column_idx,
-        })
-    }
-
-    /// Creates a key for all uncompressed pages in a row group.
-    pub fn new_uncompressed(
-        region_id: RegionId,
-        file_id: FileId,
-        row_group_idx: usize,
-        column_idx: usize,
-    ) -> PageKey {
-        PageKey::Uncompressed(ColumnPagePath {
-            region_id,
-            file_id,
-            row_group_idx,
-            column_idx,
-        })
+            ranges,
+        }
     }
 
     /// Returns memory used by the key (estimated).
     fn estimated_size(&self) -> usize {
-        mem::size_of::<Self>()
+        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
     }
 }
 
@@ -724,38 +708,26 @@ impl PageKey {
 // We don't use enum here to make it easier to mock and use the struct.
 #[derive(Default)]
 pub struct PageValue {
-    /// Compressed page of the column in the row group.
-    pub compressed: Bytes,
-    /// All pages of the column in the row group.
-    pub row_group: Vec<Page>,
+    /// Compressed pages in the row group.
+    pub compressed: Vec<Bytes>,
+    /// Total size of the pages (may be larger than the sum of compressed bytes due to gaps).
+    pub page_size: u64,
 }
 
 impl PageValue {
-    /// Creates a new value from a compressed page.
-    pub fn new_compressed(bytes: Bytes) -> PageValue {
+    /// Creates a new value from a range of compressed pages.
+    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
         PageValue {
             compressed: bytes,
-            row_group: vec![],
-        }
-    }
-
-    /// Creates a new value from all pages in a row group.
-    pub fn new_row_group(pages: Vec<Page>) -> PageValue {
-        PageValue {
-            compressed: Bytes::new(),
-            row_group: pages,
+            page_size,
         }
     }
 
     /// Returns memory used by the value (estimated).
     fn estimated_size(&self) -> usize {
         mem::size_of::<Self>()
-            + self.compressed.len()
-            + self
-                .row_group
-                .iter()
-                .map(|page| page.buffer().len())
-                .sum::<usize>()
+            + self.page_size as usize
+            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
     }
 }
 
@@ -834,7 +806,7 @@ mod tests {
             .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
             .is_none());
 
-        let key = PageKey::new_uncompressed(region_id, file_id, 0, 0);
+        let key = PageKey::new(file_id, 1, vec![Range { start: 0, end: 5 }]);
         let pages = Arc::new(PageValue::default());
         cache.put_pages(key.clone(), pages);
         assert!(cache.get_pages(&key).is_none());
@@ -882,9 +854,8 @@ mod tests {
     #[test]
     fn test_page_cache() {
         let cache = CacheManager::builder().page_cache_size(1000).build();
-        let region_id = RegionId::new(1, 1);
         let file_id = FileId::random();
-        let key = PageKey::new_compressed(region_id, file_id, 0, 0);
+        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
         assert!(cache.get_pages(&key).is_none());
         let pages = Arc::new(PageValue::default());
         cache.put_pages(key.clone(), pages);
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 0a4c3e2b62..06c3107858 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -27,7 +27,6 @@ pub(crate) mod file_range;
 pub mod format;
 pub(crate) mod helper;
 pub(crate) mod metadata;
-pub(crate) mod page_reader;
 pub mod plain_format;
 pub mod reader;
 pub mod row_group;
@@ -236,7 +235,7 @@ mod tests {
         )
         .await;
 
-        writer
+        let sst_info = writer
             .write_all(source, None, &write_opts)
             .await
             .unwrap()
@@ -265,16 +264,24 @@ mod tests {
             .await;
         }
 
-        // Doesn't have compressed page cached.
-        let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
-        assert!(cache.get_pages(&page_key).is_none());
+        let parquet_meta = sst_info.file_metadata.unwrap();
+        let get_ranges = |row_group_idx: usize| {
+            let row_group = parquet_meta.row_group(row_group_idx);
+            let mut ranges = Vec::with_capacity(row_group.num_columns());
+            for i in 0..row_group.num_columns() {
+                let (start, length) = row_group.column(i).byte_range();
+                ranges.push(start..start + length);
+            }
+
+            ranges
+        };
 
         // Cache 4 row groups.
         for i in 0..4 {
-            let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
+            let page_key = PageKey::new(handle.file_id(), i, get_ranges(i));
             assert!(cache.get_pages(&page_key).is_some());
         }
 
-        let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
+        let page_key = PageKey::new(handle.file_id(), 5, vec![]);
         assert!(cache.get_pages(&page_key).is_none());
     }
diff --git a/src/mito2/src/sst/parquet/helper.rs b/src/mito2/src/sst/parquet/helper.rs
index e80f751af9..95054a6092 100644
--- a/src/mito2/src/sst/parquet/helper.rs
+++ b/src/mito2/src/sst/parquet/helper.rs
@@ -89,7 +89,7 @@ fn parse_column_orders(
 }
 
 const FETCH_PARALLELISM: usize = 8;
-const MERGE_GAP: usize = 512 * 1024;
+pub(crate) const MERGE_GAP: usize = 512 * 1024;
 
 /// Asynchronously fetches byte ranges from an object store.
 ///
diff --git a/src/mito2/src/sst/parquet/page_reader.rs b/src/mito2/src/sst/parquet/page_reader.rs
deleted file mode 100644
index b2f69e2dd7..0000000000
--- a/src/mito2/src/sst/parquet/page_reader.rs
+++ /dev/null
@@ -1,91 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//! Parquet page reader.
-
-use std::collections::VecDeque;
-
-use parquet::column::page::{Page, PageMetadata, PageReader};
-use parquet::errors::Result;
-
-/// A reader that reads all pages from a cache.
-pub(crate) struct RowGroupCachedReader {
-    /// Cached pages.
-    pages: VecDeque<Page>,
-}
-
-impl RowGroupCachedReader {
-    /// Returns a new reader from pages of a column in a row group.
-    pub(crate) fn new(pages: &[Page]) -> Self {
-        Self {
-            pages: pages.iter().cloned().collect(),
-        }
-    }
-}
-
-impl PageReader for RowGroupCachedReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        Ok(self.pages.pop_front())
-    }
-
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        Ok(self.pages.front().map(page_to_page_meta))
-    }
-
-    fn skip_next_page(&mut self) -> Result<()> {
-        // When the `SerializedPageReader` is in `SerializedPageReaderState::Pages` state, it never pops
-        // the dictionary page. So it always return the dictionary page as the first page. See:
-        // https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L766-L770
-        // But the `GenericColumnReader` will read the dictionary page before skipping records so it won't skip dictionary page.
-        // So we don't need to handle the dictionary page specifically in this method.
-        // https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/reader.rs#L322-L331
-        self.pages.pop_front();
-        Ok(())
-    }
-}
-
-impl Iterator for RowGroupCachedReader {
-    type Item = Result<Page>;
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
-}
-
-/// Get [PageMetadata] from `page`.
-///
-/// The conversion is based on [decode_page()](https://github.com/apache/arrow-rs/blob/1d6feeacebb8d0d659d493b783ba381940973745/parquet/src/file/serialized_reader.rs#L438-L481)
-/// and [PageMetadata](https://github.com/apache/arrow-rs/blob/65f7be856099d389b0d0eafa9be47fad25215ee6/parquet/src/column/page.rs#L279-L301).
-fn page_to_page_meta(page: &Page) -> PageMetadata {
-    match page {
-        Page::DataPage { num_values, .. } => PageMetadata {
-            num_rows: None,
-            num_levels: Some(*num_values as usize),
-            is_dict: false,
-        },
-        Page::DataPageV2 {
-            num_values,
-            num_rows,
-            ..
-        } => PageMetadata {
-            num_rows: Some(*num_rows as usize),
-            num_levels: Some(*num_values as usize),
-            is_dict: false,
-        },
-        Page::DictionaryPage { .. } => PageMetadata {
-            num_rows: None,
-            num_levels: None,
-            is_dict: true,
-        },
-    }
-}
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index c3978f430b..e2c75fcff8 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -23,9 +23,8 @@ use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
 use parquet::arrow::ProjectionMask;
 use parquet::column::page::{PageIterator, PageReader};
 use parquet::errors::{ParquetError, Result};
-use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
+use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use parquet::file::page_index::offset_index::OffsetIndexMetaData;
-use parquet::file::properties::DEFAULT_PAGE_SIZE;
 use parquet::file::reader::{ChunkReader, Length};
 use parquet::file::serialized_reader::SerializedPageReader;
 use store_api::storage::RegionId;
@@ -35,8 +34,7 @@ use crate::cache::file_cache::{FileType, IndexKey};
 use crate::cache::{CacheStrategy, PageKey, PageValue};
 use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
 use crate::sst::file::FileId;
-use crate::sst::parquet::helper::fetch_byte_ranges;
-use crate::sst::parquet::page_reader::RowGroupCachedReader;
+use crate::sst::parquet::helper::{fetch_byte_ranges, MERGE_GAP};
 
 pub(crate) struct RowGroupBase<'a> {
     metadata: &'a RowGroupMetaData,
@@ -44,11 +42,6 @@ pub(crate) struct RowGroupBase<'a> {
     /// Compressed page of each column.
     column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     pub(crate) row_count: usize,
-    /// Row group level cached pages for each column.
-    ///
-    /// These pages are uncompressed pages of a row group.
-    /// `column_uncompressed_pages.len()` equals to `column_chunks.len()`.
-    column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
 }
 
 impl<'a> RowGroupBase<'a> {
@@ -68,7 +61,6 @@ impl<'a> RowGroupBase<'a> {
             offset_index,
             column_chunks: vec![None; metadata.columns().len()],
             row_count: metadata.num_rows() as usize,
-            column_uncompressed_pages: vec![None; metadata.columns().len()],
         }
     }
 
@@ -144,13 +136,9 @@ impl<'a> RowGroupBase<'a> {
     pub(crate) fn calc_dense_read_ranges(&self, projection: &ProjectionMask) -> Vec<Range<u64>> {
         self.column_chunks
             .iter()
-            .zip(&self.column_uncompressed_pages)
             .enumerate()
-            .filter(|&(idx, (chunk, uncompressed_pages))| {
-                // Don't need to fetch column data if we already cache the column's pages.
-                chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none()
-            })
-            .map(|(idx, (_chunk, _pages))| {
+            .filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
+            .map(|(idx, _chunk)| {
                 let column = self.metadata.column(idx);
                 let (start, length) = column.byte_range();
                 start..(start + length)
@@ -158,23 +146,17 @@ impl<'a> RowGroupBase<'a> {
             })
            .collect::<Vec<_>>()
     }
 
-    /// Assigns uncompressed chunk binary data to [RowGroupBase::column_chunks]
+    /// Assigns compressed chunk binary data to [RowGroupBase::column_chunks]
     /// and returns the chunk offset and binary data assigned.
     pub(crate) fn assign_dense_chunk(
         &mut self,
         projection: &ProjectionMask,
         chunk_data: Vec<Bytes>,
-    ) -> Vec<(usize, Bytes)> {
+    ) {
         let mut chunk_data = chunk_data.into_iter();
-        let mut res = vec![];
 
-        for (idx, (chunk, row_group_pages)) in self
-            .column_chunks
-            .iter_mut()
-            .zip(&self.column_uncompressed_pages)
-            .enumerate()
-        {
-            if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() {
+        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+            if chunk.is_some() || !projection.leaf_included(idx) {
                 continue;
             }
@@ -184,13 +166,11 @@ impl<'a> RowGroupBase<'a> {
             };
 
             let column = self.metadata.column(idx);
-            res.push((idx, data.clone()));
             *chunk = Some(Arc::new(ColumnChunkData::Dense {
                 offset: column.byte_range().0 as usize,
                 data,
             }));
         }
-        res
     }
 
     /// Create [PageReader] from [RowGroupBase::column_chunks]
@@ -219,7 +199,6 @@ impl<'a> RowGroupBase<'a> {
             }
         };
 
-        // This column don't cache uncompressed pages.
         Ok(page_reader)
     }
 }
@@ -277,9 +256,6 @@ impl<'a> InMemoryRowGroup<'a> {
                 self.base
                     .assign_sparse_chunk(projection, chunk_data, page_start_offsets);
             } else {
-                // Now we only use cache in dense chunk data.
-                self.fetch_pages_from_cache(projection);
-
                 // Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache`
                 // is a synchronous, CPU-bound operation.
                 yield_now().await;
@@ -296,75 +272,25 @@ impl<'a> InMemoryRowGroup<'a> {
             let chunk_data = self.fetch_bytes(&fetch_ranges).await?;
 
             // Assigns fetched data to base.
-            let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data);
-
-            // Put fetched data to cache if necessary.
-            for (col_idx, data) in assigned_columns {
-                let column = self.base.metadata.column(col_idx);
-                if !cache_uncompressed_pages(column) {
-                    // For columns that have multiple uncompressed pages, we only cache the compressed page
-                    // to save memory.
-                    let page_key = PageKey::new_compressed(
-                        self.region_id,
-                        self.file_id,
-                        self.row_group_idx,
-                        col_idx,
-                    );
-                    self.cache_strategy
-                        .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
-                }
-            }
+            self.base.assign_dense_chunk(projection, chunk_data);
         }
 
         Ok(())
     }
 
-    /// Fetches pages for columns if cache is enabled.
-    /// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column.
-    fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
-        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
-        self.base
-            .column_chunks
-            .iter_mut()
-            .enumerate()
-            .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
-            .for_each(|(idx, chunk)| {
-                let column = self.base.metadata.column(idx);
-                if cache_uncompressed_pages(column) {
-                    // Fetches uncompressed pages for the row group.
-                    let page_key = PageKey::new_uncompressed(
-                        self.region_id,
-                        self.file_id,
-                        self.row_group_idx,
-                        idx,
-                    );
-                    self.base.column_uncompressed_pages[idx] =
-                        self.cache_strategy.get_pages(&page_key);
-                } else {
-                    // Fetches the compressed page from the cache.
-                    let page_key = PageKey::new_compressed(
-                        self.region_id,
-                        self.file_id,
-                        self.row_group_idx,
-                        idx,
-                    );
-
-                    *chunk = self.cache_strategy.get_pages(&page_key).map(|page_value| {
-                        Arc::new(ColumnChunkData::Dense {
-                            offset: column.byte_range().0 as usize,
-                            data: page_value.compressed.clone(),
-                        })
-                    });
-                }
-            });
-    }
-
-    /// Try to fetch data from WriteCache,
+    /// Try to fetch data from the memory cache or the WriteCache,
     /// if not in WriteCache, fetch data from object store directly.
     async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
+        // Now the fetch page timer includes the whole time to read pages.
+        let _timer = READ_STAGE_FETCH_PAGES.start_timer();
+        let page_key = PageKey::new(self.file_id, self.row_group_idx, ranges.to_vec());
+        if let Some(pages) = self.cache_strategy.get_pages(&page_key) {
+            return Ok(pages.compressed.clone());
+        }
+
         let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
-        match self.fetch_ranges_from_write_cache(key, ranges).await {
-            Some(data) => Ok(data),
+        let pages = match self.fetch_ranges_from_write_cache(key, ranges).await {
+            Some(data) => data,
             None => {
                 // Fetch data from object store.
                 let _timer = READ_STAGE_ELAPSED
@@ -373,9 +299,17 @@ impl<'a> InMemoryRowGroup<'a> {
                 let data = fetch_byte_ranges(self.file_path, self.object_store.clone(), ranges)
                     .await
                     .map_err(|e| ParquetError::External(Box::new(e)))?;
-                Ok(data)
+                data
             }
-        }
+        };
+
+        // Put pages back to the cache.
+        let total_range_size = compute_total_range_size(ranges);
+        let page_value = PageValue::new(pages.clone(), total_range_size);
+        self.cache_strategy
+            .put_pages(page_key, Arc::new(page_value));
+
+        Ok(pages)
     }
 
     /// Fetches data from write cache.
@@ -390,40 +324,46 @@ impl<'a> InMemoryRowGroup<'a> {
         }
         None
     }
-
-    /// Creates a page reader to read column at `i`.
-    fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
-        if let Some(cached_pages) = &self.base.column_uncompressed_pages[i] {
-            debug_assert!(!cached_pages.row_group.is_empty());
-            // Hits the row group level page cache.
-            return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group)));
-        }
-
-        let page_reader = self.base.column_reader(i)?;
-
-        let column = self.base.metadata.column(i);
-        if cache_uncompressed_pages(column) {
-            // This column use row group level page cache.
-            // We collect all pages and put them into the cache.
-            let pages = page_reader.collect::<Result<Vec<_>>>()?;
-            let page_value = Arc::new(PageValue::new_row_group(pages));
-            let page_key =
-                PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
-            self.cache_strategy.put_pages(page_key, page_value.clone());
-
-            return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
-        }
-
-        // This column don't cache uncompressed pages.
-        Ok(Box::new(page_reader))
-    }
 }
 
-/// Returns whether we cache uncompressed pages for the column.
-fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool {
-    // If the row group only has a data page, cache the whole row group as
-    // it might be faster than caching a compressed page.
-    column.uncompressed_size() as usize <= DEFAULT_PAGE_SIZE
+/// Computes the max possible buffer size to read the given `ranges`.
+// See https://github.com/apache/opendal/blob/v0.54.0/core/src/types/read/reader.rs#L166-L192
+fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
+    if ranges.is_empty() {
+        return 0;
+    }
+
+    let gap = MERGE_GAP as u64;
+    let mut sorted_ranges = ranges.to_vec();
+    sorted_ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
+
+    let mut total_size = 0;
+    let mut cur = sorted_ranges[0].clone();
+
+    for range in sorted_ranges.into_iter().skip(1) {
+        if range.start <= cur.end + gap {
+            // There is an overlap or the gap is small enough to merge
+            cur.end = cur.end.max(range.end);
+        } else {
+            // No overlap and the gap is too large, add current range to total and start a new one
+            total_size += align_to_pooled_buf_size(cur.end - cur.start);
+            cur = range;
+        }
+    }
+
+    // Add the last range
+    total_size += align_to_pooled_buf_size(cur.end - cur.start);
+
+    total_size
+}
+
+/// Aligns the given size to a multiple of the pooled buffer size.
+// See:
+// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/backend.rs#L178
+// - https://github.com/apache/opendal/blob/v0.54.0/core/src/services/fs/reader.rs#L36-L46
+fn align_to_pooled_buf_size(size: u64) -> u64 {
+    const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;
+    size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE
 }
 
 impl RowGroups for InMemoryRowGroup<'_> {
@@ -432,10 +372,11 @@
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_page_reader(i)?;
+        // Creates a page reader to read column at `i`.
+        let page_reader = self.base.column_reader(i)?;
 
         Ok(Box::new(ColumnChunkIterator {
-            reader: Some(Ok(page_reader)),
+            reader: Some(Ok(Box::new(page_reader))),
         }))
     }
 }
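A standalone sketch (not part of the patch) of how the new buffer-size estimate behaves: it mirrors the merge-gap and pooled-buffer alignment logic of compute_total_range_size and align_to_pooled_buf_size above, assuming the same 512 KiB MERGE_GAP and 2 MiB pooled buffer size; the ranges used in main are made-up examples.

use std::ops::Range;

const MERGE_GAP: u64 = 512 * 1024;
const POOLED_BUF_SIZE: u64 = 2 * 1024 * 1024;

// Rounds a read size up to a whole number of pooled buffers, like align_to_pooled_buf_size in the patch.
fn align_to_pooled_buf_size(size: u64) -> u64 {
    size.div_ceil(POOLED_BUF_SIZE) * POOLED_BUF_SIZE
}

// Estimates the total buffer size for a set of byte ranges: ranges closer than MERGE_GAP are
// merged into one read, and each merged read is rounded up to the pooled buffer size.
fn compute_total_range_size(ranges: &[Range<u64>]) -> u64 {
    if ranges.is_empty() {
        return 0;
    }
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);

    let mut total = 0;
    let mut cur = sorted[0].clone();
    for r in sorted.into_iter().skip(1) {
        if r.start <= cur.end + MERGE_GAP {
            // Overlapping or close enough: extend the current merged read.
            cur.end = cur.end.max(r.end);
        } else {
            // Too far apart: account for the current read and start a new one.
            total += align_to_pooled_buf_size(cur.end - cur.start);
            cur = r;
        }
    }
    total + align_to_pooled_buf_size(cur.end - cur.start)
}

fn main() {
    // Two column chunks about 100 KiB apart merge into a single read, rounded up to one 2 MiB buffer.
    assert_eq!(compute_total_range_size(&[0..1024, 102_400..204_800]), 2 * 1024 * 1024);
    // Two chunks roughly 4 MiB apart stay separate, so two pooled buffers are accounted for.
    assert_eq!(compute_total_range_size(&[0..1024, 4_194_304..4_195_328]), 2 * 2 * 1024 * 1024);
}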