mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 20:32:56 +00:00
fix: correctly generate sequences when the value is pre-existed (#3502)
This commit is contained in:
@@ -34,10 +34,14 @@ pub struct SequenceBuilder {
|
||||
max: u64,
|
||||
}
|
||||
|
||||
fn seq_name(name: impl AsRef<str>) -> String {
|
||||
format!("{}-{}", SEQ_PREFIX, name.as_ref())
|
||||
}
|
||||
|
||||
impl SequenceBuilder {
|
||||
pub fn new(name: impl AsRef<str>, generator: KvBackendRef) -> Self {
|
||||
Self {
|
||||
name: format!("{}-{}", SEQ_PREFIX, name.as_ref()),
|
||||
name: seq_name(name),
|
||||
initial: 0,
|
||||
step: 1,
|
||||
generator,
|
||||
@@ -138,13 +142,14 @@ impl Inner {
|
||||
pub async fn next_range(&self) -> Result<Range<u64>> {
|
||||
let key = self.name.as_bytes();
|
||||
let mut start = self.next;
|
||||
for _ in 0..self.force_quit {
|
||||
let expect = if start == self.initial {
|
||||
vec![]
|
||||
} else {
|
||||
u64::to_le_bytes(start).to_vec()
|
||||
};
|
||||
|
||||
let mut expect = if start == self.initial {
|
||||
vec![]
|
||||
} else {
|
||||
u64::to_le_bytes(start).to_vec()
|
||||
};
|
||||
|
||||
for _ in 0..self.force_quit {
|
||||
let step = self.step.min(self.max - start);
|
||||
|
||||
ensure!(
|
||||
@@ -167,15 +172,24 @@ impl Inner {
|
||||
|
||||
if !res.success {
|
||||
if let Some(kv) = res.prev_kv {
|
||||
let value = kv.value;
|
||||
ensure!(
|
||||
value.len() == std::mem::size_of::<u64>(),
|
||||
error::UnexpectedSequenceValueSnafu {
|
||||
err_msg: format!("key={}, unexpected value={:?}", self.name, value)
|
||||
expect = kv.value.clone();
|
||||
|
||||
let v: [u8; 8] = match kv.value.try_into() {
|
||||
Ok(a) => a,
|
||||
Err(v) => {
|
||||
return error::UnexpectedSequenceValueSnafu {
|
||||
err_msg: format!("Not a valid u64 for '{}': {v:?}", self.name),
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
);
|
||||
start = u64::from_le_bytes(value.try_into().unwrap());
|
||||
};
|
||||
let v = u64::from_le_bytes(v);
|
||||
|
||||
// If the existed value is smaller than the initial, we should start from the initial.
|
||||
start = v.max(self.initial);
|
||||
} else {
|
||||
expect = vec![];
|
||||
|
||||
start = self.initial;
|
||||
}
|
||||
continue;
|
||||
@@ -197,8 +211,12 @@ impl Inner {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::any::Any;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use itertools::{Itertools, MinMaxResult};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::kv_backend::memory::MemoryKvBackend;
|
||||
@@ -209,6 +227,76 @@ mod tests {
|
||||
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sequence_with_existed_value() {
|
||||
async fn test(exist: u64, expected: Vec<u64>) {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
let exist = u64::to_le_bytes(exist);
|
||||
kv_backend
|
||||
.put(PutRequest::new().with_key(seq_name("s")).with_value(exist))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let initial = 100;
|
||||
let seq = SequenceBuilder::new("s", kv_backend)
|
||||
.initial(initial)
|
||||
.build();
|
||||
|
||||
let mut actual = Vec::with_capacity(expected.len());
|
||||
for _ in 0..expected.len() {
|
||||
actual.push(seq.next().await.unwrap());
|
||||
}
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
|
||||
// put a value not greater than the "initial", the sequence should start from "initial"
|
||||
test(1, vec![100, 101, 102]).await;
|
||||
test(100, vec![100, 101, 102]).await;
|
||||
|
||||
// put a value greater than the "initial", the sequence should start from the put value
|
||||
test(200, vec![200, 201, 202]).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_sequence_with_contention() {
|
||||
let seq = Arc::new(
|
||||
SequenceBuilder::new("s", Arc::new(MemoryKvBackend::default()))
|
||||
.initial(1024)
|
||||
.build(),
|
||||
);
|
||||
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
// Spawn 10 tasks to concurrently get the next sequence. Each task will get 100 sequences.
|
||||
for _ in 0..10 {
|
||||
tokio::spawn({
|
||||
let seq = seq.clone();
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
for _ in 0..100 {
|
||||
tx.send(seq.next().await.unwrap()).unwrap()
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Test that we get 1000 unique sequences, and start from 1024 to 2023.
|
||||
let mut nums = HashSet::new();
|
||||
let mut c = 0;
|
||||
while c < 1000
|
||||
&& let Some(x) = rx.recv().await
|
||||
{
|
||||
nums.insert(x);
|
||||
c += 1;
|
||||
}
|
||||
assert_eq!(nums.len(), 1000);
|
||||
let MinMaxResult::MinMax(min, max) = nums.iter().minmax() else {
|
||||
unreachable!("nums has more than one elements");
|
||||
};
|
||||
assert_eq!(*min, 1024);
|
||||
assert_eq!(*max, 2023);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_sequence() {
|
||||
let kv_backend = Arc::new(MemoryKvBackend::default());
|
||||
|
||||
Reference in New Issue
Block a user