diff --git a/Cargo.lock b/Cargo.lock
index f36ef2d286..563841d26f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13065,6 +13065,7 @@ dependencies = [
  "loki-proto",
  "meta-client",
  "meta-srv",
+ "mito2",
  "moka",
  "mysql_async",
  "object-store",
diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs
index 9c06a3c189..c8081a67fc 100644
--- a/src/common/procedure/src/error.rs
+++ b/src/common/procedure/src/error.rs
@@ -246,14 +246,6 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
-
-    #[snafu(display("Loader for {type_name} is not implemented: {reason}"))]
-    ProcedureLoaderNotImplemented {
-        #[snafu(implicit)]
-        location: Location,
-        type_name: String,
-        reason: String,
-    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -274,8 +266,7 @@ impl ErrorExt for Error {
             Error::ToJson { .. }
             | Error::DeleteState { .. }
             | Error::FromJson { .. }
-            | Error::WaitWatcher { .. }
-            | Error::ProcedureLoaderNotImplemented { .. } => StatusCode::Internal,
+            | Error::WaitWatcher { .. } => StatusCode::Internal,
 
             Error::RetryTimesExceeded { .. }
             | Error::RollbackTimesExceeded { .. }
diff --git a/src/meta-srv/src/gc.rs b/src/meta-srv/src/gc.rs
index 0b9b6fe438..3677e72a41 100644
--- a/src/meta-srv/src/gc.rs
+++ b/src/meta-srv/src/gc.rs
@@ -28,9 +28,10 @@ mod procedure;
 mod scheduler;
 mod tracker;
 
-pub(crate) use options::GcSchedulerOptions;
+pub use options::GcSchedulerOptions;
+pub use procedure::BatchGcProcedure;
 pub(crate) use scheduler::{GcScheduler, GcTickerRef};
 
-pub(crate) type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
+pub type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;
 pub(crate) type Peer2Regions = HashMap<Peer, HashSet<RegionId>>;
 
diff --git a/src/meta-srv/src/gc/ctx.rs b/src/meta-srv/src/gc/ctx.rs
index 8ce83be362..7b1cfc68e1 100644
--- a/src/meta-srv/src/gc/ctx.rs
+++ b/src/meta-srv/src/gc/ctx.rs
@@ -84,44 +84,6 @@ impl DefaultGcSchedulerCtx {
         mailbox: MailboxRef,
         server_addr: String,
     ) -> Result<Self> {
-        // register a noop loader for `GcRegionProcedure` to avoid error when deserializing procedure when rebooting
-
-        procedure_manager
-            .register_loader(
-                GcRegionProcedure::TYPE_NAME,
-                Box::new(move |json| {
-                    common_procedure::error::ProcedureLoaderNotImplementedSnafu {
-                        type_name: GcRegionProcedure::TYPE_NAME.to_string(),
-                        reason:
-                            "GC procedure should be retried by scheduler, not reloaded from storage"
-                                .to_string(),
-                    }
-                    .fail()
-                }),
-            )
-            .context(error::RegisterProcedureLoaderSnafu {
-                type_name: GcRegionProcedure::TYPE_NAME,
-            })?;
-
-        // register a noop loader for `BatchGcProcedure` to avoid error when deserializing procedure when rebooting
-
-        procedure_manager
-            .register_loader(
-                BatchGcProcedure::TYPE_NAME,
-                Box::new(move |json| {
-                    common_procedure::error::ProcedureLoaderNotImplementedSnafu {
-                        type_name: BatchGcProcedure::TYPE_NAME.to_string(),
-                        reason:
-                            "Batch GC procedure should not be reloaded from storage, as it doesn't need to be retried if interrupted"
-                                .to_string(),
-                    }
-                    .fail()
-                }),
-            )
-            .context(error::RegisterProcedureLoaderSnafu {
-                type_name: BatchGcProcedure::TYPE_NAME,
-            })?;
-
         Ok(Self {
             table_metadata_manager,
             procedure_manager,
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 5721f12af9..0dbfededb3 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -54,6 +54,7 @@ log-query = { workspace = true }
 loki-proto.workspace = true
 meta-client.workspace = true
 meta-srv = { workspace = true, features = ["mock"] }
+mito2.workspace = true
 moka.workspace = true
 mysql_async = { version = "0.35", default-features = false, features = [
     "time",
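With the noop loaders gone, a `BatchGcProcedure` is never reloaded from procedure storage; it is constructed and submitted directly, which is why the type is now exported from `meta_srv::gc`. A minimal sketch of that call path, reusing the constructor arguments from the integration test below; the helper name, the `MailboxRef` import path, and the `Result` alias location are assumptions:

```rust
use std::collections::HashMap;
use std::time::Duration;

use common_procedure::{ProcedureId, ProcedureManagerRef, ProcedureWithId};
use meta_srv::gc::{BatchGcProcedure, Region2Peers};
use meta_srv::service::mailbox::MailboxRef; // assumed path
use store_api::storage::RegionId;

// Hypothetical helper: submit one GC pass over `regions` and return the
// procedure id so the caller can wait on it (mirrors Step 6 of the test).
async fn submit_batch_gc(
    procedure_manager: &ProcedureManagerRef,
    mailbox: MailboxRef,
    server_addr: String,
    regions: Vec<RegionId>,
    region_routes: Region2Peers,
) -> common_procedure::error::Result<ProcedureId> {
    let procedure = BatchGcProcedure::new(
        mailbox,
        server_addr,
        regions,
        false,                   // full_file_listing
        region_routes,           // region -> (leader peer, follower peers)
        HashMap::new(),          // related regions
        Duration::from_secs(10), // mailbox timeout
    );
    let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
    let procedure_id = procedure_with_id.id;
    let _watcher = procedure_manager.submit(procedure_with_id).await?;
    Ok(procedure_id)
}
```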
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index a7c7267e8b..fc3803f32e 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -59,8 +59,10 @@ use hyper_util::rt::TokioIo;
 use meta_client::client::MetaClientBuilder;
 use meta_srv::cluster::MetaPeerClientRef;
 use meta_srv::discovery;
+use meta_srv::gc::GcSchedulerOptions;
 use meta_srv::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
 use meta_srv::mocks::MockInfo;
+use mito2::gc::GcConfig;
 use object_store::config::ObjectStoreConfig;
 use rand::Rng;
 use servers::grpc::GrpcOptions;
@@ -103,6 +105,8 @@ pub struct GreptimeDbClusterBuilder {
     datanodes: Option<u32>,
     datanode_wal_config: DatanodeWalConfig,
     metasrv_wal_config: MetasrvWalConfig,
+    datanode_gc_config: GcConfig,
+    metasrv_gc_config: GcSchedulerOptions,
     shared_home_dir: Option<Arc<TempDir>>,
     meta_selector: Option<SelectorRef>,
 }
@@ -134,6 +138,8 @@ impl GreptimeDbClusterBuilder {
             datanodes: None,
             datanode_wal_config: DatanodeWalConfig::default(),
             metasrv_wal_config: MetasrvWalConfig::default(),
+            datanode_gc_config: GcConfig::default(),
+            metasrv_gc_config: GcSchedulerOptions::default(),
             shared_home_dir: None,
             meta_selector: None,
         }
@@ -169,6 +175,17 @@ impl GreptimeDbClusterBuilder {
         self
     }
 
+    #[must_use]
+    pub fn with_datanode_gc_config(mut self, datanode_gc_config: GcConfig) -> Self {
+        self.datanode_gc_config = datanode_gc_config;
+        self
+    }
+
+    pub fn with_metasrv_gc_config(mut self, metasrv_gc_config: GcSchedulerOptions) -> Self {
+        self.metasrv_gc_config = metasrv_gc_config;
+        self
+    }
+
     #[must_use]
     pub fn with_shared_home_dir(mut self, shared_home_dir: Arc<TempDir>) -> Self {
         self.shared_home_dir = Some(shared_home_dir);
@@ -205,6 +222,7 @@
                 server_addr: "127.0.0.1:3002".to_string(),
                 ..Default::default()
             },
+            gc: self.metasrv_gc_config.clone(),
             ..Default::default()
         };
@@ -279,6 +297,7 @@ impl GreptimeDbClusterBuilder {
                 vec![],
                 home_dir,
                 self.datanode_wal_config.clone(),
+                self.datanode_gc_config.clone(),
             )
         } else {
             let (opts, guard) = create_tmp_dir_and_datanode_opts(
@@ -286,6 +305,7 @@
                 self.store_providers.clone().unwrap_or_default(),
                 &format!("{}-dn-{}", self.cluster_name, datanode_id),
                 self.datanode_wal_config.clone(),
+                self.datanode_gc_config.clone(),
             );
 
             guards.push(guard);
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 0faa7cdbd5..b43c000189 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -309,6 +309,7 @@ impl GreptimeDbStandaloneBuilder {
             store_types,
             &self.instance_name,
             self.datanode_wal_config.clone(),
+            Default::default(),
         );
 
         let kv_backend_config = KvBackendConfig::default();
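These builder hooks are what the new GC test uses to switch GC on for both sides of the cluster. A sketch of the intended usage, mirroring `distributed_with_gc` in the new test file; the cluster name and helper name are arbitrary:

```rust
use std::time::Duration;

use meta_srv::gc::GcSchedulerOptions;
use mito2::gc::GcConfig;

use crate::cluster::GreptimeDbClusterBuilder;

// Sketch: a cluster with GC enabled on both the metasrv scheduler and the
// datanode (Mito) side.
async fn gc_enabled_builder() -> GreptimeDbClusterBuilder {
    GreptimeDbClusterBuilder::new("gc-cluster")
        .await
        .with_metasrv_gc_config(GcSchedulerOptions {
            enable: true,
            ..Default::default()
        })
        .with_datanode_gc_config(GcConfig {
            enable: true,
            // drop unreferenced files immediately so tests do not wait
            lingering_time: Some(Duration::ZERO),
            ..Default::default()
        })
}
```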
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index e667bf7626..5a0619c1cc 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -32,6 +32,7 @@ use common_wal::config::DatanodeWalConfig;
 use datanode::config::{DatanodeOptions, StorageConfig};
 use frontend::instance::Instance;
 use frontend::service_config::{MysqlOptions, PostgresOptions};
+use mito2::gc::GcConfig;
 use object_store::config::{
     AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config,
 };
@@ -145,6 +146,7 @@ fn s3_test_config() -> S3Config {
         secret_access_key: env::var("GT_S3_ACCESS_KEY").unwrap().into(),
         bucket: env::var("GT_S3_BUCKET").unwrap(),
         region: Some(env::var("GT_S3_REGION").unwrap()),
+        endpoint: env::var("GT_S3_ENDPOINT_URL").ok(),
         ..Default::default()
     },
     ..Default::default()
@@ -163,7 +165,7 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, TestGuard) {
             scope: env::var("GT_GCS_SCOPE").unwrap(),
             credential_path: env::var("GT_GCS_CREDENTIAL_PATH").unwrap().into(),
             credential: env::var("GT_GCS_CREDENTIAL").unwrap().into(),
-            endpoint: env::var("GT_GCS_ENDPOINT").unwrap(),
+            endpoint: env::var("GT_GCS_ENDPOINT").unwrap_or_default(),
         },
         ..Default::default()
     };
@@ -297,6 +299,7 @@ pub fn create_tmp_dir_and_datanode_opts(
     store_provider_types: Vec<StorageType>,
     name: &str,
     wal_config: DatanodeWalConfig,
+    gc_config: GcConfig,
 ) -> (DatanodeOptions, TestGuard) {
     let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
     let home_dir = home_tmp_dir.path().to_str().unwrap().to_string();
@@ -314,7 +317,13 @@ pub fn create_tmp_dir_and_datanode_opts(
         store_providers.push(store);
         storage_guards.push(StorageGuard(data_tmp_dir))
     }
-    let opts = create_datanode_opts(default_store, store_providers, home_dir, wal_config);
+    let opts = create_datanode_opts(
+        default_store,
+        store_providers,
+        home_dir,
+        wal_config,
+        gc_config,
+    );
 
     (
         opts,
@@ -330,7 +339,18 @@ pub(crate) fn create_datanode_opts(
     providers: Vec<ObjectStoreConfig>,
     home_dir: String,
     wal_config: DatanodeWalConfig,
+    gc_config: GcConfig,
 ) -> DatanodeOptions {
+    let region_engine = DatanodeOptions::default()
+        .region_engine
+        .into_iter()
+        .map(|mut v| {
+            if let datanode::config::RegionEngineConfig::Mito(mito_config) = &mut v {
+                mito_config.gc = gc_config.clone();
+            }
+            v
+        })
+        .collect();
     DatanodeOptions {
         node_id: Some(0),
         require_lease_before_startup: true,
@@ -343,6 +363,7 @@ pub(crate) fn create_datanode_opts(
             .with_bind_addr(PEER_PLACEHOLDER_ADDR)
             .with_server_addr(PEER_PLACEHOLDER_ADDR),
         wal: wal_config,
+        region_engine,
         ..Default::default()
     }
 }
diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs
index db5c00efff..74d713d3ee 100644
--- a/tests-integration/src/tests.rs
+++ b/tests-integration/src/tests.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod gc;
 mod instance_kafka_wal_test;
 mod instance_noop_wal_test;
 mod instance_test;
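The `create_datanode_opts` change above threads the `GcConfig` into the Mito entry of the default region-engine list while leaving other engines untouched. The same transformation as a standalone sketch (the function name is hypothetical; types as in the diff):

```rust
use datanode::config::{DatanodeOptions, RegionEngineConfig};
use mito2::gc::GcConfig;

// Sketch: patch the Mito entry of the default region-engine list with a
// custom GC config, leaving any other engines at their defaults.
fn region_engines_with_gc(gc_config: GcConfig) -> Vec<RegionEngineConfig> {
    DatanodeOptions::default()
        .region_engine
        .into_iter()
        .map(|mut engine| {
            if let RegionEngineConfig::Mito(mito_config) = &mut engine {
                mito_config.gc = gc_config.clone();
            }
            engine
        })
        .collect()
}
```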
diff --git a/tests-integration/src/tests/gc.rs b/tests-integration/src/tests/gc.rs
new file mode 100644
index 0000000000..c2b402eb1a
--- /dev/null
+++ b/tests-integration/src/tests/gc.rs
@@ -0,0 +1,262 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::time::Duration;
+
+use common_meta::key::TableMetadataManagerRef;
+use common_procedure::ProcedureWithId;
+use common_telemetry::info;
+use common_test_util::recordbatch::check_output_stream;
+use futures::TryStreamExt as _;
+use itertools::Itertools;
+use meta_srv::gc::{BatchGcProcedure, GcSchedulerOptions, Region2Peers};
+use mito2::gc::GcConfig;
+use store_api::storage::RegionId;
+use table::metadata::TableId;
+
+use crate::cluster::GreptimeDbClusterBuilder;
+use crate::test_util::{StorageType, TempDirGuard, get_test_store_config};
+use crate::tests::test_util::{MockInstanceBuilder, TestContext, execute_sql, wait_procedure};
+
+/// Helper function to get table route information for the GC procedure.
+async fn get_table_route(
+    table_metadata_manager: &TableMetadataManagerRef,
+    table_id: TableId,
+) -> (Region2Peers, Vec<RegionId>) {
+    // Get the physical table route
+    let (_, physical_table_route) = table_metadata_manager
+        .table_route_manager()
+        .get_physical_table_route(table_id)
+        .await
+        .unwrap();
+
+    let mut region_routes = Region2Peers::new();
+    let mut regions = Vec::new();
+
+    // Convert region routes to the `Region2Peers` format
+    for region_route in physical_table_route.region_routes {
+        let region_id = region_route.region.id;
+        let leader_peer = region_route.leader_peer.clone().unwrap();
+        let follower_peers = region_route.follower_peers.clone();
+
+        region_routes.insert(region_id, (leader_peer, follower_peers));
+        regions.push(region_id);
+    }
+
+    (region_routes, regions)
+}
+
+/// Helper function to list all SST files.
+async fn list_sst_files(test_context: &TestContext) -> HashSet<String> {
+    let mut sst_files = HashSet::new();
+
+    for datanode in test_context.datanodes().values() {
+        let region_server = datanode.region_server();
+        let mito = region_server.mito_engine().unwrap();
+        let all_files = mito
+            .all_ssts_from_storage()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap()
+            .into_iter()
+            .map(|e| e.file_path)
+            .collect_vec();
+        sst_files.extend(all_files);
+    }
+
+    sst_files
+}
+
+async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirGuard) {
+    common_telemetry::init_default_ut_logging();
+    let test_name = uuid::Uuid::new_v4().to_string();
+    let (store_config, guard) = get_test_store_config(store_type);
+
+    let builder = GreptimeDbClusterBuilder::new(&test_name)
+        .await
+        .with_metasrv_gc_config(GcSchedulerOptions {
+            enable: true,
+            ..Default::default()
+        })
+        .with_datanode_gc_config(GcConfig {
+            enable: true,
+            // set lingering time to zero to speed up the test
+            lingering_time: Some(Duration::ZERO),
+            ..Default::default()
+        })
+        .with_store_config(store_config);
+    (
+        TestContext::new(MockInstanceBuilder::Distributed(builder)).await,
+        guard,
+    )
+}
+
+#[tokio::test]
+async fn test_gc_basic_different_store() {
+    common_telemetry::init_default_ut_logging();
+    let store_types = StorageType::build_storage_types_based_on_env();
+    for store in store_types {
+        if store == StorageType::File {
+            continue; // no point in testing GC on file-system storage
+        }
+        info!("Running GC test with storage type: {}", store);
+        test_gc_basic(&store).await;
+    }
+}
+
+async fn test_gc_basic(store_type: &StorageType) {
+    let (test_context, _guard) = distributed_with_gc(store_type).await;
+    let instance = test_context.frontend();
+    let metasrv = test_context.metasrv();
+
+    // Step 1: Create a table with append_mode to easily generate multiple files
+    let create_table_sql = r#"
+        CREATE TABLE test_gc_table (
+            ts TIMESTAMP TIME INDEX,
+            val DOUBLE,
+            host STRING
+        ) WITH (append_mode = 'true')
+    "#;
+    execute_sql(&instance, create_table_sql).await;
+
+    // Step 2: Generate SST files by inserting data and flushing multiple times
+    for i in 0..4 {
+        let insert_sql = format!(
+            r#"
+            INSERT INTO test_gc_table (ts, val, host) VALUES
+                ('2023-01-0{} 10:00:00', {}, 'host{}'),
+                ('2023-01-0{} 11:00:00', {}, 'host{}'),
+                ('2023-01-0{} 12:00:00', {}, 'host{}')
+            "#,
+            i + 1,
+            10.0 + i as f64,
+            i,
+            i + 1,
+            20.0 + i as f64,
+            i,
+            i + 1,
+            30.0 + i as f64,
+            i
+        );
+        execute_sql(&instance, &insert_sql).await;
+
+        // Flush the table to create SST files
+        let flush_sql = "ADMIN FLUSH_TABLE('test_gc_table')";
+        execute_sql(&instance, flush_sql).await;
+    }
+
+    // Step 3: Get table information
+    let table = instance
+        .catalog_manager()
+        .table("greptime", "public", "test_gc_table", None)
+        .await
+        .unwrap()
+        .unwrap();
+    let table_id = table.table_info().table_id();
+
+    // List SST files before compaction (for verification)
+    let sst_files_before_compaction = list_sst_files(&test_context).await;
+    info!(
+        "SST files before compaction: {:?}",
+        sst_files_before_compaction
+    );
+    assert_eq!(sst_files_before_compaction.len(), 4); // 4 files from 4 flushes
+
+    // Step 4: Trigger compaction to create garbage SST files
+    let compact_sql = "ADMIN COMPACT_TABLE('test_gc_table')";
+    execute_sql(&instance, compact_sql).await;
+
+    // Wait for compaction to complete
+    tokio::time::sleep(Duration::from_secs(2)).await;
+
+    // List SST files after compaction (should contain both old and new files)
+    let sst_files_after_compaction = list_sst_files(&test_context).await;
+    info!(
+        "SST files after compaction: {:?}",
+        sst_files_after_compaction
+    );
+    assert_eq!(sst_files_after_compaction.len(), 5); // 4 old + 1 new
+
+    // Step 5: Get table route information for the GC procedure
+    let (region_routes, regions) =
+        get_table_route(metasrv.table_metadata_manager(), table_id).await;
+
+    // Step 6: Create and execute a BatchGcProcedure
+    let procedure = BatchGcProcedure::new(
+        metasrv.mailbox().clone(),
+        metasrv.options().grpc.server_addr.clone(),
+        regions.clone(),
+        false, // full_file_listing
+        region_routes,
+        HashMap::new(),          // related_regions (empty for this simple test)
+        Duration::from_secs(10), // timeout
+    );
+
+    // Submit the procedure to the procedure manager
+    let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
+    let procedure_id = procedure_with_id.id;
+
+    let _watcher = metasrv
+        .procedure_manager()
+        .submit(procedure_with_id)
+        .await
+        .unwrap();
+
+    // Wait for the procedure to complete
+    wait_procedure(metasrv.procedure_manager(), procedure_id).await;
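+
+    // NOTE (assumption): `wait_procedure` (from `tests::test_util`) polls the
+    // procedure status until it reaches a terminal state, so the GC pass has
+    // completed on all involved datanodes before the assertions below run.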
+
+    // Step 7: Verify GC results
+    let sst_files_after_gc = list_sst_files(&test_context).await;
+    info!("SST files after GC: {:?}", sst_files_after_gc);
+    assert_eq!(sst_files_after_gc.len(), 1); // only the compacted file should remain after GC
+
+    // Verify that the data is still accessible
+    let count_sql = "SELECT COUNT(*) FROM test_gc_table";
+    let count_output = execute_sql(&instance, count_sql).await;
+    let expected = r#"
++----------+
+| count(*) |
++----------+
+| 12       |
++----------+"#
+        .trim();
+    check_output_stream(count_output.data, expected).await;
+
+    let select_sql = "SELECT * FROM test_gc_table ORDER BY ts";
+    let select_output = execute_sql(&instance, select_sql).await;
+    let expected = r#"
++---------------------+------+-------+
+| ts                  | val  | host  |
++---------------------+------+-------+
+| 2023-01-01T10:00:00 | 10.0 | host0 |
+| 2023-01-01T11:00:00 | 20.0 | host0 |
+| 2023-01-01T12:00:00 | 30.0 | host0 |
+| 2023-01-02T10:00:00 | 11.0 | host1 |
+| 2023-01-02T11:00:00 | 21.0 | host1 |
+| 2023-01-02T12:00:00 | 31.0 | host1 |
+| 2023-01-03T10:00:00 | 12.0 | host2 |
+| 2023-01-03T11:00:00 | 22.0 | host2 |
+| 2023-01-03T12:00:00 | 32.0 | host2 |
+| 2023-01-04T10:00:00 | 13.0 | host3 |
+| 2023-01-04T11:00:00 | 23.0 | host3 |
+| 2023-01-04T12:00:00 | 33.0 | host3 |
++---------------------+------+-------+"#
+        .trim();
+    check_output_stream(select_output.data, expected).await;
+
+    // TODO: add more specific assertions once we have proper file-system access;
+    // for now, the test passes if the procedure executes without errors
+    info!("GC test completed successfully");
+}
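One caveat in the test above: compaction completion is approximated by a fixed two-second sleep. If that proves flaky, a bounded poll over the SST listing is a possible alternative; a sketch, where `wait_for_sst_count` is hypothetical and reuses the test's own `list_sst_files` helper:

```rust
use std::time::Duration;

// Hypothetical helper: wait until exactly `expected` SST files are visible,
// with a bounded number of retries instead of a fixed sleep.
async fn wait_for_sst_count(test_context: &TestContext, expected: usize) {
    for _ in 0..20 {
        if list_sst_files(test_context).await.len() == expected {
            return;
        }
        tokio::time::sleep(Duration::from_millis(200)).await;
    }
    panic!("SST file count did not reach {expected} in time");
}
```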
diff --git a/tests-integration/src/tests/instance_noop_wal_test.rs b/tests-integration/src/tests/instance_noop_wal_test.rs
index 1bc4870fa8..dd9c46cb1d 100644
--- a/tests-integration/src/tests/instance_noop_wal_test.rs
+++ b/tests-integration/src/tests/instance_noop_wal_test.rs
@@ -19,7 +19,7 @@ use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
 
 use crate::cluster::GreptimeDbClusterBuilder;
 use crate::tests::test_util::{
-    MockInstance, MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
+    MockInstanceBuilder, RebuildableMockInstance, TestContext, execute_sql,
 };
 
 pub(crate) async fn distributed_with_noop_wal() -> TestContext {
diff --git a/tests-integration/src/tests/reconcile_table.rs b/tests-integration/src/tests/reconcile_table.rs
index 3e8414436d..3fee7b7f56 100644
--- a/tests-integration/src/tests/reconcile_table.rs
+++ b/tests-integration/src/tests/reconcile_table.rs
@@ -24,8 +24,8 @@ use table::table_reference::TableReference;
 
 use crate::cluster::GreptimeDbClusterBuilder;
 use crate::tests::test_util::{
-    MockInstance, MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend,
-    execute_sql, restore_kvbackend, try_execute_sql, wait_procedure,
+    MockInstanceBuilder, RebuildableMockInstance, TestContext, dump_kvbackend, execute_sql,
+    restore_kvbackend, try_execute_sql, wait_procedure,
 };
 
 const CREATE_MONITOR_TABLE_SQL: &str = r#"
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index 019ccd79e5..eccca85305 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -12,11 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::env;
 use std::sync::Arc;
 
 use async_trait::async_trait;
 use client::OutputData;
+use common_meta::DatanodeId;
 use common_meta::kv_backend::KvBackendRef;
 use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
 use common_meta::rpc::KeyValue;
@@ -30,6 +32,7 @@
 use common_test_util::find_workspace_path;
 use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
 use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
 use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
+use datanode::datanode::Datanode;
 use frontend::error::Result;
 use frontend::instance::Instance;
 use futures::TryStreamExt;
@@ -95,6 +98,13 @@ impl MockInstanceImpl {
             MockInstanceImpl::Distributed(instance) => &instance.metasrv,
         }
     }
+
+    pub(crate) fn datanodes(&self) -> &HashMap<DatanodeId, Datanode> {
+        match self {
+            MockInstanceImpl::Standalone(_) => unreachable!(),
+            MockInstanceImpl::Distributed(instance) => &instance.datanode_instances,
+        }
+    }
 }
 
 impl MockInstance for MockInstanceImpl {
@@ -185,6 +195,14 @@ impl TestContext {
     pub(crate) fn metasrv(&self) -> &Arc<Metasrv> {
         self.instance.as_ref().unwrap().metasrv()
     }
+
+    pub(crate) fn frontend(&self) -> Arc<Instance> {
+        self.instance.as_ref().unwrap().frontend()
+    }
+
+    pub(crate) fn datanodes(&self) -> &HashMap<DatanodeId, Datanode> {
+        self.instance.as_ref().unwrap().datanodes()
+    }
 }
 
 #[async_trait::async_trait]
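Note that `MockInstanceImpl::datanodes()` takes the `unreachable!()` arm for standalone instances, so the new `TestContext` accessors are distributed-only. A sketch of the intended usage, following `list_sst_files` in the GC test; `test_context` is assumed to wrap a distributed cluster:

```rust
// `datanodes()` panics for standalone instances, so only call it from
// distributed test contexts.
for datanode in test_context.datanodes().values() {
    let mito = datanode.region_server().mito_engine().unwrap();
    // engine internals are now reachable, e.g. `mito.all_ssts_from_storage()`
    let _ = mito;
}
```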