Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-22 12:52:55 +00:00)

Compare commits: enable_v17 ... hackathon/ (20 commits)
Commits in this compare (author and date columns were empty in this mirror):
ea3a87105c, 33ad0dc177, 168a6a87d7, 11e77815af, 39849cd1ff,
87de28bc62, 6736557ea6, d6ae925739, 133745c005, c62f1cc87f,
ae263e5adf, 31ca007fb3, 7b6a888c24, 08705d1b8c, 2cc0b392e8,
60169ad59d, ee2a6bad93, e9525d1f52, e757bc9469, 7089e34070
@@ -653,6 +653,11 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
                    anyhow!("Found no timeline id for branch name '{ancestor_branch_name}'")
                })?;

+            let pg_version = branch_match
+                .get_one::<u32>("pg-version")
+                .copied()
+                .context("Failed to parse postgres version from the argument string")?;
+
            let start_lsn = branch_match
                .get_one::<String>("ancestor-start-lsn")
                .map(|lsn_str| Lsn::from_str(lsn_str))
@@ -665,7 +670,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
                ancestor_timeline_id: Some(ancestor_timeline_id),
                existing_initdb_timeline_id: None,
                ancestor_start_lsn: start_lsn,
-                pg_version: None,
+                pg_version: Some(pg_version),
            };
            let timeline_info = storage_controller
                .tenant_timeline_create(tenant_id, create_req)
@@ -1583,6 +1588,7 @@ fn cli() -> Command {
        .subcommand(Command::new("branch")
            .about("Create a new timeline, using another timeline as a base, copying its data")
            .arg(tenant_id_arg.clone())
+            .arg(pg_version_arg.clone())
            .arg(branch_name_arg.clone())
            .arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
                .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
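For reference, a minimal self-contained sketch of the clap pattern these hunks rely on (clap 4 assumed; pg_version_arg here is a stand-in for the shared argument definition in neon_local):

use clap::{Arg, ArgMatches, Command};

// Stand-in for the reusable --pg-version argument, and how the handler reads it.
fn pg_version_arg() -> Arg {
    Arg::new("pg-version")
        .long("pg-version")
        .help("Postgres version to use for the new timeline")
        .value_parser(clap::value_parser!(u32))
        .required(false)
}

fn read_pg_version(matches: &ArgMatches) -> Option<u32> {
    // get_one::<u32> returns Option<&u32>; copied() turns it into Option<u32>,
    // mirroring the `.get_one::<u32>("pg-version").copied()` call in the hunk.
    matches.get_one::<u32>("pg-version").copied()
}

fn main() {
    let cmd = Command::new("demo").arg(pg_version_arg());
    let matches = cmd.get_matches_from(["demo", "--pg-version", "16"]);
    assert_eq!(read_pg_version(&matches), Some(16));
    println!("pg-version = {:?}", read_pg_version(&matches));
}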
demo0.sh (new executable file, 5 lines)
@@ -0,0 +1,5 @@
#!/bin/bash

set -o xtrace # Print each command before execution

PGPASSWORD=password psql -h localhost -U postgres -p 8432 -d dockercplane -c "select name, postgres_version from branches where deleted=false;"
demo1.sh (new executable file, 38 lines)
@@ -0,0 +1,38 @@
#!/bin/bash

set -o xtrace # Print each command before execution

cargo neon stop
rm -rf .neon

sleep 4

cargo neon init

sleep 3

cargo neon start
sleep 3

export TENANT_ID=14719455a7fbf1d257f427377d096cc2
cargo neon tenant create --pg-version 15 --tenant-id $TENANT_ID

sleep 1

cargo neon endpoint create main --pg-version 15 --tenant-id $TENANT_ID

sleep 1

cargo neon endpoint start main
cargo neon endpoint list --tenant-id $TENANT_ID

sleep 3

./pg_install/v15/bin/pgbench -i -s 10 -p 55432 -h 127.0.0.1 -U cloud_admin postgres

# This endpoint runs on version 15
psql -p 55432 -h 127.0.0.1 -U cloud_admin postgres -c "select version();"
psql -p 55432 -h 127.0.0.1 -U cloud_admin postgres -c "select pg_current_wal_lsn()"
psql -p 55432 -h 127.0.0.1 -U cloud_admin postgres -c "\d+"
demo2.sh (new executable file, 41 lines)
@@ -0,0 +1,41 @@
#!/bin/bash

set -o xtrace # Print each command before execution

# Stop the endpoint. Right now this is important, because pg_upgrade will start it.
# This is not strictly needed, though: with some hacking we could implement the upgrade without a pause.

cargo neon endpoint stop main
cargo neon endpoint list --tenant-id $TENANT_ID

# Create a branch with a new major postgres version.
# !This is the feature that we developed during the hackathon!
# Everything else is setup and checks.

cargo neon timeline branch --tenant-id $TENANT_ID --pg-version 16 --branch-name branch_16

# Create and start an endpoint on it.
cargo neon endpoint create ep_16 --pg-version 16 --tenant-id $TENANT_ID --branch-name branch_16

cargo neon endpoint start ep_16

# Ensure that this new endpoint runs on the new version.
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select version();"

psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select pg_current_wal_lsn()"

# This will show a 0-byte size for all user relations.
# This is a known issue:
# the new timeline doesn't have these relations materialized, we read them from the parent,
# and their relsize cache starts out empty. After a SeqScan the size cache will be correct.
# We need to copy the relsize cache from the parent timeline.

psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "\d+"

# And as you can see, there is some data in the new endpoint.
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select count(*) from pgbench_accounts;"
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select count(*) from pgbench_branches;"
psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "select count(*) from pgbench_tellers;"

psql -p 55434 -h 127.0.0.1 -U cloud_admin postgres -c "\d+"
@@ -539,7 +539,11 @@ async fn timeline_create_handler(
    tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;

    if let Some(ancestor_id) = request_data.ancestor_timeline_id.as_ref() {
-        tracing::info!(%ancestor_id, "starting to branch");
+        if let Some(pg_version) = request_data.pg_version.as_ref() {
+            tracing::info!(%pg_version, %ancestor_id, "starting to branch");
+        } else {
+            tracing::info!(%ancestor_id, "starting to branch");
+        }
    } else {
        tracing::info!("bootstrapping");
    }
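For orientation, a stand-in for the create request this handler reads; the field names follow the hunks above and the neon_local hunk earlier, while the ID and LSN types are simplified to keep the sketch self-contained (the real types are TimelineId and Lsn):

// Stand-in for the timeline-create request consumed above.
#[derive(Debug)]
struct TimelineCreateRequest {
    ancestor_timeline_id: Option<String>,
    existing_initdb_timeline_id: Option<String>,
    ancestor_start_lsn: Option<u64>,
    /// None => keep the ancestor's pg version; Some(v) => branch and upgrade to v.
    pg_version: Option<u32>,
}

fn main() {
    let req = TimelineCreateRequest {
        // arbitrary placeholder hex id, not a real timeline
        ancestor_timeline_id: Some("de200bd42b49cc1814412c7e592dd6e9".into()),
        existing_initdb_timeline_id: None,
        ancestor_start_lsn: None,
        pg_version: Some(16),
    };
    println!("{req:?}");
}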
@@ -54,6 +54,8 @@ pub async fn import_timeline_from_postgres_datadir(
    tline: &Timeline,
    pgdata_path: &Utf8Path,
    pgdata_lsn: Lsn,
+    change_control_file_lsn: bool,
+    src_timeline: Option<&Timeline>,
    ctx: &RequestContext,
) -> Result<()> {
    let mut pg_control: Option<ControlFileData> = None;
@@ -76,8 +78,23 @@ pub async fn import_timeline_from_postgres_datadir(

            let mut file = tokio::fs::File::open(absolute_path).await?;
            let len = metadata.len() as usize;
+
+            let new_checkpoint_lsn = if change_control_file_lsn {
+                Some(pgdata_lsn)
+            } else {
+                None
+            };
+
+            // if this is an import after pg_upgrade, skip all user data files:
+            // relfilenode > FirstNormalObjectId of the new cluster
-            if let Some(control_file) =
-                import_file(&mut modification, relative_path, &mut file, len, ctx).await?
+            if let Some(control_file) = import_file(
+                &mut modification,
+                relative_path,
+                &mut file,
+                len,
+                new_checkpoint_lsn,
+                ctx,
+            )
+            .await?
            {
                pg_control = Some(control_file);
            }
@@ -85,6 +102,37 @@ pub async fn import_timeline_from_postgres_datadir(
        }
    }

+    // // if we're importing after pg_upgrade,
+    // // also copy metadata for all relations that were not copied
+    // // from the parent timeline
+    // if let Some(src_timeline) = src_timeline {
+    //     for ((spcnode, dbnode), _) in src_timeline
+    //         .list_dbdirs(pgdata_lsn, ctx)
+    //         .await
+    //         .with_context(|| format!("Failed to list_dbdirs for src_timeline"))?
+    //     {
+    //         let rels = src_timeline
+    //             .list_rels(spcnode, dbnode, Version::Lsn(pgdata_lsn), ctx)
+    //             .await
+    //             .with_context(|| format!("Failed to list_rels for src_timeline"))?;
+
+    //         let new_rels = tline
+    //             .list_rels(spcnode, dbnode, Version::Lsn(pgdata_lsn), ctx)
+    //             .await
+    //             .with_context(|| format!("Failed to list_rels for new_timeline"))?;

+    //         for rel in rels {
+    //             if !new_rels.contains(&rel) {
+    //                 let nblocks = src_timeline
+    //                     .get_rel_size(rel, Version::Lsn(pgdata_lsn), ctx)
+    //                     .await
+    //                     .with_context(|| format!("Failed to get_rel_size for src_timeline"))?;
+    //                 // TODO insert relation size into the new timeline's cache
+    //             }
+    //         }
+    //     }
+    // }
+
    // We're done importing all the data files.
    modification.commit(ctx).await?;
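A hedged sketch of what that commented-out TODO could become, written over stand-in types rather than the real Timeline API (which differs): seed the child timeline's relsize cache from the parent, so the user relations skipped by the import report real sizes before the first SeqScan.

use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct RelTag { spcnode: u32, dbnode: u32, relnode: u32, forknum: u8 }

#[derive(Default)]
struct TimelineModel {
    rels: HashSet<RelTag>,
    relsize_cache: HashMap<RelTag, u32>, // rel -> nblocks
}

fn seed_relsize_cache(parent: &TimelineModel, child: &mut TimelineModel) {
    for rel in &parent.rels {
        // Only relations the import skipped: their data still lives in the parent.
        if !child.rels.contains(rel) {
            if let Some(&nblocks) = parent.relsize_cache.get(rel) {
                child.relsize_cache.insert(*rel, nblocks);
            }
        }
    }
}

fn main() {
    let mut parent = TimelineModel::default();
    let accounts = RelTag { spcnode: 1663, dbnode: 5, relnode: 16396, forknum: 0 };
    parent.rels.insert(accounts);
    parent.relsize_cache.insert(accounts, 16394); // e.g. pgbench_accounts at scale 10

    let mut child = TimelineModel::default();
    seed_relsize_cache(&parent, &mut child);
    assert_eq!(child.relsize_cache.get(&accounts), Some(&16394));
    println!("seeded {} cached rel sizes", child.relsize_cache.len());
}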
@@ -94,6 +142,10 @@ pub async fn import_timeline_from_postgres_datadir(
        pg_control.state == DBState_DB_SHUTDOWNED,
        "Postgres cluster was not shut down cleanly"
    );
+    info!("pg_control: {:?}", pg_control);
+    info!("checkpoint: {:?}", pg_control.checkPoint);
+    info!("pgdata_lsn: {:?}", pgdata_lsn.0);
+    info!("checkpoint redo: {:?}", pg_control.checkPointCopy.redo);
    ensure!(
        pg_control.checkPointCopy.redo == pgdata_lsn.0,
        "unexpected checkpoint REDO pointer"
@@ -102,18 +154,46 @@ pub async fn import_timeline_from_postgres_datadir(
    // Import WAL. This is needed even when starting from a shutdown checkpoint, because
    // this reads the checkpoint record itself, advancing the tip of the timeline to
    // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
-    import_wal(
-        &pgdata_path.join("pg_wal"),
-        tline,
-        Lsn(pg_control.checkPointCopy.redo),
-        pgdata_lsn,
-        ctx,
-    )
-    .await?;
+    if !change_control_file_lsn {
+        import_wal(
+            &pgdata_path.join("pg_wal"),
+            tline,
+            Lsn(pg_control.checkPointCopy.redo),
+            pgdata_lsn,
+            ctx,
+        )
+        .await?;
+    }

    Ok(())
}
+fn is_user_relfile(path: &Path) -> bool {
+    let filename = &path
+        .file_name()
+        .expect("missing rel filename")
+        .to_string_lossy();
+    let (relnode, _, _) = parse_relfilename(filename)
+        .map_err(|e| {
+            warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
+            e
+        })
+        .unwrap();
+
+    // if this is an import after pg_upgrade, skip all user data files:
+    // relfilenode > FirstNormalObjectId of the new cluster.
+
+    // THIS IS WRONG:
+    // if a catalog relation was vacuumed with VACUUM FULL, it will have a new relfilenode,
+    // which will be greater than FirstNormalObjectId.
+    // Use pg_relfilemap to decide whether the relation is a catalog relation.
+    if relnode > pg_constants::FIRST_NORMAL_OBJECT_ID {
+        return true;
+    }
+
+    false
+}
+
 // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
 async fn import_rel(
     modification: &mut DatadirModification<'_>,
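To make the relfilenode threshold concrete, a standalone illustration of the same check without the pageserver crates; FIRST_NORMAL_OBJECT_ID is 16384 in Postgres, and the filename parsing here is simplified compared to parse_relfilename:

// Standalone illustration of the relfilenode threshold used by is_user_relfile.
// In Postgres, OIDs below FIRST_NORMAL_OBJECT_ID (16384) are reserved for
// system objects assigned at initdb time; user relations get higher OIDs.
const FIRST_NORMAL_OBJECT_ID: u32 = 16384;

/// Simplified parse: take the leading digits of a relation file name like
/// "16385", "16385.1" (segment) or "16385_fsm" (fork).
fn relnode_of(filename: &str) -> Option<u32> {
    let digits: String = filename.chars().take_while(|c| c.is_ascii_digit()).collect();
    digits.parse().ok()
}

fn looks_like_user_rel(filename: &str) -> bool {
    relnode_of(filename).map_or(false, |relnode| relnode > FIRST_NORMAL_OBJECT_ID)
}

fn main() {
    assert!(!looks_like_user_rel("1259"));  // pg_class, a catalog relation
    assert!(looks_like_user_rel("16385")); // a typical user table
    // Caveat from the comment above: a catalog relation rewritten by
    // VACUUM FULL also gets a relfilenode > FIRST_NORMAL_OBJECT_ID,
    // so this filename-only check can misclassify it.
    println!("ok");
}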
@@ -367,8 +447,15 @@ pub async fn import_basebackup_from_tar(

        match header.entry_type() {
            tokio_tar::EntryType::Regular => {
-                if let Some(res) =
-                    import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
+                if let Some(res) = import_file(
+                    &mut modification,
+                    file_path.as_ref(),
+                    &mut entry,
+                    len,
+                    None,
+                    ctx,
+                )
+                .await?
                {
                    // We found the pg_control file.
                    pg_control = Some(res);
@@ -493,6 +580,7 @@ async fn import_file(
    file_path: &Path,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    len: usize,
+    new_checkpoint_lsn: Option<Lsn>,
    ctx: &RequestContext,
) -> Result<Option<ControlFileData>> {
    let file_name = match file_path.file_name() {
@@ -506,6 +594,13 @@ async fn import_file(
        return Ok(None);
    }

+    if file_name == "pg_internal.init" {
+        // tar archives on macOS, created without the COPYFILE_DISABLE=1 env var,
+        // will contain "fork files"; skip them.
+        info!("skipping pg_internal.init");
+        return Ok(None);
+    }
+
    if file_path.starts_with("global") {
        let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
        let dbnode = 0;
@@ -515,7 +610,14 @@ async fn import_file(
            let bytes = read_all_bytes(reader).await?;

            // Extract the checkpoint record and import it separately.
-            let pg_control = ControlFileData::decode(&bytes[..])?;
+            let mut pg_control = ControlFileData::decode(&bytes[..])?;
+
+            if let Some(checkpoint_lsn) = new_checkpoint_lsn {
+                // We are changing the checkpoint LSN: override the values from the control file.
+                pg_control.checkPoint = checkpoint_lsn.0;
+                pg_control.checkPointCopy.redo = checkpoint_lsn.0;
+            };
+
            let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
            modification.put_checkpoint(checkpoint_bytes)?;
            debug!("imported control file");
@@ -535,8 +637,16 @@ async fn import_file(
                debug!("ignored PG_VERSION file");
            }
            _ => {
-                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
-                debug!("imported rel creation");
+                // if this is an import after pg_upgrade, skip all user data files:
+                // relfilenode > FirstNormalObjectId of the new cluster.
+                // TODO: implement import_rel_from_old_version that will copy
+                // relation metadata and cached size from the parent timeline.
+                if is_user_relfile(file_path) && new_checkpoint_lsn.is_some() {
+                    info!("after pg_restore skipping {:?}", file_path);
+                } else {
+                    import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
+                    debug!("imported rel creation");
+                }
            }
        }
    } else if file_path.starts_with("base") {
@@ -560,8 +670,14 @@ async fn import_file(
                debug!("ignored PG_VERSION file");
            }
            _ => {
-                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
-                debug!("imported rel creation");
+                // if this is an import after pg_upgrade, skip all user data files:
+                // relfilenode > FirstNormalObjectId of the new cluster.
+                if is_user_relfile(file_path) && new_checkpoint_lsn.is_some() {
+                    info!("after pg_restore skipping {:?}", file_path);
+                } else {
+                    import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
+                    debug!("imported rel creation");
+                }
            }
        }
    } else if file_path.starts_with("pg_xact") {
@@ -33,6 +33,7 @@ use remote_storage::GenericRemoteStorage;
 use remote_storage::TimeoutOrCancel;
 use std::collections::BTreeMap;
 use std::fmt;
 use std::process::Stdio;
 use std::sync::Weak;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
@@ -48,6 +49,7 @@ use utils::completion;
 use utils::crashsafe::path_with_suffix_extension;
 use utils::failpoint_support;
 use utils::fs_ext;
 use utils::id::TenantId;
 use utils::pausable_failpoint;
 use utils::sync::gate::Gate;
 use utils::sync::gate::GateGuard;
@@ -84,6 +86,7 @@ use crate::metrics::{
     remove_tenant_metrics, BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN,
     TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
 };
 use crate::pgdatadir_mapping;
 use crate::repository::GcResult;
 use crate::task_mgr;
 use crate::task_mgr::TaskKind;
@@ -1709,15 +1712,31 @@ impl Tenant {
                        WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
                    })?;
                }

-                self.branch_timeline(
-                    &ancestor_timeline,
-                    new_timeline_id,
-                    ancestor_start_lsn,
-                    create_guard,
-                    ctx,
-                )
-                .await?
+                // hackathon hackaneon single click postgres upgrade
+                if pg_version > ancestor_timeline.pg_version {
+                    let old_pg_version = ancestor_timeline.pg_version;
+                    tracing::info!("Upgrading timeline {new_timeline_id} from version {old_pg_version} to {pg_version}");
+                    // add new stuff here
+                    self.branch_timeline(
+                        &ancestor_timeline,
+                        new_timeline_id,
+                        ancestor_start_lsn,
+                        create_guard,
+                        ctx,
+                        pg_version,
+                    )
+                    .await?
+                } else {
+                    self.branch_timeline(
+                        &ancestor_timeline,
+                        new_timeline_id,
+                        ancestor_start_lsn,
+                        create_guard,
+                        ctx,
+                        ancestor_timeline.pg_version,
+                    )
+                    .await?
+                }
            }
            None => {
                self.bootstrap_timeline(
@@ -3229,9 +3248,17 @@ impl Tenant {
        start_lsn: Option<Lsn>,
        timeline_create_guard: TimelineCreateGuard<'_>,
        ctx: &RequestContext,
+        pg_version: u32,
    ) -> Result<Arc<Timeline>, CreateTimelineError> {
-        self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_create_guard, ctx)
-            .await
+        self.branch_timeline_impl(
+            src_timeline,
+            dst_id,
+            start_lsn,
+            timeline_create_guard,
+            ctx,
+            pg_version,
+        )
+        .await
    }

    async fn branch_timeline_impl(
@@ -3240,7 +3267,8 @@ impl Tenant {
        dst_id: TimelineId,
        start_lsn: Option<Lsn>,
        timeline_create_guard: TimelineCreateGuard<'_>,
-        _ctx: &RequestContext,
+        ctx: &RequestContext,
+        pg_version: u32,
    ) -> Result<Arc<Timeline>, CreateTimelineError> {
        let src_id = src_timeline.timeline_id;

@@ -3316,7 +3344,7 @@ impl Tenant {
            start_lsn,
            *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
            src_timeline.initdb_lsn,
-            src_timeline.pg_version,
+            pg_version,
        );

        let uninitialized_timeline = self
@@ -3330,6 +3358,117 @@ impl Tenant {
            )
            .await?;

+        if pg_version != src_timeline.pg_version {
+            info!(
+                "branching timeline {dst_id} from timeline {src_id} with different pg_version: {pg_version}",
+            );
+
+            let timeline_id = dst_id;
+
+            // prepare pgdata for the new timeline
+            let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
+            let pgdata_path = path_with_suffix_extension(
+                timelines_path.join(format!("basebackup-{timeline_id}")),
+                TEMP_FILE_SUFFIX,
+            );
+
+            if pgdata_path.exists() {
+                fs::remove_dir_all(&pgdata_path).with_context(|| {
+                    format!("Failed to remove already existing initdb directory: {pgdata_path}")
+                })?;
+            }
+            // this new directory is very temporary: we could remove it immediately after bootstrap, we don't need it
+            // scopeguard::defer! {
+            //     if let Err(e) = fs::remove_dir_all(&pgdata_path) {
+            //         // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
+            //         error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
+            //     }
+            // }
+
+            run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel)
+                .await
+                .with_context(|| {
+                    format!(
+                        "Failed to initdb {timeline_id} with pg_version {pg_version} at {pgdata_path}"
+                    )
+                })?;
+
+            // TODO
+            // do the pg_upgrade bits here.
+            // Rust is not the most convenient language for writing this,
+            // so just call pg_upgrade in a subprocess.
+            // In the future we can turn it into an API call to some service that will do the work:
+            //
+            // 1. start postgres on the parent timeline at start_lsn, using neon_local (for now this is hardcoded)
+            // 2. run pg_upgrade using neon_local for the old version and the freshly created pgdata for the new version
+            run_pg_upgrade(
+                self.conf,
+                &pgdata_path,
+                src_timeline.pg_version,
+                pg_version,
+                src_timeline.timeline_id,
+                self.tenant_shard_id.tenant_id,
+                start_lsn,
+                &self.cancel,
+            ).await.with_context(|| {
+                format!(
+                    "Failed to pg_upgrade {timeline_id} with pg_version {pg_version} at {pgdata_path}"
+                )
+            })?;
+
+            let controlfile_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
+            let start_lsn = start_lsn.align();
+            // Choose the max of controlfile_lsn and start_lsn.
+            //
+            // It is possible that the controlfile_lsn is ahead of the start_lsn,
+            // especially for small databases.
+            // In that case, we need to start from the controlfile_lsn.
+            // Otherwise we would have pages carrying an LSN larger than the LSN of the branch,
+            // and that leads to an error when compute tries to flush such a page:
+            //
+            //   ERROR: xlog flush request %X/%X is not satisfied --- flushed only to %X/%X
+            //
+            // We have another problem here: a gap between the
+            // branching_lsn (where we diverged from the parent) and pgdata_lsn (the import LSN of the new timeline).
+            // We should teach wal-redo to skip all the records between these two points;
+            // otherwise we will see some updates from the parent timeline in the new timeline.
+            let pgdata_lsn = std::cmp::max(controlfile_lsn, start_lsn);
+            assert!(pgdata_lsn.is_aligned());
+
+            // TODO why do we need these lines?
+            let tenant_shard_id = uninitialized_timeline.owning_tenant.tenant_shard_id;
+            let unfinished_timeline = uninitialized_timeline.raw_timeline()?;
+
+            // Flush the new layer files to disk, before we make the timeline available to
+            // the outside world.
+            //
+            // The flush loop needs to be spawned in order to be able to flush.
+            unfinished_timeline.maybe_spawn_flush_loop();
+
+            import_datadir::import_timeline_from_postgres_datadir(
+                unfinished_timeline,
+                &pgdata_path,
+                pgdata_lsn,
+                true,
+                Some(src_timeline),
+                ctx,
+            )
+            .await
+            .with_context(|| {
+                format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
+            })?;
+
+            unfinished_timeline
+                .freeze_and_flush()
+                .await
+                .with_context(|| {
+                    format!(
+                        "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
+                    )
+                })?;
+        }

        let new_timeline = uninitialized_timeline.finish_creation()?;

        // Root timeline gets its layers during creation and uploads them along with the metadata.
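A small standalone illustration of the "max of controlfile_lsn and start_lsn" rule above, using a stand-in Lsn type (the real one is utils::lsn::Lsn):

// Stand-in Lsn with the 8-byte alignment behavior of utils::lsn::Lsn.
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct Lsn(u64);

impl Lsn {
    fn align(self) -> Lsn {
        Lsn((self.0 + 7) & !7)
    }
    fn is_aligned(self) -> bool {
        self.0 & 7 == 0
    }
}

fn main() {
    // For a small database, pg_upgrade's initdb/restore work can push the new
    // cluster's control-file LSN past the requested branch point.
    let controlfile_lsn = Lsn(0x0200_0028).align();
    let start_lsn = Lsn(0x0150_0000).align();

    // Import at the larger LSN: pages stamped with an LSN beyond the branch
    // LSN would otherwise trip "xlog flush request ... is not satisfied".
    let pgdata_lsn = if controlfile_lsn > start_lsn {
        controlfile_lsn
    } else {
        start_lsn
    };
    assert!(pgdata_lsn.is_aligned());
    println!("import at {pgdata_lsn:?}");
}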
@@ -3537,6 +3676,8 @@ impl Tenant {
            unfinished_timeline,
            &pgdata_path,
            pgdata_lsn,
+            false,
+            None,
            ctx,
        )
        .await
@@ -3894,6 +4035,95 @@ async fn run_initdb(
    Ok(())
}

+/// Run pg_upgrade from the old cluster to the new cluster.
+async fn run_pg_upgrade(
+    conf: &'static PageServerConf,
+    new_pgdata: &Utf8Path,
+    old_pg_version: u32,
+    new_pg_version: u32,
+    _parent_timeline_id: TimelineId,
+    _tenant_id: TenantId,
+    _start_lsn: Lsn, // this is where we need to start compute for the parent timeline to dump the data
+    cancel: &CancellationToken,
+) -> Result<(), InitdbError> {
+    //let old_bin_path = conf.pg_bin_dir(old_pg_version).map_err(InitdbError::Other)?;
+    let pg_upgrade_bin_path = conf
+        .pg_bin_dir(new_pg_version)
+        .map_err(InitdbError::Other)?
+        .join("pg_upgrade");
+
+    let pg_upgrade_lib_dir = conf
+        .pg_lib_dir(new_pg_version)
+        .map_err(InitdbError::Other)?;
+
+    info!(
+        "running {} in pgdata {} from version {} to {}",
+        pg_upgrade_bin_path, new_pgdata, old_pg_version, new_pg_version,
+    );
+
+    // TODO
+    // start an ad-hoc compute for the parent timeline to connect and dump the data,
+    // inspired by the script https://github.com/neondatabase/cloud/pull/17267/files
+    // and neon_local
+
+    // We test with neon_local, so let's hardcode it for now
+    let base_dir = "/home/ana/work/neon/";
+    let old_pgdata = format!("{}/.neon/endpoints/main/pgdata", base_dir);
+
+    // Note: --neon_start/--neon_stop are not flags of stock pg_upgrade; they
+    // presumably come from the patched pg_upgrade in the vendored postgres
+    // (see the vendor/postgres-v16 submodule update at the end of this compare).
+    let pg_upgrade_command = tokio::process::Command::new(&pg_upgrade_bin_path)
+        .current_dir(base_dir)
+        .args(["-b", format!("{}pg_install/v15/bin/", base_dir).as_str()])
+        .args(["-B", format!("{}pg_install/v16/bin/", base_dir).as_str()])
+        .args(["-d", old_pgdata.as_ref()])
+        .args(["-D", new_pgdata.as_ref()])
+        .args(["--username", &conf.superuser])
+        .args(["--socketdir", "/tmp"])
+        .args([
+            "--neon_start",
+            format!("{}target/debug/neon_local endpoint start main", base_dir).as_str(),
+        ])
+        .args([
+            "--neon_stop",
+            format!("{}target/debug/neon_local endpoint stop main", base_dir).as_str(),
+        ])
+        .env_clear()
+        .env("LD_LIBRARY_PATH", &pg_upgrade_lib_dir)
+        .env("DYLD_LIBRARY_PATH", &pg_upgrade_lib_dir)
+        .env("PGPORTOLD", "55432")
+        .env("PGPORTNEW", "55433")
+        .stdin(std::process::Stdio::null())
+        // stdout invocation produces the same output every time, we don't need it
+        .stdout(std::process::Stdio::null())
+        // we would be interested in the stderr output, if there was any
+        .stderr(std::process::Stdio::piped())
+        .spawn()?;
+
+    // print pg_upgrade_command
+    info!("{:?}", pg_upgrade_command);
+
+    // Ideally we'd select here with the cancellation token, but the problem is that
+    // we can't safely terminate initdb: it launches processes of its own, and killing
+    // initdb doesn't kill them. After we return from this function, we want the target
+    // directory to be able to be cleaned up.
+    // See https://github.com/neondatabase/neon/issues/6385
+    let pg_upgrade_output = pg_upgrade_command.wait_with_output().await?;
+    if !pg_upgrade_output.status.success() {
+        return Err(InitdbError::Failed(
+            pg_upgrade_output.status,
+            pg_upgrade_output.stderr,
+        ));
+    }
+
+    // This isn't true cancellation support, see above. Still return an error to
+    // exercise the cancellation code path.
+    if cancel.is_cancelled() {
+        return Err(InitdbError::Cancelled);
+    }
+
+    Ok(())
+}
+
 /// Dump contents of a layer file to stdout.
 pub async fn dump_layerfile_from_path(
     path: &Utf8Path,
@@ -5078,14 +5078,14 @@ impl Timeline {

        // If we have a page image, and no WAL, we're all set
        if data.records.is_empty() {
-            if let Some((img_lsn, img)) = &data.img {
+            if let Some((img_lsn, img)) = data.img {
                trace!(
                    "found page image for key {} at {}, no WAL redo required, req LSN {}",
                    key,
                    img_lsn,
                    request_lsn,
                );
-                Ok(img.clone())
+                Ok(img)
            } else {
                Err(PageReconstructError::from(anyhow!(
                    "base image for {key} at {request_lsn} not found"
@@ -5096,33 +5096,138 @@ impl Timeline {
        //
        // If we don't have a base image, then the oldest WAL record better initialize
        // the page
-        if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
-            Err(PageReconstructError::from(anyhow!(
-                "Base image for {} at {} not found, but got {} WAL records",
-                key,
-                request_lsn,
-                data.records.len()
-            )))
-        } else {
-            if data.img.is_some() {
-                trace!(
-                    "found {} WAL records and a base image for {} at {}, performing WAL redo",
-                    data.records.len(),
-                    key,
-                    request_lsn
-                );
-            } else {
-                trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
-            };
-            let res = self
-                .walredo_mgr
-                .as_ref()
-                .context("timeline has no walredo manager")
-                .map_err(PageReconstructError::WalRedo)?
-                .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
+        let have_img = data.img.is_some();
+        let will_init = data
+            .records
+            .first()
+            .map(|(_, rec)| rec.will_init())
+            .expect("already checked to have records");
+
+        match (have_img, will_init) {
+            (false, false) => {
+                return Err(PageReconstructError::from(anyhow!(
+                    "Base image for {} at {} not found, but got {} WAL records",
+                    key,
+                    request_lsn,
+                    data.records.len()
+                )))
+            }
+            (true, _) => {
+                trace!(
+                    "found {} WAL records and a base image for {} at {}, performing WAL redo",
+                    data.records.len(),
+                    key,
+                    request_lsn
+                );
+            }
+            (false, _) => {
+                assert!(will_init, "already checked above");
+                trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
+            }
+        };
+
+        let oldest_lsn = data
+            .records
+            .first()
+            .map(|(lsn, _)| lsn)
+            .expect("again, checked");
+
+        // walk up the ancestry until we have found an ancestor covering the lsn range
+        let ancestry = std::iter::successors(Some(self), |tl| tl.ancestor_timeline.as_deref())
+            // 100 - initdb R pg14
+            // 150 - branch S pg14
+            // 200 - branch T pg15
+            // 250 - branch U pg15
+            // 300 - branch V pg16
+            //
+            // oldest_lsn = 155:
+            // get [V pg16, U pg15 (one_more=true), T pg15 (one_more=true), S pg14 (one_more=false)]
+            .take_while({
+                let mut one_more = true;
+
+                move |tl| {
+                    if *oldest_lsn < tl.ancestor_lsn {
+                        assert!(one_more);
+                        true
+                    } else {
+                        let prev = one_more;
+                        one_more = false;
+                        prev
+                    }
+                }
+            })
+            // remove consecutive equal pg_versions, which might be all of them, in which case we can use the
+            // same timeline for all reconstruction.
+            // [V pg16, U pg15, T pg15, S pg14] => [V pg16, T pg15, S pg14]
+            .fold(Vec::<&Timeline>::with_capacity(4), |mut acc, next| {
+                if acc
+                    .last()
+                    .map(|tl| tl.pg_version == next.pg_version)
+                    .unwrap_or(false)
+                {
+                    // overwrite with an earlier timeline; additionally we only allow upgrades,
+                    // so we cannot go backwards like pg14 (branch) pg15 (branch) pg14
+                    *acc.last_mut().unwrap() = next;
+                } else {
+                    acc.push(next);
+                }
+                acc
+            });
+
+        // shifted for the purpose of timeline_pairs
+        let later_timelines = ancestry
+            .iter()
+            .rev()
+            .skip(1)
+            .map(Some)
+            .chain(std::iter::once(None));
+
+        // zip older and later timelines into pairs, which we then use to select which parts of
+        // the wal records are executed on which version's walredo
+        let timeline_pairs = ancestry.iter().rev().zip(later_timelines);
+
+        let mgr = self
+            .walredo_mgr
+            .as_ref()
+            .context("timeline has no walredo manager")
+            .map_err(PageReconstructError::WalRedo)?;
+
+        let mut img = data.img.clone();
+        let mut records = &data.records[..];
+
+        for (older, later) in timeline_pairs {
+            let scratch = records
+                .iter()
+                .take_while(|(lsn, _)| {
+                    // if there is no later, take all remaining
+                    later.map(|later| lsn < &later.ancestor_lsn).unwrap_or(true)
+                })
+                .cloned()
+                .collect::<Vec<_>>();
+            records = &records[scratch.len()..];
+
+            if later.is_none() {
+                assert!(records.is_empty());
+            }
+
+            // if we don't have any records for this timeline (which is possible)
+            // go to the previous one
+            if scratch.is_empty() {
+                tracing::info!("no records for timeline {}", older.timeline_id);
+                continue;
+            }
+
+            // this is only used for logging on the next round
+            let last_lsn = scratch.last().unwrap().0;
+
+            // is request_lsn ok? it's not used for anything important, just logging.
+            let res = mgr
+                .request_redo(key, request_lsn, img, scratch, older.pg_version)
+                .await;
-            let img = match res {
-                Ok(img) => img,
+            img = match res {
+                Ok(img) => Some((last_lsn, img)),
                Err(walredo::Error::Cancelled) => return Err(PageReconstructError::Cancelled),
                Err(walredo::Error::Other(e)) => {
                    return Err(PageReconstructError::WalRedo(
@@ -5130,8 +5235,9 @@ impl Timeline {
                    ))
                }
-            };
-            Ok(img)
-        }
+            };
+        }
+
+        Ok(img.unwrap().1)
    }
}
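A toy model of the ancestry selection above, runnable on the example from the comments: take every ancestor past oldest_lsn plus one more, then collapse runs of equal pg_version, keeping the earlier timeline of each run.

// Toy model of the take_while/fold ancestry selection; Tl stands in for Timeline.
#[derive(Debug)]
struct Tl { name: &'static str, pg_version: u32, ancestor_lsn: u64 }

fn main() {
    // Newest-first ancestry, mirroring the comment:
    // V pg16 @300, U pg15 @250, T pg15 @200, S pg14 @150, R pg14 @100 (root)
    let chain = [
        Tl { name: "V", pg_version: 16, ancestor_lsn: 300 },
        Tl { name: "U", pg_version: 15, ancestor_lsn: 250 },
        Tl { name: "T", pg_version: 15, ancestor_lsn: 200 },
        Tl { name: "S", pg_version: 14, ancestor_lsn: 150 },
        Tl { name: "R", pg_version: 14, ancestor_lsn: 100 },
    ];
    let oldest_lsn = 155u64;

    // "one more": keep every timeline whose ancestor_lsn is above oldest_lsn,
    // plus the first one at or below it.
    let mut one_more = true;
    let selected = chain.iter().take_while(move |tl| {
        if oldest_lsn < tl.ancestor_lsn {
            true
        } else {
            std::mem::replace(&mut one_more, false)
        }
    });

    // Collapse runs of equal pg_version, overwriting with the earlier timeline.
    let ancestry = selected.fold(Vec::<&Tl>::new(), |mut acc, next| {
        let same = acc.last().map_or(false, |last| last.pg_version == next.pg_version);
        if same {
            *acc.last_mut().unwrap() = next;
        } else {
            acc.push(next);
        }
        acc
    });

    for tl in &ancestry {
        println!("{} pg{}", tl.name, tl.pg_version);
    }
    // Prints V pg16, T pg15, S pg14, matching the comment in the hunk.
}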
@@ -78,7 +78,11 @@ pub struct PostgresRedoManager {
    /// # Shutdown
    ///
    /// See [`Self::launched_processes`].
-    redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
+    ///
+    /// # Different pg versions
+    ///
+    /// We run our own quiesced process for each version (pg14, pg15, pg16 and maybe pg17).
+    processes: [heavier_once_cell::OnceCell<ProcessOnceCell>; 4],

    /// Gate that is entered when launching a walredo process and held open
    /// until the process has been `kill()`ed and `wait()`ed upon.
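A toy model of the per-version process table: one lazily initialized slot per supported major version, indexed by pg_version - 14. std::sync::OnceLock stands in here for the heavier_once_cell used by the real code.

use std::sync::OnceLock;

struct Process {
    pg_version: u32,
}

struct RedoManager {
    // pg14, pg15, pg16, pg17: mirrors the 4-element array in the struct above.
    processes: [OnceLock<Process>; 4],
}

impl RedoManager {
    fn process_for(&self, pg_version: u32) -> &Process {
        assert!((14..=17).contains(&pg_version), "unsupported version {pg_version}");
        // Lazily spawn (here: construct) the process for this version on first use.
        self.processes[(pg_version - 14) as usize].get_or_init(|| Process { pg_version })
    }
}

fn main() {
    let mgr = RedoManager { processes: Default::default() };
    assert_eq!(mgr.process_for(16).pg_version, 16);
    println!("walredo process slots ready");
}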
@@ -215,10 +219,18 @@ impl PostgresRedoManager {
                chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
            })
        },
-        process: self.redo_process.get().and_then(|p| match &*p {
-            ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
-            ProcessOnceCell::ManagerShutDown => None,
-        }),
+        process: self
+            .processes
+            .iter()
+            .filter_map(|p| {
+                p.get().and_then(|p| match &*p {
+                    ProcessOnceCell::Spawned(p) => {
+                        Some(WalRedoManagerProcessStatus { pid: p.id() })
+                    }
+                    ProcessOnceCell::ManagerShutDown => None,
+                })
+            })
+            .next(),
        }
    }
}
@@ -236,7 +248,7 @@ impl PostgresRedoManager {
            tenant_shard_id,
            conf,
            last_redo_at: std::sync::Mutex::default(),
-            redo_process: heavier_once_cell::OnceCell::default(),
+            processes: Default::default(),
            launched_processes: utils::sync::gate::Gate::default(),
        }
    }
@@ -256,26 +268,31 @@ impl PostgresRedoManager {
    ///
    /// This method is cancellation-safe.
    pub async fn shutdown(&self) -> bool {
-        // prevent new processes from being spawned
-        let maybe_permit = match self.redo_process.get_or_init_detached().await {
-            Ok(guard) => {
-                if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
-                    None
-                } else {
-                    let (proc, permit) = guard.take_and_deinit();
-                    drop(proc); // this just drops the Arc, its refcount may not be zero yet
-                    Some(permit)
-                }
-            }
-            Err(permit) => Some(permit),
-        };
-        let it_was_us = if let Some(permit) = maybe_permit {
-            self.redo_process
-                .set(ProcessOnceCell::ManagerShutDown, permit);
-            true
-        } else {
-            false
-        };
+        let mut it_was_us = false;
+        for process in self.processes.iter() {
+            // prevent new processes from being spawned
+            let maybe_permit = match process.get_or_init_detached().await {
+                Ok(guard) => {
+                    if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
+                        None
+                    } else {
+                        let (proc, permit) = guard.take_and_deinit();
+                        drop(proc); // this just drops the Arc, its refcount may not be zero yet
+                        Some(permit)
+                    }
+                }
+                Err(permit) => Some(permit),
+            };
+            let i_cant_see_why_this = if let Some(permit) = maybe_permit {
+                process.set(ProcessOnceCell::ManagerShutDown, permit);
+                true
+            } else {
+                false
+            };
+
+            // TODO: is `or` the correct way to combine these per-process results?
+            it_was_us |= i_cant_see_why_this;
+        }
        // wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
        // we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
        // for the underlying process.
@@ -291,7 +308,10 @@ impl PostgresRedoManager {
        if let Some(last_redo_at) = *g {
            if last_redo_at.elapsed() >= idle_timeout {
                drop(g);
-                drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
+
+                self.processes.iter().for_each(|c| {
+                    drop(c.get().map(|guard| guard.take_and_deinit()));
+                })
            }
        }
    }
@@ -314,13 +334,23 @@ impl PostgresRedoManager {
        wal_redo_timeout: Duration,
        pg_version: u32,
    ) -> Result<Bytes, Error> {
+        assert!(
+            (14..=17).contains(&pg_version),
+            "this should be an enum, but no: {pg_version}"
+        );
        *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());

        let (rel, blknum) = key.to_rel_block().context("invalid record")?;
        const MAX_RETRY_ATTEMPTS: u32 = 1;
        let mut n_attempts = 0u32;
        loop {
-            let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
+            // handling multiple processes idea: just support N versions here, but the caller
+            // splits per parent_lsn in the case that:
+            // - reconstruct_data spans two versions
+            // - reconstruct_data went to parent???
+            let process = &self.processes[(pg_version - 14) as usize];
+
+            let proc: Arc<Process> = match process.get_or_init_detached().await {
                Ok(guard) => match &*guard {
                    ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
                    ProcessOnceCell::ManagerShutDown => {
@@ -332,11 +362,11 @@ impl PostgresRedoManager {
                    // acquire guard before spawning process, so that we don't spawn new processes
                    // if the gate is already closed.
                    let _launched_processes_guard = match self.launched_processes.enter() {
-                        Ok(guard) => guard,
-                        Err(GateError::GateClosed) => unreachable!(
-                            "shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
-                        ),
-                    };
+                        Ok(guard) => guard,
+                        Err(GateError::GateClosed) => unreachable!(
+                            "shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
+                        ),
+                    };
                    let proc = Arc::new(Process {
                        process: process::WalRedoProcess::launch(
                            self.conf,
@@ -419,7 +448,7 @@ impl PostgresRedoManager {
|
||||
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
|
||||
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
|
||||
// This probably needs revisiting at some later point.
|
||||
match self.redo_process.get() {
|
||||
match process.get() {
|
||||
None => (),
|
||||
Some(guard) => {
|
||||
match &*guard {
|
||||
@@ -448,9 +477,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Process a batch of WAL records using bespoken Neon code.
|
||||
///
|
||||
/// Process a batch of WAL records using bespoke Neon code.
|
||||
fn apply_batch_neon(
|
||||
&self,
|
||||
key: Key,
|
||||
@@ -471,7 +498,7 @@ impl PostgresRedoManager {
|
||||
|
||||
// Apply all the WAL records in the batch
|
||||
for (record_lsn, record) in records.iter() {
|
||||
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
|
||||
apply_neon::apply_in_neon(record, *record_lsn, key, &mut page)?;
|
||||
}
|
||||
// Success!
|
||||
let duration = start_time.elapsed();
|
||||
@@ -488,18 +515,6 @@ impl PostgresRedoManager {
|
||||
|
||||
Ok(page.freeze())
|
||||
}
|
||||
|
||||
fn apply_record_neon(
|
||||
&self,
|
||||
key: Key,
|
||||
page: &mut BytesMut,
|
||||
record_lsn: Lsn,
|
||||
record: &NeonWalRecord,
|
||||
) -> anyhow::Result<()> {
|
||||
apply_neon::apply_in_neon(record, record_lsn, key, page)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
vendor/postgres-v16 (vendored submodule)
Submodule vendor/postgres-v16 updated: 6e9a4ff624...d1fabbb548