Compare commits

...

3 Commits

bbd646325c  Tristan Partin  2024-09-16 10:47:43 -05:00
    Add match arm for unused builtin resource managers

    Although we don't currently handle these, they are much different from
    an unrecognized resource manager, which the comment in the last match
    arm refers to.

5e71d8fddc  Tristan Partin  2024-09-12 12:51:28 +01:00
    Use pg_upgrade to upgrade projects from one Postgres major version to the next

3d07b6a483  Tristan Partin  2024-09-12 11:55:39 +01:00
    Use the async Postgres client for compute_ctl

    Necessary for continued development of the pg_upgrade work.
34 changed files with 984 additions and 300 deletions

.gitmodules (vendored): 4 changed lines
View File

@@ -5,8 +5,8 @@
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git
branch = REL_15_STABLE_neon
branch = tristan957/15/pg_upgrade
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = https://github.com/neondatabase/postgres.git
branch = REL_16_STABLE_neon
branch = tristan957/pg_upgrade

Cargo.lock (generated): 2 changed lines
View File

@@ -1220,6 +1220,7 @@ dependencies = [
"anyhow",
"async-compression",
"bytes",
"camino",
"cfg-if",
"chrono",
"clap",
@@ -1237,6 +1238,7 @@ dependencies = [
"reqwest 0.12.4",
"rlimit",
"rust-ini",
"scopeguard",
"serde",
"serde_json",
"signal-hook",

View File

@@ -12,6 +12,7 @@ testing = []
[dependencies]
anyhow.workspace = true
async-compression.workspace = true
camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
@@ -24,6 +25,7 @@ num_cpus.workspace = true
opentelemetry.workspace = true
postgres.workspace = true
regex.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true

View File

@@ -43,7 +43,7 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use clap::{Arg, ArgAction};
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
@@ -51,13 +51,12 @@ use tracing::{error, info, warn};
use url::Url;
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeSpec;
use compute_api::spec::{ComputeMode, ComputeSpec};
use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -70,13 +69,14 @@ use rlimit::{setrlimit, Resource};
// in case the environment variable is not set
const BUILD_TAG_DEFAULT: &str = "latest";
fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let (build_tag, clap_args) = init()?;
// enable core dumping for all child processes
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
let (pg_handle, start_pg_result) = {
let (http_handle, (pg_handle, start_pg_result)) = {
// Enter startup tracing context
let _startup_context_guard = startup_context_from_env();
@@ -84,13 +84,61 @@ fn main() -> Result<()> {
let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;
let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?;
let compute = Arc::new(ComputeNode {
connstr: Url::parse(cli_args.connstr).context("cannot parse connstr as a URL")?,
pgdata: cli_args.pgdata.to_string(),
pgroot: cli_args.pgroot.to_string(),
pgversion: cli_args.pgversion.to_string(),
http_port: cli_args.http_port,
live_config_allowed: cli_spec.live_config_allowed,
state: Mutex::new({
let mut state = ComputeState::new();
start_postgres(&clap_args, wait_spec_result)?
if let Some(spec) = cli_spec.spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
info!("new pspec.spec: {:?}", pspec.spec);
state.pspec = Some(pspec);
}
state
}),
state_changed: Condvar::new(),
ext_remote_storage: cli_args.ext_remote_storage.map(|s| s.to_string()),
ext_download_progress: RwLock::new(HashMap::new()),
build_tag: build_tag.clone(),
});
// If this is a pooled VM, prewarm before starting HTTP server and becoming
// available for binding. Prewarming helps Postgres start quicker later,
// because QEMU will already have its memory allocated from the host, and
// the necessary binaries will already be cached.
if compute.state.lock().unwrap().pspec.is_none() {
compute.prewarm_postgres()?;
}
// Launch http service first, so that we can serve control-plane requests
// while configuration is still in progress.
let http_handle = launch_http_server(cli_args.http_port, &compute)
.expect("cannot launch http endpoint thread");
wait_spec(&compute)?;
(
http_handle,
start_postgres(
compute,
#[cfg(target_os = "linux")]
&clap_args,
cli_args.resize_swap_on_bind,
)
.await?,
)
// Startup is finished, exit the startup tracing span
};
let _ = http_handle.join();
// PostgreSQL is now running, if startup was successful. Wait until it exits.
let wait_pg_result = wait_postgres(pg_handle)?;
@@ -120,11 +168,14 @@ fn init() -> Result<(String, clap::ArgMatches)> {
}
fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
let pgbin_default = "postgres";
let pgbin = matches
.get_one::<String>("pgbin")
let pgroot = matches
.get_one::<String>("pgroot")
.map(|s| s.as_str())
.unwrap_or(pgbin_default);
.expect("pgroot is required");
let pgversion = matches
.get_one::<String>("pgversion")
.map(|s| s.as_str())
.expect("pgversion is required");
let ext_remote_storage = matches
.get_one::<String>("remote-ext-config")
@@ -155,7 +206,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
Ok(ProcessCliResult {
connstr,
pgdata,
pgbin,
pgroot,
pgversion,
ext_remote_storage,
http_port,
spec_json,
@@ -167,7 +219,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
struct ProcessCliResult<'clap> {
connstr: &'clap str,
pgdata: &'clap str,
pgbin: &'clap str,
pgroot: &'clap str,
pgversion: &'clap str,
ext_remote_storage: Option<&'clap str>,
http_port: u16,
spec_json: Option<&'clap String>,
@@ -285,61 +338,8 @@ struct CliSpecParams {
live_config_allowed: bool,
}
fn wait_spec(
build_tag: String,
ProcessCliResult {
connstr,
pgdata,
pgbin,
ext_remote_storage,
resize_swap_on_bind,
http_port,
..
}: ProcessCliResult,
CliSpecParams {
spec,
live_config_allowed,
}: CliSpecParams,
) -> Result<WaitSpecResult> {
let mut new_state = ComputeState::new();
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
info!("new pspec.spec: {:?}", pspec.spec);
new_state.pspec = Some(pspec);
spec_set = true;
} else {
spec_set = false;
}
let compute_node = ComputeNode {
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
pgversion: get_pg_version(pgbin),
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
ext_download_progress: RwLock::new(HashMap::new()),
build_tag,
};
let compute = Arc::new(compute_node);
// If this is a pooled VM, prewarm before starting HTTP server and becoming
// available for binding. Prewarming helps Postgres start quicker later,
// because QEMU will already have its memory allocated from the host, and
// the necessary binaries will already be cached.
if !spec_set {
compute.prewarm_postgres()?;
}
// Launch http service first, so that we can serve control-plane requests
// while configuration is still in progress.
let _http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
if !spec_set {
fn wait_spec(compute: &Arc<ComputeNode>) -> Result<()> {
if compute.state.lock().unwrap().pspec.is_none() {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -369,28 +369,13 @@ fn wait_spec(
launch_lsn_lease_bg_task_for_static(&compute);
Ok(WaitSpecResult {
compute,
http_port,
resize_swap_on_bind,
})
Ok(())
}
struct WaitSpecResult {
async fn start_postgres(
compute: Arc<ComputeNode>,
// passed through from ProcessCliResult
http_port: u16,
#[cfg(target_os = "linux")] matches: &clap::ArgMatches,
resize_swap_on_bind: bool,
}
fn start_postgres(
// need to allow unused because `matches` is only used if target_os = "linux"
#[allow(unused_variables)] matches: &clap::ArgMatches,
WaitSpecResult {
compute,
http_port,
resize_swap_on_bind,
}: WaitSpecResult,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
@@ -441,27 +426,32 @@ fn start_postgres(
}
}
let extension_server_port: u16 = http_port;
compute.prepare_compute().await?;
// Start Postgres
let mut pg = None;
if !prestartup_failed {
pg = match compute.start_compute(extension_server_port) {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:#}", err);
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
// Notify others that Postgres failed to start. In case of configuring the
// empty compute, it's likely that API handler is still waiting for compute
// state change. With this we will notify it that compute is in Failed state,
// so control plane will know about it earlier and record proper error instead
// of timeout.
compute.state_changed.notify_all();
drop(state); // unlock
delay_exit = true;
None
match compute.get_mode() {
ComputeMode::Upgrade => {}
_ => {
// Start Postgres
pg = match compute.start_compute() {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:#}", err);
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
// Notify others that Postgres failed to start. In case of configuring the
// empty compute, it's likely that API handler is still waiting for compute
// state change. With this we will notify it that compute is in Failed state,
// so control plane will know about it earlier and record proper error instead
// of timeout.
compute.state_changed.notify_all();
drop(state); // unlock
delay_exit = true;
None
}
}
}
};
} else {
@@ -686,11 +676,17 @@ fn cli() -> clap::Command {
.required(true),
)
.arg(
Arg::new("pgbin")
.short('b')
.long("pgbin")
.default_value("postgres")
.value_name("POSTGRES_PATH"),
Arg::new("pgroot")
.short('R')
.long("pgroot")
.value_name("POSTGRES_ROOT")
.required(true),
)
.arg(
Arg::new("pgversion")
.long("pgversion")
.value_name("POSTGRES_VERSION")
.required(true),
)
.arg(
Arg::new("spec")
@@ -750,6 +746,11 @@ fn cli() -> clap::Command {
.long("resize-swap-on-bind")
.action(clap::ArgAction::SetTrue),
)
.arg(
Arg::new("no-postgres")
.long("no-postgres")
.action(ArgAction::SetTrue),
)
}
/// When compute_ctl is killed, send also termination signal to sync-safekeepers
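The restructured main() above reduces to a specific ordering: serve the control-plane HTTP API first, block in wait_spec until a spec arrives, then prepare and start Postgres. A condensed, self-contained sketch of that ordering follows; all names here are simplified stand-ins rather than the real compute_ctl types, and a tokio dependency is assumed:

use std::sync::{Arc, Condvar, Mutex};

// Simplified stand-in for ComputeNode and its spec state; the real types
// live in compute_tools::compute.
struct Node {
    spec: Mutex<Option<String>>,
    spec_changed: Condvar,
}

#[tokio::main]
async fn main() {
    let node = Arc::new(Node {
        spec: Mutex::new(None),
        spec_changed: Condvar::new(),
    });

    // 1. Launch the HTTP service before Postgres exists, so control-plane
    //    requests (delivering a spec, /upgrade) work during startup.
    let http = {
        let n = node.clone();
        std::thread::spawn(move || deliver_spec(n))
    };

    // 2. wait_spec: block until a spec is delivered via the HTTP handler.
    let mut spec = node.spec.lock().unwrap();
    while spec.is_none() {
        spec = node.spec_changed.wait(spec).unwrap();
    }
    drop(spec);

    // 3. Only now prepare pgdata and start Postgres (prepare_compute() /
    //    start_compute() in the real code), then join the HTTP thread.
    let _ = http.join();
}

// Stand-in for the HTTP handler that sets the spec and wakes main().
fn deliver_spec(n: Arc<Node>) {
    *n.spec.lock().unwrap() = Some("spec".to_string());
    n.spec_changed.notify_all();
}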

View File

@@ -4,7 +4,7 @@ use compute_api::{
};
use futures::Stream;
use postgres::{Client, NoTls};
use std::{path::Path, process::Stdio, result::Result, sync::Arc};
use std::{process::Stdio, result::Result, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
@@ -55,9 +55,7 @@ pub async fn get_database_schema(
compute: &Arc<ComputeNode>,
dbname: &str,
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>>, SchemaDumpError> {
let pgbin = &compute.pgbin;
let basepath = Path::new(pgbin).parent().unwrap();
let pgdump = basepath.join("pg_dump");
let pgdump = compute.get_my_pg_binary("pg_dump");
let mut connstr = compute.connstr.clone();
connstr.set_path(dbname);
let mut cmd = Command::new(pgdump)

View File

@@ -1,9 +1,9 @@
use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::BufRead;
use std::os::unix::fs::{symlink, PermissionsExt};
use std::path::Path;
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
@@ -13,6 +13,8 @@ use std::thread;
use std::time::Instant;
use anyhow::{Context, Result};
use bytes::{Buf, BufMut};
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
@@ -20,13 +22,15 @@ use futures::StreamExt;
use nix::unistd::Pid;
use postgres::error::SqlState;
use postgres::{Client, NoTls};
use tokio;
use tokio_postgres;
use tracing::{debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::zstd::create_zst_tarball;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use nix::sys::signal::{kill, Signal};
@@ -47,8 +51,9 @@ pub struct ComputeNode {
// Url type maintains proper escaping
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub pgroot: String,
pub pgversion: String,
pub http_port: u16,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
@@ -309,6 +314,13 @@ impl ComputeNode {
self.state.lock().unwrap().status
}
/// Get the mode of this compute.
pub fn get_mode(&self) -> ComputeMode {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().spec.mode
}
// Remove `pgdata` directory and create it again with right permissions.
fn create_pgdata(&self) -> Result<()> {
// Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
@@ -320,15 +332,44 @@ impl ComputeNode {
Ok(())
}
/// Get path pointing to requested binary directory.
pub fn get_pg_bindir(&self, version: &str) -> PathBuf {
Path::new(&self.pgroot)
.join(format!("v{}", version))
.join("bin")
}
/// Get path to requested Postgres binary.
pub fn get_pg_binary(&self, version: &str, binary: &str) -> String {
self.get_pg_bindir(version)
.join(binary)
.into_os_string()
.into_string()
.expect(&format!(
"path to {}-{} cannot be represented as UTF-8",
binary, version
))
}
/// Get path to Postgres binary directory of this compute.
pub fn get_my_pg_bindir(&self) -> PathBuf {
self.get_pg_bindir(&self.pgversion)
}
/// Get path to specified Postgres binary of this compute.
pub fn get_my_pg_binary(&self, binary: &str) -> String {
self.get_pg_binary(&self.pgversion, binary)
}
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip_all, fields(%lsn))]
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
async fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Instant::now();
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let mut config = postgres::Config::from_str(shard0_connstr)?;
let mut config = tokio_postgres::Config::from_str(shard0_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
@@ -340,7 +381,12 @@ impl ComputeNode {
}
// Connect to pageserver
let mut client = config.connect(NoTls)?;
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
let basebackup_cmd = match lsn {
@@ -352,8 +398,18 @@ impl ComputeNode {
),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
let mut measured_reader = MeasuredReader::new(copyreader);
let mut copystream = std::pin::pin!(client.copy_out(basebackup_cmd.as_str()).await?);
let mut buf = bytes::BytesMut::with_capacity(1024);
while let Some(i) = copystream.next().await {
match i {
Ok(b) => {
buf.put(b);
}
Err(e) => return Err(anyhow::anyhow!(e)),
}
}
let basebackup_bytes = buf.len();
// Check the magic number to see if it's a gzip or not. Even though
// we might explicitly ask for gzip, an old pageserver with no implementation
@@ -366,11 +422,9 @@ impl ComputeNode {
// and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
// we send the "global" directory first from the pageserver, so it definitely
// won't be recognized as gzip.
let mut bufreader = std::io::BufReader::new(&mut measured_reader);
let gzip = {
let peek = bufreader.fill_buf().unwrap();
peek[0] == 0x1f && peek[1] == 0x8b
};
let gzip = buf[0] == 0x1f && buf[1] == 0x8b;
let mut bufreader = buf.reader();
// Read the archive directly from the `CopyOutReader`
//
@@ -390,14 +444,14 @@ impl ComputeNode {
// Report metrics
let mut state = self.state.lock().unwrap();
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
state.metrics.basebackup_bytes = basebackup_bytes as u64;
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
Ok(())
}
// Gets the basebackup in a retry loop
#[instrument(skip_all, fields(%lsn))]
pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
pub async fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let mut retry_period_ms = 500.0;
let mut attempts = 0;
const DEFAULT_ATTEMPTS: u16 = 10;
@@ -410,7 +464,7 @@ impl ComputeNode {
#[cfg(not(feature = "testing"))]
let max_attempts = DEFAULT_ATTEMPTS;
loop {
let result = self.try_get_basebackup(compute_state, lsn);
let result = self.try_get_basebackup(compute_state, lsn).await;
match result {
Ok(_) => {
return result;
@@ -431,10 +485,14 @@ impl ComputeNode {
}
}
pub async fn check_safekeepers_synced_async(
// Fast path for sync_safekeepers. If they're already synced we get the lsn
// in one roundtrip. If not, we should do a full sync_safekeepers.
pub async fn check_safekeepers_synced(
&self,
compute_state: &ComputeState,
) -> Result<Option<Lsn>> {
let start_time = Utc::now();
// Construct a connection config for each safekeeper
let pspec: ParsedSpec = compute_state
.pspec
@@ -503,20 +561,7 @@ impl ComputeNode {
return Ok(None);
}
Ok(check_if_synced(responses))
}
// Fast path for sync_safekeepers. If they're already synced we get the lsn
// in one roundtrip. If not, we should do a full sync_safekeepers.
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
let start_time = Utc::now();
// Run actual work with new tokio runtime
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create rt");
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
let lsn = check_if_synced(responses);
// Record runtime
self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
@@ -524,7 +569,8 @@ impl ComputeNode {
.to_std()
.unwrap()
.as_millis() as u64;
result
Ok(lsn)
}
// Run `postgres` in a special mode with `--sync-safekeepers` argument
@@ -533,7 +579,8 @@ impl ComputeNode {
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
let mut sync_handle = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut sync_handle = maybe_cgexec(&postgres)
.args(["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &storage_auth_token {
@@ -589,11 +636,7 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
pub fn prepare_pgdata(
&self,
compute_state: &ComputeState,
extension_server_port: u16,
) -> Result<()> {
pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
@@ -603,7 +646,7 @@ impl ComputeNode {
config::write_postgres_conf(
&pgdata_path.join("postgresql.conf"),
&pspec.spec,
Some(extension_server_port),
Some(self.http_port),
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -612,7 +655,8 @@ impl ComputeNode {
let lsn = match spec.mode {
ComputeMode::Primary => {
info!("checking if safekeepers are synced");
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state).await
{
lsn
} else {
info!("starting safekeepers syncing");
@@ -630,18 +674,24 @@ impl ComputeNode {
info!("Initializing standby from latest Pageserver LSN");
Lsn(0)
}
ComputeMode::Upgrade => {
info!("Starting upgrade node at latest Pageserver LSN");
Lsn(0)
}
};
info!(
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
);
self.get_basebackup(compute_state, lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
)
})?;
self.get_basebackup(compute_state, lsn)
.await
.with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
)
})?;
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
@@ -689,7 +739,7 @@ impl ComputeNode {
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Primary | ComputeMode::Upgrade => {}
ComputeMode::Replica | ComputeMode::Static(..) => {
add_standby_signal(pgdata_path)?;
}
@@ -708,7 +758,7 @@ impl ComputeNode {
// Run initdb to completion
info!("running initdb");
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
let initdb_bin = self.get_my_pg_binary("initdb");
Command::new(initdb_bin)
.args(["-D", pgdata])
.output()
@@ -724,7 +774,8 @@ impl ComputeNode {
// Start postgres
info!("starting postgres");
let mut pg = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut pg = maybe_cgexec(&postgres)
.args(["-D", pgdata])
.spawn()
.expect("cannot start postgres process");
@@ -757,7 +808,8 @@ impl ComputeNode {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
let mut pg = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut pg = maybe_cgexec(&postgres)
.args(["-D", &self.pgdata])
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
@@ -778,6 +830,152 @@ impl ComputeNode {
Ok((pg, logs_handle))
}
pub async fn upgrade(&self, pg_version: &str) -> Result<()> {
let old_bindir = self.get_my_pg_bindir();
let new_bindir = self.get_pg_bindir(pg_version);
let old_datadir = Utf8Path::new(&self.pgdata);
let parent_dir = old_datadir.parent().unwrap();
let new_datadir = parent_dir.join("new-pgdata");
// Delete the new data directory before attempting, just in case it exists
let _ = std::fs::remove_dir_all(&new_datadir);
// Step 1: Create new cluster
info!(
"Running initdb to start a cluster upgrade from v{} to v{}",
self.pgversion, pg_version
);
let initdb_bin = self.get_pg_binary(pg_version, "initdb");
let mut initdb_cmd = Command::new(&initdb_bin);
initdb_cmd
.args(["--pgdata", new_datadir.as_str()])
.args(["--username", "cloud_admin"])
.args(["--encoding", "utf8"])
.args(["--auth-local", "trust"])
.env_clear()
.stdout(Stdio::inherit())
.stderr(Stdio::inherit());
match initdb_cmd.status() {
Ok(status) => {
if !status.success() {
return Err(anyhow::anyhow!("failed to initialize the new database"));
}
info!("Initialized v{} database", pg_version);
}
Err(_) => {
return Err(anyhow::anyhow!(
"failed to spawn initdb for the new database"
))
}
};
// Step 2: Run pg_upgrade
info!(
"Running pg_upgrade to upgrade from v{} to v{}",
self.pgversion, pg_version
);
let pg_upgrade_bin = self.get_pg_binary(pg_version, "pg_upgrade");
let mut pg_upgrade_cmd = Command::new(&pg_upgrade_bin);
let mut child = pg_upgrade_cmd
.args([
"--old-bindir",
&old_bindir.into_os_string().into_string().unwrap(),
])
.args(["--old-datadir", old_datadir.as_str()])
.args(["--old-options", "-c neon.safekeepers=''"])
.args([
"--new-bindir",
&new_bindir.into_os_string().into_string().unwrap(),
])
.args(["--new-datadir", new_datadir.as_str()])
.args(["--new-options", "-c neon.safekeepers=''"])
.args(["--username", "cloud_admin"])
.arg("--no-transfer")
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?;
let status = child.wait()?;
if status.success() {
info!("pg_upgrade was successful");
} else {
return Err(anyhow::anyhow!(
"pg_upgrade failed with exit code {}",
status.code().unwrap()
));
}
/* Step 3: Delete the script that pg_upgrade generates, which is created in the current
* working directory
*/
// TODO: We should write a patch for upstream to not generate this file
if cfg!(windows) {
let _ = std::fs::remove_file("delete_old_cluster.bat");
} else {
let _ = std::fs::remove_file("delete_old_cluster.sh");
}
/* Step 4: Re-prepare the pgdata directory to work with the latest basebackup from the
* pageserver
*/
{
let state = self.state.lock().unwrap().clone();
self.prepare_pgdata(&state).await?;
}
// Step 5: Create tarball minus things like pg_dynshm, etc.
info!("Creating tarball of upgraded initdb directory, minus some files");
let initdb_tar_path = parent_dir.join("initdb.tar.zst");
let _ = std::fs::remove_file(&initdb_tar_path);
create_zst_tarball(&new_datadir, &initdb_tar_path).await?;
// Step 6: Write new postgresql.conf file for upgraded initdb
std::fs::copy(
old_datadir.join("postgresql.conf"),
new_datadir.join("postgresql.conf"),
)?;
// Step 7: Write the tarball into the Postgres WAL
info!("Writing {} to WAL", initdb_tar_path.as_str());
let postgres_bin = self.get_my_pg_binary("postgres");
let mut wal_log_cmd = Command::new(&postgres_bin);
child = wal_log_cmd
.args(["--wal-log", initdb_tar_path.as_str()])
.env_clear()
.env("PGDATA", old_datadir.as_str())
.env("NEON_PURPOSE", "upgrade")
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.expect("postgres --wal-log failed to start");
match child.wait() {
Ok(s) => {
if !s.success() {
return Err(anyhow::anyhow!("Could not WAL log upgrade tarball"));
}
}
Err(e) => return Err(e.into()),
};
// Step 8: Sync the safekeepers to push WAL record to Neon
self.sync_safekeepers(None)?;
/* Note that any errors that occur after this point are unimportant; ALWAYS return success
* from here on. The compute will be terminated immediately after the upgrade. Remember
* that this is an upgrade-only compute, and it will not accept connections from users.
*/
Ok(())
}
/// Do post configuration of the already started Postgres. This function spawns a background thread to
/// configure the database after applying the compute spec. Currently, it upgrades the neon extension
/// version. In the future, it may upgrade all 3rd-party extensions.
@@ -895,8 +1093,8 @@ impl ComputeNode {
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
let pgctl_bin = self.get_my_pg_binary("pg_ctl");
Command::new(&pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
@@ -980,43 +1178,25 @@ impl ComputeNode {
Ok(())
}
/// Prepares the compute for Postgres operations, including downloading
/// remote extensions and preparing the pgdata directory.
///
/// The caller must hold the state mutex.
#[instrument(skip_all)]
pub fn start_compute(
&self,
extension_server_port: u16,
) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
pub async fn prepare_compute(&self) -> Result<()> {
let state = self.state.lock().unwrap().clone();
let pspec = state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
"preparing compute for project {}, operation {}, tenant {}, timeline {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
pspec.tenant_id,
pspec.timeline_id,
);
// tune pgbouncer
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
info!("tuning pgbouncer");
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create rt");
// Spawn a thread to do the tuning,
// so that we don't block the main thread that starts Postgres.
let pgbouncer_settings = pgbouncer_settings.clone();
let _handle = thread::spawn(move || {
let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
if let Err(err) = res {
error!("error while tuning pgbouncer: {err:?}");
}
});
}
info!(
"start_compute spec.remote_extensions {:?}",
"prepare_compute spec.remote_extensions {:?}",
pspec.spec.remote_extensions
);
@@ -1024,7 +1204,8 @@ impl ComputeNode {
// remote shared_preload_libraries before postgres start (if any)
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
// First, create control files for all available extensions
extension_server::create_control_files(remote_extensions, &self.pgbin);
let postgres_bin = self.get_my_pg_binary("postgres");
extension_server::create_control_files(remote_extensions, &postgres_bin);
let library_load_start_time = Utc::now();
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
@@ -1046,7 +1227,17 @@ impl ComputeNode {
info!("{:?}", remote_ext_metrics);
}
self.prepare_pgdata(&compute_state, extension_server_port)?;
self.prepare_pgdata(&state).await?;
self.set_status(ComputeStatus::Prepared);
Ok(())
}
#[instrument(skip_all)]
pub fn start_compute(&self) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
@@ -1152,9 +1343,11 @@ impl ComputeNode {
core_path.display()
);
let postgres_bin = self.get_my_pg_binary("postgres");
// Try first with gdb
let backtrace = Command::new("gdb")
.args(["--batch", "-q", "-ex", "bt", &self.pgbin])
.args(["--batch", "-q", "-ex", "bt", &postgres_bin])
.arg(&core_path)
.output();
@@ -1287,11 +1480,12 @@ LIMIT 100",
// then we try to download it here
info!("downloading new extension {ext_archive_name}");
let postgres_bin = self.get_my_pg_binary("postgres");
let download_size = extension_server::download_extension(
&real_ext_name,
&ext_path,
ext_remote_storage,
&self.pgbin,
&postgres_bin,
)
.await
.map_err(DownloadError::Other);
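One detail worth pulling out of the basebackup rewrite above: with the copy-out stream now collected into an in-memory buffer, the gzip detection reduces to checking the first two bytes for the gzip magic number. A minimal standalone sketch of that check:

/// Returns true if `buf` starts with the gzip magic number (0x1f 0x8b).
/// As the comment in the patch notes, a tar stream never starts with these
/// bytes, so peeking at the buffer is enough to choose a decoder.
fn looks_like_gzip(buf: &[u8]) -> bool {
    buf.len() >= 2 && buf[0] == 0x1f && buf[1] == 0x8b
}

fn main() {
    assert!(looks_like_gzip(&[0x1f, 0x8b, 0x08]));
    assert!(!looks_like_gzip(b"global/pg_control"));
}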

View File

@@ -74,7 +74,7 @@ pub fn write_postgres_conf(
}
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Primary | ComputeMode::Upgrade => {}
ComputeMode::Static(lsn) => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;

View File

@@ -75,7 +75,6 @@ use anyhow::Result;
use anyhow::{bail, Context};
use bytes::Bytes;
use compute_api::spec::RemoteExtSpec;
use regex::Regex;
use remote_storage::*;
use reqwest::StatusCode;
use std::path::Path;
@@ -103,34 +102,6 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String {
.to_string()
}
pub fn get_pg_version(pgbin: &str) -> String {
// pg_config --version returns a (platform specific) human readable string
// such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
let human_version = get_pg_config("--version", pgbin);
return parse_pg_version(&human_version).to_string();
}
fn parse_pg_version(human_version: &str) -> &str {
// Normal releases have version strings like "PostgreSQL 15.4". But there
// are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
// 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
// configure option, you can tack any string to the version number,
// e.g. "PostgreSQL 15.4foobar".
match Regex::new(r"^PostgreSQL (?<major>\d+).+")
.unwrap()
.captures(human_version)
{
Some(captures) if captures.len() == 2 => match &captures["major"] {
"14" => return "v14",
"15" => return "v15",
"16" => return "v16",
_ => {}
},
_ => {}
}
panic!("Unsuported postgres version {human_version}");
}
// download the archive for a given extension,
// unzip it, and place files in the appropriate locations (share/lib)
pub async fn download_extension(
@@ -255,42 +226,3 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res
),
}
}
#[cfg(test)]
mod tests {
use super::parse_pg_version;
#[test]
fn test_parse_pg_version() {
assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
assert_eq!(
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
"v15"
);
assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
assert_eq!(
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
"v14"
);
assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
}
#[test]
#[should_panic]
fn test_parse_pg_unsupported_version() {
parse_pg_version("PostgreSQL 13.14");
}
#[test]
#[should_panic]
fn test_parse_pg_incorrect_version_format() {
parse_pg_version("PostgreSQL 14");
}
}

View File

@@ -10,6 +10,7 @@ use crate::catalog::{get_database_schema, get_dbs_and_roles};
use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::requests::UpgradeRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use anyhow::Result;
@@ -137,6 +138,20 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
(&Method::POST, "/upgrade") => {
info!("serving /upgrade POST request");
match handle_upgrade_request(req, compute).await {
Ok(_) => Response::builder()
.status(StatusCode::ACCEPTED)
.body(Body::from("Starting upgrade"))
.unwrap(),
Err((e, status)) => {
error!("error handling /upgrade request: {e}");
render_json_error(&format!("{}", e), status)
}
}
}
(&Method::GET, "/dbs_and_roles") => {
info!("serving /dbs_and_roles GET request",);
match get_dbs_and_roles(compute).await {
@@ -397,6 +412,59 @@ async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (Str
Ok(())
}
async fn handle_upgrade_request(
req: Request<Body>,
compute: &Arc<ComputeNode>,
) -> Result<(), (anyhow::Error, StatusCode)> {
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let body_str = String::from_utf8(body_bytes.to_vec()).unwrap();
let body = match serde_json::from_str::<UpgradeRequest>(&body_str) {
Ok(r) => r,
Err(e) => return Err((Into::into(e), StatusCode::BAD_REQUEST)),
};
// No sense in trying to upgrade to the same version.
let curr_version = compute.pgversion.clone();
let new_version = body.pg_version;
if curr_version == new_version {
return Err((
anyhow::anyhow!("cannot upgrade endpoint to the same version"),
StatusCode::UNPROCESSABLE_ENTITY,
));
}
// Check that we are in the running state before trying to upgrade.
match compute.get_status() {
ComputeStatus::Prepared => (),
ComputeStatus::Upgrading => {
return Err((
anyhow::anyhow!("upgrade already in progress"),
StatusCode::CONFLICT,
));
}
_ => {
return Err((
anyhow::anyhow!("expected compute to be in the prepared state"),
StatusCode::CONFLICT,
));
}
}
compute.set_status(ComputeStatus::Upgrading);
let c = compute.clone();
tokio::spawn(async move {
if let Err(e) = c.upgrade(&new_version).await {
error!("Failed to upgrade database: {}", e);
}
c.set_status(ComputeStatus::Running);
});
Ok(())
}
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(port: u16, state: Arc<ComputeNode>) {

View File

@@ -212,6 +212,21 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/upgrade:
post:
tags:
- Upgrade
summary: Upgrade project to another Postgres major version.
operationId: upgradePostgres
responses:
202:
description: Upgrade request in progress.
409:
description: Upgrade already in progress.
422:
description: Upgrade request could not be processed.
500:
description: Upgrade request failed.
/terminate:
post:
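Putting the handler and this OpenAPI entry together, a client drives an upgrade by POSTing the target version and polling /status until the compute leaves the upgrading state, exactly as the Python test at the end of this diff does. A hedged sketch in Rust using reqwest (the port is a placeholder, and reqwest with its "json" feature, serde_json, and tokio are assumed dependencies, not part of this patch):

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let base = "http://localhost:3080"; // placeholder compute_ctl HTTP port
    let client = reqwest::Client::new();

    // 202 Accepted: the upgrade runs in a background task on the compute.
    let resp = client
        .post(format!("{base}/upgrade"))
        .json(&json!({ "pg_version": "16" }))
        .send()
        .await?;
    assert_eq!(resp.status(), reqwest::StatusCode::ACCEPTED);

    // The handler flips the status Prepared -> Upgrading -> Running, so poll.
    loop {
        let status: serde_json::Value =
            client.get(format!("{base}/status")).send().await?.json().await?;
        match status["status"].as_str() {
            Some("upgrading") => {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await
            }
            _ => break,
        }
    }
    Ok(())
}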

View File

@@ -798,14 +798,19 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
let upgrade_only = sub_args
.get_one::<bool>("upgrade-only")
.copied()
.unwrap_or(false);
let allow_multiple = sub_args.get_flag("allow-multiple");
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
(None, false) => ComputeMode::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
let mode = match (lsn, hot_standby, upgrade_only) {
(Some(lsn), false, false) => ComputeMode::Static(lsn),
(None, true, false) => ComputeMode::Replica,
(None, false, false) => ComputeMode::Primary,
(None, false, true) => ComputeMode::Upgrade,
// Seeing this message means we aren't setting conflicts_with on clap arguments.
_ => anyhow::bail!("Invalid command line invocation"),
};
match (mode, hot_standby) {
@@ -1501,7 +1506,8 @@ fn cli() -> Command {
let lsn_arg = Arg::new("lsn")
.long("lsn")
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false);
.required(false)
.conflicts_with("hot-standby");
let hot_standby_arg = Arg::new("hot-standby")
.value_parser(value_parser!(bool))
@@ -1718,6 +1724,18 @@ fn cli() -> Command {
.arg(hot_standby_arg.clone())
.arg(update_catalog)
.arg(allow_multiple.clone())
.arg(
Arg::new("upgrade-only")
.help("Mark this compute as an upgrade compute")
.long("upgrade-only")
.action(ArgAction::SetTrue)
.conflicts_with_all(&[
"config-only",
"hot-standby",
// Perhaps we could offer upgrades at a specific LSN in the future.
"lsn",
])
)
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")

View File

@@ -182,6 +182,7 @@ impl ComputeControlPlane {
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<()> {
// TODO: It really feels like I need to do some protection here
if matches!(mode, ComputeMode::Primary) {
// this check is not complete, as you could have a concurrent attempt at
// creating another primary, both reading the state before checking it here,
@@ -393,6 +394,7 @@ impl Endpoint {
conf.append("recovery_prefetch", "off");
}
}
ComputeMode::Upgrade => {}
}
Ok(conf)
@@ -502,7 +504,7 @@ impl Endpoint {
/// Map safekeepers ids to the actual connection strings.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
let mut safekeeper_connstrings = Vec::new();
if self.mode == ComputeMode::Primary {
if matches!(self.mode, ComputeMode::Primary | ComputeMode::Upgrade) {
for sk_id in sk_ids {
let sk = self
.env
@@ -624,13 +626,16 @@ impl Endpoint {
self.endpoint_path().join("spec.json").to_str().unwrap(),
])
.args([
"--pgbin",
self.env
.pg_bin_dir(self.pg_version)?
.join("postgres")
.to_str()
"--pgroot",
&self
.env
.pg_distrib_dir
.clone()
.into_os_string()
.into_string()
.unwrap(),
])
.args(["--pgversion", &self.pg_version.to_string()])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
@@ -674,7 +679,7 @@ impl Endpoint {
}
// keep retrying
}
ComputeStatus::Running => {
ComputeStatus::Running | ComputeStatus::Prepared => {
// All good!
break;
}
@@ -688,6 +693,7 @@ impl Endpoint {
);
}
ComputeStatus::Empty
| ComputeStatus::Upgrading
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::TerminationPending

View File

@@ -12,3 +12,9 @@ use serde::Deserialize;
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}
/// Request body of the /upgrade API
#[derive(Deserialize, Debug)]
pub struct UpgradeRequest {
pub pg_version: String,
}
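The wire format this deserializes is a single-field JSON object. A quick round-trip sketch, re-declaring the struct locally for illustration and assuming serde with the derive feature plus serde_json:

// Mirrors the UpgradeRequest shape above.
#[derive(serde::Deserialize, Debug)]
struct UpgradeRequest {
    pg_version: String,
}

fn main() {
    let req: UpgradeRequest =
        serde_json::from_str(r#"{"pg_version": "16"}"#).unwrap();
    assert_eq!(req.pg_version, "16");
}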

View File

@@ -44,10 +44,15 @@ pub enum ComputeStatus {
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute has been prepared, meaning that remote extensions have been
// downloaded and the data directory has been prepared.
Prepared,
// Compute is configured and running.
Running,
// New spec is being applied.
Configuration,
// Compute is upgrading Postgres.
Upgrading,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.

View File

@@ -199,6 +199,11 @@ pub enum ComputeMode {
/// Future versions may want to distinguish between replicas with hot standby
/// feedback and other kinds of replication configurations.
Replica,
/// An upgrade-only node
///
/// This node will not accept remote Postgres connections. Its only
/// purpose is to upgrade a timeline.
Upgrade,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]

View File

@@ -167,7 +167,16 @@ pub const RM_RELMAP_ID: u8 = 7;
pub const RM_STANDBY_ID: u8 = 8;
pub const RM_HEAP2_ID: u8 = 9;
pub const RM_HEAP_ID: u8 = 10;
pub const RM_BTREE_ID: u8 = 11;
pub const RM_HASH_ID: u8 = 12;
pub const RM_GIN_ID: u8 = 13;
pub const RM_GIST_ID: u8 = 14;
pub const RM_SEQ_ID: u8 = 15;
pub const RM_SPGIST_ID: u8 = 16;
pub const RM_BRIN_ID: u8 = 17;
pub const RM_COMMIT_TS_ID: u8 = 18;
pub const RM_REPLORIGIN_ID: u8 = 19;
pub const RM_GENERIC_ID: u8 = 20;
pub const RM_LOGICALMSG_ID: u8 = 21;
// from neon_rmgr.h
@@ -181,9 +190,27 @@ pub const XLOG_NEON_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_NEON_HEAP_HOT_UPDATE: u8 = 0x30;
pub const XLOG_NEON_HEAP_LOCK: u8 = 0x40;
pub const XLOG_NEON_HEAP_MULTI_INSERT: u8 = 0x50;
pub const XLOG_NEON_FILE: u8 = 0x60;
pub const XLOG_NEON_HEAP_VISIBLE: u8 = 0x40;
#[repr(C)]
#[derive(Debug)]
pub enum XlNeonFileFiletype {
UPGRADE_TARBALL,
}
impl TryFrom<u8> for XlNeonFileFiletype {
type Error = ();
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(XlNeonFileFiletype::UPGRADE_TARBALL),
_ => Err(()),
}
}
}
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;

View File

@@ -506,6 +506,10 @@ async fn import_file(
return Ok(None);
}
if file_name == "pg_internal.init" {
return Ok(None);
}
if file_path.starts_with("global") {
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;

View File

@@ -14,7 +14,7 @@ mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context, Result};
use arc_swap::ArcSwap;
use bytes::Bytes;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
@@ -4357,6 +4357,10 @@ impl Timeline {
)
}
pub fn get_path(&self) -> Utf8PathBuf {
self.conf.timelines_path(&self.tenant_shard_id)
}
/// Detach this timeline from its ancestor by copying all of ancestors layers as this
/// Timelines layers up to the ancestor_lsn.
///

View File

@@ -24,6 +24,7 @@
use std::time::Duration;
use std::time::SystemTime;
use camino::Utf8PathBuf;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -33,8 +34,10 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use utils::failpoint_support;
use utils::rate_limit::RateLimit;
use utils::zstd::extract_zst_tarball;
use crate::context::RequestContext;
use crate::import_datadir;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::{DatadirModification, Version};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -69,7 +72,9 @@ impl CheckPoint {
}
}
pub struct WalIngest {
pub struct WalIngest<'t> {
timeline: &'t Timeline,
timeline_path: Utf8PathBuf,
shard: ShardIdentity,
checkpoint: CheckPoint,
checkpoint_modified: bool,
@@ -82,12 +87,12 @@ struct WarnIngestLag {
timestamp_invalid_msg_ratelimit: RateLimit,
}
impl WalIngest {
impl<'t> WalIngest<'t> {
pub async fn new(
timeline: &Timeline,
timeline: &'t Timeline,
startpoint: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
) -> anyhow::Result<WalIngest<'t>> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
@@ -100,6 +105,8 @@ impl WalIngest {
});
Ok(WalIngest {
timeline,
timeline_path: timeline.get_path(),
shard: *timeline.get_shard_identity(),
checkpoint,
checkpoint_modified: false,
@@ -458,6 +465,17 @@ impl WalIngest {
modification.drop_replorigin(xlrec.node_id).await?
}
}
pg_constants::RM_BTREE_ID
| pg_constants::RM_HASH_ID
| pg_constants::RM_GIN_ID
| pg_constants::RM_GIST_ID
| pg_constants::RM_SEQ_ID
| pg_constants::RM_SPGIST_ID
| pg_constants::RM_BRIN_ID
| pg_constants::RM_COMMIT_TS_ID
| pg_constants::RM_GENERIC_ID => {
// No special handling currently for these resource managers
}
_x => {
// TODO: should probably log & fail here instead of blindly
// doing something without understanding the protocol
@@ -923,6 +941,33 @@ impl WalIngest {
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match pg_version {
15 => {
let info = decoded.xl_info;
match info {
pg_constants::XLOG_NEON_FILE => {
info!(
"tristan: last_record_lsn={}",
self.timeline.get_last_record_lsn()
);
let xlrec = v16::rm_neon::XlNeonFile::decode(buf);
let pgdata_path = self.timeline_path.join("new-pgdata");
extract_zst_tarball(&pgdata_path, &*xlrec.data).await?;
let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?;
info!("LSN from pg_upgraded controlfile: {lsn}");
Box::pin(import_datadir::import_timeline_from_postgres_datadir(
self.timeline,
&pgdata_path,
lsn,
ctx,
))
.await?;
}
_ => return Err(anyhow::anyhow!("Unknown XLOG xl_info field: {}", info)),
}
}
16 => {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;

View File

@@ -526,6 +526,29 @@ pub mod v16 {
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonFile {
pub filetype: u8,
pub size: u32,
pub data: Bytes,
}
impl XlNeonFile {
pub fn decode(buf: &mut Bytes) -> Self {
let filetype = buf.get_u8();
// Skip the padding
buf.advance(std::mem::size_of::<u8>() * 3);
let size = buf.get_u32_le();
Self {
filetype,
size,
data: buf.copy_to_bytes(buf.remaining()),
}
}
}
}
}
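The decode method above fixes the wire layout: one filetype byte, three bytes of struct padding, a little-endian u32 size, then the payload. A small round-trip sketch of that layout using the bytes crate:

use bytes::{Buf, BufMut, Bytes, BytesMut};

fn main() {
    // Encode a record the way the C side lays out xl_neon_file:
    // filetype (u8), 3 padding bytes, size (u32 LE), then the payload.
    let payload = b"tarball bytes";
    let mut raw = BytesMut::new();
    raw.put_u8(0); // XL_NEON_FILE_UPGRADE_TARBALL
    raw.put_bytes(0, 3); // alignment padding that decode() skips
    raw.put_u32_le(payload.len() as u32);
    raw.put_slice(payload);

    // Decode it the same way XlNeonFile::decode does.
    let mut buf: Bytes = raw.freeze();
    let filetype = buf.get_u8();
    buf.advance(3); // skip the padding
    let size = buf.get_u32_le();
    let data = buf.copy_to_bytes(buf.remaining());

    assert_eq!((filetype, size as usize), (0, data.len()));
}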

View File

@@ -23,7 +23,18 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
DATA = \
neon--1.0.sql \
neon--1.0--1.1.sql \
neon--1.1--1.2.sql \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \
neon--1.2--1.1.sql \
neon--1.1--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

View File

@@ -7,3 +7,7 @@ LANGUAGE C PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION approximate_working_set_size_seconds(integer) TO pg_monitor;
CREATE FUNCTION wal_log_file(path text)
RETURNS pg_lsn
AS 'MODULE_PATHNAME', 'wal_log_file'
LANGUAGE C STRICT PARALLEL UNSAFE;

View File

@@ -0,0 +1,8 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.5'" to load this file. \quit
CREATE FUNCTION wal_log_file(path text)
RETURNS pg_lsn
AS 'MODULE_PATHNAME', 'wal_log_file'
LANGUAGE C STRICT PARALLEL UNSAFE;
GRANT EXECUTE ON FUNCTION wal_log_file TO pg_monitor;

View File

@@ -0,0 +1 @@
DROP FUNCTION IF EXISTS wal_log_file(text) CASCADE;
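Because wal_log_file is exposed at the SQL level, it can be exercised from Rust with tokio_postgres, the same async client this series moves compute_ctl onto. A hedged sketch; the connection string, user, and file path are placeholders:

use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    let (client, connection) =
        tokio_postgres::connect("host=localhost user=cloud_admin", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    // wal_log_file() WAL-logs the file's contents and returns the insert LSN.
    let row = client
        .query_one("SELECT wal_log_file('/tmp/initdb.tar.zst')::text", &[])
        .await?;
    let lsn: String = row.get(0);
    println!("file logged at LSN {lsn}");
    Ok(())
}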

View File

@@ -11,6 +11,8 @@
#include "postgres.h"
#include "fmgr.h"
#include <sys/stat.h>
#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
@@ -29,11 +31,19 @@
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "access/xloginsert.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/timeout.h"
#include "utils/wait_event.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#if PG_MAJORVERSION_NUM >= 15
#include "access/neon_xlog.h"
#endif
#include "extension_server.h"
#include "neon.h"
@@ -629,10 +639,17 @@ ReportSearchPath(void)
void
_PG_init(void)
{
const char *purpose;
purpose = getenv("NEON_PURPOSE");
if (purpose && strcmp(purpose, "upgrade") == 0)
return;
/*
* Also load 'neon_rmgr'. This makes it unnecessary to list both 'neon'
* and 'neon_rmgr' in shared_preload_libraries.
*/
#if PG_VERSION_NUM >= 160000
load_file("$libdir/neon_rmgr", false);
#endif
@@ -676,6 +693,9 @@ _PG_init(void)
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
#if PG_MAJORVERSION_NUM >= 15
PG_FUNCTION_INFO_V1(wal_log_file);
#endif
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -721,3 +741,200 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
{
PG_RETURN_UINT64(BackpressureThrottlingTime());
}
#if PG_MAJORVERSION_NUM >= 15
Datum
wal_log_file(PG_FUNCTION_ARGS)
{
int rc;
int fd;
ssize_t n;
text *path;
size_t off;
char *data;
short nargs;
struct stat st;
XLogRecPtr lsn;
size_t path_len;
xl_neon_file xlrec;
char file[MAXPGPATH];
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
bool wal_debug;
#endif
path = PG_GETARG_TEXT_PP(0);
path_len = VARSIZE(path) - VARHDRSZ;
memcpy(file, VARDATA(path), path_len);
file[path_len] = '\0';
/* Get the size of the file. Note that stat(2) follows symlinks. */
rc = stat(file, &st);
if (rc != 0)
ereport(ERROR,
(errmsg("failed to get size of file (%s): %m", file)));
xlrec.size = (size_t) st.st_size;
fd = open(file, O_RDONLY);
if (fd == -1)
ereport(ERROR,
(errmsg("could not open %s: %m", file)));
/* If the file is too large, error out. */
data = palloc(xlrec.size);
/* Copy the file contents */
off = 0;
while (true) {
n = read(fd, data + off, xlrec.size - off);
if (n == EOF)
{
close(fd);
ereport(ERROR,
(errmsg("failed to read %s: %m", file)));
}
off += n;
if (xlrec.size - off == 0)
break;
}
close(fd);
xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfNeonFile);
XLogRegisterData((char *) data, xlrec.size);
/*
* We must turn debugging off on anything where the Neon RMGR is not
* registered. Stash the original value for restoration later.
*/
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
wal_debug = XLOG_DEBUG;
XLOG_DEBUG = false;
#endif
lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE);
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
XLOG_DEBUG = wal_debug;
#endif
PG_RETURN_LSN(lsn);
}
#endif
#if PG_MAJORVERSION_NUM >= 15
/*
* Entry point for `postgres --wal-log`.
*/
PGDLLEXPORT void
WalLog(int argc, char *argv[])
{
int rc;
int fd;
off_t off;
struct stat st;
XLogRecPtr lsn;
void *data;
xl_neon_file xlrec;
/* TODO: should this be PATH_MAX? should we require an absolute path? */
char file[MAXPGPATH];
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
bool wal_debug;
#endif
if (argc != 3)
ereport(ERROR, errmsg("wrong number of arguments passed to --wal-log"));
if (!realpath(argv[2], file))
ereport(ERROR, errmsg("failed to resolve path: %m"));
ereport(LOG, errmsg("writing %s to WAL", file));
ChangeToDataDir();
CreateDataDirLockFile(false);
LocalProcessControlFile(false);
InitializeMaxBackends();
CreateSharedMemoryAndSemaphores();
InitializeTimeouts();
InitProcess();
BaseInit();
CreateAuxProcessResourceOwner();
StartupXLOG();
/* Get the size of the file. Note that stat(2) follows symlinks. */
rc = stat(file, &st);
if (rc != 0)
ereport(ERROR,
(errmsg("failed to get size of file (%s): %m", file)));
xlrec.size = (size_t) st.st_size;
fd = open(file, O_RDONLY);
if (fd == -1)
ereport(ERROR,
(errmsg("could not open %s: %m", file)));
/* If the file is too large, error out. */
data = palloc(xlrec.size);
/* Copy the file contents */
off = 0;
while (true) {
ssize_t n;
n = read(fd, data + off, xlrec.size - off);
if (n == EOF)
{
close(fd);
ereport(ERROR,
(errmsg("failed to read %s: %m", file)));
}
off += n;
if (xlrec.size - off == 0)
break;
}
close(fd);
xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL;
/* ereport(LOG, errmsg("Current LSN: %X/%X" , LSN_FORMAT_ARGS(GetXLogWriteRecPtr()))); */
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfNeonFile);
XLogRegisterData(data, xlrec.size);
/*
* We must turn debugging off on anything where the Neon RMGR is not
* registered. Stash the original value for restoration later.
*/
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
wal_debug = XLOG_DEBUG;
XLOG_DEBUG = false;
#endif
lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE);
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
XLOG_DEBUG = wal_debug;
#endif
exit(0);
}
#endif

View File

@@ -78,11 +78,16 @@ neon_smgr_shmem_startup(void)
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
if (relsize_hash_size <= 0)
return;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found);
if (!found)
{
elog(LOG, "neon_relsize: %d", relsize_hash_size);
relsize_lock = (LWLockId) GetNamedLWLockTranche("neon_relsize");
elog(LOG, "neon_relsize");
info.keysize = sizeof(RelTag);
info.entrysize = sizeof(RelSizeEntry);
relsize_hash = ShmemInitHash("neon_relsize",

View File

@@ -76,6 +76,8 @@ neon_rm_redo(XLogReaderState *record)
case XLOG_NEON_HEAP_MULTI_INSERT:
redo_neon_heap_multi_insert(record);
break;
case XLOG_NEON_FILE:
break;
default:
elog(PANIC, "neon_rm_redo: unknown op code %u", info);
}

View File

@@ -57,6 +57,8 @@ neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeNeonMultiInsert(ctx, buf);
break;
case XLOG_NEON_FILE:
break;
default:
elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
break;
@@ -401,4 +403,4 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
}
#endif
#endif

View File

@@ -134,6 +134,16 @@ neon_rm_desc(StringInfo buf, XLogReaderState *record)
xlrec->ntuples, &offset_elem_desc, NULL);
}
}
else if (info == XLOG_NEON_FILE)
{
const xl_neon_file *xlrec = (xl_neon_file *) rec;
switch ((xl_neon_file_filetype) xlrec->filetype)
{
case XL_NEON_FILE_UPGRADE_TARBALL:
appendStringInfo(buf, "filetype: upgrade tarball, size: %zu", xlrec->size);
break;
}
}
}
const char *
@@ -173,6 +183,9 @@ neon_rm_identify(uint8 info)
case XLOG_NEON_HEAP_MULTI_INSERT | XLOG_NEON_INIT_PAGE:
id = "MULTI_INSERT+INIT";
break;
case XLOG_NEON_FILE:
id = "FILE";
break;
}
return id;

View File

@@ -580,15 +580,15 @@ where
* because safekeepers parse WAL headers and the format
* may change between versions.
*/
if msg.pg_version / 10000 != self.state.server.pg_version / 10000
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{
bail!(
"incompatible server version {}, expected {}",
msg.pg_version,
self.state.server.pg_version
);
}
// if msg.pg_version / 10000 != self.state.server.pg_version / 10000
// && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
// {
// bail!(
// "incompatible server version {}, expected {}",
// msg.pg_version,
// self.state.server.pg_version
// );
// }
if msg.tenant_id != self.state.tenant_id {
bail!(

test.sh (new executable file): 12 changed lines
View File

@@ -0,0 +1,12 @@
#!/bin/sh
cargo neon endpoint stop ep-main 2>/dev/null
kill $(pgrep compute_ctl)
cargo neon stop 2>/dev/null
rm -rf .neon
cargo neon init
cargo neon start
cargo neon tenant create --pg-version 15 --set-default
cargo neon endpoint create --pg-version 15 --upgrade-only
cargo neon endpoint start ep-main
curl -i -X POST http://localhost:55433/upgrade -d '{"pg_version": "16"}'

View File

@@ -0,0 +1,54 @@
from time import sleep
import pytest
import requests
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion
def test_upgrade(pg_version: PgVersion, neon_simple_env: NeonEnv):
env = neon_simple_env
upgrade_to: PgVersion
if pg_version == PgVersion.V14:
upgrade_to = PgVersion.V15
elif pg_version == PgVersion.V15:
upgrade_to = PgVersion.V16
else:
pytest.skip("Nothing to upgrade")
env.neon_cli.create_timeline("test_upgrade")
endpoint = env.endpoints.create_start("test_upgrade")
resp = requests.post(
f"http://localhost:{endpoint.http_port}/upgrade",
json={
"pg_version": upgrade_to,
},
)
assert resp.status_code == 202
while True:
resp = requests.get(f"http://localhost:{endpoint.http_port}/status")
assert resp.status_code == 200
data = resp.json()
if data["status"] == "upgrading":
sleep(1)
continue
elif data["status"] == "running":
break
else:
pytest.fail(f"Unexpected compute state during upgrade: {data['status']}")
endpoint.stop_and_destroy()
def test_upgrade_bad_request(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_timeline("test_upgrade_bad_request")
endpoint = env.endpoints.create_start("test_upgrade_bad_request")
resp = requests.post(f"http://localhost:{endpoint.http_port}/upgrade")
assert resp.status_code == 400
# TODO: Use postgres versions that are out of range.