Compare commits

...

8 Commits

Author SHA1 Message Date
Alex Chi Z
bd348e50a4 fmt
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 13:16:23 -05:00
Alex Chi Z
d00f7f4f24 ensure image layers are created correctly
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 13:06:59 -05:00
Alex Chi Z
e1ec90207f remove from original keyspace
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
9b5767aba5 use scan instaed of get_vectored
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
9d28011e42 more comments
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
17b0b6da9f const fn
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
f2b4ce3224 fix scan not found <-> empty
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
Alex Chi Z
61a4ff912a feat(pageserver): migrate reldir to sparse keyspace
Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-01-13 11:13:12 -05:00
3 changed files with 231 additions and 92 deletions

View File

@@ -5,6 +5,7 @@ use postgres_ffi::Oid;
use postgres_ffi::RepOriginId;
use serde::{Deserialize, Serialize};
use std::{fmt, ops::Range};
use utils::const_assert;
use crate::reltag::{BlockNumber, RelTag, SlruKind};
@@ -47,6 +48,12 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
/// The key prefix of ReplOrigin keys.
pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
/// The key prefix of db directory keys.
pub const DB_DIR_KEY_PREFIX: u8 = 0x64;
/// The key prefix of rel direcotry keys.
pub const REL_DIR_KEY_PREFIX: u8 = 0x65;
/// Check if the key falls in the range of metadata keys.
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
@@ -108,6 +115,24 @@ impl Key {
}
}
pub fn rel_dir_sparse_key_range() -> Range<Self> {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REL_DIR_KEY_PREFIX + 1,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}
}
/// This function checks more extensively what keys we can take on the write path.
/// If a key beginning with 00 does not have a global/default tablespace OID, it
/// will be rejected on the write path.
@@ -438,6 +463,36 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
#[inline(always)]
pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: relnode,
field5: forknum,
field6: 1,
}
}
pub fn rel_tag_sparse_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
} // it's fine to exclude the last key b/c we only use field6 == 1
}
#[inline(always)]
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
@@ -732,9 +787,9 @@ impl Key {
self.field1 == RELATION_SIZE_PREFIX
}
pub fn sparse_non_inherited_keyspace() -> Range<Key> {
pub const fn sparse_non_inherited_keyspace() -> Range<Key> {
// The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace
debug_assert_eq!(AUX_KEY_PREFIX + 1, REPL_ORIGIN_KEY_PREFIX);
const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX);
Key {
field1: AUX_KEY_PREFIX,
field2: 0,

View File

@@ -22,13 +22,14 @@ use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range,
slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key,
twophase_file_key, twophase_key_range, CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY,
CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::key::{rel_tag_sparse_key, Key};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
@@ -55,6 +56,8 @@ pub const MAX_AUX_FILE_DELTAS: usize = 1024;
/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
pub const REL_STORE_V2: bool = true;
#[derive(Debug)]
pub enum LsnForTimestamp {
/// Found commits both before and after the given timestamp
@@ -483,12 +486,24 @@ impl Timeline {
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
return Ok(false);
}
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
if REL_STORE_V2 {
// fetch directory listing
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
let buf = version.get(self, key, ctx).await;
if let Ok(buf) = buf {
Ok(!buf.is_empty())
} else {
Ok(false)
} // TODO: sparse keyspace needs a different get function
} else {
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
}
}
/// Get a list of all existing relations in given tablespace and database.
@@ -506,20 +521,44 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
if REL_STORE_V2 {
// scan directory listing
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let results = self
.scan(KeySpace::single(key_range), version.get_lsn(), ctx)
.await?;
let mut rels = HashSet::new();
for (key, val) in results {
if val?.is_empty() {
continue;
}
assert_eq!(key.field6, 1);
assert_eq!(key.field2, spcnode);
assert_eq!(key.field3, dbnode);
rels.insert(RelTag {
spcnode,
dbnode,
relnode: key.field4,
forknum: key.field5,
});
}
Ok(rels)
} else {
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
let rels: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
relnode: *relnode,
forknum: *forknum,
}));
let dir = RelDirectory::des(&buf)?;
let rels: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
relnode: *relnode,
forknum: *forknum,
}));
Ok(rels)
Ok(rels)
}
}
/// Get the whole SLRU segment
@@ -1042,7 +1081,7 @@ impl Timeline {
if has_relmap_file {
result.add_key(relmap_file_key(spcnode, dbnode));
}
result.add_key(rel_dir_to_key(spcnode, dbnode));
// result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
@@ -1116,7 +1155,11 @@ impl Timeline {
let dense_keyspace = result.to_keyspace();
let sparse_keyspace = SparseKeySpace(KeySpace {
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
ranges: vec![
Key::metadata_aux_key_range(),
repl_origin_key_range(),
Key::rel_dir_sparse_key_range(),
],
});
if cfg!(debug_assertions) {
@@ -1608,7 +1651,7 @@ impl DatadirModification<'_> {
let buf = DbDirectory::ser(&dbdir)?;
self.put(DBDIR_KEY, Value::Image(buf.into()));
}
if r.is_none() {
if !REL_STORE_V2 && r.is_none() {
// Create RelDirectory
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
@@ -1730,8 +1773,8 @@ impl DatadirModification<'_> {
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir =
if REL_STORE_V2 {
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
@@ -1739,30 +1782,51 @@ impl DatadirModification<'_> {
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
}
let rel_dir_key = rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists
if let Ok(buf) = self.get(rel_dir_key, ctx).await {
if !buf.is_empty() {
return Err(RelationError::AlreadyExists);
}
}
self.put(rel_dir_key, Value::Image(Bytes::from_static(b"1")));
// TODO: update directory_entries_count, it seems to be a metrics
} else {
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
}
self.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
}
self.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
// Put size
let size_key = rel_size_to_key(rel);
let buf = nblocks.to_le_bytes();
@@ -1841,33 +1905,57 @@ impl DatadirModification<'_> {
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
if REL_STORE_V2 {
for ((spc_node, db_node), rel_tags) in drop_relations {
for rel_tag in rel_tags {
let key =
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
if self.get(key, ctx).await.is_ok() {
// remove the relation key
self.put(key, Value::Image(Bytes::from_static(b""))); // put tombstone
let mut dirty = false;
for rel_tag in rel_tags {
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
dirty = true;
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel_tag));
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel_tag));
}
}
}
} else {
for ((spc_node, db_node), rel_tags) in drop_relations {
let dir_key = rel_dir_to_key(spc_node, db_node);
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
if dirty {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
let mut dirty = false;
for rel_tag in rel_tags {
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
dirty = true;
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
self.pending_nblocks -= old_size as i64;
// Remove entry from relation size cache
self.tline.remove_cached_rel_size(&rel_tag);
// Delete size entry, as well as all blocks
self.delete(rel_key_range(rel_tag));
}
}
if dirty {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
}
}
}

