mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 11:52:54 +00:00
fix: remove cached and uploaded files on failure (#5590)
This commit is contained in:
3
src/mito2/src/cache/file_cache.rs
vendored
3
src/mito2/src/cache/file_cache.rs
vendored
@@ -187,9 +187,12 @@ impl FileCache {
|
||||
}
|
||||
|
||||
/// Removes a file from the cache explicitly.
|
||||
/// It always tries to remove the file from the local store because we may not have the file
|
||||
/// in the memory index if upload is failed.
|
||||
pub(crate) async fn remove(&self, key: IndexKey) {
|
||||
let file_path = self.cache_file_path(key);
|
||||
self.memory_index.remove(&key).await;
|
||||
// Always delete the file from the local store.
|
||||
if let Err(e) = self.local_store.delete(&file_path).await {
|
||||
warn!(e; "Failed to delete a cached file {}", file_path);
|
||||
}
|
||||
|
||||
81
src/mito2/src/cache/write_cache.rs
vendored
81
src/mito2/src/cache/write_cache.rs
vendored
@@ -22,6 +22,7 @@ use common_telemetry::{debug, info};
|
||||
use futures::AsyncWriteExt;
|
||||
use object_store::ObjectStore;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::access_layer::{
|
||||
new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
|
||||
@@ -149,24 +150,41 @@ impl WriteCache {
|
||||
return Ok(sst_info);
|
||||
}
|
||||
|
||||
let mut upload_tracker = UploadTracker::new(region_id);
|
||||
let mut err = None;
|
||||
let remote_store = &upload_request.remote_store;
|
||||
for sst in &sst_info {
|
||||
let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet);
|
||||
let parquet_path = upload_request
|
||||
.dest_path_provider
|
||||
.build_sst_file_path(sst.file_id);
|
||||
self.upload(parquet_key, &parquet_path, remote_store)
|
||||
.await?;
|
||||
if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await {
|
||||
err = Some(e);
|
||||
break;
|
||||
}
|
||||
upload_tracker.push_uploaded_file(parquet_path);
|
||||
|
||||
if sst.index_metadata.file_size > 0 {
|
||||
let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin);
|
||||
let puffin_path = &upload_request
|
||||
let puffin_path = upload_request
|
||||
.dest_path_provider
|
||||
.build_index_file_path(sst.file_id);
|
||||
self.upload(puffin_key, puffin_path, remote_store).await?;
|
||||
if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await {
|
||||
err = Some(e);
|
||||
break;
|
||||
}
|
||||
upload_tracker.push_uploaded_file(puffin_path);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(err) = err {
|
||||
// Cleans files on failure.
|
||||
upload_tracker
|
||||
.clean(&sst_info, &self.file_cache, remote_store)
|
||||
.await;
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
Ok(sst_info)
|
||||
}
|
||||
|
||||
@@ -332,6 +350,61 @@ pub struct SstUploadRequest {
|
||||
pub remote_store: ObjectStore,
|
||||
}
|
||||
|
||||
/// A structs to track files to upload and clean them if upload failed.
|
||||
struct UploadTracker {
|
||||
/// Id of the region to track.
|
||||
region_id: RegionId,
|
||||
/// Paths of files uploaded successfully.
|
||||
files_uploaded: Vec<String>,
|
||||
}
|
||||
|
||||
impl UploadTracker {
|
||||
/// Creates a new instance of `UploadTracker` for a given region.
|
||||
fn new(region_id: RegionId) -> Self {
|
||||
Self {
|
||||
region_id,
|
||||
files_uploaded: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a file path to the list of uploaded files.
|
||||
fn push_uploaded_file(&mut self, path: String) {
|
||||
self.files_uploaded.push(path);
|
||||
}
|
||||
|
||||
/// Cleans uploaded files and files in the file cache at best effort.
|
||||
async fn clean(
|
||||
&self,
|
||||
sst_info: &SstInfoArray,
|
||||
file_cache: &FileCacheRef,
|
||||
remote_store: &ObjectStore,
|
||||
) {
|
||||
common_telemetry::info!(
|
||||
"Start cleaning files on upload failure, region: {}, num_ssts: {}",
|
||||
self.region_id,
|
||||
sst_info.len()
|
||||
);
|
||||
|
||||
// Cleans files in the file cache first.
|
||||
for sst in sst_info {
|
||||
let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet);
|
||||
file_cache.remove(parquet_key).await;
|
||||
|
||||
if sst.index_metadata.file_size > 0 {
|
||||
let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin);
|
||||
file_cache.remove(puffin_key).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Cleans uploaded files.
|
||||
for file_path in &self.files_uploaded {
|
||||
if let Err(e) = remote_store.delete(file_path).await {
|
||||
common_telemetry::error!(e; "Failed to delete file {}", file_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
|
||||
Reference in New Issue
Block a user