diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index 54fb5e47c0..f0bf9fc578 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -53,6 +53,8 @@ pub(crate) struct FileCache {
     ///
     /// File id is enough to identity a file uniquely.
     memory_index: Cache<IndexKey, IndexValue>,
+    /// Capacity of the cache.
+    capacity: ReadableSize,
 }
 
 pub(crate) type FileCacheRef = Arc<FileCache>;
@@ -103,6 +105,7 @@ impl FileCache {
         FileCache {
             local_store,
             memory_index,
+            capacity,
         }
     }
 
@@ -198,6 +201,20 @@ impl FileCache {
         }
     }
 
+    /// Returns the available space in the file cache.
+    pub(crate) fn available_space(&self) -> u64 {
+        if self.capacity.as_bytes() > self.memory_index.weighted_size() {
+            self.capacity.as_bytes() - self.memory_index.weighted_size()
+        } else {
+            0
+        }
+    }
+
+    /// Returns the capacity of the file cache.
+    pub(crate) fn capacity(&self) -> u64 {
+        self.capacity.as_bytes()
+    }
+
     async fn recover_inner(&self) -> Result<()> {
         let now = Instant::now();
         let mut lister = self
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 974f0caef0..40bcccf893 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -263,6 +263,16 @@ impl WriteCache {
         Ok(())
     }
 
+    /// Returns the available space in the write cache.
+    pub(crate) fn available_space(&self) -> u64 {
+        self.file_cache.available_space()
+    }
+
+    /// Returns the capacity of the write cache.
+    pub(crate) fn capacity(&self) -> u64 {
+        self.file_cache.capacity()
+    }
+
     /// Uploads a Parquet file or a Puffin file to the remote object store.
     async fn upload(
         &self,
diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs
index ea6f011a90..4f59391436 100644
--- a/src/mito2/src/worker/handle_open.rs
+++ b/src/mito2/src/worker/handle_open.rs
@@ -15,6 +15,7 @@
 //! Handling open request.
 
 use std::sync::Arc;
+use std::time::Instant;
 
 use common_telemetry::info;
 use object_store::util::join_path;
@@ -24,14 +25,18 @@ use store_api::region_request::RegionOpenRequest;
 use store_api::storage::RegionId;
 use table::requests::STORAGE_KEY;
 
+use crate::cache::file_cache::{FileType, IndexKey};
+use crate::cache::write_cache::WriteCacheRef;
 use crate::error::{
     ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
 };
+use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
 use crate::region::opener::RegionOpener;
 use crate::request::OptionOutputTx;
+use crate::sst::location;
 use crate::wal::entry_distributor::WalEntryReceiver;
 use crate::worker::handle_drop::remove_region_dir_once;
-use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
+use crate::worker::{MitoRegionRef, RegionWorkerLoop, DROPPING_MARKER_FILE};
 
 impl<S: LogStore> RegionWorkerLoop<S> {
     async fn check_and_cleanup_region(
@@ -118,6 +123,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         let opening_regions = self.opening_regions.clone();
         let region_count = self.region_count.clone();
         let worker_id = self.id;
+        let write_cache = self.cache_manager.write_cache().cloned();
         opening_regions.insert_sender(region_id, sender);
         common_runtime::spawn_global(async move {
             match opener.open(&config, &wal).await {
@@ -126,12 +132,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     region_count.inc();
 
                     // Insert the Region into the RegionMap.
-                    regions.insert_region(Arc::new(region));
+                    let region = Arc::new(region);
+                    regions.insert_region(region.clone());
 
                     let senders = opening_regions.remove_sender(region_id);
                     for sender in senders {
                         sender.send(Ok(0));
                     }
+
+                    if let Some(write_cache) = write_cache {
+                        prefetch_latest_ssts(region, write_cache).await;
+                    }
                 }
                 Err(err) => {
                     let senders = opening_regions.remove_sender(region_id);
@@ -144,3 +155,85 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         });
     }
 }
+
+/// Download latest SSTs from the remote storage for the region.
+async fn prefetch_latest_ssts(region: MitoRegionRef, write_cache: WriteCacheRef) {
+    let version = region.version();
+    // Sort ssts by time range in descending order.
+    let mut ssts: Vec<_> = version
+        .ssts
+        .levels()
+        .iter()
+        .flat_map(|level| level.files())
+        .collect();
+    ssts.sort_unstable_by(|left, right| right.time_range().1.cmp(&left.time_range().1));
+
+    let layer = region.access_layer.clone();
+    let region_id = region.region_id;
+    // Prefetch the latest SSTs.
+    let mut has_err = false;
+    let mut fetched = 0;
+    let mut downloaded_bytes = 0;
+    let start = Instant::now();
+    for sst in ssts {
+        if has_err || write_cache.available_space() <= write_cache.capacity() / 2 {
+            break;
+        }
+
+        WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
+
+        let file_meta = sst.meta_ref();
+        let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
+        let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
+        let file_size = file_meta.file_size;
+        if let Err(err) = write_cache
+            .download(index_key, &remote_path, layer.object_store(), file_size)
+            .await
+        {
+            common_telemetry::error!(
+                err; "Failed to download parquet file, region_id: {}, index_key: {:?}, remote_path: {}", region_id, index_key, remote_path
+            );
+            has_err = true;
+        } else {
+            fetched += 1;
+            downloaded_bytes += file_size;
+        }
+
+        let is_index_exist = file_meta.exists_index();
+        if !has_err && is_index_exist {
+            let index_file_size = file_meta.index_file_size();
+            let index_file_index_key =
+                IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
+            let index_remote_path =
+                location::index_file_path(layer.region_dir(), file_meta.file_id);
+            // also download puffin file
+            if let Err(err) = write_cache
+                .download(
+                    index_file_index_key,
+                    &index_remote_path,
+                    layer.object_store(),
+                    index_file_size,
+                )
+                .await
+            {
+                common_telemetry::error!(
+                    err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
+                );
+                has_err = true;
+            } else {
+                fetched += 1;
+                downloaded_bytes += index_file_size;
+            }
+        }
+
+        WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
+    }
+
+    common_telemetry::info!(
+        "region {} prefetched {} files with total {} bytes, elapsed: {:?}",
+        region_id,
+        fetched,
+        downloaded_bytes,
+        start.elapsed(),
+    );
+}
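The prefetch loop above downloads the newest SSTs first (files are sorted by the end of their time range in descending order) and stops as soon as `write_cache.available_space() <= write_cache.capacity() / 2`, i.e. once at least half of the write cache is occupied, so opening regions can never fill more than roughly half of the cache with prefetched files. Below is a minimal, self-contained sketch of that stopping rule, not part of the patch; `capacity` and `used` are hypothetical stand-ins for `FileCache::capacity()` and the weighted size tracked by `memory_index`:

// Sketch of the prefetch stopping rule used above, assuming `capacity` and
// `used` are byte counts taken from the write cache.
fn available_space(capacity: u64, used: u64) -> u64 {
    // Mirrors the behavior of FileCache::available_space(): free space, clamped at zero.
    capacity.saturating_sub(used)
}

fn should_stop_prefetch(capacity: u64, used: u64) -> bool {
    // Stop once no more than half of the cache remains free.
    available_space(capacity, used) <= capacity / 2
}

fn main() {
    let capacity = 5 * 1024 * 1024 * 1024_u64; // hypothetical 5 GiB write cache
    assert!(!should_stop_prefetch(capacity, 0)); // empty cache: keep prefetching
    assert!(should_stop_prefetch(capacity, capacity / 2)); // half full: stop
    assert!(should_stop_prefetch(capacity, capacity)); // full: stop
}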