fix: sequence peek with remote value (#6648)

* fix: sequence peek with remote value

* chore: more ut

* chore: add more ut
This commit is contained in:
jeremyhi
2025-08-05 16:28:09 +08:00
committed by Zhenchi
parent b91b520f54
commit 569d93c599

View File

@@ -95,13 +95,27 @@ impl Sequence {
inner.initial..inner.max
}
/// Returns the next value without incrementing the sequence.
/// Returns the current value stored in the remote storage without incrementing the sequence.
///
/// This function always fetches the true current state from the remote storage (KV backend),
/// ignoring any local cache to provide the most accurate view of the sequence's remote state.
/// It does not consume or advance the sequence value.
///
/// Note: Since this always queries the remote storage, it may be slower than `next()` but
/// provides the most accurate and up-to-date information about the sequence state.
pub async fn peek(&self) -> Result<u64> {
let inner = self.inner.lock().await;
inner.peek().await
}
/// Jumps to the given value.
///
/// The next value must be greater than both:
/// 1. The current local next value
/// 2. The current value stored in the remote storage (KV backend)
///
/// This ensures the sequence can only move forward and maintains consistency
/// across different instances accessing the same sequence.
pub async fn jump_to(&self, next: u64) -> Result<()> {
let mut inner = self.inner.lock().await;
inner.jump_to(next).await
@@ -159,25 +173,13 @@ impl Inner {
.fail()
}
/// Returns the current value from remote storage without advancing the sequence.
/// If no value exists in remote storage, returns the initial value.
pub async fn peek(&self) -> Result<u64> {
if self.range.is_some() {
return Ok(self.next);
}
// If the range is not set, means the sequence is not initialized.
let key = self.name.as_bytes();
let value = self.generator.get(key).await?.map(|kv| kv.value);
let next = if let Some(value) = value {
let v: [u8; 8] = match value.try_into() {
Ok(a) => a,
Err(v) => {
return error::UnexpectedSequenceValueSnafu {
err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
}
.fail()
}
};
let next = u64::from_le_bytes(v);
let next = self.initial.max(self.parse_sequence_value(value)?);
debug!("The next value of sequence {} is {}", self.name, next);
next
} else {
@@ -224,16 +226,7 @@ impl Inner {
if !res.success {
if let Some(kv) = res.prev_kv {
let v: [u8; 8] = match kv.value.clone().try_into() {
Ok(a) => a,
Err(v) => {
return error::UnexpectedSequenceValueSnafu {
err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
}
.fail()
}
};
let v = u64::from_le_bytes(v);
let v = self.parse_sequence_value(kv.value.clone())?;
// If the existed value is smaller than the initial, we should start from the initial.
start = v.max(self.initial);
expect = kv.value;
@@ -257,25 +250,33 @@ impl Inner {
}
/// Jumps to the given value.
/// The next value must be greater than the current next value.
///
/// The next value must be greater than both:
/// 1. The current local next value (self.next)
/// 2. The current value stored in the remote storage (KV backend)
///
/// This ensures the sequence can only move forward and maintains consistency
/// across different instances accessing the same sequence.
pub async fn jump_to(&mut self, next: u64) -> Result<()> {
let key = self.name.as_bytes();
let current = self.generator.get(key).await?.map(|kv| kv.value);
let curr_val = match &current {
Some(val) => self.initial.max(self.parse_sequence_value(val.clone())?),
None => self.initial,
};
ensure!(
next > self.next,
next > curr_val,
error::UnexpectedSnafu {
err_msg: format!(
"The next value {} is not greater than the current next value {}",
next, self.next
next, curr_val
),
}
);
let key = self.name.as_bytes();
let expect = self
.generator
.get(key)
.await?
.map(|kv| kv.value)
.unwrap_or_default();
let expect = current.unwrap_or_default();
let req = CompareAndPutRequest {
key: key.to_vec(),
@@ -297,6 +298,20 @@ impl Inner {
Ok(())
}
/// Converts a Vec<u8> to u64 with proper error handling for sequence values
fn parse_sequence_value(&self, value: Vec<u8>) -> Result<u64> {
let v: [u8; 8] = match value.try_into() {
Ok(a) => a,
Err(v) => {
return error::UnexpectedSequenceValueSnafu {
err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
}
.fail()
}
};
Ok(u64::from_le_bytes(v))
}
}
#[cfg(test)]
@@ -425,7 +440,7 @@ mod tests {
}
#[tokio::test]
async fn test_sequence_out_of_rage() {
async fn test_sequence_out_of_range() {
let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default()))
.initial(u64::MAX - 10)
.step(10)
@@ -517,4 +532,117 @@ mod tests {
// The sequence is not initialized, it will fetch the value from the kv backend.
assert_eq!(seq.peek().await.unwrap(), 1044);
}
#[tokio::test]
async fn test_sequence_peek_shared_storage() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let shared_seq = "shared_seq";
// Create two sequence instances with the SAME name but DIFFERENT configs
let seq1 = SequenceBuilder::new(shared_seq, kv_backend.clone())
.initial(100)
.step(5)
.build();
let seq2 = SequenceBuilder::new(shared_seq, kv_backend.clone())
.initial(200) // different initial
.step(3) // different step
.build();
// Initially both return their own initial values when no remote value exists
assert_eq!(seq1.peek().await.unwrap(), 100);
assert_eq!(seq2.peek().await.unwrap(), 200);
// seq1 calls next() to allocate range and update remote storage
assert_eq!(seq1.next().await.unwrap(), 100);
// After seq1.next(), remote storage has 100 + seq1.step(5) = 105
// seq2 should now see the updated remote value through peek(), not its own initial(200)
assert_eq!(seq1.peek().await.unwrap(), 105);
assert_eq!(seq2.peek().await.unwrap(), 200); // sees seq1's update, but use its own initial(200)
// seq2 calls next(), should start from its initial(200)
assert_eq!(seq2.next().await.unwrap(), 200);
// After seq2.next(), remote storage updated to 200 + seq2.step(3) = 203
// Both should see the new remote value (seq2's step was used)
assert_eq!(seq1.peek().await.unwrap(), 203);
assert_eq!(seq2.peek().await.unwrap(), 203);
// seq1 calls next(), should start from its next(105)
assert_eq!(seq1.next().await.unwrap(), 101);
assert_eq!(seq1.next().await.unwrap(), 102);
assert_eq!(seq1.next().await.unwrap(), 103);
assert_eq!(seq1.next().await.unwrap(), 104);
assert_eq!(seq1.next().await.unwrap(), 203);
// After seq1.next(), remote storage updated to 203 + seq1.step(5) = 208
assert_eq!(seq1.peek().await.unwrap(), 208);
assert_eq!(seq2.peek().await.unwrap(), 208);
}
#[tokio::test]
async fn test_sequence_peek_initial_max_logic() {
let kv_backend = Arc::new(MemoryKvBackend::default());
// Manually set a small value in storage
let key = seq_name("test_max").into_bytes();
kv_backend
.put(
PutRequest::new()
.with_key(key)
.with_value(u64::to_le_bytes(50)),
)
.await
.unwrap();
// Create sequence with larger initial value
let seq = SequenceBuilder::new("test_max", kv_backend)
.initial(100) // larger than remote value (50)
.build();
// peek() should return max(initial, remote) = max(100, 50) = 100
assert_eq!(seq.peek().await.unwrap(), 100);
// next() should start from the larger initial value
assert_eq!(seq.next().await.unwrap(), 100);
}
#[tokio::test]
async fn test_sequence_initial_greater_than_storage() {
let kv_backend = Arc::new(MemoryKvBackend::default());
// Test sequence behavior when initial > storage value
// This verifies the max(storage, initial) logic works correctly
// Step 1: Establish a low value in storage
let seq1 = SequenceBuilder::new("max_test", kv_backend.clone())
.initial(10)
.step(5)
.build();
assert_eq!(seq1.next().await.unwrap(), 10); // storage: 15
// Step 2: Create sequence with much larger initial
let seq2 = SequenceBuilder::new("max_test", kv_backend.clone())
.initial(100) // much larger than storage (15)
.step(5)
.build();
// seq2 should start from max(15, 100) = 100 (its initial value)
assert_eq!(seq2.next().await.unwrap(), 100); // storage updated to: 105
assert_eq!(seq2.peek().await.unwrap(), 105);
// Step 3: Verify subsequent sequences continue from updated storage
let seq3 = SequenceBuilder::new("max_test", kv_backend)
.initial(50) // smaller than current storage (105)
.step(1)
.build();
// seq3 should use max(105, 50) = 105 (storage value)
assert_eq!(seq3.peek().await.unwrap(), 105);
assert_eq!(seq3.next().await.unwrap(), 105); // storage: 106
// This demonstrates the correct max(storage, initial) behavior:
// - Sequences never generate values below their initial requirement
// - Storage always reflects the highest allocated value
// - Value gaps (15-99) are acceptable to maintain minimum constraints
}
}