fix: clean files under the atomic write dir on failure (#6112)

* fix: remove files under atomic dir on failure

* fix: clean atomic dir on download failure

* chore: update comment

* fix: clean if failed to write without write cache

* feat: add a TempFileCleaner to clean files on failure

* chore: after merge fix

* chore: more fix

---------

Co-authored-by: discord9 <55937128+discord9@users.noreply.github.com>
Co-authored-by: discord9 <discord9@163.com>
Author: Yingwen
Date: 2025-05-16 19:18:11 +08:00
Committed by: GitHub
Parent: c7e9485534
Commit: 0ea9ab385d

5 changed files with 249 additions and 18 deletions

src/mito2/src/access_layer.rs

@@ -16,7 +16,7 @@ use std::sync::Arc;
 use object_store::services::Fs;
 use object_store::util::{join_dir, with_instrument_layers};
-use object_store::ObjectStore;
+use object_store::{ErrorKind, ObjectStore};
 use smallvec::SmallVec;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
@@ -164,13 +164,18 @@ impl AccessLayer {
             fulltext_index_config: request.fulltext_index_config,
             bloom_filter_index_config: request.bloom_filter_index_config,
         };
+        // We disable the write cache on file systems, but we still use atomic writes.
+        // TODO(yingwen): If we support other non-fs stores without the write cache, then
+        // we may have to find a way to check whether we need the cleaner.
+        let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
         let mut writer = ParquetWriter::new_with_object_store(
             self.object_store.clone(),
             request.metadata,
             indexer_builder,
             path_provider,
         )
-        .await;
+        .await
+        .with_file_cleaner(cleaner);
         writer
             .write_all(request.source, request.max_sequence, write_opts)
             .await?
@@ -217,6 +222,77 @@ pub struct SstWriteRequest {
     pub bloom_filter_index_config: BloomFilterConfig,
 }
 
+/// Cleaner to remove temp files under the atomic write dir.
+pub(crate) struct TempFileCleaner {
+    region_id: RegionId,
+    object_store: ObjectStore,
+}
+
+impl TempFileCleaner {
+    /// Constructs the cleaner for the region and store.
+    pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
+        Self {
+            region_id,
+            object_store,
+        }
+    }
+
+    /// Removes the SST and index file from the local atomic dir by the file id.
+    pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
+        let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
+        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
+
+        Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
+    }
+
+    /// Removes the files from the local atomic dir by their names.
+    pub(crate) async fn clean_atomic_dir_files(
+        local_store: &ObjectStore,
+        names_to_remove: &[&str],
+    ) {
+        // We don't know the actual suffix of the file under the atomic dir, so we have
+        // to list the dir. The cost should be acceptable as there won't be too many files.
+        let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
+            if e.kind() != ErrorKind::NotFound {
+                common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
+            }
+        }) else {
+            return;
+        };
+
+        // In our case the file id is unique, so it is safe to remove all files
+        // with the same file id under the atomic write dir.
+        let actual_files: Vec<_> = entries
+            .into_iter()
+            .filter_map(|entry| {
+                if entry.metadata().is_dir() {
+                    return None;
+                }
+
+                // Remove entries whose names match `names_to_remove`.
+                let should_remove = names_to_remove
+                    .iter()
+                    .any(|file| entry.name().starts_with(file));
+                if should_remove {
+                    Some(entry.path().to_string())
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        common_telemetry::warn!(
+            "Clean files {:?} under the atomic write dir for {:?}",
+            actual_files,
+            names_to_remove
+        );
+
+        if let Err(e) = local_store.delete_iter(actual_files).await {
+            common_telemetry::error!(e; "Failed to delete tmp files for {:?}", names_to_remove);
+        }
+    }
+}
+
 pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
     let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
     clean_dir(&atomic_write_dir).await?;
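
A minimal sketch of how the cleaner is meant to be driven on a failure path (a hypothetical call site; `region_id`, `file_id`, and `local_store` are placeholders rather than code from this commit):

// Hypothetical abort path: a write left partial `.parquet`/`.puffin` temp
// files under ATOMIC_WRITE_DIR before returning an error.
async fn abort_write(region_id: RegionId, file_id: FileId, local_store: ObjectStore) {
    let cleaner = TempFileCleaner::new(region_id, local_store);
    // Removes every temp file whose name starts with
    // "{region_id}.{file_id}.parquet" or "{region_id}.{file_id}.puffin".
    cleaner.clean_by_file_id(file_id).await;
}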

src/mito2/src/cache/file_cache.rs

@@ -14,6 +14,7 @@
 //! A cache for files.
 
+use std::fmt;
 use std::ops::Range;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -339,6 +340,18 @@ impl IndexKey {
     }
 }
 
+impl fmt::Display for IndexKey {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "{}.{}.{}",
+            self.region_id.as_u64(),
+            self.file_id,
+            self.file_type.as_str()
+        )
+    }
+}
+
 /// Type of the file.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum FileType {
@@ -380,15 +393,7 @@ pub(crate) struct IndexValue {
 ///
 /// The file name format is `{region_id}.{file_id}.{file_type}`.
 fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
-    join_path(
-        cache_file_dir,
-        &format!(
-            "{}.{}.{}",
-            key.region_id.as_u64(),
-            key.file_id,
-            key.file_type.as_str()
-        ),
-    )
+    join_path(cache_file_dir, &key.to_string())
 }
 
 /// Parse index key from the file name.
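
Moving the name formatting into the `Display` impl keeps `cache_file_path` and the new `TempFileCleaner` in agreement on the `{region_id}.{file_id}.{file_type}` layout. A small illustration (values are made up; `FileId` renders as a UUID-like string):

// e.g. "42.00000000-0000-0000-0000-000000000001.parquet" for a region
// whose id renders as 42.
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
let name = key.to_string();
// `cache_file_path` now just joins the cache dir with that rendered name,
// and the cleaner matches temp files by the same prefix.
let path = cache_file_path("cache_dir", key);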

src/mito2/src/cache/write_cache.rs

@@ -26,7 +26,7 @@ use store_api::storage::RegionId;
 use crate::access_layer::{
     new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
-    WriteCachePathProvider,
+    TempFileCleaner, WriteCachePathProvider,
 };
 use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
 use crate::error::{self, Result};
@@ -122,7 +122,7 @@ impl WriteCache {
             row_group_size: write_opts.row_group_size,
             puffin_manager: self
                 .puffin_manager_factory
-                .build(store, path_provider.clone()),
+                .build(store.clone(), path_provider.clone()),
             intermediate_manager: self.intermediate_manager.clone(),
             index_options: write_request.index_options,
             inverted_index_config: write_request.inverted_index_config,
@@ -130,14 +130,16 @@ impl WriteCache {
             bloom_filter_index_config: write_request.bloom_filter_index_config,
         };
 
+        let cleaner = TempFileCleaner::new(region_id, store.clone());
         // Write to FileCache.
         let mut writer = ParquetWriter::new_with_object_store(
-            self.file_cache.local_store(),
+            store.clone(),
             write_request.metadata,
             indexer,
-            path_provider,
+            path_provider.clone(),
         )
-        .await;
+        .await
+        .with_file_cleaner(cleaner);
 
         let sst_info = writer
             .write_all(write_request.source, write_request.max_sequence, write_opts)
@@ -201,6 +203,26 @@ impl WriteCache {
         remote_path: &str,
         remote_store: &ObjectStore,
         file_size: u64,
     ) -> Result<()> {
+        if let Err(e) = self
+            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
+            .await
+        {
+            let filename = index_key.to_string();
+            TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
+                .await;
+            return Err(e);
+        }
+
+        Ok(())
+    }
+
+    async fn download_without_cleaning(
+        &self,
+        index_key: IndexKey,
+        remote_path: &str,
+        remote_store: &ObjectStore,
+        file_size: u64,
+    ) -> Result<()> {
         const DOWNLOAD_READER_CONCURRENCY: usize = 8;
         const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
@@ -410,9 +432,11 @@ mod tests {
 use common_test_util::temp_dir::create_temp_dir;
 
 use super::*;
-use crate::access_layer::OperationType;
+use crate::access_layer::{OperationType, ATOMIC_WRITE_DIR};
 use crate::cache::test_util::new_fs_store;
 use crate::cache::{CacheManager, CacheStrategy};
+use crate::error::InvalidBatchSnafu;
+use crate::read::Source;
 use crate::region::options::IndexOptions;
 use crate::sst::parquet::reader::ParquetReaderBuilder;
 use crate::test_util::sst_util::{
@@ -578,4 +602,82 @@ mod tests {
         // Check parquet metadata
         assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
     }
+
+    #[tokio::test]
+    async fn test_write_cache_clean_tmp_files() {
+        common_telemetry::init_default_ut_logging();
+        let mut env = TestEnv::new();
+        let data_home = env.data_home().display().to_string();
+        let mock_store = env.init_object_store_manager();
+
+        let write_cache_dir = create_temp_dir("");
+        let write_cache_path = write_cache_dir.path().to_str().unwrap();
+        let write_cache = env
+            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
+            .await;
+
+        // Create a cache manager using only the write cache.
+        let cache_manager = Arc::new(
+            CacheManager::builder()
+                .write_cache(Some(write_cache.clone()))
+                .build(),
+        );
+
+        let metadata = Arc::new(sst_region_metadata());
+
+        // Creates a source that returns an error to abort the writer.
+        let source = Source::Iter(Box::new(
+            [
+                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
+                InvalidBatchSnafu {
+                    reason: "Abort the writer",
+                }
+                .fail(),
+            ]
+            .into_iter(),
+        ));
+
+        // Write to the local cache and upload the SST to the mock remote store.
+        let write_request = SstWriteRequest {
+            op_type: OperationType::Flush,
+            metadata,
+            source,
+            storage: None,
+            max_sequence: None,
+            cache_manager: cache_manager.clone(),
+            index_options: IndexOptions::default(),
+            inverted_index_config: Default::default(),
+            fulltext_index_config: Default::default(),
+            bloom_filter_index_config: Default::default(),
+        };
+        let write_opts = WriteOptions {
+            row_group_size: 512,
+            ..Default::default()
+        };
+        let upload_request = SstUploadRequest {
+            dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
+            remote_store: mock_store.clone(),
+        };
+
+        write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts)
+            .await
+            .unwrap_err();
+
+        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
+        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
+        let mut has_files = false;
+        while let Some(entry) = entries.next_entry().await.unwrap() {
+            if entry.file_type().await.unwrap().is_dir() {
+                continue;
+            }
+
+            has_files = true;
+            common_telemetry::warn!(
+                "Found remaining temporary file in the atomic dir: {}",
+                entry.path().display()
+            );
+        }
+        assert!(!has_files);
+    }
 }
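
Note that `download` above and `write_all` in the writer below share one shape: run the fallible inner method, and on error sweep the matching temp files before propagating it. A generic sketch of that pattern with illustrative names (the crate code inlines this rather than using such a helper):

// Run `op`; on error, run best-effort cleanup and return the original error.
async fn run_then_clean_on_err<T, E>(
    op: impl std::future::Future<Output = Result<T, E>>,
    clean: impl std::future::Future<Output = ()>,
) -> Result<T, E> {
    match op.await {
        Ok(v) => Ok(v),
        Err(e) => {
            // Cleanup failures are only logged by the cleaner; the original
            // write/download error is what callers see.
            clean.await;
            Err(e)
        }
    }
}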

src/mito2/src/sst/parquet/writer.rs

@@ -36,7 +36,7 @@ use store_api::storage::SequenceNumber;
 use tokio::io::AsyncWrite;
 use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
 
-use crate::access_layer::{FilePathProvider, SstInfoArray};
+use crate::access_layer::{FilePathProvider, SstInfoArray, TempFileCleaner};
 use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
 use crate::read::{Batch, Source};
 use crate::sst::file::FileId;
@@ -61,6 +61,8 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
     /// Current active indexer.
     current_indexer: Option<Indexer>,
     bytes_written: Arc<AtomicUsize>,
+    /// Cleaner to remove temp files on failure.
+    file_cleaner: Option<TempFileCleaner>,
 }
 
 pub trait WriterFactory {
@@ -105,6 +107,11 @@ where
         )
         .await
     }
+
+    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
+        self.file_cleaner = Some(cleaner);
+        self
+    }
 }
 
 impl<F, I, P> ParquetWriter<F, I, P>
@@ -132,6 +139,7 @@ where
             indexer_builder,
             current_indexer: Some(indexer),
             bytes_written: Arc::new(AtomicUsize::new(0)),
+            file_cleaner: None,
         }
     }
@@ -152,6 +160,25 @@ where
     ///
     /// Returns the [SstInfo] if the SST is written.
     pub async fn write_all(
+        &mut self,
+        source: Source,
+        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
+        opts: &WriteOptions,
+    ) -> Result<SstInfoArray> {
+        let res = self
+            .write_all_without_cleaning(source, override_sequence, opts)
+            .await;
+        if res.is_err() {
+            // Clean tmp files explicitly on failure.
+            let file_id = self.current_file;
+            if let Some(cleaner) = &self.file_cleaner {
+                cleaner.clean_by_file_id(file_id).await;
+            }
+        }
+        res
+    }
+
+    async fn write_all_without_cleaning(
         &mut self,
         mut source: Source,
         override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
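
The cleaner stays optional: the constructor initializes `file_cleaner: None`, and only call sites that write through a local atomic dir opt in via the builder-style setter. The wiring, condensed from the two call sites above (the bindings are placeholders):

// Sketch: attach a cleaner so a failed `write_all` sweeps its temp files.
let mut writer = ParquetWriter::new_with_object_store(
    local_store.clone(),
    metadata,
    indexer_builder,
    path_provider,
)
.await
.with_file_cleaner(TempFileCleaner::new(region_id, local_store));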

src/mito2/src/test_util.rs

@@ -659,6 +659,27 @@ impl TestEnv {
         Arc::new(write_cache)
     }
 
+    /// Creates a write cache from a path.
+    pub async fn create_write_cache_from_path(
+        &self,
+        path: &str,
+        capacity: ReadableSize,
+    ) -> WriteCacheRef {
+        let index_aux_path = self.data_home.path().join("index_aux");
+        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
+            .await
+            .unwrap();
+        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
+            .await
+            .unwrap();
+
+        let write_cache = WriteCache::new_fs(path, capacity, None, puffin_mgr, intm_mgr)
+            .await
+            .unwrap();
+
+        Arc::new(write_cache)
+    }
+
     pub fn get_schema_metadata_manager(&self) -> SchemaMetadataManagerRef {
         self.schema_metadata_manager.clone()
     }
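
For illustration, the new helper in use: pinning the write cache to a known temp dir is what lets `test_write_cache_clean_tmp_files` above inspect `ATOMIC_WRITE_DIR` on disk after a failed write (the dir name here is arbitrary):

let cache_dir = create_temp_dir("write-cache-test");
let write_cache = env
    .create_write_cache_from_path(cache_dir.path().to_str().unwrap(), ReadableSize::mb(10))
    .await;
// After a failed write, `<cache_dir>/<ATOMIC_WRITE_DIR>` should contain no files.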