fix: implement a CacheStrategy to ensure compaction use cache correctly (#5254)

* feat: impl CacheStrategy

* refactor: replace Option<CacheManagerRef> with CacheStrategy

* feat: add disabled strategy

* ci: force update taplo

* refactor: rename CacheStrategy::Normal to CacheStrategy::EnableAll

* ci: force install cargo-gc-bin

* ci: force install

* chore: use CacheStrategy::Disabled as ScanInput default

* chore: fix compiler errors
This commit is contained in:
Yingwen
2024-12-30 14:24:53 +08:00
committed by GitHub
parent 11bab0c47c
commit f1eb76f489
16 changed files with 316 additions and 156 deletions

View File

@@ -84,7 +84,7 @@ jobs:
# Shares across multiple jobs
shared-key: "check-toml"
- name: Install taplo
run: cargo +stable install taplo-cli --version ^0.9 --locked
run: cargo +stable install taplo-cli --version ^0.9 --locked --force
- name: Run taplo
run: taplo format --check
@@ -107,7 +107,7 @@ jobs:
shared-key: "build-binaries"
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin
run: cargo install cargo-gc-bin --force
- name: Build greptime binaries
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
@@ -163,7 +163,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin
cargo +nightly install cargo-fuzz cargo-gc-bin --force
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
@@ -220,7 +220,7 @@ jobs:
shell: bash
run: |
sudo apt update && sudo apt install -y libfuzzer-14-dev
cargo install cargo-fuzz cargo-gc-bin
cargo install cargo-fuzz cargo-gc-bin --force
- name: Download pre-built binariy
uses: actions/download-artifact@v4
with:
@@ -268,7 +268,7 @@ jobs:
shared-key: "build-greptime-ci"
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin
run: cargo install cargo-gc-bin --force
- name: Build greptime bianry
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
@@ -338,7 +338,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin
cargo +nightly install cargo-fuzz cargo-gc-bin --force
# Downloads ci image
- name: Download pre-built binariy
uses: actions/download-artifact@v4
@@ -487,7 +487,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin
cargo +nightly install cargo-fuzz cargo-gc-bin --force
# Downloads ci image
- name: Download pre-built binariy
uses: actions/download-artifact@v4

View File

@@ -55,6 +55,195 @@ const FILE_TYPE: &str = "file";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Cache strategies that may only enable a subset of caches.
#[derive(Clone)]
pub enum CacheStrategy {
/// Strategy for normal operations.
/// Doesn't disable any cache.
EnableAll(CacheManagerRef),
/// Strategy for compaction.
/// Disables some caches during compaction to avoid affecting queries.
/// Enables the write cache so that the compaction can read files cached
/// in the write cache and write the compacted files back to the write cache.
Compaction(CacheManagerRef),
/// Do not use any cache.
Disabled,
}
impl CacheStrategy {
/// Calls [CacheManager::get_parquet_meta_data()].
pub async fn get_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager
.get_parquet_meta_data(region_id, file_id)
.await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager
.get_parquet_meta_data(region_id, file_id)
.await
}
CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
pub fn get_parquet_meta_data_from_mem_cache(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
}
CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::put_parquet_meta_data()].
pub fn put_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
metadata: Arc<ParquetMetaData>,
) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
}
CacheStrategy::Disabled => {}
}
}
/// Calls [CacheManager::remove_parquet_meta_data()].
pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.remove_parquet_meta_data(region_id, file_id);
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.remove_parquet_meta_data(region_id, file_id);
}
CacheStrategy::Disabled => {}
}
}
/// Calls [CacheManager::get_repeated_vector()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn get_repeated_vector(
&self,
data_type: &ConcreteDataType,
value: &Value,
) -> Option<VectorRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_repeated_vector(data_type, value)
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::put_repeated_vector()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_repeated_vector(value, vector);
}
}
/// Calls [CacheManager::get_pages()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::put_pages()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_pages(page_key, pages);
}
}
/// Calls [CacheManager::get_selector_result()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn get_selector_result(
&self,
selector_key: &SelectorResultKey,
) -> Option<Arc<SelectorResultValue>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_selector_result(selector_key)
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::put_selector_result()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
pub fn put_selector_result(
&self,
selector_key: SelectorResultKey,
result: Arc<SelectorResultValue>,
) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_selector_result(selector_key, result);
}
}
/// Calls [CacheManager::write_cache()].
/// It returns None if the strategy is [CacheStrategy::Disabled].
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::index_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.index_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::bloom_filter_index_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::puffin_metadata_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
}
/// Manages cached data for the engine.
///
/// All caches are disabled by default.

