feat: tracks index files in another cache and preloads them (#7181)

* feat: divide parquet and puffin index

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: download index files when we open the region

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: use different label for parquet/puffin

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: control parallelism and cache size by env

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: change gauge to counter

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: correct file type labels in file cache

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: move env to config and change cache ratio to percent

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: checks capacity before download and refine metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: change open to return MitoRegionRef

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: extract download to FileCache

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: run load cache task in write cache

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: check region state before downloading files

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update config docs and test

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: use file id from index_file_id to compute puffin key

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: skip loading cache in some states

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-11-11 16:37:32 +08:00
committed by GitHub
parent c7fded29ee
commit 24671b60b4
17 changed files with 575 additions and 179 deletions

View File

@@ -152,6 +152,8 @@
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.preload_index_cache` | Bool | `true` | Preload index (puffin) files into cache on region open (default: true).<br/>When enabled, index files are loaded into the write cache during region initialization,<br/>which can improve query performance at the cost of longer startup times. |
| `region_engine.mito.index_cache_percent` | Integer | `20` | Percentage of write cache capacity allocated for index (puffin) files (default: 20).<br/>The remaining capacity is used for data (parquet) files.<br/>Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation,<br/>1GiB is reserved for index files and 4GiB for data files. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
@@ -553,6 +555,8 @@
| `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. |
| `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.preload_index_cache` | Bool | `true` | Preload index (puffin) files into cache on region open (default: true).<br/>When enabled, index files are loaded into the write cache during region initialization,<br/>which can improve query performance at the cost of longer startup times. |
| `region_engine.mito.index_cache_percent` | Integer | `20` | Percentage of write cache capacity allocated for index (puffin) files (default: 20).<br/>The remaining capacity is used for data (parquet) files.<br/>Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation,<br/>1GiB is reserved for index files and 4GiB for data files. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |

View File

@@ -499,6 +499,17 @@ write_cache_size = "5GiB"
## @toml2docs:none-default
write_cache_ttl = "8h"
## Preload index (puffin) files into cache on region open (default: true).
## When enabled, index files are loaded into the write cache during region initialization,
## which can improve query performance at the cost of longer startup times.
preload_index_cache = true
## Percentage of write cache capacity allocated for index (puffin) files (default: 20).
## The remaining capacity is used for data (parquet) files.
## Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation,
## 1GiB is reserved for index files and 4GiB for data files.
index_cache_percent = 20
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

View File

@@ -590,6 +590,17 @@ write_cache_size = "5GiB"
## @toml2docs:none-default
write_cache_ttl = "8h"
## Preload index (puffin) files into cache on region open (default: true).
## When enabled, index files are loaded into the write cache during region initialization,
## which can improve query performance at the cost of longer startup times.
preload_index_cache = true
## Percentage of write cache capacity allocated for index (puffin) files (default: 20).
## The remaining capacity is used for data (parquet) files.
## Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation,
## 1GiB is reserved for index files and 4GiB for data files.
index_cache_percent = 20
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"

View File

@@ -53,6 +53,8 @@ const VECTOR_TYPE: &str = "vector";
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type key for index files (puffin) on the local store.
const INDEX_TYPE: &str = "index";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";

View File

@@ -21,8 +21,8 @@ use std::time::{Duration, Instant};
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{error, info, warn};
use futures::{FutureExt, TryStreamExt};
use common_telemetry::{debug, error, info, warn};
use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use moka::policy::EvictionPolicy;
@@ -31,10 +31,16 @@ use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::UnboundedReceiver;
use crate::cache::FILE_TYPE;
use crate::error::{OpenDalSnafu, Result};
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::access_layer::TempFileCleaner;
use crate::cache::{FILE_TYPE, INDEX_TYPE};
use crate::error::{self, OpenDalSnafu, Result};
use crate::metrics::{
CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::region::opener::RegionLoadCacheTask;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::metadata::MetadataLoader;
@@ -43,16 +49,24 @@ use crate::sst::parquet::metadata::MetadataLoader;
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";
/// Default percentage for index (puffin) cache (20% of total capacity).
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;
/// Minimum capacity for each cache (512MB).
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;
/// A file cache manages files on local store and evict files based
/// on size.
#[derive(Debug)]
pub(crate) struct FileCache {
/// Local store to cache files.
local_store: ObjectStore,
/// Index to track cached files.
///
/// File id is enough to identity a file uniquely.
memory_index: Cache<IndexKey, IndexValue>,
/// Index to track cached Parquet files.
parquet_index: Cache<IndexKey, IndexValue>,
/// Index to track cached Puffin files.
puffin_index: Cache<IndexKey, IndexValue>,
/// Capacity of the puffin (index) cache in bytes.
puffin_capacity: u64,
}
pub(crate) type FileCacheRef = Arc<FileCache>;
@@ -63,15 +77,57 @@ impl FileCache {
local_store: ObjectStore,
capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
) -> FileCache {
let cache_store = local_store.clone();
// Validate and use the provided percent or default
let index_percent = index_cache_percent
.filter(|&percent| percent > 0 && percent < 100)
.unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
let total_capacity = capacity.as_bytes();
// Convert percent to ratio and calculate capacity for each cache
let index_ratio = index_percent as f64 / 100.0;
let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
let parquet_capacity = total_capacity - puffin_capacity;
// Ensure both capacities are at least 512MB
let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
info!(
"Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
index_percent,
ReadableSize(total_capacity),
ReadableSize(parquet_capacity),
ReadableSize(puffin_capacity)
);
let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");
FileCache {
local_store,
parquet_index,
puffin_index,
puffin_capacity,
}
}
/// Builds a cache for a specific file type.
fn build_cache(
local_store: ObjectStore,
capacity: u64,
ttl: Option<Duration>,
label: &'static str,
) -> Cache<IndexKey, IndexValue> {
let cache_store = local_store;
let mut builder = Cache::builder()
.eviction_policy(EvictionPolicy::lru())
.weigher(|_key, value: &IndexValue| -> u32 {
// We only measure space on local store.
value.file_size
})
.max_capacity(capacity.as_bytes())
.max_capacity(capacity)
.async_eviction_listener(move |key, value, cause| {
let store = cache_store.clone();
// Stores files under FILE_DIR.
@@ -80,14 +136,14 @@ impl FileCache {
if let RemovalCause::Replaced = cause {
// The cache is replaced by another file. This is unexpected, we don't remove the same
// file but updates the metrics as the file is already replaced by users.
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
return;
}
match store.delete(&file_path).await {
Ok(()) => {
CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
}
Err(e) => {
warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
@@ -99,10 +155,14 @@ impl FileCache {
if let Some(ttl) = ttl {
builder = builder.time_to_idle(ttl);
}
let memory_index = builder.build();
FileCache {
local_store,
memory_index,
builder.build()
}
/// Returns the appropriate memory index for the given file type.
fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
match file_type {
FileType::Parquet => &self.parquet_index,
FileType::Puffin => &self.puffin_index,
}
}
@@ -111,16 +171,17 @@ impl FileCache {
/// The `WriteCache` should ensure the file is in the correct path.
pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
CACHE_BYTES
.with_label_values(&[FILE_TYPE])
.with_label_values(&[key.file_type.metric_label()])
.add(value.file_size.into());
self.memory_index.insert(key, value).await;
let index = self.memory_index(key.file_type);
index.insert(key, value).await;
// Since files are large items, we run the pending tasks immediately.
self.memory_index.run_pending_tasks().await;
index.run_pending_tasks().await;
}
pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
self.memory_index.get(&key).await
self.memory_index(key.file_type).get(&key).await
}
/// Reads a file from the cache.
@@ -128,15 +189,20 @@ impl FileCache {
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
// We must use `get()` to update the estimator of the cache.
// See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
if self.memory_index.get(&key).await.is_none() {
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
let index = self.memory_index(key.file_type);
if index.get(&key).await.is_none() {
CACHE_MISS
.with_label_values(&[key.file_type.metric_label()])
.inc();
return None;
}
let file_path = self.cache_file_path(key);
match self.get_reader(&file_path).await {
Ok(Some(reader)) => {
CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
CACHE_HIT
.with_label_values(&[key.file_type.metric_label()])
.inc();
return Some(reader);
}
Err(e) => {
@@ -148,8 +214,10 @@ impl FileCache {
}
// We removes the file from the index.
self.memory_index.remove(&key).await;
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
index.remove(&key).await;
CACHE_MISS
.with_label_values(&[key.file_type.metric_label()])
.inc();
None
}
@@ -159,8 +227,11 @@ impl FileCache {
key: IndexKey,
ranges: &[Range<u64>],
) -> Option<Vec<Bytes>> {
if self.memory_index.get(&key).await.is_none() {
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
let index = self.memory_index(key.file_type);
if index.get(&key).await.is_none() {
CACHE_MISS
.with_label_values(&[key.file_type.metric_label()])
.inc();
return None;
}
@@ -170,7 +241,9 @@ impl FileCache {
let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
match bytes_result {
Ok(bytes) => {
CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
CACHE_HIT
.with_label_values(&[key.file_type.metric_label()])
.inc();
Some(bytes)
}
Err(e) => {
@@ -179,8 +252,10 @@ impl FileCache {
}
// We removes the file from the index.
self.memory_index.remove(&key).await;
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
index.remove(&key).await;
CACHE_MISS
.with_label_values(&[key.file_type.metric_label()])
.inc();
None
}
}
@@ -191,7 +266,7 @@ impl FileCache {
/// in the memory index if upload is failed.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);
self.memory_index.remove(&key).await;
self.memory_index(key.file_type).remove(&key).await;
// Always delete the file from the local store.
if let Err(e) = self.local_store.delete(&file_path).await {
warn!(e; "Failed to delete a cached file {}", file_path);
@@ -208,6 +283,7 @@ impl FileCache {
// Use i64 for total_size to reduce the risk of overflow.
// It is possible that the total size of the cache is larger than i32::MAX.
let (mut total_size, mut total_keys) = (0i64, 0);
let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
let meta = entry.metadata();
if !meta.is_file() {
@@ -223,36 +299,76 @@ impl FileCache {
.await
.context(OpenDalSnafu)?;
let file_size = meta.content_length() as u32;
self.memory_index
.insert(key, IndexValue { file_size })
.await;
total_size += i64::from(file_size);
let index = self.memory_index(key.file_type);
index.insert(key, IndexValue { file_size }).await;
let size = i64::from(file_size);
total_size += size;
total_keys += 1;
// Track sizes separately for each file type
match key.file_type {
FileType::Parquet => parquet_size += size,
FileType::Puffin => puffin_size += size,
}
}
// The metrics is a signed int gauge so we can updates it finally.
CACHE_BYTES.with_label_values(&[FILE_TYPE]).add(total_size);
CACHE_BYTES
.with_label_values(&[FILE_TYPE])
.add(parquet_size);
CACHE_BYTES
.with_label_values(&[INDEX_TYPE])
.add(puffin_size);
// Run all pending tasks of the moka cache so that the cache size is updated
// and the eviction policy is applied.
self.memory_index.run_pending_tasks().await;
self.parquet_index.run_pending_tasks().await;
self.puffin_index.run_pending_tasks().await;
let parquet_weight = self.parquet_index.weighted_size();
let parquet_count = self.parquet_index.entry_count();
let puffin_weight = self.puffin_index.weighted_size();
let puffin_count = self.puffin_index.entry_count();
info!(
"Recovered file cache, num_keys: {}, num_bytes: {}, total weight: {}, cost: {:?}",
"Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
total_keys,
total_size,
self.memory_index.weighted_size(),
parquet_count,
parquet_weight,
puffin_count,
puffin_weight,
now.elapsed()
);
Ok(())
}
/// Recovers the index from local store.
pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
///
/// If `task_receiver` is provided, spawns a background task after recovery
/// to process `RegionLoadCacheTask` messages for loading files into the cache.
pub(crate) async fn recover(
self: &Arc<Self>,
sync: bool,
task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
) {
let moved_self = self.clone();
let handle = tokio::spawn(async move {
if let Err(err) = moved_self.recover_inner().await {
error!(err; "Failed to recover file cache.")
}
// Spawns background task to process region load cache tasks after recovery.
// So it won't block the recovery when `sync` is true.
if let Some(mut receiver) = task_receiver {
let cache_ref = moved_self.clone();
info!("Spawning background task for processing region load cache tasks");
tokio::spawn(async move {
while let Some(task) = receiver.recv().await {
let file_cache = cache_ref.clone();
task.fill_cache(file_cache).await;
}
info!("Background task for processing region load cache tasks stopped");
});
}
});
if sync {
@@ -274,7 +390,7 @@ impl FileCache {
/// If the file is not in the cache or fail to load metadata, return None.
pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
// Check if file cache contains the key
if let Some(index_value) = self.memory_index.get(&key).await {
if let Some(index_value) = self.parquet_index.get(&key).await {
// Load metadata from file cache
let local_store = self.local_store();
let file_path = self.cache_file_path(key);
@@ -283,7 +399,9 @@ impl FileCache {
match metadata_loader.load().await {
Ok(metadata) => {
CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
CACHE_HIT
.with_label_values(&[key.file_type.metric_label()])
.inc();
Some(metadata)
}
Err(e) => {
@@ -294,13 +412,17 @@ impl FileCache {
);
}
// We removes the file from the index.
self.memory_index.remove(&key).await;
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
self.parquet_index.remove(&key).await;
CACHE_MISS
.with_label_values(&[key.file_type.metric_label()])
.inc();
None
}
}
} else {
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
CACHE_MISS
.with_label_values(&[key.file_type.metric_label()])
.inc();
None
}
}
@@ -314,9 +436,106 @@ impl FileCache {
}
/// Checks if the key is in the file cache.
#[cfg(test)]
pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
self.memory_index.contains_key(key)
self.memory_index(key.file_type).contains_key(key)
}
/// Returns the capacity of the puffin (index) cache in bytes.
pub(crate) fn puffin_cache_capacity(&self) -> u64 {
self.puffin_capacity
}
/// Returns the current weighted size (used bytes) of the puffin (index) cache.
pub(crate) fn puffin_cache_size(&self) -> u64 {
self.puffin_index.weighted_size()
}
/// Downloads a file in `remote_path` from the remote object store to the local cache
/// (specified by `index_key`).
pub(crate) async fn download(
&self,
index_key: IndexKey,
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
) -> Result<()> {
if let Err(e) = self
.download_without_cleaning(index_key, remote_path, remote_store, file_size)
.await
{
let filename = index_key.to_string();
TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;
return Err(e);
}
Ok(())
}
async fn download_without_cleaning(
&self,
index_key: IndexKey,
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
) -> Result<()> {
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
let file_type = index_key.file_type;
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
.with_label_values(&[match file_type {
FileType::Parquet => "download_parquet",
FileType::Puffin => "download_puffin",
}])
.start_timer();
let reader = remote_store
.reader_with(remote_path)
.concurrent(DOWNLOAD_READER_CONCURRENCY)
.chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_read(0..file_size)
.await
.context(error::OpenDalSnafu)?;
let cache_path = self.cache_file_path(index_key);
let mut writer = self
.local_store
.writer(&cache_path)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_write();
let region_id = index_key.region_id;
let file_id = index_key.file_id;
let bytes_written =
futures::io::copy(reader, &mut writer)
.await
.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;
writer.close().await.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;
WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
let elapsed = timer.stop_and_record();
debug!(
"Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
remote_path, cache_path, bytes_written, region_id, elapsed,
);
let index_value = IndexValue {
file_size: bytes_written as _,
};
self.put(index_key, index_value).await;
Ok(())
}
}
@@ -377,6 +596,14 @@ impl FileType {
FileType::Puffin => "puffin",
}
}
/// Returns the metric label for this file type.
fn metric_label(&self) -> &'static str {
match self {
FileType::Parquet => FILE_TYPE,
FileType::Puffin => INDEX_TYPE,
}
}
}
/// An entity that describes the file in the file cache.
@@ -429,6 +656,7 @@ mod tests {
local_store.clone(),
ReadableSize::mb(10),
Some(Duration::from_millis(10)),
None,
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
@@ -455,7 +683,7 @@ mod tests {
let exist = cache.reader(key).await;
assert!(exist.is_some());
tokio::time::sleep(Duration::from_millis(15)).await;
cache.memory_index.run_pending_tasks().await;
cache.parquet_index.run_pending_tasks().await;
let non = cache.reader(key).await;
assert!(non.is_none());
}
@@ -465,7 +693,7 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -493,19 +721,19 @@ mod tests {
assert_eq!("hello", String::from_utf8(buf).unwrap());
// Get weighted size.
cache.memory_index.run_pending_tasks().await;
assert_eq!(5, cache.memory_index.weighted_size());
cache.parquet_index.run_pending_tasks().await;
assert_eq!(5, cache.parquet_index.weighted_size());
// Remove the file.
cache.remove(key).await;
assert!(cache.reader(key).await.is_none());
// Ensure all pending tasks of the moka cache is done before assertion.
cache.memory_index.run_pending_tasks().await;
cache.parquet_index.run_pending_tasks().await;
// The file also not exists.
assert!(!local_store.exists(&file_path).await.unwrap());
assert_eq!(0, cache.memory_index.weighted_size());
assert_eq!(0, cache.parquet_index.weighted_size());
}
#[tokio::test]
@@ -513,7 +741,7 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -538,14 +766,14 @@ mod tests {
// Reader is none.
assert!(cache.reader(key).await.is_none());
// Key is removed.
assert!(!cache.memory_index.contains_key(&key));
assert!(!cache.parquet_index.contains_key(&key));
}
#[tokio::test]
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let region_id = RegionId::new(2000, 0);
let file_type = FileType::Parquet;
@@ -575,6 +803,7 @@ mod tests {
local_store.clone(),
ReadableSize::mb(10),
None,
None,
));
// No entry before recovery.
assert!(
@@ -583,11 +812,11 @@ mod tests {
.await
.is_none()
);
cache.recover(true).await;
cache.recover(true, None).await;
// Check size.
cache.memory_index.run_pending_tasks().await;
assert_eq!(total_size, cache.memory_index.weighted_size() as usize);
cache.parquet_index.run_pending_tasks().await;
assert_eq!(total_size, cache.parquet_index.weighted_size() as usize);
for (i, file_id) in file_ids.iter().enumerate() {
let key = IndexKey::new(region_id, *file_id, file_type);
@@ -601,7 +830,7 @@ mod tests {
async fn test_file_cache_read_ranges() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);

View File

@@ -23,6 +23,7 @@ use futures::AsyncWriteExt;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
use crate::access_layer::{
FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
@@ -30,9 +31,8 @@ use crate::access_layer::{
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{
UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED,
};
use crate::metrics::UPLOAD_BYTES_TOTAL;
use crate::region::opener::RegionLoadCacheTask;
use crate::sst::file::RegionFileId;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
@@ -51,6 +51,8 @@ pub struct WriteCache {
puffin_manager_factory: PuffinManagerFactory,
/// Intermediate manager for index.
intermediate_manager: IntermediateManager,
/// Sender for region load cache tasks.
task_sender: UnboundedSender<RegionLoadCacheTask>,
}
pub type WriteCacheRef = Arc<WriteCache>;
@@ -62,16 +64,25 @@ impl WriteCache {
local_store: ObjectStore,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl));
file_cache.recover(false).await;
let (task_sender, task_receiver) = unbounded_channel();
let file_cache = Arc::new(FileCache::new(
local_store,
cache_capacity,
ttl,
index_cache_percent,
));
file_cache.recover(false, Some(task_receiver)).await;
Ok(Self {
file_cache,
puffin_manager_factory,
intermediate_manager,
task_sender,
})
}
@@ -80,6 +91,7 @@ impl WriteCache {
cache_dir: &str,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
@@ -90,6 +102,7 @@ impl WriteCache {
local_store,
cache_capacity,
ttl,
index_cache_percent,
puffin_manager_factory,
intermediate_manager,
)
@@ -272,85 +285,9 @@ impl WriteCache {
remote_store: &ObjectStore,
file_size: u64,
) -> Result<()> {
if let Err(e) = self
.download_without_cleaning(index_key, remote_path, remote_store, file_size)
self.file_cache
.download(index_key, remote_path, remote_store, file_size)
.await
{
let filename = index_key.to_string();
TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
.await;
return Err(e);
}
Ok(())
}
async fn download_without_cleaning(
&self,
index_key: IndexKey,
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
) -> Result<()> {
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
let file_type = index_key.file_type;
let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
.with_label_values(&[match file_type {
FileType::Parquet => "download_parquet",
FileType::Puffin => "download_puffin",
}])
.start_timer();
let reader = remote_store
.reader_with(remote_path)
.concurrent(DOWNLOAD_READER_CONCURRENCY)
.chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_read(0..file_size)
.await
.context(error::OpenDalSnafu)?;
let cache_path = self.file_cache.cache_file_path(index_key);
let mut writer = self
.file_cache
.local_store()
.writer(&cache_path)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_write();
let region_id = index_key.region_id;
let file_id = index_key.file_id;
let bytes_written =
futures::io::copy(reader, &mut writer)
.await
.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;
writer.close().await.context(error::DownloadSnafu {
region_id,
file_id,
file_type,
})?;
WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
let elapsed = timer.stop_and_record();
debug!(
"Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
remote_path, cache_path, bytes_written, region_id, elapsed,
);
let index_value = IndexValue {
file_size: bytes_written as _,
};
self.file_cache.put(index_key, index_value).await;
Ok(())
}
/// Uploads a Parquet file or a Puffin file to the remote object store.
@@ -424,6 +361,13 @@ impl WriteCache {
Ok(())
}
/// Sends a region load cache task to the background processing queue.
///
/// If the receiver has been dropped, the error is ignored.
pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) {
let _ = self.task_sender.send(task);
}
}
/// Request to write and upload a SST.

View File

@@ -25,6 +25,7 @@ use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT;
use crate::error::Result;
use crate::gc::GcConfig;
use crate::memtable::MemtableConfig;
@@ -119,6 +120,12 @@ pub struct MitoConfig {
/// TTL for write cache.
#[serde(with = "humantime_serde")]
pub write_cache_ttl: Option<Duration>,
/// Preload index (puffin) files into cache on region open (default: true).
pub preload_index_cache: bool,
/// Percentage of write cache capacity allocated for index (puffin) files (default: 20).
/// The remaining capacity is used for data (parquet) files.
/// Must be between 0 and 100 (exclusive).
pub index_cache_percent: u8,
// Other configs:
/// Buffer size for SST writing.
@@ -182,6 +189,8 @@ impl Default for MitoConfig {
write_cache_path: String::new(),
write_cache_size: ReadableSize::gb(5),
write_cache_ttl: None,
preload_index_cache: true,
index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
@@ -271,6 +280,15 @@ impl MitoConfig {
self.write_cache_path = data_home.to_string();
}
// Validate index_cache_percent is within valid range (0, 100)
if self.index_cache_percent == 0 || self.index_cache_percent >= 100 {
warn!(
"Invalid index_cache_percent {}, resetting to default {}",
self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT
);
self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT;
}
self.index.sanitize(data_home, &self.inverted_index)?;
Ok(())

View File

@@ -479,6 +479,18 @@ lazy_static! {
"greptime_mito_gc_delete_file_count",
"mito gc deleted file count",
).unwrap();
/// Total number of files downloaded during cache fill on region open.
pub static ref CACHE_FILL_DOWNLOADED_FILES: IntCounter = register_int_counter!(
"mito_cache_fill_downloaded_files",
"mito cache fill downloaded files count",
).unwrap();
/// Number of files pending download during cache fill on region open.
pub static ref CACHE_FILL_PENDING_FILES: IntGauge = register_int_gauge!(
"mito_cache_fill_pending_files",
"mito cache fill pending files count",
).unwrap();
}
/// Stager notifier to collect metrics.

View File

@@ -41,6 +41,7 @@ use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::AccessLayer;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::config::MitoConfig;
use crate::error;
use crate::error::{
@@ -53,20 +54,22 @@ use crate::manifest::storage::manifest_compress_type;
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
use crate::region::{
ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState,
ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState,
};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::RegionFileId;
use crate::sst::file_purger::create_local_file_purger;
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::time_provider::TimeProviderRef;
use crate::wal::entry_reader::WalEntryReader;
use crate::wal::{EntryId, Wal};
@@ -217,7 +220,7 @@ impl RegionOpener {
mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<MitoRegion> {
) -> Result<MitoRegionRef> {
let region_id = self.region_id;
let region_dir = self.region_dir();
let metadata = self.build_metadata()?;
@@ -305,7 +308,7 @@ impl RegionOpener {
));
let now = self.time_provider.current_time_millis();
Ok(MitoRegion {
Ok(Arc::new(MitoRegion {
region_id,
version_control,
access_layer: access_layer.clone(),
@@ -329,7 +332,7 @@ impl RegionOpener {
written_bytes: Arc::new(AtomicU64::new(0)),
sst_format,
stats: self.stats,
})
}))
}
/// Opens an existing region in read only mode.
@@ -339,7 +342,7 @@ impl RegionOpener {
mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<MitoRegion> {
) -> Result<MitoRegionRef> {
let region_id = self.region_id;
let region_dir = self.region_dir();
let region = self
@@ -397,7 +400,7 @@ impl RegionOpener {
&mut self,
config: &MitoConfig,
wal: &Wal<S>,
) -> Result<Option<MitoRegion>> {
) -> Result<Option<MitoRegionRef>> {
let region_options = self.options.as_ref().unwrap().clone();
let region_manifest_options = Self::manifest_options(
@@ -539,8 +542,8 @@ impl RegionOpener {
let region = MitoRegion {
region_id: self.region_id,
version_control,
access_layer,
version_control: version_control.clone(),
access_layer: access_layer.clone(),
// Region is always opened in read only mode.
manifest_ctx: Arc::new(ManifestContext::new(
manifest_manager,
@@ -557,6 +560,11 @@ impl RegionOpener {
sst_format,
stats: self.stats.clone(),
};
let region = Arc::new(region);
maybe_load_cache(&region, config, &self.cache_manager);
Ok(Some(region))
}
@@ -809,3 +817,155 @@ where
pub(crate) fn new_manifest_dir(region_dir: &str) -> String {
join_dir(region_dir, "manifest")
}
/// A task to load and fill the region file cache.
pub(crate) struct RegionLoadCacheTask {
region: MitoRegionRef,
}
impl RegionLoadCacheTask {
pub(crate) fn new(region: MitoRegionRef) -> Self {
Self { region }
}
/// Fills the file cache with index files from the region.
pub(crate) async fn fill_cache(&self, file_cache: FileCacheRef) {
let region_id = self.region.region_id;
let table_dir = self.region.access_layer.table_dir();
let path_type = self.region.access_layer.path_type();
let object_store = self.region.access_layer.object_store();
let version_control = &self.region.version_control;
// Collects IndexKeys and file sizes for files that need to be downloaded
let mut files_to_download = Vec::new();
let mut files_already_cached = 0;
{
let version = version_control.current().version;
for level in version.ssts.levels() {
for file_handle in level.files.values() {
let file_meta = file_handle.meta_ref();
if file_meta.exists_index() {
let puffin_key = IndexKey::new(
file_meta.region_id,
file_meta.index_file_id().file_id(),
FileType::Puffin,
);
if !file_cache.contains_key(&puffin_key) {
files_to_download.push((puffin_key, file_meta.index_file_size));
} else {
files_already_cached += 1;
}
}
}
}
// Releases the Version after the scope to avoid holding the memtables and file handles
// for a long time.
}
let total_files = files_to_download.len() as i64;
info!(
"Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}",
region_id, total_files, files_already_cached
);
CACHE_FILL_PENDING_FILES.add(total_files);
let mut files_downloaded = 0;
let mut files_skipped = 0;
for (puffin_key, file_size) in files_to_download {
let current_size = file_cache.puffin_cache_size();
let capacity = file_cache.puffin_cache_capacity();
let region_state = self.region.state();
if !can_load_cache(region_state) {
info!(
"Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}",
region_state, region_id, current_size, capacity
);
break;
}
// Checks if adding this file would exceed capacity
if current_size + file_size > capacity {
info!(
"Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}",
region_id, puffin_key.file_id, current_size, file_size, capacity
);
files_skipped = (total_files - files_downloaded) as usize;
CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded);
break;
}
let index_remote_path = location::index_file_path(
table_dir,
RegionFileId::new(puffin_key.region_id, puffin_key.file_id),
path_type,
);
match file_cache
.download(puffin_key, &index_remote_path, object_store, file_size)
.await
{
Ok(_) => {
debug!(
"Downloaded index file to write cache, region: {}, file_id: {}",
region_id, puffin_key.file_id
);
files_downloaded += 1;
CACHE_FILL_DOWNLOADED_FILES.inc_by(1);
CACHE_FILL_PENDING_FILES.dec();
}
Err(e) => {
warn!(
e; "Failed to download index file to write cache, region: {}, file_id: {}",
region_id, puffin_key.file_id
);
CACHE_FILL_PENDING_FILES.dec();
}
}
}
info!(
"Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, files_skipped: {}",
region_id, total_files, files_downloaded, files_already_cached, files_skipped
);
}
}
/// Loads all index (Puffin) files from the version into the write cache.
fn maybe_load_cache(
region: &MitoRegionRef,
config: &MitoConfig,
cache_manager: &Option<CacheManagerRef>,
) {
let Some(cache_manager) = cache_manager else {
return;
};
let Some(write_cache) = cache_manager.write_cache() else {
return;
};
let preload_enabled = config.preload_index_cache;
if !preload_enabled {
return;
}
let task = RegionLoadCacheTask::new(region.clone());
write_cache.load_region_cache(task);
}
fn can_load_cache(state: RegionRoleState) -> bool {
match state {
RegionRoleState::Leader(RegionLeaderState::Writable)
| RegionRoleState::Leader(RegionLeaderState::Staging)
| RegionRoleState::Leader(RegionLeaderState::Altering)
| RegionRoleState::Leader(RegionLeaderState::Editing)
| RegionRoleState::Follower => true,
// The region will be closed soon if it is downgrading.
RegionRoleState::Leader(RegionLeaderState::Downgrading)
| RegionRoleState::Leader(RegionLeaderState::Dropping)
| RegionRoleState::Leader(RegionLeaderState::Truncating) => false,
}
}

View File

@@ -1714,6 +1714,7 @@ mod tests {
dir.path().to_str().unwrap(),
ReadableSize::mb(10),
None,
None,
factory,
intm_manager,
)

View File

@@ -629,6 +629,7 @@ impl TestEnv {
local_store,
capacity,
None,
None,
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
)
@@ -648,6 +649,7 @@ impl TestEnv {
path,
capacity,
None,
None,
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
)

View File

@@ -450,6 +450,7 @@ pub async fn write_cache_from_config(
&config.write_cache_path,
config.write_cache_size,
config.write_cache_ttl,
Some(config.index_cache_percent),
puffin_manager_factory,
intermediate_manager,
)

View File

@@ -111,26 +111,24 @@ impl<S: LogStore> RegionWorkerLoop<S> {
info!(
"Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}"
);
let reopened_region = Arc::new(
RegionOpener::new(
region_id,
region.table_dir(),
region.access_layer.path_type(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
self.file_ref_manager.clone(),
self.partition_expr_fetcher.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())?
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?,
);
let reopened_region = RegionOpener::new(
region_id,
region.table_dir(),
region.access_layer.path_type(),
self.memtable_builder_provider.clone(),
self.object_store_manager.clone(),
self.purge_scheduler.clone(),
self.puffin_manager_factory.clone(),
self.intermediate_manager.clone(),
self.time_provider.clone(),
self.file_ref_manager.clone(),
self.partition_expr_fetcher.clone(),
)
.cache(Some(self.cache_manager.clone()))
.options(region.version().options.clone())?
.skip_wal_replay(true)
.open(&self.config, &self.wal)
.await?;
debug_assert!(!reopened_region.is_writable());
self.regions.insert_region(reopened_region.clone());

View File

@@ -14,8 +14,6 @@
//! Handling create request.
use std::sync::Arc;
use common_telemetry::info;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataBuilder;
@@ -84,7 +82,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.region_count.inc();
// Insert the MitoRegion into the RegionMap.
self.regions.insert_region(Arc::new(region));
self.regions.insert_region(region);
Ok(0)
}

View File

@@ -431,8 +431,11 @@ async fn edit_region(
let is_index_exist = file_meta.exists_index();
let index_file_size = file_meta.index_file_size();
let index_file_index_key =
IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
let index_file_index_key = IndexKey::new(
region_id,
file_meta.index_file_id().file_id(),
FileType::Puffin,
);
let index_remote_path = location::index_file_path(
layer.table_dir(),
file_meta.file_id(),

View File

@@ -133,7 +133,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_count.inc();
// Insert the Region into the RegionMap.
regions.insert_region(Arc::new(region));
regions.insert_region(region);
let senders = opening_regions.remove_sender(region_id);
for sender in senders {

View File

@@ -1497,6 +1497,8 @@ auto_flush_interval = "30m"
enable_write_cache = false
write_cache_path = ""
write_cache_size = "5GiB"
preload_index_cache = true
index_cache_percent = 20
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
max_concurrent_scan_files = 384