feat: allow setting next table id via http api (#6597)

* feat: allow reset next table id via http api

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-07-30 11:46:39 +08:00
committed by WenyXu
parent 686ee9f579
commit ce1d0b6c4c
7 changed files with 324 additions and 2 deletions

View File

@@ -175,6 +175,10 @@ impl TableMetadataAllocator {
region_wal_options,
})
}
pub fn table_id_sequence(&self) -> SequenceRef {
self.table_id_sequence.clone()
}
}
pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;

View File

@@ -15,6 +15,7 @@
use std::ops::Range;
use std::sync::Arc;
use common_telemetry::warn;
use snafu::ensure;
use tokio::sync::Mutex;
@@ -82,15 +83,29 @@ pub struct Sequence {
}
impl Sequence {
    /// Hands out the next value and advances the sequence.
    ///
    /// The inner state stays locked for the whole duration of the call.
    pub async fn next(&self) -> Result<u64> {
        self.inner.lock().await.next().await
    }

    /// Returns the half-open range spanning the inner `initial` and `max` values.
    pub async fn min_max(&self) -> Range<u64> {
        let guard = self.inner.lock().await;
        Range {
            start: guard.initial,
            end: guard.max,
        }
    }

    /// Reports the value that would be handed out next, leaving the sequence untouched.
    ///
    /// Only the in-memory `next` field of the inner state is read.
    pub async fn peek(&self) -> u64 {
        self.inner.lock().await.next
    }

    /// Moves the sequence to `next`; see the inner `jump_to` for the accepted values.
    pub async fn jump_to(&self, next: u64) -> Result<()> {
        self.inner.lock().await.jump_to(next).await
    }
}
struct Inner {
@@ -203,11 +218,54 @@ impl Inner {
}
.fail()
}
/// Jumps to the given value.
/// The next value must be greater than the current next value.
pub async fn jump_to(&mut self, next: u64) -> Result<()> {
ensure!(
next > self.next,
error::UnexpectedSnafu {
err_msg: format!(
"The next value {} is not greater than the current next value {}",
next, self.next
),
}
);
let key = self.name.as_bytes();
let expect = self
.generator
.get(key)
.await?
.map(|kv| kv.value)
.unwrap_or_default();
let req = CompareAndPutRequest {
key: key.to_vec(),
expect,
value: u64::to_le_bytes(next).to_vec(),
};
let res = self.generator.compare_and_put(req).await?;
ensure!(
res.success,
error::UnexpectedSnafu {
err_msg: format!("Failed to reset sequence {} to {}", self.name, next),
}
);
warn!("Sequence {} jumped to {}", self.name, next);
// Reset the sequence to the initial value.
self.initial = next;
self.next = next;
self.range = None;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::any::Any;
use std::assert_matches::assert_matches;
use std::collections::HashSet;
use std::sync::Arc;
@@ -307,6 +365,28 @@ mod tests {
}
}
#[tokio::test]
async fn test_sequence_set() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let seq = SequenceBuilder::new("test_seq", kv_backend.clone())
.initial(1024)
.step(10)
.build();
seq.jump_to(1025).await.unwrap();
assert_eq!(seq.next().await.unwrap(), 1025);
let err = seq.jump_to(1025).await.unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
assert_eq!(seq.next().await.unwrap(), 1026);
seq.jump_to(1048).await.unwrap();
// Recreate the sequence to test the sequence is reset correctly.
let seq = SequenceBuilder::new("test_seq", kv_backend)
.initial(1024)
.step(10)
.build();
assert_eq!(seq.next().await.unwrap(), 1048);
}
#[tokio::test]
async fn test_sequence_out_of_rage() {
let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default()))

View File

