Refactoring of the Repository/Timeline stuff

- All timelines are now stored in the same rocksdb repository. The GET
  functions have been taught to follow the ancestors.

- Change the way relation size is stored. Instead of inserting "tombstone"
  entries for blocks that are truncated away, store relation size as
  separate key-value entry for each relation

- Add an abstraction for the key-value store: ObjectStore. It allows
  swapping RocksDB with some other key-value store easily. Perhaps we
  will write our own storage implementation using that interface, or
  perhaps we'll need a different abstraction, but this is a small
  improvement over status quo in any case.

- Garbage Collection is broken and commented out. It's not clear where and
  how it should be implemented.
This commit is contained in:
Heikki Linnakangas
2021-05-27 20:07:50 +03:00
parent d1d2d5ce69
commit 34f4207501
13 changed files with 1266 additions and 990 deletions

3
Cargo.lock generated
View File

@@ -181,6 +181,9 @@ name = "bytes"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
dependencies = [
"serde",
]
[[package]]
name = "cassowary"

View File

@@ -10,7 +10,7 @@ edition = "2018"
chrono = "0.4.19"
rand = "0.8.3"
regex = "1.4.5"
bytes = "1.0.1"
bytes = { version = "1.0.1", features = ['serde'] }
byteorder = "1.4.3"
futures = "0.3.13"
lazy_static = "1.4.0"

View File

