fix: gc update repart map properly (#7606)

* feat: update repart map

Signed-off-by: discord9 <discord9@163.com>

* fix: table id write lock

Signed-off-by: discord9 <discord9@163.com>

* chore: default value

Signed-off-by: discord9 <discord9@163.com>

* chore: config

Signed-off-by: discord9 <discord9@163.com>

* test: update repartition map

Signed-off-by: discord9 <discord9@163.com>

* fix: empty file ref set

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: properly log error

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-01-28 12:31:19 +08:00
committed by GitHub
parent 5bfc728d32
commit 00f568ed28
12 changed files with 507 additions and 101 deletions

1
Cargo.lock generated
View File

@@ -13724,6 +13724,7 @@ dependencies = [
"common-memory-manager",
"common-meta",
"common-procedure",
"common-procedure-test",
"common-query",
"common-recordbatch",
"common-runtime",

View File

@@ -410,6 +410,9 @@
| `stats_persistence` | -- | -- | Configuration options for the stats persistence. |
| `stats_persistence.ttl` | String | `0s` | TTL for the stats table that will be used to store the stats.<br/>Set to `0s` to disable stats persistence.<br/>Default is `0s`.<br/>If you want to enable stats persistence, set the TTL to a value greater than 0.<br/>It is recommended to set a small value, e.g., `3h`. |
| `stats_persistence.interval` | String | `10m` | The interval to persist the stats. Default is `10m`.<br/>The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`. |
| `gc` | -- | -- | -- |
| `gc.enable` | Bool | `false` | Whether GC is enabled. Defaults to `false`. Must be the same as the datanode's `mito.gc.enable`.<br/>If set to `false`, no GC will be performed. |
| `gc.gc_cooldown_period` | String | `5m` | Cooldown period between GC operations on the same region. |
| `logging` | -- | -- | The logging options. |
| `logging.dir` | String | `./greptimedb_data/logs` | The directory to store the log files. If set to empty, logs will not be written to files. |
| `logging.level` | String | Unset | The log level. Can be `info`/`debug`/`warn`/`error`. |
@@ -581,6 +584,10 @@
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.<br/>Only available for `partition_tree` memtable. |
| `region_engine.mito.gc` | -- | -- | -- |
| `region_engine.mito.gc.enable` | Bool | `false` | Whether GC is enabled. Must be the same as metasrv's `gc.enable`, otherwise unexpected behavior will occur. |
| `region_engine.mito.gc.lingering_time` | String | `1m` | Lingering time before deleting files.<br/>Should be long enough to allow long running queries to finish.<br/>If set to None, then unused files will be deleted immediately. |
| `region_engine.mito.gc.unknown_file_lingering_time` | String | `1h` | Lingering time before deleting unknown files (files with an undetermined expel time).<br/>The expel time is the time when the file is considered removed, i.e. removed from the manifest.<br/>This should occur only rarely, as the manifest keeps track of removed files in the `removed_files` field<br/>unless something goes wrong. |
| `region_engine.file` | -- | -- | Enable the file engine. |
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.sparse_primary_key_encoding` | Bool | `true` | Whether to use sparse primary key encoding. |

View File

@@ -672,6 +672,21 @@ data_freeze_threshold = 32768
## Only available for `partition_tree` memtable.
fork_dictionary_bytes = "1GiB"
[region_engine.mito.gc]
## Whether GC is enabled. Must be the same as metasrv's `gc.enable`, otherwise unexpected behavior will occur.
enable = false
## Lingering time before deleting files.
## Should be long enough to allow long running queries to finish.
## If set to None, then unused files will be deleted immediately.
lingering_time = "1m"
## Lingering time before deleting unknown files (files with an undetermined expel time).
## The expel time is the time when the file is considered removed, i.e. removed from the manifest.
## This should occur only rarely, as the manifest keeps track of removed files in the `removed_files` field
## unless something goes wrong.
unknown_file_lingering_time = "1h"
[[region_engine]]
## Enable the file engine.
[region_engine.file]

View File

@@ -315,6 +315,13 @@ ttl = "0s"
## The minimum value is `10m`, if the value is less than `10m`, it will be overridden to `10m`.
interval = "10m"
[gc]
## Whether GC is enabled. Defaults to `false`. Must be the same as the datanode's `mito.gc.enable`.
## If set to `false`, no GC will be performed.
enable = false
## Cooldown period between GC operations on the same region.
gc_cooldown_period = "5m"
## The logging options.
[logging]
## The directory to store the log files. If set to empty, logs will not be written to files.

View File

@@ -310,51 +310,17 @@ impl TableRepartManager {
table_id: TableId,
region_mapping: &HashMap<RegionId, Vec<RegionId>>,
) -> Result<()> {
// Get current table repart with raw bytes for CAS operation
let Some(current_table_repart) = self.get_with_raw_bytes(table_id).await? else {
let mut new_table_repart_value = TableRepartValue::new();
for (src, dsts) in region_mapping.iter() {
new_table_repart_value.update_mappings(*src, dsts);
}
let current = self.get_with_raw_bytes(table_id).await?;
let mut new_value = current
.as_ref()
.map(|c| c.inner.clone())
.unwrap_or_else(TableRepartValue::new);
let (txn, _) = self.build_create_txn(table_id, &new_table_repart_value)?;
let result = self.kv_backend.txn(txn).await?;
ensure!(
result.succeeded,
crate::error::MetadataCorruptionSnafu {
err_msg: format!(
"Failed to create table repart for table {}: CAS operation failed",
table_id
),
}
);
return Ok(());
};
// Clone the current repart value and update mappings
let mut new_table_repart_value = current_table_repart.inner.clone();
for (src, dsts) in region_mapping.iter() {
new_table_repart_value.update_mappings(*src, dsts);
new_value.update_mappings(*src, dsts);
}
// Execute atomic update
let (txn, _) =
self.build_update_txn(table_id, &current_table_repart, &new_table_repart_value)?;
let result = self.kv_backend.txn(txn).await?;
ensure!(
result.succeeded,
crate::error::MetadataCorruptionSnafu {
err_msg: format!(
"Failed to update mappings for table {}: CAS operation failed",
table_id
),
}
);
Ok(())
self.upsert_value(table_id, current, &new_value).await
}
/// Removes mappings from src region to dst regions.
@@ -364,33 +330,59 @@ impl TableRepartManager {
table_id: TableId,
region_mapping: &HashMap<RegionId, Vec<RegionId>>,
) -> Result<()> {
// Get current table repart with raw bytes for CAS operation
let current_table_repart = self
let current = self
.get_with_raw_bytes(table_id)
.await?
.context(crate::error::TableRepartNotFoundSnafu { table_id })?;
// Clone the current repart value and remove mappings
let mut new_table_repart_value = current_table_repart.inner.clone();
let mut new_value = current.inner.clone();
for (src, dsts) in region_mapping.iter() {
new_table_repart_value.remove_mappings(*src, dsts);
new_value.remove_mappings(*src, dsts);
}
// Execute atomic update
let (txn, _) =
self.build_update_txn(table_id, &current_table_repart, &new_table_repart_value)?;
self.upsert_value(table_id, Some(current), &new_value).await
}
let result = self.kv_backend.txn(txn).await?;
/// Upserts the full repartition value with CAS semantics using the caller-provided snapshot.
/// If the value is empty and no existing repartition entry is present, it no-ops.
pub async fn upsert_value(
&self,
table_id: TableId,
current: Option<DeserializedValueWithBytes<TableRepartValue>>,
new_value: &TableRepartValue,
) -> Result<()> {
if new_value.src_to_dst.is_empty() && current.is_none() {
// Nothing to persist and caller confirmed no existing entry.
return Ok(());
}
ensure!(
result.succeeded,
crate::error::MetadataCorruptionSnafu {
err_msg: format!(
"Failed to remove mappings for table {}: CAS operation failed",
table_id
),
}
);
if let Some(current) = current {
let (txn, _) = self.build_update_txn(table_id, &current, new_value)?;
let result = self.kv_backend.txn(txn).await?;
ensure!(
result.succeeded,
crate::error::MetadataCorruptionSnafu {
err_msg: format!(
"Failed to update repartition mappings for table {}: CAS operation failed",
table_id
),
}
);
} else {
let (txn, _) = self.build_create_txn(table_id, new_value)?;
let result = self.kv_backend.txn(txn).await?;
ensure!(
result.succeeded,
crate::error::MetadataCorruptionSnafu {
err_msg: format!(
"Failed to create repartition mappings for table {}: CAS operation failed",
table_id
),
}
);
}
Ok(())
}

View File

@@ -203,7 +203,7 @@ impl GcScheduler {
report.per_datanode_reports.insert(peer.id, dn_report);
}
Err(e) => {
error!("Failed to process datanode GC for peer {}: {:#?}", peer, e);
error!(e; "Failed to process datanode GC for peer {}", peer);
// Note: We don't have a direct way to map peer to table_id here,
// so we just log the error. The table_reports will contain individual region failures.
report.failed_datanodes.entry(peer.id).or_default().push(e);
@@ -277,8 +277,8 @@ impl GcScheduler {
Ok(report) => combined_report.merge(report),
Err(e) => {
error!(
"Failed to GC regions {:?} on datanode {}: {}",
fast_list_regions, peer, e
e; "Failed to GC regions {:?} on datanode {}",
fast_list_regions, peer,
);
// Add to need_retry_regions since it failed
@@ -303,8 +303,8 @@ impl GcScheduler {
Ok(report) => combined_report.merge(report),
Err(e) => {
error!(
"Failed to GC regions {:?} on datanode {}: {}",
need_full_list_regions, peer, e
e; "Failed to GC regions {:?} on datanode {}",
need_full_list_regions, peer,
);
// Add to need_retry_regions since it failed

View File

@@ -40,6 +40,7 @@ pub struct GcSchedulerOptions {
/// Concurrency for region GC within a table.
pub region_gc_concurrency: usize,
/// Backoff duration between retries.
#[serde(with = "humantime_serde")]
pub retry_backoff_duration: Duration,
/// Minimum region size threshold for GC (in bytes).
pub min_region_size_threshold: u64,
@@ -48,19 +49,23 @@ pub struct GcSchedulerOptions {
/// Weight for file removal rate in GC scoring.
pub file_removed_count_weight: f64,
/// Cooldown period between GC operations on the same region.
#[serde(with = "humantime_serde")]
pub gc_cooldown_period: Duration,
/// Maximum number of regions to select for GC per table.
pub regions_per_table_threshold: usize,
/// Timeout duration for mailbox communication with datanodes.
#[serde(with = "humantime_serde")]
pub mailbox_timeout: Duration,
/// Interval for performing full file listing during GC to find orphan files.
/// Full file listing is expensive but necessary to clean up orphan files.
/// Set to a larger value (e.g., 24 hours) to balance performance and cleanup.
/// Every Nth GC cycle will use full file listing, where N = full_file_listing_interval / TICKER_INTERVAL.
#[serde(with = "humantime_serde")]
pub full_file_listing_interval: Duration,
/// Interval for cleaning up stale region entries from the GC tracker.
/// This removes entries for regions that no longer exist (e.g., after table drops).
/// Set to a larger value (e.g., 6 hours) since this is just for memory cleanup.
#[serde(with = "humantime_serde")]
pub tracker_cleanup_interval: Duration,
}

View File

@@ -13,15 +13,16 @@
// limitations under the License.
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{self, GcRegions, GetFileRefs, GetFileRefsReply, InstructionReply};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::table_repart::TableRepartValue;
use common_meta::key::table_route::PhysicalTableRouteValue;
use common_meta::lock_key::RegionLock;
use common_meta::lock_key::{RegionLock, TableLock};
use common_meta::peer::Peer;
use common_procedure::error::ToJsonSnafu;
use common_procedure::{
@@ -69,8 +70,8 @@ async fn send_get_file_refs(
Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
Err(e) => {
error!(
"Failed to receive reply from datanode {} for GetFileRefs: {}",
peer, e
e; "Failed to receive reply from datanode {} for GetFileRefs instruction",
peer,
);
return Err(e);
}
@@ -116,8 +117,8 @@ async fn send_gc_regions(
Ok(reply_msg) => HeartbeatMailbox::json_reply(&reply_msg)?,
Err(e) => {
error!(
"Failed to receive reply from datanode {} for {}: {}",
peer, description, e
e; "Failed to receive reply from datanode {} for {}",
peer, description
);
return Err(e);
}
@@ -136,8 +137,8 @@ async fn send_gc_regions(
Ok(report) => Ok(report),
Err(e) => {
error!(
"Datanode {} reported error during GC for regions {:?}: {}",
peer, gc_regions, e
e; "Datanode {} reported error during GC for regions {:?}",
peer, gc_regions
);
error::UnexpectedSnafu {
violated: format!(
@@ -222,6 +223,36 @@ impl BatchGcProcedure {
}
}
/// Builds a procedure whose state is already `State::UpdateRepartition` (test-only).
///
/// Integration tests use this to exercise `cleanup_region_repartition` directly
/// instead of stepping through the whole batch GC state machine. The caller supplies
/// the batch `regions` and the `file_refs` manifest; the route maps and related-region
/// map start empty, full file listing is off, and the GC report starts as the default
/// report.
#[cfg(feature = "mock")]
pub fn new_update_repartition_for_test(
    mailbox: MailboxRef,
    table_metadata_manager: TableMetadataManagerRef,
    server_addr: String,
    regions: Vec<RegionId>,
    file_refs: FileRefsManifest,
    timeout: Duration,
) -> Self {
    // Persisted procedure data, positioned right at the repartition-update step.
    let data = BatchGcData {
        state: State::UpdateRepartition,
        server_addr,
        regions,
        full_file_listing: false,
        timeout,
        region_routes: HashMap::new(),
        region_routes_override: HashMap::new(),
        related_regions: HashMap::new(),
        file_refs,
        gc_report: Some(GcReport::default()),
    };

    Self {
        mailbox,
        table_metadata_manager,
        data,
    }
}
pub fn cast_result(res: Arc<dyn Any>) -> Result<GcReport> {
res.downcast_ref::<GcReport>().cloned().ok_or_else(|| {
error::UnexpectedSnafu {
@@ -273,37 +304,98 @@ impl BatchGcProcedure {
/// Clean up region repartition info in kvbackend after GC
/// according to cross reference in `FileRefsManifest`.
async fn cleanup_region_repartition(&self) -> Result<()> {
let mut table_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
async fn cleanup_region_repartition(&self, procedure_ctx: &ProcedureContext) -> Result<()> {
let mut cross_refs_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
HashMap::new();
for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
table_grouped
cross_refs_grouped
.entry(src_region.table_id())
.or_default()
.entry(*src_region)
.or_default()
.extend(dst_regions.iter().copied());
}
// make sure for files without cross-region refs but with tmp refs, we DO NOT clean up repartition key entry
// so that dropped regions can still keep their region ids here
for src_region in self.data.file_refs.file_refs.keys() {
table_grouped
let mut tmp_refs_grouped: HashMap<TableId, HashSet<RegionId>> = HashMap::new();
for (src_region, refs) in &self.data.file_refs.file_refs {
if refs.is_empty() {
continue;
}
tmp_refs_grouped
.entry(src_region.table_id())
.or_default()
.entry(*src_region)
.or_default();
.insert(*src_region);
}
for (table_id, region_mappings) in table_grouped {
let region_mapping = region_mappings
.iter()
.map(|(src_region, dst_regions)| {
(*src_region, dst_regions.iter().cloned().collect_vec())
})
.collect::<HashMap<RegionId, Vec<RegionId>>>();
self.table_metadata_manager
.table_repart_manager()
.update_mappings(table_id, &region_mapping)
let repart_mgr = self.table_metadata_manager.table_repart_manager();
let mut table_ids: HashSet<TableId> = cross_refs_grouped
.keys()
.copied()
.chain(tmp_refs_grouped.keys().copied())
.collect();
table_ids.extend(self.data.regions.iter().map(|r| r.table_id()));
for table_id in table_ids {
let table_lock = TableLock::Write(table_id).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
let cross_refs = cross_refs_grouped
.get(&table_id)
.cloned()
.unwrap_or_default();
let tmp_refs = tmp_refs_grouped.get(&table_id).cloned().unwrap_or_default();
let current = repart_mgr
.get_with_raw_bytes(table_id)
.await
.context(KvBackendSnafu)?;
let mut new_value = current
.as_ref()
.map(|v| (**v).clone())
.unwrap_or_else(TableRepartValue::new);
// We only touch regions involved in this GC batch for the current table to avoid
// clobbering unrelated repart entries. Start from the batch regions of this table.
let batch_src_regions: HashSet<RegionId> = self
.data
.regions
.iter()
.copied()
.filter(|r| r.table_id() == table_id)
.collect();
// Merge targets: only the batch regions of this table. This avoids touching unrelated
// repart entries; we just reconcile mappings for regions involved in the current GC
// cycle for this table.
let all_src_regions: HashSet<RegionId> = batch_src_regions;
for src_region in all_src_regions {
let cross_dst = cross_refs.get(&src_region);
let has_tmp_ref = tmp_refs.contains(&src_region);
if let Some(dst_regions) = cross_dst {
let mut set = BTreeSet::new();
set.extend(dst_regions.iter().copied());
new_value.src_to_dst.insert(src_region, set);
} else if has_tmp_ref {
// Keep a tombstone entry with an empty set so dropped regions that still
// have tmp refs are preserved; removing it would lose the repartition trace.
new_value.src_to_dst.insert(src_region, BTreeSet::new());
} else {
new_value.src_to_dst.remove(&src_region);
}
}
// If there is no repartition info to persist, skip creating/updating the key
if new_value.src_to_dst.is_empty() && current.is_none() {
continue;
}
repart_mgr
.upsert_value(table_id, current, &new_value)
.await
.context(KvBackendSnafu)?;
}
@@ -594,7 +686,7 @@ impl Procedure for BatchGcProcedure {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
match self.data.state {
State::Start => {
// Transition to Acquiring state
@@ -610,7 +702,7 @@ impl Procedure for BatchGcProcedure {
Ok(Status::executing(false))
}
Err(e) => {
error!("Failed to get file references: {}", e);
error!(e; "Failed to get file references");
Err(ProcedureError::external(e))
}
}
@@ -625,12 +717,12 @@ impl Procedure for BatchGcProcedure {
Ok(Status::executing(false))
}
Err(e) => {
error!("Failed to send GC instructions: {}", e);
error!(e; "Failed to send GC instructions");
Err(ProcedureError::external(e))
}
}
}
State::UpdateRepartition => match self.cleanup_region_repartition().await {
State::UpdateRepartition => match self.cleanup_region_repartition(ctx).await {
Ok(()) => {
info!(
"Cleanup region repartition info completed successfully for regions {:?}",
@@ -649,7 +741,7 @@ impl Procedure for BatchGcProcedure {
Ok(Status::done_with_output(report))
}
Err(e) => {
error!("Failed to cleanup region repartition info: {}", e);
error!(e; "Failed to cleanup region repartition info");
Err(ProcedureError::external(e))
}
},

View File

@@ -129,8 +129,6 @@ pub struct GcConfig {
/// Should be long enough to allow long running queries to finish.
/// If set to None, then unused files will be deleted immediately.
///
/// TODO(discord9): long running queries should actively write tmp manifest files
/// to prevent deletion of files they are using.
#[serde(with = "humantime_serde")]
pub lingering_time: Option<Duration>,
/// Lingering time before deleting unknown files(files with undetermine expel time).

View File

@@ -95,6 +95,7 @@ uuid.workspace = true
zstd.workspace = true
[dev-dependencies]
common-procedure-test.workspace = true
common-wal = { workspace = true, features = ["testing"] }
datafusion.workspace = true
datafusion-expr.workspace = true

View File

@@ -33,8 +33,10 @@ use crate::cluster::GreptimeDbClusterBuilder;
use crate::test_util::{StorageType, TempDirGuard, execute_sql, get_test_store_config};
use crate::tests::test_util::{MockInstanceBuilder, TestContext, wait_procedure};
mod repart;
/// Helper function to get table route information for GC procedure
async fn get_table_route(
pub(super) async fn get_table_route(
table_metadata_manager: &TableMetadataManagerRef,
table_id: TableId,
) -> (Region2Peers, Vec<RegionId>) {
@@ -62,7 +64,7 @@ async fn get_table_route(
}
/// Helper function to list all SST files
async fn list_sst_files(test_context: &TestContext) -> HashSet<String> {
pub(super) async fn list_sst_files(test_context: &TestContext) -> HashSet<String> {
let mut sst_files = HashSet::new();
for datanode in test_context.datanodes().values() {
@@ -82,7 +84,7 @@ async fn list_sst_files(test_context: &TestContext) -> HashSet<String> {
sst_files
}
async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirGuard) {
pub(super) async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirGuard) {
common_telemetry::init_default_ut_logging();
let test_name = uuid::Uuid::new_v4().to_string();
let (store_config, guard) = get_test_store_config(store_type);

View File

@@ -0,0 +1,286 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use std::time::Duration;
use common_meta::key::table_repart::TableRepartValue;
use common_procedure::{Procedure, Status};
use common_procedure_test::new_test_procedure_context;
use meta_srv::gc::BatchGcProcedure;
use store_api::storage::{FileId, FileRef, FileRefsManifest, RegionId};
use crate::test_util::{StorageType, execute_sql};
use crate::tests::gc::{distributed_with_gc, get_table_route};
/// Runs the repartition-update step over three seeded source regions and checks one
/// outcome per region: a cross-region ref replaces the mapping, a non-empty tmp ref
/// set leaves an empty (tombstone) entry, and a region with neither loses its entry.
#[tokio::test]
async fn test_cleanup_region_repartition_update_tombstone_remove() {
    let _ = dotenv::dotenv();
    let (test_context, _guard) = distributed_with_gc(&StorageType::File).await;
    let frontend = test_context.frontend();
    let metasrv = test_context.metasrv();
    let metadata_manager = metasrv.table_metadata_manager();

    execute_sql(
        &frontend,
        r#"
CREATE TABLE test_cleanup_repartition (
ts TIMESTAMP TIME INDEX,
val DOUBLE,
host STRING
)PARTITION ON COLUMNS (host) (
host < 'a',
host >= 'a' AND host < 'm',
host >= 'm'
) WITH (append_mode = 'true')"#,
    )
    .await;

    let table_ref = frontend
        .catalog_manager()
        .table("greptime", "public", "test_cleanup_repartition", None)
        .await
        .unwrap()
        .unwrap();
    let table_id = table_ref.table_info().table_id();

    let (_routes, route_regions) = get_table_route(metadata_manager, table_id).await;
    let base_region = *route_regions
        .first()
        .expect("table has at least one region");
    // Region ids are derived from the first routed region by offsetting its region number.
    let region_at = |offset: u32| RegionId::new(table_id, base_region.region_number() + offset);

    let region_a = base_region;
    let region_b = region_at(1);
    let region_c = region_at(2);
    let dst_a_initial = region_at(10);
    let dst_b_initial = region_at(20);
    let dst_c_initial = region_at(30);
    let dst_a_new = region_at(40);

    // Seed one mapping per source region before the procedure runs.
    let repart_manager = metadata_manager.table_repart_manager();
    let existing = repart_manager.get_with_raw_bytes(table_id).await.unwrap();
    let mut seeded = TableRepartValue::new();
    for (src, dst) in [
        (region_a, dst_a_initial),
        (region_b, dst_b_initial),
        (region_c, dst_c_initial),
    ] {
        seeded.update_mappings(src, &[dst]);
    }
    repart_manager
        .upsert_value(table_id, existing, &seeded)
        .await
        .unwrap();

    // region_a gets a new cross-region ref, region_b a single tmp file ref,
    // region_c nothing at all.
    let mut refs_manifest = FileRefsManifest::default();
    refs_manifest
        .cross_region_refs
        .insert(region_a, [dst_a_new].into());
    refs_manifest.file_refs.insert(
        region_b,
        std::iter::once(FileRef::new(region_b, FileId::random(), None)).collect(),
    );

    let mut procedure = BatchGcProcedure::new_update_repartition_for_test(
        metasrv.mailbox().clone(),
        metadata_manager.clone(),
        metasrv.options().grpc.server_addr.clone(),
        vec![region_a, region_b, region_c],
        refs_manifest,
        Duration::from_secs(5),
    );
    let ctx = new_test_procedure_context();
    let outcome = procedure.execute(&ctx).await.unwrap();
    assert!(matches!(outcome, Status::Done { .. }));

    let stored = repart_manager.get(table_id).await.unwrap().unwrap();
    let mappings = &stored.src_to_dst;
    assert_eq!(
        mappings.get(&region_a),
        Some(&BTreeSet::from([dst_a_new]))
    );
    assert_eq!(mappings.get(&region_b), Some(&BTreeSet::new()));
    assert!(!mappings.contains_key(&region_c));
}
/// Seeds mappings for four source regions but runs the repartition-update step for
/// only three of them, then checks that the mapping of the region left out of the
/// batch (`region_d`) is unchanged while the batch regions are reconciled.
#[tokio::test]
async fn test_cleanup_region_repartition_preserve_uninvolved_entries() {
    let _ = dotenv::dotenv();
    let (test_context, _guard) = distributed_with_gc(&StorageType::File).await;
    let frontend = test_context.frontend();
    let metasrv = test_context.metasrv();
    let metadata_manager = metasrv.table_metadata_manager();

    execute_sql(
        &frontend,
        r#"
CREATE TABLE test_cleanup_repartition_preserve (
ts TIMESTAMP TIME INDEX,
val DOUBLE,
host STRING
)PARTITION ON COLUMNS (host) (
host < 'a',
host >= 'a' AND host < 'm',
host >= 'm' AND host < 'x',
host >= 'x'
) WITH (append_mode = 'true')
"#,
    )
    .await;

    let table_ref = frontend
        .catalog_manager()
        .table(
            "greptime",
            "public",
            "test_cleanup_repartition_preserve",
            None,
        )
        .await
        .unwrap()
        .unwrap();
    let table_id = table_ref.table_info().table_id();

    let (_routes, route_regions) = get_table_route(metadata_manager, table_id).await;
    let base_region = *route_regions
        .first()
        .expect("table has at least one region");
    // Region ids are derived from the first routed region by offsetting its region number.
    let region_at = |offset: u32| RegionId::new(table_id, base_region.region_number() + offset);

    let region_a = base_region;
    let region_b = region_at(1);
    let region_c = region_at(2);
    let region_d = region_at(3);
    let dst_a_initial = region_at(10);
    let dst_b_initial = region_at(20);
    let dst_c_initial = region_at(30);
    let dst_d_initial = region_at(50);
    let dst_a_new = region_at(60);

    // Seed one mapping per source region, including region_d which stays out of the batch.
    let repart_manager = metadata_manager.table_repart_manager();
    let existing = repart_manager.get_with_raw_bytes(table_id).await.unwrap();
    let mut seeded = TableRepartValue::new();
    for (src, dst) in [
        (region_a, dst_a_initial),
        (region_b, dst_b_initial),
        (region_c, dst_c_initial),
        (region_d, dst_d_initial),
    ] {
        seeded.update_mappings(src, &[dst]);
    }
    repart_manager
        .upsert_value(table_id, existing, &seeded)
        .await
        .unwrap();

    // region_a gets a new cross-region ref; region_b has a file-ref entry that is empty.
    let mut refs_manifest = FileRefsManifest::default();
    refs_manifest
        .cross_region_refs
        .insert(region_a, [dst_a_new].into());
    refs_manifest
        .file_refs
        .insert(region_b, Default::default());

    let mut procedure = BatchGcProcedure::new_update_repartition_for_test(
        metasrv.mailbox().clone(),
        metadata_manager.clone(),
        metasrv.options().grpc.server_addr.clone(),
        vec![region_a, region_b, region_c],
        refs_manifest,
        Duration::from_secs(5),
    );
    let ctx = new_test_procedure_context();
    let outcome = procedure.execute(&ctx).await.unwrap();
    assert!(matches!(outcome, Status::Done { .. }));

    let stored = repart_manager.get(table_id).await.unwrap().unwrap();
    let mappings = &stored.src_to_dst;
    assert_eq!(
        mappings.get(&region_a),
        Some(&BTreeSet::from([dst_a_new]))
    );
    assert_eq!(mappings.get(&region_b), None);
    assert!(!mappings.contains_key(&region_c));
    assert_eq!(
        mappings.get(&region_d),
        Some(&BTreeSet::from([dst_d_initial]))
    );
}
/// Checks that a batch region whose file-ref entry exists but is empty has its
/// repartition mapping removed, while a region outside the batch keeps its mapping.
#[tokio::test]
async fn test_cleanup_region_repartition_remove_when_tmp_refs_empty() {
    let _ = dotenv::dotenv();
    let (test_context, _guard) = distributed_with_gc(&StorageType::File).await;
    let frontend = test_context.frontend();
    let metasrv = test_context.metasrv();
    let metadata_manager = metasrv.table_metadata_manager();

    execute_sql(
        &frontend,
        r#"
CREATE TABLE test_cleanup_repartition_empty_tmp_refs (
ts TIMESTAMP TIME INDEX,
val DOUBLE,
host STRING
)PARTITION ON COLUMNS (host) (
host < 'a',
host >= 'a' AND host < 'm',
host >= 'm'
) WITH (append_mode = 'true')
"#,
    )
    .await;

    let table_ref = frontend
        .catalog_manager()
        .table(
            "greptime",
            "public",
            "test_cleanup_repartition_empty_tmp_refs",
            None,
        )
        .await
        .unwrap()
        .unwrap();
    let table_id = table_ref.table_info().table_id();

    let (_routes, route_regions) = get_table_route(metadata_manager, table_id).await;
    let base_region = *route_regions
        .first()
        .expect("table has at least one region");
    // Region ids are derived from the first routed region by offsetting its region number.
    let region_at = |offset: u32| RegionId::new(table_id, base_region.region_number() + offset);

    let region_a = base_region;
    let region_b = region_at(1);
    let dst_a_initial = region_at(10);
    let dst_b_initial = region_at(20);

    // Seed one mapping for each of the two source regions.
    let repart_manager = metadata_manager.table_repart_manager();
    let existing = repart_manager.get_with_raw_bytes(table_id).await.unwrap();
    let mut seeded = TableRepartValue::new();
    for (src, dst) in [(region_a, dst_a_initial), (region_b, dst_b_initial)] {
        seeded.update_mappings(src, &[dst]);
    }
    repart_manager
        .upsert_value(table_id, existing, &seeded)
        .await
        .unwrap();

    // Simulate empty tmp refs snapshot for region_a: entry exists but set is empty.
    let mut refs_manifest = FileRefsManifest::default();
    refs_manifest
        .file_refs
        .insert(region_a, Default::default());

    let mut procedure = BatchGcProcedure::new_update_repartition_for_test(
        metasrv.mailbox().clone(),
        metadata_manager.clone(),
        metasrv.options().grpc.server_addr.clone(),
        vec![region_a],
        refs_manifest,
        Duration::from_secs(5),
    );
    let ctx = new_test_procedure_context();
    let outcome = procedure.execute(&ctx).await.unwrap();
    assert!(matches!(outcome, Status::Done { .. }));

    let stored = repart_manager.get(table_id).await.unwrap().unwrap();
    let mappings = &stored.src_to_dst;
    assert!(!mappings.contains_key(&region_a));
    assert_eq!(
        mappings.get(&region_b),
        Some(&BTreeSet::from([dst_b_initial]))
    );
}