From feb925c546d3cfce071de4173f68dd9bba1cb0a4 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Fri, 21 May 2021 17:36:36 +0300 Subject: [PATCH] Refactor repository API by introducing abstract iterator and removing specialized methods on top level --- pageserver/src/repository.rs | 202 ++++++++++++++++- pageserver/src/repository/rocksdb.rs | 318 +++++++-------------------- 2 files changed, 280 insertions(+), 240 deletions(-) diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index d9e8d2b393..d02b33ed47 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -9,6 +9,8 @@ use postgres_ffi::relfile_utils::forknumber_to_name; use std::fmt; use std::sync::Arc; use zenith_utils::lsn::Lsn; +use log::*; +use postgres_ffi::nonrelfile_utils::transaction_id_get_status; /// /// A repository corresponds to one .zenith directory. One repository holds multiple @@ -69,6 +71,12 @@ pub trait Timeline { /// Truncate relation fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; + /// Put raw data + fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()>; + + /// Get repository iterator + fn iterator(&self) -> Box; + /// Create a new database from a template database /// /// In PostgreSQL, CREATE DATABASE works by scanning the data directory and @@ -81,7 +89,49 @@ pub trait Timeline { tablespace_id: Oid, src_db_id: Oid, src_tablespace_id: Oid, - ) -> Result<()>; + ) -> Result<()> { + let mut n = 0; + for forknum in &[ + pg_constants::MAIN_FORKNUM, + pg_constants::FSM_FORKNUM, + pg_constants::VISIBILITYMAP_FORKNUM, + pg_constants::INIT_FORKNUM, + pg_constants::PG_FILENODEMAP_FORKNUM, + ] { + let key = RepositoryKey { + tag: BufferTag { + rel: RelTag { + spcnode: src_tablespace_id, + dbnode: src_db_id, + relnode: 0, + forknum: *forknum, + }, + blknum: 0, + }, + lsn: Lsn(0), + }; + let mut iter = self.iterator(); + iter.first(&key); + while iter.valid() { + let mut key = iter.key(); + if 
key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { + break; + } + key.tag.rel.spcnode = tablespace_id; + key.tag.rel.dbnode = db_id; + key.lsn = lsn; + + self.put_raw_data(key, iter.value())?; + n += 1; + iter.next(); + } + } + info!( + "Create database {}/{}, copy {} entries", + tablespace_id, db_id, n + ); + Ok(()) + } /// /// Helper function to parse a WAL record and call the above functions for all the @@ -167,15 +217,159 @@ pub trait Timeline { fn advance_last_record_lsn(&self, lsn: Lsn); fn get_last_record_lsn(&self) -> Lsn; + // + // Wait until WAL has been received up to the given LSN. + // + fn wait_lsn(&self, lsn: Lsn) -> Result; + /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations /// but can be also applied to normal relations. - fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>; + fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> { + let _lsn = self.wait_lsn(lsn)?; + let mut key = RepositoryKey { + // minimal key to start with + tag: BufferTag { rel, blknum: 0 }, + lsn: Lsn(0), + }; + let mut iter = self.iterator(); + iter.first(&key); + if iter.valid() { + let thiskey = iter.key(); + let tag = thiskey.tag; + if tag.rel == rel { + // still traversing this relation + let first_blknum = tag.blknum; + key.tag.blknum = u32::MAX; // maximal key + iter.last(&key); // locate last entry + if iter.valid() { + let thiskey = iter.key(); + let last_blknum = thiskey.tag.blknum; + return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive + } + } + } + Ok((0, 0)) // empty range + } /// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used) - fn get_databases(&self, lsn: Lsn) -> Result>; + fn get_databases(&self, lsn: Lsn) -> Result> { + let key = RepositoryKey { + // minimal key + tag: BufferTag { + rel: RelTag { + forknum: pg_constants::PG_FILENODEMAP_FORKNUM, + spcnode: 0, + dbnode: 0, + relnode: 0, + }, + blknum: 0, + }, + lsn: 
Lsn(0), + }; + let mut dbs = Vec::new(); + + let mut iter = self.iterator(); + iter.first(&key); + let mut prev_tag = key.tag.rel; + while iter.valid() { + let key = iter.key(); + if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM { + break; // we are done with this fork + } + if key.tag.rel != prev_tag && key.lsn <= lsn { + prev_tag = key.tag.rel; + dbs.push(prev_tag); // collect unique tags + } + iter.next(); + } + return Ok(dbs); + } /// Get vector of prepared twophase transactions - fn get_twophase(&self, lsn: Lsn) -> Result>; + fn get_twophase(&self, lsn: Lsn) -> Result> { + let key = RepositoryKey { + // minimal key + tag: BufferTag { + rel: RelTag { + forknum: pg_constants::PG_TWOPHASE_FORKNUM, + spcnode: 0, + dbnode: 0, + relnode: 0, + }, + blknum: 0, + }, + lsn: Lsn(0), + }; + let mut gxacts = Vec::new(); + + let mut iter = self.iterator(); + iter.first(&key); + while iter.valid() { + let key = iter.key(); + if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM { + break; // we are done with this fork + } + if key.lsn <= lsn { + let xid = key.tag.blknum; + let tag = BufferTag { + rel: RelTag { + forknum: pg_constants::PG_XACT_FORKNUM, + spcnode: 0, + dbnode: 0, + relnode: 0, + }, + blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE, + }; + let clog_page = self.get_page_at_lsn(tag, lsn)?; + let status = transaction_id_get_status(xid, &clog_page[..]); + if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS { + gxacts.push(xid); + } + } + iter.next(); + } + return Ok(gxacts); + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +pub struct RepositoryKey { + pub tag: BufferTag, + pub lsn: Lsn, +} + +impl RepositoryKey { + fn pack(&self, buf: &mut BytesMut) { + self.tag.pack(buf); + buf.put_u64(self.lsn.0); + } + fn unpack(buf: &mut Bytes) -> RepositoryKey { + RepositoryKey { + tag: BufferTag::unpack(buf), + lsn: Lsn::from(buf.get_u64()), + } + } + + fn from_slice(slice: &[u8]) -> Self { + let mut buf = 
Bytes::copy_from_slice(slice); + Self::unpack(&mut buf) + } + + fn to_bytes(&self) -> BytesMut { + let mut buf = BytesMut::new(); + self.pack(&mut buf); + buf + } +} + +pub trait RepositoryIterator { + fn first(&mut self, key: &RepositoryKey); + fn last(&mut self, key: &RepositoryKey); + fn next(&mut self); + fn prev(&mut self); + fn valid(&self) -> bool; + fn key(&self) -> RepositoryKey; + fn value(&self) -> &[u8]; } #[derive(Clone)] diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 2ec40156d8..ec2a3826e4 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -5,9 +5,8 @@ // full page images, keyed by the RelFileNode, blocknumber, and the // LSN. -use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord}; +use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord, RepositoryKey, RepositoryIterator}; use crate::restore_local_repo::restore_timeline; -use crate::waldecoder::{Oid, TransactionId}; use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::ZTimelineId; @@ -16,7 +15,6 @@ use crate::ZTimelineId; use anyhow::{bail, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; -use postgres_ffi::nonrelfile_utils::transaction_id_get_status; use postgres_ffi::*; use std::cmp::min; use std::collections::HashMap; @@ -84,35 +82,6 @@ pub struct RocksTimeline { // stored directly in the cache entry in that you still need to run the WAL redo // routine to generate the page image. 
// -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -struct CacheKey { - pub tag: BufferTag, - pub lsn: Lsn, -} - -impl CacheKey { - fn pack(&self, buf: &mut BytesMut) { - self.tag.pack(buf); - buf.put_u64(self.lsn.0); - } - fn unpack(buf: &mut Bytes) -> CacheKey { - CacheKey { - tag: BufferTag::unpack(buf), - lsn: Lsn::from(buf.get_u64()), - } - } - - fn from_slice(slice: &[u8]) -> Self { - let mut buf = Bytes::copy_from_slice(slice); - Self::unpack(&mut buf) - } - - fn to_bytes(&self) -> BytesMut { - let mut buf = BytesMut::new(); - self.pack(&mut buf); - buf - } -} enum CacheEntryContent { PageImage(Bytes), @@ -190,6 +159,33 @@ impl RocksRepository { } } } +struct RocksIterator<'a> { + iter: rocksdb::DBRawIterator<'a>, +} + +impl<'a> RepositoryIterator for RocksIterator<'a> { + fn next(&mut self) { + self.iter.next() + } + fn prev(&mut self) { + self.iter.prev() + } + fn valid(&self) -> bool { + self.iter.valid() + } + fn first(&mut self, key: &RepositoryKey) { + self.iter.seek(key.to_bytes()) + } + fn last(&mut self, key: &RepositoryKey) { + self.iter.seek_for_prev(key.to_bytes()) + } + fn key(&self) -> RepositoryKey { + RepositoryKey::from_slice(self.iter.key().unwrap()) + } + fn value(&self) -> &[u8] { + self.iter.value().unwrap() + } +} // Get handle to a given timeline. It is assumed to already exist. impl Repository for RocksRepository { @@ -299,7 +295,7 @@ impl RocksTimeline { tag: BufferTag, lsn: Lsn, ) -> (Option, Vec) { - let key = CacheKey { tag, lsn }; + let key = RepositoryKey { tag, lsn }; let mut base_img: Option = None; let mut records: Vec = Vec::new(); @@ -309,7 +305,7 @@ impl RocksTimeline { // Scan backwards, collecting the WAL records, until we hit an // old page image. 
while iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = RepositoryKey::from_slice(iter.key().unwrap()); if key.tag != tag { break; } @@ -344,7 +340,7 @@ impl RocksTimeline { fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result { assert!(lsn <= self.last_valid_lsn.load()); - let mut key = CacheKey { + let mut key = RepositoryKey { tag: BufferTag { rel, blknum: u32::MAX, @@ -355,7 +351,7 @@ impl RocksTimeline { loop { iter.seek_for_prev(key.to_bytes()); if iter.valid() { - let thiskey = CacheKey::from_slice(iter.key().unwrap()); + let thiskey = RepositoryKey::from_slice(iter.key().unwrap()); if thiskey.tag.rel == rel { let content = CacheEntryContent::from_slice(iter.value().unwrap()); if let CacheEntryContent::Truncation = content { @@ -383,7 +379,7 @@ impl RocksTimeline { // checked_sub() returns None on overflow. if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) { - let mut maxkey = CacheKey { + let mut maxkey = RepositoryKey { tag: BufferTag { rel: RelTag { spcnode: u32::MAX, @@ -404,7 +400,7 @@ impl RocksTimeline { let mut iter = self.db.raw_iterator(); iter.seek_for_prev(maxkey.to_bytes()); if iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = RepositoryKey::from_slice(iter.key().unwrap()); let v = iter.value().unwrap(); inspected += 1; @@ -457,7 +453,7 @@ impl RocksTimeline { // do not remove last version if last_lsn > horizon { // locate most recent record before horizon - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = RepositoryKey::from_slice(iter.key().unwrap()); if key.tag == maxkey.tag { let v = iter.value().unwrap(); if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { @@ -495,7 +491,7 @@ impl RocksTimeline { if !iter.valid() { break; } - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = RepositoryKey::from_slice(iter.key().unwrap()); if key.tag != maxkey.tag { break; } @@ -526,35 +522,6 @@ impl RocksTimeline { } } } - - // - // Wait 
until WAL has been received up to the given LSN. - // - fn wait_lsn(&self, mut lsn: Lsn) -> Result { - // When invalid LSN is requested, it means "don't wait, return latest version of the page" - // This is necessary for bootstrap. - if lsn == Lsn(0) { - let last_valid_lsn = self.last_valid_lsn.load(); - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - last_valid_lsn, - lsn - ); - lsn = last_valid_lsn; - } - //trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); - self.last_valid_lsn - .wait_for_timeout(lsn, TIMEOUT) - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {} to arrive", - lsn - ) - })?; - //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); - - Ok(lsn) - } } impl Timeline for RocksTimeline { @@ -572,13 +539,13 @@ impl Timeline for RocksTimeline { // Look up cache entry. If it's a page image, return that. If it's a WAL record, // ask the WAL redo service to reconstruct the page image from the WAL records. 
- let key = CacheKey { tag, lsn }; + let key = RepositoryKey { tag, lsn }; let mut iter = self.db.raw_iterator(); iter.seek_for_prev(key.to_bytes()); if iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = RepositoryKey::from_slice(iter.key().unwrap()); if key.tag == tag { let content = CacheEntryContent::from_slice(iter.value().unwrap()); let page_img: Bytes; @@ -623,124 +590,14 @@ impl Timeline for RocksTimeline { self.relsize_get_nowait(rel, lsn) } - /// Get vector of prepared twophase transactions - fn get_twophase(&self, lsn: Lsn) -> Result> { - let key = CacheKey { - // minimal key - tag: BufferTag { - rel: RelTag { - forknum: pg_constants::PG_TWOPHASE_FORKNUM, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: 0, - }, - lsn: Lsn(0), - }; - let mut gxacts = Vec::new(); - - let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); - while iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); - if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM { - break; // we are done with this fork - } - if key.lsn <= lsn { - let xid = key.tag.blknum; - let tag = BufferTag { - rel: RelTag { - forknum: pg_constants::PG_XACT_FORKNUM, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE, - }; - let clog_page = self.get_page_at_lsn(tag, lsn)?; - let status = transaction_id_get_status(xid, &clog_page[..]); - if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS { - gxacts.push(xid); - } - } - iter.next(); - } - return Ok(gxacts); - } - - /// Get databases. 
This function is used to local pg_filenode.map files - fn get_databases(&self, lsn: Lsn) -> Result> { - let key = CacheKey { - // minimal key - tag: BufferTag { - rel: RelTag { - forknum: pg_constants::PG_FILENODEMAP_FORKNUM, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: 0, - }, - lsn: Lsn(0), - }; - let mut dbs = Vec::new(); - - let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); - let mut prev_tag = key.tag.rel; - while iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); - if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM { - break; // we are done with this fork - } - if key.tag.rel != prev_tag && key.lsn <= lsn { - prev_tag = key.tag.rel; - dbs.push(prev_tag); // collect unique tags - } - iter.next(); - } - return Ok(dbs); - } - - /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations - /// but can be also applied to normal relations. - fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> { - let _lsn = self.wait_lsn(lsn)?; - let mut key = CacheKey { - // minimal key to start with - tag: BufferTag { rel, blknum: 0 }, - lsn: Lsn(0), - }; - let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); // locate first entry - if iter.valid() { - let thiskey = CacheKey::from_slice(iter.key().unwrap()); - let tag = thiskey.tag; - if tag.rel == rel { - // still trversing this relation - let first_blknum = tag.blknum; - key.tag.blknum = u32::MAX; // maximal key - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(key.to_bytes()); // localte last entry - if iter.valid() { - let thiskey = CacheKey::from_slice(iter.key().unwrap()); - let last_blknum = thiskey.tag.blknum; - return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive - } - } - } - Ok((0, 0)) // empty range - } - - /// + /// /// Does relation exist at given LSN? 
/// /// FIXME: this actually returns true, if the relation exists at *any* LSN fn get_relsize_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result { let lsn = self.wait_lsn(req_lsn)?; - let key = CacheKey { + let key = RepositoryKey { tag: BufferTag { rel, blknum: u32::MAX, @@ -750,7 +607,7 @@ impl Timeline for RocksTimeline { let mut iter = self.db.raw_iterator(); iter.seek_for_prev(key.to_bytes()); if iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); + let key = RepositoryKey::from_slice(iter.key().unwrap()); if key.tag.rel == rel { debug!("Relation {} exists at {}", rel, lsn); return Ok(true); @@ -768,7 +625,7 @@ impl Timeline for RocksTimeline { /// fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { let lsn = rec.lsn; - let key = CacheKey { tag, lsn }; + let key = RepositoryKey { tag, lsn }; let content = CacheEntryContent::WALRecord(rec); @@ -798,7 +655,7 @@ impl Timeline for RocksTimeline { trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn); for blknum in nblocks..old_rel_size { - let key = CacheKey { + let key = RepositoryKey { tag: BufferTag { rel, blknum }, lsn, }; @@ -815,7 +672,7 @@ impl Timeline for RocksTimeline { /// Get page image at particular LSN /// fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result> { - let key = CacheKey { tag, lsn }; + let key = RepositoryKey { tag, lsn }; if let Some(bytes) = self.db.get(key.to_bytes())? 
{ let content = CacheEntryContent::from_slice(&bytes); if let CacheEntryContent::PageImage(img) = content { @@ -830,7 +687,7 @@ impl Timeline for RocksTimeline { /// fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { let img_len = img.len(); - let key = CacheKey { tag, lsn }; + let key = RepositoryKey { tag, lsn }; let content = CacheEntryContent::PageImage(img); let mut val_buf = content.to_bytes(); @@ -858,55 +715,14 @@ impl Timeline for RocksTimeline { self.num_page_images.fetch_add(1, Ordering::Relaxed); } - fn put_create_database( - &self, - lsn: Lsn, - db_id: Oid, - tablespace_id: Oid, - src_db_id: Oid, - src_tablespace_id: Oid, - ) -> Result<()> { - let mut n = 0; - for forknum in &[ - pg_constants::MAIN_FORKNUM, - pg_constants::FSM_FORKNUM, - pg_constants::VISIBILITYMAP_FORKNUM, - pg_constants::INIT_FORKNUM, - pg_constants::PG_FILENODEMAP_FORKNUM, - ] { - let key = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: src_tablespace_id, - dbnode: src_db_id, - relnode: 0, - forknum: *forknum, - }, - blknum: 0, - }, - lsn: Lsn(0), - }; - let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); - while iter.valid() { - let mut key = CacheKey::from_slice(iter.key().unwrap()); - if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { - break; - } - key.tag.rel.spcnode = tablespace_id; - key.tag.rel.dbnode = db_id; - key.lsn = lsn; + fn iterator(&self) -> Box { + Box::new(RocksIterator { + iter: self.db.raw_iterator(), + }) + } - let v = iter.value().unwrap(); - self.db.put(key.to_bytes(), v)?; - n += 1; - iter.next(); - } - } - info!( - "Create database {}/{}, copy {} entries", - tablespace_id, db_id, n - ); + fn put_raw_data(&self, key: RepositoryKey, data: &[u8]) -> Result<()> { + self.db.put(key.to_bytes(), data)?; Ok(()) } @@ -961,6 +777,36 @@ impl Timeline for RocksTimeline { self.last_valid_lsn.load() } + // + // Wait until WAL has been received up to the given LSN. 
+ // + fn wait_lsn(&self, lsn: Lsn) -> Result { + let mut lsn = lsn; + // When invalid LSN is requested, it means "don't wait, return latest version of the page" + // This is necessary for bootstrap. + if lsn == Lsn(0) { + let last_valid_lsn = self.last_valid_lsn.load(); + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + last_valid_lsn, + lsn + ); + lsn = last_valid_lsn; + } + //trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); + self.last_valid_lsn + .wait_for_timeout(lsn, TIMEOUT) + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {} to arrive", + lsn + ) + })?; + //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); + + Ok(lsn) + } + // // Get statistics to be displayed in the user interface. //