mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
1 Commits
v0.18.0-ni
...
feat/prefi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab1928d5fd |
17
src/mito2/src/cache/file_cache.rs
vendored
17
src/mito2/src/cache/file_cache.rs
vendored
@@ -53,6 +53,8 @@ pub(crate) struct FileCache {
|
||||
///
|
||||
/// File id is enough to identify a file uniquely.
|
||||
memory_index: Cache<IndexKey, IndexValue>,
|
||||
/// Capacity of the cache.
|
||||
capacity: ReadableSize,
|
||||
}
|
||||
|
||||
pub(crate) type FileCacheRef = Arc<FileCache>;
|
||||
@@ -103,6 +105,7 @@ impl FileCache {
|
||||
FileCache {
|
||||
local_store,
|
||||
memory_index,
|
||||
capacity,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,6 +201,20 @@ impl FileCache {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the available space in the file cache.
|
||||
pub(crate) fn available_space(&self) -> u64 {
|
||||
if self.capacity.as_bytes() > self.memory_index.weighted_size() {
|
||||
self.capacity.as_bytes() - self.memory_index.weighted_size()
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the capacity of the file cache.
|
||||
pub(crate) fn capacity(&self) -> u64 {
|
||||
self.capacity.as_bytes()
|
||||
}
|
||||
|
||||
async fn recover_inner(&self) -> Result<()> {
|
||||
let now = Instant::now();
|
||||
let mut lister = self
|
||||
|
||||
10
src/mito2/src/cache/write_cache.rs
vendored
10
src/mito2/src/cache/write_cache.rs
vendored
@@ -263,6 +263,16 @@ impl WriteCache {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the available space in the write cache.
|
||||
pub(crate) fn available_space(&self) -> u64 {
|
||||
self.file_cache.available_space()
|
||||
}
|
||||
|
||||
/// Returns the capacity of the write cache.
|
||||
pub(crate) fn capacity(&self) -> u64 {
|
||||
self.file_cache.capacity()
|
||||
}
|
||||
|
||||
/// Uploads a Parquet file or a Puffin file to the remote object store.
|
||||
async fn upload(
|
||||
&self,
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! Handling open request.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_telemetry::info;
|
||||
use object_store::util::join_path;
|
||||
@@ -24,14 +25,18 @@ use store_api::region_request::RegionOpenRequest;
|
||||
use store_api::storage::RegionId;
|
||||
use table::requests::STORAGE_KEY;
|
||||
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
use crate::cache::write_cache::WriteCacheRef;
|
||||
use crate::error::{
|
||||
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
|
||||
};
|
||||
use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
|
||||
use crate::region::opener::RegionOpener;
|
||||
use crate::request::OptionOutputTx;
|
||||
use crate::sst::location;
|
||||
use crate::wal::entry_distributor::WalEntryReceiver;
|
||||
use crate::worker::handle_drop::remove_region_dir_once;
|
||||
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
|
||||
use crate::worker::{MitoRegionRef, RegionWorkerLoop, DROPPING_MARKER_FILE};
|
||||
|
||||
impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
async fn check_and_cleanup_region(
|
||||
@@ -118,6 +123,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
let opening_regions = self.opening_regions.clone();
|
||||
let region_count = self.region_count.clone();
|
||||
let worker_id = self.id;
|
||||
let write_cache = self.cache_manager.write_cache().cloned();
|
||||
opening_regions.insert_sender(region_id, sender);
|
||||
common_runtime::spawn_global(async move {
|
||||
match opener.open(&config, &wal).await {
|
||||
@@ -126,12 +132,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
region_count.inc();
|
||||
|
||||
// Insert the Region into the RegionMap.
|
||||
regions.insert_region(Arc::new(region));
|
||||
let region = Arc::new(region);
|
||||
regions.insert_region(region.clone());
|
||||
|
||||
let senders = opening_regions.remove_sender(region_id);
|
||||
for sender in senders {
|
||||
sender.send(Ok(0));
|
||||
}
|
||||
|
||||
if let Some(write_cache) = write_cache {
|
||||
prefetch_latest_ssts(region, write_cache).await;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let senders = opening_regions.remove_sender(region_id);
|
||||
@@ -144,3 +155,85 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Download latest SSTs from the remote storage for the region.
|
||||
async fn prefetch_latest_ssts(region: MitoRegionRef, write_cache: WriteCacheRef) {
|
||||
let version = region.version();
|
||||
// Sort ssts by time range in descending order.
|
||||
let mut ssts: Vec<_> = version
|
||||
.ssts
|
||||
.levels()
|
||||
.iter()
|
||||
.flat_map(|level| level.files())
|
||||
.collect();
|
||||
ssts.sort_unstable_by(|left, right| right.time_range().1.cmp(&left.time_range().1));
|
||||
|
||||
let layer = region.access_layer.clone();
|
||||
let region_id = region.region_id;
|
||||
// Prefetch the latest SSTs.
|
||||
let mut has_err = false;
|
||||
let mut fetched = 0;
|
||||
let mut downloaded_bytes = 0;
|
||||
let start = Instant::now();
|
||||
for sst in ssts {
|
||||
if has_err || write_cache.available_space() <= write_cache.capacity() / 2 {
|
||||
break;
|
||||
}
|
||||
|
||||
WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
|
||||
|
||||
let file_meta = sst.meta_ref();
|
||||
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
|
||||
let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
|
||||
let file_size = file_meta.file_size;
|
||||
if let Err(err) = write_cache
|
||||
.download(index_key, &remote_path, layer.object_store(), file_size)
|
||||
.await
|
||||
{
|
||||
common_telemetry::error!(
|
||||
err; "Failed to download parquet file, region_id: {}, index_key: {:?}, remote_path: {}", region_id, index_key, remote_path
|
||||
);
|
||||
has_err = true;
|
||||
} else {
|
||||
fetched += 1;
|
||||
downloaded_bytes += file_size;
|
||||
}
|
||||
|
||||
let is_index_exist = file_meta.exists_index();
|
||||
if !has_err && is_index_exist {
|
||||
let index_file_size = file_meta.index_file_size();
|
||||
let index_file_index_key =
|
||||
IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
|
||||
let index_remote_path =
|
||||
location::index_file_path(layer.region_dir(), file_meta.file_id);
|
||||
// also download puffin file
|
||||
if let Err(err) = write_cache
|
||||
.download(
|
||||
index_file_index_key,
|
||||
&index_remote_path,
|
||||
layer.object_store(),
|
||||
index_file_size,
|
||||
)
|
||||
.await
|
||||
{
|
||||
common_telemetry::error!(
|
||||
err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
|
||||
);
|
||||
has_err = true;
|
||||
} else {
|
||||
fetched += 1;
|
||||
downloaded_bytes += index_file_size;
|
||||
}
|
||||
}
|
||||
|
||||
WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
|
||||
}
|
||||
|
||||
common_telemetry::info!(
|
||||
"region {} prefetched {} files with total {} bytes, elapsed: {:?}",
|
||||
region_id,
|
||||
fetched,
|
||||
downloaded_bytes,
|
||||
start.elapsed(),
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user