diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs
new file mode 100644
index 0000000000..e4d384d0f9
--- /dev/null
+++ b/src/mito2/src/gc.rs
@@ -0,0 +1,677 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! GC worker which periodically checks and removes unused/obsolete SST files.
+//!
+//! `expel time`: the time when a file is considered removed, i.e. removed from the manifest.
+//! `lingering time`: how long to wait before deleting a file after it is removed from the manifest.
+//! `delta manifest`: the manifest files written after the last checkpoint, which contain the changes to the manifest.
+//! `delete time`: the time when a file is actually deleted from the object store.
+//! `unknown files`: files that are not recorded in the manifest, usually because a saved checkpoint has dropped the remove actions before it.
+//!
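+//! In short, a file's lifecycle is: expelled from the manifest at its expel time, kept for the
+//! lingering time (or the unknown-file lingering time when the expel time cannot be determined),
+//! and finally deleted from the object store at its delete time.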
+
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::time::Duration;
+
+use common_telemetry::{error, info, warn};
+use common_time::Timestamp;
+use object_store::{Entry, Lister};
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt as _, ensure};
+use store_api::storage::{FileId, RegionId};
+use tokio_stream::StreamExt;
+
+use crate::access_layer::AccessLayerRef;
+use crate::cache::CacheManagerRef;
+use crate::config::MitoConfig;
+use crate::error::{
+    DurationOutOfRangeSnafu, EmptyRegionDirSnafu, JoinSnafu, OpenDalSnafu, RegionNotFoundSnafu,
+    Result, UnexpectedSnafu,
+};
+use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
+use crate::manifest::storage::manifest_compress_type;
+use crate::metrics::GC_FILE_CNT;
+use crate::region::opener::new_manifest_dir;
+use crate::sst::file::delete_files;
+use crate::sst::file_ref::TableFileRefsManifest;
+use crate::sst::location::{self, region_dir_from_table_dir};
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct GcReport {
+    /// Deleted files per region.
+    pub deleted_files: HashMap<RegionId, Vec<FileId>>,
+    /// Regions that need retry in the next GC round, usually because their tmp ref files are outdated.
+    pub need_retry_regions: HashSet<RegionId>,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct FileGcOption {
+    /// Lingering time before deleting files.
+    /// Should be long enough to allow long-running queries to finish.
+    ///
+    /// TODO(discord9): long-running queries should actively write tmp manifest files
+    /// to prevent deletion of files they are using.
+    #[serde(with = "humantime_serde")]
+    pub lingering_time: Duration,
+    /// Lingering time before deleting unknown files (files with undetermined expel time).
+    /// The expel time is the time when the file is considered removed, i.e. removed from the manifest.
+    /// Unknown files should only occur rarely, as the manifest keeps track of removed files in its
+    /// `removed_files` field unless something goes wrong.
+    #[serde(with = "humantime_serde")]
+    pub unknown_file_lingering_time: Duration,
+    /// Maximum concurrent list operations per GC job.
+    /// This is used to limit the number of concurrent listing operations and speed up listing.
+    pub max_concurrent_lister_per_gc_job: usize,
+}
+
+impl Default for FileGcOption {
+    fn default() -> Self {
+        Self {
+            // Expect long-running queries to finish within a reasonable time.
+            lingering_time: Duration::from_secs(60 * 5),
+            // 6 hours for files with unknown expel time (i.e. when the file was removed from the
+            // manifest is unknown). This should rarely happen, so we can keep such files longer.
+            unknown_file_lingering_time: Duration::from_secs(60 * 60 * 6),
+            max_concurrent_lister_per_gc_job: 32,
+        }
+    }
+}
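+// Note: with `humantime_serde`, the durations above (de)serialize as human-readable strings,
+// so a config file would express the defaults as e.g. `lingering_time = "5m"` and
+// `unknown_file_lingering_time = "6h"`.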
+
+pub struct LocalGcWorker {
+    pub(crate) access_layer: AccessLayerRef,
+    pub(crate) cache_manager: Option<CacheManagerRef>,
+    pub(crate) manifest_mgrs: HashMap<RegionId, RegionManifestManager>,
+    /// File GC options, including lingering times.
+    pub(crate) opt: FileGcOption,
+    pub(crate) manifest_open_config: ManifestOpenConfig,
+    /// Tmp ref files manifest, used to determine which files are still in use by ongoing queries.
+    ///
+    /// Also contains the manifest versions of regions at the time the tmp ref files were generated,
+    /// used to determine whether the tmp ref files are outdated.
+    pub(crate) file_ref_manifest: TableFileRefsManifest,
+}
+
+pub struct ManifestOpenConfig {
+    pub compress_manifest: bool,
+    pub manifest_checkpoint_distance: u64,
+    pub experimental_manifest_keep_removed_file_count: usize,
+    pub experimental_manifest_keep_removed_file_ttl: Duration,
+}
+
+impl From<MitoConfig> for ManifestOpenConfig {
+    fn from(mito_config: MitoConfig) -> Self {
+        Self {
+            compress_manifest: mito_config.compress_manifest,
+            manifest_checkpoint_distance: mito_config.manifest_checkpoint_distance,
+            experimental_manifest_keep_removed_file_count: mito_config
+                .experimental_manifest_keep_removed_file_count,
+            experimental_manifest_keep_removed_file_ttl: mito_config
+                .experimental_manifest_keep_removed_file_ttl,
+        }
+    }
+}
+
+impl LocalGcWorker {
+    /// Creates a new `LocalGcWorker` with `regions_to_gc` regions to GC.
+    /// The regions are specified by their `RegionId` and should all belong to the same table.
+    pub async fn try_new(
+        access_layer: AccessLayerRef,
+        cache_manager: Option<CacheManagerRef>,
+        regions_to_gc: BTreeSet<RegionId>,
+        opt: FileGcOption,
+        manifest_open_config: ManifestOpenConfig,
+        file_ref_manifest: TableFileRefsManifest,
+    ) -> Result<Self> {
+        let table_id = regions_to_gc
+            .first()
+            .context(UnexpectedSnafu {
+                reason: "Expect at least one region, found none",
+            })?
+            .table_id();
+        let mut zelf = Self {
+            access_layer,
+            cache_manager,
+            manifest_mgrs: HashMap::new(),
+            opt,
+            manifest_open_config,
+            file_ref_manifest,
+        };
+
+        // dedup just in case
+        for region_id in regions_to_gc {
+            ensure!(
+                region_id.table_id() == table_id,
+                UnexpectedSnafu {
+                    reason: format!(
+                        "All regions should belong to the same table, found region {} and table {}",
+                        region_id, table_id
+                    ),
+                }
+            );
+            let mgr = zelf.open_mgr_for(region_id).await?;
+            zelf.manifest_mgrs.insert(region_id, mgr);
+        }
+
+        Ok(zelf)
+    }
+
+    /// Gets tmp ref files for all current regions.
+    ///
+    /// Outdated regions are added to the `outdated_regions` set.
+    pub async fn read_tmp_ref_files(
+        &self,
+        outdated_regions: &mut HashSet<RegionId>,
+    ) -> Result<HashMap<RegionId, HashSet<FileId>>> {
+        for (region_id, region_mgr) in &self.manifest_mgrs {
+            let current_version = region_mgr.manifest().manifest_version;
+            if &current_version
+                > self
+                    .file_ref_manifest
+                    .manifest_version
+                    .get(region_id)
+                    .with_context(|| UnexpectedSnafu {
+                        reason: format!(
+                            "Region {} not found in tmp ref manifest version map",
+                            region_id
+                        ),
+                    })?
+            {
+                outdated_regions.insert(*region_id);
+            }
+        }
+        // TODO(discord9): verify manifest version before reading tmp ref files
+
+        let mut tmp_ref_files = HashMap::new();
+        for file_ref in &self.file_ref_manifest.file_refs {
+            if outdated_regions.contains(&file_ref.region_id) {
+                // skip outdated regions
+                continue;
+            }
+            tmp_ref_files
+                .entry(file_ref.region_id)
+                .or_insert_with(HashSet::new)
+                .insert(file_ref.file_id);
+        }
+
+        Ok(tmp_ref_files)
+    }
+
+    /// Runs the GC worker in serial mode, since listing files can be slow and running
+    /// multiple regions in parallel may cause too many concurrent listing operations.
+    ///
+    /// TODO(discord9): consider running in parallel mode instead
+    pub async fn run(self) -> Result<GcReport> {
+        info!("LocalGcWorker started");
+        let now = std::time::Instant::now();
+
+        let mut outdated_regions = HashSet::new();
+        let mut deleted_files = HashMap::new();
+        let tmp_ref_files = self.read_tmp_ref_files(&mut outdated_regions).await?;
+        for region_id in self.manifest_mgrs.keys() {
+            info!("Doing gc for region {}", region_id);
+            let tmp_ref_files = tmp_ref_files
+                .get(region_id)
+                .cloned()
+                .unwrap_or_else(HashSet::new);
+            let files = self.do_region_gc(*region_id, &tmp_ref_files).await?;
+            deleted_files.insert(*region_id, files);
+            info!("Gc for region {} finished", region_id);
+        }
+        info!(
+            "LocalGcWorker finished after {} secs.",
+            now.elapsed().as_secs()
+        );
+        let report = GcReport {
+            deleted_files,
+            need_retry_regions: outdated_regions.into_iter().collect(),
+        };
+        Ok(report)
+    }
+}
+
+impl LocalGcWorker {
+    /// The number of files each lister handles when listing a region directory.
+    /// This is used to limit the number of concurrent listing operations while still speeding up listing.
+    pub const CONCURRENCY_LIST_PER_FILES: usize = 512;
+
+    /// Performs GC for the region:
+    /// 1. Get all the removed files in delta manifest files and their expel times.
+    /// 2. List all files in the region dir concurrently.
+    /// 3. Filter out the files that are still in use or may still be kept for a while.
+    /// 4. Delete the unused files.
+    ///
+    /// Note that files that are still in use or may still be kept for a while are not deleted,
+    /// to avoid deleting files that are still needed.
+    pub async fn do_region_gc(
+        &self,
+        region_id: RegionId,
+        tmp_ref_files: &HashSet<FileId>,
+    ) -> Result<Vec<FileId>> {
+        info!("Doing gc for region {}", region_id);
+        let manifest = self
+            .manifest_mgrs
+            .get(&region_id)
+            .context(RegionNotFoundSnafu { region_id })?
+            .manifest();
+        let region_id = manifest.metadata.region_id;
+        let current_files = &manifest.files;
+
+        let recently_removed_files = self.get_removed_files_expel_times(region_id).await?;
+
+        if recently_removed_files.is_empty() {
+            // No recently removed files; files with unknown expel time may still be collected below.
+            info!("No recently removed files to gc for region {}", region_id);
+        }
+
+        info!(
+            "Found {} recently removed file sets for region {}",
+            recently_removed_files.len(),
+            region_id
+        );
+
+        let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
+            .max(1)
+            .min(self.opt.max_concurrent_lister_per_gc_job);
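+        // For example, with the defaults (512 files per lister, at most 32 listers),
+        // 2048 live files yield min(max(2048 / 512, 1), 32) = 4 concurrent listers.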
+
+        let in_used = current_files
+            .keys()
+            .cloned()
+            .chain(tmp_ref_files.clone().into_iter())
+            .collect();
+
+        let true_tmp_ref_files = tmp_ref_files
+            .iter()
+            .filter(|f| !current_files.contains_key(f))
+            .collect::<Vec<_>>();
+
+        info!("True tmp ref files: {:?}", true_tmp_ref_files);
+
+        let unused_files = self
+            .list_to_be_deleted_files(region_id, in_used, recently_removed_files, concurrency)
+            .await?;
+
+        let unused_len = unused_files.len();
+
+        info!(
+            "Found {} unused files to delete for region {}",
+            unused_len, region_id
+        );
+
+        self.delete_files(region_id, &unused_files).await?;
+
+        info!(
+            "Successfully deleted {} unused files for region {}",
+            unused_len, region_id
+        );
+
+        Ok(unused_files)
+    }
+
+    async fn delete_files(&self, region_id: RegionId, file_ids: &[FileId]) -> Result<()> {
+        delete_files(
+            region_id,
+            file_ids,
+            true,
+            &self.access_layer,
+            &self.cache_manager,
+        )
+        .await?;
+
+        GC_FILE_CNT.add(file_ids.len() as i64);
+
+        Ok(())
+    }
+
+    /// Opens the manifest manager for the region.
+    async fn open_mgr_for(&self, region_id: RegionId) -> Result<RegionManifestManager> {
+        let table_dir = self.access_layer.table_dir();
+        let path_type = self.access_layer.path_type();
+        let mito_config = &self.manifest_open_config;
+
+        let region_manifest_options = RegionManifestOptions {
+            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
+                table_dir, region_id, path_type,
+            )),
+            object_store: self.access_layer.object_store().clone(),
+            compress_type: manifest_compress_type(mito_config.compress_manifest),
+            checkpoint_distance: mito_config.manifest_checkpoint_distance,
+            remove_file_options: RemoveFileOptions {
+                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
+                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
+            },
+        };
+
+        RegionManifestManager::open(
+            region_manifest_options,
+            Default::default(),
+            Default::default(),
+        )
+        .await?
+        .context(EmptyRegionDirSnafu {
+            region_id,
+            region_dir: &region_dir_from_table_dir(table_dir, region_id, path_type),
+        })
+    }
+
+    /// Gets all the removed files in delta manifest files and their expel times.
+    /// The expel time is the time when the file is considered removed, i.e. the last
+    /// modified time of the delta manifest that contains the remove action.
+    pub async fn get_removed_files_expel_times(
+        &self,
+        region_id: RegionId,
+    ) -> Result<BTreeMap<Timestamp, HashSet<FileId>>> {
+        let region_manifest = self
+            .manifest_mgrs
+            .get(&region_id)
+            .context(RegionNotFoundSnafu { region_id })?
+            .manifest();
+
+        let mut ret = BTreeMap::new();
+        for files in &region_manifest.removed_files.removed_files {
+            let expel_time = Timestamp::new_millisecond(files.removed_at);
+            let set = ret.entry(expel_time).or_insert_with(HashSet::new);
+            set.extend(files.file_ids.iter().cloned());
+        }
+
+        Ok(ret)
+    }
+
+    /// Creates partitioned listers for concurrent file listing based on the concurrency level.
+    /// Returns a vector of (lister, end_boundary) pairs for parallel processing.
+    async fn partition_region_files(
+        &self,
+        region_id: RegionId,
+        concurrency: usize,
+    ) -> Result<Vec<(Lister, Option<String>)>> {
+        let region_dir = self.access_layer.build_region_dir(region_id);
+
+        let partitions = gen_partition_from_concurrency(concurrency);
+        let bounds = vec![None]
+            .into_iter()
+            .chain(partitions.iter().map(|p| Some(p.clone())))
+            .chain(vec![None])
+            .collect::<Vec<_>>();
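+        // e.g. with concurrency 2 the partition boundaries are ["8"], so `bounds` is
+        // [None, Some("8"), None], yielding one lister for [start, "8") and one for ["8", end).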
+
+        let mut listers = vec![];
+        for part in bounds.windows(2) {
+            let start = part[0].clone();
+            let end = part[1].clone();
+            let mut lister = self.access_layer.object_store().lister_with(&region_dir);
+            if let Some(s) = start {
+                lister = lister.start_after(&s);
+            }
+
+            let lister = lister.await.context(OpenDalSnafu)?;
+            listers.push((lister, end));
+        }
+
+        Ok(listers)
+    }
+
+    /// Concurrently lists all files in the region directory using the provided listers.
+    /// Returns a vector of all file entries found across all partitions.
+    async fn list_region_files_concurrent(
+        &self,
+        listers: Vec<(Lister, Option<String>)>,
+    ) -> Result<Vec<Entry>> {
+        let (tx, mut rx) = tokio::sync::mpsc::channel(1024);
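+        // Each lister task below sends a single batched `Vec` of entries, so the channel holds
+        // at most one message per lister; a capacity of 1024 is comfortably larger than any
+        // reasonable `max_concurrent_lister_per_gc_job`.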
+        let mut handles = vec![];
+
+        for (lister, end) in listers {
+            let tx = tx.clone();
+            let handle = tokio::spawn(async move {
+                let stream =
+                    lister.take_while(|e: &std::result::Result<Entry, object_store::Error>| {
+                        match e {
+                            Ok(e) => {
+                                if let Some(end) = &end {
+                                    // reached the end boundary, stop listing
+                                    e.name() < end.as_str()
+                                } else {
+                                    // no end boundary, take all entries
+                                    true
+                                }
+                            }
+                            // entry went wrong, log and skip it
+                            Err(err) => {
+                                warn!("Failed to list entry: {}", err);
+                                true
+                            }
+                        }
+                    });
+                let stream = stream
+                    .filter(|e| {
+                        if let Ok(e) = &e {
+                            // notice that we only care about files, skip dirs
+                            e.metadata().is_file()
+                        } else {
+                            // error entry, keep it for further logging
+                            true
+                        }
+                    })
+                    .collect::<Vec<_>>()
+                    .await;
+                // ordering of files doesn't matter here, so we can send them directly
+                tx.send(stream).await.expect("Failed to send entries");
+            });
+
+            handles.push(handle);
+        }
+
+        // Wait for all listers to finish
+        for handle in handles {
+            handle.await.context(JoinSnafu)?;
+        }
+
+        drop(tx); // Close the channel to stop receiving
+
+        // Collect all entries from the channel
+        let mut all_entries = vec![];
+        while let Some(stream) = rx.recv().await {
+            all_entries.extend(stream.into_iter().filter_map(Result::ok));
+        }
+
+        Ok(all_entries)
+    }
+
+    /// Filters files to determine which ones can be deleted based on usage status and lingering time.
+    /// Returns the file IDs that are safe to delete, together with the still-existing lingering files.
+    fn filter_deletable_files(
+        &self,
+        entries: Vec<Entry>,
+        in_use_filenames: &HashSet<&FileId>,
+        may_linger_filenames: &HashSet<&FileId>,
+        all_files_appear_in_delta_manifests: &HashSet<&FileId>,
+        unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
+    ) -> (Vec<FileId>, HashSet<FileId>) {
+        let mut all_unused_files_ready_for_delete = vec![];
+        let mut all_in_exist_linger_files = HashSet::new();
+
+        for entry in entries {
+            let file_id = match location::parse_file_id_from_path(entry.name()) {
+                Ok(file_id) => file_id,
+                Err(err) => {
+                    error!(err; "Failed to parse file id from path: {}", entry.name());
+                    // If we can't parse the file id, the entry is not an SST or index file;
+                    // we shouldn't delete it because we don't know what it is.
+                    continue;
+                }
+            };
+
+            if may_linger_filenames.contains(&file_id) {
+                all_in_exist_linger_files.insert(file_id);
+            }
+
+            let should_delete = !in_use_filenames.contains(&file_id)
+                && !may_linger_filenames.contains(&file_id)
+                && {
+                    if !all_files_appear_in_delta_manifests.contains(&file_id) {
+                        // If the file's expel time is unknown (it doesn't appear in any delta
+                        // manifest), we keep it for a while based on its last modified time.
+                        // Notice that unknown files use a different lingering time.
+                        entry
+                            .metadata()
+                            .last_modified()
+                            .map(|t| t < unknown_file_may_linger_until)
+                            .unwrap_or(false)
+                    } else {
+                        // If the file did appear in a delta manifest (and passed the previous
+                        // predicates), we can delete it immediately.
+                        true
+                    }
+                };
+
+            if should_delete {
+                all_unused_files_ready_for_delete.push(file_id);
+            }
+        }
+
+        (all_unused_files_ready_for_delete, all_in_exist_linger_files)
+    }
+
+    /// Concurrently lists unused files in the region dir, since there may be a lot of
+    /// files in the region dir and listing them may take a long time.
+    pub async fn list_to_be_deleted_files(
+        &self,
+        region_id: RegionId,
+        in_used: HashSet<FileId>,
+        recently_removed_files: BTreeMap<Timestamp, HashSet<FileId>>,
+        concurrency: usize,
+    ) -> Result<Vec<FileId>> {
+        let now = chrono::Utc::now();
+        let may_linger_until = now
+            - chrono::Duration::from_std(self.opt.lingering_time).with_context(|_| {
+                DurationOutOfRangeSnafu {
+                    input: self.opt.lingering_time,
+                }
+            })?;
+
+        let unknown_file_may_linger_until = now
+            - chrono::Duration::from_std(self.opt.unknown_file_lingering_time).with_context(
+                |_| DurationOutOfRangeSnafu {
+                    input: self.opt.unknown_file_lingering_time,
+                },
+            )?;
+
+        // files that may linger, which means they are not in use but may still be kept for a while
+        let threshold = Timestamp::new_millisecond(may_linger_until.timestamp_millis());
+        let mut recently_removed_files = recently_removed_files;
+        let may_linger_files = recently_removed_files.split_off(&threshold);
+        let may_linger_filenames = may_linger_files.values().flatten().collect::<HashSet<_>>();
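+        // `split_off(&threshold)` returns all entries whose expel time is >= the threshold,
+        // i.e. files expelled within `lingering_time` that must be kept for now; what remains
+        // in `recently_removed_files` was expelled earlier and has finished lingering.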
+
+        let all_files_appear_in_delta_manifests = recently_removed_files
+            .values()
+            .flatten()
+            .collect::<HashSet<_>>();
+
+        // in-use filenames, including sst and index files
+        let in_use_filenames = in_used.iter().collect::<HashSet<_>>();
+
+        // Step 1: Create partitioned listers for concurrent processing
+        let listers = self.partition_region_files(region_id, concurrency).await?;
+
+        // Step 2: Concurrently list all files in the region directory
+        let all_entries = self.list_region_files_concurrent(listers).await?;
+
+        // Step 3: Filter files to determine which ones can be deleted
+        let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
+            .filter_deletable_files(
+                all_entries,
+                &in_use_filenames,
+                &may_linger_filenames,
+                &all_files_appear_in_delta_manifests,
+                unknown_file_may_linger_until,
+            );
+
+        info!("All in exist linger files: {:?}", all_in_exist_linger_files);
+
+        Ok(all_unused_files_ready_for_delete)
+    }
+}
+
+/// Generates partition prefixes based on concurrency, assuming file names are
+/// evenly-distributed UUID strings, so files are distributed evenly across partitions.
+/// For example, if concurrency is 2, the partition prefixes will be
+/// ["8"], dividing UUIDs into two partitions based on the first character.
+/// If concurrency is 32, the partition prefixes will be
+/// ["08", "10", "18", "20", "28", "30", "38", ..., "f0", "f8"].
+/// If concurrency is 1, it returns an empty vector.
+fn gen_partition_from_concurrency(concurrency: usize) -> Vec<String> {
+    let n = concurrency.next_power_of_two();
+    if n <= 1 {
+        return vec![];
+    }
+
+    // `d` is the number of hex characters required to build the partition key.
+    // `p` is the total number of possible values for a key of length `d`.
+    // We need to find the smallest `d` such that 16^d >= n.
+    let mut d = 0;
+    let mut p: u128 = 1;
+    while p < n as u128 {
+        p *= 16;
+        d += 1;
+    }
+
+    let total_space = p;
+    let step = total_space / n as u128;
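+    // Worked example: concurrency 32 gives n = 32, so d = 2, p = 256, and step = 8,
+    // producing the boundaries "08", "10", ..., "f8".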
+
+    (1..n)
+        .map(|i| {
+            let boundary = i as u128 * step;
+            format!("{:0width$x}", boundary, width = d)
+        })
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_gen_partition_from_concurrency() {
+        let partitions = gen_partition_from_concurrency(1);
+        assert!(partitions.is_empty());
+
+        let partitions = gen_partition_from_concurrency(2);
+        assert_eq!(partitions, vec!["8"]);
+
+        let partitions = gen_partition_from_concurrency(3);
+        assert_eq!(partitions, vec!["4", "8", "c"]);
+
+        let partitions = gen_partition_from_concurrency(4);
+        assert_eq!(partitions, vec!["4", "8", "c"]);
+
+        let partitions = gen_partition_from_concurrency(8);
+        assert_eq!(partitions, vec!["2", "4", "6", "8", "a", "c", "e"]);
+
+        let partitions = gen_partition_from_concurrency(16);
+        assert_eq!(
+            partitions,
+            vec![
+                "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"
+            ]
+        );
+
+        let partitions = gen_partition_from_concurrency(32);
+        assert_eq!(
+            partitions,
+            [
+                "08", "10", "18", "20", "28", "30", "38", "40", "48", "50", "58", "60", "68", "70",
+                "78", "80", "88", "90", "98", "a0", "a8", "b0", "b8", "c0", "c8", "d0", "d8", "e0",
+                "e8", "f0", "f8",
+            ]
+        );
+    }
+}
diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index ad4045c86e..69b0dc9996 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -36,6 +36,7 @@ pub mod error;
 #[cfg(feature = "enterprise")]
 pub mod extension;
 pub mod flush;
+pub mod gc;
 pub mod manifest;
 pub mod memtable;
 mod metrics;
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index 87b46ae3cb..0f923f60a6 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -456,6 +456,13 @@ lazy_static! {
         exponential_buckets(0.001, 10.0, 8).unwrap(),
     )
     .unwrap();
+
+    /// Gauge for the number of files deleted by the GC worker.
+    pub static ref GC_FILE_CNT: IntGauge =
+        register_int_gauge!(
+            "greptime_mito_gc_file_count",
+            "mito gc deleted file count",
+        ).unwrap();
 }
 
 /// Stager notifier to collect metrics.
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index 3dca030734..0e5635610f 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
 
 use common_base::readable_size::ReadableSize;
+use common_telemetry::{error, info};
 use common_time::Timestamp;
 use partition::expr::PartitionExpr;
 use serde::{Deserialize, Serialize};
@@ -28,6 +29,9 @@ use smallvec::SmallVec;
 use store_api::region_request::PathType;
 use store_api::storage::{FileId, RegionId};
 
+use crate::access_layer::AccessLayerRef;
+use crate::cache::CacheManagerRef;
+use crate::cache::file_cache::{FileType, IndexKey};
 use crate::sst::file_purger::FilePurgerRef;
 use crate::sst::location;
 
@@ -359,6 +363,71 @@ impl FileHandleInner {
     }
 }
 
+/// Deletes the given files from the object store and cleans up related caches and the stager.
+/// When `delete_index` is true, the files' index files are removed from the cache as well.
+pub async fn delete_files(
+    region_id: RegionId,
+    file_ids: &[FileId],
+    delete_index: bool,
+    access_layer: &AccessLayerRef,
+    cache_manager: &Option<CacheManagerRef>,
+) -> crate::error::Result<()> {
+    // Remove meta of the file from cache.
+    if let Some(cache) = &cache_manager {
+        for file_id in file_ids {
+            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
+        }
+    }
+    let mut deleted_files = Vec::with_capacity(file_ids.len());
+
+    for file_id in file_ids {
+        let region_file_id = RegionFileId::new(region_id, *file_id);
+        match access_layer.delete_sst(&region_file_id).await {
+            Ok(_) => {
+                deleted_files.push(*file_id);
+            }
+            Err(e) => {
+                error!(e; "Failed to delete sst and index file for {}", region_file_id);
+            }
+        }
+    }
+
+    info!(
+        "Deleted {} files for region {}: {:?}",
+        deleted_files.len(),
+        region_id,
+        deleted_files
+    );
+
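+    // Note: the cache and stager cleanup below runs for every requested file,
+    // not only those whose deletion from the object store succeeded above.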
+    for file_id in file_ids {
+        let region_file_id = RegionFileId::new(region_id, *file_id);
+
+        if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
+            // Removes index file from the cache.
+            if delete_index {
+                write_cache
+                    .remove(IndexKey::new(region_id, *file_id, FileType::Puffin))
+                    .await;
+            }
+
+            // Remove the SST file from the cache.
+            write_cache
+                .remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
+                .await;
+        }
+
+        // Purges index content in the stager.
+        if let Err(e) = access_layer
+            .puffin_manager_factory()
+            .purge_stager(region_file_id)
+            .await
+        {
+            error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
+                file_id, region_id);
+        }
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs
index 4dc17315be..7bd0e6b515 100644
--- a/src/mito2/src/sst/file_purger.rs
+++ b/src/mito2/src/sst/file_purger.rs
@@ -15,14 +15,13 @@
 use std::fmt;
 use std::sync::Arc;
 
-use common_telemetry::{error, info};
+use common_telemetry::error;
 
 use crate::access_layer::AccessLayerRef;
 use crate::cache::CacheManagerRef;
-use crate::cache::file_cache::{FileType, IndexKey};
 use crate::error::Result;
 use crate::schedule::scheduler::SchedulerRef;
-use crate::sst::file::FileMeta;
+use crate::sst::file::{FileMeta, delete_files};
 use crate::sst::file_ref::FileReferenceManagerRef;
 
 /// A worker to delete files in background.
@@ -125,62 +124,18 @@ impl LocalFilePurger {
     /// Deletes the file(and it's index, if any) from cache and storage.
     fn delete_file(&self, file_meta: FileMeta) {
         let sst_layer = self.sst_layer.clone();
-
-        // Remove meta of the file from cache.
-        if let Some(cache) = &self.cache_manager {
-            cache.remove_parquet_meta_data(file_meta.file_id());
-        }
-
+        let cache_manager = self.cache_manager.clone();
         if let Err(e) = self.scheduler.schedule(Box::pin(async move {
-            if let Err(e) = sst_layer.delete_sst(&file_meta.file_id()).await {
-                error!(e; "Failed to delete SST file, file_id: {}, region: {}",
-                    file_meta.file_id, file_meta.region_id);
-            } else {
-                info!(
-                    "Successfully deleted SST file, file_id: {}, region: {}",
-                    file_meta.file_id, file_meta.region_id
-                );
-            }
-
-            if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
+            if let Err(e) = delete_files(
+                file_meta.region_id,
+                &[file_meta.file_id],
+                file_meta.exists_index(),
+                &sst_layer,
+                &cache_manager,
+            )
+            .await
             {
-                // Removes index file from the cache.
-                if file_meta.exists_index() {
-                    write_cache
-                        .remove(IndexKey::new(
-                            file_meta.region_id,
-                            file_meta.file_id,
-                            FileType::Puffin,
-                        ))
-                        .await;
-                }
-                // Remove the SST file from the cache.
-                write_cache
-                    .remove(IndexKey::new(
-                        file_meta.region_id,
-                        file_meta.file_id,
-                        FileType::Parquet,
-                    ))
-                    .await;
-            }
-
-            // Purges index content in the stager.
-            if let Err(e) = sst_layer
-                .puffin_manager_factory()
-                .purge_stager(file_meta.file_id())
-                .await
-            {
-                error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
-                    file_meta.file_id(), file_meta.region_id);
-            }
-            let file_id = file_meta.file_id();
-            if let Err(e) = sst_layer
-                .intermediate_manager()
-                .prune_sst_dir(&file_id.region_id(), &file_id.file_id())
-                .await
-            {
-                error!(e; "Failed to prune intermediate sst directory, region_id: {}, file_id: {}", file_id.region_id(), file_id.file_id());
+                error!(e; "Failed to delete file {:?} from storage", file_meta);
             }
         })) {
             error!(e; "Failed to schedule the file purge request");
diff --git a/src/mito2/src/sst/location.rs b/src/mito2/src/sst/location.rs
index 0f630b3a09..8b9fa9f88a 100644
--- a/src/mito2/src/sst/location.rs
+++ b/src/mito2/src/sst/location.rs
@@ -13,11 +13,13 @@
 // limitations under the License.
 
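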
 use object_store::util;
+use snafu::OptionExt as _;
 use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
 use store_api::path_utils::region_name;
 use store_api::region_request::PathType;
-use store_api::storage::RegionId;
+use store_api::storage::{FileId, RegionId};
 
+use crate::error::UnexpectedSnafu;
 use crate::sst::file::RegionFileId;
 
 /// Generate region dir from table_dir, region_id and path_type
@@ -54,6 +56,33 @@ pub fn index_file_path(
     util::join_path(&index_dir, &format!("{}.puffin", region_file_id.file_id()))
 }
 
+/// Parses the `FileId` from an SST or index file path.
+/// For example, a path ending in `00000000-0000-0000-0000-000000000000.parquet` parses to that
+/// UUID as the `FileId`; any extension other than `parquet` or `puffin` is rejected.
+pub fn parse_file_id_from_path(filepath: &str) -> crate::error::Result<FileId> {
+    let filename = filepath.rsplit('/').next().context(UnexpectedSnafu {
+        reason: format!("invalid file path: {}", filepath),
+    })?;
+    let parts: Vec<&str> = filename.split('.').collect();
+    if parts.len() != 2 {
+        return UnexpectedSnafu {
+            reason: format!("invalid file name: {}", filename),
+        }
+        .fail();
+    }
+    if parts[1] != "parquet" && parts[1] != "puffin" {
+        return UnexpectedSnafu {
+            reason: format!("invalid file extension: {}", parts[1]),
+        }
+        .fail();
+    }
+    let file_id = parts[0];
+    FileId::parse_str(file_id).map_err(|e| {
+        UnexpectedSnafu {
+            reason: format!("invalid file id: {}, err: {}", file_id, e),
+        }
+        .build()
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use store_api::storage::{FileId, RegionId};
diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs
index 32bfe3ecbc..84bdbdf7a8 100644
--- a/src/store-api/src/storage/descriptors.rs
+++ b/src/store-api/src/storage/descriptors.rs
@@ -47,7 +47,7 @@ pub const MAX_REGION_SEQ: u32 = REGION_SEQ_MASK;
 /// └────────────────────────────────────┴──────────┴──────────────────┘
 /// Region Number(32)
 /// ```
-#[derive(Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
 pub struct RegionId(u64);
 
 impl RegionId {