diff --git a/Cargo.lock b/Cargo.lock index 4590e76014..c5b64b235a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,6 +841,18 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "compute_api" +version = "0.1.0" +dependencies = [ + "anyhow", + "chrono", + "serde", + "serde_json", + "serde_with", + "workspace_hack", +] + [[package]] name = "compute_tools" version = "0.1.0" @@ -848,6 +860,7 @@ dependencies = [ "anyhow", "chrono", "clap 4.1.4", + "compute_api", "futures", "hyper", "notify", diff --git a/Cargo.toml b/Cargo.toml index 09cc150606..d563324c86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,6 +132,7 @@ tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df6 heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending ## Local libraries +compute_api = { version = "0.1", path = "./libs/compute_api/" } consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 59433535f1..f315d2b7d9 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -27,4 +27,5 @@ tracing-subscriber.workspace = true tracing-utils.workspace = true url.workspace = true +compute_api.workspace = true workspace_hack.workspace = true diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index f29a576413..3d95d1cd3e 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -41,15 +41,17 @@ use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; use tracing::{error, info}; +use url::Url; -use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus}; +use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus}; +use compute_api::spec::ComputeSpec; + +use compute_tools::compute::ComputeNode; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; use compute_tools::params::*; use compute_tools::pg_helpers::*; -use compute_tools::spec::*; -use url::Url; fn main() -> Result<()> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; @@ -145,8 +147,10 @@ fn main() -> Result<()> { .find("neon.timeline_id") .expect("tenant id should be provided"); + let now = Utc::now(); + let compute_state = ComputeNode { - start_time: Utc::now(), + start_time: now, connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, pgdata: pgdata.to_string(), pgbin: pgbin.to_string(), @@ -155,8 +159,12 @@ fn main() -> Result<()> { timeline, pageserver_connstr, storage_auth_token, - metrics: ComputeMetrics::default(), - state: RwLock::new(ComputeState::new()), + metrics: RwLock::new(ComputeMetrics::default()), + state: RwLock::new(ComputeState { + status: ComputeStatus::Init, + last_active: now, + error: None, + }), }; let compute = Arc::new(compute_state); diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 00d1e234ab..80a6aac568 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -19,16 +19,17 @@ use std::os::unix::fs::PermissionsExt; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::RwLock; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use postgres::{Client, NoTls}; -use serde::{Serialize, Serializer}; use tokio_postgres; use tracing::{info, instrument, warn}; +use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus}; +use compute_api::spec::ComputeSpec; + use crate::checker::create_writability_check_data; use crate::config; use crate::pg_helpers::*; @@ -46,62 +47,13 @@ pub struct ComputeNode { pub timeline: String, pub pageserver_connstr: String, pub storage_auth_token: Option, - pub metrics: ComputeMetrics, + pub metrics: RwLock, /// Volatile part of the `ComputeNode` so should be used under `RwLock` /// to allow HTTP API server to serve status requests, while configuration /// is in progress. pub state: RwLock, } -fn rfc3339_serialize(x: &DateTime, s: S) -> Result -where - S: Serializer, -{ - x.to_rfc3339().serialize(s) -} - -#[derive(Serialize)] -#[serde(rename_all = "snake_case")] -pub struct ComputeState { - pub status: ComputeStatus, - /// Timestamp of the last Postgres activity - #[serde(serialize_with = "rfc3339_serialize")] - pub last_active: DateTime, - pub error: Option, -} - -impl ComputeState { - pub fn new() -> Self { - Self { - status: ComputeStatus::Init, - last_active: Utc::now(), - error: None, - } - } -} - -impl Default for ComputeState { - fn default() -> Self { - Self::new() - } -} - -#[derive(Serialize, Clone, Copy, PartialEq, Eq)] -#[serde(rename_all = "snake_case")] -pub enum ComputeStatus { - Init, - Running, - Failed, -} - -#[derive(Default, Serialize)] -pub struct ComputeMetrics { - pub sync_safekeepers_ms: AtomicU64, - pub basebackup_ms: AtomicU64, - pub config_ms: AtomicU64, - pub total_startup_ms: AtomicU64, -} - impl ComputeNode { pub fn set_status(&self, status: ComputeStatus) { self.state.write().unwrap().status = status; @@ -155,15 +107,11 @@ impl ComputeNode { ar.set_ignore_zeros(true); ar.unpack(&self.pgdata)?; - self.metrics.basebackup_ms.store( - Utc::now() - .signed_duration_since(start_time) - .to_std() - .unwrap() - .as_millis() as u64, - Ordering::Relaxed, - ); - + self.metrics.write().unwrap().basebackup_ms = Utc::now() + .signed_duration_since(start_time) + .to_std() + .unwrap() + .as_millis() as u64; Ok(()) } @@ -201,14 +149,11 @@ impl ComputeNode { ); } - self.metrics.sync_safekeepers_ms.store( - Utc::now() - .signed_duration_since(start_time) - .to_std() - .unwrap() - .as_millis() as u64, - Ordering::Relaxed, - ); + self.metrics.write().unwrap().sync_safekeepers_ms = Utc::now() + .signed_duration_since(start_time) + .to_std() + .unwrap() + .as_millis() as u64; let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim()); @@ -340,23 +285,19 @@ impl ComputeNode { self.apply_config()?; let startup_end_time = Utc::now(); - self.metrics.config_ms.store( - startup_end_time + { + let mut metrics = self.metrics.write().unwrap(); + metrics.config_ms = startup_end_time .signed_duration_since(start_time) .to_std() .unwrap() - .as_millis() as u64, - Ordering::Relaxed, - ); - self.metrics.total_startup_ms.store( - startup_end_time + .as_millis() as u64; + metrics.total_startup_ms = startup_end_time .signed_duration_since(self.start_time) .to_std() .unwrap() - .as_millis() as u64, - Ordering::Relaxed, - ); - + .as_millis() as u64; + } self.set_status(ComputeStatus::Running); Ok(pg) diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 6cbd0e3d4c..d25eb9b2fc 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -6,7 +6,7 @@ use std::path::Path; use anyhow::Result; use crate::pg_helpers::PgOptionsSerialize; -use crate::spec::ComputeSpec; +use compute_api::spec::ComputeSpec; /// Check that `line` is inside a text file and put it there if it is not. /// Create file if it doesn't exist. diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 199e0f3bd0..9d5b0d7a53 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -31,7 +31,8 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving /metrics.json GET request"); - Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap())) + let metrics = compute.metrics.read().unwrap(); + Response::new(Body::from(serde_json::to_string(&*metrics).unwrap())) } // Collect Postgres current usage insights diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 01b192b2de..bb787d0506 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -10,43 +10,12 @@ use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use notify::{RecursiveMode, Watcher}; use postgres::{Client, Transaction}; -use serde::Deserialize; use tracing::{debug, instrument}; +use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role}; + const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds -/// Rust representation of Postgres role info with only those fields -/// that matter for us. -#[derive(Clone, Deserialize)] -pub struct Role { - pub name: PgIdent, - pub encrypted_password: Option, - pub options: GenericOptions, -} - -/// Rust representation of Postgres database info with only those fields -/// that matter for us. -#[derive(Clone, Deserialize)] -pub struct Database { - pub name: PgIdent, - pub owner: PgIdent, - pub options: GenericOptions, -} - -/// Common type representing both SQL statement params with or without value, -/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config -/// options like `wal_level = logical`. -#[derive(Clone, Deserialize)] -pub struct GenericOption { - pub name: String, - pub value: Option, - pub vartype: String, -} - -/// Optional collection of `GenericOption`'s. Type alias allows us to -/// declare a `trait` on it. -pub type GenericOptions = Option>; - /// Escape a string for including it in a SQL literal fn escape_literal(s: &str) -> String { s.replace('\'', "''").replace('\\', "\\\\") @@ -58,9 +27,14 @@ fn escape_conf_value(s: &str) -> String { s.replace('\'', "''").replace('\\', "\\\\") } -impl GenericOption { +trait GenericOptionExt { + fn to_pg_option(&self) -> String; + fn to_pg_setting(&self) -> String; +} + +impl GenericOptionExt for GenericOption { /// Represent `GenericOption` as SQL statement parameter. - pub fn to_pg_option(&self) -> String { + fn to_pg_option(&self) -> String { if let Some(val) = &self.value { match self.vartype.as_ref() { "string" => format!("{} '{}'", self.name, escape_literal(val)), @@ -72,7 +46,7 @@ impl GenericOption { } /// Represent `GenericOption` as configuration option. - pub fn to_pg_setting(&self) -> String { + fn to_pg_setting(&self) -> String { if let Some(val) = &self.value { match self.vartype.as_ref() { "string" => format!("{} = '{}'", self.name, escape_conf_value(val)), @@ -131,10 +105,14 @@ impl GenericOptionsSearch for GenericOptions { } } -impl Role { +pub trait RoleExt { + fn to_pg_options(&self) -> String; +} + +impl RoleExt for Role { /// Serialize a list of role parameters into a Postgres-acceptable /// string of arguments. - pub fn to_pg_options(&self) -> String { + fn to_pg_options(&self) -> String { // XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane. // For now, we do not use generic `options` for roles. Once used, add // `self.options.as_pg_options()` somewhere here. @@ -159,21 +137,17 @@ impl Role { } } -impl Database { - pub fn new(name: PgIdent, owner: PgIdent) -> Self { - Self { - name, - owner, - options: None, - } - } +pub trait DatabaseExt { + fn to_pg_options(&self) -> String; +} +impl DatabaseExt for Database { /// Serialize a list of database parameters into a Postgres-acceptable /// string of arguments. /// NB: `TEMPLATE` is actually also an identifier, but so far we only need /// to use `template0` and `template1`, so it is not a problem. Yet in the future /// it may require a proper quoting too. - pub fn to_pg_options(&self) -> String { + fn to_pg_options(&self) -> String { let mut params: String = self.options.as_pg_options(); write!(params, " OWNER {}", &self.owner.pg_quote()) .expect("String is documented to not to error during write operations"); @@ -182,10 +156,6 @@ impl Database { } } -/// String type alias representing Postgres identifier and -/// intended to be used for DB / role names. -pub type PgIdent = String; - /// Generic trait used to provide quoting / encoding for strings used in the /// Postgres SQL queries and DATABASE_URL. pub trait Escaping { @@ -226,7 +196,11 @@ pub fn get_existing_dbs(client: &mut Client) -> Result> { &[], )? .iter() - .map(|row| Database::new(row.get("datname"), row.get("owner"))) + .map(|row| Database { + name: row.get("datname"), + owner: row.get("owner"), + options: None, + }) .collect(); Ok(postgres_dbs) diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 9694ba9a88..9c6825994d 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,11 +1,9 @@ -use std::collections::HashMap; use std::path::Path; use std::str::FromStr; use anyhow::Result; use postgres::config::Config; use postgres::{Client, NoTls}; -use serde::Deserialize; use tracing::{info, info_span, instrument, span_enabled, warn, Level}; use crate::compute::ComputeNode; @@ -13,46 +11,7 @@ use crate::config; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; -/// Cluster spec or configuration represented as an optional number of -/// delta operations + final cluster state description. -#[derive(Clone, Deserialize)] -pub struct ComputeSpec { - pub format_version: f32, - pub timestamp: String, - pub operation_uuid: Option, - /// Expected cluster state at the end of transition process. - pub cluster: Cluster, - pub delta_operations: Option>, - - pub storage_auth_token: Option, - - pub startup_tracing_context: Option>, -} - -/// Cluster state seen from the perspective of the external tools -/// like Rails web console. -#[derive(Clone, Deserialize)] -pub struct Cluster { - pub cluster_id: String, - pub name: String, - pub state: Option, - pub roles: Vec, - pub databases: Vec, - pub settings: GenericOptions, -} - -/// Single cluster state changing operation that could not be represented as -/// a static `Cluster` structure. For example: -/// - DROP DATABASE -/// - DROP ROLE -/// - ALTER ROLE name RENAME TO new_name -/// - ALTER DATABASE name RENAME TO new_name -#[derive(Clone, Deserialize)] -pub struct DeltaOp { - pub action: String, - pub name: PgIdent, - pub new_name: Option, -} +use compute_api::spec::{ComputeSpec, Database, PgIdent, Role}; /// It takes cluster specification and does the following: /// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file. diff --git a/compute_tools/tests/pg_helpers_tests.rs b/compute_tools/tests/pg_helpers_tests.rs index f48211f7ed..a63ee038c7 100644 --- a/compute_tools/tests/pg_helpers_tests.rs +++ b/compute_tools/tests/pg_helpers_tests.rs @@ -1,14 +1,13 @@ #[cfg(test)] mod pg_helpers_tests { - use std::fs::File; + use compute_api::spec::{ComputeSpec, GenericOption, GenericOptions, PgIdent}; use compute_tools::pg_helpers::*; - use compute_tools::spec::ComputeSpec; #[test] fn params_serialize() { - let file = File::open("tests/cluster_spec.json").unwrap(); + let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap(); let spec: ComputeSpec = serde_json::from_reader(file).unwrap(); assert_eq!( @@ -23,7 +22,7 @@ mod pg_helpers_tests { #[test] fn settings_serialize() { - let file = File::open("tests/cluster_spec.json").unwrap(); + let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap(); let spec: ComputeSpec = serde_json::from_reader(file).unwrap(); assert_eq!( diff --git a/libs/compute_api/Cargo.toml b/libs/compute_api/Cargo.toml new file mode 100644 index 0000000000..533a091207 --- /dev/null +++ b/libs/compute_api/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "compute_api" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +chrono.workspace = true +serde.workspace = true +serde_with.workspace = true +serde_json.workspace = true + +workspace_hack.workspace = true diff --git a/libs/compute_api/src/lib.rs b/libs/compute_api/src/lib.rs new file mode 100644 index 0000000000..911fc5c4ed --- /dev/null +++ b/libs/compute_api/src/lib.rs @@ -0,0 +1,2 @@ +pub mod models; +pub mod spec; diff --git a/libs/compute_api/src/models.rs b/libs/compute_api/src/models.rs new file mode 100644 index 0000000000..9d2575e7fd --- /dev/null +++ b/libs/compute_api/src/models.rs @@ -0,0 +1,39 @@ +//! Structs representing the JSON formats used in the compute_ctl's HTTP API. +use chrono::{DateTime, Utc}; +use serde::{Serialize, Serializer}; + +/// Response of the /status API +/// +#[derive(Serialize)] +#[serde(rename_all = "snake_case")] +pub struct ComputeState { + pub status: ComputeStatus, + /// Timestamp of the last Postgres activity + #[serde(serialize_with = "rfc3339_serialize")] + pub last_active: DateTime, + pub error: Option, +} + +#[derive(Serialize, Clone, Copy, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ComputeStatus { + Init, + Running, + Failed, +} + +fn rfc3339_serialize(x: &DateTime, s: S) -> Result +where + S: Serializer, +{ + x.to_rfc3339().serialize(s) +} + +/// Response of the /metrics.json API +#[derive(Clone, Default, Serialize)] +pub struct ComputeMetrics { + pub sync_safekeepers_ms: u64, + pub basebackup_ms: u64, + pub config_ms: u64, + pub total_startup_ms: u64, +} diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs new file mode 100644 index 0000000000..7b67806dd8 --- /dev/null +++ b/libs/compute_api/src/spec.rs @@ -0,0 +1,94 @@ +//! `ComputeSpec` represents the contents of the spec.json file. +//! +//! The spec.json file is used to pass information to 'compute_ctl'. It contains +//! all the information needed to start up the right version of PostgreSQL, +//! and connect it to the storage nodes. +use serde::Deserialize; +use std::collections::HashMap; + +/// String type alias representing Postgres identifier and +/// intended to be used for DB / role names. +pub type PgIdent = String; + +/// Cluster spec or configuration represented as an optional number of +/// delta operations + final cluster state description. +#[derive(Clone, Deserialize)] +pub struct ComputeSpec { + pub format_version: f32, + pub timestamp: String, + pub operation_uuid: Option, + /// Expected cluster state at the end of transition process. + pub cluster: Cluster, + pub delta_operations: Option>, + + pub storage_auth_token: Option, + + pub startup_tracing_context: Option>, +} + +#[derive(Clone, Deserialize)] +pub struct Cluster { + pub cluster_id: String, + pub name: String, + pub state: Option, + pub roles: Vec, + pub databases: Vec, + pub settings: GenericOptions, +} + +/// Single cluster state changing operation that could not be represented as +/// a static `Cluster` structure. For example: +/// - DROP DATABASE +/// - DROP ROLE +/// - ALTER ROLE name RENAME TO new_name +/// - ALTER DATABASE name RENAME TO new_name +#[derive(Clone, Deserialize)] +pub struct DeltaOp { + pub action: String, + pub name: PgIdent, + pub new_name: Option, +} + +/// Rust representation of Postgres role info with only those fields +/// that matter for us. +#[derive(Clone, Deserialize)] +pub struct Role { + pub name: PgIdent, + pub encrypted_password: Option, + pub options: GenericOptions, +} + +/// Rust representation of Postgres database info with only those fields +/// that matter for us. +#[derive(Clone, Deserialize)] +pub struct Database { + pub name: PgIdent, + pub owner: PgIdent, + pub options: GenericOptions, +} + +/// Common type representing both SQL statement params with or without value, +/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config +/// options like `wal_level = logical`. +#[derive(Clone, Deserialize)] +pub struct GenericOption { + pub name: String, + pub value: Option, + pub vartype: String, +} + +/// Optional collection of `GenericOption`'s. Type alias allows us to +/// declare a `trait` on it. +pub type GenericOptions = Option>; + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + + #[test] + fn parse_spec_file() { + let file = File::open("tests/cluster_spec.json").unwrap(); + let _spec: ComputeSpec = serde_json::from_reader(file).unwrap(); + } +} diff --git a/compute_tools/tests/cluster_spec.json b/libs/compute_api/tests/cluster_spec.json similarity index 100% rename from compute_tools/tests/cluster_spec.json rename to libs/compute_api/tests/cluster_spec.json