@@ -121,6 +121,13 @@ pub enum Error {
source: common_meta::error::Error,
},
/// Setting the next value of a sequence failed; wraps the underlying
/// `common_meta` error as its source.
#[snafu(display("Failed to set next sequence number"))]
SetNextSequence {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to start telemetry task"))]
StartTelemetryTask {
#[snafu(implicit)]
@@ -1017,7 +1024,9 @@ impl ErrorExt for Error {
| Error::ListTables { source, .. } => source.status_code(),
Error::StartTelemetryTask { source, .. } => source.status_code(),
Error::NextSequence { source, .. } => source.status_code(),
Error::NextSequence { source, .. } | Error::SetNextSequence { source, .. } => {
source.status_code()
}
Error::DowngradeLeader { source, .. } => source.status_code(),
Error::RegisterProcedureLoader { source, .. } => source.status_code(),
Error::SubmitDdlTask { source, .. } => source.status_code(),

View File

@@ -37,6 +37,7 @@ use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::sequence::SequenceRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_procedure::options::ProcedureConfig;
@@ -436,6 +437,7 @@ pub struct Metasrv {
cache_invalidator: CacheInvalidatorRef,
leader_region_registry: LeaderRegionRegistryRef,
wal_prune_ticker: Option<WalPruneTickerRef>,
table_id_sequence: SequenceRef,
plugins: Plugins,
}
@@ -707,6 +709,10 @@ impl Metasrv {
self.plugins.get::<SubscriptionManagerRef>()
}
/// Returns a reference to the table id sequence held by this `Metasrv`.
pub fn table_id_sequence(&self) -> &SequenceRef {
&self.table_id_sequence
}
/// Returns a reference to the plugins held by this `Metasrv`.
pub fn plugins(&self) -> &Plugins {
&self.plugins
}

View File

@@ -235,6 +235,7 @@ impl MetasrvBuilder {
peer_allocator,
))
});
let table_id_sequence = table_metadata_allocator.table_id_sequence();
let flow_selector = Arc::new(RoundRobinSelector::new(
SelectTarget::Flownode,
@@ -477,6 +478,7 @@ impl MetasrvBuilder {
cache_invalidator,
leader_region_registry,
wal_prune_ticker,
table_id_sequence,
})
}
}

View File