@@ -98,8 +98,11 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
// and we failed to run initdb again in the same directory. This has been solved for the
// rapid init+start case now, but the general race condition remains if you restart the
// server quickly.
let repo = crate::repository::rocksdb::RocksRepository::new(
let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?;
let repo = crate::object_repository::ObjectRepository::new(
conf,
std::sync::Arc::new(storage),
std::sync::Arc::new(crate::walredo::DummyRedoManager {}),
);
let timeline = repo.create_empty_timeline(tli, Lsn(lsn))?;

View File

@@ -8,10 +8,13 @@ use std::time::Duration;
pub mod basebackup;
pub mod branches;
pub mod object_repository;
pub mod object_store;
pub mod page_cache;
pub mod page_service;
pub mod repository;
pub mod restore_local_repo;
pub mod rocksdb_storage;
pub mod tui;
pub mod tui_event;
mod tui_logger;

View File

@@ -0,0 +1,956 @@
//!
//! Implementation of the Repository/Timeline traits, using a key-value store
//! (ObjectStore) to for the actual storage.
//!
//! This maps the relation-oriented operations in the Timeline interface into
//! objects stored in an ObjectStore. Relation size is stored as a separate object
//! in the key-value store. If a page is written beyond the current end-of-file,
//! we also insert the new size as a new "page version" in the key-value store.
//!
//! Also, this implements Copy-on-Write forking of timelines. For each timeline,
//! we store the parent timeline in the object store, in a little metadata blob.
//! When we need to find a version of a page, we walk the timeline history backwards
//! until we find the page we're looking for, making a separate lookup into the
//! key-value store for each timeline.
use crate::object_store::{ObjectKey, ObjectStore};
use crate::repository::*;
use crate::restore_local_repo::import_timeline_wal;
use crate::walredo::WalRedoManager;
use crate::{PageServerConf, ZTimelineId};
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use log::*;
use postgres_ffi::pg_constants;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
///
pub struct ObjectRepository {
obj_store: Arc<dyn ObjectStore>,
conf: &'static PageServerConf,
timelines: Mutex<HashMap<ZTimelineId, Arc<ObjectTimeline>>>,
walredo_mgr: Arc<dyn WalRedoManager>,
}
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(600);
impl ObjectRepository {
pub fn new(
conf: &'static PageServerConf,
obj_store: Arc<dyn ObjectStore>,
walredo_mgr: Arc<dyn WalRedoManager>,
) -> ObjectRepository {
ObjectRepository {
conf,
obj_store,
timelines: Mutex::new(HashMap::new()),
walredo_mgr,
}
}
}
impl Repository for ObjectRepository {
/// Get Timeline handle for given zenith timeline ID.
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
match timelines.get(&timelineid) {
Some(timeline) => Ok(timeline.clone()),
None => {
let timeline = ObjectTimeline::open(
Arc::clone(&self.obj_store),
timelineid,
self.walredo_mgr.clone(),
)?;
// Load any new WAL after the last checkpoint into the repository.
info!(
"Loading WAL for timeline {} starting at {}",
timelineid,
timeline.get_last_record_lsn()
);
let wal_dir = self.conf.timeline_path(timelineid).join("wal");
import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?;
let timeline_rc = Arc::new(timeline);
if self.conf.gc_horizon != 0 {
ObjectTimeline::launch_gc_thread(self.conf, timeline_rc.clone());
}
timelines.insert(timelineid, timeline_rc.clone());
Ok(timeline_rc)
}
}
}
/// Create a new, empty timeline. The caller is responsible for loading data into it
fn create_empty_timeline(
&self,
timelineid: ZTimelineId,
start_lsn: Lsn,
) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
// Write metadata key
let metadata = MetadataEntry {
last_valid_lsn: start_lsn,
last_record_lsn: start_lsn,
ancestor_timeline: None,
ancestor_lsn: start_lsn,
};
self.obj_store.put(
&timeline_metadata_key(timelineid),
Lsn(0),
&MetadataEntry::ser(&metadata)?,
)?;
info!("Created empty timeline {}", timelineid);
let timeline = ObjectTimeline::open(
Arc::clone(&self.obj_store),
timelineid,
self.walredo_mgr.clone(),
)?;
let timeline_rc = Arc::new(timeline);
let r = timelines.insert(timelineid, timeline_rc.clone());
assert!(r.is_none());
// don't start the garbage collector for unit tests, either.
Ok(timeline_rc)
}
/// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> {
// just to check the source timeline exists
let _ = self.get_timeline(src)?;
// Write a metadata key, noting the ancestor of th new timeline. There is initially
// no data in it, but all the read-calls know to look into the ancestor.
let metadata = MetadataEntry {
last_valid_lsn: at_lsn,
last_record_lsn: at_lsn,
ancestor_timeline: Some(src),
ancestor_lsn: at_lsn,
};
self.obj_store.put(
&timeline_metadata_key(dst),
Lsn(0),
&MetadataEntry::ser(&metadata)?,
)?;
Ok(())
}
}
///
/// A handle to a specific timeline in the repository. This is the API
/// that's exposed to the rest of the system.
///
pub struct ObjectTimeline {
timelineid: ZTimelineId,
// Backing key-value store
obj_store: Arc<dyn ObjectStore>,
// WAL redo manager, for reconstructing page versions from WAL records.
walredo_mgr: Arc<dyn WalRedoManager>,
// What page versions do we hold in the cache? If we get a request > last_valid_lsn,
// we need to wait until we receive all the WAL up to the request. The SeqWait
// provides functions for that. TODO: If we get a request for an old LSN, such that
// the versions have already been garbage collected away, we should throw an error,
// but we don't track that currently.
//
// last_record_lsn points to the end of last processed WAL record.
// It can lag behind last_valid_lsn, if the WAL receiver has received some WAL
// after the end of last record, but not the whole next record yet. In the
// page cache, we care about last_valid_lsn, but if the WAL receiver needs to
// restart the streaming, it needs to restart at the end of last record, so
// we track them separately. last_record_lsn should perhaps be in
// walreceiver.rs instead of here, but it seems convenient to keep all three
// values together.
//
last_valid_lsn: SeqWait<Lsn>,
last_record_lsn: AtomicLsn,
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
}
impl ObjectTimeline {
/// Open a Timeline handle.
///
/// Loads the metadata for the timeline into memory.
fn open(
obj_store: Arc<dyn ObjectStore>,
timelineid: ZTimelineId,
walredo_mgr: Arc<dyn WalRedoManager>,
) -> Result<ObjectTimeline> {
// Load metadata into memory
let v = obj_store
.get(&timeline_metadata_key(timelineid), Lsn(0))
.with_context(|| "timeline not found in repository")?;
let metadata = MetadataEntry::des(&v)?;
let timeline = ObjectTimeline {
timelineid,
obj_store,
walredo_mgr,
last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
ancestor_timeline: metadata.ancestor_timeline,
ancestor_lsn: metadata.ancestor_lsn,
};
Ok(timeline)
}
}
impl Timeline for ObjectTimeline {
//------------------------------------------------------------------------------
// Public GET functions
//------------------------------------------------------------------------------
/// Look up given page in the cache.
fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> {
let lsn = self.wait_lsn(req_lsn)?;
self.get_page_at_lsn_nowait(tag, lsn)
}
/// Get size of relation
fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
let lsn = self.wait_lsn(lsn)?;
match self.relsize_get_nowait(rel, lsn)? {
Some(nblocks) => Ok(nblocks),
None => bail!("relation {} not found at {}", rel, lsn),
}
}
/// Does relation exist at given LSN?
fn get_rel_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
let lsn = self.wait_lsn(req_lsn)?;
let key = relation_size_key(self.timelineid, rel);
let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?;
if let Some((_key, _val)) = iter.next().transpose()? {
debug!("Relation {} exists at {}", rel, lsn);
return Ok(true);
}
debug!("Relation {} doesn't exist at {}", rel, lsn);
Ok(false)
}
/// Get a list of all distinct relations in given tablespace and database.
fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>> {
// List all relations in this timeline.
let mut all_rels = self
.obj_store
.list_rels(self.timelineid, spcnode, dbnode, lsn)?;
// Also list all relations in ancestor timelines. If a relation hasn't been modified
// after the fork, there will be no trace of it in the object store with the current
// timeline id.
let mut prev_timeline: Option<ZTimelineId> = self.ancestor_timeline;
let mut lsn = self.ancestor_lsn;
while let Some(timeline) = prev_timeline {
let this_rels = self.obj_store.list_rels(timeline, spcnode, dbnode, lsn)?;
for rel in this_rels {
all_rels.insert(rel);
}
// Load ancestor metadata.
let v = self
.obj_store
.get(&timeline_metadata_key(timeline), Lsn(0))
.with_context(|| "timeline not found in repository")?;
let metadata = MetadataEntry::des(&v)?;
prev_timeline = metadata.ancestor_timeline;
lsn = metadata.ancestor_lsn;
}
Ok(all_rels)
}
//------------------------------------------------------------------------------
// Public PUT functions, to update the repository with new page versions.
//
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Put a new page version that can be constructed from a WAL record
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> {
let lsn = rec.lsn;
let key = ObjectKey {
timeline: self.timelineid,
buf_tag: tag,
};
let val = PageEntry::WALRecord(rec);
self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?;
debug!(
"put_wal_record rel {} blk {} at {}",
tag.rel, tag.blknum, lsn
);
// Also check if this created or extended the file
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
if tag.blknum >= old_nblocks {
let new_nblocks = tag.blknum + 1;
let key = relation_size_key(self.timelineid, tag.rel);
let val = RelationSizeEntry::Size(new_nblocks);
trace!(
"Extended relation {} from {} to {} blocks at {}",
tag.rel,
old_nblocks,
new_nblocks,
lsn
);
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
}
Ok(())
}
///
/// Memorize a full image of a page version
///
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
let key = ObjectKey {
timeline: self.timelineid,
buf_tag: tag,
};
let val = PageEntry::Page(img);
self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?;
debug!(
"put_page_image rel {} blk {} at {}",
tag.rel, tag.blknum, lsn
);
// Also check if this created or extended the file
let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0);
if tag.blknum >= old_nblocks {
let new_nblocks = tag.blknum + 1;
let key = relation_size_key(self.timelineid, tag.rel);
let val = RelationSizeEntry::Size(new_nblocks);
trace!(
"Extended relation {} from {} to {} blocks at {}",
tag.rel,
old_nblocks,
new_nblocks,
lsn
);
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
}
Ok(())
}
///
/// Adds a relation-wide WAL record (like truncate) to the repository,
/// associating it with all pages started with specified block number
///
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> {
let key = relation_size_key(self.timelineid, rel);
let val = RelationSizeEntry::Size(nblocks);
info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
self.obj_store
.put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
Ok(())
}
/// Remember the all WAL before the given LSN has been processed.
///
/// The WAL receiver calls this after the put_* functions, to indicate that
/// all WAL before this point has been digested. Before that, if you call
/// GET on an earlier LSN, it will block.
fn advance_last_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last valid LSN backwards (was {}, new {})",
old, lsn
);
}
}
fn get_last_valid_lsn(&self) -> Lsn {
self.last_valid_lsn.load()
}
fn init_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);
assert!(old == Lsn(0));
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old == Lsn(0));
}
/// Like `advance_last_valid_lsn`, but this always points to the end of
/// a WAL record, not in the middle of one.
///
/// This must be <= last valid LSN. This is tracked separately from last
/// valid LSN, so that the WAL receiver knows where to restart streaming.
///
/// NOTE: this updates last_valid_lsn as well.
fn advance_last_record_lsn(&self, lsn: Lsn) {
// Can't move backwards.
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old <= lsn);
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last record LSN backwards (was {}, new {})",
old, lsn
);
}
}
fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load()
}
///
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
// Flush all the changes written so far with PUT functions to disk.
// RocksDB writes out things as we go (?), so we don't need to do much here. We just
// write out the last valid and record LSNs.
fn checkpoint(&self) -> Result<()> {
let metadata = MetadataEntry {
last_valid_lsn: self.last_valid_lsn.load(),
last_record_lsn: self.last_record_lsn.load(),
ancestor_timeline: self.ancestor_timeline,
ancestor_lsn: self.ancestor_lsn,
};
self.obj_store.put(
&timeline_metadata_key(self.timelineid),
Lsn(0),
&MetadataEntry::ser(&metadata)?,
)?;
trace!("checkpoint at {}", metadata.last_valid_lsn);
Ok(())
}
}
impl ObjectTimeline {
fn get_page_at_lsn_nowait(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes> {
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let searchkey = ObjectKey {
timeline: self.timelineid,
buf_tag: tag,
};
let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
if let Some((version_lsn, value)) = iter.next().transpose()? {
let page_img: Bytes;
match PageEntry::des(&value)? {
PageEntry::Page(img) => {
page_img = img;
}
PageEntry::WALRecord(_rec) => {
// Request the WAL redo manager to apply the WAL records for us.
let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
self.put_page_image(tag, lsn, page_img.clone())?;
}
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
trace!(
"Returning page with LSN {:X}/{:X} for {} blk {} from {} (request {})",
page_lsn_hi,
page_lsn_lo,
tag.rel,
tag.blknum,
version_lsn,
lsn
);
return Ok(page_img);
}
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
trace!("page {} blk {} at {} not found", tag.rel, tag.blknum, lsn);
Ok(Bytes::from_static(&ZERO_PAGE))
/* return Err("could not find page image")?; */
}
///
/// Internal function to get relation size at given LSN.
///
/// The caller must ensure that WAL has been received up to 'lsn'.
///
fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result<Option<u32>> {
let key = relation_size_key(self.timelineid, rel);
let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?;
if let Some((version_lsn, value)) = iter.next().transpose()? {
match RelationSizeEntry::des(&value)? {
RelationSizeEntry::Size(nblocks) => {
trace!(
"relation {} has size {} at {} (request {})",
rel,
nblocks,
version_lsn,
lsn
);
Ok(Some(nblocks))
}
RelationSizeEntry::Unlink => {
trace!(
"relation {} not found; it was dropped at lsn {}",
rel,
version_lsn
);
Ok(None)
}
}
} else {
info!("relation {} not found at {}", rel, lsn);
Ok(None)
}
}
///
/// Collect all the WAL records that are needed to reconstruct a page
/// image for the given cache entry.
///
/// Returns an old page image (if any), and a vector of WAL records to apply
/// over it.
///
fn collect_records_for_apply(
&self,
tag: BufferTag,
lsn: Lsn,
) -> Result<(Option<Bytes>, Vec<WALRecord>)> {
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
let searchkey = ObjectKey {
timeline: self.timelineid,
buf_tag: tag,
};
let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
while let Some((_key, value)) = iter.next().transpose()? {
match PageEntry::des(&value)? {
PageEntry::Page(img) => {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img);
break;
}
PageEntry::WALRecord(rec) => {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
}
}
}
}
records.reverse();
Ok((base_img, records))
}
fn launch_gc_thread(conf: &'static PageServerConf, timeline_rc: Arc<ObjectTimeline>) {
let _gc_thread = thread::Builder::new()
.name("Garbage collection thread".into())
.spawn(move || {
// FIXME
timeline_rc.do_gc(conf).expect("GC thread died");
})
.unwrap();
}
fn do_gc(&self, conf: &'static PageServerConf) -> Result<()> {
loop {
thread::sleep(conf.gc_period);
// FIXME: broken
/*
let last_lsn = self.get_last_valid_lsn();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
let mut maxkey = StorageKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
dbnode: u32::MAX,
relnode: u32::MAX,
forknum: u8::MAX,
},
blknum: u32::MAX,
},
lsn: Lsn::MAX,
};
let now = Instant::now();
let mut reconstructed = 0u64;
let mut truncated = 0u64;
let mut inspected = 0u64;
let mut deleted = 0u64;
loop {
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(maxkey.to_bytes());
if iter.valid() {
let key = StorageKey::des(iter.key().unwrap());
let val = StorageValue::des(iter.value().unwrap());
inspected += 1;
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = Lsn(0); // first version
// reconstruct most recent page version
if let StorageValueContent::Image(_) = val.content {
// force reconstruction of most recent page version
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
trace!(
"Reconstruct most recent page {} blk {} at {} from {} records",
key.tag.rel,
key.tag.blknum,
key.lsn,
records.len()
);
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
reconstructed += 1;
}
iter.seek_for_prev(maxkey.to_bytes());
if iter.valid() {
// do not remove last version
if last_lsn > horizon {
// locate most recent record before horizon
let key = StorageKey::des(iter.key().unwrap());
if key.tag == maxkey.tag {
let val = StorageValue::des(iter.value().unwrap());
if let StorageValueContent::Image(_) = val.content {
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
trace!("Reconstruct horizon page {} blk {} at {} from {} records",
key.tag.rel, key.tag.blknum, key.lsn, records.len());
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
truncated += 1;
} else {
trace!(
"Keeping horizon page {} blk {} at {}",
key.tag.rel,
key.tag.blknum,
key.lsn
);
}
}
} else {
trace!(
"Last page {} blk {} at {}, horizon {}",
key.tag.rel,
key.tag.blknum,
key.lsn,
horizon
);
}
// remove records prior to horizon
loop {
iter.prev();
if !iter.valid() {
break;
}
let key = StorageKey::des(iter.key().unwrap());
if key.tag != maxkey.tag {
break;
}
let mut val = StorageValue::des(iter.value().unwrap());
if val.alive {
val.alive = false;
self.storage.put(key, val)?;
deleted += 1;
trace!(
"deleted: {} blk {} at {}",
key.tag.rel,
key.tag.blknum,
key.lsn
);
} else {
break;
}
}
}
maxkey = minkey;
} else {
break;
}
}
info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted",
now.elapsed(), inspected, reconstructed, truncated, deleted);
}
*/
}
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
trace!(
"Start waiting for LSN {}, valid LSN is {}",
lsn,
self.last_valid_lsn.load()
);
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
lsn,
self.last_valid_lsn.load(),
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
///
/// Iterate through object versions with given key, in reverse LSN order.
///
/// This implements following the timeline history over the plain
/// ObjectStore::object_versions function, which doesn't know
/// about the relationships between timeline.
///
fn object_versions<'a>(
&self,
obj_store: &'a dyn ObjectStore,
key: &ObjectKey,
lsn: Lsn,
) -> Result<ObjectVersionIter<'a>> {
let current_iter = obj_store.object_versions(key, lsn)?;
Ok(ObjectVersionIter {
obj_store,
buf_tag: key.buf_tag,
current_iter,
ancestor_timeline: self.ancestor_timeline,
ancestor_lsn: self.ancestor_lsn,
})
}
}
///
/// We store two kinds of page versions in the repository:
///
/// 1. Ready-made images of the block
/// 2. WAL records, to be applied on top of the "previous" entry
///
/// Some WAL records will initialize the page from scratch. For such records,
/// the 'will_init' flag is set. They don't need the previous page image before
/// applying. The 'will_init' flag is set for records containing a full-page image,
/// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages
/// stored directly in the cache entry in that you still need to run the WAL redo
/// routine to generate the page image.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
enum PageEntry {
Page(Bytes),
WALRecord(WALRecord),
}
///
/// In addition to page versions, we store relation size as a separate, versioned,
/// object.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RelationSizeEntry {
Size(u32),
/// Tombstone for a dropped relation.
//
// TODO: Not used. Currently, we never drop relations. The parsing
// of relation drops in COMMIT/ABORT records has not been
// implemented. We should also have a mechanism to remove
// "orphaned" relfiles, if the compute node crashes before writing
// the COMMIT/ABORT record.
Unlink,
}
const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey {
ObjectKey {
timeline: timelineid,
buf_tag: BufferTag {
rel,
blknum: u32::MAX,
},
}
}
///
/// In addition to those per-page and per-relation entries, we also
/// store a little metadata blob for each timeline. It is stored using
/// STORAGE_SPECIAL_FORKNUM.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetadataEntry {
last_valid_lsn: Lsn,
last_record_lsn: Lsn,
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
}
const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey {
ObjectKey {
timeline: timelineid,
buf_tag: BufferTag {
rel: RelTag {
forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: 0,
},
}
}
///
/// Iterator for `object_versions`. Returns all page versions of a given block, in
/// reverse LSN order. This implements the traversal of ancestor timelines. If
/// a page isn't found in the most recent timeline, this iterates to the parent,
/// until a page version is found.
///
struct ObjectVersionIter<'a> {
obj_store: &'a dyn ObjectStore,
buf_tag: BufferTag,
/// Iterator on the current timeline.
current_iter: Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>,
/// Ancestor of the current timeline being iterated.
ancestor_timeline: Option<ZTimelineId>,
ancestor_lsn: Lsn,
}
impl<'a> Iterator for ObjectVersionIter<'a> {
type Item = Result<(Lsn, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
}
}
impl<'a> ObjectVersionIter<'a> {
///
/// "transposed" version of the standard Iterator::next function.
///
/// The rust standard Iterator::next function returns an
/// Option of a Result, but it's more convenient to work with
/// Result of a Option so that you can use ? to check for errors.
///
fn next_result(&mut self) -> Result<Option<(Lsn, Vec<u8>)>> {
loop {
// If there is another entry on the current timeline, return it.
if let Some(result) = self.current_iter.next() {
return Ok(Some(result));
}
// Out of entries on this timeline. Move to the ancestor, if any.
if let Some(ancestor_timeline) = self.ancestor_timeline {
let searchkey = ObjectKey {
timeline: ancestor_timeline,
buf_tag: self.buf_tag,
};
let ancestor_iter = self
.obj_store
.object_versions(&searchkey, self.ancestor_lsn)?;
// Load the parent timeline's metadata. (We don't
// actually need it yet, only if we need to follow to
// the grandparent timeline)
let v = self
.obj_store
.get(&timeline_metadata_key(ancestor_timeline), Lsn(0))
.with_context(|| "timeline not found in repository")?;
let ancestor_metadata = MetadataEntry::des(&v)?;
self.ancestor_timeline = ancestor_metadata.ancestor_timeline;
self.ancestor_lsn = ancestor_metadata.ancestor_lsn;
self.current_iter = ancestor_iter;
} else {
return Ok(None);
}
}
}
}

