feat: allow igoring nonexistent regions in recovery mode (#6592)

* feat: allow ignoring nonexistent regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: ignore nonexistent regions during startup in recovery mode

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: allow enabling recovery mode via http api

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-07-28 19:02:13 +08:00
committed by GitHub
parent 2e571e351f
commit b7fd4ca65d
6 changed files with 312 additions and 9 deletions

View File

@@ -167,6 +167,7 @@ pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance";
pub const MAINTENANCE_KEY: &str = "__switches/maintenance";
pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure";
pub const RECOVERY_MODE_KEY: &str = "__switches/recovery";
pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";

View File

@@ -21,7 +21,7 @@ use moka::future::Cache;
use snafu::ResultExt;
use crate::error::{GetCacheSnafu, Result};
use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY};
use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY, RECOVERY_MODE_KEY};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{BatchDeleteRequest, PutRequest};
@@ -131,6 +131,21 @@ impl RuntimeSwitchManager {
pub async fn is_procedure_paused(&self) -> Result<bool> {
self.exists(PAUSE_PROCEDURE_KEY).await
}
/// Enables recovery mode.
pub async fn set_recovery_mode(&self) -> Result<()> {
self.put_key(RECOVERY_MODE_KEY).await
}
/// Unsets recovery mode.
pub async fn unset_recovery_mode(&self) -> Result<()> {
self.delete_keys(&[RECOVERY_MODE_KEY]).await
}
/// Returns true if the system is currently in recovery mode.
pub async fn recovery_mode(&self) -> Result<bool> {
self.exists(RECOVERY_MODE_KEY).await
}
}
#[cfg(test)]
@@ -221,4 +236,15 @@ mod tests {
runtime_switch_manager.resume_procedure().await.unwrap();
assert!(!runtime_switch_manager.is_procedure_paused().await.unwrap());
}
#[tokio::test]
async fn test_recovery_mode() {
let runtime_switch_manager =
Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new())));
assert!(!runtime_switch_manager.recovery_mode().await.unwrap());
runtime_switch_manager.set_recovery_mode().await.unwrap();
assert!(runtime_switch_manager.recovery_mode().await.unwrap());
runtime_switch_manager.unset_recovery_mode().await.unwrap();
assert!(!runtime_switch_manager.recovery_mode().await.unwrap());
}
}

View File

