pageserver - timeline history api

This commit is contained in:
Patrick Insinger
2021-05-27 23:15:50 -04:00
committed by Patrick Insinger
parent f2243d7459
commit 3364a8d442
4 changed files with 277 additions and 3 deletions

View File

@@ -473,6 +473,16 @@ impl Timeline for ObjectTimeline {
Ok(())
}
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>> {
let lsn = self.last_valid_lsn.load();
let iter = self.obj_store.objects(self.timelineid, lsn)?;
Ok(Box::new(ObjectHistory {
lsn,
iter,
last_relation_size: None,
}))
}
}
impl ObjectTimeline {
@@ -809,6 +819,89 @@ impl ObjectTimeline {
}
}
struct ObjectHistory<'a> {
iter: Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>,
lsn: Lsn,
last_relation_size: Option<(BufferTag, u32)>,
}
impl<'a> Iterator for ObjectHistory<'a> {
type Item = Result<RelationUpdate>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
}
}
impl<'a> History for ObjectHistory<'a> {
fn lsn(&self) -> Lsn {
self.lsn
}
}
impl<'a> ObjectHistory<'a> {
fn handle_relation_size(
&mut self,
buf_tag: BufferTag,
entry: RelationSizeEntry,
) -> Option<Update> {
match entry {
RelationSizeEntry::Size(size) => {
// we only want to output truncations, expansions are filtered out
let last_relation_size = self.last_relation_size.replace((buf_tag, size));
match last_relation_size {
Some((last_buf, last_size)) if last_buf != buf_tag || size < last_size => {
Some(Update::Truncate { n_blocks: size })
}
_ => None,
}
}
RelationSizeEntry::Unlink => Some(Update::Unlink),
}
}
fn handle_page(&mut self, buf_tag: BufferTag, entry: PageEntry) -> Update {
match entry {
PageEntry::Page(img) => Update::Page {
blknum: buf_tag.blknum,
img,
},
PageEntry::WALRecord(rec) => Update::WALRecord {
blknum: buf_tag.blknum,
rec,
},
}
}
fn next_result(&mut self) -> Result<Option<RelationUpdate>> {
while let Some((buf_tag, lsn, value)) = self.iter.next().transpose()? {
if buf_tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM {
continue;
}
let update = if buf_tag.blknum == RELATION_SIZE_BLKNUM {
let entry = RelationSizeEntry::des(&value)?;
match self.handle_relation_size(buf_tag, entry) {
Some(relation_update) => relation_update,
None => continue,
}
} else {
let entry = PageEntry::des(&value)?;
self.handle_page(buf_tag, entry)
};
return Ok(Some(RelationUpdate {
rel: buf_tag.rel,
lsn,
update,
}));
}
Ok(None)
}
}
///
/// We store two kinds of page versions in the repository:
///
@@ -846,12 +939,16 @@ pub enum RelationSizeEntry {
Unlink,
}
// No real block in PostgreSQL will have block number u32::MAX
// See vendor/postgres/src/include/storage/block.h
const RELATION_SIZE_BLKNUM: u32 = u32::MAX;
const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey {
ObjectKey {
timeline: timelineid,
buf_tag: BufferTag {
rel,
blknum: u32::MAX,
blknum: RELATION_SIZE_BLKNUM,
},
}
}

View File

