fix: fix sequence peek method to return correct values when sequence is not initialized (#6643)

fix: improve sequence peek method to handle uninitialized sequences

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-04 19:31:06 +08:00
committed by Zhenchi
parent dc37382946
commit eaaf9448c7
3 changed files with 90 additions and 10 deletions

View File

@@ -15,7 +15,7 @@
use std::ops::Range;
use std::sync::Arc;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use snafu::ensure;
use tokio::sync::Mutex;
@@ -96,9 +96,9 @@ impl Sequence {
}
/// Returns the next value without incrementing the sequence.
pub async fn peek(&self) -> u64 {
pub async fn peek(&self) -> Result<u64> {
let inner = self.inner.lock().await;
inner.next
inner.peek().await
}
/// Jumps to the given value.
@@ -136,6 +136,7 @@ impl Inner {
if range.contains(&self.next) {
let res = Ok(self.next);
self.next += 1;
debug!("sequence {} next: {}", self.name, self.next);
return res;
}
self.range = None;
@@ -144,6 +145,10 @@ impl Inner {
let range = self.next_range().await?;
self.next = range.start;
self.range = Some(range);
debug!(
"sequence {} next: {}, range: {:?}",
self.name, self.next, self.range
);
}
}
}
@@ -154,6 +159,38 @@ impl Inner {
.fail()
}
pub async fn peek(&self) -> Result<u64> {
if self.range.is_some() {
return Ok(self.next);
}
// If the range is not set, means the sequence is not initialized.
let key = self.name.as_bytes();
let value = self.generator.get(key).await?.map(|kv| kv.value);
let next = if let Some(value) = value {
let v: [u8; 8] = match value.try_into() {
Ok(a) => a,
Err(v) => {
return error::UnexpectedSequenceValueSnafu {
err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
}
.fail()
}
};
let next = u64::from_le_bytes(v);
debug!("The next value of sequence {} is {}", self.name, next);
next
} else {
debug!(
"The next value of sequence {} is not set, use initial value {}",
self.name, self.initial
);
self.initial
};
Ok(next)
}
pub async fn next_range(&self) -> Result<Range<u64>> {
let key = self.name.as_bytes();
let mut start = self.next;
@@ -458,4 +495,26 @@ mod tests {
let next = seq.next().await;
assert!(next.is_err());
}
#[tokio::test]
async fn test_sequence_peek() {
common_telemetry::init_default_ut_logging();
let kv_backend = Arc::new(MemoryKvBackend::default());
let seq = SequenceBuilder::new("test_seq", kv_backend.clone())
.step(10)
.initial(1024)
.build();
// The sequence value in the kv backend is not set, so the peek value should be the initial value.
assert_eq!(seq.peek().await.unwrap(), 1024);
for i in 0..11 {
let v = seq.next().await.unwrap();
assert_eq!(v, 1024 + i);
}
let seq = SequenceBuilder::new("test_seq", kv_backend)
.initial(1024)
.build();
// The sequence is not initialized, it will fetch the value from the kv backend.
assert_eq!(seq.peek().await.unwrap(), 1044);
}
}

View File

@@ -127,6 +127,13 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to peek sequence number"))]
PeekSequence {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to start telemetry task"))]
StartTelemetryTask {
#[snafu(implicit)]
@@ -1031,9 +1038,9 @@ impl ErrorExt for Error {
| Error::ListTables { source, .. } => source.status_code(),
Error::StartTelemetryTask { source, .. } => source.status_code(),
Error::NextSequence { source, .. } | Error::SetNextSequence { source, .. } => {
source.status_code()
}
Error::NextSequence { source, .. }
| Error::SetNextSequence { source, .. }
| Error::PeekSequence { source, .. } => source.status_code(),
Error::DowngradeLeader { source, .. } => source.status_code(),
Error::RegisterProcedureLoader { source, .. } => source.status_code(),
Error::SubmitDdlTask { source, .. }

View File

@@ -24,7 +24,9 @@ use serde::{Deserialize, Serialize};
use servers::http::result::error_result::ErrorResponse;
use snafu::{ensure, ResultExt};
use crate::error::{Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu};
use crate::error::{
PeekSequenceSnafu, Result, RuntimeSwitchManagerSnafu, SetNextSequenceSnafu, UnexpectedSnafu,
};
pub type TableIdSequenceHandlerRef = Arc<TableIdSequenceHandler>;
@@ -51,6 +53,15 @@ impl TableIdSequenceHandler {
.await
.context(SetNextSequenceSnafu)
}
async fn peek_table_id(&self) -> Result<u32> {
let next_table_id = self
.table_id_sequence
.peek()
.await
.context(PeekSequenceSnafu)?;
Ok(next_table_id as u32)
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -80,7 +91,10 @@ pub(crate) async fn set_next_table_id(
pub(crate) async fn get_next_table_id(
State(handler): State<TableIdSequenceHandlerRef>,
) -> Response {
let next_table_id = handler.table_id_sequence.peek().await as u32;
(StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response()
match handler.peek_table_id().await {
Ok(next_table_id) => {
(StatusCode::OK, Json(NextTableIdResponse { next_table_id })).into_response()
}
Err(e) => ErrorResponse::from_error(e).into_response(),
}
}