feat: implement a cache for manifest files (#7326)
* feat: use cache in manifest store
* feat: use ManifestCache
* feat: clean empty manifest dir
* feat: get last checkpoint from cache
* feat: add hit/miss counter for manifest cache
* chore: add logs
* refactor: pass cache to ManifestObjectStore::new
* chore: fix compiler errors
* feat: cache checkpoint
* feat: cache checkpoint in write
* chore: fix compiler warnings
* chore: update config comment
* chore: manifest store cache for staging
* refactor: move recover_inner to FileCacheInner
* feat: remove manifest cache config from MitoConfig
* refactor: reduce clone when cache is enabled
* feat: do not cache staging manifests
  We clean staging manifests with remove_all, which makes it hard to clean the cache the same way.
* fix: fix paths in manifest cache
* chore: don't clean dir if it is too new
* refactor: reuse write cache ttl as manifest cache ttl
* style: fix clippy
* fix: clean all empty subdirectories

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
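In short, this change adds a moka-backed, size-bounded (and optionally TTL-bounded) local cache for manifest files, wired into the write cache and the manifest object store. A minimal sketch of how the new cache is exercised, based on the tests in this diff; it assumes code running inside the mito2 crate, where ManifestCache is visible, and an object store rooted at a local directory:

    use common_base::readable_size::ReadableSize;
    use object_store::ObjectStore;

    async fn demo(local_store: ObjectStore) {
        // Build a 10 MiB manifest cache without a TTL. Recovery of any
        // previously cached files starts asynchronously inside `new`.
        let cache = ManifestCache::new(local_store, ReadableSize::mb(10), None).await;

        // Write-through: persist the bytes under cache/object/manifest/<key>
        // and record the entry in the moka index.
        let key = "region_1/manifest/00000000000000000001.json".to_string();
        cache.put_file(key.clone(), b"manifest content".to_vec()).await;

        // A hit reads back from the local store and bumps CACHE_HIT for the
        // "manifest" label; a miss returns None and bumps CACHE_MISS.
        if let Some(data) = cache.get_file(&key).await {
            assert_eq!(b"manifest content".as_slice(), data.as_slice());
        }
    }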
@@ -18,6 +18,7 @@ mod cache_size;
pub(crate) mod file_cache;
pub(crate) mod index;
pub(crate) mod manifest_cache;
#[cfg(test)]
pub(crate) mod test_util;
pub(crate) mod write_cache;
src/mito2/src/cache/file_cache.rs (708 lines changed)
@@ -55,109 +55,18 @@ pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;
/// Minimum capacity for each cache (512MB).
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// A file cache manages files on the local store and evicts files based
/// on size.
/// Inner struct for FileCache that can be used in spawned tasks.
#[derive(Debug)]
pub(crate) struct FileCache {
struct FileCacheInner {
    /// Local store to cache files.
    local_store: ObjectStore,
    /// Index to track cached Parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// Index to track cached Puffin files.
    puffin_index: Cache<IndexKey, IndexValue>,
    /// Capacity of the puffin (index) cache in bytes.
    puffin_capacity: u64,
}

pub(crate) type FileCacheRef = Arc<FileCache>;

impl FileCache {
    /// Creates a new file cache.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
    ) -> FileCache {
        // Validate and use the provided percent or default
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        // Convert percent to ratio and calculate capacity for each cache
        let index_ratio = index_percent as f64 / 100.0;
        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
        let parquet_capacity = total_capacity - puffin_capacity;

        // Ensure both capacities are at least 512MB
        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        FileCache {
            local_store,
            parquet_index,
            puffin_index,
            puffin_capacity,
        }
    }

    /// Builds a cache for a specific file type.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            .weigher(|_key, value: &IndexValue| -> u32 {
                // We only measure space on local store.
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                // Stores files under FILE_DIR.
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The cache entry is replaced by another file. This is unexpected; we don't remove the
                        // file but update the metrics as the file is already replaced by users.
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

impl FileCacheInner {
    /// Returns the appropriate memory index for the given file type.
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
@@ -166,10 +75,15 @@ impl FileCache {
        }
    }

    /// Returns the cache file path for the key.
    fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Puts a file into the cache index.
    ///
    /// The `WriteCache` should ensure the file is in the correct path.
    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
    async fn put(&self, key: IndexKey, value: IndexValue) {
        CACHE_BYTES
            .with_label_values(&[key.file_type.metric_label()])
            .add(value.file_size.into());
@@ -180,100 +94,8 @@ impl FileCache {
        index.run_pending_tasks().await;
    }

    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
        self.memory_index(key.file_type).get(&key).await
    }

    /// Reads a file from the cache.
    #[allow(unused)]
    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
        // We must use `get()` to update the estimator of the cache.
        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
        let index = self.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.cache_file_path(key);
        match self.get_reader(&file_path).await {
            Ok(Some(reader)) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                return Some(reader);
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }
            }
            Ok(None) => {}
        }

        // We remove the file from the index.
        index.remove(&key).await;
        CACHE_MISS
            .with_label_values(&[key.file_type.metric_label()])
            .inc();
        None
    }

    /// Reads ranges from the cache.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.cache_file_path(key);
        // In most cases, it will use blocking read,
        // because FileCache is normally based on a local file system, which supports blocking read.
        let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // We remove the file from the index.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes a file from the cache explicitly.
    /// It always tries to remove the file from the local store because we may not have the file
    /// in the memory index if the upload failed.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.cache_file_path(key);
        self.memory_index(key.file_type).remove(&key).await;
        // Always delete the file from the local store.
        if let Err(e) = self.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    async fn recover_inner(&self) -> Result<()> {
    /// Recovers the index from local store.
    async fn recover(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
@@ -341,136 +163,7 @@ impl FileCache {
        Ok(())
    }

    /// Recovers the index from local store.
    ///
    /// If `task_receiver` is provided, spawns a background task after recovery
    /// to process `RegionLoadCacheTask` messages for loading files into the cache.
    pub(crate) async fn recover(
        self: &Arc<Self>,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.recover_inner().await {
                error!(err; "Failed to recover file cache.")
            }

            // Spawns a background task to process region load cache tasks after recovery,
            // so it won't block the recovery when `sync` is true.
            if let Some(mut receiver) = task_receiver {
                let cache_ref = moved_self.clone();
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        let file_cache = cache_ref.clone();
                        task.fill_cache(file_cache).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the cache file path for the key.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Returns the local store of the file cache.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.local_store.clone()
    }

    /// Gets the parquet metadata from the file cache.
    /// If the file is not in the cache or loading the metadata fails, returns None.
    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
        // Check if the file cache contains the key
        if let Some(index_value) = self.parquet_index.get(&key).await {
            // Load metadata from the file cache
            let local_store = self.local_store();
            let file_path = self.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);

            match metadata_loader.load().await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    // We remove the file from the index.
                    self.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.local_store.exists(file_path).await? {
            Ok(Some(self.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Checks if the key is in the file cache.
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.memory_index(key.file_type).contains_key(key)
    }

    /// Returns the capacity of the puffin (index) cache in bytes.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Returns the current weighted size (used bytes) of the puffin (index) cache.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.puffin_index.weighted_size()
    }

    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`).
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
            .await
        {
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }
        Ok(())
    }

    /// Downloads a file without cleaning up on error.
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
@@ -537,11 +230,360 @@ impl FileCache {
        self.put(index_key, index_value).await;
        Ok(())
    }

    /// Downloads a file from remote store to local cache.
    async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
            .await
        {
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }

        Ok(())
    }
}

/// A file cache manages files on the local store and evicts files based
/// on size.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Inner cache state shared with background worker.
    inner: Arc<FileCacheInner>,
    /// Capacity of the puffin (index) cache in bytes.
    puffin_capacity: u64,
}

pub(crate) type FileCacheRef = Arc<FileCache>;

impl FileCache {
    /// Creates a new file cache.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
    ) -> FileCache {
        // Validate and use the provided percent or default
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        // Convert percent to ratio and calculate capacity for each cache
        let index_ratio = index_percent as f64 / 100.0;
        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
        let parquet_capacity = total_capacity - puffin_capacity;

        // Ensure both capacities are at least 512MB
        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        // Create inner cache shared with background worker
        let inner = Arc::new(FileCacheInner {
            local_store,
            parquet_index,
            puffin_index,
        });

        FileCache {
            inner,
            puffin_capacity,
        }
    }

    /// Builds a cache for a specific file type.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            .weigher(|_key, value: &IndexValue| -> u32 {
                // We only measure space on local store.
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                // Stores files under FILE_DIR.
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The cache entry is replaced by another file. This is unexpected; we don't remove the
                        // file but update the metrics as the file is already replaced by users.
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        // TODO(yingwen): Don't log warn later.
                        warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

    /// Puts a file into the cache index.
    ///
    /// The `WriteCache` should ensure the file is in the correct path.
    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
        self.inner.put(key, value).await
    }

    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
        self.inner.memory_index(key.file_type).get(&key).await
    }

    /// Reads a file from the cache.
    #[allow(unused)]
    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
        // We must use `get()` to update the estimator of the cache.
        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        match self.get_reader(&file_path).await {
            Ok(Some(reader)) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                return Some(reader);
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }
            }
            Ok(None) => {}
        }

        // We remove the file from the index.
        index.remove(&key).await;
        CACHE_MISS
            .with_label_values(&[key.file_type.metric_label()])
            .inc();
        None
    }

    /// Reads ranges from the cache.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        // In most cases, it will use blocking read,
        // because FileCache is normally based on a local file system, which supports blocking read.
        let bytes_result =
            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // We remove the file from the index.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes a file from the cache explicitly.
    /// It always tries to remove the file from the local store because we may not have the file
    /// in the memory index if the upload failed.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.inner.cache_file_path(key);
        self.inner.memory_index(key.file_type).remove(&key).await;
        // Always delete the file from the local store.
        if let Err(e) = self.inner.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    /// Recovers the index from local store.
    ///
    /// If `task_receiver` is provided, spawns a background task after recovery
    /// to process `RegionLoadCacheTask` messages for loading files into the cache.
    pub(crate) async fn recover(
        &self,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.inner.recover().await {
                error!(err; "Failed to recover file cache.")
            }

            // Spawns a background task to process region load cache tasks after recovery,
            // so it won't block the recovery when `sync` is true.
            if let Some(mut receiver) = task_receiver {
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        task.fill_cache(&moved_self).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the cache file path for the key.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        self.inner.cache_file_path(key)
    }

    /// Returns the local store of the file cache.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.inner.local_store.clone()
    }

    /// Gets the parquet metadata from the file cache.
    /// If the file is not in the cache or loading the metadata fails, returns None.
    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
        // Check if the file cache contains the key
        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
            // Load metadata from the file cache
            let local_store = self.local_store();
            let file_path = self.inner.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);

            match metadata_loader.load().await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    // We remove the file from the index.
                    self.inner.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.inner.local_store.exists(file_path).await? {
            Ok(Some(self.inner.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Checks if the key is in the file cache.
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.inner.memory_index(key.file_type).contains_key(key)
    }

    /// Returns the capacity of the puffin (index) cache in bytes.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Returns the current weighted size (used bytes) of the puffin (index) cache.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.inner.puffin_index.weighted_size()
    }

    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`).
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        self.inner
            .download(index_key, remote_path, remote_store, file_size)
            .await
    }
}

/// Key of file cache index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct IndexKey {
pub struct IndexKey {
    pub region_id: RegionId,
    pub file_id: FileId,
    pub file_type: FileType,
@@ -683,7 +725,7 @@ mod tests {
        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.parquet_index.run_pending_tasks().await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }
@@ -721,19 +763,19 @@ mod tests {
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // Get weighted size.
        cache.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.parquet_index.weighted_size());
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        // Remove the file.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        // Ensure all pending tasks of the moka cache are done before assertion.
        cache.parquet_index.run_pending_tasks().await;
        cache.inner.parquet_index.run_pending_tasks().await;

        // The file no longer exists either.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.parquet_index.weighted_size());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }

    #[tokio::test]
@@ -766,7 +808,7 @@ mod tests {
        // Reader is none.
        assert!(cache.reader(key).await.is_none());
        // Key is removed.
        assert!(!cache.parquet_index.contains_key(&key));
        assert!(!cache.inner.parquet_index.contains_key(&key));
    }

    #[tokio::test]
@@ -799,12 +841,7 @@ mod tests {
        }

        // Recover the cache.
        let cache = Arc::new(FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
        ));
        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
        // No entry before recovery.
        assert!(
            cache
@@ -815,8 +852,11 @@ mod tests {
        cache.recover(true, None).await;

        // Check size.
        cache.parquet_index.run_pending_tasks().await;
        assert_eq!(total_size, cache.parquet_index.weighted_size() as usize);
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(
            total_size,
            cache.inner.parquet_index.weighted_size() as usize
        );

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
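The refactor above splits FileCache into a cheap, cloneable handle plus an Arc<FileCacheInner> that is shared with spawned tasks; this is what lets recover take &self instead of self: &Arc<Self>. A standalone sketch of that ownership pattern, with illustrative names rather than the crate's actual API:

    use std::sync::Arc;

    // Stand-in for FileCacheInner: state shared with background tasks.
    #[derive(Debug)]
    struct Inner {
        name: String,
    }

    // Stand-in for FileCache: a handle that clones cheaply because it
    // only holds an Arc to the shared state.
    #[derive(Debug, Clone)]
    struct Handle {
        inner: Arc<Inner>,
    }

    impl Handle {
        fn new(name: &str) -> Self {
            Handle {
                inner: Arc::new(Inner {
                    name: name.to_string(),
                }),
            }
        }

        // Recovery runs on the shared inner state inside a spawned task;
        // the caller keeps a usable handle while the task owns its clone.
        async fn recover(&self, sync: bool) {
            let moved = self.inner.clone();
            let handle = tokio::spawn(async move {
                println!("recovering {}", moved.name);
            });
            if sync {
                let _ = handle.await;
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let cache = Handle::new("file-cache");
        cache.recover(true).await;
    }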
src/mito2/src/cache/manifest_cache.rs (new file, 574 lines)
@@ -0,0 +1,574 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A cache for manifest files.

use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::readable_size::ReadableSize;
use common_telemetry::{error, info, warn};
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use moka::policy::EvictionPolicy;
use object_store::ObjectStore;
use object_store::util::join_path;
use snafu::ResultExt;

use crate::error::{OpenDalSnafu, Result};
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};

/// Subdirectory of cached manifest files.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const MANIFEST_DIR: &str = "cache/object/manifest/";

/// Metric label for manifest files.
const MANIFEST_TYPE: &str = "manifest";

/// A manifest cache manages manifest files on the local store and evicts files based
/// on size.
#[derive(Debug, Clone)]
pub struct ManifestCache {
    /// Local store to cache files.
    local_store: ObjectStore,
    /// Index to track cached manifest files.
    index: Cache<String, IndexValue>,
}

impl ManifestCache {
    /// Creates a new manifest cache and recovers the index from local store.
    pub async fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
    ) -> ManifestCache {
        let total_capacity = capacity.as_bytes();

        info!(
            "Initializing manifest cache with capacity: {}",
            ReadableSize(total_capacity)
        );

        let index = Self::build_cache(local_store.clone(), total_capacity, ttl);

        let cache = ManifestCache { local_store, index };

        // Recovers the cache index from local store asynchronously
        cache.recover(false).await;

        cache
    }

    /// Builds the cache.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
    ) -> Cache<String, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            .weigher(|key: &String, value: &IndexValue| -> u32 {
                key.len() as u32 + value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key: Arc<String>, value: IndexValue, cause| {
                let store = cache_store.clone();
                // Stores files under MANIFEST_DIR.
                let file_path = join_path(MANIFEST_DIR, &key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The cache entry is replaced by another file. We don't remove the
                        // file but update the metrics as the file is already replaced by users.
                        CACHE_BYTES
                            .with_label_values(&[MANIFEST_TYPE])
                            .sub(value.file_size.into());
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES
                                .with_label_values(&[MANIFEST_TYPE])
                                .sub(value.file_size.into());
                        }
                        Err(e) => {
                            warn!(e; "Failed to delete cached manifest file {}", file_path);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

    /// Puts a file into the cache index.
    ///
    /// The caller should ensure the file is in the correct path.
    pub(crate) async fn put(&self, key: String, value: IndexValue) {
        CACHE_BYTES
            .with_label_values(&[MANIFEST_TYPE])
            .add(value.file_size.into());
        self.index.insert(key, value).await;

        // Since files can be large items, we run the pending tasks immediately.
        self.index.run_pending_tasks().await;
    }

    /// Gets the index value for the key.
    pub(crate) async fn get(&self, key: &str) -> Option<IndexValue> {
        self.index.get(key).await
    }

    /// Removes a file from the cache explicitly.
    pub(crate) async fn remove(&self, key: &str) {
        let file_path = self.cache_file_path(key);
        self.index.remove(key).await;
        // Always deletes the file from the local store.
        if let Err(e) = self.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached manifest file {}", file_path);
        }
    }

    /// Removes multiple files from the cache in batch.
    pub(crate) async fn remove_batch(&self, keys: &[String]) {
        if keys.is_empty() {
            return;
        }

        for key in keys {
            self.index.remove(key).await;
        }

        let file_paths: Vec<String> = keys.iter().map(|key| self.cache_file_path(key)).collect();

        if let Err(e) = self.local_store.delete_iter(file_paths).await {
            warn!(e; "Failed to delete cached manifest files in batch");
        }
    }

    async fn recover_inner(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
            .lister_with(MANIFEST_DIR)
            .recursive(true)
            .await
            .context(OpenDalSnafu)?;
        let (mut total_size, mut total_keys) = (0i64, 0);
        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
            let meta = entry.metadata();
            if !meta.is_file() {
                continue;
            }

            let meta = self
                .local_store
                .stat(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let file_size = meta.content_length() as u32;
            let key = entry.path().trim_start_matches(MANIFEST_DIR).to_string();
            common_telemetry::info!("Manifest cache recover {}, size: {}", key, file_size);
            self.index.insert(key, IndexValue { file_size }).await;
            let size = i64::from(file_size);
            total_size += size;
            total_keys += 1;
        }
        CACHE_BYTES
            .with_label_values(&[MANIFEST_TYPE])
            .add(total_size);

        // Runs all pending tasks of the moka cache so that the cache size is updated
        // and the eviction policy is applied.
        self.index.run_pending_tasks().await;

        let weight = self.index.weighted_size();
        let count = self.index.entry_count();
        info!(
            "Recovered manifest cache, num_keys: {}, num_bytes: {}, count: {}, weight: {}, cost: {:?}",
            total_keys,
            total_size,
            count,
            weight,
            now.elapsed()
        );
        Ok(())
    }

    /// Recovers the index from local store.
    pub(crate) async fn recover(&self, sync: bool) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.recover_inner().await {
                error!(err; "Failed to recover manifest cache.")
            }

            moved_self.clean_empty_dirs(true).await;
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the cache file path for the key.
    pub(crate) fn cache_file_path(&self, key: &str) -> String {
        join_path(MANIFEST_DIR, key)
    }

    /// Gets a manifest file from cache.
    /// Returns the file data if found in cache, None otherwise.
    pub(crate) async fn get_file(&self, key: &str) -> Option<Vec<u8>> {
        if self.get(key).await.is_none() {
            CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
            return None;
        }

        let cache_file_path = self.cache_file_path(key);
        match self.local_store.read(&cache_file_path).await {
            Ok(data) => {
                CACHE_HIT.with_label_values(&[MANIFEST_TYPE]).inc();
                Some(data.to_vec())
            }
            Err(e) => {
                warn!(e; "Failed to read cached manifest file {}", cache_file_path);
                CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
                None
            }
        }
    }

    /// Puts a manifest file into cache.
    pub(crate) async fn put_file(&self, key: String, data: Vec<u8>) {
        let cache_file_path = self.cache_file_path(&key);

        if let Err(e) = self.local_store.write(&cache_file_path, data.clone()).await {
            warn!(e; "Failed to write manifest to cache {}", cache_file_path);
            return;
        }

        let file_size = data.len() as u32;
        self.put(key, IndexValue { file_size }).await;
    }

    /// Removes empty directories recursively under the manifest cache directory.
    ///
    /// If `check_mtime` is true, only removes directories that have not been modified
    /// for at least 1 hour.
    pub(crate) async fn clean_empty_dirs(&self, check_mtime: bool) {
        info!("Clean empty dirs start");

        let root = self.local_store.info().root();
        let manifest_dir = PathBuf::from(root).join(MANIFEST_DIR);
        let manifest_dir_clone = manifest_dir.clone();

        let result = tokio::task::spawn_blocking(move || {
            Self::clean_empty_dirs_sync(&manifest_dir_clone, check_mtime)
        })
        .await;

        match result {
            Ok(Ok(())) => {
                info!("Clean empty dirs end");
            }
            Ok(Err(e)) => {
                warn!(e; "Failed to clean empty directories under {}", manifest_dir.display());
            }
            Err(e) => {
                warn!(e; "Failed to spawn blocking task for cleaning empty directories");
            }
        }
    }

    /// Removes all manifest files under the given directory from cache and cleans up empty directories.
    pub(crate) async fn clean_manifests(&self, dir: &str) {
        info!("Clean manifest cache for directory: {}", dir);

        let cache_dir = join_path(MANIFEST_DIR, dir);
        let mut lister = match self
            .local_store
            .lister_with(&cache_dir)
            .recursive(true)
            .await
        {
            Ok(lister) => lister,
            Err(e) => {
                warn!(e; "Failed to list manifest files under {}", cache_dir);
                return;
            }
        };

        let mut keys_to_remove = Vec::new();
        loop {
            match lister.try_next().await {
                Ok(Some(entry)) => {
                    let meta = entry.metadata();
                    if meta.is_file() {
                        keys_to_remove
                            .push(entry.path().trim_start_matches(MANIFEST_DIR).to_string());
                    }
                }
                Ok(None) => break,
                Err(e) => {
                    warn!(e; "Failed to read entry while listing {}", cache_dir);
                    break;
                }
            }
        }

        info!(
            "Going to remove files from manifest cache, files: {:?}",
            keys_to_remove
        );

        // Removes all files from cache in batch
        self.remove_batch(&keys_to_remove).await;

        // Cleans up empty directories under the given dir
        let root = self.local_store.info().root();
        let dir_path = PathBuf::from(root).join(&cache_dir);
        let dir_path_clone = dir_path.clone();

        let result = tokio::task::spawn_blocking(move || {
            Self::clean_empty_dirs_sync(&dir_path_clone, false)
        })
        .await;

        match result {
            Ok(Ok(())) => {
                info!("Cleaned manifest cache for directory: {}", dir);
            }
            Ok(Err(e)) => {
                warn!(e; "Failed to clean empty directories under {}", dir_path.display());
            }
            Err(e) => {
                warn!(e; "Failed to spawn blocking task for cleaning empty directories");
            }
        }
    }

    /// Synchronously removes empty directories recursively.
    ///
    /// If `check_mtime` is true, only removes directories that have not been modified
    /// for at least 1 hour.
    fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
        Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
        Ok(())
    }

    fn remove_empty_dirs_recursive_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<bool> {
        common_telemetry::debug!(
            "Maybe remove empty dir: {:?}, check_mtime: {}",
            dir,
            check_mtime
        );
        let entries = match std::fs::read_dir(dir) {
            Ok(entries) => entries,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                // The directory doesn't exist; treat it as already removed (empty)
                return Ok(true);
            }
            Err(e) => return Err(e),
        };

        let mut is_empty = true;
        // Iterates all entries under the directory.
        // We have to check all entries to clean up all empty subdirectories.
        for entry in entries {
            let entry = entry?;
            let path = entry.path();
            let metadata = std::fs::metadata(&path)?;

            if metadata.is_dir() {
                // Checks if we should skip this directory based on modification time
                if check_mtime
                    && let Ok(modified) = metadata.modified()
                    && let Ok(elapsed) = modified.elapsed()
                    && elapsed < Duration::from_secs(3600)
                {
                    common_telemetry::debug!("Skip directory by mtime, elapsed: {:?}", elapsed);
                    // Only removes it if not modified for at least 1 hour.
                    is_empty = false;
                    continue;
                }

                let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
                if subdir_empty {
                    if let Err(e) = std::fs::remove_dir(&path)
                        && e.kind() != std::io::ErrorKind::NotFound
                    {
                        warn!(e; "Failed to remove empty directory {}", path.display());
                        is_empty = false;
                    } else {
                        info!(
                            "Removed empty directory {} from manifest cache",
                            path.display()
                        );
                    }
                } else {
                    is_empty = false;
                }
            } else {
                is_empty = false;
            }
        }

        Ok(is_empty)
    }
}

/// An entity that describes the file in the manifest cache.
///
/// It should only keep minimal information needed by the cache.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the file in bytes.
    pub(crate) file_size: u32,
}

#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_manifest_cache_basic() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;
        let key = "region_1/manifest/00000000000000000007.json";
        let file_path = cache.cache_file_path(key);

        // Get a missing file.
        assert!(cache.get(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"manifest content".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(key.to_string(), IndexValue { file_size: 16 })
            .await;

        // Get the cached value.
        let value = cache.get(key).await.unwrap();
        assert_eq!(16, value.file_size);

        // Get weighted size.
        cache.index.run_pending_tasks().await;
        assert_eq!(59, cache.index.weighted_size());

        // Remove the file.
        cache.remove(key).await;
        cache.index.run_pending_tasks().await;
        assert!(cache.get(key).await.is_none());

        // Ensure all pending tasks of the moka cache are done before assertion.
        cache.index.run_pending_tasks().await;

        // The file no longer exists either.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.index.weighted_size());
    }

    #[tokio::test]
    async fn test_manifest_cache_recover() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;

        // Write some manifest files with different paths
        let keys = [
            "region_1/manifest/00000000000000000001.json",
            "region_1/manifest/00000000000000000002.json",
            "region_1/manifest/00000000000000000001.checkpoint",
            "region_2/manifest/00000000000000000001.json",
        ];

        let mut total_size = 0;
        for (i, key) in keys.iter().enumerate() {
            let file_path = cache.cache_file_path(key);
            let content = format!("manifest-{}", i).into_bytes();
            local_store
                .write(&file_path, content.clone())
                .await
                .unwrap();

            // Add to the cache.
            cache
                .put(
                    key.to_string(),
                    IndexValue {
                        file_size: content.len() as u32,
                    },
                )
                .await;
            total_size += content.len() + key.len();
        }

        // Create a new cache instance which will automatically recover from local store
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;

        // Wait for recovery to complete synchronously
        cache.recover(true).await;

        // Check size.
        cache.index.run_pending_tasks().await;
        let total_cached = cache.index.weighted_size() as usize;
        assert_eq!(total_size, total_cached);

        // Verify all files
        for (i, key) in keys.iter().enumerate() {
            let value = cache.get(key).await.unwrap();
            assert_eq!(format!("manifest-{}", i).len() as u32, value.file_size);
        }
    }

    #[tokio::test]
    async fn test_cache_file_path() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store, ReadableSize::mb(10), None).await;

        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.json",
            cache.cache_file_path("region_1/manifest/00000000000000000007.json")
        );
        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.checkpoint",
            cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
        );
    }
}
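A note on the manifest cache's weigher: unlike the file cache, it charges each entry key.len() + file_size, which is exactly what the 59 in test_manifest_cache_basic above accounts for (the 43-byte key plus the 16-byte payload). A small hypothetical helper making the arithmetic explicit:

    // Weight accounting for one entry in the manifest cache index,
    // mirroring the weigher closure in ManifestCache::build_cache.
    fn entry_weight(key: &str, file_size: u32) -> u32 {
        key.len() as u32 + file_size
    }

    fn main() {
        let key = "region_1/manifest/00000000000000000007.json"; // 43 bytes
        assert_eq!(59, entry_weight(key, 16)); // matches the test assertion
    }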
src/mito2/src/cache/write_cache.rs (20 lines changed)
@@ -30,6 +30,7 @@ use crate::access_layer::{
    TempFileCleaner, WriteCachePathProvider, WriteType, new_fs_cache_store,
};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::cache::manifest_cache::ManifestCache;
use crate::error::{self, Result};
use crate::metrics::UPLOAD_BYTES_TOTAL;
use crate::region::opener::RegionLoadCacheTask;
@@ -53,6 +54,8 @@ pub struct WriteCache {
    intermediate_manager: IntermediateManager,
    /// Sender for region load cache tasks.
    task_sender: UnboundedSender<RegionLoadCacheTask>,
    /// Optional cache for manifest files.
    manifest_cache: Option<ManifestCache>,
}

pub type WriteCacheRef = Arc<WriteCache>;
@@ -67,6 +70,7 @@ impl WriteCache {
        index_cache_percent: Option<u8>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        manifest_cache: Option<ManifestCache>,
    ) -> Result<Self> {
        let (task_sender, task_receiver) = unbounded_channel();

@@ -83,6 +87,7 @@ impl WriteCache {
            puffin_manager_factory,
            intermediate_manager,
            task_sender,
            manifest_cache,
        })
    }

@@ -94,10 +99,19 @@ impl WriteCache {
        index_cache_percent: Option<u8>,
        puffin_manager_factory: PuffinManagerFactory,
        intermediate_manager: IntermediateManager,
        manifest_cache_capacity: ReadableSize,
    ) -> Result<Self> {
        info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");

        let local_store = new_fs_cache_store(cache_dir).await?;

        // Create the manifest cache if the capacity is non-zero
        let manifest_cache = if manifest_cache_capacity.as_bytes() > 0 {
            Some(ManifestCache::new(local_store.clone(), manifest_cache_capacity, ttl).await)
        } else {
            None
        };

        Self::new(
            local_store,
            cache_capacity,
@@ -105,6 +119,7 @@ impl WriteCache {
            index_cache_percent,
            puffin_manager_factory,
            intermediate_manager,
            manifest_cache,
        )
        .await
    }
@@ -114,6 +129,11 @@ impl WriteCache {
        self.file_cache.clone()
    }

    /// Returns the manifest cache if available.
    pub(crate) fn manifest_cache(&self) -> Option<ManifestCache> {
        self.manifest_cache.clone()
    }

    /// Builds the puffin manager.
    pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
        let store = self.file_cache.local_store();

@@ -1110,6 +1110,7 @@ mod tests {
                compress_type: CompressionType::Uncompressed,
                checkpoint_distance: 10,
                remove_file_options: Default::default(),
                manifest_cache: None,
            },
            FormatType::PrimaryKey,
            &Default::default(),

@@ -24,6 +24,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::FileId;
use store_api::{MAX_VERSION, MIN_VERSION, ManifestVersion};

use crate::cache::manifest_cache::ManifestCache;
use crate::config::MitoConfig;
use crate::error::{
    self, InstallManifestToSnafu, NoCheckpointSnafu, NoManifestsSnafu, RegionStoppedSnafu, Result,
@@ -52,6 +53,8 @@ pub struct RegionManifestOptions {
    /// Set to 0 to disable checkpoint.
    pub checkpoint_distance: u64,
    pub remove_file_options: RemoveFileOptions,
    /// Optional cache for manifest files.
    pub manifest_cache: Option<ManifestCache>,
}

impl RegionManifestOptions {
@@ -67,6 +70,7 @@ impl RegionManifestOptions {
            remove_file_options: RemoveFileOptions {
                enable_gc: config.gc.enable,
            },
            manifest_cache: None,
        }
    }
}
@@ -174,6 +178,7 @@ impl RegionManifestManager {
            options.object_store.clone(),
            options.compress_type,
            stats.total_manifest_size.clone(),
            options.manifest_cache.clone(),
        );
        let manifest_version = stats.manifest_version.clone();

@@ -256,6 +261,7 @@ impl RegionManifestManager {
            options.object_store.clone(),
            options.compress_type,
            stats.total_manifest_size.clone(),
            options.manifest_cache.clone(),
        );
        let manifest_version = stats.manifest_version.clone();

@@ -33,6 +33,7 @@ use store_api::ManifestVersion;
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::cache::manifest_cache::ManifestCache;
|
||||
use crate::error::{
|
||||
ChecksumMismatchSnafu, CompressObjectSnafu, DecompressObjectSnafu, InvalidScanIndexSnafu,
|
||||
OpenDalSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
|
||||
@@ -144,6 +145,8 @@ pub struct ManifestObjectStore {
|
||||
/// Stores the size of each manifest file.
|
||||
manifest_size_map: Arc<RwLock<HashMap<FileKey, u64>>>,
|
||||
total_manifest_size: Arc<AtomicU64>,
|
||||
/// Optional manifest cache for local caching.
|
||||
manifest_cache: Option<ManifestCache>,
|
||||
}
|
||||
|
||||
impl ManifestObjectStore {
|
||||
@@ -152,6 +155,7 @@ impl ManifestObjectStore {
|
||||
object_store: ObjectStore,
|
||||
compress_type: CompressionType,
|
||||
total_manifest_size: Arc<AtomicU64>,
|
||||
manifest_cache: Option<ManifestCache>,
|
||||
) -> Self {
|
||||
let path = util::normalize_dir(path);
|
||||
let staging_path = {
|
||||
@@ -166,6 +170,7 @@ impl ManifestObjectStore {
|
||||
staging_path,
|
||||
manifest_size_map: Arc::new(RwLock::new(HashMap::new())),
|
||||
total_manifest_size,
|
||||
manifest_cache,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -291,9 +296,11 @@ impl ManifestObjectStore {
|
||||
}
|
||||
|
||||
/// Common implementation for fetching manifests from entries in parallel.
|
||||
/// If `is_staging` is true, cache is skipped.
|
||||
async fn fetch_manifests_from_entries(
|
||||
&self,
|
||||
entries: Vec<(ManifestVersion, Entry)>,
|
||||
is_staging: bool,
|
||||
) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
|
||||
if entries.is_empty() {
|
||||
return Ok(vec![]);
|
||||
@@ -306,6 +313,13 @@ impl ManifestObjectStore {
|
||||
// Safety: semaphore must exist.
|
||||
let _permit = semaphore.acquire().await.unwrap();
|
||||
|
||||
let cache_key = entry.path();
|
||||
// Try to get from cache first
|
||||
if let Some(data) = self.get_from_cache(cache_key, is_staging).await {
|
||||
return Ok((*v, data));
|
||||
}
|
||||
|
||||
// Fetch from remote object store
|
||||
let compress_type = file_compress_type(entry.name());
|
||||
let bytes = self
|
||||
.object_store
|
||||
@@ -319,6 +333,11 @@
                     compress_type,
                     path: entry.path(),
                 })?;
 
+            // Add to cache
+            self.put_to_cache(cache_key.to_string(), &data, is_staging)
+                .await;
+
             Ok((*v, data))
         });
 
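The two hunks above give `fetch_manifests_from_entries` a read-through path: consult the local cache, fall back to the remote object store on a miss, then populate the cache with the fetched bytes. A minimal standalone sketch of that pattern, assuming a tokio runtime; `SimpleCache` and `fetch_remote` here are illustrative stand-ins, not the real `ManifestCache` API:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::Mutex;

/// Illustrative async cache keyed by file path (not the real ManifestCache).
#[derive(Default, Clone)]
struct SimpleCache {
    files: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}

impl SimpleCache {
    async fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.files.lock().await.get(key).cloned()
    }

    async fn put(&self, key: String, data: Vec<u8>) {
        self.files.lock().await.insert(key, data);
    }
}

/// Stand-in for reading a manifest from the remote object store.
async fn fetch_remote(path: &str) -> Vec<u8> {
    format!("manifest body for {path}").into_bytes()
}

/// Read-through: serve from cache when possible, otherwise fetch and populate.
async fn read_through(cache: &SimpleCache, path: &str) -> Vec<u8> {
    if let Some(data) = cache.get(path).await {
        return data; // cache hit, no remote round trip
    }
    let data = fetch_remote(path).await; // cache miss
    cache.put(path.to_string(), data.clone()).await;
    data
}

#[tokio::main]
async fn main() {
    let cache = SimpleCache::default();
    let a = read_through(&cache, "region/manifest/00000001.json").await;
    let b = read_through(&cache, "region/manifest/00000001.json").await; // cache hit
    assert_eq!(a, b);
}
```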
@@ -335,7 +354,7 @@
         end_version: ManifestVersion,
     ) -> Result<Vec<(ManifestVersion, Vec<u8>)>> {
         let manifests = self.scan(start_version, end_version).await?;
-        self.fetch_manifests_from_entries(manifests).await
+        self.fetch_manifests_from_entries(manifests, false).await
     }
 
     /// Delete manifest files that version < end.
@@ -405,6 +424,11 @@
             ret, self.path, end, checkpoint_version, paths,
         );
 
+        // Remove from cache first
+        for (entry, _, _) in &del_entries {
+            self.remove_from_cache(entry.path()).await;
+        }
+
         self.object_store
             .delete_iter(paths)
             .await
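Note the ordering in this hunk: cache entries are invalidated before the remote delete is issued, so a failure partway through can only leave orphan remote files, never a cache entry for a file that no longer exists. A small sketch of the invariant, using synchronous `HashMap` stand-ins (the real code is async and deletes via `delete_iter`):

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the cache + remote pair.
struct Store {
    cache: HashMap<String, Vec<u8>>,
    remote: HashMap<String, Vec<u8>>,
}

impl Store {
    /// Invalidate the cache first: if the remote delete later fails halfway,
    /// the worst case is an orphan remote file, never a stale cache hit.
    fn delete_manifests(&mut self, paths: &[&str]) {
        for p in paths {
            self.cache.remove(*p);
        }
        for p in paths {
            self.remote.remove(*p);
        }
    }
}

fn main() {
    let mut store = Store {
        cache: HashMap::from([("m/1.json".to_string(), vec![1])]),
        remote: HashMap::from([("m/1.json".to_string(), vec![1])]),
    };
    store.delete_manifests(&["m/1.json"]);
    assert!(store.cache.is_empty() && store.remote.is_empty());
}
```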
@@ -440,11 +464,10 @@
                 path: &path,
             })?;
         let delta_size = data.len();
-        self.object_store
-            .write(&path, data)
-            .await
-            .context(OpenDalSnafu)?;
-
+        self.write_and_put_cache(&path, data, is_staging).await?;
         self.set_delta_file_size(version, delta_size as u64);
 
         Ok(())
     }
@@ -465,10 +488,8 @@
             })?;
         let checkpoint_size = data.len();
         let checksum = checkpoint_checksum(bytes);
-        self.object_store
-            .write(&path, data)
-            .await
-            .context(OpenDalSnafu)?;
-
+        self.write_and_put_cache(&path, data, false).await?;
         self.set_checkpoint_file_size(version, checkpoint_size as u64);
 
         // Because last checkpoint file only contain size and version, which is tiny, so we don't compress it.
@@ -501,60 +522,80 @@
     ) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
         let version = metadata.version;
         let path = self.checkpoint_file_path(version);
+
+        // Try to get from cache first
+        if let Some(data) = self.get_from_cache(&path, false).await {
+            verify_checksum(&data, metadata.checksum)?;
+            return Ok(Some((version, data)));
+        }
+
         // Due to backward compatibility, it is possible that the user's checkpoint not compressed,
         // so if we don't find file by compressed type. fall back to checkpoint not compressed find again.
-        let checkpoint_data =
-            match self.object_store.read(&path).await {
-                Ok(checkpoint) => {
-                    let checkpoint_size = checkpoint.len();
-                    let decompress_data = self.compress_type.decode(checkpoint).await.context(
-                        DecompressObjectSnafu {
-                            compress_type: self.compress_type,
-                            path,
-                        },
-                    )?;
-                    verify_checksum(&decompress_data, metadata.checksum)?;
-                    // set the checkpoint size
-                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
-                    Ok(Some(decompress_data))
-                }
-                Err(e) => {
-                    if e.kind() == ErrorKind::NotFound {
-                        if self.compress_type != FALL_BACK_COMPRESS_TYPE {
-                            let fall_back_path = gen_path(
-                                &self.path,
-                                &checkpoint_file(version),
-                                FALL_BACK_COMPRESS_TYPE,
-                            );
-                            debug!(
-                                "Failed to load checkpoint from path: {}, fall back to path: {}",
-                                path, fall_back_path
-                            );
-                            match self.object_store.read(&fall_back_path).await {
-                                Ok(checkpoint) => {
-                                    let checkpoint_size = checkpoint.len();
-                                    let decompress_data = FALL_BACK_COMPRESS_TYPE
-                                        .decode(checkpoint)
-                                        .await
-                                        .context(DecompressObjectSnafu {
-                                            compress_type: FALL_BACK_COMPRESS_TYPE,
-                                            path,
-                                        })?;
-                                    verify_checksum(&decompress_data, metadata.checksum)?;
-                                    self.set_checkpoint_file_size(version, checkpoint_size as u64);
-                                    Ok(Some(decompress_data))
-                                }
-                                Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
-                                Err(e) => Err(e).context(OpenDalSnafu),
-                            }
-                        } else {
-                            Ok(None)
-                        }
-                    } else {
-                        Err(e).context(OpenDalSnafu)
-                    }
-                }
-            }?;
+        let checkpoint_data = match self.object_store.read(&path).await {
+            Ok(checkpoint) => {
+                let checkpoint_size = checkpoint.len();
+                let decompress_data =
+                    self.compress_type
+                        .decode(checkpoint)
+                        .await
+                        .with_context(|_| DecompressObjectSnafu {
+                            compress_type: self.compress_type,
+                            path: path.clone(),
+                        })?;
+                verify_checksum(&decompress_data, metadata.checksum)?;
+                // set the checkpoint size
+                self.set_checkpoint_file_size(version, checkpoint_size as u64);
+                // Add to cache
+                self.put_to_cache(path, &decompress_data, false).await;
+                Ok(Some(decompress_data))
+            }
+            Err(e) => {
+                if e.kind() == ErrorKind::NotFound {
+                    if self.compress_type != FALL_BACK_COMPRESS_TYPE {
+                        let fall_back_path = gen_path(
+                            &self.path,
+                            &checkpoint_file(version),
+                            FALL_BACK_COMPRESS_TYPE,
+                        );
+                        debug!(
+                            "Failed to load checkpoint from path: {}, fall back to path: {}",
+                            path, fall_back_path
+                        );
+
+                        // Try to get fallback from cache first
+                        if let Some(data) = self.get_from_cache(&fall_back_path, false).await {
+                            verify_checksum(&data, metadata.checksum)?;
+                            return Ok(Some((version, data)));
+                        }
+
+                        match self.object_store.read(&fall_back_path).await {
+                            Ok(checkpoint) => {
+                                let checkpoint_size = checkpoint.len();
+                                let decompress_data = FALL_BACK_COMPRESS_TYPE
+                                    .decode(checkpoint)
+                                    .await
+                                    .with_context(|_| DecompressObjectSnafu {
+                                        compress_type: FALL_BACK_COMPRESS_TYPE,
+                                        path: fall_back_path.clone(),
+                                    })?;
+                                verify_checksum(&decompress_data, metadata.checksum)?;
+                                self.set_checkpoint_file_size(version, checkpoint_size as u64);
+                                // Add fallback to cache
+                                self.put_to_cache(fall_back_path, &decompress_data, false)
+                                    .await;
+                                Ok(Some(decompress_data))
+                            }
+                            Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
+                            Err(e) => Err(e).context(OpenDalSnafu),
+                        }
+                    } else {
+                        Ok(None)
+                    }
+                } else {
+                    Err(e).context(OpenDalSnafu)
+                }
+            }
+        }?;
         Ok(checkpoint_data.map(|data| (version, data)))
     }
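Even on a cache hit, the bytes are still verified against the checksum recorded in the checkpoint metadata before they are trusted, so a corrupted local cache file cannot poison recovery. A sketch of that verify-before-trust step, assuming a CRC32 checksum purely for illustration (the actual algorithm behind `checkpoint_checksum` is not shown in this diff):

```rust
// Cargo.toml (assumed): crc32fast = "1"

/// Illustrative checksum check mirroring verify_checksum's role.
fn verify_checksum(data: &[u8], expected: u32) -> Result<(), String> {
    let actual = crc32fast::hash(data);
    if actual == expected {
        Ok(())
    } else {
        Err(format!("checksum mismatch: expected {expected}, got {actual}"))
    }
}

fn main() {
    let data = b"checkpoint payload";
    let checksum = crc32fast::hash(data);

    // A cache hit still goes through verification before the bytes are used.
    assert!(verify_checksum(data, checksum).is_ok());
    assert!(verify_checksum(b"corrupted payload", checksum).is_err());
}
```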
@@ -562,8 +603,10 @@
     /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
     pub async fn load_last_checkpoint(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
         let last_checkpoint_path = self.last_checkpoint_path();
+
+        // Fetch from remote object store without cache
         let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
-            Ok(data) => data,
+            Ok(data) => data.to_vec(),
             Err(e) if e.kind() == ErrorKind::NotFound => {
                 return Ok(None);
             }
@@ -572,7 +615,7 @@
             }
         };
 
-        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data.to_vec())?;
+        let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
 
         debug!(
             "Load checkpoint in path: {}, metadata: {:?}",
@@ -702,7 +745,8 @@
         let mut sorted_entries = manifest_entries;
         Self::sort_manifests(&mut sorted_entries);
 
-        self.fetch_manifests_from_entries(sorted_entries).await
+        self.fetch_manifests_from_entries(sorted_entries, true)
+            .await
     }
 
     /// Clear all staging manifest files.
@@ -719,6 +763,63 @@
 
         Ok(())
     }
+
+    /// Gets a manifest file from cache.
+    /// Returns the file data if found in cache, None otherwise.
+    /// If `is_staging` is true, always returns None.
+    async fn get_from_cache(&self, key: &str, is_staging: bool) -> Option<Vec<u8>> {
+        if is_staging {
+            return None;
+        }
+        let cache = self.manifest_cache.as_ref()?;
+        cache.get_file(key).await
+    }
+
+    /// Puts a manifest file into cache.
+    /// If `is_staging` is true, does nothing.
+    async fn put_to_cache(&self, key: String, data: &[u8], is_staging: bool) {
+        if is_staging {
+            return;
+        }
+        let Some(cache) = &self.manifest_cache else {
+            return;
+        };
+
+        cache.put_file(key, data.to_vec()).await;
+    }
+
+    /// Writes data to object store and puts it into cache.
+    /// If `is_staging` is true, cache is skipped.
+    async fn write_and_put_cache(&self, path: &str, data: Vec<u8>, is_staging: bool) -> Result<()> {
+        // Clone data for cache before writing, only if cache is enabled and not staging
+        let cache_data = if !is_staging && self.manifest_cache.is_some() {
+            Some(data.clone())
+        } else {
+            None
+        };
+
+        // Write to object store
+        self.object_store
+            .write(path, data)
+            .await
+            .context(OpenDalSnafu)?;
+
+        // Put to cache if we cloned the data
+        if let Some(data) = cache_data {
+            self.put_to_cache(path.to_string(), &data, is_staging).await;
+        }
+
+        Ok(())
+    }
+
+    /// Removes a manifest file from cache.
+    async fn remove_from_cache(&self, key: &str) {
+        let Some(cache) = &self.manifest_cache else {
+            return;
+        };
+
+        cache.remove(key).await;
+    }
 }
 
 #[derive(Serialize, Deserialize, Debug)]
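`write_and_put_cache` pins down the write-through ordering: the remote object store write happens first, and the cache is populated only after it succeeds, so the cache can never hold data whose durable write failed. The clone is also taken up front only when a cache is actually configured, keeping the cache-disabled path allocation-free. A condensed sketch of the same ordering, with illustrative types rather than the real API:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::Mutex;

type Cache = Arc<Mutex<HashMap<String, Vec<u8>>>>;

/// Write-through: the remote write comes first, the cache second, so a failed
/// remote write never leaves a cache entry behind.
async fn write_and_put_cache(
    remote: &Mutex<HashMap<String, Vec<u8>>>,
    cache: Option<&Cache>,
    path: &str,
    data: Vec<u8>,
) -> Result<(), String> {
    // Clone before the write consumes `data`, but only if we will cache it.
    let cache_data = cache.map(|_| data.clone());

    // Remote write first.
    remote.lock().await.insert(path.to_string(), data);

    // Then populate the cache.
    if let (Some(cache), Some(data)) = (cache, cache_data) {
        cache.lock().await.insert(path.to_string(), data);
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let remote = Mutex::new(HashMap::new());
    let cache: Cache = Arc::new(Mutex::new(HashMap::new()));
    write_and_put_cache(&remote, Some(&cache), "m/2.json", vec![42])
        .await
        .unwrap();
    assert_eq!(cache.lock().await.get("m/2.json"), Some(&vec![42]));
}
```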
@@ -762,6 +863,7 @@ mod tests {
             object_store,
             CompressionType::Uncompressed,
             Default::default(),
+            None,
         )
     }
@@ -1355,6 +1355,7 @@ mod tests {
                 compress_type: CompressionType::Uncompressed,
                 checkpoint_distance: 10,
                 remove_file_options: Default::default(),
+                manifest_cache: None,
             },
             FormatType::PrimaryKey,
             &Default::default(),
@@ -1421,6 +1422,7 @@ mod tests {
                 compress_type: CompressionType::Uncompressed,
                 checkpoint_distance: 10,
                 remove_file_options: Default::default(),
+                manifest_cache: None,
             },
             FormatType::PrimaryKey,
             &Default::default(),
@@ -41,7 +41,7 @@ use store_api::storage::{ColumnId, RegionId};
 
 use crate::access_layer::AccessLayer;
 use crate::cache::CacheManagerRef;
-use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
+use crate::cache::file_cache::{FileCache, FileType, IndexKey};
 use crate::config::MitoConfig;
 use crate::error;
 use crate::error::{
@@ -270,8 +270,14 @@ impl RegionOpener {
             FormatType::PrimaryKey
         };
         // Create a manifest manager for this region and writes regions to the manifest file.
-        let region_manifest_options =
+        let mut region_manifest_options =
             RegionManifestOptions::new(config, &region_dir, &object_store);
+        // Set manifest cache if available
+        region_manifest_options.manifest_cache = self
+            .cache_manager
+            .as_ref()
+            .and_then(|cm| cm.write_cache())
+            .and_then(|wc| wc.manifest_cache());
         // For remote WAL, we need to set flushed_entry_id to current topic's latest entry id.
         let flushed_entry_id = provider.initial_flushed_entry_id::<S>(wal.store());
         let manifest_manager = RegionManifestManager::new(
@@ -407,8 +413,14 @@
         let now = Instant::now();
         let mut region_options = self.options.as_ref().unwrap().clone();
         let object_storage = get_object_store(&region_options.storage, &self.object_store_manager)?;
-        let region_manifest_options =
+        let mut region_manifest_options =
             RegionManifestOptions::new(config, &self.region_dir(), &object_storage);
+        // Set manifest cache if available
+        region_manifest_options.manifest_cache = self
+            .cache_manager
+            .as_ref()
+            .and_then(|cm| cm.write_cache())
+            .and_then(|wc| wc.manifest_cache());
         let Some(manifest_manager) =
             RegionManifestManager::open(region_manifest_options, &self.stats).await?
         else {
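Both opener paths (create and open) thread the cache with the same two-step `and_then` chain, which flattens the nested "maybe a cache manager, maybe a write cache, maybe a manifest cache" lookup into a single `Option` without any unwraps. A standalone sketch with hypothetical stand-in types:

```rust
/// Hypothetical stand-ins for the real cache types.
#[derive(Clone)]
struct ManifestCache;

struct WriteCache {
    manifest_cache: Option<ManifestCache>,
}

struct CacheManager {
    write_cache: Option<WriteCache>,
}

impl WriteCache {
    fn manifest_cache(&self) -> Option<ManifestCache> {
        self.manifest_cache.clone()
    }
}

impl CacheManager {
    fn write_cache(&self) -> Option<&WriteCache> {
        self.write_cache.as_ref()
    }
}

fn main() {
    let cache_manager: Option<CacheManager> = Some(CacheManager {
        write_cache: Some(WriteCache {
            manifest_cache: Some(ManifestCache),
        }),
    });

    // Mirrors the opener wiring: any missing layer yields None, no unwraps.
    let manifest_cache = cache_manager
        .as_ref()
        .and_then(|cm| cm.write_cache())
        .and_then(|wc| wc.manifest_cache());
    assert!(manifest_cache.is_some());
}
```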
@@ -836,7 +848,7 @@ impl RegionLoadCacheTask {
     }
 
     /// Fills the file cache with index files from the region.
-    pub(crate) async fn fill_cache(&self, file_cache: FileCacheRef) {
+    pub(crate) async fn fill_cache(&self, file_cache: &FileCache) {
         let region_id = self.region.region_id;
         let table_dir = self.region.access_layer.table_dir();
         let path_type = self.region.access_layer.path_type();
@@ -1750,6 +1750,7 @@ mod tests {
                 None,
                 factory,
                 intm_manager,
+                ReadableSize::mb(10),
             )
             .await
             .unwrap(),
@@ -626,6 +626,7 @@ impl TestEnv {
             compress_type,
             checkpoint_distance,
             remove_file_options: Default::default(),
+            manifest_cache: None,
         };
 
         if let Some(metadata) = initial_metadata {
@@ -656,6 +657,7 @@ impl TestEnv {
             None,
             self.puffin_manager.clone(),
             self.intermediate_manager.clone(),
+            None, // manifest_cache
         )
         .await
         .unwrap();
@@ -676,6 +678,7 @@
             None,
             self.puffin_manager.clone(),
             self.intermediate_manager.clone(),
+            ReadableSize::mb(0), // manifest_cache_capacity
         )
         .await
         .unwrap();
@@ -132,6 +132,7 @@ impl SchedulerEnv {
                 compress_type: CompressionType::Uncompressed,
                 checkpoint_distance: 10,
                 remove_file_options: Default::default(),
+                manifest_cache: None,
             },
             FormatType::PrimaryKey,
             &Default::default(),
@@ -37,6 +37,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 
 use common_base::Plugins;
+use common_base::readable_size::ReadableSize;
 use common_error::ext::BoxedError;
 use common_meta::key::SchemaMetadataManagerRef;
 use common_runtime::JoinHandle;
@@ -455,6 +456,8 @@ pub async fn write_cache_from_config(
         Some(config.index_cache_percent),
         puffin_manager_factory,
         intermediate_manager,
+        // TODO(yingwen): Enable manifest cache after removing read cache.
+        ReadableSize(0),
     )
     .await?;
     Ok(Some(Arc::new(cache)))
@@ -51,6 +51,7 @@ where
         // Writes dropping marker
         // We rarely drop a region so we still operate in the worker loop.
         let region_dir = region.access_layer.build_region_dir(region_id);
+        let table_dir = region.access_layer.table_dir().to_string();
         let marker_path = join_path(&region_dir, DROPPING_MARKER_FILE);
         region
             .access_layer
@@ -102,13 +103,14 @@
         let dropping_regions = self.dropping_regions.clone();
         let listener = self.listener.clone();
         let intm_manager = self.intermediate_manager.clone();
+        let cache_manager = self.cache_manager.clone();
         common_runtime::spawn_global(async move {
             let gc_duration = listener
                 .on_later_drop_begin(region_id)
                 .unwrap_or(Duration::from_secs(GC_TASK_INTERVAL_SEC));
             let removed = later_drop_task(
                 region_id,
-                region_dir,
+                region_dir.clone(),
                 object_store,
                 dropping_regions,
                 gc_duration,
@@ -117,6 +119,16 @@
             if let Err(err) = intm_manager.prune_region_dir(&region_id).await {
                 warn!(err; "Failed to prune intermediate region directory, region_id: {}", region_id);
             }
+
+            // Clean manifest cache for the region
+            if let Some(write_cache) = cache_manager.write_cache()
+                && let Some(manifest_cache) = write_cache.manifest_cache()
+            {
+                // We pass the table dir so we can remove the table dir in manifest cache
+                // when the last region in the same host is dropped.
+                manifest_cache.clean_manifests(&table_dir).await;
+            }
+
             listener.on_later_drop_end(region_id, removed);
         });
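On region drop, the manifest cache is handed the table directory rather than the region directory, so the cached copy for the whole table can be pruned once its last region on this host is gone. A sketch of the "delete files, then remove the directory only if it is empty and not freshly created" idea; the age guard mirrors the commit's "don't clean dir if it is too new" note, and the internals of `clean_manifests` are assumptions here:

```rust
use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime};

/// Illustrative sketch: delete cached manifests under a table dir, then
/// remove the directory itself only if it is empty and not freshly created
/// (a new region may be about to write into it).
fn clean_table_dir(dir: &Path, min_age: Duration) -> std::io::Result<()> {
    if !dir.exists() {
        return Ok(());
    }
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_file() {
            fs::remove_file(&path)?;
        }
    }
    let created = fs::metadata(dir)?.created()?;
    let old_enough = SystemTime::now()
        .duration_since(created)
        .map(|age| age >= min_age)
        .unwrap_or(false);
    if old_enough && fs::read_dir(dir)?.next().is_none() {
        fs::remove_dir(dir)?;
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("manifest_cache_demo");
    fs::create_dir_all(&dir)?;
    fs::write(dir.join("00000001.json"), b"{}")?;
    clean_table_dir(&dir, Duration::from_secs(0))
}
```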
|