@@ -1,5 +1,5 @@
//! Low-level key-value storage abstraction.
//!
//!
use crate::repository::{BufferTag, RelTag};
use crate::ZTimelineId;
use anyhow::Result;
@@ -50,6 +50,16 @@ pub trait ObjectStore: Send + Sync {
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>>;
/// Iterate through versions of all objects in a timeline.
///
/// Returns objects in increasing key-version order.
/// Returns all versions up to and including the specified LSN.
fn objects<'a>(
&'a self,
timeline: ZTimelineId,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>>;
/// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'.
///
/// This is used to implement 'create database'

View File

@@ -86,6 +86,34 @@ pub trait Timeline: Send + Sync {
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint(&self) -> Result<()>;
/// Events for all relations in the timeline.
/// Contains updates from start up to the last valid LSN
/// at time of history() call. This lsn can be read via the lsn() function.
///
/// Relation size is increased implicitly and decreased with Truncate updates.
// TODO ordering guarantee?
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>>;
}
pub trait History: Iterator<Item = Result<RelationUpdate>> {
/// The last_valid_lsn at the time of history() call.
fn lsn(&self) -> Lsn;
}
#[derive(Debug, PartialEq, Eq)]
pub struct RelationUpdate {
pub rel: RelTag,
pub lsn: Lsn,
pub update: Update,
}
#[derive(Debug, PartialEq, Eq)]
pub enum Update {
Page { blknum: u32, img: Bytes },
WALRecord { blknum: u32, rec: WALRecord },
Truncate { n_blocks: u32 },
Unlink,
}
#[derive(Clone)]
@@ -123,6 +151,15 @@ pub struct RelTag {
pub relnode: u32,
}
impl RelTag {
pub const ZEROED: Self = Self {
forknum: 0,
spcnode: 0,
dbnode: 0,
relnode: 0,
};
}
/// Display RelTag in the same format that's used in most PostgreSQL debug messages:
///
/// <spcnode>/<dbnode>/<relnode>[_fsm|_vm|_init]
@@ -154,7 +191,14 @@ pub struct BufferTag {
pub blknum: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
impl BufferTag {
pub const ZEROED: Self = Self {
rel: RelTag::ZEROED,
blknum: 0,
};
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WALRecord {
pub lsn: Lsn, // LSN at the *end* of the record
pub will_init: bool,
@@ -392,6 +436,57 @@ mod tests {
Ok(())
}
#[test]
fn test_history() -> Result<()> {
let repo = get_test_repo("test_snapshot")?;
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
let mut snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(0));
assert_eq!(None, snapshot.next().transpose()?);
// add a page and advance the last valid LSN
let buf = TEST_BUF(1);
tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?;
tline.advance_last_valid_lsn(Lsn(1));
let mut snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
let expected_page = RelationUpdate {
rel: buf.rel,
lsn: Lsn(1),
update: Update::Page {
blknum: buf.blknum,
img: TEST_IMG("blk 1 @ lsn 1"),
},
};
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
// truncate to zero, but don't advance the last valid LSN
tline.put_truncation(buf.rel, Lsn(2), 0)?;
let mut snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
// advance the last valid LSN and the truncation should be observable
tline.advance_last_valid_lsn(Lsn(2));
let mut snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(2));
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
let expected_truncate = RelationUpdate {
rel: buf.rel,
lsn: Lsn(2),
update: Update::Truncate { n_blocks: 0 },
};
assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API
assert_eq!(None, snapshot.next().transpose()?);
Ok(())
}
// Mock WAL redo manager that doesn't do much
struct TestRedoManager {}

View File

@@ -17,6 +17,19 @@ struct StorageKey {
lsn: Lsn,
}
impl StorageKey {
/// The first key for a given timeline
fn timeline_start(timeline: ZTimelineId) -> Self {
Self {
obj_key: ObjectKey {
timeline,
buf_tag: BufferTag::ZEROED,
},
lsn: Lsn(0),
}
}
}
pub struct RocksObjectStore {
_conf: &'static PageServerConf,
@@ -109,6 +122,29 @@ impl ObjectStore for RocksObjectStore {
Ok(rels)
}
/// Iterate through versions of all objects in a timeline.
///
/// Returns objects in increasing key-version order.
/// Returns all versions up to and including the specified LSN.
fn objects<'a>(
&'a self,
timeline: ZTimelineId,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = Result<(BufferTag, Lsn, Vec<u8>)>> + 'a>> {
let start_key = StorageKey::timeline_start(timeline);
let start_key_bytes = StorageKey::ser(&start_key)?;
let iter = self.db.iterator(rocksdb::IteratorMode::From(
&start_key_bytes,
rocksdb::Direction::Forward,
));
Ok(Box::new(RocksObjects {
timeline,
lsn,
iter,
}))
}
}
impl RocksObjectStore {
@@ -206,3 +242,39 @@ impl<'a> Iterator for RocksObjectVersionIter<'a> {
Some((key.lsn, result))
}
}
struct RocksObjects<'r> {
iter: rocksdb::DBIterator<'r>,
timeline: ZTimelineId,
lsn: Lsn,
}
impl<'r> Iterator for RocksObjects<'r> {
// TODO consider returning Box<[u8]>
type Item = Result<(BufferTag, Lsn, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
}
}
impl<'r> RocksObjects<'r> {
fn next_result(&mut self) -> Result<Option<(BufferTag, Lsn, Vec<u8>)>> {
for (key_bytes, v) in &mut self.iter {
let key = StorageKey::des(&key_bytes)?;
if key.obj_key.timeline != self.timeline {
return Ok(None);
}
if key.lsn > self.lsn {
// TODO can speed up by seeking iterator
continue;
}
return Ok(Some((key.obj_key.buf_tag, key.lsn, v.to_vec())));
}
Ok(None)
}
}