[compute_ctl] Implement live reconfiguration (#3980)

With this commit one can request compute reconfiguration
from the running `compute_ctl` with compute in `Running` state
by sending a new spec:
```shell
curl -d "{\"spec\": $(cat ./compute-spec-new.json)}" http://localhost:3080/configure
```

Internally, we start a separate configurator thread that is waiting on
`Condvar` for `ConfigurationPending` compute state in a loop. Then it does
reconfiguration, sets compute back to `Running` state and notifies other
waiters.

It will need some follow-ups, e.g. for retry logic for control-plane
requests, but should be useful for testing in the current state. This
shouldn't affect any existing environment, since computes are configured
in a different way there.

Resolves neondatabase/cloud#4433
This commit is contained in:
Alexey Kondratov
2023-04-13 18:07:29 +02:00
committed by GitHub
parent 36c20946b4
commit db8dd6f380
7 changed files with 126 additions and 7 deletions

View File

@@ -46,6 +46,7 @@ use url::Url;
use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -175,6 +176,8 @@ fn main() -> Result<()> {
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// Start Postgres
let mut delay_exit = false;

View File

@@ -356,6 +356,48 @@ impl ComputeNode {
Ok(())
}
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip(self, client))]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
/// Similar to `apply_config()`, but does a bit different sequence of operations,
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip(self))]
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
// 'Close' connection
drop(client);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
"finished reconfiguration of compute node for operation {}",
op_id
);
Ok(())
}
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();

View File

@@ -0,0 +1,54 @@
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use tracing::{error, info, instrument};
use compute_api::responses::ComputeStatus;
use crate::compute::ComputeNode;
#[instrument(skip(compute))]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let state = compute.state.lock().unwrap();
let mut state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
state.status = ComputeStatus::Configuration;
compute.state_changed.notify_all();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("compute node configured");
}
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");
break;
} else {
info!("woken up for compute status: {:?}, sleeping", state.status);
}
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
}

View File

@@ -155,7 +155,7 @@ async fn handle_configure_request(
// ```
{
let mut state = compute.state.lock().unwrap();
if state.status != ComputeStatus::Empty {
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for configuration request: {:?}",
state.status.clone()

View File

@@ -4,6 +4,7 @@
//!
pub mod checker;
pub mod config;
pub mod configurator;
pub mod http;
#[macro_use]
pub mod logger;

View File

@@ -1,7 +1,7 @@
use std::path::Path;
use std::str::FromStr;
use anyhow::Result;
use anyhow::{anyhow, bail, Result};
use postgres::config::Config;
use postgres::{Client, NoTls};
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
@@ -10,6 +10,7 @@ use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::responses::ControlPlaneSpecResponse;
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
@@ -26,13 +27,19 @@ pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<C
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let spec = reqwest::blocking::Client::new()
let resp: ControlPlaneSpecResponse = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?;
.send()
.map_err(|e| anyhow!("could not send spec request to control plane: {}", e))?
.json()
.map_err(|e| anyhow!("could not get compute spec from control plane: {}", e))?;
Ok(spec)
if let Some(spec) = resp.spec {
Ok(spec)
} else {
bail!("could not get compute spec from control plane")
}
}
/// It takes cluster specification and does the following:

View File

@@ -1,7 +1,9 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use chrono::{DateTime, Utc};
use serde::{Serialize, Serializer};
use serde::{Deserialize, Serialize, Serializer};
use crate::spec::ComputeSpec;
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
@@ -43,6 +45,8 @@ pub enum ComputeStatus {
Init,
// Compute is configured and running.
Running,
// New spec is being applied.
Configuration,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
@@ -64,3 +68,11 @@ pub struct ComputeMetrics {
pub config_ms: u64,
pub total_startup_ms: u64,
}
/// Response of the `/computes/{compute_id}/spec` control-plane API.
/// This is not actually a compute API response, so consider moving
/// to a different place.
#[derive(Deserialize, Debug)]
pub struct ControlPlaneSpecResponse {
pub spec: Option<ComputeSpec>,
}