Move compute_ctl structs used in HTTP API and spec file to separate crate.

This is in preparation of using compute_ctl to launch postgres nodes
in the neon_local control plane. And seems like a good idea to
separate the public interfaces anyway.

One non-mechanical change here is that we now use a RwLock rather than
atomics to protect the ComputeNode::metrics field. We were not using
atomics for performance but for convenience here, and an RwLock is now
more convenient.
This commit is contained in:
Heikki Linnakangas
2023-04-05 19:49:08 +03:00
parent 957acb51b5
commit 8e06018dae
15 changed files with 232 additions and 186 deletions

13
Cargo.lock generated
View File

@@ -841,6 +841,18 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "compute_api"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"serde",
"serde_json",
"serde_with",
"workspace_hack",
]
[[package]]
name = "compute_tools"
version = "0.1.0"
@@ -848,6 +860,7 @@ dependencies = [
"anyhow",
"chrono",
"clap 4.1.4",
"compute_api",
"futures",
"hyper",
"notify",

View File

@@ -132,6 +132,7 @@ tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df6
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }

View File

@@ -27,4 +27,5 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
url.workspace = true
compute_api.workspace = true
workspace_hack.workspace = true

View File

@@ -41,15 +41,17 @@ use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use tracing::{error, info};
use url::Url;
use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus};
use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus};
use compute_api::spec::ComputeSpec;
use compute_tools::compute::ComputeNode;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::pg_helpers::*;
use compute_tools::spec::*;
use url::Url;
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
@@ -145,8 +147,10 @@ fn main() -> Result<()> {
.find("neon.timeline_id")
.expect("tenant id should be provided");
let now = Utc::now();
let compute_state = ComputeNode {
start_time: Utc::now(),
start_time: now,
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
@@ -155,8 +159,12 @@ fn main() -> Result<()> {
timeline,
pageserver_connstr,
storage_auth_token,
metrics: ComputeMetrics::default(),
state: RwLock::new(ComputeState::new()),
metrics: RwLock::new(ComputeMetrics::default()),
state: RwLock::new(ComputeState {
status: ComputeStatus::Init,
last_active: now,
error: None,
}),
};
let compute = Arc::new(compute_state);

View File

@@ -19,16 +19,17 @@ use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use serde::{Serialize, Serializer};
use tokio_postgres;
use tracing::{info, instrument, warn};
use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus};
use compute_api::spec::ComputeSpec;
use crate::checker::create_writability_check_data;
use crate::config;
use crate::pg_helpers::*;
@@ -46,62 +47,13 @@ pub struct ComputeNode {
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
pub metrics: ComputeMetrics,
pub metrics: RwLock<ComputeMetrics>,
/// Volatile part of the `ComputeNode` so should be used under `RwLock`
/// to allow HTTP API server to serve status requests, while configuration
/// is in progress.
pub state: RwLock<ComputeState>,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
impl ComputeState {
pub fn new() -> Self {
Self {
status: ComputeStatus::Init,
last_active: Utc::now(),
error: None,
}
}
}
impl Default for ComputeState {
fn default() -> Self {
Self::new()
}
}
#[derive(Serialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
Init,
Running,
Failed,
}
#[derive(Default, Serialize)]
pub struct ComputeMetrics {
pub sync_safekeepers_ms: AtomicU64,
pub basebackup_ms: AtomicU64,
pub config_ms: AtomicU64,
pub total_startup_ms: AtomicU64,
}
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
self.state.write().unwrap().status = status;
@@ -155,15 +107,11 @@ impl ComputeNode {
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata)?;
self.metrics.basebackup_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.metrics.write().unwrap().basebackup_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
Ok(())
}
@@ -201,14 +149,11 @@ impl ComputeNode {
);
}
self.metrics.sync_safekeepers_ms.store(
Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.metrics.write().unwrap().sync_safekeepers_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim());
@@ -340,23 +285,19 @@ impl ComputeNode {
self.apply_config()?;
let startup_end_time = Utc::now();
self.metrics.config_ms.store(
startup_end_time
{
let mut metrics = self.metrics.write().unwrap();
metrics.config_ms = startup_end_time
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
self.metrics.total_startup_ms.store(
startup_end_time
.as_millis() as u64;
metrics.total_startup_ms = startup_end_time
.signed_duration_since(self.start_time)
.to_std()
.unwrap()
.as_millis() as u64,
Ordering::Relaxed,
);
.as_millis() as u64;
}
self.set_status(ComputeStatus::Running);
Ok(pg)

View File

@@ -6,7 +6,7 @@ use std::path::Path;
use anyhow::Result;
use crate::pg_helpers::PgOptionsSerialize;
use crate::spec::ComputeSpec;
use compute_api::spec::ComputeSpec;
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.

View File

@@ -31,7 +31,8 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// future use for Prometheus metrics format.
(&Method::GET, "/metrics.json") => {
info!("serving /metrics.json GET request");
Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap()))
let metrics = compute.metrics.read().unwrap();
Response::new(Body::from(serde_json::to_string(&*metrics).unwrap()))
}
// Collect Postgres current usage insights

