refactor: metasrv cannot be cloned (#4834)

* refactor: metasrv cannot be cloned

* chore: remove MetasrvInstance's clone
This commit is contained in:
jeremyhi
2024-10-15 19:36:48 +08:00
committed by GitHub
parent 16b8cdc3d5
commit 59ec90299b
10 changed files with 38 additions and 46 deletions

View File

@@ -56,9 +56,8 @@ use crate::selector::SelectorType;
use crate::service::admin;
use crate::{error, Result};
#[derive(Clone)]
pub struct MetasrvInstance {
metasrv: Metasrv,
metasrv: Arc<Metasrv>,
httpsrv: Arc<HttpServer>,
@@ -83,8 +82,9 @@ impl MetasrvInstance {
.with_greptime_config_options(opts.to_toml().context(TomlFormatSnafu)?)
.build(),
);
let metasrv = Arc::new(metasrv);
// put metasrv into plugins for later use
plugins.insert::<Arc<Metasrv>>(Arc::new(metasrv.clone()));
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(InitExportMetricsTaskSnafu)?;
Ok(MetasrvInstance {
@@ -178,13 +178,13 @@ pub async fn bootstrap_metasrv_with_router(
Ok(())
}
pub fn router(metasrv: Metasrv) -> Router {
pub fn router(metasrv: Arc<Metasrv>) -> Router {
tonic::transport::Server::builder()
.accept_http1(true) // for admin services
.add_service(HeartbeatServer::new(metasrv.clone()))
.add_service(StoreServer::new(metasrv.clone()))
.add_service(ClusterServer::new(metasrv.clone()))
.add_service(ProcedureServiceServer::new(metasrv.clone()))
.add_service(HeartbeatServer::from_arc(metasrv.clone()))
.add_service(StoreServer::from_arc(metasrv.clone()))
.add_service(ClusterServer::from_arc(metasrv.clone()))
.add_service(ProcedureServiceServer::from_arc(metasrv.clone()))
.add_service(admin::make_admin_service(metasrv))
}

View File

@@ -443,7 +443,6 @@ impl Mailbox for HeartbeatMailbox {
}
/// The builder to build the group of heartbeat handlers.
#[derive(Clone)]
pub struct HeartbeatHandlerGroupBuilder {
/// The handler to handle region failure.
region_failure_handler: Option<RegionFailureHandler>,

View File

@@ -21,7 +21,6 @@ use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};
#[derive(Clone)]
pub struct RegionFailureHandler {
heartbeat_acceptor: HeartbeatAcceptor,
}

View File

@@ -26,7 +26,6 @@ use crate::metasrv::Context;
use crate::region::lease_keeper::{RegionLeaseKeeperRef, RenewRegionLeasesResponse};
use crate::region::RegionLeaseKeeper;
#[derive(Clone)]
pub struct RegionLeaseHandler {
region_lease_seconds: u64,
region_lease_keeper: RegionLeaseKeeperRef,

View File

@@ -16,7 +16,7 @@ pub mod builder;
use std::fmt::Display;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use clap::ValueEnum;
@@ -337,7 +337,6 @@ impl MetaStateHandler {
}
}
#[derive(Clone)]
pub struct Metasrv {
state: StateRef,
started: Arc<AtomicBool>,
@@ -353,8 +352,8 @@ pub struct Metasrv {
selector: SelectorRef,
// The flow selector is used to select a target flownode.
flow_selector: SelectorRef,
handler_group: Option<HeartbeatHandlerGroupRef>,
handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
election: Option<ElectionRef>,
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
@@ -371,15 +370,7 @@ pub struct Metasrv {
}
impl Metasrv {
pub async fn try_start(&mut self) -> Result<()> {
let builder = self
.handler_group_builder
.take()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handler group builder",
})?;
self.handler_group = Some(Arc::new(builder.build()?));
pub async fn try_start(&self) -> Result<()> {
if self
.started
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
@@ -389,6 +380,16 @@ impl Metasrv {
return Ok(());
}
let handler_group_builder =
self.handler_group_builder
.lock()
.unwrap()
.take()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handler group builder",
})?;
*self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));
// Creates default schema if not exists
self.table_metadata_manager
.init()
@@ -567,12 +568,8 @@ impl Metasrv {
&self.flow_selector
}
pub fn handler_group(&self) -> &Option<HeartbeatHandlerGroupRef> {
&self.handler_group
}
pub fn handler_group_builder(&mut self) -> &mut Option<HeartbeatHandlerGroupBuilder> {
&mut self.handler_group_builder
pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
self.handler_group.read().unwrap().clone()
}
pub fn election(&self) -> Option<&ElectionRef> {

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use client::client_manager::NodeClients;
@@ -371,8 +371,8 @@ impl MetasrvBuilder {
selector,
// TODO(jeremy): We do not allow configuring the flow selector.
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
handler_group: None,
handler_group_builder: Some(handler_group_builder),
handler_group: RwLock::new(None),
handler_group_builder: Mutex::new(Some(handler_group_builder)),
election,
procedure_manager,
mailbox,

View File

@@ -33,7 +33,7 @@ use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
pub struct MockInfo {
pub server_addr: String,
pub channel_manager: ChannelManager,
pub metasrv: Metasrv,
pub metasrv: Arc<Metasrv>,
}
pub async fn mock_with_memstore() -> MockInfo {
@@ -74,16 +74,17 @@ pub async fn mock(
None => builder,
};
let mut metasrv = builder.build().await.unwrap();
let metasrv = builder.build().await.unwrap();
metasrv.try_start().await.unwrap();
let (client, server) = tokio::io::duplex(1024);
let metasrv = Arc::new(metasrv);
let service = metasrv.clone();
let _handle = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(HeartbeatServer::new(service.clone()))
.add_service(StoreServer::new(service.clone()))
.add_service(ProcedureServiceServer::new(service.clone()))
.add_service(HeartbeatServer::from_arc(service.clone()))
.add_service(StoreServer::from_arc(service.clone()))
.add_service(ProcedureServiceServer::from_arc(service.clone()))
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
});

View File

@@ -30,7 +30,7 @@ use tonic::server::NamedService;
use crate::metasrv::Metasrv;
pub fn make_admin_service(metasrv: Metasrv) -> Admin {
pub fn make_admin_service(metasrv: Arc<Metasrv>) -> Admin {
let router = Router::new().route("/health", health::HealthHandler);
let router = router.route(

View File

@@ -46,12 +46,9 @@ impl heartbeat_server::Heartbeat for Metasrv {
) -> GrpcResult<Self::HeartbeatStream> {
let mut in_stream = req.into_inner();
let (tx, rx) = mpsc::channel(128);
let handler_group = self
.handler_group()
.clone()
.context(error::UnexpectedSnafu {
violated: "expected heartbeat handlers",
})?;
let handler_group = self.handler_group().context(error::UnexpectedSnafu {
violated: "expected heartbeat handlers",
})?;
let ctx = self.new_ctx();
let _handle = common_runtime::spawn_global(async move {

View File

@@ -72,7 +72,7 @@ pub struct GreptimeDbCluster {
pub datanode_instances: HashMap<DatanodeId, Datanode>,
pub kv_backend: KvBackendRef,
pub metasrv: Metasrv,
pub metasrv: Arc<Metasrv>,
pub frontend: Arc<FeInstance>,
}