View File

@@ -0,0 +1,61 @@
use crate::repository::{BufferTag, RelTag};
use crate::ZTimelineId;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::iter::Iterator;
use zenith_utils::lsn::Lsn;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectKey {
pub timeline: ZTimelineId,
pub buf_tag: BufferTag,
}
///
/// Low-level storage abstraction.
///
/// All the data in the repository is stored in a key-value store. This trait
/// abstracts the details of the key-value store.
///
/// A simple key-value store would support just GET and PUT operations with
/// a key, but the upper layer needs slightly complicated read operations
///
/// The most frequently used function is 'object_versions'. It is used
/// to look up a page version. It is LSN aware, in that the caller
/// specifies an LSN, and the function returns all values for that
/// block with the same or older LSN.
///
pub trait ObjectStore: Send + Sync {
///
/// Store a value with given key.
///
fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()>;
/// Read entry with the exact given key.
///
/// This is used for retrieving metadata with special key that doesn't
/// correspond to any real relation.
fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>>;
/// Iterate through all page versions of one object.
///
/// Returns all page versions in descending LSN order, along with the LSN
/// of each page version.
fn object_versions<'a>(
&'a self,
key: &ObjectKey,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>>;
/// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'.
///
/// This is used to implement 'create database'
fn list_rels(
&self,
timelineid: ZTimelineId,
spcnode: u32,
dbnode: u32,
lsn: Lsn,
) -> Result<HashSet<RelTag>>;
}

