Minor refactoring and cleanup of the Timeline interface.

Move `save_decoded_record` out of the Timeline trait. The storage
implementation shouldn't need to know how to decode records.

Also move put_create_database() out of the Timeline trait. Add a new
`list_rels` function to Timeline to support it, instead.

Rename `get_relsize` to `get_rel_size`, and `get_relsize_exists` to
`get_rel_exists`. Seems nicer.
This commit is contained in:
Heikki Linnakangas
2021-05-27 09:44:46 +03:00
parent 1ccf82f932
commit cb6e2d9ddb
5 changed files with 204 additions and 151 deletions

View File

@@ -835,7 +835,7 @@ impl Connection {
forknum: req.forknum,
};
let exist = timeline.get_relsize_exists(tag, req.lsn).unwrap_or(false);
let exist = timeline.get_rel_exists(tag, req.lsn).unwrap_or(false);
PagestreamBeMessage::Status(PagestreamStatusResponse {
ok: exist,
@@ -850,7 +850,7 @@ impl Connection {
forknum: req.forknum,
};
let n_blocks = timeline.get_relsize(tag, req.lsn).unwrap_or(0);
let n_blocks = timeline.get_rel_size(tag, req.lsn).unwrap_or(0);
PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks })
}

View File

@@ -1,12 +1,11 @@
pub mod rocksdb;
use crate::waldecoder::{DecodedWALRecord, Oid, XlCreateDatabase, XlSmgrTruncate};
use crate::ZTimelineId;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::forknumber_to_name;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
@@ -40,10 +39,13 @@ pub trait Timeline {
fn get_page_at_lsn(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes>;
/// Get size of relation
fn get_relsize(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
/// Does relation exist?
fn get_relsize_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>;
fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>;
/// Get a list of all distinct relations in given tablespace and database.
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>>;
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
@@ -63,87 +65,6 @@ pub trait Timeline {
/// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
/// Create a new database from a template database
///
/// In PostgreSQL, CREATE DATABASE works by scanning the data directory and
/// copying all relation files from the template database. This is the equivalent
/// of that.
fn put_create_database(
&self,
lsn: Lsn,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> Result<()>;
///
/// Helper function to parse a WAL record and call the above functions for all the
/// relations/pages that the record affects.
///
fn save_decoded_record(
&self,
decoded: DecodedWALRecord,
recdata: Bytes,
lsn: Lsn,
) -> Result<()> {
// Figure out which blocks the record applies to, and "put" a separate copy
// of the record for each block.
for blk in decoded.blocks.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
blknum: blk.blkno,
};
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
self.put_wal_record(tag, rec);
}
// Handle a few special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&decoded);
if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
let rel = RelTag {
spcnode: truncate.rnode.spcnode,
dbnode: truncate.rnode.dbnode,
relnode: truncate.rnode.relnode,
forknum: pg_constants::MAIN_FORKNUM,
};
self.put_truncation(rel, lsn, truncate.blkno)?;
}
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&decoded);
self.put_create_database(
lsn,
createdb.db_id,
createdb.tablespace_id,
createdb.src_db_id,
createdb.src_tablespace_id,
)?;
}
// Now that this record has been handled, let the repository know that
// it is up-to-date to this LSN
self.advance_last_record_lsn(lsn);
Ok(())
}
/// Remember the all WAL before the given LSN has been processed.
///
/// The WAL receiver calls this after the put_* functions, to indicate that
@@ -359,13 +280,14 @@ mod tests {
// FIXME: The rocksdb implementation erroneously returns 'true' here, even
// though the relation was created only at a later LSN
// rocksdb implementation erroneosly returns 'true' here
assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false
// And this probably should throw an error, becaue the relation doesn't exist at Lsn(1) yet
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false
assert_eq!(tline.get_relsize_exists(TESTREL_A, Lsn(2))?, true);
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(2))?, 1);
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3);
// And this probably should throw an error, becaue the relation doesn't exist at Lsn(1) yet
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(2))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(2))?, 1);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(5))?, 3);
// Check page contents at each LSN
assert_eq!(
@@ -405,7 +327,7 @@ mod tests {
tline.advance_last_valid_lsn(Lsn(6));
// Check reported size and contents after truncation
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(6))?, 2);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(6))?, 2);
assert_eq!(
tline.get_page_at_lsn(TEST_BUF(0), Lsn(6))?,
TEST_IMG("foo blk 0 at 3")
@@ -416,7 +338,7 @@ mod tests {
);
// should still see the truncated block with older LSN
assert_eq!(tline.get_relsize(TESTREL_A, Lsn(5))?, 3);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(5))?, 3);
assert_eq!(
tline.get_page_at_lsn(TEST_BUF(2), Lsn(5))?,
TEST_IMG("foo blk 2 at 5")
@@ -447,7 +369,7 @@ mod tests {
tline.advance_last_valid_lsn(Lsn(lsn));
assert_eq!(
tline.get_relsize(TESTREL_A, Lsn(lsn))?,
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE + 1
);
@@ -456,7 +378,7 @@ mod tests {
tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE)?;
tline.advance_last_valid_lsn(Lsn(lsn));
assert_eq!(
tline.get_relsize(TESTREL_A, Lsn(lsn))?,
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE
);
@@ -465,7 +387,7 @@ mod tests {
tline.put_truncation(TESTREL_A, Lsn(lsn), pg_constants::RELSEG_SIZE - 1)?;
tline.advance_last_valid_lsn(Lsn(lsn));
assert_eq!(
tline.get_relsize(TESTREL_A, Lsn(lsn))?,
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
pg_constants::RELSEG_SIZE - 1
);

View File

@@ -7,7 +7,6 @@
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
use crate::restore_local_repo::import_timeline_wal;
use crate::waldecoder::Oid;
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -18,6 +17,7 @@ use postgres_ffi::pg_constants;
use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
@@ -714,7 +714,7 @@ impl Timeline for RocksTimeline {
///
/// Get size of relation at given LSN.
///
fn get_relsize(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
let lsn = self.wait_lsn(lsn)?;
self.relsize_get_nowait(rel, lsn)
}
@@ -723,7 +723,7 @@ impl Timeline for RocksTimeline {
/// Does relation exist at given LSN?
///
/// FIXME: this actually returns true, if the relation exists at *any* LSN
fn get_relsize_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
fn get_rel_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
let lsn = self.wait_lsn(req_lsn)?;
let key = CacheKey {
@@ -746,6 +746,46 @@ impl Timeline for RocksTimeline {
Ok(false)
}
/// Get a list of all distinct relations in given tablespace and database.
///
/// TODO: This implementation is very inefficient, it scans
/// through all entries in the given database. In practice, this
/// is used for CREATE DATABASE, and usually the template database is small.
/// But if it's not, this will be slow.
fn list_rels<'a>(&'a self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>> {
trace!("list_rels spcnode {} dbnode {} at {}", spcnode, dbnode, lsn);
let mut rels: HashSet<RelTag> = HashSet::new();
let searchkey = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: spcnode,
dbnode: dbnode,
relnode: 0,
forknum: 0u8,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(searchkey.ser()?);
while iter.valid() {
let key = CacheKey::des(iter.key().unwrap())?;
if key.tag.rel.spcnode != spcnode || key.tag.rel.dbnode != dbnode {
break;
}
if key.lsn < lsn {
rels.insert(key.tag.rel);
}
iter.next();
}
Ok(rels)
}
// Other public functions, for updating the repository.
// These are used by the WAL receiver and WAL redo.
@@ -832,51 +872,6 @@ impl Timeline for RocksTimeline {
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
fn put_create_database(
&self,
lsn: Lsn,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> Result<()> {
let key = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: src_tablespace_id,
dbnode: src_db_id,
relnode: 0,
forknum: 0u8,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(key.ser()?);
let mut n = 0;
while iter.valid() {
let mut key = CacheKey::des(iter.key().unwrap())?;
if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
break;
}
key.tag.rel.spcnode = tablespace_id;
key.tag.rel.dbnode = db_id;
key.lsn = lsn;
let v = iter.value().unwrap();
self.db.put(key.ser()?, v)?;
n += 1;
iter.next();
}
info!(
"Create database {}/{}, copy {} entries",
tablespace_id, db_id, n
);
Ok(())
}
/// Remember that WAL has been received and added to the timeline up to the given LSN
fn advance_last_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);

View File

@@ -3,7 +3,7 @@
//! zenith repository
//!
use log::*;
use std::cmp::max;
use std::cmp::{max, min};
use std::fs;
use std::fs::File;
use std::io::Read;
@@ -14,8 +14,9 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use bytes::Bytes;
use crate::repository::{BufferTag, RelTag, Timeline};
use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder};
use crate::repository::{BufferTag, RelTag, Timeline, WALRecord};
use crate::waldecoder::{decode_wal_record, DecodedWALRecord, Oid, WalStreamDecoder};
use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::pg_constants;
@@ -220,7 +221,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
}
if let Some((lsn, recdata)) = rec.unwrap() {
let decoded = decode_wal_record(recdata.clone());
timeline.save_decoded_record(decoded, recdata, lsn)?;
save_decoded_record(timeline, decoded, recdata, lsn)?;
last_lsn = lsn;
} else {
break;
@@ -241,3 +242,137 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
info!("reached end of WAL at {}", last_lsn);
Ok(())
}
///
/// Helper function to parse a WAL record and call the Timeline's PUT functions for all the
/// relations/pages that the record affects.
///
pub fn save_decoded_record(
timeline: &dyn Timeline,
decoded: DecodedWALRecord,
recdata: Bytes,
lsn: Lsn,
) -> Result<()> {
// Figure out which blocks the record applies to, and "put" a separate copy
// of the record for each block.
for blk in decoded.blocks.iter() {
let tag = BufferTag {
rel: RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum as u8,
},
blknum: blk.blkno,
};
let rec = WALRecord {
lsn,
will_init: blk.will_init || blk.apply_image,
rec: recdata.clone(),
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(tag, rec);
}
// Handle a few special record types
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE
{
let truncate = XlSmgrTruncate::decode(&decoded);
if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 {
let rel = RelTag {
spcnode: truncate.rnode.spcnode,
dbnode: truncate.rnode.dbnode,
relnode: truncate.rnode.relnode,
forknum: pg_constants::MAIN_FORKNUM,
};
timeline.put_truncation(rel, lsn, truncate.blkno)?;
}
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&decoded);
save_create_database(
timeline,
lsn,
createdb.db_id,
createdb.tablespace_id,
createdb.src_db_id,
createdb.src_tablespace_id,
)?;
}
// Now that this record has been handled, let the repository know that
// it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn);
Ok(())
}
/// Subroutine of save_decoded_record(), to handle an XLOG_DBASE_CREATE record.
fn save_create_database(
timeline: &dyn Timeline,
lsn: Lsn,
db_id: Oid,
tablespace_id: Oid,
src_db_id: Oid,
src_tablespace_id: Oid,
) -> Result<()> {
// Creating a database is implemented by copying the template (aka. source) database.
// To copy all the relations, we need to ask for the state as of the same LSN, but we
// cannot pass 'lsn' to the Timeline.get_* functions, or they will block waiting for
// the last valid LSN to advance up to it. So we use the previous record's LSN in the
// get calls instead.
let req_lsn = min(timeline.get_last_record_lsn(), lsn);
let rels = timeline.list_rels(src_tablespace_id, src_db_id, req_lsn)?;
info!("creatdb: {} rels", rels.len());
let mut num_rels_copied = 0;
let mut num_blocks_copied = 0;
for src_rel in rels {
assert_eq!(src_rel.spcnode, src_tablespace_id);
assert_eq!(src_rel.dbnode, src_db_id);
let nblocks = timeline.get_rel_size(src_rel, req_lsn)?;
let dst_rel = RelTag {
spcnode: tablespace_id,
dbnode: db_id,
relnode: src_rel.relnode,
forknum: src_rel.forknum,
};
// Copy content
for blknum in 0..nblocks {
let src_key = BufferTag {
rel: src_rel,
blknum,
};
let dst_key = BufferTag {
rel: dst_rel,
blknum,
};
let content = timeline.get_page_at_lsn(src_key, req_lsn)?;
info!("copying block {:?} to {:?}", src_key, dst_key);
timeline.put_page_image(dst_key, lsn, content);
num_blocks_copied += 1;
}
if nblocks == 0 {
// make sure we have some trace of the relation, even if it's empty
timeline.put_truncation(dst_rel, lsn, 0)?;
}
num_rels_copied += 1;
}
info!(
"Created database {}/{}, copied {} blocks in {} rels at {}",
tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn
);
Ok(())
}

View File

@@ -7,6 +7,7 @@
//!
use crate::page_cache;
use crate::restore_local_repo;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTimelineId;
@@ -184,7 +185,7 @@ fn walreceiver_main(
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let decoded = decode_wal_record(recdata.clone());
timeline.save_decoded_record(decoded, recdata, lsn)?;
restore_local_repo::save_decoded_record(&*timeline, decoded, recdata, lsn)?;
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN