diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index 3311ee47b3..bacaf05cd5 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -13,6 +13,12 @@ use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS}; const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500); +/// Struct to store runtime state of the compute monitor thread. +/// In theory, this could be a part of `Compute`, but i) +/// this state is expected to be accessed only by single thread, +/// so we don't need to care about locking; ii) `Compute` is +/// already quite big. Thus, it seems to be a good idea to keep +/// all the activity/health monitoring parts here. struct ComputeMonitor { compute: Arc, @@ -70,12 +76,36 @@ impl ComputeMonitor { ) } + /// Check if compute is in some terminal or soon-to-be-terminal + /// state, then return `true`, signalling the caller that it + /// should exit gracefully. Otherwise, return `false`. + fn check_interrupts(&mut self) -> bool { + let compute_status = self.compute.get_status(); + if matches!( + compute_status, + ComputeStatus::Terminated | ComputeStatus::TerminationPending | ComputeStatus::Failed + ) { + info!( + "compute is in {} status, stopping compute monitor", + compute_status + ); + return true; + } + + false + } + /// Spin in a loop and figure out the last activity time in the Postgres. - /// Then update it in the shared state. This function never errors out. + /// Then update it in the shared state. This function currently never + /// errors out explicitly, but there is a graceful termination path. + /// Every time we receive an error trying to check Postgres, we use + /// [`ComputeMonitor::check_interrupts()`] because it could be that + /// compute is being terminated already, then we can exit gracefully + /// to not produce errors' noise in the log. /// NB: the only expected panic is at `Mutex` unwrap(), all other errors /// should be handled gracefully. #[instrument(skip_all)] - pub fn run(&mut self) { + pub fn run(&mut self) -> anyhow::Result<()> { // Suppose that `connstr` doesn't change let connstr = self.compute.params.connstr.clone(); let conf = self @@ -93,6 +123,10 @@ impl ComputeMonitor { info!("starting compute monitor for {}", connstr); loop { + if self.check_interrupts() { + break; + } + match &mut client { Ok(cli) => { if cli.is_closed() { @@ -100,6 +134,10 @@ impl ComputeMonitor { downtime_info = self.downtime_info(), "connection to Postgres is closed, trying to reconnect" ); + if self.check_interrupts() { + break; + } + self.report_down(); // Connection is closed, reconnect and try again. @@ -111,15 +149,19 @@ impl ComputeMonitor { self.compute.update_last_active(self.last_active); } Err(e) => { + error!( + downtime_info = self.downtime_info(), + "could not check Postgres: {}", e + ); + if self.check_interrupts() { + break; + } + // Although we have many places where we can return errors in `check()`, // normally it shouldn't happen. I.e., we will likely return error if // connection got broken, query timed out, Postgres returned invalid data, etc. // In all such cases it's suspicious, so let's report this as downtime. self.report_down(); - error!( - downtime_info = self.downtime_info(), - "could not check Postgres: {}", e - ); // Reconnect to Postgres just in case. During tests, I noticed // that queries in `check()` can fail with `connection closed`, @@ -136,6 +178,10 @@ impl ComputeMonitor { downtime_info = self.downtime_info(), "could not connect to Postgres: {}, retrying", e ); + if self.check_interrupts() { + break; + } + self.report_down(); // Establish a new connection and try again. @@ -147,6 +193,9 @@ impl ComputeMonitor { self.last_checked = Utc::now(); thread::sleep(MONITOR_CHECK_INTERVAL); } + + // Graceful termination path + Ok(()) } #[instrument(skip_all)] @@ -429,7 +478,10 @@ pub fn launch_monitor(compute: &Arc) -> thread::JoinHandle<()> { .spawn(move || { let span = span!(Level::INFO, "compute_monitor"); let _enter = span.enter(); - monitor.run(); + match monitor.run() { + Ok(_) => info!("compute monitor thread terminated gracefully"), + Err(err) => error!("compute monitor thread terminated abnormally {:?}", err), + } }) .expect("cannot launch compute monitor thread") }