mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 08:29:59 +00:00
refactor: rename coordination to require_lease_before_startup (#2431)
This commit is contained in:
@@ -8,9 +8,9 @@ rpc_addr = "127.0.0.1:3001"
|
||||
rpc_hostname = "127.0.0.1"
|
||||
# The number of gRPC server worker threads, 8 by default.
|
||||
rpc_runtime_size = 8
|
||||
# Start services after regions are coordinated.
|
||||
# It will block the datanode start if it can't receive the heartbeat from metasrv.
|
||||
coordination = false
|
||||
# Start services after regions have obtained leases.
|
||||
# It will block the datanode start if it can't receive leases in the heartbeat from metasrv.
|
||||
require_lease_before_startup = false
|
||||
|
||||
[heartbeat]
|
||||
# Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
|
||||
|
||||
@@ -320,7 +320,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
|
||||
pub struct DatanodeOptions {
|
||||
pub mode: Mode,
|
||||
pub node_id: Option<u64>,
|
||||
pub coordination: bool,
|
||||
pub require_lease_before_startup: bool,
|
||||
pub rpc_addr: String,
|
||||
pub rpc_hostname: Option<String>,
|
||||
pub rpc_runtime_size: usize,
|
||||
@@ -340,7 +340,7 @@ impl Default for DatanodeOptions {
|
||||
Self {
|
||||
mode: Mode::Standalone,
|
||||
node_id: None,
|
||||
coordination: false,
|
||||
require_lease_before_startup: false,
|
||||
rpc_addr: "127.0.0.1:3001".to_string(),
|
||||
rpc_hostname: None,
|
||||
rpc_runtime_size: 8,
|
||||
|
||||
@@ -72,7 +72,7 @@ pub struct Datanode {
|
||||
region_event_receiver: Option<RegionServerEventReceiver>,
|
||||
region_server: RegionServer,
|
||||
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
|
||||
coordinated_notifier: Option<Arc<Notify>>,
|
||||
leases_notifier: Option<Arc<Notify>>,
|
||||
}
|
||||
|
||||
impl Datanode {
|
||||
@@ -91,15 +91,14 @@ impl Datanode {
|
||||
// Safety: The event_receiver must exist.
|
||||
let receiver = self.region_event_receiver.take().unwrap();
|
||||
|
||||
task.start(receiver, self.coordinated_notifier.clone())
|
||||
.await?;
|
||||
task.start(receiver, self.leases_notifier.clone()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// If `coordinated_notifier` exists, it waits for all regions to be coordinated.
|
||||
/// If `leases_notifier` exists, it waits until leases have been obtained in all regions.
|
||||
pub async fn wait_coordinated(&mut self) {
|
||||
if let Some(notifier) = self.coordinated_notifier.take() {
|
||||
if let Some(notifier) = self.leases_notifier.take() {
|
||||
notifier.notified().await;
|
||||
}
|
||||
}
|
||||
@@ -249,11 +248,12 @@ impl DatanodeBuilder {
|
||||
)
|
||||
.await;
|
||||
|
||||
let coordinated_notifier = if self.opts.coordination && matches!(mode, Mode::Distributed) {
|
||||
Some(Arc::new(Notify::new()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let leases_notifier =
|
||||
if self.opts.require_lease_before_startup && matches!(mode, Mode::Distributed) {
|
||||
Some(Arc::new(Notify::new()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Datanode {
|
||||
opts: self.opts,
|
||||
@@ -262,7 +262,7 @@ impl DatanodeBuilder {
|
||||
region_server,
|
||||
greptimedb_telemetry_task,
|
||||
region_event_receiver,
|
||||
coordinated_notifier,
|
||||
leases_notifier,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ mode = 'distributed'
|
||||
rpc_addr = '127.0.0.1:4100'
|
||||
rpc_hostname = '127.0.0.1'
|
||||
rpc_runtime_size = 8
|
||||
coordination = true
|
||||
require_lease_before_startup = true
|
||||
|
||||
[wal]
|
||||
file_size = '1GB'
|
||||
|
||||
Reference in New Issue
Block a user