mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
Use ObjectTag enum instead of special fork number to store metadata objects.
Extracted from Konstantin's larger PR: https://github.com/zenithdb/zenith/pull/268
This commit is contained in:
@@ -8,6 +8,7 @@ use std::time::Duration;
|
||||
|
||||
pub mod basebackup;
|
||||
pub mod branches;
|
||||
pub mod object_key;
|
||||
pub mod object_repository;
|
||||
pub mod object_store;
|
||||
pub mod page_cache;
|
||||
|
||||
27
pageserver/src/object_key.rs
Normal file
27
pageserver/src/object_key.rs
Normal file
@@ -0,0 +1,27 @@
|
||||
use crate::repository::{BufferTag, RelTag};
|
||||
use crate::ZTimelineId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
///
|
||||
/// ObjectKey is the key type used to identify objects stored in an object
|
||||
/// repository. It is shared between object_repository.rs and object_store.rs.
|
||||
/// It is mostly opaque to ObjectStore; it just stores and retrieves objects
|
||||
/// using the key given by the caller.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ObjectKey {
|
||||
pub timeline: ZTimelineId,
|
||||
pub tag: ObjectTag,
|
||||
}
|
||||
|
||||
/// ObjectTag is a part of ObjectKey that is specific to the type of
|
||||
/// the stored object.
|
||||
///
|
||||
/// NB: the order of the enum variants is significant! In particular,
|
||||
/// rocksdb_storage.rs assumes that TimelineMetadataTag is first.
|
||||
///
|
||||
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum ObjectTag {
|
||||
TimelineMetadataTag,
|
||||
RelationMetadata(RelTag),
|
||||
RelationBuffer(BufferTag),
|
||||
}
|
||||
@@ -13,7 +13,8 @@
|
||||
//! until we find the page we're looking for, making a separate lookup into the
|
||||
//! key-value store for each timeline.
|
||||
|
||||
use crate::object_store::{ObjectKey, ObjectStore};
|
||||
use crate::object_key::*;
|
||||
use crate::object_store::ObjectStore;
|
||||
use crate::repository::*;
|
||||
use crate::restore_local_repo::import_timeline_wal;
|
||||
use crate::walredo::WalRedoManager;
|
||||
@@ -21,7 +22,6 @@ use crate::{PageServerConf, ZTimelineId};
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use postgres_ffi::pg_constants;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::max;
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
@@ -329,7 +329,7 @@ impl Timeline for ObjectTimeline {
|
||||
let lsn = rec.lsn;
|
||||
let key = ObjectKey {
|
||||
timeline: self.timelineid,
|
||||
buf_tag: tag,
|
||||
tag: ObjectTag::RelationBuffer(tag),
|
||||
};
|
||||
let val = PageEntry::WALRecord(rec);
|
||||
|
||||
@@ -376,7 +376,7 @@ impl Timeline for ObjectTimeline {
|
||||
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
|
||||
let key = ObjectKey {
|
||||
timeline: self.timelineid,
|
||||
buf_tag: tag,
|
||||
tag: ObjectTag::RelationBuffer(tag),
|
||||
};
|
||||
let val = PageEntry::Page(img);
|
||||
|
||||
@@ -540,7 +540,7 @@ impl ObjectTimeline {
|
||||
// ask the WAL redo service to reconstruct the page image from the WAL records.
|
||||
let searchkey = ObjectKey {
|
||||
timeline: self.timelineid,
|
||||
buf_tag: tag,
|
||||
tag: ObjectTag::RelationBuffer(tag),
|
||||
};
|
||||
let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
|
||||
|
||||
@@ -642,7 +642,7 @@ impl ObjectTimeline {
|
||||
// old page image.
|
||||
let searchkey = ObjectKey {
|
||||
timeline: self.timelineid,
|
||||
buf_tag: tag,
|
||||
tag: ObjectTag::RelationBuffer(tag),
|
||||
};
|
||||
let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
|
||||
while let Some((_key, value)) = iter.next().transpose()? {
|
||||
@@ -690,8 +690,9 @@ impl ObjectTimeline {
|
||||
|
||||
// Iterate through all relations
|
||||
for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? {
|
||||
let rel = *rels;
|
||||
let mut last_version = true;
|
||||
let mut key = relation_size_key(self.timelineid, *rels);
|
||||
let key = relation_size_key(self.timelineid, rel);
|
||||
let mut max_size = 0u32;
|
||||
let mut relation_dropped = false;
|
||||
|
||||
@@ -722,7 +723,14 @@ impl ObjectTimeline {
|
||||
}
|
||||
// Now process all relation blocks
|
||||
for blknum in 0..max_size {
|
||||
key.buf_tag.blknum = blknum;
|
||||
let buf_tag = BufferTag {
|
||||
rel,
|
||||
blknum,
|
||||
};
|
||||
let key = ObjectKey {
|
||||
timeline: self.timelineid,
|
||||
tag: ObjectTag::RelationBuffer(buf_tag),
|
||||
};
|
||||
last_version = true;
|
||||
for vers in self.obj_store.object_versions(&key, horizon)? {
|
||||
let lsn = vers.0;
|
||||
@@ -731,7 +739,7 @@ impl ObjectTimeline {
|
||||
truncated += 1;
|
||||
if !relation_dropped {
|
||||
// preserve and materialize last version before deleting all preceding versions
|
||||
self.get_page_at_lsn_nowait(key.buf_tag, lsn)?;
|
||||
self.get_page_at_lsn_nowait(buf_tag, lsn)?;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -797,7 +805,7 @@ impl ObjectTimeline {
|
||||
|
||||
Ok(ObjectVersionIter {
|
||||
obj_store,
|
||||
buf_tag: key.buf_tag,
|
||||
object_tag: key.tag,
|
||||
current_iter,
|
||||
ancestor_timeline: self.ancestor_timeline,
|
||||
ancestor_lsn: self.ancestor_lsn,
|
||||
@@ -806,9 +814,9 @@ impl ObjectTimeline {
|
||||
}
|
||||
|
||||
struct ObjectHistory<'a> {
|
||||
iter: Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>,
|
||||
iter: Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>,
|
||||
lsn: Lsn,
|
||||
last_relation_size: Option<(BufferTag, u32)>,
|
||||
last_relation_size: Option<(RelTag, u32)>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for ObjectHistory<'a> {
|
||||
@@ -828,16 +836,16 @@ impl<'a> History for ObjectHistory<'a> {
|
||||
impl<'a> ObjectHistory<'a> {
|
||||
fn handle_relation_size(
|
||||
&mut self,
|
||||
buf_tag: BufferTag,
|
||||
rel_tag: RelTag,
|
||||
entry: RelationSizeEntry,
|
||||
) -> Option<Update> {
|
||||
match entry {
|
||||
RelationSizeEntry::Size(size) => {
|
||||
// we only want to output truncations, expansions are filtered out
|
||||
let last_relation_size = self.last_relation_size.replace((buf_tag, size));
|
||||
let last_relation_size = self.last_relation_size.replace((rel_tag, size));
|
||||
|
||||
match last_relation_size {
|
||||
Some((last_buf, last_size)) if last_buf != buf_tag || size < last_size => {
|
||||
Some((last_buf, last_size)) if last_buf != rel_tag || size < last_size => {
|
||||
Some(Update::Truncate { n_blocks: size })
|
||||
}
|
||||
_ => None,
|
||||
@@ -861,24 +869,26 @@ impl<'a> ObjectHistory<'a> {
|
||||
}
|
||||
|
||||
fn next_result(&mut self) -> Result<Option<RelationUpdate>> {
|
||||
while let Some((buf_tag, lsn, value)) = self.iter.next().transpose()? {
|
||||
if buf_tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM {
|
||||
continue;
|
||||
}
|
||||
while let Some((object_tag, lsn, value)) = self.iter.next().transpose()? {
|
||||
|
||||
let update = if buf_tag.blknum == RELATION_SIZE_BLKNUM {
|
||||
let entry = RelationSizeEntry::des(&value)?;
|
||||
match self.handle_relation_size(buf_tag, entry) {
|
||||
Some(relation_update) => relation_update,
|
||||
None => continue,
|
||||
let (rel_tag, update) = match object_tag {
|
||||
ObjectTag::TimelineMetadataTag => continue,
|
||||
ObjectTag::RelationMetadata(rel_tag) => {
|
||||
let entry = RelationSizeEntry::des(&value)?;
|
||||
match self.handle_relation_size(rel_tag, entry) {
|
||||
Some(relation_update) => (rel_tag, relation_update),
|
||||
None => continue,
|
||||
}
|
||||
},
|
||||
ObjectTag::RelationBuffer(buf_tag) => {
|
||||
let entry = PageEntry::des(&value)?;
|
||||
let update = self.handle_page(buf_tag, entry);
|
||||
(buf_tag.rel, update)
|
||||
}
|
||||
} else {
|
||||
let entry = PageEntry::des(&value)?;
|
||||
self.handle_page(buf_tag, entry)
|
||||
};
|
||||
|
||||
return Ok(Some(RelationUpdate {
|
||||
rel: buf_tag.rel,
|
||||
rel: rel_tag,
|
||||
lsn,
|
||||
update,
|
||||
}));
|
||||
@@ -925,24 +935,16 @@ pub enum RelationSizeEntry {
|
||||
Unlink,
|
||||
}
|
||||
|
||||
// No real block in PostgreSQL will have block number u32::MAX
|
||||
// See vendor/postgres/src/include/storage/block.h
|
||||
const RELATION_SIZE_BLKNUM: u32 = u32::MAX;
|
||||
|
||||
const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey {
|
||||
ObjectKey {
|
||||
timeline: timelineid,
|
||||
buf_tag: BufferTag {
|
||||
rel,
|
||||
blknum: RELATION_SIZE_BLKNUM,
|
||||
},
|
||||
tag: ObjectTag::RelationMetadata(rel),
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// In addition to those per-page and per-relation entries, we also
|
||||
/// store a little metadata blob for each timeline. It is stored using
|
||||
/// STORAGE_SPECIAL_FORKNUM.
|
||||
/// store a little metadata blob for each timeline.
|
||||
///
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct MetadataEntry {
|
||||
@@ -955,15 +957,7 @@ pub struct MetadataEntry {
|
||||
const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey {
|
||||
ObjectKey {
|
||||
timeline: timelineid,
|
||||
buf_tag: BufferTag {
|
||||
rel: RelTag {
|
||||
forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM,
|
||||
spcnode: 0,
|
||||
dbnode: 0,
|
||||
relnode: 0,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
tag: ObjectTag::TimelineMetadataTag,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -976,7 +970,7 @@ const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey {
|
||||
struct ObjectVersionIter<'a> {
|
||||
obj_store: &'a dyn ObjectStore,
|
||||
|
||||
buf_tag: BufferTag,
|
||||
object_tag: ObjectTag,
|
||||
|
||||
/// Iterator on the current timeline.
|
||||
current_iter: Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>,
|
||||
@@ -1013,7 +1007,7 @@ impl<'a> ObjectVersionIter<'a> {
|
||||
if let Some(ancestor_timeline) = self.ancestor_timeline {
|
||||
let searchkey = ObjectKey {
|
||||
timeline: ancestor_timeline,
|
||||
buf_tag: self.buf_tag,
|
||||
tag: self.object_tag,
|
||||
};
|
||||
let ancestor_iter = self
|
||||
.obj_store
|
||||
|
||||
@@ -1,19 +1,13 @@
|
||||
//! Low-level key-value storage abstraction.
|
||||
//!
|
||||
use crate::repository::{BufferTag, RelTag};
|
||||
use crate::object_key::*;
|
||||
use crate::repository::RelTag;
|
||||
use crate::ZTimelineId;
|
||||
use anyhow::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use std::iter::Iterator;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ObjectKey {
|
||||
pub timeline: ZTimelineId,
|
||||
pub buf_tag: BufferTag,
|
||||
}
|
||||
|
||||
///
|
||||
/// Low-level storage abstraction.
|
||||
///
|
||||
@@ -58,7 +52,7 @@ pub trait ObjectStore: Send + Sync {
|
||||
&'a self,
|
||||
timeline: ZTimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>>;
|
||||
) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>>;
|
||||
|
||||
/// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'.
|
||||
/// Both dbnode and spcnode can be InvalidId (0), which means: get all relations in the tablespace/cluster.
|
||||
|
||||
@@ -139,10 +139,6 @@ pub struct RepositoryStats {
|
||||
/// are used for the same purpose.
|
||||
/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/relfilenode.h#L57).
|
||||
///
|
||||
/// We use additional fork numbers to logically separate relational and
|
||||
/// non-relational data inside pageserver key-value storage.
|
||||
/// See, e.g., `ROCKSDB_SPECIAL_FORKNUM`.
|
||||
///
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
@@ -475,13 +471,16 @@ mod tests {
|
||||
tline.advance_last_valid_lsn(Lsn(2));
|
||||
let mut snapshot = tline.history()?;
|
||||
assert_eq!(snapshot.lsn(), Lsn(2));
|
||||
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
|
||||
|
||||
// TODO ordering not guaranteed by API. But currently it returns the
|
||||
// truncation entry before the block data.
|
||||
let expected_truncate = RelationUpdate {
|
||||
rel: buf.rel,
|
||||
lsn: Lsn(2),
|
||||
update: Update::Truncate { n_blocks: 0 },
|
||||
};
|
||||
assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API
|
||||
assert_eq!(Some(expected_truncate), snapshot.next().transpose()?);
|
||||
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
|
||||
assert_eq!(None, snapshot.next().transpose()?);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
//!
|
||||
//! An implementation of the ObjectStore interface, backed by RocksDB
|
||||
//!
|
||||
use crate::object_store::{ObjectKey, ObjectStore};
|
||||
use crate::repository::{BufferTag, RelTag};
|
||||
use crate::object_key::*;
|
||||
use crate::object_store::ObjectStore;
|
||||
use crate::repository::RelTag;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
use anyhow::{bail, Result};
|
||||
@@ -24,7 +25,7 @@ impl StorageKey {
|
||||
Self {
|
||||
obj_key: ObjectKey {
|
||||
timeline,
|
||||
buf_tag: BufferTag::ZEROED,
|
||||
tag: ObjectTag::TimelineMetadataTag,
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
}
|
||||
@@ -140,42 +141,46 @@ impl ObjectStore for RocksObjectStore {
|
||||
|
||||
let mut rels: HashSet<RelTag> = HashSet::new();
|
||||
|
||||
let mut search_key = StorageKey {
|
||||
obj_key: ObjectKey {
|
||||
timeline: timelineid,
|
||||
buf_tag: BufferTag {
|
||||
rel: RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: 0,
|
||||
forknum: 0u8,
|
||||
},
|
||||
blknum: 0,
|
||||
},
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
let mut search_rel_tag = RelTag {
|
||||
spcnode,
|
||||
dbnode,
|
||||
relnode: 0,
|
||||
forknum: 0u8,
|
||||
};
|
||||
let mut iter = self.db.raw_iterator();
|
||||
loop {
|
||||
let search_key = StorageKey {
|
||||
obj_key: ObjectKey {
|
||||
timeline: timelineid,
|
||||
tag: ObjectTag::RelationMetadata(search_rel_tag),
|
||||
},
|
||||
lsn: Lsn(0),
|
||||
};
|
||||
iter.seek(search_key.ser()?);
|
||||
if !iter.valid() {
|
||||
break;
|
||||
}
|
||||
let key = StorageKey::des(iter.key().unwrap())?;
|
||||
if (spcnode != 0 && key.obj_key.buf_tag.rel.spcnode != spcnode)
|
||||
|| (dbnode != 0 && key.obj_key.buf_tag.rel.dbnode != dbnode)
|
||||
{
|
||||
|
||||
if let ObjectTag::RelationMetadata(rel_tag) = key.obj_key.tag {
|
||||
if spcnode != 0 && rel_tag.spcnode != spcnode
|
||||
|| dbnode != 0 && rel_tag.dbnode != dbnode
|
||||
{
|
||||
break;
|
||||
}
|
||||
if key.lsn < lsn
|
||||
{
|
||||
// visible in this snapshot
|
||||
rels.insert(rel_tag);
|
||||
}
|
||||
search_rel_tag = rel_tag;
|
||||
// skip to next relation
|
||||
// FIXME: what if relnode is u32::MAX? The increment below would overflow.
|
||||
search_rel_tag.relnode += 1;
|
||||
} else {
|
||||
// no more relation metadata entries
|
||||
break;
|
||||
}
|
||||
|
||||
if key.obj_key.buf_tag.rel.relnode != 0 // skip non-relational records (like timeline metadata)
|
||||
&& key.lsn < lsn
|
||||
// visible in this snapshot
|
||||
{
|
||||
rels.insert(key.obj_key.buf_tag.rel);
|
||||
}
|
||||
search_key = key.clone();
|
||||
search_key.obj_key.buf_tag.rel.relnode += 1; // skip to next relation
|
||||
}
|
||||
|
||||
Ok(rels)
|
||||
@@ -189,7 +194,7 @@ impl ObjectStore for RocksObjectStore {
|
||||
&'a self,
|
||||
timeline: ZTimelineId,
|
||||
lsn: Lsn,
|
||||
) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>> {
|
||||
) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>> {
|
||||
let start_key = StorageKey::timeline_start(timeline);
|
||||
let start_key_bytes = StorageKey::ser(&start_key)?;
|
||||
let iter = self.db.iterator(rocksdb::IteratorMode::From(
|
||||
@@ -296,7 +301,7 @@ impl<'a> Iterator for RocksObjectVersionIter<'a> {
|
||||
return None;
|
||||
}
|
||||
let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap();
|
||||
if key.obj_key.buf_tag != self.obj_key.buf_tag {
|
||||
if key.obj_key.tag != self.obj_key.tag {
|
||||
return None;
|
||||
}
|
||||
let val = self.dbiter.value().unwrap();
|
||||
@@ -314,7 +319,7 @@ struct RocksObjects<'r> {
|
||||
|
||||
impl<'r> Iterator for RocksObjects<'r> {
|
||||
// TODO consider returning Box<[u8]>
|
||||
type Item = Result<(BufferTag, Lsn, Vec<u8>)>;
|
||||
type Item = Result<(ObjectTag, Lsn, Vec<u8>)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.next_result().transpose()
|
||||
@@ -322,7 +327,7 @@ impl<'r> Iterator for RocksObjects<'r> {
|
||||
}
|
||||
|
||||
impl<'r> RocksObjects<'r> {
|
||||
fn next_result(&mut self) -> Result<Option<(BufferTag, Lsn, Vec<u8>)>> {
|
||||
fn next_result(&mut self) -> Result<Option<(ObjectTag, Lsn, Vec<u8>)>> {
|
||||
for (key_bytes, v) in &mut self.iter {
|
||||
let key = StorageKey::des(&key_bytes)?;
|
||||
|
||||
@@ -335,7 +340,7 @@ impl<'r> RocksObjects<'r> {
|
||||
continue;
|
||||
}
|
||||
|
||||
return Ok(Some((key.obj_key.buf_tag, key.lsn, v.to_vec())));
|
||||
return Ok(Some((key.obj_key.tag, key.lsn, v.to_vec())));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
|
||||
@@ -21,8 +21,6 @@ pub const FSM_FORKNUM: u8 = 1;
|
||||
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
|
||||
pub const INIT_FORKNUM: u8 = 3;
|
||||
|
||||
pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50;
|
||||
|
||||
// From storage_xlog.h
|
||||
pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
|
||||
pub const SMGR_TRUNCATE_VM: u32 = 0x0002;
|
||||
|
||||
Reference in New Issue
Block a user