Use neonvm-daemon's control socket for swap resizing and setting disk quota

ATTENTION: This requires a new-enough neonvm-runner image, which contains
the new interface in neonvm-daemon
This commit is contained in:
Heikki Linnakangas
2025-02-10 17:14:58 +02:00
parent 7263076c17
commit 9a5a7ebac2
7 changed files with 113 additions and 78 deletions

3
Cargo.lock generated
View File

@@ -1315,6 +1315,9 @@ dependencies = [
"flate2",
"futures",
"http 1.1.0",
"http-body-util",
"hyper 1.4.1",
"hyper-util",
"jsonwebtoken",
"metrics",
"nix 0.27.1",

View File

@@ -24,6 +24,9 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
http-body-util.workspace = true
hyper-util.workspace = true
hyper.workspace = true
jsonwebtoken.workspace = true
metrics.workspace = true
nix.workspace = true

View File

@@ -47,9 +47,9 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::http::server::Server;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use compute_tools::neonvmd_client::{resize_swap, set_disk_quota};
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
@@ -67,7 +67,6 @@ use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;
@@ -147,6 +146,7 @@ struct Cli {
#[arg(long, action = clap::ArgAction::SetTrue)]
pub resize_swap_on_bind: bool,
/// This is no longer used for anything. It's kept for now just for backwards-compatibility.
#[arg(long)]
pub set_disk_quota_for_fs: Option<String>,
@@ -474,10 +474,8 @@ fn start_postgres(
}
// Set disk quota if the compute spec says so
if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
(disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
{
match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
if let Some(disk_quota_bytes) = disk_quota_bytes {
match set_disk_quota(disk_quota_bytes) {
Ok(()) => {
let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
info!(%disk_quota_bytes, %size_mib, "set disk quota");

View File

@@ -1,25 +0,0 @@
use anyhow::Context;
pub const DISK_QUOTA_BIN: &str = "/neonvm/bin/set-disk-quota";
/// If size_bytes is 0, it disables the quota. Otherwise, it sets filesystem quota to size_bytes.
/// `fs_mountpoint` should point to the mountpoint of the filesystem where the quota should be set.
pub fn set_disk_quota(size_bytes: u64, fs_mountpoint: &str) -> anyhow::Result<()> {
let size_kb = size_bytes / 1024;
// run `/neonvm/bin/set-disk-quota {size_kb} {mountpoint}`
let child_result = std::process::Command::new("/usr/bin/sudo")
.arg(DISK_QUOTA_BIN)
.arg(size_kb.to_string())
.arg(fs_mountpoint)
.spawn();
child_result
.context("spawn() failed")
.and_then(|mut child| child.wait().context("wait() failed"))
.and_then(|status| match status.success() {
true => Ok(()),
false => Err(anyhow::anyhow!("process exited with {status}")),
})
// wrap any prior error with the overall context that we couldn't run the command
.with_context(|| format!("could not run `/usr/bin/sudo {DISK_QUOTA_BIN}`"))
}

View File

@@ -11,7 +11,6 @@ pub mod http;
pub mod logger;
pub mod catalog;
pub mod compute;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;
pub mod local_proxy;
@@ -19,9 +18,9 @@ pub mod lsn_lease;
pub mod metrics;
mod migration;
pub mod monitor;
pub mod neonvmd_client;
pub mod params;
pub mod pg_helpers;
pub mod spec;
mod spec_apply;
pub mod swap;
pub mod sync_sk;

View File

@@ -0,0 +1,102 @@
use anyhow::Context;
use hyper::client::conn;
use hyper::client::conn::http1::SendRequest;
use hyper::{Request, StatusCode};
use hyper_util::rt::TokioIo;
use tracing::warn;
const NEONVM_DAEMON_CONTROL_SOCKET_PATH: &str = "/neonvm/run/neonvm-daemon-socket";
/// Open a connection to neonvm-daemon's control socket, prepare to send
/// requests to it with hyper.
async fn connect_neonvm_daemon<B>() -> anyhow::Result<SendRequest<B>>
where
B: hyper::body::Body + 'static + Send,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let mut attempts = 0;
let stream = loop {
match tokio::net::UnixStream::connect(NEONVM_DAEMON_CONTROL_SOCKET_PATH).await {
Ok(stream) => break stream,
Err(err) if err.kind() == std::io::ErrorKind::NotFound && attempts < 50 => {
// Retry
warn!("neonvm-daemon control socket not found, retrying...");
attempts += 1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Err(err) => Err(err).context("opening neonvm-daemon control socket")?,
}
};
let io = TokioIo::new(stream);
let (request_sender, connection) = conn::http1::handshake(io).await.unwrap();
// spawn a task to poll the connection and drive the HTTP state
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("Error in connection: {}", e);
}
});
Ok(request_sender)
}
pub fn resize_swap(size_bytes: u64) -> anyhow::Result<()> {
let rt = tokio::runtime::Handle::current();
rt.block_on(resize_swap_async(size_bytes))
}
pub async fn resize_swap_async(size_bytes: u64) -> anyhow::Result<()> {
let mut neonvmd = connect_neonvm_daemon().await?;
// Passing 'once' causes neonvm-daemon to reject any future resize requests
let request = Request::builder()
.method("POST")
.uri("/resize-swap-once")
.header("Host", "localhost") // hyper requires Host, even though the server won't care
.body(format!("{}", size_bytes))
.unwrap();
let resp = neonvmd.send_request(request).await?;
let status = resp.status();
match status {
StatusCode::OK => Ok(()),
StatusCode::CONFLICT => {
// 409 Conflict means that the swap was already resized. That happens if the
// compute_ctl restarts within the VM. That's considered OK.
warn!("Swap was already resized");
Ok(())
}
_ => Err(anyhow::anyhow!(
"error resizing swap: {}",
status.to_string()
)),
}
}
pub fn set_disk_quota(size_bytes: u64) -> anyhow::Result<()> {
let rt = tokio::runtime::Handle::current();
rt.block_on(set_disk_quota_async(size_bytes))
}
/// If size_bytes is 0, it disables the quota. Otherwise, it sets filesystem quota to size_bytes.
pub async fn set_disk_quota_async(size_bytes: u64) -> anyhow::Result<()> {
let mut neonvmd = connect_neonvm_daemon().await?;
let request = Request::builder()
.method("POST")
.uri("/set-disk-quota")
.header("Host", "localhost") // hyper requires Host, even though the server won't care
.body(format!("{}", size_bytes))
.unwrap();
let resp = neonvmd.send_request(request).await?;
let status = resp.status();
match status {
StatusCode::OK => Ok(()),
_ => Err(anyhow::anyhow!(
"error setting disk quota: {}",
status.to_string()
)),
}
}

View File

@@ -1,45 +0,0 @@
use std::path::Path;
use anyhow::{anyhow, Context};
use tracing::warn;
pub const RESIZE_SWAP_BIN: &str = "/neonvm/bin/resize-swap";
pub fn resize_swap(size_bytes: u64) -> anyhow::Result<()> {
// run `/neonvm/bin/resize-swap --once {size_bytes}`
//
// Passing '--once' causes resize-swap to delete itself after successful completion, which
// means that if compute_ctl restarts later, we won't end up calling 'swapoff' while
// postgres is running.
//
// NOTE: resize-swap is not very clever. If present, --once MUST be the first arg.
let child_result = std::process::Command::new("/usr/bin/sudo")
.arg(RESIZE_SWAP_BIN)
.arg("--once")
.arg(size_bytes.to_string())
.spawn();
child_result
.context("spawn() failed")
.and_then(|mut child| child.wait().context("wait() failed"))
.and_then(|status| match status.success() {
true => Ok(()),
false => {
// The command failed. Maybe it was because the resize-swap file doesn't exist?
// The --once flag causes it to delete itself on success so we don't disable swap
// while postgres is running; maybe this is fine.
match Path::new(RESIZE_SWAP_BIN).try_exists() {
Err(_) | Ok(true) => Err(anyhow!("process exited with {status}")),
// The path doesn't exist; we're actually ok
Ok(false) => {
warn!("ignoring \"not found\" error from resize-swap to avoid swapoff while compute is running");
Ok(())
},
}
}
})
// wrap any prior error with the overall context that we couldn't run the command
.with_context(|| {
format!("could not run `/usr/bin/sudo {RESIZE_SWAP_BIN} --once {size_bytes}`")
})
}