fix: meta minor fix (#513)

* chore: fix metaclient example

* chore: initial sequece value
This commit is contained in:
Jiachun Feng
2022-11-15 16:38:05 +08:00
committed by GitHub
parent b34f26ee07
commit cabb55322b
3 changed files with 13 additions and 9 deletions

View File

@@ -72,7 +72,7 @@ async fn run() {
.add_partition(p1)
.add_partition(p2);
let res = meta_client.create_route(create_req).await.unwrap();
let res = meta_client.create_route(create_req).await;
event!(Level::INFO, "create_route result: {:#?}", res);
// put

View File

@@ -79,7 +79,7 @@ impl MetaSrv {
election: Option<ElectionRef>,
) -> Self {
let started = Arc::new(AtomicBool::new(false));
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 10, kv_store.clone()));
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone()));
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {}));
let handler_group = HeartbeatHandlerGroup::default();
handler_group.add_handler(ResponseHeaderHandler).await;

View File

@@ -16,14 +16,15 @@ pub struct Sequence {
}
impl Sequence {
pub fn new(name: impl AsRef<str>, step: u64, generator: KvStoreRef) -> Self {
pub fn new(name: impl AsRef<str>, initial: u64, step: u64, generator: KvStoreRef) -> Self {
let name = format!("{}-{}", keys::SEQ_PREFIX, name.as_ref());
let step = step.max(1);
Self {
inner: Mutex::new(Inner {
name,
generator,
next: 0,
initial,
next: initial,
step,
range: None,
force_quit: 1024,
@@ -40,6 +41,8 @@ impl Sequence {
struct Inner {
name: String,
generator: KvStoreRef,
// The initial(minimal) value of the sequence.
initial: u64,
// The next available sequences(if it is in the range,
// otherwise it need to fetch from generator again).
next: u64,
@@ -84,7 +87,7 @@ impl Inner {
let key = self.name.as_bytes();
let mut start = self.next;
for _ in 0..self.force_quit {
let expect = if start == 0 {
let expect = if start == self.initial {
vec![]
} else {
u64::to_le_bytes(start).to_vec()
@@ -111,7 +114,7 @@ impl Inner {
);
start = u64::from_le_bytes(value.try_into().unwrap());
} else {
start = 0;
start = self.initial;
}
continue;
}
@@ -140,9 +143,10 @@ mod tests {
#[tokio::test]
async fn test_sequence() {
let kv_store = Arc::new(MemStore::new());
let seq = Sequence::new("test_seq", 10, kv_store);
let initial = 1024;
let seq = Sequence::new("test_seq", initial, 10, kv_store);
for i in 0..100 {
for i in initial..initial + 100 {
assert_eq!(i, seq.next().await.unwrap());
}
}
@@ -190,7 +194,7 @@ mod tests {
}
let kv_store = Arc::new(Noop {});
let seq = Sequence::new("test_seq", 10, kv_store);
let seq = Sequence::new("test_seq", 0, 10, kv_store);
let next = seq.next().await;
assert!(next.is_err());