Compare commits

...

20 Commits

Author SHA1 Message Date
BodoBolero
ea3a87105c demo scripts 2024-09-13 07:57:07 +01:00
Anastasia Lubennikova
33ad0dc177 Choose the max(branching_lsn, after_pg_upgrade_lsn) to import new timeline data.
This fixes the 'xlog flush request %X/%X is not satisfied' error.
See the comments in branch_timeline_impl().
2024-09-13 04:24:46 +01:00
Anastasia Lubennikova
168a6a87d7 Fix wal-redo unwrap() failure for timeline chains that have 0 records in some timeline 2024-09-13 03:31:54 +01:00
Anastasia Lubennikova
11e77815af do not import user files after pg_upgrade 2024-09-13 01:38:09 +01:00
Anastasia Lubennikova
39849cd1ff Cleanup run_pg_upgrade hardcoded path 2024-09-12 23:28:19 +01:00
Anastasia Lubennikova
87de28bc62 Fix pg_control Checkpoint in a new data directory
before importing it into the timeline. It must be equal to the branching LSN.
This version passes a few more steps:
cargo neon timeline branch --tenant-id 14719455a7fbf1d257f427377d096cc2 --pg-version 16 --branch-name branch_16
cargo neon endpoint create ep_16 --pg-version 16 --tenant-id 14719455a7fbf1d257f427377d096cc2  --branch-name branch_16
cargo neon endpoint start ep_16

If we connect to the new endpoint, the version is correct and the tables are there, but the data is not visible for some reason.
2024-09-12 23:26:43 +01:00
Anastasia Lubennikova
6736557ea6 Call pg_upgrade from pageserver.
This relies on the fact that we use neon_local, and it uses hardcoded neon_local paths.
2024-09-12 22:19:22 +01:00
Anastasia Lubennikova
d6ae925739 Add pg-version argument to neon_local timeline create. Add some dummy code that runs initdb on a new branch with the new version and tries to import it back into the pageserver.
This version fails on the pageserver assertion 'cannot modify relation after advancing last_record_lsn (incoming_lsn=0/14F3030, last_record_lsn=0/2225360)'

To test, create a v15 tenant and try to branch using v16:
cargo neon timeline branch --tenant-id $TENANT_ID --pg-version 16 --branch-name branch_16

TODO: figure out what LSN to update
2024-09-12 18:57:35 +01:00
Joonas Koivunen
133745c005 complete the multiple walredo processes 2024-09-12 15:09:34 +00:00
BodoBolero
c62f1cc87f Merge branch 'hackathon/single_click_pg_upgrade' of https://github.com/neondatabase/neon into hackathon/single_click_pg_upgrade 2024-09-12 15:15:27 +01:00
BodoBolero
ae263e5adf branching in tenant.rs depending on whether the new version is higher 2024-09-12 15:15:14 +01:00
Joonas Koivunen
31ca007fb3 walredo process per pg_version handling 2024-09-12 13:33:40 +00:00
Joonas Koivunen
7b6a888c24 run correct pg_version walredo against the walrecords 2024-09-12 13:31:42 +00:00
Joonas Koivunen
08705d1b8c chore: remove extra clone 2024-09-12 13:31:42 +00:00
Joonas Koivunen
2cc0b392e8 chore: remove extra method 2024-09-12 13:31:42 +00:00
Joonas Koivunen
60169ad59d chore: missed formatting 2024-09-12 13:31:42 +00:00
Anastasia Lubennikova
ee2a6bad93 Bump vendor/postgres-v16. Disable file transfer for neon pg_upgrade 2024-09-12 14:16:30 +01:00
Anastasia Lubennikova
e9525d1f52 Bump vendor/postgres-v16. It has pg_upgrade changes.
With them, pg_upgrade v15 -> v16 --check passes if all arguments are set. See Slack for more details.
2024-09-12 14:04:15 +01:00
BodoBolero
e757bc9469 cargo fmt 2024-09-08 09:08:05 +02:00
BodoBolero
7089e34070 add pg_version to create branch trace in pageserver to test custom storage image in local tilt setup 2024-09-08 08:36:37 +02:00
10 changed files with 667 additions and 106 deletions

View File

@@ -653,6 +653,11 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
})?;
let pg_version = branch_match
.get_one::<u32>("pg-version")
.copied()
.context("Failed to parse postgres version from the argument string")?;
let start_lsn = branch_match
.get_one::<String>("ancestor-start-lsn")
.map(|lsn_str| Lsn::from_str(lsn_str))
@@ -665,7 +670,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
ancestor_timeline_id: Some(ancestor_timeline_id),
existing_initdb_timeline_id: None,
ancestor_start_lsn: start_lsn,
pg_version: None,
pg_version: Some(pg_version),
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
@@ -1583,6 +1588,7 @@ fn cli() -> Command {
.subcommand(Command::new("branch")
.about("Create a new timeline, using another timeline as a base, copying its data")
.arg(tenant_id_arg.clone())
.arg(pg_version_arg.clone())
.arg(branch_name_arg.clone())
.arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))

demo0.sh (new executable file, 5 lines)

@@ -0,0 +1,5 @@
#!/bin/bash
set -o xtrace # Print each command before execution
PGPASSWORD=password psql -h localhost -U postgres -p 8432 -d dockercplane -c "select name, postgres_version from branches where deleted=false;"

demo1.sh (new executable file, 38 lines)

@@ -0,0 +1,38 @@
#!/bin/bash
set -o xtrace # Print each command before execution
cargo neon stop
rm -rf .neon
sleep 4
cargo neon init
sleep 3
cargo neon start
sleep 3
export TENANT_ID=14719455a7fbf1d257f427377d096cc2
cargo neon tenant create --pg-version 15 --tenant-id $TENANT_ID
sleep 1
cargo neon endpoint create main --pg-version 15 --tenant-id $TENANT_ID
sleep 1
cargo neon endpoint start main
cargo neon endpoint list --tenant-id $TENANT_ID
sleep 3
./pg_install/v15/bin/pgbench -i -s 10 -p 55432 -h 127.0.0.1 -U cloud_admin postgres
# This endpoint runs on version 15
psql -p 55432 -h 127.0.0.1 -U cloud_admin postgres -c "select version();"
psql -p 55432 -h 127.0.0.1 -U cloud_admin postgres -c "select pg_current_wal_lsn()"
psql -p 55432 -h 127.0.0.1 -U cloud_admin postgres -c "\d+"

demo2.sh (new executable file, 41 lines)

@@ -0,0 +1,41 @@
#!/bin/bash
set -o xtrace # Print each command before execution
# Stop the endpoint. Right now this is important, because pg_upgrade will start it.
# This is not strictly needed; with some hacking we could implement the upgrade without a pause.
cargo neon endpoint stop main
cargo neon endpoint list --tenant-id $TENANT_ID
# Let's create a branch with the new major Postgres version.
# !This is the feature that we developed during the hackathon!
# Everything else is setup and checks.
cargo neon timeline branch --tenant-id $TENANT_ID --pg-version 16 --branch-name branch_16
# create and start endpoint on it
cargo neon endpoint create ep_16 --pg-version 16 --tenant-id $TENANT_ID --branch-name branch_16
cargo neon endpoint start ep_16
# Let's ensure that this new endpoint runs on the new version
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select version();"
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select pg_current_wal_lsn()"
# This will show a 0-byte size for all user relations.
# This is a known issue.
# The new timeline doesn't have these relations yet; we will read them from the parent.
# Right now the relsize cache for them is also empty; after a SeqScan the size cache will be correct.
# We need to copy the relsize cache from the parent timeline (see the warm-up sketch after this script).
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "\d+"
# And as you can see, there is some data in the new endpoint.
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select count(*) from pgbench_accounts;"
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select count(*) from pgbench_branches;"
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select count(*) from pgbench_tellers;"
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "\d+"
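
A warm-up sketch for the 0-byte relation sizes noted in the comments above (hypothetical, not part of the demo): generate one count(*) per user table from pg_class and feed the statements back into psql, so the relsize cache on the new timeline gets populated, then re-check \d+.
# Hypothetical warm-up for the empty relsize cache (oid >= 16384, i.e. FirstNormalObjectId, selects user tables)
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -Atc \
  "SELECT format('SELECT count(*) FROM %s;', oid::regclass) FROM pg_class WHERE relkind = 'r' AND oid >= 16384" \
  | psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "\d+"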

View File

@@ -539,7 +539,11 @@ async fn timeline_create_handler(
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
tracing::info!(%ancestor_id, "starting to branch");
if let Some(pg_version) = request_data.pg_version.as_ref() {
tracing::info!(%pg_version, %ancestor_id, "starting to branch");
} else {
tracing::info!(%ancestor_id, "starting to branch");
}
} else {
tracing::info!("bootstrapping");
}

View File

@@ -54,6 +54,8 @@ pub async fn import_timeline_from_postgres_datadir(
tline: &Timeline,
pgdata_path: &Utf8Path,
pgdata_lsn: Lsn,
change_control_file_lsn: bool,
src_timeline: Option<&Timeline>,
ctx: &RequestContext,
) -> Result<()> {
let mut pg_control: Option<ControlFileData> = None;
@@ -76,8 +78,23 @@ pub async fn import_timeline_from_postgres_datadir(
let mut file = tokio::fs::File::open(absolute_path).await?;
let len = metadata.len() as usize;
if let Some(control_file) =
import_file(&mut modification, relative_path, &mut file, len, ctx).await?
let new_checkpoint_lsn = if change_control_file_lsn {
Some(pgdata_lsn)
} else {
None
};
// If this is an import after pg_upgrade, skip all user data files
// (relfilenode > FirstNormalObjectId of the new cluster).
if let Some(control_file) = import_file(
&mut modification,
relative_path,
&mut file,
len,
new_checkpoint_lsn,
ctx,
)
.await?
{
pg_control = Some(control_file);
}
@@ -85,6 +102,37 @@ pub async fn import_timeline_from_postgres_datadir(
}
}
// // if we're importing after pg_upgrade
// // also copy metadata for all relations that were not copied
// // from the parent timeline
// if let Some(src_timeline) = src_timeline {
// for ((spcnode, dbnode), _) in src_timeline
// .list_dbdirs(pgdata_lsn, ctx)
// .await
// .with_context(|| format!("Failed to list_dbdirs for src_timeline"))?
// {
// let rels = src_timeline
// .list_rels(spcnode, dbnode, Version::Lsn(pgdata_lsn), ctx)
// .await
// .with_context(|| format!("Failed to list_rels for src_timeline"))?;
// let new_rels = tline
// .list_rels(spcnode, dbnode, Version::Lsn(pgdata_lsn), ctx)
// .await
// .with_context(|| format!("Failed to list_rels for new_timeline"))?;
// for rel in rels {
// if !new_rels.contains(&rel) {
// let nblocks = src_timeline
// .get_rel_size(rel, Version::Lsn(pgdata_lsn), ctx)
// .await
// .with_context(|| format!("Failed to get_rel_size for src_timeline"))?;
// // TODO insert relation size into the new timeline's cache
// }
// }
// }
// }
// We're done importing all the data files.
modification.commit(ctx).await?;
@@ -94,6 +142,10 @@ pub async fn import_timeline_from_postgres_datadir(
pg_control.state == DBState_DB_SHUTDOWNED,
"Postgres cluster was not shut down cleanly"
);
info!("pg_control: {:?}", pg_control);
info!("checkpoint: {:?}", pg_control.checkPoint);
info!("pgdata_lsn: {:?}", pgdata_lsn.0);
info!("checkpoint redo: {:?}", pg_control.checkPointCopy.redo);
ensure!(
pg_control.checkPointCopy.redo == pgdata_lsn.0,
"unexpected checkpoint REDO pointer"
@@ -102,18 +154,46 @@ pub async fn import_timeline_from_postgres_datadir(
// Import WAL. This is needed even when starting from a shutdown checkpoint, because
// this reads the checkpoint record itself, advancing the tip of the timeline to
// *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
import_wal(
&pgdata_path.join("pg_wal"),
tline,
Lsn(pg_control.checkPointCopy.redo),
pgdata_lsn,
ctx,
)
.await?;
if !change_control_file_lsn {
import_wal(
&pgdata_path.join("pg_wal"),
tline,
Lsn(pg_control.checkPointCopy.redo),
pgdata_lsn,
ctx,
)
.await?;
}
Ok(())
}
fn is_user_relfile(path: &Path) -> bool {
let filename = &path
.file_name()
.expect("missing rel filename")
.to_string_lossy();
let (relnode, _, _) = parse_relfilename(filename)
.map_err(|e| {
warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
e
})
.unwrap();
// If this is an import after pg_upgrade, skip all user data files
// (relfilenode > FirstNormalObjectId of the new cluster).
// THIS IS WRONG:
// if a catalog relation was vacuumed with VACUUM FULL, it will have a new relfilenode,
// which will be greater than FirstNormalObjectId.
// Use pg_relfilemap to decide if the relation is a catalog relation (see the psql illustration after this function).
if relnode > pg_constants::FIRST_NORMAL_OBJECT_ID {
//
return true;
}
false
}
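A quick psql illustration of the VACUUM FULL caveat flagged in the comment above (a hypothetical session against any scratch cluster, not part of this change): VACUUM FULL rewrites a plain catalog such as pg_statistic under a freshly assigned relfilenode taken from the normal OID counter, so it ends up above FirstNormalObjectId (16384) and the relfilenode threshold alone misclassifies it as a user relation.
psql -c "SELECT relname, relfilenode FROM pg_class WHERE relname = 'pg_statistic';"
psql -c "VACUUM FULL pg_statistic;"
# relfilenode has changed and is now > 16384, even though pg_statistic is a catalog relation
psql -c "SELECT relname, relfilenode FROM pg_class WHERE relname = 'pg_statistic';"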
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
async fn import_rel(
modification: &mut DatadirModification<'_>,
@@ -367,8 +447,15 @@ pub async fn import_basebackup_from_tar(
match header.entry_type() {
tokio_tar::EntryType::Regular => {
if let Some(res) =
import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
if let Some(res) = import_file(
&mut modification,
file_path.as_ref(),
&mut entry,
len,
None,
ctx,
)
.await?
{
// We found the pg_control file.
pg_control = Some(res);
@@ -493,6 +580,7 @@ async fn import_file(
file_path: &Path,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
len: usize,
new_checkpoint_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> Result<Option<ControlFileData>> {
let file_name = match file_path.file_name() {
@@ -506,6 +594,13 @@ async fn import_file(
return Ok(None);
}
if file_name == "pg_internal.init" {
// pg_internal.init is a relation cache init file; Postgres rebuilds it on startup,
// so it does not need to be imported. Skip it.
info!("skipping pg_internal.init");
return Ok(None);
}
if file_path.starts_with("global") {
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
@@ -515,7 +610,14 @@ async fn import_file(
let bytes = read_all_bytes(reader).await?;
// Extract the checkpoint record and import it separately.
let pg_control = ControlFileData::decode(&bytes[..])?;
let mut pg_control = ControlFileData::decode(&bytes[..])?;
if let Some(checkpoint_lsn) = new_checkpoint_lsn {
// Override the checkpoint LSN from the control file with the requested one.
pg_control.checkPoint = checkpoint_lsn.0;
pg_control.checkPointCopy.redo = checkpoint_lsn.0;
};
let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
modification.put_checkpoint(checkpoint_bytes)?;
debug!("imported control file");
@@ -535,8 +637,16 @@ async fn import_file(
debug!("ignored PG_VERSION file");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
debug!("imported rel creation");
// If this is an import after pg_upgrade, skip all user data files
// (relfilenode > FirstNormalObjectId of the new cluster).
// TODO: implement import_rel_from_old_version that will copy
// relation metadata and the cached size from the parent timeline.
if is_user_relfile(file_path) && new_checkpoint_lsn.is_some() {
info!("after pg_restore skipping {:?}", file_path);
} else {
import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
debug!("imported rel creation");
}
}
}
} else if file_path.starts_with("base") {
@@ -560,8 +670,14 @@ async fn import_file(
debug!("ignored PG_VERSION file");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
debug!("imported rel creation");
// If this is an import after pg_upgrade, skip all user data files
// (relfilenode > FirstNormalObjectId of the new cluster).
if is_user_relfile(file_path) && new_checkpoint_lsn.is_some() {
info!("after pg_restore skipping {:?}", file_path);
} else {
import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
debug!("imported rel creation");
}
}
}
} else if file_path.starts_with("pg_xact") {

View File

@@ -33,6 +33,7 @@ use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use std::collections::BTreeMap;
use std::fmt;
use std::process::Stdio;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
@@ -48,6 +49,7 @@ use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
use utils::fs_ext;
use utils::id::TenantId;
use utils::pausable_failpoint;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
@@ -84,6 +86,7 @@ use crate::metrics::{
remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
};
use crate::pgdatadir_mapping;
use crate::repository::GcResult;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
@@ -1709,15 +1712,31 @@ impl Tenant {
WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
})?;
}
self.branch_timeline(
&ancestor_timeline,
new_timeline_id,
ancestor_start_lsn,
create_guard,
ctx,
)
.await?
// hackathon hackaneon single click postgres upgrade
if pg_version > ancestor_timeline.pg_version {
let old_pg_version = ancestor_timeline.pg_version;
tracing::info!("Upgrading timeline {new_timeline_id} from version {old_pg_version} to {pg_version}");
// add new stuff here
self.branch_timeline(
&ancestor_timeline,
new_timeline_id,
ancestor_start_lsn,
create_guard,
ctx,
pg_version,
)
.await?
} else {
self.branch_timeline(
&ancestor_timeline,
new_timeline_id,
ancestor_start_lsn,
create_guard,
ctx,
ancestor_timeline.pg_version,
)
.await?
}
}
None => {
self.bootstrap_timeline(
@@ -3229,9 +3248,17 @@ impl Tenant {
start_lsn: Option<Lsn>,
timeline_create_guard: TimelineCreateGuard<'_>,
ctx: &RequestContext,
pg_version: u32,
) -> Result<Arc<Timeline>, CreateTimelineError> {
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx)
.await
self.branch_timeline_impl(
src_timeline,
dst_id,
start_lsn,
timeline_create_guard,
ctx,
pg_version,
)
.await
}
async fn branch_timeline_impl(
@@ -3240,7 +3267,8 @@ impl Tenant {
dst_id: TimelineId,
start_lsn: Option<Lsn>,
timeline_create_guard: TimelineCreateGuard<'_>,
_ctx: &RequestContext,
ctx: &RequestContext,
pg_version: u32,
) -> Result<Arc<Timeline>, CreateTimelineError> {
let src_id = src_timeline.timeline_id;
@@ -3316,7 +3344,7 @@ impl Tenant {
start_lsn,
*src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
src_timeline.initdb_lsn,
src_timeline.pg_version,
pg_version,
);
let uninitialized_timeline = self
@@ -3330,6 +3358,117 @@ impl Tenant {
)
.await?;
if pg_version != src_timeline.pg_version {
info!(
"branching timeline {dst_id} from timeline {src_id} with different pg_version: {pg_version}",
);
let timeline_id = dst_id;
// prepare pgdata for the new timeline
let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
let pgdata_path = path_with_suffix_extension(
timelines_path.join(format!("basebackup-{timeline_id}")),
TEMP_FILE_SUFFIX,
);
if pgdata_path.exists() {
fs::remove_dir_all(&pgdata_path).with_context(|| {
format!("Failed to remove already existing initdb directory: {pgdata_path}")
})?;
}
// This new directory is only temporary; it should be removed right after bootstrap, we don't need it afterwards.
// scopeguard::defer! {
// if let Err(e) = fs::remove_dir_all(&pgdata_path) {
// // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
// error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
// }
// }
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel)
.await
.with_context(|| {
format!(
"Failed to initdb {timeline_id} with pg_version {pg_version} at {pgdata_path}"
)
})?;
// TODO: do the pg_upgrade bits here.
// Rust is not the most convenient language for writing this,
// so just call pg_upgrade in a subprocess.
// In the future we can turn this into an API call to some service that will do the work.
//
// 1. Start postgres on the parent timeline at start_lsn, using neon_local (for now this is hardcoded).
// 2. Run pg_upgrade, using neon_local for the old version and the freshly created pgdata for the new version.
run_pg_upgrade(
self.conf,
&pgdata_path,
src_timeline.pg_version,
pg_version,
src_timeline.timeline_id,
self.tenant_shard_id.tenant_id,
start_lsn,
&self.cancel,
).await.with_context(|| {
format!(
"Failed to pg_upgrade {timeline_id} with pg_version {pg_version} at {pgdata_path}"
)
})?;
let controlfile_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
let start_lsn = start_lsn.align();
// Choose the max of controlfile_lsn and start_lsn.
//
// It is possible that the controlfile_lsn is ahead of the start_lsn,
// especially for small databases.
// In that case, we need to start from the controlfile_lsn.
// Otherwise the pages will carry LSNs larger than the LSN of the branch,
// and that leads to an error when compute tries to flush a page
// with an LSN larger than the branch LSN:
//
// ERROR: xlog flush request %X/%X is not satisfied --- flushed only to %X/%X
//
// There is another problem here: a gap between the
// branching_lsn (where we diverged from the parent) and pgdata_lsn (the import LSN of the new timeline).
// We should teach wal-redo to skip all the records between these two points;
// otherwise we will see some updates from the parent timeline in the new timeline.
let pgdata_lsn = std::cmp::max(controlfile_lsn, start_lsn);
assert!(pgdata_lsn.is_aligned());
// TODO why do we need these lines?
let tenant_shard_id = uninitialized_timeline.owning_tenant.tenant_shard_id;
let unfinished_timeline = uninitialized_timeline.raw_timeline()?;
// Flush the new layer files to disk, before we make the timeline as available to
// the outside world.
//
// Flush loop needs to be spawned in order to be able to flush.
unfinished_timeline.maybe_spawn_flush_loop();
import_datadir::import_timeline_from_postgres_datadir(
unfinished_timeline,
&pgdata_path,
pgdata_lsn,
true,
Some(src_timeline),
ctx,
)
.await
.with_context(|| {
format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
})?;
unfinished_timeline
.freeze_and_flush()
.await
.with_context(|| {
format!(
"Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
)
})?;
}
let new_timeline = uninitialized_timeline.finish_creation()?;
// Root timeline gets its layers during creation and uploads them along with the metadata.
@@ -3537,6 +3676,8 @@ impl Tenant {
unfinished_timeline,
&pgdata_path,
pgdata_lsn,
false,
None,
ctx,
)
.await
@@ -3894,6 +4035,95 @@ async fn run_initdb(
Ok(())
}
/// Run pg_upgrade from the old cluster to the new cluster.
async fn run_pg_upgrade(
conf: &'static PageServerConf,
new_pgdata: &Utf8Path,
old_pg_version: u32,
new_pg_version: u32,
_parent_timeline_id: TimelineId,
_tenant_id: TenantId,
_start_lsn: Lsn, // this is where we need to start compute for parent timeline to dump the data
cancel: &CancellationToken,
) -> Result<(), InitdbError> {
//let old_bin_path = conf.pg_bin_dir(old_pg_version).map_err(InitdbError::Other)?;
let pg_upgrade_bin_path = conf
.pg_bin_dir(new_pg_version)
.map_err(InitdbError::Other)?
.join("pg_upgrade");
let pg_upgrade_lib_dir = conf
.pg_lib_dir(new_pg_version)
.map_err(InitdbError::Other)?;
info!(
"running {} in pgdata {} from version {} to {}",
pg_upgrade_bin_path, new_pgdata, old_pg_version, new_pg_version,
);
// TODO: start an ad-hoc compute for the parent timeline to connect and dump the data,
// inspired by the script https://github.com/neondatabase/cloud/pull/17267/files
// and neon_local.
// We test with neon_local, so let's hardcode it for now.
let base_dir = "/home/ana/work/neon/";
let old_pgdata = format!("{}/.neon/endpoints/main/pgdata", base_dir);
let pg_upgrade_command = tokio::process::Command::new(&pg_upgrade_bin_path)
.current_dir(base_dir)
.args(["-b", format!("{}pg_install/v15/bin/", base_dir).as_str()])
.args(["-B", format!("{}pg_install/v16/bin/", base_dir).as_str()])
.args(["-d", old_pgdata.as_ref()])
.args(["-D", new_pgdata.as_ref()])
.args(["--username", &conf.superuser])
.args(["--socketdir", "/tmp"])
.args([
"--neon_start",
format!("{}target/debug/neon_local endpoint start main", base_dir).as_str(),
])
.args([
"--neon_stop",
format!("{}target/debug/neon_local endpoint stop main", base_dir).as_str(),
])
.env_clear()
.env("LD_LIBRARY_PATH", &pg_upgrade_lib_dir)
.env("DYLD_LIBRARY_PATH", &pg_upgrade_lib_dir)
.env("PGPORTOLD", "55432")
.env("PGPORTNEW", "55433")
.stdin(std::process::Stdio::null())
// we don't need pg_upgrade's stdout output
.stdout(std::process::Stdio::null())
// we would be interested in the stderr output, if there was any
.stderr(std::process::Stdio::piped())
.spawn()?;
// print pg_upgrade_command
info!("{:?}", pg_upgrade_command);
// Ideally we'd select here with the cancellation token, but the problem is that
// we can't safely terminate pg_upgrade (same problem as with initdb): it launches
// processes of its own, and killing it doesn't kill them. After we return from this
// function, we want the target directory to be able to be cleaned up.
// See https://github.com/neondatabase/neon/issues/6385
let pg_upgrade_output = pg_upgrade_command.wait_with_output().await?;
if !pg_upgrade_output.status.success() {
return Err(InitdbError::Failed(
pg_upgrade_output.status,
pg_upgrade_output.stderr,
));
}
// This isn't true cancellation support, see above. Still return an error to
// exercise the cancellation code path.
if cancel.is_cancelled() {
return Err(InitdbError::Cancelled);
}
Ok(())
}
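For reference, a sketch of roughly the command line that run_pg_upgrade assembles above. Everything is taken from the hardcoded values in the code: the /home/ana/work/neon base dir, the neon_local 'main' endpoint pgdata, ports 55432/55433, cloud_admin as conf.superuser, and the --neon_start/--neon_stop flags that exist only in the patched vendor/postgres-v16 pg_upgrade; the -D target stands for the temporary basebackup-<timeline_id> datadir produced by run_initdb.
cd /home/ana/work/neon
LD_LIBRARY_PATH=pg_install/v16/lib PGPORTOLD=55432 PGPORTNEW=55433 \
  pg_install/v16/bin/pg_upgrade \
    -b /home/ana/work/neon/pg_install/v15/bin/ \
    -B /home/ana/work/neon/pg_install/v16/bin/ \
    -d /home/ana/work/neon/.neon/endpoints/main/pgdata \
    -D <temporary basebackup-<timeline_id> datadir> \
    --username cloud_admin \
    --socketdir /tmp \
    --neon_start "/home/ana/work/neon/target/debug/neon_local endpoint start main" \
    --neon_stop "/home/ana/work/neon/target/debug/neon_local endpoint stop main"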
/// Dump contents of a layer file to stdout.
pub async fn dump_layerfile_from_path(
path: &Utf8Path,

View File

@@ -5078,14 +5078,14 @@ impl Timeline {
// If we have a page image, and no WAL, we're all set
if data.records.is_empty() {
if let Some((img_lsn, img)) = &data.img {
if let Some((img_lsn, img)) = data.img {
trace!(
"found page image for key {} at {}, no WAL redo required, req LSN {}",
key,
img_lsn,
request_lsn,
);
Ok(img.clone())
Ok(img)
} else {
Err(PageReconstructError::from(anyhow!(
"base image for {key} at {request_lsn} not found"
@@ -5096,33 +5096,138 @@ impl Timeline {
//
// If we don't have a base image, then the oldest WAL record better initialize
// the page
if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
Err(PageReconstructError::from(anyhow!(
"Base image for {} at {} not found, but got {} WAL records",
key,
request_lsn,
data.records.len()
)))
} else {
if data.img.is_some() {
let have_img = data.img.is_some();
let will_init = data
.records
.first()
.map(|(_, rec)| rec.will_init())
.expect("already checked to have records");
match (have_img, will_init) {
(false, false) => {
return Err(PageReconstructError::from(anyhow!(
"Base image for {} at {} not found, but got {} WAL records",
key,
request_lsn,
data.records.len()
)))
}
(true, _) => {
trace!(
"found {} WAL records and a base image for {} at {}, performing WAL redo",
data.records.len(),
key,
request_lsn
);
} else {
}
(false, _) => {
assert!(will_init, "already checked above");
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
};
let res = self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
}
}
let oldest_lsn = data
.records
.first()
.map(|(lsn, _)| lsn)
.expect("again, checked");
// walk up the ancestry until we have found an ancestor covering the lsn range
let ancestry = std::iter::successors(Some(self), |tl| tl.ancestor_timeline.as_deref())
// 100 - initdb R pg14
// 150 - branch S pg14
// 200 - branch T pg15
// 250 - branch U pg15
// 300 - branch V pg16
//
// oldest_lsn = 155:
// get [V pg16, U pg15(one_more=true), T pg15(one_more=true), S pg14(one_more=false)]
.take_while({
let mut one_more = true;
move |tl| {
if *oldest_lsn < tl.ancestor_lsn {
assert!(one_more);
true
} else {
let prev = one_more;
one_more = false;
prev
}
}
})
// Remove consecutive entries with the same pg_version; they might all be the same,
// in which case we can use the same timeline for all reconstruction.
// [V pg16, U pg15, T pg15, S pg14] => [V pg16, T pg15, S pg14]
.fold(Vec::<&Timeline>::with_capacity(4), |mut acc, next| {
if acc
.last()
.map(|tl| tl.pg_version == next.pg_version)
.unwrap_or(false)
{
// overwrite with an earlier timeline; additionally we only allow upgrades,
// so we cannot go backwards like pg14 (branch) pg15 (branch) pg14
*acc.last_mut().unwrap() = next;
} else {
acc.push(next);
}
acc
});
// shifted for the purpose of timeline_pairs
let later_timelines = ancestry
.iter()
.rev()
.skip(1)
.map(Some)
.chain(std::iter::once(None));
// Zip older and later timelines into pairs, which we then use to select which parts of the
// WAL records to execute with which walredo version.
let timeline_pairs = ancestry.iter().rev().zip(later_timelines);
let mgr = self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?;
let mut img = data.img.clone();
let mut records = &data.records[..];
for (older, later) in timeline_pairs {
let scratch = records
.iter()
.take_while(|(lsn, _)| {
// if there is no later, take all remaining
later.map(|later| lsn < &later.ancestor_lsn).unwrap_or(true)
})
.cloned()
.collect::<Vec<_>>();
records = &records[scratch.len()..];
if later.is_none() {
assert!(records.is_empty());
}
// if we don't have any records for this timeline (which is possible)
// go to the previous one
if scratch.is_empty() {
tracing::info!("no records for timeline {}", older.timeline_id);
continue;
}
// this is only used for logging on the next round
let last_lsn = scratch.last().unwrap().0;
// is request_lsn ok? it's not used for anything important, just logging.
let res = mgr
.request_redo(key, request_lsn, img, scratch, older.pg_version)
.await;
let img = match res {
Ok(img) => img,
img = match res {
Ok(img) => Some((last_lsn, img)),
Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
Err(walredo::Error::Other(e)) => {
return Err(PageReconstructError::WalRedo(
@@ -5130,8 +5235,9 @@ impl Timeline {
))
}
};
Ok(img)
}
Ok(img.unwrap().1)
}
}

View File

@@ -78,7 +78,11 @@ pub struct PostgresRedoManager {
/// # Shutdown
///
/// See [`Self::launched_processes`].
redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
///
/// # Different pg versions
///
/// We run a separate quiesced process for each version (pg14, pg15, pg16 and maybe pg17).
processes: [heavier_once_cell::OnceCell<ProcessOnceCell>; 4],
/// Gate that is entered when launching a walredo process and held open
/// until the process has been `kill()`ed and `wait()`ed upon.
@@ -215,10 +219,18 @@ impl PostgresRedoManager {
chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
})
},
process: self.redo_process.get().and_then(|p| match &*p {
ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
ProcessOnceCell::ManagerShutDown => None,
}),
process: self
.processes
.iter()
.filter_map(|p| {
p.get().and_then(|p| match &*p {
ProcessOnceCell::Spawned(p) => {
Some(WalRedoManagerProcessStatus { pid: p.id() })
}
ProcessOnceCell::ManagerShutDown => None,
})
})
.next(),
}
}
}
@@ -236,7 +248,7 @@ impl PostgresRedoManager {
tenant_shard_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: heavier_once_cell::OnceCell::default(),
processes: Default::default(),
launched_processes: utils::sync::gate::Gate::default(),
}
}
@@ -256,26 +268,31 @@ impl PostgresRedoManager {
///
/// This method is cancellation-safe.
pub async fn shutdown(&self) -> bool {
// prevent new processes from being spawned
let maybe_permit = match self.redo_process.get_or_init_detached().await {
Ok(guard) => {
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
None
} else {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
Some(permit)
let mut it_was_us = false;
for process in self.processes.iter() {
// prevent new processes from being spawned
let maybe_permit = match process.get_or_init_detached().await {
Ok(guard) => {
if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
None
} else {
let (proc, permit) = guard.take_and_deinit();
drop(proc); // this just drops the Arc, its refcount may not be zero yet
Some(permit)
}
}
}
Err(permit) => Some(permit),
};
let it_was_us = if let Some(permit) = maybe_permit {
self.redo_process
.set(ProcessOnceCell::ManagerShutDown, permit);
true
} else {
false
};
Err(permit) => Some(permit),
};
let i_cant_see_why_this = if let Some(permit) = maybe_permit {
process.set(ProcessOnceCell::ManagerShutDown, permit);
true
} else {
false
};
// TODO: is OR the correct way to combine these?
it_was_us |= i_cant_see_why_this;
}
// wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
// we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
// for the underlying process.
@@ -291,7 +308,10 @@ impl PostgresRedoManager {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
self.processes.iter().for_each(|c| {
drop(c.get().map(|guard| guard.take_and_deinit()));
})
}
}
}
@@ -314,13 +334,23 @@ impl PostgresRedoManager {
wal_redo_timeout: Duration,
pg_version: u32,
) -> Result<Bytes, Error> {
assert!(
(14..=17).contains(&pg_version),
"this should be an enum, but no: {pg_version}"
);
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
let (rel, blknum) = key.to_rel_block().context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let mut n_attempts = 0u32;
loop {
let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
// handling multiple processes idea: just support N versions here, but the caller
// splits per parent_lsn in the case that:
// - reconstruct_data spans two versions
// - reconstruct_data went to parent???
let process = &self.processes[(pg_version - 14) as usize];
let proc: Arc<Process> = match process.get_or_init_detached().await {
Ok(guard) => match &*guard {
ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
ProcessOnceCell::ManagerShutDown => {
@@ -332,11 +362,11 @@ impl PostgresRedoManager {
// acquire guard before spawning process, so that we don't spawn new processes
// if the gate is already closed.
let _launched_processes_guard = match self.launched_processes.enter() {
Ok(guard) => guard,
Err(GateError::GateClosed) => unreachable!(
"shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
),
};
Ok(guard) => guard,
Err(GateError::GateClosed) => unreachable!(
"shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
),
};
let proc = Arc::new(Process {
process: process::WalRedoProcess::launch(
self.conf,
@@ -353,8 +383,7 @@ impl PostgresRedoManager {
pid = proc.id(),
"launched walredo process"
);
self.redo_process
.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
process.set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
proc
}
};
@@ -419,7 +448,7 @@ impl PostgresRedoManager {
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
// This probably needs revisiting at some later point.
match self.redo_process.get() {
match process.get() {
None => (),
Some(guard) => {
match &*guard {
@@ -448,9 +477,7 @@ impl PostgresRedoManager {
}
}
///
/// Process a batch of WAL records using bespoken Neon code.
///
/// Process a batch of WAL records using bespoke Neon code.
fn apply_batch_neon(
&self,
key: Key,
@@ -471,7 +498,7 @@ impl PostgresRedoManager {
// Apply all the WAL records in the batch
for (record_lsn, record) in records.iter() {
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
apply_neon::apply_in_neon(record, *record_lsn, key, &mut page)?;
}
// Success!
let duration = start_time.elapsed();
@@ -488,18 +515,6 @@ impl PostgresRedoManager {
Ok(page.freeze())
}
fn apply_record_neon(
&self,
key: Key,
page: &mut BytesMut,
record_lsn: Lsn,
record: &NeonWalRecord,
) -> anyhow::Result<()> {
apply_neon::apply_in_neon(record, record_lsn, key, page)?;
Ok(())
}
}
#[cfg(test)]