From cbb599f353a7489e18201dbbcf8e7d596f9bfb66 Mon Sep 17 00:00:00 2001 From: Nikita Kalyanov <44959448+nikitakalyanov@users.noreply.github.com> Date: Tue, 20 Feb 2024 19:42:36 +0200 Subject: [PATCH] Add /terminate API (#6745) this is to speed up suspends, see https://github.com/neondatabase/cloud/issues/10284 ## Problem ## Summary of changes ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist --- compute_tools/src/bin/compute_ctl.rs | 25 +++++------ compute_tools/src/compute.rs | 16 +++++++ compute_tools/src/http/api.rs | 55 ++++++++++++++++++++++++ compute_tools/src/http/openapi_spec.yaml | 23 ++++++++++ control_plane/src/endpoint.rs | 4 +- libs/compute_api/src/responses.rs | 4 ++ 6 files changed, 114 insertions(+), 13 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index a7e10d0aee..117919786e 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -45,7 +45,6 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; -use nix::sys::signal::{kill, Signal}; use signal_hook::consts::{SIGQUIT, SIGTERM}; use signal_hook::{consts::SIGINT, iterator::Signals}; use tracing::{error, info}; @@ -53,7 +52,9 @@ use url::Url; use compute_api::responses::ComputeStatus; -use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec, PG_PID, SYNC_SAFEKEEPERS_PID}; +use compute_tools::compute::{ + forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID, +}; use compute_tools::configurator::launch_configurator; use compute_tools::extension_server::get_pg_version; use compute_tools::http::api::launch_http_server; @@ -394,6 +395,15 @@ fn main() -> Result<()> { info!("synced safekeepers at lsn {lsn}"); } + let mut state = compute.state.lock().unwrap(); + if state.status == ComputeStatus::TerminationPending { + state.status = ComputeStatus::Terminated; + compute.state_changed.notify_all(); + // we were asked to terminate gracefully, don't exit to avoid restart + delay_exit = true + } + drop(state); + if let Err(err) = compute.check_for_core_dumps() { error!("error while checking for core dumps: {err:?}"); } @@ -523,16 +533,7 @@ fn cli() -> clap::Command { /// wait for termination which would be easy then. fn handle_exit_signal(sig: i32) { info!("received {sig} termination signal"); - let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst); - if ss_pid != 0 { - let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32); - kill(ss_pid, Signal::SIGTERM).ok(); - } - let pg_pid = PG_PID.load(Ordering::SeqCst); - if pg_pid != 0 { - let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32); - kill(pg_pid, Signal::SIGTERM).ok(); - } + forward_termination_signal(); exit(1); } diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 1c5363d048..142bb14fe5 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -28,6 +28,8 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus}; use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec}; use utils::measured_stream::MeasuredReader; +use nix::sys::signal::{kill, Signal}; + use remote_storage::{DownloadError, RemotePath}; use crate::checker::create_availability_check_data; @@ -1322,3 +1324,17 @@ LIMIT 100", Ok(remote_ext_metrics) } } + +pub fn forward_termination_signal() { + let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst); + if ss_pid != 0 { + let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32); + kill(ss_pid, Signal::SIGTERM).ok(); + } + let pg_pid = PG_PID.load(Ordering::SeqCst); + if pg_pid != 0 { + let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32); + // use 'immediate' shutdown (SIGQUIT): https://www.postgresql.org/docs/current/server-shutdown.html + kill(pg_pid, Signal::SIGQUIT).ok(); + } +} diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index fa2c4cff28..f076951239 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -5,6 +5,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::thread; +use crate::compute::forward_termination_signal; use crate::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_api::requests::ConfigurationRequest; use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError}; @@ -123,6 +124,17 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving /terminate POST request"); + match handle_terminate_request(compute).await { + Ok(()) => Response::new(Body::empty()), + Err((msg, code)) => { + error!("error handling /terminate request: {msg}"); + render_json_error(&msg, code) + } + } + } + // download extension files from remote extension storage on demand (&Method::POST, route) if route.starts_with("/extension_server/") => { info!("serving {:?} POST request", route); @@ -297,6 +309,49 @@ fn render_json_error(e: &str, status: StatusCode) -> Response { .unwrap() } +async fn handle_terminate_request(compute: &Arc) -> Result<(), (String, StatusCode)> { + { + let mut state = compute.state.lock().unwrap(); + if state.status == ComputeStatus::Terminated { + return Ok(()); + } + if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running { + let msg = format!( + "invalid compute status for termination request: {:?}", + state.status.clone() + ); + return Err((msg, StatusCode::PRECONDITION_FAILED)); + } + state.status = ComputeStatus::TerminationPending; + compute.state_changed.notify_all(); + drop(state); + } + forward_termination_signal(); + info!("sent signal and notified waiters"); + + // Spawn a blocking thread to wait for compute to become Terminated. + // This is needed to do not block the main pool of workers and + // be able to serve other requests while some particular request + // is waiting for compute to finish configuration. + let c = compute.clone(); + task::spawn_blocking(move || { + let mut state = c.state.lock().unwrap(); + while state.status != ComputeStatus::Terminated { + state = c.state_changed.wait(state).unwrap(); + info!( + "waiting for compute to become Terminated, current status: {:?}", + state.status + ); + } + + Ok(()) + }) + .await + .unwrap()?; + info!("terminated Postgres"); + Ok(()) +} + // Main Hyper HTTP server function that runs it and blocks waiting on it forever. #[tokio::main] async fn serve(port: u16, state: Arc) { diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index cedc6ece8f..d2ec54299f 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -168,6 +168,29 @@ paths: schema: $ref: "#/components/schemas/GenericError" + /terminate: + post: + tags: + - Terminate + summary: Terminate Postgres and wait for it to exit + description: "" + operationId: terminate + responses: + 200: + description: Result + 412: + description: "wrong state" + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + 500: + description: "Unexpected error" + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + components: securitySchemes: JWT: diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index f1fe12e05f..ce8f035dfc 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -652,7 +652,9 @@ impl Endpoint { } ComputeStatus::Empty | ComputeStatus::ConfigurationPending - | ComputeStatus::Configuration => { + | ComputeStatus::Configuration + | ComputeStatus::TerminationPending + | ComputeStatus::Terminated => { bail!("unexpected compute status: {:?}", state.status) } } diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 92bbf79cd4..fd0c90d447 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -52,6 +52,10 @@ pub enum ComputeStatus { // compute will exit soon or is waiting for // control-plane to terminate it. Failed, + // Termination requested + TerminationPending, + // Terminated Postgres + Terminated, } fn rfc3339_serialize(x: &Option>, s: S) -> Result