mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Compare commits
8 Commits
devin/1747
...
skyzh/reld
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bd348e50a4 | ||
|
|
d00f7f4f24 | ||
|
|
e1ec90207f | ||
|
|
9b5767aba5 | ||
|
|
9d28011e42 | ||
|
|
17b0b6da9f | ||
|
|
f2b4ce3224 | ||
|
|
61a4ff912a |
@@ -5,6 +5,7 @@ use postgres_ffi::Oid;
|
||||
use postgres_ffi::RepOriginId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{fmt, ops::Range};
|
||||
use utils::const_assert;
|
||||
|
||||
use crate::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
|
||||
@@ -47,6 +48,12 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
|
||||
/// The key prefix of ReplOrigin keys.
|
||||
pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
|
||||
|
||||
/// The key prefix of db directory keys.
|
||||
pub const DB_DIR_KEY_PREFIX: u8 = 0x64;
|
||||
|
||||
/// The key prefix of rel direcotry keys.
|
||||
pub const REL_DIR_KEY_PREFIX: u8 = 0x65;
|
||||
|
||||
/// Check if the key falls in the range of metadata keys.
|
||||
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
|
||||
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
|
||||
@@ -108,6 +115,24 @@ impl Key {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rel_dir_sparse_key_range() -> Range<Self> {
|
||||
Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: REL_DIR_KEY_PREFIX + 1,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// This function checks more extensively what keys we can take on the write path.
|
||||
/// If a key beginning with 00 does not have a global/default tablespace OID, it
|
||||
/// will be rejected on the write path.
|
||||
@@ -438,6 +463,36 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
|
||||
Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: spcnode,
|
||||
field3: dbnode,
|
||||
field4: relnode,
|
||||
field5: forknum,
|
||||
field6: 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn rel_tag_sparse_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
|
||||
Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: spcnode,
|
||||
field3: dbnode,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: REL_DIR_KEY_PREFIX,
|
||||
field2: spcnode,
|
||||
field3: dbnode,
|
||||
field4: u32::MAX,
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX,
|
||||
} // it's fine to exclude the last key b/c we only use field6 == 1
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
|
||||
Key {
|
||||
@@ -732,9 +787,9 @@ impl Key {
|
||||
self.field1 == RELATION_SIZE_PREFIX
|
||||
}
|
||||
|
||||
pub fn sparse_non_inherited_keyspace() -> Range<Key> {
|
||||
pub const fn sparse_non_inherited_keyspace() -> Range<Key> {
|
||||
// The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace
|
||||
debug_assert_eq!(AUX_KEY_PREFIX + 1, REPL_ORIGIN_KEY_PREFIX);
|
||||
const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX);
|
||||
Key {
|
||||
field1: AUX_KEY_PREFIX,
|
||||
field2: 0,
|
||||
|
||||
@@ -22,13 +22,14 @@ use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
|
||||
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
|
||||
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
|
||||
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
|
||||
rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range,
|
||||
slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key,
|
||||
twophase_file_key, twophase_key_range, CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY,
|
||||
CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
|
||||
};
|
||||
use pageserver_api::key::{rel_tag_sparse_key, Key};
|
||||
use pageserver_api::keyspace::SparseKeySpace;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
@@ -55,6 +56,8 @@ pub const MAX_AUX_FILE_DELTAS: usize = 1024;
|
||||
/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
|
||||
pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
|
||||
|
||||
pub const REL_STORE_V2: bool = true;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum LsnForTimestamp {
|
||||
/// Found commits both before and after the given timestamp
|
||||
@@ -483,12 +486,24 @@ impl Timeline {
|
||||
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
|
||||
return Ok(false);
|
||||
}
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
|
||||
if REL_STORE_V2 {
|
||||
// fetch directory listing
|
||||
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
|
||||
let buf = version.get(self, key, ctx).await;
|
||||
if let Ok(buf) = buf {
|
||||
Ok(!buf.is_empty())
|
||||
} else {
|
||||
Ok(false)
|
||||
} // TODO: sparse keyspace needs a different get function
|
||||
} else {
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a list of all existing relations in given tablespace and database.
|
||||
@@ -506,20 +521,44 @@ impl Timeline {
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<HashSet<RelTag>, PageReconstructError> {
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
if REL_STORE_V2 {
|
||||
// scan directory listing
|
||||
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
|
||||
let results = self
|
||||
.scan(KeySpace::single(key_range), version.get_lsn(), ctx)
|
||||
.await?;
|
||||
let mut rels = HashSet::new();
|
||||
for (key, val) in results {
|
||||
if val?.is_empty() {
|
||||
continue;
|
||||
}
|
||||
assert_eq!(key.field6, 1);
|
||||
assert_eq!(key.field2, spcnode);
|
||||
assert_eq!(key.field3, dbnode);
|
||||
rels.insert(RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: key.field4,
|
||||
forknum: key.field5,
|
||||
});
|
||||
}
|
||||
Ok(rels)
|
||||
} else {
|
||||
// fetch directory listing
|
||||
let key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
let rels: HashSet<RelTag> =
|
||||
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: *relnode,
|
||||
forknum: *forknum,
|
||||
}));
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
let rels: HashSet<RelTag> =
|
||||
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: *relnode,
|
||||
forknum: *forknum,
|
||||
}));
|
||||
|
||||
Ok(rels)
|
||||
Ok(rels)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the whole SLRU segment
|
||||
@@ -1042,7 +1081,7 @@ impl Timeline {
|
||||
if has_relmap_file {
|
||||
result.add_key(relmap_file_key(spcnode, dbnode));
|
||||
}
|
||||
result.add_key(rel_dir_to_key(spcnode, dbnode));
|
||||
// result.add_key(rel_dir_to_key(spcnode, dbnode));
|
||||
|
||||
let mut rels: Vec<RelTag> = self
|
||||
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
|
||||
@@ -1116,7 +1155,11 @@ impl Timeline {
|
||||
|
||||
let dense_keyspace = result.to_keyspace();
|
||||
let sparse_keyspace = SparseKeySpace(KeySpace {
|
||||
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
|
||||
ranges: vec![
|
||||
Key::metadata_aux_key_range(),
|
||||
repl_origin_key_range(),
|
||||
Key::rel_dir_sparse_key_range(),
|
||||
],
|
||||
});
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
@@ -1608,7 +1651,7 @@ impl DatadirModification<'_> {
|
||||
let buf = DbDirectory::ser(&dbdir)?;
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
}
|
||||
if r.is_none() {
|
||||
if !REL_STORE_V2 && r.is_none() {
|
||||
// Create RelDirectory
|
||||
let buf = RelDirectory::ser(&RelDirectory {
|
||||
rels: HashSet::new(),
|
||||
@@ -1730,8 +1773,8 @@ impl DatadirModification<'_> {
|
||||
// tablespace. Create the reldir entry for it if so.
|
||||
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?;
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir =
|
||||
|
||||
if REL_STORE_V2 {
|
||||
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
@@ -1739,30 +1782,51 @@ impl DatadirModification<'_> {
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
}
|
||||
let rel_dir_key = rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
|
||||
// check if the rel_dir_key exists
|
||||
if let Ok(buf) = self.get(rel_dir_key, ctx).await {
|
||||
if !buf.is_empty() {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
}
|
||||
self.put(rel_dir_key, Value::Image(Bytes::from_static(b"1")));
|
||||
// TODO: update directory_entries_count, it seems to be a metrics
|
||||
} else {
|
||||
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
||||
let mut rel_dir =
|
||||
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
|
||||
// Didn't exist. Update dbdir
|
||||
e.insert(false);
|
||||
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
// and create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?
|
||||
};
|
||||
// and create the RelDirectory
|
||||
RelDirectory::default()
|
||||
} else {
|
||||
// reldir already exists, fetch it
|
||||
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
|
||||
.context("deserialize db")?
|
||||
};
|
||||
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
// Add the new relation to the rel directory entry, and write it back
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, rel_dir.rels.len()));
|
||||
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&rel_dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, rel_dir.rels.len()));
|
||||
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
RelDirectory::ser(&rel_dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
|
||||
// Put size
|
||||
let size_key = rel_size_to_key(rel);
|
||||
let buf = nblocks.to_le_bytes();
|
||||
@@ -1841,33 +1905,57 @@ impl DatadirModification<'_> {
|
||||
drop_relations: HashMap<(u32, u32), Vec<RelTag>>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
for ((spc_node, db_node), rel_tags) in drop_relations {
|
||||
let dir_key = rel_dir_to_key(spc_node, db_node);
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
let mut dir = RelDirectory::des(&buf)?;
|
||||
if REL_STORE_V2 {
|
||||
for ((spc_node, db_node), rel_tags) in drop_relations {
|
||||
for rel_tag in rel_tags {
|
||||
let key =
|
||||
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
|
||||
if self.get(key, ctx).await.is_ok() {
|
||||
// remove the relation key
|
||||
self.put(key, Value::Image(Bytes::from_static(b""))); // put tombstone
|
||||
|
||||
let mut dirty = false;
|
||||
for rel_tag in rel_tags {
|
||||
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
|
||||
dirty = true;
|
||||
// update logical size
|
||||
let size_key = rel_size_to_key(rel_tag);
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
self.pending_nblocks -= old_size as i64;
|
||||
|
||||
// update logical size
|
||||
let size_key = rel_size_to_key(rel_tag);
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
self.pending_nblocks -= old_size as i64;
|
||||
// Remove entry from relation size cache
|
||||
self.tline.remove_cached_rel_size(&rel_tag);
|
||||
|
||||
// Remove entry from relation size cache
|
||||
self.tline.remove_cached_rel_size(&rel_tag);
|
||||
|
||||
// Delete size entry, as well as all blocks
|
||||
self.delete(rel_key_range(rel_tag));
|
||||
// Delete size entry, as well as all blocks
|
||||
self.delete(rel_key_range(rel_tag));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for ((spc_node, db_node), rel_tags) in drop_relations {
|
||||
let dir_key = rel_dir_to_key(spc_node, db_node);
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
let mut dir = RelDirectory::des(&buf)?;
|
||||
|
||||
if dirty {
|
||||
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, dir.rels.len()));
|
||||
let mut dirty = false;
|
||||
for rel_tag in rel_tags {
|
||||
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
|
||||
dirty = true;
|
||||
|
||||
// update logical size
|
||||
let size_key = rel_size_to_key(rel_tag);
|
||||
let old_size = self.get(size_key, ctx).await?.get_u32_le();
|
||||
self.pending_nblocks -= old_size as i64;
|
||||
|
||||
// Remove entry from relation size cache
|
||||
self.tline.remove_cached_rel_size(&rel_tag);
|
||||
|
||||
// Delete size entry, as well as all blocks
|
||||
self.delete(rel_key_range(rel_tag));
|
||||
}
|
||||
}
|
||||
|
||||
if dirty {
|
||||
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, dir.rels.len()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3243,7 +3243,9 @@ impl Timeline {
|
||||
// space. If that's not the case, we had at least one key encounter a gap in the image layer
|
||||
// and stop the search as a result of that.
|
||||
let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
|
||||
// Do not fire missing key error for sparse keys.
|
||||
// Do not fire missing key error and end early for sparse keys. Note that we hava already removed
|
||||
// non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove instead of
|
||||
// figuring out what is the inherited key range and do a fine-grained pruning.
|
||||
removed.remove_overlapping_with(&KeySpace {
|
||||
ranges: vec![SPARSE_RANGE],
|
||||
});
|
||||
@@ -3781,36 +3783,30 @@ impl Timeline {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
let mut layers_to_upload = Vec::new();
|
||||
layers_to_upload.extend(
|
||||
self.create_image_layers(
|
||||
&rel_partition,
|
||||
self.initdb_lsn,
|
||||
ImageLayerCreationMode::Initial,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
let mut partitions = KeyPartitioning::default();
|
||||
partitions.parts.extend(rel_partition.parts);
|
||||
if !metadata_partition.parts.is_empty() {
|
||||
assert_eq!(
|
||||
metadata_partition.parts.len(),
|
||||
1,
|
||||
"currently sparse keyspace should only contain a single metadata keyspace"
|
||||
);
|
||||
layers_to_upload.extend(
|
||||
self.create_image_layers(
|
||||
// Safety: create_image_layers treat sparse keyspaces differently that it does not scan
|
||||
// every single key within the keyspace, and therefore, it's safe to force converting it
|
||||
// into a dense keyspace before calling this function.
|
||||
&metadata_partition.into_dense(),
|
||||
self.initdb_lsn,
|
||||
ImageLayerCreationMode::Initial,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
partitions
|
||||
.parts
|
||||
.extend(metadata_partition.into_dense().parts);
|
||||
}
|
||||
|
||||
let mut layers_to_upload = Vec::new();
|
||||
layers_to_upload.extend(
|
||||
self.create_image_layers(
|
||||
&partitions,
|
||||
self.initdb_lsn,
|
||||
ImageLayerCreationMode::Initial,
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
(layers_to_upload, None)
|
||||
} else {
|
||||
// Normal case, write out a L0 delta layer file.
|
||||
|
||||
Reference in New Issue
Block a user