From 569d93c599e35a8c2b9ba0a1a770494156d1e7ca Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 5 Aug 2025 16:28:09 +0800 Subject: [PATCH] fix: sequence peek with remote value (#6648) * fix: sequence peek with remote value * chore: more ut * chore: add more ut --- src/common/meta/src/sequence.rs | 202 ++++++++++++++++++++++++++------ 1 file changed, 165 insertions(+), 37 deletions(-) diff --git a/src/common/meta/src/sequence.rs b/src/common/meta/src/sequence.rs index 0f226291c7..0e26b4c594 100644 --- a/src/common/meta/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -95,13 +95,27 @@ impl Sequence { inner.initial..inner.max } - /// Returns the next value without incrementing the sequence. + /// Returns the current value stored in the remote storage without incrementing the sequence. + /// + /// This function always fetches the true current state from the remote storage (KV backend), + /// ignoring any local cache to provide the most accurate view of the sequence's remote state. + /// It does not consume or advance the sequence value. + /// + /// Note: Since this always queries the remote storage, it may be slower than `next()` but + /// provides the most accurate and up-to-date information about the sequence state. pub async fn peek(&self) -> Result { let inner = self.inner.lock().await; inner.peek().await } /// Jumps to the given value. + /// + /// The next value must be greater than both: + /// 1. The current local next value + /// 2. The current value stored in the remote storage (KV backend) + /// + /// This ensures the sequence can only move forward and maintains consistency + /// across different instances accessing the same sequence. pub async fn jump_to(&self, next: u64) -> Result<()> { let mut inner = self.inner.lock().await; inner.jump_to(next).await @@ -159,25 +173,13 @@ impl Inner { .fail() } + /// Returns the current value from remote storage without advancing the sequence. + /// If no value exists in remote storage, returns the initial value. pub async fn peek(&self) -> Result { - if self.range.is_some() { - return Ok(self.next); - } - - // If the range is not set, means the sequence is not initialized. let key = self.name.as_bytes(); let value = self.generator.get(key).await?.map(|kv| kv.value); let next = if let Some(value) = value { - let v: [u8; 8] = match value.try_into() { - Ok(a) => a, - Err(v) => { - return error::UnexpectedSequenceValueSnafu { - err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name), - } - .fail() - } - }; - let next = u64::from_le_bytes(v); + let next = self.initial.max(self.parse_sequence_value(value)?); debug!("The next value of sequence {} is {}", self.name, next); next } else { @@ -224,16 +226,7 @@ impl Inner { if !res.success { if let Some(kv) = res.prev_kv { - let v: [u8; 8] = match kv.value.clone().try_into() { - Ok(a) => a, - Err(v) => { - return error::UnexpectedSequenceValueSnafu { - err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name), - } - .fail() - } - }; - let v = u64::from_le_bytes(v); + let v = self.parse_sequence_value(kv.value.clone())?; // If the existed value is smaller than the initial, we should start from the initial. start = v.max(self.initial); expect = kv.value; @@ -257,25 +250,33 @@ impl Inner { } /// Jumps to the given value. - /// The next value must be greater than the current next value. + /// + /// The next value must be greater than both: + /// 1. The current local next value (self.next) + /// 2. The current value stored in the remote storage (KV backend) + /// + /// This ensures the sequence can only move forward and maintains consistency + /// across different instances accessing the same sequence. pub async fn jump_to(&mut self, next: u64) -> Result<()> { + let key = self.name.as_bytes(); + let current = self.generator.get(key).await?.map(|kv| kv.value); + + let curr_val = match ¤t { + Some(val) => self.initial.max(self.parse_sequence_value(val.clone())?), + None => self.initial, + }; + ensure!( - next > self.next, + next > curr_val, error::UnexpectedSnafu { err_msg: format!( "The next value {} is not greater than the current next value {}", - next, self.next + next, curr_val ), } ); - let key = self.name.as_bytes(); - let expect = self - .generator - .get(key) - .await? - .map(|kv| kv.value) - .unwrap_or_default(); + let expect = current.unwrap_or_default(); let req = CompareAndPutRequest { key: key.to_vec(), @@ -297,6 +298,20 @@ impl Inner { Ok(()) } + + /// Converts a Vec to u64 with proper error handling for sequence values + fn parse_sequence_value(&self, value: Vec) -> Result { + let v: [u8; 8] = match value.try_into() { + Ok(a) => a, + Err(v) => { + return error::UnexpectedSequenceValueSnafu { + err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name), + } + .fail() + } + }; + Ok(u64::from_le_bytes(v)) + } } #[cfg(test)] @@ -425,7 +440,7 @@ mod tests { } #[tokio::test] - async fn test_sequence_out_of_rage() { + async fn test_sequence_out_of_range() { let seq = SequenceBuilder::new("test_seq", Arc::new(MemoryKvBackend::default())) .initial(u64::MAX - 10) .step(10) @@ -517,4 +532,117 @@ mod tests { // The sequence is not initialized, it will fetch the value from the kv backend. assert_eq!(seq.peek().await.unwrap(), 1044); } + + #[tokio::test] + async fn test_sequence_peek_shared_storage() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let shared_seq = "shared_seq"; + + // Create two sequence instances with the SAME name but DIFFERENT configs + let seq1 = SequenceBuilder::new(shared_seq, kv_backend.clone()) + .initial(100) + .step(5) + .build(); + let seq2 = SequenceBuilder::new(shared_seq, kv_backend.clone()) + .initial(200) // different initial + .step(3) // different step + .build(); + + // Initially both return their own initial values when no remote value exists + assert_eq!(seq1.peek().await.unwrap(), 100); + assert_eq!(seq2.peek().await.unwrap(), 200); + + // seq1 calls next() to allocate range and update remote storage + assert_eq!(seq1.next().await.unwrap(), 100); + // After seq1.next(), remote storage has 100 + seq1.step(5) = 105 + + // seq2 should now see the updated remote value through peek(), not its own initial(200) + assert_eq!(seq1.peek().await.unwrap(), 105); + assert_eq!(seq2.peek().await.unwrap(), 200); // sees seq1's update, but use its own initial(200) + + // seq2 calls next(), should start from its initial(200) + assert_eq!(seq2.next().await.unwrap(), 200); + // After seq2.next(), remote storage updated to 200 + seq2.step(3) = 203 + + // Both should see the new remote value (seq2's step was used) + assert_eq!(seq1.peek().await.unwrap(), 203); + assert_eq!(seq2.peek().await.unwrap(), 203); + + // seq1 calls next(), should start from its next(105) + assert_eq!(seq1.next().await.unwrap(), 101); + assert_eq!(seq1.next().await.unwrap(), 102); + assert_eq!(seq1.next().await.unwrap(), 103); + assert_eq!(seq1.next().await.unwrap(), 104); + assert_eq!(seq1.next().await.unwrap(), 203); + // After seq1.next(), remote storage updated to 203 + seq1.step(5) = 208 + assert_eq!(seq1.peek().await.unwrap(), 208); + assert_eq!(seq2.peek().await.unwrap(), 208); + } + + #[tokio::test] + async fn test_sequence_peek_initial_max_logic() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + + // Manually set a small value in storage + let key = seq_name("test_max").into_bytes(); + kv_backend + .put( + PutRequest::new() + .with_key(key) + .with_value(u64::to_le_bytes(50)), + ) + .await + .unwrap(); + + // Create sequence with larger initial value + let seq = SequenceBuilder::new("test_max", kv_backend) + .initial(100) // larger than remote value (50) + .build(); + + // peek() should return max(initial, remote) = max(100, 50) = 100 + assert_eq!(seq.peek().await.unwrap(), 100); + + // next() should start from the larger initial value + assert_eq!(seq.next().await.unwrap(), 100); + } + + #[tokio::test] + async fn test_sequence_initial_greater_than_storage() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + + // Test sequence behavior when initial > storage value + // This verifies the max(storage, initial) logic works correctly + + // Step 1: Establish a low value in storage + let seq1 = SequenceBuilder::new("max_test", kv_backend.clone()) + .initial(10) + .step(5) + .build(); + assert_eq!(seq1.next().await.unwrap(), 10); // storage: 15 + + // Step 2: Create sequence with much larger initial + let seq2 = SequenceBuilder::new("max_test", kv_backend.clone()) + .initial(100) // much larger than storage (15) + .step(5) + .build(); + + // seq2 should start from max(15, 100) = 100 (its initial value) + assert_eq!(seq2.next().await.unwrap(), 100); // storage updated to: 105 + assert_eq!(seq2.peek().await.unwrap(), 105); + + // Step 3: Verify subsequent sequences continue from updated storage + let seq3 = SequenceBuilder::new("max_test", kv_backend) + .initial(50) // smaller than current storage (105) + .step(1) + .build(); + + // seq3 should use max(105, 50) = 105 (storage value) + assert_eq!(seq3.peek().await.unwrap(), 105); + assert_eq!(seq3.next().await.unwrap(), 105); // storage: 106 + + // This demonstrates the correct max(storage, initial) behavior: + // - Sequences never generate values below their initial requirement + // - Storage always reflects the highest allocated value + // - Value gaps (15-99) are acceptable to maintain minimum constraints + } }