diff --git a/src/common/meta/src/key/table_repart.rs b/src/common/meta/src/key/table_repart.rs index 0c067addde..ce599865c1 100644 --- a/src/common/meta/src/key/table_repart.rs +++ b/src/common/meta/src/key/table_repart.rs @@ -305,18 +305,38 @@ impl TableRepartManager { /// Updates mappings from src region to dst regions. /// Should be called once repartition is done. - pub async fn update_mappings(&self, src: RegionId, dst: &[RegionId]) -> Result<()> { - let table_id = src.table_id(); - + pub async fn update_mappings( + &self, + table_id: TableId, + region_mapping: &HashMap>, + ) -> Result<()> { // Get current table repart with raw bytes for CAS operation - let current_table_repart = self - .get_with_raw_bytes(table_id) - .await? - .context(crate::error::TableRepartNotFoundSnafu { table_id })?; + let Some(current_table_repart) = self.get_with_raw_bytes(table_id).await? else { + let mut new_table_repart_value = TableRepartValue::new(); + for (src, dsts) in region_mapping.iter() { + new_table_repart_value.update_mappings(*src, dsts); + } + + let (txn, _) = self.build_create_txn(table_id, &new_table_repart_value)?; + let result = self.kv_backend.txn(txn).await?; + ensure!( + result.succeeded, + crate::error::MetadataCorruptionSnafu { + err_msg: format!( + "Failed to create table repart for table {}: CAS operation failed", + table_id + ), + } + ); + + return Ok(()); + }; // Clone the current repart value and update mappings let mut new_table_repart_value = current_table_repart.inner.clone(); - new_table_repart_value.update_mappings(src, dst); + for (src, dsts) in region_mapping.iter() { + new_table_repart_value.update_mappings(*src, dsts); + } // Execute atomic update let (txn, _) = @@ -339,9 +359,11 @@ impl TableRepartManager { /// Removes mappings from src region to dst regions. /// Should be called once files from src region are cleaned up in dst regions. 
- pub async fn remove_mappings(&self, src: RegionId, dsts: &[RegionId]) -> Result<()> { - let table_id = src.table_id(); - + pub async fn remove_mappings( + &self, + table_id: TableId, + region_mapping: &HashMap>, + ) -> Result<()> { // Get current table repart with raw bytes for CAS operation let current_table_repart = self .get_with_raw_bytes(table_id) @@ -350,7 +372,9 @@ impl TableRepartManager { // Clone the current repart value and remove mappings let mut new_table_repart_value = current_table_repart.inner.clone(); - new_table_repart_value.remove_mappings(src, dsts); + for (src, dsts) in region_mapping.iter() { + new_table_repart_value.remove_mappings(*src, dsts); + } // Execute atomic update let (txn, _) = @@ -743,7 +767,11 @@ mod tests { // Update mappings let src = RegionId::new(1024, 1); let dst = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)]; - manager.update_mappings(src, &dst).await.unwrap(); + let region_mapping = HashMap::from([(src, dst)]); + manager + .update_mappings(1024, ®ion_mapping) + .await + .unwrap(); // Verify update let retrieved = manager.get(1024).await.unwrap().unwrap(); @@ -776,7 +804,11 @@ mod tests { // Remove some mappings let to_remove = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)]; - manager.remove_mappings(src, &to_remove).await.unwrap(); + let region_mapping = HashMap::from([(src, to_remove)]); + manager + .remove_mappings(1024, ®ion_mapping) + .await + .unwrap(); // Verify removal let retrieved = manager.get(1024).await.unwrap().unwrap(); @@ -831,8 +863,9 @@ mod tests { let src = RegionId::new(1024, 1); let dst = vec![RegionId::new(1024, 2)]; - // Try to update mappings on non-existent table - let result = manager.update_mappings(src, &dst).await; + // Try to remove mappings on non-existent table + let region_mapping = HashMap::from([(src, dst.clone())]); + let result = manager.remove_mappings(1024, ®ion_mapping).await; assert!(result.is_err()); let err_msg = result.unwrap_err().to_string(); assert!( @@ -840,14 
+873,12 @@ mod tests { "{err_msg}" ); - // Try to remove mappings on non-existent table - let result = manager.remove_mappings(src, &dst).await; - assert!(result.is_err()); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("Failed to find table repartition metadata for table id 1024"), - "{err_msg}" - ); + // Try to update mappings on non-existent table + let region_mapping = HashMap::from([(src, dst)]); + manager + .update_mappings(1024, ®ion_mapping) + .await + .unwrap(); } #[tokio::test] diff --git a/src/meta-srv/src/gc.rs b/src/meta-srv/src/gc.rs index 4c4269edb7..c9d346e5f8 100644 --- a/src/meta-srv/src/gc.rs +++ b/src/meta-srv/src/gc.rs @@ -31,7 +31,7 @@ mod util; pub use options::GcSchedulerOptions; pub use procedure::BatchGcProcedure; -pub(crate) use scheduler::{GcScheduler, GcTickerRef}; +pub use scheduler::{Event, GcScheduler, GcTickerRef}; /// Mapping from region ID to its associated peers (leader and followers). pub type Region2Peers = HashMap)>; diff --git a/src/meta-srv/src/gc/procedure.rs b/src/meta-srv/src/gc/procedure.rs index 0165d47859..14b66f9e60 100644 --- a/src/meta-srv/src/gc/procedure.rs +++ b/src/meta-srv/src/gc/procedure.rs @@ -274,14 +274,40 @@ impl BatchGcProcedure { /// Clean up region repartition info in kvbackend after GC /// according to cross reference in `FileRefsManifest`. 
async fn cleanup_region_repartition(&self) -> Result<()> { - for (src_region, dst_regions) in self.data.file_refs.cross_region_refs.iter() { - // TODO(discord9): batch update + let mut table_grouped: HashMap>> = + HashMap::new(); + for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs { + table_grouped + .entry(src_region.table_id()) + .or_default() + .entry(*src_region) + .or_default() + .extend(dst_regions.iter().copied()); + } + // make sure for files without cross-region refs but with tmp refs, we DO NOT clean up repartition key entry + // so that dropped regions can still keep their region ids here + for src_region in self.data.file_refs.file_refs.keys() { + table_grouped + .entry(src_region.table_id()) + .or_default() + .entry(*src_region) + .or_default(); + } + for (table_id, region_mappings) in table_grouped { + let region_mapping = region_mappings + .iter() + .map(|(src_region, dst_regions)| { + (*src_region, dst_regions.iter().cloned().collect_vec()) + }) + .collect::>>(); + self.table_metadata_manager .table_repart_manager() - .update_mappings(*src_region, &dst_regions.iter().cloned().collect_vec()) + .update_mappings(table_id, ®ion_mapping) .await .context(KvBackendSnafu)?; } + Ok(()) } diff --git a/src/meta-srv/src/gc/scheduler.rs b/src/meta-srv/src/gc/scheduler.rs index 4fd46755b7..394af9006a 100644 --- a/src/meta-srv/src/gc/scheduler.rs +++ b/src/meta-srv/src/gc/scheduler.rs @@ -21,8 +21,8 @@ use common_meta::key::TableMetadataManagerRef; use common_procedure::ProcedureManagerRef; use common_telemetry::{error, info}; use store_api::storage::GcReport; -use tokio::sync::Mutex; use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{Mutex, oneshot}; use crate::cluster::MetaPeerClientRef; use crate::define_ticker; @@ -43,12 +43,15 @@ pub struct GcJobReport { /// /// Variants: /// - `Tick`: This event is used to trigger gc periodically. 
-pub(crate) enum Event { +/// - `Manually`: This event is used to trigger a manual gc run and provides a channel +/// to send back the [`GcJobReport`] for that run. +pub enum Event { Tick, + Manually(oneshot::Sender), } #[allow(unused)] -pub(crate) type GcTickerRef = Arc; +pub type GcTickerRef = Arc; define_ticker!( /// [GcTicker] is used to trigger gc periodically. @@ -118,9 +121,21 @@ impl GcScheduler { Event::Tick => { info!("Received gc tick"); if let Err(e) = self.handle_tick().await { - error!("Failed to handle gc tick: {}", e); + error!(e; "Failed to handle gc tick"); } } + Event::Manually(sender) => { + info!("Received manually gc request"); + match self.handle_tick().await { + Ok(report) => { + // ignore error + let _ = sender.send(report); + } + Err(e) => { + error!(e; "Failed to handle gc tick"); + } + }; + } } } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 6b0968f224..165efd0555 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -947,6 +947,10 @@ impl Metasrv { self.started.clone() } + pub fn gc_ticker(&self) -> Option { + self.gc_ticker.as_ref().cloned() + } + #[inline] pub fn new_ctx(&self) -> Context { let server_addr = self.options().grpc.server_addr.clone(); diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs index 08cb32e11a..722487f41b 100644 --- a/src/meta-srv/src/mocks.rs +++ b/src/meta-srv/src/mocks.rs @@ -131,7 +131,8 @@ pub async fn mock( }); let config = ChannelConfig::new() - .timeout(Some(Duration::from_secs(10))) + // Use a long timeout to prevent test failures due to slow operations (e.g., when testing with S3). 
+ .timeout(Some(Duration::from_secs(60))) .connect_timeout(Duration::from_secs(10)) .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config, None); diff --git a/src/meta-srv/src/procedure/repartition/group.rs b/src/meta-srv/src/procedure/repartition/group.rs index 68ce46a92c..e4ccf629ab 100644 --- a/src/meta-srv/src/procedure/repartition/group.rs +++ b/src/meta-srv/src/procedure/repartition/group.rs @@ -489,6 +489,25 @@ impl Context { .context(error::TableMetadataManagerSnafu) } + /// Updates the table repart mapping. + pub async fn update_table_repart_mapping(&self) -> Result<()> { + info!( + "Updating table repart mapping for table: {}, group_id: {}, region mapping: {:?}", + self.persistent_ctx.table_id, + self.persistent_ctx.group_id, + self.persistent_ctx.region_mapping + ); + + self.table_metadata_manager + .table_repart_manager() + .update_mappings( + self.persistent_ctx.table_id, + &self.persistent_ctx.region_mapping, + ) + .await + .context(error::TableMetadataManagerSnafu) + } + /// Returns the next operation timeout. /// /// If the next operation timeout is not set, it will return `None`. 
diff --git a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs index 4e82d4874b..4d35180a87 100644 --- a/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs +++ b/src/meta-srv/src/procedure/repartition/group/update_metadata/exit_staging_region.rs @@ -99,6 +99,8 @@ impl UpdateMetadata { }); }; + ctx.update_table_repart_mapping().await?; + Ok(()) } } diff --git a/src/meta-srv/src/region/flush_trigger.rs b/src/meta-srv/src/region/flush_trigger.rs index 3aee4488f6..31e1d44394 100644 --- a/src/meta-srv/src/region/flush_trigger.rs +++ b/src/meta-srv/src/region/flush_trigger.rs @@ -47,7 +47,7 @@ const RECENT_DURATION: Duration = Duration::from_secs(300); /// /// Variants: /// - `Tick`: This event is used to trigger region flush trigger periodically. -pub(crate) enum Event { +pub enum Event { Tick, } diff --git a/src/meta-srv/src/utils.rs b/src/meta-srv/src/utils.rs index 70b8e43283..c3b1e86da4 100644 --- a/src/meta-srv/src/utils.rs +++ b/src/meta-srv/src/utils.rs @@ -28,10 +28,10 @@ macro_rules! 
define_ticker { event_value = $event_val:expr ) => { $(#[$meta])* - pub(crate) struct $name { - pub(crate) tick_handle: std::sync::Mutex>>, - pub(crate) tick_interval: std::time::Duration, - pub(crate) sender: tokio::sync::mpsc::Sender<$event_ty>, + pub struct $name { + pub tick_handle: std::sync::Mutex>>, + pub tick_interval: std::time::Duration, + pub sender: tokio::sync::mpsc::Sender<$event_ty>, } #[async_trait::async_trait] diff --git a/src/mito2/src/gc.rs b/src/mito2/src/gc.rs index 7e57d1782c..dc93cf7d83 100644 --- a/src/mito2/src/gc.rs +++ b/src/mito2/src/gc.rs @@ -60,28 +60,18 @@ fn should_delete_file( is_in_tmp_ref: bool, is_linger: bool, is_eligible_for_delete: bool, - entry: &Entry, - unknown_file_may_linger_until: chrono::DateTime, + is_region_dropped: bool, + _entry: &Entry, + _unknown_file_may_linger_until: chrono::DateTime, ) -> bool { let is_known = is_linger || is_eligible_for_delete; - let is_unknown_linger_time_exceeded = || { - // if the file's expel time is unknown(because not appear in delta manifest), we keep it for a while - // using it's last modified time - // notice unknown files use a different lingering time - entry - .metadata() - .last_modified() - .map(|t| t < unknown_file_may_linger_until) - .unwrap_or(false) - }; - !is_in_manifest && !is_in_tmp_ref && if is_known { is_eligible_for_delete } else { - is_unknown_linger_time_exceeded() + !is_in_tmp_ref && is_region_dropped } } @@ -433,6 +423,8 @@ impl LocalGcWorker { Default::default() }; + let is_region_dropped = region.is_none(); + let in_tmp_ref = tmp_ref_files .iter() .map(|file_ref| (file_ref.file_id, file_ref.index_version)) @@ -441,6 +433,7 @@ impl LocalGcWorker { let deletable_files = self .list_to_be_deleted_files( region_id, + is_region_dropped, &in_manifest, &in_tmp_ref, recently_removed_files, @@ -679,8 +672,10 @@ impl LocalGcWorker { Ok(all_entries) } + #[allow(clippy::too_many_arguments)] fn filter_deletable_files( &self, + is_region_dropped: bool, entries: Vec, 
in_manifest: &HashMap>, in_tmp_ref: &HashSet<(FileId, Option)>, @@ -741,6 +736,7 @@ impl LocalGcWorker { is_in_tmp_ref, is_linger, is_eligible_for_delete, + is_region_dropped, &entry, unknown_file_may_linger_until, ) @@ -769,6 +765,7 @@ impl LocalGcWorker { is_in_tmp_ref, is_linger, is_eligible_for_delete, + is_region_dropped, &entry, unknown_file_may_linger_until, ) @@ -804,6 +801,7 @@ impl LocalGcWorker { pub async fn list_to_be_deleted_files( &self, region_id: RegionId, + is_region_dropped: bool, in_manifest: &HashMap>, in_tmp_ref: &HashSet<(FileId, Option)>, recently_removed_files: BTreeMap>, @@ -888,6 +886,7 @@ impl LocalGcWorker { // Step 3: Filter files to determine which ones can be deleted let all_unused_files_ready_for_delete = self.filter_deletable_files( + is_region_dropped, all_entries, in_manifest, in_tmp_ref, diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 60abdbd29b..0cb0479e1d 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -328,6 +328,7 @@ impl RegionOpener { )), file_purger: create_file_purger( config.gc.enable, + self.path_type, self.purge_scheduler, access_layer, self.cache_manager, @@ -465,6 +466,7 @@ impl RegionOpener { )); let file_purger = create_file_purger( config.gc.enable, + self.path_type, self.purge_scheduler.clone(), access_layer.clone(), self.cache_manager.clone(), diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 7075acfce1..7c1598787e 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -16,6 +16,7 @@ use std::fmt; use std::sync::Arc; use common_telemetry::error; +use store_api::region_request::PathType; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; @@ -67,8 +68,23 @@ impl fmt::Debug for LocalFilePurger { } } -pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool { - sst_layer.object_store().info().scheme() == object_store::Scheme::Fs 
+#[cfg(not(debug_assertions))] +/// Whether to enable GC for the file purger. +pub fn should_enable_gc( + global_gc_enabled: bool, + object_store_scheme: object_store::Scheme, +) -> bool { + global_gc_enabled && object_store_scheme != object_store::Scheme::Fs +} + +#[cfg(debug_assertions)] +/// For debug build, we may use Fs as the object store scheme, +/// so we need to enable GC for local file system. +pub fn should_enable_gc( + global_gc_enabled: bool, + _object_store_scheme: object_store::Scheme, +) -> bool { + global_gc_enabled } /// Creates a file purger based on the storage type of the access layer. @@ -82,12 +98,18 @@ pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool { /// pub fn create_file_purger( gc_enabled: bool, + path_type: PathType, scheduler: SchedulerRef, sst_layer: AccessLayerRef, cache_manager: Option, file_ref_manager: FileReferenceManagerRef, ) -> FilePurgerRef { - if gc_enabled && !is_local_fs(&sst_layer) { + // Only enable GC for: + // - object store based storage + // - data or bare path type (metadata region doesn't need to be GCed) + if should_enable_gc(gc_enabled, sst_layer.object_store().info().scheme()) + && matches!(path_type, PathType::Data | PathType::Bare) + { Arc::new(ObjectStoreFilePurger { file_ref_manager }) } else { Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager)) diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 13812c10f8..29e3cccc4f 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -1432,6 +1432,7 @@ impl StatementExecutor { .chain(into_partition_exprs.clone().into_iter()) .collect(); let new_partition_exprs_len = new_partition_exprs.len(); + let from_partition_exprs_len = from_partition_exprs.len(); // Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker. 
let _ = MultiDimPartitionRule::try_new( @@ -1445,14 +1446,6 @@ impl StatementExecutor { ) .context(InvalidPartitionSnafu)?; - info!( - "Submitting repartition task for table {} (table_id={}), from {} to {} partitions", - table_ref, - table_id, - from_partition_exprs.len(), - new_partition_exprs_len - ); - let ddl_options = parse_ddl_options(&request.options)?; let serialize_exprs = |exprs: Vec| -> Result> { let mut json_exprs = Vec::with_capacity(exprs.len()); @@ -1478,6 +1471,16 @@ impl StatementExecutor { req.wait = ddl_options.wait; req.timeout = ddl_options.timeout; + info!( + "Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}", + table_ref, + table_id, + from_partition_exprs_len, + new_partition_exprs_len, + ddl_options.timeout, + ddl_options.wait + ); + let response = self .procedure_executor .submit_ddl_task(&ExecutorContext::default(), req) diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 7903699453..be9b548f3f 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::env; use std::net::TcpListener; use std::ops::RangeInclusive; @@ -51,6 +51,7 @@ use frontend::frontend::{Frontend, FrontendOptions}; use frontend::instance::Instance as FeInstance; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; +use futures::TryStreamExt; use hyper_util::rt::TokioIo; use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; @@ -90,6 +91,53 @@ impl GreptimeDbCluster { pub fn fe_instance(&self) -> &Arc { &self.frontend.instance } + + /// List all SST files from all datanodes. 
+ pub async fn list_sst_files_from_all_datanodes(&self) -> BTreeSet { + let mut sst_files = BTreeSet::new(); + + for datanode in self.datanode_instances.values() { + let region_server = datanode.region_server(); + let mito = region_server.mito_engine().unwrap(); + let all_files = mito + .all_ssts_from_storage() + .try_collect::>() + .await + .unwrap() + .into_iter() + .map(|e| e.file_path) + .collect::>(); + sst_files.extend(all_files); + } + + sst_files + } + + /// List all SST files from the manifests of all datanodes. + pub async fn list_sst_files_from_manifests(&self) -> BTreeSet { + let mut sst_files = BTreeSet::new(); + + for datanode in self.datanode_instances.values() { + let region_server = datanode.region_server(); + let mito = region_server.mito_engine().unwrap(); + let all_files = mito + .all_ssts_from_manifest() + .await + .into_iter() + .flat_map(|e| { + if e.index_file_path.is_some() { + vec![e.file_path, e.index_file_path.unwrap()] + } else { + vec![e.file_path] + } + }) + .collect::>(); + + sst_files.extend(all_files); + } + + sst_files + } } pub struct GreptimeDbClusterBuilder { diff --git a/tests-integration/src/tests/gc.rs b/tests-integration/src/tests/gc.rs index 585205ed37..317b889cc5 100644 --- a/tests-integration/src/tests/gc.rs +++ b/tests-integration/src/tests/gc.rs @@ -13,12 +13,15 @@ // limitations under the License. 
use std::collections::HashSet; +use std::sync::Arc; use std::time::Duration; use common_meta::key::TableMetadataManagerRef; use common_procedure::ProcedureWithId; use common_telemetry::info; use common_test_util::recordbatch::check_output_stream; +use common_test_util::temp_dir::create_temp_dir; +use common_wal::config::DatanodeWalConfig; use futures::TryStreamExt as _; use itertools::Itertools; use meta_srv::gc::{BatchGcProcedure, GcSchedulerOptions, Region2Peers}; @@ -84,8 +87,13 @@ async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirG let test_name = uuid::Uuid::new_v4().to_string(); let (store_config, guard) = get_test_store_config(store_type); - let builder = GreptimeDbClusterBuilder::new(&test_name) - .await + let mut builder = GreptimeDbClusterBuilder::new(&test_name).await; + if matches!(store_type, StorageType::File) { + let home_dir = create_temp_dir("test_gc_data_home"); + builder = builder.with_shared_home_dir(Arc::new(home_dir)); + } + + let builder = builder .with_metasrv_gc_config(GcSchedulerOptions { enable: true, ..Default::default() @@ -96,6 +104,7 @@ async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirG lingering_time: Some(Duration::ZERO), ..Default::default() }) + .with_datanode_wal_config(DatanodeWalConfig::Noop) .with_store_config(store_config); ( TestContext::new(MockInstanceBuilder::Distributed(builder)).await, @@ -110,9 +119,6 @@ async fn test_gc_basic_different_store() { let store_type = StorageType::build_storage_types_based_on_env(); info!("store type: {:?}", store_type); for store in store_type { - if store == StorageType::File { - continue; // no point in test gc in fs storage - } info!("Running GC test with storage type: {}", store); test_gc_basic(&store).await; } diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs index 1069414917..4bb34690d7 100644 --- a/tests-integration/tests/main.rs +++ b/tests-integration/tests/main.rs @@ -21,6 +21,8 @@ mod 
jsonbench; mod sql; #[macro_use] mod region_migration; +#[macro_use] +mod repartition; grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs); @@ -30,4 +32,7 @@ sql_tests!(File); region_migration_tests!(File); +repartition_tests!(File); + +repartition_tests!(S3, S3WithCache, Oss, Azblob, Gcs); // TODO(niebayes): add integration tests for remote wal. diff --git a/tests-integration/tests/repartition.rs b/tests-integration/tests/repartition.rs new file mode 100644 index 0000000000..c8a17f6bc4 --- /dev/null +++ b/tests-integration/tests/repartition.rs @@ -0,0 +1,807 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; +use std::time::Duration; + +use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_meta::key::table_name::TableNameKey; +use common_procedure::{ProcedureWithId, watcher}; +use common_query::Output; +use common_telemetry::info; +use common_test_util::recordbatch::check_output_stream; +use common_test_util::temp_dir::create_temp_dir; +use common_wal::config::DatanodeWalConfig; +use frontend::error::Result as FrontendResult; +use frontend::instance::Instance; +use meta_srv::gc::{self, BatchGcProcedure, GcSchedulerOptions, GcTickerRef}; +use meta_srv::metasrv::Metasrv; +use mito2::gc::GcConfig; +use servers::query_handler::sql::SqlQueryHandler; +use session::context::{QueryContext, QueryContextRef}; +use tests_integration::cluster::GreptimeDbClusterBuilder; +use tests_integration::test_util::{StorageType, get_test_store_config}; +use tokio::sync::oneshot; + +#[macro_export] +macro_rules! repartition_tests { + ($($service:ident),*) => { + $( + paste::item! 
{ + mod [] { + #[tokio::test(flavor = "multi_thread")] + async fn [< test_repartition_mito >]() { + let store_type = tests_integration::test_util::StorageType::$service; + if store_type.test_on() { + common_telemetry::init_default_ut_logging(); + $crate::repartition::test_repartition_mito(store_type).await + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn [< test_repartition_metric >]() { + let store_type = tests_integration::test_util::StorageType::$service; + if store_type.test_on() { + common_telemetry::init_default_ut_logging(); + $crate::repartition::test_repartition_metric(store_type).await + } + } + } + } + )* + }; +} + +async fn trigger_table_gc(metasrv: &Arc, table_name: &str) { + info!("triggering table gc for table: {}", table_name); + let table_metadata_manager = metasrv.table_metadata_manager(); + let table_id = table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + table_name, + )) + .await + .unwrap() + .unwrap() + .table_id(); + let (_, table_route_value) = table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await + .unwrap(); + let region_ids = table_route_value + .region_routes + .iter() + .map(|r| r.region.id) + .collect::>(); + let procedure = BatchGcProcedure::new( + metasrv.mailbox().clone(), + metasrv.table_metadata_manager().clone(), + metasrv.options().grpc.server_addr.clone(), + region_ids.clone(), + false, // full_file_listing + Duration::from_secs(10), // timeout + Default::default(), + ); + + // Submit the procedure to the procedure manager + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let mut watcher = metasrv + .procedure_manager() + .submit(procedure_with_id) + .await + .unwrap(); + watcher::wait(&mut watcher).await.unwrap(); +} + +async fn trigger_full_gc(ticker: &GcTickerRef) { + info!("triggering full gc"); + let (tx, rx) = oneshot::channel(); + 
ticker.sender.send(gc::Event::Manually(tx)).await.unwrap(); + let _ = rx.await.unwrap(); +} + +pub async fn test_repartition_mito(store_type: StorageType) { + let cluster_name = "test_repartition_mito"; + let (store_config, _guard) = get_test_store_config(&store_type); + let datanodes = 3u64; + let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await; + if matches!(store_type, StorageType::File) { + let home_dir = create_temp_dir("test_repartition_mito_data_home"); + builder = builder.with_shared_home_dir(Arc::new(home_dir)); + } + + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_datanode_wal_config(DatanodeWalConfig::Noop) + .with_metasrv_gc_config(GcSchedulerOptions { + enable: true, + gc_cooldown_period: Duration::from_nanos(1), + ..Default::default() + }) + .with_datanode_gc_config(GcConfig { + enable: true, + lingering_time: Some(Duration::from_secs(0)), + unknown_file_lingering_time: Duration::from_secs(0), + ..Default::default() + }) + .build(true) + .await; + let metasrv = &cluster.metasrv; + let ticker = metasrv.gc_ticker().unwrap(); + + let query_ctx = QueryContext::arc(); + let instance = cluster.fe_instance(); + + // 1. 
Setup: Create a table with partitions + let sql = r#" + CREATE TABLE `repartition_mito_table`( + `id` INT, + `city` STRING, + `ts` TIMESTAMP TIME INDEX, + PRIMARY KEY(`id`, `city`) + ) PARTITION ON COLUMNS (`id`) ( + `id` < 10, + `id` >= 10 AND `id` < 20, + `id` >= 20 + ); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = r#" + INSERT INTO `repartition_mito_table` VALUES + (1, 'New York', '2022-01-01 00:00:00'), + (5, 'London', '2022-01-01 00:00:00'), + (10, 'Paris', '2022-01-01 00:00:00'), + (15, 'Tokyo', '2022-01-01 00:00:00'), + (20, 'Beijing', '2022-01-01 00:00:00'), + (25, 'Shanghai', '2022-01-01 00:00:00'); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + + let expected = "\ ++----+----------+---------------------+ +| id | city | ts | ++----+----------+---------------------+ +| 1 | New York | 2022-01-01T00:00:00 | +| 5 | London | 2022-01-01T00:00:00 | +| 10 | Paris | 2022-01-01T00:00:00 | +| 15 | Tokyo | 2022-01-01T00:00:00 | +| 20 | Beijing | 2022-01-01T00:00:00 | +| 25 | Shanghai | 2022-01-01T00:00:00 | ++----+----------+---------------------+"; + check_output_stream(result.data, expected).await; + + // 2. 
Split Partition + let sql = r#" + ALTER TABLE `repartition_mito_table` SPLIT PARTITION ( + `id` < 10 + ) INTO ( + `id` < 5, + `id` >= 5 AND `id` < 10 + ); + "#; + let _result = run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + + let expected = "\ ++----+----------+---------------------+ +| id | city | ts | ++----+----------+---------------------+ +| 1 | New York | 2022-01-01T00:00:00 | +| 5 | London | 2022-01-01T00:00:00 | +| 10 | Paris | 2022-01-01T00:00:00 | +| 15 | Tokyo | 2022-01-01T00:00:00 | +| 20 | Beijing | 2022-01-01T00:00:00 | +| 25 | Shanghai | 2022-01-01T00:00:00 | ++----+----------+---------------------+"; + check_output_stream(result.data, expected).await; + + trigger_table_gc(metasrv, "repartition_mito_table").await; + // Should be ok before compact. + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await; + let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await; + assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests); + + // It should be ok, if we try to compact the table after split partition. + let compact_sql = "ADMIN COMPACT_TABLE('repartition_mito_table', 'swcs', '3600')"; + let _result = run_sql(instance, compact_sql, query_ctx.clone()) + .await + .unwrap(); + + // Should be no change after compact. + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + + // Trigger GC to clean up the compacted files. + trigger_table_gc(metasrv, "repartition_mito_table").await; + // Should be no change after GC. 
+ let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await; + let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await; + assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests); + + let result = run_sql( + instance, + "SHOW CREATE TABLE `repartition_mito_table`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected_create_table_after_split = r#"+------------------------+-------------------------------------------------------+ +| Table | Create Table | ++------------------------+-------------------------------------------------------+ +| repartition_mito_table | CREATE TABLE IF NOT EXISTS "repartition_mito_table" ( | +| | "id" INT NULL, | +| | "city" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("id", "city") | +| | ) | +| | PARTITION ON COLUMNS ("id") ( | +| | id < 5, | +| | id >= 10 AND id < 20, | +| | id >= 20, | +| | id >= 5 AND id < 10 | +| | ) | +| | ENGINE=mito | +| | | ++------------------------+-------------------------------------------------------+"#; + check_output_stream(result.data, expected_create_table_after_split).await; + + let sql = + r#"INSERT INTO `repartition_mito_table` VALUES (2, 'Split1', '2022-01-02 00:00:00');"#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = + r#"INSERT INTO `repartition_mito_table` VALUES (7, 'Split2', '2022-01-02 00:00:00');"#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` WHERE `id` IN (2, 7) ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected_split_inserts = "\ ++----+--------+---------------------+ +| id | city | ts | ++----+--------+---------------------+ +| 2 | Split1 | 
2022-01-02T00:00:00 | +| 7 | Split2 | 2022-01-02T00:00:00 | ++----+--------+---------------------+"; + check_output_stream(result.data, expected_split_inserts).await; + + // 3. Merge Partition + let sql = r#" + ALTER TABLE `repartition_mito_table` MERGE PARTITION ( + `id` >= 10 AND `id` < 20, + `id` >= 20 + ); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + + let expected_all = "\ ++----+----------+---------------------+ +| id | city | ts | ++----+----------+---------------------+ +| 1 | New York | 2022-01-01T00:00:00 | +| 2 | Split1 | 2022-01-02T00:00:00 | +| 5 | London | 2022-01-01T00:00:00 | +| 7 | Split2 | 2022-01-02T00:00:00 | +| 10 | Paris | 2022-01-01T00:00:00 | +| 15 | Tokyo | 2022-01-01T00:00:00 | +| 20 | Beijing | 2022-01-01T00:00:00 | +| 25 | Shanghai | 2022-01-01T00:00:00 | ++----+----------+---------------------+"; + check_output_stream(result.data, expected_all).await; + + trigger_table_gc(metasrv, "repartition_mito_table").await; + // Trigger GC to clean up the compacted files. + trigger_full_gc(&ticker).await; + + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected_all).await; + let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await; + let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await; + assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests); + + // It should be ok, if we try to compact the table after merge partition. + let compact_sql = "ADMIN COMPACT_TABLE('repartition_mito_table', 'swcs', '3600')"; + let _result = run_sql(instance, compact_sql, query_ctx.clone()) + .await + .unwrap(); + + // Should be no change after compact. 
+ let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected_all).await; + + trigger_table_gc(metasrv, "repartition_mito_table").await; + trigger_full_gc(&ticker).await; + + // Should be no change after GC. + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected_all).await; + let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await; + let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await; + assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests); + + let result = run_sql( + instance, + "SHOW CREATE TABLE `repartition_mito_table`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected_create_table_after_merge = r#"+------------------------+-------------------------------------------------------+ +| Table | Create Table | ++------------------------+-------------------------------------------------------+ +| repartition_mito_table | CREATE TABLE IF NOT EXISTS "repartition_mito_table" ( | +| | "id" INT NULL, | +| | "city" STRING NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("id", "city") | +| | ) | +| | PARTITION ON COLUMNS ("id") ( | +| | id < 5, | +| | id >= 10 AND id < 20 OR id >= 20, | +| | id >= 5 AND id < 10 | +| | ) | +| | ENGINE=mito | +| | | ++------------------------+-------------------------------------------------------+"#; + check_output_stream(result.data, expected_create_table_after_merge).await; + + let sql = + r#"INSERT INTO `repartition_mito_table` VALUES (12, 'Merge1', '2022-01-03 00:00:00');"#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = + r#"INSERT INTO `repartition_mito_table` VALUES (30, 'Merge2', '2022-01-03 00:00:00');"#; + run_sql(instance, sql, 
query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SELECT * FROM `repartition_mito_table` WHERE `id` IN (12, 30) ORDER BY `id`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected_merge_inserts = "\ ++----+--------+---------------------+ +| id | city | ts | ++----+--------+---------------------+ +| 12 | Merge1 | 2022-01-03T00:00:00 | +| 30 | Merge2 | 2022-01-03T00:00:00 | ++----+--------+---------------------+"; + check_output_stream(result.data, expected_merge_inserts).await; + + run_sql( + instance, + "DROP TABLE `repartition_mito_table`", + query_ctx.clone(), + ) + .await + .unwrap(); +} + +pub async fn test_repartition_metric(store_type: StorageType) { + let cluster_name = "test_repartition_metric"; + let (store_config, _guard) = get_test_store_config(&store_type); + let datanodes = 3u64; + let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await; + if matches!(store_type, StorageType::File) { + let home_dir = create_temp_dir("test_repartition_metric_data_home"); + builder = builder.with_shared_home_dir(Arc::new(home_dir)); + } + let cluster = builder + .with_datanodes(datanodes as u32) + .with_store_config(store_config) + .with_datanode_wal_config(DatanodeWalConfig::Noop) + .with_metasrv_gc_config(GcSchedulerOptions { + enable: true, + gc_cooldown_period: Duration::from_nanos(1), + ..Default::default() + }) + .with_datanode_gc_config(GcConfig { + enable: true, + lingering_time: Some(Duration::from_secs(0)), + unknown_file_lingering_time: Duration::from_secs(0), + ..Default::default() + }) + .build(true) + .await; + let metasrv = &cluster.metasrv; + let ticker = metasrv.gc_ticker().unwrap(); + + let query_ctx = QueryContext::arc(); + let instance = cluster.fe_instance(); + + let sql = r#" + CREATE TABLE `repart_phy_metric`( + `ts` TIMESTAMP TIME INDEX, + `val` DOUBLE, + `host` STRING PRIMARY KEY + ) PARTITION ON COLUMNS (`host`) ( + `host` < 'm', + `host` >= 'm' + ) ENGINE = metric WITH ("physical_metric_table" = 
""); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = r#" + CREATE TABLE `repart_log_metric`( + `ts` TIMESTAMP TIME INDEX, + `val` DOUBLE, + `host` STRING PRIMARY KEY + ) ENGINE = metric WITH ("on_physical_table" = "repart_phy_metric"); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let sql = r#" + INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES + ('a_host', '2022-01-01 00:00:00', 1), + ('z_host', '2022-01-01 00:00:00', 2); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++--------+---------------------+-----+ +| host | ts | val | ++--------+---------------------+-----+ +| a_host | 2022-01-01T00:00:00 | 1.0 | +| z_host | 2022-01-01T00:00:00 | 2.0 | ++--------+---------------------+-----+"; + check_output_stream(result.data, expected).await; + + // Split physical table partition + let sql = r#" + ALTER TABLE `repart_phy_metric` SPLIT PARTITION ( + `host` < 'm' + ) INTO ( + `host` < 'g', + `host` >= 'g' AND `host` < 'm' + ); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SHOW CREATE TABLE `repart_phy_metric`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected_create_table_after_split = r#"+-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| repart_phy_metric | CREATE TABLE IF NOT EXISTS "repart_phy_metric" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" DOUBLE NULL, | +| | "host" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host") | +| | ) | +| | PARTITION ON COLUMNS ("host") ( | +| | host < 'g', | +| | host >= 'm', | +| | host >= 'g' AND host < 'm' | +| | ) | +| | ENGINE=metric | +| | WITH( | +| | 
physical_metric_table = '' | +| | ) | ++-------------------+--------------------------------------------------+"#; + check_output_stream(result.data, expected_create_table_after_split).await; + + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++--------+---------------------+-----+ +| host | ts | val | ++--------+---------------------+-----+ +| a_host | 2022-01-01T00:00:00 | 1.0 | +| z_host | 2022-01-01T00:00:00 | 2.0 | ++--------+---------------------+-----+"; + check_output_stream(result.data, expected).await; + + trigger_table_gc(metasrv, "repart_phy_metric").await; + // Should be ok before compact. + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await; + let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await; + assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests); + + // It should be ok, if we try to compact the table after split partition. + let compact_sql = "ADMIN COMPACT_TABLE('repart_phy_metric', 'swcs', '3600')"; + let _result = run_sql(instance, compact_sql, query_ctx.clone()) + .await + .unwrap(); + + // Should be no change after compact. + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + + // Trigger GC to clean up the compacted files. + trigger_table_gc(metasrv, "repart_phy_metric").await; + // Should be no change after GC. 
+ let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await; + let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await; + assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests); + + let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('b_host', '2022-01-02 00:00:00', 3.0);"#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('h_host', '2022-01-02 00:00:00', 4.0);"#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` WHERE `host` IN ('b_host', 'h_host') ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++--------+---------------------+-----+ +| host | ts | val | ++--------+---------------------+-----+ +| b_host | 2022-01-02T00:00:00 | 3.0 | +| h_host | 2022-01-02T00:00:00 | 4.0 | ++--------+---------------------+-----+"; + check_output_stream(result.data, expected).await; + + let sql = r#" + ALTER TABLE `repart_phy_metric` MERGE PARTITION ( + `host` < 'g', + `host` >= 'g' AND `host` < 'm' + ); + "#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SHOW CREATE TABLE `repart_phy_metric`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected_create_table_after_merge = r#"+-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| repart_phy_metric | CREATE TABLE IF NOT EXISTS "repart_phy_metric" ( | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | "val" DOUBLE NULL, | +| | "host" STRING NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("host") | 
+| | ) | +| | PARTITION ON COLUMNS ("host") ( | +| | host < 'g' OR host >= 'g' AND host < 'm', | +| | host >= 'm' | +| | ) | +| | ENGINE=metric | +| | WITH( | +| | physical_metric_table = '' | +| | ) | ++-------------------+--------------------------------------------------+"#; + check_output_stream(result.data, expected_create_table_after_merge).await; + + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++--------+---------------------+-----+ +| host | ts | val | ++--------+---------------------+-----+ +| a_host | 2022-01-01T00:00:00 | 1.0 | +| b_host | 2022-01-02T00:00:00 | 3.0 | +| h_host | 2022-01-02T00:00:00 | 4.0 | +| z_host | 2022-01-01T00:00:00 | 2.0 | ++--------+---------------------+-----+"; + check_output_stream(result.data, expected).await; + + trigger_table_gc(metasrv, "repart_phy_metric").await; + trigger_full_gc(&ticker).await; + // Should be no change after GC. + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await; + let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await; + assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests); + + // It should be ok, if we try to compact the table after merge partition. + let compact_sql = "ADMIN COMPACT_TABLE('repart_phy_metric', 'swcs', '3600')"; + let _result = run_sql(instance, compact_sql, query_ctx.clone()) + .await + .unwrap(); + + // Should be no change after compact. + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + + // Trigger GC to clean up the compacted files. 
+ trigger_table_gc(metasrv, "repart_phy_metric").await; + trigger_full_gc(&ticker).await; + + // Should be no change after GC. + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` ORDER BY `host`", + query_ctx.clone(), + ) + .await + .unwrap(); + check_output_stream(result.data, expected).await; + let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await; + let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await; + assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests); + + let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('c_host', '2022-01-03 00:00:00', 5.0);"#; + run_sql(instance, sql, query_ctx.clone()).await.unwrap(); + + let result = run_sql( + instance, + "SELECT * FROM `repart_log_metric` WHERE `host` = 'c_host'", + query_ctx.clone(), + ) + .await + .unwrap(); + let expected = "\ ++--------+---------------------+-----+ +| host | ts | val | ++--------+---------------------+-----+ +| c_host | 2022-01-03T00:00:00 | 5.0 | ++--------+---------------------+-----+"; + check_output_stream(result.data, expected).await; + + run_sql( + instance, + "DROP TABLE `repart_log_metric`", + query_ctx.clone(), + ) + .await + .unwrap(); + run_sql( + instance, + "DROP TABLE `repart_phy_metric`", + query_ctx.clone(), + ) + .await + .unwrap(); +} + +async fn run_sql( + instance: &Arc, + sql: &str, + query_ctx: QueryContextRef, +) -> FrontendResult { + info!("Run SQL: {sql}"); + instance.do_query(sql, query_ctx).await.remove(0) +}