@@ -19,6 +19,7 @@ pub(crate) mod maintenance;
pub(crate) mod node_lease;
pub(crate) mod procedure;
pub(crate) mod recovery;
pub(crate) mod sequencer;
mod util;
use std::collections::HashMap;
@@ -45,6 +46,7 @@ use crate::service::admin::procedure::ProcedureManagerHandler;
use crate::service::admin::recovery::{
get_recovery_mode, set_recovery_mode, unset_recovery_mode, RecoveryHandler,
};
use crate::service::admin::sequencer::TableIdSequenceHandler;
use crate::service::admin::util::{to_axum_json_response, to_axum_not_found_response};
pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
@@ -257,6 +259,17 @@ pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
let recovery_handler = Arc::new(RecoveryHandler {
manager: metasrv.runtime_switch_manager().clone(),
});
let table_id_sequence_handler = Arc::new(TableIdSequenceHandler {
table_id_sequence: metasrv.table_id_sequence().clone(),
runtime_switch_manager: metasrv.runtime_switch_manager().clone(),
});
let sequence_router = AxumRouter::new().nest(
"/table",
AxumRouter::new()
.route("/next-id", routing::get(sequencer::get_next_table_id))
.route("/set-next-id", routing::post(sequencer::set_next_table_id))
.with_state(table_id_sequence_handler),
);
let health_router = AxumRouter::new().route(
"/",
@@ -477,7 +490,8 @@ pub fn admin_axum_router(metasrv: Arc<Metasrv>) -> AxumRouter {
.nest("/heartbeat", heartbeat_router)
.nest("/maintenance", maintenance_router)
.nest("/procedure-manager", procedure_router)
.nest("/recovery", recovery_router);
.nest("/recovery", recovery_router)
.nest("/sequence", sequence_router);
AxumRouter::new().nest("/admin", admin_router)
}
@@ -830,6 +844,7 @@ mod axum_admin_tests {
use super::*;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::MetasrvOptions;
use crate::service::admin::sequencer::NextTableIdResponse;
async fn setup_axum_app() -> AxumRouter {
let kv_backend = Arc::new(MemoryKvBackend::new());
@@ -848,6 +863,11 @@ mod axum_admin_tests {
String::from_utf8_lossy(&body_bytes).to_string()
}
/// Reads the complete response body into an owned byte vector.
///
/// Panics if the body cannot be collected.
async fn into_bytes(resp: axum::response::Response) -> Vec<u8> {
    to_bytes(resp.into_body(), usize::MAX)
        .await
        .unwrap()
        .to_vec()
}
#[tokio::test]
async fn test_admin_health() {
let app = setup_axum_app().await;
@@ -1124,4 +1144,119 @@ mod axum_admin_tests {
let body = get_body_string(response).await;
assert!(body.contains("false"));
}
/// End-to-end check of the `/admin/sequence/table` endpoints: reading the next table
/// id, rejecting malformed or non-increasing requests, setting a new next id while in
/// recovery mode, and refusing to set it once recovery mode is disabled.
#[tokio::test]
async fn test_admin_sequence_table_id() {
common_telemetry::init_default_ut_logging();
let kv_backend = Arc::new(MemoryKvBackend::new());
let metasrv = MetasrvBuilder::new()
.options(MetasrvOptions::default())
.kv_backend(kv_backend)
.build()
.await
.unwrap();
let metasrv = Arc::new(metasrv);
// Keep a handle to the switch manager so recovery mode can be toggled after
// `metasrv` has been moved into the router.
let runtime_switch_manager = metasrv.runtime_switch_manager().clone();
let app = admin_axum_router(metasrv);
// Set recovery mode to true
runtime_switch_manager.set_recovery_mode().await.unwrap();
// Read the next id of a freshly built metasrv.
let response = app
.clone()
.oneshot(
Request::builder()
.method(Method::GET)
.uri("/admin/sequence/table/next-id")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = into_bytes(response).await;
let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(resp.next_table_id, 1024);
// Bad request: a POST with an empty body must be answered with 400.
let response = app
.clone()
.oneshot(
Request::builder()
.method(Method::POST)
.header(http::header::CONTENT_TYPE, "application/json")
.uri("/admin/sequence/table/set-next-id")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::BAD_REQUEST);
// Bad next id: 0 is not greater than the current next value, so the jump is refused.
let response = app
.clone()
.oneshot(
Request::builder()
.method(Method::POST)
.header(http::header::CONTENT_TYPE, "application/json")
.uri("/admin/sequence/table/set-next-id")
.body(Body::from(r#"{"next_table_id": 0}"#))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
let body = get_body_string(response).await;
assert!(body.contains("is not greater than the current next value"));
// Set next id
let response = app
.clone()
.oneshot(
Request::builder()
.method(Method::POST)
.header(http::header::CONTENT_TYPE, "application/json")
.uri("/admin/sequence/table/set-next-id")
.body(Body::from(r#"{"next_table_id": 2048}"#))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
// Get next id: it must now report the value that was just set.
let response = app
.clone()
.oneshot(
Request::builder()
.method(Method::GET)
.uri("/admin/sequence/table/next-id")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let body = into_bytes(response).await;
let resp: NextTableIdResponse = serde_json::from_slice(&body).unwrap();
assert_eq!(resp.next_table_id, 2048);
// Set recovery mode to false
runtime_switch_manager.unset_recovery_mode().await.unwrap();
// Set next id with recovery mode disabled
let response = app
.clone()
.oneshot(
Request::builder()
.method(Method::POST)
.header(http::header::CONTENT_TYPE, "application/json")
.uri("/admin/sequence/table/set-next-id")
.body(Body::from(r#"{"next_table_id": 2049}"#))
.unwrap(),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
let body = get_body_string(response).await;
assert!(body.contains("Setting next table id is only allowed in recovery mode"));
}
}

View File

@@ -0,0 +1,86 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use axum::extract::{self, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::sequence::SequenceRef;
use serde::{Deserialize, Serialize};
use servers::http::result::error_result::ErrorResponse;
use snafu::{ensure, ResultExt};
use crate::error::{Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu};
/// Shared, reference-counted [`TableIdSequenceHandler`].
pub type TableIdSequenceHandlerRef = Arc<TableIdSequenceHandler>;
/// State behind the handlers that read and set the next table id.
#[derive(Clone)]
pub(crate) struct TableIdSequenceHandler {
/// The sequence that is peeked at and jumped forward.
pub(crate) table_id_sequence: SequenceRef,
/// Queried for the recovery-mode flag before the sequence may be changed.
pub(crate) runtime_switch_manager: RuntimeSwitchManagerRef,
}
impl TableIdSequenceHandler {
async fn set_next_table_id(&self, next_table_id: u32) -> Result<()> {
ensure!(
self.runtime_switch_manager
.recovery_mode()
.await
.context(RuntimeSwitchManagerSnafu)?,
UnexpectedSnafu {
violated: "Setting next table id is only allowed in recovery mode",
}
);
self.table_id_sequence
.jump_to(next_table_id as u64)
.await
.context(SetNextSequenceSnafu)
}
}
/// JSON body returned by both the "get next table id" and "set next table id" handlers.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct NextTableIdResponse {
/// The next table id as reported by, or just written to, the sequence.
pub(crate) next_table_id: u32,
}
/// JSON body accepted by the "set next table id" handler.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct ResetTableIdRequest {
/// The value the table id sequence should jump to.
pub(crate) next_table_id: u32,
}
/// Handles a request to set the next table id.
///
/// Replies `200 OK` echoing the requested id when the sequence was moved; any error
/// from the handler is rendered through `ErrorResponse`.
#[axum_macros::debug_handler]
pub(crate) async fn set_next_table_id(
    State(handler): State<TableIdSequenceHandlerRef>,
    extract::Json(request): extract::Json<ResetTableIdRequest>,
) -> Response {
    let next_table_id = request.next_table_id;
    if let Err(e) = handler.set_next_table_id(next_table_id).await {
        return ErrorResponse::from_error(e).into_response();
    }
    (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response()
}
/// Get the next table id without incrementing the sequence.
#[axum_macros::debug_handler]
pub(crate) async fn get_next_table_id(
State(handler): State<TableIdSequenceHandlerRef>,
) -> Response {
let next_table_id = handler.table_id_sequence.peek().await as u32;
(StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response()
}