View File

@@ -3243,7 +3243,9 @@ impl Timeline {
// space. If that's not the case, we had at least one key encounter a gap in the image layer
// and stop the search as a result of that.
let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
// Do not fire missing key error for sparse keys.
// Do not fire missing key error and end early for sparse keys. Note that we hava already removed
// non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove instead of
// figuring out what is the inherited key range and do a fine-grained pruning.
removed.remove_overlapping_with(&KeySpace {
ranges: vec![SPARSE_RANGE],
});
@@ -3781,36 +3783,30 @@ impl Timeline {
return Err(FlushLayerError::Cancelled);
}
let mut layers_to_upload = Vec::new();
layers_to_upload.extend(
self.create_image_layers(
&rel_partition,
self.initdb_lsn,
ImageLayerCreationMode::Initial,
ctx,
)
.await?,
);
let mut partitions = KeyPartitioning::default();
partitions.parts.extend(rel_partition.parts);
if !metadata_partition.parts.is_empty() {
assert_eq!(
metadata_partition.parts.len(),
1,
"currently sparse keyspace should only contain a single metadata keyspace"
);
layers_to_upload.extend(
self.create_image_layers(
// Safety: create_image_layers treat sparse keyspaces differently that it does not scan
// every single key within the keyspace, and therefore, it's safe to force converting it
// into a dense keyspace before calling this function.
&metadata_partition.into_dense(),
self.initdb_lsn,
ImageLayerCreationMode::Initial,
ctx,
)
.await?,
);
partitions
.parts
.extend(metadata_partition.into_dense().parts);
}
let mut layers_to_upload = Vec::new();
layers_to_upload.extend(
self.create_image_layers(
&partitions,
self.initdb_lsn,
ImageLayerCreationMode::Initial,
ctx,
)
.await?,
);
(layers_to_upload, None)
} else {
// Normal case, write out a L0 delta layer file.