diff --git a/Cargo.lock b/Cargo.lock index 69f82d5986..3075cad48a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13724,6 +13724,7 @@ dependencies = [ "common-memory-manager", "common-meta", "common-procedure", + "common-procedure-test", "common-query", "common-recordbatch", "common-runtime", diff --git a/config/config.md b/config/config.md index 66b345b5ef..2ac11dd6e6 100644 --- a/config/config.md +++ b/config/config.md @@ -410,6 +410,9 @@ | `stats_persistence` | -- | -- | Configuration options for the stats persistence. | | `stats_persistence.ttl` | String | `0s` | TTL for the stats table that will be used to store the stats.
Set to `0s` to disable stats persistence.
Default is `0s`.
If you want to enable stats persistence, set the TTL to a value greater than 0.
It is recommended to set a small value, e.g., `3h`. | | `stats_persistence.interval` | String | `10m` | The interval to persist the stats. Default is `10m`.
The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`. | +| `gc` | -- | -- | -- | +| `gc.enable` | Bool | `false` | Whether GC is enabled. Default to false. Need to be the same with datanode's `mito.gc.enable`
If set to false, no GC will be performed | +| `gc.gc_cooldown_period` | String | `5m` | Cooldown period between GC operations on the same region. | | `logging` | -- | -- | The logging options. | | `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. | | `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. | @@ -581,6 +584,10 @@ | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | | `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.
Only available for `partition_tree` memtable. | | `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.
Only available for `partition_tree` memtable. | +| `region_engine.mito.gc` | -- | -- | -- | +| `region_engine.mito.gc.enable` | Bool | `false` | Whether GC is enabled. Need to be the same with metasrv's `gc.enable` or unexpected behavior will occur | +| `region_engine.mito.gc.lingering_time` | String | `1m` | Lingering time before deleting files.
Should be long enough to allow long running queries to finish.
If set to None, then unused files will be deleted immediately. | +| `region_engine.mito.gc.unknown_file_lingering_time` | String | `1h` | Lingering time before deleting unknown files(files with undetermine expel time).
expel time is the time when the file is considered as removed, as in removed from the manifest.
This should only occur rarely, as manifest keep tracks in `removed_files` field
unless something goes wrong. | | `region_engine.file` | -- | -- | Enable the file engine. | | `region_engine.metric` | -- | -- | Metric engine options. | | `region_engine.metric.sparse_primary_key_encoding` | Bool | `true` | Whether to use sparse primary key encoding. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 85e5415e16..2631a089e1 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -672,6 +672,21 @@ data_freeze_threshold = 32768 ## Only available for `partition_tree` memtable. fork_dictionary_bytes = "1GiB" +[region_engine.mito.gc] +## Whether GC is enabled. Need to be the same with metasrv's `gc.enable` or unexpected behavior will occur +enable = false + +## Lingering time before deleting files. +## Should be long enough to allow long running queries to finish. +## If set to None, then unused files will be deleted immediately. +lingering_time = "1m" + +## Lingering time before deleting unknown files(files with undetermine expel time). +## expel time is the time when the file is considered as removed, as in removed from the manifest. +## This should only occur rarely, as manifest keep tracks in `removed_files` field +## unless something goes wrong. +unknown_file_lingering_time = "1h" + [[region_engine]] ## Enable the file engine. [region_engine.file] diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index aedd670740..fed6e3b814 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -315,6 +315,13 @@ ttl = "0s" ## The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`. interval = "10m" +[gc] +## Whether GC is enabled. Default to false. Need to be the same with datanode's `mito.gc.enable` +## If set to false, no GC will be performed +enable = false +## Cooldown period between GC operations on the same region. +gc_cooldown_period = "5m" + ## The logging options. [logging] ## The directory to store the log files. If set to empty, logs will not be written to files. diff --git a/src/common/meta/src/key/table_repart.rs b/src/common/meta/src/key/table_repart.rs index ce599865c1..30295f0d61 100644 --- a/src/common/meta/src/key/table_repart.rs +++ b/src/common/meta/src/key/table_repart.rs @@ -310,51 +310,17 @@ impl TableRepartManager { table_id: TableId, region_mapping: &HashMap>, ) -> Result<()> { - // Get current table repart with raw bytes for CAS operation - let Some(current_table_repart) = self.get_with_raw_bytes(table_id).await? else { - let mut new_table_repart_value = TableRepartValue::new(); - for (src, dsts) in region_mapping.iter() { - new_table_repart_value.update_mappings(*src, dsts); - } + let current = self.get_with_raw_bytes(table_id).await?; + let mut new_value = current + .as_ref() + .map(|c| c.inner.clone()) + .unwrap_or_else(TableRepartValue::new); - let (txn, _) = self.build_create_txn(table_id, &new_table_repart_value)?; - let result = self.kv_backend.txn(txn).await?; - ensure!( - result.succeeded, - crate::error::MetadataCorruptionSnafu { - err_msg: format!( - "Failed to create table repart for table {}: CAS operation failed", - table_id - ), - } - ); - - return Ok(()); - }; - - // Clone the current repart value and update mappings - let mut new_table_repart_value = current_table_repart.inner.clone(); for (src, dsts) in region_mapping.iter() { - new_table_repart_value.update_mappings(*src, dsts); + new_value.update_mappings(*src, dsts); } - // Execute atomic update - let (txn, _) = - self.build_update_txn(table_id, ¤t_table_repart, &new_table_repart_value)?; - - let result = self.kv_backend.txn(txn).await?; - - ensure!( - result.succeeded, - crate::error::MetadataCorruptionSnafu { - err_msg: format!( - "Failed to update mappings for table {}: CAS operation failed", - table_id - ), - } - ); - - Ok(()) + self.upsert_value(table_id, current, &new_value).await } /// Removes mappings from src region to dst regions. @@ -364,33 +330,59 @@ impl TableRepartManager { table_id: TableId, region_mapping: &HashMap>, ) -> Result<()> { - // Get current table repart with raw bytes for CAS operation - let current_table_repart = self + let current = self .get_with_raw_bytes(table_id) .await? .context(crate::error::TableRepartNotFoundSnafu { table_id })?; - // Clone the current repart value and remove mappings - let mut new_table_repart_value = current_table_repart.inner.clone(); + let mut new_value = current.inner.clone(); for (src, dsts) in region_mapping.iter() { - new_table_repart_value.remove_mappings(*src, dsts); + new_value.remove_mappings(*src, dsts); } - // Execute atomic update - let (txn, _) = - self.build_update_txn(table_id, ¤t_table_repart, &new_table_repart_value)?; + self.upsert_value(table_id, Some(current), &new_value).await + } - let result = self.kv_backend.txn(txn).await?; + /// Upserts the full repartition value with CAS semantics using the caller-provided snapshot. + /// If the value is empty and no existing repartition entry is present, it no-ops. + pub async fn upsert_value( + &self, + table_id: TableId, + current: Option>, + new_value: &TableRepartValue, + ) -> Result<()> { + if new_value.src_to_dst.is_empty() && current.is_none() { + // Nothing to persist and caller confirmed no existing entry. + return Ok(()); + } - ensure!( - result.succeeded, - crate::error::MetadataCorruptionSnafu { - err_msg: format!( - "Failed to remove mappings for table {}: CAS operation failed", - table_id - ), - } - ); + if let Some(current) = current { + let (txn, _) = self.build_update_txn(table_id, ¤t, new_value)?; + let result = self.kv_backend.txn(txn).await?; + + ensure!( + result.succeeded, + crate::error::MetadataCorruptionSnafu { + err_msg: format!( + "Failed to update repartition mappings for table {}: CAS operation failed", + table_id + ), + } + ); + } else { + let (txn, _) = self.build_create_txn(table_id, new_value)?; + let result = self.kv_backend.txn(txn).await?; + + ensure!( + result.succeeded, + crate::error::MetadataCorruptionSnafu { + err_msg: format!( + "Failed to create repartition mappings for table {}: CAS operation failed", + table_id + ), + } + ); + } Ok(()) } diff --git a/src/meta-srv/src/gc/handler.rs b/src/meta-srv/src/gc/handler.rs index 71d09d6796..d140e40d9f 100644 --- a/src/meta-srv/src/gc/handler.rs +++ b/src/meta-srv/src/gc/handler.rs @@ -203,7 +203,7 @@ impl GcScheduler { report.per_datanode_reports.insert(peer.id, dn_report); } Err(e) => { - error!("Failed to process datanode GC for peer {}: {:#?}", peer, e); + error!(e; "Failed to process datanode GC for peer {}", peer); // Note: We don't have a direct way to map peer to table_id here, // so we just log the error. The table_reports will contain individual region failures. report.failed_datanodes.entry(peer.id).or_default().push(e); @@ -277,8 +277,8 @@ impl GcScheduler { Ok(report) => combined_report.merge(report), Err(e) => { error!( - "Failed to GC regions {:?} on datanode {}: {}", - fast_list_regions, peer, e + e; "Failed to GC regions {:?} on datanode {}", + fast_list_regions, peer, ); // Add to need_retry_regions since it failed @@ -303,8 +303,8 @@ impl GcScheduler { Ok(report) => combined_report.merge(report), Err(e) => { error!( - "Failed to GC regions {:?} on datanode {}: {}", - need_full_list_regions, peer, e + e; "Failed to GC regions {:?} on datanode {}", + need_full_list_regions, peer, ); // Add to need_retry_regions since it failed diff --git a/src/meta-srv/src/gc/options.rs b/src/meta-srv/src/gc/options.rs index ed47e13304..f6da6b850a 100644 --- a/src/meta-srv/src/gc/options.rs +++ b/src/meta-srv/src/gc/options.rs @@ -40,6 +40,7 @@ pub struct GcSchedulerOptions { /// Concurrency for region GC within a table. pub region_gc_concurrency: usize, /// Backoff duration between retries. + #[serde(with = "humantime_serde")] pub retry_backoff_duration: Duration, /// Minimum region size threshold for GC (in bytes). pub min_region_size_threshold: u64, @@ -48,19 +49,23 @@ pub struct GcSchedulerOptions { /// Weight for file removal rate in GC scoring. pub file_removed_count_weight: f64, /// Cooldown period between GC operations on the same region. + #[serde(with = "humantime_serde")] pub gc_cooldown_period: Duration, /// Maximum number of regions to select for GC per table. pub regions_per_table_threshold: usize, /// Timeout duration for mailbox communication with datanodes. + #[serde(with = "humantime_serde")] pub mailbox_timeout: Duration, /// Interval for performing full file listing during GC to find orphan files. /// Full file listing is expensive but necessary to clean up orphan files. /// Set to a larger value (e.g., 24 hours) to balance performance and cleanup. /// Every Nth GC cycle will use full file listing, where N = full_file_listing_interval / TICKER_INTERVAL. + #[serde(with = "humantime_serde")] pub full_file_listing_interval: Duration, /// Interval for cleaning up stale region entries from the GC tracker. /// This removes entries for regions that no longer exist (e.g., after table drops). /// Set to a larger value (e.g., 6 hours) since this is just for memory cleanup. + #[serde(with = "humantime_serde")] pub tracker_cleanup_interval: Duration, } diff --git a/src/meta-srv/src/gc/procedure.rs b/src/meta-srv/src/gc/procedure.rs index 14b66f9e60..0cc9fc5dc1 100644 --- a/src/meta-srv/src/gc/procedure.rs +++ b/src/meta-srv/src/gc/procedure.rs @@ -13,15 +13,16 @@ // limitations under the License. use std::any::Any; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply}; use common_meta::key::TableMetadataManagerRef; +use common_meta::key::table_repart::TableRepartValue; use common_meta::key::table_route::PhysicalTableRouteValue; -use common_meta::lock_key::RegionLock; +use common_meta::lock_key::{RegionLock, TableLock}; use common_meta::peer::Peer; use common_procedure::error::ToJsonSnafu; use common_procedure::{ @@ -69,8 +70,8 @@ async fn send_get_file_refs( Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?, Err(e) => { error!( - "Failed to receive reply from datanode {} for GetFileRefs: {}", - peer, e + e; "Failed to receive reply from datanode {} for GetFileRefs instruction", + peer, ); return Err(e); } @@ -116,8 +117,8 @@ async fn send_gc_regions( Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?, Err(e) => { error!( - "Failed to receive reply from datanode {} for {}: {}", - peer, description, e + e; "Failed to receive reply from datanode {} for {}", + peer, description ); return Err(e); } @@ -136,8 +137,8 @@ async fn send_gc_regions( Ok(report) => Ok(report), Err(e) => { error!( - "Datanode {} reported error during GC for regions {:?}: {}", - peer, gc_regions, e + e; "Datanode {} reported error during GC for regions {:?}", + peer, gc_regions ); error::UnexpectedSnafu { violated: format!( @@ -222,6 +223,36 @@ impl BatchGcProcedure { } } + /// Test-only constructor to jump directly into the repartition update state. + /// Intended for integration tests that validate `cleanup_region_repartition` without + /// running the full batch GC state machine. + #[cfg(feature = "mock")] + pub fn new_update_repartition_for_test( + mailbox: MailboxRef, + table_metadata_manager: TableMetadataManagerRef, + server_addr: String, + regions: Vec, + file_refs: FileRefsManifest, + timeout: Duration, + ) -> Self { + Self { + mailbox, + table_metadata_manager, + data: BatchGcData { + state: State::UpdateRepartition, + server_addr, + regions, + full_file_listing: false, + timeout, + region_routes: HashMap::new(), + region_routes_override: HashMap::new(), + related_regions: HashMap::new(), + file_refs, + gc_report: Some(GcReport::default()), + }, + } + } + pub fn cast_result(res: Arc) -> Result { res.downcast_ref::().cloned().ok_or_else(|| { error::UnexpectedSnafu { @@ -273,37 +304,98 @@ impl BatchGcProcedure { /// Clean up region repartition info in kvbackend after GC /// according to cross reference in `FileRefsManifest`. - async fn cleanup_region_repartition(&self) -> Result<()> { - let mut table_grouped: HashMap>> = + async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> { + let mut cross_refs_grouped: HashMap>> = HashMap::new(); for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs { - table_grouped + cross_refs_grouped .entry(src_region.table_id()) .or_default() .entry(*src_region) .or_default() .extend(dst_regions.iter().copied()); } - // make sure for files without cross-region refs but with tmp refs, we DO NOT clean up repartition key entry - // so that dropped regions can still keep their region ids here - for src_region in self.data.file_refs.file_refs.keys() { - table_grouped + + let mut tmp_refs_grouped: HashMap> = HashMap::new(); + for (src_region, refs) in &self.data.file_refs.file_refs { + if refs.is_empty() { + continue; + } + + tmp_refs_grouped .entry(src_region.table_id()) .or_default() - .entry(*src_region) - .or_default(); + .insert(*src_region); } - for (table_id, region_mappings) in table_grouped { - let region_mapping = region_mappings - .iter() - .map(|(src_region, dst_regions)| { - (*src_region, dst_regions.iter().cloned().collect_vec()) - }) - .collect::>>(); - self.table_metadata_manager - .table_repart_manager() - .update_mappings(table_id, ®ion_mapping) + let repart_mgr = self.table_metadata_manager.table_repart_manager(); + + let mut table_ids: HashSet = cross_refs_grouped + .keys() + .copied() + .chain(tmp_refs_grouped.keys().copied()) + .collect(); + table_ids.extend(self.data.regions.iter().map(|r| r.table_id())); + + for table_id in table_ids { + let table_lock = TableLock::Write(table_id).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; + + let cross_refs = cross_refs_grouped + .get(&table_id) + .cloned() + .unwrap_or_default(); + let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default(); + + let current = repart_mgr + .get_with_raw_bytes(table_id) + .await + .context(KvBackendSnafu)?; + + let mut new_value = current + .as_ref() + .map(|v| (**v).clone()) + .unwrap_or_else(TableRepartValue::new); + + // We only touch regions involved in this GC batch for the current table to avoid + // clobbering unrelated repart entries. Start from the batch regions of this table. + let batch_src_regions: HashSet = self + .data + .regions + .iter() + .copied() + .filter(|r| r.table_id() == table_id) + .collect(); + + // Merge targets: only the batch regions of this table. This avoids touching unrelated + // repart entries; we just reconcile mappings for regions involved in the current GC + // cycle for this table. + let all_src_regions: HashSet = batch_src_regions; + + for src_region in all_src_regions { + let cross_dst = cross_refs.get(&src_region); + let has_tmp_ref = tmp_refs.contains(&src_region); + + if let Some(dst_regions) = cross_dst { + let mut set = BTreeSet::new(); + set.extend(dst_regions.iter().copied()); + new_value.src_to_dst.insert(src_region, set); + } else if has_tmp_ref { + // Keep a tombstone entry with an empty set so dropped regions that still + // have tmp refs are preserved; removing it would lose the repartition trace. + new_value.src_to_dst.insert(src_region, BTreeSet::new()); + } else { + new_value.src_to_dst.remove(&src_region); + } + } + + // If there is no repartition info to persist, skip creating/updating the key + if new_value.src_to_dst.is_empty() && current.is_none() { + continue; + } + + repart_mgr + .upsert_value(table_id, current, &new_value) .await .context(KvBackendSnafu)?; } @@ -594,7 +686,7 @@ impl Procedure for BatchGcProcedure { Self::TYPE_NAME } - async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { match self.data.state { State::Start => { // Transition to Acquiring state @@ -610,7 +702,7 @@ impl Procedure for BatchGcProcedure { Ok(Status::executing(false)) } Err(e) => { - error!("Failed to get file references: {}", e); + error!(e; "Failed to get file references"); Err(ProcedureError::external(e)) } } @@ -625,12 +717,12 @@ impl Procedure for BatchGcProcedure { Ok(Status::executing(false)) } Err(e) => { - error!("Failed to send GC instructions: {}", e); + error!(e; "Failed to send GC instructions"); Err(ProcedureError::external(e)) } } } - State::UpdateRepartition => match self.cleanup_region_repartition().await { + State::UpdateRepartition => match self.cleanup_region_repartition(ctx).await { Ok(()) => { info!( "Cleanup region repartition info completed successfully for regions {:?}", @@ -649,7 +741,7 @@ impl Procedure for BatchGcProcedure { Ok(Status::done_with_output(report)) } Err(e) => { - error!("Failed to cleanup region repartition info: {}", e); + error!(e; "Failed to cleanup region repartition info"); Err(ProcedureError::external(e)) } }, diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index dc93cf7d83..af77ce2036 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -129,8 +129,6 @@ pub struct GcConfig { /// Should be long enough to allow long running queries to finish. /// If set to None, then unused files will be deleted immediately. /// - /// TODO(discord9): long running queries should actively write tmp manifest files - /// to prevent deletion of files they are using. #[serde(with = "humantime_serde")] pub lingering_time: Option, /// Lingering time before deleting unknown files(files with undetermine expel time). diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 90207abe1e..0c6b965fd3 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -95,6 +95,7 @@ uuid.workspace = true zstd.workspace = true [dev-dependencies] +common-procedure-test.workspace = true common-wal = { workspace = true, features = ["testing"] } datafusion.workspace = true datafusion-expr.workspace = true diff --git a/tests-integration/src/tests/gc.rs b/tests-integration/src/tests/gc.rs index 317b889cc5..1502099e7a 100644 --- a/tests-integration/src/tests/gc.rs +++ b/tests-integration/src/tests/gc.rs @@ -33,8 +33,10 @@ use crate::cluster::GreptimeDbClusterBuilder; use crate::test_util::{StorageType, TempDirGuard, execute_sql, get_test_store_config}; use crate::tests::test_util::{MockInstanceBuilder, TestContext, wait_procedure}; +mod repart; + /// Helper function to get table route information for GC procedure -async fn get_table_route( +pub(super) async fn get_table_route( table_metadata_manager: &TableMetadataManagerRef, table_id: TableId, ) -> (Region2Peers, Vec) { @@ -62,7 +64,7 @@ async fn get_table_route( } /// Helper function to list all SST files -async fn list_sst_files(test_context: &TestContext) -> HashSet { +pub(super) async fn list_sst_files(test_context: &TestContext) -> HashSet { let mut sst_files = HashSet::new(); for datanode in test_context.datanodes().values() { @@ -82,7 +84,7 @@ async fn list_sst_files(test_context: &TestContext) -> HashSet { sst_files } -async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirGuard) { +pub(super) async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirGuard) { common_telemetry::init_default_ut_logging(); let test_name = uuid::Uuid::new_v4().to_string(); let (store_config, guard) = get_test_store_config(store_type); diff --git a/tests-integration/src/tests/gc/repart.rs b/tests-integration/src/tests/gc/repart.rs new file mode 100644 index 0000000000..7e7a3edd5a --- /dev/null +++ b/tests-integration/src/tests/gc/repart.rs @@ -0,0 +1,286 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; +use std::time::Duration; + +use common_meta::key::table_repart::TableRepartValue; +use common_procedure::{Procedure, Status}; +use common_procedure_test::new_test_procedure_context; +use meta_srv::gc::BatchGcProcedure; +use store_api::storage::{FileId, FileRef, FileRefsManifest, RegionId}; + +use crate::test_util::{StorageType, execute_sql}; +use crate::tests::gc::{distributed_with_gc, get_table_route}; + +#[tokio::test] +async fn test_cleanup_region_repartition_update_tombstone_remove() { + let _ = dotenv::dotenv(); + let (test_context, _guard) = distributed_with_gc(&StorageType::File).await; + let instance = test_context.frontend(); + let metasrv = test_context.metasrv(); + + let create_table_sql = r#" +CREATE TABLE test_cleanup_repartition ( + ts TIMESTAMP TIME INDEX, + val DOUBLE, + host STRING +)PARTITION ON COLUMNS (host) ( + host < 'a', + host >= 'a' AND host < 'm', + host >= 'm' +) WITH (append_mode = 'true')"#; + execute_sql(&instance, create_table_sql).await; + + let table = instance + .catalog_manager() + .table("greptime", "public", "test_cleanup_repartition", None) + .await + .unwrap() + .unwrap(); + let table_id = table.table_info().table_id(); + + let (_routes, regions) = get_table_route(metasrv.table_metadata_manager(), table_id).await; + let base_region = *regions.first().expect("table has at least one region"); + + let region_a = base_region; + let region_b = RegionId::new(table_id, base_region.region_number() + 1); + let region_c = RegionId::new(table_id, base_region.region_number() + 2); + + let dst_a_initial = RegionId::new(table_id, base_region.region_number() + 10); + let dst_b_initial = RegionId::new(table_id, base_region.region_number() + 20); + let dst_c_initial = RegionId::new(table_id, base_region.region_number() + 30); + let dst_a_new = RegionId::new(table_id, base_region.region_number() + 40); + + let repart_mgr = metasrv.table_metadata_manager().table_repart_manager(); + let current = repart_mgr.get_with_raw_bytes(table_id).await.unwrap(); + let mut initial_value = TableRepartValue::new(); + initial_value.update_mappings(region_a, &[dst_a_initial]); + initial_value.update_mappings(region_b, &[dst_b_initial]); + initial_value.update_mappings(region_c, &[dst_c_initial]); + repart_mgr + .upsert_value(table_id, current, &initial_value) + .await + .unwrap(); + + let mut manifest = FileRefsManifest::default(); + manifest + .cross_region_refs + .insert(region_a, [dst_a_new].into()); + manifest.file_refs.insert( + region_b, + [FileRef::new(region_b, FileId::random(), None)] + .into_iter() + .collect(), + ); + + let regions = vec![region_a, region_b, region_c]; + + let mut procedure = BatchGcProcedure::new_update_repartition_for_test( + metasrv.mailbox().clone(), + metasrv.table_metadata_manager().clone(), + metasrv.options().grpc.server_addr.clone(), + regions, + manifest, + Duration::from_secs(5), + ); + + let procedure_ctx = new_test_procedure_context(); + let status = procedure.execute(&procedure_ctx).await.unwrap(); + assert!(matches!(status, Status::Done { .. })); + + let repart_after = repart_mgr.get(table_id).await.unwrap().unwrap(); + assert_eq!( + repart_after.src_to_dst.get(®ion_a), + Some(&BTreeSet::from([dst_a_new])) + ); + assert_eq!( + repart_after.src_to_dst.get(®ion_b), + Some(&BTreeSet::new()) + ); + assert!(!repart_after.src_to_dst.contains_key(®ion_c)); +} + +#[tokio::test] +async fn test_cleanup_region_repartition_preserve_uninvolved_entries() { + let _ = dotenv::dotenv(); + let (test_context, _guard) = distributed_with_gc(&StorageType::File).await; + let instance = test_context.frontend(); + let metasrv = test_context.metasrv(); + + let create_table_sql = r#" + CREATE TABLE test_cleanup_repartition_preserve ( + ts TIMESTAMP TIME INDEX, + val DOUBLE, + host STRING + )PARTITION ON COLUMNS (host) ( + host < 'a', + host >= 'a' AND host < 'm', + host >= 'm' AND host < 'x', + host >= 'x' +) WITH (append_mode = 'true') + "#; + execute_sql(&instance, create_table_sql).await; + + let table = instance + .catalog_manager() + .table( + "greptime", + "public", + "test_cleanup_repartition_preserve", + None, + ) + .await + .unwrap() + .unwrap(); + let table_id = table.table_info().table_id(); + + let (_routes, regions) = get_table_route(metasrv.table_metadata_manager(), table_id).await; + let base_region = *regions.first().expect("table has at least one region"); + + let region_a = base_region; + let region_b = RegionId::new(table_id, base_region.region_number() + 1); + let region_c = RegionId::new(table_id, base_region.region_number() + 2); + let region_d = RegionId::new(table_id, base_region.region_number() + 3); + + let dst_a_initial = RegionId::new(table_id, base_region.region_number() + 10); + let dst_b_initial = RegionId::new(table_id, base_region.region_number() + 20); + let dst_c_initial = RegionId::new(table_id, base_region.region_number() + 30); + let dst_d_initial = RegionId::new(table_id, base_region.region_number() + 50); + let dst_a_new = RegionId::new(table_id, base_region.region_number() + 60); + + let repart_mgr = metasrv.table_metadata_manager().table_repart_manager(); + let current = repart_mgr.get_with_raw_bytes(table_id).await.unwrap(); + let mut initial_value = TableRepartValue::new(); + initial_value.update_mappings(region_a, &[dst_a_initial]); + initial_value.update_mappings(region_b, &[dst_b_initial]); + initial_value.update_mappings(region_c, &[dst_c_initial]); + initial_value.update_mappings(region_d, &[dst_d_initial]); + repart_mgr + .upsert_value(table_id, current, &initial_value) + .await + .unwrap(); + + let mut manifest = FileRefsManifest::default(); + manifest + .cross_region_refs + .insert(region_a, [dst_a_new].into()); + manifest.file_refs.insert(region_b, Default::default()); + + let regions = vec![region_a, region_b, region_c]; + + let mut procedure = BatchGcProcedure::new_update_repartition_for_test( + metasrv.mailbox().clone(), + metasrv.table_metadata_manager().clone(), + metasrv.options().grpc.server_addr.clone(), + regions, + manifest, + Duration::from_secs(5), + ); + + let procedure_ctx = new_test_procedure_context(); + let status = procedure.execute(&procedure_ctx).await.unwrap(); + assert!(matches!(status, Status::Done { .. })); + + let repart_after = repart_mgr.get(table_id).await.unwrap().unwrap(); + assert_eq!( + repart_after.src_to_dst.get(®ion_a), + Some(&BTreeSet::from([dst_a_new])) + ); + assert_eq!(repart_after.src_to_dst.get(®ion_b), None); + assert!(!repart_after.src_to_dst.contains_key(®ion_c)); + assert_eq!( + repart_after.src_to_dst.get(®ion_d), + Some(&BTreeSet::from([dst_d_initial])) + ); +} + +#[tokio::test] +async fn test_cleanup_region_repartition_remove_when_tmp_refs_empty() { + let _ = dotenv::dotenv(); + let (test_context, _guard) = distributed_with_gc(&StorageType::File).await; + let instance = test_context.frontend(); + let metasrv = test_context.metasrv(); + + let create_table_sql = r#" + CREATE TABLE test_cleanup_repartition_empty_tmp_refs ( + ts TIMESTAMP TIME INDEX, + val DOUBLE, + host STRING + )PARTITION ON COLUMNS (host) ( + host < 'a', + host >= 'a' AND host < 'm', + host >= 'm' + ) WITH (append_mode = 'true') + "#; + execute_sql(&instance, create_table_sql).await; + + let table = instance + .catalog_manager() + .table( + "greptime", + "public", + "test_cleanup_repartition_empty_tmp_refs", + None, + ) + .await + .unwrap() + .unwrap(); + let table_id = table.table_info().table_id(); + + let (_routes, regions) = get_table_route(metasrv.table_metadata_manager(), table_id).await; + let base_region = *regions.first().expect("table has at least one region"); + + let region_a = base_region; + let region_b = RegionId::new(table_id, base_region.region_number() + 1); + + let dst_a_initial = RegionId::new(table_id, base_region.region_number() + 10); + let dst_b_initial = RegionId::new(table_id, base_region.region_number() + 20); + + let repart_mgr = metasrv.table_metadata_manager().table_repart_manager(); + let current = repart_mgr.get_with_raw_bytes(table_id).await.unwrap(); + let mut initial_value = TableRepartValue::new(); + initial_value.update_mappings(region_a, &[dst_a_initial]); + initial_value.update_mappings(region_b, &[dst_b_initial]); + repart_mgr + .upsert_value(table_id, current, &initial_value) + .await + .unwrap(); + + let mut manifest = FileRefsManifest::default(); + // Simulate empty tmp refs snapshot for region_a: entry exists but set is empty. + manifest.file_refs.insert(region_a, Default::default()); + + let regions = vec![region_a]; + + let mut procedure = BatchGcProcedure::new_update_repartition_for_test( + metasrv.mailbox().clone(), + metasrv.table_metadata_manager().clone(), + metasrv.options().grpc.server_addr.clone(), + regions, + manifest, + Duration::from_secs(5), + ); + + let procedure_ctx = new_test_procedure_context(); + let status = procedure.execute(&procedure_ctx).await.unwrap(); + assert!(matches!(status, Status::Done { .. })); + + let repart_after = repart_mgr.get(table_id).await.unwrap().unwrap(); + assert!(!repart_after.src_to_dst.contains_key(®ion_a)); + assert_eq!( + repart_after.src_to_dst.get(®ion_b), + Some(&BTreeSet::from([dst_b_initial])) + ); +}