From 2ee2d29085c35be30e9c494ec125b9510067006f Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 6 Sep 2023 14:45:27 +0800 Subject: [PATCH] refactor: move Sequence to common meta (#2337) --- src/common/meta/src/error.rs | 17 +++++++++++ src/common/meta/src/lib.rs | 1 + src/{meta-srv => common/meta}/src/sequence.rs | 28 +++++++++---------- src/meta-srv/src/error.rs | 24 +++++----------- src/meta-srv/src/handler.rs | 13 ++++++--- .../src/handler/persist_stats_handler.rs | 4 +-- .../src/handler/response_header_handler.rs | 4 +-- src/meta-srv/src/keys.rs | 1 - src/meta-srv/src/lib.rs | 1 - src/meta-srv/src/metasrv.rs | 2 +- src/meta-srv/src/metasrv/builder.rs | 16 +++++++---- src/meta-srv/src/procedure/region_failover.rs | 10 +++++-- src/meta-srv/src/procedure/utils.rs | 9 ++++-- src/meta-srv/src/table_creator.rs | 7 +++-- src/meta-srv/src/test_util.rs | 9 ++++-- 15 files changed, 89 insertions(+), 57 deletions(-) rename src/{meta-srv => common/meta}/src/sequence.rs (93%) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index d7d021c3db..c7090cb5c1 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -26,6 +26,20 @@ use crate::peer::Peer; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to get sequence: {}", err_msg))] + NextSequence { err_msg: String, location: Location }, + + #[snafu(display("Sequence out of range: {}, start={}, step={}", name, start, step))] + SequenceOutOfRange { + name: String, + start: u64, + step: u64, + location: Location, + }, + + #[snafu(display("Unexpected sequence value: {}", err_msg))] + UnexpectedSequenceValue { err_msg: String, location: Location }, + #[snafu(display("Table info not found: {}", table_name))] TableInfoNotFound { table_name: String, @@ -266,6 +280,9 @@ impl ErrorExt for Error { | Unexpected { .. } | External { .. } | TableInfoNotFound { .. } + | NextSequence { .. } + | SequenceOutOfRange { .. } + | UnexpectedSequenceValue { .. } | InvalidHeartbeatResponse { .. } => StatusCode::Unexpected, SendMessage { .. } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index ecf640ea21..fb002207d6 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -31,6 +31,7 @@ pub mod metrics; pub mod peer; pub mod range_stream; pub mod rpc; +pub mod sequence; pub mod table_name; pub mod util; diff --git a/src/meta-srv/src/sequence.rs b/src/common/meta/src/sequence.rs similarity index 93% rename from src/meta-srv/src/sequence.rs rename to src/common/meta/src/sequence.rs index 4718ece71e..184d6cdac9 100644 --- a/src/meta-srv/src/sequence.rs +++ b/src/common/meta/src/sequence.rs @@ -15,23 +15,24 @@ use std::ops::Range; use std::sync::Arc; -use common_meta::rpc::store::CompareAndPutRequest; use snafu::{ensure, OptionExt}; use tokio::sync::Mutex; use crate::error::{self, Result}; -use crate::keys; -use crate::service::store::kv::KvStoreRef; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::CompareAndPutRequest; pub type SequenceRef = Arc; +pub(crate) const SEQ_PREFIX: &str = "__meta_seq"; + pub struct Sequence { inner: Mutex, } impl Sequence { - pub fn new(name: impl AsRef, initial: u64, step: u64, generator: KvStoreRef) -> Self { - let name = format!("{}-{}", keys::SEQ_PREFIX, name.as_ref()); + pub fn new(name: impl AsRef, initial: u64, step: u64, generator: KvBackendRef) -> Self { + let name = format!("{}-{}", SEQ_PREFIX, name.as_ref()); let step = step.max(1); Self { inner: Mutex::new(Inner { @@ -54,7 +55,7 @@ impl Sequence { struct Inner { name: String, - generator: KvStoreRef, + generator: KvBackendRef, // The initial(minimal) value of the sequence. initial: u64, // The next available sequences(if it is in the range, @@ -158,21 +159,20 @@ mod tests { use std::any::Any; use std::sync::Arc; - use common_meta::kv_backend::{KvBackend, TxnService}; - use common_meta::rpc::store::{ + use super::*; + use crate::error::Error; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::{KvBackend, TxnService}; + use crate::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, BatchPutResponse, CompareAndPutResponse, DeleteRangeRequest, DeleteRangeResponse, MoveValueRequest, MoveValueResponse, PutRequest, PutResponse, RangeRequest, RangeResponse, }; - use super::*; - use crate::error::Error; - use crate::service::store::memory::MemStore; - #[tokio::test] async fn test_sequence() { - let kv_store = Arc::new(MemStore::new()); + let kv_store = Arc::new(MemoryKvBackend::default()); let initial = 1024; let seq = Sequence::new("test_seq", initial, 10, kv_store); @@ -183,7 +183,7 @@ mod tests { #[tokio::test] async fn test_sequence_out_of_rage() { - let kv_store = Arc::new(MemStore::new()); + let kv_store = Arc::new(MemoryKvBackend::default()); let initial = u64::MAX - 10; let seq = Sequence::new("test_seq", initial, 10, kv_store); diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index a664f1770e..3e2d27e416 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,6 +26,12 @@ use crate::pubsub::Message; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to allocate next sequence number: {}", source))] + NextSequence { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to submit ddl task: {}", source))] SubmitDdlTask { location: Location, @@ -222,9 +228,6 @@ pub enum Error { source: common_catalog::error::Error, }, - #[snafu(display("Unexpected sequence value: {}", err_msg))] - UnexpectedSequenceValue { err_msg: String, location: Location }, - #[snafu(display("Failed to decode table route, source: {}", source))] DecodeTableRoute { source: prost::DecodeError, @@ -250,17 +253,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to get sequence: {}", err_msg))] - NextSequence { err_msg: String, location: Location }, - - #[snafu(display("Sequence out of range: {}, start={}, step={}", name, start, step))] - SequenceOutOfRange { - name: String, - start: u64, - step: u64, - location: Location, - }, - #[snafu(display("MetaSrv has no leader at this moment"))] NoLeader { location: Location }, @@ -575,12 +567,9 @@ impl ErrorExt for Error { | Error::StatKeyFromUtf8 { .. } | Error::StatValueFromUtf8 { .. } | Error::InvalidRegionKeyFromUtf8 { .. } - | Error::UnexpectedSequenceValue { .. } | Error::TableRouteNotFound { .. } | Error::TableInfoNotFound { .. } | Error::CorruptedTableRoute { .. } - | Error::NextSequence { .. } - | Error::SequenceOutOfRange { .. } | Error::MoveValue { .. } | Error::InvalidTxnResult { .. } | Error::InvalidUtf8Value { .. } @@ -604,6 +593,7 @@ impl ErrorExt for Error { } Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted, + Error::NextSequence { source, .. } => source.status_code(), Error::RegisterProcedureLoader { source, .. } => source.status_code(), Error::OperateRegion { source, .. } => source.status_code(), diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index bc51f7a3ea..e3ad12ba3c 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -23,6 +23,7 @@ use api::v1::meta::{ ResponseHeader, Role, PROTOCOL_VERSION, }; use common_meta::instruction::{Instruction, InstructionReply}; +use common_meta::sequence::Sequence; use common_telemetry::{debug, info, timer, warn}; use dashmap::DashMap; use metrics::{decrement_gauge, increment_gauge}; @@ -34,7 +35,6 @@ use self::node_stat::Stat; use crate::error::{self, DeserializeFromJsonSnafu, Result, UnexpectedInstructionReplySnafu}; use crate::metasrv::Context; use crate::metrics::{METRIC_META_HANDLER_EXECUTE, METRIC_META_HEARTBEAT_CONNECTION_NUM}; -use crate::sequence::Sequence; use crate::service::mailbox::{ BroadcastChannel, Channel, Mailbox, MailboxReceiver, MailboxRef, MessageId, }; @@ -336,7 +336,11 @@ impl HeartbeatMailbox { // In this implementation, we pre-occupy the message_id of 0, // and we use `message_id = 0` to mark a Message as a one-way call. loop { - let next = self.sequence.next().await?; + let next = self + .sequence + .next() + .await + .context(error::NextSequenceSnafu)?; if next > 0 { return Ok(next); } @@ -397,6 +401,7 @@ mod tests { use std::time::Duration; use api::v1::meta::{MailboxMessage, RequestHeader, Role, PROTOCOL_VERSION}; + use common_meta::sequence::Sequence; use tokio::sync::mpsc; use crate::handler::check_leader_handler::CheckLeaderHandler; @@ -406,8 +411,8 @@ mod tests { use crate::handler::persist_stats_handler::PersistStatsHandler; use crate::handler::response_header_handler::ResponseHeaderHandler; use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pusher}; - use crate::sequence::Sequence; use crate::service::mailbox::{Channel, MailboxReceiver, MailboxRef}; + use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; #[tokio::test] @@ -451,7 +456,7 @@ mod tests { .await; let kv_store = Arc::new(MemStore::new()); - let seq = Sequence::new("test_seq", 0, 10, kv_store); + let seq = Sequence::new("test_seq", 0, 10, KvBackendAdapter::wrap(kv_store)); let mailbox = HeartbeatMailbox::create(handler_group.pushers(), seq); let msg = MailboxMessage { diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 93373218b4..e0835de243 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -142,12 +142,12 @@ mod tests { use std::sync::Arc; use common_meta::key::TableMetadataManager; + use common_meta::sequence::Sequence; use super::*; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{HeartbeatMailbox, Pushers}; use crate::keys::StatKey; - use crate::sequence::Sequence; use crate::service::store::cached_kv::LeaderCachedKvStore; use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; @@ -158,7 +158,7 @@ mod tests { let kv_store = Arc::new(MemStore::new()); let leader_cached_kv_store = Arc::new(LeaderCachedKvStore::with_always_leader(kv_store.clone())); - let seq = Sequence::new("test_seq", 0, 10, kv_store.clone()); + let seq = Sequence::new("test_seq", 0, 10, KvBackendAdapter::wrap(kv_store.clone())); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 54eda6288e..741596baaa 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -51,11 +51,11 @@ mod tests { use api::v1::meta::{HeartbeatResponse, RequestHeader}; use common_meta::key::TableMetadataManager; + use common_meta::sequence::Sequence; use super::*; use crate::cluster::MetaPeerClientBuilder; use crate::handler::{Context, HeartbeatMailbox, Pushers}; - use crate::sequence::Sequence; use crate::service::store::cached_kv::LeaderCachedKvStore; use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; @@ -66,7 +66,7 @@ mod tests { let kv_store = Arc::new(MemStore::new()); let leader_cached_kv_store = Arc::new(LeaderCachedKvStore::with_always_leader(kv_store.clone())); - let seq = Sequence::new("test_seq", 0, 10, kv_store.clone()); + let seq = Sequence::new("test_seq", 0, 10, KvBackendAdapter::wrap(kv_store.clone())); let mailbox = HeartbeatMailbox::create(Pushers::default(), seq); let meta_peer_client = MetaPeerClientBuilder::default() .election(None) diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index af7bc1b1e9..a74f032b37 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -25,7 +25,6 @@ use crate::error::Result; use crate::handler::node_stat::Stat; pub(crate) const DN_LEASE_PREFIX: &str = "__meta_dnlease"; -pub(crate) const SEQ_PREFIX: &str = "__meta_seq"; pub(crate) const INACTIVE_REGION_PREFIX: &str = "__meta_inactive_region"; pub const DN_STAT_PREFIX: &str = "__meta_dnstat"; diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 4a5d86c9d3..a340fb205b 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -33,7 +33,6 @@ pub mod mocks; pub mod procedure; pub mod pubsub; pub mod selector; -mod sequence; pub mod service; pub mod table_creator; pub mod table_routes; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 1807654fdf..da1a46fd39 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -24,6 +24,7 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; use common_meta::ddl::DdlExecutorRef; use common_meta::key::TableMetadataManagerRef; +use common_meta::sequence::SequenceRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; @@ -41,7 +42,6 @@ use crate::lock::DistLockRef; use crate::metadata_service::MetadataServiceRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::{Selector, SelectorType}; -use crate::sequence::SequenceRef; use crate::service::mailbox::MailboxRef; use crate::service::store::kv::{KvStoreRef, ResettableKvStoreRef}; pub const TABLE_ID_SEQ: &str = "table_id"; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 92b92356d6..93882563bd 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -20,6 +20,7 @@ use client::client_manager::DatanodeClients; use common_grpc::channel_manager::ChannelConfig; use common_meta::ddl_manager::{DdlManager, DdlManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::sequence::{Sequence, SequenceRef}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; @@ -49,7 +50,6 @@ use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; use crate::pubsub::{PublishRef, SubscribeManagerRef}; use crate::selector::lease_based::LeaseBasedSelector; -use crate::sequence::{Sequence, SequenceRef}; use crate::service::mailbox::MailboxRef; use crate::service::store::cached_kv::{CheckLeader, LeaderCachedKvStore}; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef, ResettableKvStoreRef}; @@ -171,10 +171,9 @@ impl MetaSrvBuilder { let pushers = Pushers::default(); let mailbox = build_mailbox(&kv_store, &pushers); let procedure_manager = build_procedure_manager(&options, &kv_store); - let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_store.clone())); - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - kv_store.clone(), - ))); + let kv_backend = KvBackendAdapter::wrap(kv_store.clone()); + let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone())); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); let metadata_service = metadata_service .unwrap_or_else(|| Arc::new(DefaultMetadataService::new(table_metadata_manager))); let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default())); @@ -315,7 +314,12 @@ fn build_default_meta_peer_client( } fn build_mailbox(kv_store: &KvStoreRef, pushers: &Pushers) -> MailboxRef { - let mailbox_sequence = Sequence::new("heartbeat_mailbox", 1, 100, kv_store.clone()); + let mailbox_sequence = Sequence::new( + "heartbeat_mailbox", + 1, + 100, + KvBackendAdapter::wrap(kv_store.clone()), + ); HeartbeatMailbox::create(pushers.clone(), mailbox_sequence) } diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 3c5de1cc1b..d328c9f112 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -393,6 +393,7 @@ mod tests { use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::key::TableMetadataManager; + use common_meta::sequence::Sequence; use common_meta::DatanodeId; use common_procedure::{BoxedProcedure, ProcedureId}; use common_procedure_test::MockContextProvider; @@ -404,7 +405,6 @@ mod tests { use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; use crate::lock::memory::MemLock; use crate::selector::{Namespace, Selector}; - use crate::sequence::Sequence; use crate::service::mailbox::Channel; use crate::service::store::kv::{KvBackendAdapter, KvStoreRef}; use crate::service::store::memory::MemStore; @@ -521,8 +521,12 @@ mod tests { let _ = heartbeat_receivers.insert(datanode_id, rx); } - let mailbox_sequence = - Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone()); + let mailbox_sequence = Sequence::new( + "test_heartbeat_mailbox", + 0, + 100, + KvBackendAdapter::wrap(kv_store.clone()), + ); let mailbox = HeartbeatMailbox::create(pushers.clone(), mailbox_sequence); let selector = self.selector.unwrap_or_else(|| { diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index 3297225218..965ae26037 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -110,6 +110,7 @@ pub mod test_data { use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; + use common_meta::sequence::Sequence; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; @@ -118,7 +119,6 @@ pub mod test_data { use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::handler::{HeartbeatMailbox, Pushers}; use crate::metasrv::MetasrvInfo; - use crate::sequence::Sequence; use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; use crate::test_util::new_region_route; @@ -190,7 +190,12 @@ pub mod test_data { pub(crate) fn new_ddl_context(datanode_manager: DatanodeManagerRef) -> DdlContext { let kv_store = Arc::new(MemStore::new()); - let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone()); + let mailbox_sequence = Sequence::new( + "test_heartbeat_mailbox", + 0, + 100, + KvBackendAdapter::wrap(kv_store.clone()), + ); let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); let kv_backend = KvBackendAdapter::wrap(kv_store); diff --git a/src/meta-srv/src/table_creator.rs b/src/meta-srv/src/table_creator.rs index c54afe4b82..af417dfa96 100644 --- a/src/meta-srv/src/table_creator.rs +++ b/src/meta-srv/src/table_creator.rs @@ -18,6 +18,7 @@ use common_error::ext::BoxedError; use common_meta::ddl::{TableCreator, TableCreatorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::router::{Region, RegionRoute}; +use common_meta::sequence::SequenceRef; use common_telemetry::warn; use snafu::{ensure, ResultExt}; use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ}; @@ -25,7 +26,6 @@ use table::metadata::RawTableInfo; use crate::error::{self, Result, TooManyPartitionsSnafu}; use crate::metasrv::{SelectorContext, SelectorRef}; -use crate::sequence::SequenceRef; pub struct MetaSrvTableCreator { ctx: SelectorContext, @@ -93,7 +93,10 @@ async fn handle_create_region_routes( // If the peers are not enough, some peers will be used for multiple partitions. peers.truncate(partitions.len()); - let table_id = table_id_sequence.next().await? as u32; + let table_id = table_id_sequence + .next() + .await + .context(error::NextSequenceSnafu)? as u32; table_info.ident.table_id = table_id; ensure!( diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index dd2fc8f0e6..5d2f928dc3 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; +use common_meta::sequence::Sequence; use common_procedure::local::{LocalManager, ManagerConfig}; use crate::cluster::MetaPeerClientBuilder; @@ -26,7 +27,6 @@ use crate::metasrv::SelectorContext; use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; use crate::selector::lease_based::LeaseBasedSelector; -use crate::sequence::Sequence; use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; @@ -49,7 +49,12 @@ pub(crate) fn create_region_failover_manager() -> Arc { let kv_store = Arc::new(MemStore::new()); let pushers = Pushers::default(); - let mailbox_sequence = Sequence::new("test_heartbeat_mailbox", 0, 100, kv_store.clone()); + let mailbox_sequence = Sequence::new( + "test_heartbeat_mailbox", + 0, + 100, + KvBackendAdapter::wrap(kv_store.clone()), + ); let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence); let state_store = Arc::new(MetaStateStore::new(kv_store.clone()));