View File

@@ -10,43 +10,12 @@ use std::time::{Duration, Instant};
use anyhow::{bail, Result};
use notify::{RecursiveMode, Watcher};
use postgres::{Client, Transaction};
use serde::Deserialize;
use tracing::{debug, instrument};
use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub options: GenericOptions,
}
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
pub options: GenericOptions,
}
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Deserialize)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,
pub vartype: String,
}
/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;
/// Escape a string for including it in a SQL literal
fn escape_literal(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
@@ -58,9 +27,14 @@ fn escape_conf_value(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
}
impl GenericOption {
trait GenericOptionExt {
fn to_pg_option(&self) -> String;
fn to_pg_setting(&self) -> String;
}
impl GenericOptionExt for GenericOption {
/// Represent `GenericOption` as SQL statement parameter.
pub fn to_pg_option(&self) -> String {
fn to_pg_option(&self) -> String {
if let Some(val) = &self.value {
match self.vartype.as_ref() {
"string" => format!("{} '{}'", self.name, escape_literal(val)),
@@ -72,7 +46,7 @@ impl GenericOption {
}
/// Represent `GenericOption` as configuration option.
pub fn to_pg_setting(&self) -> String {
fn to_pg_setting(&self) -> String {
if let Some(val) = &self.value {
match self.vartype.as_ref() {
"string" => format!("{} = '{}'", self.name, escape_conf_value(val)),
@@ -131,10 +105,14 @@ impl GenericOptionsSearch for GenericOptions {
}
}
impl Role {
pub trait RoleExt {
fn to_pg_options(&self) -> String;
}
impl RoleExt for Role {
/// Serialize a list of role parameters into a Postgres-acceptable
/// string of arguments.
pub fn to_pg_options(&self) -> String {
fn to_pg_options(&self) -> String {
// XXX: consider putting LOGIN as a default option somewhere higher, e.g. in control-plane.
// For now, we do not use generic `options` for roles. Once used, add
// `self.options.as_pg_options()` somewhere here.
@@ -159,21 +137,17 @@ impl Role {
}
}
impl Database {
pub fn new(name: PgIdent, owner: PgIdent) -> Self {
Self {
name,
owner,
options: None,
}
}
pub trait DatabaseExt {
fn to_pg_options(&self) -> String;
}
impl DatabaseExt for Database {
/// Serialize a list of database parameters into a Postgres-acceptable
/// string of arguments.
/// NB: `TEMPLATE` is actually also an identifier, but so far we only need
/// to use `template0` and `template1`, so it is not a problem. Yet in the future
/// it may require a proper quoting too.
pub fn to_pg_options(&self) -> String {
fn to_pg_options(&self) -> String {
let mut params: String = self.options.as_pg_options();
write!(params, " OWNER {}", &self.owner.pg_quote())
.expect("String is documented to not to error during write operations");
@@ -182,10 +156,6 @@ impl Database {
}
}
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
pub type PgIdent = String;
/// Generic trait used to provide quoting / encoding for strings used in the
/// Postgres SQL queries and DATABASE_URL.
pub trait Escaping {
@@ -226,7 +196,11 @@ pub fn get_existing_dbs(client: &mut Client) -> Result<Vec<Database>> {
&[],
)?
.iter()
.map(|row| Database::new(row.get("datname"), row.get("owner")))
.map(|row| Database {
name: row.get("datname"),
owner: row.get("owner"),
options: None,
})
.collect();
Ok(postgres_dbs)

View File

@@ -1,11 +1,9 @@
use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use anyhow::Result;
use postgres::config::Config;
use postgres::{Client, NoTls};
use serde::Deserialize;
use tracing::{info, info_span, instrument, span_enabled, warn, Level};
use crate::compute::ComputeNode;
@@ -13,46 +11,7 @@ use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
pub operation_uuid: Option<String>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
pub storage_auth_token: Option<String>,
pub startup_tracing_context: Option<HashMap<String, String>>,
}
/// Cluster state seen from the perspective of the external tools
/// like Rails web console.
#[derive(Clone, Deserialize)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
pub state: Option<String>,
pub roles: Vec<Role>,
pub databases: Vec<Database>,
pub settings: GenericOptions,
}
/// Single cluster state changing operation that could not be represented as
/// a static `Cluster` structure. For example:
/// - DROP DATABASE
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Deserialize)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
pub new_name: Option<PgIdent>,
}
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
/// It takes cluster specification and does the following:
/// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.

View File

@@ -1,14 +1,13 @@
#[cfg(test)]
mod pg_helpers_tests {
use std::fs::File;
use compute_api::spec::{ComputeSpec, GenericOption, GenericOptions, PgIdent};
use compute_tools::pg_helpers::*;
use compute_tools::spec::ComputeSpec;
#[test]
fn params_serialize() {
let file = File::open("tests/cluster_spec.json").unwrap();
let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
assert_eq!(
@@ -23,7 +22,7 @@ mod pg_helpers_tests {
#[test]
fn settings_serialize() {
let file = File::open("tests/cluster_spec.json").unwrap();
let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap();
let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
assert_eq!(

View File

@@ -0,0 +1,14 @@
[package]
name = "compute_api"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
anyhow.workspace = true
chrono.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
workspace_hack.workspace = true

View File

@@ -0,0 +1,2 @@
pub mod models;
pub mod spec;

View File

@@ -0,0 +1,39 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use chrono::{DateTime, Utc};
use serde::{Serialize, Serializer};
/// Response of the /status API
///
#[derive(Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
#[derive(Serialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
Init,
Running,
Failed,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}
/// Response of the /metrics.json API
#[derive(Clone, Default, Serialize)]
pub struct ComputeMetrics {
pub sync_safekeepers_ms: u64,
pub basebackup_ms: u64,
pub config_ms: u64,
pub total_startup_ms: u64,
}

View File

@@ -0,0 +1,94 @@
//! `ComputeSpec` represents the contents of the spec.json file.
//!
//! The spec.json file is used to pass information to 'compute_ctl'. It contains
//! all the information needed to start up the right version of PostgreSQL,
//! and connect it to the storage nodes.
use serde::Deserialize;
use std::collections::HashMap;
/// String type alias representing Postgres identifier and
/// intended to be used for DB / role names.
pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
pub timestamp: String,
pub operation_uuid: Option<String>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
pub storage_auth_token: Option<String>,
pub startup_tracing_context: Option<HashMap<String, String>>,
}
#[derive(Clone, Deserialize)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,
pub state: Option<String>,
pub roles: Vec<Role>,
pub databases: Vec<Database>,
pub settings: GenericOptions,
}
/// Single cluster state changing operation that could not be represented as
/// a static `Cluster` structure. For example:
/// - DROP DATABASE
/// - DROP ROLE
/// - ALTER ROLE name RENAME TO new_name
/// - ALTER DATABASE name RENAME TO new_name
#[derive(Clone, Deserialize)]
pub struct DeltaOp {
pub action: String,
pub name: PgIdent,
pub new_name: Option<PgIdent>,
}
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub options: GenericOptions,
}
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Deserialize)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
pub options: GenericOptions,
}
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Deserialize)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,
pub vartype: String,
}
/// Optional collection of `GenericOption`'s. Type alias allows us to
/// declare a `trait` on it.
pub type GenericOptions = Option<Vec<GenericOption>>;
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
#[test]
fn parse_spec_file() {
let file = File::open("tests/cluster_spec.json").unwrap();
let _spec: ComputeSpec = serde_json::from_reader(file).unwrap();
}
}