fix: throw errors instead of ignoring (#5792)

* fix: throw errors instead of ignoring

* fix: fix unit tests

* refactor: remove schema version check

* fix: fix clippy

* chore: remove unused error

* refactor: remove schema version check

* feat: handle multiple results

* feat: introduce consistency guard

* fix: release consistency guard on datanode operation completion

* test: add tests

* chore: remove schema version

* refactor: rename

* test: add more tests

* chore: print all errors

* tests: query table after alteration

* log ignored request

* refine fuzz test

* chore: fix clippy and log mailbox message

* chore: close prepared statement after execution

* chore: add comment

* chore: remove log

* chore: rename to `ConsistencyPoison`

* chore: remove unused error

* fix: fix unit tests

* chore: apply suggestions from CR
Author: Weny Xu
Date: 2025-04-07 21:51:00 +08:00
Committed by: GitHub
Parent: cf1eda28aa
Commit: 981d51785b
28 changed files with 738 additions and 179 deletions
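Taken together, these commits replace the old "ignore outdated requests" behavior with a table-level consistency poison: the procedure marks the table before touching datanodes and only removes the mark once the datanode operations complete (or the procedure aborts with poison cleanup). A minimal, self-contained sketch of the first-writer-wins semantics; `PoisonStore` here is a hypothetical stand-in for the real `PoisonStore`/`ContextProvider` API:

use std::collections::HashMap;

// Hypothetical stand-in for the poison store: maps a poison key to the
// procedure id that owns it.
#[derive(Default)]
struct PoisonStore(HashMap<String, String>);

impl PoisonStore {
    // Mirrors try_put_poison: idempotent for the same procedure, an error
    // if another procedure already holds the key.
    fn try_put_poison(&mut self, key: &str, procedure_id: &str) -> Result<(), String> {
        match self.0.get(key) {
            Some(owner) if owner != procedure_id => {
                Err(format!("poison {key} already held by {owner}"))
            }
            _ => {
                self.0.insert(key.to_string(), procedure_id.to_string());
                Ok(())
            }
        }
    }

    // Called when the datanode operations complete (or the procedure aborts
    // with clean_poisons set).
    fn clean(&mut self, key: &str) {
        self.0.remove(key);
    }
}

fn main() {
    let mut store = PoisonStore::default();
    // Procedure A poisons the table before sending alter region requests.
    store.try_put_poison("table/1024", "proc-a").unwrap();
    // Retrying the same procedure is fine; a different procedure is rejected.
    store.try_put_poison("table/1024", "proc-a").unwrap();
    assert!(store.try_put_poison("table/1024", "proc-b").is_err());
    // All datanode requests succeeded: release the guard.
    store.clean("table/1024");
    assert!(store.try_put_poison("table/1024", "proc-b").is_ok());
}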

View File

@@ -22,30 +22,31 @@ use std::vec;
use api::v1::alter_table_expr::Kind;
use api::v1::RenameTable;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_error::ext::BoxedError;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status, StringKey,
Context as ProcedureContext, ContextProvider, Error as ProcedureError, LockKey, PoisonKey,
PoisonKeys, Procedure, ProcedureId, Status, StringKey,
};
use common_telemetry::{debug, error, info};
use futures::future;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::metadata::{RawTableInfo, TableId, TableInfo};
use table::table_reference::TableReference;
use crate::cache_invalidator::Context;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::utils::{add_peer_context_if_needed, handle_multiple_results, MultipleResults};
use crate::ddl::DdlContext;
use crate::error::{Error, Result};
use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
use crate::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
use crate::metrics;
use crate::poison_key::table_poison_key;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, region_distribution};
@@ -104,7 +105,27 @@ impl AlterTableProcedure {
Ok(Status::executing(true))
}
pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
fn table_poison_key(&self) -> PoisonKey {
table_poison_key(self.data.table_id())
}
async fn put_poison(
&self,
ctx_provider: &dyn ContextProvider,
procedure_id: ProcedureId,
) -> Result<()> {
let poison_key = self.table_poison_key();
ctx_provider
.try_put_poison(&poison_key, procedure_id)
.await
.context(PutPoisonSnafu)
}
pub async fn submit_alter_region_requests(
&mut self,
procedure_id: ProcedureId,
ctx_provider: &dyn ContextProvider,
) -> Result<Status> {
let table_id = self.data.table_id();
let (_, physical_table_route) = self
.context
@@ -127,6 +148,9 @@ impl AlterTableProcedure {
alter_kind,
);
ensure!(!leaders.is_empty(), NoLeaderSnafu { table_id });
// Puts the poison before submitting alter region requests to datanodes.
self.put_poison(ctx_provider, procedure_id).await?;
for datanode in leaders {
let requester = self.context.node_manager.datanode(&datanode).await;
let regions = find_leader_regions(&physical_table_route.region_routes, &datanode);
@@ -140,28 +164,51 @@ impl AlterTableProcedure {
let requester = requester.clone();
alter_region_tasks.push(async move {
if let Err(err) = requester.handle(request).await {
if err.status_code() != StatusCode::RequestOutdated {
// Treat request outdated as success.
// The engine returns this code when the schema version does not match.
// As this procedure has locked the table, the only reason for this error
// is that the procedure already succeeded and is now retrying.
return Err(add_peer_context_if_needed(datanode)(err));
}
}
Ok(())
requester
.handle(request)
.await
.map_err(add_peer_context_if_needed(datanode))
});
}
}
future::join_all(alter_region_tasks)
let results = future::join_all(alter_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
.collect::<Vec<_>>();
match handle_multiple_results(results) {
MultipleResults::PartialRetryable(error) => {
// Just return the error and wait for the next retry.
Err(error)
}
MultipleResults::PartialNonRetryable(error) => {
error!(error; "Partial non-retryable errors occurred during alter table, table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
// No retry will be done.
Ok(Status::poisoned(
Some(self.table_poison_key()),
ProcedureError::external(error),
))
}
MultipleResults::AllRetryable(error) => {
// Just return the error and wait for the next retry.
Err(error)
}
MultipleResults::Ok => {
self.data.state = AlterTableState::UpdateMetadata;
Ok(Status::executing_with_clean_poisons(true))
}
MultipleResults::AllNonRetryable(error) => {
error!(error; "All alter requests returned non-retryable errors for table {}, table_id: {}", self.data.table_ref(), self.data.table_id());
// It assumes the metadata on the datanode is unchanged.
// Case: the alter region request was sent but not applied (e.g., InvalidArgument).
Ok(Status::executing(true))
let err = BoxedError::new(error);
Err(err).context(AbortProcedureSnafu {
clean_poisons: true,
})
}
}
}
/// Update table metadata.
@@ -250,10 +297,12 @@ impl Procedure for AlterTableProcedure {
Self::TYPE_NAME
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let error_handler = |e: Error| {
if e.is_retry_later() {
ProcedureError::retry_later(e)
} else if e.need_clean_poisons() {
ProcedureError::external_and_clean_poisons(e)
} else {
ProcedureError::external(e)
}
@@ -269,7 +318,10 @@ impl Procedure for AlterTableProcedure {
match state {
AlterTableState::Prepare => self.on_prepare().await,
AlterTableState::SubmitAlterRegionRequests => self.submit_alter_region_requests().await,
AlterTableState::SubmitAlterRegionRequests => {
self.submit_alter_region_requests(ctx.procedure_id, ctx.provider.as_ref())
.await
}
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
}
@@ -285,6 +337,10 @@ impl Procedure for AlterTableProcedure {
LockKey::new(key)
}
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::new(vec![self.table_poison_key()])
}
}
#[derive(Debug, Serialize, Deserialize, AsRefStr)]

View File

@@ -299,7 +299,9 @@ impl Procedure for CreateTableProcedure {
.creator
.register_opening_regions(&self.context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
.context(ExternalSnafu {
clean_poisons: false,
})?;
}
Ok(())

View File

@@ -130,7 +130,9 @@ impl Procedure for DropDatabaseProcedure {
self.state
.recover(&self.runtime_context)
.map_err(BoxedError::new)
.context(ExternalSnafu)
.context(ExternalSnafu {
clean_poisons: false,
})
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {

View File

@@ -200,7 +200,9 @@ impl Procedure for DropTableProcedure {
if register_operating_regions {
self.register_dropping_regions()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
.context(ExternalSnafu {
clean_poisons: false,
})?;
}
Ok(())

View File

@@ -171,3 +171,74 @@ impl MockDatanodeHandler for NaiveDatanodeHandler {
unreachable!()
}
}
#[derive(Clone)]
pub struct PartialSuccessDatanodeHandler {
pub retryable: bool,
}
#[async_trait::async_trait]
impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
let success = peer.id % 2 == 0;
if success {
Ok(RegionResponse::new(0))
} else if self.retryable {
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
})
} else {
error::UnexpectedSnafu {
err_msg: "mock error",
}
.fail()
}
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
#[derive(Clone)]
pub struct AllFailureDatanodeHandler {
pub retryable: bool,
}
#[async_trait::async_trait]
impl MockDatanodeHandler for AllFailureDatanodeHandler {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
if self.retryable {
Err(Error::RetryLater {
source: BoxedError::new(
error::UnexpectedSnafu {
err_msg: "retry later",
}
.build(),
),
})
} else {
error::UnexpectedSnafu {
err_msg: "mock error",
}
.fail()
}
}
async fn handle_query(
&self,
_peer: &Peer,
_request: QueryRequest,
) -> Result<SendableRecordBatchStream> {
unreachable!()
}
}
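For reference, the alter-table tests below wire these handlers into the existing `MockDatanodeManager`; with leader peers 1, 2, and 3, the `peer.id % 2 == 0` check makes exactly one datanode succeed:

// Partial success with retryable failures: peer 2 succeeds, peers 1 and 3
// return RetryLater errors.
let node_manager = Arc::new(MockDatanodeManager::new(PartialSuccessDatanodeHandler {
    retryable: true,
}));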

View File

@@ -25,6 +25,9 @@ use api::v1::{
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::{ProcedureId, Status};
use common_procedure_test::MockContextProvider;
use store_api::storage::RegionId;
use table::requests::TTL_KEY;
use tokio::sync::mpsc::{self};
@@ -33,16 +36,46 @@ use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::test_util::alter_table::TestAlterTableExprBuilder;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::datanode_handler::{
DatanodeWatcher, RequestOutdatedErrorDatanodeHandler,
AllFailureDatanodeHandler, DatanodeWatcher, PartialSuccessDatanodeHandler,
RequestOutdatedErrorDatanodeHandler,
};
use crate::error::Error;
use crate::key::datanode_table::DatanodeTableKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::node_manager::NodeManagerRef;
use crate::peer::Peer;
use crate::poison_key::table_poison_key;
use crate::rpc::ddl::AlterTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::test_util::{new_ddl_context, MockDatanodeManager};
fn prepare_table_route(table_id: u32) -> TableRouteValue {
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
])
}
fn test_rename_alter_table_task(table_name: &str, new_table_name: &str) -> AlterTableTask {
let builder = TestAlterTableExprBuilder::default()
.table_name(table_name)
@@ -101,29 +134,7 @@ async fn test_on_submit_alter_request() {
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
]),
prepare_table_route(table_id),
HashMap::new(),
)
.await
@@ -141,9 +152,15 @@ async fn test_on_submit_alter_request() {
})),
},
};
let mut procedure = AlterTableProcedure::new(table_id, alter_table_task, ddl_context).unwrap();
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure =
AlterTableProcedure::new(table_id, alter_table_task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
procedure.submit_alter_region_requests().await.unwrap();
procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
let check = |peer: Peer,
request: RegionRequest,
@@ -185,29 +202,7 @@ async fn test_on_submit_alter_request_with_outdated_request() {
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 1)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(5)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 2)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![Peer::empty(4)],
leader_state: None,
leader_down_since: None,
},
RegionRoute {
region: Region::new_test(RegionId::new(table_id, 3)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
]),
prepare_table_route(table_id),
HashMap::new(),
)
.await
@@ -225,9 +220,15 @@ async fn test_on_submit_alter_request_with_outdated_request() {
})),
},
};
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure = AlterTableProcedure::new(table_id, alter_table_task, ddl_context).unwrap();
procedure.on_prepare().await.unwrap();
procedure.submit_alter_region_requests().await.unwrap();
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(!err.is_retry_later());
}
#[tokio::test]
@@ -326,9 +327,14 @@ async fn test_on_update_metadata_add_columns() {
})),
},
};
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
procedure.submit_alter_region_requests().await.unwrap();
procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
procedure.on_update_metadata().await.unwrap();
let table_info = ddl_context
@@ -387,9 +393,14 @@ async fn test_on_update_table_options() {
})),
},
};
let procedure_id = ProcedureId::random();
let provider = Arc::new(MockContextProvider::default());
let mut procedure = AlterTableProcedure::new(table_id, task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
procedure.submit_alter_region_requests().await.unwrap();
procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
procedure.on_update_metadata().await.unwrap();
let table_info = ddl_context
@@ -417,3 +428,156 @@ async fn test_on_update_table_options() {
HashMap::from(&table_info.meta.options)
);
}
async fn prepare_alter_table_procedure(
node_manager: NodeManagerRef,
) -> (AlterTableProcedure, ProcedureId) {
common_telemetry::init_default_ut_logging();
let ddl_context = new_ddl_context(node_manager);
let table_id = 1024;
let table_name = "foo";
let task = test_create_table_task(table_name, table_id);
// Puts a value to the table name key.
ddl_context
.table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
prepare_table_route(table_id),
HashMap::new(),
)
.await
.unwrap();
let alter_table_task = AlterTableTask {
alter_table: AlterTableExpr {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
kind: Some(Kind::DropColumns(DropColumns {
drop_columns: vec![DropColumn {
name: "cpu".to_string(),
}],
})),
},
};
let procedure_id = ProcedureId::random();
let mut procedure =
AlterTableProcedure::new(table_id, alter_table_task, ddl_context.clone()).unwrap();
procedure.on_prepare().await.unwrap();
(procedure, procedure_id)
}
#[tokio::test]
async fn test_on_submit_alter_request_with_partial_success_retryable() {
let node_manager = Arc::new(MockDatanodeManager::new(PartialSuccessDatanodeHandler {
retryable: true,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let result = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(result.is_retry_later());
// Submits again
let result = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(result.is_retry_later());
}
#[tokio::test]
async fn test_on_submit_alter_request_with_partial_success_non_retryable() {
let node_manager = Arc::new(MockDatanodeManager::new(PartialSuccessDatanodeHandler {
retryable: false,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let result = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
assert_matches!(result, Status::Poisoned { .. });
// Submits again
let result = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap();
assert_matches!(result, Status::Poisoned { .. });
}
#[tokio::test]
async fn test_on_submit_alter_request_with_all_failure_retryable() {
common_telemetry::init_default_ut_logging();
let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler {
retryable: true,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(err.is_retry_later());
// Submits again
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert!(err.is_retry_later());
}
#[tokio::test]
async fn test_on_submit_alter_request_with_all_failure_non_retryable() {
common_telemetry::init_default_ut_logging();
let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler {
retryable: false,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert_matches!(err, Error::AbortProcedure { .. });
assert!(!err.is_retry_later());
assert!(err.need_clean_poisons());
// Submits again
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert_matches!(err, Error::AbortProcedure { .. });
assert!(!err.is_retry_later());
assert!(err.need_clean_poisons());
}
#[tokio::test]
async fn test_on_submit_alter_request_with_exist_poison() {
common_telemetry::init_default_ut_logging();
let node_manager = Arc::new(MockDatanodeManager::new(AllFailureDatanodeHandler {
retryable: false,
}));
let provider = Arc::new(MockContextProvider::default());
let (mut procedure, procedure_id) = prepare_alter_table_procedure(node_manager).await;
let table_id = 1024;
let key = table_poison_key(table_id).to_string();
let another_procedure_id = ProcedureId::random();
provider
.poison_manager()
.try_put_poison(key, another_procedure_id.to_string())
.await
.unwrap();
procedure.on_prepare().await.unwrap();
let err = procedure
.submit_alter_region_requests(procedure_id, provider.as_ref())
.await
.unwrap_err();
assert_matches!(err, Error::PutPoison { .. });
}

View File

@@ -13,10 +13,12 @@
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Debug;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_procedure::error::Error as ProcedureError;
use common_telemetry::{error, warn};
use common_wal::options::WalOptions;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
@@ -38,6 +40,7 @@ use crate::rpc::router::RegionRoute;
/// Adds [Peer] context if the error is non-retryable.
pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
move |err| {
error!(err; "Failed to operate datanode, peer: {}", datanode);
if !err.is_retry_later() {
return Err::<(), BoxedError>(BoxedError::new(err))
.context(OperateDatanodeSnafu { peer: datanode })
@@ -182,6 +185,85 @@ pub fn extract_region_wal_options(
Ok(region_wal_options)
}
/// The result of multiple operations.
///
/// - Ok: all operations are successful.
/// - PartialRetryable: some operations failed, every failure is retryable, and at least one operation succeeded.
/// - PartialNonRetryable: at least one operation failed with a non-retryable error, but not all of them did.
/// - AllRetryable: all operations failed with retryable errors.
/// - AllNonRetryable: all operations failed with non-retryable errors.
pub enum MultipleResults {
Ok,
PartialRetryable(Error),
PartialNonRetryable(Error),
AllRetryable(Error),
AllNonRetryable(Error),
}
/// Classifies the results of multiple region requests into a [MultipleResults] variant.
///
/// For a partial success, the errors are inspected: if every error is retryable,
/// the batch is treated as retryable; otherwise the first non-retryable error is returned.
pub fn handle_multiple_results<T: Debug>(results: Vec<Result<T>>) -> MultipleResults {
if results.is_empty() {
return MultipleResults::Ok;
}
let num_results = results.len();
let mut retryable_results = Vec::new();
let mut non_retryable_results = Vec::new();
let mut ok_results = Vec::new();
for result in results {
match result {
Ok(_) => ok_results.push(result),
Err(err) => {
if err.is_retry_later() {
retryable_results.push(err);
} else {
non_retryable_results.push(err);
}
}
}
}
common_telemetry::debug!(
"retryable_results: {}, non_retryable_results: {}, ok_results: {}",
retryable_results.len(),
non_retryable_results.len(),
ok_results.len()
);
if retryable_results.len() == num_results {
return MultipleResults::AllRetryable(retryable_results.into_iter().next().unwrap());
} else if non_retryable_results.len() == num_results {
warn!("all non retryable results: {}", non_retryable_results.len());
for err in &non_retryable_results {
error!(err; "non retryable error");
}
return MultipleResults::AllNonRetryable(non_retryable_results.into_iter().next().unwrap());
} else if ok_results.len() == num_results {
return MultipleResults::Ok;
} else if !retryable_results.is_empty()
&& !ok_results.is_empty()
&& non_retryable_results.is_empty()
{
return MultipleResults::PartialRetryable(retryable_results.into_iter().next().unwrap());
}
warn!(
"partial non retryable results: {}, retryable results: {}, ok results: {}",
non_retryable_results.len(),
retryable_results.len(),
ok_results.len()
);
for err in &non_retryable_results {
error!(err; "non retryable error");
}
// non_retryable_results.len() > 0
MultipleResults::PartialNonRetryable(non_retryable_results.into_iter().next().unwrap())
}
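// A sketch of how the classification behaves (hypothetical test, reusing the
// same UnexpectedSnafu / RetryLater constructions as the mock datanode
// handlers above):
#[test]
fn multiple_results_classification_sketch() {
    // One success plus only retryable failures => PartialRetryable.
    let retryable = Error::RetryLater {
        source: BoxedError::new(
            crate::error::UnexpectedSnafu {
                err_msg: "retry later",
            }
            .build(),
        ),
    };
    let results: Vec<Result<()>> = vec![Ok(()), Err(retryable)];
    assert!(matches!(
        handle_multiple_results(results),
        MultipleResults::PartialRetryable(_)
    ));
    // Any non-retryable failure in a mixed batch => PartialNonRetryable.
    let fatal = crate::error::UnexpectedSnafu { err_msg: "boom" }.build();
    let results: Vec<Result<()>> = vec![Ok(()), Err(fatal)];
    assert!(matches!(
        handle_multiple_results(results),
        MultipleResults::PartialNonRetryable(_)
    ));
}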
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -449,6 +449,14 @@ pub enum Error {
#[snafu(display("Retry later"))]
RetryLater { source: BoxedError },
#[snafu(display("Abort procedure"))]
AbortProcedure {
#[snafu(implicit)]
location: Location,
source: BoxedError,
clean_poisons: bool,
},
#[snafu(display(
"Failed to encode a wal options to json string, wal_options: {:?}",
wal_options
@@ -749,6 +757,13 @@ pub enum Error {
error: serde_json::Error,
},
#[snafu(display("No leader found for table_id: {}", table_id))]
NoLeader {
table_id: TableId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Procedure poison key already exists with a different value, key: {}, value: {}",
key,
@@ -760,6 +775,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to put poison, table metadata may be corrupted"))]
PutPoison {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: common_procedure::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -778,6 +801,7 @@ impl ErrorExt for Error {
| SerializeToJson { .. }
| DeserializeFromJson { .. } => StatusCode::Internal,
NoLeader { .. } => StatusCode::TableUnavailable,
ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,
@@ -849,7 +873,9 @@ impl ErrorExt for Error {
OperateDatanode { source, .. } => source.status_code(),
Table { source, .. } => source.status_code(),
RetryLater { source, .. } => source.status_code(),
AbortProcedure { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),
PutPoison { source, .. } => source.status_code(),
ParseProcedureId { .. }
| InvalidNumTopics { .. }
@@ -920,6 +946,11 @@ impl Error {
matches!(self, Error::RetryLater { .. })
}
/// Determines whether poisons need to be cleaned.
pub fn need_clean_poisons(&self) -> bool {
matches!(self, Error::AbortProcedure { clean_poisons, .. } if *clean_poisons)
}
/// Returns true if the response exceeds the size limit.
pub fn is_exceeded_size_limit(&self) -> bool {
match self {

View File

@@ -37,6 +37,7 @@ pub mod metrics;
pub mod node_expiry_listener;
pub mod node_manager;
pub mod peer;
pub mod poison_key;
pub mod range_stream;
pub mod region_keeper;
pub mod region_registry;

View File

@@ -0,0 +1,22 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_procedure::PoisonKey;
use store_api::storage::TableId;
/// Returns the poison key for the table.
pub fn table_poison_key(table_id: TableId) -> PoisonKey {
let key = format!("table/{}", table_id);
PoisonKey::new(&key)
}
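A hedged sanity check of the key format, assuming `PoisonKey`'s `Display` renders the underlying string (the runner tests below construct `PoisonKey::new("table/1024")` directly):

// Table 1024 maps to the poison key "table/1024".
let key = table_poison_key(1024);
assert_eq!(key.to_string(), "table/1024");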

View File

@@ -40,6 +40,11 @@ impl MockContextProvider {
poison_manager: InMemoryPoisonStore::default(),
}
}
/// Returns a reference to the poison manager.
pub fn poison_manager(&self) -> &InMemoryPoisonStore {
&self.poison_manager
}
}
#[async_trait]

View File

@@ -28,8 +28,14 @@ use crate::PoisonKey;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to execute procedure due to external error"))]
External { source: BoxedError },
#[snafu(display(
"Failed to execute procedure due to external error, clean poisons: {}",
clean_poisons
))]
External {
source: BoxedError,
clean_poisons: bool,
},
#[snafu(display("Loader {} is already registered", name))]
LoaderConflict {
@@ -276,6 +282,15 @@ impl Error {
pub fn external<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::External {
source: BoxedError::new(err),
clean_poisons: false,
}
}
/// Creates a new [Error::External] error from source `err` that also requests poison cleanup.
pub fn external_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::External {
source: BoxedError::new(err),
clean_poisons: true,
}
}
@@ -291,6 +306,11 @@ impl Error {
matches!(self, Error::RetryLater { .. })
}
/// Determines whether poisons need to be cleaned.
pub fn need_clean_poisons(&self) -> bool {
matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
}
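// The two constructors differ only in the clean_poisons flag, which the
// runner consults through need_clean_poisons(). A hypothetical sketch using
// the MockError helper from the runner tests:
#[test]
fn clean_poisons_flag_sketch() {
    // external() leaves poisons in place.
    let plain = Error::external(MockError::new(StatusCode::Unexpected));
    assert!(!plain.need_clean_poisons());
    // external_and_clean_poisons() asks the runner to remove this procedure's
    // poisons before surfacing the failure.
    let cleaning = Error::external_and_clean_poisons(MockError::new(StatusCode::Unexpected));
    assert!(cleaning.need_clean_poisons());
}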
/// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
/// to its [StatusCode].
pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {

View File

@@ -29,7 +29,7 @@ pub mod test_util;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError,
PoisonKey, Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef,
ProcedureState, ProcedureWithId, Status, StringKey,
PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;

View File

@@ -372,6 +372,14 @@ impl Runner {
return;
}
if e.need_clean_poisons() {
if let Err(e) = self.clean_poisons().await {
error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
}
if e.is_retry_later() {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
@@ -1361,12 +1369,9 @@ mod tests {
.await
.unwrap();
Ok(Status::executing_with_clean_poisons(true))
Ok(Status::executing(true))
} else {
Ok(Status::Poisoned {
keys: PoisonKeys::new(vec![poison_key.clone()]),
error: Error::external(MockError::new(StatusCode::Unexpected)),
})
Ok(Status::executing_with_clean_poisons(true))
}
}
.boxed()
@@ -1404,6 +1409,101 @@ mod tests {
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
// The poison key should exist.
assert!(procedure_id.is_some());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
// The poison key should have been deleted.
assert!(procedure_id.is_none());
}
#[tokio::test]
async fn test_execute_error_with_clean_poisons() {
common_telemetry::init_default_ut_logging();
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
// Put the poison into the context.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Ok(Status::executing(true))
} else {
Err(Error::external_and_clean_poisons(MockError::new(
StatusCode::Unexpected,
)))
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("error_with_clean_poisons");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
// The poison key should exist.
assert!(procedure_id.is_some());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager

View File

@@ -57,6 +57,14 @@ pub enum Status {
}
impl Status {
/// Returns a [Status::Poisoned] with the given `keys` and `error`.
pub fn poisoned(keys: impl IntoIterator<Item = PoisonKey>, error: Error) -> Status {
Status::Poisoned {
keys: PoisonKeys::new(keys),
error,
}
}
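// `keys` is `impl IntoIterator<Item = PoisonKey>`, so both an Option (as in
// alter_table above) and a Vec work at the call site. A hypothetical sketch
// reusing MockError from the runner tests:
#[test]
fn poisoned_status_sketch() {
    let key = PoisonKey::new("table/1024");
    let status = Status::poisoned(
        Some(key.clone()),
        Error::external(MockError::new(StatusCode::Unexpected)),
    );
    assert!(matches!(status, Status::Poisoned { .. }));
    let status = Status::poisoned(
        vec![key],
        Error::external(MockError::new(StatusCode::Unexpected)),
    );
    assert!(matches!(status, Status::Poisoned { .. }));
}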
/// Returns a [Status::Executing] with given `persist` flag.
pub fn executing(persist: bool) -> Status {
Status::Executing {

View File

@@ -14,7 +14,6 @@
use api::v1::meta::{HeartbeatRequest, Role};
use common_meta::region_registry::LeaderRegion;
use common_telemetry::info;
use store_api::region_engine::RegionRole;
use crate::error::Result;
@@ -52,7 +51,6 @@ impl HeartbeatHandler for CollectLeaderRegionHandler {
};
key_values.push((stat.id, value));
}
info!("collect leader region: {:?}", key_values);
ctx.leader_region_registry.batch_put(key_values);
Ok(HandleControl::Continue)

View File

@@ -103,7 +103,6 @@ impl DataRegion {
.get_metadata(region_id)
.await
.context(MitoReadOperationSnafu)?;
let version = region_metadata.schema_version;
// find the max column id
let new_column_id_start = 1 + region_metadata
@@ -166,7 +165,6 @@ impl DataRegion {
debug!("Adding (Column id assigned) columns {new_columns:?} to region {region_id:?}");
// assemble alter request
let alter_request = RegionRequest::Alter(RegionAlterRequest {
schema_version: version,
kind: AlterKind::AddColumns {
columns: new_columns,
},

View File

@@ -234,7 +234,6 @@ mod test {
// alter physical region
let physical_region_id = env.default_physical_region_id();
let request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
@@ -262,7 +261,6 @@ mod test {
// alter physical region's option should work
let alter_region_option_request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
},

View File

@@ -177,7 +177,6 @@ pub fn alter_logical_region_add_tag_columns(
});
}
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: new_columns,
},

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
@@ -19,7 +20,6 @@ use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions};
@@ -34,6 +34,7 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
use crate::engine::MitoEngine;
use crate::error;
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
TestEnv,
@@ -51,7 +52,6 @@ async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expect
fn add_tag1() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
@@ -71,7 +71,6 @@ fn add_tag1() -> RegionAlterRequest {
fn alter_column_inverted_index() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetIndex {
options: ApiSetIndexOptions::Inverted {
column_name: "tag_0".to_string(),
@@ -82,7 +81,6 @@ fn alter_column_inverted_index() -> RegionAlterRequest {
fn alter_column_fulltext_options() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetIndex {
options: ApiSetIndexOptions::Fulltext {
column_name: "tag_0".to_string(),
@@ -358,7 +356,8 @@ async fn test_alter_region_retry() {
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::RequestOutdated);
let err = err.as_any().downcast_ref::<error::Error>().unwrap();
assert_matches!(err, &error::Error::InvalidRegionRequest { .. });
let expected = "\
+-------+-------+---------+---------------------+
@@ -733,7 +732,6 @@ async fn test_alter_region_ttl_options() {
.unwrap();
let engine_cloned = engine.clone();
let alter_ttl_request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::Ttl(Some(Duration::from_secs(500).into()))],
},

View File

@@ -535,7 +535,6 @@ async fn test_change_region_compaction_window() {
// Change compaction window.
let request = RegionRequest::Alter(RegionAlterRequest {
schema_version: region.metadata().schema_version,
kind: SetRegionOptions {
options: vec![SetRegionOption::Twsc(
"compaction.twcs.time_window".to_string(),

View File

@@ -35,7 +35,6 @@ use crate::test_util::{
fn add_tag1() -> RegionAlterRequest {
RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {

View File

@@ -490,14 +490,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Schema version doesn't match. Expect {} but gives {}", expect, actual))]
InvalidRegionRequestSchemaVersion {
expect: u64,
actual: u64,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Region {} is in {:?} state, which does not permit manifest updates.",
region_id,
@@ -1086,8 +1078,6 @@ impl ErrorExt for Error {
| PartitionOutOfRange { .. }
| ParseJobId { .. } => StatusCode::InvalidArguments,
InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated,
RegionMetadataNotFound { .. }
| Join { .. }
| WorkerStopped { .. }

View File

@@ -18,7 +18,8 @@ use std::str::FromStr;
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use common_telemetry::info;
use common_telemetry::tracing::warn;
use humantime_serde::re::humantime;
use snafu::ResultExt;
use store_api::metadata::{
@@ -29,9 +30,7 @@ use store_api::mito_engine_options;
use store_api::region_request::{AlterKind, RegionAlterRequest, SetRegionOption};
use store_api::storage::RegionId;
use crate::error::{
InvalidMetadataSnafu, InvalidRegionRequestSchemaVersionSnafu, InvalidRegionRequestSnafu, Result,
};
use crate::error::{InvalidMetadataSnafu, InvalidRegionRequestSnafu, Result};
use crate::flush::FlushReason;
use crate::manifest::action::RegionChange;
use crate::region::options::CompactionOptions::Twcs;
@@ -83,22 +82,6 @@ impl<S> RegionWorkerLoop<S> {
_ => {}
}
if version.metadata.schema_version != request.schema_version {
// This is possible if we retry the request.
debug!(
"Ignores alter request, region id:{}, region schema version {} is not equal to request schema version {}",
region_id, version.metadata.schema_version, request.schema_version
);
// Returns an error.
sender.send(
InvalidRegionRequestSchemaVersionSnafu {
expect: version.metadata.schema_version,
actual: request.schema_version,
}
.fail(),
);
return;
}
// Validate request.
if let Err(e) = request.validate(&version.metadata) {
// Invalid request.
@@ -108,7 +91,7 @@ impl<S> RegionWorkerLoop<S> {
// Checks whether we need to alter the region.
if !request.need_alter(&version.metadata) {
debug!(
warn!(
"Ignores alter request as it alters nothing, region_id: {}, request: {:?}",
region_id, request
);
@@ -219,7 +202,6 @@ fn metadata_after_alteration(
.context(InvalidRegionRequestSnafu)?
.bump_version();
let new_meta = builder.build().context(InvalidMetadataSnafu)?;
assert_eq!(request.schema_version + 1, new_meta.schema_version);
Ok(Arc::new(new_meta))
}

View File

@@ -436,8 +436,6 @@ pub struct RegionCloseRequest {}
/// Alter metadata of a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
/// The version of the schema before applying the alteration.
pub schema_version: u64,
/// Kind of alteration to do.
pub kind: AlterKind,
}
@@ -445,17 +443,6 @@ pub struct RegionAlterRequest {
impl RegionAlterRequest {
/// Checks whether the request is valid, returns an error if it is invalid.
pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
ensure!(
metadata.schema_version == self.schema_version,
InvalidRegionRequestSnafu {
region_id: metadata.region_id,
err: format!(
"region schema version {} is not equal to request schema version {}",
metadata.schema_version, self.schema_version
),
}
);
self.kind.validate(metadata)?;
Ok(())
@@ -479,10 +466,7 @@ impl TryFrom<AlterRequest> for RegionAlterRequest {
})?;
let kind = AlterKind::try_from(kind)?;
Ok(RegionAlterRequest {
schema_version: value.schema_version,
kind,
})
Ok(RegionAlterRequest { kind })
}
}
@@ -1234,7 +1218,6 @@ mod tests {
assert_eq!(
request,
RegionAlterRequest {
schema_version: 1,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
@@ -1531,21 +1514,6 @@ mod tests {
assert!(kind.need_alter(&metadata));
}
#[test]
fn test_validate_schema_version() {
let mut metadata = new_metadata();
metadata.schema_version = 2;
RegionAlterRequest {
schema_version: 1,
kind: AlterKind::DropColumns {
names: vec!["field_0".to_string()],
},
}
.validate(&metadata)
.unwrap_err();
}
#[test]
fn test_validate_add_columns() {
let kind = AlterKind::AddColumns {
@@ -1576,10 +1544,7 @@ mod tests {
},
],
};
let request = RegionAlterRequest {
schema_version: 1,
kind,
};
let request = RegionAlterRequest { kind };
let mut metadata = new_metadata();
metadata.schema_version = 1;
request.validate(&metadata).unwrap();
@@ -1640,10 +1605,7 @@ mod tests {
},
},
};
let request = RegionAlterRequest {
schema_version: 1,
kind,
};
let request = RegionAlterRequest { kind };
let mut metadata = new_metadata();
metadata.schema_version = 1;
request.validate(&metadata).unwrap();
@@ -1653,10 +1615,7 @@ mod tests {
column_name: "tag_0".to_string(),
},
};
let request = RegionAlterRequest {
schema_version: 1,
kind,
};
let request = RegionAlterRequest { kind };
let mut metadata = new_metadata();
metadata.schema_version = 1;
request.validate(&metadata).unwrap();

View File

@@ -223,6 +223,15 @@ async fn execute_alter_table(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
assert_eq!(a, b);
}
});
// Select from the table to make sure it is still queryable after the alteration.
let sql = format!("SELECT * FROM {}", table_ctx.name);
let result = sqlx::query(&sql)
.persistent(false)
.execute(&ctx.greptime)
.await
.context(error::ExecuteQuerySnafu { sql: &sql })?;
info!("Select from table: {sql}, result: {result:?}");
}
// Cleans up

View File

@@ -192,3 +192,46 @@ DROP TABLE phy;
Affected Rows: 0
CREATE TABLE grpc_latencies (
ts TIMESTAMP TIME INDEX,
host STRING,
method_name STRING,
latency DOUBLE,
PRIMARY KEY (host, method_name)
) with('append_mode'='true');
Affected Rows: 0
INSERT INTO grpc_latencies (ts, host, method_name, latency) VALUES
('2024-07-11 20:00:06', 'host1', 'GetUser', 103.0);
Affected Rows: 1
SELECT * FROM grpc_latencies;
+---------------------+-------+-------------+---------+
| ts | host | method_name | latency |
+---------------------+-------+-------------+---------+
| 2024-07-11T20:00:06 | host1 | GetUser | 103.0 |
+---------------------+-------+-------------+---------+
ALTER TABLE grpc_latencies SET ttl = '10d';
Affected Rows: 0
ALTER TABLE grpc_latencies ADD COLUMN home INTEGER FIRST;
Affected Rows: 0
SELECT * FROM grpc_latencies;
+------+---------------------+-------+-------------+---------+
| home | ts | host | method_name | latency |
+------+---------------------+-------+-------------+---------+
| | 2024-07-11T20:00:06 | host1 | GetUser | 103.0 |
+------+---------------------+-------+-------------+---------+
DROP TABLE grpc_latencies;
Affected Rows: 0

View File

@@ -95,3 +95,24 @@ DROP TABLE t1;
DROP TABLE t2;
DROP TABLE phy;
CREATE TABLE grpc_latencies (
ts TIMESTAMP TIME INDEX,
host STRING,
method_name STRING,
latency DOUBLE,
PRIMARY KEY (host, method_name)
) with('append_mode'='true');
INSERT INTO grpc_latencies (ts, host, method_name, latency) VALUES
('2024-07-11 20:00:06', 'host1', 'GetUser', 103.0);
SELECT * FROM grpc_latencies;
ALTER TABLE grpc_latencies SET ttl = '10d';
ALTER TABLE grpc_latencies ADD COLUMN home INTEGER FIRST;
SELECT * FROM grpc_latencies;
DROP TABLE grpc_latencies;