View File

@@ -332,7 +332,7 @@ mod tests {
use super::*;
use crate::access_layer::OperationType;
use crate::cache::test_util::new_fs_store;
use crate::cache::CacheManager;
use crate::cache::{CacheManager, CacheStrategy};
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::location::{index_file_path, sst_file_path};
@@ -495,7 +495,7 @@ mod tests {
// Read metadata from write cache
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
.cache(Some(cache_manager.clone()));
.cache(CacheStrategy::EnableAll(cache_manager.clone()));
let reader = builder.build().await.unwrap();
// Check parquet metadata

View File

@@ -43,7 +43,7 @@ use table::predicate::Predicate;
use tokio::sync::mpsc::{self, Sender};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
@@ -573,6 +573,7 @@ pub struct SerializedCompactionOutput {
struct CompactionSstReaderBuilder<'a> {
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
cache: CacheManagerRef,
inputs: &'a [FileHandle],
append_mode: bool,
filter_deleted: bool,
@@ -586,7 +587,8 @@ impl<'a> CompactionSstReaderBuilder<'a> {
let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
.with_cache(None)
// We use special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)

View File

@@ -307,6 +307,7 @@ impl Compactor for DefaultCompactor {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
cache: cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,

View File

@@ -84,6 +84,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{oneshot, Semaphore};
use crate::cache::CacheStrategy;
use crate::config::MitoConfig;
use crate::error::{
InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
@@ -428,7 +429,7 @@ impl EngineInner {
version,
region.access_layer.clone(),
request,
Some(cache_manager),
CacheStrategy::EnableAll(cache_manager),
)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())

View File

@@ -21,7 +21,7 @@ use datatypes::vectors::UInt32Vector;
use store_api::storage::TimeSeriesRowSelector;
use crate::cache::{
selector_result_cache_hit, selector_result_cache_miss, CacheManagerRef, SelectorResultKey,
selector_result_cache_hit, selector_result_cache_miss, CacheStrategy, SelectorResultKey,
SelectorResultValue,
};
use crate::error::Result;
@@ -86,7 +86,7 @@ impl RowGroupLastRowCachedReader {
pub(crate) fn new(
file_id: FileId,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
cache_strategy: CacheStrategy,
row_group_reader: RowGroupReader,
) -> Self {
let key = SelectorResultKey {
@@ -95,20 +95,17 @@ impl RowGroupLastRowCachedReader {
selector: TimeSeriesRowSelector::LastRow,
};
let Some(cache_manager) = cache_manager else {
return Self::new_miss(key, row_group_reader, None);
};
if let Some(value) = cache_manager.get_selector_result(&key) {
if let Some(value) = cache_strategy.get_selector_result(&key) {
let schema_matches =
value.projection == row_group_reader.read_format().projection_indices();
if schema_matches {
// Schema matches, use cache batches.
Self::new_hit(value)
} else {
Self::new_miss(key, row_group_reader, Some(cache_manager))
Self::new_miss(key, row_group_reader, cache_strategy)
}
} else {
Self::new_miss(key, row_group_reader, Some(cache_manager))
Self::new_miss(key, row_group_reader, cache_strategy)
}
}
@@ -130,13 +127,13 @@ impl RowGroupLastRowCachedReader {
fn new_miss(
key: SelectorResultKey,
row_group_reader: RowGroupReader,
cache_manager: Option<CacheManagerRef>,
cache_strategy: CacheStrategy,
) -> Self {
selector_result_cache_miss();
Self::Miss(RowGroupLastRowReader::new(
key,
row_group_reader,
cache_manager,
cache_strategy,
))
}
}
@@ -175,23 +172,19 @@ pub(crate) struct RowGroupLastRowReader {
reader: RowGroupReader,
selector: LastRowSelector,
yielded_batches: Vec<Batch>,
cache_manager: Option<CacheManagerRef>,
cache_strategy: CacheStrategy,
/// Index buffer to take a new batch from the last row.
take_index: UInt32Vector,
}
impl RowGroupLastRowReader {
fn new(
key: SelectorResultKey,
reader: RowGroupReader,
cache_manager: Option<CacheManagerRef>,
) -> Self {
fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
Self {
key,
reader,
selector: LastRowSelector::default(),
yielded_batches: vec![],
cache_manager,
cache_strategy,
take_index: UInt32Vector::from_vec(vec![0]),
}
}
@@ -221,17 +214,15 @@ impl RowGroupLastRowReader {
/// Updates row group's last row cache if cache manager is present.
fn maybe_update_cache(&mut self) {
if let Some(cache) = &self.cache_manager {
if self.yielded_batches.is_empty() {
// we always expect that row groups yields batches.
return;
}
let value = Arc::new(SelectorResultValue {
result: std::mem::take(&mut self.yielded_batches),
projection: self.reader.read_format().projection_indices().to_vec(),
});
cache.put_selector_result(self.key, value)
if self.yielded_batches.is_empty() {
// we always expect that row groups yields batches.
return;
}
let value = Arc::new(SelectorResultValue {
result: std::mem::take(&mut self.yielded_batches),
projection: self.reader.read_format().projection_indices().to_vec(),
});
self.cache_strategy.put_selector_result(self.key, value);
}
fn metrics(&self) -> &ReaderMetrics {

View File

@@ -30,7 +30,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use crate::cache::CacheManager;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -171,7 +171,7 @@ impl ProjectionMapper {
pub(crate) fn convert(
&self,
batch: &Batch,
cache_manager: Option<&CacheManager>,
cache_strategy: &CacheStrategy,
) -> common_recordbatch::error::Result<RecordBatch> {
debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
debug_assert!(self
@@ -204,15 +204,12 @@ impl ProjectionMapper {
match index {
BatchIndex::Tag(idx) => {
let value = &pk_values[*idx];
let vector = match cache_manager {
Some(cache) => repeated_vector_with_cache(
&column_schema.data_type,
value,
num_rows,
cache,
)?,
None => new_repeated_vector(&column_schema.data_type, value, num_rows)?,
};
let vector = repeated_vector_with_cache(
&column_schema.data_type,
value,
num_rows,
cache_strategy,
)?;
columns.push(vector);
}
BatchIndex::Timestamp => {
@@ -244,9 +241,9 @@ fn repeated_vector_with_cache(
data_type: &ConcreteDataType,
value: &Value,
num_rows: usize,
cache_manager: &CacheManager,
cache_strategy: &CacheStrategy,
) -> common_recordbatch::error::Result<VectorRef> {
if let Some(vector) = cache_manager.get_repeated_vector(data_type, value) {
if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
// Tries to get the vector from cache manager. If the vector doesn't
// have enough length, creates a new one.
match vector.len().cmp(&num_rows) {
@@ -260,7 +257,7 @@ fn repeated_vector_with_cache(
let vector = new_repeated_vector(data_type, value, num_rows)?;
// Updates cache.
if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
cache_manager.put_repeated_vector(value.clone(), vector.clone());
cache_strategy.put_repeated_vector(value.clone(), vector.clone());
}
Ok(vector)
@@ -284,12 +281,15 @@ fn new_repeated_vector(
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::OpType;
use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::arrow::util::pretty;
use datatypes::value::ValueRef;
use super::*;
use crate::cache::CacheManager;
use crate::read::BatchBuilder;
use crate::test_util::meta_util::TestRegionMetadataBuilder;
@@ -359,8 +359,9 @@ mod tests {
// With vector cache.
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
let record_batch = mapper.convert(&batch, &cache).unwrap();
let expect = "\
+---------------------+----+----+----+----+
| ts | k0 | k1 | v0 | v1 |
@@ -380,7 +381,7 @@ mod tests {
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
.is_none());
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
let record_batch = mapper.convert(&batch, &cache).unwrap();
assert_eq!(expect, print_record_batch(record_batch));
}
@@ -401,7 +402,9 @@ mod tests {
);
let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
let record_batch = mapper.convert(&batch, None).unwrap();
let cache = CacheManager::builder().vector_cache_size(1024).build();
let cache = CacheStrategy::EnableAll(Arc::new(cache));
let record_batch = mapper.convert(&batch, &cache).unwrap();
let expect = "\
+----+----+
| v1 | k0 |

View File

@@ -22,7 +22,7 @@ use parquet::arrow::arrow_reader::RowSelection;
use smallvec::{smallvec, SmallVec};
use store_api::region_engine::PartitionRange;
use crate::cache::CacheManager;
use crate::cache::CacheStrategy;
use crate::error::Result;
use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats};
use crate::read::scan_region::ScanInput;
@@ -112,7 +112,7 @@ impl RangeMeta {
Self::push_unordered_file_ranges(
input.memtables.len(),
&input.files,
input.cache_manager.as_deref(),
&input.cache_strategy,
&mut ranges,
);
@@ -203,16 +203,15 @@ impl RangeMeta {
fn push_unordered_file_ranges(
num_memtables: usize,
files: &[FileHandle],
cache: Option<&CacheManager>,
cache: &CacheStrategy,
ranges: &mut Vec<RangeMeta>,
) {
// For append mode, we can parallelize reading row groups.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
// Get parquet meta from the cache.
let parquet_meta = cache.and_then(|c| {
c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id())
});
let parquet_meta =
cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
if let Some(parquet_meta) = parquet_meta {
// Scans each row group.
for row_group_index in 0..file.meta_ref().num_row_groups {

View File

@@ -33,7 +33,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRange;
@@ -171,7 +171,7 @@ pub(crate) struct ScanRegion {
/// Scan request.
request: ScanRequest,
/// Cache.
cache_manager: Option<CacheManagerRef>,
cache_strategy: CacheStrategy,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
/// Whether to ignore inverted index.
@@ -190,13 +190,13 @@ impl ScanRegion {
version: VersionRef,
access_layer: AccessLayerRef,
request: ScanRequest,
cache_manager: Option<CacheManagerRef>,
cache_strategy: CacheStrategy,
) -> ScanRegion {
ScanRegion {
version,
access_layer,
request,
cache_manager,
cache_strategy,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
ignore_inverted_index: false,
ignore_fulltext_index: false,
@@ -357,7 +357,7 @@ impl ScanRegion {
.with_predicate(Some(predicate))
.with_memtables(memtables)
.with_files(files)
.with_cache(self.cache_manager)
.with_cache(self.cache_strategy)
.with_inverted_index_applier(inverted_index_applier)
.with_bloom_filter_index_applier(bloom_filter_applier)
.with_fulltext_index_applier(fulltext_index_applier)
@@ -421,23 +421,14 @@ impl ScanRegion {
}
let file_cache = || -> Option<FileCacheRef> {
let cache_manager = self.cache_manager.as_ref()?;
let write_cache = cache_manager.write_cache()?;
let write_cache = self.cache_strategy.write_cache()?;
let file_cache = write_cache.file_cache();
Some(file_cache)
}();
let index_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.index_cache())
.cloned();
let index_cache = self.cache_strategy.index_cache().cloned();
let puffin_metadata_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.puffin_metadata_cache())
.cloned();
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
@@ -470,23 +461,14 @@ impl ScanRegion {
}
let file_cache = || -> Option<FileCacheRef> {
let cache_manager = self.cache_manager.as_ref()?;
let write_cache = cache_manager.write_cache()?;
let write_cache = self.cache_strategy.write_cache()?;
let file_cache = write_cache.file_cache();
Some(file_cache)
}();
let index_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.bloom_filter_index_cache())
.cloned();
let index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
let puffin_metadata_cache = self
.cache_manager
.as_ref()
.and_then(|c| c.puffin_metadata_cache())
.cloned();
let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
BloomFilterIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
@@ -550,7 +532,7 @@ pub(crate) struct ScanInput {
/// Handles to SST files to scan.
pub(crate) files: Vec<FileHandle>,
/// Cache.
pub(crate) cache_manager: Option<CacheManagerRef>,
pub(crate) cache_strategy: CacheStrategy,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
@@ -582,7 +564,7 @@ impl ScanInput {
predicate: None,
memtables: Vec::new(),
files: Vec::new(),
cache_manager: None,
cache_strategy: CacheStrategy::Disabled,
ignore_file_not_found: false,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
inverted_index_applier: None,
@@ -626,8 +608,8 @@ impl ScanInput {
/// Sets cache for this query.
#[must_use]
pub(crate) fn with_cache(mut self, cache: Option<CacheManagerRef>) -> Self {
self.cache_manager = cache;
pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
self.cache_strategy = cache;
self
}
@@ -760,7 +742,7 @@ impl ScanInput {
.read_sst(file.clone())
.predicate(self.predicate.clone())
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.cache(self.cache_strategy.clone())
.inverted_index_applier(self.inverted_index_applier.clone())
.bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())

View File

@@ -257,7 +257,7 @@ impl SeqScan {
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cache = stream_ctx.input.cache_manager.as_deref();
let cache = &stream_ctx.input.cache_strategy;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]

View File

@@ -148,7 +148,7 @@ impl UnorderedScan {
let stream = try_stream! {
part_metrics.on_first_poll();
let cache = stream_ctx.input.cache_manager.as_deref();
let cache = &stream_ctx.input.cache_strategy;
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),

View File

@@ -95,7 +95,7 @@ mod tests {
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::cache::{CacheManager, PageKey};
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::sst::index::Indexer;
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
@@ -195,7 +195,7 @@ mod tests {
.unwrap();
// Enable page cache.
let cache = Some(Arc::new(
let cache = CacheStrategy::EnableAll(Arc::new(
CacheManager::builder()
.page_cache_size(64 * 1024 * 1024)
.build(),
@@ -219,15 +219,15 @@ mod tests {
// Doesn't have compressed page cached.
let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
assert!(cache.get_pages(&page_key).is_none());
// Cache 4 row groups.
for i in 0..4 {
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
assert!(cache.get_pages(&page_key).is_some());
}
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
assert!(cache.get_pages(&page_key).is_none());
}
#[tokio::test]

View File

@@ -114,7 +114,7 @@ impl FileRange {
let reader = RowGroupLastRowCachedReader::new(
self.file_handle().file_id(),
self.row_group_idx,
self.context.reader_builder.cache_manager().clone(),
self.context.reader_builder.cache_strategy().clone(),
RowGroupReader::new(self.context.clone(), parquet_reader),
);
PruneReader::new_with_last_row_reader(self.context.clone(), reader)

View File

@@ -38,7 +38,7 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::cache::CacheManagerRef;
use crate::cache::CacheStrategy;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
ReadParquetSnafu, Result,
@@ -77,8 +77,8 @@ pub struct ParquetReaderBuilder {
/// `None` reads all columns. Due to schema change, the projection
/// can contain columns not in the parquet file.
projection: Option<Vec<ColumnId>>,
/// Manager that caches SST data.
cache_manager: Option<CacheManagerRef>,
/// Strategy to cache SST data.
cache_strategy: CacheStrategy,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
@@ -102,7 +102,7 @@ impl ParquetReaderBuilder {
object_store,
predicate: None,
projection: None,
cache_manager: None,
cache_strategy: CacheStrategy::Disabled,
inverted_index_applier: None,
bloom_filter_index_applier: None,
fulltext_index_applier: None,
@@ -128,8 +128,8 @@ impl ParquetReaderBuilder {
/// Attaches the cache to the builder.
#[must_use]
pub fn cache(mut self, cache: Option<CacheManagerRef>) -> ParquetReaderBuilder {
self.cache_manager = cache;
pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder {
self.cache_strategy = cache;
self
}
@@ -234,7 +234,7 @@ impl ParquetReaderBuilder {
object_store: self.object_store.clone(),
projection: projection_mask,
field_levels,
cache_manager: self.cache_manager.clone(),
cache_strategy: self.cache_strategy.clone(),
};
let filters = if let Some(predicate) = &self.predicate {
@@ -308,10 +308,12 @@ impl ParquetReaderBuilder {
let region_id = self.file_handle.region_id();
let file_id = self.file_handle.file_id();
// Tries to get from global cache.
if let Some(manager) = &self.cache_manager {
if let Some(metadata) = manager.get_parquet_meta_data(region_id, file_id).await {
return Ok(metadata);
}
if let Some(metadata) = self
.cache_strategy
.get_parquet_meta_data(region_id, file_id)
.await
{
return Ok(metadata);
}
// Cache miss, load metadata directly.
@@ -319,13 +321,11 @@ impl ParquetReaderBuilder {
let metadata = metadata_loader.load().await?;
let metadata = Arc::new(metadata);
// Cache the metadata.
if let Some(cache) = &self.cache_manager {
cache.put_parquet_meta_data(
self.file_handle.region_id(),
self.file_handle.file_id(),
metadata.clone(),
);
}
self.cache_strategy.put_parquet_meta_data(
self.file_handle.region_id(),
self.file_handle.file_id(),
metadata.clone(),
);
Ok(metadata)
}
@@ -857,7 +857,7 @@ pub(crate) struct RowGroupReaderBuilder {
/// Field levels to read.
field_levels: FieldLevels,
/// Cache.
cache_manager: Option<CacheManagerRef>,
cache_strategy: CacheStrategy,
}
impl RowGroupReaderBuilder {
@@ -875,8 +875,8 @@ impl RowGroupReaderBuilder {
&self.parquet_meta
}
pub(crate) fn cache_manager(&self) -> &Option<CacheManagerRef> {
&self.cache_manager
pub(crate) fn cache_strategy(&self) -> &CacheStrategy {
&self.cache_strategy
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
@@ -890,7 +890,7 @@ impl RowGroupReaderBuilder {
self.file_handle.file_id(),
&self.parquet_meta,
row_group_idx,
self.cache_manager.clone(),
self.cache_strategy.clone(),
&self.file_path,
self.object_store.clone(),
);

View File

@@ -32,7 +32,7 @@ use store_api::storage::RegionId;
use tokio::task::yield_now;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::{CacheManagerRef, PageKey, PageValue};
use crate::cache::{CacheStrategy, PageKey, PageValue};
use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES};
use crate::sst::file::FileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
@@ -223,7 +223,7 @@ pub struct InMemoryRowGroup<'a> {
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
cache_strategy: CacheStrategy,
file_path: &'a str,
/// Object store.
object_store: ObjectStore,
@@ -240,7 +240,7 @@ impl<'a> InMemoryRowGroup<'a> {
file_id: FileId,
parquet_meta: &'a ParquetMetaData,
row_group_idx: usize,
cache_manager: Option<CacheManagerRef>,
cache_strategy: CacheStrategy,
file_path: &'a str,
object_store: ObjectStore,
) -> Self {
@@ -249,7 +249,7 @@ impl<'a> InMemoryRowGroup<'a> {
region_id,
file_id,
row_group_idx,
cache_manager,
cache_strategy,
file_path,
object_store,
}
@@ -293,21 +293,19 @@ impl<'a> InMemoryRowGroup<'a> {
let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data);
// Put fetched data to cache if necessary.
if let Some(cache) = &self.cache_manager {
for (col_idx, data) in assigned_columns {
let column = self.base.metadata.column(col_idx);
if !cache_uncompressed_pages(column) {
// For columns that have multiple uncompressed pages, we only cache the compressed page
// to save memory.
let page_key = PageKey::new_compressed(
self.region_id,
self.file_id,
self.row_group_idx,
col_idx,
);
cache
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
}
for (col_idx, data) in assigned_columns {
let column = self.base.metadata.column(col_idx);
if !cache_uncompressed_pages(column) {
// For columns that have multiple uncompressed pages, we only cache the compressed page
// to save memory.
let page_key = PageKey::new_compressed(
self.region_id,
self.file_id,
self.row_group_idx,
col_idx,
);
self.cache_strategy
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
}
}
}
@@ -325,9 +323,6 @@ impl<'a> InMemoryRowGroup<'a> {
.enumerate()
.filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
.for_each(|(idx, chunk)| {
let Some(cache) = &self.cache_manager else {
return;
};
let column = self.base.metadata.column(idx);
if cache_uncompressed_pages(column) {
// Fetches uncompressed pages for the row group.
@@ -337,7 +332,8 @@ impl<'a> InMemoryRowGroup<'a> {
self.row_group_idx,
idx,
);
self.base.column_uncompressed_pages[idx] = cache.get_pages(&page_key);
self.base.column_uncompressed_pages[idx] =
self.cache_strategy.get_pages(&page_key);
} else {
// Fetches the compressed page from the cache.
let page_key = PageKey::new_compressed(
@@ -347,7 +343,7 @@ impl<'a> InMemoryRowGroup<'a> {
idx,
);
*chunk = cache.get_pages(&page_key).map(|page_value| {
*chunk = self.cache_strategy.get_pages(&page_key).map(|page_value| {
Arc::new(ColumnChunkData::Dense {
offset: column.byte_range().0 as usize,
data: page_value.compressed.clone(),
@@ -383,7 +379,7 @@ impl<'a> InMemoryRowGroup<'a> {
key: IndexKey,
ranges: &[Range<u64>],
) -> Option<Vec<Bytes>> {
if let Some(cache) = self.cache_manager.as_ref()?.write_cache() {
if let Some(cache) = self.cache_strategy.write_cache() {
return cache.file_cache().read_ranges(key, ranges).await;
}
None
@@ -399,10 +395,6 @@ impl<'a> InMemoryRowGroup<'a> {
let page_reader = self.base.column_reader(i)?;
let Some(cache) = &self.cache_manager else {
return Ok(Box::new(page_reader));
};
let column = self.base.metadata.column(i);
if cache_uncompressed_pages(column) {
// This column use row group level page cache.
@@ -411,7 +403,7 @@ impl<'a> InMemoryRowGroup<'a> {
let page_value = Arc::new(PageValue::new_row_group(pages));
let page_key =
PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
cache.put_pages(page_key, page_value.clone());
self.cache_strategy.put_pages(page_key, page_value.clone());
return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
}