Merge branch 'abstract_repository_iterator' into page_server_nonrel_wal_redo

This commit is contained in:
Konstantin Knizhnik
2021-05-24 11:36:44 +03:00
2 changed files with 280 additions and 240 deletions

View File

@@ -9,6 +9,8 @@ use postgres_ffi::relfile_utils::forknumber_to_name;
use std::fmt;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
use log::*;
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
@@ -69,6 +71,12 @@ pub trait Timeline {
/// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// Put raw data
fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()>;
/// Get repository iterator
fn iterator(&self) -> Box<dyn RepositoryIterator + '_>;
/// Create a new database from a template database
///
/// In PostgreSQL, CREATE DATABASE works by scanning the data directory and
@@ -81,7 +89,49 @@ pub trait Timeline {
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> Result<()>;
) -> Result<()> {
let mut n = 0;
for forknum in &[
pg_constants::MAIN_FORKNUM,
pg_constants::FSM_FORKNUM,
pg_constants::VISIBILITYMAP_FORKNUM,
pg_constants::INIT_FORKNUM,
pg_constants::PG_FILENODEMAP_FORKNUM,
] {
let key = RepositoryKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: *forknum,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut iter = self.iterator();
iter.first(&key);
while iter.valid() {
let mut key = iter.key();
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
self.put_raw_data(key, iter.value())?;
n += 1;
iter.next();
}
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
Ok(())
}
///
/// Helper function to parse a WAL record and call the above functions for all the
@@ -167,15 +217,159 @@ pub trait Timeline {
fn advance_last_record_lsn(&self, lsn: Lsn);
fn get_last_record_lsn(&self) -> Lsn;
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn>;
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
/// but can be also applied to normal relations.
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
let _lsn = self.wait_lsn(lsn)?;
let mut key = RepositoryKey {
// minimal key to start with
tag: BufferTag { rel, blknum: 0 },
lsn: Lsn(0),
};
let mut iter = self.iterator();
iter.first(&key);
if iter.valid() {
let thiskey = iter.key();
let tag = thiskey.tag;
if tag.rel == rel {
// still trversing this relation
let first_blknum = tag.blknum;
key.tag.blknum = u32::MAX; // maximal key
iter.last(&key); // locate last entry
if iter.valid() {
let thiskey = iter.key();
let last_blknum = thiskey.tag.blknum;
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
}
}
}
Ok((0, 0)) // empty range
}
/// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used)
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>>;
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
let key = RepositoryKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut dbs = Vec::new();
let mut iter = self.iterator();
iter.first(&key);
let mut prev_tag = key.tag.rel;
while iter.valid() {
let key = iter.key();
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
break; // we are done with this fork
}
if key.tag.rel != prev_tag && key.lsn <= lsn {
prev_tag = key.tag.rel;
dbs.push(prev_tag); // collect unique tags
}
iter.next();
}
return Ok(dbs);
}
/// Get vector of prepared twophase transactions
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>>;
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
let key = RepositoryKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut gxacts = Vec::new();
let mut iter = self.iterator();
iter.first(&key);
while iter.valid() {
let key = iter.key();
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
break; // we are done with this fork
}
if key.lsn <= lsn {
let xid = key.tag.blknum;
let tag = BufferTag {
rel: RelTag {
forknum: pg_constants::PG_XACT_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
};
let clog_page = self.get_page_at_lsn(tag, lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
gxacts.push(xid);
}
}
iter.next();
}
return Ok(gxacts);
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct RepositoryKey {
pub tag: BufferTag,
pub lsn: Lsn,
}
impl RepositoryKey {
fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn.0);
}
fn unpack(buf: &mut Bytes) -> RepositoryKey {
RepositoryKey {
tag: BufferTag::unpack(buf),
lsn: Lsn::from(buf.get_u64()),
}
}
fn from_slice(slice: &[u8]) -> Self {
let mut buf = Bytes::copy_from_slice(slice);
Self::unpack(&mut buf)
}
fn to_bytes(&self) -> BytesMut {
let mut buf = BytesMut::new();
self.pack(&mut buf);
buf
}
}
pub trait RepositoryIterator {
fn first(&mut self, key: &RepositoryKey);
fn last(&mut self, key: &RepositoryKey);
fn next(&mut self);
fn prev(&mut self);
fn valid(&self) -> bool;
fn key(&self) -> RepositoryKey;
fn value(&self) -> &[u8];
}
#[derive(Clone)]

View File

