Compare commits

...

4 Commits

Author SHA1 Message Date
Alexey Kondratov
4e6a23f779 Polish API handler and refresh OpenAPI spec 2023-04-04 16:50:57 +02:00
Alexey Kondratov
69552822b4 Use Condvar and make configuration API blocking 2023-04-04 16:50:57 +02:00
Alexey Kondratov
f0359fdbc0 Allow starting compute_ctl without spec
With this commit one can start compute with something like
```shell
cargo run --bin compute_ctl -- -i no-compute \
 -p http://localhost:9095 \
 -D compute_pgdata \
 -C "postgresql://cloud_admin@127.0.0.1:5434/postgres" \
 -b ./pg_install/v15/bin/postgres
```
and it will hang waiting for spec.

Then send one spec
```shell
curl -d "$(cat ./compute-spec.json)" http://localhost:3080/spec
```
Postgres will be started and configured.

Then reconfigure it with
```shell
curl -d "$(cat ./compute-spec-new.json)" http://localhost:3080/spec
```

Most of safeguards and comments are added. Some polishing especially
around HTTP API is still needed.
2023-04-04 16:50:57 +02:00
Alexey Kondratov
17d6c568bf Implement live reconfiguration in the compute_ctl
Accept spec in JSON format and request compute reconfiguration from
the configurator thread. If anything goes wrong after we set the
compute state to `ConfigurationPending` and / or sent spec to the
configurator thread, we basically leave compute in the potentially
wrong state. That said, it's control-plane's responsibility to
watch compute state after reconfiguration request and to clean
restart it in case of errors.

It still lacks ability of starting up without spec and some validations,
i.e. that live reconfiguration should be only available with
`--compute-id` and `--control-plane-uri` options.

Otherwise, it works fine and could be tested by running `compute_ctl`
locally, then sending it a new spec:
```shell
curl -d "$(cat ./compute-spec-new.json)" http://localhost:3080/spec
```

