mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-13 19:20:36 +00:00
Compare commits
3 Commits
fcdm/dev-e
...
tristan957
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bbd646325c | ||
|
|
5e71d8fddc | ||
|
|
3d07b6a483 |
4
.gitmodules
vendored
4
.gitmodules
vendored
@@ -5,8 +5,8 @@
|
||||
[submodule "vendor/postgres-v15"]
|
||||
path = vendor/postgres-v15
|
||||
url = https://github.com/neondatabase/postgres.git
|
||||
branch = REL_15_STABLE_neon
|
||||
branch = tristan957/15/pg_upgrade
|
||||
[submodule "vendor/postgres-v16"]
|
||||
path = vendor/postgres-v16
|
||||
url = https://github.com/neondatabase/postgres.git
|
||||
branch = REL_16_STABLE_neon
|
||||
branch = tristan957/pg_upgrade
|
||||
|
||||
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1220,6 +1220,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"async-compression",
|
||||
"bytes",
|
||||
"camino",
|
||||
"cfg-if",
|
||||
"chrono",
|
||||
"clap",
|
||||
@@ -1237,6 +1238,7 @@ dependencies = [
|
||||
"reqwest 0.12.4",
|
||||
"rlimit",
|
||||
"rust-ini",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"signal-hook",
|
||||
|
||||
@@ -12,6 +12,7 @@ testing = []
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-compression.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
cfg-if.workspace = true
|
||||
clap.workspace = true
|
||||
@@ -24,6 +25,7 @@ num_cpus.workspace = true
|
||||
opentelemetry.workspace = true
|
||||
postgres.workspace = true
|
||||
regex.workspace = true
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
|
||||
@@ -43,7 +43,7 @@ use std::{thread, time::Duration};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::Utc;
|
||||
use clap::Arg;
|
||||
use clap::{Arg, ArgAction};
|
||||
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use signal_hook::consts::{SIGQUIT, SIGTERM};
|
||||
use signal_hook::{consts::SIGINT, iterator::Signals};
|
||||
@@ -51,13 +51,12 @@ use tracing::{error, info, warn};
|
||||
use url::Url;
|
||||
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::ComputeSpec;
|
||||
use compute_api::spec::{ComputeMode, ComputeSpec};
|
||||
|
||||
use compute_tools::compute::{
|
||||
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
|
||||
};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::get_pg_version;
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
@@ -70,13 +69,14 @@ use rlimit::{setrlimit, Resource};
|
||||
// fallback used in case the environment variable is not set
|
||||
const BUILD_TAG_DEFAULT: &str = "latest";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let (build_tag, clap_args) = init()?;
|
||||
|
||||
// enable core dumping for all child processes
|
||||
setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?;
|
||||
|
||||
let (pg_handle, start_pg_result) = {
|
||||
let (http_handle, (pg_handle, start_pg_result)) = {
|
||||
// Enter startup tracing context
|
||||
let _startup_context_guard = startup_context_from_env();
|
||||
|
||||
@@ -84,13 +84,61 @@ fn main() -> Result<()> {
|
||||
|
||||
let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?;
|
||||
|
||||
let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?;
|
||||
let compute = Arc::new(ComputeNode {
|
||||
connstr: Url::parse(cli_args.connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: cli_args.pgdata.to_string(),
|
||||
pgroot: cli_args.pgroot.to_string(),
|
||||
pgversion: cli_args.pgversion.to_string(),
|
||||
http_port: cli_args.http_port,
|
||||
live_config_allowed: cli_spec.live_config_allowed,
|
||||
state: Mutex::new({
|
||||
let mut state = ComputeState::new();
|
||||
|
||||
start_postgres(&clap_args, wait_spec_result)?
|
||||
if let Some(spec) = cli_spec.spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
info!("new pspec.spec: {:?}", pspec.spec);
|
||||
state.pspec = Some(pspec);
|
||||
}
|
||||
|
||||
state
|
||||
}),
|
||||
state_changed: Condvar::new(),
|
||||
ext_remote_storage: cli_args.ext_remote_storage.map(|s| s.to_string()),
|
||||
ext_download_progress: RwLock::new(HashMap::new()),
|
||||
build_tag: build_tag.clone(),
|
||||
});
|
||||
|
||||
// If this is a pooled VM, prewarm before starting HTTP server and becoming
|
||||
// available for binding. Prewarming helps Postgres start quicker later,
|
||||
// because QEMU will already have its memory allocated from the host, and
|
||||
// the necessary binaries will already be cached.
|
||||
if compute.state.lock().unwrap().pspec.is_none() {
|
||||
compute.prewarm_postgres()?;
|
||||
}
|
||||
|
||||
// Launch http service first, so that we can serve control-plane requests
|
||||
// while configuration is still in progress.
|
||||
let http_handle = launch_http_server(cli_args.http_port, &compute)
|
||||
.expect("cannot launch http endpoint thread");
|
||||
|
||||
wait_spec(&compute)?;
|
||||
|
||||
(
|
||||
http_handle,
|
||||
start_postgres(
|
||||
compute,
|
||||
#[cfg(target_os = "linux")]
|
||||
&clap_args,
|
||||
cli_args.resize_swap_on_bind,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
|
||||
// Startup is finished, exit the startup tracing span
|
||||
};
|
||||
|
||||
let _ = http_handle.join();
|
||||
|
||||
// PostgreSQL is now running, if startup was successful. Wait until it exits.
|
||||
let wait_pg_result = wait_postgres(pg_handle)?;
|
||||
|
||||
@@ -120,11 +168,14 @@ fn init() -> Result<(String, clap::ArgMatches)> {
|
||||
}
|
||||
|
||||
fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
|
||||
let pgbin_default = "postgres";
|
||||
let pgbin = matches
|
||||
.get_one::<String>("pgbin")
|
||||
let pgroot = matches
|
||||
.get_one::<String>("pgroot")
|
||||
.map(|s| s.as_str())
|
||||
.unwrap_or(pgbin_default);
|
||||
.expect("pgroot is required");
|
||||
let pgversion = matches
|
||||
.get_one::<String>("pgversion")
|
||||
.map(|s| s.as_str())
|
||||
.expect("pgversion is required");
|
||||
|
||||
let ext_remote_storage = matches
|
||||
.get_one::<String>("remote-ext-config")
|
||||
@@ -155,7 +206,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
|
||||
Ok(ProcessCliResult {
|
||||
connstr,
|
||||
pgdata,
|
||||
pgbin,
|
||||
pgroot,
|
||||
pgversion,
|
||||
ext_remote_storage,
|
||||
http_port,
|
||||
spec_json,
|
||||
@@ -167,7 +219,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
|
||||
struct ProcessCliResult<'clap> {
|
||||
connstr: &'clap str,
|
||||
pgdata: &'clap str,
|
||||
pgbin: &'clap str,
|
||||
pgroot: &'clap str,
|
||||
pgversion: &'clap str,
|
||||
ext_remote_storage: Option<&'clap str>,
|
||||
http_port: u16,
|
||||
spec_json: Option<&'clap String>,
|
||||
@@ -285,61 +338,8 @@ struct CliSpecParams {
|
||||
live_config_allowed: bool,
|
||||
}
|
||||
|
||||
fn wait_spec(
|
||||
build_tag: String,
|
||||
ProcessCliResult {
|
||||
connstr,
|
||||
pgdata,
|
||||
pgbin,
|
||||
ext_remote_storage,
|
||||
resize_swap_on_bind,
|
||||
http_port,
|
||||
..
|
||||
}: ProcessCliResult,
|
||||
CliSpecParams {
|
||||
spec,
|
||||
live_config_allowed,
|
||||
}: CliSpecParams,
|
||||
) -> Result<WaitSpecResult> {
|
||||
let mut new_state = ComputeState::new();
|
||||
let spec_set;
|
||||
|
||||
if let Some(spec) = spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
info!("new pspec.spec: {:?}", pspec.spec);
|
||||
new_state.pspec = Some(pspec);
|
||||
spec_set = true;
|
||||
} else {
|
||||
spec_set = false;
|
||||
}
|
||||
let compute_node = ComputeNode {
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
pgversion: get_pg_version(pgbin),
|
||||
live_config_allowed,
|
||||
state: Mutex::new(new_state),
|
||||
state_changed: Condvar::new(),
|
||||
ext_remote_storage: ext_remote_storage.map(|s| s.to_string()),
|
||||
ext_download_progress: RwLock::new(HashMap::new()),
|
||||
build_tag,
|
||||
};
|
||||
let compute = Arc::new(compute_node);
|
||||
|
||||
// If this is a pooled VM, prewarm before starting HTTP server and becoming
|
||||
// available for binding. Prewarming helps Postgres start quicker later,
|
||||
// because QEMU will already have its memory allocated from the host, and
|
||||
// the necessary binaries will already be cached.
|
||||
if !spec_set {
|
||||
compute.prewarm_postgres()?;
|
||||
}
|
||||
|
||||
// Launch http service first, so that we can serve control-plane requests
|
||||
// while configuration is still in progress.
|
||||
let _http_handle =
|
||||
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
|
||||
|
||||
if !spec_set {
|
||||
fn wait_spec(compute: &Arc<ComputeNode>) -> Result<()> {
|
||||
if compute.state.lock().unwrap().pspec.is_none() {
|
||||
// No spec provided, hang waiting for it.
|
||||
info!("no compute spec provided, waiting");
|
||||
|
||||
@@ -369,28 +369,13 @@ fn wait_spec(
|
||||
|
||||
launch_lsn_lease_bg_task_for_static(&compute);
|
||||
|
||||
Ok(WaitSpecResult {
|
||||
compute,
|
||||
http_port,
|
||||
resize_swap_on_bind,
|
||||
})
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct WaitSpecResult {
|
||||
async fn start_postgres(
|
||||
compute: Arc<ComputeNode>,
|
||||
// passed through from ProcessCliResult
|
||||
http_port: u16,
|
||||
#[cfg(target_os = "linux")] matches: &clap::ArgMatches,
|
||||
resize_swap_on_bind: bool,
|
||||
}
|
||||
|
||||
fn start_postgres(
|
||||
// need to allow unused because `matches` is only used if target_os = "linux"
|
||||
#[allow(unused_variables)] matches: &clap::ArgMatches,
|
||||
WaitSpecResult {
|
||||
compute,
|
||||
http_port,
|
||||
resize_swap_on_bind,
|
||||
}: WaitSpecResult,
|
||||
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
|
||||
// We got all we need, update the state.
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
@@ -441,27 +426,32 @@ fn start_postgres(
|
||||
}
|
||||
}
|
||||
|
||||
let extension_server_port: u16 = http_port;
|
||||
compute.prepare_compute().await?;
|
||||
|
||||
// Start Postgres
|
||||
let mut pg = None;
|
||||
if !prestartup_failed {
|
||||
pg = match compute.start_compute(extension_server_port) {
|
||||
Ok(pg) => Some(pg),
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:#}", err);
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
state.error = Some(format!("{:?}", err));
|
||||
state.status = ComputeStatus::Failed;
|
||||
// Notify others that Postgres failed to start. In case of configuring the
|
||||
// empty compute, it's likely that API handler is still waiting for compute
|
||||
// state change. With this we will notify it that compute is in Failed state,
|
||||
// so control plane will know about it earlier and record proper error instead
|
||||
// of timeout.
|
||||
compute.state_changed.notify_all();
|
||||
drop(state); // unlock
|
||||
delay_exit = true;
|
||||
None
|
||||
match compute.get_mode() {
|
||||
ComputeMode::Upgrade => {}
|
||||
_ => {
|
||||
// Start Postgres
|
||||
pg = match compute.start_compute() {
|
||||
Ok(pg) => Some(pg),
|
||||
Err(err) => {
|
||||
error!("could not start the compute node: {:#}", err);
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
state.error = Some(format!("{:?}", err));
|
||||
state.status = ComputeStatus::Failed;
|
||||
// Notify others that Postgres failed to start. In case of configuring the
|
||||
// empty compute, it's likely that API handler is still waiting for compute
|
||||
// state change. With this we will notify it that compute is in Failed state,
|
||||
// so control plane will know about it earlier and record proper error instead
|
||||
// of timeout.
|
||||
compute.state_changed.notify_all();
|
||||
drop(state); // unlock
|
||||
delay_exit = true;
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
} else {
|
||||
@@ -686,11 +676,17 @@ fn cli() -> clap::Command {
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("pgbin")
|
||||
.short('b')
|
||||
.long("pgbin")
|
||||
.default_value("postgres")
|
||||
.value_name("POSTGRES_PATH"),
|
||||
Arg::new("pgroot")
|
||||
.short('R')
|
||||
.long("pgroot")
|
||||
.value_name("POSTGRES_ROOT")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("pgversion")
|
||||
.long("pgversion")
|
||||
.value_name("POSTGRES_VERSION")
|
||||
.required(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("spec")
|
||||
@@ -750,6 +746,11 @@ fn cli() -> clap::Command {
|
||||
.long("resize-swap-on-bind")
|
||||
.action(clap::ArgAction::SetTrue),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("no-postgres")
|
||||
.long("no-postgres")
|
||||
.action(ArgAction::SetTrue),
|
||||
)
|
||||
}
|
||||
|
||||
/// When compute_ctl is killed, also send a termination signal to sync-safekeepers
|
||||
|
||||
@@ -4,7 +4,7 @@ use compute_api::{
|
||||
};
|
||||
use futures::Stream;
|
||||
use postgres::{Client, NoTls};
|
||||
use std::{path::Path, process::Stdio, result::Result, sync::Arc};
|
||||
use std::{process::Stdio, result::Result, sync::Arc};
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt, BufReader},
|
||||
process::Command,
|
||||
@@ -55,9 +55,7 @@ pub async fn get_database_schema(
|
||||
compute: &Arc<ComputeNode>,
|
||||
dbname: &str,
|
||||
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>>, SchemaDumpError> {
|
||||
let pgbin = &compute.pgbin;
|
||||
let basepath = Path::new(pgbin).parent().unwrap();
|
||||
let pgdump = basepath.join("pg_dump");
|
||||
let pgdump = compute.get_my_pg_binary("pg_dump");
|
||||
let mut connstr = compute.connstr.clone();
|
||||
connstr.set_path(dbname);
|
||||
let mut cmd = Command::new(pgdump)
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::io::BufRead;
|
||||
use std::os::unix::fs::{symlink, PermissionsExt};
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
@@ -13,6 +13,8 @@ use std::thread;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use bytes::{Buf, BufMut};
|
||||
use camino::Utf8Path;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::future::join_all;
|
||||
use futures::stream::FuturesUnordered;
|
||||
@@ -20,13 +22,15 @@ use futures::StreamExt;
|
||||
use nix::unistd::Pid;
|
||||
use postgres::error::SqlState;
|
||||
use postgres::{Client, NoTls};
|
||||
use tokio;
|
||||
use tokio_postgres;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::zstd::create_zst_tarball;
|
||||
|
||||
use compute_api::responses::{ComputeMetrics, ComputeStatus};
|
||||
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
|
||||
use utils::measured_stream::MeasuredReader;
|
||||
|
||||
use nix::sys::signal::{kill, Signal};
|
||||
|
||||
@@ -47,8 +51,9 @@ pub struct ComputeNode {
|
||||
// Url type maintains proper escaping
|
||||
pub connstr: url::Url,
|
||||
pub pgdata: String,
|
||||
pub pgbin: String,
|
||||
pub pgroot: String,
|
||||
pub pgversion: String,
|
||||
pub http_port: u16,
|
||||
/// We should only allow live re- / configuration of the compute node if
|
||||
/// it uses 'pull model', i.e. it can go to control-plane and fetch
|
||||
/// the latest configuration. Otherwise, there could be a case:
|
||||
@@ -309,6 +314,13 @@ impl ComputeNode {
|
||||
self.state.lock().unwrap().status
|
||||
}
|
||||
|
||||
/// Get the mode of this compute.
|
||||
pub fn get_mode(&self) -> ComputeMode {
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
state.pspec.as_ref().unwrap().spec.mode
|
||||
}
|
||||
|
||||
// Remove `pgdata` directory and create it again with right permissions.
|
||||
fn create_pgdata(&self) -> Result<()> {
|
||||
// Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
|
||||
@@ -320,15 +332,44 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get path pointing to requested binary directory.
|
||||
pub fn get_pg_bindir(&self, version: &str) -> PathBuf {
|
||||
Path::new(&self.pgroot)
|
||||
.join(format!("v{}", version))
|
||||
.join("bin")
|
||||
}
|
||||
|
||||
/// Get path to requested Postgres binary.
|
||||
pub fn get_pg_binary(&self, version: &str, binary: &str) -> String {
|
||||
self.get_pg_bindir(version)
|
||||
.join(binary)
|
||||
.into_os_string()
|
||||
.into_string()
|
||||
.expect(&format!(
|
||||
"path to {}-{} cannot be represented as UTF-8",
|
||||
binary, version
|
||||
))
|
||||
}
|
||||
|
||||
/// Get path to Postgres binary directory of this compute.
|
||||
pub fn get_my_pg_bindir(&self) -> PathBuf {
|
||||
self.get_pg_bindir(&self.pgversion)
|
||||
}
|
||||
|
||||
/// Get path to specified Postgres binary of this compute.
|
||||
pub fn get_my_pg_binary(&self, binary: &str) -> String {
|
||||
self.get_pg_binary(&self.pgversion, binary)
|
||||
}
|
||||
|
||||
// Get basebackup from the libpq connection to pageserver using `connstr` and
|
||||
// unarchive it to `pgdata` directory overriding all its previous content.
|
||||
#[instrument(skip_all, fields(%lsn))]
|
||||
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
async fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let spec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let start_time = Instant::now();
|
||||
|
||||
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
|
||||
let mut config = postgres::Config::from_str(shard0_connstr)?;
|
||||
let mut config = tokio_postgres::Config::from_str(shard0_connstr)?;
|
||||
|
||||
// Use the storage auth token from the config file, if given.
|
||||
// Note: this overrides any password set in the connection string.
|
||||
@@ -340,7 +381,12 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
// Connect to pageserver
|
||||
let mut client = config.connect(NoTls)?;
|
||||
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
|
||||
|
||||
let basebackup_cmd = match lsn {
|
||||
@@ -352,8 +398,18 @@ impl ComputeNode {
|
||||
),
|
||||
};
|
||||
|
||||
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
|
||||
let mut measured_reader = MeasuredReader::new(copyreader);
|
||||
let mut copystream = std::pin::pin!(client.copy_out(basebackup_cmd.as_str()).await?);
|
||||
let mut buf = bytes::BytesMut::with_capacity(1024);
|
||||
while let Some(i) = copystream.next().await {
|
||||
match i {
|
||||
Ok(b) => {
|
||||
buf.put(b);
|
||||
}
|
||||
Err(e) => return Err(anyhow::anyhow!(e)),
|
||||
}
|
||||
}
|
||||
|
||||
let basebackup_bytes = buf.len();
|
||||
|
||||
// Check the magic number to see if it's a gzip or not. Even though
|
||||
// we might explicitly ask for gzip, an old pageserver with no implementation
|
||||
@@ -366,11 +422,9 @@ impl ComputeNode {
|
||||
// and 0x1f and 0x8b are unlikely first characters for any filename. Moreover,
|
||||
// we send the "global" directory first from the pageserver, so it definitely
|
||||
// won't be recognized as gzip.
|
||||
let mut bufreader = std::io::BufReader::new(&mut measured_reader);
|
||||
let gzip = {
|
||||
let peek = bufreader.fill_buf().unwrap();
|
||||
peek[0] == 0x1f && peek[1] == 0x8b
|
||||
};
|
||||
let gzip = buf[0] == 0x1f && buf[1] == 0x8b;
|
||||
|
||||
let mut bufreader = buf.reader();
|
||||
|
||||
// Read the archive directly from the `CopyOutReader`
|
||||
//
|
||||
@@ -390,14 +444,14 @@ impl ComputeNode {
|
||||
// Report metrics
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.metrics.pageserver_connect_micros = pageserver_connect_micros;
|
||||
state.metrics.basebackup_bytes = measured_reader.get_byte_count() as u64;
|
||||
state.metrics.basebackup_bytes = basebackup_bytes as u64;
|
||||
state.metrics.basebackup_ms = start_time.elapsed().as_millis() as u64;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Gets the basebackup in a retry loop
|
||||
#[instrument(skip_all, fields(%lsn))]
|
||||
pub fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
pub async fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
|
||||
let mut retry_period_ms = 500.0;
|
||||
let mut attempts = 0;
|
||||
const DEFAULT_ATTEMPTS: u16 = 10;
|
||||
@@ -410,7 +464,7 @@ impl ComputeNode {
|
||||
#[cfg(not(feature = "testing"))]
|
||||
let max_attempts = DEFAULT_ATTEMPTS;
|
||||
loop {
|
||||
let result = self.try_get_basebackup(compute_state, lsn);
|
||||
let result = self.try_get_basebackup(compute_state, lsn).await;
|
||||
match result {
|
||||
Ok(_) => {
|
||||
return result;
|
||||
@@ -431,10 +485,14 @@ impl ComputeNode {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn check_safekeepers_synced_async(
|
||||
// Fast path for sync_safekeepers. If they're already synced we get the lsn
|
||||
// in one roundtrip. If not, we should do a full sync_safekeepers.
|
||||
pub async fn check_safekeepers_synced(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
) -> Result<Option<Lsn>> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
// Construct a connection config for each safekeeper
|
||||
let pspec: ParsedSpec = compute_state
|
||||
.pspec
|
||||
@@ -503,20 +561,7 @@ impl ComputeNode {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(check_if_synced(responses))
|
||||
}
|
||||
|
||||
// Fast path for sync_safekeepers. If they're already synced we get the lsn
|
||||
// in one roundtrip. If not, we should do a full sync_safekeepers.
|
||||
pub fn check_safekeepers_synced(&self, compute_state: &ComputeState) -> Result<Option<Lsn>> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
// Run actual work with new tokio runtime
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create rt");
|
||||
let result = rt.block_on(self.check_safekeepers_synced_async(compute_state));
|
||||
let lsn = check_if_synced(responses);
|
||||
|
||||
// Record runtime
|
||||
self.state.lock().unwrap().metrics.sync_sk_check_ms = Utc::now()
|
||||
@@ -524,7 +569,8 @@ impl ComputeNode {
|
||||
.to_std()
|
||||
.unwrap()
|
||||
.as_millis() as u64;
|
||||
result
|
||||
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
// Run `postgres` in a special mode with `--sync-safekeepers` argument
|
||||
@@ -533,7 +579,8 @@ impl ComputeNode {
|
||||
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
|
||||
let start_time = Utc::now();
|
||||
|
||||
let mut sync_handle = maybe_cgexec(&self.pgbin)
|
||||
let postgres = self.get_my_pg_binary("postgres");
|
||||
let mut sync_handle = maybe_cgexec(&postgres)
|
||||
.args(["--sync-safekeepers"])
|
||||
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
|
||||
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
||||
@@ -589,11 +636,7 @@ impl ComputeNode {
|
||||
/// Do all the preparations like PGDATA directory creation, configuration,
|
||||
/// safekeepers sync, basebackup, etc.
|
||||
#[instrument(skip_all)]
|
||||
pub fn prepare_pgdata(
|
||||
&self,
|
||||
compute_state: &ComputeState,
|
||||
extension_server_port: u16,
|
||||
) -> Result<()> {
|
||||
pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
let spec = &pspec.spec;
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
@@ -603,7 +646,7 @@ impl ComputeNode {
|
||||
config::write_postgres_conf(
|
||||
&pgdata_path.join("postgresql.conf"),
|
||||
&pspec.spec,
|
||||
Some(extension_server_port),
|
||||
Some(self.http_port),
|
||||
)?;
|
||||
|
||||
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||
@@ -612,7 +655,8 @@ impl ComputeNode {
|
||||
let lsn = match spec.mode {
|
||||
ComputeMode::Primary => {
|
||||
info!("checking if safekeepers are synced");
|
||||
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state) {
|
||||
let lsn = if let Ok(Some(lsn)) = self.check_safekeepers_synced(compute_state).await
|
||||
{
|
||||
lsn
|
||||
} else {
|
||||
info!("starting safekeepers syncing");
|
||||
@@ -630,18 +674,24 @@ impl ComputeNode {
|
||||
info!("Initializing standby from latest Pageserver LSN");
|
||||
Lsn(0)
|
||||
}
|
||||
ComputeMode::Upgrade => {
|
||||
info!("Starting upgrade node at latest Pageserver LSN");
|
||||
Lsn(0)
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
"getting basebackup@{} from pageserver {}",
|
||||
lsn, &pspec.pageserver_connstr
|
||||
);
|
||||
self.get_basebackup(compute_state, lsn).with_context(|| {
|
||||
format!(
|
||||
"failed to get basebackup@{} from pageserver {}",
|
||||
lsn, &pspec.pageserver_connstr
|
||||
)
|
||||
})?;
|
||||
self.get_basebackup(compute_state, lsn)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to get basebackup@{} from pageserver {}",
|
||||
lsn, &pspec.pageserver_connstr
|
||||
)
|
||||
})?;
|
||||
|
||||
// Update pg_hba.conf received with basebackup.
|
||||
update_pg_hba(pgdata_path)?;
|
||||
@@ -689,7 +739,7 @@ impl ComputeNode {
|
||||
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
|
||||
|
||||
match spec.mode {
|
||||
ComputeMode::Primary => {}
|
||||
ComputeMode::Primary | ComputeMode::Upgrade => {}
|
||||
ComputeMode::Replica | ComputeMode::Static(..) => {
|
||||
add_standby_signal(pgdata_path)?;
|
||||
}
|
||||
@@ -708,7 +758,7 @@ impl ComputeNode {
|
||||
|
||||
// Run initdb to completion
|
||||
info!("running initdb");
|
||||
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
|
||||
let initdb_bin = self.get_my_pg_binary("initdb");
|
||||
Command::new(initdb_bin)
|
||||
.args(["-D", pgdata])
|
||||
.output()
|
||||
@@ -724,7 +774,8 @@ impl ComputeNode {
|
||||
|
||||
// Start postgres
|
||||
info!("starting postgres");
|
||||
let mut pg = maybe_cgexec(&self.pgbin)
|
||||
let postgres = self.get_my_pg_binary("postgres");
|
||||
let mut pg = maybe_cgexec(&postgres)
|
||||
.args(["-D", pgdata])
|
||||
.spawn()
|
||||
.expect("cannot start postgres process");
|
||||
@@ -757,7 +808,8 @@ impl ComputeNode {
|
||||
let pgdata_path = Path::new(&self.pgdata);
|
||||
|
||||
// Run postgres as a child process.
|
||||
let mut pg = maybe_cgexec(&self.pgbin)
|
||||
let postgres = self.get_my_pg_binary("postgres");
|
||||
let mut pg = maybe_cgexec(&postgres)
|
||||
.args(["-D", &self.pgdata])
|
||||
.envs(if let Some(storage_auth_token) = &storage_auth_token {
|
||||
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
|
||||
@@ -778,6 +830,152 @@ impl ComputeNode {
|
||||
Ok((pg, logs_handle))
|
||||
}
|
||||
|
||||
pub async fn upgrade(&self, pg_version: &str) -> Result<()> {
|
||||
let old_bindir = self.get_my_pg_bindir();
|
||||
let new_bindir = self.get_pg_bindir(pg_version);
|
||||
|
||||
let old_datadir = Utf8Path::new(&self.pgdata);
|
||||
let parent_dir = old_datadir.parent().unwrap();
|
||||
let new_datadir = parent_dir.join("new-pgdata");
|
||||
|
||||
// Delete the new data directory before attempting, just in case it exists
|
||||
let _ = std::fs::remove_dir_all(&new_datadir);
|
||||
|
||||
// Step 1: Create new cluster
|
||||
info!(
|
||||
"Running initdb to start a cluster upgrade from v{} to v{}",
|
||||
self.pgversion, pg_version
|
||||
);
|
||||
|
||||
let initdb_bin = self.get_pg_binary(pg_version, "initdb");
|
||||
let mut initdb_cmd = Command::new(&initdb_bin);
|
||||
initdb_cmd
|
||||
.args(["--pgdata", new_datadir.as_str()])
|
||||
.args(["--username", "cloud_admin"])
|
||||
.args(["--encoding", "utf8"])
|
||||
.args(["--auth-local", "trust"])
|
||||
.env_clear()
|
||||
.stdout(Stdio::inherit())
|
||||
.stderr(Stdio::inherit());
|
||||
|
||||
match initdb_cmd.status() {
|
||||
Ok(status) => {
|
||||
if !status.success() {
|
||||
return Err(anyhow::anyhow!("failed to initialize the new database"));
|
||||
}
|
||||
|
||||
info!("Initialized v{} database", pg_version);
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to spawn initdb for the new database"
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
// Step 2: Run pg_upgrade
|
||||
info!(
|
||||
"Running pg_upgrade to upgrade from v{} to v{}",
|
||||
self.pgversion, pg_version
|
||||
);
|
||||
|
||||
let pg_upgrade_bin = self.get_pg_binary(pg_version, "pg_upgrade");
|
||||
let mut pg_upgrade_cmd = Command::new(&pg_upgrade_bin);
|
||||
let mut child = pg_upgrade_cmd
|
||||
.args([
|
||||
"--old-bindir",
|
||||
&old_bindir.into_os_string().into_string().unwrap(),
|
||||
])
|
||||
.args(["--old-datadir", old_datadir.as_str()])
|
||||
.args(["--old-options", "-c neon.safekeepers=''"])
|
||||
.args([
|
||||
"--new-bindir",
|
||||
&new_bindir.into_os_string().into_string().unwrap(),
|
||||
])
|
||||
.args(["--new-datadir", new_datadir.as_str()])
|
||||
.args(["--new-options", "-c neon.safekeepers=''"])
|
||||
.args(["--username", "cloud_admin"])
|
||||
.arg("--no-transfer")
|
||||
.stdout(Stdio::inherit())
|
||||
.stderr(Stdio::inherit())
|
||||
.spawn()?;
|
||||
|
||||
let status = child.wait()?;
|
||||
if status.success() {
|
||||
info!("pg_upgrade was successful");
|
||||
} else {
|
||||
return Err(anyhow::anyhow!(
|
||||
"pg_upgrade failed with exit code {}",
|
||||
status.code().unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
/* Step 3: Delete the script that pg_upgrade generates, which is created in the current
|
||||
* working directory
|
||||
*/
|
||||
// TODO: We should write a patch for upstream to not generate this file
|
||||
if cfg!(windows) {
|
||||
let _ = std::fs::remove_file("delete_old_cluster.bat");
|
||||
} else {
|
||||
let _ = std::fs::remove_file("delete_old_cluster.sh");
|
||||
}
|
||||
|
||||
/* Step 4: Re-prepare the pgdata directory to work with the latest basebackup from the
|
||||
* pageserver
|
||||
*/
|
||||
{
|
||||
let state = self.state.lock().unwrap().clone();
|
||||
|
||||
self.prepare_pgdata(&state).await?;
|
||||
}
|
||||
|
||||
// Step 5: Create tarball minus things like pg_dynshm, etc.
|
||||
info!("Creating tarball of upgraded initdb directory, minus some files");
|
||||
|
||||
let initdb_tar_path = parent_dir.join("initdb.tar.zst");
|
||||
let _ = std::fs::remove_file(&initdb_tar_path);
|
||||
create_zst_tarball(&new_datadir, &initdb_tar_path).await?;
|
||||
|
||||
// Step 6: Write new postgresql.conf file for upgraded initdb
|
||||
std::fs::copy(
|
||||
old_datadir.join("postgresql.conf"),
|
||||
new_datadir.join("postgresql.conf"),
|
||||
)?;
|
||||
|
||||
// Step 7: Write the tarball into the Postgres WAL
|
||||
info!("Writing {} to WAL", initdb_tar_path.as_str());
|
||||
|
||||
let postgres_bin = self.get_my_pg_binary("postgres");
|
||||
let mut wal_log_cmd = Command::new(&postgres_bin);
|
||||
child = wal_log_cmd
|
||||
.args(["--wal-log", initdb_tar_path.as_str()])
|
||||
.env_clear()
|
||||
.env("PGDATA", old_datadir.as_str())
|
||||
.env("NEON_PURPOSE", "upgrade")
|
||||
.stdout(Stdio::inherit())
|
||||
.stderr(Stdio::inherit())
|
||||
.spawn()
|
||||
.expect("postgres --wal-log failed to start");
|
||||
match child.wait() {
|
||||
Ok(s) => {
|
||||
if !s.success() {
|
||||
return Err(anyhow::anyhow!("Could not WAL log upgrade tarball"));
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
// Step 8: Sync the safekeepers to push WAL record to Neon
|
||||
self.sync_safekeepers(None)?;
|
||||
|
||||
/* Note that whether any errors occur after this are unimportant. ALWAYS return success
|
||||
* after this point. The compute will be terminated immediately after the upgrade. Remember
|
||||
* that this is an upgrade-only compute, and it will not accept connections from users.
|
||||
*/
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Do post configuration of the already started Postgres. This function spawns a background thread to
|
||||
/// configure the database after applying the compute spec. Currently, it upgrades the neon extension
|
||||
/// version. In the future, it may upgrade all 3rd-party extensions.
|
||||
@@ -895,8 +1093,8 @@ impl ComputeNode {
|
||||
// `pg_ctl` for start / stop.
|
||||
#[instrument(skip_all)]
|
||||
fn pg_reload_conf(&self) -> Result<()> {
|
||||
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
|
||||
Command::new(pgctl_bin)
|
||||
let pgctl_bin = self.get_my_pg_binary("pg_ctl");
|
||||
Command::new(&pgctl_bin)
|
||||
.args(["reload", "-D", &self.pgdata])
|
||||
.output()
|
||||
.expect("cannot run pg_ctl process");
|
||||
@@ -980,43 +1178,25 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prepares the compute for Postgres operations, including downloading
|
||||
/// remote extensions and preparing the pgdata directory.
|
||||
///
|
||||
/// The caller must hold the state mutex.
|
||||
#[instrument(skip_all)]
|
||||
pub fn start_compute(
|
||||
&self,
|
||||
extension_server_port: u16,
|
||||
) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
pub async fn prepare_compute(&self) -> Result<()> {
|
||||
let state = self.state.lock().unwrap().clone();
|
||||
let pspec = state.pspec.as_ref().expect("spec must be set");
|
||||
|
||||
info!(
|
||||
"starting compute for project {}, operation {}, tenant {}, timeline {}",
|
||||
"preparing compute for project {}, operation {}, tenant {}, timeline {}",
|
||||
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
|
||||
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
|
||||
pspec.tenant_id,
|
||||
pspec.timeline_id,
|
||||
);
|
||||
|
||||
// tune pgbouncer
|
||||
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
|
||||
info!("tuning pgbouncer");
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create rt");
|
||||
|
||||
// Spawn a thread to do the tuning,
|
||||
// so that we don't block the main thread that starts Postgres.
|
||||
let pgbouncer_settings = pgbouncer_settings.clone();
|
||||
let _handle = thread::spawn(move || {
|
||||
let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
|
||||
if let Err(err) = res {
|
||||
error!("error while tuning pgbouncer: {err:?}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
info!(
|
||||
"start_compute spec.remote_extensions {:?}",
|
||||
"prepare_compute spec.remote_extensions {:?}",
|
||||
pspec.spec.remote_extensions
|
||||
);
|
||||
|
||||
@@ -1024,7 +1204,8 @@ impl ComputeNode {
|
||||
// remote shared_preload_libraries before postgres start (if any)
|
||||
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
|
||||
// First, create control files for all available extensions
|
||||
extension_server::create_control_files(remote_extensions, &self.pgbin);
|
||||
let postgres_bin = self.get_my_pg_binary("postgres");
|
||||
extension_server::create_control_files(remote_extensions, &postgres_bin);
|
||||
|
||||
let library_load_start_time = Utc::now();
|
||||
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
|
||||
@@ -1046,7 +1227,17 @@ impl ComputeNode {
|
||||
info!("{:?}", remote_ext_metrics);
|
||||
}
|
||||
|
||||
self.prepare_pgdata(&compute_state, extension_server_port)?;
|
||||
self.prepare_pgdata(&state).await?;
|
||||
|
||||
self.set_status(ComputeStatus::Prepared);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn start_compute(&self) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
|
||||
let compute_state = self.state.lock().unwrap().clone();
|
||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||
|
||||
let start_time = Utc::now();
|
||||
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
|
||||
@@ -1152,9 +1343,11 @@ impl ComputeNode {
|
||||
core_path.display()
|
||||
);
|
||||
|
||||
let postgres_bin = self.get_my_pg_binary("postgres");
|
||||
|
||||
// Try first with gdb
|
||||
let backtrace = Command::new("gdb")
|
||||
.args(["--batch", "-q", "-ex", "bt", &self.pgbin])
|
||||
.args(["--batch", "-q", "-ex", "bt", &postgres_bin])
|
||||
.arg(&core_path)
|
||||
.output();
|
||||
|
||||
@@ -1287,11 +1480,12 @@ LIMIT 100",
|
||||
// then we try to download it here
|
||||
info!("downloading new extension {ext_archive_name}");
|
||||
|
||||
let postgres_bin = self.get_my_pg_binary("postgres");
|
||||
let download_size = extension_server::download_extension(
|
||||
&real_ext_name,
|
||||
&ext_path,
|
||||
ext_remote_storage,
|
||||
&self.pgbin,
|
||||
&postgres_bin,
|
||||
)
|
||||
.await
|
||||
.map_err(DownloadError::Other);
|
||||
|
||||
@@ -74,7 +74,7 @@ pub fn write_postgres_conf(
|
||||
}
|
||||
|
||||
match spec.mode {
|
||||
ComputeMode::Primary => {}
|
||||
ComputeMode::Primary | ComputeMode::Upgrade => {}
|
||||
ComputeMode::Static(lsn) => {
|
||||
// hot_standby is 'on' by default, but let's be explicit
|
||||
writeln!(file, "hot_standby=on")?;
|
||||
|
||||
@@ -75,7 +75,6 @@ use anyhow::Result;
|
||||
use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use compute_api::spec::RemoteExtSpec;
|
||||
use regex::Regex;
|
||||
use remote_storage::*;
|
||||
use reqwest::StatusCode;
|
||||
use std::path::Path;
|
||||
@@ -103,34 +102,6 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String {
|
||||
.to_string()
|
||||
}
|
||||
|
||||
pub fn get_pg_version(pgbin: &str) -> String {
|
||||
// pg_config --version returns a (platform specific) human readable string
|
||||
// such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
|
||||
let human_version = get_pg_config("--version", pgbin);
|
||||
return parse_pg_version(&human_version).to_string();
|
||||
}
|
||||
|
||||
/// Map a `pg_config --version` string to the tag "v14", "v15" or "v16".
///
/// Normal releases have version strings like "PostgreSQL 15.4". But there
/// are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
/// 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
/// configure option, you can tack any string to the version number,
/// e.g. "PostgreSQL 15.4foobar".
///
/// The major version is the *complete* run of digits after the
/// "PostgreSQL " prefix, and at least one further character must follow it
/// (so a bare "PostgreSQL 14" is rejected, as before). Taking the complete
/// run matters: a pattern that is allowed to give digits back would read
/// "PostgreSQL 145" as major version 14.
///
/// # Panics
///
/// Panics when the string is malformed or names an unsupported major version.
fn parse_pg_version(human_version: &str) -> &str {
    let major = human_version
        .strip_prefix("PostgreSQL ")
        .and_then(|rest| {
            let digits_end = rest
                .find(|c: char| !c.is_ascii_digit())
                .unwrap_or(rest.len());
            let (major, suffix) = rest.split_at(digits_end);

            // Something other than a line break has to follow the major
            // number: ".4", "devel", "beta2", ...
            match suffix.chars().next() {
                Some(c) if c != '\n' && !major.is_empty() => Some(major),
                _ => None,
            }
        });

    match major {
        Some("14") => "v14",
        Some("15") => "v15",
        Some("16") => "v16",
        _ => panic!("Unsupported postgres version {human_version}"),
    }
}
|
||||
|
||||
// download the archive for a given extension,
|
||||
// unzip it, and place files in the appropriate locations (share/lib)
|
||||
pub async fn download_extension(
|
||||
@@ -255,42 +226,3 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::parse_pg_version;
|
||||
|
||||
#[test]
|
||||
fn test_parse_pg_version() {
|
||||
assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
|
||||
assert_eq!(
|
||||
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
|
||||
"v15"
|
||||
);
|
||||
|
||||
assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
|
||||
assert_eq!(
|
||||
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
|
||||
"v14"
|
||||
);
|
||||
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
|
||||
assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_parse_pg_unsupported_version() {
|
||||
parse_pg_version("PostgreSQL 13.14");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_parse_pg_incorrect_version_format() {
|
||||
parse_pg_version("PostgreSQL 14");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::catalog::{get_database_schema, get_dbs_and_roles};
|
||||
use crate::compute::forward_termination_signal;
|
||||
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
use compute_api::requests::UpgradeRequest;
|
||||
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
|
||||
|
||||
use anyhow::Result;
|
||||
@@ -137,6 +138,20 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::POST, "/upgrade") => {
|
||||
info!("serving /upgrade POST request");
|
||||
match handle_upgrade_request(req, compute).await {
|
||||
Ok(_) => Response::builder()
|
||||
.status(StatusCode::ACCEPTED)
|
||||
.body(Body::from("Starting upgrade"))
|
||||
.unwrap(),
|
||||
Err((e, status)) => {
|
||||
error!("error handling /upgrade request: {e}");
|
||||
render_json_error(&format!("{}", e), status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(&Method::GET, "/dbs_and_roles") => {
|
||||
info!("serving /dbs_and_roles GET request",);
|
||||
match get_dbs_and_roles(compute).await {
|
||||
@@ -397,6 +412,59 @@ async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (Str
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate a `POST /upgrade` request and start the upgrade in the background.
///
/// The request body must be a JSON-encoded [`UpgradeRequest`]. On success the
/// upgrade itself runs in a spawned task, so `Ok(())` only means that the
/// upgrade was *started*; failures of the upgrade are logged by that task.
///
/// # Errors
///
/// Returns the error together with the HTTP status to report:
/// * `400 Bad Request` - the body could not be read or is not a valid
///   `UpgradeRequest`;
/// * `422 Unprocessable Entity` - the requested version equals the current one;
/// * `409 Conflict` - an upgrade is already running, or the compute is not in
///   the `Prepared` state.
async fn handle_upgrade_request(
    req: Request<Body>,
    compute: &Arc<ComputeNode>,
) -> Result<(), (anyhow::Error, StatusCode)> {
    // The body comes from the network: a broken connection or a payload that
    // is not valid UTF-8/JSON must turn into an error response, not a panic.
    let body_bytes = hyper::body::to_bytes(req.into_body())
        .await
        .map_err(|e| (anyhow::Error::from(e), StatusCode::BAD_REQUEST))?;
    let body: UpgradeRequest = serde_json::from_slice(&body_bytes)
        .map_err(|e| (anyhow::Error::from(e), StatusCode::BAD_REQUEST))?;

    // No sense in trying to upgrade to the same version.
    let new_version = body.pg_version;
    if compute.pgversion == new_version {
        return Err((
            anyhow::anyhow!("cannot upgrade endpoint to the same version"),
            StatusCode::UNPROCESSABLE_ENTITY,
        ));
    }

    // Check that we are in the prepared state before trying to upgrade.
    //
    // NOTE(review): the status is read here and written below in two separate
    // calls, so two concurrent requests could presumably both observe
    // `Prepared` - confirm whether a single locked check-and-set is needed.
    match compute.get_status() {
        ComputeStatus::Prepared => (),
        ComputeStatus::Upgrading => {
            return Err((
                anyhow::anyhow!("upgrade already in progress"),
                StatusCode::CONFLICT,
            ));
        }
        _ => {
            return Err((
                anyhow::anyhow!("expected compute to be in the prepared state"),
                StatusCode::CONFLICT,
            ));
        }
    }

    compute.set_status(ComputeStatus::Upgrading);

    let c = compute.clone();
    tokio::spawn(async move {
        if let Err(e) = c.upgrade(&new_version).await {
            error!("Failed to upgrade database: {}", e);
        }

        // NOTE(review): the status becomes `Running` even when the upgrade
        // failed, so a poller cannot tell the two outcomes apart - confirm
        // this is intended.
        c.set_status(ComputeStatus::Running);
    });

    Ok(())
}
|
||||
|
||||
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
|
||||
#[tokio::main]
|
||||
async fn serve(port: u16, state: Arc<ComputeNode>) {
|
||||
|
||||
@@ -212,6 +212,21 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
/upgrade:
|
||||
post:
|
||||
tags:
|
||||
- Upgrade
|
||||
summary: Upgrade project to another Postgres major version.
|
||||
operationId: upgradePostgres
|
||||
responses:
|
||||
202:
|
||||
description: Upgrade request in progress.
|
||||
409:
|
||||
description: Upgrade already in progress, or compute is not in the prepared state.
|
||||
422:
|
||||
description: Upgrade request could not be processed.
|
||||
500:
|
||||
description: Upgrade request failed.
|
||||
|
||||
/terminate:
|
||||
post:
|
||||
|
||||
@@ -798,14 +798,19 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
|
||||
.get_one::<bool>("hot-standby")
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
|
||||
let upgrade_only = sub_args
|
||||
.get_one::<bool>("upgrade-only")
|
||||
.copied()
|
||||
.unwrap_or(false);
|
||||
let allow_multiple = sub_args.get_flag("allow-multiple");
|
||||
|
||||
let mode = match (lsn, hot_standby) {
|
||||
(Some(lsn), false) => ComputeMode::Static(lsn),
|
||||
(None, true) => ComputeMode::Replica,
|
||||
(None, false) => ComputeMode::Primary,
|
||||
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||
let mode = match (lsn, hot_standby, upgrade_only) {
|
||||
(Some(lsn), false, false) => ComputeMode::Static(lsn),
|
||||
(None, true, false) => ComputeMode::Replica,
|
||||
(None, false, false) => ComputeMode::Primary,
|
||||
(None, false, true) => ComputeMode::Upgrade,
|
||||
// Seeing this message means we aren't setting conflicts_with on clap arguments.
|
||||
_ => anyhow::bail!("Invalid command line invocation"),
|
||||
};
|
||||
|
||||
match (mode, hot_standby) {
|
||||
@@ -1501,7 +1506,8 @@ fn cli() -> Command {
|
||||
let lsn_arg = Arg::new("lsn")
|
||||
.long("lsn")
|
||||
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
||||
.required(false);
|
||||
.required(false)
|
||||
.conflicts_with("hot-standby");
|
||||
|
||||
let hot_standby_arg = Arg::new("hot-standby")
|
||||
.value_parser(value_parser!(bool))
|
||||
@@ -1718,6 +1724,18 @@ fn cli() -> Command {
|
||||
.arg(hot_standby_arg.clone())
|
||||
.arg(update_catalog)
|
||||
.arg(allow_multiple.clone())
|
||||
.arg(
|
||||
Arg::new("upgrade-only")
|
||||
.help("Mark this compute as an upgrade compute")
|
||||
.long("upgrade-only")
|
||||
.action(ArgAction::SetTrue)
|
||||
.conflicts_with_all(&[
|
||||
"config-only",
|
||||
"hot-standby",
|
||||
// Perhaps we could offer upgrades at a specific LSN in the future.
|
||||
"lsn",
|
||||
])
|
||||
)
|
||||
)
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||
|
||||
@@ -182,6 +182,7 @@ impl ComputeControlPlane {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<()> {
|
||||
// TODO: It really feels like I need to do some protection here
|
||||
if matches!(mode, ComputeMode::Primary) {
|
||||
// this check is not complete, as you could have a concurrent attempt at
|
||||
// creating another primary, both reading the state before checking it here,
|
||||
@@ -393,6 +394,7 @@ impl Endpoint {
|
||||
conf.append("recovery_prefetch", "off");
|
||||
}
|
||||
}
|
||||
ComputeMode::Upgrade => {}
|
||||
}
|
||||
|
||||
Ok(conf)
|
||||
@@ -502,7 +504,7 @@ impl Endpoint {
|
||||
/// Map safekeepers ids to the actual connection strings.
|
||||
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
if matches!(self.mode, ComputeMode::Primary | ComputeMode::Upgrade) {
|
||||
for sk_id in sk_ids {
|
||||
let sk = self
|
||||
.env
|
||||
@@ -624,13 +626,16 @@ impl Endpoint {
|
||||
self.endpoint_path().join("spec.json").to_str().unwrap(),
|
||||
])
|
||||
.args([
|
||||
"--pgbin",
|
||||
self.env
|
||||
.pg_bin_dir(self.pg_version)?
|
||||
.join("postgres")
|
||||
.to_str()
|
||||
"--pgroot",
|
||||
&self
|
||||
.env
|
||||
.pg_distrib_dir
|
||||
.clone()
|
||||
.into_os_string()
|
||||
.into_string()
|
||||
.unwrap(),
|
||||
])
|
||||
.args(["--pgversion", &self.pg_version.to_string()])
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stderr(logfile.try_clone()?)
|
||||
.stdout(logfile);
|
||||
@@ -674,7 +679,7 @@ impl Endpoint {
|
||||
}
|
||||
// keep retrying
|
||||
}
|
||||
ComputeStatus::Running => {
|
||||
ComputeStatus::Running | ComputeStatus::Prepared => {
|
||||
// All good!
|
||||
break;
|
||||
}
|
||||
@@ -688,6 +693,7 @@ impl Endpoint {
|
||||
);
|
||||
}
|
||||
ComputeStatus::Empty
|
||||
| ComputeStatus::Upgrading
|
||||
| ComputeStatus::ConfigurationPending
|
||||
| ComputeStatus::Configuration
|
||||
| ComputeStatus::TerminationPending
|
||||
|
||||
@@ -12,3 +12,9 @@ use serde::Deserialize;
|
||||
pub struct ConfigurationRequest {
|
||||
pub spec: ComputeSpec,
|
||||
}
|
||||
|
||||
/// Request body of the /upgrade API
///
/// Deserialized from the JSON body of the request.
|
||||
#[derive(Deserialize, Debug)]
|
||||
pub struct UpgradeRequest {
|
||||
/// Postgres version to upgrade to.
// NOTE(review): presumably written in the same format as the compute's own
// version string, since the /upgrade handler compares the two for equality -
// confirm against the caller.
pub pg_version: String,
|
||||
}
|
||||
|
||||
@@ -44,10 +44,15 @@ pub enum ComputeStatus {
|
||||
// Compute node has spec and initial startup and
|
||||
// configuration is in progress.
|
||||
Init,
|
||||
// Compute has been prepared, meaning that remote extensions have been
|
||||
// downloaded and the data directory has been prepared.
|
||||
Prepared,
|
||||
// Compute is configured and running.
|
||||
Running,
|
||||
// New spec is being applied.
|
||||
Configuration,
|
||||
// Compute is upgrading Postgres.
|
||||
Upgrading,
|
||||
// Either startup or configuration failed,
|
||||
// compute will exit soon or is waiting for
|
||||
// control-plane to terminate it.
|
||||
|
||||
@@ -199,6 +199,11 @@ pub enum ComputeMode {
|
||||
/// Future versions may want to distinguish between replicas with hot standby
|
||||
/// feedback and other kinds of replication configurations.
|
||||
Replica,
|
||||
/// An upgrade-only node
|
||||
///
|
||||
/// This node will not accept remote Postgres connections. Its only
|
||||
/// purpose is to upgrade a timeline.
|
||||
Upgrade,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
|
||||
|
||||
@@ -167,7 +167,16 @@ pub const RM_RELMAP_ID: u8 = 7;
|
||||
pub const RM_STANDBY_ID: u8 = 8;
|
||||
pub const RM_HEAP2_ID: u8 = 9;
|
||||
pub const RM_HEAP_ID: u8 = 10;
|
||||
pub const RM_BTREE_ID: u8 = 11;
|
||||
pub const RM_HASH_ID: u8 = 12;
|
||||
pub const RM_GIN_ID: u8 = 13;
|
||||
pub const RM_GIST_ID: u8 = 14;
|
||||
pub const RM_SEQ_ID: u8 = 15;
|
||||
pub const RM_SPGIST_ID: u8 = 16;
|
||||
pub const RM_BRIN_ID: u8 = 17;
|
||||
pub const RM_COMMIT_TS_ID: u8 = 18;
|
||||
pub const RM_REPLORIGIN_ID: u8 = 19;
|
||||
pub const RM_GENERIC_ID: u8 = 20;
|
||||
pub const RM_LOGICALMSG_ID: u8 = 21;
|
||||
|
||||
// from neon_rmgr.h
|
||||
@@ -181,9 +190,27 @@ pub const XLOG_NEON_HEAP_UPDATE: u8 = 0x20;
|
||||
pub const XLOG_NEON_HEAP_HOT_UPDATE: u8 = 0x30;
|
||||
pub const XLOG_NEON_HEAP_LOCK: u8 = 0x40;
|
||||
pub const XLOG_NEON_HEAP_MULTI_INSERT: u8 = 0x50;
|
||||
pub const XLOG_NEON_FILE: u8 = 0x60;
|
||||
|
||||
pub const XLOG_NEON_HEAP_VISIBLE: u8 = 0x40;
|
||||
|
||||
/// Kind of file carried by an `XLOG_NEON_FILE` WAL record.
///
/// Each variant's discriminant is the `u8` value that [`TryFrom<u8>`] accepts
/// for it, so `variant as u8` and `XlNeonFileFiletype::try_from` stay in sync.
// The variant keeps its constant-style spelling because renaming it would
// break existing users of the type.
#[allow(non_camel_case_types)]
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum XlNeonFileFiletype {
    /// Tarball produced by an upgrade.
    UPGRADE_TARBALL = 0,
}

impl TryFrom<u8> for XlNeonFileFiletype {
    type Error = ();

    /// Convert a raw file-type byte into the enum.
    ///
    /// Returns `Err(())` for any value that does not name a known file type.
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        const UPGRADE_TARBALL: u8 = XlNeonFileFiletype::UPGRADE_TARBALL as u8;

        match value {
            UPGRADE_TARBALL => Ok(Self::UPGRADE_TARBALL),
            _ => Err(()),
        }
    }
}
|
||||
|
||||
// from xlogreader.h
|
||||
pub const XLR_INFO_MASK: u8 = 0x0F;
|
||||
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
|
||||
|
||||
@@ -506,6 +506,10 @@ async fn import_file(
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if file_name == "pg_internal.init" {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if file_path.starts_with("global") {
|
||||
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
|
||||
let dbnode = 0;
|
||||
|
||||
@@ -14,7 +14,7 @@ mod walreceiver;
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use arc_swap::ArcSwap;
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use chrono::{DateTime, Utc};
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
@@ -4357,6 +4357,10 @@ impl Timeline {
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns the path that `conf.timelines_path` reports for this timeline's
/// tenant shard.
// NOTE(review): only the tenant shard id is passed and the helper is the
// plural `timelines_path`, so the result is presumably shared by every
// timeline of the tenant rather than specific to this one - confirm that this
// is what callers of `get_path` expect.
pub fn get_path(&self) -> Utf8PathBuf {
|
||||
self.conf.timelines_path(&self.tenant_shard_id)
|
||||
}
|
||||
|
||||
/// Detach this timeline from its ancestor by copying all of ancestors layers as this
|
||||
/// Timelines layers up to the ancestor_lsn.
|
||||
///
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
|
||||
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
|
||||
@@ -33,8 +34,10 @@ use bytes::{Buf, Bytes, BytesMut};
|
||||
use tracing::*;
|
||||
use utils::failpoint_support;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use utils::zstd::extract_zst_tarball;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::import_datadir;
|
||||
use crate::metrics::WAL_INGEST;
|
||||
use crate::pgdatadir_mapping::{DatadirModification, Version};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
@@ -69,7 +72,9 @@ impl CheckPoint {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WalIngest {
|
||||
pub struct WalIngest<'t> {
|
||||
timeline: &'t Timeline,
|
||||
timeline_path: Utf8PathBuf,
|
||||
shard: ShardIdentity,
|
||||
checkpoint: CheckPoint,
|
||||
checkpoint_modified: bool,
|
||||
@@ -82,12 +87,12 @@ struct WarnIngestLag {
|
||||
timestamp_invalid_msg_ratelimit: RateLimit,
|
||||
}
|
||||
|
||||
impl WalIngest {
|
||||
impl<'t> WalIngest<'t> {
|
||||
pub async fn new(
|
||||
timeline: &Timeline,
|
||||
timeline: &'t Timeline,
|
||||
startpoint: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<WalIngest> {
|
||||
) -> anyhow::Result<WalIngest<'t>> {
|
||||
// Fetch the latest checkpoint into memory, so that we can compare with it
|
||||
// quickly in `ingest_record` and update it when it changes.
|
||||
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
|
||||
@@ -100,6 +105,8 @@ impl WalIngest {
|
||||
});
|
||||
|
||||
Ok(WalIngest {
|
||||
timeline,
|
||||
timeline_path: timeline.get_path(),
|
||||
shard: *timeline.get_shard_identity(),
|
||||
checkpoint,
|
||||
checkpoint_modified: false,
|
||||
@@ -458,6 +465,17 @@ impl WalIngest {
|
||||
modification.drop_replorigin(xlrec.node_id).await?
|
||||
}
|
||||
}
|
||||
pg_constants::RM_BTREE_ID
|
||||
| pg_constants::RM_HASH_ID
|
||||
| pg_constants::RM_GIN_ID
|
||||
| pg_constants::RM_GIST_ID
|
||||
| pg_constants::RM_SEQ_ID
|
||||
| pg_constants::RM_SPGIST_ID
|
||||
| pg_constants::RM_BRIN_ID
|
||||
| pg_constants::RM_COMMIT_TS_ID
|
||||
| pg_constants::RM_GENERIC_ID => {
|
||||
// No special handling currently for these resource managers
|
||||
}
|
||||
_x => {
|
||||
// TODO: should probably log & fail here instead of blindly
|
||||
// doing something without understanding the protocol
|
||||
@@ -923,6 +941,33 @@ impl WalIngest {
|
||||
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
|
||||
|
||||
match pg_version {
|
||||
15 => {
|
||||
let info = decoded.xl_info;
|
||||
|
||||
match info {
|
||||
pg_constants::XLOG_NEON_FILE => {
|
||||
info!(
|
||||
"tristan: last_record_lsn={}",
|
||||
self.timeline.get_last_record_lsn()
|
||||
);
|
||||
let xlrec = v16::rm_neon::XlNeonFile::decode(buf);
|
||||
let pgdata_path = self.timeline_path.join("new-pgdata");
|
||||
|
||||
extract_zst_tarball(&pgdata_path, &*xlrec.data).await?;
|
||||
|
||||
let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?;
|
||||
info!("LSN from pg_upgraded controlfile: {lsn}");
|
||||
Box::pin(import_datadir::import_timeline_from_postgres_datadir(
|
||||
self.timeline,
|
||||
&pgdata_path,
|
||||
lsn,
|
||||
ctx,
|
||||
))
|
||||
.await?;
|
||||
}
|
||||
_ => return Err(anyhow::anyhow!("Unknown XLOG xl_info field: {}", info)),
|
||||
}
|
||||
}
|
||||
16 => {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;
|
||||
|
||||
|
||||
@@ -526,6 +526,29 @@ pub mod v16 {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Decoded body of a WAL record that carries a file.
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonFile {
    /// Raw file-type byte as read from the record.
    pub filetype: u8,
    /// Size field as read from the record (little-endian `u32`).
    pub size: u32,
    /// Every byte that follows the fixed header.
    pub data: Bytes,
}

impl XlNeonFile {
    /// Decode a record body, consuming all of `buf`.
    ///
    /// Layout read: one file-type byte, three padding bytes, a little-endian
    /// `u32` size, then the payload. The payload is taken as "whatever
    /// remains" and is not cross-checked against `size`.
    pub fn decode(buf: &mut Bytes) -> Self {
        // Bytes skipped between the file-type byte and the size field.
        const HEADER_PADDING: usize = 3;

        let filetype = buf.get_u8();
        buf.advance(HEADER_PADDING);
        let size = buf.get_u32_le();

        let payload_len = buf.remaining();
        let data = buf.copy_to_bytes(payload_len);

        Self {
            filetype,
            size,
            data,
        }
    }
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,18 @@ SHLIB_LINK_INTERNAL = $(libpq)
|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
|
||||
DATA = \
|
||||
neon--1.0.sql \
|
||||
neon--1.0--1.1.sql \
|
||||
neon--1.1--1.2.sql \
|
||||
neon--1.2--1.3.sql \
|
||||
neon--1.3--1.4.sql \
|
||||
neon--1.4--1.5.sql \
|
||||
neon--1.5--1.4.sql \
|
||||
neon--1.4--1.3.sql \
|
||||
neon--1.3--1.2.sql \
|
||||
neon--1.2--1.1.sql \
|
||||
neon--1.1--1.0.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
@@ -7,3 +7,7 @@ LANGUAGE C PARALLEL SAFE;
|
||||
|
||||
GRANT EXECUTE ON FUNCTION approximate_working_set_size_seconds(integer) TO pg_monitor;
|
||||
|
||||
CREATE FUNCTION wal_log_file(path text)
|
||||
RETURNS pg_lsn
|
||||
AS 'MODULE_PATHNAME', 'wal_log_file'
|
||||
LANGUAGE C STRICT PARALLEL UNSAFE;
|
||||
|
||||
8
pgxn/neon/neon--1.4--1.5.sql
Normal file
8
pgxn/neon/neon--1.4--1.5.sql
Normal file
@@ -0,0 +1,8 @@
|
||||
\echo Use "ALTER EXTENSION neon UPDATE TO '1.5'" to load this file. \quit

-- wal_log_file(path) makes the server read the file at "path" and write its
-- contents into the WAL; it returns the LSN reported for the inserted record.
CREATE FUNCTION wal_log_file(path text)
RETURNS pg_lsn
AS 'MODULE_PATHNAME', 'wal_log_file'
LANGUAGE C STRICT PARALLEL UNSAFE;

-- Functions are executable by PUBLIC by default, so granting to pg_monitor
-- alone restricts nothing. Revoke the default first: this function reads
-- arbitrary server-side files.
REVOKE EXECUTE ON FUNCTION wal_log_file(text) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION wal_log_file(text) TO pg_monitor;
|
||||
1
pgxn/neon/neon--1.5--1.4.sql
Normal file
1
pgxn/neon/neon--1.5--1.4.sql
Normal file
@@ -0,0 +1 @@
|
||||
DROP FUNCTION IF EXISTS wal_log_file(text) CASCADE;
|
||||
217
pgxn/neon/neon.c
217
pgxn/neon/neon.c
@@ -11,6 +11,8 @@
|
||||
#include "postgres.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
|
||||
#include "miscadmin.h"
|
||||
#include "access/subtrans.h"
|
||||
#include "access/twophase.h"
|
||||
@@ -29,11 +31,19 @@
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "funcapi.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "access/xloginsert.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/guc_tables.h"
|
||||
#include "utils/timeout.h"
|
||||
#include "utils/wait_event.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/proc.h"
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
#include "access/neon_xlog.h"
|
||||
#endif
|
||||
|
||||
#include "extension_server.h"
|
||||
#include "neon.h"
|
||||
@@ -629,10 +639,17 @@ ReportSearchPath(void)
|
||||
void
|
||||
_PG_init(void)
|
||||
{
|
||||
const char *purpose;
|
||||
|
||||
purpose = getenv("NEON_PURPOSE");
|
||||
if (purpose && strcmp(purpose, "upgrade") == 0)
|
||||
return;
|
||||
|
||||
/*
|
||||
* Also load 'neon_rmgr'. This makes it unnecessary to list both 'neon'
|
||||
* and 'neon_rmgr' in shared_preload_libraries.
|
||||
*/
|
||||
|
||||
#if PG_VERSION_NUM >= 160000
|
||||
load_file("$libdir/neon_rmgr", false);
|
||||
#endif
|
||||
@@ -676,6 +693,9 @@ _PG_init(void)
|
||||
PG_FUNCTION_INFO_V1(pg_cluster_size);
|
||||
PG_FUNCTION_INFO_V1(backpressure_lsns);
|
||||
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
PG_FUNCTION_INFO_V1(wal_log_file);
|
||||
#endif
|
||||
|
||||
Datum
|
||||
pg_cluster_size(PG_FUNCTION_ARGS)
|
||||
@@ -721,3 +741,200 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
|
||||
{
|
||||
PG_RETURN_UINT64(BackpressureThrottlingTime());
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
|
||||
Datum
|
||||
wal_log_file(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int rc;
|
||||
int fd;
|
||||
ssize_t n;
|
||||
text *path;
|
||||
size_t off;
|
||||
char *data;
|
||||
short nargs;
|
||||
struct stat st;
|
||||
XLogRecPtr lsn;
|
||||
size_t path_len;
|
||||
xl_neon_file xlrec;
|
||||
char file[MAXPGPATH];
|
||||
|
||||
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
|
||||
bool wal_debug;
|
||||
#endif
|
||||
|
||||
path = PG_GETARG_TEXT_PP(0);
|
||||
path_len = VARSIZE(path) - VARHDRSZ;
|
||||
|
||||
memcpy(file, VARDATA(path), path_len);
|
||||
file[path_len] = '\0';
|
||||
|
||||
/* Get the size of the file. Note that stat(2) follows symlinks. */
|
||||
rc = stat(file, &st);
|
||||
if (rc != 0)
|
||||
ereport(ERROR,
|
||||
(errmsg("failed to get size of file (%s): %m", file)));
|
||||
|
||||
xlrec.size = (size_t) st.st_size;
|
||||
|
||||
fd = open(file, O_RDONLY);
|
||||
if (fd == -1)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not open %s: %m", file)));
|
||||
|
||||
/* If the file is too large, error out. */
|
||||
|
||||
data = palloc(xlrec.size);
|
||||
|
||||
/* Copy the file contents */
|
||||
off = 0;
|
||||
while (true) {
|
||||
n = read(fd, data + off, xlrec.size - off);
|
||||
if (n == EOF)
|
||||
{
|
||||
close(fd);
|
||||
ereport(ERROR,
|
||||
(errmsg("failed to read %s: %m", file)));
|
||||
}
|
||||
|
||||
off += n;
|
||||
|
||||
if (xlrec.size - off == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
close(fd);
|
||||
|
||||
xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL;
|
||||
|
||||
XLogBeginInsert();
|
||||
XLogRegisterData((char *) &xlrec, SizeOfNeonFile);
|
||||
XLogRegisterData((char *) data, xlrec.size);
|
||||
|
||||
/*
|
||||
* We must turn debugging off on anything where the Neon RMGR is not
|
||||
* registered. Stash the original value for restoration later.
|
||||
*/
|
||||
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
|
||||
wal_debug = XLOG_DEBUG;
|
||||
XLOG_DEBUG = false;
|
||||
#endif
|
||||
|
||||
lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE);
|
||||
|
||||
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
|
||||
XLOG_DEBUG = wal_debug;
|
||||
#endif
|
||||
|
||||
PG_RETURN_LSN(lsn);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 15
|
||||
|
||||
/*
|
||||
* Entry point for `postgres --wal-log`.
|
||||
*/
|
||||
PGDLLEXPORT void
|
||||
WalLog(int argc, char *argv[])
|
||||
{
|
||||
int rc;
|
||||
int fd;
|
||||
off_t off;
|
||||
struct stat st;
|
||||
XLogRecPtr lsn;
|
||||
void *data;
|
||||
xl_neon_file xlrec;
|
||||
/* TODO: should this be PATH_MAX? should we require an absolute path? */
|
||||
char file[MAXPGPATH];
|
||||
|
||||
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
|
||||
bool wal_debug;
|
||||
#endif
|
||||
|
||||
if (argc != 3)
|
||||
ereport(ERROR, errmsg("wrong number of arguments passed to --wal-log"));
|
||||
|
||||
if (!realpath(argv[2], file))
|
||||
ereport(ERROR, errmsg("failed to resolve path: %m"));
|
||||
|
||||
ereport(LOG, errmsg("writing %s to WAL", file));
|
||||
|
||||
ChangeToDataDir();
|
||||
CreateDataDirLockFile(false);
|
||||
LocalProcessControlFile(false);
|
||||
InitializeMaxBackends();
|
||||
CreateSharedMemoryAndSemaphores();
|
||||
InitializeTimeouts();
|
||||
InitProcess();
|
||||
BaseInit();
|
||||
CreateAuxProcessResourceOwner();
|
||||
StartupXLOG();
|
||||
|
||||
/* Get the size of the file. Note that stat(2) follows symlinks. */
|
||||
rc = stat(file, &st);
|
||||
if (rc != 0)
|
||||
ereport(ERROR,
|
||||
(errmsg("failed to get size of file (%s): %m", file)));
|
||||
|
||||
xlrec.size = (size_t) st.st_size;
|
||||
|
||||
fd = open(file, O_RDONLY);
|
||||
if (fd == -1)
|
||||
ereport(ERROR,
|
||||
(errmsg("could not open %s: %m", file)));
|
||||
|
||||
/* If the file is too large, error out. */
|
||||
|
||||
data = palloc(xlrec.size);
|
||||
|
||||
/* Copy the file contents */
|
||||
off = 0;
|
||||
while (true) {
|
||||
ssize_t n;
|
||||
|
||||
n = read(fd, data + off, xlrec.size - off);
|
||||
if (n == EOF)
|
||||
{
|
||||
close(fd);
|
||||
ereport(ERROR,
|
||||
(errmsg("failed to read %s: %m", file)));
|
||||
}
|
||||
|
||||
off += n;
|
||||
|
||||
if (xlrec.size - off == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
close(fd);
|
||||
|
||||
xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL;
|
||||
|
||||
/* ereport(LOG, errmsg("Current LSN: %X/%X" , LSN_FORMAT_ARGS(GetXLogWriteRecPtr()))); */
|
||||
|
||||
XLogBeginInsert();
|
||||
XLogRegisterData((char *) &xlrec, SizeOfNeonFile);
|
||||
XLogRegisterData(data, xlrec.size);
|
||||
|
||||
/*
|
||||
* We must turn debugging off on anything where the Neon RMGR is not
|
||||
* registered. Stash the original value for restoration later.
|
||||
*/
|
||||
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
|
||||
wal_debug = XLOG_DEBUG;
|
||||
XLOG_DEBUG = false;
|
||||
#endif
|
||||
|
||||
lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE);
|
||||
|
||||
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
|
||||
XLOG_DEBUG = wal_debug;
|
||||
#endif
|
||||
|
||||
exit(0);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
@@ -78,11 +78,16 @@ neon_smgr_shmem_startup(void)
|
||||
if (prev_shmem_startup_hook)
|
||||
prev_shmem_startup_hook();
|
||||
|
||||
if (relsize_hash_size <= 0)
|
||||
return;
|
||||
|
||||
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
|
||||
relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found);
|
||||
if (!found)
|
||||
{
|
||||
elog(LOG, "neon_relsize: %d", relsize_hash_size);
|
||||
relsize_lock = (LWLockId) GetNamedLWLockTranche("neon_relsize");
|
||||
elog(LOG, "neon_relsize");
|
||||
info.keysize = sizeof(RelTag);
|
||||
info.entrysize = sizeof(RelSizeEntry);
|
||||
relsize_hash = ShmemInitHash("neon_relsize",
|
||||
|
||||
@@ -76,6 +76,8 @@ neon_rm_redo(XLogReaderState *record)
|
||||
case XLOG_NEON_HEAP_MULTI_INSERT:
|
||||
redo_neon_heap_multi_insert(record);
|
||||
break;
|
||||
case XLOG_NEON_FILE:
|
||||
break;
|
||||
default:
|
||||
elog(PANIC, "neon_rm_redo: unknown op code %u", info);
|
||||
}
|
||||
|
||||
@@ -57,6 +57,8 @@ neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
|
||||
if (SnapBuildProcessChange(builder, xid, buf->origptr))
|
||||
DecodeNeonMultiInsert(ctx, buf);
|
||||
break;
|
||||
case XLOG_NEON_FILE:
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
|
||||
break;
|
||||
@@ -401,4 +403,4 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
#endif
|
||||
|
||||
@@ -134,6 +134,16 @@ neon_rm_desc(StringInfo buf, XLogReaderState *record)
|
||||
xlrec->ntuples, &offset_elem_desc, NULL);
|
||||
}
|
||||
}
|
||||
else if (info == XLOG_NEON_FILE)
|
||||
{
|
||||
const xl_neon_file *xlrec = (xl_neon_file *) rec;
|
||||
switch ((xl_neon_file_filetype) xlrec->filetype)
|
||||
{
|
||||
case XL_NEON_FILE_UPGRADE_TARBALL:
|
||||
appendStringInfo(buf, "filetype: upgrade tarball, size: %zu", xlrec->size);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const char *
|
||||
@@ -173,6 +183,9 @@ neon_rm_identify(uint8 info)
|
||||
case XLOG_NEON_HEAP_MULTI_INSERT | XLOG_NEON_INIT_PAGE:
|
||||
id = "MULTI_INSERT+INIT";
|
||||
break;
|
||||
case XLOG_NEON_FILE:
|
||||
id = "FILE";
|
||||
break;
|
||||
}
|
||||
|
||||
return id;
|
||||
|
||||
@@ -580,15 +580,15 @@ where
|
||||
* because safekeepers parse WAL headers and the format
|
||||
* may change between versions.
|
||||
*/
|
||||
if msg.pg_version / 10000 != self.state.server.pg_version / 10000
|
||||
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
{
|
||||
bail!(
|
||||
"incompatible server version {}, expected {}",
|
||||
msg.pg_version,
|
||||
self.state.server.pg_version
|
||||
);
|
||||
}
|
||||
// if msg.pg_version / 10000 != self.state.server.pg_version / 10000
|
||||
// && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
// {
|
||||
// bail!(
|
||||
// "incompatible server version {}, expected {}",
|
||||
// msg.pg_version,
|
||||
// self.state.server.pg_version
|
||||
// );
|
||||
// }
|
||||
|
||||
if msg.tenant_id != self.state.tenant_id {
|
||||
bail!(
|
||||
|
||||
12
test.sh
Executable file
12
test.sh
Executable file
@@ -0,0 +1,12 @@
|
||||
#!/bin/sh
#
# Manual smoke test for the compute upgrade endpoint: wipes the local .neon
# environment, recreates it with a Postgres 15 tenant and endpoint, and then
# asks the compute to upgrade itself to Postgres 16.

# Tear down whatever a previous run left behind. These steps are best effort:
# on a clean checkout there is nothing to stop, so failures are ignored.
cargo neon endpoint stop ep-main 2>/dev/null
# pkill does nothing when no process matches, whereas `kill $(pgrep ...)`
# would invoke kill without any arguments.
pkill compute_ctl 2>/dev/null
cargo neon stop 2>/dev/null
rm -rf .neon

# From here on every step depends on the previous one, so stop at the first
# failure instead of running the remaining commands against a broken setup.
set -e

cargo neon init
cargo neon start
cargo neon tenant create --pg-version 15 --set-default
cargo neon endpoint create --pg-version 15 --upgrade-only
cargo neon endpoint start ep-main

# The request body is JSON, so say so instead of relying on curl's default
# form-encoded content type.
curl -i -X POST http://localhost:55433/upgrade \
    -H 'Content-Type: application/json' \
    -d '{"pg_version": "16"}'
|
||||
54
test_runner/regress/test_upgrade.py
Normal file
54
test_runner/regress/test_upgrade.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from time import sleep
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
def test_upgrade(pg_version: PgVersion, neon_simple_env: NeonEnv):
    """Upgrade a running endpoint to the next major version over the HTTP API.

    Posts to the endpoint's ``/upgrade`` route, expects the request to be
    accepted with 202, then polls ``/status`` until the reported status moves
    from "upgrading" to "running". Any other status fails the test, and so
    does an upgrade that does not finish within the polling budget.

    Skipped when ``pg_version`` is neither V14 nor V15.
    """
    env = neon_simple_env

    upgrade_to: PgVersion
    if pg_version == PgVersion.V14:
        upgrade_to = PgVersion.V15
    elif pg_version == PgVersion.V15:
        upgrade_to = PgVersion.V16
    else:
        pytest.skip("Nothing to upgrade")

    # NOTE(review): both limits are guesses at a generous upper bound; tune
    # them once real upgrade durations are known.
    request_timeout_seconds = 30
    max_status_polls = 300

    env.neon_cli.create_timeline("test_upgrade")
    endpoint = env.endpoints.create_start("test_upgrade")
    resp = requests.post(
        f"http://localhost:{endpoint.http_port}/upgrade",
        json={
            "pg_version": upgrade_to,
        },
        timeout=request_timeout_seconds,
    )
    assert resp.status_code == 202

    # Poll a bounded number of times so a compute that never leaves the
    # "upgrading" state fails the test instead of hanging it.
    for _ in range(max_status_polls):
        resp = requests.get(
            f"http://localhost:{endpoint.http_port}/status",
            timeout=request_timeout_seconds,
        )
        assert resp.status_code == 200

        data = resp.json()
        if data["status"] == "upgrading":
            sleep(1)
            continue
        elif data["status"] == "running":
            break
        else:
            pytest.fail(f"Unexpected compute state during upgrade: {data['status']}")
    else:
        pytest.fail(f"Compute was still upgrading after {max_status_polls} status polls")

    endpoint.stop_and_destroy()
|
||||
|
||||
|
||||
def test_upgrade_bad_request(neon_simple_env: NeonEnv):
    """A POST to ``/upgrade`` that carries no request body must be answered with 400."""
    branch_name = "test_upgrade_bad_request"

    neon_simple_env.neon_cli.create_timeline(branch_name)
    endpoint = neon_simple_env.endpoints.create_start(branch_name)

    # Deliberately send neither a JSON payload nor any other body.
    upgrade_url = f"http://localhost:{endpoint.http_port}/upgrade"
    response = requests.post(upgrade_url)

    assert response.status_code == 400
|
||||
|
||||
# TODO: Use postgres versions that are out of range.
|
||||
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 6f6d77fb59...732b361914
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 0baa7346df...3f8405a0d1
Reference in New Issue
Block a user