mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-29 11:20:38 +00:00
feat: start services after first heartbeat response processed (#2424)
* feat: start services after first heartbeat response processed * refactor: watch changes in RegionAliveKeeper * feat: add coordination to DatanodeOptions * chore: apply suggestions from CR * chore: apply suggestions from CR * chore: enable coordination in sqlness
This commit is contained in:
@@ -8,6 +8,9 @@ rpc_addr = "127.0.0.1:3001"
|
||||
rpc_hostname = "127.0.0.1"
|
||||
# The number of gRPC server worker threads, 8 by default.
|
||||
rpc_runtime_size = 8
|
||||
# Start services after regions are coordinated.
|
||||
# It will block the datanode start if it can't receive the heartbeat from metasrv.
|
||||
coordination = false
|
||||
|
||||
[heartbeat]
|
||||
# Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
|
||||
|
||||
@@ -34,6 +34,8 @@ use tokio::sync::{mpsc, Mutex};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{Duration, Instant};
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
|
||||
use crate::region_server::RegionServer;
|
||||
|
||||
const MAX_CLOSE_RETRY_TIMES: usize = 10;
|
||||
@@ -54,7 +56,7 @@ pub struct RegionAliveKeeper {
|
||||
region_server: RegionServer,
|
||||
tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
|
||||
heartbeat_interval_millis: u64,
|
||||
started: AtomicBool,
|
||||
started: Arc<AtomicBool>,
|
||||
|
||||
/// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
|
||||
/// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
|
||||
@@ -69,7 +71,7 @@ impl RegionAliveKeeper {
|
||||
region_server,
|
||||
tasks: Arc::new(Mutex::new(HashMap::new())),
|
||||
heartbeat_interval_millis,
|
||||
started: AtomicBool::new(false),
|
||||
started: Arc::new(AtomicBool::new(false)),
|
||||
epoch: Instant::now(),
|
||||
}
|
||||
}
|
||||
@@ -141,17 +143,72 @@ impl RegionAliveKeeper {
|
||||
deadline
|
||||
}
|
||||
|
||||
pub async fn start(&self) {
|
||||
pub async fn start(
|
||||
self: &Arc<Self>,
|
||||
event_receiver: Option<RegionServerEventReceiver>,
|
||||
) -> Result<()> {
|
||||
self.started.store(true, Ordering::Relaxed);
|
||||
|
||||
if let Some(mut event_receiver) = event_receiver {
|
||||
let keeper = self.clone();
|
||||
// Initializers region alive keeper.
|
||||
// It makes sure all opened regions are registered to `RegionAliveKeeper.`
|
||||
loop {
|
||||
match event_receiver.0.try_recv() {
|
||||
Ok(RegionServerEvent::Registered(region_id)) => {
|
||||
keeper.register_region(region_id).await;
|
||||
}
|
||||
Ok(RegionServerEvent::Deregistered(region_id)) => {
|
||||
keeper.deregister_region(region_id).await;
|
||||
}
|
||||
Err(mpsc::error::TryRecvError::Disconnected) => {
|
||||
return error::UnexpectedSnafu {
|
||||
violated: "RegionServerEventSender closed",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
Err(mpsc::error::TryRecvError::Empty) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
let running = self.started.clone();
|
||||
|
||||
// Watches changes
|
||||
common_runtime::spawn_bg(async move {
|
||||
loop {
|
||||
if !running.load(Ordering::Relaxed) {
|
||||
info!("RegionAliveKeeper stopped! Quits the watch loop!");
|
||||
break;
|
||||
}
|
||||
|
||||
match event_receiver.0.recv().await {
|
||||
Some(RegionServerEvent::Registered(region_id)) => {
|
||||
keeper.register_region(region_id).await;
|
||||
}
|
||||
Some(RegionServerEvent::Deregistered(region_id)) => {
|
||||
keeper.deregister_region(region_id).await;
|
||||
}
|
||||
None => {
|
||||
info!("RegionServerEventSender closed! Quits the watch loop!");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let tasks = self.tasks.lock().await;
|
||||
for task in tasks.values() {
|
||||
task.start(self.heartbeat_interval_millis).await;
|
||||
}
|
||||
self.started.store(true, Ordering::Relaxed);
|
||||
|
||||
info!(
|
||||
"RegionAliveKeeper is started with region {:?}",
|
||||
tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn epoch(&self) -> Instant {
|
||||
@@ -383,14 +440,14 @@ mod test {
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn region_alive_keeper() {
|
||||
let region_server = mock_region_server();
|
||||
let alive_keeper = RegionAliveKeeper::new(region_server, 300);
|
||||
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server, 300));
|
||||
let region_id = RegionId::new(1, 2);
|
||||
|
||||
// register a region before starting
|
||||
alive_keeper.register_region(region_id).await;
|
||||
assert!(alive_keeper.find_handle(region_id).await.is_some());
|
||||
|
||||
alive_keeper.start().await;
|
||||
alive_keeper.start(None).await.unwrap();
|
||||
|
||||
// started alive keeper should assign deadline to this region
|
||||
let deadline = alive_keeper.deadline(region_id).await.unwrap();
|
||||
|
||||
@@ -320,6 +320,7 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
|
||||
pub struct DatanodeOptions {
|
||||
pub mode: Mode,
|
||||
pub node_id: Option<u64>,
|
||||
pub coordination: bool,
|
||||
pub rpc_addr: String,
|
||||
pub rpc_hostname: Option<String>,
|
||||
pub rpc_runtime_size: usize,
|
||||
@@ -339,6 +340,7 @@ impl Default for DatanodeOptions {
|
||||
Self {
|
||||
mode: Mode::Standalone,
|
||||
node_id: None,
|
||||
coordination: false,
|
||||
rpc_addr: "127.0.0.1:3001".to_string(),
|
||||
rpc_hostname: None,
|
||||
rpc_runtime_size: 8,
|
||||
|
||||
@@ -44,6 +44,7 @@ use store_api::region_engine::RegionEngineRef;
|
||||
use store_api::region_request::{RegionOpenRequest, RegionRequest};
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::fs;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use crate::config::{DatanodeOptions, RegionEngineConfig};
|
||||
use crate::error::{
|
||||
@@ -71,6 +72,7 @@ pub struct Datanode {
|
||||
region_event_receiver: Option<RegionServerEventReceiver>,
|
||||
region_server: RegionServer,
|
||||
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
|
||||
coordinated_notifier: Option<Arc<Notify>>,
|
||||
}
|
||||
|
||||
impl Datanode {
|
||||
@@ -78,6 +80,7 @@ impl Datanode {
|
||||
info!("Starting datanode instance...");
|
||||
|
||||
self.start_heartbeat().await?;
|
||||
self.wait_coordinated().await;
|
||||
|
||||
let _ = self.greptimedb_telemetry_task.start();
|
||||
self.start_services().await
|
||||
@@ -87,11 +90,20 @@ impl Datanode {
|
||||
if let Some(task) = &self.heartbeat_task {
|
||||
// Safety: The event_receiver must exist.
|
||||
let receiver = self.region_event_receiver.take().unwrap();
|
||||
task.start(receiver).await?;
|
||||
|
||||
task.start(receiver, self.coordinated_notifier.clone())
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// If `coordinated_notifier` exists, it waits for all regions to be coordinated.
|
||||
pub async fn wait_coordinated(&mut self) {
|
||||
if let Some(notifier) = self.coordinated_notifier.take() {
|
||||
notifier.notified().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Start services of datanode. This method call will block until services are shutdown.
|
||||
pub async fn start_services(&mut self) -> Result<()> {
|
||||
if let Some(service) = self.services.as_mut() {
|
||||
@@ -237,6 +249,12 @@ impl DatanodeBuilder {
|
||||
)
|
||||
.await;
|
||||
|
||||
let coordinated_notifier = if self.opts.coordination && matches!(mode, Mode::Distributed) {
|
||||
Some(Arc::new(Notify::new()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Datanode {
|
||||
opts: self.opts,
|
||||
services,
|
||||
@@ -244,6 +262,7 @@ impl DatanodeBuilder {
|
||||
region_server,
|
||||
greptimedb_telemetry_task,
|
||||
region_event_receiver,
|
||||
coordinated_notifier,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -28,14 +28,14 @@ use common_telemetry::{debug, error, info, trace, warn};
|
||||
use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder};
|
||||
use meta_client::MetaClientOptions;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::{mpsc, Notify};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use self::handler::RegionHeartbeatResponseHandler;
|
||||
use crate::alive_keeper::RegionAliveKeeper;
|
||||
use crate::config::DatanodeOptions;
|
||||
use crate::error::{self, MetaClientInitSnafu, Result};
|
||||
use crate::event_listener::{RegionServerEvent, RegionServerEventReceiver};
|
||||
use crate::event_listener::RegionServerEventReceiver;
|
||||
use crate::region_server::RegionServer;
|
||||
|
||||
pub(crate) mod handler;
|
||||
@@ -96,6 +96,7 @@ impl HeartbeatTask {
|
||||
running: Arc<AtomicBool>,
|
||||
handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
mailbox: MailboxRef,
|
||||
mut notify: Option<Arc<Notify>>,
|
||||
) -> Result<HeartbeatSender> {
|
||||
let client_id = meta_client.id();
|
||||
|
||||
@@ -111,11 +112,13 @@ impl HeartbeatTask {
|
||||
if let Some(msg) = res.mailbox_message.as_ref() {
|
||||
info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
|
||||
}
|
||||
|
||||
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
|
||||
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
|
||||
error!(e; "Error while handling heartbeat response");
|
||||
}
|
||||
if let Some(notify) = notify.take() {
|
||||
notify.notify_one();
|
||||
}
|
||||
if !running.load(Ordering::Acquire) {
|
||||
info!("Heartbeat task shutdown");
|
||||
}
|
||||
@@ -137,7 +140,11 @@ impl HeartbeatTask {
|
||||
}
|
||||
|
||||
/// Start heartbeat task, spawn background task.
|
||||
pub async fn start(&self, mut event_receiver: RegionServerEventReceiver) -> Result<()> {
|
||||
pub async fn start(
|
||||
&self,
|
||||
event_receiver: RegionServerEventReceiver,
|
||||
notify: Option<Arc<Notify>>,
|
||||
) -> Result<()> {
|
||||
let running = self.running.clone();
|
||||
if running
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||||
@@ -152,8 +159,6 @@ impl HeartbeatTask {
|
||||
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
|
||||
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
|
||||
|
||||
self.region_alive_keeper.start().await;
|
||||
|
||||
let meta_client = self.meta_client.clone();
|
||||
let region_server_clone = self.region_server.clone();
|
||||
|
||||
@@ -167,6 +172,7 @@ impl HeartbeatTask {
|
||||
running.clone(),
|
||||
handler_executor.clone(),
|
||||
mailbox.clone(),
|
||||
notify,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -176,31 +182,7 @@ impl HeartbeatTask {
|
||||
});
|
||||
let epoch = self.region_alive_keeper.epoch();
|
||||
|
||||
let keeper = self.region_alive_keeper.clone();
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
loop {
|
||||
if !running.load(Ordering::Relaxed) {
|
||||
info!("shutdown heartbeat task");
|
||||
break;
|
||||
}
|
||||
|
||||
match event_receiver.0.recv().await {
|
||||
Some(RegionServerEvent::Registered(region_id)) => {
|
||||
keeper.register_region(region_id).await;
|
||||
}
|
||||
Some(RegionServerEvent::Deregistered(region_id)) => {
|
||||
keeper.deregister_region(region_id).await;
|
||||
}
|
||||
None => {
|
||||
info!("region server event sender closed!");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let running = self.running.clone();
|
||||
self.region_alive_keeper.start(Some(event_receiver)).await?;
|
||||
|
||||
common_runtime::spawn_bg(async move {
|
||||
let sleep = tokio::time::sleep(Duration::from_millis(0));
|
||||
@@ -256,6 +238,7 @@ impl HeartbeatTask {
|
||||
running.clone(),
|
||||
handler_executor.clone(),
|
||||
mailbox.clone(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -3,6 +3,7 @@ mode = 'distributed'
|
||||
rpc_addr = '127.0.0.1:4100'
|
||||
rpc_hostname = '127.0.0.1'
|
||||
rpc_runtime_size = 8
|
||||
coordination = true
|
||||
|
||||
[wal]
|
||||
file_size = '1GB'
|
||||
|
||||
Reference in New Issue
Block a user