diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index 421c0c44a7..04b3991fdf 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -72,7 +72,7 @@ async fn run() { .add_partition(p1) .add_partition(p2); - let res = meta_client.create_route(create_req).await.unwrap(); + let res = meta_client.create_route(create_req).await; event!(Level::INFO, "create_route result: {:#?}", res); // put diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 0bc66f55e3..a21debe78b 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -79,7 +79,7 @@ impl MetaSrv { election: Option, ) -> Self { let started = Arc::new(AtomicBool::new(false)); - let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 10, kv_store.clone())); + let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector {})); let handler_group = HeartbeatHandlerGroup::default(); handler_group.add_handler(ResponseHeaderHandler).await; diff --git a/src/meta-srv/src/sequence.rs b/src/meta-srv/src/sequence.rs index 555bcfc77a..eef3948f03 100644 --- a/src/meta-srv/src/sequence.rs +++ b/src/meta-srv/src/sequence.rs @@ -16,14 +16,15 @@ pub struct Sequence { } impl Sequence { - pub fn new(name: impl AsRef, step: u64, generator: KvStoreRef) -> Self { + pub fn new(name: impl AsRef, initial: u64, step: u64, generator: KvStoreRef) -> Self { let name = format!("{}-{}", keys::SEQ_PREFIX, name.as_ref()); let step = step.max(1); Self { inner: Mutex::new(Inner { name, generator, - next: 0, + initial, + next: initial, step, range: None, force_quit: 1024, @@ -40,6 +41,8 @@ impl Sequence { struct Inner { name: String, generator: KvStoreRef, + // The initial(minimal) value of the sequence. + initial: u64, // The next available sequences(if it is in the range, // otherwise it need to fetch from generator again). next: u64, @@ -84,7 +87,7 @@ impl Inner { let key = self.name.as_bytes(); let mut start = self.next; for _ in 0..self.force_quit { - let expect = if start == 0 { + let expect = if start == self.initial { vec![] } else { u64::to_le_bytes(start).to_vec() @@ -111,7 +114,7 @@ impl Inner { ); start = u64::from_le_bytes(value.try_into().unwrap()); } else { - start = 0; + start = self.initial; } continue; } @@ -140,9 +143,10 @@ mod tests { #[tokio::test] async fn test_sequence() { let kv_store = Arc::new(MemStore::new()); - let seq = Sequence::new("test_seq", 10, kv_store); + let initial = 1024; + let seq = Sequence::new("test_seq", initial, 10, kv_store); - for i in 0..100 { + for i in initial..initial + 100 { assert_eq!(i, seq.next().await.unwrap()); } } @@ -190,7 +194,7 @@ mod tests { } let kv_store = Arc::new(Noop {}); - let seq = Sequence::new("test_seq", 10, kv_store); + let seq = Sequence::new("test_seq", 0, 10, kv_store); let next = seq.next().await; assert!(next.is_err());