diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index 626f62e1e9..a0c313b210 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -16,7 +16,7 @@ use std::sync::Arc;
 
 use object_store::services::Fs;
 use object_store::util::{join_dir, with_instrument_layers};
-use object_store::ObjectStore;
+use object_store::{ErrorKind, ObjectStore};
 use smallvec::SmallVec;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
@@ -164,13 +164,18 @@ impl AccessLayer {
             fulltext_index_config: request.fulltext_index_config,
             bloom_filter_index_config: request.bloom_filter_index_config,
         };
+        // We disable the write cache on file systems but still use atomic write.
+        // TODO(yingwen): If we support other non-fs stores without the write cache, then
+        // we may have to find a way to check whether we need the cleaner.
+        let cleaner = TempFileCleaner::new(region_id, self.object_store.clone());
         let mut writer = ParquetWriter::new_with_object_store(
             self.object_store.clone(),
             request.metadata,
             indexer_builder,
             path_provider,
         )
-        .await;
+        .await
+        .with_file_cleaner(cleaner);
         writer
             .write_all(request.source, request.max_sequence, write_opts)
             .await?
@@ -217,6 +222,77 @@ pub struct SstWriteRequest {
     pub bloom_filter_index_config: BloomFilterConfig,
 }
 
+/// Cleaner that removes temp files in the atomic write dir.
+pub(crate) struct TempFileCleaner {
+    region_id: RegionId,
+    object_store: ObjectStore,
+}
+
+impl TempFileCleaner {
+    /// Constructs the cleaner for the region and store.
+    pub(crate) fn new(region_id: RegionId, object_store: ObjectStore) -> Self {
+        Self {
+            region_id,
+            object_store,
+        }
+    }
+
+    /// Removes the SST and index file from the local atomic dir by the file id.
+    pub(crate) async fn clean_by_file_id(&self, file_id: FileId) {
+        let sst_key = IndexKey::new(self.region_id, file_id, FileType::Parquet).to_string();
+        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin).to_string();
+
+        Self::clean_atomic_dir_files(&self.object_store, &[&sst_key, &index_key]).await;
+    }
+
+    /// Removes the files from the local atomic dir by their names.
+    pub(crate) async fn clean_atomic_dir_files(
+        local_store: &ObjectStore,
+        names_to_remove: &[&str],
+    ) {
+        // We don't know the actual suffix of the file under the atomic dir, so we have
+        // to list the dir. The cost should be acceptable as there won't be too many files.
+        let Ok(entries) = local_store.list(ATOMIC_WRITE_DIR).await.inspect_err(|e| {
+            if e.kind() != ErrorKind::NotFound {
+                common_telemetry::error!(e; "Failed to list tmp files for {:?}", names_to_remove)
+            }
+        }) else {
+            return;
+        };
+
+        // In our case, we can ensure the file id is unique so it is safe to remove all files
+        // with the same file id under the atomic write dir.
+        let actual_files: Vec<_> = entries
+            .into_iter()
+            .filter_map(|entry| {
+                if entry.metadata().is_dir() {
+                    return None;
+                }
+
+                // Select entries whose names match `names_to_remove`.
+                let should_remove = names_to_remove
+                    .iter()
+                    .any(|file| entry.name().starts_with(file));
+                if should_remove {
+                    Some(entry.path().to_string())
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        common_telemetry::warn!(
+            "Clean files {:?} under atomic write dir for {:?}",
+            actual_files,
+            names_to_remove
+        );
+
+        if let Err(e) = local_store.delete_iter(actual_files).await {
+            common_telemetry::error!(e; "Failed to delete tmp files for {:?}", names_to_remove);
+        }
+    }
+}
+
 pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
     let atomic_write_dir = join_dir(root, ATOMIC_WRITE_DIR);
     clean_dir(&atomic_write_dir).await?;
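TempFileCleaner cannot know the exact temp file names in advance: the atomic write layer appends a random suffix to files under ATOMIC_WRITE_DIR, which is why clean_atomic_dir_files lists the directory and matches on name prefixes. The sketch below shows the same prefix matching with plain std::fs; the directory path and the prefix are illustrative, not part of mito2's API.

    use std::fs;
    use std::io;
    use std::path::Path;

    /// Removes every regular file in `dir` whose name starts with one of the
    /// given prefixes. The temp suffix is unknown, so we match on the
    /// `{region_id}.{file_id}.{file_type}` stem, as the cleaner does.
    fn clean_by_prefix(dir: &Path, prefixes: &[&str]) -> io::Result<()> {
        let entries = match fs::read_dir(dir) {
            // A missing dir means there is nothing to clean.
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
            other => other?,
        };
        for entry in entries {
            let entry = entry?;
            if entry.file_type()?.is_dir() {
                continue;
            }
            let name = entry.file_name();
            let name = name.to_string_lossy();
            if prefixes.iter().any(|p| name.starts_with(p)) {
                fs::remove_file(entry.path())?;
            }
        }
        Ok(())
    }

    fn main() -> io::Result<()> {
        // Hypothetical layout: temp files look like "4398046511104.<uuid>.parquet.tmp1a2b".
        clean_by_prefix(Path::new("/tmp/atomic_write_dir"), &["4398046511104."])
    }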
diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index 54fb5e47c0..2fdc53dd0a 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -14,6 +14,7 @@
 
 //! A cache for files.
 
+use std::fmt;
 use std::ops::Range;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -339,6 +340,18 @@ impl IndexKey {
     }
 }
 
+impl fmt::Display for IndexKey {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "{}.{}.{}",
+            self.region_id.as_u64(),
+            self.file_id,
+            self.file_type.as_str()
+        )
+    }
+}
+
 /// Type of the file.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum FileType {
@@ -380,15 +393,7 @@ pub(crate) struct IndexValue {
 ///
 /// The file name format is `{region_id}.{file_id}.{file_type}`
 fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
-    join_path(
-        cache_file_dir,
-        &format!(
-            "{}.{}.{}",
-            key.region_id.as_u64(),
-            key.file_id,
-            key.file_type.as_str()
-        ),
-    )
+    join_path(cache_file_dir, &key.to_string())
 }
 
 /// Parse index key from the file name.
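With Display implemented once on IndexKey, cache_file_path and TempFileCleaner derive the same `{region_id}.{file_id}.{file_type}` name from one place instead of duplicating the format string. A self-contained sketch of the idea follows; Key, the u128 file id, and the paths are simplified stand-ins for mito2's IndexKey and FileId, not the real types.

    use std::fmt;

    /// Simplified stand-in for `IndexKey`.
    #[derive(Clone, Copy)]
    struct Key {
        region_id: u64,
        file_id: u128, // stand-in for the UUID-based `FileId`
        file_type: &'static str,
    }

    impl fmt::Display for Key {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // The `{region_id}.{file_id}.{file_type}` cache file name scheme.
            write!(f, "{}.{:032x}.{}", self.region_id, self.file_id, self.file_type)
        }
    }

    fn main() {
        let key = Key {
            region_id: 1,
            file_id: 0xabcd,
            file_type: "parquet",
        };
        // Both the cache path and the temp file prefix come from the same impl.
        let path = format!("write_cache/{}", key);
        assert!(path.ends_with(".parquet"));
        println!("{path}");
    }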
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 974f0caef0..458acc968f 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -26,7 +26,7 @@ use store_api::storage::RegionId;
 
 use crate::access_layer::{
     new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest,
-    WriteCachePathProvider,
+    TempFileCleaner, WriteCachePathProvider,
 };
 use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
 use crate::error::{self, Result};
@@ -122,7 +122,7 @@ impl WriteCache {
             row_group_size: write_opts.row_group_size,
             puffin_manager: self
                 .puffin_manager_factory
-                .build(store, path_provider.clone()),
+                .build(store.clone(), path_provider.clone()),
             intermediate_manager: self.intermediate_manager.clone(),
             index_options: write_request.index_options,
             inverted_index_config: write_request.inverted_index_config,
@@ -130,14 +130,16 @@
             bloom_filter_index_config: write_request.bloom_filter_index_config,
         };
 
+        let cleaner = TempFileCleaner::new(region_id, store.clone());
         // Write to FileCache.
         let mut writer = ParquetWriter::new_with_object_store(
-            self.file_cache.local_store(),
+            store.clone(),
             write_request.metadata,
             indexer,
-            path_provider,
+            path_provider.clone(),
         )
-        .await;
+        .await
+        .with_file_cleaner(cleaner);
 
         let sst_info = writer
             .write_all(write_request.source, write_request.max_sequence, write_opts)
             .await
@@ -201,6 +203,26 @@
         remote_path: &str,
         remote_store: &ObjectStore,
         file_size: u64,
+    ) -> Result<()> {
+        if let Err(e) = self
+            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
+            .await
+        {
+            let filename = index_key.to_string();
+            TempFileCleaner::clean_atomic_dir_files(&self.file_cache.local_store(), &[&filename])
+                .await;
+
+            return Err(e);
+        }
+        Ok(())
+    }
+
+    async fn download_without_cleaning(
+        &self,
+        index_key: IndexKey,
+        remote_path: &str,
+        remote_store: &ObjectStore,
+        file_size: u64,
     ) -> Result<()> {
         const DOWNLOAD_READER_CONCURRENCY: usize = 8;
         const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
@@ -410,9 +432,11 @@ mod tests {
     use common_test_util::temp_dir::create_temp_dir;
 
     use super::*;
-    use crate::access_layer::OperationType;
+    use crate::access_layer::{OperationType, ATOMIC_WRITE_DIR};
     use crate::cache::test_util::new_fs_store;
     use crate::cache::{CacheManager, CacheStrategy};
+    use crate::error::InvalidBatchSnafu;
+    use crate::read::Source;
     use crate::region::options::IndexOptions;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::test_util::sst_util::{
@@ -578,4 +602,82 @@ mod tests {
         // Check parquet metadata
         assert_parquet_metadata_eq(write_parquet_metadata, reader.parquet_metadata());
     }
+
+    #[tokio::test]
+    async fn test_write_cache_clean_tmp_files() {
+        common_telemetry::init_default_ut_logging();
+        let mut env = TestEnv::new();
+        let data_home = env.data_home().display().to_string();
+        let mock_store = env.init_object_store_manager();
+
+        let write_cache_dir = create_temp_dir("");
+        let write_cache_path = write_cache_dir.path().to_str().unwrap();
+        let write_cache = env
+            .create_write_cache_from_path(write_cache_path, ReadableSize::mb(10))
+            .await;
+
+        // Create a cache manager using only the write cache.
+        let cache_manager = Arc::new(
+            CacheManager::builder()
+                .write_cache(Some(write_cache.clone()))
+                .build(),
+        );
+
+        // Create the source.
+        let metadata = Arc::new(sst_region_metadata());
+
+        // Creates a source that returns an error to abort the writer.
+        let source = Source::Iter(Box::new(
+            [
+                Ok(new_batch_by_range(&["a", "d"], 0, 60)),
+                InvalidBatchSnafu {
+                    reason: "Abort the writer",
+                }
+                .fail(),
+            ]
+            .into_iter(),
+        ));
+
+        // Write to the local cache and upload the SST to the mock remote store.
+        let write_request = SstWriteRequest {
+            op_type: OperationType::Flush,
+            metadata,
+            source,
+            storage: None,
+            max_sequence: None,
+            cache_manager: cache_manager.clone(),
+            index_options: IndexOptions::default(),
+            inverted_index_config: Default::default(),
+            fulltext_index_config: Default::default(),
+            bloom_filter_index_config: Default::default(),
+        };
+        let write_opts = WriteOptions {
+            row_group_size: 512,
+            ..Default::default()
+        };
+        let upload_request = SstUploadRequest {
+            dest_path_provider: RegionFilePathFactory::new(data_home.clone()),
+            remote_store: mock_store.clone(),
+        };
+
+        write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts)
+            .await
+            .unwrap_err();
+        let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
+        let mut entries = tokio::fs::read_dir(&atomic_write_dir).await.unwrap();
+        let mut has_files = false;
+        while let Some(entry) = entries.next_entry().await.unwrap() {
+            if entry.file_type().await.unwrap().is_dir() {
+                continue;
+            }
+            has_files = true;
+            common_telemetry::warn!(
+                "Found remaining temporary file in atomic dir: {}",
+                entry.path().display()
+            );
+        }
+
+        assert!(!has_files);
+    }
 }
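download keeps its public signature and simply wraps download_without_cleaning: on any error it removes the partially written cache file, then propagates the original error. The same wrap-and-clean shape can be written generically over futures; a minimal sketch, assuming the futures crate for the executor (with_cleanup and the messages are illustrative names):

    /// Runs `op` and, if it fails, awaits `cleanup` before returning the error.
    async fn with_cleanup<T, E, Fut, Cln>(op: Fut, cleanup: Cln) -> Result<T, E>
    where
        Fut: std::future::Future<Output = Result<T, E>>,
        Cln: std::future::Future<Output = ()>,
    {
        match op.await {
            Ok(v) => Ok(v),
            Err(e) => {
                // Best-effort cleanup; the original error is still propagated.
                cleanup.await;
                Err(e)
            }
        }
    }

    fn main() {
        let res: Result<(), &str> = futures::executor::block_on(with_cleanup(
            async { Err("download failed") },
            async { println!("removing partial temp files") },
        ));
        assert!(res.is_err());
    }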
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 3aad380eb5..9f5f4af05c 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -36,7 +36,7 @@ use store_api::storage::SequenceNumber;
 use tokio::io::AsyncWrite;
 use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
 
-use crate::access_layer::{FilePathProvider, SstInfoArray};
+use crate::access_layer::{FilePathProvider, SstInfoArray, TempFileCleaner};
 use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
 use crate::read::{Batch, Source};
 use crate::sst::file::FileId;
@@ -61,6 +61,8 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
     /// Current active indexer.
     current_indexer: Option<Indexer>,
     bytes_written: Arc<AtomicUsize>,
+    /// Cleaner to remove temp files on failure.
+    file_cleaner: Option<TempFileCleaner>,
 }
 
 pub trait WriterFactory {
@@ -105,6 +107,11 @@ where
         )
         .await
     }
+
+    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
+        self.file_cleaner = Some(cleaner);
+        self
+    }
 }
 impl<F, I, P> ParquetWriter<F, I, P>
 where
@@ -132,6 +139,7 @@ where
             indexer_builder,
             current_indexer: Some(indexer),
             bytes_written: Arc::new(AtomicUsize::new(0)),
+            file_cleaner: None,
         }
     }
 
@@ -152,6 +160,25 @@ where
     ///
     /// Returns the [SstInfo] if the SST is written.
     pub async fn write_all(
+        &mut self,
+        source: Source,
+        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
+        opts: &WriteOptions,
+    ) -> Result<SstInfoArray> {
+        let res = self
+            .write_all_without_cleaning(source, override_sequence, opts)
+            .await;
+        if res.is_err() {
+            // Clean tmp files explicitly on failure.
+            let file_id = self.current_file;
+            if let Some(cleaner) = &self.file_cleaner {
+                cleaner.clean_by_file_id(file_id).await;
+            }
+        }
+        res
+    }
+
+    async fn write_all_without_cleaning(
         &mut self,
         mut source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
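with_file_cleaner is a consuming builder method over an Option field, so cleanup stays opt-in: AccessLayer and WriteCache attach a cleaner, while any caller that skips the call keeps the old behavior because the field defaults to None and write_all only consults it on failure. A compact sketch of that pattern with stand-in Writer and Cleaner types (not mito2's):

    struct Cleaner;

    impl Cleaner {
        fn clean(&self, file_id: u64) {
            println!("cleaning temp files for file {file_id}");
        }
    }

    struct Writer {
        current_file: u64,
        cleaner: Option<Cleaner>,
    }

    impl Writer {
        fn new() -> Self {
            Self { current_file: 42, cleaner: None }
        }

        /// Consuming builder, mirroring `with_file_cleaner`.
        fn with_cleaner(mut self, cleaner: Cleaner) -> Self {
            self.cleaner = Some(cleaner);
            self
        }

        /// Wraps the fallible inner write and cleans up on failure.
        fn write_all(&mut self) -> Result<(), String> {
            let res = self.write_all_inner();
            if res.is_err() {
                if let Some(cleaner) = &self.cleaner {
                    cleaner.clean(self.current_file);
                }
            }
            res
        }

        fn write_all_inner(&mut self) -> Result<(), String> {
            Err("abort".to_string())
        }
    }

    fn main() {
        let mut writer = Writer::new().with_cleaner(Cleaner);
        assert!(writer.write_all().is_err());
    }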
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 1e5b44941d..65af855e04 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -659,6 +659,27 @@ impl TestEnv {
         Arc::new(write_cache)
     }
 
+    /// Creates a write cache from a path.
+    pub async fn create_write_cache_from_path(
+        &self,
+        path: &str,
+        capacity: ReadableSize,
+    ) -> WriteCacheRef {
+        let index_aux_path = self.data_home.path().join("index_aux");
+        let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
+            .await
+            .unwrap();
+        let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
+            .await
+            .unwrap();
+
+        let write_cache = WriteCache::new_fs(path, capacity, None, puffin_mgr, intm_mgr)
+            .await
+            .unwrap();
+
+        Arc::new(write_cache)
+    }
+
     pub fn get_schema_metadata_manager(&self) -> SchemaMetadataManagerRef {
         self.schema_metadata_manager.clone()
     }
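The helper pins the write cache to a caller-chosen directory so the new test can abort a write and then look inside ATOMIC_WRITE_DIR; the test passes when no regular file survives the failed flush. A synchronous sketch of that final check, using std::fs where the test uses tokio::fs:

    use std::fs;
    use std::io;
    use std::path::Path;

    /// Returns true if `dir` contains no regular files (subdirectories are
    /// ignored), which is what the test asserts for the atomic write dir.
    fn dir_has_no_files(dir: &Path) -> io::Result<bool> {
        for entry in fs::read_dir(dir)? {
            let entry = entry?;
            if entry.file_type()?.is_file() {
                return Ok(false);
            }
        }
        Ok(true)
    }

    fn main() -> io::Result<()> {
        let tmp = std::env::temp_dir();
        println!("{} has no files: {}", tmp.display(), dir_has_no_files(&tmp)?);
        Ok(())
    }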