@@ -24,6 +24,7 @@ use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::prepare_wal_options;
@@ -243,6 +244,12 @@ impl DatanodeBuilder {
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
// TODO(weny): Considering introducing a readonly kv_backend trait.
let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
let is_recovery_mode = runtime_switch_manager
.recovery_mode()
.await
.context(GetMetadataSnafu)?;
let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
@@ -255,6 +262,8 @@ impl DatanodeBuilder {
table_values,
!controlled_by_metasrv,
self.opts.init_regions_parallelism,
// Ignore nonexistent regions in recovery mode.
is_recovery_mode,
);
if self.opts.init_regions_in_background {
@@ -336,6 +345,12 @@ impl DatanodeBuilder {
) -> Result<()> {
let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
let runtime_switch_manager = RuntimeSwitchManager::new(kv_backend.clone());
let is_recovery_mode = runtime_switch_manager
.recovery_mode()
.await
.context(GetMetadataSnafu)?;
let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
let table_values = datanode_table_manager
.tables(node_id)
@@ -348,6 +363,7 @@ impl DatanodeBuilder {
table_values,
open_with_writable,
self.opts.init_regions_parallelism,
is_recovery_mode,
)
.await
}
@@ -587,6 +603,7 @@ async fn open_all_regions(
table_values: Vec<DatanodeTableValue>,
open_with_writable: bool,
init_regions_parallelism: usize,
ignore_nonexistent_region: bool,
) -> Result<()> {
let mut regions = vec![];
#[cfg(feature = "enterprise")]
@@ -646,7 +663,11 @@ async fn open_all_regions(
}
let open_regions = region_server
.handle_batch_open_requests(init_regions_parallelism, region_requests)
.handle_batch_open_requests(
init_regions_parallelism,
region_requests,
ignore_nonexistent_region,
)
.await?;
ensure!(
open_regions.len() == num_regions,
@@ -691,7 +712,11 @@ async fn open_all_regions(
}
let open_regions = region_server
.handle_batch_open_requests(init_regions_parallelism, region_requests)
.handle_batch_open_requests(
init_regions_parallelism,
region_requests,
ignore_nonexistent_region,
)
.await?;
ensure!(

View File

@@ -154,9 +154,10 @@ impl RegionServer {
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
ignore_nonexistent_region: bool,
) -> Result<Vec<RegionId>> {
self.inner
.handle_batch_open_requests(parallelism, requests)
.handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region)
.await
}
@@ -799,6 +800,7 @@ impl RegionServerInner {
engine: RegionEngineRef,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
ignore_nonexistent_region: bool,
) -> Result<Vec<RegionId>> {
let region_changes = requests
.iter()
@@ -836,8 +838,14 @@ impl RegionServerInner {
}
Err(e) => {
self.unset_region_status(region_id, &engine, *region_change);
error!(e; "Failed to open region: {}", region_id);
errors.push(e);
if e.status_code() == StatusCode::RegionNotFound
&& ignore_nonexistent_region
{
warn!("Region {} not found, ignore it, source: {:?}", region_id, e);
} else {
error!(e; "Failed to open region: {}", region_id);
errors.push(e);
}
}
}
}
@@ -866,6 +874,7 @@ impl RegionServerInner {
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
ignore_nonexistent_region: bool,
) -> Result<Vec<RegionId>> {
let mut engine_grouped_requests: HashMap<String, Vec<_>> =
HashMap::with_capacity(requests.len());
@@ -888,8 +897,13 @@ impl RegionServerInner {
.with_context(|| RegionEngineNotFoundSnafu { name: &engine })?
.clone();
results.push(
self.handle_batch_open_requests_inner(engine, parallelism, requests)
.await,
self.handle_batch_open_requests_inner(
engine,
parallelism,
requests,
ignore_nonexistent_region,
)
.await,
)
}
@@ -1492,6 +1506,85 @@ mod tests {
assert!(status.is_some());
}
#[tokio::test]
async fn test_batch_open_region_ignore_nonexistent_regions() {
common_telemetry::init_default_ut_logging();
let mut mock_region_server = mock_region_server();
let (engine, _receiver) = MockRegionEngine::with_mock_fn(
MITO_ENGINE_NAME,
Box::new(|region_id, _request| {
if region_id == RegionId::new(1, 1) {
error::RegionNotFoundSnafu { region_id }.fail()
} else {
Ok(0)
}
}),
);
mock_region_server.register_engine(engine.clone());
let region_ids = mock_region_server
.handle_batch_open_requests(
8,
vec![
(
RegionId::new(1, 1),
RegionOpenRequest {
engine: MITO_ENGINE_NAME.to_string(),
table_dir: String::new(),
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
},
),
(
RegionId::new(1, 2),
RegionOpenRequest {
engine: MITO_ENGINE_NAME.to_string(),
table_dir: String::new(),
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
},
),
],
true,
)
.await
.unwrap();
assert_eq!(region_ids, vec![RegionId::new(1, 2)]);
let err = mock_region_server
.handle_batch_open_requests(
8,
vec![
(
RegionId::new(1, 1),
RegionOpenRequest {
engine: MITO_ENGINE_NAME.to_string(),
table_dir: String::new(),
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
},
),
(
RegionId::new(1, 2),
RegionOpenRequest {
engine: MITO_ENGINE_NAME.to_string(),
table_dir: String::new(),
path_type: PathType::Bare,
options: Default::default(),
skip_wal_replay: false,
},
),
],
false,
)
.await
.unwrap_err();
assert_eq!(err.status_code(), StatusCode::Unexpected);
}
struct CurrentEngineTest {
region_id: RegionId,
current_region_status: Option<RegionEngineWithStatus>,

View File

@@ -18,6 +18,7 @@ pub(crate) mod leader;
pub(crate) mod maintenance;
pub(crate) mod node_lease;
pub(crate) mod procedure;
pub(crate) mod recovery;
mod util;
use std::collections::HashMap;
@@ -41,6 +42,9 @@ use crate::service::admin::leader::LeaderHandler;
use crate::service::admin::maintenance::MaintenanceHandler;
use crate::service::admin::node_lease::NodeLeaseHandler;
use crate::service::admin::procedure::ProcedureManagerHandler;
use crate::service::admin::recovery::{
get_recovery_mode, set_recovery_mode, unset_recovery_mode, RecoveryHandler,
};
use crate::service::admin::util::{to_axum_json_response, to_axum_not_found_response};
pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
@@ -250,6 +254,9 @@ pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
let procedure_handler = Arc::new(ProcedureManagerHandler {
manager: metasrv.runtime_switch_manager().clone(),
});
let recovery_handler = Arc::new(RecoveryHandler {
manager: metasrv.runtime_switch_manager().clone(),
});
let health_router = AxumRouter::new().route(
"/",
@@ -457,13 +464,20 @@ pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
}),
);
let recovery_router = AxumRouter::new()
.route("/enable", routing::post(set_recovery_mode))
.route("/disable", routing::post(unset_recovery_mode))
.route("/status", routing::get(get_recovery_mode))
.with_state(recovery_handler);
let admin_router = AxumRouter::new()
.nest("/health", health_router)
.nest("/node-lease", node_lease_router)
.nest("/leader", leader_router)
.nest("/heartbeat", heartbeat_router)
.nest("/maintenance", maintenance_router)
.nest("/procedure-manager", procedure_router);
.nest("/procedure-manager", procedure_router)
.nest("/recovery", recovery_router);
AxumRouter::new().nest("/admin", admin_router)
}
@@ -1029,4 +1043,85 @@ mod axum_admin_tests {
let body = get_body_string(response).await;
assert!(body.contains("running"));
}
#[tokio::test]
async fn test_admin_recovery() {
let app = setup_axum_app().await;
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/admin/recovery/status")
.method(Method::GET)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("false"));
// Enable recovery
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/admin/recovery/enable")
.method(Method::POST)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("true"));
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/admin/recovery/status")
.method(Method::GET)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("true"));
// Disable recovery
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/admin/recovery/disable")
.method(Method::POST)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("false"));
let response = app
.clone()
.oneshot(
Request::builder()
.uri("/admin/recovery/status")
.method(Method::GET)
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = get_body_string(response).await;
assert!(body.contains("false"));
}
}

