mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 23:49:58 +00:00
Compare commits
1 Commits
v0.18.0-ni
...
feat/prefi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab1928d5fd |
17
src/mito2/src/cache/file_cache.rs
vendored
17
src/mito2/src/cache/file_cache.rs
vendored
@@ -53,6 +53,8 @@ pub(crate) struct FileCache {
|
|||||||
///
|
///
|
||||||
/// File id is enough to identify a file uniquely.
|
/// File id is enough to identify a file uniquely.
|
||||||
memory_index: Cache<IndexKey, IndexValue>,
|
memory_index: Cache<IndexKey, IndexValue>,
|
||||||
|
/// Capacity of the cache.
|
||||||
|
capacity: ReadableSize,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) type FileCacheRef = Arc<FileCache>;
|
pub(crate) type FileCacheRef = Arc<FileCache>;
|
||||||
@@ -103,6 +105,7 @@ impl FileCache {
|
|||||||
FileCache {
|
FileCache {
|
||||||
local_store,
|
local_store,
|
||||||
memory_index,
|
memory_index,
|
||||||
|
capacity,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -198,6 +201,20 @@ impl FileCache {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the available space in the file cache.
|
||||||
|
pub(crate) fn available_space(&self) -> u64 {
|
||||||
|
if self.capacity.as_bytes() > self.memory_index.weighted_size() {
|
||||||
|
self.capacity.as_bytes() - self.memory_index.weighted_size()
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the capacity of the file cache.
|
||||||
|
pub(crate) fn capacity(&self) -> u64 {
|
||||||
|
self.capacity.as_bytes()
|
||||||
|
}
|
||||||
|
|
||||||
async fn recover_inner(&self) -> Result<()> {
|
async fn recover_inner(&self) -> Result<()> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let mut lister = self
|
let mut lister = self
|
||||||
|
|||||||
10
src/mito2/src/cache/write_cache.rs
vendored
10
src/mito2/src/cache/write_cache.rs
vendored
@@ -263,6 +263,16 @@ impl WriteCache {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the available space in the write cache.
|
||||||
|
pub(crate) fn available_space(&self) -> u64 {
|
||||||
|
self.file_cache.available_space()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the capacity of the write cache.
|
||||||
|
pub(crate) fn capacity(&self) -> u64 {
|
||||||
|
self.file_cache.capacity()
|
||||||
|
}
|
||||||
|
|
||||||
/// Uploads a Parquet file or a Puffin file to the remote object store.
|
/// Uploads a Parquet file or a Puffin file to the remote object store.
|
||||||
async fn upload(
|
async fn upload(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
//! Handling open request.
|
//! Handling open request.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use common_telemetry::info;
|
use common_telemetry::info;
|
||||||
use object_store::util::join_path;
|
use object_store::util::join_path;
|
||||||
@@ -24,14 +25,18 @@ use store_api::region_request::RegionOpenRequest;
|
|||||||
use store_api::storage::RegionId;
|
use store_api::storage::RegionId;
|
||||||
use table::requests::STORAGE_KEY;
|
use table::requests::STORAGE_KEY;
|
||||||
|
|
||||||
|
use crate::cache::file_cache::{FileType, IndexKey};
|
||||||
|
use crate::cache::write_cache::WriteCacheRef;
|
||||||
use crate::error::{
|
use crate::error::{
|
||||||
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
|
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
|
||||||
};
|
};
|
||||||
|
use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
|
||||||
use crate::region::opener::RegionOpener;
|
use crate::region::opener::RegionOpener;
|
||||||
use crate::request::OptionOutputTx;
|
use crate::request::OptionOutputTx;
|
||||||
|
use crate::sst::location;
|
||||||
use crate::wal::entry_distributor::WalEntryReceiver;
|
use crate::wal::entry_distributor::WalEntryReceiver;
|
||||||
use crate::worker::handle_drop::remove_region_dir_once;
|
use crate::worker::handle_drop::remove_region_dir_once;
|
||||||
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
|
use crate::worker::{MitoRegionRef, RegionWorkerLoop, DROPPING_MARKER_FILE};
|
||||||
|
|
||||||
impl<S: LogStore> RegionWorkerLoop<S> {
|
impl<S: LogStore> RegionWorkerLoop<S> {
|
||||||
async fn check_and_cleanup_region(
|
async fn check_and_cleanup_region(
|
||||||
@@ -118,6 +123,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
|||||||
let opening_regions = self.opening_regions.clone();
|
let opening_regions = self.opening_regions.clone();
|
||||||
let region_count = self.region_count.clone();
|
let region_count = self.region_count.clone();
|
||||||
let worker_id = self.id;
|
let worker_id = self.id;
|
||||||
|
let write_cache = self.cache_manager.write_cache().cloned();
|
||||||
opening_regions.insert_sender(region_id, sender);
|
opening_regions.insert_sender(region_id, sender);
|
||||||
common_runtime::spawn_global(async move {
|
common_runtime::spawn_global(async move {
|
||||||
match opener.open(&config, &wal).await {
|
match opener.open(&config, &wal).await {
|
||||||
@@ -126,12 +132,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
|||||||
region_count.inc();
|
region_count.inc();
|
||||||
|
|
||||||
// Insert the Region into the RegionMap.
|
// Insert the Region into the RegionMap.
|
||||||
regions.insert_region(Arc::new(region));
|
let region = Arc::new(region);
|
||||||
|
regions.insert_region(region.clone());
|
||||||
|
|
||||||
let senders = opening_regions.remove_sender(region_id);
|
let senders = opening_regions.remove_sender(region_id);
|
||||||
for sender in senders {
|
for sender in senders {
|
||||||
sender.send(Ok(0));
|
sender.send(Ok(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(write_cache) = write_cache {
|
||||||
|
prefetch_latest_ssts(region, write_cache).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
let senders = opening_regions.remove_sender(region_id);
|
let senders = opening_regions.remove_sender(region_id);
|
||||||
@@ -144,3 +155,85 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Download latest SSTs from the remote storage for the region.
|
||||||
|
async fn prefetch_latest_ssts(region: MitoRegionRef, write_cache: WriteCacheRef) {
|
||||||
|
let version = region.version();
|
||||||
|
// Sort ssts by time range in descending order.
|
||||||
|
let mut ssts: Vec<_> = version
|
||||||
|
.ssts
|
||||||
|
.levels()
|
||||||
|
.iter()
|
||||||
|
.flat_map(|level| level.files())
|
||||||
|
.collect();
|
||||||
|
ssts.sort_unstable_by(|left, right| right.time_range().1.cmp(&left.time_range().1));
|
||||||
|
|
||||||
|
let layer = region.access_layer.clone();
|
||||||
|
let region_id = region.region_id;
|
||||||
|
// Prefetch the latest SSTs.
|
||||||
|
let mut has_err = false;
|
||||||
|
let mut fetched = 0;
|
||||||
|
let mut downloaded_bytes = 0;
|
||||||
|
let start = Instant::now();
|
||||||
|
for sst in ssts {
|
||||||
|
if has_err || write_cache.available_space() <= write_cache.capacity() / 2 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
WRITE_CACHE_INFLIGHT_DOWNLOAD.add(1);
|
||||||
|
|
||||||
|
let file_meta = sst.meta_ref();
|
||||||
|
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Parquet);
|
||||||
|
let remote_path = location::sst_file_path(layer.region_dir(), file_meta.file_id);
|
||||||
|
let file_size = file_meta.file_size;
|
||||||
|
if let Err(err) = write_cache
|
||||||
|
.download(index_key, &remote_path, layer.object_store(), file_size)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
common_telemetry::error!(
|
||||||
|
err; "Failed to download parquet file, region_id: {}, index_key: {:?}, remote_path: {}", region_id, index_key, remote_path
|
||||||
|
);
|
||||||
|
has_err = true;
|
||||||
|
} else {
|
||||||
|
fetched += 1;
|
||||||
|
downloaded_bytes += file_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
let is_index_exist = file_meta.exists_index();
|
||||||
|
if !has_err && is_index_exist {
|
||||||
|
let index_file_size = file_meta.index_file_size();
|
||||||
|
let index_file_index_key =
|
||||||
|
IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
|
||||||
|
let index_remote_path =
|
||||||
|
location::index_file_path(layer.region_dir(), file_meta.file_id);
|
||||||
|
// also download puffin file
|
||||||
|
if let Err(err) = write_cache
|
||||||
|
.download(
|
||||||
|
index_file_index_key,
|
||||||
|
&index_remote_path,
|
||||||
|
layer.object_store(),
|
||||||
|
index_file_size,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
common_telemetry::error!(
|
||||||
|
err; "Failed to download puffin file, region_id: {}, index_file_index_key: {:?}, index_remote_path: {}", region_id, index_file_index_key, index_remote_path
|
||||||
|
);
|
||||||
|
has_err = true;
|
||||||
|
} else {
|
||||||
|
fetched += 1;
|
||||||
|
downloaded_bytes += index_file_size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
WRITE_CACHE_INFLIGHT_DOWNLOAD.sub(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
common_telemetry::info!(
|
||||||
|
"region {} prefetched {} files with total {} bytes, elapsed: {:?}",
|
||||||
|
region_id,
|
||||||
|
fetched,
|
||||||
|
downloaded_bytes,
|
||||||
|
start.elapsed(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user