pageserver: store aux files as deltas (#6742)

## Problem

Aux files were stored with an O(N^2) cost, since on each modification
the entire map is re-written as a page image.

This addresses one axis of the inefficiency in logical replication's use
of storage (https://github.com/neondatabase/neon/issues/6626). It will
still be writing a large amount of duplicative data if writing the same
slot's state every 15 seconds, but the impact will be O(N) instead of
O(N^2).

## Summary of changes

- Introduce `NeonWalRecord::AuxFile`
- In `DatadirModification`, if the AUX_FILES_KEY has already been set,
then write a delta instead of an image
This commit is contained in:
John Spray
2024-02-14 15:01:16 +00:00
committed by GitHub
parent 774a6e7475
commit 840abe3954
5 changed files with 242 additions and 38 deletions

View File

@@ -156,6 +156,7 @@ impl Timeline {
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_aux_files: None,
pending_directory_entries: Vec::new(),
lsn,
}
@@ -870,6 +871,14 @@ pub struct DatadirModification<'a> {
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
// If we already wrote any aux file changes in this modification, stash the latest dir. If set,
// [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
// if AUX_FILES_KEY is already set.
pending_aux_files: Option<AuxFilesDirectory>,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
}
@@ -1384,31 +1393,76 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
Ok(buf) => AuxFilesDirectory::des(&buf)?,
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
AuxFilesDirectory {
files: HashMap::new(),
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let dir = if let Some(mut dir) = self.pending_aux_files.take() {
// We already updated aux files in `self`: emit a delta and update our latest value
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir
}
Err(
e @ (PageReconstructError::AncestorStopping(_)
| PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
// we are assuming that all _other_ possible errors represents a missing key. If some
// other error occurs, we may incorrectly reset the map of aux files.
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
dir
}
}
};
let path = path.to_string();
if content.is_empty() {
dir.files.remove(&path);
} else {
dir.files.insert(path, Bytes::copy_from_slice(content));
}
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, dir.files.len()));
self.pending_aux_files = Some(dir);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
Ok(())
}
@@ -1618,8 +1672,18 @@ struct RelDirectory {
}
#[derive(Debug, Serialize, Deserialize, Default)]
struct AuxFilesDirectory {
files: HashMap<String, Bytes>,
pub(crate) struct AuxFilesDirectory {
pub(crate) files: HashMap<String, Bytes>,
}
impl AuxFilesDirectory {
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
if let Some(value) = value {
self.files.insert(key, value);
} else {
self.files.remove(&key);
}
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -1655,8 +1719,60 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
//use super::repo_harness::*;
//use super::*;
use hex_literal::hex;
use utils::id::TimelineId;
use super::*;
use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
/// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
#[tokio::test]
async fn aux_files_round_trip() -> anyhow::Result<()> {
let name = "aux_files_round_trip";
let harness = TenantHarness::create(name)?;
pub const TIMELINE_ID: TimelineId =
TimelineId::from_array(hex!("11223344556677881122334455667788"));
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = tline.raw_timeline().unwrap();
// First modification: insert two keys
let mut modification = tline.begin_modification(Lsn(0x1000));
modification.put_file("foo/bar1", b"content1", &ctx).await?;
modification.set_lsn(Lsn(0x1008))?;
modification.put_file("foo/bar2", b"content2", &ctx).await?;
modification.commit(&ctx).await?;
let expect_1008 = HashMap::from([
("foo/bar1".to_string(), Bytes::from_static(b"content1")),
("foo/bar2".to_string(), Bytes::from_static(b"content2")),
]);
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
assert_eq!(readback, expect_1008);
// Second modification: update one key, remove the other
let mut modification = tline.begin_modification(Lsn(0x2000));
modification.put_file("foo/bar1", b"content3", &ctx).await?;
modification.set_lsn(Lsn(0x2008))?;
modification.put_file("foo/bar2", b"", &ctx).await?;
modification.commit(&ctx).await?;
let expect_2008 =
HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
assert_eq!(readback, expect_2008);
// Reading back in time works
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
assert_eq!(readback, expect_1008);
Ok(())
}
/*
fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {

View File

@@ -3901,6 +3901,7 @@ pub(crate) mod harness {
use utils::lsn::Lsn;
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::walredo::apply_neon;
use crate::{
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
};
@@ -4160,20 +4161,34 @@ pub(crate) mod harness {
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> anyhow::Result<Bytes> {
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,
lsn,
if base_img.is_some() {
"base image"
} else {
"no base image"
},
records.len()
);
println!("{s}");
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
Ok(TEST_IMG(&s))
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.
let base_img = base_img.expect("Neon WAL redo requires base image").1;
let mut page = BytesMut::new();
page.extend_from_slice(&base_img);
for (_record_lsn, record) in records {
apply_neon::apply_in_neon(&record, key, &mut page)?;
}
Ok(page.freeze())
} else {
// We never spawn a postgres walredo process in unit tests: just log what we might have done.
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,
lsn,
if base_img.is_some() {
"base image"
} else {
"no base image"
},
records.len()
);
println!("{s}");
Ok(TEST_IMG(&s))
}
}
}
}

View File

@@ -44,6 +44,11 @@ pub enum NeonWalRecord {
moff: MultiXactOffset,
members: Vec<MultiXactMember>,
},
/// Update the map of AUX files, either writing or dropping an entry
AuxFile {
file_path: String,
content: Option<Bytes>,
},
}
impl NeonWalRecord {

View File

@@ -22,7 +22,7 @@
mod process;
/// Code to apply [`NeonWalRecord`]s.
mod apply_neon;
pub(crate) mod apply_neon;
use crate::config::PageServerConf;
use crate::metrics::{

View File

@@ -1,7 +1,8 @@
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants;
@@ -12,6 +13,7 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
use tracing::*;
use utils::bin_ser::BeSer;
/// Can this request be served by neon redo functions
/// or we need to pass it to wal-redo postgres process?
@@ -230,6 +232,72 @@ pub(crate) fn apply_in_neon(
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
}
}
NeonWalRecord::AuxFile { file_path, content } => {
let mut dir = AuxFilesDirectory::des(page)?;
dir.upsert(file_path.clone(), content.clone());
page.clear();
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
}
Ok(())
}
#[cfg(test)]
mod test {
use bytes::Bytes;
use pageserver_api::key::AUX_FILES_KEY;
use super::*;
use std::collections::HashMap;
use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
#[test]
fn apply_aux_file_deltas() -> anyhow::Result<()> {
let base_dir = AuxFilesDirectory {
files: HashMap::from([
("two".to_string(), Bytes::from_static(b"content0")),
("three".to_string(), Bytes::from_static(b"contentX")),
]),
};
let base_image = AuxFilesDirectory::ser(&base_dir)?;
let deltas = vec![
// Insert
NeonWalRecord::AuxFile {
file_path: "one".to_string(),
content: Some(Bytes::from_static(b"content1")),
},
// Update
NeonWalRecord::AuxFile {
file_path: "two".to_string(),
content: Some(Bytes::from_static(b"content99")),
},
// Delete
NeonWalRecord::AuxFile {
file_path: "three".to_string(),
content: None,
},
];
let file_path = AUX_FILES_KEY;
let mut page = BytesMut::from_iter(base_image);
for record in deltas {
apply_in_neon(&record, file_path, &mut page)?;
}
let reconstructed = AuxFilesDirectory::des(&page)?;
let expect = HashMap::from([
("one".to_string(), Bytes::from_static(b"content1")),
("two".to_string(), Bytes::from_static(b"content99")),
]);
assert_eq!(reconstructed.files, expect);
Ok(())
}
}