feat(mito): Add cache manager (#2488)

* feat: add cache manager

* feat: add cache to reader builder

* feat: add AsyncFileReaderCache

* feat: Impl AsyncFileReaderCache

* chore: move moka dep to workspace

* feat: add moka cache to the manager

* feat: implement parquet meta cache

* test: test cache manager

* feat: consider vec size

* style: fix clippy

* test: fix config api test

* feat: divide cache

* test: test disabling meta cache

* test: fix config api test

* feat: remove meta cache if file is purged
This commit is contained in:
Yingwen
2023-09-26 19:46:19 +08:00
committed by GitHub
parent 515ce825bd
commit a6116bb866
23 changed files with 516 additions and 52 deletions

35
Cargo.lock generated
View File

@@ -1269,7 +1269,7 @@ dependencies = [
"log-store",
"meta-client",
"metrics",
"moka 0.11.3",
"moka",
"object-store",
"parking_lot 0.12.1",
"partition",
@@ -1548,7 +1548,7 @@ dependencies = [
"derive_builder 0.12.0",
"enum_dispatch",
"futures-util",
"moka 0.9.9",
"moka",
"parking_lot 0.12.1",
"prost",
"rand",
@@ -3369,7 +3369,7 @@ dependencies = [
"meta-client",
"meta-srv",
"metrics",
"moka 0.9.9",
"moka",
"object-store",
"openmetrics-parser",
"opentelemetry-proto",
@@ -5577,6 +5577,7 @@ dependencies = [
"log-store",
"memcomparable",
"metrics",
"moka",
"object-store",
"parquet",
"paste",
@@ -5595,32 +5596,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "moka"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b28455ac4363046076054a7e9cfbd7f168019c29dba32a625f59fc0aeffaaea4"
dependencies = [
"async-io",
"async-lock",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"futures-util",
"num_cpus",
"once_cell",
"parking_lot 0.12.1",
"quanta 0.11.1",
"rustc_version",
"scheduled-thread-pool",
"skeptic",
"smallvec",
"tagptr",
"thiserror",
"triomphe",
"uuid",
]
[[package]]
name = "moka"
version = "0.11.3"
@@ -6539,7 +6514,7 @@ dependencies = [
"datafusion-expr",
"datatypes",
"meta-client",
"moka 0.9.9",
"moka",
"serde",
"serde_json",
"snafu",

View File

@@ -82,6 +82,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
moka = { version = "0.11" }
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
parquet = "43.0"

View File

@@ -31,7 +31,7 @@ futures-util.workspace = true
lazy_static.workspace = true
meta-client = { workspace = true }
metrics.workspace = true
moka = { version = "0.11", features = ["future"] }
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
partition.workspace = true
regex.workspace = true

View File

@@ -27,7 +27,7 @@ datatypes = { workspace = true }
derive_builder.workspace = true
enum_dispatch = "0.3"
futures-util.workspace = true
moka = { version = "0.9", features = ["future"] }
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
prost.workspace = true
rand.workspace = true

View File

@@ -51,7 +51,7 @@ meta-client = { workspace = true }
raft-engine = { workspace = true }
# Although it is not used, please do not delete it.
metrics.workspace = true
moka = { version = "0.9", features = ["future"] }
moka = { workspace = true, features = ["future"] }
object-store = { workspace = true }
openmetrics-parser = "0.4"
opentelemetry-proto.workspace = true

View File

@@ -40,6 +40,7 @@ humantime-serde = { workspace = true }
lazy_static = "1.4"
memcomparable = "0.2"
metrics.workspace = true
moka.workspace = true
object-store = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true

131
src/mito2/src/cache.rs Normal file
View File

@@ -0,0 +1,131 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Cache for the engine.
mod cache_size;
#[cfg(test)]
pub(crate) mod test_util;
use std::mem;
use std::sync::Arc;
use moka::sync::Cache;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;
use crate::cache::cache_size::parquet_meta_size;
use crate::sst::file::FileId;
/// Manages cached data for the engine.
///
/// Currently it only holds a cache for SST (parquet) metadata.
pub struct CacheManager {
/// Cache for SST metadata.
/// `None` when the cache is disabled (the manager was created with a size of 0).
sst_meta_cache: Option<SstMetaCache>,
}
/// Reference-counted pointer to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
impl CacheManager {
    /// Creates a new manager with specific cache size in bytes.
    ///
    /// Passing `0` as `sst_meta_cache_size` disables the SST meta cache: puts become
    /// no-ops and every lookup misses.
    pub fn new(sst_meta_cache_size: u64) -> CacheManager {
        let sst_meta_cache = if sst_meta_cache_size == 0 {
            None
        } else {
            let cache = Cache::builder()
                .max_capacity(sst_meta_cache_size)
                .weigher(|k: &SstMetaKey, v: &Arc<ParquetMetaData>| {
                    // We ignore the size of `Arc`.
                    let size = k.estimated_size().saturating_add(parquet_meta_size(v));
                    // The weigher has to return a `u32`. Clamp the estimate instead of
                    // using a bare `as` cast, which would wrap an oversized value
                    // around and report a misleadingly small weight.
                    size.min(u32::MAX as usize) as u32
                })
                .build();
            Some(cache)
        };

        CacheManager { sst_meta_cache }
    }

    /// Gets cached [ParquetMetaData].
    ///
    /// Returns `None` if the cache is disabled or has no entry for the
    /// `(region_id, file_id)` pair.
    pub fn get_parquet_meta_data(
        &self,
        region_id: RegionId,
        file_id: FileId,
    ) -> Option<Arc<ParquetMetaData>> {
        self.sst_meta_cache
            .as_ref()
            .and_then(|sst_meta_cache| sst_meta_cache.get(&SstMetaKey(region_id, file_id)))
    }

    /// Puts [ParquetMetaData] into the cache.
    ///
    /// Does nothing if the cache is disabled.
    pub fn put_parquet_meta_data(
        &self,
        region_id: RegionId,
        file_id: FileId,
        metadata: Arc<ParquetMetaData>,
    ) {
        if let Some(cache) = &self.sst_meta_cache {
            cache.insert(SstMetaKey(region_id, file_id), metadata);
        }
    }

    /// Removes [ParquetMetaData] from the cache.
    ///
    /// Does nothing if the cache is disabled.
    pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
        if let Some(cache) = &self.sst_meta_cache {
            cache.remove(&SstMetaKey(region_id, file_id));
        }
    }
}
/// Key of the SST meta cache, made of the region id and the file id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);

impl SstMetaKey {
    /// Estimates the memory taken by this key.
    ///
    /// Only the in-place size of the key struct is counted.
    fn estimated_size(&self) -> usize {
        mem::size_of_val(self)
    }
}

/// Cache that maps an [SstMetaKey] to the parquet metadata of that SST file.
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::test_util::parquet_meta;

    /// A manager created with size 0 has no meta cache and never returns metadata.
    #[test]
    fn test_disable_meta_cache() {
        let manager = CacheManager::new(0);
        assert!(manager.sst_meta_cache.is_none());

        let (region_id, file_id) = (RegionId::new(1, 1), FileId::random());
        manager.put_parquet_meta_data(region_id, file_id, parquet_meta());
        // The put above is a no-op, so the lookup must still miss.
        assert!(manager.get_parquet_meta_data(region_id, file_id).is_none());
    }

    /// Metadata can be put, read back and removed again.
    #[test]
    fn test_parquet_meta_cache() {
        let manager = CacheManager::new(2000);
        let (region_id, file_id) = (RegionId::new(1, 1), FileId::random());

        // Nothing is cached yet.
        assert!(manager.get_parquet_meta_data(region_id, file_id).is_none());

        manager.put_parquet_meta_data(region_id, file_id, parquet_meta());
        assert!(manager.get_parquet_meta_data(region_id, file_id).is_some());

        manager.remove_parquet_meta_data(region_id, file_id);
        assert!(manager.get_parquet_meta_data(region_id, file_id).is_none());
    }
}

142
src/mito2/src/cache/cache_size.rs vendored Normal file
View File

@@ -0,0 +1,142 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Cache size of different cache value.
use std::mem;
use parquet::file::metadata::{
FileMetaData, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex, RowGroupMetaData,
};
use parquet::file::page_index::index::Index;
use parquet::format::{ColumnOrder, KeyValue, PageLocation};
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor, Type};
/// Estimates the memory footprint of a [ParquetMetaData].
///
/// The estimate is the size of the struct itself plus the estimated heap usage of
/// its file metadata, row groups, column index and offset index. A missing column
/// or offset index contributes 0.
pub fn parquet_meta_size(meta: &ParquetMetaData) -> usize {
    let struct_size = mem::size_of::<ParquetMetaData>();
    let file_meta = file_meta_heap_size(meta.file_metadata());
    let row_groups: usize = meta
        .row_groups()
        .iter()
        .map(row_group_meta_heap_size)
        .sum();
    let column_index = meta
        .column_index()
        .map_or(0, parquet_column_index_heap_size);
    let offset_index = meta
        .offset_index()
        .map_or(0, parquet_offset_index_heap_size);

    struct_size + file_meta + row_groups + column_index + offset_index
}
/// Estimates the heap memory used by a [FileMetaData].
///
/// Counts the `created_by` string, the key-value metadata, the schema descriptor
/// and the column orders.
fn file_meta_heap_size(meta: &FileMetaData) -> usize {
    // Bytes of the optional `created_by` string.
    let created_by = meta.created_by().map_or(0, |s| s.len());

    // Each key-value entry costs its struct size plus the bytes of key and value.
    let key_values = match meta.key_value_metadata() {
        Some(kvs) => {
            let mut total = 0;
            for kv in kvs.iter() {
                let value_len = kv.value.as_ref().map_or(0, |v| v.len());
                total += kv.key.len() + value_len + mem::size_of::<KeyValue>();
            }
            total
        }
        None => 0,
    };

    // schema_descr (It's a ptr so we also add size of SchemaDescriptor).
    let schema =
        mem::size_of::<SchemaDescriptor>() + schema_descr_heap_size(meta.schema_descr());

    let column_orders = meta
        .column_orders()
        .map_or(0, |orders| orders.len() * mem::size_of::<ColumnOrder>());

    created_by + key_values + schema + column_orders
}
/// Estimates the heap memory used by a [SchemaDescriptor].
///
/// Adds up the size of one [Type] for the schema, every leaf column descriptor
/// (struct size plus its own heap usage) and one `usize` per column for the
/// leaf-to-base mapping.
fn schema_descr_heap_size(descr: &SchemaDescriptor) -> usize {
    let schema = mem::size_of::<Type>();

    let mut leaves = 0;
    for column in descr.columns().iter() {
        leaves += mem::size_of::<ColumnDescriptor>() + column_descr_heap_size(column);
    }

    let leaf_to_base = descr.num_columns() * mem::size_of::<usize>();

    schema + leaves + leaf_to_base
}
/// Estimates the heap memory used by a [ColumnDescriptor].
///
/// Only the byte lengths of the parts of the column path are counted.
fn column_descr_heap_size(descr: &ColumnDescriptor) -> usize {
    let mut total = 0;
    for part in descr.path().parts().iter() {
        total += part.len();
    }
    total
}
/// Estimates the heap memory used by a [RowGroupMetaData].
///
/// Only the in-memory size of the value returned by `columns()` is counted.
fn row_group_meta_heap_size(meta: &RowGroupMetaData) -> usize {
    let columns = meta.columns();
    mem::size_of_val(columns)
}
/// Estimates the heap memory used by a [ParquetColumnIndex].
///
/// For every row group, counts one [Index] per entry plus the size of the row
/// group container value itself.
fn parquet_column_index_heap_size(column_index: &ParquetColumnIndex) -> usize {
    let mut total = 0;
    for row_group in column_index.iter() {
        total += row_group.len() * mem::size_of::<Index>() + mem::size_of_val(row_group);
    }
    total
}
/// Estimates the heap memory used by a [ParquetOffsetIndex].
///
/// For every column of every row group, counts one [PageLocation] per entry plus
/// the size of the column container value; the size of each row group container
/// value is added on top.
fn parquet_offset_index_heap_size(offset_index: &ParquetOffsetIndex) -> usize {
    let mut total = 0;
    for row_group in offset_index.iter() {
        let mut columns = 0;
        for column in row_group.iter() {
            columns += column.len() * mem::size_of::<PageLocation>() + mem::size_of_val(column);
        }
        total += columns + mem::size_of_val(row_group);
    }
    total
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::test_util::parquet_meta;

    /// Pins the estimated size of the metadata built by the test helper.
    #[test]
    fn test_parquet_meta_size() {
        let estimated = parquet_meta_size(&parquet_meta());
        assert_eq!(948, estimated);
    }
}

44
src/mito2/src/cache/test_util.rs vendored Normal file
View File

@@ -0,0 +1,44 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for testing cache.
use std::sync::Arc;
use bytes::Bytes;
use datatypes::arrow::array::{ArrayRef, Int64Array};
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::ParquetMetaData;
/// Builds a [ParquetMetaData] for tests.
///
/// The metadata is read back from the in-memory parquet file produced by
/// `parquet_file_data()`.
pub(crate) fn parquet_meta() -> Arc<ParquetMetaData> {
    let bytes = Bytes::from(parquet_file_data());
    let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
    Arc::clone(reader_builder.metadata())
}
/// Writes a small test parquet file into a buffer and returns the buffer.
///
/// The file holds a single record batch with one `Int64` column named `col`
/// containing the values 1, 2 and 3.
fn parquet_file_data() -> Vec<u8> {
    let values: ArrayRef = Arc::new(Int64Array::from_iter_values([1, 2, 3]));
    let batch = RecordBatch::try_from_iter([("col", values)]).unwrap();

    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    buffer
}

View File

@@ -56,6 +56,10 @@ pub struct MitoConfig {
pub global_write_buffer_size: ReadableSize,
/// Global write buffer size threshold to reject write requests (default 2G).
pub global_write_buffer_reject_size: ReadableSize,
// Cache configs:
/// Cache size for SST metadata (default 128MB). Set it to 0 to disable the cache.
pub sst_meta_cache_size: ReadableSize,
}
impl Default for MitoConfig {
@@ -70,6 +74,7 @@ impl Default for MitoConfig {
auto_flush_interval: Duration::from_secs(30 * 60),
global_write_buffer_size: ReadableSize::gb(1),
global_write_buffer_reject_size: ReadableSize::gb(2),
sst_meta_cache_size: ReadableSize::mb(128),
}
}
}

View File

@@ -144,7 +144,14 @@ impl EngineInner {
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
let version = region.version();
let scan_region = ScanRegion::new(version, region.access_layer.clone(), request);
// Get cache.
let cache_manager = self.workers.cache_manager();
let scan_region = ScanRegion::new(
version,
region.access_layer.clone(),
request,
Some(cache_manager),
);
scan_region.scanner()
}

View File

@@ -22,6 +22,7 @@
pub mod test_util;
mod access_layer;
mod cache;
mod compaction;
pub mod config;
pub mod engine;

View File

@@ -22,6 +22,7 @@ use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::error::{BuildPredicateSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
@@ -113,6 +114,8 @@ pub(crate) struct ScanRegion {
access_layer: AccessLayerRef,
/// Scan request.
request: ScanRequest,
/// Cache.
cache_manager: Option<CacheManagerRef>,
}
impl ScanRegion {
@@ -121,11 +124,13 @@ impl ScanRegion {
version: VersionRef,
access_layer: AccessLayerRef,
request: ScanRequest,
cache_manager: Option<CacheManagerRef>,
) -> ScanRegion {
ScanRegion {
version,
access_layer,
request,
cache_manager,
}
}
@@ -181,7 +186,8 @@ impl ScanRegion {
.with_time_range(Some(time_range))
.with_predicate(Some(predicate))
.with_memtables(memtables)
.with_files(files);
.with_files(files)
.with_cache(self.cache_manager);
Ok(seq_scan)
}

View File

@@ -25,6 +25,7 @@ use snafu::ResultExt;
use table::predicate::Predicate;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::read::compat::{self, CompatReader};
@@ -49,6 +50,8 @@ pub struct SeqScan {
memtables: Vec<MemtableRef>,
/// Handles to SST files to scan.
files: Vec<FileHandle>,
/// Cache.
cache_manager: Option<CacheManagerRef>,
}
impl SeqScan {
@@ -62,37 +65,44 @@ impl SeqScan {
predicate: None,
memtables: Vec::new(),
files: Vec::new(),
cache_manager: None,
}
}
/// Set time range filter for time index.
/// Sets time range filter for time index.
#[must_use]
pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
self.time_range = time_range;
self
}
/// Set predicate to push down.
/// Sets predicate to push down.
#[must_use]
pub(crate) fn with_predicate(mut self, predicate: Option<Predicate>) -> Self {
self.predicate = predicate;
self
}
/// Set memtables to read.
/// Sets memtables to read.
#[must_use]
pub(crate) fn with_memtables(mut self, memtables: Vec<MemtableRef>) -> Self {
self.memtables = memtables;
self
}
/// Set files to read.
/// Sets files to read.
#[must_use]
pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
self.files = files;
self
}
/// Sets cache for this query.
pub(crate) fn with_cache(mut self, cache: Option<CacheManagerRef>) -> Self {
self.cache_manager = cache;
self
}
/// Builds a stream for the query.
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
@@ -129,6 +139,7 @@ impl SeqScan {
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.build()
.await?;
if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) {

View File

@@ -29,6 +29,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, RegionCorruptedSnafu, Result};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
@@ -51,6 +52,7 @@ pub(crate) struct RegionOpener {
region_dir: String,
scheduler: SchedulerRef,
options: HashMap<String, String>,
cache_manager: Option<CacheManagerRef>,
}
impl RegionOpener {
@@ -70,6 +72,7 @@ impl RegionOpener {
region_dir: normalize_dir(region_dir),
scheduler,
options: HashMap::new(),
cache_manager: None,
}
}
@@ -85,6 +88,12 @@ impl RegionOpener {
self
}
/// Sets the cache manager for the region.
pub(crate) fn cache(mut self, cache_manager: Option<CacheManagerRef>) -> Self {
self.cache_manager = cache_manager;
self
}
/// Writes region manifest and creates a new region if it does not exist.
/// Opens the region if it already exists.
///
@@ -145,7 +154,11 @@ impl RegionOpener {
version_control,
access_layer: access_layer.clone(),
manifest_manager,
file_purger: Arc::new(LocalFilePurger::new(self.scheduler, access_layer)),
file_purger: Arc::new(LocalFilePurger::new(
self.scheduler,
access_layer,
self.cache_manager,
)),
last_flush_millis: AtomicI64::new(current_time_millis()),
// Region is writable after it is created.
writable: AtomicBool::new(true),
@@ -205,6 +218,7 @@ impl RegionOpener {
let file_purger = Arc::new(LocalFilePurger::new(
self.scheduler.clone(),
access_layer.clone(),
self.cache_manager.clone(),
));
let mutable = self.memtable_builder.build(&metadata);
let options = RegionOptions::try_from(&self.options)?;

View File

@@ -118,6 +118,12 @@ impl FileHandle {
inner: Arc::new(FileHandleInner::new(meta, file_purger)),
}
}
/// Returns the region id of the file.
pub fn region_id(&self) -> RegionId {
self.inner.meta.region_id
}
/// Returns the file id.
pub fn file_id(&self) -> FileId {
self.inner.meta.file_id

View File

@@ -19,6 +19,7 @@ use common_telemetry::{error, info};
use store_api::storage::RegionId;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::FileId;
@@ -39,10 +40,11 @@ pub trait FilePurger: Send + Sync + fmt::Debug {
pub type FilePurgerRef = Arc<dyn FilePurger>;
/// Purger that purges file for current region.
pub struct LocalFilePurger {
scheduler: SchedulerRef,
sst_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
}
impl fmt::Debug for LocalFilePurger {
@@ -54,10 +56,16 @@ impl fmt::Debug for LocalFilePurger {
}
impl LocalFilePurger {
pub fn new(scheduler: SchedulerRef, sst_layer: AccessLayerRef) -> Self {
/// Creates a new purger.
pub fn new(
scheduler: SchedulerRef,
sst_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
) -> Self {
Self {
scheduler,
sst_layer,
cache_manager,
}
}
}
@@ -68,6 +76,11 @@ impl FilePurger for LocalFilePurger {
let region_id = request.region_id;
let sst_layer = self.sst_layer.clone();
// Remove meta of the file from cache.
if let Some(cache) = &self.cache_manager {
cache.remove_parquet_meta_data(region_id, file_id);
}
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = sst_layer.delete_sst(file_id).await {
error!(e; "Failed to delete SST file, file: {}, region: {}",
@@ -113,7 +126,7 @@ mod tests {
let scheduler = Arc::new(LocalScheduler::new(3));
let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone()));
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer));
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));
{
let handle = FileHandle::new(

View File

@@ -14,29 +14,35 @@
//! Parquet reader.
use std::ops::Range;
use std::sync::Arc;
use async_compat::CompatExt;
use async_trait::async_trait;
use bytes::Bytes;
use common_time::range::TimestampRange;
use datatypes::arrow::record_batch::RecordBatch;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use futures::{FutureExt, TryStreamExt};
use object_store::ObjectStore;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use store_api::storage::{ColumnId, RegionId};
use table::predicate::Predicate;
use tokio::io::BufReader;
use crate::cache::CacheManagerRef;
use crate::error::{
InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result,
};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileHandle;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::PARQUET_METADATA_KEY;
@@ -55,6 +61,8 @@ pub struct ParquetReaderBuilder {
/// `None` reads all columns. Due to schema change, the projection
/// can contain columns not in the parquet file.
projection: Option<Vec<ColumnId>>,
/// Manager that caches SST data.
cache_manager: Option<CacheManagerRef>,
}
impl ParquetReaderBuilder {
@@ -71,6 +79,7 @@ impl ParquetReaderBuilder {
predicate: None,
time_range: None,
projection: None,
cache_manager: None,
}
}
@@ -94,6 +103,12 @@ impl ParquetReaderBuilder {
self
}
/// Attaches the cache to the builder.
pub fn cache(mut self, cache: Option<CacheManagerRef>) -> ParquetReaderBuilder {
self.cache_manager = cache;
self
}
/// Builds and initializes a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -119,8 +134,16 @@ impl ParquetReaderBuilder {
.await
.context(OpenDalSnafu)?
.compat();
let buf_reader = BufReader::new(reader);
let mut builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
let reader = BufReader::new(reader);
let reader = AsyncFileReaderCache {
reader,
// TODO(yingwen): Sets the metadata when we implement row group level reader.
metadata: None,
cache: self.cache_manager.clone(),
region_id: self.file_handle.region_id(),
file_id: self.file_handle.file_id(),
};
let mut builder = ParquetRecordBatchStreamBuilder::new(reader)
.await
.context(ReadParquetSnafu { path: file_path })?;
@@ -249,3 +272,58 @@ impl ParquetReader {
self.read_format.metadata()
}
}
/// Cache layer for parquet's [AsyncFileReader].
///
/// Wraps an underlying reader together with the region id and file id that
/// identify the file's parquet metadata in the global cache.
struct AsyncFileReaderCache<T> {
/// Underlying async file reader.
reader: T,
/// Parquet metadata cached locally.
/// When set, `get_metadata()` returns it without consulting the global cache.
metadata: Option<Arc<ParquetMetaData>>,
/// Global cache. `None` means no global cache is attached.
cache: Option<CacheManagerRef>,
/// Id of the region.
region_id: RegionId,
/// Id of the file to read.
file_id: FileId,
}
impl<T: AsyncFileReader> AsyncFileReader for AsyncFileReaderCache<T> {
    /// Reads the bytes in `range` by delegating to the underlying reader.
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
        self.reader.get_bytes(range)
    }

    /// Reads multiple byte ranges by delegating to the underlying reader.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<usize>>,
    ) -> BoxFuture<'_, Result<Vec<Bytes>, ParquetError>> {
        self.reader.get_byte_ranges(ranges)
    }

    /// Returns the parquet metadata of the file.
    ///
    /// Lookup order: the locally attached metadata, then the global cache, and
    /// finally the underlying reader. Metadata loaded from the underlying reader is
    /// put into the global cache (if any) before it is returned.
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>, ParquetError>> {
        async {
            // Local metadata takes precedence.
            if let Some(local) = self.metadata.as_ref() {
                return Ok(Arc::clone(local));
            }

            // Then the global cache.
            let cached = match self.cache.as_ref() {
                Some(cache) => cache.get_parquet_meta_data(self.region_id, self.file_id),
                None => None,
            };
            if let Some(hit) = cached {
                return Ok(hit);
            }

            // Cache miss: load the metadata from the underlying reader.
            let loaded = self.reader.get_metadata().await?;

            // Remember it in the global cache for later readers.
            if let Some(cache) = self.cache.as_ref() {
                cache.put_parquet_meta_data(self.region_id, self.file_id, Arc::clone(&loaded));
            }

            Ok(loaded)
        }
        .boxed()
    }
}

View File

@@ -39,6 +39,7 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, Mutex};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
@@ -95,8 +96,12 @@ pub(crate) const DROPPING_MARKER_FILE: &str = ".dropping";
/// Chan1 --> WorkerThread1
/// ```
pub(crate) struct WorkerGroup {
/// Workers of the group.
workers: Vec<RegionWorker>,
/// Global background job scheduler.
scheduler: SchedulerRef,
/// Cache.
cache_manager: CacheManagerRef,
}
impl WorkerGroup {
@@ -114,6 +119,7 @@ impl WorkerGroup {
config.global_write_buffer_size.as_bytes() as usize,
));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes()));
let workers = (0..config.num_workers)
.map(|id| {
@@ -125,12 +131,17 @@ impl WorkerGroup {
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
listener: WorkerListener::default(),
cache_manager: cache_manager.clone(),
}
.start()
})
.collect();
WorkerGroup { workers, scheduler }
WorkerGroup {
workers,
scheduler,
cache_manager,
}
}
/// Stops the worker group.
@@ -166,6 +177,11 @@ impl WorkerGroup {
self.worker(region_id).get_region(region_id)
}
/// Returns cache of the group.
pub(crate) fn cache_manager(&self) -> CacheManagerRef {
self.cache_manager.clone()
}
/// Get worker for specific `region_id`.
fn worker(&self, region_id: RegionId) -> &RegionWorker {
let mut hasher = DefaultHasher::new();
@@ -193,6 +209,7 @@ impl WorkerGroup {
assert!(config.num_workers.is_power_of_two());
let config = Arc::new(config);
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let cache_manager = Arc::new(CacheManager::new(config.sst_meta_cache_size.as_bytes()));
let workers = (0..config.num_workers)
.map(|id| {
@@ -204,12 +221,17 @@ impl WorkerGroup {
write_buffer_manager: write_buffer_manager.clone(),
scheduler: scheduler.clone(),
listener: WorkerListener::new(listener.clone()),
cache_manager: cache_manager.clone(),
}
.start()
})
.collect();
WorkerGroup { workers, scheduler }
WorkerGroup {
workers,
scheduler,
cache_manager,
}
}
}
@@ -226,6 +248,7 @@ struct WorkerStarter<S> {
write_buffer_manager: WriteBufferManagerRef,
scheduler: SchedulerRef,
listener: WorkerListener,
cache_manager: CacheManagerRef,
}
impl<S: LogStore> WorkerStarter<S> {
@@ -254,6 +277,7 @@ impl<S: LogStore> WorkerStarter<S> {
compaction_scheduler: CompactionScheduler::new(self.scheduler, sender.clone()),
stalled_requests: StalledRequests::default(),
listener: self.listener,
cache_manager: self.cache_manager,
};
let handle = common_runtime::spawn_write(async move {
worker_thread.run().await;
@@ -376,7 +400,7 @@ impl StalledRequests {
/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
// Id of the worker.
/// Id of the worker.
id: WorkerId,
/// Engine config.
config: Arc<MitoConfig>,
@@ -408,6 +432,8 @@ struct RegionWorkerLoop<S> {
stalled_requests: StalledRequests,
/// Event listener for tests.
listener: WorkerListener,
/// Cache.
cache_manager: CacheManagerRef,
}
impl<S: LogStore> RegionWorkerLoop<S> {

View File

@@ -65,6 +65,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
)
.metadata(metadata)
.options(request.options)
.cache(Some(self.cache_manager.clone()))
.create_or_open(&self.config, &self.wal)
.await?;

View File

@@ -63,6 +63,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.scheduler.clone(),
)
.options(request.options)
.cache(Some(self.cache_manager.clone()))
.open(&self.config, &self.wal)
.await?;

View File

@@ -20,7 +20,7 @@ datafusion-expr.workspace = true
datafusion.workspace = true
datatypes = { workspace = true }
meta-client = { workspace = true }
moka = { version = "0.9", features = ["future"] }
moka = { workspace = true, features = ["future"] }
serde.workspace = true
serde_json = "1.0"
snafu.workspace = true

View File

@@ -662,6 +662,7 @@ max_background_jobs = 4
auto_flush_interval = "30m"
global_write_buffer_size = "1GiB"
global_write_buffer_reject_size = "2GiB"
sst_meta_cache_size = "128MiB"
[[region_engine]]