mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
refactor: Avoid wrapping Option for CacheManagerRef (#4996)
This commit is contained in:
2
src/mito2/src/cache/write_cache.rs
vendored
2
src/mito2/src/cache/write_cache.rs
vendored
@@ -501,7 +501,7 @@ mod tests {
|
||||
|
||||
// Read metadata from write cache
|
||||
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
|
||||
.cache(Some(cache_manager.clone()));
|
||||
.cache(cache_manager.clone());
|
||||
let reader = builder.build().await.unwrap();
|
||||
|
||||
// Check parquet metadata
|
||||
|
||||
@@ -562,7 +562,7 @@ pub struct SerializedCompactionOutput {
|
||||
struct CompactionSstReaderBuilder<'a> {
|
||||
metadata: RegionMetadataRef,
|
||||
sst_layer: AccessLayerRef,
|
||||
cache: Option<CacheManagerRef>,
|
||||
cache: CacheManagerRef,
|
||||
inputs: &'a [FileHandle],
|
||||
append_mode: bool,
|
||||
filter_deleted: bool,
|
||||
|
||||
@@ -295,7 +295,7 @@ impl Compactor for DefaultCompactor {
|
||||
let reader = CompactionSstReaderBuilder {
|
||||
metadata: region_metadata.clone(),
|
||||
sst_layer: sst_layer.clone(),
|
||||
cache: Some(cache_manager.clone()),
|
||||
cache: cache_manager.clone(),
|
||||
inputs: &output.inputs,
|
||||
append_mode,
|
||||
filter_deleted: output.filter_deleted,
|
||||
|
||||
@@ -438,16 +438,12 @@ impl EngineInner {
|
||||
channel_size: self.config.parallel_scan_channel_size,
|
||||
};
|
||||
|
||||
let scan_region = ScanRegion::new(
|
||||
version,
|
||||
region.access_layer.clone(),
|
||||
request,
|
||||
Some(cache_manager),
|
||||
)
|
||||
.with_parallelism(scan_parallelism)
|
||||
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
|
||||
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
|
||||
.with_start_time(query_start);
|
||||
let scan_region =
|
||||
ScanRegion::new(version, region.access_layer.clone(), request, cache_manager)
|
||||
.with_parallelism(scan_parallelism)
|
||||
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
|
||||
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
|
||||
.with_start_time(query_start);
|
||||
|
||||
Ok(scan_region)
|
||||
}
|
||||
|
||||
@@ -85,7 +85,7 @@ impl RowGroupLastRowCachedReader {
|
||||
pub(crate) fn new(
|
||||
file_id: FileId,
|
||||
row_group_idx: usize,
|
||||
cache_manager: Option<CacheManagerRef>,
|
||||
cache_manager: CacheManagerRef,
|
||||
row_group_reader: RowGroupReader,
|
||||
) -> Self {
|
||||
let key = SelectorResultKey {
|
||||
@@ -94,9 +94,6 @@ impl RowGroupLastRowCachedReader {
|
||||
selector: TimeSeriesRowSelector::LastRow,
|
||||
};
|
||||
|
||||
let Some(cache_manager) = cache_manager else {
|
||||
return Self::new_miss(key, row_group_reader, None);
|
||||
};
|
||||
if let Some(value) = cache_manager.get_selector_result(&key) {
|
||||
let schema_matches = value.projection
|
||||
== row_group_reader
|
||||
|
||||
@@ -171,7 +171,7 @@ impl ProjectionMapper {
|
||||
pub(crate) fn convert(
|
||||
&self,
|
||||
batch: &Batch,
|
||||
cache_manager: Option<&CacheManager>,
|
||||
cache_manager: &CacheManager,
|
||||
) -> common_recordbatch::error::Result<RecordBatch> {
|
||||
debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
|
||||
debug_assert!(self
|
||||
@@ -204,15 +204,12 @@ impl ProjectionMapper {
|
||||
match index {
|
||||
BatchIndex::Tag(idx) => {
|
||||
let value = &pk_values[*idx];
|
||||
let vector = match cache_manager {
|
||||
Some(cache) => repeated_vector_with_cache(
|
||||
&column_schema.data_type,
|
||||
value,
|
||||
num_rows,
|
||||
cache,
|
||||
)?,
|
||||
None => new_repeated_vector(&column_schema.data_type, value, num_rows)?,
|
||||
};
|
||||
let vector = repeated_vector_with_cache(
|
||||
&column_schema.data_type,
|
||||
value,
|
||||
num_rows,
|
||||
cache_manager,
|
||||
)?;
|
||||
columns.push(vector);
|
||||
}
|
||||
BatchIndex::Timestamp => {
|
||||
@@ -360,7 +357,7 @@ mod tests {
|
||||
// With vector cache.
|
||||
let cache = CacheManager::builder().vector_cache_size(1024).build();
|
||||
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
|
||||
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
|
||||
let record_batch = mapper.convert(&batch, &cache).unwrap();
|
||||
let expect = "\
|
||||
+---------------------+----+----+----+----+
|
||||
| ts | k0 | k1 | v0 | v1 |
|
||||
@@ -380,7 +377,7 @@ mod tests {
|
||||
assert!(cache
|
||||
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
|
||||
.is_none());
|
||||
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
|
||||
let record_batch = mapper.convert(&batch, &cache).unwrap();
|
||||
assert_eq!(expect, print_record_batch(record_batch));
|
||||
}
|
||||
|
||||
@@ -401,7 +398,8 @@ mod tests {
|
||||
);
|
||||
|
||||
let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
|
||||
let record_batch = mapper.convert(&batch, None).unwrap();
|
||||
let cache = CacheManager::builder().vector_cache_size(1024).build();
|
||||
let record_batch = mapper.convert(&batch, &cache).unwrap();
|
||||
let expect = "\
|
||||
+----+----+
|
||||
| v1 | k0 |
|
||||
|
||||
@@ -90,7 +90,7 @@ impl RangeMeta {
|
||||
Self::push_unordered_file_ranges(
|
||||
input.memtables.len(),
|
||||
&input.files,
|
||||
input.cache_manager.as_deref(),
|
||||
&input.cache_manager,
|
||||
&mut ranges,
|
||||
);
|
||||
|
||||
@@ -172,16 +172,15 @@ impl RangeMeta {
|
||||
fn push_unordered_file_ranges(
|
||||
num_memtables: usize,
|
||||
files: &[FileHandle],
|
||||
cache: Option<&CacheManager>,
|
||||
cache: &CacheManager,
|
||||
ranges: &mut Vec<RangeMeta>,
|
||||
) {
|
||||
// For append mode, we can parallelize reading row groups.
|
||||
for (i, file) in files.iter().enumerate() {
|
||||
let file_index = num_memtables + i;
|
||||
// Get parquet meta from the cache.
|
||||
let parquet_meta = cache.and_then(|c| {
|
||||
c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id())
|
||||
});
|
||||
let parquet_meta =
|
||||
cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
|
||||
if let Some(parquet_meta) = parquet_meta {
|
||||
// Scans each row group.
|
||||
for row_group_index in 0..file.meta_ref().num_row_groups {
|
||||
|
||||
@@ -167,7 +167,7 @@ pub(crate) struct ScanRegion {
|
||||
/// Scan request.
|
||||
request: ScanRequest,
|
||||
/// Cache.
|
||||
cache_manager: Option<CacheManagerRef>,
|
||||
cache_manager: CacheManagerRef,
|
||||
/// Parallelism to scan.
|
||||
parallelism: ScanParallelism,
|
||||
/// Whether to ignore inverted index.
|
||||
@@ -184,7 +184,7 @@ impl ScanRegion {
|
||||
version: VersionRef,
|
||||
access_layer: AccessLayerRef,
|
||||
request: ScanRequest,
|
||||
cache_manager: Option<CacheManagerRef>,
|
||||
cache_manager: CacheManagerRef,
|
||||
) -> ScanRegion {
|
||||
ScanRegion {
|
||||
version,
|
||||
@@ -381,17 +381,12 @@ impl ScanRegion {
|
||||
}
|
||||
|
||||
let file_cache = || -> Option<FileCacheRef> {
|
||||
let cache_manager = self.cache_manager.as_ref()?;
|
||||
let write_cache = cache_manager.write_cache()?;
|
||||
let write_cache = self.cache_manager.write_cache()?;
|
||||
let file_cache = write_cache.file_cache();
|
||||
Some(file_cache)
|
||||
}();
|
||||
|
||||
let index_cache = self
|
||||
.cache_manager
|
||||
.as_ref()
|
||||
.and_then(|c| c.index_cache())
|
||||
.cloned();
|
||||
let index_cache = self.cache_manager.index_cache().cloned();
|
||||
|
||||
InvertedIndexApplierBuilder::new(
|
||||
self.access_layer.region_dir().to_string(),
|
||||
@@ -471,7 +466,7 @@ pub(crate) struct ScanInput {
|
||||
/// Handles to SST files to scan.
|
||||
pub(crate) files: Vec<FileHandle>,
|
||||
/// Cache.
|
||||
pub(crate) cache_manager: Option<CacheManagerRef>,
|
||||
pub(crate) cache_manager: CacheManagerRef,
|
||||
/// Ignores file not found error.
|
||||
ignore_file_not_found: bool,
|
||||
/// Parallelism to scan data.
|
||||
@@ -502,7 +497,7 @@ impl ScanInput {
|
||||
predicate: None,
|
||||
memtables: Vec::new(),
|
||||
files: Vec::new(),
|
||||
cache_manager: None,
|
||||
cache_manager: CacheManagerRef::default(),
|
||||
ignore_file_not_found: false,
|
||||
parallelism: ScanParallelism::default(),
|
||||
inverted_index_applier: None,
|
||||
@@ -545,7 +540,7 @@ impl ScanInput {
|
||||
|
||||
/// Sets cache for this query.
|
||||
#[must_use]
|
||||
pub(crate) fn with_cache(mut self, cache: Option<CacheManagerRef>) -> Self {
|
||||
pub(crate) fn with_cache(mut self, cache: CacheManagerRef) -> Self {
|
||||
self.cache_manager = cache;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -229,7 +229,7 @@ impl SeqScan {
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
let cache = stream_ctx.input.cache_manager.as_deref();
|
||||
let cache = &stream_ctx.input.cache_manager;
|
||||
let mut metrics = ScannerMetrics::default();
|
||||
let mut fetch_start = Instant::now();
|
||||
#[cfg(debug_assertions)]
|
||||
|
||||
@@ -135,7 +135,7 @@ impl UnorderedScan {
|
||||
let stream = try_stream! {
|
||||
part_metrics.on_first_poll();
|
||||
|
||||
let cache = stream_ctx.input.cache_manager.as_deref();
|
||||
let cache = &stream_ctx.input.cache_manager;
|
||||
// Scans each part.
|
||||
for part_range in part_ranges {
|
||||
let mut metrics = ScannerMetrics::default();
|
||||
|
||||
@@ -195,11 +195,11 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
// Enable page cache.
|
||||
let cache = Some(Arc::new(
|
||||
let cache = Arc::new(
|
||||
CacheManager::builder()
|
||||
.page_cache_size(64 * 1024 * 1024)
|
||||
.build(),
|
||||
));
|
||||
);
|
||||
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
|
||||
.cache(cache.clone());
|
||||
for _ in 0..3 {
|
||||
@@ -219,15 +219,15 @@ mod tests {
|
||||
|
||||
// Doesn't have compressed page cached.
|
||||
let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
|
||||
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
|
||||
assert!(cache.get_pages(&page_key).is_none());
|
||||
|
||||
// Cache 4 row groups.
|
||||
for i in 0..4 {
|
||||
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
|
||||
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
|
||||
assert!(cache.get_pages(&page_key).is_some());
|
||||
}
|
||||
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
|
||||
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
|
||||
assert!(cache.get_pages(&page_key).is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -82,7 +82,7 @@ pub struct ParquetReaderBuilder {
|
||||
/// can contain columns not in the parquet file.
|
||||
projection: Option<Vec<ColumnId>>,
|
||||
/// Manager that caches SST data.
|
||||
cache_manager: Option<CacheManagerRef>,
|
||||
cache_manager: CacheManagerRef,
|
||||
/// Index appliers.
|
||||
inverted_index_applier: Option<InvertedIndexApplierRef>,
|
||||
fulltext_index_applier: Option<FulltextIndexApplierRef>,
|
||||
@@ -106,7 +106,7 @@ impl ParquetReaderBuilder {
|
||||
predicate: None,
|
||||
time_range: None,
|
||||
projection: None,
|
||||
cache_manager: None,
|
||||
cache_manager: CacheManagerRef::default(),
|
||||
inverted_index_applier: None,
|
||||
fulltext_index_applier: None,
|
||||
expected_metadata: None,
|
||||
@@ -138,7 +138,7 @@ impl ParquetReaderBuilder {
|
||||
|
||||
/// Attaches the cache to the builder.
|
||||
#[must_use]
|
||||
pub fn cache(mut self, cache: Option<CacheManagerRef>) -> ParquetReaderBuilder {
|
||||
pub fn cache(mut self, cache: CacheManagerRef) -> ParquetReaderBuilder {
|
||||
self.cache_manager = cache;
|
||||
self
|
||||
}
|
||||
@@ -313,10 +313,12 @@ impl ParquetReaderBuilder {
|
||||
let region_id = self.file_handle.region_id();
|
||||
let file_id = self.file_handle.file_id();
|
||||
// Tries to get from global cache.
|
||||
if let Some(manager) = &self.cache_manager {
|
||||
if let Some(metadata) = manager.get_parquet_meta_data(region_id, file_id).await {
|
||||
return Ok(metadata);
|
||||
}
|
||||
if let Some(metadata) = self
|
||||
.cache_manager
|
||||
.get_parquet_meta_data(region_id, file_id)
|
||||
.await
|
||||
{
|
||||
return Ok(metadata);
|
||||
}
|
||||
|
||||
// Cache miss, load metadata directly.
|
||||
@@ -324,13 +326,11 @@ impl ParquetReaderBuilder {
|
||||
let metadata = metadata_loader.load().await?;
|
||||
let metadata = Arc::new(metadata);
|
||||
// Cache the metadata.
|
||||
if let Some(cache) = &self.cache_manager {
|
||||
cache.put_parquet_meta_data(
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id(),
|
||||
metadata.clone(),
|
||||
);
|
||||
}
|
||||
self.cache_manager.put_parquet_meta_data(
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id(),
|
||||
metadata.clone(),
|
||||
);
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
@@ -846,7 +846,7 @@ pub(crate) struct RowGroupReaderBuilder {
|
||||
/// Field levels to read.
|
||||
field_levels: FieldLevels,
|
||||
/// Cache.
|
||||
cache_manager: Option<CacheManagerRef>,
|
||||
cache_manager: CacheManagerRef,
|
||||
}
|
||||
|
||||
impl RowGroupReaderBuilder {
|
||||
@@ -864,7 +864,7 @@ impl RowGroupReaderBuilder {
|
||||
&self.parquet_meta
|
||||
}
|
||||
|
||||
pub(crate) fn cache_manager(&self) -> &Option<CacheManagerRef> {
|
||||
pub(crate) fn cache_manager(&self) -> &CacheManagerRef {
|
||||
&self.cache_manager
|
||||
}
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ pub struct InMemoryRowGroup<'a> {
|
||||
region_id: RegionId,
|
||||
file_id: FileId,
|
||||
row_group_idx: usize,
|
||||
cache_manager: Option<CacheManagerRef>,
|
||||
cache_manager: CacheManagerRef,
|
||||
/// Row group level cached pages for each column.
|
||||
///
|
||||
/// These pages are uncompressed pages of a row group.
|
||||
@@ -69,7 +69,7 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
file_id: FileId,
|
||||
parquet_meta: &'a ParquetMetaData,
|
||||
row_group_idx: usize,
|
||||
cache_manager: Option<CacheManagerRef>,
|
||||
cache_manager: CacheManagerRef,
|
||||
file_path: &'a str,
|
||||
object_store: ObjectStore,
|
||||
) -> Self {
|
||||
@@ -208,19 +208,18 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
};
|
||||
|
||||
let column = self.metadata.column(idx);
|
||||
if let Some(cache) = &self.cache_manager {
|
||||
if !cache_uncompressed_pages(column) {
|
||||
// For columns that have multiple uncompressed pages, we only cache the compressed page
|
||||
// to save memory.
|
||||
let page_key = PageKey::new_compressed(
|
||||
self.region_id,
|
||||
self.file_id,
|
||||
self.row_group_idx,
|
||||
idx,
|
||||
);
|
||||
cache
|
||||
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
|
||||
}
|
||||
|
||||
if !cache_uncompressed_pages(column) {
|
||||
// For columns that have multiple uncompressed pages, we only cache the compressed page
|
||||
// to save memory.
|
||||
let page_key = PageKey::new_compressed(
|
||||
self.region_id,
|
||||
self.file_id,
|
||||
self.row_group_idx,
|
||||
idx,
|
||||
);
|
||||
self.cache_manager
|
||||
.put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
|
||||
}
|
||||
|
||||
*chunk = Some(Arc::new(ColumnChunkData::Dense {
|
||||
@@ -242,9 +241,6 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
.enumerate()
|
||||
.filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
|
||||
.for_each(|(idx, chunk)| {
|
||||
let Some(cache) = &self.cache_manager else {
|
||||
return;
|
||||
};
|
||||
let column = self.metadata.column(idx);
|
||||
if cache_uncompressed_pages(column) {
|
||||
// Fetches uncompressed pages for the row group.
|
||||
@@ -254,7 +250,7 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
self.row_group_idx,
|
||||
idx,
|
||||
);
|
||||
self.column_uncompressed_pages[idx] = cache.get_pages(&page_key);
|
||||
self.column_uncompressed_pages[idx] = self.cache_manager.get_pages(&page_key);
|
||||
} else {
|
||||
// Fetches the compressed page from the cache.
|
||||
let page_key = PageKey::new_compressed(
|
||||
@@ -264,7 +260,7 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
idx,
|
||||
);
|
||||
|
||||
*chunk = cache.get_pages(&page_key).map(|page_value| {
|
||||
*chunk = self.cache_manager.get_pages(&page_key).map(|page_value| {
|
||||
Arc::new(ColumnChunkData::Dense {
|
||||
offset: column.byte_range().0 as usize,
|
||||
data: page_value.compressed.clone(),
|
||||
@@ -300,7 +296,7 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
key: IndexKey,
|
||||
ranges: &[Range<u64>],
|
||||
) -> Option<Vec<Bytes>> {
|
||||
if let Some(cache) = self.cache_manager.as_ref()?.write_cache() {
|
||||
if let Some(cache) = self.cache_manager.write_cache() {
|
||||
return cache.file_cache().read_ranges(key, ranges).await;
|
||||
}
|
||||
None
|
||||
@@ -331,10 +327,6 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
}
|
||||
};
|
||||
|
||||
let Some(cache) = &self.cache_manager else {
|
||||
return Ok(Box::new(page_reader));
|
||||
};
|
||||
|
||||
let column = self.metadata.column(i);
|
||||
if cache_uncompressed_pages(column) {
|
||||
// This column use row group level page cache.
|
||||
@@ -343,7 +335,7 @@ impl<'a> InMemoryRowGroup<'a> {
|
||||
let page_value = Arc::new(PageValue::new_row_group(pages));
|
||||
let page_key =
|
||||
PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
|
||||
cache.put_pages(page_key, page_value.clone());
|
||||
self.cache_manager.put_pages(page_key, page_value.clone());
|
||||
|
||||
return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user