diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 36becc09e2..2862d9d402 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -167,6 +167,7 @@ pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*"; pub const LEGACY_MAINTENANCE_KEY: &str = "__maintenance"; pub const MAINTENANCE_KEY: &str = "__switches/maintenance"; pub const PAUSE_PROCEDURE_KEY: &str = "__switches/pause_procedure"; +pub const RECOVERY_MODE_KEY: &str = "__switches/recovery"; pub const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; diff --git a/src/common/meta/src/key/runtime_switch.rs b/src/common/meta/src/key/runtime_switch.rs index 3fdaa24e85..f5eb9b058c 100644 --- a/src/common/meta/src/key/runtime_switch.rs +++ b/src/common/meta/src/key/runtime_switch.rs @@ -21,7 +21,7 @@ use moka::future::Cache; use snafu::ResultExt; use crate::error::{GetCacheSnafu, Result}; -use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY}; +use crate::key::{LEGACY_MAINTENANCE_KEY, MAINTENANCE_KEY, PAUSE_PROCEDURE_KEY, RECOVERY_MODE_KEY}; use crate::kv_backend::KvBackendRef; use crate::rpc::store::{BatchDeleteRequest, PutRequest}; @@ -131,6 +131,21 @@ impl RuntimeSwitchManager { pub async fn is_procedure_paused(&self) -> Result { self.exists(PAUSE_PROCEDURE_KEY).await } + + /// Enables recovery mode. + pub async fn set_recovery_mode(&self) -> Result<()> { + self.put_key(RECOVERY_MODE_KEY).await + } + + /// Unsets recovery mode. + pub async fn unset_recovery_mode(&self) -> Result<()> { + self.delete_keys(&[RECOVERY_MODE_KEY]).await + } + + /// Returns true if the system is currently in recovery mode. 
+ pub async fn recovery_mode(&self) -> Result { + self.exists(RECOVERY_MODE_KEY).await + } } #[cfg(test)] @@ -221,4 +236,15 @@ mod tests { runtime_switch_manager.resume_procedure().await.unwrap(); assert!(!runtime_switch_manager.is_procedure_paused().await.unwrap()); } + + #[tokio::test] + async fn test_recovery_mode() { + let runtime_switch_manager = + Arc::new(RuntimeSwitchManager::new(Arc::new(MemoryKvBackend::new()))); + assert!(!runtime_switch_manager.recovery_mode().await.unwrap()); + runtime_switch_manager.set_recovery_mode().await.unwrap(); + assert!(runtime_switch_manager.recovery_mode().await.unwrap()); + runtime_switch_manager.unset_recovery_mode().await.unwrap(); + assert!(!runtime_switch_manager.recovery_mode().await.unwrap()); + } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5c87641947..cc05e038c0 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -24,6 +24,7 @@ use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef}; use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue}; +use common_meta::key::runtime_switch::RuntimeSwitchManager; use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::wal_options_allocator::prepare_wal_options; @@ -230,6 +231,12 @@ impl DatanodeBuilder { .new_region_server(schema_metadata_manager, region_event_listener) .await?; + // TODO(weny): Considering introducing a readonly kv_backend trait. 
+ let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone()); + let is_recovery_mode = runtime_switch_manager + .recovery_mode() + .await + .context(GetMetadataSnafu)?; let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone()); let table_values = datanode_table_manager .tables(node_id) @@ -242,6 +249,8 @@ impl DatanodeBuilder { table_values, !controlled_by_metasrv, self.opts.init_regions_parallelism, + // Ignore nonexistent regions in recovery mode. + is_recovery_mode, ); if self.opts.init_regions_in_background { @@ -323,6 +332,12 @@ impl DatanodeBuilder { ) -> Result<()> { let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; + let runtime_switch_manager = RuntimeSwitchManager::new(kv_backend.clone()); + let is_recovery_mode = runtime_switch_manager + .recovery_mode() + .await + .context(GetMetadataSnafu)?; + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); let table_values = datanode_table_manager .tables(node_id) @@ -335,6 +350,7 @@ impl DatanodeBuilder { table_values, open_with_writable, self.opts.init_regions_parallelism, + is_recovery_mode, ) .await } @@ -558,6 +574,7 @@ async fn open_all_regions( table_values: Vec, open_with_writable: bool, init_regions_parallelism: usize, + ignore_nonexistent_region: bool, ) -> Result<()> { let mut regions = vec![]; #[cfg(feature = "enterprise")] @@ -616,7 +633,11 @@ async fn open_all_regions( } let open_regions = region_server - .handle_batch_open_requests(init_regions_parallelism, region_requests) + .handle_batch_open_requests( + init_regions_parallelism, + region_requests, + ignore_nonexistent_region, + ) .await?; ensure!( open_regions.len() == num_regions, @@ -660,7 +681,11 @@ async fn open_all_regions( } let open_regions = region_server - .handle_batch_open_requests(init_regions_parallelism, region_requests) + .handle_batch_open_requests( + init_regions_parallelism, + region_requests, + ignore_nonexistent_region, + ) .await?; ensure!( diff 
--git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index ac51fdbe23..3c73ff4000 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -154,9 +154,10 @@ impl RegionServer { &self, parallelism: usize, requests: Vec<(RegionId, RegionOpenRequest)>, + ignore_nonexistent_region: bool, ) -> Result> { self.inner - .handle_batch_open_requests(parallelism, requests) + .handle_batch_open_requests(parallelism, requests, ignore_nonexistent_region) .await } @@ -799,6 +800,7 @@ impl RegionServerInner { engine: RegionEngineRef, parallelism: usize, requests: Vec<(RegionId, RegionOpenRequest)>, + ignore_nonexistent_region: bool, ) -> Result> { let region_changes = requests .iter() @@ -836,8 +838,14 @@ impl RegionServerInner { } Err(e) => { self.unset_region_status(region_id, &engine, *region_change); - error!(e; "Failed to open region: {}", region_id); - errors.push(e); + if e.status_code() == StatusCode::RegionNotFound + && ignore_nonexistent_region + { + warn!("Region {} not found, ignore it, source: {:?}", region_id, e); + } else { + error!(e; "Failed to open region: {}", region_id); + errors.push(e); + } } } } @@ -866,6 +874,7 @@ impl RegionServerInner { &self, parallelism: usize, requests: Vec<(RegionId, RegionOpenRequest)>, + ignore_nonexistent_region: bool, ) -> Result> { let mut engine_grouped_requests: HashMap> = HashMap::with_capacity(requests.len()); @@ -888,8 +897,13 @@ impl RegionServerInner { .with_context(|| RegionEngineNotFoundSnafu { name: &engine })? 
.clone(); results.push( - self.handle_batch_open_requests_inner(engine, parallelism, requests) - .await, + self.handle_batch_open_requests_inner( + engine, + parallelism, + requests, + ignore_nonexistent_region, + ) + .await, ) } @@ -1489,6 +1503,85 @@ mod tests { assert!(status.is_some()); } + #[tokio::test] + async fn test_batch_open_region_ignore_nonexistent_regions() { + common_telemetry::init_default_ut_logging(); + let mut mock_region_server = mock_region_server(); + let (engine, _receiver) = MockRegionEngine::with_mock_fn( + MITO_ENGINE_NAME, + Box::new(|region_id, _request| { + if region_id == RegionId::new(1, 1) { + error::RegionNotFoundSnafu { region_id }.fail() + } else { + Ok(0) + } + }), + ); + mock_region_server.register_engine(engine.clone()); + + let region_ids = mock_region_server + .handle_batch_open_requests( + 8, + vec![ + ( + RegionId::new(1, 1), + RegionOpenRequest { + engine: MITO_ENGINE_NAME.to_string(), + table_dir: String::new(), + path_type: PathType::Bare, + options: Default::default(), + skip_wal_replay: false, + }, + ), + ( + RegionId::new(1, 2), + RegionOpenRequest { + engine: MITO_ENGINE_NAME.to_string(), + table_dir: String::new(), + path_type: PathType::Bare, + options: Default::default(), + skip_wal_replay: false, + }, + ), + ], + true, + ) + .await + .unwrap(); + assert_eq!(region_ids, vec![RegionId::new(1, 2)]); + + let err = mock_region_server + .handle_batch_open_requests( + 8, + vec![ + ( + RegionId::new(1, 1), + RegionOpenRequest { + engine: MITO_ENGINE_NAME.to_string(), + table_dir: String::new(), + path_type: PathType::Bare, + options: Default::default(), + skip_wal_replay: false, + }, + ), + ( + RegionId::new(1, 2), + RegionOpenRequest { + engine: MITO_ENGINE_NAME.to_string(), + table_dir: String::new(), + path_type: PathType::Bare, + options: Default::default(), + skip_wal_replay: false, + }, + ), + ], + false, + ) + .await + .unwrap_err(); + assert_eq!(err.status_code(), StatusCode::Unexpected); + } + struct 
CurrentEngineTest { region_id: RegionId, current_region_status: Option, diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index 23fc7f89c0..a986b15cd4 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -18,6 +18,7 @@ pub(crate) mod leader; pub(crate) mod maintenance; pub(crate) mod node_lease; pub(crate) mod procedure; +pub(crate) mod recovery; mod util; use std::collections::HashMap; @@ -41,6 +42,9 @@ use crate::service::admin::leader::LeaderHandler; use crate::service::admin::maintenance::MaintenanceHandler; use crate::service::admin::node_lease::NodeLeaseHandler; use crate::service::admin::procedure::ProcedureManagerHandler; +use crate::service::admin::recovery::{ + get_recovery_mode, set_recovery_mode, unset_recovery_mode, RecoveryHandler, +}; use crate::service::admin::util::{to_axum_json_response, to_axum_not_found_response}; pub fn make_admin_service(metasrv: Arc) -> Admin { @@ -250,6 +254,9 @@ pub fn admin_axum_router(metasrv: Arc) -> AxumRouter { let procedure_handler = Arc::new(ProcedureManagerHandler { manager: metasrv.runtime_switch_manager().clone(), }); + let recovery_handler = Arc::new(RecoveryHandler { + manager: metasrv.runtime_switch_manager().clone(), + }); let health_router = AxumRouter::new().route( "/", @@ -457,13 +464,20 @@ pub fn admin_axum_router(metasrv: Arc) -> AxumRouter { }), ); + let recovery_router = AxumRouter::new() + .route("/enable", routing::post(set_recovery_mode)) + .route("/disable", routing::post(unset_recovery_mode)) + .route("/status", routing::get(get_recovery_mode)) + .with_state(recovery_handler); + let admin_router = AxumRouter::new() .nest("/health", health_router) .nest("/node-lease", node_lease_router) .nest("/leader", leader_router) .nest("/heartbeat", heartbeat_router) .nest("/maintenance", maintenance_router) - .nest("/procedure-manager", procedure_router); + .nest("/procedure-manager", procedure_router) + .nest("/recovery", recovery_router); 
AxumRouter::new().nest("/admin", admin_router) } @@ -1029,4 +1043,85 @@ mod axum_admin_tests { let body = get_body_string(response).await; assert!(body.contains("running")); } + + #[tokio::test] + async fn test_admin_recovery() { + let app = setup_axum_app().await; + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/admin/recovery/status") + .method(Method::GET) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("false")); + + // Enable recovery + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/admin/recovery/enable") + .method(Method::POST) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("true")); + + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/admin/recovery/status") + .method(Method::GET) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("true")); + + // Disable recovery + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/admin/recovery/disable") + .method(Method::POST) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("false")); + + let response = app + .clone() + .oneshot( + Request::builder() + .uri("/admin/recovery/status") + .method(Method::GET) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = get_body_string(response).await; + assert!(body.contains("false")); + } } diff --git a/src/meta-srv/src/service/admin/recovery.rs b/src/meta-srv/src/service/admin/recovery.rs new file 
mode 100644 index 0000000000..d35d52450d --- /dev/null +++ b/src/meta-srv/src/service/admin/recovery.rs @@ -0,0 +1,63 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::Json; +use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; +use serde::{Deserialize, Serialize}; +use servers::http::result::error_result::ErrorResponse; + +pub(crate) type RecoveryHandlerRef = Arc<RecoveryHandler>; + +pub(crate) struct RecoveryHandler { + pub(crate) manager: RuntimeSwitchManagerRef, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct RecoveryResponse { + pub enabled: bool, +} + +/// Get the recovery mode. +#[axum_macros::debug_handler] +pub(crate) async fn get_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response { + let enabled = handler.manager.recovery_mode().await; + + match enabled { + Ok(enabled) => (StatusCode::OK, Json(RecoveryResponse { enabled })).into_response(), + Err(e) => ErrorResponse::from_error(e).into_response(), + } +} + +/// Set the recovery mode. 
+#[axum_macros::debug_handler] +pub(crate) async fn set_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response { + match handler.manager.set_recovery_mode().await { + Ok(_) => (StatusCode::OK, Json(RecoveryResponse { enabled: true })).into_response(), + Err(e) => ErrorResponse::from_error(e).into_response(), + } +} + +/// Unset the recovery mode. +#[axum_macros::debug_handler] +pub(crate) async fn unset_recovery_mode(State(handler): State<RecoveryHandlerRef>) -> Response { + match handler.manager.unset_recovery_mode().await { + Ok(_) => (StatusCode::OK, Json(RecoveryResponse { enabled: false })).into_response(), + Err(e) => ErrorResponse::from_error(e).into_response(), + } +}