From eaaf9448c725733c1b6031a2d8064bb7d9c4ddb6 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 4 Aug 2025 19:31:06 +0800 Subject: [PATCH] fix: fix sequence peek method to return correct values when sequence is not initialized (#6643) fix: improve sequence peek method to handle uninitialized sequences Signed-off-by: WenyXu --- src/common/meta/src/sequence.rs | 65 ++++++++++++++++++++- src/meta-srv/src/error.rs | 13 ++++- src/meta-srv/src/service/admin/sequencer.rs | 22 +++++-- 3 files changed, 90 insertions(+), 10 deletions(-) diff --git a/src/common/meta/src/sequence.rs b/src/common/meta/src/sequence.rs index 18f3547b39..0f226291c7 100644 --- a/src/common/meta/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -15,7 +15,7 @@ use std::ops::Range; use std::sync::Arc; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use snafu::ensure; use tokio::sync::Mutex; @@ -96,9 +96,9 @@ impl Sequence { } /// Returns the next value without incrementing the sequence. - pub async fn peek(&self) -> u64 { + pub async fn peek(&self) -> Result { let inner = self.inner.lock().await; - inner.next + inner.peek().await } /// Jumps to the given value. @@ -136,6 +136,7 @@ impl Inner { if range.contains(&self.next) { let res = Ok(self.next); self.next += 1; + debug!("sequence {} next: {}", self.name, self.next); return res; } self.range = None; @@ -144,6 +145,10 @@ impl Inner { let range = self.next_range().await?; self.next = range.start; self.range = Some(range); + debug!( + "sequence {} next: {}, range: {:?}", + self.name, self.next, self.range + ); } } } @@ -154,6 +159,38 @@ impl Inner { .fail() } + pub async fn peek(&self) -> Result { + if self.range.is_some() { + return Ok(self.next); + } + + // If the range is not set, means the sequence is not initialized. + let key = self.name.as_bytes(); + let value = self.generator.get(key).await?.map(|kv| kv.value); + let next = if let Some(value) = value { + let v: [u8; 8] = match value.try_into() { + Ok(a) => a, + Err(v) => { + return error::UnexpectedSequenceValueSnafu { + err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name), + } + .fail() + } + }; + let next = u64::from_le_bytes(v); + debug!("The next value of sequence {} is {}", self.name, next); + next + } else { + debug!( + "The next value of sequence {} is not set, use initial value {}", + self.name, self.initial + ); + self.initial + }; + + Ok(next) + } + pub async fn next_range(&self) -> Result> { let key = self.name.as_bytes(); let mut start = self.next; @@ -458,4 +495,26 @@ mod tests { let next = seq.next().await; assert!(next.is_err()); } + + #[tokio::test] + async fn test_sequence_peek() { + common_telemetry::init_default_ut_logging(); + let kv_backend = Arc::new(MemoryKvBackend::default()); + let seq = SequenceBuilder::new("test_seq", kv_backend.clone()) + .step(10) + .initial(1024) + .build(); + // The sequence value in the kv backend is not set, so the peek value should be the initial value. + assert_eq!(seq.peek().await.unwrap(), 1024); + + for i in 0..11 { + let v = seq.next().await.unwrap(); + assert_eq!(v, 1024 + i); + } + let seq = SequenceBuilder::new("test_seq", kv_backend) + .initial(1024) + .build(); + // The sequence is not initialized, it will fetch the value from the kv backend. + assert_eq!(seq.peek().await.unwrap(), 1044); + } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 9a136100e3..5e4dfb6e4b 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -127,6 +127,13 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to peek sequence number"))] + PeekSequence { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to start telemetry task"))] StartTelemetryTask { #[snafu(implicit)] @@ -1031,9 +1038,9 @@ impl ErrorExt for Error { | Error::ListTables { source, .. } => source.status_code(), Error::StartTelemetryTask { source, .. } => source.status_code(), - Error::NextSequence { source, .. } | Error::SetNextSequence { source, .. } => { - source.status_code() - } + Error::NextSequence { source, .. } + | Error::SetNextSequence { source, .. } + | Error::PeekSequence { source, .. } => source.status_code(), Error::DowngradeLeader { source, .. } => source.status_code(), Error::RegisterProcedureLoader { source, .. } => source.status_code(), Error::SubmitDdlTask { source, .. } diff --git a/src/meta-srv/src/service/admin/sequencer.rs b/src/meta-srv/src/service/admin/sequencer.rs index ea543c7515..fa0558672e 100644 --- a/src/meta-srv/src/service/admin/sequencer.rs +++ b/src/meta-srv/src/service/admin/sequencer.rs @@ -24,7 +24,9 @@ use serde::{Deserialize, Serialize}; use servers::http::result::error_result::ErrorResponse; use snafu::{ensure, ResultExt}; -use crate::error::{Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu}; +use crate::error::{ + PeekSequenceSnafu, Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu, +}; pub type TableIdSequenceHandlerRef = Arc; @@ -51,6 +53,15 @@ impl TableIdSequenceHandler { .await .context(SetNextSequenceSnafu) } + + async fn peek_table_id(&self) -> Result { + let next_table_id = self + .table_id_sequence + .peek() + .await + .context(PeekSequenceSnafu)?; + Ok(next_table_id as u32) + } } #[derive(Debug, Serialize, Deserialize)] @@ -80,7 +91,10 @@ pub(crate) async fn set_next_table_id( pub(crate) async fn get_next_table_id( State(handler): State, ) -> Response { - let next_table_id = handler.table_id_sequence.peek().await as u32; - - (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response() + match handler.peek_table_id().await { + Ok(next_table_id) => { + (StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response() + } + Err(e) => ErrorResponse::from_error(e).into_response(), + } }