From bacd9c7d15c51476811170441a951aa0c19a1db5 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 28 Aug 2025 17:08:18 +0800 Subject: [PATCH] feat: add event ts to region manifest (#6751) * feat: add event ts to region manifest Signed-off-by: discord9 delete files Signed-off-by: discord9 chore: clippy Signed-off-by: discord9 lower concurrency Signed-off-by: discord9 feat: gc use delta manifest to get expel time Signed-off-by: discord9 docs: terminalogy Signed-off-by: discord9 refactor: some advices from review Signed-off-by: discord9 feat: manifest add removed files field Signed-off-by: discord9 feat(WIP): add remove time in manifest Signed-off-by: discord9 wip: more config Signed-off-by: discord9 feat: manifest uodate removed files Signed-off-by: discord9 test: add remove file opts field Signed-off-by: discord9 test: fix test Signed-off-by: discord9 chore: delete gc.rs Signed-off-by: discord9 * feat: proper option name Signed-off-by: discord9 * refactor: per review Signed-off-by: discord9 * test: update manifest size Signed-off-by: discord9 * test: fix eq Signed-off-by: discord9 * refactor: some per review Signed-off-by: discord9 * refactor: per review Signed-off-by: discord9 * refactor: more per review Signed-off-by: discord9 * test: update manifest size Signed-off-by: discord9 * test: update config Signed-off-by: discord9 * feat: keep count 0 means only use ttl Signed-off-by: discord9 --------- Signed-off-by: discord9 --- Cargo.lock | 1 + config/config.md | 2 + config/datanode.example.toml | 13 + src/mito2/Cargo.toml | 1 + src/mito2/src/compaction.rs | 1 + src/mito2/src/compaction/compactor.rs | 8 +- src/mito2/src/config.rs | 13 + src/mito2/src/engine.rs | 7 + src/mito2/src/engine/edit_region_test.rs | 3 + src/mito2/src/error.rs | 12 +- src/mito2/src/flush.rs | 2 + src/mito2/src/manifest/action.rs | 392 ++++++++++++++++++++- src/mito2/src/manifest/checkpointer.rs | 14 + src/mito2/src/manifest/manager.rs | 38 +- src/mito2/src/manifest/storage.rs | 2 +- src/mito2/src/manifest/tests/checkpoint.rs | 12 +- src/mito2/src/region.rs | 8 +- src/mito2/src/region/opener.rs | 6 +- src/mito2/src/test_util.rs | 1 + src/mito2/src/test_util/scheduler_util.rs | 1 + src/mito2/src/test_util/version_util.rs | 1 + src/mito2/src/worker/handle_truncate.rs | 2 + tests-integration/tests/http.rs | 2 + 23 files changed, 521 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c707a69ed3..0f6092dcec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7571,6 +7571,7 @@ dependencies = [ "async-trait", "bytemuck", "bytes", + "chrono", "common-base", "common-config", "common-datasource", diff --git a/config/config.md b/config/config.md index d2204a26e9..1d92390926 100644 --- a/config/config.md +++ b/config/config.md @@ -520,6 +520,8 @@ | `region_engine.mito.worker_channel_size` | Integer | `128` | Request channel size of each worker. | | `region_engine.mito.worker_request_batch_size` | Integer | `64` | Max batch size for a worker to handle requests. | | `region_engine.mito.manifest_checkpoint_distance` | Integer | `10` | Number of meta action updated to trigger a new checkpoint for the manifest. | +| `region_engine.mito.experimental_manifest_keep_removed_file_count` | Integer | `256` | Number of removed files to keep in manifest's `removed_files` field before also
remove them from `removed_files`. Mostly for debugging purpose.
If set to 0, it will only use `keep_removed_file_ttl` to decide when to remove files
from `removed_files` field. | +| `region_engine.mito.experimental_manifest_keep_removed_file_ttl` | String | `1h` | How long to keep removed files in the `removed_files` field of manifest
after they are removed from manifest.
files will only be removed from `removed_files` field
if both `keep_removed_file_count` and `keep_removed_file_ttl` is reached. | | `region_engine.mito.compress_manifest` | Bool | `false` | Whether to compress manifest and checkpoint file by gzip (default false). | | `region_engine.mito.max_background_flushes` | Integer | Auto | Max number of running background flush jobs (default: 1/2 of cpu cores). | | `region_engine.mito.max_background_compactions` | Integer | Auto | Max number of running background compaction jobs (default: 1/4 of cpu cores). | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index b1af1755e4..5bde0947e4 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -409,6 +409,19 @@ worker_request_batch_size = 64 ## Number of meta action updated to trigger a new checkpoint for the manifest. manifest_checkpoint_distance = 10 + +## Number of removed files to keep in manifest's `removed_files` field before also +## remove them from `removed_files`. Mostly for debugging purpose. +## If set to 0, it will only use `keep_removed_file_ttl` to decide when to remove files +## from `removed_files` field. +experimental_manifest_keep_removed_file_count = 256 + +## How long to keep removed files in the `removed_files` field of manifest +## after they are removed from manifest. +## files will only be removed from `removed_files` field +## if both `keep_removed_file_count` and `keep_removed_file_ttl` is reached. +experimental_manifest_keep_removed_file_ttl = "1h" + ## Whether to compress manifest and checkpoint file by gzip (default false). compress_manifest = false diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index ccd406d25b..1bbc481965 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -20,6 +20,7 @@ async-stream.workspace = true async-trait.workspace = true bytemuck.workspace = true bytes.workspace = true +chrono.workspace = true common-base.workspace = true common-config.workspace = true common-datasource.workspace = true diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 086e965e92..d6f324c358 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -1082,6 +1082,7 @@ mod tests { object_store: env.access_layer.object_store().clone(), compress_type: CompressionType::Uncompressed, checkpoint_distance: 10, + remove_file_options: Default::default(), }, Default::default(), Default::default(), diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 493371a0ca..ec36e0a93d 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -36,7 +36,7 @@ use crate::compaction::{find_ttl, CompactionSstReaderBuilder}; use crate::config::MitoConfig; use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions}; use crate::manifest::storage::manifest_compress_type; use crate::metrics; use crate::read::Source; @@ -165,6 +165,10 @@ pub async fn open_compaction_region( object_store: object_store.clone(), compress_type: manifest_compress_type(mito_config.compress_manifest), checkpoint_distance: mito_config.manifest_checkpoint_distance, + remove_file_options: RemoveFileOptions { + keep_count: mito_config.experimental_manifest_keep_removed_file_count, + keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl, + }, }; RegionManifestManager::open( @@ -436,6 +440,8 @@ impl Compactor for DefaultCompactor { let edit = RegionEdit { files_to_add: merge_output.files_to_add, files_to_remove: merge_output.files_to_remove, + // Use current timestamp as the edit timestamp. + timestamp_ms: Some(chrono::Utc::now().timestamp_millis()), compaction_time_window: merge_output .compaction_time_window .map(|seconds| Duration::from_secs(seconds as u64)), diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 2c527b5341..942a2c8dcd 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -65,6 +65,17 @@ pub struct MitoConfig { /// Number of meta action updated to trigger a new checkpoint /// for the manifest (default 10). pub manifest_checkpoint_distance: u64, + /// Number of removed files to keep in manifest's `removed_files` field before also + /// remove them from `removed_files`. Mostly for debugging purpose. + /// If set to 0, it will only use `keep_removed_file_ttl` to decide when to remove files + /// from `removed_files` field. + pub experimental_manifest_keep_removed_file_count: usize, + /// How long to keep removed files in the `removed_files` field of manifest + /// after they are removed from manifest. + /// files will only be removed from `removed_files` field + /// if both `keep_removed_file_count` and `keep_removed_file_ttl` is reached. + #[serde(with = "humantime_serde")] + pub experimental_manifest_keep_removed_file_ttl: Duration, /// Whether to compress manifest and checkpoint file by gzip (default false). pub compress_manifest: bool, @@ -139,6 +150,8 @@ impl Default for MitoConfig { worker_channel_size: 128, worker_request_batch_size: 64, manifest_checkpoint_distance: 10, + experimental_manifest_keep_removed_file_count: 256, + experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60), compress_manifest: false, max_background_flushes: divide_num_cpus(2), max_background_compactions: divide_num_cpus(4), diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 3a0337b58d..febd2be45e 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -421,6 +421,7 @@ fn is_valid_region_edit(edit: &RegionEdit) -> bool { RegionEdit { files_to_add: _, files_to_remove: _, + timestamp_ms: _, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -967,6 +968,7 @@ mod tests { let edit = RegionEdit { files_to_add: vec![FileMeta::default()], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -977,6 +979,7 @@ mod tests { let edit = RegionEdit { files_to_add: vec![], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -987,6 +990,7 @@ mod tests { let edit = RegionEdit { files_to_add: vec![FileMeta::default()], files_to_remove: vec![FileMeta::default()], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -997,6 +1001,7 @@ mod tests { let edit = RegionEdit { files_to_add: vec![FileMeta::default()], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: Some(Duration::from_secs(1)), flushed_entry_id: None, flushed_sequence: None, @@ -1005,6 +1010,7 @@ mod tests { let edit = RegionEdit { files_to_add: vec![FileMeta::default()], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: Some(1), flushed_sequence: None, @@ -1013,6 +1019,7 @@ mod tests { let edit = RegionEdit { files_to_add: vec![FileMeta::default()], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: Some(1), diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs index aae883a828..f7377d1be7 100644 --- a/src/mito2/src/engine/edit_region_test.rs +++ b/src/mito2/src/engine/edit_region_test.rs @@ -92,6 +92,7 @@ async fn test_edit_region_schedule_compaction() { ..Default::default() }], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -179,6 +180,7 @@ async fn test_edit_region_fill_cache() { ..Default::default() }], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -229,6 +231,7 @@ async fn test_edit_region_concurrently() { let edit = RegionEdit { files_to_add: vec![sst], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index aa3d91ff4b..e4d93e1253 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1041,6 +1041,15 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Duration out of range: {input:?}"))] + DurationOutOfRange { + input: std::time::Duration, + #[snafu(source)] + error: chrono::OutOfRangeError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -1098,7 +1107,8 @@ impl ErrorExt for Error { | InvalidRegionOptions { .. } | InvalidWalReadRequest { .. } | PartitionOutOfRange { .. } - | ParseJobId { .. } => StatusCode::InvalidArguments, + | ParseJobId { .. } + | DurationOutOfRange { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index ff20b7af4c..9d8bbc0826 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -446,6 +446,7 @@ impl RegionFlushTask { let edit = RegionEdit { files_to_add: file_metas, files_to_remove: Vec::new(), + timestamp_ms: Some(chrono::Utc::now().timestamp_millis()), compaction_time_window: None, // The last entry has been flushed. flushed_entry_id: Some(version_data.last_entry_id), @@ -1002,6 +1003,7 @@ mod tests { RegionEdit { files_to_add: Vec::new(), files_to_remove: Vec::new(), + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index a0b041ff8a..7a0c1afa04 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -14,9 +14,10 @@ //! Defines [RegionMetaAction] related structs and [RegionCheckpoint]. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::Duration; +use chrono::Utc; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::RegionMetadataRef; @@ -24,7 +25,10 @@ use store_api::storage::{RegionId, SequenceNumber}; use store_api::ManifestVersion; use strum::Display; -use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu}; +use crate::error::{ + DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu, +}; +use crate::manifest::manager::RemoveFileOptions; use crate::sst::file::{FileId, FileMeta}; use crate::wal::EntryId; @@ -51,6 +55,9 @@ pub struct RegionChange { pub struct RegionEdit { pub files_to_add: Vec, pub files_to_remove: Vec, + /// event unix timestamp in milliseconds, help to determine file deletion time. + #[serde(default)] + pub timestamp_ms: Option, #[serde(with = "humantime_serde")] pub compaction_time_window: Option, pub flushed_entry_id: Option, @@ -62,11 +69,16 @@ pub struct RegionRemove { pub region_id: RegionId, } +/// Last data truncated in the region. +/// #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionTruncate { pub region_id: RegionId, #[serde(flatten)] pub kind: TruncateKind, + /// event unix timestamp in milliseconds, help to determine file deletion time. + #[serde(default)] + pub timestamp_ms: Option, } /// The kind of truncate operation. @@ -85,12 +97,24 @@ pub enum TruncateKind { } /// The region manifest data. -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive(Serialize, Deserialize, Clone, Debug)] +#[cfg_attr(test, derive(Eq))] pub struct RegionManifest { /// Metadata of the region. pub metadata: RegionMetadataRef, /// SST files. pub files: HashMap, + /// Removed files, which are not in the current manifest but may still be kept for a while. + /// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when + /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch. + /// + /// Using same checkpoint files and action files, the recovered manifest may differ in this + /// `removed_files` field, because the checkpointer may evict some removed files using + /// current machine time. This is acceptable because the removed files are not used in normal + /// read/write path. + /// + #[serde(default)] + pub removed_files: RemovedFilesRecord, /// Last WAL entry id of flushed data. pub flushed_entry_id: EntryId, /// Last sequence of flushed data. @@ -104,10 +128,24 @@ pub struct RegionManifest { pub compaction_time_window: Option, } +#[cfg(test)] +impl PartialEq for RegionManifest { + fn eq(&self, other: &Self) -> bool { + self.metadata == other.metadata + && self.files == other.files + && self.flushed_entry_id == other.flushed_entry_id + && self.flushed_sequence == other.flushed_sequence + && self.manifest_version == other.manifest_version + && self.truncated_entry_id == other.truncated_entry_id + && self.compaction_time_window == other.compaction_time_window + } +} + #[derive(Debug, Default)] pub struct RegionManifestBuilder { metadata: Option, files: HashMap, + pub removed_files: RemovedFilesRecord, flushed_entry_id: EntryId, flushed_sequence: SequenceNumber, manifest_version: ManifestVersion, @@ -122,6 +160,7 @@ impl RegionManifestBuilder { Self { metadata: Some(s.metadata), files: s.files, + removed_files: s.removed_files, flushed_entry_id: s.flushed_entry_id, manifest_version: s.manifest_version, flushed_sequence: s.flushed_sequence, @@ -143,6 +182,14 @@ impl RegionManifestBuilder { for file in edit.files_to_add { self.files.insert(file.file_id, file); } + self.removed_files.add_removed_files( + edit.files_to_remove + .iter() + .map(|meta| meta.file_id) + .collect(), + edit.timestamp_ms + .unwrap_or_else(|| Utc::now().timestamp_millis()), + ); for file in edit.files_to_remove { self.files.remove(&file.file_id); } @@ -168,8 +215,20 @@ impl RegionManifestBuilder { self.flushed_sequence = truncated_sequence; self.truncated_entry_id = Some(truncated_entry_id); self.files.clear(); + self.removed_files.add_removed_files( + self.files.values().map(|meta| meta.file_id).collect(), + truncate + .timestamp_ms + .unwrap_or_else(|| Utc::now().timestamp_millis()), + ); } TruncateKind::Partial { files_to_remove } => { + self.removed_files.add_removed_files( + files_to_remove.iter().map(|meta| meta.file_id).collect(), + truncate + .timestamp_ms + .unwrap_or_else(|| Utc::now().timestamp_millis()), + ); for file in files_to_remove { self.files.remove(&file.file_id); } @@ -177,6 +236,10 @@ impl RegionManifestBuilder { } } + pub fn files(&self) -> &HashMap { + &self.files + } + /// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata). pub fn contains_metadata(&self) -> bool { self.metadata.is_some() @@ -187,6 +250,7 @@ impl RegionManifestBuilder { Ok(RegionManifest { metadata, files: self.files, + removed_files: self.removed_files, flushed_entry_id: self.flushed_entry_id, flushed_sequence: self.flushed_sequence, manifest_version: self.manifest_version, @@ -196,8 +260,72 @@ impl RegionManifestBuilder { } } +/// A record of removed files in the region manifest. +/// This is used to keep track of files that have been removed from the manifest but may still +/// be kept for a while +#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)] +pub struct RemovedFilesRecord { + /// a list of `(FileIds, timestamp)` pairs, where the timestamp is the time when + /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch. + pub removed_files: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)] +pub struct RemovedFiles { + /// The timestamp is the time when + /// the files are removed from manifest. The timestamp is in milliseconds since unix epoch. + pub removed_at: i64, + /// The set of file ids that are removed. + pub file_ids: HashSet, +} + +impl RemovedFilesRecord { + /// Add a record of removed files with the current timestamp. + pub fn add_removed_files(&mut self, file_ids: HashSet, at: i64) { + self.removed_files.push(RemovedFiles { + removed_at: at, + file_ids, + }); + } + + pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> { + let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum(); + if opt.keep_count > 0 && total_removed_files <= opt.keep_count { + return Ok(()); + } + + let mut cur_file_cnt = total_removed_files; + + let can_evict_until = chrono::Utc::now() + - chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu { + input: opt.keep_ttl, + })?; + + self.removed_files.sort_unstable_by_key(|f| f.removed_at); + let updated = std::mem::take(&mut self.removed_files) + .into_iter() + .filter_map(|f| { + if f.removed_at < can_evict_until.timestamp_millis() + && (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count) + { + // can evict all files + // TODO(discord9): maybe only evict to below keep_count? Maybe not, or the update might be too frequent. + cur_file_cnt -= f.file_ids.len(); + None + } else { + Some(f) + } + }) + .collect(); + self.removed_files = updated; + + Ok(()) + } +} + // The checkpoint of region manifest, generated by checkpointer. -#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Debug, Clone)] +#[cfg_attr(test, derive(PartialEq, Eq))] pub struct RegionCheckpoint { /// The last manifest version that this checkpoint compacts(inclusive). pub last_version: ManifestVersion, @@ -484,4 +612,260 @@ mod tests { } ); } + + #[test] + fn test_region_manifest_removed_files() { + let region_metadata = r#"{ + "column_metadatas": [ + { + "column_schema": { + "name": "a", + "data_type": {"Int64": {}}, + "is_nullable": false, + "is_time_index": false, + "default_constraint": null, + "metadata": {} + }, + "semantic_type": "Tag", + "column_id": 1 + }, + { + "column_schema": { + "name": "b", + "data_type": {"Float64": {}}, + "is_nullable": false, + "is_time_index": false, + "default_constraint": null, + "metadata": {} + }, + "semantic_type": "Field", + "column_id": 2 + }, + { + "column_schema": { + "name": "c", + "data_type": {"Timestamp": {"Millisecond": null}}, + "is_nullable": false, + "is_time_index": false, + "default_constraint": null, + "metadata": {} + }, + "semantic_type": "Timestamp", + "column_id": 3 + } + ], + "primary_key": [1], + "region_id": 4402341478400, + "schema_version": 0 + }"#; + + let metadata: RegionMetadataRef = + serde_json::from_str(region_metadata).expect("Failed to parse region metadata"); + let manifest = RegionManifest { + metadata: metadata.clone(), + files: HashMap::new(), + flushed_entry_id: 0, + flushed_sequence: 0, + manifest_version: 0, + truncated_entry_id: None, + compaction_time_window: None, + removed_files: RemovedFilesRecord { + removed_files: vec![RemovedFiles { + removed_at: 0, + file_ids: HashSet::from([FileId::parse_str( + "4b220a70-2b03-4641-9687-b65d94641208", + ) + .unwrap()]), + }], + }, + }; + + let json = serde_json::to_string(&manifest).unwrap(); + let new: RegionManifest = serde_json::from_str(&json).unwrap(); + + assert_eq!(manifest, new); + } + + /// Test if old version can still be deserialized then serialized to the new version. + #[test] + fn test_old_region_manifest_compat() { + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] + pub struct RegionManifestV1 { + /// Metadata of the region. + pub metadata: RegionMetadataRef, + /// SST files. + pub files: HashMap, + /// Last WAL entry id of flushed data. + pub flushed_entry_id: EntryId, + /// Last sequence of flushed data. + pub flushed_sequence: SequenceNumber, + /// Current manifest version. + pub manifest_version: ManifestVersion, + /// Last WAL entry id of truncated data. + pub truncated_entry_id: Option, + /// Inferred compaction time window. + #[serde(with = "humantime_serde")] + pub compaction_time_window: Option, + } + + let region_metadata = r#"{ + "column_metadatas": [ + { + "column_schema": { + "name": "a", + "data_type": {"Int64": {}}, + "is_nullable": false, + "is_time_index": false, + "default_constraint": null, + "metadata": {} + }, + "semantic_type": "Tag", + "column_id": 1 + }, + { + "column_schema": { + "name": "b", + "data_type": {"Float64": {}}, + "is_nullable": false, + "is_time_index": false, + "default_constraint": null, + "metadata": {} + }, + "semantic_type": "Field", + "column_id": 2 + }, + { + "column_schema": { + "name": "c", + "data_type": {"Timestamp": {"Millisecond": null}}, + "is_nullable": false, + "is_time_index": false, + "default_constraint": null, + "metadata": {} + }, + "semantic_type": "Timestamp", + "column_id": 3 + } + ], + "primary_key": [1], + "region_id": 4402341478400, + "schema_version": 0 + }"#; + + let metadata: RegionMetadataRef = + serde_json::from_str(region_metadata).expect("Failed to parse region metadata"); + + // first test v1 empty to new + let v1 = RegionManifestV1 { + metadata: metadata.clone(), + files: HashMap::new(), + flushed_entry_id: 0, + flushed_sequence: 0, + manifest_version: 0, + truncated_entry_id: None, + compaction_time_window: None, + }; + let json = serde_json::to_string(&v1).unwrap(); + let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap(); + assert_eq!( + new_from_old, + RegionManifest { + metadata: metadata.clone(), + files: HashMap::new(), + removed_files: Default::default(), + flushed_entry_id: 0, + flushed_sequence: 0, + manifest_version: 0, + truncated_entry_id: None, + compaction_time_window: None, + } + ); + + let new_manifest = RegionManifest { + metadata: metadata.clone(), + files: HashMap::new(), + removed_files: Default::default(), + flushed_entry_id: 0, + flushed_sequence: 0, + manifest_version: 0, + truncated_entry_id: None, + compaction_time_window: None, + }; + let json = serde_json::to_string(&new_manifest).unwrap(); + let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap(); + assert_eq!( + old_from_new, + RegionManifestV1 { + metadata: metadata.clone(), + files: HashMap::new(), + flushed_entry_id: 0, + flushed_sequence: 0, + manifest_version: 0, + truncated_entry_id: None, + compaction_time_window: None, + } + ); + + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] + pub struct RegionEditV1 { + pub files_to_add: Vec, + pub files_to_remove: Vec, + #[serde(with = "humantime_serde")] + pub compaction_time_window: Option, + pub flushed_entry_id: Option, + pub flushed_sequence: Option, + } + + /// Last data truncated in the region. + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] + pub struct RegionTruncateV1 { + pub region_id: RegionId, + pub kind: TruncateKind, + } + + let json = serde_json::to_string(&RegionEditV1 { + files_to_add: vec![], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }) + .unwrap(); + let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap(); + assert_eq!( + RegionEdit { + files_to_add: vec![], + files_to_remove: vec![], + timestamp_ms: None, + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }, + new_from_old + ); + + // test new version with timestamp_ms set can deserialize to old version + let new = RegionEdit { + files_to_add: vec![], + files_to_remove: vec![], + timestamp_ms: Some(42), + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }; + + let new_json = serde_json::to_string(&new).unwrap(); + + let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap(); + assert_eq!( + RegionEditV1 { + files_to_add: vec![], + files_to_remove: vec![], + compaction_time_window: None, + flushed_entry_id: None, + flushed_sequence: None, + }, + old_from_new + ); + } } diff --git a/src/mito2/src/manifest/checkpointer.rs b/src/mito2/src/manifest/checkpointer.rs index 6a65d6e6b8..88a2c683bf 100644 --- a/src/mito2/src/manifest/checkpointer.rs +++ b/src/mito2/src/manifest/checkpointer.rs @@ -20,6 +20,7 @@ use common_telemetry::{error, info}; use store_api::storage::RegionId; use store_api::{ManifestVersion, MIN_VERSION}; +use crate::error::Result; use crate::manifest::action::{RegionCheckpoint, RegionManifest}; use crate::manifest::manager::RegionManifestOptions; use crate::manifest::storage::ManifestObjectStore; @@ -118,6 +119,19 @@ impl Checkpointer { self.inner.last_checkpoint_version.load(Ordering::Relaxed) } + /// Update the `removed_files` field in the manifest by the options in `manifest_options`. + /// This should be called before maybe do checkpoint to update the manifest. + pub(crate) fn update_manifest_removed_files( + &self, + mut manifest: RegionManifest, + ) -> Result { + let opt = &self.manifest_options.remove_file_options; + + manifest.removed_files.evict_old_removed_files(opt)?; + + Ok(manifest) + } + /// Check if it's needed to do checkpoint for the region by the checkpoint distance. /// If needed, and there's no currently running checkpoint task, it will start a new checkpoint /// task running in the background. diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index e765528b66..a52c18db95 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -47,6 +47,28 @@ pub struct RegionManifestOptions { /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints. /// Set to 0 to disable checkpoint. pub checkpoint_distance: u64, + pub remove_file_options: RemoveFileOptions, +} + +/// Options for updating `removed_files` field in [RegionManifest]. +#[derive(Debug, Clone)] +pub struct RemoveFileOptions { + /// Number of removed files to keep in manifest's `removed_files` field before also + /// remove them from `removed_files`. Only remove files when both `keep_count` and `keep_duration` is reached. + pub keep_count: usize, + /// Duration to keep removed files in manifest's `removed_files` field before also + /// remove them from `removed_files`. Only remove files when both `keep_count` and `keep_duration` is reached. + pub keep_ttl: std::time::Duration, +} + +#[cfg(any(test, feature = "test"))] +impl Default for RemoveFileOptions { + fn default() -> Self { + Self { + keep_count: 256, + keep_ttl: std::time::Duration::from_secs(3600), + } + } } // rewrite note: @@ -481,7 +503,10 @@ impl RegionManifestManager { } } let new_manifest = manifest_builder.try_build()?; - self.manifest = Arc::new(new_manifest); + let updated_manifest = self + .checkpointer + .update_manifest_removed_files(new_manifest)?; + self.manifest = Arc::new(updated_manifest); self.checkpointer .maybe_do_checkpoint(self.manifest.as_ref(), region_state); @@ -565,6 +590,10 @@ impl RegionManifestManager { } } + pub fn store(&self) -> ManifestObjectStore { + self.store.clone() + } + #[cfg(test)] pub(crate) fn checkpointer(&self) -> &Checkpointer { &self.checkpointer @@ -579,10 +608,6 @@ impl RegionManifestManager { assert_eq!(self.manifest.manifest_version, self.last_version()); assert_eq!(last_version, self.last_version()); } - - pub fn store(&self) -> ManifestObjectStore { - self.store.clone() - } } #[cfg(test)] @@ -786,6 +811,7 @@ mod test { RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -816,6 +842,6 @@ mod test { // get manifest size again let manifest_size = manager.manifest_usage(); - assert_eq!(manifest_size, 1226); + assert_eq!(manifest_size, 1669); } } diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index e2d0572958..746350b4bc 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -664,7 +664,7 @@ impl ManifestObjectStore { } #[derive(Serialize, Deserialize, Debug)] -struct CheckpointMetadata { +pub(crate) struct CheckpointMetadata { pub size: usize, /// The latest version this checkpoint contains. pub version: ManifestVersion, diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 77ef19591e..dff372eb7a 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -25,6 +25,7 @@ use crate::manifest::action::{ RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList, }; use crate::manifest::manager::RegionManifestManager; +use crate::manifest::storage::CheckpointMetadata; use crate::manifest::tests::utils::basic_region_metadata; use crate::region::{RegionLeaderState, RegionRoleState}; use crate::sst::file::{FileId, FileMeta}; @@ -72,6 +73,7 @@ fn nop_action() -> RegionMetaActionList { RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -178,9 +180,11 @@ async fn manager_with_checkpoint_distance_1() { .await .unwrap(); let raw_json = std::str::from_utf8(&raw_bytes).unwrap(); - let expected_json = - "{\"size\":901,\"version\":10,\"checksum\":2571452538,\"extend_metadata\":{}}"; - assert_eq!(expected_json, raw_json); + let checkpoint_metadata: CheckpointMetadata = + serde_json::from_str(raw_json).expect("Failed to parse checkpoint metadata"); + + // size and checksum vary due to different machine time, so we only check version + assert_eq!(checkpoint_metadata.version, 10); // reopen the manager manager.stop().await; @@ -261,6 +265,7 @@ async fn checkpoint_with_different_compression_types() { let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, @@ -323,6 +328,7 @@ fn generate_action_lists(num: usize) -> (Vec, Vec) let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit { files_to_add: vec![file_meta], files_to_remove: vec![], + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 2227bbe85a..e6dd142f0b 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -547,7 +547,7 @@ impl MitoRegion { #[derive(Debug)] pub(crate) struct ManifestContext { /// Manager to maintain manifest for this region. - manifest_manager: tokio::sync::RwLock, + pub(crate) manifest_manager: tokio::sync::RwLock, /// The state of the region. The region checks the state before updating /// manifest. state: AtomicCell, @@ -779,10 +779,8 @@ impl ManifestContext { } } } -} -#[cfg(test)] -impl ManifestContext { + #[cfg(test)] pub(crate) async fn manifest(&self) -> Arc { self.manifest_manager.read().await.manifest() } @@ -1129,6 +1127,7 @@ mod tests { object_store: env.access_layer.object_store().clone(), compress_type: CompressionType::Uncompressed, checkpoint_distance: 10, + remove_file_options: Default::default(), }, Default::default(), Default::default(), @@ -1193,6 +1192,7 @@ mod tests { object_store: access_layer.object_store().clone(), compress_type: CompressionType::Uncompressed, checkpoint_distance: 10, + remove_file_options: Default::default(), }, Default::default(), Default::default(), diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 48a303921a..e0c343994f 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -47,7 +47,7 @@ use crate::error::{ Result, StaleLogEntrySnafu, }; use crate::manifest::action::RegionManifest; -use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; +use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions}; use crate::manifest::storage::manifest_compress_type; use crate::memtable::bulk::part::BulkPart; use crate::memtable::time_partition::TimePartitions; @@ -506,6 +506,10 @@ impl RegionOpener { // Currently, the manifest storage doesn't have good support for changing compression algorithms. compress_type: manifest_compress_type(config.compress_manifest), checkpoint_distance: config.manifest_checkpoint_distance, + remove_file_options: RemoveFileOptions { + keep_count: config.experimental_manifest_keep_removed_file_count, + keep_ttl: config.experimental_manifest_keep_removed_file_ttl, + }, }) } } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index fe70f5cc36..e6325b513e 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -557,6 +557,7 @@ impl TestEnv { object_store, compress_type, checkpoint_distance, + remove_file_options: Default::default(), }; if let Some(metadata) = initial_metadata { diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 3709148408..c33ec6b4e6 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -121,6 +121,7 @@ impl SchedulerEnv { object_store: self.access_layer.object_store().clone(), compress_type: CompressionType::Uncompressed, checkpoint_distance: 10, + remove_file_options: Default::default(), }, Default::default(), Default::default(), diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs index d8e4bd159d..63aca919a9 100644 --- a/src/mito2/src/test_util/version_util.rs +++ b/src/mito2/src/test_util/version_util.rs @@ -197,6 +197,7 @@ pub(crate) fn apply_edit( RegionEdit { files_to_add, files_to_remove: files_to_remove.to_vec(), + timestamp_ms: None, compaction_time_window: None, flushed_entry_id: None, flushed_sequence: None, diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 41ad5b8869..16a1b5a59a 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -56,6 +56,7 @@ impl RegionWorkerLoop { truncated_entry_id, truncated_sequence, }, + timestamp_ms: None, }; self.handle_manifest_truncate_action(region, truncate, sender); @@ -100,6 +101,7 @@ impl RegionWorkerLoop { kind: TruncateKind::Partial { files_to_remove: files_to_truncate, }, + timestamp_ms: None, }; self.handle_manifest_truncate_action(region, truncate, sender); } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 822c88031f..a0a9bd5d3f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1421,6 +1421,8 @@ enable_otlp_tracing = false worker_channel_size = 128 worker_request_batch_size = 64 manifest_checkpoint_distance = 10 +experimental_manifest_keep_removed_file_count = 256 +experimental_manifest_keep_removed_file_ttl = "1h" compress_manifest = false auto_flush_interval = "30m" enable_write_cache = false