mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 22:02:56 +00:00
feat: skip caching uncompressed pages if they are large (#4705)
* feat: cache each uncompressed page * chore: remove unused function * chore: log * chore: log * chore: row group pages cache kv * feat: also support row group level cache * chore: fix range count * feat: don't cache compressed page for row group cache * feat: use function to get part * chore: log whether scan is from compaction * chore: avoid get column * feat: add timer metrics * chore: Revert "feat: add timer metrics" This reverts commit 4618f57fa2ba13b1e1a8dec83afd01c00ae4c867. * feat: don't cache individual uncompressed page * feat: append in row group level under append mode Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * chore: fetch pages cost * perf: yield * Update src/mito2/src/sst/parquet/row_group.rs * refactor: cache key * feat: print file num and row groups num in explain * test: update sqlness test * chore: Update src/mito2/src/sst/parquet/page_reader.rs --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com> Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -25,6 +25,7 @@ pub(crate) mod write_cache;
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
|
||||
use bytes::Bytes;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use moka::notification::RemovalCause;
|
||||
@@ -393,20 +394,59 @@ impl SstMetaKey {
|
||||
}
|
||||
}
|
||||
|
||||
/// Path to column pages in the SST file.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct ColumnPagePath {
|
||||
/// Region id of the SST file to cache.
|
||||
region_id: RegionId,
|
||||
/// Id of the SST file to cache.
|
||||
file_id: FileId,
|
||||
/// Index of the row group.
|
||||
row_group_idx: usize,
|
||||
/// Index of the column in the row group.
|
||||
column_idx: usize,
|
||||
}
|
||||
|
||||
/// Cache key for pages of a SST row group.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct PageKey {
|
||||
/// Region id of the SST file to cache.
|
||||
pub region_id: RegionId,
|
||||
/// Id of the SST file to cache.
|
||||
pub file_id: FileId,
|
||||
/// Index of the row group.
|
||||
pub row_group_idx: usize,
|
||||
/// Index of the column in the row group.
|
||||
pub column_idx: usize,
|
||||
pub enum PageKey {
|
||||
/// Cache key for a compressed page in a row group.
|
||||
Compressed(ColumnPagePath),
|
||||
/// Cache key for all uncompressed pages in a row group.
|
||||
Uncompressed(ColumnPagePath),
|
||||
}
|
||||
|
||||
impl PageKey {
|
||||
/// Creates a key for a compressed page.
|
||||
pub fn new_compressed(
|
||||
region_id: RegionId,
|
||||
file_id: FileId,
|
||||
row_group_idx: usize,
|
||||
column_idx: usize,
|
||||
) -> PageKey {
|
||||
PageKey::Compressed(ColumnPagePath {
|
||||
region_id,
|
||||
file_id,
|
||||
row_group_idx,
|
||||
column_idx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Creates a key for all uncompressed pages in a row group.
|
||||
pub fn new_uncompressed(
|
||||
region_id: RegionId,
|
||||
file_id: FileId,
|
||||
row_group_idx: usize,
|
||||
column_idx: usize,
|
||||
) -> PageKey {
|
||||
PageKey::Uncompressed(ColumnPagePath {
|
||||
region_id,
|
||||
file_id,
|
||||
row_group_idx,
|
||||
column_idx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns memory used by the key (estimated).
|
||||
fn estimated_size(&self) -> usize {
|
||||
mem::size_of::<Self>()
|
||||
@@ -414,21 +454,41 @@ impl PageKey {
|
||||
}
|
||||
|
||||
/// Cached row group pages for a column.
|
||||
// We don't use enum here to make it easier to mock and use the struct.
|
||||
#[derive(Default)]
|
||||
pub struct PageValue {
|
||||
/// Compressed page of the column in the row group.
|
||||
pub compressed: Bytes,
|
||||
/// All pages of the column in the row group.
|
||||
pub pages: Vec<Page>,
|
||||
pub row_group: Vec<Page>,
|
||||
}
|
||||
|
||||
impl PageValue {
|
||||
/// Creates a new page value.
|
||||
pub fn new(pages: Vec<Page>) -> PageValue {
|
||||
PageValue { pages }
|
||||
/// Creates a new value from a compressed page.
|
||||
pub fn new_compressed(bytes: Bytes) -> PageValue {
|
||||
PageValue {
|
||||
compressed: bytes,
|
||||
row_group: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new value from all pages in a row group.
|
||||
pub fn new_row_group(pages: Vec<Page>) -> PageValue {
|
||||
PageValue {
|
||||
compressed: Bytes::new(),
|
||||
row_group: pages,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns memory used by the value (estimated).
|
||||
fn estimated_size(&self) -> usize {
|
||||
// We only consider heap size of all pages.
|
||||
self.pages.iter().map(|page| page.buffer().len()).sum()
|
||||
mem::size_of::<Self>()
|
||||
+ self.compressed.len()
|
||||
+ self
|
||||
.row_group
|
||||
.iter()
|
||||
.map(|page| page.buffer().len())
|
||||
.sum::<usize>()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -507,13 +567,8 @@ mod tests {
|
||||
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
|
||||
.is_none());
|
||||
|
||||
let key = PageKey {
|
||||
region_id,
|
||||
file_id,
|
||||
row_group_idx: 0,
|
||||
column_idx: 0,
|
||||
};
|
||||
let pages = Arc::new(PageValue::new(Vec::new()));
|
||||
let key = PageKey::new_uncompressed(region_id, file_id, 0, 0);
|
||||
let pages = Arc::new(PageValue::default());
|
||||
cache.put_pages(key.clone(), pages);
|
||||
assert!(cache.get_pages(&key).is_none());
|
||||
|
||||
@@ -562,14 +617,9 @@ mod tests {
|
||||
let cache = CacheManager::builder().page_cache_size(1000).build();
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let file_id = FileId::random();
|
||||
let key = PageKey {
|
||||
region_id,
|
||||
file_id,
|
||||
row_group_idx: 0,
|
||||
column_idx: 0,
|
||||
};
|
||||
let key = PageKey::new_compressed(region_id, file_id, 0, 0);
|
||||
assert!(cache.get_pages(&key).is_none());
|
||||
let pages = Arc::new(PageValue::new(Vec::new()));
|
||||
let pages = Arc::new(PageValue::default());
|
||||
cache.put_pages(key.clone(), pages);
|
||||
assert!(cache.get_pages(&key).is_some());
|
||||
}
|
||||
|
||||
@@ -133,6 +133,7 @@ lazy_static! {
|
||||
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
|
||||
)
|
||||
.unwrap();
|
||||
pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
|
||||
/// Counter of rows read from different source.
|
||||
pub static ref READ_ROWS_TOTAL: IntCounterVec =
|
||||
register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
|
||||
|
||||
@@ -89,6 +89,9 @@ impl Drop for MergeReader {
|
||||
READ_STAGE_ELAPSED
|
||||
.with_label_values(&["merge"])
|
||||
.observe(self.metrics.scan_cost.as_secs_f64());
|
||||
READ_STAGE_ELAPSED
|
||||
.with_label_values(&["merge_fetch"])
|
||||
.observe(self.metrics.fetch_cost.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -901,10 +901,21 @@ impl ScanPartList {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the number of files.
|
||||
pub(crate) fn num_files(&self) -> usize {
|
||||
self.0.as_ref().map_or(0, |parts| {
|
||||
parts.iter().map(|part| part.file_ranges.len()).sum()
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the number of file ranges.
|
||||
pub(crate) fn num_file_ranges(&self) -> usize {
|
||||
self.0.as_ref().map_or(0, |parts| {
|
||||
parts.iter().map(|part| part.file_ranges.len()).sum()
|
||||
parts
|
||||
.iter()
|
||||
.flat_map(|part| part.file_ranges.iter())
|
||||
.map(|ranges| ranges.len())
|
||||
.sum()
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -947,9 +958,10 @@ impl StreamContext {
|
||||
Ok(inner) => match t {
|
||||
DisplayFormatType::Default => write!(
|
||||
f,
|
||||
"partition_count={} ({} memtable ranges, {} file ranges)",
|
||||
"partition_count={} ({} memtable ranges, {} file {} ranges)",
|
||||
inner.0.len(),
|
||||
inner.0.num_mem_ranges(),
|
||||
inner.0.num_files(),
|
||||
inner.0.num_file_ranges()
|
||||
)?,
|
||||
DisplayFormatType::Verbose => write!(f, "{:?}", inner.0)?,
|
||||
|
||||
@@ -166,8 +166,8 @@ impl SeqScan {
|
||||
reader_metrics.merge_from(reader.metrics());
|
||||
}
|
||||
debug!(
|
||||
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}",
|
||||
region_id, file_id, range_num, reader_metrics
|
||||
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}, compaction: {}",
|
||||
region_id, file_id, range_num, reader_metrics, compaction
|
||||
);
|
||||
// Reports metrics.
|
||||
reader_metrics.observe_rows(read_type);
|
||||
@@ -238,11 +238,12 @@ impl SeqScan {
|
||||
let maybe_reader = Self::build_reader_from_sources(stream_ctx, sources, semaphore).await;
|
||||
let build_reader_cost = build_start.elapsed();
|
||||
metrics.build_reader_cost += build_reader_cost;
|
||||
common_telemetry::debug!(
|
||||
"Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}",
|
||||
debug!(
|
||||
"Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}, compaction: {}",
|
||||
stream_ctx.input.mapper.metadata().region_id,
|
||||
range_id,
|
||||
build_reader_cost
|
||||
build_reader_cost,
|
||||
compaction,
|
||||
);
|
||||
|
||||
maybe_reader
|
||||
@@ -354,11 +355,12 @@ impl SeqScan {
|
||||
metrics.observe_metrics_on_finish();
|
||||
|
||||
debug!(
|
||||
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}",
|
||||
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}, compaction: {}",
|
||||
stream_ctx.input.mapper.metadata().region_id,
|
||||
partition,
|
||||
metrics,
|
||||
first_poll,
|
||||
compaction,
|
||||
);
|
||||
}
|
||||
};
|
||||
@@ -450,13 +452,14 @@ impl SeqScan {
|
||||
metrics.total_cost = stream_ctx.query_start.elapsed();
|
||||
metrics.observe_metrics_on_finish();
|
||||
|
||||
common_telemetry::debug!(
|
||||
"Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}",
|
||||
debug!(
|
||||
"Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}, compaction: {}",
|
||||
stream_ctx.input.mapper.metadata().region_id,
|
||||
partition,
|
||||
id,
|
||||
metrics,
|
||||
first_poll,
|
||||
compaction,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -152,7 +152,6 @@ impl RegionScanner for UnorderedScan {
|
||||
let parallelism = self.properties.num_partitions();
|
||||
let stream = try_stream! {
|
||||
let first_poll = stream_ctx.query_start.elapsed();
|
||||
|
||||
let part = {
|
||||
let mut parts = stream_ctx.parts.lock().await;
|
||||
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism)
|
||||
@@ -180,6 +179,7 @@ impl RegionScanner for UnorderedScan {
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
metrics.build_reader_cost = build_reader_start.elapsed();
|
||||
|
||||
let query_start = stream_ctx.query_start;
|
||||
let cache = stream_ctx.input.cache_manager.as_deref();
|
||||
// Scans memtables first.
|
||||
@@ -217,8 +217,8 @@ impl RegionScanner for UnorderedScan {
|
||||
metrics.total_cost = query_start.elapsed();
|
||||
metrics.observe_metrics_on_finish();
|
||||
debug!(
|
||||
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}",
|
||||
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll,
|
||||
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}, ranges: {}",
|
||||
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, part.file_ranges[0].len(),
|
||||
);
|
||||
};
|
||||
let stream = Box::pin(RecordBatchStreamWrapper::new(
|
||||
@@ -343,14 +343,14 @@ impl UnorderedDistributor {
|
||||
|
||||
let mems_per_part = ((self.mem_ranges.len() + parallelism - 1) / parallelism).max(1);
|
||||
let ranges_per_part = ((self.file_ranges.len() + parallelism - 1) / parallelism).max(1);
|
||||
common_telemetry::debug!(
|
||||
"Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
|
||||
parallelism,
|
||||
self.mem_ranges.len(),
|
||||
self.file_ranges.len(),
|
||||
mems_per_part,
|
||||
ranges_per_part
|
||||
);
|
||||
debug!(
|
||||
"Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
|
||||
parallelism,
|
||||
self.mem_ranges.len(),
|
||||
self.file_ranges.len(),
|
||||
mems_per_part,
|
||||
ranges_per_part
|
||||
);
|
||||
let mut scan_parts = self
|
||||
.mem_ranges
|
||||
.chunks(mems_per_part)
|
||||
|
||||
@@ -216,22 +216,16 @@ mod tests {
|
||||
.await;
|
||||
}
|
||||
|
||||
// Doesn't have compressed page cached.
|
||||
let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
|
||||
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
|
||||
|
||||
// Cache 4 row groups.
|
||||
for i in 0..4 {
|
||||
let page_key = PageKey {
|
||||
region_id: metadata.region_id,
|
||||
file_id: handle.file_id(),
|
||||
row_group_idx: i,
|
||||
column_idx: 0,
|
||||
};
|
||||
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
|
||||
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
|
||||
}
|
||||
let page_key = PageKey {
|
||||
region_id: metadata.region_id,
|
||||
file_id: handle.file_id(),
|
||||
row_group_idx: 5,
|
||||
column_idx: 0,
|
||||
};
|
||||
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
|
||||
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
|
||||
}
|
||||
|
||||
|
||||
@@ -19,14 +19,14 @@ use std::collections::VecDeque;
|
||||
use parquet::column::page::{Page, PageMetadata, PageReader};
|
||||
use parquet::errors::Result;
|
||||
|
||||
/// A reader that reads from cached pages.
|
||||
pub(crate) struct CachedPageReader {
|
||||
/// A reader that reads all pages from a cache.
|
||||
pub(crate) struct RowGroupCachedReader {
|
||||
/// Cached pages.
|
||||
pages: VecDeque<Page>,
|
||||
}
|
||||
|
||||
impl CachedPageReader {
|
||||
/// Returns a new reader from existing pages.
|
||||
impl RowGroupCachedReader {
|
||||
/// Returns a new reader from pages of a column in a row group.
|
||||
pub(crate) fn new(pages: &[Page]) -> Self {
|
||||
Self {
|
||||
pages: pages.iter().cloned().collect(),
|
||||
@@ -34,7 +34,7 @@ impl CachedPageReader {
|
||||
}
|
||||
}
|
||||
|
||||
impl PageReader for CachedPageReader {
|
||||
impl PageReader for RowGroupCachedReader {
|
||||
fn get_next_page(&mut self) -> Result<Option<Page>> {
|
||||
Ok(self.pages.pop_front())
|
||||
}
|
||||
@@ -55,9 +55,8 @@ impl PageReader for CachedPageReader {
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for CachedPageReader {
|
||||
impl Iterator for RowGroupCachedReader {
|
||||
type Item = Result<Page>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.get_next_page().transpose()
|
||||
}
|
||||
|
||||
@@ -23,33 +23,37 @@ use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
|
||||
use parquet::arrow::ProjectionMask;
|
||||
use parquet::column::page::{PageIterator, PageReader};
|
||||
use parquet::errors::{ParquetError, Result};
|
||||
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
|
||||
use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
|
||||
use parquet::file::properties::DEFAULT_PAGE_SIZE;
|
||||
use parquet::file::reader::{ChunkReader, Length};
|
||||
use parquet::file::serialized_reader::SerializedPageReader;
|
||||
use parquet::format::PageLocation;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::task::yield_now;
|
||||
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
use crate::cache::{CacheManagerRef, PageKey, PageValue};
|
||||
use crate::metrics::READ_STAGE_ELAPSED;
|
||||
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
|
||||
use crate::sst::file::FileId;
|
||||
use crate::sst::parquet::helper::fetch_byte_ranges;
|
||||
use crate::sst::parquet::page_reader::CachedPageReader;
|
||||
use crate::sst::parquet::page_reader::RowGroupCachedReader;
|
||||
|
||||
/// An in-memory collection of column chunks
|
||||
pub struct InMemoryRowGroup<'a> {
|
||||
metadata: &'a RowGroupMetaData,
|
||||
page_locations: Option<&'a [Vec<PageLocation>]>,
|
||||
/// Compressed page of each column.
|
||||
column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
|
||||
row_count: usize,
|
||||
region_id: RegionId,
|
||||
file_id: FileId,
|
||||
row_group_idx: usize,
|
||||
cache_manager: Option<CacheManagerRef>,
|
||||
/// Cached pages for each column.
|
||||
/// Row group level cached pages for each column.
|
||||
///
|
||||
/// `column_cached_pages.len()` equals to `column_chunks.len()`.
|
||||
column_cached_pages: Vec<Option<Arc<PageValue>>>,
|
||||
/// These pages are uncompressed pages of a row group.
|
||||
/// `column_uncompressed_pages.len()` equals to `column_chunks.len()`.
|
||||
column_uncompressed_pages: Vec<Option<Arc<PageValue>>>,
|
||||
file_path: &'a str,
|
||||
/// Object store.
|
||||
object_store: ObjectStore,
|
||||
@@ -86,7 +90,7 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
file_id,
|
||||
row_group_idx,
|
||||
cache_manager,
|
||||
column_cached_pages: vec![None; metadata.columns().len()],
|
||||
column_uncompressed_pages: vec![None; metadata.columns().len()],
|
||||
file_path,
|
||||
object_store,
|
||||
}
|
||||
@@ -161,16 +165,20 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
// Now we only use cache in dense chunk data.
|
||||
self.fetch_pages_from_cache(projection);
|
||||
|
||||
// Release the CPU to avoid blocking the runtime. Since `fetch_pages_from_cache`
|
||||
// is a synchronous, CPU-bound operation.
|
||||
yield_now().await;
|
||||
|
||||
let fetch_ranges = self
|
||||
.column_chunks
|
||||
.iter()
|
||||
.zip(&self.column_cached_pages)
|
||||
.zip(&self.column_uncompressed_pages)
|
||||
.enumerate()
|
||||
// Don't need to fetch column data if we already cache the column's pages.
|
||||
.filter(|&(idx, (chunk, cached_pages))| {
|
||||
chunk.is_none() && projection.leaf_included(idx) && cached_pages.is_none()
|
||||
.filter(|&(idx, (chunk, uncompressed_pages))| {
|
||||
// Don't need to fetch column data if we already cache the column's pages.
|
||||
chunk.is_none() && projection.leaf_included(idx) && uncompressed_pages.is_none()
|
||||
})
|
||||
.map(|(idx, (_chunk, _cached_pages))| {
|
||||
.map(|(idx, (_chunk, _pages))| {
|
||||
let column = self.metadata.column(idx);
|
||||
let (start, length) = column.byte_range();
|
||||
start..(start + length)
|
||||
@@ -184,22 +192,41 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
|
||||
let mut chunk_data = self.fetch_bytes(&fetch_ranges).await?.into_iter();
|
||||
|
||||
for (idx, (chunk, cached_pages)) in self
|
||||
for (idx, (chunk, row_group_pages)) in self
|
||||
.column_chunks
|
||||
.iter_mut()
|
||||
.zip(&self.column_cached_pages)
|
||||
.zip(&self.column_uncompressed_pages)
|
||||
.enumerate()
|
||||
{
|
||||
if chunk.is_some() || !projection.leaf_included(idx) || cached_pages.is_some() {
|
||||
if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(data) = chunk_data.next() {
|
||||
*chunk = Some(Arc::new(ColumnChunkData::Dense {
|
||||
offset: self.metadata.column(idx).byte_range().0 as usize,
|
||||
data,
|
||||
}));
|
||||
// Get the fetched page.
|
||||
let Some(data) = chunk_data.next() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let column = self.metadata.column(idx);
|
||||
if let Some(cache) = &self.cache_manager {
|
||||
if !cache_uncompressed_pages(column) {
|
||||
// For columns that have multiple uncompressed pages, we only cache the compressed page
|
||||
// to save memory.
|
||||
let page_key = PageKey::new_compressed(
|
||||
self.region_id,
|
||||
self.file_id,
|
||||
self.row_group_idx,
|
||||
idx,
|
||||
);
|
||||
cache
|
||||
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
|
||||
}
|
||||
}
|
||||
|
||||
*chunk = Some(Arc::new(ColumnChunkData::Dense {
|
||||
offset: column.byte_range().0 as usize,
|
||||
data,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,20 +234,42 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
}
|
||||
|
||||
/// Fetches pages for columns if cache is enabled.
|
||||
/// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column.
|
||||
fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
|
||||
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
|
||||
self.column_chunks
|
||||
.iter()
|
||||
.iter_mut()
|
||||
.enumerate()
|
||||
.filter(|&(idx, chunk)| chunk.is_none() && projection.leaf_included(idx))
|
||||
.for_each(|(idx, _chunk)| {
|
||||
if let Some(cache) = &self.cache_manager {
|
||||
let page_key = PageKey {
|
||||
region_id: self.region_id,
|
||||
file_id: self.file_id,
|
||||
row_group_idx: self.row_group_idx,
|
||||
column_idx: idx,
|
||||
};
|
||||
self.column_cached_pages[idx] = cache.get_pages(&page_key);
|
||||
.filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
|
||||
.for_each(|(idx, chunk)| {
|
||||
let Some(cache) = &self.cache_manager else {
|
||||
return;
|
||||
};
|
||||
let column = self.metadata.column(idx);
|
||||
if cache_uncompressed_pages(column) {
|
||||
// Fetches uncompressed pages for the row group.
|
||||
let page_key = PageKey::new_uncompressed(
|
||||
self.region_id,
|
||||
self.file_id,
|
||||
self.row_group_idx,
|
||||
idx,
|
||||
);
|
||||
self.column_uncompressed_pages[idx] = cache.get_pages(&page_key);
|
||||
} else {
|
||||
// Fetches the compressed page from the cache.
|
||||
let page_key = PageKey::new_compressed(
|
||||
self.region_id,
|
||||
self.file_id,
|
||||
self.row_group_idx,
|
||||
idx,
|
||||
);
|
||||
|
||||
*chunk = cache.get_pages(&page_key).map(|page_value| {
|
||||
Arc::new(ColumnChunkData::Dense {
|
||||
offset: column.byte_range().0 as usize,
|
||||
data: page_value.compressed.clone(),
|
||||
})
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -259,12 +308,12 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
|
||||
/// Creates a page reader to read column at `i`.
|
||||
fn column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
|
||||
if let Some(cached_pages) = &self.column_cached_pages[i] {
|
||||
// Already in cache.
|
||||
return Ok(Box::new(CachedPageReader::new(&cached_pages.pages)));
|
||||
if let Some(cached_pages) = &self.column_uncompressed_pages[i] {
|
||||
debug_assert!(!cached_pages.row_group.is_empty());
|
||||
// Hits the row group level page cache.
|
||||
return Ok(Box::new(RowGroupCachedReader::new(&cached_pages.row_group)));
|
||||
}
|
||||
|
||||
// Cache miss.
|
||||
let page_reader = match &self.column_chunks[i] {
|
||||
None => {
|
||||
return Err(ParquetError::General(format!(
|
||||
@@ -283,25 +332,34 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
};
|
||||
|
||||
let Some(cache) = &self.cache_manager else {
|
||||
// Cache is disabled.
|
||||
return Ok(Box::new(page_reader));
|
||||
};
|
||||
|
||||
// We collect all pages and put them into the cache.
|
||||
let pages = page_reader.collect::<Result<Vec<_>>>()?;
|
||||
let page_value = Arc::new(PageValue::new(pages));
|
||||
let page_key = PageKey {
|
||||
region_id: self.region_id,
|
||||
file_id: self.file_id,
|
||||
row_group_idx: self.row_group_idx,
|
||||
column_idx: i,
|
||||
};
|
||||
cache.put_pages(page_key, page_value.clone());
|
||||
let column = self.metadata.column(i);
|
||||
if cache_uncompressed_pages(column) {
|
||||
// This column use row group level page cache.
|
||||
// We collect all pages and put them into the cache.
|
||||
let pages = page_reader.collect::<Result<Vec<_>>>()?;
|
||||
let page_value = Arc::new(PageValue::new_row_group(pages));
|
||||
let page_key =
|
||||
PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
|
||||
cache.put_pages(page_key, page_value.clone());
|
||||
|
||||
Ok(Box::new(CachedPageReader::new(&page_value.pages)))
|
||||
return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
|
||||
}
|
||||
|
||||
// This column don't cache uncompressed pages.
|
||||
Ok(Box::new(page_reader))
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether we cache uncompressed pages for the column.
|
||||
fn cache_uncompressed_pages(column: &ColumnChunkMetaData) -> bool {
|
||||
// If the row group only has a data page, cache the whole row group as
|
||||
// it might be faster than caching a compressed page.
|
||||
column.uncompressed_size() as usize <= DEFAULT_PAGE_SIZE
|
||||
}
|
||||
|
||||
impl<'a> RowGroups for InMemoryRowGroup<'a> {
|
||||
fn num_rows(&self) -> usize {
|
||||
self.row_count
|
||||
@@ -318,7 +376,7 @@ impl<'a> RowGroups for InMemoryRowGroup<'a> {
|
||||
|
||||
/// An in-memory column chunk
|
||||
#[derive(Clone)]
|
||||
enum ColumnChunkData {
|
||||
pub(crate) enum ColumnChunkData {
|
||||
/// Column chunk data representing only a subset of data pages
|
||||
Sparse {
|
||||
/// Length of the full column chunk
|
||||
|
||||
@@ -36,7 +36,7 @@ explain analyze SELECT count(*) FROM system_metrics;
|
||||
|_|_|_CoalescePartitionsExec REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 1_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -34,7 +34,7 @@ select sum(val) from t group by host;
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[SUM(t.val)] REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_ProjectionExec: expr=[SUM(t.val)@1 as SUM(t.val)] REDACTED
|
||||
|_|_|_AggregateExec: mode=FinalPartitioned, gby=[host@0 as host], aggr=[SUM(t.val)] REDACTED
|
||||
@@ -43,7 +43,7 @@ select sum(val) from t group by host;
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[SUM(t.val)] REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
@@ -66,9 +66,9 @@ select sum(val) from t;
|
||||
|_|_|_ProjectionExec: expr=[val@1 as val] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|
||||
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 1_|
|
||||
+-+-+-+
|
||||
@@ -95,9 +95,9 @@ select sum(val) from t group by idc;
|
||||
|_|_|_ProjectionExec: expr=[val@1 as val, idc@3 as idc] REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file ranges) REDACTED
|
||||
| 1_| 1_|_SeqScan: region=REDACTED, partition_count=0 (0 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 0_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -77,7 +77,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
|
||||
|_|_|_CoalescePartitionsExec REDACTED
|
||||
|_|_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 10_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -89,7 +89,7 @@ explain analyze select * from demo where idc='idc1';
|
||||
+-+-+-+
|
||||
| 0_| 0_|_MergeScanExec: REDACTED
|
||||
|_|_|_|
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|
||||
| 1_| 0_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 2_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -32,7 +32,7 @@ TQL ANALYZE (0, 10, '5s') test;
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -63,7 +63,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -93,7 +93,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
@@ -125,7 +125,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|
||||
|_|_|_RepartitionExec: partitioning=REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges) REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
@@ -48,7 +48,7 @@ explain analyze
|
||||
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|
||||
|_|_|_AggregateExec: mode=Partial, gby=[host@1 as host], aggr=[last_value(t.host) ORDER BY [t.ts ASC NULLS LAST], last_value(t.not_pk) ORDER BY [t.ts ASC NULLS LAST], last_value(t.val) ORDER BY [t.ts ASC NULLS LAST]] REDACTED
|
||||
|_|_|_RepartitionExec: REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file ranges), selector=LastRow REDACTED
|
||||
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges), selector=LastRow REDACTED
|
||||
|_|_|_|
|
||||
|_|_| Total rows: 4_|
|
||||
+-+-+-+
|
||||
|
||||
Reference in New Issue
Block a user