Call pg_upgrade from pageserver.

This relies on the fact that we use neon_local, and it uses hardcoded neon_local paths.
This commit is contained in:
Anastasia Lubennikova
2024-09-12 22:19:22 +01:00
parent d6ae925739
commit 6736557ea6
4 changed files with 83 additions and 30 deletions

View File

@@ -652,11 +652,11 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.ok_or_else(|| {
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
})?;
let pg_version = branch_match
.get_one::<u32>("pg-version")
.copied()
.context("Failed to parse postgres version from the argument string")?;
.get_one::<u32>("pg-version")
.copied()
.context("Failed to parse postgres version from the argument string")?;
let start_lsn = branch_match
.get_one::<String>("ancestor-start-lsn")

View File

@@ -506,6 +506,13 @@ async fn import_file(
return Ok(None);
}
if file_name == "pg_internal.init" {
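// Skip pg_internal.init instead of importing it.
// NOTE(review): the comment previously here described macOS tar "fork files"
// (archives created without the COPYFILE_DISABLE=1 env var) and looks copied from
// another check. Presumably pg_internal.init is skipped because Postgres rebuilds
// it on startup — confirm.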
info!("skipping pg_internal.init");
return Ok(None);
}
if file_path.starts_with("global") {
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;

View File

@@ -33,6 +33,7 @@ use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::collections::BTreeMap;
use std::fmt;
use std::process::Stdio;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
@@ -48,6 +49,7 @@ use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
use utils::fs_ext;
use utils::id::TenantId;
use utils::pausable_failpoint;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
@@ -84,6 +86,7 @@ use crate::metrics::{
remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
};
use crate::pgdatadir_mapping;
use crate::repository::GcResult;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
@@ -1733,10 +1736,7 @@ impl Tenant {
ancestor_timeline.pg_version,
)
.await?
}
}
None => {
self.bootstrap_timeline(
@@ -3250,8 +3250,15 @@ impl Tenant {
ctx: &RequestContext,
pg_version: u32,
) -> Result<Arc<Timeline>, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx, pg_version)
.await
self.branch_timeline_impl(
src_timeline,
dst_id,
start_lsn,
timeline_create_guard,
ctx,
pg_version,
)
.await
}
async fn branch_timeline_impl(
@@ -3351,7 +3358,6 @@ impl Tenant {
)
.await?;
if pg_version != src_timeline.pg_version {
info!(
"branching timeline {dst_id} from timeline {src_id} with different pg_version: {pg_version}",
@@ -3379,12 +3385,28 @@ impl Tenant {
}
}
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await.with_context(|| {
format!(
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel)
.await
.with_context(|| {
format!(
"Failed to initdb {timeline_id} with pg_version {pg_version} at {pgdata_path}"
)
})?;
})?;
run_pg_upgrade(
self.conf,
&pgdata_path,
src_timeline.pg_version,
pg_version,
src_timeline.timeline_id,
self.tenant_shard_id.tenant_id,
start_lsn,
&self.cancel,
).await.with_context(|| {
format!(
"Failed to pg_upgrade {timeline_id} with pg_version {pg_version} at {pgdata_path}"
)
})?;
// TODO
// do pg_upgrade bits here
// Rust is not the most convenient for writing this,
@@ -3399,9 +3421,10 @@ impl Tenant {
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
// TODO lsn gap is probably ok here
//assert!(pgdata_lsn >= start_lsn + 1);
// TODO Do we need to adjust something else?
// Or should it be just start_lsn as it is?
let pgdata_lsn = (start_lsn + 1).align();
assert!(pgdata_lsn.is_aligned());
// TODO
// We must have start_lsn+1 == pgdata_lsn
// Set it somehow
@@ -3409,13 +3432,13 @@ impl Tenant {
// TODO why do we need these lines?
let tenant_shard_id = uninitialized_timeline.owning_tenant.tenant_shard_id;
let unfinished_timeline = uninitialized_timeline.raw_timeline()?;
// Flush the new layer files to disk, before we make the timeline available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
&pgdata_path,
@@ -3426,8 +3449,7 @@ impl Tenant {
.with_context(|| {
format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
})?;
unfinished_timeline
.freeze_and_flush()
.await
@@ -4002,41 +4024,62 @@ async fn run_initdb(
Ok(())
}
/// Run pg_upgrade from the old cluster to the new cluster.
async fn run_pg_upgrade(
conf: &'static PageServerConf,
old_pgdata: &Utf8Path,
new_pgdata: &Utf8Path,
old_pg_version: u32,
new_pg_version: u32,
_parent_timeline_id: TimelineId,
_tenant_id: TenantId,
_start_lsn: Lsn, // this is where we need to start compute for parent timeline to dump the data
cancel: &CancellationToken,
) -> Result<(), InitdbError> {
//let old_bin_path = conf.pg_bin_dir(old_pg_version).map_err(InitdbError::Other)?;
let pg_upgrade_bin_path = conf
.pg_bin_dir(new_pg_version)
.map_err(InitdbError::Other)?
.join("pg_upgrade");
let pg_upgrade_lib_dir = conf.pg_lib_dir(new_pg_version).map_err(InitdbError::Other)?;
let pg_upgrade_lib_dir = conf
.pg_lib_dir(new_pg_version)
.map_err(InitdbError::Other)?;
info!(
"running {} from {} to {} version {} -> {}",
pg_upgrade_bin_path, old_pgdata, new_pgdata,
old_pg_version, new_pg_version,
"running {} in pgdata {} from version {} to {}",
pg_upgrade_bin_path, new_pgdata, old_pg_version, new_pg_version,
);
let _permit = INIT_DB_SEMAPHORE.acquire().await;
// TODO
// start ad-hoc compute for parent timeline to connect and dump the data
// inspired by the script https://github.com/neondatabase/cloud/pull/17267/files
// and neon_local
// We test with neon_local, so let's hardcode it for now
let old_pgdata = "/home/ana/work/neon/.neon/endpoints/main/pgdata";
let pg_upgrade_command = tokio::process::Command::new(&pg_upgrade_bin_path)
.current_dir("/home/ana/work/neon/")
.args(["-b", "/home/ana/work/neon/pg_install/v15/bin/"])
.args(["-B", "/home/ana/work/neon/pg_install/v16/bin/"])
.args(["-d", old_pgdata.as_ref()])
.args(["-D", new_pgdata.as_ref()])
.args(["--username", &conf.superuser])
.args(["--neon_start", "cargo neon endpoint start main"])
.args(["--neon_stop", "cargo neon endpoint start main"])
.args(["--socketdir", "/tmp"])
.args([
"--neon_start",
"/home/ana/.cargo/bin/cargo neon endpoint start main",
])
.args([
"--neon_stop",
"/home/ana/.cargo/bin/cargo neon endpoint stop main",
])
.env_clear()
.env("LD_LIBRARY_PATH", &pg_upgrade_lib_dir)
.env("DYLD_LIBRARY_PATH", &pg_upgrade_lib_dir)
.env("PGPORTOLD", "55432")
.env("PGPORTNEW", "55433")
.stdin(std::process::Stdio::null())
// stdout invocation produces the same output every time, we don't need it
.stdout(std::process::Stdio::null())
@@ -4044,6 +4087,9 @@ async fn run_pg_upgrade(
.stderr(std::process::Stdio::piped())
.spawn()?;
// print pg_upgrade_command
info!("{:?}", pg_upgrade_command);
// Ideally we'd select here with the cancellation token, but the problem is that
// we can't safely terminate initdb: it launches processes of its own, and killing
// initdb doesn't kill them. After we return from this function, we want the target