mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 07:00:38 +00:00
This is a backwards-incompatible change. The new pageserver cannot read repositories created with an old pageserver binary, or vice versa. Simplify Repository to a value-store ------------------------------------ Move the responsibility of tracking relation metadata, like which relations exist and what are their sizes, from Repository to a new module, pgdatadir_mapping.rs. The interface to Repository is now a simple key-value PUT/GET operations. It's still not any old key-value store though. A Repository is still responsible from handling branching, and every GET operation comes with an LSN. Mapping from Postgres data directory to keys/values --------------------------------------------------- All the data is now stored in the key-value store. The 'pgdatadir_mapping.rs' module handles mapping from PostgreSQL objects like relation pages and SLRUs, to key-value pairs. The key to the Repository key-value store is a Key struct, which consists of a few integer fields. It's wide enough to store a full RelFileNode, fork and block number, and to distinguish those from metadata keys. 'pgdatadir_mapping.rs' is also responsible for maintaining a "partitioning" of the keyspace. Partitioning means splitting the keyspace so that each partition holds a roughly equal number of keys. The partitioning is used when new image layer files are created, so that each image layer file is roughly the same size. The partitioning is also responsible for reclaiming space used by deleted keys. The Repository implementation doesn't have any explicit support for deleting keys. Instead, the deleted keys are simply omitted from the partitioning, and when a new image layer is created, the omitted keys are not copied over to the new image layer. We might want to implement tombstone keys in the future, to reclaim space faster, but this will work for now. Changes to low-level layer file code ------------------------------------ The concept of a "segment" is gone. Each layer file can now store an arbitrary range of Keys. Checkpointing, compaction ------------------------- The background tasks are somewhat different now. Whenever checkpoint_distance is reached, the WAL receiver thread "freezes" the current in-memory layer, and creates a new one. This is a quick operation and doesn't perform any I/O yet. It then launches a background "layer flushing thread" to write the frozen layer to disk, as a new L0 delta layer. This mechanism takes care of durability. It replaces the checkpointing thread. Compaction is a new background operation that takes a bunch of L0 delta layers, and reshuffles the data in them. It runs in a separate compaction thread. Deployment ---------- This also contains changes to the ansible scripts that enable having multiple different pageservers running at the same time in the staging environment. We will use that to keep an old version of the pageserver running, for clusters created with the old version, at the same time with a new pageserver with the new binary. Author: Heikki Linnakangas Author: Konstantin Knizhnik <knizhnik@zenith.tech> Author: Andrey Taranik <andrey@zenith.tech> Reviewed-by: Matthias Van De Meent <matthias@zenith.tech> Reviewed-by: Bojan Serafimov <bojan@zenith.tech> Reviewed-by: Konstantin Knizhnik <knizhnik@zenith.tech> Reviewed-by: Anton Shyrabokau <antons@zenith.tech> Reviewed-by: Dhammika Pathirana <dham@zenith.tech> Reviewed-by: Kirill Bulatov <kirill@zenith.tech> Reviewed-by: Anastasia Lubennikova <anastasia@zenith.tech> Reviewed-by: Alexey Kondratov <alexey@zenith.tech>
1351 lines
42 KiB
Rust
1351 lines
42 KiB
Rust
//!
|
|
//! This provides an abstraction to store PostgreSQL relations and other files
|
|
//! in the key-value store that implements the Repository interface.
|
|
//!
|
|
//! (TODO: The line between PUT-functions here and walingest.rs is a bit blurry, as
|
|
//! walingest.rs handles a few things like implicit relation creation and extension.
|
|
//! Clarify that)
|
|
//!
|
|
use crate::keyspace::{KeySpace, KeySpaceAccum, TARGET_FILE_SIZE_BYTES};
|
|
use crate::reltag::{RelTag, SlruKind};
|
|
use crate::repository::*;
|
|
use crate::repository::{Repository, Timeline};
|
|
use crate::walrecord::ZenithWalRecord;
|
|
use anyhow::{bail, ensure, Result};
|
|
use bytes::{Buf, Bytes};
|
|
use postgres_ffi::{pg_constants, Oid, TransactionId};
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::ops::Range;
|
|
use std::sync::atomic::{AtomicIsize, Ordering};
|
|
use std::sync::{Arc, RwLockReadGuard};
|
|
use tracing::{debug, error, trace, warn};
|
|
use zenith_utils::bin_ser::BeSer;
|
|
use zenith_utils::lsn::AtomicLsn;
|
|
use zenith_utils::lsn::Lsn;
|
|
|
|
/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
|
|
pub type BlockNumber = u32;
|
|
|
|
pub struct DatadirTimeline<R>
|
|
where
|
|
R: Repository,
|
|
{
|
|
/// The underlying key-value store. Callers should not read or modify the
|
|
/// data in the underlying store directly. However, it is exposed to have
|
|
/// access to information like last-LSN, ancestor, and operations like
|
|
/// compaction.
|
|
pub tline: Arc<R::Timeline>,
|
|
|
|
/// When did we last calculate the partitioning?
|
|
last_partitioning: AtomicLsn,
|
|
|
|
/// Configuration: how often should the partitioning be recalculated.
|
|
repartition_threshold: u64,
|
|
|
|
/// Current logical size of the "datadir", at the last LSN.
|
|
current_logical_size: AtomicIsize,
|
|
}
|
|
|
|
impl<R: Repository> DatadirTimeline<R> {
|
|
pub fn new(tline: Arc<R::Timeline>, repartition_threshold: u64) -> Self {
|
|
DatadirTimeline {
|
|
tline,
|
|
last_partitioning: AtomicLsn::new(0),
|
|
current_logical_size: AtomicIsize::new(0),
|
|
repartition_threshold,
|
|
}
|
|
}
|
|
|
|
/// (Re-)calculate the logical size of the database at the latest LSN.
|
|
///
|
|
/// This can be a slow operation.
|
|
pub fn init_logical_size(&self) -> Result<()> {
|
|
let last_lsn = self.tline.get_last_record_lsn();
|
|
self.current_logical_size.store(
|
|
self.get_current_logical_size_non_incremental(last_lsn)? as isize,
|
|
Ordering::SeqCst,
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Start ingesting a WAL record, or other atomic modification of
|
|
/// the timeline.
|
|
///
|
|
/// This provides a transaction-like interface to perform a bunch
|
|
/// of modifications atomically, all stamped with one LSN.
|
|
///
|
|
/// To ingest a WAL record, call begin_modification(lsn) to get a
|
|
/// DatadirModification object. Use the functions in the object to
|
|
/// modify the repository state, updating all the pages and metadata
|
|
/// that the WAL record affects. When you're done, call commit() to
|
|
/// commit the changes.
|
|
///
|
|
/// Note that any pending modifications you make through the
|
|
/// modification object won't be visible to calls to the 'get' and list
|
|
/// functions of the timeline until you finish! And if you update the
|
|
/// same page twice, the last update wins.
|
|
///
|
|
pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification<R> {
|
|
DatadirModification {
|
|
tline: self,
|
|
lsn,
|
|
pending_updates: HashMap::new(),
|
|
pending_deletions: Vec::new(),
|
|
pending_nblocks: 0,
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
// Public GET functions
|
|
//------------------------------------------------------------------------------
|
|
|
|
/// Look up given page version.
|
|
pub fn get_rel_page_at_lsn(&self, tag: RelTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
|
|
ensure!(tag.relnode != 0, "invalid relnode");
|
|
|
|
let nblocks = self.get_rel_size(tag, lsn)?;
|
|
if blknum >= nblocks {
|
|
debug!(
|
|
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
|
tag, blknum, lsn, nblocks
|
|
);
|
|
return Ok(ZERO_PAGE.clone());
|
|
}
|
|
|
|
let key = rel_block_to_key(tag, blknum);
|
|
self.tline.get(key, lsn)
|
|
}
|
|
|
|
/// Get size of a relation file
|
|
pub fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<BlockNumber> {
|
|
ensure!(tag.relnode != 0, "invalid relnode");
|
|
|
|
if (tag.forknum == pg_constants::FSM_FORKNUM
|
|
|| tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM)
|
|
&& !self.get_rel_exists(tag, lsn)?
|
|
{
|
|
// FIXME: Postgres sometimes calls smgrcreate() to create
|
|
// FSM, and smgrnblocks() on it immediately afterwards,
|
|
// without extending it. Tolerate that by claiming that
|
|
// any non-existent FSM fork has size 0.
|
|
return Ok(0);
|
|
}
|
|
|
|
let key = rel_size_to_key(tag);
|
|
let mut buf = self.tline.get(key, lsn)?;
|
|
Ok(buf.get_u32_le())
|
|
}
|
|
|
|
/// Does relation exist?
|
|
pub fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool> {
|
|
ensure!(tag.relnode != 0, "invalid relnode");
|
|
|
|
// fetch directory listing
|
|
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
|
let buf = self.tline.get(key, lsn)?;
|
|
let dir = RelDirectory::des(&buf)?;
|
|
|
|
let exists = dir.rels.get(&(tag.relnode, tag.forknum)).is_some();
|
|
|
|
Ok(exists)
|
|
}
|
|
|
|
/// Get a list of all existing relations in given tablespace and database.
|
|
pub fn list_rels(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<HashSet<RelTag>> {
|
|
// fetch directory listing
|
|
let key = rel_dir_to_key(spcnode, dbnode);
|
|
let buf = self.tline.get(key, lsn)?;
|
|
let dir = RelDirectory::des(&buf)?;
|
|
|
|
let rels: HashSet<RelTag> =
|
|
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
|
|
spcnode,
|
|
dbnode,
|
|
relnode: *relnode,
|
|
forknum: *forknum,
|
|
}));
|
|
|
|
Ok(rels)
|
|
}
|
|
|
|
/// Look up given SLRU page version.
|
|
pub fn get_slru_page_at_lsn(
|
|
&self,
|
|
kind: SlruKind,
|
|
segno: u32,
|
|
blknum: BlockNumber,
|
|
lsn: Lsn,
|
|
) -> Result<Bytes> {
|
|
let key = slru_block_to_key(kind, segno, blknum);
|
|
self.tline.get(key, lsn)
|
|
}
|
|
|
|
/// Get size of an SLRU segment
|
|
pub fn get_slru_segment_size(
|
|
&self,
|
|
kind: SlruKind,
|
|
segno: u32,
|
|
lsn: Lsn,
|
|
) -> Result<BlockNumber> {
|
|
let key = slru_segment_size_to_key(kind, segno);
|
|
let mut buf = self.tline.get(key, lsn)?;
|
|
Ok(buf.get_u32_le())
|
|
}
|
|
|
|
/// Get size of an SLRU segment
|
|
pub fn get_slru_segment_exists(&self, kind: SlruKind, segno: u32, lsn: Lsn) -> Result<bool> {
|
|
// fetch directory listing
|
|
let key = slru_dir_to_key(kind);
|
|
let buf = self.tline.get(key, lsn)?;
|
|
let dir = SlruSegmentDirectory::des(&buf)?;
|
|
|
|
let exists = dir.segments.get(&segno).is_some();
|
|
Ok(exists)
|
|
}
|
|
|
|
/// Get a list of SLRU segments
|
|
pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result<HashSet<u32>> {
|
|
// fetch directory entry
|
|
let key = slru_dir_to_key(kind);
|
|
|
|
let buf = self.tline.get(key, lsn)?;
|
|
let dir = SlruSegmentDirectory::des(&buf)?;
|
|
|
|
Ok(dir.segments)
|
|
}
|
|
|
|
pub fn get_relmap_file(&self, spcnode: Oid, dbnode: Oid, lsn: Lsn) -> Result<Bytes> {
|
|
let key = relmap_file_key(spcnode, dbnode);
|
|
|
|
let buf = self.tline.get(key, lsn)?;
|
|
Ok(buf)
|
|
}
|
|
|
|
pub fn list_dbdirs(&self, lsn: Lsn) -> Result<HashMap<(Oid, Oid), bool>> {
|
|
// fetch directory entry
|
|
let buf = self.tline.get(DBDIR_KEY, lsn)?;
|
|
let dir = DbDirectory::des(&buf)?;
|
|
|
|
Ok(dir.dbdirs)
|
|
}
|
|
|
|
pub fn get_twophase_file(&self, xid: TransactionId, lsn: Lsn) -> Result<Bytes> {
|
|
let key = twophase_file_key(xid);
|
|
let buf = self.tline.get(key, lsn)?;
|
|
Ok(buf)
|
|
}
|
|
|
|
pub fn list_twophase_files(&self, lsn: Lsn) -> Result<HashSet<TransactionId>> {
|
|
// fetch directory entry
|
|
let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
|
|
let dir = TwoPhaseDirectory::des(&buf)?;
|
|
|
|
Ok(dir.xids)
|
|
}
|
|
|
|
pub fn get_control_file(&self, lsn: Lsn) -> Result<Bytes> {
|
|
self.tline.get(CONTROLFILE_KEY, lsn)
|
|
}
|
|
|
|
pub fn get_checkpoint(&self, lsn: Lsn) -> Result<Bytes> {
|
|
self.tline.get(CHECKPOINT_KEY, lsn)
|
|
}
|
|
|
|
/// Get the LSN of the last ingested WAL record.
|
|
///
|
|
/// This is just a convenience wrapper that calls through to the underlying
|
|
/// repository.
|
|
pub fn get_last_record_lsn(&self) -> Lsn {
|
|
self.tline.get_last_record_lsn()
|
|
}
|
|
|
|
/// Check that it is valid to request operations with that lsn.
|
|
///
|
|
/// This is just a convenience wrapper that calls through to the underlying
|
|
/// repository.
|
|
pub fn check_lsn_is_in_scope(
|
|
&self,
|
|
lsn: Lsn,
|
|
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
|
|
) -> Result<()> {
|
|
self.tline.check_lsn_is_in_scope(lsn, latest_gc_cutoff_lsn)
|
|
}
|
|
|
|
/// Retrieve current logical size of the timeline
|
|
///
|
|
/// NOTE: counted incrementally, includes ancestors,
|
|
pub fn get_current_logical_size(&self) -> usize {
|
|
let current_logical_size = self.current_logical_size.load(Ordering::Acquire);
|
|
match usize::try_from(current_logical_size) {
|
|
Ok(sz) => sz,
|
|
Err(_) => {
|
|
error!(
|
|
"current_logical_size is out of range: {}",
|
|
current_logical_size
|
|
);
|
|
0
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Does the same as get_current_logical_size but counted on demand.
|
|
/// Used to initialize the logical size tracking on startup.
|
|
///
|
|
/// Only relation blocks are counted currently. That excludes metadata,
|
|
/// SLRUs, twophase files etc.
|
|
pub fn get_current_logical_size_non_incremental(&self, lsn: Lsn) -> Result<usize> {
|
|
// Fetch list of database dirs and iterate them
|
|
let buf = self.tline.get(DBDIR_KEY, lsn)?;
|
|
let dbdir = DbDirectory::des(&buf)?;
|
|
|
|
let mut total_size: usize = 0;
|
|
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
|
for rel in self.list_rels(*spcnode, *dbnode, lsn)? {
|
|
let relsize_key = rel_size_to_key(rel);
|
|
let mut buf = self.tline.get(relsize_key, lsn)?;
|
|
let relsize = buf.get_u32_le();
|
|
|
|
total_size += relsize as usize;
|
|
}
|
|
}
|
|
Ok(total_size * pg_constants::BLCKSZ as usize)
|
|
}
|
|
|
|
///
|
|
/// Get a KeySpace that covers all the Keys that are in use at the given LSN.
|
|
/// Anything that's not listed maybe removed from the underlying storage (from
|
|
/// that LSN forwards).
|
|
fn collect_keyspace(&self, lsn: Lsn) -> Result<KeySpace> {
|
|
// Iterate through key ranges, greedily packing them into partitions
|
|
let mut result = KeySpaceAccum::new();
|
|
|
|
// The dbdir metadata always exists
|
|
result.add_key(DBDIR_KEY);
|
|
|
|
// Fetch list of database dirs and iterate them
|
|
let buf = self.tline.get(DBDIR_KEY, lsn)?;
|
|
let dbdir = DbDirectory::des(&buf)?;
|
|
|
|
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
|
|
dbs.sort_unstable();
|
|
for (spcnode, dbnode) in dbs {
|
|
result.add_key(relmap_file_key(spcnode, dbnode));
|
|
result.add_key(rel_dir_to_key(spcnode, dbnode));
|
|
|
|
let mut rels: Vec<RelTag> = self
|
|
.list_rels(spcnode, dbnode, lsn)?
|
|
.iter()
|
|
.cloned()
|
|
.collect();
|
|
rels.sort_unstable();
|
|
for rel in rels {
|
|
let relsize_key = rel_size_to_key(rel);
|
|
let mut buf = self.tline.get(relsize_key, lsn)?;
|
|
let relsize = buf.get_u32_le();
|
|
|
|
result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
|
|
result.add_key(relsize_key);
|
|
}
|
|
}
|
|
|
|
// Iterate SLRUs next
|
|
for kind in [
|
|
SlruKind::Clog,
|
|
SlruKind::MultiXactMembers,
|
|
SlruKind::MultiXactOffsets,
|
|
] {
|
|
let slrudir_key = slru_dir_to_key(kind);
|
|
result.add_key(slrudir_key);
|
|
let buf = self.tline.get(slrudir_key, lsn)?;
|
|
let dir = SlruSegmentDirectory::des(&buf)?;
|
|
let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
|
|
segments.sort_unstable();
|
|
for segno in segments {
|
|
let segsize_key = slru_segment_size_to_key(kind, segno);
|
|
let mut buf = self.tline.get(segsize_key, lsn)?;
|
|
let segsize = buf.get_u32_le();
|
|
|
|
result.add_range(
|
|
slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
|
|
);
|
|
result.add_key(segsize_key);
|
|
}
|
|
}
|
|
|
|
// Then pg_twophase
|
|
result.add_key(TWOPHASEDIR_KEY);
|
|
let buf = self.tline.get(TWOPHASEDIR_KEY, lsn)?;
|
|
let twophase_dir = TwoPhaseDirectory::des(&buf)?;
|
|
let mut xids: Vec<TransactionId> = twophase_dir.xids.iter().cloned().collect();
|
|
xids.sort_unstable();
|
|
for xid in xids {
|
|
result.add_key(twophase_file_key(xid));
|
|
}
|
|
|
|
result.add_key(CONTROLFILE_KEY);
|
|
result.add_key(CHECKPOINT_KEY);
|
|
|
|
Ok(result.to_keyspace())
|
|
}
|
|
}
|
|
|
|
/// DatadirModification represents an operation to ingest an atomic set of
|
|
/// updates to the repository. It is created by the 'begin_record'
|
|
/// function. It is called for each WAL record, so that all the modifications
|
|
/// by a one WAL record appear atomic.
|
|
pub struct DatadirModification<'a, R: Repository> {
|
|
/// The timeline this modification applies to. You can access this to
|
|
/// read the state, but note that any pending updates are *not* reflected
|
|
/// in the state in 'tline' yet.
|
|
pub tline: &'a DatadirTimeline<R>,
|
|
|
|
lsn: Lsn,
|
|
|
|
// The modifications are not applied directly to the underyling key-value store.
|
|
// The put-functions add the modifications here, and they are flushed to the
|
|
// underlying key-value store by the 'finish' function.
|
|
pending_updates: HashMap<Key, Value>,
|
|
pending_deletions: Vec<Range<Key>>,
|
|
pending_nblocks: isize,
|
|
}
|
|
|
|
impl<'a, R: Repository> DatadirModification<'a, R> {
|
|
/// Initialize a completely new repository.
|
|
///
|
|
/// This inserts the directory metadata entries that are assumed to
|
|
/// always exist.
|
|
pub fn init_empty(&mut self) -> Result<()> {
|
|
let buf = DbDirectory::ser(&DbDirectory {
|
|
dbdirs: HashMap::new(),
|
|
})?;
|
|
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
|
|
|
let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
|
|
xids: HashSet::new(),
|
|
})?;
|
|
self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
|
|
|
|
let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
|
|
let empty_dir = Value::Image(buf);
|
|
self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
|
|
self.put(
|
|
slru_dir_to_key(SlruKind::MultiXactMembers),
|
|
empty_dir.clone(),
|
|
);
|
|
self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Put a new page version that can be constructed from a WAL record
|
|
///
|
|
/// NOTE: this will *not* implicitly extend the relation, if the page is beyond the
|
|
/// current end-of-file. It's up to the caller to check that the relation size
|
|
/// matches the blocks inserted!
|
|
pub fn put_rel_wal_record(
|
|
&mut self,
|
|
rel: RelTag,
|
|
blknum: BlockNumber,
|
|
rec: ZenithWalRecord,
|
|
) -> Result<()> {
|
|
ensure!(rel.relnode != 0, "invalid relnode");
|
|
self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec));
|
|
Ok(())
|
|
}
|
|
|
|
// Same, but for an SLRU.
|
|
pub fn put_slru_wal_record(
|
|
&mut self,
|
|
kind: SlruKind,
|
|
segno: u32,
|
|
blknum: BlockNumber,
|
|
rec: ZenithWalRecord,
|
|
) -> Result<()> {
|
|
self.put(
|
|
slru_block_to_key(kind, segno, blknum),
|
|
Value::WalRecord(rec),
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Like put_wal_record, but with ready-made image of the page.
|
|
pub fn put_rel_page_image(
|
|
&mut self,
|
|
rel: RelTag,
|
|
blknum: BlockNumber,
|
|
img: Bytes,
|
|
) -> Result<()> {
|
|
ensure!(rel.relnode != 0, "invalid relnode");
|
|
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
|
|
Ok(())
|
|
}
|
|
|
|
pub fn put_slru_page_image(
|
|
&mut self,
|
|
kind: SlruKind,
|
|
segno: u32,
|
|
blknum: BlockNumber,
|
|
img: Bytes,
|
|
) -> Result<()> {
|
|
self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
|
|
Ok(())
|
|
}
|
|
|
|
/// Store a relmapper file (pg_filenode.map) in the repository
|
|
pub fn put_relmap_file(&mut self, spcnode: Oid, dbnode: Oid, img: Bytes) -> Result<()> {
|
|
// Add it to the directory (if it doesn't exist already)
|
|
let buf = self.get(DBDIR_KEY)?;
|
|
let mut dbdir = DbDirectory::des(&buf)?;
|
|
|
|
let r = dbdir.dbdirs.insert((spcnode, dbnode), true);
|
|
if r == None || r == Some(false) {
|
|
// The dbdir entry didn't exist, or it contained a
|
|
// 'false'. The 'insert' call already updated it with
|
|
// 'true', now write the updated 'dbdirs' map back.
|
|
let buf = DbDirectory::ser(&dbdir)?;
|
|
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
|
}
|
|
if r == None {
|
|
// Create RelDirectory
|
|
let buf = RelDirectory::ser(&RelDirectory {
|
|
rels: HashSet::new(),
|
|
})?;
|
|
self.put(
|
|
rel_dir_to_key(spcnode, dbnode),
|
|
Value::Image(Bytes::from(buf)),
|
|
);
|
|
}
|
|
|
|
self.put(relmap_file_key(spcnode, dbnode), Value::Image(img));
|
|
Ok(())
|
|
}
|
|
|
|
pub fn put_twophase_file(&mut self, xid: TransactionId, img: Bytes) -> Result<()> {
|
|
// Add it to the directory entry
|
|
let buf = self.get(TWOPHASEDIR_KEY)?;
|
|
let mut dir = TwoPhaseDirectory::des(&buf)?;
|
|
if !dir.xids.insert(xid) {
|
|
bail!("twophase file for xid {} already exists", xid);
|
|
}
|
|
self.put(
|
|
TWOPHASEDIR_KEY,
|
|
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
|
|
);
|
|
|
|
self.put(twophase_file_key(xid), Value::Image(img));
|
|
Ok(())
|
|
}
|
|
|
|
pub fn put_control_file(&mut self, img: Bytes) -> Result<()> {
|
|
self.put(CONTROLFILE_KEY, Value::Image(img));
|
|
Ok(())
|
|
}
|
|
|
|
pub fn put_checkpoint(&mut self, img: Bytes) -> Result<()> {
|
|
self.put(CHECKPOINT_KEY, Value::Image(img));
|
|
Ok(())
|
|
}
|
|
|
|
pub fn drop_dbdir(&mut self, spcnode: Oid, dbnode: Oid) -> Result<()> {
|
|
// Remove entry from dbdir
|
|
let buf = self.get(DBDIR_KEY)?;
|
|
let mut dir = DbDirectory::des(&buf)?;
|
|
if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
|
|
let buf = DbDirectory::ser(&dir)?;
|
|
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
|
} else {
|
|
warn!(
|
|
"dropped dbdir for spcnode {} dbnode {} did not exist in db directory",
|
|
spcnode, dbnode
|
|
);
|
|
}
|
|
|
|
// FIXME: update pending_nblocks
|
|
|
|
// Delete all relations and metadata files for the spcnode/dnode
|
|
self.delete(dbdir_key_range(spcnode, dbnode));
|
|
Ok(())
|
|
}
|
|
|
|
/// Create a relation fork.
|
|
///
|
|
/// 'nblocks' is the initial size.
|
|
pub fn put_rel_creation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
|
ensure!(rel.relnode != 0, "invalid relnode");
|
|
// It's possible that this is the first rel for this db in this
|
|
// tablespace. Create the reldir entry for it if so.
|
|
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY)?)?;
|
|
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
|
let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() {
|
|
// Didn't exist. Update dbdir
|
|
dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
|
|
let buf = DbDirectory::ser(&dbdir)?;
|
|
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
|
|
|
// and create the RelDirectory
|
|
RelDirectory::default()
|
|
} else {
|
|
// reldir already exists, fetch it
|
|
RelDirectory::des(&self.get(rel_dir_key)?)?
|
|
};
|
|
|
|
// Add the new relation to the rel directory entry, and write it back
|
|
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
|
bail!("rel {} already exists", rel);
|
|
}
|
|
self.put(
|
|
rel_dir_key,
|
|
Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
|
|
);
|
|
|
|
// Put size
|
|
let size_key = rel_size_to_key(rel);
|
|
let buf = nblocks.to_le_bytes();
|
|
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
|
|
|
self.pending_nblocks += nblocks as isize;
|
|
|
|
// Even if nblocks > 0, we don't insert any actual blocks here. That's up to the
|
|
// caller.
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Truncate relation
|
|
pub fn put_rel_truncation(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
|
ensure!(rel.relnode != 0, "invalid relnode");
|
|
let size_key = rel_size_to_key(rel);
|
|
|
|
// Fetch the old size first
|
|
let old_size = self.get(size_key)?.get_u32_le();
|
|
|
|
// Update the entry with the new size.
|
|
let buf = nblocks.to_le_bytes();
|
|
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
|
|
|
// Update logical database size.
|
|
self.pending_nblocks -= old_size as isize - nblocks as isize;
|
|
Ok(())
|
|
}
|
|
|
|
/// Extend relation
|
|
pub fn put_rel_extend(&mut self, rel: RelTag, nblocks: BlockNumber) -> Result<()> {
|
|
ensure!(rel.relnode != 0, "invalid relnode");
|
|
|
|
// Put size
|
|
let size_key = rel_size_to_key(rel);
|
|
let old_size = self.get(size_key)?.get_u32_le();
|
|
|
|
let buf = nblocks.to_le_bytes();
|
|
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
|
|
|
self.pending_nblocks += nblocks as isize - old_size as isize;
|
|
Ok(())
|
|
}
|
|
|
|
/// Drop a relation.
|
|
pub fn put_rel_drop(&mut self, rel: RelTag) -> Result<()> {
|
|
ensure!(rel.relnode != 0, "invalid relnode");
|
|
|
|
// Remove it from the directory entry
|
|
let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
|
|
let buf = self.get(dir_key)?;
|
|
let mut dir = RelDirectory::des(&buf)?;
|
|
|
|
if dir.rels.remove(&(rel.relnode, rel.forknum)) {
|
|
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
|
|
} else {
|
|
warn!("dropped rel {} did not exist in rel directory", rel);
|
|
}
|
|
|
|
// update logical size
|
|
let size_key = rel_size_to_key(rel);
|
|
let old_size = self.get(size_key)?.get_u32_le();
|
|
self.pending_nblocks -= old_size as isize;
|
|
|
|
// Delete size entry, as well as all blocks
|
|
self.delete(rel_key_range(rel));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn put_slru_segment_creation(
|
|
&mut self,
|
|
kind: SlruKind,
|
|
segno: u32,
|
|
nblocks: BlockNumber,
|
|
) -> Result<()> {
|
|
// Add it to the directory entry
|
|
let dir_key = slru_dir_to_key(kind);
|
|
let buf = self.get(dir_key)?;
|
|
let mut dir = SlruSegmentDirectory::des(&buf)?;
|
|
|
|
if !dir.segments.insert(segno) {
|
|
bail!("slru segment {:?}/{} already exists", kind, segno);
|
|
}
|
|
self.put(
|
|
dir_key,
|
|
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
|
|
);
|
|
|
|
// Put size
|
|
let size_key = slru_segment_size_to_key(kind, segno);
|
|
let buf = nblocks.to_le_bytes();
|
|
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
|
|
|
// even if nblocks > 0, we don't insert any actual blocks here
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Extend SLRU segment
|
|
pub fn put_slru_extend(
|
|
&mut self,
|
|
kind: SlruKind,
|
|
segno: u32,
|
|
nblocks: BlockNumber,
|
|
) -> Result<()> {
|
|
// Put size
|
|
let size_key = slru_segment_size_to_key(kind, segno);
|
|
let buf = nblocks.to_le_bytes();
|
|
self.put(size_key, Value::Image(Bytes::from(buf.to_vec())));
|
|
Ok(())
|
|
}
|
|
|
|
/// This method is used for marking truncated SLRU files
|
|
pub fn drop_slru_segment(&mut self, kind: SlruKind, segno: u32) -> Result<()> {
|
|
// Remove it from the directory entry
|
|
let dir_key = slru_dir_to_key(kind);
|
|
let buf = self.get(dir_key)?;
|
|
let mut dir = SlruSegmentDirectory::des(&buf)?;
|
|
|
|
if !dir.segments.remove(&segno) {
|
|
warn!("slru segment {:?}/{} does not exist", kind, segno);
|
|
}
|
|
self.put(
|
|
dir_key,
|
|
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
|
|
);
|
|
|
|
// Delete size entry, as well as all blocks
|
|
self.delete(slru_segment_key_range(kind, segno));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Drop a relmapper file (pg_filenode.map)
|
|
pub fn drop_relmap_file(&mut self, _spcnode: Oid, _dbnode: Oid) -> Result<()> {
|
|
// TODO
|
|
Ok(())
|
|
}
|
|
|
|
/// This method is used for marking truncated SLRU files
|
|
pub fn drop_twophase_file(&mut self, xid: TransactionId) -> Result<()> {
|
|
// Remove it from the directory entry
|
|
let buf = self.get(TWOPHASEDIR_KEY)?;
|
|
let mut dir = TwoPhaseDirectory::des(&buf)?;
|
|
|
|
if !dir.xids.remove(&xid) {
|
|
warn!("twophase file for xid {} does not exist", xid);
|
|
}
|
|
self.put(
|
|
TWOPHASEDIR_KEY,
|
|
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
|
|
);
|
|
|
|
// Delete it
|
|
self.delete(twophase_key_range(xid));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
///
|
|
/// Finish this atomic update, writing all the updated keys to the
|
|
/// underlying timeline.
|
|
///
|
|
pub fn commit(self) -> Result<()> {
|
|
let writer = self.tline.tline.writer();
|
|
|
|
let last_partitioning = self.tline.last_partitioning.load();
|
|
let pending_nblocks = self.pending_nblocks;
|
|
|
|
for (key, value) in self.pending_updates {
|
|
writer.put(key, self.lsn, value)?;
|
|
}
|
|
for key_range in self.pending_deletions {
|
|
writer.delete(key_range.clone(), self.lsn)?;
|
|
}
|
|
|
|
writer.finish_write(self.lsn);
|
|
|
|
if last_partitioning == Lsn(0)
|
|
|| self.lsn.0 - last_partitioning.0 > self.tline.repartition_threshold
|
|
{
|
|
let keyspace = self.tline.collect_keyspace(self.lsn)?;
|
|
let partitioning = keyspace.partition(TARGET_FILE_SIZE_BYTES);
|
|
self.tline.tline.hint_partitioning(partitioning, self.lsn)?;
|
|
self.tline.last_partitioning.store(self.lsn);
|
|
}
|
|
|
|
if pending_nblocks != 0 {
|
|
self.tline.current_logical_size.fetch_add(
|
|
pending_nblocks * pg_constants::BLCKSZ as isize,
|
|
Ordering::SeqCst,
|
|
);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
// Internal helper functions to batch the modifications
|
|
|
|
fn get(&self, key: Key) -> Result<Bytes> {
|
|
// Have we already updated the same key? Read the pending updated
|
|
// version in that case.
|
|
//
|
|
// Note: we don't check pending_deletions. It is an error to request a
|
|
// value that has been removed, deletion only avoids leaking storage.
|
|
if let Some(value) = self.pending_updates.get(&key) {
|
|
if let Value::Image(img) = value {
|
|
Ok(img.clone())
|
|
} else {
|
|
// Currently, we never need to read back a WAL record that we
|
|
// inserted in the same "transaction". All the metadata updates
|
|
// work directly with Images, and we never need to read actual
|
|
// data pages. We could handle this if we had to, by calling
|
|
// the walredo manager, but let's keep it simple for now.
|
|
bail!("unexpected pending WAL record");
|
|
}
|
|
} else {
|
|
let last_lsn = self.tline.get_last_record_lsn();
|
|
self.tline.tline.get(key, last_lsn)
|
|
}
|
|
}
|
|
|
|
fn put(&mut self, key: Key, val: Value) {
|
|
self.pending_updates.insert(key, val);
|
|
}
|
|
|
|
fn delete(&mut self, key_range: Range<Key>) {
|
|
trace!("DELETE {}-{}", key_range.start, key_range.end);
|
|
self.pending_deletions.push(key_range);
|
|
}
|
|
}
|
|
|
|
//--- Metadata structs stored in key-value pairs in the repository.
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
struct DbDirectory {
|
|
// (spcnode, dbnode) -> (do relmapper and PG_VERSION files exist)
|
|
dbdirs: HashMap<(Oid, Oid), bool>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
struct TwoPhaseDirectory {
|
|
xids: HashSet<TransactionId>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize, Default)]
|
|
struct RelDirectory {
|
|
// Set of relations that exist. (relfilenode, forknum)
|
|
//
|
|
// TODO: Store it as a btree or radix tree or something else that spans multiple
|
|
// key-value pairs, if you have a lot of relations
|
|
rels: HashSet<(Oid, u8)>,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize)]
|
|
struct RelSizeEntry {
|
|
nblocks: u32,
|
|
}
|
|
|
|
#[derive(Debug, Serialize, Deserialize, Default)]
|
|
struct SlruSegmentDirectory {
|
|
// Set of SLRU segments that exist.
|
|
segments: HashSet<u32>,
|
|
}
|
|
|
|
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; pg_constants::BLCKSZ as usize]);
|
|
|
|
// Layout of the Key address space
|
|
//
|
|
// The Key struct, used to address the underlying key-value store, consists of
|
|
// 18 bytes, split into six fields. See 'Key' in repository.rs. We need to map
|
|
// all the data and metadata keys into those 18 bytes.
|
|
//
|
|
// Principles for the mapping:
|
|
//
|
|
// - Things that are often accessed or modified together, should be close to
|
|
// each other in the key space. For example, if a relation is extended by one
|
|
// block, we create a new key-value pair for the block data, and update the
|
|
// relation size entry. Because of that, the RelSize key comes after all the
|
|
// RelBlocks of a relation: the RelSize and the last RelBlock are always next
|
|
// to each other.
|
|
//
|
|
// The key space is divided into four major sections, identified by the first
|
|
// byte, and the form a hierarchy:
|
|
//
|
|
// 00 Relation data and metadata
|
|
//
|
|
// DbDir () -> (dbnode, spcnode)
|
|
// Filenodemap
|
|
// RelDir -> relnode forknum
|
|
// RelBlocks
|
|
// RelSize
|
|
//
|
|
// 01 SLRUs
|
|
//
|
|
// SlruDir kind
|
|
// SlruSegBlocks segno
|
|
// SlruSegSize
|
|
//
|
|
// 02 pg_twophase
|
|
//
|
|
// 03 misc
|
|
// controlfile
|
|
// checkpoint
|
|
//
|
|
// Below is a full list of the keyspace allocation:
|
|
//
|
|
// DbDir:
|
|
// 00 00000000 00000000 00000000 00 00000000
|
|
//
|
|
// Filenodemap:
|
|
// 00 SPCNODE DBNODE 00000000 00 00000000
|
|
//
|
|
// RelDir:
|
|
// 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
|
|
//
|
|
// RelBlock:
|
|
// 00 SPCNODE DBNODE RELNODE FORK BLKNUM
|
|
//
|
|
// RelSize:
|
|
// 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
|
|
//
|
|
// SlruDir:
|
|
// 01 kind 00000000 00000000 00 00000000
|
|
//
|
|
// SlruSegBlock:
|
|
// 01 kind 00000001 SEGNO 00 BLKNUM
|
|
//
|
|
// SlruSegSize:
|
|
// 01 kind 00000001 SEGNO 00 FFFFFFFF
|
|
//
|
|
// TwoPhaseDir:
|
|
// 02 00000000 00000000 00000000 00 00000000
|
|
//
|
|
// TwoPhaseFile:
|
|
// 02 00000000 00000000 00000000 00 XID
|
|
//
|
|
// ControlFile:
|
|
// 03 00000000 00000000 00000000 00 00000000
|
|
//
|
|
// Checkpoint:
|
|
// 03 00000000 00000000 00000000 00 00000001
|
|
|
|
//-- Section 01: relation data and metadata
|
|
|
|
const DBDIR_KEY: Key = Key {
|
|
field1: 0x00,
|
|
field2: 0,
|
|
field3: 0,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 0,
|
|
};
|
|
|
|
fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
|
|
Key {
|
|
field1: 0x00,
|
|
field2: spcnode,
|
|
field3: dbnode,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 0,
|
|
}..Key {
|
|
field1: 0x00,
|
|
field2: spcnode,
|
|
field3: dbnode,
|
|
field4: 0xffffffff,
|
|
field5: 0xff,
|
|
field6: 0xffffffff,
|
|
}
|
|
}
|
|
|
|
fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
|
|
Key {
|
|
field1: 0x00,
|
|
field2: spcnode,
|
|
field3: dbnode,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 0,
|
|
}
|
|
}
|
|
|
|
fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
|
|
Key {
|
|
field1: 0x00,
|
|
field2: spcnode,
|
|
field3: dbnode,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 1,
|
|
}
|
|
}
|
|
|
|
fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
|
|
Key {
|
|
field1: 0x00,
|
|
field2: rel.spcnode,
|
|
field3: rel.dbnode,
|
|
field4: rel.relnode,
|
|
field5: rel.forknum,
|
|
field6: blknum,
|
|
}
|
|
}
|
|
|
|
fn rel_size_to_key(rel: RelTag) -> Key {
|
|
Key {
|
|
field1: 0x00,
|
|
field2: rel.spcnode,
|
|
field3: rel.dbnode,
|
|
field4: rel.relnode,
|
|
field5: rel.forknum,
|
|
field6: 0xffffffff,
|
|
}
|
|
}
|
|
|
|
fn rel_key_range(rel: RelTag) -> Range<Key> {
|
|
Key {
|
|
field1: 0x00,
|
|
field2: rel.spcnode,
|
|
field3: rel.dbnode,
|
|
field4: rel.relnode,
|
|
field5: rel.forknum,
|
|
field6: 0,
|
|
}..Key {
|
|
field1: 0x00,
|
|
field2: rel.spcnode,
|
|
field3: rel.dbnode,
|
|
field4: rel.relnode,
|
|
field5: rel.forknum + 1,
|
|
field6: 0,
|
|
}
|
|
}
|
|
|
|
//-- Section 02: SLRUs
|
|
|
|
fn slru_dir_to_key(kind: SlruKind) -> Key {
|
|
Key {
|
|
field1: 0x01,
|
|
field2: match kind {
|
|
SlruKind::Clog => 0x00,
|
|
SlruKind::MultiXactMembers => 0x01,
|
|
SlruKind::MultiXactOffsets => 0x02,
|
|
},
|
|
field3: 0,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 0,
|
|
}
|
|
}
|
|
|
|
fn slru_block_to_key(kind: SlruKind, segno: u32, blknum: BlockNumber) -> Key {
|
|
Key {
|
|
field1: 0x01,
|
|
field2: match kind {
|
|
SlruKind::Clog => 0x00,
|
|
SlruKind::MultiXactMembers => 0x01,
|
|
SlruKind::MultiXactOffsets => 0x02,
|
|
},
|
|
field3: 1,
|
|
field4: segno,
|
|
field5: 0,
|
|
field6: blknum,
|
|
}
|
|
}
|
|
|
|
fn slru_segment_size_to_key(kind: SlruKind, segno: u32) -> Key {
|
|
Key {
|
|
field1: 0x01,
|
|
field2: match kind {
|
|
SlruKind::Clog => 0x00,
|
|
SlruKind::MultiXactMembers => 0x01,
|
|
SlruKind::MultiXactOffsets => 0x02,
|
|
},
|
|
field3: 1,
|
|
field4: segno,
|
|
field5: 0,
|
|
field6: 0xffffffff,
|
|
}
|
|
}
|
|
|
|
fn slru_segment_key_range(kind: SlruKind, segno: u32) -> Range<Key> {
|
|
let field2 = match kind {
|
|
SlruKind::Clog => 0x00,
|
|
SlruKind::MultiXactMembers => 0x01,
|
|
SlruKind::MultiXactOffsets => 0x02,
|
|
};
|
|
|
|
Key {
|
|
field1: 0x01,
|
|
field2,
|
|
field3: segno,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 0,
|
|
}..Key {
|
|
field1: 0x01,
|
|
field2,
|
|
field3: segno,
|
|
field4: 0,
|
|
field5: 1,
|
|
field6: 0,
|
|
}
|
|
}
|
|
|
|
//-- Section 03: pg_twophase
|
|
|
|
const TWOPHASEDIR_KEY: Key = Key {
|
|
field1: 0x02,
|
|
field2: 0,
|
|
field3: 0,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 0,
|
|
};
|
|
|
|
fn twophase_file_key(xid: TransactionId) -> Key {
|
|
Key {
|
|
field1: 0x02,
|
|
field2: 0,
|
|
field3: 0,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: xid,
|
|
}
|
|
}
|
|
|
|
fn twophase_key_range(xid: TransactionId) -> Range<Key> {
|
|
let (next_xid, overflowed) = xid.overflowing_add(1);
|
|
|
|
Key {
|
|
field1: 0x02,
|
|
field2: 0,
|
|
field3: 0,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: xid,
|
|
}..Key {
|
|
field1: 0x02,
|
|
field2: 0,
|
|
field3: 0,
|
|
field4: 0,
|
|
field5: if overflowed { 1 } else { 0 },
|
|
field6: next_xid,
|
|
}
|
|
}
|
|
|
|
//-- Section 03: Control file
|
|
const CONTROLFILE_KEY: Key = Key {
|
|
field1: 0x03,
|
|
field2: 0,
|
|
field3: 0,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 0,
|
|
};
|
|
|
|
const CHECKPOINT_KEY: Key = Key {
|
|
field1: 0x03,
|
|
field2: 0,
|
|
field3: 0,
|
|
field4: 0,
|
|
field5: 0,
|
|
field6: 1,
|
|
};
|
|
|
|
// Reverse mappings for a few Keys.
|
|
// These are needed by WAL redo manager.
|
|
|
|
pub fn key_to_rel_block(key: Key) -> Result<(RelTag, BlockNumber)> {
|
|
Ok(match key.field1 {
|
|
0x00 => (
|
|
RelTag {
|
|
spcnode: key.field2,
|
|
dbnode: key.field3,
|
|
relnode: key.field4,
|
|
forknum: key.field5,
|
|
},
|
|
key.field6,
|
|
),
|
|
_ => bail!("unexpected value kind 0x{:02x}", key.field1),
|
|
})
|
|
}
|
|
|
|
pub fn key_to_slru_block(key: Key) -> Result<(SlruKind, u32, BlockNumber)> {
|
|
Ok(match key.field1 {
|
|
0x01 => {
|
|
let kind = match key.field2 {
|
|
0x00 => SlruKind::Clog,
|
|
0x01 => SlruKind::MultiXactMembers,
|
|
0x02 => SlruKind::MultiXactOffsets,
|
|
_ => bail!("unrecognized slru kind 0x{:02x}", key.field2),
|
|
};
|
|
let segno = key.field4;
|
|
let blknum = key.field6;
|
|
|
|
(kind, segno, blknum)
|
|
}
|
|
_ => bail!("unexpected value kind 0x{:02x}", key.field1),
|
|
})
|
|
}
|
|
|
|
//
|
|
//-- Tests that should work the same with any Repository/Timeline implementation.
|
|
//
|
|
|
|
#[cfg(test)]
|
|
pub fn create_test_timeline<R: Repository>(
|
|
repo: R,
|
|
timeline_id: zenith_utils::zid::ZTimelineId,
|
|
) -> Result<Arc<crate::DatadirTimeline<R>>> {
|
|
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
|
|
let tline = DatadirTimeline::new(tline, crate::layered_repository::tests::TEST_FILE_SIZE / 10);
|
|
let mut m = tline.begin_modification(Lsn(8));
|
|
m.init_empty()?;
|
|
m.commit()?;
|
|
Ok(Arc::new(tline))
|
|
}
|
|
|
|
#[allow(clippy::bool_assert_comparison)]
|
|
#[cfg(test)]
|
|
mod tests {
|
|
//use super::repo_harness::*;
|
|
//use super::*;
|
|
|
|
/*
|
|
fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
|
|
let incremental = timeline.get_current_logical_size();
|
|
let non_incremental = timeline
|
|
.get_current_logical_size_non_incremental(lsn)
|
|
.unwrap();
|
|
assert_eq!(incremental, non_incremental);
|
|
}
|
|
*/
|
|
|
|
/*
|
|
///
|
|
/// Test list_rels() function, with branches and dropped relations
|
|
///
|
|
#[test]
|
|
fn test_list_rels_drop() -> Result<()> {
|
|
let repo = RepoHarness::create("test_list_rels_drop")?.load();
|
|
let tline = create_empty_timeline(repo, TIMELINE_ID)?;
|
|
const TESTDB: u32 = 111;
|
|
|
|
// Import initial dummy checkpoint record, otherwise the get_timeline() call
|
|
// after branching fails below
|
|
let mut writer = tline.begin_record(Lsn(0x10));
|
|
writer.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
|
writer.finish()?;
|
|
|
|
// Create a relation on the timeline
|
|
let mut writer = tline.begin_record(Lsn(0x20));
|
|
writer.put_rel_page_image(TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
|
|
writer.finish()?;
|
|
|
|
let writer = tline.begin_record(Lsn(0x00));
|
|
writer.finish()?;
|
|
|
|
// Check that list_rels() lists it after LSN 2, but no before it
|
|
assert!(!tline.list_rels(0, TESTDB, Lsn(0x10))?.contains(&TESTREL_A));
|
|
assert!(tline.list_rels(0, TESTDB, Lsn(0x20))?.contains(&TESTREL_A));
|
|
assert!(tline.list_rels(0, TESTDB, Lsn(0x30))?.contains(&TESTREL_A));
|
|
|
|
// Create a branch, check that the relation is visible there
|
|
repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Lsn(0x30))?;
|
|
let newtline = match repo.get_timeline(NEW_TIMELINE_ID)?.local_timeline() {
|
|
Some(timeline) => timeline,
|
|
None => panic!("Should have a local timeline"),
|
|
};
|
|
let newtline = DatadirTimelineImpl::new(newtline);
|
|
assert!(newtline
|
|
.list_rels(0, TESTDB, Lsn(0x30))?
|
|
.contains(&TESTREL_A));
|
|
|
|
// Drop it on the branch
|
|
let mut new_writer = newtline.begin_record(Lsn(0x40));
|
|
new_writer.drop_relation(TESTREL_A)?;
|
|
new_writer.finish()?;
|
|
|
|
// Check that it's no longer listed on the branch after the point where it was dropped
|
|
assert!(newtline
|
|
.list_rels(0, TESTDB, Lsn(0x30))?
|
|
.contains(&TESTREL_A));
|
|
assert!(!newtline
|
|
.list_rels(0, TESTDB, Lsn(0x40))?
|
|
.contains(&TESTREL_A));
|
|
|
|
// Run checkpoint and garbage collection and check that it's still not visible
|
|
newtline.tline.checkpoint(CheckpointConfig::Forced)?;
|
|
repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
|
|
|
|
assert!(!newtline
|
|
.list_rels(0, TESTDB, Lsn(0x40))?
|
|
.contains(&TESTREL_A));
|
|
|
|
Ok(())
|
|
}
|
|
*/
|
|
|
|
/*
|
|
#[test]
|
|
fn test_read_beyond_eof() -> Result<()> {
|
|
let repo = RepoHarness::create("test_read_beyond_eof")?.load();
|
|
let tline = create_test_timeline(repo, TIMELINE_ID)?;
|
|
|
|
make_some_layers(&tline, Lsn(0x20))?;
|
|
let mut writer = tline.begin_record(Lsn(0x60));
|
|
walingest.put_rel_page_image(
|
|
&mut writer,
|
|
TESTREL_A,
|
|
0,
|
|
TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x60))),
|
|
)?;
|
|
writer.finish()?;
|
|
|
|
// Test read before rel creation. Should error out.
|
|
assert!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x10)).is_err());
|
|
|
|
// Read block beyond end of relation at different points in time.
|
|
// These reads should fall into different delta, image, and in-memory layers.
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x20))?, ZERO_PAGE);
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x25))?, ZERO_PAGE);
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x30))?, ZERO_PAGE);
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x35))?, ZERO_PAGE);
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x40))?, ZERO_PAGE);
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x45))?, ZERO_PAGE);
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x50))?, ZERO_PAGE);
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x55))?, ZERO_PAGE);
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_A, 1, Lsn(0x60))?, ZERO_PAGE);
|
|
|
|
// Test on an in-memory layer with no preceding layer
|
|
let mut writer = tline.begin_record(Lsn(0x70));
|
|
walingest.put_rel_page_image(
|
|
&mut writer,
|
|
TESTREL_B,
|
|
0,
|
|
TEST_IMG(&format!("foo blk 0 at {}", Lsn(0x70))),
|
|
)?;
|
|
writer.finish()?;
|
|
|
|
assert_eq!(tline.get_rel_page_at_lsn(TESTREL_B, 1, Lsn(0x70))?, ZERO_PAGE);
|
|
|
|
Ok(())
|
|
}
|
|
*/
|
|
}
|