Compare commits

...

1 Commit

Author   SHA1        Message                    Date
evenyag  ab1928d5fd  feat: refetch sst on open  2025-03-17 15:17:14 +08:00
3 changed files with 122 additions and 2 deletions


@@ -53,6 +53,8 @@ pub(crate) struct FileCache {
     ///
     /// File id is enough to identify a file uniquely.
     memory_index: Cache<IndexKey, IndexValue>,
+    /// Capacity of the cache.
+    capacity: ReadableSize,
 }

 pub(crate) type FileCacheRef = Arc<FileCache>;
@@ -103,6 +105,7 @@ impl FileCache {
         FileCache {
             local_store,
             memory_index,
+            capacity,
         }
     }
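The capacity threaded through the constructor is a ReadableSize, a human-readable byte-size wrapper whose as_bytes() (used in the methods below) yields the raw u64 byte count. A quick illustration; the import path and the gb constructor are assumptions in the TiKV style, not taken from this diff:

use common_base::readable_size::ReadableSize; // assumed path

fn main() {
    // A 5 GiB cache budget; `gb` is an assumed TiKV-style constructor.
    let capacity = ReadableSize::gb(5);
    assert_eq!(capacity.as_bytes(), 5 * 1024 * 1024 * 1024);
}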
@@ -198,6 +201,20 @@ impl FileCache {
         }
     }

+    /// Returns the available space in the file cache.
+    pub(crate) fn available_space(&self) -> u64 {
+        if self.capacity.as_bytes() > self.memory_index.weighted_size() {
+            self.capacity.as_bytes() - self.memory_index.weighted_size()
+        } else {
+            0
+        }
+    }
+
+    /// Returns the capacity of the file cache.
+    pub(crate) fn capacity(&self) -> u64 {
+        self.capacity.as_bytes()
+    }
+
     async fn recover_inner(&self) -> Result<()> {
         let now = Instant::now();
         let mut lister = self
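available_space() clamps at zero rather than subtracting unconditionally, since a weighted cache's size can transiently exceed its configured capacity before evictions catch up. For the subtraction to mean "free bytes", the memory index must weigh each entry by the size of the file it tracks; weighted_size() matches the API of moka's sync Cache, so here is a minimal sketch of such a setup with moka, using simplified key and value types rather than the crate's actual IndexKey/IndexValue:

use moka::sync::Cache;

// Build a byte-weighted index over simplified stand-in types.
fn build_index(capacity_bytes: u64) -> Cache<String, u64> {
    Cache::builder()
        // Weigh each entry by the file size it tracks so that
        // weighted_size() reports cached bytes, not entry count.
        .weigher(|_key: &String, file_size: &u64| {
            (*file_size).min(u32::MAX as u64) as u32
        })
        .max_capacity(capacity_bytes)
        .build()
}

// Same clamp-at-zero arithmetic as FileCache::available_space above.
fn available_space(index: &Cache<String, u64>, capacity_bytes: u64) -> u64 {
    capacity_bytes.saturating_sub(index.weighted_size())
}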


@@ -263,6 +263,16 @@ impl WriteCache {
         Ok(())
     }

+    /// Returns the available space in the write cache.
+    pub(crate) fn available_space(&self) -> u64 {
+        self.file_cache.available_space()
+    }
+
+    /// Returns the capacity of the write cache.
+    pub(crate) fn capacity(&self) -> u64 {
+        self.file_cache.capacity()
+    }
+
     /// Uploads a Parquet file or a Puffin file to the remote object store.
     async fn upload(
         &self,
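WriteCache simply forwards both accessors to its FileCache. The open handler in the next file uses them to budget prefetching: it keeps downloading only while more than half of the cache is free. Restated as a standalone predicate (helper name hypothetical):

/// Keep prefetching only while more than half of the cache is free.
fn should_prefetch(available_bytes: u64, capacity_bytes: u64) -> bool {
    available_bytes > capacity_bytes / 2
}

With a 10 GiB cache, for example, prefetching stops once free space drops to 5 GiB or below.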


@@ -15,6 +15,7 @@
 //! Handling open request.

 use std::sync::Arc;
+use std::time::Instant;

 use common_telemetry::info;
 use object_store::util::join_path;
@@ -24,14 +25,18 @@ use store_api::region_request::RegionOpenRequest;
 use store_api::storage::RegionId;
 use table::requests::STORAGE_KEY;

+use crate::cache::file_cache::{FileType, IndexKey};
+use crate::cache::write_cache::WriteCacheRef;
 use crate::error::{
     ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
 };
+use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
 use crate::region::opener::RegionOpener;
 use crate::request::OptionOutputTx;
+use crate::sst::location;
 use crate::wal::entry_distributor::WalEntryReceiver;
 use crate::worker::handle_drop::remove_region_dir_once;
-use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
+use crate::worker::{MitoRegionRef, RegionWorkerLoop, DROPPING_MARKER_FILE};

 impl<S: LogStore> RegionWorkerLoop<S> {
     async fn check_and_cleanup_region(
@@ -118,6 +123,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         let opening_regions = self.opening_regions.clone();
         let region_count = self.region_count.clone();
         let worker_id = self.id;
+        let write_cache = self.cache_manager.write_cache().cloned();
         opening_regions.insert_sender(region_id, sender);
         common_runtime::spawn_global(async move {
             match opener.open(&config, &wal).await {
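The write cache is cloned before the task is spawned: cache_manager.write_cache() presumably returns Option<&WriteCacheRef>, and .cloned() turns it into an owned Option<WriteCacheRef> that can move into the async block. The same Option-cloning idiom in miniature, with hypothetical names:

use std::sync::Arc;

fn main() {
    let shared: Option<Arc<String>> = Some(Arc::new("cache".to_string()));
    // `as_ref().cloned()` (like `Option<&T>::cloned`) bumps the refcount
    // to produce an owned Option for a `move` closure.
    let owned: Option<Arc<String>> = shared.as_ref().cloned();
    let task = move || owned.map(|c| c.len());
    assert_eq!(task(), Some(5));
}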
@@ -126,12 +132,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
                     region_count.inc();

                     // Insert the Region into the RegionMap.
-                    regions.insert_region(Arc::new(region));
+                    let region = Arc::new(region);
+                    regions.insert_region(region.clone());

                     let senders = opening_regions.remove_sender(region_id);
                     for sender in senders {
                         sender.send(Ok(0));
                     }
+
+                    if let Some(write_cache) = write_cache {
+                        prefetch_latest_ssts(region, write_cache).await;
+                    }
                 }
                 Err(err) => {
                     let senders = opening_regions.remove_sender(region_id);
@@ -144,3 +155,85 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         });
     }
 }
+
+/// Downloads the latest SSTs from the remote storage for the region.
+async fn prefetch_latest_ssts(region: MitoRegionRef, write_cache: WriteCacheRef) {
+    let version = region.version();
+    // Sort SSTs by the end of their time range in descending order, so the
+    // most recent files are prefetched first.
+    let mut ssts: Vec<_> = version
+        .ssts
+        .levels()
+        .iter()
+        .flat_map(|level| level.files())
+        .collect();
+    ssts.sort_unstable_by(|left, right| right.time_range().1.cmp(&left.time_range().1));
+
+    let layer = region.access_layer.clone();
+    let region_id = region.region_id;
+
+    // Prefetch the latest SSTs.
+    let mut has_err = false;
+    let mut fetched = 0;
+    let mut downloaded_bytes = 0;
+    let start = Instant::now();
+    for sst in ssts {
+        // Stop after the first error, or once free space drops to half of
+        // the cache capacity.
+        if has_err || write_cache.available_space() <= write_cache.capacity() / 2 {
+            break;
+        }
+
+        WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
+        let file_meta = sst.meta_ref();
+        let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
+        let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
+        let file_size = file_meta.file_size;
+        if let Err(err) = write_cache
+            .download(index_key, &remote_path, layer.object_store(), file_size)
+            .await
+        {
+            common_telemetry::error!(
+                err; "Failed to download parquet file, region_id: {}, index_key: {:?}, remote_path: {}", region_id, index_key, remote_path
+            );
+            has_err = true;
+        } else {
+            fetched += 1;
+            downloaded_bytes += file_size;
+        }
+
+        let is_index_exist = file_meta.exists_index();
+        if !has_err && is_index_exist {
+            let index_file_size = file_meta.index_file_size();
+            let index_file_index_key =
+                IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
+            let index_remote_path =
+                location::index_file_path(layer.region_dir(), file_meta.file_id);
+            // Also download the puffin index file.
+            if let Err(err) = write_cache
+                .download(
+                    index_file_index_key,
+                    &index_remote_path,
+                    layer.object_store(),
+                    index_file_size,
+                )
+                .await
+            {
+                common_telemetry::error!(
+                    err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
+                );
+                has_err = true;
+            } else {
+                fetched += 1;
+                downloaded_bytes += index_file_size;
+            }
+        }
+        WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
+    }
+
+    common_telemetry::info!(
+        "region {} prefetched {} files with total {} bytes, elapsed: {:?}",
+        region_id,
+        fetched,
+        downloaded_bytes,
+        start.elapsed(),
+    );
+}
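Taken together, prefetch_latest_ssts walks the region's SSTs newest-first, downloading each Parquet file and, when present, its puffin index, and stops at the first error or once the write cache is half full; the WRITE_CACHE_INFLIGHT_DOWNLOAD gauge brackets each iteration. The newest-first order comes from the usual descending-sort idiom of comparing right against left. A standalone illustration with synthetic (file, max timestamp) pairs:

fn main() {
    // Synthetic stand-ins for (file, max timestamp) pairs.
    let mut ssts = vec![("a", 10), ("b", 30), ("c", 20)];
    // Comparing `right` to `left` sorts in descending order: newest first.
    ssts.sort_unstable_by(|left, right| right.1.cmp(&left.1));
    assert_eq!(ssts, vec![("b", 30), ("c", 20), ("a", 10)]);
}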