We have one configurator thread and async http server, so generally we
have single consumer - multiple producers pattern here. That's why we
use `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of
producers is limited to one due to code logic, but we still need an
ability to potentially pass `Sender` to several threads.

Next, we use async `hyper` + `tokio` http server, but all the other code
is completely synchronous. So we need to send data from async to sync,
that's why we use `mpsc::unbounded_channel` here, not `mpsc::channel`.
It doesn't make much sense to rewrite all code to async now, but we can
consider doing this in the future.

I think that a combination of `Mutex` and `CondVar` would work just fine
too, but as we already have `tokio`, I decided to try something from it.
2023-04-04 16:50:57 +02:00
11 changed files with 528 additions and 145 deletions

View File

@@ -34,22 +34,23 @@ use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::exit;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use tracing::{error, info};
use url::Url;
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_tools::configurator::launch_configurator;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::pg_helpers::*;
use compute_tools::spec::*;
use url::Url;
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
@@ -62,7 +63,7 @@ fn main() -> Result<()> {
let connstr = matches
.get_one::<String>("connstr")
.expect("Postgres connection string is required");
let spec = matches.get_one::<String>("spec");
let spec_json = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let compute_id = matches.get_one::<String>("compute-id");
@@ -71,40 +72,110 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let spec: ComputeSpec = match spec {
let mut spec = Default::default();
let mut spec_set = false;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
Some(json) => serde_json::from_str(json)?,
Some(json) => {
spec = serde_json::from_str(json)?;
spec_set = true;
}
None => {
// Second, try to read it from the file if path is provided
if let Some(sp) = spec_path {
let path = Path::new(sp);
let file = File::open(path)?;
serde_json::from_reader(file)?
spec = serde_json::from_reader(file)?;
spec_set = true;
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?
live_config_allowed = true;
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = s;
spec_set = true;
}
} else {
panic!(
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
control_plane_uri, compute_id
);
panic!("must specify both --control-plane-uri and --compute-id or none");
}
} else {
panic!("compute spec should be provided via --spec or --spec-path argument");
panic!(
"compute spec should be provided by one of the following ways: \
--spec OR --spec-path OR --control-plane-uri and --compute-id"
);
}
}
};
let mut new_state = ComputeState::new();
if spec_set {
new_state.spec = spec;
}
// Volatile compute state under mutex and condition variable to notify everyone
// who is interested in the state changes.
let compute_state = (Mutex::new(new_state), Condvar::new());
let compute_node = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
metrics: ComputeMetrics::default(),
state: compute_state,
};
let compute = Arc::new(compute_node);
// Launch http service first, so we were able to serve control-plane
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
let (state, state_changed) = &compute.state;
let mut state = state.lock().unwrap();
while state.status != ComputeStatus::ConfigurationPending {
state = state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
info!("got spec, continue configuration");
// Spec is already set by the http server handler.
break;
}
}
}
// We got all we need, fill in the state.
let (state, _) = &compute.state;
let mut state = state.lock().unwrap();
let pageserver_connstr = state
.spec
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let storage_auth_token = state.spec.storage_auth_token.clone();
let tenant = state
.spec
.cluster
.settings
.find("neon.tenant_id")
.expect("tenant id should be provided");
let timeline = state
.spec
.cluster
.settings
.find("neon.timeline_id")
.expect("tenant id should be provided");
let startup_tracing_context = state.spec.startup_tracing_context.clone();
state.pageserver_connstr = pageserver_connstr;
state.storage_auth_token = storage_auth_token;
state.tenant = tenant;
state.timeline = timeline;
state.status = ComputeStatus::Init;
drop(state);
// Extract OpenTelemetry context for the startup actions from the spec, and
// attach it to the current tracing context.
//
@@ -120,7 +191,7 @@ fn main() -> Result<()> {
// postgres is configured and up-and-running, we exit this span. Any other
// actions that are performed on incoming HTTP requests, for example, are
// performed in separate spans.
let startup_context_guard = if let Some(ref carrier) = spec.startup_tracing_context {
let startup_context_guard = if let Some(ref carrier) = startup_tracing_context {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
Some(TraceContextPropagator::new().extract(carrier).attach())
@@ -128,42 +199,10 @@ fn main() -> Result<()> {
None
};
let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let storage_auth_token = spec.storage_auth_token.clone();
let tenant = spec
.cluster
.settings
.find("neon.tenant_id")
.expect("tenant id should be provided");
let timeline = spec
.cluster
.settings
.find("neon.timeline_id")
.expect("tenant id should be provided");
let compute_state = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
spec,
tenant,
timeline,
pageserver_connstr,
storage_auth_token,
metrics: ComputeMetrics::default(),
state: RwLock::new(ComputeState::new()),
};
let compute = Arc::new(compute_state);
// Launch service threads first, so we were able to serve availability
// requests, while configuration is still in progress.
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// Start Postgres
let mut delay_exit = false;
@@ -172,7 +211,8 @@ fn main() -> Result<()> {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:?}", err);
let mut state = compute.state.write().unwrap();
let (state, _) = &compute.state;
let mut state = state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
drop(state);
@@ -262,7 +302,7 @@ fn cli() -> clap::Command {
Arg::new("control-plane-uri")
.short('p')
.long("control-plane-uri")
.value_name("CONTROL_PLANE"),
.value_name("CONTROL_PLANE_API_BASE_URI"),
)
}

View File

@@ -20,7 +20,8 @@ use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
// use std::sync::RwLock;
use std::sync::{Condvar, Mutex};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
@@ -41,16 +42,22 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
pub metrics: ComputeMetrics,
/// Volatile part of the `ComputeNode` so should be used under `RwLock`
/// to allow HTTP API server to serve status requests, while configuration
/// is in progress.
pub state: RwLock<ComputeState>,
// We only allow live re- / configuration of the compute node if
// it uses 'pull model', i.e. it can go to control-plane and fetch
// the latest configuration. Otherwise, there could be a case:
// - we start compute with some spec provided as argument
// - we push new spec and it does reconfiguration
// - but then something happens and compute pod / VM is destroyed,
// so k8s controller starts it again with the **old** spec
pub live_config_allowed: bool,
/// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
/// Coupled with `Condvar` to allow notifying HTTP API and configurator
/// thread about state changes. To allow HTTP API server to serving status
/// requests, while configuration is in progress, lock should be held only
/// for short periods of time to do read/write, not the whole configuration
/// process.
pub state: (Mutex<ComputeState>, Condvar),
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
@@ -60,7 +67,7 @@ where
x.to_rfc3339().serialize(s)
}
#[derive(Serialize)]
#[derive(Serialize, Clone)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
@@ -68,14 +75,27 @@ pub struct ComputeState {
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
#[serde(skip_serializing)]
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
#[serde(skip_serializing)]
pub pageserver_connstr: String,
#[serde(skip_serializing)]
pub storage_auth_token: Option<String>,
}
impl ComputeState {
pub fn new() -> Self {
Self {
status: ComputeStatus::Init,
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
spec: ComputeSpec::default(),
tenant: String::new(),
timeline: String::new(),
pageserver_connstr: String::new(),
storage_auth_token: None,
}
}
}
@@ -86,12 +106,25 @@ impl Default for ComputeState {
}
}
#[derive(Serialize, Clone, Copy, PartialEq, Eq)]
#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
// Spec wasn't provided as start, waiting for it to be
// provided by control-plane.
Empty,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute is configured and running.
Running,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
// Control-plane requested reconfiguration.
ConfigurationPending,
// New spec is being applied.
Configuration,
}
#[derive(Default, Serialize)]
@@ -104,11 +137,16 @@ pub struct ComputeMetrics {
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
self.state.write().unwrap().status = status;
let (state, state_changed) = &self.state;
let mut state = state.lock().unwrap();
state.status = status;
state_changed.notify_all();
}
pub fn get_status(&self) -> ComputeStatus {
self.state.read().unwrap().status
// self.state.read().unwrap().status
let (state, _) = &self.state;
state.lock().unwrap().status
}
// Remove `pgdata` directory and create it again with right permissions.
@@ -124,15 +162,15 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self))]
fn get_basebackup(&self, lsn: &str) -> Result<()> {
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> {
let start_time = Utc::now();
let mut config = postgres::Config::from_str(&self.pageserver_connstr)?;
let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
if let Some(storage_auth_token) = &self.storage_auth_token {
if let Some(storage_auth_token) = &compute_state.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token);
} else {
@@ -141,8 +179,14 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let basebackup_cmd = match lsn {
"0/0" => format!("basebackup {} {}", &self.tenant, &self.timeline), // First start of the compute
_ => format!("basebackup {} {} {}", &self.tenant, &self.timeline, lsn),
"0/0" => format!(
"basebackup {} {}",
&compute_state.tenant, &compute_state.timeline
), // First start of the compute
_ => format!(
"basebackup {} {} {}",
&compute_state.tenant, &compute_state.timeline, lsn
),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -169,14 +213,14 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self))]
fn sync_safekeepers(&self) -> Result<String> {
#[instrument(skip(self, storage_auth_token))]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<String> {
let start_time = Utc::now();
let sync_handle = Command::new(&self.pgbin)
.args(["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &self.storage_auth_token {
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
@@ -217,9 +261,9 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self))]
pub fn prepare_pgdata(&self) -> Result<()> {
let spec = &self.spec;
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let spec = &compute_state.spec;
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
@@ -228,18 +272,18 @@ impl ComputeNode {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers()
.sync_safekeepers(compute_state.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
info!(
"getting basebackup@{} from pageserver {}",
lsn, &self.pageserver_connstr
lsn, &compute_state.pageserver_connstr
);
self.get_basebackup(&lsn).with_context(|| {
self.get_basebackup(compute_state, &lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &self.pageserver_connstr
lsn, &compute_state.pageserver_connstr
)
})?;
@@ -252,13 +296,16 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip(self))]
pub fn start_postgres(&self) -> Result<std::process::Child> {
pub fn start_postgres(
&self,
storage_auth_token: Option<String>,
) -> Result<std::process::Child> {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
let mut pg = Command::new(&self.pgbin)
.args(["-D", &self.pgdata])
.envs(if let Some(storage_auth_token) = &self.storage_auth_token {
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
} else {
vec![]
@@ -271,8 +318,9 @@ impl ComputeNode {
Ok(pg)
}
#[instrument(skip(self))]
pub fn apply_config(&self) -> Result<()> {
/// Do initial configuration of the already started Postgres.
#[instrument(skip(self, compute_state))]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
//
@@ -303,19 +351,62 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&self.spec, &mut client)?;
handle_databases(&self.spec, &mut client)?;
handle_role_deletions(self, &mut client)?;
handle_grants(self, &mut client)?;
handle_roles(&compute_state.spec, &mut client)?;
handle_databases(&compute_state.spec, &mut client)?;
handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?;
handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(&self.spec, &mut client)?;
handle_extensions(&compute_state.spec, &mut client)?;
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
self.spec.cluster.cluster_id
compute_state.spec.cluster.cluster_id
);
Ok(())
}
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip(self, client))]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
/// Similar to `apply_config()`, but does a bit different sequence of operations,
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip(self))]
pub fn reconfigure(&self) -> Result<()> {
let (state, _) = &self.state;
let spec = state.lock().unwrap().spec.clone();
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
// 'Close' connection
drop(client);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
"finished reconfiguration of compute node for operation {}",
op_id
);
Ok(())
@@ -323,21 +414,24 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
// let compute_state = self.state.read().unwrap().clone();
let (state, _) = &self.state;
let compute_state = state.lock().unwrap().clone();
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
self.spec.cluster.cluster_id,
self.spec.operation_uuid.as_ref().unwrap(),
self.tenant,
self.timeline,
compute_state.spec.cluster.cluster_id,
compute_state.spec.operation_uuid.as_ref().unwrap(),
compute_state.tenant,
compute_state.timeline,
);
self.prepare_pgdata()?;
self.prepare_pgdata(&compute_state)?;
let start_time = Utc::now();
let pg = self.start_postgres()?;
let pg = self.start_postgres(compute_state.storage_auth_token.clone())?;
self.apply_config()?;
self.apply_config(&compute_state)?;
let startup_end_time = Utc::now();
self.metrics.config_ms.store(

View File

@@ -0,0 +1,53 @@
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use tracing::{error, info, instrument};
use crate::compute::{ComputeNode, ComputeStatus};
#[instrument(skip(compute))]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
let (state, state_changed) = &compute.state;
loop {
let state = state.lock().unwrap();
let mut state = state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
state.status = ComputeStatus::Configuration;
state_changed.notify_all();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("compute node configured");
}
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(2000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");
break;
} else {
info!("woken up for compute status: {:?}, sleeping", state.status);
}
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
}

View File

@@ -3,7 +3,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::ComputeNode;
use crate::compute::{ComputeNode, ComputeStatus};
use crate::http::models::{ConfigurationRequest, GenericAPIError};
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
@@ -23,7 +25,9 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// Serialized compute state.
(&Method::GET, "/status") => {
info!("serving /status GET request");
let state = compute.state.read().unwrap();
// let state = compute.state.read().unwrap();
let (state, _) = &compute.state;
let state = state.lock().unwrap();
Response::new(Body::from(serde_json::to_string(&*state).unwrap()))
}
@@ -37,12 +41,29 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// Collect Postgres current usage insights
(&Method::GET, "/insights") => {
info!("serving /insights GET request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!("compute is not running, current status: {:?}", status);
error!(msg);
return Response::new(Body::from(msg));
}
let insights = compute.collect_insights().await;
Response::new(Body::from(insights))
}
(&Method::POST, "/check_writability") => {
info!("serving /check_writability POST request");
let status = compute.get_status();
if status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for check_writability request: {:?}",
status
);
error!(msg);
return Response::new(Body::from(msg));
}
let res = crate::checker::check_writability(compute).await;
match res {
Ok(_) => Response::new(Body::from("true")),
@@ -61,6 +82,61 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
))
}
// Accept spec in JSON format and request compute configuration from
// the configurator thread. If anything goes wrong after we set the
// compute state to `ConfigurationPending` and / or sent spec to the
// configurator thread, we basically leave compute in the potentially
// wrong state. That said, it's control-plane's responsibility to
// watch compute state after reconfiguration request and to clean
// restart in case of errors.
(&Method::POST, "/configure") => {
info!("serving /configure POST request");
if !compute.live_config_allowed {
let msg = "live reconfiguration is not allowed for this compute node";
error!(msg);
return render_json_error(msg, StatusCode::PRECONDITION_FAILED);
}
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
let spec = request.spec;
let (state, state_changed) = &compute.state;
let mut state = state.lock().unwrap();
if !(state.status == ComputeStatus::Empty || state.status == ComputeStatus::Running)
{
let msg = format!(
"invalid compute status for reconfiguration request: {}",
serde_json::to_string(&*state).unwrap()
);
error!(msg);
return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
}
state.spec = spec;
state.status = ComputeStatus::ConfigurationPending;
state_changed.notify_all();
drop(state);
info!("set new spec and notified configurator");
let (state, state_changed) = &compute.state;
let mut state = state.lock().unwrap();
while state.status != ComputeStatus::Running {
state = state_changed.wait(state).unwrap();
info!(
"waiting for compute to become Running, current status: {:?}",
state.status
);
}
// Return current compute state if everything went well.
Response::new(Body::from(serde_json::to_string(&*state).unwrap()))
} else {
let msg = "invalid spec";
error!(msg);
render_json_error(msg, StatusCode::BAD_REQUEST)
}
}
// Return the `404 Not Found` for any other routes.
_ => {
let mut not_found = Response::new(Body::from("404 Not Found"));
@@ -70,6 +146,16 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
let error = GenericAPIError {
error: e.to_string(),
};
Response::builder()
.status(status)
.body(Body::from(serde_json::to_string(&error).unwrap()))
.unwrap()
}
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(state: Arc<ComputeNode>) {
@@ -110,7 +196,6 @@ async fn serve(state: Arc<ComputeNode>) {
/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
pub fn launch_http_server(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let state = Arc::clone(state);
Ok(thread::Builder::new()
.name("http-endpoint".into())
.spawn(move || serve(state))?)

View File

@@ -1 +1,2 @@
pub mod api;
pub mod models;

View File

@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};
use crate::spec::ComputeSpec;
/// We now pass only `spec` in the configuration request, but later we can
/// extend it and something like `restart: bool` or something else. So put
/// `spec` into a struct initially to be more flexible in the future.
#[derive(Deserialize, Debug)]
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
pub error: String,
}

View File

@@ -11,7 +11,7 @@ paths:
get:
tags:
- Info
summary: Get compute node internal status
summary: Get compute node internal status.
description: ""
operationId: getComputeStatus
responses:
@@ -26,7 +26,7 @@ paths:
get:
tags:
- Info
summary: Get compute node startup metrics in JSON format
summary: Get compute node startup metrics in JSON format.
description: ""
operationId: getComputeMetricsJSON
responses:
@@ -41,9 +41,9 @@ paths:
get:
tags:
- Info
summary: Get current compute insights in JSON format
summary: Get current compute insights in JSON format.
description: |
Note, that this doesn't include any historical data
Note, that this doesn't include any historical data.
operationId: getComputeInsights
responses:
200:
@@ -56,12 +56,12 @@ paths:
/info:
get:
tags:
- "info"
summary: Get info about the compute Pod/VM
- Info
summary: Get info about the compute pod / VM.
description: ""
operationId: getInfo
responses:
"200":
200:
description: Info
content:
application/json:
@@ -72,7 +72,7 @@ paths:
post:
tags:
- Check
summary: Check that we can write new data on this compute
summary: Check that we can write new data on this compute.
description: ""
operationId: checkComputeWritability
responses:
@@ -82,9 +82,57 @@ paths:
text/plain:
schema:
type: string
description: Error text or 'true' if check passed
description: Error text or 'true' if check passed.
example: "true"
/configure:
post:
tags:
- Configure
summary: Request compute node configuration.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
compute is finished configuration and is in `Running` state.
Optional non-blocking mode could be added later. Currently,
it's also assumed that reconfiguration doesn't require restart.
operationId: configureCompute
requestBody:
description: Configuration request.
required: true
content:
application/json:
schema:
type: object
required:
- spec
properties:
spec:
# XXX: I don't want to explain current spec in the OpenAPI format,
# as it could be changed really soon. Consider doing it later.
type: object
responses:
200:
description: Compute configuration finished.
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeState"
400:
description: Provided spec is invalid.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
412:
description: |
It's not possible to do live-configuration of the compute.
It's either in the wrong state, or compute doesn't use pull
mode of configuration.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:
JWT:
@@ -95,7 +143,7 @@ components:
schemas:
ComputeMetrics:
type: object
description: Compute startup metrics
description: Compute startup metrics.
required:
- sync_safekeepers_ms
- basebackup_ms
@@ -113,7 +161,7 @@ components:
Info:
type: object
description: Information about VM/Pod
description: Information about VM/Pod.
required:
- num_cpus
properties:
@@ -130,17 +178,26 @@ components:
$ref: '#/components/schemas/ComputeStatus'
last_active:
type: string
description: The last detected compute activity timestamp in UTC and RFC3339 format
description: The last detected compute activity timestamp in UTC and RFC3339 format.
example: "2022-10-12T07:20:50.52Z"
error:
type: string
description: Text of the error during compute startup, if any
description: Text of the error during compute startup, if any.
example: ""
tenant:
type: string
description: Identifier of the current tenant served by compute node, if any.
example: c9269c359e9a199fad1ea0981246a78f
timeline:
type: string
description: Identifier of the current timeline served by compute node, if any.
example: ece7de74d4b8cbe5433a68ce4d1b97b4
ComputeInsights:
type: object
properties:
pg_stat_statements:
description: Contains raw output from pg_stat_statements in JSON format
description: Contains raw output from pg_stat_statements in JSON format.
type: array
items:
type: object
@@ -151,6 +208,19 @@ components:
- init
- failed
- running
example: running
#
# Errors
#
GenericError:
type: object
required:
- error
properties:
error:
type: string
security:
- JWT: []

View File

@@ -4,6 +4,7 @@
//!
pub mod checker;
pub mod config;
pub mod configurator;
pub mod http;
#[macro_use]
pub mod logger;

View File

@@ -46,7 +46,9 @@ fn watch_compute_activity(compute: &ComputeNode) {
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
&[],
);
let mut last_active = compute.state.read().unwrap().last_active;
// let mut last_active = compute.state.read().unwrap().last_active;
let (state, _) = &compute.state;
let mut last_active = state.lock().unwrap().last_active;
if let Ok(backs) = backends {
let mut idle_backs: Vec<DateTime<Utc>> = vec![];
@@ -87,7 +89,8 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
// Update the last activity in the shared state if we got a more recent one.
let mut state = compute.state.write().unwrap();
let (state, _) = &compute.state;
let mut state = state.lock().unwrap();
if last_active > state.last_active {
state.last_active = last_active;
debug!("set the last compute activity time to: {}", last_active);

View File

@@ -17,7 +17,7 @@ const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // mil
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
@@ -26,7 +26,7 @@ pub struct Role {
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
@@ -36,7 +36,7 @@ pub struct Database {
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,

View File

@@ -8,14 +8,13 @@ use postgres::{Client, NoTls};
use serde::Deserialize;
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
use crate::compute::ComputeNode;
use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug, Default)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
@@ -31,7 +30,7 @@ pub struct ComputeSpec {
/// Cluster state seen from the perspective of the external tools
/// like Rails web console.
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug, Default)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
@@ -47,13 +46,36 @@ pub struct Cluster {
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Deserialize)]
#[derive(Clone, Deserialize, Debug)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
pub new_name: Option<PgIdent>,
}
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
info!("getting spec from control plane: {}", cp_uri);
// TODO: check the response. We should distinguish cases when it's
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let spec = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?;
Ok(spec)
}
/// It takes cluster specification and does the following:
/// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
/// - Update `pg_hba.conf` to allow external connections.
@@ -226,8 +248,8 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Reassign all dependent objects and delete requested roles.
#[instrument(skip_all)]
pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
if let Some(ops) = &node.spec.delta_operations {
pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
if let Some(ops) = &spec.delta_operations {
// First, reassign all dependent objects to db owners.
info!("reassigning dependent objects of to-be-deleted roles");
@@ -244,7 +266,7 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<
// Check that role is still present in Postgres, as this could be a
// restart with the same spec after role deletion.
if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
reassign_owned_objects(node, &op.name)?;
reassign_owned_objects(spec, connstr, &op.name)?;
}
}
@@ -268,10 +290,10 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<
}
// Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
for db in &node.spec.cluster.databases {
fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
for db in &spec.cluster.databases {
if db.owner != *role_name {
let mut conf = Config::from_str(node.connstr.as_str())?;
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
let mut client = conf.connect(NoTls)?;
@@ -416,9 +438,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
let spec = &node.spec;
pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
info!("cluster spec grants:");
// We now have a separate `web_access` role to connect to the database
@@ -450,8 +470,8 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
// Do some per-database access adjustments. We'd better do this at db creation time,
// but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
// atomically.
for db in &node.spec.cluster.databases {
let mut conf = Config::from_str(node.connstr.as_str())?;
for db in &spec.cluster.databases {
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
let mut db_client = conf.connect(NoTls)?;