mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-03 13:50:40 +00:00
fix: sqlness failed due to region failover wrongly kicks in for dropp… (#1690)
fix: sqlness failed due to region failover wrongly kicks in for dropped or renamed table
This commit is contained in:
@@ -153,13 +153,16 @@ impl FailureDetectRunner {
|
||||
.collect::<Vec<RegionIdent>>();
|
||||
|
||||
for r in failed_regions {
|
||||
// Now that we know the region is failed, remove it from the failure
|
||||
// detectors, avoiding the failover procedure to be triggered again.
|
||||
// If the region is back alive (the failover procedure runs successfully),
|
||||
// it will be added back to the failure detectors again.
|
||||
failure_detectors.remove(&r);
|
||||
|
||||
region_failover_manager.fire_region_failover(r)
|
||||
if let Err(e) = region_failover_manager.do_region_failover(&r).await {
|
||||
error!(e; "Failed to do region failover for {r}");
|
||||
} else {
|
||||
// Now that we know the region is starting to do failover, remove it
|
||||
// from the failure detectors, avoiding the failover procedure to be
|
||||
// triggered again.
|
||||
// If the region is back alive (the failover procedure runs successfully),
|
||||
// it will be added back to the failure detectors again.
|
||||
failure_detectors.remove(&r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use catalog::helper::TableGlobalKey;
|
||||
use common_meta::RegionIdent;
|
||||
use common_procedure::error::{
|
||||
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
|
||||
@@ -42,6 +43,7 @@ use crate::error::{Error, RegisterProcedureLoaderSnafu, Result};
|
||||
use crate::lock::DistLockRef;
|
||||
use crate::metasrv::{SelectorContext, SelectorRef};
|
||||
use crate::service::mailbox::MailboxRef;
|
||||
use crate::service::store::ext::KvStoreExt;
|
||||
|
||||
const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const CLOSE_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
@@ -119,10 +121,18 @@ impl RegionFailoverManager {
|
||||
procedures.insert(failed_region.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn fire_region_failover(&self, failed_region: RegionIdent) {
|
||||
if !self.insert_running_procedures(&failed_region) {
|
||||
pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
|
||||
if !self.insert_running_procedures(failed_region) {
|
||||
warn!("Region failover procedure for region {failed_region} is already running!");
|
||||
return;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if !self.table_exists(failed_region).await? {
|
||||
// The table could be dropped before the failure detector knows it. Then the region
|
||||
// failover is not needed.
|
||||
// Or the table could be renamed. But we will have a new region ident to detect failure.
|
||||
// So the region failover here is not needed either.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let context = self.create_context();
|
||||
@@ -133,6 +143,7 @@ impl RegionFailoverManager {
|
||||
|
||||
let procedure_manager = self.procedure_manager.clone();
|
||||
let running_procedures = self.running_procedures.clone();
|
||||
let failed_region = failed_region.clone();
|
||||
common_runtime::spawn_bg(async move {
|
||||
let _guard = FailoverProcedureGuard {
|
||||
running_procedures,
|
||||
@@ -154,6 +165,22 @@ impl RegionFailoverManager {
|
||||
|
||||
info!("Region failover procedure {procedure_id} for region {failed_region} is finished successfully!");
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn table_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
|
||||
let table_ident = &failed_region.table_ident;
|
||||
let table_global_key = TableGlobalKey {
|
||||
catalog_name: table_ident.catalog.clone(),
|
||||
schema_name: table_ident.schema.clone(),
|
||||
table_name: table_ident.table.clone(),
|
||||
};
|
||||
let table_global_value = self
|
||||
.selector_ctx
|
||||
.kv_store
|
||||
.get(table_global_key.to_string().into_bytes())
|
||||
.await?;
|
||||
Ok(table_global_value.is_some())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,8 @@ const SERVER_ADDR: &str = "127.0.0.1:4001";
|
||||
const STANDALONE_LOG_FILE: &str = "/tmp/greptime-sqlness-standalone.log";
|
||||
const METASRV_LOG_FILE: &str = "/tmp/greptime-sqlness-metasrv.log";
|
||||
const FRONTEND_LOG_FILE: &str = "/tmp/greptime-sqlness-frontend.log";
|
||||
const DATANODE_LOG_FILE: &str = "/tmp/greptime-sqlness-datanode.log";
|
||||
|
||||
const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info";
|
||||
|
||||
pub struct Env {}
|
||||
|
||||
@@ -64,8 +65,6 @@ impl EnvController for Env {
|
||||
}
|
||||
}
|
||||
|
||||
static DATANODE_ID: AtomicU32 = AtomicU32::new(1);
|
||||
|
||||
#[allow(clippy::print_stdout)]
|
||||
impl Env {
|
||||
pub async fn start_standalone() -> GreptimeDB {
|
||||
@@ -128,10 +127,17 @@ impl Env {
|
||||
truncate_log: bool,
|
||||
) -> Child {
|
||||
let log_file_name = match subcommand {
|
||||
"datanode" => DATANODE_LOG_FILE,
|
||||
"frontend" => FRONTEND_LOG_FILE,
|
||||
"metasrv" => METASRV_LOG_FILE,
|
||||
"standalone" => STANDALONE_LOG_FILE,
|
||||
"datanode" => {
|
||||
db_ctx.incr_datanode_id();
|
||||
|
||||
format!(
|
||||
"/tmp/greptime-sqlness-datanode-{}.log",
|
||||
db_ctx.datanode_id()
|
||||
)
|
||||
}
|
||||
"frontend" => FRONTEND_LOG_FILE.to_string(),
|
||||
"metasrv" => METASRV_LOG_FILE.to_string(),
|
||||
"standalone" => STANDALONE_LOG_FILE.to_string(),
|
||||
_ => panic!("Unexpected subcommand: {subcommand}"),
|
||||
};
|
||||
let log_file = OpenOptions::new()
|
||||
@@ -139,13 +145,13 @@ impl Env {
|
||||
.write(true)
|
||||
.truncate(truncate_log)
|
||||
.open(log_file_name)
|
||||
.unwrap_or_else(|_| panic!("Cannot open log file at {log_file_name}"));
|
||||
.unwrap();
|
||||
|
||||
let (args, check_ip_addr) = match subcommand {
|
||||
"datanode" => Self::datanode_start_args(db_ctx),
|
||||
"standalone" => {
|
||||
let args = vec![
|
||||
"--log-level=debug".to_string(),
|
||||
DEFAULT_LOG_LEVEL.to_string(),
|
||||
subcommand.to_string(),
|
||||
"start".to_string(),
|
||||
"-c".to_string(),
|
||||
@@ -156,7 +162,7 @@ impl Env {
|
||||
}
|
||||
"frontend" => {
|
||||
let args = vec![
|
||||
"--log-level=debug".to_string(),
|
||||
DEFAULT_LOG_LEVEL.to_string(),
|
||||
subcommand.to_string(),
|
||||
"start".to_string(),
|
||||
"--metasrv-addr=0.0.0.0:3002".to_string(),
|
||||
@@ -166,7 +172,7 @@ impl Env {
|
||||
}
|
||||
"metasrv" => {
|
||||
let args = vec![
|
||||
"--log-level=debug".to_string(),
|
||||
DEFAULT_LOG_LEVEL.to_string(),
|
||||
subcommand.to_string(),
|
||||
"start".to_string(),
|
||||
"--use-memory-store".to_string(),
|
||||
@@ -193,11 +199,11 @@ impl Env {
|
||||
}
|
||||
|
||||
fn datanode_start_args(db_ctx: &GreptimeDBContext) -> (Vec<String>, String) {
|
||||
let id = DATANODE_ID.fetch_add(1, Ordering::Relaxed);
|
||||
let id = db_ctx.datanode_id();
|
||||
|
||||
let subcommand = "datanode";
|
||||
let mut args = vec![
|
||||
"--log-level=debug".to_string(),
|
||||
DEFAULT_LOG_LEVEL.to_string(),
|
||||
subcommand.to_string(),
|
||||
"start".to_string(),
|
||||
];
|
||||
@@ -231,8 +237,9 @@ impl Env {
|
||||
let new_server_process = Env::start_server("standalone", &db.ctx, false).await;
|
||||
vec![new_server_process]
|
||||
} else {
|
||||
db.ctx.reset_datanode_id();
|
||||
|
||||
let mut processes = vec![];
|
||||
DATANODE_ID.store(1, Ordering::Relaxed);
|
||||
for _ in 0..3 {
|
||||
let new_server_process = Env::start_server("datanode", &db.ctx, false).await;
|
||||
processes.push(new_server_process);
|
||||
@@ -349,14 +356,28 @@ impl Drop for GreptimeDB {
|
||||
struct GreptimeDBContext {
|
||||
/// Start time in millisecond
|
||||
time: i64,
|
||||
datanode_id: AtomicU32,
|
||||
}
|
||||
|
||||
impl GreptimeDBContext {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
time: common_time::util::current_time_millis(),
|
||||
datanode_id: AtomicU32::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn incr_datanode_id(&self) {
|
||||
self.datanode_id.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
fn datanode_id(&self) -> u32 {
|
||||
self.datanode_id.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn reset_datanode_id(&self) {
|
||||
self.datanode_id.store(0, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
struct ResultDisplayer {
|
||||
|
||||
Reference in New Issue
Block a user