mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
Make read-only replicas explicit in compute spec (#4136)
This builds on top of PR #4058, and supersedes #4018
This commit is contained in:
committed by
GitHub
parent
7dd9553bbb
commit
b627fa71e4
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1032,6 +1032,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -1105,6 +1106,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"clap 4.2.2",
|
||||
"comfy-table",
|
||||
"compute_api",
|
||||
"git-version",
|
||||
"nix",
|
||||
"once_cell",
|
||||
|
||||
@@ -30,7 +30,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::ComputeSpec;
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
|
||||
use crate::config;
|
||||
use crate::pg_helpers::*;
|
||||
@@ -249,38 +249,10 @@ impl ComputeNode {
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip(self, compute_state))]
|
||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
#[derive(Clone)]
|
||||
enum Replication {
|
||||
Primary,
|
||||
Static { lsn: Lsn },
|
||||
HotStandby,
|
||||
}
|
||||
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
let hot_replica = if let Some(option) = spec.cluster.settings.find_ref("hot_standby") {
|
||||
if let Some(value) = &option.value {
|
||||
anyhow::ensure!(option.vartype == "bool");
|
||||
matches!(value.as_str(), "on" | "yes" | "true")
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let replication = if hot_replica {
|
||||
Replication::HotStandby
|
||||
} else if let Some(lsn) = spec.cluster.settings.find("recovery_target_lsn") {
|
||||
Replication::Static {
|
||||
lsn: Lsn::from_str(&lsn)?,
|
||||
}
|
||||
} else {
|
||||
Replication::Primary
|
||||
};
|
||||
|
||||
// Remove/create an empty pgdata directory and put configuration there.
|
||||
self.create_pgdata()?;
|
||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
|
||||
@@ -288,8 +260,8 @@ impl ComputeNode {
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
// is already connected it will be kicked out, so a secondary (standby)
|
||||
// cannot sync safekeepers.
|
||||
let lsn = match &replication {
|
||||
Replication::Primary => {
|
||||
let lsn = match spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
info!("starting safekeepers syncing");
|
||||
let lsn = self
|
||||
.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||
@@ -297,11 +269,11 @@ impl ComputeNode {
|
||||
info!("safekeepers synced at LSN {}", lsn);
|
||||
lsn
|
||||
}
|
||||
Replication::Static { lsn } => {
|
||||
ComputeMode::Static(lsn) => {
|
||||
info!("Starting read-only node at static LSN {}", lsn);
|
||||
*lsn
|
||||
lsn
|
||||
}
|
||||
Replication::HotStandby => {
|
||||
ComputeMode::Replica => {
|
||||
info!("Initializing standby from latest Pageserver LSN");
|
||||
Lsn(0)
|
||||
}
|
||||
@@ -321,9 +293,9 @@ impl ComputeNode {
|
||||
// Update pg_hba.conf received with basebackup.
|
||||
update_pg_hba(pgdata_path)?;
|
||||
|
||||
match &replication {
|
||||
Replication::Primary | Replication::Static { .. } => {}
|
||||
Replication::HotStandby => {
|
||||
match spec.mode {
|
||||
ComputeMode::Primary | ComputeMode::Static(..) => {}
|
||||
ComputeMode::Replica => {
|
||||
add_standby_signal(pgdata_path)?;
|
||||
}
|
||||
}
|
||||
@@ -430,11 +402,13 @@ impl ComputeNode {
|
||||
self.pg_reload_conf(&mut client)?;
|
||||
|
||||
// Proceed with post-startup configuration. Note, that order of operations is important.
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
if spec.mode == ComputeMode::Primary {
|
||||
handle_roles(&spec, &mut client)?;
|
||||
handle_databases(&spec, &mut client)?;
|
||||
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_extensions(&spec, &mut client)?;
|
||||
}
|
||||
|
||||
// 'Close' connection
|
||||
drop(client);
|
||||
@@ -467,7 +441,9 @@ impl ComputeNode {
|
||||
|
||||
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
|
||||
|
||||
self.apply_config(&compute_state)?;
|
||||
if spec.spec.mode == ComputeMode::Primary {
|
||||
self.apply_config(&compute_state)?;
|
||||
}
|
||||
|
||||
let startup_end_time = Utc::now();
|
||||
{
|
||||
|
||||
@@ -6,7 +6,7 @@ use std::path::Path;
|
||||
use anyhow::Result;
|
||||
|
||||
use crate::pg_helpers::PgOptionsSerialize;
|
||||
use compute_api::spec::ComputeSpec;
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
|
||||
/// Check that `line` is inside a text file and put it there if it is not.
|
||||
/// Create file if it doesn't exist.
|
||||
@@ -34,17 +34,25 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
|
||||
/// Create or completely rewrite configuration file specified by `path`
|
||||
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
|
||||
// File::create() destroys the file content if it exists.
|
||||
let mut postgres_conf = File::create(path)?;
|
||||
let mut file = File::create(path)?;
|
||||
|
||||
write_auto_managed_block(&mut postgres_conf, &spec.cluster.settings.as_pg_settings())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Write Postgres config block wrapped with generated comment section
|
||||
fn write_auto_managed_block(file: &mut File, buf: &str) -> Result<()> {
|
||||
writeln!(file, "# Managed by compute_ctl: begin")?;
|
||||
writeln!(file, "{}", buf)?;
|
||||
|
||||
write!(file, "{}", &spec.cluster.settings.as_pg_settings())?;
|
||||
|
||||
match spec.mode {
|
||||
ComputeMode::Primary => {}
|
||||
ComputeMode::Static(lsn) => {
|
||||
// hot_standby is 'on' by default, but let's be explicit
|
||||
writeln!(file, "hot_standby=on")?;
|
||||
writeln!(file, "recovery_target_lsn='{lsn}'")?;
|
||||
}
|
||||
ComputeMode::Replica => {
|
||||
// hot_standby is 'on' by default, but let's be explicit
|
||||
writeln!(file, "hot_standby=on")?;
|
||||
}
|
||||
}
|
||||
|
||||
writeln!(file, "# Managed by compute_ctl: end")?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -30,4 +30,5 @@ postgres_connection.workspace = true
|
||||
storage_broker.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
compute_api.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -7,8 +7,8 @@
|
||||
//!
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||
use compute_api::spec::ComputeMode;
|
||||
use control_plane::endpoint::ComputeControlPlane;
|
||||
use control_plane::endpoint::ComputeMode;
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use control_plane::pageserver::PageServerNode;
|
||||
use control_plane::safekeeper::SafekeeperNode;
|
||||
|
||||
@@ -22,6 +22,8 @@ use crate::local_env::LocalEnv;
|
||||
use crate::pageserver::PageServerNode;
|
||||
use crate::postgresql_conf::PostgresConf;
|
||||
|
||||
use compute_api::spec::ComputeMode;
|
||||
|
||||
// contents of a endpoint.json file
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
|
||||
@@ -122,26 +124,12 @@ impl ComputeControlPlane {
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub enum ComputeMode {
|
||||
// Regular read-write node
|
||||
Primary,
|
||||
// if recovery_target_lsn is provided, and we want to pin the node to a specific LSN
|
||||
Static(#[serde_as(as = "DisplayFromStr")] Lsn),
|
||||
// Hot standby; read-only replica.
|
||||
// Future versions may want to distinguish between replicas with hot standby
|
||||
// feedback and other kinds of replication configurations.
|
||||
Replica,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Endpoint {
|
||||
/// used as the directory name
|
||||
name: String,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
|
||||
pub mode: ComputeMode,
|
||||
|
||||
// port and address of the Postgres server
|
||||
|
||||
@@ -11,4 +11,5 @@ serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
utils = { path = "../utils" }
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -3,8 +3,10 @@
|
||||
//! The spec.json file is used to pass information to 'compute_ctl'. It contains
|
||||
//! all the information needed to start up the right version of PostgreSQL,
|
||||
//! and connect it to the storage nodes.
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::collections::HashMap;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// String type alias representing Postgres identifier and
|
||||
/// intended to be used for DB / role names.
|
||||
@@ -12,6 +14,7 @@ pub type PgIdent = String;
|
||||
|
||||
/// Cluster spec or configuration represented as an optional number of
|
||||
/// delta operations + final cluster state description.
|
||||
#[serde_as]
|
||||
#[derive(Clone, Debug, Default, Deserialize)]
|
||||
pub struct ComputeSpec {
|
||||
pub format_version: f32,
|
||||
@@ -24,11 +27,29 @@ pub struct ComputeSpec {
|
||||
pub cluster: Cluster,
|
||||
pub delta_operations: Option<Vec<DeltaOp>>,
|
||||
|
||||
#[serde(default)]
|
||||
pub mode: ComputeMode,
|
||||
|
||||
pub storage_auth_token: Option<String>,
|
||||
|
||||
pub startup_tracing_context: Option<HashMap<String, String>>,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub enum ComputeMode {
|
||||
/// A read-write node
|
||||
#[default]
|
||||
Primary,
|
||||
/// A read-only node, pinned at a particular LSN
|
||||
Static(#[serde_as(as = "DisplayFromStr")] Lsn),
|
||||
/// A read-only node that follows the tip of the branch in hot standby mode
|
||||
///
|
||||
/// Future versions may want to distinguish between replicas with hot standby
|
||||
/// feedback and other kinds of replication configurations.
|
||||
Replica,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize)]
|
||||
pub struct Cluster {
|
||||
pub cluster_id: String,
|
||||
|
||||
Reference in New Issue
Block a user