diff --git a/src/common/meta/src/reconciliation.rs b/src/common/meta/src/reconciliation.rs index 3f851d0163..351a415566 100644 --- a/src/common/meta/src/reconciliation.rs +++ b/src/common/meta/src/reconciliation.rs @@ -18,3 +18,4 @@ pub(crate) mod reconcile_database; pub(crate) mod reconcile_logical_tables; pub(crate) mod reconcile_table; pub(crate) mod utils; +pub use reconcile_table::resolve_column_metadata::ResolveStrategy; diff --git a/src/common/meta/src/reconciliation/manager.rs b/src/common/meta/src/reconciliation/manager.rs index 29e15b4692..d5ffd36d84 100644 --- a/src/common/meta/src/reconciliation/manager.rs +++ b/src/common/meta/src/reconciliation/manager.rs @@ -117,7 +117,8 @@ impl ReconciliationManager { .await?; if physical_table_id == table_id { - Ok(self.reconcile_physical_table(table_id, table_ref.into(), resolve_strategy)) + self.reconcile_physical_table(table_id, table_ref.into(), resolve_strategy) + .await } else { let physical_table_info = table_metadata_manager .table_info_manager() @@ -127,24 +128,25 @@ impl ReconciliationManager { table_name: format!("table_id: {}", physical_table_id), })?; - Ok(self.reconcile_logical_tables( + self.reconcile_logical_tables( physical_table_id, physical_table_info.table_name(), vec![(table_id, table_ref.into())], - )) + ) + .await } } /// Reconcile a database. /// /// Returns the procedure id of the reconciliation procedure. - pub fn reconcile_database( + pub async fn reconcile_database( &self, catalog: String, schema: String, resolve_strategy: ResolveStrategy, parallelism: usize, - ) -> ProcedureId { + ) -> Result { let parallelism = normalize_parallelism(parallelism); let procedure = ReconcileDatabaseProcedure::new( self.context.clone(), @@ -155,15 +157,15 @@ impl ReconciliationManager { resolve_strategy, false, ); - self.spawn_procedure(Box::new(procedure)) + self.spawn_procedure(Box::new(procedure)).await } - fn reconcile_physical_table( + async fn reconcile_physical_table( &self, table_id: TableId, table_name: TableName, resolve_strategy: ResolveStrategy, - ) -> ProcedureId { + ) -> Result { let procedure = ReconcileTableProcedure::new( self.context.clone(), table_id, @@ -171,15 +173,15 @@ impl ReconciliationManager { resolve_strategy, false, ); - self.spawn_procedure(Box::new(procedure)) + self.spawn_procedure(Box::new(procedure)).await } - fn reconcile_logical_tables( + async fn reconcile_logical_tables( &self, physical_table_id: TableId, physical_table_name: TableName, logical_tables: Vec<(TableId, TableName)>, - ) -> ProcedureId { + ) -> Result { let procedure = ReconcileLogicalTablesProcedure::new( self.context.clone(), physical_table_id, @@ -187,18 +189,18 @@ impl ReconciliationManager { logical_tables, false, ); - self.spawn_procedure(Box::new(procedure)) + self.spawn_procedure(Box::new(procedure)).await } /// Reconcile a catalog. /// /// Returns the procedure id of the reconciliation procedure. - pub fn reconcile_catalog( + pub async fn reconcile_catalog( &self, catalog: String, resolve_strategy: ResolveStrategy, parallelism: usize, - ) -> ProcedureId { + ) -> Result { let parallelism = normalize_parallelism(parallelism); let procedure = ReconcileCatalogProcedure::new( self.context.clone(), @@ -207,29 +209,26 @@ impl ReconciliationManager { resolve_strategy, parallelism, ); - self.spawn_procedure(Box::new(procedure)) + self.spawn_procedure(Box::new(procedure)).await } - fn spawn_procedure(&self, procedure: BoxedProcedure) -> ProcedureId { + async fn spawn_procedure(&self, procedure: BoxedProcedure) -> Result { let procedure_manager = self.procedure_manager.clone(); let procedure_with_id = ProcedureWithId::with_random_id(procedure); let procedure_id = procedure_with_id.id; + let mut watcher = procedure_manager + .submit(procedure_with_id) + .await + .context(error::SubmitProcedureSnafu)?; common_runtime::spawn_global(async move { - let watcher = &mut match procedure_manager.submit(procedure_with_id).await { - Ok(watcher) => watcher, - Err(e) => { - error!(e; "Failed to submit reconciliation procedure {procedure_id}"); - return; - } - }; - if let Err(e) = watcher::wait(watcher).await { + if let Err(e) = watcher::wait(&mut watcher).await { error!(e; "Failed to wait reconciliation procedure {procedure_id}"); return; } info!("Reconciliation procedure {procedure_id} is finished successfully!"); }); - procedure_id + Ok(procedure_id) } } diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index 5b318c51b9..9dc61b1986 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -189,12 +189,15 @@ impl procedure_service_server::ProcedureService for Metasrv { parallelism, } = database; let resolve_strategy = parse_resolve_strategy(resolve_strategy)?; - self.reconciliation_manager().reconcile_database( - catalog_name, - database_name, - resolve_strategy.into(), - parallelism as usize, - ) + self.reconciliation_manager() + .reconcile_database( + catalog_name, + database_name, + resolve_strategy.into(), + parallelism as usize, + ) + .await + .context(error::SubmitReconcileProcedureSnafu)? } Target::ReconcileCatalog(catalog) => { let ReconcileCatalog { @@ -203,11 +206,10 @@ impl procedure_service_server::ProcedureService for Metasrv { parallelism, } = catalog; let resolve_strategy = parse_resolve_strategy(resolve_strategy)?; - self.reconciliation_manager().reconcile_catalog( - catalog_name, - resolve_strategy.into(), - parallelism as usize, - ) + self.reconciliation_manager() + .reconcile_catalog(catalog_name, resolve_strategy.into(), parallelism as usize) + .await + .context(error::SubmitReconcileProcedureSnafu)? } }; Ok(Response::new(ReconcileResponse { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index c5018d137f..a7847394a5 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -315,7 +315,7 @@ impl GreptimeDbClusterBuilder { meta_peer_client: &MetaPeerClientRef, expected_datanodes: usize, ) { - for _ in 0..10 { + for _ in 0..100 { let alive_datanodes = meta_srv::lease::alive_datanodes(meta_peer_client, u64::MAX) .await .unwrap() @@ -323,7 +323,7 @@ impl GreptimeDbClusterBuilder { if alive_datanodes == expected_datanodes { return; } - tokio::time::sleep(Duration::from_secs(1)).await + tokio::time::sleep(Duration::from_micros(100)).await } panic!("Some Datanodes are not alive in 10 seconds!") } diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 255409a300..c0179fa781 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -15,6 +15,7 @@ mod instance_kafka_wal_test; mod instance_test; mod promql_test; +mod reconcile_table; pub(crate) mod test_util; use std::collections::HashMap; diff --git a/tests-integration/src/tests/reconcile_table.rs b/tests-integration/src/tests/reconcile_table.rs new file mode 100644 index 0000000000..8a0bf9e26d --- /dev/null +++ b/tests-integration/src/tests/reconcile_table.rs @@ -0,0 +1,581 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use client::{OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_meta::reconciliation::manager::ReconciliationManagerRef; +use common_meta::reconciliation::ResolveStrategy; +use common_procedure::ProcedureManagerRef; +use common_recordbatch::util::collect_batches; +use common_test_util::recordbatch::check_output_stream; +use table::table_reference::TableReference; + +use crate::cluster::GreptimeDbClusterBuilder; +use crate::tests::test_util::{ + dump_kvbackend, execute_sql, restore_kvbackend, try_execute_sql, wait_procedure, MockInstance, + MockInstanceBuilder, RebuildableMockInstance, TestContext, +}; + +const CREATE_MONITOR_TABLE_SQL: &str = r#" +CREATE TABLE monitor ( + t TIMESTAMP TIME INDEX, + env STRING INVERTED INDEX, + cloud_provider STRING, + latency Float, + PRIMARY KEY (env, cloud_provider) +) +PARTITION ON COLUMNS (`env`) ( + `env` < 'env-0', + `env` >= 'env-0' AND `env` < 'env-1', + `env` >= 'env-1' AND `env` < 'env-2', + `env` >= 'env-2' +) +with('append_mode'='true');"#; + +const CREATE_TABLE_SQL: &str = r#" +CREATE TABLE grpc_latencies ( + ts TIMESTAMP TIME INDEX, + host STRING INVERTED INDEX, + method_name STRING, + env STRING NOT NULL, + latency Float, + PRIMARY KEY (host, method_name) +) +PARTITION ON COLUMNS (`host`) ( + `host` < 'host1', + `host` >= 'host1' AND `host` < 'host2', + `host` >= 'host2' AND `host` < 'host3', + `host` >= 'host3' +) +with('append_mode'='true');"#; + +const INSERT_DATA_SQL: &str = r#" +INSERT INTO grpc_latencies ( + ts, + host, + method_name, + latency, + env +) VALUES ( + '2025-08-08 20:00:06', + 'host1', + 'GetUser', + 103.0, + 'prod' +); +"#; + +const INSERT_DATA_SQL_WITHOUT_ENV: &str = r#" +INSERT INTO grpc_latencies ( + ts, + host, + method_name, + latency +) VALUES ( + '2025-08-08 20:00:07', + 'host2', + 'GetUser', + 104.0 +); +"#; + +const RENAME_TABLE_SQL: &str = r#" +ALTER TABLE grpc_latencies RENAME grpc_latencies_renamed; +"#; + +const MODIFY_COLUMN_LATENCY_TYPE_SQL: &str = r#" +ALTER TABLE grpc_latencies MODIFY COLUMN latency DOUBLE; +"#; + +const INSERT_DATA_SQL_WITH_CLOUD_PROVIDER: &str = r#" +INSERT INTO grpc_latencies ( + ts, + host, + method_name, + cloud_provider, + latency, +) VALUES ( + '2025-08-08 20:00:08', + 'host3', + 'GetUser', + 'aws', + 105.0 +); +"#; + +const ADD_COLUMN_CLOUD_PROVIDER_SQL: &str = r#" +ALTER TABLE grpc_latencies ADD COLUMN cloud_provider STRING; +"#; + +const ADD_COLUMN_ENV_SQL: &str = r#" +ALTER TABLE grpc_latencies ADD COLUMN env STRING; +"#; + +const DROP_COLUMN_ENV_SQL: &str = r#" +ALTER TABLE grpc_latencies DROP COLUMN env; +"#; + +#[tokio::test] +async fn test_reconcile_dropped_column() { + let builder = GreptimeDbClusterBuilder::new("test_reconcile_dropped_column").await; + let mut test_context = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + + // Create the table. + let output = execute_sql(&test_context.frontend(), CREATE_TABLE_SQL).await; + assert_affected_rows(output.data).await; + + // Insert data. + let output = execute_sql(&test_context.frontend(), INSERT_DATA_SQL).await; + assert_affected_rows(output.data).await; + + let keyvalues = dump_kvbackend(test_context.metasrv().kv_backend()).await; + + // Drop column env. + let output = execute_sql(&test_context.frontend(), DROP_COLUMN_ENV_SQL).await; + assert_affected_rows(output.data).await; + + let output = execute_sql(&test_context.frontend(), INSERT_DATA_SQL_WITHOUT_ENV).await; + assert_affected_rows(output.data).await; + + restore_kvbackend(test_context.metasrv().kv_backend(), keyvalues).await; + + test_context.rebuild().await; + let frontend = test_context.frontend(); + let metasrv = test_context.metasrv(); + let reconciliation_manager = metasrv.reconciliation_manager(); + + // We should unable to query table due to the column env is dropped. + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies").await; + assert_no_field_error(output.data).await; + + // Reconcile the table. + reconcile_table( + metasrv.procedure_manager(), + reconciliation_manager, + "grpc_latencies", + ) + .await; + + // Now we should able to query table again. + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host").await; + let expected = r#"+---------------------+-------+-------------+---------+ +| ts | host | method_name | latency | ++---------------------+-------+-------------+---------+ +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | +| 2025-08-08T20:00:07 | host2 | GetUser | 104.0 | ++---------------------+-------+-------------+---------+"#; + check_output_stream(output.data, expected).await; + + // Add column env. + let output = execute_sql(&frontend, ADD_COLUMN_ENV_SQL).await; + assert_affected_rows(output.data).await; + + // Query table again. + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host").await; + let expected = r#"+---------------------+-------+-------------+---------+-----+ +| ts | host | method_name | latency | env | ++---------------------+-------+-------------+---------+-----+ +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | | +| 2025-08-08T20:00:07 | host2 | GetUser | 104.0 | | ++---------------------+-------+-------------+---------+-----+"#; + check_output_stream(output.data, expected).await; + + // Drop column env. + let output = execute_sql(&frontend, DROP_COLUMN_ENV_SQL).await; + assert_affected_rows(output.data).await; + + // Add column cloud_provider. + let output = execute_sql(&frontend, ADD_COLUMN_CLOUD_PROVIDER_SQL).await; + assert_affected_rows(output.data).await; + + // Inserts data with cloud_provider. + let output = execute_sql(&frontend, INSERT_DATA_SQL_WITH_CLOUD_PROVIDER).await; + assert_affected_rows(output.data).await; + + // Now we should able to query table again. + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host").await; + let expected = r#"+---------------------+-------+-------------+---------+----------------+ +| ts | host | method_name | latency | cloud_provider | ++---------------------+-------+-------------+---------+----------------+ +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | | +| 2025-08-08T20:00:07 | host2 | GetUser | 104.0 | | +| 2025-08-08T20:00:08 | host3 | GetUser | 105.0 | aws | ++---------------------+-------+-------------+---------+----------------+"#; + check_output_stream(output.data, expected).await; +} + +#[tokio::test] +async fn test_reconcile_added_column() { + let builder = GreptimeDbClusterBuilder::new("test_reconcile_added_column").await; + let mut test_context = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + + // Create the table. + let output = execute_sql(&test_context.frontend(), CREATE_TABLE_SQL).await; + assert_affected_rows(output.data).await; + + // Insert data. + let output = execute_sql(&test_context.frontend(), INSERT_DATA_SQL).await; + assert_affected_rows(output.data).await; + + // Drop column env. + let output = execute_sql(&test_context.frontend(), DROP_COLUMN_ENV_SQL).await; + assert_affected_rows(output.data).await; + + let keyvalues = dump_kvbackend(test_context.metasrv().kv_backend()).await; + + // Add column cloud_provider. + let output = execute_sql(&test_context.frontend(), ADD_COLUMN_CLOUD_PROVIDER_SQL).await; + assert_affected_rows(output.data).await; + + let output = execute_sql( + &test_context.frontend(), + INSERT_DATA_SQL_WITH_CLOUD_PROVIDER, + ) + .await; + assert_affected_rows(output.data).await; + + restore_kvbackend(test_context.metasrv().kv_backend(), keyvalues).await; + + test_context.rebuild().await; + let frontend = test_context.frontend(); + let metasrv = test_context.metasrv(); + let reconciliation_manager = metasrv.reconciliation_manager(); + + // The column cloud_provider is missing. + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host").await; + let expected = r#"+---------------------+-------+-------------+---------+ +| ts | host | method_name | latency | ++---------------------+-------+-------------+---------+ +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | +| 2025-08-08T20:00:08 | host3 | GetUser | 105.0 | ++---------------------+-------+-------------+---------+"#; + check_output_stream(output.data, expected).await; + + // Reconcile the table. + reconcile_table( + metasrv.procedure_manager(), + reconciliation_manager, + "grpc_latencies", + ) + .await; + + // Now the column cloud_provider is available. + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host").await; + let expected = r#"+---------------------+-------+-------------+---------+----------------+ +| ts | host | method_name | latency | cloud_provider | ++---------------------+-------+-------------+---------+----------------+ +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | | +| 2025-08-08T20:00:08 | host3 | GetUser | 105.0 | aws | ++---------------------+-------+-------------+---------+----------------+"#; + check_output_stream(output.data, expected).await; + + // Add column env. + let output = execute_sql(&frontend, ADD_COLUMN_ENV_SQL).await; + assert_affected_rows(output.data).await; + + // Insert data with env. + let output = execute_sql(&frontend, INSERT_DATA_SQL).await; + assert_affected_rows(output.data).await; + + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host, env").await; + let expected = r#"+---------------------+-------+-------------+---------+----------------+------+ +| ts | host | method_name | latency | cloud_provider | env | ++---------------------+-------+-------------+---------+----------------+------+ +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | | prod | +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | | | +| 2025-08-08T20:00:08 | host3 | GetUser | 105.0 | aws | | ++---------------------+-------+-------------+---------+----------------+------+"#; + check_output_stream(output.data, expected).await; +} + +#[tokio::test] +async fn test_reconcile_modify_column_type() { + let builder = GreptimeDbClusterBuilder::new("test_reconcile_added_column").await; + let mut test_context = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + + // Create the table. + let output = execute_sql(&test_context.frontend(), CREATE_TABLE_SQL).await; + assert_affected_rows(output.data).await; + + // Insert data. + let output = execute_sql(&test_context.frontend(), INSERT_DATA_SQL).await; + assert_affected_rows(output.data).await; + + // Drop column env. + let output = execute_sql(&test_context.frontend(), DROP_COLUMN_ENV_SQL).await; + assert_affected_rows(output.data).await; + + let keyvalues = dump_kvbackend(test_context.metasrv().kv_backend()).await; + + // Add column cloud_provider. + let output = execute_sql(&test_context.frontend(), MODIFY_COLUMN_LATENCY_TYPE_SQL).await; + assert_affected_rows(output.data).await; + + let output = execute_sql(&test_context.frontend(), INSERT_DATA_SQL_WITHOUT_ENV).await; + assert_affected_rows(output.data).await; + + restore_kvbackend(test_context.metasrv().kv_backend(), keyvalues).await; + + test_context.rebuild().await; + let frontend = test_context.frontend(); + let metasrv = test_context.metasrv(); + let reconciliation_manager = metasrv.reconciliation_manager(); + + // The column cloud_provider is missing. + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host").await; + assert_has_different_type(output.data).await; + + // Reconcile the table. + reconcile_table( + metasrv.procedure_manager(), + reconciliation_manager, + "grpc_latencies", + ) + .await; + + // Now we can query the table again. + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host").await; + let expected = r#"+---------------------+-------+-------------+---------+ +| ts | host | method_name | latency | ++---------------------+-------+-------------+---------+ +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | +| 2025-08-08T20:00:07 | host2 | GetUser | 104.0 | ++---------------------+-------+-------------+---------+"#; + check_output_stream(output.data, expected).await; + + // Add column env. + let output = execute_sql(&frontend, ADD_COLUMN_CLOUD_PROVIDER_SQL).await; + assert_affected_rows(output.data).await; + + // Insert data with env. + let output = execute_sql(&frontend, INSERT_DATA_SQL_WITH_CLOUD_PROVIDER).await; + assert_affected_rows(output.data).await; + + let output = execute_sql(&frontend, "SELECT * FROM grpc_latencies ORDER BY host").await; + let expected = r#"+---------------------+-------+-------------+---------+----------------+ +| ts | host | method_name | latency | cloud_provider | ++---------------------+-------+-------------+---------+----------------+ +| 2025-08-08T20:00:06 | host1 | GetUser | 103.0 | | +| 2025-08-08T20:00:07 | host2 | GetUser | 104.0 | | +| 2025-08-08T20:00:08 | host3 | GetUser | 105.0 | aws | ++---------------------+-------+-------------+---------+----------------+"#; + check_output_stream(output.data, expected).await; +} + +#[tokio::test] +async fn test_recover_metadata_failed() { + let builder = GreptimeDbClusterBuilder::new("test_recover_metadata_failed") + .await + .with_datanodes(1); + let mut test_context = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + + // Backup the kv backend. + let keyvalues = dump_kvbackend(test_context.metasrv().kv_backend()).await; + + let output = execute_sql(&test_context.frontend(), CREATE_TABLE_SQL).await; + assert_affected_rows(output.data).await; + + let output = execute_sql(&test_context.frontend(), INSERT_DATA_SQL).await; + assert_affected_rows(output.data).await; + + let output = execute_sql(&test_context.frontend(), "SELECT * FROM grpc_latencies").await; + let expected = r#"+---------------------+-------+-------------+------+---------+ +| ts | host | method_name | env | latency | ++---------------------+-------+-------------+------+---------+ +| 2025-08-08T20:00:06 | host1 | GetUser | prod | 103.0 | ++---------------------+-------+-------------+------+---------+"#; + check_output_stream(output.data, expected).await; + + restore_kvbackend(test_context.metasrv().kv_backend(), keyvalues.clone()).await; + test_context.rebuild().await; + + // Only grpc_latencies table is visible. + let output = execute_sql(&test_context.frontend(), "show tables;").await; + let expected = r#"+---------+ +| Tables | ++---------+ +| numbers | ++---------+"#; + check_output_stream(output.data, expected).await; + + // Expect table creation to fail because the region directory already exists. + let error = try_execute_sql(&test_context.frontend(), CREATE_MONITOR_TABLE_SQL) + .await + .unwrap_err(); + assert!(format!("{error:?}").contains("recovered metadata has different schema")); +} + +#[tokio::test] +async fn test_set_table_id() { + let builder = GreptimeDbClusterBuilder::new("test_set_table_id").await; + let mut test_context = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + + let keyvalues = dump_kvbackend(test_context.metasrv().kv_backend()).await; + let output = execute_sql(&test_context.frontend(), CREATE_TABLE_SQL).await; + assert_affected_rows(output.data).await; + restore_kvbackend(test_context.metasrv().kv_backend(), keyvalues).await; + test_context.rebuild().await; + + let metasrv = test_context.metasrv(); + // Due to the table id 1024 already allocated, we need to jump to 1025. + metasrv.table_id_sequence().jump_to(1025).await.unwrap(); + + // We should able to create table now. + let output = execute_sql(&test_context.frontend(), CREATE_MONITOR_TABLE_SQL).await; + assert_affected_rows(output.data).await; +} + +#[tokio::test] +async fn test_dropped_table() { + let builder = GreptimeDbClusterBuilder::new("test_dropped_table").await; + let mut test_context = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + + // Creates the table. + let output = execute_sql(&test_context.frontend(), CREATE_TABLE_SQL).await; + assert_affected_rows(output.data).await; + + // Insert data. + let output = execute_sql(&test_context.frontend(), INSERT_DATA_SQL).await; + assert_affected_rows(output.data).await; + + // Backup the kv backend. + let keyvalues = dump_kvbackend(test_context.metasrv().kv_backend()).await; + + // Drop the table. + let output = execute_sql(&test_context.frontend(), "DROP TABLE grpc_latencies").await; + assert_affected_rows(output.data).await; + + restore_kvbackend(test_context.metasrv().kv_backend(), keyvalues).await; + // Enable the recovery mode, so the datanode will ignore empty region directory during recovery. + test_context + .metasrv() + .runtime_switch_manager() + .set_recovery_mode() + .await + .unwrap(); + test_context.rebuild().await; + + let output = execute_sql(&test_context.frontend(), "show tables;").await; + let expected = r#"+----------------+ +| Tables | ++----------------+ +| grpc_latencies | +| numbers | ++----------------+"#; + check_output_stream(output.data, expected).await; + + // We can't query the table because the table is dropped. + let output = execute_sql(&test_context.frontend(), "SELECT * FROM grpc_latencies").await; + region_not_found(output.data).await; + + // We should able to drop the table. + let output = execute_sql(&test_context.frontend(), "DROP TABLE grpc_latencies").await; + assert_affected_rows(output.data).await; +} + +#[tokio::test] +async fn test_renamed_table() { + let builder = GreptimeDbClusterBuilder::new("test_renamed_table").await; + let mut test_context = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + + // Creates the table. + let output = execute_sql(&test_context.frontend(), CREATE_TABLE_SQL).await; + assert_affected_rows(output.data).await; + + // Inserts data. + let output = execute_sql(&test_context.frontend(), INSERT_DATA_SQL).await; + assert_affected_rows(output.data).await; + + // Backup the kv backend. + let keyvalues = dump_kvbackend(test_context.metasrv().kv_backend()).await; + + // Renames the table. + let output = execute_sql(&test_context.frontend(), RENAME_TABLE_SQL).await; + assert_affected_rows(output.data).await; + + let output = execute_sql( + &test_context.frontend(), + "SELECT * FROM grpc_latencies_renamed", + ) + .await; + let expected = r#"+---------------------+-------+-------------+------+---------+ +| ts | host | method_name | env | latency | ++---------------------+-------+-------------+------+---------+ +| 2025-08-08T20:00:06 | host1 | GetUser | prod | 103.0 | ++---------------------+-------+-------------+------+---------+"#; + check_output_stream(output.data, expected).await; + + restore_kvbackend(test_context.metasrv().kv_backend(), keyvalues).await; + test_context.rebuild().await; + + // After restoring the metadata, only the table with its original name (before the rename) is visible. + let output = execute_sql(&test_context.frontend(), "SELECT * FROM grpc_latencies").await; + check_output_stream(output.data, expected).await; + + let output = execute_sql(&test_context.frontend(), "show tables;").await; + let expected = r#"+----------------+ +| Tables | ++----------------+ +| grpc_latencies | +| numbers | ++----------------+"#; + check_output_stream(output.data, expected).await; +} + +#[tokio::test] +async fn test_rename_table() {} + +async fn unwrap_err(output: OutputData) -> String { + let error = match output { + OutputData::Stream(stream) => collect_batches(stream).await.unwrap_err(), + _ => unreachable!(), + }; + format!("{error:?}") +} + +async fn assert_no_field_error(output: OutputData) { + let error = unwrap_err(output).await; + assert!(error.contains("No field named")); +} + +async fn assert_has_different_type(output: OutputData) { + let error = unwrap_err(output).await; + assert!(error.contains("schema has a different type")); +} + +async fn region_not_found(output: OutputData) { + let error = unwrap_err(output).await; + assert!(error.contains("not found")); + assert!(error.contains("RegionId")); +} + +async fn reconcile_table( + procedure_manager: &ProcedureManagerRef, + reconciliation_manager: &ReconciliationManagerRef, + table_name: &str, +) { + let table_ref = TableReference { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_SCHEMA_NAME, + table: table_name, + }; + let procedure_id = reconciliation_manager + .reconcile_table(table_ref, ResolveStrategy::default()) + .await + .unwrap(); + wait_procedure(procedure_manager, procedure_id).await; +} + +async fn assert_affected_rows(output: OutputData) { + assert!(matches!(output, OutputData::AffectedRows(_))); +} diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index e3db5ea186..5c79b5b2d1 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -17,15 +17,26 @@ use std::sync::Arc; use async_trait::async_trait; use client::OutputData; +use common_meta::kv_backend::KvBackendRef; +use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use common_meta::rpc::store::{BatchPutRequest, DeleteRangeRequest, RangeRequest}; +use common_meta::rpc::KeyValue; +use common_procedure::{watcher, ProcedureId, ProcedureManagerRef}; use common_query::Output; use common_recordbatch::util; +use common_telemetry::tracing::info; use common_telemetry::warn; use common_test_util::find_workspace_path; use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig}; use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig}; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; +use frontend::error::Result; use frontend::instance::Instance; +use futures::TryStreamExt; +use meta_srv::metasrv::Metasrv; use rstest_reuse::{self, template}; +use servers::query_handler::sql::SqlQueryHandler; +use session::context::{QueryContext, QueryContextRef}; use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder}; use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; @@ -77,6 +88,15 @@ pub(crate) enum MockInstanceImpl { Distributed(GreptimeDbCluster), } +impl MockInstanceImpl { + pub(crate) fn metasrv(&self) -> &Arc { + match self { + MockInstanceImpl::Standalone(_) => unreachable!(), + MockInstanceImpl::Distributed(instance) => &instance.metasrv, + } + } +} + impl MockInstance for MockInstanceImpl { fn frontend(&self) -> Arc { match self { @@ -128,9 +148,17 @@ impl MockInstanceBuilder { let GreptimeDbCluster { guards, datanode_options, + mut datanode_instances, .. } = instance; + for (id, instance) in datanode_instances.iter_mut() { + instance + .shutdown() + .await + .unwrap_or_else(|_| panic!("Failed to shutdown datanode {}", id)); + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; MockInstanceImpl::Distributed( builder.build_with(datanode_options, false, guards).await, ) @@ -145,7 +173,7 @@ pub(crate) struct TestContext { } impl TestContext { - async fn new(builder: MockInstanceBuilder) -> Self { + pub(crate) async fn new(builder: MockInstanceBuilder) -> Self { let instance = builder.build().await; Self { @@ -153,11 +181,16 @@ impl TestContext { builder, } } + + pub(crate) fn metasrv(&self) -> &Arc { + self.instance.as_ref().unwrap().metasrv() + } } #[async_trait::async_trait] impl RebuildableMockInstance for TestContext { async fn rebuild(&mut self) { + info!("Rebuilding the instance"); let instance = self.builder.rebuild(self.instance.take().unwrap()).await; self.instance = Some(instance); } @@ -387,3 +420,58 @@ pub fn find_testing_resource(path: &str) -> String { prepare_path(&p) } + +pub async fn execute_sql(instance: &Arc, sql: &str) -> Output { + execute_sql_with(instance, sql, QueryContext::arc()).await +} + +pub async fn try_execute_sql(instance: &Arc, sql: &str) -> Result { + try_execute_sql_with(instance, sql, QueryContext::arc()).await +} + +pub async fn try_execute_sql_with( + instance: &Arc, + sql: &str, + query_ctx: QueryContextRef, +) -> Result { + instance.do_query(sql, query_ctx).await.remove(0) +} + +pub async fn execute_sql_with( + instance: &Arc, + sql: &str, + query_ctx: QueryContextRef, +) -> Output { + try_execute_sql_with(instance, sql, query_ctx) + .await + .unwrap_or_else(|e| panic!("Failed to execute sql: {sql}, error: {e:?}")) +} + +/// Dump the kv backend to a vector of key-value pairs. +pub async fn dump_kvbackend(kv_backend: &KvBackendRef) -> Vec<(Vec, Vec)> { + let req = RangeRequest::new().with_range(vec![0], vec![0]); + let stream = PaginationStream::new(kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| { + Ok((kv.key, kv.value)) + }) + .into_stream(); + stream.try_collect::>().await.unwrap() +} + +/// Clear the kv backend and restore the key-value pairs. +pub async fn restore_kvbackend(kv_backend: &KvBackendRef, keyvalues: Vec<(Vec, Vec)>) { + // Clear the kv backend before restoring. + let req = DeleteRangeRequest::new().with_range(vec![0], vec![0]); + kv_backend.delete_range(req).await.unwrap(); + + let mut req = BatchPutRequest::default(); + for (key, value) in keyvalues { + req.kvs.push(KeyValue { key, value }); + } + kv_backend.batch_put(req).await.unwrap(); +} + +/// Wait for the procedure to complete. +pub async fn wait_procedure(procedure_manager: &ProcedureManagerRef, procedure_id: ProcedureId) { + let mut watcher = procedure_manager.procedure_watcher(procedure_id).unwrap(); + watcher::wait(&mut watcher).await.unwrap(); +}