From ce1d0b6c4ce3849059c0331b676bb25bd57769b2 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 30 Jul 2025 11:46:39 +0800 Subject: [PATCH] feat: allow setting next table id via http api (#6597) * feat: allow reset next table id via http api Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/ddl/table_meta.rs | 4 + src/common/meta/src/sequence.rs | 80 ++++++++++++ src/meta-srv/src/error.rs | 11 +- src/meta-srv/src/metasrv.rs | 6 + src/meta-srv/src/metasrv/builder.rs | 2 + src/meta-srv/src/service/admin.rs | 137 +++++++++++++++++++- src/meta-srv/src/service/admin/sequencer.rs | 86 ++++++++++++ 7 files changed, 324 insertions(+), 2 deletions(-) create mode 100644 src/meta-srv/src/service/admin/sequencer.rs diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index f07e2c2f6f..e2e2ce36c4 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -175,6 +175,10 @@ impl TableMetadataAllocator { region_wal_options, }) } + + pub fn table_id_sequence(&self) -> SequenceRef { + self.table_id_sequence.clone() + } } pub type PeerAllocatorRef = Arc; diff --git a/src/common/meta/src/sequence.rs b/src/common/meta/src/sequence.rs index b1acc961d4..18f3547b39 100644 --- a/src/common/meta/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -15,6 +15,7 @@ use std::ops::Range; use std::sync::Arc; +use common_telemetry::warn; use snafu::ensure; use tokio::sync::Mutex; @@ -82,15 +83,29 @@ pub struct Sequence { } impl Sequence { + /// Returns the next value and increments the sequence. pub async fn next(&self) -> Result { let mut inner = self.inner.lock().await; inner.next().await } + /// Returns the range of available sequences. 
pub async fn min_max(&self) -> Range { let inner = self.inner.lock().await; inner.initial..inner.max } + + /// Returns the next value without incrementing the sequence. + pub async fn peek(&self) -> u64 { + let inner = self.inner.lock().await; + inner.next + } + + /// Jumps to the given value. + pub async fn jump_to(&self, next: u64) -> Result<()> { + let mut inner = self.inner.lock().await; + inner.jump_to(next).await + } } struct Inner { @@ -203,11 +218,54 @@ impl Inner { } .fail() } + + /// Jumps to the given value. + /// The next value must be greater than the current next value. + pub async fn jump_to(&mut self, next: u64) -> Result<()> { + ensure!( + next > self.next, + error::UnexpectedSnafu { + err_msg: format!( + "The next value {} is not greater than the current next value {}", + next, self.next + ), + } + ); + + let key = self.name.as_bytes(); + let expect = self + .generator + .get(key) + .await? + .map(|kv| kv.value) + .unwrap_or_default(); + + let req = CompareAndPutRequest { + key: key.to_vec(), + expect, + value: u64::to_le_bytes(next).to_vec(), + }; + let res = self.generator.compare_and_put(req).await?; + ensure!( + res.success, + error::UnexpectedSnafu { + err_msg: format!("Failed to reset sequence {} to {}", self.name, next), + } + ); + warn!("Sequence {} jumped to {}", self.name, next); + // Rebase the in-memory state onto `next` and drop the cached range. 
+ self.initial = next; + self.next = next; + self.range = None; + + Ok(()) + } } #[cfg(test)] mod tests { use std::any::Any; + use std::assert_matches::assert_matches; use std::collections::HashSet; use std::sync::Arc; @@ -307,6 +365,28 @@ mod tests { } } + #[tokio::test] + async fn test_sequence_set() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let seq = SequenceBuilder::new("test_seq", kv_backend.clone()) + .initial(1024) + .step(10) + .build(); + seq.jump_to(1025).await.unwrap(); + assert_eq!(seq.next().await.unwrap(), 1025); + let err = seq.jump_to(1025).await.unwrap_err(); + assert_matches!(err, Error::Unexpected { .. }); + assert_eq!(seq.next().await.unwrap(), 1026); + + seq.jump_to(1048).await.unwrap(); + // Recreate the sequence to test the sequence is reset correctly. + let seq = SequenceBuilder::new("test_seq", kv_backend) + .initial(1024) + .step(10) + .build(); + assert_eq!(seq.next().await.unwrap(), 1048); + } + #[tokio::test] async fn test_sequence_out_of_rage() { let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default())) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index dad3a701d5..cb8329c38c 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -121,6 +121,13 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to set next sequence number"))] + SetNextSequence { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to start telemetry task"))] StartTelemetryTask { #[snafu(implicit)] @@ -1017,7 +1024,9 @@ impl ErrorExt for Error { | Error::ListTables { source, .. } => source.status_code(), Error::StartTelemetryTask { source, .. } => source.status_code(), - Error::NextSequence { source, .. } => source.status_code(), + Error::NextSequence { source, .. } | Error::SetNextSequence { source, .. } => { + source.status_code() + } Error::DowngradeLeader { source, .. 
} => source.status_code(), Error::RegisterProcedureLoader { source, .. } => source.status_code(), Error::SubmitDdlTask { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index a9c507ff92..dc5da88c46 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -37,6 +37,7 @@ use common_meta::node_expiry_listener::NodeExpiryListener; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::region_registry::LeaderRegionRegistryRef; +use common_meta::sequence::SequenceRef; use common_meta::wal_options_allocator::WalOptionsAllocatorRef; use common_options::datanode::DatanodeClientOptions; use common_procedure::options::ProcedureConfig; @@ -436,6 +437,7 @@ pub struct Metasrv { cache_invalidator: CacheInvalidatorRef, leader_region_registry: LeaderRegionRegistryRef, wal_prune_ticker: Option, + table_id_sequence: SequenceRef, plugins: Plugins, } @@ -707,6 +709,10 @@ impl Metasrv { self.plugins.get::() } + pub fn table_id_sequence(&self) -> &SequenceRef { + &self.table_id_sequence + } + pub fn plugins(&self) -> &Plugins { &self.plugins } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index da4036428e..245a5c87d3 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -235,6 +235,7 @@ impl MetasrvBuilder { peer_allocator, )) }); + let table_id_sequence = table_metadata_allocator.table_id_sequence(); let flow_selector = Arc::new(RoundRobinSelector::new( SelectTarget::Flownode, @@ -477,6 +478,7 @@ impl MetasrvBuilder { cache_invalidator, leader_region_registry, wal_prune_ticker, + table_id_sequence, }) } } diff --git a/src/meta-srv/src/service/admin.rs b/src/meta-srv/src/service/admin.rs index a986b15cd4..de588904e0 100644 --- a/src/meta-srv/src/service/admin.rs +++ b/src/meta-srv/src/service/admin.rs @@ -19,6 +19,7 @@ pub(crate) mod maintenance; pub(crate) mod node_lease; 
pub(crate) mod procedure; pub(crate) mod recovery; +pub(crate) mod sequencer; mod util; use std::collections::HashMap; @@ -45,6 +46,7 @@ use crate::service::admin::procedure::ProcedureManagerHandler; use crate::service::admin::recovery::{ get_recovery_mode, set_recovery_mode, unset_recovery_mode, RecoveryHandler, }; +use crate::service::admin::sequencer::TableIdSequenceHandler; use crate::service::admin::util::{to_axum_json_response, to_axum_not_found_response}; pub fn make_admin_service(metasrv: Arc) -> Admin { @@ -257,6 +259,17 @@ pub fn admin_axum_router(metasrv: Arc) -> AxumRouter { let recovery_handler = Arc::new(RecoveryHandler { manager: metasrv.runtime_switch_manager().clone(), }); + let table_id_sequence_handler = Arc::new(TableIdSequenceHandler { + table_id_sequence: metasrv.table_id_sequence().clone(), + runtime_switch_manager: metasrv.runtime_switch_manager().clone(), + }); + let sequence_router = AxumRouter::new().nest( + "/table", + AxumRouter::new() + .route("/next-id", routing::get(sequencer::get_next_table_id)) + .route("/set-next-id", routing::post(sequencer::set_next_table_id)) + .with_state(table_id_sequence_handler), + ); let health_router = AxumRouter::new().route( "/", @@ -477,7 +490,8 @@ pub fn admin_axum_router(metasrv: Arc) -> AxumRouter { .nest("/heartbeat", heartbeat_router) .nest("/maintenance", maintenance_router) .nest("/procedure-manager", procedure_router) - .nest("/recovery", recovery_router); + .nest("/recovery", recovery_router) + .nest("/sequence", sequence_router); AxumRouter::new().nest("/admin", admin_router) } @@ -830,6 +844,7 @@ mod axum_admin_tests { use super::*; use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::MetasrvOptions; + use crate::service::admin::sequencer::NextTableIdResponse; async fn setup_axum_app() -> AxumRouter { let kv_backend = Arc::new(MemoryKvBackend::new()); @@ -848,6 +863,11 @@ mod axum_admin_tests { String::from_utf8_lossy(&body_bytes).to_string() } + async fn into_bytes(resp: 
axum::response::Response) -> Vec { + let body_bytes = to_bytes(resp.into_body(), usize::MAX).await.unwrap(); + body_bytes.to_vec() + } + #[tokio::test] async fn test_admin_health() { let app = setup_axum_app().await; @@ -1124,4 +1144,119 @@ mod axum_admin_tests { let body = get_body_string(response).await; assert!(body.contains("false")); } + + #[tokio::test] + async fn test_admin_sequence_table_id() { + common_telemetry::init_default_ut_logging(); + let kv_backend = Arc::new(MemoryKvBackend::new()); + let metasrv = MetasrvBuilder::new() + .options(MetasrvOptions::default()) + .kv_backend(kv_backend) + .build() + .await + .unwrap(); + let metasrv = Arc::new(metasrv); + let runtime_switch_manager = metasrv.runtime_switch_manager().clone(); + let app = admin_axum_router(metasrv); + // Set recovery mode to true + runtime_switch_manager.set_recovery_mode().await.unwrap(); + let response = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/admin/sequence/table/next-id") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = into_bytes(response).await; + let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.next_table_id, 1024); + + // Bad request + let response = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .header(http::header::CONTENT_TYPE, "application/json") + .uri("/admin/sequence/table/set-next-id") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::BAD_REQUEST); + + // Bad next id + let response = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .header(http::header::CONTENT_TYPE, "application/json") + .uri("/admin/sequence/table/set-next-id") + .body(Body::from(r#"{"next_table_id": 0}"#)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); + let body = 
get_body_string(response).await; + assert!(body.contains("is not greater than the current next value")); + + // Set next id + let response = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .header(http::header::CONTENT_TYPE, "application/json") + .uri("/admin/sequence/table/set-next-id") + .body(Body::from(r#"{"next_table_id": 2048}"#)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + // Get next id + let response = app + .clone() + .oneshot( + Request::builder() + .method(Method::GET) + .uri("/admin/sequence/table/next-id") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = into_bytes(response).await; + let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.next_table_id, 2048); + + // Set recovery mode to false + runtime_switch_manager.unset_recovery_mode().await.unwrap(); + // Set next id with recovery mode disabled + let response = app + .clone() + .oneshot( + Request::builder() + .method(Method::POST) + .header(http::header::CONTENT_TYPE, "application/json") + .uri("/admin/sequence/table/set-next-id") + .body(Body::from(r#"{"next_table_id": 2049}"#)) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); + let body = get_body_string(response).await; + assert!(body.contains("Setting next table id is only allowed in recovery mode")); + } } diff --git a/src/meta-srv/src/service/admin/sequencer.rs b/src/meta-srv/src/service/admin/sequencer.rs new file mode 100644 index 0000000000..ea543c7515 --- /dev/null +++ b/src/meta-srv/src/service/admin/sequencer.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use axum::extract::{self, State}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::Json; +use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; +use common_meta::sequence::SequenceRef; +use serde::{Deserialize, Serialize}; +use servers::http::result::error_result::ErrorResponse; +use snafu::{ensure, ResultExt}; + +use crate::error::{Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu}; + +pub type TableIdSequenceHandlerRef = Arc; + +#[derive(Clone)] +pub(crate) struct TableIdSequenceHandler { + pub(crate) table_id_sequence: SequenceRef, + pub(crate) runtime_switch_manager: RuntimeSwitchManagerRef, +} + +impl TableIdSequenceHandler { + async fn set_next_table_id(&self, next_table_id: u32) -> Result<()> { + ensure!( + self.runtime_switch_manager + .recovery_mode() + .await + .context(RuntimeSwitchManagerSnafu)?, + UnexpectedSnafu { + violated: "Setting next table id is only allowed in recovery mode", + } + ); + + self.table_id_sequence + .jump_to(next_table_id as u64) + .await + .context(SetNextSequenceSnafu) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct NextTableIdResponse { + pub(crate) next_table_id: u32, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ResetTableIdRequest { + pub(crate) next_table_id: u32, +} + +/// Set the next table id. 
+#[axum_macros::debug_handler] +pub(crate) async fn set_next_table_id( + State(handler): State, + extract::Json(ResetTableIdRequest { next_table_id }): extract::Json, +) -> Response { + match handler.set_next_table_id(next_table_id).await { + Ok(_) => (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response(), + Err(e) => ErrorResponse::from_error(e).into_response(), + } +} + +/// Get the next table id without incrementing the sequence. +#[axum_macros::debug_handler] +pub(crate) async fn get_next_table_id( + State(handler): State, +) -> Response { + let next_table_id = handler.table_id_sequence.peek().await as u32; + + (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response() +}