View File

@@ -3,30 +3,33 @@
//! isn't much here. If we implement multi-tenancy, this will probably be changed into
//! a hash map, keyed by the tenant ID.
use crate::repository::rocksdb::RocksRepository;
use crate::object_repository::ObjectRepository;
use crate::repository::Repository;
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use lazy_static::lazy_static;
use std::sync::{Arc, Mutex};
lazy_static! {
pub static ref REPOSITORY: Mutex<Option<Arc<dyn Repository + Send + Sync>>> = Mutex::new(None);
pub static ref REPOSITORY: Mutex<Option<Arc<dyn Repository>>> = Mutex::new(None);
}
pub fn init(conf: &'static PageServerConf) {
let mut m = REPOSITORY.lock().unwrap();
let obj_store = RocksObjectStore::open(conf).unwrap();
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf);
// we have already changed current dir to the repository.
let repo = RocksRepository::new(conf, Arc::new(walredo_mgr));
let repo = ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr));
*m = Some(Arc::new(repo));
}
pub fn get_repository() -> Arc<dyn Repository + Send + Sync> {
pub fn get_repository() -> Arc<dyn Repository> {
let o = &REPOSITORY.lock().unwrap();
Arc::clone(o.as_ref().unwrap())
}

View File

@@ -1,5 +1,3 @@
pub mod rocksdb;
use crate::ZTimelineId;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -13,7 +11,7 @@ use zenith_utils::lsn::Lsn;
///
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository {
pub trait Repository: Send + Sync {
/// Get Timeline handle for given zenith timeline ID.
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;
@@ -30,7 +28,7 @@ pub trait Repository {
//fn get_stats(&self) -> RepositoryStats;
}
pub trait Timeline {
pub trait Timeline: Send + Sync {
//------------------------------------------------------------------------------
// Public GET functions
//------------------------------------------------------------------------------
@@ -57,10 +55,10 @@ pub trait Timeline {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord);
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>;
/// Like put_wal_record, but with ready-made image of the page.
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes);
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>;
/// Truncate relation
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>;
@@ -156,7 +154,7 @@ pub struct BufferTag {
pub blknum: u32,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WALRecord {
pub lsn: Lsn, // LSN at the *end* of the record
pub will_init: bool,
@@ -196,6 +194,8 @@ impl WALRecord {
#[cfg(test)]
mod tests {
use super::*;
use crate::object_repository::ObjectRepository;
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
use postgres_ffi::pg_constants;
@@ -250,9 +250,11 @@ mod tests {
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let obj_store = RocksObjectStore::create(conf)?;
let walredo_mgr = TestRedoManager {};
let repo = rocksdb::RocksRepository::new(conf, Arc::new(walredo_mgr));
let repo = ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr));
Ok(Box::new(repo))
}
@@ -269,21 +271,17 @@ mod tests {
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
tline.init_valid_lsn(Lsn(1));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"));
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"));
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"));
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"));
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?;
tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?;
tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?;
tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?;
tline.advance_last_valid_lsn(Lsn(5));
// FIXME: The rocksdb implementation erroneously returns 'true' here, even
// though the relation was created only at a later LSN
// rocksdb implementation erroneosly returns 'true' here
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false
// And this probably should throw an error, becaue the relation doesn't exist at Lsn(1) yet
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(1))?, false);
assert!(tline.get_rel_size(TESTREL_A, Lsn(1)).is_err());
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(2))?, true);
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(2))?, 1);
@@ -364,7 +362,7 @@ mod tests {
for i in 0..pg_constants::RELSEG_SIZE + 1 {
let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn)));
lsn += 1;
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img);
tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?;
}
tline.advance_last_valid_lsn(Lsn(lsn));

View File

