feat: remove files from the write cache in purger (#4655)

* feat: remove files from the write cache in purger

* chore: fix typo
This commit is contained in:
Yingwen
2024-08-31 12:19:52 +08:00
committed by GitHub
parent 68b59e0e5e
commit 8eda36bfe3
5 changed files with 39 additions and 4 deletions

View File

@@ -313,7 +313,7 @@ struct SystemCatalog {
catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,
// system_schema_provier for default catalog
// system_schema_provider for default catalog
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,

View File

@@ -860,7 +860,7 @@ impl RegionServerInner {
// complains "higher-ranked lifetime error". Rust can't prove some future is legit.
// Possible related issue: https://github.com/rust-lang/rust/issues/102211
//
// The walkaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// The workaround is to put the async functions in the `common_runtime::spawn_global`. Or like
// it here, collect the values first then use later separately.
let regions = self

View File

@@ -179,7 +179,6 @@ impl FileCache {
}
}
#[allow(unused)]
/// Removes a file from the cache explicitly.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);

View File

@@ -169,6 +169,11 @@ impl WriteCache {
Ok(Some(sst_info))
}
/// Removes a file from the cache by `index_key`.
pub(crate) async fn remove(&self, index_key: IndexKey) {
self.file_cache.remove(index_key).await
}
/// Downloads a file in `remote_path` from the remote object store to the local cache
/// (specified by `index_key`).
pub(crate) async fn download(
@@ -424,6 +429,13 @@ mod tests {
.await
.unwrap();
assert_eq!(remote_index_data.to_vec(), cache_index_data.to_vec());
// Removes the file from the cache.
let sst_index_key = IndexKey::new(region_id, file_id, FileType::Parquet);
write_cache.remove(sst_index_key).await;
assert!(!write_cache.file_cache.contains_key(&sst_index_key));
write_cache.remove(index_key).await;
assert!(!write_cache.file_cache.contains_key(&index_key));
}
#[tokio::test]

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use common_telemetry::{error, info};
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::FileMeta;
@@ -77,9 +78,10 @@ impl FilePurger for LocalFilePurger {
cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
}
let cache_manager = self.cache_manager.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = sst_layer.delete_sst(&file_meta).await {
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id);
} else {
info!(
@@ -87,6 +89,28 @@ impl FilePurger for LocalFilePurger {
file_meta.file_id, file_meta.region_id
);
}
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
{
// Removes the inverted index from the cache.
if file_meta.inverted_index_available() {
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Puffin,
))
.await;
}
// Remove the SST file from the cache.
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Parquet,
))
.await;
}
})) {
error!(e; "Failed to schedule the file purge request");
}