From 24671b60b4c64aaffa32483cc8c52fb73736b73d Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 11 Nov 2025 16:37:32 +0800 Subject: [PATCH] feat: tracks index files in another cache and preloads them (#7181) * feat: divide parquet and puffin index Signed-off-by: evenyag * feat: download index files when we open the region Signed-off-by: evenyag * feat: use different label for parquet/puffin Signed-off-by: evenyag * feat: control parallelism and cache size by env Signed-off-by: evenyag * fix: change gauge to counter Signed-off-by: evenyag * fix: correct file type labels in file cache Signed-off-by: evenyag * refactor: move env to config and change cache ratio to percent Signed-off-by: evenyag * feat: checks capacity before download and refine metrics Signed-off-by: evenyag * refactor: change open to return MitoRegionRef Signed-off-by: evenyag * refactor: extract download to FileCache Signed-off-by: evenyag * feat: run load cache task in write cache Signed-off-by: evenyag * feat: check region state before downloading files Signed-off-by: evenyag * chore: update config docs and test Signed-off-by: evenyag * fix: use file id from index_file_id to compute puffin key Signed-off-by: evenyag * fix: skip loading cache in some states Signed-off-by: evenyag --------- Signed-off-by: evenyag --- config/config.md | 4 + config/datanode.example.toml | 11 + config/standalone.example.toml | 11 + src/mito2/src/cache.rs | 2 + src/mito2/src/cache/file_cache.rs | 351 ++++++++++++++++++++---- src/mito2/src/cache/write_cache.rs | 110 ++------ src/mito2/src/config.rs | 18 ++ src/mito2/src/metrics.rs | 12 + src/mito2/src/region/opener.rs | 178 +++++++++++- src/mito2/src/sst/index.rs | 1 + src/mito2/src/test_util.rs | 2 + src/mito2/src/worker.rs | 1 + src/mito2/src/worker/handle_catchup.rs | 38 ++- src/mito2/src/worker/handle_create.rs | 4 +- src/mito2/src/worker/handle_manifest.rs | 7 +- src/mito2/src/worker/handle_open.rs | 2 +- tests-integration/tests/http.rs | 2 + 17 files changed, 575 
insertions(+), 179 deletions(-) diff --git a/config/config.md b/config/config.md index 6f8a19a49a..58c491b4ad 100644 --- a/config/config.md +++ b/config/config.md @@ -152,6 +152,8 @@ | `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. | | `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. | | `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. | +| `region_engine.mito.preload_index_cache` | Bool | `true` | Preload index (puffin) files into cache on region open (default: true).
When enabled, index files are loaded into the write cache during region initialization,
which can improve query performance at the cost of longer startup times. | +| `region_engine.mito.index_cache_percent` | Integer | `20` | Percentage of write cache capacity allocated for index (puffin) files (default: 20).
The remaining capacity is used for data (parquet) files.
Must be greater than 0 and less than 100; out-of-range values are reset to the default (20). For example, with a 5GiB write cache and 20% allocation,
1GiB is reserved for index files and 4GiB for data files. | | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. | @@ -553,6 +555,8 @@ | `region_engine.mito.write_cache_path` | String | `""` | File system path for write cache, defaults to `{data_home}`. | | `region_engine.mito.write_cache_size` | String | `5GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. | | `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. | +| `region_engine.mito.preload_index_cache` | Bool | `true` | Preload index (puffin) files into cache on region open (default: true).
When enabled, index files are loaded into the write cache during region initialization,
which can improve query performance at the cost of longer startup times. | +| `region_engine.mito.index_cache_percent` | Integer | `20` | Percentage of write cache capacity allocated for index (puffin) files (default: 20).
The remaining capacity is used for data (parquet) files.
Must be greater than 0 and less than 100; out-of-range values are reset to the default (20). For example, with a 5GiB write cache and 20% allocation,
1GiB is reserved for index files and 4GiB for data files. | | `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. | | `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. | | `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 4e8605b387..dda926e1cb 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -499,6 +499,17 @@ write_cache_size = "5GiB" ## @toml2docs:none-default write_cache_ttl = "8h" +## Preload index (puffin) files into cache on region open (default: true). +## When enabled, index files are loaded into the write cache during region initialization, +## which can improve query performance at the cost of longer startup times. +preload_index_cache = true + +## Percentage of write cache capacity allocated for index (puffin) files (default: 20). +## The remaining capacity is used for data (parquet) files. +## Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation, +## 1GiB is reserved for index files and 4GiB for data files. +index_cache_percent = 20 + ## Buffer size for SST writing. sst_write_buffer_size = "8MB" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index bfdb507969..70e6e0888f 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -590,6 +590,17 @@ write_cache_size = "5GiB" ## @toml2docs:none-default write_cache_ttl = "8h" +## Preload index (puffin) files into cache on region open (default: true). +## When enabled, index files are loaded into the write cache during region initialization, +## which can improve query performance at the cost of longer startup times. 
+preload_index_cache = true + +## Percentage of write cache capacity allocated for index (puffin) files (default: 20). +## The remaining capacity is used for data (parquet) files. +## Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation, +## 1GiB is reserved for index files and 4GiB for data files. +index_cache_percent = 20 + ## Buffer size for SST writing. sst_write_buffer_size = "8MB" diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index b371e39b78..b3a9bfb2df 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -53,6 +53,8 @@ const VECTOR_TYPE: &str = "vector"; const PAGE_TYPE: &str = "page"; /// Metrics type key for files on the local store. const FILE_TYPE: &str = "file"; +/// Metrics type key for index files (puffin) on the local store. +const INDEX_TYPE: &str = "index"; /// Metrics type key for selector result cache. const SELECTOR_RESULT_TYPE: &str = "selector_result"; diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 3fed4b9916..e9c67aaa45 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -21,8 +21,8 @@ use std::time::{Duration, Instant}; use bytes::Bytes; use common_base::readable_size::ReadableSize; -use common_telemetry::{error, info, warn}; -use futures::{FutureExt, TryStreamExt}; +use common_telemetry::{debug, error, info, warn}; +use futures::{AsyncWriteExt, FutureExt, TryStreamExt}; use moka::future::Cache; use moka::notification::RemovalCause; use moka::policy::EvictionPolicy; @@ -31,10 +31,16 @@ use object_store::{ErrorKind, ObjectStore, Reader}; use parquet::file::metadata::ParquetMetaData; use snafu::ResultExt; use store_api::storage::{FileId, RegionId}; +use tokio::sync::mpsc::UnboundedReceiver; -use crate::cache::FILE_TYPE; -use crate::error::{OpenDalSnafu, Result}; -use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; +use crate::access_layer::TempFileCleaner; +use crate::cache::{FILE_TYPE, 
INDEX_TYPE}; +use crate::error::{self, OpenDalSnafu, Result}; +use crate::metrics::{ + CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, + WRITE_CACHE_DOWNLOAD_ELAPSED, +}; +use crate::region::opener::RegionLoadCacheTask; use crate::sst::parquet::helper::fetch_byte_ranges; use crate::sst::parquet::metadata::MetadataLoader; @@ -43,16 +49,24 @@ use crate::sst::parquet::metadata::MetadataLoader; /// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer). const FILE_DIR: &str = "cache/object/write/"; +/// Default percentage for index (puffin) cache (20% of total capacity). +pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20; + +/// Minimum capacity for each cache (512MB). +const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024; + /// A file cache manages files on local store and evict files based /// on size. #[derive(Debug)] pub(crate) struct FileCache { /// Local store to cache files. local_store: ObjectStore, - /// Index to track cached files. - /// - /// File id is enough to identity a file uniquely. - memory_index: Cache, + /// Index to track cached Parquet files. + parquet_index: Cache, + /// Index to track cached Puffin files. + puffin_index: Cache, + /// Capacity of the puffin (index) cache in bytes. 
+ puffin_capacity: u64, } pub(crate) type FileCacheRef = Arc; @@ -63,15 +77,57 @@ impl FileCache { local_store: ObjectStore, capacity: ReadableSize, ttl: Option, + index_cache_percent: Option, ) -> FileCache { - let cache_store = local_store.clone(); + // Validate and use the provided percent or default + let index_percent = index_cache_percent + .filter(|&percent| percent > 0 && percent < 100) + .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT); + let total_capacity = capacity.as_bytes(); + + // Convert percent to ratio and calculate capacity for each cache + let index_ratio = index_percent as f64 / 100.0; + let puffin_capacity = (total_capacity as f64 * index_ratio) as u64; + let parquet_capacity = total_capacity - puffin_capacity; + + // Ensure both capacities are at least 512MB + let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY); + let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY); + + info!( + "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}", + index_percent, + ReadableSize(total_capacity), + ReadableSize(parquet_capacity), + ReadableSize(puffin_capacity) + ); + + let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file"); + let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index"); + + FileCache { + local_store, + parquet_index, + puffin_index, + puffin_capacity, + } + } + + /// Builds a cache for a specific file type. + fn build_cache( + local_store: ObjectStore, + capacity: u64, + ttl: Option, + label: &'static str, + ) -> Cache { + let cache_store = local_store; let mut builder = Cache::builder() .eviction_policy(EvictionPolicy::lru()) .weigher(|_key, value: &IndexValue| -> u32 { // We only measure space on local store. value.file_size }) - .max_capacity(capacity.as_bytes()) + .max_capacity(capacity) .async_eviction_listener(move |key, value, cause| { let store = cache_store.clone(); // Stores files under FILE_DIR. 
@@ -80,14 +136,14 @@ impl FileCache { if let RemovalCause::Replaced = cause { // The cache is replaced by another file. This is unexpected, we don't remove the same // file but updates the metrics as the file is already replaced by users. - CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); + CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into()); warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id); return; } match store.delete(&file_path).await { Ok(()) => { - CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); + CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into()); } Err(e) => { warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id); @@ -99,10 +155,14 @@ impl FileCache { if let Some(ttl) = ttl { builder = builder.time_to_idle(ttl); } - let memory_index = builder.build(); - FileCache { - local_store, - memory_index, + builder.build() + } + + /// Returns the appropriate memory index for the given file type. + fn memory_index(&self, file_type: FileType) -> &Cache { + match file_type { + FileType::Parquet => &self.parquet_index, + FileType::Puffin => &self.puffin_index, } } @@ -111,16 +171,17 @@ impl FileCache { /// The `WriteCache` should ensure the file is in the correct path. pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) { CACHE_BYTES - .with_label_values(&[FILE_TYPE]) + .with_label_values(&[key.file_type.metric_label()]) .add(value.file_size.into()); - self.memory_index.insert(key, value).await; + let index = self.memory_index(key.file_type); + index.insert(key, value).await; // Since files are large items, we run the pending tasks immediately. - self.memory_index.run_pending_tasks().await; + index.run_pending_tasks().await; } pub(crate) async fn get(&self, key: IndexKey) -> Option { - self.memory_index.get(&key).await + self.memory_index(key.file_type).get(&key).await } /// Reads a file from the cache. 
@@ -128,15 +189,20 @@ impl FileCache { pub(crate) async fn reader(&self, key: IndexKey) -> Option { // We must use `get()` to update the estimator of the cache. // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key - if self.memory_index.get(&key).await.is_none() { - CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + let index = self.memory_index(key.file_type); + if index.get(&key).await.is_none() { + CACHE_MISS + .with_label_values(&[key.file_type.metric_label()]) + .inc(); return None; } let file_path = self.cache_file_path(key); match self.get_reader(&file_path).await { Ok(Some(reader)) => { - CACHE_HIT.with_label_values(&[FILE_TYPE]).inc(); + CACHE_HIT + .with_label_values(&[key.file_type.metric_label()]) + .inc(); return Some(reader); } Err(e) => { @@ -148,8 +214,10 @@ impl FileCache { } // We removes the file from the index. - self.memory_index.remove(&key).await; - CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + index.remove(&key).await; + CACHE_MISS + .with_label_values(&[key.file_type.metric_label()]) + .inc(); None } @@ -159,8 +227,11 @@ impl FileCache { key: IndexKey, ranges: &[Range], ) -> Option> { - if self.memory_index.get(&key).await.is_none() { - CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + let index = self.memory_index(key.file_type); + if index.get(&key).await.is_none() { + CACHE_MISS + .with_label_values(&[key.file_type.metric_label()]) + .inc(); return None; } @@ -170,7 +241,9 @@ impl FileCache { let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await; match bytes_result { Ok(bytes) => { - CACHE_HIT.with_label_values(&[FILE_TYPE]).inc(); + CACHE_HIT + .with_label_values(&[key.file_type.metric_label()]) + .inc(); Some(bytes) } Err(e) => { @@ -179,8 +252,10 @@ impl FileCache { } // We removes the file from the index. 
- self.memory_index.remove(&key).await; - CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + index.remove(&key).await; + CACHE_MISS + .with_label_values(&[key.file_type.metric_label()]) + .inc(); None } } @@ -191,7 +266,7 @@ impl FileCache { /// in the memory index if upload is failed. pub(crate) async fn remove(&self, key: IndexKey) { let file_path = self.cache_file_path(key); - self.memory_index.remove(&key).await; + self.memory_index(key.file_type).remove(&key).await; // Always delete the file from the local store. if let Err(e) = self.local_store.delete(&file_path).await { warn!(e; "Failed to delete a cached file {}", file_path); @@ -208,6 +283,7 @@ impl FileCache { // Use i64 for total_size to reduce the risk of overflow. // It is possible that the total size of the cache is larger than i32::MAX. let (mut total_size, mut total_keys) = (0i64, 0); + let (mut parquet_size, mut puffin_size) = (0i64, 0i64); while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? { let meta = entry.metadata(); if !meta.is_file() { @@ -223,36 +299,76 @@ impl FileCache { .await .context(OpenDalSnafu)?; let file_size = meta.content_length() as u32; - self.memory_index - .insert(key, IndexValue { file_size }) - .await; - total_size += i64::from(file_size); + let index = self.memory_index(key.file_type); + index.insert(key, IndexValue { file_size }).await; + let size = i64::from(file_size); + total_size += size; total_keys += 1; + + // Track sizes separately for each file type + match key.file_type { + FileType::Parquet => parquet_size += size, + FileType::Puffin => puffin_size += size, + } } // The metrics is a signed int gauge so we can updates it finally. 
- CACHE_BYTES.with_label_values(&[FILE_TYPE]).add(total_size); + CACHE_BYTES + .with_label_values(&[FILE_TYPE]) + .add(parquet_size); + CACHE_BYTES + .with_label_values(&[INDEX_TYPE]) + .add(puffin_size); // Run all pending tasks of the moka cache so that the cache size is updated // and the eviction policy is applied. - self.memory_index.run_pending_tasks().await; + self.parquet_index.run_pending_tasks().await; + self.puffin_index.run_pending_tasks().await; + let parquet_weight = self.parquet_index.weighted_size(); + let parquet_count = self.parquet_index.entry_count(); + let puffin_weight = self.puffin_index.weighted_size(); + let puffin_count = self.puffin_index.entry_count(); info!( - "Recovered file cache, num_keys: {}, num_bytes: {}, total weight: {}, cost: {:?}", + "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}", total_keys, total_size, - self.memory_index.weighted_size(), + parquet_count, + parquet_weight, + puffin_count, + puffin_weight, now.elapsed() ); Ok(()) } /// Recovers the index from local store. - pub(crate) async fn recover(self: &Arc, sync: bool) { + /// + /// If `task_receiver` is provided, spawns a background task after recovery + /// to process `RegionLoadCacheTask` messages for loading files into the cache. + pub(crate) async fn recover( + self: &Arc, + sync: bool, + task_receiver: Option>, + ) { let moved_self = self.clone(); let handle = tokio::spawn(async move { if let Err(err) = moved_self.recover_inner().await { error!(err; "Failed to recover file cache.") } + + // Spawns background task to process region load cache tasks after recovery. + // So it won't block the recovery when `sync` is true. 
+ if let Some(mut receiver) = task_receiver { + let cache_ref = moved_self.clone(); + info!("Spawning background task for processing region load cache tasks"); + tokio::spawn(async move { + while let Some(task) = receiver.recv().await { + let file_cache = cache_ref.clone(); + task.fill_cache(file_cache).await; + } + info!("Background task for processing region load cache tasks stopped"); + }); + } }); if sync { @@ -274,7 +390,7 @@ impl FileCache { /// If the file is not in the cache or fail to load metadata, return None. pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option { // Check if file cache contains the key - if let Some(index_value) = self.memory_index.get(&key).await { + if let Some(index_value) = self.parquet_index.get(&key).await { // Load metadata from file cache let local_store = self.local_store(); let file_path = self.cache_file_path(key); @@ -283,7 +399,9 @@ impl FileCache { match metadata_loader.load().await { Ok(metadata) => { - CACHE_HIT.with_label_values(&[FILE_TYPE]).inc(); + CACHE_HIT + .with_label_values(&[key.file_type.metric_label()]) + .inc(); Some(metadata) } Err(e) => { @@ -294,13 +412,17 @@ impl FileCache { ); } // We removes the file from the index. - self.memory_index.remove(&key).await; - CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + self.parquet_index.remove(&key).await; + CACHE_MISS + .with_label_values(&[key.file_type.metric_label()]) + .inc(); None } } } else { - CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + CACHE_MISS + .with_label_values(&[key.file_type.metric_label()]) + .inc(); None } } @@ -314,9 +436,106 @@ impl FileCache { } /// Checks if the key is in the file cache. - #[cfg(test)] pub(crate) fn contains_key(&self, key: &IndexKey) -> bool { - self.memory_index.contains_key(key) + self.memory_index(key.file_type).contains_key(key) + } + + /// Returns the capacity of the puffin (index) cache in bytes. 
+ pub(crate) fn puffin_cache_capacity(&self) -> u64 { + self.puffin_capacity + } + + /// Returns the current weighted size (used bytes) of the puffin (index) cache. + pub(crate) fn puffin_cache_size(&self) -> u64 { + self.puffin_index.weighted_size() + } + + /// Downloads a file in `remote_path` from the remote object store to the local cache + /// (specified by `index_key`). + pub(crate) async fn download( + &self, + index_key: IndexKey, + remote_path: &str, + remote_store: &ObjectStore, + file_size: u64, + ) -> Result<()> { + if let Err(e) = self + .download_without_cleaning(index_key, remote_path, remote_store, file_size) + .await + { + let filename = index_key.to_string(); + TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await; + + return Err(e); + } + Ok(()) + } + + async fn download_without_cleaning( + &self, + index_key: IndexKey, + remote_path: &str, + remote_store: &ObjectStore, + file_size: u64, + ) -> Result<()> { + const DOWNLOAD_READER_CONCURRENCY: usize = 8; + const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8); + + let file_type = index_key.file_type; + let timer = WRITE_CACHE_DOWNLOAD_ELAPSED + .with_label_values(&[match file_type { + FileType::Parquet => "download_parquet", + FileType::Puffin => "download_puffin", + }]) + .start_timer(); + + let reader = remote_store + .reader_with(remote_path) + .concurrent(DOWNLOAD_READER_CONCURRENCY) + .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize) + .await + .context(error::OpenDalSnafu)? + .into_futures_async_read(0..file_size) + .await + .context(error::OpenDalSnafu)?; + + let cache_path = self.cache_file_path(index_key); + let mut writer = self + .local_store + .writer(&cache_path) + .await + .context(error::OpenDalSnafu)? 
+ .into_futures_async_write(); + + let region_id = index_key.region_id; + let file_id = index_key.file_id; + let bytes_written = + futures::io::copy(reader, &mut writer) + .await + .context(error::DownloadSnafu { + region_id, + file_id, + file_type, + })?; + writer.close().await.context(error::DownloadSnafu { + region_id, + file_id, + file_type, + })?; + + WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written); + + let elapsed = timer.stop_and_record(); + debug!( + "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s", + remote_path, cache_path, bytes_written, region_id, elapsed, + ); + + let index_value = IndexValue { + file_size: bytes_written as _, + }; + self.put(index_key, index_value).await; + Ok(()) } } @@ -377,6 +596,14 @@ impl FileType { FileType::Puffin => "puffin", } } + + /// Returns the metric label for this file type. + fn metric_label(&self) -> &'static str { + match self { + FileType::Parquet => FILE_TYPE, + FileType::Puffin => INDEX_TYPE, + } + } } /// An entity that describes the file in the file cache. 
@@ -429,6 +656,7 @@ mod tests { local_store.clone(), ReadableSize::mb(10), Some(Duration::from_millis(10)), + None, ); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); @@ -455,7 +683,7 @@ mod tests { let exist = cache.reader(key).await; assert!(exist.is_some()); tokio::time::sleep(Duration::from_millis(15)).await; - cache.memory_index.run_pending_tasks().await; + cache.parquet_index.run_pending_tasks().await; let non = cache.reader(key).await; assert!(non.is_none()); } @@ -465,7 +693,7 @@ mod tests { let dir = create_temp_dir(""); let local_store = new_fs_store(dir.path().to_str().unwrap()); - let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None); + let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); let key = IndexKey::new(region_id, file_id, FileType::Parquet); @@ -493,19 +721,19 @@ mod tests { assert_eq!("hello", String::from_utf8(buf).unwrap()); // Get weighted size. - cache.memory_index.run_pending_tasks().await; - assert_eq!(5, cache.memory_index.weighted_size()); + cache.parquet_index.run_pending_tasks().await; + assert_eq!(5, cache.parquet_index.weighted_size()); // Remove the file. cache.remove(key).await; assert!(cache.reader(key).await.is_none()); // Ensure all pending tasks of the moka cache is done before assertion. - cache.memory_index.run_pending_tasks().await; + cache.parquet_index.run_pending_tasks().await; // The file also not exists. 
assert!(!local_store.exists(&file_path).await.unwrap()); - assert_eq!(0, cache.memory_index.weighted_size()); + assert_eq!(0, cache.parquet_index.weighted_size()); } #[tokio::test] @@ -513,7 +741,7 @@ mod tests { let dir = create_temp_dir(""); let local_store = new_fs_store(dir.path().to_str().unwrap()); - let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None); + let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); let key = IndexKey::new(region_id, file_id, FileType::Parquet); @@ -538,14 +766,14 @@ mod tests { // Reader is none. assert!(cache.reader(key).await.is_none()); // Key is removed. - assert!(!cache.memory_index.contains_key(&key)); + assert!(!cache.parquet_index.contains_key(&key)); } #[tokio::test] async fn test_file_cache_recover() { let dir = create_temp_dir(""); let local_store = new_fs_store(dir.path().to_str().unwrap()); - let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None); + let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None); let region_id = RegionId::new(2000, 0); let file_type = FileType::Parquet; @@ -575,6 +803,7 @@ mod tests { local_store.clone(), ReadableSize::mb(10), None, + None, )); // No entry before recovery. assert!( @@ -583,11 +812,11 @@ mod tests { .await .is_none() ); - cache.recover(true).await; + cache.recover(true, None).await; // Check size. 
- cache.memory_index.run_pending_tasks().await; - assert_eq!(total_size, cache.memory_index.weighted_size() as usize); + cache.parquet_index.run_pending_tasks().await; + assert_eq!(total_size, cache.parquet_index.weighted_size() as usize); for (i, file_id) in file_ids.iter().enumerate() { let key = IndexKey::new(region_id, *file_id, file_type); @@ -601,7 +830,7 @@ mod tests { async fn test_file_cache_read_ranges() { let dir = create_temp_dir(""); let local_store = new_fs_store(dir.path().to_str().unwrap()); - let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None); + let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None); let region_id = RegionId::new(2000, 0); let file_id = FileId::random(); let key = IndexKey::new(region_id, file_id, FileType::Parquet); diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 96f2030562..b54e3e6f73 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -23,6 +23,7 @@ use futures::AsyncWriteExt; use object_store::ObjectStore; use snafu::ResultExt; use store_api::storage::RegionId; +use tokio::sync::mpsc::{UnboundedSender, unbounded_channel}; use crate::access_layer::{ FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, SstWriteRequest, @@ -30,9 +31,8 @@ use crate::access_layer::{ }; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; -use crate::metrics::{ - UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED, -}; +use crate::metrics::UPLOAD_BYTES_TOTAL; +use crate::region::opener::RegionLoadCacheTask; use crate::sst::file::RegionFileId; use crate::sst::index::IndexerBuilderImpl; use crate::sst::index::intermediate::IntermediateManager; @@ -51,6 +51,8 @@ pub struct WriteCache { puffin_manager_factory: PuffinManagerFactory, /// Intermediate manager for index. 
intermediate_manager: IntermediateManager, + /// Sender for region load cache tasks. + task_sender: UnboundedSender, } pub type WriteCacheRef = Arc; @@ -62,16 +64,25 @@ impl WriteCache { local_store: ObjectStore, cache_capacity: ReadableSize, ttl: Option, + index_cache_percent: Option, puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, ) -> Result { - let file_cache = Arc::new(FileCache::new(local_store, cache_capacity, ttl)); - file_cache.recover(false).await; + let (task_sender, task_receiver) = unbounded_channel(); + + let file_cache = Arc::new(FileCache::new( + local_store, + cache_capacity, + ttl, + index_cache_percent, + )); + file_cache.recover(false, Some(task_receiver)).await; Ok(Self { file_cache, puffin_manager_factory, intermediate_manager, + task_sender, }) } @@ -80,6 +91,7 @@ impl WriteCache { cache_dir: &str, cache_capacity: ReadableSize, ttl: Option, + index_cache_percent: Option, puffin_manager_factory: PuffinManagerFactory, intermediate_manager: IntermediateManager, ) -> Result { @@ -90,6 +102,7 @@ impl WriteCache { local_store, cache_capacity, ttl, + index_cache_percent, puffin_manager_factory, intermediate_manager, ) @@ -272,85 +285,9 @@ impl WriteCache { remote_store: &ObjectStore, file_size: u64, ) -> Result<()> { - if let Err(e) = self - .download_without_cleaning(index_key, remote_path, remote_store, file_size) + self.file_cache + .download(index_key, remote_path, remote_store, file_size) .await - { - let filename = index_key.to_string(); - TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename]) - .await; - - return Err(e); - } - Ok(()) - } - - async fn download_without_cleaning( - &self, - index_key: IndexKey, - remote_path: &str, - remote_store: &ObjectStore, - file_size: u64, - ) -> Result<()> { - const DOWNLOAD_READER_CONCURRENCY: usize = 8; - const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8); - - let file_type = index_key.file_type; - let timer = 
WRITE_CACHE_DOWNLOAD_ELAPSED - .with_label_values(&[match file_type { - FileType::Parquet => "download_parquet", - FileType::Puffin => "download_puffin", - }]) - .start_timer(); - - let reader = remote_store - .reader_with(remote_path) - .concurrent(DOWNLOAD_READER_CONCURRENCY) - .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize) - .await - .context(error::OpenDalSnafu)? - .into_futures_async_read(0..file_size) - .await - .context(error::OpenDalSnafu)?; - - let cache_path = self.file_cache.cache_file_path(index_key); - let mut writer = self - .file_cache - .local_store() - .writer(&cache_path) - .await - .context(error::OpenDalSnafu)? - .into_futures_async_write(); - - let region_id = index_key.region_id; - let file_id = index_key.file_id; - let bytes_written = - futures::io::copy(reader, &mut writer) - .await - .context(error::DownloadSnafu { - region_id, - file_id, - file_type, - })?; - writer.close().await.context(error::DownloadSnafu { - region_id, - file_id, - file_type, - })?; - - WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written); - - let elapsed = timer.stop_and_record(); - debug!( - "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s", - remote_path, cache_path, bytes_written, region_id, elapsed, - ); - - let index_value = IndexValue { - file_size: bytes_written as _, - }; - self.file_cache.put(index_key, index_value).await; - Ok(()) } /// Uploads a Parquet file or a Puffin file to the remote object store. @@ -424,6 +361,13 @@ impl WriteCache { Ok(()) } + + /// Sends a region load cache task to the background processing queue. + /// + /// If the receiver has been dropped, the error is ignored. + pub(crate) fn load_region_cache(&self, task: RegionLoadCacheTask) { + let _ = self.task_sender.send(task); + } } /// Request to write and upload a SST. 
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 6b787f070f..53cc745fe5 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -25,6 +25,7 @@ use common_telemetry::warn; use serde::{Deserialize, Serialize}; use serde_with::serde_as; +use crate::cache::file_cache::DEFAULT_INDEX_CACHE_PERCENT; use crate::error::Result; use crate::gc::GcConfig; use crate::memtable::MemtableConfig; @@ -119,6 +120,12 @@ pub struct MitoConfig { /// TTL for write cache. #[serde(with = "humantime_serde")] pub write_cache_ttl: Option, + /// Preload index (puffin) files into cache on region open (default: true). + pub preload_index_cache: bool, + /// Percentage of write cache capacity allocated for index (puffin) files (default: 20). + /// The remaining capacity is used for data (parquet) files. + /// Must be between 0 and 100 (exclusive). + pub index_cache_percent: u8, // Other configs: /// Buffer size for SST writing. @@ -182,6 +189,8 @@ impl Default for MitoConfig { write_cache_path: String::new(), write_cache_size: ReadableSize::gb(5), write_cache_ttl: None, + preload_index_cache: true, + index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT, sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE, parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES, @@ -271,6 +280,15 @@ impl MitoConfig { self.write_cache_path = data_home.to_string(); } + // Validate index_cache_percent is within valid range (0, 100) + if self.index_cache_percent == 0 || self.index_cache_percent >= 100 { + warn!( + "Invalid index_cache_percent {}, resetting to default {}", + self.index_cache_percent, DEFAULT_INDEX_CACHE_PERCENT + ); + self.index_cache_percent = DEFAULT_INDEX_CACHE_PERCENT; + } + self.index.sanitize(data_home, &self.inverted_index)?; Ok(()) diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index dcf8752dcd..a4b2c570e7 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -479,6 +479,18 
@@ lazy_static! { "greptime_mito_gc_delete_file_count", "mito gc deleted file count", ).unwrap(); + + /// Total number of files downloaded during cache fill on region open. + pub static ref CACHE_FILL_DOWNLOADED_FILES: IntCounter = register_int_counter!( + "mito_cache_fill_downloaded_files", + "mito cache fill downloaded files count", + ).unwrap(); + + /// Number of files pending download during cache fill on region open. + pub static ref CACHE_FILL_PENDING_FILES: IntGauge = register_int_gauge!( + "mito_cache_fill_pending_files", + "mito cache fill pending files count", + ).unwrap(); } /// Stager notifier to collect metrics. diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index af2fa093b8..100443d322 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -41,6 +41,7 @@ use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::AccessLayer; use crate::cache::CacheManagerRef; +use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::config::MitoConfig; use crate::error; use crate::error::{ @@ -53,20 +54,22 @@ use crate::manifest::storage::manifest_compress_type; use crate::memtable::MemtableBuilderProvider; use crate::memtable::bulk::part::BulkPart; use crate::memtable::time_partition::TimePartitions; +use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES}; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; use crate::region::{ - ManifestContext, ManifestStats, MitoRegion, RegionLeaderState, RegionRoleState, + ManifestContext, ManifestStats, MitoRegion, MitoRegionRef, RegionLeaderState, RegionRoleState, }; use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::FormatType; +use crate::sst::file::RegionFileId; use crate::sst::file_purger::create_local_file_purger; use 
crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; -use crate::sst::location::region_dir_from_table_dir; +use crate::sst::location::{self, region_dir_from_table_dir}; use crate::time_provider::TimeProviderRef; use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; @@ -217,7 +220,7 @@ impl RegionOpener { mut self, config: &MitoConfig, wal: &Wal, - ) -> Result<MitoRegion> { + ) -> Result<MitoRegionRef> { let region_id = self.region_id; let region_dir = self.region_dir(); let metadata = self.build_metadata()?; @@ -305,7 +308,7 @@ impl RegionOpener { )); let now = self.time_provider.current_time_millis(); - Ok(MitoRegion { + Ok(Arc::new(MitoRegion { region_id, version_control, access_layer: access_layer.clone(), @@ -329,7 +332,7 @@ impl RegionOpener { written_bytes: Arc::new(AtomicU64::new(0)), sst_format, stats: self.stats, - }) + })) } /// Opens an existing region in read only mode. @@ -339,7 +342,7 @@ impl RegionOpener { mut self, config: &MitoConfig, wal: &Wal, - ) -> Result<MitoRegion> { + ) -> Result<MitoRegionRef> { let region_id = self.region_id; let region_dir = self.region_dir(); let region = self @@ -397,7 +400,7 @@ impl RegionOpener { &mut self, config: &MitoConfig, wal: &Wal, - ) -> Result<Option<MitoRegion>> { + ) -> Result<Option<MitoRegionRef>> { let region_options = self.options.as_ref().unwrap().clone(); let region_manifest_options = Self::manifest_options( @@ -539,8 +542,8 @@ impl RegionOpener { let region = MitoRegion { region_id: self.region_id, - version_control, - access_layer, + version_control: version_control.clone(), + access_layer: access_layer.clone(), // Region is always opened in read only mode. 
manifest_ctx: Arc::new(ManifestContext::new( manifest_manager, @@ -557,6 +560,11 @@ impl RegionOpener { sst_format, stats: self.stats.clone(), }; + + let region = Arc::new(region); + + maybe_load_cache(&region, config, &self.cache_manager); + Ok(Some(region)) } @@ -809,3 +817,155 @@ where pub(crate) fn new_manifest_dir(region_dir: &str) -> String { join_dir(region_dir, "manifest") } + +/// A task to load and fill the region file cache. +pub(crate) struct RegionLoadCacheTask { + region: MitoRegionRef, +} + +impl RegionLoadCacheTask { + pub(crate) fn new(region: MitoRegionRef) -> Self { + Self { region } + } + + /// Fills the file cache with index files from the region. + pub(crate) async fn fill_cache(&self, file_cache: FileCacheRef) { + let region_id = self.region.region_id; + let table_dir = self.region.access_layer.table_dir(); + let path_type = self.region.access_layer.path_type(); + let object_store = self.region.access_layer.object_store(); + let version_control = &self.region.version_control; + + // Collects IndexKeys and file sizes for files that need to be downloaded + let mut files_to_download = Vec::new(); + let mut files_already_cached = 0; + + { + let version = version_control.current().version; + for level in version.ssts.levels() { + for file_handle in level.files.values() { + let file_meta = file_handle.meta_ref(); + if file_meta.exists_index() { + let puffin_key = IndexKey::new( + file_meta.region_id, + file_meta.index_file_id().file_id(), + FileType::Puffin, + ); + + if !file_cache.contains_key(&puffin_key) { + files_to_download.push((puffin_key, file_meta.index_file_size)); + } else { + files_already_cached += 1; + } + } + } + } + // Releases the Version after the scope to avoid holding the memtables and file handles + // for a long time. 
+ } + let total_files = files_to_download.len() as i64; + + info!( + "Starting background index cache preload for region {}, total_files_to_download: {}, files_already_cached: {}", + region_id, total_files, files_already_cached + ); + + CACHE_FILL_PENDING_FILES.add(total_files); + + let mut files_downloaded = 0; + let mut files_skipped = 0; + + for (puffin_key, file_size) in files_to_download { + let current_size = file_cache.puffin_cache_size(); + let capacity = file_cache.puffin_cache_capacity(); + let region_state = self.region.state(); + if !can_load_cache(region_state) { + info!( + "Stopping index cache by state: {:?}, region: {}, current_size: {}, capacity: {}", + region_state, region_id, current_size, capacity + ); + break; + } + + // Checks if adding this file would exceed capacity + if current_size + file_size > capacity { + info!( + "Stopping index cache preload due to capacity limit, region: {}, file_id: {}, current_size: {}, file_size: {}, capacity: {}", + region_id, puffin_key.file_id, current_size, file_size, capacity + ); + files_skipped = (total_files - files_downloaded) as usize; + CACHE_FILL_PENDING_FILES.sub(total_files - files_downloaded); + break; + } + + let index_remote_path = location::index_file_path( + table_dir, + RegionFileId::new(puffin_key.region_id, puffin_key.file_id), + path_type, + ); + + match file_cache + .download(puffin_key, &index_remote_path, object_store, file_size) + .await + { + Ok(_) => { + debug!( + "Downloaded index file to write cache, region: {}, file_id: {}", + region_id, puffin_key.file_id + ); + files_downloaded += 1; + CACHE_FILL_DOWNLOADED_FILES.inc_by(1); + CACHE_FILL_PENDING_FILES.dec(); + } + Err(e) => { + warn!( + e; "Failed to download index file to write cache, region: {}, file_id: {}", + region_id, puffin_key.file_id + ); + CACHE_FILL_PENDING_FILES.dec(); + } + } + } + + info!( + "Completed background cache fill task for region {}, total_files: {}, files_downloaded: {}, files_already_cached: {}, 
files_skipped: {}", + region_id, total_files, files_downloaded, files_already_cached, files_skipped + ); + } +} + +/// Loads all index (Puffin) files from the version into the write cache. +fn maybe_load_cache( + region: &MitoRegionRef, + config: &MitoConfig, + cache_manager: &Option<CacheManagerRef>, +) { + let Some(cache_manager) = cache_manager else { + return; + }; + let Some(write_cache) = cache_manager.write_cache() else { + return; + }; + + let preload_enabled = config.preload_index_cache; + if !preload_enabled { + return; + } + + let task = RegionLoadCacheTask::new(region.clone()); + write_cache.load_region_cache(task); +} + +fn can_load_cache(state: RegionRoleState) -> bool { + match state { + RegionRoleState::Leader(RegionLeaderState::Writable) + | RegionRoleState::Leader(RegionLeaderState::Staging) + | RegionRoleState::Leader(RegionLeaderState::Altering) + | RegionRoleState::Leader(RegionLeaderState::Editing) + | RegionRoleState::Follower => true, + // The region will be closed soon if it is downgrading. 
+ RegionRoleState::Leader(RegionLeaderState::Downgrading) + | RegionRoleState::Leader(RegionLeaderState::Dropping) + | RegionRoleState::Leader(RegionLeaderState::Truncating) => false, + } +} diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 83bceab351..c51a3893e6 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -1714,6 +1714,7 @@ mod tests { dir.path().to_str().unwrap(), ReadableSize::mb(10), None, + None, factory, intm_manager, ) diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index e86ab29158..baaa7fe343 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -629,6 +629,7 @@ impl TestEnv { local_store, capacity, None, + None, self.puffin_manager.clone(), self.intermediate_manager.clone(), ) @@ -648,6 +649,7 @@ impl TestEnv { path, capacity, None, + None, self.puffin_manager.clone(), self.intermediate_manager.clone(), ) diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index a37af4d472..60ac23af33 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -450,6 +450,7 @@ pub async fn write_cache_from_config( &config.write_cache_path, config.write_cache_size, config.write_cache_ttl, + Some(config.index_cache_percent), puffin_manager_factory, intermediate_manager, ) diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 8b744f7d6a..8ba8b75b29 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -111,26 +111,24 @@ impl RegionWorkerLoop { info!( "Reopening the region: {region_id}, manifest version: {manifest_version}, flushed entry id: {flushed_entry_id}" ); - let reopened_region = Arc::new( - RegionOpener::new( - region_id, - region.table_dir(), - region.access_layer.path_type(), - self.memtable_builder_provider.clone(), - self.object_store_manager.clone(), - self.purge_scheduler.clone(), - self.puffin_manager_factory.clone(), - 
self.intermediate_manager.clone(), - self.time_provider.clone(), - self.file_ref_manager.clone(), - self.partition_expr_fetcher.clone(), - ) - .cache(Some(self.cache_manager.clone())) - .options(region.version().options.clone())? - .skip_wal_replay(true) - .open(&self.config, &self.wal) - .await?, - ); + let reopened_region = RegionOpener::new( + region_id, + region.table_dir(), + region.access_layer.path_type(), + self.memtable_builder_provider.clone(), + self.object_store_manager.clone(), + self.purge_scheduler.clone(), + self.puffin_manager_factory.clone(), + self.intermediate_manager.clone(), + self.time_provider.clone(), + self.file_ref_manager.clone(), + self.partition_expr_fetcher.clone(), + ) + .cache(Some(self.cache_manager.clone())) + .options(region.version().options.clone())? + .skip_wal_replay(true) + .open(&self.config, &self.wal) + .await?; debug_assert!(!reopened_region.is_writable()); self.regions.insert_region(reopened_region.clone()); diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 3c5f091a1a..9e812ba88f 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -14,8 +14,6 @@ //! Handling create request. -use std::sync::Arc; - use common_telemetry::info; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataBuilder; @@ -84,7 +82,7 @@ impl RegionWorkerLoop { self.region_count.inc(); // Insert the MitoRegion into the RegionMap. 
- self.regions.insert_region(Arc::new(region)); + self.regions.insert_region(region); Ok(0) } diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 88c4b314b1..78a4b16210 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -431,8 +431,11 @@ async fn edit_region( let is_index_exist = file_meta.exists_index(); let index_file_size = file_meta.index_file_size(); - let index_file_index_key = - IndexKey::new(region_id, file_meta.file_id, FileType::Puffin); + let index_file_index_key = IndexKey::new( + region_id, + file_meta.index_file_id().file_id(), + FileType::Puffin, + ); let index_remote_path = location::index_file_path( layer.table_dir(), file_meta.file_id(), diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 4d71289c08..420f8380db 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -133,7 +133,7 @@ impl RegionWorkerLoop { region_count.inc(); // Insert the Region into the RegionMap. - regions.insert_region(Arc::new(region)); + regions.insert_region(region); let senders = opening_regions.remove_sender(region_id); for sender in senders { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index b90541f6da..d0b603c800 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1497,6 +1497,8 @@ auto_flush_interval = "30m" enable_write_cache = false write_cache_path = "" write_cache_size = "5GiB" +preload_index_cache = true +index_cache_percent = 20 sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 max_concurrent_scan_files = 384