Compare commits

...

3 Commits

Author SHA1 Message Date
John Spray
417b1319cd DNM debug 2024-08-02 13:29:24 +01:00
John Spray
c2aadb0c7f wip2 2024-08-02 10:53:36 +01:00
John Spray
00648a9c49 pageserver: start refactoring ingest metadata ops 2024-08-02 10:02:26 +01:00
3 changed files with 270 additions and 104 deletions

View File

@@ -155,17 +155,9 @@ async fn import_rel(
//
// FIXME: Keep track of which relations we've already created?
// https://github.com/neondatabase/neon/issues/3309
if let Err(e) = modification
modification
.put_rel_creation(rel, nblocks as u32, ctx)
.await
{
match e {
RelationError::AlreadyExists => {
debug!("Relation {} already exist. We must be extending it.", rel)
}
_ => return Err(e.into()),
}
}
.await?;
loop {
let r = reader.read_exact(&mut buf).await;

View File

@@ -174,6 +174,7 @@ impl Timeline {
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
metadata_state: MetadataWriteState::new(self),
lsn,
}
}
@@ -1034,6 +1035,229 @@ impl Timeline {
}
}
/// Write something other than a simple postgres key/value: unlike regular relation page writes, these
/// require access to a Timeline in order to do read-modify-write.
#[derive(Debug)]
enum MetadataOp {
// - Insert to DBDIR_KEY if this (spcnode, dbnode) does not already exist
// - Insert to rel_dir_to_key(spcnode, dbnode)
UpsertRelDirectory { spcnode: Oid, dbnode: Oid },
UpsertRelDirectory2 { rel: RelTag, nblocks: BlockNumber },
// - Insert this xid to TWOPHASEDIR_KEY
// UpdateTwoPhaseDir{
// xid: TransactionId
// },
// // - Drop this (spcnode, dbnode) from DBDIR_KEY
// DropDbDir {
// spcnode: Oid,
// dbnode: Oid
// },
// // - Drop this (spcnode, dbnode) from DBDIR_KEY
// DropRel {
// rel: RelTag,
// },
// // - Read-subtract-write the relation size for this rel
// RelTruncate {
// rel: RelTag,
// nblocks: BlockNumber
// },
// // - Read-add-write the relation size for this rel
// RelExtend {
// rel: RelTag,
// nblocks: BlockNumber
// },
// // - Read-modify-write of `slru_dir_to_key` and `slru_segment_size_to_key`
// CreateSlruSegment {
// kind: SlruKind,
// segno: u32,
// nblocks: BlockNumber,
// }
}
/// State that spans all the apply() calls of all the MetadataOp in a DatadirMotification
struct MetadataWriteState<'a> {
/// The timeline this modification applies to. You can access this to
/// read the state, but note that any pending updates are *not* reflected
/// in the state in 'tline' yet.
pub tline: &'a Timeline,
// Write-through cache.
// For pages that we read-modify-write, stash the last value here after each MetadataOp,
// so that we don't have to enter Timeline::get more than necessary.
last_write: HashMap<Key, Bytes>,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
// Debug assertions: for calls that we expect to always come in LSN order, track the last LSN we saw
#[cfg(debug_assertions)]
debug_last_lsn: Lsn,
}
impl<'a> MetadataWriteState<'a> {
fn new(timeline: &'a Timeline) -> Self {
if cfg!(debug_assertions) {
Self {
tline: timeline,
last_write: HashMap::default(),
pending_directory_entries: Vec::default(),
debug_last_lsn: Lsn(0),
}
} else {
Self {
tline: timeline,
last_write: HashMap::default(),
pending_directory_entries: Vec::default(),
debug_last_lsn: Lsn(0),
}
}
}
fn assert_lsn_order(&mut self, lsn: Lsn) {
#[cfg(debug_assertions)]
{
debug_assert!(lsn >= self.debug_last_lsn);
self.debug_last_lsn = lsn;
}
}
async fn get(
&mut self,
lsn: Lsn,
key: Key,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
self.assert_lsn_order(lsn);
match self.last_write.get(&key) {
Some(v) => Ok(v.clone()),
None => self.tline.get(key, lsn, ctx).await,
}
}
/// Observe a page write
fn put(&mut self, lsn: Lsn, key: Key, value: Bytes) {
self.assert_lsn_order(lsn);
self.last_write.insert(key, value);
}
}
impl MetadataOp {
async fn apply<'a>(
self,
lsn: Lsn,
data_dir_mod: &mut DatadirModification<'a>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
eprintln!("MetadataOp::apply: {self:?}");
match self {
Self::UpsertRelDirectory { spcnode, dbnode } => {
// Add it to the directory (if it doesn't exist already)
let buf = data_dir_mod.metadata_state.get(lsn, DBDIR_KEY, ctx).await?;
let mut dbdir = DbDirectory::des(&buf)?;
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
if r.is_none() || r == Some(false) {
// The dbdir entry didn't exist, or it contained a
// 'false'. The 'insert' call already updated it with
// 'true', now write the updated 'dbdirs' map back.
let buf = DbDirectory::ser(&dbdir)?;
data_dir_mod.put_metadata_page(lsn, DBDIR_KEY, Bytes::from(buf));
}
if r.is_none() {
// Create RelDirectory
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
})?;
data_dir_mod
.metadata_state
.pending_directory_entries
.push((DirectoryKind::Rel, 0));
data_dir_mod.put_metadata_page(
lsn,
rel_dir_to_key(spcnode, dbnode),
Bytes::from(buf),
);
}
Ok(())
}
Self::UpsertRelDirectory2 { rel, nblocks } => {
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(
&data_dir_mod
.metadata_state
.get(lsn, DBDIR_KEY, ctx)
.await
.context("read db")?,
)
.context("deserialize db")?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if let hash_map::Entry::Vacant(e) =
dbdir.dbdirs.entry((rel.spcnode, rel.dbnode))
{
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
data_dir_mod
.metadata_state
.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
data_dir_mod.put_metadata_page(lsn, DBDIR_KEY, buf.into());
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(
&data_dir_mod
.metadata_state
.get(lsn, rel_dir_key, ctx)
.await
.context("read db")?,
)
.context("deserialize db")?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
// Drop out early if the relation already existed
return Ok(());
}
data_dir_mod
.metadata_state
.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
data_dir_mod.put_metadata_page(
lsn,
rel_dir_key,
Bytes::from(RelDirectory::ser(&rel_dir).context("serialize")?),
);
// Put size
let size_key = rel_size_to_key(rel);
let buf = nblocks.to_le_bytes();
data_dir_mod.put_metadata_page(lsn, size_key, Bytes::from(buf.to_vec()));
data_dir_mod.pending_nblocks += nblocks as i64;
// Update relation size cache
data_dir_mod
.metadata_state
.tline
.set_cached_rel_size(rel, lsn, nblocks);
Ok(())
}
}
}
}
/// DatadirModification represents an operation to ingest an atomic set of
/// updates to the repository. It is created by the 'begin_record'
/// function. It is called for each WAL record, so that all the modifications
@@ -1055,6 +1279,8 @@ pub struct DatadirModification<'a> {
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
metadata_state: MetadataWriteState<'a>,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
@@ -1081,6 +1307,26 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
/// Apply a complex write op that may require read-modify-write to the underlying Timeline.
async fn put_metadata_op(
&mut self,
lsn: Lsn,
meta: MetadataOp,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Placeholder: just run inline.
// TODO: make this sync, and defer all these to commit(), so that we don't have to carry a Timeline all the time.
meta.apply(lsn, self, ctx).await
}
/// While applying a metadata op, write a materialized page.
fn put_metadata_page(&mut self, lsn: Lsn, key: Key, value: Bytes) {
eprintln!("put_metadata_page {key} @ {lsn}");
self.put_at_lsn(lsn, key, Value::Image(value.clone()));
self.metadata_state.put(lsn, key, value);
}
/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -1092,9 +1338,6 @@ impl<'a> DatadirModification<'a> {
self.pending_directory_entries.push((DirectoryKind::Db, 0));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory
self.init_aux_dir()?;
let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
xids: HashSet::new(),
})?;
@@ -1196,33 +1439,12 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Add it to the directory (if it doesn't exist already)
let buf = self.get(DBDIR_KEY, ctx).await?;
let mut dbdir = DbDirectory::des(&buf)?;
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
if r.is_none() || r == Some(false) {
// The dbdir entry didn't exist, or it contained a
// 'false'. The 'insert' call already updated it with
// 'true', now write the updated 'dbdirs' map back.
let buf = DbDirectory::ser(&dbdir)?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory as well
self.init_aux_dir()?;
}
if r.is_none() {
// Create RelDirectory
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
})?;
self.pending_directory_entries.push((DirectoryKind::Rel, 0));
self.put(
rel_dir_to_key(spcnode, dbnode),
Value::Image(Bytes::from(buf)),
);
}
self.put_metadata_op(
self.lsn,
MetadataOp::UpsertRelDirectory { spcnode, dbnode },
ctx,
)
.await?;
self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
Ok(())
}
@@ -1316,56 +1538,15 @@ impl<'a> DatadirModification<'a> {
rel: RelTag,
nblocks: BlockNumber,
ctx: &RequestContext,
) -> Result<(), RelationError> {
if rel.relnode == 0 {
return Err(RelationError::InvalidRelnode);
}
// It's possible that this is the first rel for this db in this
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
) -> anyhow::Result<()> {
// TODO: here, or earlier, validate that rel.relnode != 0 -- perhaps on construction of the RelTag?
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
}
self.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
// Put size
let size_key = rel_size_to_key(rel);
let buf = nblocks.to_le_bytes();
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
self.pending_nblocks += nblocks as i64;
// Update relation size cache
self.tline.set_cached_rel_size(rel, self.lsn, nblocks);
self.put_metadata_op(
self.lsn,
MetadataOp::UpsertRelDirectory2 { rel, nblocks },
ctx,
)
.await?;
// Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
// caller.
@@ -1570,19 +1751,6 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub fn init_aux_dir(&mut self) -> anyhow::Result<()> {
if let AuxFilePolicy::V2 = self.tline.get_switch_aux_file_policy() {
return Ok(());
}
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: HashMap::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, 0));
self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
Ok(())
}
pub async fn put_file(
&mut self,
path: &str,
@@ -1877,6 +2045,7 @@ impl<'a> DatadirModification<'a> {
// Internal helper functions to batch the modifications
// TODO: retire this once all metadata writes are going via MetadataWriteState
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
@@ -1910,15 +2079,19 @@ impl<'a> DatadirModification<'a> {
}
fn put(&mut self, key: Key, val: Value) {
self.put_at_lsn(self.lsn, key, val)
}
fn put_at_lsn(&mut self, lsn: Lsn, key: Key, val: Value) {
let values = self.pending_updates.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
if let Some((last_lsn, last_value)) = values.last_mut() {
if *last_lsn == self.lsn {
if *last_lsn == lsn {
*last_value = val;
return;
}
}
values.push((self.lsn, val));
values.push((lsn, val));
}
fn delete(&mut self, key_range: Range<Key>) {

View File

@@ -92,6 +92,7 @@ impl WalIngest {
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
) -> anyhow::Result<bool> {
eprintln!("ingest_record @ {lsn}");
WAL_INGEST.records_received.inc();
let pg_version = modification.tline.pg_version;
let prev_len = modification.len();