View File

@@ -0,0 +1,63 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use serde::{Deserialize, Serialize};
use servers::http::result::error_result::ErrorResponse;
pub(crate) type RecoveryHandlerRef = Arc<RecoveryHandler>;
pub(crate) struct RecoveryHandler {
pub(crate) manager: RuntimeSwitchManagerRef,
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RecoveryResponse {
pub enabled: bool,
}
/// Get the recovery mode.
#[axum_macros::debug_handler]
pub(crate) async fn get_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response {
let enabled = handler.manager.recovery_mode().await;
match enabled {
Ok(enabled) => (StatusCode::OK, Json(RecoveryResponse { enabled })).into_response(),
Err(e) => ErrorResponse::from_error(e).into_response(),
}
}
/// Set the recovery mode.
#[axum_macros::debug_handler]
pub(crate) async fn set_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response {
match handler.manager.set_recovery_mode().await {
Ok(_) => (StatusCode::OK, Json(RecoveryResponse { enabled: true })).into_response(),
Err(e) => ErrorResponse::from_error(e).into_response(),
}
}
/// Unset the recovery mode.
#[axum_macros::debug_handler]
pub(crate) async fn unset_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response {
match handler.manager.unset_recovery_mode().await {
Ok(_) => (StatusCode::OK, Json(RecoveryResponse { enabled: false })).into_response(),
Err(e) => ErrorResponse::from_error(e).into_response(),
}
}