@@ -1,955 +0,0 @@
//
// A Repository holds all the different page versions and WAL records
//
// This implementation uses RocksDB to store WAL wal records and
// full page images, keyed by the RelFileNode, blocknumber, and the
// LSN.
use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
use crate::restore_local_repo::import_timeline_wal;
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::{anyhow, bail, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use postgres_ffi::pg_constants;
use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::{AtomicLsn, Lsn};
use zenith_utils::seqwait::SeqWait;
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(600);
pub struct RocksRepository {
conf: &'static PageServerConf,
timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>,
walredo_mgr: Arc<dyn WalRedoManager>,
}
pub struct RocksTimeline {
// RocksDB handle
db: rocksdb::DB,
// WAL redo manager
walredo_mgr: Arc<dyn WalRedoManager>,
// What page versions do we hold in the cache? If we get a request > last_valid_lsn,
// we need to wait until we receive all the WAL up to the request. The SeqWait
// provides functions for that. TODO: If we get a request for an old LSN, such that
// the versions have already been garbage collected away, we should throw an error,
// but we don't track that currently.
//
// last_record_lsn points to the end of last processed WAL record.
// It can lag behind last_valid_lsn, if the WAL receiver has received some WAL
// after the end of last record, but not the whole next record yet. In the
// page cache, we care about last_valid_lsn, but if the WAL receiver needs to
// restart the streaming, it needs to restart at the end of last record, so
// we track them separately. last_record_lsn should perhaps be in
// walreceiver.rs instead of here, but it seems convenient to keep all three
// values together.
//
last_valid_lsn: SeqWait<Lsn>,
last_record_lsn: AtomicLsn,
// Counters, for metrics collection.
pub num_entries: AtomicU64,
pub num_page_images: AtomicU64,
pub num_wal_records: AtomicU64,
pub num_getpage_requests: AtomicU64,
}
//
// We store two kinds of entries in the repository:
//
// 1. Ready-made images of the block
// 2. WAL records, to be applied on top of the "previous" entry
//
// Some WAL records will initialize the page from scratch. For such records,
// the 'will_init' flag is set. They don't need the previous page image before
// applying. The 'will_init' flag is set for records containing a full-page image,
// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages
// stored directly in the cache entry in that you still need to run the WAL redo
// routine to generate the page image.
//
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)]
struct CacheKey {
pub tag: BufferTag,
pub lsn: Lsn,
}
//
// In addition to those per-page entries, the 'last_valid_lsn' and 'last_record_lsn'
// values are also persisted in the rocskdb repository. They are stored with CacheKeys
// with ROCKSDB_SPECIAL_FORKNUM, and 'blknum' indicates which value it is. The
// rest of the key fields are zero. We use a CacheKey as the key for these too,
// so that whenever we iterate through keys in the repository, we can safely parse
// the key blob as CacheKey without checking for these special values first.
//
// FIXME: This is quite a similar concept to the special entries created by
// `BufferTag::fork` function. Merge them somehow? These special keys are specific
// to the rocksb implementation, not exposed to the rest of the system, but the
// other special forks created by `BufferTag::fork` are also used elsewhere.
//
impl CacheKey {
const fn special(id: u32) -> CacheKey {
CacheKey {
tag: BufferTag {
rel: RelTag {
forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM,
spcnode: 0,
dbnode: 0,
relnode: 0,
},
blknum: id,
},
lsn: Lsn(0),
}
}
fn is_special(&self) -> bool {
self.tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM
}
}
static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0);
static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1);
enum CacheEntryContent {
PageImage(Bytes),
WALRecord(WALRecord),
Truncation,
}
// The serialized representation of a CacheEntryContent begins with
// single byte that indicates what kind of entry it is. There is also
// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent
// at all, you must peek into the first byte of the serialized representation
// to read it.
const CONTENT_PAGE_IMAGE: u8 = 1u8;
const CONTENT_WAL_RECORD: u8 = 2u8;
const CONTENT_TRUNCATION: u8 = 3u8;
const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above
const UNUSED_VERSION_FLAG: u8 = 4u8;
impl CacheEntryContent {
pub fn pack(&self, buf: &mut BytesMut) {
match self {
CacheEntryContent::PageImage(image) => {
buf.put_u8(CONTENT_PAGE_IMAGE);
buf.put_u16(image.len() as u16);
buf.put_slice(&image[..]);
}
CacheEntryContent::WALRecord(rec) => {
buf.put_u8(CONTENT_WAL_RECORD);
rec.pack(buf);
}
CacheEntryContent::Truncation => {
buf.put_u8(CONTENT_TRUNCATION);
}
}
}
pub fn unpack(buf: &mut Bytes) -> CacheEntryContent {
let kind = buf.get_u8() & CONTENT_KIND_MASK;
match kind {
CONTENT_PAGE_IMAGE => {
let len = buf.get_u16() as usize;
let mut dst = vec![0u8; len];
buf.copy_to_slice(&mut dst);
CacheEntryContent::PageImage(Bytes::from(dst))
}
CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)),
CONTENT_TRUNCATION => CacheEntryContent::Truncation,
_ => unreachable!(),
}
}
fn from_slice(slice: &[u8]) -> Self {
let mut buf = Bytes::copy_from_slice(slice);
Self::unpack(&mut buf)
}
fn to_bytes(&self) -> BytesMut {
let mut buf = BytesMut::new();
self.pack(&mut buf);
buf
}
}
impl RocksRepository {
pub fn new(
conf: &'static PageServerConf,
walredo_mgr: Arc<dyn WalRedoManager>,
) -> RocksRepository {
RocksRepository {
conf,
timelines: Mutex::new(HashMap::new()),
walredo_mgr,
}
}
}
impl RocksRepository {
fn get_rocks_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<RocksTimeline>> {
let mut timelines = self.timelines.lock().unwrap();
match timelines.get(&timelineid) {
Some(timeline) => Ok(timeline.clone()),
None => {
let timeline =
RocksTimeline::open(self.conf, timelineid, self.walredo_mgr.clone())?;
// Load any new WAL after the last checkpoint into the repository.
info!(
"Loading WAL for timeline {} starting at {}",
timelineid,
timeline.get_last_record_lsn()
);
let wal_dir = self.conf.timeline_path(timelineid).join("wal");
import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?;
let timeline_rc = Arc::new(timeline);
if self.conf.gc_horizon != 0 {
RocksTimeline::launch_gc_thread(self.conf, timeline_rc.clone());
}
timelines.insert(timelineid, timeline_rc.clone());
Ok(timeline_rc)
}
}
}
}
// Get handle to a given timeline. It is assumed to already exist.
impl Repository for RocksRepository {
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
Ok(self.get_rocks_timeline(timelineid)?)
}
fn create_empty_timeline(
&self,
timelineid: ZTimelineId,
start_lsn: Lsn,
) -> Result<Arc<dyn Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
let timeline =
RocksTimeline::create(&self.conf, timelineid, self.walredo_mgr.clone(), start_lsn)?;
let timeline_rc = Arc::new(timeline);
let r = timelines.insert(timelineid, timeline_rc.clone());
assert!(r.is_none());
// don't start the garbage collector for unit tests, either.
Ok(timeline_rc)
}
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> {
let src_timeline = self.get_rocks_timeline(src)?;
info!("branching at {}", at_lsn);
let dst_timeline =
RocksTimeline::create(&self.conf, dst, self.walredo_mgr.clone(), at_lsn)?;
// Copy all entries <= LSN
//
// This is very inefficient, a far cry from the promise of cheap copy-on-write
// branching. But it will do for now.
let mut iter = src_timeline.db.raw_iterator();
iter.seek_to_first();
while iter.valid() {
let k = iter.key().unwrap();
let key = CacheKey::des(k)?;
if !key.is_special() && key.lsn <= at_lsn {
let v = iter.value().unwrap();
dst_timeline.db.put(k, v)?;
}
iter.next();
}
Ok(())
}
}
impl RocksTimeline {
/// common options used by `open` and `create`
fn get_rocksdb_opts() -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
if (val[0] & UNUSED_VERSION_FLAG) != 0 {
rocksdb::compaction_filter::Decision::Remove
} else {
rocksdb::compaction_filter::Decision::Keep
}
});
opts
}
/// Open a RocksDB database, and load the last valid and record LSNs into memory.
fn open(
conf: &PageServerConf,
timelineid: ZTimelineId,
walredo_mgr: Arc<dyn WalRedoManager>,
) -> Result<RocksTimeline> {
let path = conf.timeline_path(timelineid);
let db = rocksdb::DB::open(&RocksTimeline::get_rocksdb_opts(), path)?;
// Load these into memory
let lsnstr = db
.get(LAST_VALID_LSN_KEY.ser()?)
.with_context(|| "last_valid_lsn not found in repository")?
.ok_or(anyhow!("empty last_valid_lsn"))?;
let last_valid_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?;
let lsnstr = db
.get(LAST_VALID_RECORD_LSN_KEY.ser()?)
.with_context(|| "last_record_lsn not found in repository")?
.ok_or(anyhow!("empty last_record_lsn"))?;
let last_record_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?;
let timeline = RocksTimeline {
db,
walredo_mgr,
last_valid_lsn: SeqWait::new(last_valid_lsn),
last_record_lsn: AtomicLsn::new(last_record_lsn.0),
num_entries: AtomicU64::new(0),
num_page_images: AtomicU64::new(0),
num_wal_records: AtomicU64::new(0),
num_getpage_requests: AtomicU64::new(0),
};
Ok(timeline)
}
/// Create a new RocksDB database. It is initally empty, except for the last
/// valid and last record LSNs, which are set to 'start_lsn'.
fn create(
conf: &PageServerConf,
timelineid: ZTimelineId,
walredo_mgr: Arc<dyn WalRedoManager>,
start_lsn: Lsn,
) -> Result<RocksTimeline> {
let path = conf.timeline_path(timelineid);
let mut opts = RocksTimeline::get_rocksdb_opts();
opts.create_if_missing(true);
opts.set_error_if_exists(true);
let db = rocksdb::DB::open(&opts, path)?;
let timeline = RocksTimeline {
db,
walredo_mgr,
last_valid_lsn: SeqWait::new(start_lsn),
last_record_lsn: AtomicLsn::new(start_lsn.0),
num_entries: AtomicU64::new(0),
num_page_images: AtomicU64::new(0),
num_wal_records: AtomicU64::new(0),
num_getpage_requests: AtomicU64::new(0),
};
// Write the initial last_valid/record_lsn values
timeline.checkpoint()?;
Ok(timeline)
}
fn launch_gc_thread(conf: &'static PageServerConf, timeline_rc: Arc<RocksTimeline>) {
let timeline_rc_copy = timeline_rc.clone();
let _gc_thread = thread::Builder::new()
.name("Garbage collection thread".into())
.spawn(move || {
// FIXME
timeline_rc_copy.do_gc(conf).expect("GC thread died");
})
.unwrap();
}
///
/// Collect all the WAL records that are needed to reconstruct a page
/// image for the given cache entry.
///
/// Returns an old page image (if any), and a vector of WAL records to apply
/// over it.
///
fn collect_records_for_apply(
&self,
tag: BufferTag,
lsn: Lsn,
) -> (Option<Bytes>, Vec<WALRecord>) {
let key = CacheKey { tag, lsn };
let mut base_img: Option<Bytes> = None;
let mut records: Vec<WALRecord> = Vec::new();
let mut iter = self.db.raw_iterator();
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
iter.seek_for_prev(serialized_key);
// Scan backwards, collecting the WAL records, until we hit an
// old page image.
while iter.valid() {
let key = CacheKey::des(iter.key().unwrap()).unwrap();
if key.tag != tag {
break;
}
let content = CacheEntryContent::from_slice(iter.value().unwrap());
if let CacheEntryContent::PageImage(img) = content {
// We have a base image. No need to dig deeper into the list of
// records
base_img = Some(img);
break;
} else if let CacheEntryContent::WALRecord(rec) = content {
records.push(rec.clone());
// If this WAL record initializes the page, no need to dig deeper.
if rec.will_init {
break;
}
} else {
panic!("no base image and no WAL record on cache entry");
}
iter.prev();
}
records.reverse();
(base_img, records)
}
// Internal functions
//
// Internal function to get relation size at given LSN.
//
// The caller must ensure that WAL has been received up to 'lsn'.
//
fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
assert!(lsn <= self.last_valid_lsn.load());
let mut key = CacheKey {
tag: BufferTag {
rel,
blknum: u32::MAX,
},
lsn,
};
let mut iter = self.db.raw_iterator();
loop {
iter.seek_for_prev(key.ser()?);
if iter.valid() {
let thiskey = CacheKey::des(iter.key().unwrap())?;
if thiskey.tag.rel == rel {
// Ignore entries with later LSNs.
if thiskey.lsn > lsn {
key.tag.blknum = thiskey.tag.blknum;
continue;
}
let content = CacheEntryContent::from_slice(iter.value().unwrap());
if let CacheEntryContent::Truncation = content {
if thiskey.tag.blknum > 0 {
key.tag.blknum = thiskey.tag.blknum - 1;
continue;
}
break;
}
let relsize = thiskey.tag.blknum + 1;
debug!("Size of relation {} at {} is {}", rel, lsn, relsize);
return Ok(relsize);
}
}
break;
}
debug!("Size of relation {} at {} is zero", rel, lsn);
Ok(0)
}
fn do_gc(&self, conf: &'static PageServerConf) -> Result<Bytes> {
loop {
thread::sleep(conf.gc_period);
let last_lsn = self.get_last_valid_lsn();
// checked_sub() returns None on overflow.
if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
let mut maxkey = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: u32::MAX,
dbnode: u32::MAX,
relnode: u32::MAX,
forknum: u8::MAX,
},
blknum: u32::MAX,
},
lsn: Lsn::MAX,
};
let now = Instant::now();
let mut reconstructed = 0u64;
let mut truncated = 0u64;
let mut inspected = 0u64;
let mut deleted = 0u64;
loop {
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(maxkey.ser()?);
if iter.valid() {
let key = CacheKey::des(iter.key().unwrap())?;
let v = iter.value().unwrap();
inspected += 1;
// Construct boundaries for old records cleanup
maxkey.tag = key.tag;
let last_lsn = key.lsn;
maxkey.lsn = min(horizon, last_lsn); // do not remove last version
let mut minkey = maxkey.clone();
minkey.lsn = Lsn(0); // first version
// reconstruct most recent page version
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
// force reconstruction of most recent page version
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
trace!(
"Reconstruct most recent page {} blk {} at {} from {} records",
key.tag.rel,
key.tag.blknum,
key.lsn,
records.len()
);
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
reconstructed += 1;
}
iter.seek_for_prev(maxkey.ser()?);
if iter.valid() {
// do not remove last version
if last_lsn > horizon {
// locate most recent record before horizon
let key = CacheKey::des(iter.key().unwrap())?;
if key.tag == maxkey.tag {
let v = iter.value().unwrap();
if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
let (base_img, records) =
self.collect_records_for_apply(key.tag, key.lsn);
trace!("Reconstruct horizon page {} blk {} at {} from {} records",
key.tag.rel, key.tag.blknum, key.lsn, records.len());
let new_img = self
.walredo_mgr
.request_redo(key.tag, key.lsn, base_img, records)?;
self.put_page_image(key.tag, key.lsn, new_img.clone());
truncated += 1;
} else {
trace!(
"Keeping horizon page {} blk {} at {}",
key.tag.rel,
key.tag.blknum,
key.lsn
);
}
}
} else {
trace!(
"Last page {} blk {} at {}, horizon {}",
key.tag.rel,
key.tag.blknum,
key.lsn,
horizon
);
}
// remove records prior to horizon
loop {
iter.prev();
if !iter.valid() {
break;
}
let key = CacheKey::des(iter.key().unwrap())?;
if key.tag != maxkey.tag {
break;
}
let v = iter.value().unwrap();
if (v[0] & UNUSED_VERSION_FLAG) == 0 {
let mut v = v.to_owned();
v[0] |= UNUSED_VERSION_FLAG;
self.db.put(key.ser()?, &v[..])?;
deleted += 1;
trace!(
"deleted: {} blk {} at {}",
key.tag.rel,
key.tag.blknum,
key.lsn
);
} else {
break;
}
}
}
maxkey = minkey;
} else {
break;
}
}
info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted",
now.elapsed(), inspected, reconstructed, truncated, deleted);
}
}
}
//
// Wait until WAL has been received up to the given LSN.
//
fn wait_lsn(&self, mut lsn: Lsn) -> Result<Lsn> {
// When invalid LSN is requested, it means "don't wait, return latest version of the page"
// This is necessary for bootstrap.
if lsn == Lsn(0) {
let last_valid_lsn = self.last_valid_lsn.load();
trace!(
"walreceiver doesn't work yet last_valid_lsn {}, requested {}",
last_valid_lsn,
lsn
);
lsn = last_valid_lsn;
}
trace!(
"Start waiting for LSN {}, valid LSN is {}",
lsn,
self.last_valid_lsn.load()
);
self.last_valid_lsn
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}",
lsn,
self.last_valid_lsn.load(),
)
})?;
//trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load());
Ok(lsn)
}
}
impl Timeline for RocksTimeline {
// Public GET interface functions
///
/// GetPage@LSN
///
/// Returns an 8k page image
///
fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
let lsn = self.wait_lsn(req_lsn)?;
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
let key = CacheKey { tag, lsn };
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.ser()?);
if iter.valid() {
let key = CacheKey::des(iter.key().unwrap())?;
if key.tag == tag {
let content = CacheEntryContent::from_slice(iter.value().unwrap());
let page_img: Bytes;
if let CacheEntryContent::PageImage(img) = content {
page_img = img;
} else if let CacheEntryContent::WALRecord(_rec) = content {
// Request the WAL redo manager to apply the WAL records for us.
let (base_img, records) = self.collect_records_for_apply(tag, lsn);
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
self.put_page_image(tag, lsn, page_img.clone());
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
}
// FIXME: assumes little-endian. Only used for the debugging log though
let page_lsn_hi =
u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap());
let page_lsn_lo =
u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap());
debug!(
"Returning page with LSN {:X}/{:X} for {} blk {}",
page_lsn_hi, page_lsn_lo, tag.rel, tag.blknum
);
return Ok(page_img);
}
}
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
debug!(
"Page {} blk {} at {}({}) not found",
tag.rel, tag.blknum, req_lsn, lsn
);
Ok(Bytes::from_static(&ZERO_PAGE))
/* return Err("could not find page image")?; */
}
///
/// Get size of relation at given LSN.
///
fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result<u32> {
let lsn = self.wait_lsn(lsn)?;
self.relsize_get_nowait(rel, lsn)
}
///
/// Does relation exist at given LSN?
///
/// FIXME: this actually returns true, if the relation exists at *any* LSN
fn get_rel_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> {
let lsn = self.wait_lsn(req_lsn)?;
let key = CacheKey {
tag: BufferTag {
rel,
blknum: u32::MAX,
},
lsn,
};
let mut iter = self.db.raw_iterator();
iter.seek_for_prev(key.ser()?);
if iter.valid() {
let key = CacheKey::des(iter.key().unwrap())?;
if key.tag.rel == rel {
debug!("Relation {} exists at {}", rel, lsn);
return Ok(true);
}
}
debug!("Relation {} doesn't exist at {}", rel, lsn);
Ok(false)
}
/// Get a list of all distinct relations in given tablespace and database.
///
/// TODO: This implementation is very inefficient, it scans
/// through all entries in the given database. In practice, this
/// is used for CREATE DATABASE, and usually the template database is small.
/// But if it's not, this will be slow.
fn list_rels<'a>(&'a self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>> {
trace!("list_rels spcnode {} dbnode {} at {}", spcnode, dbnode, lsn);
let mut rels: HashSet<RelTag> = HashSet::new();
let searchkey = CacheKey {
tag: BufferTag {
rel: RelTag {
spcnode: spcnode,
dbnode: dbnode,
relnode: 0,
forknum: 0u8,
},
blknum: 0,
},
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(searchkey.ser()?);
while iter.valid() {
let key = CacheKey::des(iter.key().unwrap())?;
if key.tag.rel.spcnode != spcnode || key.tag.rel.dbnode != dbnode {
break;
}
if key.lsn < lsn {
rels.insert(key.tag.rel);
}
iter.next();
}
Ok(rels)
}
// Other public functions, for updating the repository.
// These are used by the WAL receiver and WAL redo.
///
/// Adds a WAL record to the repository
///
fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
let lsn = rec.lsn;
let key = CacheKey { tag, lsn };
let content = CacheEntryContent::WALRecord(rec);
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
let _res = self.db.put(serialized_key, content.to_bytes());
trace!(
"put_wal_record rel {} blk {} at {}",
tag.rel,
tag.blknum,
lsn
);
self.num_entries.fetch_add(1, Ordering::Relaxed);
self.num_wal_records.fetch_add(1, Ordering::Relaxed);
}
///
/// Adds a relation-wide WAL record (like truncate) to the repository,
/// associating it with all pages started with specified block number
///
fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> {
// What was the size of the relation before this record?
let last_lsn = self.last_valid_lsn.load();
let old_rel_size = self.relsize_get_nowait(rel, last_lsn)?;
let content = CacheEntryContent::Truncation;
// set new relation size
trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
for blknum in nblocks..old_rel_size {
let key = CacheKey {
tag: BufferTag { rel, blknum },
lsn,
};
trace!("put_wal_record lsn: {}", key.lsn);
let _res = self.db.put(key.ser()?, content.to_bytes());
}
let n = (old_rel_size - nblocks) as u64;
self.num_entries.fetch_add(n, Ordering::Relaxed);
self.num_wal_records.fetch_add(n, Ordering::Relaxed);
Ok(())
}
///
/// Memorize a full image of a page version
///
fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) {
let img_len = img.len();
let key = CacheKey { tag, lsn };
let content = CacheEntryContent::PageImage(img);
let mut val_buf = content.to_bytes();
// Zero size of page image indicates that page can be removed
if img_len == 0 {
if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 {
// records already marked for deletion
return;
} else {
// delete truncated multixact page
val_buf[0] |= UNUSED_VERSION_FLAG;
}
}
trace!("put_wal_record lsn: {}", key.lsn);
let serialized_key = key.ser().expect("serialize CacheKey should always succeed");
let _res = self.db.put(serialized_key, content.to_bytes());
trace!(
"put_page_image rel {} blk {} at {}",
tag.rel,
tag.blknum,
lsn
);
self.num_page_images.fetch_add(1, Ordering::Relaxed);
}
/// Remember that WAL has been received and added to the timeline up to the given LSN
fn advance_last_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last valid LSN backwards (was {}, new {})",
old, lsn
);
}
}
///
/// Remember the (end of) last valid WAL record remembered for the timeline.
///
/// NOTE: this updates last_valid_lsn as well.
///
fn advance_last_record_lsn(&self, lsn: Lsn) {
// Can't move backwards.
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old <= lsn);
// Also advance last_valid_lsn
let old = self.last_valid_lsn.advance(lsn);
// Can't move backwards.
if lsn < old {
warn!(
"attempted to move last record LSN backwards (was {}, new {})",
old, lsn
);
}
}
fn get_last_record_lsn(&self) -> Lsn {
self.last_record_lsn.load()
}
fn init_valid_lsn(&self, lsn: Lsn) {
let old = self.last_valid_lsn.advance(lsn);
assert!(old == Lsn(0));
let old = self.last_record_lsn.fetch_max(lsn);
assert!(old == Lsn(0));
}
fn get_last_valid_lsn(&self) -> Lsn {
self.last_valid_lsn.load()
}
// Flush all the changes written so far with PUT functions to disk.
// RocksDB writes out things as we go (?), so we don't need to do much here. We just
// write out the last valid and record LSNs.
fn checkpoint(&self) -> Result<()> {
let last_valid_lsn = self.last_valid_lsn.load();
self.db
.put(LAST_VALID_LSN_KEY.ser()?, last_valid_lsn.to_string())?;
self.db.put(
LAST_VALID_RECORD_LSN_KEY.ser()?,
self.last_record_lsn.load().to_string(),
)?;
trace!("checkpoint at {}", last_valid_lsn);
Ok(())
}
//
// Get statistics to be displayed in the user interface.
//
// FIXME
/*
fn get_stats(&self) -> TimelineStats {
TimelineStats {
num_entries: self.num_entries.load(Ordering::Relaxed),
num_page_images: self.num_page_images.load(Ordering::Relaxed),
num_wal_records: self.num_wal_records.load(Ordering::Relaxed),
num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed),
}
}
*/
}