@@ -5,9 +5,8 @@
// full page images, keyed by the RelFileNode, blocknumber, and the
// LSN.
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord, RepositoryKey, RepositoryIterator};
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::{Oid, TransactionId};
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -16,7 +15,6 @@ use crate::ZTimelineId;
use anyhow::{bail, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
use postgres_ffi::*;
use std::cmp::min;
use std::collections::HashMap;
@@ -84,35 +82,6 @@ pub struct RocksTimeline {
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
struct CacheKey {
pub tag: BufferTag,
pub lsn: Lsn,
}
impl CacheKey {
fn pack(&self, buf: &mut BytesMut) {
self.tag.pack(buf);
buf.put_u64(self.lsn.0);
}
fn unpack(buf: &mut Bytes) -> CacheKey {
CacheKey {
tag: BufferTag::unpack(buf),
lsn: Lsn::from(buf.get_u64()),
}
}
fn from_slice(slice: &[u8]) -> Self {
let mut buf = Bytes::copy_from_slice(slice);
Self::unpack(&mut buf)
}
fn to_bytes(&self) -> BytesMut {
let mut buf = BytesMut::new();
self.pack(&mut buf);
buf
}
}
enum CacheEntryContent {
PageImage(Bytes),
@@ -190,6 +159,33 @@ impl RocksRepository {
}
}
}
struct RocksIterator<'a> {
iter: rocksdb::DBRawIterator<'a>,
}
impl<'a> RepositoryIterator for RocksIterator<'a> {
fn next(&mut self) {
self.iter.next()
}
fn prev(&mut self) {
self.iter.prev()
}
fn valid(&self) -> bool {
self.iter.valid()
}
fn first(&mut self, key: &RepositoryKey) {
self.iter.seek(key.to_bytes())
}
fn last(&mut self, key: &RepositoryKey) {
self.iter.seek_for_prev(key.to_bytes())
}
fn key(&self) -> RepositoryKey {
RepositoryKey::from_slice(self.iter.key().unwrap())
}
fn value(&self) -> &[u8] {
self.iter.value().unwrap()
}
}
// Get handle to a given timeline. It is assumed to already exist.
impl Repository for RocksRepository {
@@ -299,7 +295,7 @@ impl RocksTimeline {
tag: BufferTag,
lsn: Lsn,
) -> (Option<Bytes>, Vec<WALRecord>) {
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
@@ -309,7 +305,7 @@ impl RocksTimeline {
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
while iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
let key = RepositoryKey::from_slice(iter.key().unwrap());
if key.tag != tag {
break;
}
@@ -344,7 +340,7 @@ impl RocksTimeline {
fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
assert!(lsn <= self.last_valid_lsn.load());
let mut key = CacheKey {
let mut key = RepositoryKey {
tag: BufferTag {
rel,
blknum: u32::MAX,
@@ -355,7 +351,7 @@ impl RocksTimeline {
loop {
iter.seek_for_prev(key.to_bytes());
if iter.valid() {
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let thiskey = RepositoryKey::from_slice(iter.key().unwrap());
if thiskey.tag.rel == rel {
let content = CacheEntryContent::from_slice(iter.value().unwrap());
if let CacheEntryContent::Truncation = content {
@@ -383,7 +379,7 @@ impl RocksTimeline {
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
let mut maxkey = CacheKey {
let mut maxkey = RepositoryKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
@@ -404,7 +400,7 @@ impl RocksTimeline {
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(maxkey.to_bytes());
if iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
let key = RepositoryKey::from_slice(iter.key().unwrap());
let v = iter.value().unwrap();
inspected += 1;
@@ -457,7 +453,7 @@ impl RocksTimeline {
// do not remove last version
if last_lsn > horizon {
// locate most recent record before horizon
let key = CacheKey::from_slice(iter.key().unwrap());
let key = RepositoryKey::from_slice(iter.key().unwrap());
if key.tag == maxkey.tag {
let v = iter.value().unwrap();
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
@@ -495,7 +491,7 @@ impl RocksTimeline {
if !iter.valid() {
break;
}
let key = CacheKey::from_slice(iter.key().unwrap());
let key = RepositoryKey::from_slice(iter.key().unwrap());
if key.tag != maxkey.tag {
break;
}
@@ -526,35 +522,6 @@ impl RocksTimeline {
}
}
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
//trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
}
impl Timeline for RocksTimeline {
@@ -572,13 +539,13 @@ impl Timeline for RocksTimeline {
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.to_bytes());
if iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
let key = RepositoryKey::from_slice(iter.key().unwrap());
if key.tag == tag {
let content = CacheEntryContent::from_slice(iter.value().unwrap());
let page_img: Bytes;
@@ -623,124 +590,14 @@ impl Timeline for RocksTimeline {
self.relsize_get_nowait(rel, lsn)
}
/// Get vector of prepared twophase transactions
fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_TWOPHASE_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut gxacts = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
while iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
break; // we are done with this fork
}
if key.lsn <= lsn {
let xid = key.tag.blknum;
let tag = BufferTag {
rel: RelTag {
forknum: pg_constants::PG_XACT_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
};
let clog_page = self.get_page_at_lsn(tag, lsn)?;
let status = transaction_id_get_status(xid, &clog_page[..]);
if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
gxacts.push(xid);
}
}
iter.next();
}
return Ok(gxacts);
}
/// Get databases. This function is used to local pg_filenode.map files
fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
let key = CacheKey {
// minimal key
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut dbs = Vec::new();
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
let mut prev_tag = key.tag.rel;
while iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
break; // we are done with this fork
}
if key.tag.rel != prev_tag && key.lsn <= lsn {
prev_tag = key.tag.rel;
dbs.push(prev_tag); // collect unique tags
}
iter.next();
}
return Ok(dbs);
}
/// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
/// but can be also applied to normal relations.
fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
let _lsn = self.wait_lsn(lsn)?;
let mut key = CacheKey {
// minimal key to start with
tag: BufferTag { rel, blknum: 0 },
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes()); // locate first entry
if iter.valid() {
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let tag = thiskey.tag;
if tag.rel == rel {
// still trversing this relation
let first_blknum = tag.blknum;
key.tag.blknum = u32::MAX; // maximal key
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.to_bytes()); // localte last entry
if iter.valid() {
let thiskey = CacheKey::from_slice(iter.key().unwrap());
let last_blknum = thiskey.tag.blknum;
return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
}
}
}
Ok((0, 0)) // empty range
}
///
///
/// Does relation exist at given LSN?
///
/// FIXME: this actually returns true, if the relation exists at *any* LSN
fn get_relsize_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
let lsn = self.wait_lsn(req_lsn)?;
let key = CacheKey {
let key = RepositoryKey {
tag: BufferTag {
rel,
blknum: u32::MAX,
@@ -750,7 +607,7 @@ impl Timeline for RocksTimeline {
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.to_bytes());
if iter.valid() {
let key = CacheKey::from_slice(iter.key().unwrap());
let key = RepositoryKey::from_slice(iter.key().unwrap());
if key.tag.rel == rel {
debug!("Relation {} exists at {}", rel, lsn);
return Ok(true);
@@ -768,7 +625,7 @@ impl Timeline for RocksTimeline {
///
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
let content = CacheEntryContent::WALRecord(rec);
@@ -798,7 +655,7 @@ impl Timeline for RocksTimeline {
trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
for blknum in nblocks..old_rel_size {
let key = CacheKey {
let key = RepositoryKey {
tag: BufferTag { rel, blknum },
lsn,
};
@@ -815,7 +672,7 @@ impl Timeline for RocksTimeline {
/// Get page image at particular LSN
///
fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>> {
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
if let Some(bytes) = self.db.get(key.to_bytes())? {
let content = CacheEntryContent::from_slice(&bytes);
if let CacheEntryContent::PageImage(img) = content {
@@ -830,7 +687,7 @@ impl Timeline for RocksTimeline {
///
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
let img_len = img.len();
let key = CacheKey { tag, lsn };
let key = RepositoryKey { tag, lsn };
let content = CacheEntryContent::PageImage(img);
let mut val_buf = content.to_bytes();
@@ -858,55 +715,14 @@ impl Timeline for RocksTimeline {
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
fn put_create_database(
&self,
lsn: Lsn,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> Result<()> {
let mut n = 0;
for forknum in &[
pg_constants::MAIN_FORKNUM,
pg_constants::FSM_FORKNUM,
pg_constants::VISIBILITYMAP_FORKNUM,
pg_constants::INIT_FORKNUM,
pg_constants::PG_FILENODEMAP_FORKNUM,
] {
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: *forknum,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(key.to_bytes());
while iter.valid() {
let mut key = CacheKey::from_slice(iter.key().unwrap());
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
fn iterator(&self) -> Box<dyn RepositoryIterator + '_> {
Box::new(RocksIterator {
iter: self.db.raw_iterator(),
})
}
let v = iter.value().unwrap();
self.db.put(key.to_bytes(), v)?;
n += 1;
iter.next();
}
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()> {
self.db.put(key.to_bytes(), data)?;
Ok(())
}
@@ -961,6 +777,36 @@ impl Timeline for RocksTimeline {
self.last_valid_lsn.load()
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, lsn: Lsn) -> Result<Lsn> {
let mut lsn = lsn;
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
//trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive",
lsn
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
//
// Get statistics to be displayed in the user interface.
//