From 5b1fca825a3eb1c07b6b528bec864ee1131e594d Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 25 Feb 2025 16:51:37 +0800 Subject: [PATCH] fix: remove cached and uploaded files on failure (#5590) --- src/mito2/src/cache/file_cache.rs | 3 ++ src/mito2/src/cache/write_cache.rs | 81 ++++++++++++++++++++++++++++-- 2 files changed, 80 insertions(+), 4 deletions(-) diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index c0ea9629fe..54fb5e47c0 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -187,9 +187,12 @@ impl FileCache { } /// Removes a file from the cache explicitly. + /// It always tries to remove the file from the local store because we may not have the file + /// in the memory index if upload is failed. pub(crate) async fn remove(&self, key: IndexKey) { let file_path = self.cache_file_path(key); self.memory_index.remove(&key).await; + // Always delete the file from the local store. if let Err(e) = self.local_store.delete(&file_path).await { warn!(e; "Failed to delete a cached file {}", file_path); } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 257692c67b..974f0caef0 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -22,6 +22,7 @@ use common_telemetry::{debug, info}; use futures::AsyncWriteExt; use object_store::ObjectStore; use snafu::ResultExt; +use store_api::storage::RegionId; use crate::access_layer::{ new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest, @@ -149,24 +150,41 @@ impl WriteCache { return Ok(sst_info); } + let mut upload_tracker = UploadTracker::new(region_id); + let mut err = None; let remote_store = &upload_request.remote_store; for sst in &sst_info { let parquet_key = IndexKey::new(region_id, sst.file_id, FileType::Parquet); let parquet_path = upload_request .dest_path_provider .build_sst_file_path(sst.file_id); - self.upload(parquet_key, &parquet_path, remote_store) - .await?; + if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await { + err = Some(e); + break; + } + upload_tracker.push_uploaded_file(parquet_path); if sst.index_metadata.file_size > 0 { let puffin_key = IndexKey::new(region_id, sst.file_id, FileType::Puffin); - let puffin_path = &upload_request + let puffin_path = upload_request .dest_path_provider .build_index_file_path(sst.file_id); - self.upload(puffin_key, puffin_path, remote_store).await?; + if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await { + err = Some(e); + break; + } + upload_tracker.push_uploaded_file(puffin_path); } } + if let Some(err) = err { + // Cleans files on failure. + upload_tracker + .clean(&sst_info, &self.file_cache, remote_store) + .await; + return Err(err); + } + Ok(sst_info) } @@ -332,6 +350,61 @@ pub struct SstUploadRequest { pub remote_store: ObjectStore, } +/// A structs to track files to upload and clean them if upload failed. +struct UploadTracker { + /// Id of the region to track. + region_id: RegionId, + /// Paths of files uploaded successfully. + files_uploaded: Vec, +} + +impl UploadTracker { + /// Creates a new instance of `UploadTracker` for a given region. + fn new(region_id: RegionId) -> Self { + Self { + region_id, + files_uploaded: Vec::new(), + } + } + + /// Add a file path to the list of uploaded files. + fn push_uploaded_file(&mut self, path: String) { + self.files_uploaded.push(path); + } + + /// Cleans uploaded files and files in the file cache at best effort. + async fn clean( + &self, + sst_info: &SstInfoArray, + file_cache: &FileCacheRef, + remote_store: &ObjectStore, + ) { + common_telemetry::info!( + "Start cleaning files on upload failure, region: {}, num_ssts: {}", + self.region_id, + sst_info.len() + ); + + // Cleans files in the file cache first. + for sst in sst_info { + let parquet_key = IndexKey::new(self.region_id, sst.file_id, FileType::Parquet); + file_cache.remove(parquet_key).await; + + if sst.index_metadata.file_size > 0 { + let puffin_key = IndexKey::new(self.region_id, sst.file_id, FileType::Puffin); + file_cache.remove(puffin_key).await; + } + } + + // Cleans uploaded files. + for file_path in &self.files_uploaded { + if let Err(e) = remote_store.delete(file_path).await { + common_telemetry::error!(e; "Failed to delete file {}", file_path); + } + } + } +} + #[cfg(test)] mod tests { use common_test_util::temp_dir::create_temp_dir;