From 981d51785b3f8d3ce2870eb4aebf657f2712fd77 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 7 Apr 2025 21:51:00 +0800 Subject: [PATCH] fix: throw errors instead of ignoring (#5792) * fix: throw errors instead of ignoring * fix: fix unit tests * refactor: remove schema version check * fix: fix clippy * chore: remove unused error * refactor: remove schema version check * feat: handle mutliple results * feat: introduce consistency guard * fix: release consistency guard on datanode operation completion * test: add tests * chore: remove schema version * refactor: rename * test: add more tests * chore: print all error * tests: query table after alteration * log ignored request * refine fuzz test * chore: fix clippy and log mailbox message * chore: close prepared statement after execution * chore: add comment * chore: remove log * chore: rename to `ConsistencyPoison` * chore: remove unused error * fix: fix unit tests * chore: apply suggestions from CR --- src/common/meta/src/ddl/alter_table.rs | 102 +++++-- src/common/meta/src/ddl/create_table.rs | 4 +- src/common/meta/src/ddl/drop_database.rs | 4 +- src/common/meta/src/ddl/drop_table.rs | 4 +- .../src/ddl/test_util/datanode_handler.rs | 71 +++++ src/common/meta/src/ddl/tests/alter_table.rs | 268 ++++++++++++++---- src/common/meta/src/ddl/utils.rs | 82 ++++++ src/common/meta/src/error.rs | 31 ++ src/common/meta/src/lib.rs | 1 + src/common/meta/src/poison_key.rs | 22 ++ src/common/procedure-test/src/lib.rs | 5 + src/common/procedure/src/error.rs | 24 +- src/common/procedure/src/lib.rs | 4 +- src/common/procedure/src/local/runner.rs | 110 ++++++- src/common/procedure/src/procedure.rs | 8 + .../handler/collect_leader_region_handler.rs | 2 - src/metric-engine/src/data_region.rs | 2 - src/metric-engine/src/engine/alter.rs | 2 - src/metric-engine/src/test_util.rs | 1 - src/mito2/src/engine/alter_test.rs | 10 +- src/mito2/src/engine/compaction_test.rs | 1 - src/mito2/src/engine/sync_test.rs | 1 - src/mito2/src/error.rs | 10 - src/mito2/src/worker/handle_alter.rs | 26 +- src/store-api/src/region_request.rs | 49 +--- tests-fuzz/targets/ddl/fuzz_alter_table.rs | 9 + .../common/alter/alter_table.result | 43 +++ .../standalone/common/alter/alter_table.sql | 21 ++ 28 files changed, 738 insertions(+), 179 deletions(-) create mode 100644 src/common/meta/src/poison_key.rs diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index e18d532292..89406e6a96 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -22,30 +22,31 @@ use std::vec; use api::v1::alter_table_expr::Kind; use api::v1::RenameTable; use async_trait::async_trait; -use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; +use common_error::ext::BoxedError; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{ - Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey, + Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey, + PoisonKeys, Procedure, ProcedureId, Status, StringKey, }; use common_telemetry::{debug, error, info}; use futures::future; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use store_api::storage::RegionId; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId, TableInfo}; use table::table_reference::TableReference; use crate::cache_invalidator::Context; -use crate::ddl::utils::add_peer_context_if_needed; +use crate::ddl::utils::{add_peer_context_if_needed, handle_multiple_results, MultipleResults}; use crate::ddl::DdlContext; -use crate::error::{Error, Result}; +use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result}; use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoValue; use crate::key::{DeserializedValueWithBytes, RegionDistribution}; use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock}; use crate::metrics; +use crate::poison_key::table_poison_key; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution}; @@ -104,7 +105,27 @@ impl AlterTableProcedure { Ok(Status::executing(true)) } - pub async fn submit_alter_region_requests(&mut self) -> Result { + fn table_poison_key(&self) -> PoisonKey { + table_poison_key(self.data.table_id()) + } + + async fn put_poison( + &self, + ctx_provider: &dyn ContextProvider, + procedure_id: ProcedureId, + ) -> Result<()> { + let poison_key = self.table_poison_key(); + ctx_provider + .try_put_poison(&poison_key, procedure_id) + .await + .context(PutPoisonSnafu) + } + + pub async fn submit_alter_region_requests( + &mut self, + procedure_id: ProcedureId, + ctx_provider: &dyn ContextProvider, + ) -> Result { let table_id = self.data.table_id(); let (_, physical_table_route) = self .context @@ -127,6 +148,9 @@ impl AlterTableProcedure { alter_kind, ); + ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id }); + // Puts the poison before submitting alter region requests to datanodes. + self.put_poison(ctx_provider, procedure_id).await?; for datanode in leaders { let requester = self.context.node_manager.datanode(&datanode).await; let regions = find_leader_regions(&physical_table_route.region_routes, &datanode); @@ -140,28 +164,51 @@ impl AlterTableProcedure { let requester = requester.clone(); alter_region_tasks.push(async move { - if let Err(err) = requester.handle(request).await { - if err.status_code() != StatusCode::RequestOutdated { - // Treat request outdated as success. - // The engine will throw this code when the schema version not match. - // As this procedure has locked the table, the only reason for this error - // is procedure is succeeded before and is retrying. - return Err(add_peer_context_if_needed(datanode)(err)); - } - } - Ok(()) + requester + .handle(request) + .await + .map_err(add_peer_context_if_needed(datanode)) }); } } - future::join_all(alter_region_tasks) + let results = future::join_all(alter_region_tasks) .await .into_iter() - .collect::>>()?; + .collect::>(); - self.data.state = AlterTableState::UpdateMetadata; + match handle_multiple_results(results) { + MultipleResults::PartialRetryable(error) => { + // Just returns the error, and wait for the next try. + Err(error) + } + MultipleResults::PartialNonRetryable(error) => { + error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id()); + // No retry will be done. + Ok(Status::poisoned( + Some(self.table_poison_key()), + ProcedureError::external(error), + )) + } + MultipleResults::AllRetryable(error) => { + // Just returns the error, and wait for the next try. + Err(error) + } + MultipleResults::Ok => { + self.data.state = AlterTableState::UpdateMetadata; + Ok(Status::executing_with_clean_poisons(true)) + } + MultipleResults::AllNonRetryable(error) => { + error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id()); + // It assumes the metadata on datanode is not changed. + // Case: The alter region request is sent but not applied. (e.g., InvalidArgument) - Ok(Status::executing(true)) + let err = BoxedError::new(error); + Err(err).context(AbortProcedureSnafu { + clean_poisons: true, + }) + } + } } /// Update table metadata. @@ -250,10 +297,12 @@ impl Procedure for AlterTableProcedure { Self::TYPE_NAME } - async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { let error_handler = |e: Error| { if e.is_retry_later() { ProcedureError::retry_later(e) + } else if e.need_clean_poisons() { + ProcedureError::external_and_clean_poisons(e) } else { ProcedureError::external(e) } @@ -269,7 +318,10 @@ impl Procedure for AlterTableProcedure { match state { AlterTableState::Prepare => self.on_prepare().await, - AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await, + AlterTableState::SubmitAlterRegionRequests => { + self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref()) + .await + } AlterTableState::UpdateMetadata => self.on_update_metadata().await, AlterTableState::InvalidateTableCache => self.on_broadcast().await, } @@ -285,6 +337,10 @@ impl Procedure for AlterTableProcedure { LockKey::new(key) } + + fn poison_keys(&self) -> PoisonKeys { + PoisonKeys::new(vec![self.table_poison_key()]) + } } #[derive(Debug, Serialize, Deserialize, AsRefStr)] diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 3bd97827df..bbd169658b 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -299,7 +299,9 @@ impl Procedure for CreateTableProcedure { .creator .register_opening_regions(&self.context, &x.region_routes) .map_err(BoxedError::new) - .context(ExternalSnafu)?; + .context(ExternalSnafu { + clean_poisons: false, + })?; } Ok(()) diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index ce62b7d0c3..4aa4a1e362 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -130,7 +130,9 @@ impl Procedure for DropDatabaseProcedure { self.state .recover(&self.runtime_context) .map_err(BoxedError::new) - .context(ExternalSnafu) + .context(ExternalSnafu { + clean_poisons: false, + }) } async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index e68cae3382..d661744ce3 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -200,7 +200,9 @@ impl Procedure for DropTableProcedure { if register_operating_regions { self.register_dropping_regions() .map_err(BoxedError::new) - .context(ExternalSnafu)?; + .context(ExternalSnafu { + clean_poisons: false, + })?; } Ok(()) diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs index b82609b985..7f02d9cc5a 100644 --- a/src/common/meta/src/ddl/test_util/datanode_handler.rs +++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -171,3 +171,74 @@ impl MockDatanodeHandler for NaiveDatanodeHandler { unreachable!() } } + +#[derive(Clone)] +pub struct PartialSuccessDatanodeHandler { + pub retryable: bool, +} + +#[async_trait::async_trait] +impl MockDatanodeHandler for PartialSuccessDatanodeHandler { + async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result { + let success = peer.id % 2 == 0; + if success { + Ok(RegionResponse::new(0)) + } else if self.retryable { + Err(Error::RetryLater { + source: BoxedError::new( + error::UnexpectedSnafu { + err_msg: "retry later", + } + .build(), + ), + }) + } else { + error::UnexpectedSnafu { + err_msg: "mock error", + } + .fail() + } + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} + +#[derive(Clone)] +pub struct AllFailureDatanodeHandler { + pub retryable: bool, +} + +#[async_trait::async_trait] +impl MockDatanodeHandler for AllFailureDatanodeHandler { + async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result { + if self.retryable { + Err(Error::RetryLater { + source: BoxedError::new( + error::UnexpectedSnafu { + err_msg: "retry later", + } + .build(), + ), + }) + } else { + error::UnexpectedSnafu { + err_msg: "mock error", + } + .fail() + } + } + + async fn handle_query( + &self, + _peer: &Peer, + _request: QueryRequest, + ) -> Result { + unreachable!() + } +} diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index f3abfab91a..c8d0450b90 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -25,6 +25,9 @@ use api::v1::{ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use common_procedure::store::poison_store::PoisonStore; +use common_procedure::{ProcedureId, Status}; +use common_procedure_test::MockContextProvider; use store_api::storage::RegionId; use table::requests::TTL_KEY; use tokio::sync::mpsc::{self}; @@ -33,16 +36,46 @@ use crate::ddl::alter_table::AlterTableProcedure; use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::test_util::datanode_handler::{ - DatanodeWatcher, RequestOutdatedErrorDatanodeHandler, + AllFailureDatanodeHandler, DatanodeWatcher, PartialSuccessDatanodeHandler, + RequestOutdatedErrorDatanodeHandler, }; +use crate::error::Error; use crate::key::datanode_table::DatanodeTableKey; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; +use crate::node_manager::NodeManagerRef; use crate::peer::Peer; +use crate::poison_key::table_poison_key; use crate::rpc::ddl::AlterTableTask; use crate::rpc::router::{Region, RegionRoute}; use crate::test_util::{new_ddl_context, MockDatanodeManager}; +fn prepare_table_route(table_id: u32) -> TableRouteValue { + TableRouteValue::physical(vec![ + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 1)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(5)], + leader_state: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 2)), + leader_peer: Some(Peer::empty(2)), + follower_peers: vec![Peer::empty(4)], + leader_state: None, + leader_down_since: None, + }, + RegionRoute { + region: Region::new_test(RegionId::new(table_id, 3)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }, + ]) +} + fn test_rename_alter_table_task(table_name: &str, new_table_name: &str) -> AlterTableTask { let builder = TestAlterTableExprBuilder::default() .table_name(table_name) @@ -101,29 +134,7 @@ async fn test_on_submit_alter_request() { .table_metadata_manager .create_table_metadata( task.table_info.clone(), - TableRouteValue::physical(vec![ - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 1)), - leader_peer: Some(Peer::empty(1)), - follower_peers: vec![Peer::empty(5)], - leader_state: None, - leader_down_since: None, - }, - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 2)), - leader_peer: Some(Peer::empty(2)), - follower_peers: vec![Peer::empty(4)], - leader_state: None, - leader_down_since: None, - }, - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 3)), - leader_peer: Some(Peer::empty(3)), - follower_peers: vec![], - leader_state: None, - leader_down_since: None, - }, - ]), + prepare_table_route(table_id), HashMap::new(), ) .await @@ -141,9 +152,15 @@ async fn test_on_submit_alter_request() { })), }, }; - let mut procedure = AlterTableProcedure::new(table_id, alter_table_task, ddl_context).unwrap(); + let procedure_id = ProcedureId::random(); + let provider = Arc::new(MockContextProvider::default()); + let mut procedure = + AlterTableProcedure::new(table_id, alter_table_task, ddl_context.clone()).unwrap(); procedure.on_prepare().await.unwrap(); - procedure.submit_alter_region_requests().await.unwrap(); + procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap(); let check = |peer: Peer, request: RegionRequest, @@ -185,29 +202,7 @@ async fn test_on_submit_alter_request_with_outdated_request() { .table_metadata_manager .create_table_metadata( task.table_info.clone(), - TableRouteValue::physical(vec![ - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 1)), - leader_peer: Some(Peer::empty(1)), - follower_peers: vec![Peer::empty(5)], - leader_state: None, - leader_down_since: None, - }, - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 2)), - leader_peer: Some(Peer::empty(2)), - follower_peers: vec![Peer::empty(4)], - leader_state: None, - leader_down_since: None, - }, - RegionRoute { - region: Region::new_test(RegionId::new(table_id, 3)), - leader_peer: Some(Peer::empty(3)), - follower_peers: vec![], - leader_state: None, - leader_down_since: None, - }, - ]), + prepare_table_route(table_id), HashMap::new(), ) .await @@ -225,9 +220,15 @@ async fn test_on_submit_alter_request_with_outdated_request() { })), }, }; + let procedure_id = ProcedureId::random(); + let provider = Arc::new(MockContextProvider::default()); let mut procedure = AlterTableProcedure::new(table_id, alter_table_task, ddl_context).unwrap(); procedure.on_prepare().await.unwrap(); - procedure.submit_alter_region_requests().await.unwrap(); + let err = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap_err(); + assert!(!err.is_retry_later()); } #[tokio::test] @@ -326,9 +327,14 @@ async fn test_on_update_metadata_add_columns() { })), }, }; + let procedure_id = ProcedureId::random(); + let provider = Arc::new(MockContextProvider::default()); let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context.clone()).unwrap(); procedure.on_prepare().await.unwrap(); - procedure.submit_alter_region_requests().await.unwrap(); + procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap(); procedure.on_update_metadata().await.unwrap(); let table_info = ddl_context @@ -387,9 +393,14 @@ async fn test_on_update_table_options() { })), }, }; + let procedure_id = ProcedureId::random(); + let provider = Arc::new(MockContextProvider::default()); let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context.clone()).unwrap(); procedure.on_prepare().await.unwrap(); - procedure.submit_alter_region_requests().await.unwrap(); + procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap(); procedure.on_update_metadata().await.unwrap(); let table_info = ddl_context @@ -417,3 +428,156 @@ async fn test_on_update_table_options() { HashMap::from(&table_info.meta.options) ); } + +async fn prepare_alter_table_procedure( + node_manager: NodeManagerRef, +) -> (AlterTableProcedure, ProcedureId) { + common_telemetry::init_default_ut_logging(); + let ddl_context = new_ddl_context(node_manager); + let table_id = 1024; + let table_name = "foo"; + let task = test_create_table_task(table_name, table_id); + // Puts a value to table name key. + ddl_context + .table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + prepare_table_route(table_id), + HashMap::new(), + ) + .await + .unwrap(); + + let alter_table_task = AlterTableTask { + alter_table: AlterTableExpr { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + kind: Some(Kind::DropColumns(DropColumns { + drop_columns: vec![DropColumn { + name: "cpu".to_string(), + }], + })), + }, + }; + let procedure_id = ProcedureId::random(); + let mut procedure = + AlterTableProcedure::new(table_id, alter_table_task, ddl_context.clone()).unwrap(); + procedure.on_prepare().await.unwrap(); + (procedure, procedure_id) +} + +#[tokio::test] +async fn test_on_submit_alter_request_with_partial_success_retryable() { + let node_manager = Arc::new(MockDatanodeManager::new(PartialSuccessDatanodeHandler { + retryable: true, + })); + let provider = Arc::new(MockContextProvider::default()); + let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await; + let result = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap_err(); + assert!(result.is_retry_later()); + + // Submits again + let result = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap_err(); + assert!(result.is_retry_later()); +} + +#[tokio::test] +async fn test_on_submit_alter_request_with_partial_success_non_retryable() { + let node_manager = Arc::new(MockDatanodeManager::new(PartialSuccessDatanodeHandler { + retryable: false, + })); + let provider = Arc::new(MockContextProvider::default()); + let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await; + let result = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap(); + assert_matches!(result, Status::Poisoned { .. }); + + // submits again + let result = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap(); + assert_matches!(result, Status::Poisoned { .. }); +} + +#[tokio::test] +async fn test_on_submit_alter_request_with_all_failure_retrybale() { + common_telemetry::init_default_ut_logging(); + let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler { + retryable: true, + })); + let provider = Arc::new(MockContextProvider::default()); + let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await; + let err = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap_err(); + assert!(err.is_retry_later()); + // submits again + let err = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap_err(); + assert!(err.is_retry_later()); +} + +#[tokio::test] +async fn test_on_submit_alter_request_with_all_failure_non_retrybale() { + common_telemetry::init_default_ut_logging(); + let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler { + retryable: false, + })); + let provider = Arc::new(MockContextProvider::default()); + let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await; + let err = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap_err(); + assert_matches!(err, Error::AbortProcedure { .. }); + assert!(!err.is_retry_later()); + assert!(err.need_clean_poisons()); + + // submits again + let err = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap_err(); + assert_matches!(err, Error::AbortProcedure { .. }); + assert!(!err.is_retry_later()); + assert!(err.need_clean_poisons()); +} + +#[tokio::test] +async fn test_on_submit_alter_request_with_exist_poison() { + common_telemetry::init_default_ut_logging(); + let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler { + retryable: false, + })); + let provider = Arc::new(MockContextProvider::default()); + let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await; + + let table_id = 1024; + let key = table_poison_key(table_id).to_string(); + let another_procedure_id = ProcedureId::random(); + provider + .poison_manager() + .try_put_poison(key, another_procedure_id.to_string()) + .await + .unwrap(); + + procedure.on_prepare().await.unwrap(); + let err = procedure + .submit_alter_region_requests(procedure_id, provider.as_ref()) + .await + .unwrap_err(); + assert_matches!(err, Error::PutPoison { .. }); +} diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index a0973855f6..2e909946e0 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -13,10 +13,12 @@ // limitations under the License. use std::collections::HashMap; +use std::fmt::Debug; use common_catalog::consts::METRIC_ENGINE; use common_error::ext::BoxedError; use common_procedure::error::Error as ProcedureError; +use common_telemetry::{error, warn}; use common_wal::options::WalOptions; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; @@ -38,6 +40,7 @@ use crate::rpc::router::RegionRoute; /// Adds [Peer] context if the error is unretryable. pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error { move |err| { + error!(err; "Failed to operate datanode, peer: {}", datanode); if !err.is_retry_later() { return Err::<(), BoxedError>(BoxedError::new(err)) .context(OperateDatanodeSnafu { peer: datanode }) @@ -182,6 +185,85 @@ pub fn extract_region_wal_options( Ok(region_wal_options) } +/// The result of multiple operations. +/// +/// - Ok: all operations are successful. +/// - PartialRetryable: if any operation is retryable and without non retryable error, the result is retryable. +/// - PartialNonRetryable: if any operation is non retryable, the result is non retryable. +/// - AllRetryable: all operations are retryable. +/// - AllNonRetryable: all operations are not retryable. +pub enum MultipleResults { + Ok, + PartialRetryable(Error), + PartialNonRetryable(Error), + AllRetryable(Error), + AllNonRetryable(Error), +} + +/// Handles the results of alter region requests. +/// +/// For partial success, we need to check if the errors are retryable. +/// If all the errors are retryable, we return a retryable error. +/// Otherwise, we return the first error. +pub fn handle_multiple_results(results: Vec>) -> MultipleResults { + if results.is_empty() { + return MultipleResults::Ok; + } + let num_results = results.len(); + let mut retryable_results = Vec::new(); + let mut non_retryable_results = Vec::new(); + let mut ok_results = Vec::new(); + + for result in results { + match result { + Ok(_) => ok_results.push(result), + Err(err) => { + if err.is_retry_later() { + retryable_results.push(err); + } else { + non_retryable_results.push(err); + } + } + } + } + + common_telemetry::debug!( + "retryable_results: {}, non_retryable_results: {}, ok_results: {}", + retryable_results.len(), + non_retryable_results.len(), + ok_results.len() + ); + + if retryable_results.len() == num_results { + return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap()); + } else if non_retryable_results.len() == num_results { + warn!("all non retryable results: {}", non_retryable_results.len()); + for err in &non_retryable_results { + error!(err; "non retryable error"); + } + return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap()); + } else if ok_results.len() == num_results { + return MultipleResults::Ok; + } else if !retryable_results.is_empty() + && !ok_results.is_empty() + && non_retryable_results.is_empty() + { + return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap()); + } + + warn!( + "partial non retryable results: {}, retryable results: {}, ok results: {}", + non_retryable_results.len(), + retryable_results.len(), + ok_results.len() + ); + for err in &non_retryable_results { + error!(err; "non retryable error"); + } + // non_retryable_results.len() > 0 + MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 56ef212cec..1bdb3d0857 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -449,6 +449,14 @@ pub enum Error { #[snafu(display("Retry later"))] RetryLater { source: BoxedError }, + #[snafu(display("Abort procedure"))] + AbortProcedure { + #[snafu(implicit)] + location: Location, + source: BoxedError, + clean_poisons: bool, + }, + #[snafu(display( "Failed to encode a wal options to json string, wal_options: {:?}", wal_options @@ -749,6 +757,13 @@ pub enum Error { error: serde_json::Error, }, + #[snafu(display("No leader found for table_id: {}", table_id))] + NoLeader { + table_id: TableId, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Procedure poison key already exists with a different value, key: {}, value: {}", key, @@ -760,6 +775,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to put poison, table metadata may be corrupted"))] + PutPoison { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: common_procedure::error::Error, + }, } pub type Result = std::result::Result; @@ -778,6 +801,7 @@ impl ErrorExt for Error { | SerializeToJson { .. } | DeserializeFromJson { .. } => StatusCode::Internal, + NoLeader { .. } => StatusCode::TableUnavailable, ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected, Unsupported { .. } => StatusCode::Unsupported, @@ -849,7 +873,9 @@ impl ErrorExt for Error { OperateDatanode { source, .. } => source.status_code(), Table { source, .. } => source.status_code(), RetryLater { source, .. } => source.status_code(), + AbortProcedure { source, .. } => source.status_code(), ConvertAlterTableRequest { source, .. } => source.status_code(), + PutPoison { source, .. } => source.status_code(), ParseProcedureId { .. } | InvalidNumTopics { .. } @@ -920,6 +946,11 @@ impl Error { matches!(self, Error::RetryLater { .. }) } + /// Determine whether it needs to clean poisons. + pub fn need_clean_poisons(&self) -> bool { + matches!(self, Error::AbortProcedure { clean_poisons, .. } if *clean_poisons) + } + /// Returns true if the response exceeds the size limit. pub fn is_exceeded_size_limit(&self) -> bool { match self { diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 96d5326d13..b1cc18d5e4 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -37,6 +37,7 @@ pub mod metrics; pub mod node_expiry_listener; pub mod node_manager; pub mod peer; +pub mod poison_key; pub mod range_stream; pub mod region_keeper; pub mod region_registry; diff --git a/src/common/meta/src/poison_key.rs b/src/common/meta/src/poison_key.rs new file mode 100644 index 0000000000..43d614d2c1 --- /dev/null +++ b/src/common/meta/src/poison_key.rs @@ -0,0 +1,22 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_procedure::PoisonKey; +use store_api::storage::TableId; + +/// Returns the poison key for the table. +pub fn table_poison_key(table_id: TableId) -> PoisonKey { + let key = format!("table/{}", table_id); + PoisonKey::new(&key) +} diff --git a/src/common/procedure-test/src/lib.rs b/src/common/procedure-test/src/lib.rs index ce1960c778..4a5a2b85cb 100644 --- a/src/common/procedure-test/src/lib.rs +++ b/src/common/procedure-test/src/lib.rs @@ -40,6 +40,11 @@ impl MockContextProvider { poison_manager: InMemoryPoisonStore::default(), } } + + /// Returns a reference to the poison manager. + pub fn poison_manager(&self) -> &InMemoryPoisonStore { + &self.poison_manager + } } #[async_trait] diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 9eea4dd0e7..109b878d70 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -28,8 +28,14 @@ use crate::PoisonKey; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { - #[snafu(display("Failed to execute procedure due to external error"))] - External { source: BoxedError }, + #[snafu(display( + "Failed to execute procedure due to external error, clean poisons: {}", + clean_poisons + ))] + External { + source: BoxedError, + clean_poisons: bool, + }, #[snafu(display("Loader {} is already registered", name))] LoaderConflict { @@ -276,6 +282,15 @@ impl Error { pub fn external(err: E) -> Error { Error::External { source: BoxedError::new(err), + clean_poisons: false, + } + } + + /// Creates a new [Error::External] error from source `err` and clean poisons. + pub fn external_and_clean_poisons(err: E) -> Error { + Error::External { + source: BoxedError::new(err), + clean_poisons: true, } } @@ -291,6 +306,11 @@ impl Error { matches!(self, Error::RetryLater { .. }) } + /// Determine whether it needs to clean poisons. + pub fn need_clean_poisons(&self) -> bool { + matches!(self, Error::External { clean_poisons, .. } if *clean_poisons) + } + /// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according /// to its [StatusCode]. pub fn from_error_ext(err: E) -> Self { diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index e737366926..69b4f2fa5c 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -29,7 +29,7 @@ pub mod test_util; pub use crate::error::{Error, Result}; pub use crate::procedure::{ BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError, - PoisonKey, Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef, - ProcedureState, ProcedureWithId, Status, StringKey, + PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, ProcedureManager, + ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey, }; pub use crate::watcher::Watcher; diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index cf322f0967..441ef2d94d 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -372,6 +372,14 @@ impl Runner { return; } + if e.need_clean_poisons() { + if let Err(e) = self.clean_poisons().await { + error!(e; "Failed to clean poison for procedure: {}", self.meta.id); + self.meta.set_state(ProcedureState::retrying(Arc::new(e))); + return; + } + } + if e.is_retry_later() { self.meta.set_state(ProcedureState::retrying(Arc::new(e))); return; @@ -1361,12 +1369,9 @@ mod tests { .await .unwrap(); - Ok(Status::executing_with_clean_poisons(true)) + Ok(Status::executing(true)) } else { - Ok(Status::Poisoned { - keys: PoisonKeys::new(vec![poison_key.clone()]), - error: Error::external(MockError::new(StatusCode::Unexpected)), - }) + Ok(Status::executing_with_clean_poisons(true)) } } .boxed() @@ -1404,6 +1409,101 @@ mod tests { let state = runner.meta.state(); assert!(state.is_running(), "{state:?}"); + let procedure_id = runner + .manager_ctx + .poison_manager + .get_poison(&poison_key.to_string()) + .await + .unwrap(); + // poison key should be exist. + assert!(procedure_id.is_some()); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); + + let procedure_id = runner + .manager_ctx + .poison_manager + .get_poison(&poison_key.to_string()) + .await + .unwrap(); + // poison key should be deleted. + assert!(procedure_id.is_none()); + } + + #[tokio::test] + async fn test_execute_error_with_clean_poisons() { + common_telemetry::init_default_ut_logging(); + let mut times = 0; + let poison_key = PoisonKey::new("table/1024"); + let moved_poison_key = poison_key.clone(); + let exec_fn = move |ctx: Context| { + times += 1; + let poison_key = moved_poison_key.clone(); + async move { + if times == 1 { + // Put the poison to the context. + ctx.provider + .try_put_poison(&poison_key, ctx.procedure_id) + .await + .unwrap(); + + Ok(Status::executing(true)) + } else { + Err(Error::external_and_clean_poisons(MockError::new( + StatusCode::Unexpected, + ))) + } + } + .boxed() + }; + let poison = ProcedureAdapter { + data: "poison".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::new(vec![poison_key.clone()]), + exec_fn, + rollback_fn: None, + }; + + let dir = create_temp_dir("error_with_clean_poisons"); + let meta = poison.new_meta(ROOT_ID); + + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone()); + + // Use the manager ctx as the context provider. + let ctx = context_with_provider( + meta.id, + runner.manager_ctx.clone() as Arc, + ); + // Manually add this procedure to the manager ctx. + runner + .manager_ctx + .procedures + .write() + .unwrap() + .insert(meta.id, runner.meta.clone()); + + runner.manager_ctx.start(); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); + + let procedure_id = runner + .manager_ctx + .poison_manager + .get_poison(&poison_key.to_string()) + .await + .unwrap(); + // poison key should be exist. + assert!(procedure_id.is_some()); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_prepare_rollback(), "{state:?}"); + let procedure_id = runner .manager_ctx .poison_manager diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 03a855c6dd..0d3e7509ad 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -57,6 +57,14 @@ pub enum Status { } impl Status { + /// Returns a [Status::Poisoned] with given `keys` and `error`. + pub fn poisoned(keys: impl IntoIterator, error: Error) -> Status { + Status::Poisoned { + keys: PoisonKeys::new(keys), + error, + } + } + /// Returns a [Status::Executing] with given `persist` flag. pub fn executing(persist: bool) -> Status { Status::Executing { diff --git a/src/meta-srv/src/handler/collect_leader_region_handler.rs b/src/meta-srv/src/handler/collect_leader_region_handler.rs index 9021fe32eb..fd5fab3639 100644 --- a/src/meta-srv/src/handler/collect_leader_region_handler.rs +++ b/src/meta-srv/src/handler/collect_leader_region_handler.rs @@ -14,7 +14,6 @@ use api::v1::meta::{HeartbeatRequest, Role}; use common_meta::region_registry::LeaderRegion; -use common_telemetry::info; use store_api::region_engine::RegionRole; use crate::error::Result; @@ -52,7 +51,6 @@ impl HeartbeatHandler for CollectLeaderRegionHandler { }; key_values.push((stat.id, value)); } - info!("collect leader region: {:?}", key_values); ctx.leader_region_registry.batch_put(key_values); Ok(HandleControl::Continue) diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs index 211ae007fd..a491b22063 100644 --- a/src/metric-engine/src/data_region.rs +++ b/src/metric-engine/src/data_region.rs @@ -103,7 +103,6 @@ impl DataRegion { .get_metadata(region_id) .await .context(MitoReadOperationSnafu)?; - let version = region_metadata.schema_version; // find the max column id let new_column_id_start = 1 + region_metadata @@ -166,7 +165,6 @@ impl DataRegion { debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}"); // assemble alter request let alter_request = RegionRequest::Alter(RegionAlterRequest { - schema_version: version, kind: AlterKind::AddColumns { columns: new_columns, }, diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index 35bc7ce097..0b23a80bfd 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -234,7 +234,6 @@ mod test { // alter physical region let physical_region_id = env.default_physical_region_id(); let request = RegionAlterRequest { - schema_version: 0, kind: AlterKind::AddColumns { columns: vec![AddColumn { column_metadata: ColumnMetadata { @@ -262,7 +261,6 @@ mod test { // alter physical region's option should work let alter_region_option_request = RegionAlterRequest { - schema_version: 0, kind: AlterKind::SetRegionOptions { options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))], }, diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index 5e081982c8..284834a029 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -177,7 +177,6 @@ pub fn alter_logical_region_add_tag_columns( }); } RegionAlterRequest { - schema_version: 0, kind: AlterKind::AddColumns { columns: new_columns, }, diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 1f0de1734a..6dae283061 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -19,7 +20,6 @@ use std::time::Duration; use api::v1::value::ValueData; use api::v1::{ColumnDataType, Row, Rows, SemanticType}; use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions}; @@ -34,6 +34,7 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener}; use crate::engine::MitoEngine; +use crate::error; use crate::test_util::{ build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv, @@ -51,7 +52,6 @@ async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expect fn add_tag1() -> RegionAlterRequest { RegionAlterRequest { - schema_version: 0, kind: AlterKind::AddColumns { columns: vec![AddColumn { column_metadata: ColumnMetadata { @@ -71,7 +71,6 @@ fn add_tag1() -> RegionAlterRequest { fn alter_column_inverted_index() -> RegionAlterRequest { RegionAlterRequest { - schema_version: 0, kind: AlterKind::SetIndex { options: ApiSetIndexOptions::Inverted { column_name: "tag_0".to_string(), @@ -82,7 +81,6 @@ fn alter_column_inverted_index() -> RegionAlterRequest { fn alter_column_fulltext_options() -> RegionAlterRequest { RegionAlterRequest { - schema_version: 0, kind: AlterKind::SetIndex { options: ApiSetIndexOptions::Fulltext { column_name: "tag_0".to_string(), @@ -358,7 +356,8 @@ async fn test_alter_region_retry() { .handle_request(region_id, RegionRequest::Alter(request)) .await .unwrap_err(); - assert_eq!(err.status_code(), StatusCode::RequestOutdated); + let err = err.as_any().downcast_ref::().unwrap(); + assert_matches!(err, &error::Error::InvalidRegionRequest { .. }); let expected = "\ +-------+-------+---------+---------------------+ @@ -733,7 +732,6 @@ async fn test_alter_region_ttl_options() { .unwrap(); let engine_cloned = engine.clone(); let alter_ttl_request = RegionAlterRequest { - schema_version: 0, kind: AlterKind::SetRegionOptions { options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))], }, diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 5e5cb0de75..1e740b746b 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -535,7 +535,6 @@ async fn test_change_region_compaction_window() { // Change compaction window. let request = RegionRequest::Alter(RegionAlterRequest { - schema_version: region.metadata().schema_version, kind: SetRegionOptions { options: vec![SetRegionOption::Twsc( "compaction.twcs.time_window".to_string(), diff --git a/src/mito2/src/engine/sync_test.rs b/src/mito2/src/engine/sync_test.rs index 3c0120fd4d..c55d476bcf 100644 --- a/src/mito2/src/engine/sync_test.rs +++ b/src/mito2/src/engine/sync_test.rs @@ -35,7 +35,6 @@ use crate::test_util::{ fn add_tag1() -> RegionAlterRequest { RegionAlterRequest { - schema_version: 0, kind: AlterKind::AddColumns { columns: vec![AddColumn { column_metadata: ColumnMetadata { diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 5bd183ee99..9c9c78f07e 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -490,14 +490,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Schema version doesn't match. Expect {} but gives {}", expect, actual))] - InvalidRegionRequestSchemaVersion { - expect: u64, - actual: u64, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display( "Region {} is in {:?} state, which does not permit manifest updates.", region_id, @@ -1086,8 +1078,6 @@ impl ErrorExt for Error { | PartitionOutOfRange { .. } | ParseJobId { .. } => StatusCode::InvalidArguments, - InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated, - RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index 6554ff333f..85e1c9144a 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -18,7 +18,8 @@ use std::str::FromStr; use std::sync::Arc; use common_base::readable_size::ReadableSize; -use common_telemetry::{debug, info}; +use common_telemetry::info; +use common_telemetry::tracing::warn; use humantime_serde::re::humantime; use snafu::ResultExt; use store_api::metadata::{ @@ -29,9 +30,7 @@ use store_api::mito_engine_options; use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption}; use store_api::storage::RegionId; -use crate::error::{ - InvalidMetadataSnafu, InvalidRegionRequestSchemaVersionSnafu, InvalidRegionRequestSnafu, Result, -}; +use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result}; use crate::flush::FlushReason; use crate::manifest::action::RegionChange; use crate::region::options::CompactionOptions::Twcs; @@ -83,22 +82,6 @@ impl RegionWorkerLoop { _ => {} } - if version.metadata.schema_version != request.schema_version { - // This is possible if we retry the request. - debug!( - "Ignores alter request, region id:{}, region schema version {} is not equal to request schema version {}", - region_id, version.metadata.schema_version, request.schema_version - ); - // Returns an error. - sender.send( - InvalidRegionRequestSchemaVersionSnafu { - expect: version.metadata.schema_version, - actual: request.schema_version, - } - .fail(), - ); - return; - } // Validate request. if let Err(e) = request.validate(&version.metadata) { // Invalid request. @@ -108,7 +91,7 @@ impl RegionWorkerLoop { // Checks whether we need to alter the region. if !request.need_alter(&version.metadata) { - debug!( + warn!( "Ignores alter request as it alters nothing, region_id: {}, request: {:?}", region_id, request ); @@ -219,7 +202,6 @@ fn metadata_after_alteration( .context(InvalidRegionRequestSnafu)? .bump_version(); let new_meta = builder.build().context(InvalidMetadataSnafu)?; - assert_eq!(request.schema_version + 1, new_meta.schema_version); Ok(Arc::new(new_meta)) } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 1df363da0d..d11963987e 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -436,8 +436,6 @@ pub struct RegionCloseRequest {} /// Alter metadata of a region. #[derive(Debug, PartialEq, Eq, Clone)] pub struct RegionAlterRequest { - /// The version of the schema before applying the alteration. - pub schema_version: u64, /// Kind of alteration to do. pub kind: AlterKind, } @@ -445,17 +443,6 @@ pub struct RegionAlterRequest { impl RegionAlterRequest { /// Checks whether the request is valid, returns an error if it is invalid. pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> { - ensure!( - metadata.schema_version == self.schema_version, - InvalidRegionRequestSnafu { - region_id: metadata.region_id, - err: format!( - "region schema version {} is not equal to request schema version {}", - metadata.schema_version, self.schema_version - ), - } - ); - self.kind.validate(metadata)?; Ok(()) @@ -479,10 +466,7 @@ impl TryFrom for RegionAlterRequest { })?; let kind = AlterKind::try_from(kind)?; - Ok(RegionAlterRequest { - schema_version: value.schema_version, - kind, - }) + Ok(RegionAlterRequest { kind }) } } @@ -1234,7 +1218,6 @@ mod tests { assert_eq!( request, RegionAlterRequest { - schema_version: 1, kind: AlterKind::AddColumns { columns: vec![AddColumn { column_metadata: ColumnMetadata { @@ -1531,21 +1514,6 @@ mod tests { assert!(kind.need_alter(&metadata)); } - #[test] - fn test_validate_schema_version() { - let mut metadata = new_metadata(); - metadata.schema_version = 2; - - RegionAlterRequest { - schema_version: 1, - kind: AlterKind::DropColumns { - names: vec!["field_0".to_string()], - }, - } - .validate(&metadata) - .unwrap_err(); - } - #[test] fn test_validate_add_columns() { let kind = AlterKind::AddColumns { @@ -1576,10 +1544,7 @@ mod tests { }, ], }; - let request = RegionAlterRequest { - schema_version: 1, - kind, - }; + let request = RegionAlterRequest { kind }; let mut metadata = new_metadata(); metadata.schema_version = 1; request.validate(&metadata).unwrap(); @@ -1640,10 +1605,7 @@ mod tests { }, }, }; - let request = RegionAlterRequest { - schema_version: 1, - kind, - }; + let request = RegionAlterRequest { kind }; let mut metadata = new_metadata(); metadata.schema_version = 1; request.validate(&metadata).unwrap(); @@ -1653,10 +1615,7 @@ mod tests { column_name: "tag_0".to_string(), }, }; - let request = RegionAlterRequest { - schema_version: 1, - kind, - }; + let request = RegionAlterRequest { kind }; let mut metadata = new_metadata(); metadata.schema_version = 1; request.validate(&metadata).unwrap(); diff --git a/tests-fuzz/targets/ddl/fuzz_alter_table.rs b/tests-fuzz/targets/ddl/fuzz_alter_table.rs index 4e4f336f26..adff64827c 100644 --- a/tests-fuzz/targets/ddl/fuzz_alter_table.rs +++ b/tests-fuzz/targets/ddl/fuzz_alter_table.rs @@ -223,6 +223,15 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> { assert_eq!(a, b); } }); + + // select from table to make sure the table is still ok + let sql = format!("SELECT * FROM {}", table_ctx.name); + let result = sqlx::query(&sql) + .persistent(false) + .execute(&ctx.greptime) + .await + .context(error::ExecuteQuerySnafu { sql: &sql })?; + info!("Select from table: {sql}, result: {result:?}"); } // Cleans up diff --git a/tests/cases/standalone/common/alter/alter_table.result b/tests/cases/standalone/common/alter/alter_table.result index a8eb2ae3fd..8fefc9f2c3 100644 --- a/tests/cases/standalone/common/alter/alter_table.result +++ b/tests/cases/standalone/common/alter/alter_table.result @@ -192,3 +192,46 @@ DROP TABLE phy; Affected Rows: 0 +CREATE TABLE grpc_latencies ( + ts TIMESTAMP TIME INDEX, + host STRING, + method_name STRING, + latency DOUBLE, + PRIMARY KEY (host, method_name) +) with('append_mode'='true'); + +Affected Rows: 0 + +INSERT INTO grpc_latencies (ts, host, method_name, latency) VALUES + ('2024-07-11 20:00:06', 'host1', 'GetUser', 103.0); + +Affected Rows: 1 + +SELECT * FROM grpc_latencies; + ++---------------------+-------+-------------+---------+ +| ts | host | method_name | latency | ++---------------------+-------+-------------+---------+ +| 2024-07-11T20:00:06 | host1 | GetUser | 103.0 | ++---------------------+-------+-------------+---------+ + +ALTER TABLE grpc_latencies SET ttl = '10d'; + +Affected Rows: 0 + +ALTER TABLE grpc_latencies ADD COLUMN home INTEGER FIRST; + +Affected Rows: 0 + +SELECT * FROM grpc_latencies; + ++------+---------------------+-------+-------------+---------+ +| home | ts | host | method_name | latency | ++------+---------------------+-------+-------------+---------+ +| | 2024-07-11T20:00:06 | host1 | GetUser | 103.0 | ++------+---------------------+-------+-------------+---------+ + +DROP TABLE grpc_latencies; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/alter_table.sql b/tests/cases/standalone/common/alter/alter_table.sql index ddb662fa80..7f65a99f65 100644 --- a/tests/cases/standalone/common/alter/alter_table.sql +++ b/tests/cases/standalone/common/alter/alter_table.sql @@ -95,3 +95,24 @@ DROP TABLE t1; DROP TABLE t2; DROP TABLE phy; + +CREATE TABLE grpc_latencies ( + ts TIMESTAMP TIME INDEX, + host STRING, + method_name STRING, + latency DOUBLE, + PRIMARY KEY (host, method_name) +) with('append_mode'='true'); + +INSERT INTO grpc_latencies (ts, host, method_name, latency) VALUES + ('2024-07-11 20:00:06', 'host1', 'GetUser', 103.0); + +SELECT * FROM grpc_latencies; + +ALTER TABLE grpc_latencies SET ttl = '10d'; + +ALTER TABLE grpc_latencies ADD COLUMN home INTEGER FIRST; + +SELECT * FROM grpc_latencies; + +DROP TABLE grpc_latencies; \ No newline at end of file