feat: add event ts to region manifest (#6751)

* feat: add event ts to region manifest

Signed-off-by: discord9 <discord9@163.com>

delete files

Signed-off-by: discord9 <discord9@163.com>

chore: clippy

Signed-off-by: discord9 <discord9@163.com>

lower concurrency

Signed-off-by: discord9 <discord9@163.com>

feat: gc use delta manifest to get expel time

Signed-off-by: discord9 <discord9@163.com>

docs: terminology

Signed-off-by: discord9 <discord9@163.com>

refactor: some advices from review

Signed-off-by: discord9 <discord9@163.com>

feat: manifest add removed files field

Signed-off-by: discord9 <discord9@163.com>

feat(WIP): add remove time in manifest

Signed-off-by: discord9 <discord9@163.com>

wip: more config

Signed-off-by: discord9 <discord9@163.com>

feat: manifest update removed files

Signed-off-by: discord9 <discord9@163.com>

test: add remove file opts field

Signed-off-by: discord9 <discord9@163.com>

test: fix test

Signed-off-by: discord9 <discord9@163.com>

chore: delete gc.rs

Signed-off-by: discord9 <discord9@163.com>

* feat: proper option name

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* test: update manifest size

Signed-off-by: discord9 <discord9@163.com>

* test: fix eq

Signed-off-by: discord9 <discord9@163.com>

* refactor: some per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

* refactor: more per review

Signed-off-by: discord9 <discord9@163.com>

* test: update manifest size

Signed-off-by: discord9 <discord9@163.com>

* test: update config

Signed-off-by: discord9 <discord9@163.com>

* feat: keep count 0 means only use ttl

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-28 17:08:18 +08:00
committed by GitHub
parent f441598247
commit bacd9c7d15
23 changed files with 521 additions and 21 deletions

View File

@@ -20,6 +20,7 @@ async-stream.workspace = true
async-trait.workspace = true
bytemuck.workspace = true
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-config.workspace = true
common-datasource.workspace = true

View File

@@ -1082,6 +1082,7 @@ mod tests {
object_store: env.access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
remove_file_options: Default::default(),
},
Default::default(),
Default::default(),

View File

@@ -36,7 +36,7 @@ use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics;
use crate::read::Source;
@@ -165,6 +165,10 @@ pub async fn open_compaction_region(
object_store: object_store.clone(),
compress_type: manifest_compress_type(mito_config.compress_manifest),
checkpoint_distance: mito_config.manifest_checkpoint_distance,
remove_file_options: RemoveFileOptions {
keep_count: mito_config.experimental_manifest_keep_removed_file_count,
keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
},
};
RegionManifestManager::open(
@@ -436,6 +440,8 @@ impl Compactor for DefaultCompactor {
let edit = RegionEdit {
files_to_add: merge_output.files_to_add,
files_to_remove: merge_output.files_to_remove,
// Use current timestamp as the edit timestamp.
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
compaction_time_window: merge_output
.compaction_time_window
.map(|seconds| Duration::from_secs(seconds as u64)),

View File

@@ -65,6 +65,17 @@ pub struct MitoConfig {
/// Number of meta action updated to trigger a new checkpoint
/// for the manifest (default 10).
pub manifest_checkpoint_distance: u64,
/// Number of removed files to keep in the manifest's `removed_files` field before also
/// removing them from `removed_files`. Mostly for debugging purposes.
/// If set to 0, it will only use `keep_removed_file_ttl` to decide when to remove files
/// from the `removed_files` field.
pub experimental_manifest_keep_removed_file_count: usize,
/// How long to keep removed files in the `removed_files` field of the manifest
/// after they are removed from the manifest.
/// Files will only be removed from the `removed_files` field
/// if both `keep_removed_file_count` and `keep_removed_file_ttl` are reached.
#[serde(with = "humantime_serde")]
pub experimental_manifest_keep_removed_file_ttl: Duration,
/// Whether to compress manifest and checkpoint file by gzip (default false).
pub compress_manifest: bool,
@@ -139,6 +150,8 @@ impl Default for MitoConfig {
worker_channel_size: 128,
worker_request_batch_size: 64,
manifest_checkpoint_distance: 10,
experimental_manifest_keep_removed_file_count: 256,
experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
compress_manifest: false,
max_background_flushes: divide_num_cpus(2),
max_background_compactions: divide_num_cpus(4),

View File

@@ -421,6 +421,7 @@ fn is_valid_region_edit(edit: &RegionEdit) -> bool {
RegionEdit {
files_to_add: _,
files_to_remove: _,
timestamp_ms: _,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -967,6 +968,7 @@ mod tests {
let edit = RegionEdit {
files_to_add: vec![FileMeta::default()],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -977,6 +979,7 @@ mod tests {
let edit = RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -987,6 +990,7 @@ mod tests {
let edit = RegionEdit {
files_to_add: vec![FileMeta::default()],
files_to_remove: vec![FileMeta::default()],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -997,6 +1001,7 @@ mod tests {
let edit = RegionEdit {
files_to_add: vec![FileMeta::default()],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: Some(Duration::from_secs(1)),
flushed_entry_id: None,
flushed_sequence: None,
@@ -1005,6 +1010,7 @@ mod tests {
let edit = RegionEdit {
files_to_add: vec![FileMeta::default()],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: Some(1),
flushed_sequence: None,
@@ -1013,6 +1019,7 @@ mod tests {
let edit = RegionEdit {
files_to_add: vec![FileMeta::default()],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: Some(1),

View File

@@ -92,6 +92,7 @@ async fn test_edit_region_schedule_compaction() {
..Default::default()
}],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -179,6 +180,7 @@ async fn test_edit_region_fill_cache() {
..Default::default()
}],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -229,6 +231,7 @@ async fn test_edit_region_concurrently() {
let edit = RegionEdit {
files_to_add: vec![sst],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,

View File

@@ -1041,6 +1041,15 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Duration out of range: {input:?}"))]
DurationOutOfRange {
input: std::time::Duration,
#[snafu(source)]
error: chrono::OutOfRangeError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1098,7 +1107,8 @@ impl ErrorExt for Error {
| InvalidRegionOptions { .. }
| InvalidWalReadRequest { .. }
| PartitionOutOfRange { .. }
| ParseJobId { .. } => StatusCode::InvalidArguments,
| ParseJobId { .. }
| DurationOutOfRange { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. }
| Join { .. }

View File

@@ -446,6 +446,7 @@ impl RegionFlushTask {
let edit = RegionEdit {
files_to_add: file_metas,
files_to_remove: Vec::new(),
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
compaction_time_window: None,
// The last entry has been flushed.
flushed_entry_id: Some(version_data.last_entry_id),
@@ -1002,6 +1003,7 @@ mod tests {
RegionEdit {
files_to_add: Vec::new(),
files_to_remove: Vec::new(),
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,

View File

@@ -14,9 +14,10 @@
//! Defines [RegionMetaAction] related structs and [RegionCheckpoint].
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -24,7 +25,10 @@ use store_api::storage::{RegionId, SequenceNumber};
use store_api::ManifestVersion;
use strum::Display;
use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
use crate::error::{
DurationOutOfRangeSnafu, RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu,
};
use crate::manifest::manager::RemoveFileOptions;
use crate::sst::file::{FileId, FileMeta};
use crate::wal::EntryId;
@@ -51,6 +55,9 @@ pub struct RegionChange {
pub struct RegionEdit {
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
/// Event unix timestamp in milliseconds; helps to determine the file deletion time.
#[serde(default)]
pub timestamp_ms: Option<i64>,
#[serde(with = "humantime_serde")]
pub compaction_time_window: Option<Duration>,
pub flushed_entry_id: Option<EntryId>,
@@ -62,11 +69,16 @@ pub struct RegionRemove {
pub region_id: RegionId,
}
/// Last data truncated in the region.
///
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
pub region_id: RegionId,
#[serde(flatten)]
pub kind: TruncateKind,
/// Event unix timestamp in milliseconds; helps to determine the file deletion time.
#[serde(default)]
pub timestamp_ms: Option<i64>,
}
/// The kind of truncate operation.
@@ -85,12 +97,24 @@ pub enum TruncateKind {
}
/// The region manifest data.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[cfg_attr(test, derive(Eq))]
pub struct RegionManifest {
/// Metadata of the region.
pub metadata: RegionMetadataRef,
/// SST files.
pub files: HashMap<FileId, FileMeta>,
/// Removed files, which are not in the current manifest but may still be kept for a while.
/// This is a list of (set of files, timestamp) pairs, where the timestamp is the time when
/// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
///
/// Using same checkpoint files and action files, the recovered manifest may differ in this
/// `removed_files` field, because the checkpointer may evict some removed files using
/// current machine time. This is acceptable because the removed files are not used in normal
/// read/write path.
///
#[serde(default)]
pub removed_files: RemovedFilesRecord,
/// Last WAL entry id of flushed data.
pub flushed_entry_id: EntryId,
/// Last sequence of flushed data.
@@ -104,10 +128,24 @@ pub struct RegionManifest {
pub compaction_time_window: Option<Duration>,
}
#[cfg(test)]
impl PartialEq for RegionManifest {
fn eq(&self, other: &Self) -> bool {
self.metadata == other.metadata
&& self.files == other.files
&& self.flushed_entry_id == other.flushed_entry_id
&& self.flushed_sequence == other.flushed_sequence
&& self.manifest_version == other.manifest_version
&& self.truncated_entry_id == other.truncated_entry_id
&& self.compaction_time_window == other.compaction_time_window
}
}
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
metadata: Option<RegionMetadataRef>,
files: HashMap<FileId, FileMeta>,
pub removed_files: RemovedFilesRecord,
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
manifest_version: ManifestVersion,
@@ -122,6 +160,7 @@ impl RegionManifestBuilder {
Self {
metadata: Some(s.metadata),
files: s.files,
removed_files: s.removed_files,
flushed_entry_id: s.flushed_entry_id,
manifest_version: s.manifest_version,
flushed_sequence: s.flushed_sequence,
@@ -143,6 +182,14 @@ impl RegionManifestBuilder {
for file in edit.files_to_add {
self.files.insert(file.file_id, file);
}
self.removed_files.add_removed_files(
edit.files_to_remove
.iter()
.map(|meta| meta.file_id)
.collect(),
edit.timestamp_ms
.unwrap_or_else(|| Utc::now().timestamp_millis()),
);
for file in edit.files_to_remove {
self.files.remove(&file.file_id);
}
@@ -168,8 +215,20 @@ impl RegionManifestBuilder {
self.flushed_sequence = truncated_sequence;
self.truncated_entry_id = Some(truncated_entry_id);
self.files.clear();
self.removed_files.add_removed_files(
self.files.values().map(|meta| meta.file_id).collect(),
truncate
.timestamp_ms
.unwrap_or_else(|| Utc::now().timestamp_millis()),
);
}
TruncateKind::Partial { files_to_remove } => {
self.removed_files.add_removed_files(
files_to_remove.iter().map(|meta| meta.file_id).collect(),
truncate
.timestamp_ms
.unwrap_or_else(|| Utc::now().timestamp_millis()),
);
for file in files_to_remove {
self.files.remove(&file.file_id);
}
@@ -177,6 +236,10 @@ impl RegionManifestBuilder {
}
}
pub fn files(&self) -> &HashMap<FileId, FileMeta> {
&self.files
}
/// Check if the builder keeps a [RegionMetadata](store_api::metadata::RegionMetadata).
pub fn contains_metadata(&self) -> bool {
self.metadata.is_some()
@@ -187,6 +250,7 @@ impl RegionManifestBuilder {
Ok(RegionManifest {
metadata,
files: self.files,
removed_files: self.removed_files,
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
manifest_version: self.manifest_version,
@@ -196,8 +260,72 @@ impl RegionManifestBuilder {
}
}
/// A record of removed files in the region manifest.
/// This is used to keep track of files that have been removed from the manifest but may still
/// be kept for a while.
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFilesRecord {
/// A list of `(FileIds, timestamp)` pairs, where the timestamp is the time when
/// the files are removed from the manifest. The timestamp is in milliseconds since unix epoch.
pub removed_files: Vec<RemovedFiles>,
}
#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct RemovedFiles {
/// The timestamp is the time when
/// the files are removed from manifest. The timestamp is in milliseconds since unix epoch.
pub removed_at: i64,
/// The set of file ids that are removed.
pub file_ids: HashSet<FileId>,
}
impl RemovedFilesRecord {
/// Add a record of removed files with the current timestamp.
pub fn add_removed_files(&mut self, file_ids: HashSet<FileId>, at: i64) {
self.removed_files.push(RemovedFiles {
removed_at: at,
file_ids,
});
}
pub fn evict_old_removed_files(&mut self, opt: &RemoveFileOptions) -> Result<()> {
let total_removed_files: usize = self.removed_files.iter().map(|s| s.file_ids.len()).sum();
if opt.keep_count > 0 && total_removed_files <= opt.keep_count {
return Ok(());
}
let mut cur_file_cnt = total_removed_files;
let can_evict_until = chrono::Utc::now()
- chrono::Duration::from_std(opt.keep_ttl).context(DurationOutOfRangeSnafu {
input: opt.keep_ttl,
})?;
self.removed_files.sort_unstable_by_key(|f| f.removed_at);
let updated = std::mem::take(&mut self.removed_files)
.into_iter()
.filter_map(|f| {
if f.removed_at < can_evict_until.timestamp_millis()
&& (opt.keep_count == 0 || cur_file_cnt >= opt.keep_count)
{
// can evict all files
// TODO(discord9): maybe only evict to below keep_count? Maybe not, or the update might be too frequent.
cur_file_cnt -= f.file_ids.len();
None
} else {
Some(f)
}
})
.collect();
self.removed_files = updated;
Ok(())
}
}
// The checkpoint of region manifest, generated by checkpointer.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub struct RegionCheckpoint {
/// The last manifest version that this checkpoint compacts(inclusive).
pub last_version: ManifestVersion,
@@ -484,4 +612,260 @@ mod tests {
}
);
}
#[test]
fn test_region_manifest_removed_files() {
let region_metadata = r#"{
"column_metadatas": [
{
"column_schema": {
"name": "a",
"data_type": {"Int64": {}},
"is_nullable": false,
"is_time_index": false,
"default_constraint": null,
"metadata": {}
},
"semantic_type": "Tag",
"column_id": 1
},
{
"column_schema": {
"name": "b",
"data_type": {"Float64": {}},
"is_nullable": false,
"is_time_index": false,
"default_constraint": null,
"metadata": {}
},
"semantic_type": "Field",
"column_id": 2
},
{
"column_schema": {
"name": "c",
"data_type": {"Timestamp": {"Millisecond": null}},
"is_nullable": false,
"is_time_index": false,
"default_constraint": null,
"metadata": {}
},
"semantic_type": "Timestamp",
"column_id": 3
}
],
"primary_key": [1],
"region_id": 4402341478400,
"schema_version": 0
}"#;
let metadata: RegionMetadataRef =
serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
let manifest = RegionManifest {
metadata: metadata.clone(),
files: HashMap::new(),
flushed_entry_id: 0,
flushed_sequence: 0,
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
removed_files: RemovedFilesRecord {
removed_files: vec![RemovedFiles {
removed_at: 0,
file_ids: HashSet::from([FileId::parse_str(
"4b220a70-2b03-4641-9687-b65d94641208",
)
.unwrap()]),
}],
},
};
let json = serde_json::to_string(&manifest).unwrap();
let new: RegionManifest = serde_json::from_str(&json).unwrap();
assert_eq!(manifest, new);
}
/// Test if old version can still be deserialized then serialized to the new version.
#[test]
fn test_old_region_manifest_compat() {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionManifestV1 {
/// Metadata of the region.
pub metadata: RegionMetadataRef,
/// SST files.
pub files: HashMap<FileId, FileMeta>,
/// Last WAL entry id of flushed data.
pub flushed_entry_id: EntryId,
/// Last sequence of flushed data.
pub flushed_sequence: SequenceNumber,
/// Current manifest version.
pub manifest_version: ManifestVersion,
/// Last WAL entry id of truncated data.
pub truncated_entry_id: Option<EntryId>,
/// Inferred compaction time window.
#[serde(with = "humantime_serde")]
pub compaction_time_window: Option<Duration>,
}
let region_metadata = r#"{
"column_metadatas": [
{
"column_schema": {
"name": "a",
"data_type": {"Int64": {}},
"is_nullable": false,
"is_time_index": false,
"default_constraint": null,
"metadata": {}
},
"semantic_type": "Tag",
"column_id": 1
},
{
"column_schema": {
"name": "b",
"data_type": {"Float64": {}},
"is_nullable": false,
"is_time_index": false,
"default_constraint": null,
"metadata": {}
},
"semantic_type": "Field",
"column_id": 2
},
{
"column_schema": {
"name": "c",
"data_type": {"Timestamp": {"Millisecond": null}},
"is_nullable": false,
"is_time_index": false,
"default_constraint": null,
"metadata": {}
},
"semantic_type": "Timestamp",
"column_id": 3
}
],
"primary_key": [1],
"region_id": 4402341478400,
"schema_version": 0
}"#;
let metadata: RegionMetadataRef =
serde_json::from_str(region_metadata).expect("Failed to parse region metadata");
// first test v1 empty to new
let v1 = RegionManifestV1 {
metadata: metadata.clone(),
files: HashMap::new(),
flushed_entry_id: 0,
flushed_sequence: 0,
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
};
let json = serde_json::to_string(&v1).unwrap();
let new_from_old: RegionManifest = serde_json::from_str(&json).unwrap();
assert_eq!(
new_from_old,
RegionManifest {
metadata: metadata.clone(),
files: HashMap::new(),
removed_files: Default::default(),
flushed_entry_id: 0,
flushed_sequence: 0,
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
}
);
let new_manifest = RegionManifest {
metadata: metadata.clone(),
files: HashMap::new(),
removed_files: Default::default(),
flushed_entry_id: 0,
flushed_sequence: 0,
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
};
let json = serde_json::to_string(&new_manifest).unwrap();
let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
assert_eq!(
old_from_new,
RegionManifestV1 {
metadata: metadata.clone(),
files: HashMap::new(),
flushed_entry_id: 0,
flushed_sequence: 0,
manifest_version: 0,
truncated_entry_id: None,
compaction_time_window: None,
}
);
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEditV1 {
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
#[serde(with = "humantime_serde")]
pub compaction_time_window: Option<Duration>,
pub flushed_entry_id: Option<EntryId>,
pub flushed_sequence: Option<SequenceNumber>,
}
/// Last data truncated in the region.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncateV1 {
pub region_id: RegionId,
pub kind: TruncateKind,
}
let json = serde_json::to_string(&RegionEditV1 {
files_to_add: vec![],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
})
.unwrap();
let new_from_old: RegionEdit = serde_json::from_str(&json).unwrap();
assert_eq!(
RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
new_from_old
);
// test new version with timestamp_ms set can deserialize to old version
let new = RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
timestamp_ms: Some(42),
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
let new_json = serde_json::to_string(&new).unwrap();
let old_from_new: RegionEditV1 = serde_json::from_str(&new_json).unwrap();
assert_eq!(
RegionEditV1 {
files_to_add: vec![],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
},
old_from_new
);
}
}

View File

@@ -20,6 +20,7 @@ use common_telemetry::{error, info};
use store_api::storage::RegionId;
use store_api::{ManifestVersion, MIN_VERSION};
use crate::error::Result;
use crate::manifest::action::{RegionCheckpoint, RegionManifest};
use crate::manifest::manager::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
@@ -118,6 +119,19 @@ impl Checkpointer {
self.inner.last_checkpoint_version.load(Ordering::Relaxed)
}
/// Update the `removed_files` field in the manifest by the options in `manifest_options`.
/// This should be called before a potential checkpoint, to update the manifest.
pub(crate) fn update_manifest_removed_files(
&self,
mut manifest: RegionManifest,
) -> Result<RegionManifest> {
let opt = &self.manifest_options.remove_file_options;
manifest.removed_files.evict_old_removed_files(opt)?;
Ok(manifest)
}
/// Check if it's needed to do checkpoint for the region by the checkpoint distance.
/// If needed, and there's no currently running checkpoint task, it will start a new checkpoint
/// task running in the background.

View File

@@ -47,6 +47,28 @@ pub struct RegionManifestOptions {
/// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
/// Set to 0 to disable checkpoint.
pub checkpoint_distance: u64,
pub remove_file_options: RemoveFileOptions,
}
/// Options for updating `removed_files` field in [RegionManifest].
#[derive(Debug, Clone)]
pub struct RemoveFileOptions {
/// Number of removed files to keep in the manifest's `removed_files` field before also
/// removing them from `removed_files`. Only remove files when both `keep_count` and `keep_ttl` are reached.
pub keep_count: usize,
/// Duration to keep removed files in the manifest's `removed_files` field before also
/// removing them from `removed_files`. Only remove files when both `keep_count` and `keep_ttl` are reached.
pub keep_ttl: std::time::Duration,
}
#[cfg(any(test, feature = "test"))]
impl Default for RemoveFileOptions {
fn default() -> Self {
Self {
keep_count: 256,
keep_ttl: std::time::Duration::from_secs(3600),
}
}
}
// rewrite note:
@@ -481,7 +503,10 @@ impl RegionManifestManager {
}
}
let new_manifest = manifest_builder.try_build()?;
self.manifest = Arc::new(new_manifest);
let updated_manifest = self
.checkpointer
.update_manifest_removed_files(new_manifest)?;
self.manifest = Arc::new(updated_manifest);
self.checkpointer
.maybe_do_checkpoint(self.manifest.as_ref(), region_state);
@@ -565,6 +590,10 @@ impl RegionManifestManager {
}
}
pub fn store(&self) -> ManifestObjectStore {
self.store.clone()
}
#[cfg(test)]
pub(crate) fn checkpointer(&self) -> &Checkpointer {
&self.checkpointer
@@ -579,10 +608,6 @@ impl RegionManifestManager {
assert_eq!(self.manifest.manifest_version, self.last_version());
assert_eq!(last_version, self.last_version());
}
pub fn store(&self) -> ManifestObjectStore {
self.store.clone()
}
}
#[cfg(test)]
@@ -786,6 +811,7 @@ mod test {
RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -816,6 +842,6 @@ mod test {
// get manifest size again
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, 1226);
assert_eq!(manifest_size, 1669);
}
}

View File

@@ -664,7 +664,7 @@ impl ManifestObjectStore {
}
#[derive(Serialize, Deserialize, Debug)]
struct CheckpointMetadata {
pub(crate) struct CheckpointMetadata {
pub size: usize,
/// The latest version this checkpoint contains.
pub version: ManifestVersion,

View File

@@ -25,6 +25,7 @@ use crate::manifest::action::{
RegionCheckpoint, RegionEdit, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::storage::CheckpointMetadata;
use crate::manifest::tests::utils::basic_region_metadata;
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::sst::file::{FileId, FileMeta};
@@ -72,6 +73,7 @@ fn nop_action() -> RegionMetaActionList {
RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -178,9 +180,11 @@ async fn manager_with_checkpoint_distance_1() {
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json =
"{\"size\":901,\"version\":10,\"checksum\":2571452538,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);
let checkpoint_metadata: CheckpointMetadata =
serde_json::from_str(raw_json).expect("Failed to parse checkpoint metadata");
// size and checksum vary due to different machine time, so we only check version
assert_eq!(checkpoint_metadata.version, 10);
// reopen the manager
manager.stop().await;
@@ -261,6 +265,7 @@ async fn checkpoint_with_different_compression_types() {
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
@@ -323,6 +328,7 @@ fn generate_action_lists(num: usize) -> (Vec<FileId>, Vec<RegionMetaActionList>)
let action = RegionMetaActionList::new(vec![RegionMetaAction::Edit(RegionEdit {
files_to_add: vec![file_meta],
files_to_remove: vec![],
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,

View File

@@ -547,7 +547,7 @@ impl MitoRegion {
#[derive(Debug)]
pub(crate) struct ManifestContext {
/// Manager to maintain manifest for this region.
manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
pub(crate) manifest_manager: tokio::sync::RwLock<RegionManifestManager>,
/// The state of the region. The region checks the state before updating
/// manifest.
state: AtomicCell<RegionRoleState>,
@@ -779,10 +779,8 @@ impl ManifestContext {
}
}
}
}
#[cfg(test)]
impl ManifestContext {
#[cfg(test)]
pub(crate) async fn manifest(&self) -> Arc<crate::manifest::action::RegionManifest> {
self.manifest_manager.read().await.manifest()
}
@@ -1129,6 +1127,7 @@ mod tests {
object_store: env.access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
remove_file_options: Default::default(),
},
Default::default(),
Default::default(),
@@ -1193,6 +1192,7 @@ mod tests {
object_store: access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
remove_file_options: Default::default(),
},
Default::default(),
Default::default(),

View File

@@ -47,7 +47,7 @@ use crate::error::{
Result, StaleLogEntrySnafu,
};
use crate::manifest::action::RegionManifest;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
@@ -506,6 +506,10 @@ impl RegionOpener {
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
compress_type: manifest_compress_type(config.compress_manifest),
checkpoint_distance: config.manifest_checkpoint_distance,
remove_file_options: RemoveFileOptions {
keep_count: config.experimental_manifest_keep_removed_file_count,
keep_ttl: config.experimental_manifest_keep_removed_file_ttl,
},
})
}
}

View File

@@ -557,6 +557,7 @@ impl TestEnv {
object_store,
compress_type,
checkpoint_distance,
remove_file_options: Default::default(),
};
if let Some(metadata) = initial_metadata {

View File

@@ -121,6 +121,7 @@ impl SchedulerEnv {
object_store: self.access_layer.object_store().clone(),
compress_type: CompressionType::Uncompressed,
checkpoint_distance: 10,
remove_file_options: Default::default(),
},
Default::default(),
Default::default(),

View File

@@ -197,6 +197,7 @@ pub(crate) fn apply_edit(
RegionEdit {
files_to_add,
files_to_remove: files_to_remove.to_vec(),
timestamp_ms: None,
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,

View File

@@ -56,6 +56,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
truncated_entry_id,
truncated_sequence,
},
timestamp_ms: None,
};
self.handle_manifest_truncate_action(region, truncate, sender);
@@ -100,6 +101,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
kind: TruncateKind::Partial {
files_to_remove: files_to_truncate,
},
timestamp_ms: None,
};
self.handle_manifest_truncate_action(region, truncate, sender);
}