[compute/postgres] feature: PostgreSQL 17 (#8573)

This adds preliminary PG17 support to Neon, based on RC1 / 2024-09-04
07b828e9d4

NOTICE: The data produced by the included version of the PostgreSQL fork
may not be compatible with the future full release of PostgreSQL 17 due to
expected or unexpected future changes in magic numbers and internals.
DO NOT EXPECT DATA IN V17-TENANTS TO BE COMPATIBLE WITH THE 17.0
RELEASE!

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
Matthias van de Meent
2024-09-12 23:18:41 +01:00
committed by GitHub
parent fcab61bdcd
commit 78938d1b59
49 changed files with 2907 additions and 787 deletions

View File

@@ -633,7 +633,7 @@ impl Timeline {
pub(crate) async fn get_twophase_file(
&self,
xid: TransactionId,
xid: u64,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
@@ -646,11 +646,19 @@ impl Timeline {
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> Result<HashSet<TransactionId>, PageReconstructError> {
) -> Result<HashSet<u64>, PageReconstructError> {
// fetch directory entry
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
Ok(TwoPhaseDirectory::des(&buf)?.xids)
if self.pg_version >= 17 {
Ok(TwoPhaseDirectoryV17::des(&buf)?.xids)
} else {
Ok(TwoPhaseDirectory::des(&buf)?
.xids
.iter()
.map(|x| u64::from(*x))
.collect())
}
}
pub(crate) async fn get_control_file(
@@ -902,9 +910,13 @@ impl Timeline {
// Then pg_twophase
result.add_key(TWOPHASEDIR_KEY);
let buf = self.get(TWOPHASEDIR_KEY, lsn, ctx).await?;
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
let mut xids: Vec<u64> = self
.list_twophase_files(lsn, ctx)
.await?
.iter()
.cloned()
.collect();
xids.sort_unstable();
for xid in xids {
result.add_key(twophase_file_key(xid));
@@ -1127,9 +1139,15 @@ impl<'a> DatadirModification<'a> {
// Create AuxFilesDirectory
self.init_aux_dir()?;
let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
xids: HashSet::new(),
})?;
let buf = if self.tline.pg_version >= 17 {
TwoPhaseDirectoryV17::ser(&TwoPhaseDirectoryV17 {
xids: HashSet::new(),
})
} else {
TwoPhaseDirectory::ser(&TwoPhaseDirectory {
xids: HashSet::new(),
})
}?;
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, 0));
self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
@@ -1321,22 +1339,31 @@ impl<'a> DatadirModification<'a> {
pub async fn put_twophase_file(
&mut self,
xid: TransactionId,
xid: u64,
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Add it to the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
self.put(
TWOPHASEDIR_KEY,
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
);
let dirbuf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let newdirbuf = if self.tline.pg_version >= 17 {
let mut dir = TwoPhaseDirectoryV17::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid = xid as u32;
let mut dir = TwoPhaseDirectory::des(&dirbuf)?;
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
};
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
self.put(twophase_file_key(xid), Value::Image(img));
Ok(())
@@ -1639,22 +1666,32 @@ impl<'a> DatadirModification<'a> {
/// This method is used for marking truncated SLRU files
pub async fn drop_twophase_file(
&mut self,
xid: TransactionId,
xid: u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Remove it from the directory entry
let buf = self.get(TWOPHASEDIR_KEY, ctx).await?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
let newdirbuf = if self.tline.pg_version >= 17 {
let mut dir = TwoPhaseDirectoryV17::des(&buf)?;
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
self.put(
TWOPHASEDIR_KEY,
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
);
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid: u32 = u32::try_from(xid)?;
let mut dir = TwoPhaseDirectory::des(&buf)?;
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
};
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
// Delete it
self.delete(twophase_key_range(xid));
@@ -2124,11 +2161,21 @@ struct DbDirectory {
dbdirs: HashMap<(Oid, Oid), bool>,
}
// The format of TwoPhaseDirectory changed in PostgreSQL v17, because the filenames of
// pg_twophase files was expanded from 32-bit XIDs to 64-bit XIDs. Previously, the files
// were named like "pg_twophase/000002E5", now they're like
// "pg_twophsae/0000000A000002E4".
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectory {
xids: HashSet<TransactionId>,
}
#[derive(Debug, Serialize, Deserialize)]
struct TwoPhaseDirectoryV17 {
xids: HashSet<u64>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
struct RelDirectory {
// Set of relations that exist. (relfilenode, forknum)