View File

@@ -145,7 +145,7 @@ fn import_relfile(
},
blknum,
};
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?;
/*
if oldest_lsn == 0 || p.lsn < oldest_lsn {
oldest_lsn = p.lsn;
@@ -273,7 +273,7 @@ pub fn save_decoded_record(
main_data_offset: decoded.main_data_offset as u32,
};
timeline.put_wal_record(tag, rec);
timeline.put_wal_record(tag, rec)?;
}
// Handle a few special record types
@@ -359,7 +359,7 @@ fn save_create_database(
info!("copying block {:?} to {:?}", src_key, dst_key);
timeline.put_page_image(dst_key, lsn, content);
timeline.put_page_image(dst_key, lsn, content)?;
num_blocks_copied += 1;
}

View File

@@ -0,0 +1,208 @@
//!
//! An implementation of the ObjectStore interface, backed by RocksDB
//!
use crate::object_store::{ObjectKey, ObjectStore};
use crate::repository::{BufferTag, RelTag};
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StorageKey {
obj_key: ObjectKey,
lsn: Lsn,
}
pub struct RocksObjectStore {
_conf: &'static PageServerConf,
// RocksDB handle
db: rocksdb::DB,
}
impl ObjectStore for RocksObjectStore {
fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>> {
let val = self.db.get(StorageKey::ser(&StorageKey {
obj_key: key.clone(),
lsn,
})?)?;
if let Some(val) = val {
Ok(val)
} else {
bail!("could not find page {:?}", key);
}
}
fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> {
self.db.put(
StorageKey::ser(&StorageKey {
obj_key: key.clone(),
lsn,
})?,
value,
)?;
Ok(())
}
/// Iterate through page versions of given page, starting from the given LSN.
/// The versions are walked in descending LSN order.
fn object_versions<'a>(
&'a self,
key: &ObjectKey,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>> {
let iter = RocksObjectVersionIter::new(&self.db, key, lsn)?;
Ok(Box::new(iter))
}
/// Get a list of all distinct relations in given tablespace and database.
///
/// TODO: This implementation is very inefficient, it scans
/// through all entries in the given database. In practice, this
/// is used for CREATE DATABASE, and usually the template database is small.
/// But if it's not, this will be slow.
fn list_rels(
&self,
timelineid: ZTimelineId,
spcnode: u32,
dbnode: u32,
lsn: Lsn,
) -> Result<HashSet<RelTag>> {
// FIXME: This scans everything. Very slow
let mut rels: HashSet<RelTag> = HashSet::new();
let searchkey = StorageKey {
obj_key: ObjectKey {
timeline: timelineid,
buf_tag: BufferTag {
rel: RelTag {
spcnode,
dbnode,
relnode: 0,
forknum: 0u8,
},
blknum: 0,
},
},
lsn: Lsn(0),
};
let mut iter = self.db.raw_iterator();
iter.seek(searchkey.ser()?);
while iter.valid() {
let key = StorageKey::des(iter.key().unwrap())?;
if key.obj_key.buf_tag.rel.spcnode != spcnode
|| key.obj_key.buf_tag.rel.dbnode != dbnode
{
break;
}
if key.lsn < lsn {
rels.insert(key.obj_key.buf_tag.rel);
}
iter.next();
}
Ok(rels)
}
}
impl RocksObjectStore {
/// Open a RocksDB database.
pub fn open(conf: &'static PageServerConf) -> Result<RocksObjectStore> {
let path = conf.workdir.join("rocksdb-storage");
let db = rocksdb::DB::open(&Self::get_rocksdb_opts(), path)?;
let storage = RocksObjectStore { _conf: conf, db };
Ok(storage)
}
/// Create a new, empty RocksDB database.
pub fn create(conf: &'static PageServerConf) -> Result<RocksObjectStore> {
let path = conf.workdir.join("rocksdb-storage");
std::fs::create_dir(&path)?;
let mut opts = Self::get_rocksdb_opts();
opts.create_if_missing(true);
opts.set_error_if_exists(true);
let db = rocksdb::DB::open(&opts, &path)?;
let obj_store = RocksObjectStore { _conf: conf, db };
Ok(obj_store)
}
/// common options used by `open` and `create`
fn get_rocksdb_opts() -> rocksdb::Options {
let mut opts = rocksdb::Options::default();
opts.set_use_fsync(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
// FIXME
/*
opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
if (val[0] & UNUSED_VERSION_FLAG) != 0 {
rocksdb::compaction_filter::Decision::Remove
} else {
rocksdb::compaction_filter::Decision::Keep
}
});
*/
opts
}
}
///
/// Iterator for `object_versions`. Returns all page versions of a given block, in
/// reverse LSN order.
///
struct RocksObjectVersionIter<'a> {
obj_key: ObjectKey,
dbiter: rocksdb::DBRawIterator<'a>,
first_call: bool,
}
impl<'a> RocksObjectVersionIter<'a> {
fn new(
db: &'a rocksdb::DB,
obj_key: &ObjectKey,
lsn: Lsn,
) -> Result<RocksObjectVersionIter<'a>> {
let key = StorageKey {
obj_key: obj_key.clone(),
lsn,
};
let mut dbiter = db.raw_iterator();
dbiter.seek_for_prev(StorageKey::ser(&key)?); // locate last entry
Ok(RocksObjectVersionIter {
first_call: true,
obj_key: obj_key.clone(),
dbiter,
})
}
}
impl<'a> Iterator for RocksObjectVersionIter<'a> {
type Item = (Lsn, Vec<u8>);
fn next(&mut self) -> std::option::Option<Self::Item> {
if self.first_call {
self.first_call = false;
} else {
self.dbiter.prev(); // walk backwards
}
if !self.dbiter.valid() {
return None;
}
let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap();
if key.obj_key.buf_tag != self.obj_key.buf_tag {
return None;
}
let val = self.dbiter.value().unwrap();
let result = val.to_vec();
Some((key.lsn, result))
}
}

View File

@@ -186,10 +186,6 @@ fn walreceiver_main(
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let decoded = decode_wal_record(recdata.clone());
restore_local_repo::save_decoded_record(&*timeline, decoded, recdata, lsn)?;
// Now that this record has been handled, let the page cache know that
// it is up-to-date to this LSN
timeline.advance_last_record_lsn(lsn);
last_rec_lsn = lsn;
}

View File

@@ -259,7 +259,7 @@ impl PostgresRedoManagerInternal {
let result: Result<Bytes, WalRedoError>;
trace!(
debug!(
"applied {} WAL records in {} ms to reconstruct page image at LSN {}",
nrecords,
duration.as_millis(),