diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index a7e10d0aee..117919786e 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -45,7 +45,6 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
-use nix::sys::signal::{kill, Signal};
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info};
@@ -53,7 +52,9 @@ use url::Url;
use compute_api::responses::ComputeStatus;
-use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec, PG_PID, SYNC_SAFEKEEPERS_PID};
+use compute_tools::compute::{
+ forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
+};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
@@ -394,6 +395,15 @@ fn main() -> Result<()> {
info!("synced safekeepers at lsn {lsn}");
}
+ let mut state = compute.state.lock().unwrap();
+ if state.status == ComputeStatus::TerminationPending {
+ state.status = ComputeStatus::Terminated;
+ compute.state_changed.notify_all();
+ // we were asked to terminate gracefully, don't exit to avoid restart
+ delay_exit = true
+ }
+ drop(state);
+
if let Err(err) = compute.check_for_core_dumps() {
error!("error while checking for core dumps: {err:?}");
}
@@ -523,16 +533,7 @@ fn cli() -> clap::Command {
/// wait for termination which would be easy then.
fn handle_exit_signal(sig: i32) {
info!("received {sig} termination signal");
- let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
- if ss_pid != 0 {
- let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
- kill(ss_pid, Signal::SIGTERM).ok();
- }
- let pg_pid = PG_PID.load(Ordering::SeqCst);
- if pg_pid != 0 {
- let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
- kill(pg_pid, Signal::SIGTERM).ok();
- }
+ forward_termination_signal();
exit(1);
}
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 1c5363d048..142bb14fe5 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -28,6 +28,8 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
+use nix::sys::signal::{kill, Signal};
+
use remote_storage::{DownloadError, RemotePath};
use crate::checker::create_availability_check_data;
@@ -1322,3 +1324,17 @@ LIMIT 100",
Ok(remote_ext_metrics)
}
}
+
+pub fn forward_termination_signal() {
+ let ss_pid = SYNC_SAFEKEEPERS_PID.load(Ordering::SeqCst);
+ if ss_pid != 0 {
+ let ss_pid = nix::unistd::Pid::from_raw(ss_pid as i32);
+ kill(ss_pid, Signal::SIGTERM).ok();
+ }
+ let pg_pid = PG_PID.load(Ordering::SeqCst);
+ if pg_pid != 0 {
+ let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
+ // use 'immediate' shutdown (SIGQUIT): https://www.postgresql.org/docs/current/server-shutdown.html
+ kill(pg_pid, Signal::SIGQUIT).ok();
+ }
+}
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index fa2c4cff28..f076951239 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -5,6 +5,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
+use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
@@ -123,6 +124,17 @@ async fn routes(req: Request
, compute: &Arc) -> Response {
+ info!("serving /terminate POST request");
+ match handle_terminate_request(compute).await {
+ Ok(()) => Response::new(Body::empty()),
+ Err((msg, code)) => {
+ error!("error handling /terminate request: {msg}");
+ render_json_error(&msg, code)
+ }
+ }
+ }
+
// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
@@ -297,6 +309,49 @@ fn render_json_error(e: &str, status: StatusCode) -> Response {
.unwrap()
}
+async fn handle_terminate_request(compute: &Arc) -> Result<(), (String, StatusCode)> {
+ {
+ let mut state = compute.state.lock().unwrap();
+ if state.status == ComputeStatus::Terminated {
+ return Ok(());
+ }
+ if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
+ let msg = format!(
+ "invalid compute status for termination request: {:?}",
+ state.status.clone()
+ );
+ return Err((msg, StatusCode::PRECONDITION_FAILED));
+ }
+ state.status = ComputeStatus::TerminationPending;
+ compute.state_changed.notify_all();
+ drop(state);
+ }
+ forward_termination_signal();
+ info!("sent signal and notified waiters");
+
+ // Spawn a blocking thread to wait for compute to become Terminated.
+ // This is needed to do not block the main pool of workers and
+ // be able to serve other requests while some particular request
+ // is waiting for compute to finish configuration.
+ let c = compute.clone();
+ task::spawn_blocking(move || {
+ let mut state = c.state.lock().unwrap();
+ while state.status != ComputeStatus::Terminated {
+ state = c.state_changed.wait(state).unwrap();
+ info!(
+ "waiting for compute to become Terminated, current status: {:?}",
+ state.status
+ );
+ }
+
+ Ok(())
+ })
+ .await
+ .unwrap()?;
+ info!("terminated Postgres");
+ Ok(())
+}
+
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(port: u16, state: Arc) {
diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml
index cedc6ece8f..d2ec54299f 100644
--- a/compute_tools/src/http/openapi_spec.yaml
+++ b/compute_tools/src/http/openapi_spec.yaml
@@ -168,6 +168,29 @@ paths:
schema:
$ref: "#/components/schemas/GenericError"
+ /terminate:
+ post:
+ tags:
+ - Terminate
+ summary: Terminate Postgres and wait for it to exit
+ description: ""
+ operationId: terminate
+ responses:
+ 200:
+ description: Result
+ 412:
+ description: "wrong state"
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/GenericError"
+ 500:
+ description: "Unexpected error"
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/GenericError"
+
components:
securitySchemes:
JWT:
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index f1fe12e05f..ce8f035dfc 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -652,7 +652,9 @@ impl Endpoint {
}
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
- | ComputeStatus::Configuration => {
+ | ComputeStatus::Configuration
+ | ComputeStatus::TerminationPending
+ | ComputeStatus::Terminated => {
bail!("unexpected compute status: {:?}", state.status)
}
}
diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs
index 92bbf79cd4..fd0c90d447 100644
--- a/libs/compute_api/src/responses.rs
+++ b/libs/compute_api/src/responses.rs
@@ -52,6 +52,10 @@ pub enum ComputeStatus {
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
+ // Termination requested
+ TerminationPending,
+ // Terminated Postgres
+ Terminated,
}
fn rfc3339_serialize(x: &Option>, s: S) -> Result