From ce440606a9e876d7a956158f8821369735ec2159 Mon Sep 17 00:00:00 2001 From: LFC Date: Thu, 1 Jun 2023 21:47:47 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20sqlness=20failed=20due=20to=20region=20f?= =?UTF-8?q?ailover=20wrongly=20kicks=20in=20for=20dropp=E2=80=A6=20(#1690)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix: sqlness failed due to region failover wrongly kicks in for dropped or renamed table --- .../src/handler/failure_handler/runner.rs | 17 ++++--- src/meta-srv/src/procedure/region_failover.rs | 33 +++++++++++-- tests/runner/src/env.rs | 49 +++++++++++++------ 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs index 065ae61574..b9fd28ef93 100644 --- a/src/meta-srv/src/handler/failure_handler/runner.rs +++ b/src/meta-srv/src/handler/failure_handler/runner.rs @@ -153,13 +153,16 @@ impl FailureDetectRunner { .collect::>(); for r in failed_regions { - // Now that we know the region is failed, remove it from the failure - // detectors, avoiding the failover procedure to be triggered again. - // If the region is back alive (the failover procedure runs successfully), - // it will be added back to the failure detectors again. - failure_detectors.remove(&r); - - region_failover_manager.fire_region_failover(r) + if let Err(e) = region_failover_manager.do_region_failover(&r).await { + error!(e; "Failed to do region failover for {r}"); + } else { + // Now that we know the region is starting to do failover, remove it + // from the failure detectors, avoiding the failover procedure to be + // triggered again. + // If the region is back alive (the failover procedure runs successfully), + // it will be added back to the failure detectors again. + failure_detectors.remove(&r); + } } } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 35861dbe22..92349ef309 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; +use catalog::helper::TableGlobalKey; use common_meta::RegionIdent; use common_procedure::error::{ Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -42,6 +43,7 @@ use crate::error::{Error, RegisterProcedureLoaderSnafu, Result}; use crate::lock::DistLockRef; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::service::mailbox::MailboxRef; +use crate::service::store::ext::KvStoreExt; const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30); const CLOSE_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(2); @@ -119,10 +121,18 @@ impl RegionFailoverManager { procedures.insert(failed_region.clone()) } - pub(crate) fn fire_region_failover(&self, failed_region: RegionIdent) { - if !self.insert_running_procedures(&failed_region) { + pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> { + if !self.insert_running_procedures(failed_region) { warn!("Region failover procedure for region {failed_region} is already running!"); - return; + return Ok(()); + } + + if !self.table_exists(failed_region).await? { + // The table could be dropped before the failure detector knows it. Then the region + // failover is not needed. + // Or the table could be renamed. But we will have a new region ident to detect failure. + // So the region failover here is not needed either. + return Ok(()); } let context = self.create_context(); @@ -133,6 +143,7 @@ impl RegionFailoverManager { let procedure_manager = self.procedure_manager.clone(); let running_procedures = self.running_procedures.clone(); + let failed_region = failed_region.clone(); common_runtime::spawn_bg(async move { let _guard = FailoverProcedureGuard { running_procedures, @@ -154,6 +165,22 @@ impl RegionFailoverManager { info!("Region failover procedure {procedure_id} for region {failed_region} is finished successfully!"); }); + Ok(()) + } + + async fn table_exists(&self, failed_region: &RegionIdent) -> Result { + let table_ident = &failed_region.table_ident; + let table_global_key = TableGlobalKey { + catalog_name: table_ident.catalog.clone(), + schema_name: table_ident.schema.clone(), + table_name: table_ident.table.clone(), + }; + let table_global_value = self + .selector_ctx + .kv_store + .get(table_global_key.to_string().into_bytes()) + .await?; + Ok(table_global_value.is_some()) } } diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 89e9e33f9b..73ec466b84 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -41,7 +41,8 @@ const SERVER_ADDR: &str = "127.0.0.1:4001"; const STANDALONE_LOG_FILE: &str = "/tmp/greptime-sqlness-standalone.log"; const METASRV_LOG_FILE: &str = "/tmp/greptime-sqlness-metasrv.log"; const FRONTEND_LOG_FILE: &str = "/tmp/greptime-sqlness-frontend.log"; -const DATANODE_LOG_FILE: &str = "/tmp/greptime-sqlness-datanode.log"; + +const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info"; pub struct Env {} @@ -64,8 +65,6 @@ impl EnvController for Env { } } -static DATANODE_ID: AtomicU32 = AtomicU32::new(1); - #[allow(clippy::print_stdout)] impl Env { pub async fn start_standalone() -> GreptimeDB { @@ -128,10 +127,17 @@ impl Env { truncate_log: bool, ) -> Child { let log_file_name = match subcommand { - "datanode" => DATANODE_LOG_FILE, - "frontend" => FRONTEND_LOG_FILE, - "metasrv" => METASRV_LOG_FILE, - "standalone" => STANDALONE_LOG_FILE, + "datanode" => { + db_ctx.incr_datanode_id(); + + format!( + "/tmp/greptime-sqlness-datanode-{}.log", + db_ctx.datanode_id() + ) + } + "frontend" => FRONTEND_LOG_FILE.to_string(), + "metasrv" => METASRV_LOG_FILE.to_string(), + "standalone" => STANDALONE_LOG_FILE.to_string(), _ => panic!("Unexpected subcommand: {subcommand}"), }; let log_file = OpenOptions::new() @@ -139,13 +145,13 @@ impl Env { .write(true) .truncate(truncate_log) .open(log_file_name) - .unwrap_or_else(|_| panic!("Cannot open log file at {log_file_name}")); + .unwrap(); let (args, check_ip_addr) = match subcommand { "datanode" => Self::datanode_start_args(db_ctx), "standalone" => { let args = vec![ - "--log-level=debug".to_string(), + DEFAULT_LOG_LEVEL.to_string(), subcommand.to_string(), "start".to_string(), "-c".to_string(), @@ -156,7 +162,7 @@ impl Env { } "frontend" => { let args = vec![ - "--log-level=debug".to_string(), + DEFAULT_LOG_LEVEL.to_string(), subcommand.to_string(), "start".to_string(), "--metasrv-addr=0.0.0.0:3002".to_string(), @@ -166,7 +172,7 @@ impl Env { } "metasrv" => { let args = vec![ - "--log-level=debug".to_string(), + DEFAULT_LOG_LEVEL.to_string(), subcommand.to_string(), "start".to_string(), "--use-memory-store".to_string(), @@ -193,11 +199,11 @@ impl Env { } fn datanode_start_args(db_ctx: &GreptimeDBContext) -> (Vec, String) { - let id = DATANODE_ID.fetch_add(1, Ordering::Relaxed); + let id = db_ctx.datanode_id(); let subcommand = "datanode"; let mut args = vec![ - "--log-level=debug".to_string(), + DEFAULT_LOG_LEVEL.to_string(), subcommand.to_string(), "start".to_string(), ]; @@ -231,8 +237,9 @@ impl Env { let new_server_process = Env::start_server("standalone", &db.ctx, false).await; vec![new_server_process] } else { + db.ctx.reset_datanode_id(); + let mut processes = vec![]; - DATANODE_ID.store(1, Ordering::Relaxed); for _ in 0..3 { let new_server_process = Env::start_server("datanode", &db.ctx, false).await; processes.push(new_server_process); @@ -349,14 +356,28 @@ impl Drop for GreptimeDB { struct GreptimeDBContext { /// Start time in millisecond time: i64, + datanode_id: AtomicU32, } impl GreptimeDBContext { pub fn new() -> Self { Self { time: common_time::util::current_time_millis(), + datanode_id: AtomicU32::new(0), } } + + fn incr_datanode_id(&self) { + self.datanode_id.fetch_add(1, Ordering::Relaxed); + } + + fn datanode_id(&self) -> u32 { + self.datanode_id.load(Ordering::Relaxed) + } + + fn reset_datanode_id(&self) { + self.datanode_id.store(0, Ordering::Relaxed); + } } struct ResultDisplayer {