Compare commits

...

6 Commits

Author SHA1 Message Date
Konstantin Knizhnik  de02cc9ee4  Add in-memory storage engine  2021-07-22 10:59:30 +03:00
Konstantin Knizhnik  96e73fb585  Add in-memory storage engine-a  2021-07-22 10:59:28 +03:00
Konstantin Knizhnik  a533d22f71  Do not update relation metadata when materializing page in get_page_at_lsn  2021-07-20 23:03:35 +03:00
Konstantin Knizhnik  0ab4792943  Fix integration tests  2021-07-20 22:23:24 +03:00
Konstantin Knizhnik  c60e3e2337  Fix unit tests  2021-07-20 19:29:28 +03:00
Konstantin Knizhnik  1e0e3fbde0  Spawn multiple wal-redo postgres instances  2021-07-20 19:15:55 +03:00
15 changed files with 545 additions and 67 deletions

View File

@@ -132,7 +132,9 @@ impl<'a> Basebackup<'a> {
         tag: &ObjectTag,
         page: u32,
     ) -> anyhow::Result<()> {
-        let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
+        let img = self
+            .timeline
+            .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
         // Zero length image indicates truncated segment: just skip it
         if !img.is_empty() {
             assert!(img.len() == pg_constants::BLCKSZ as usize);
@@ -172,7 +174,9 @@ impl<'a> Basebackup<'a> {
     // Extract pg_filenode.map files from repository
     //
     fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> {
-        let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
+        let img = self
+            .timeline
+            .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
         info!("add_relmap_file {:?}", db);
         let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
             String::from("global/pg_filenode.map") // filenode map for global tablespace
@@ -198,7 +202,9 @@ impl<'a> Basebackup<'a> {
             if self.timeline.get_tx_status(xid, self.lsn)?
                 == pg_constants::TRANSACTION_STATUS_IN_PROGRESS
             {
-                let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
+                let img = self
+                    .timeline
+                    .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
                 let mut buf = BytesMut::new();
                 buf.extend_from_slice(&img[..]);
                 let crc = crc32c::crc32c(&img[..]);
@@ -214,12 +220,12 @@ impl<'a> Basebackup<'a> {
     // Add generated pg_control file
     //
     fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
-        let checkpoint_bytes = self
-            .timeline
-            .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?;
-        let pg_control_bytes = self
-            .timeline
-            .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
+        let checkpoint_bytes =
+            self.timeline
+                .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn, false)?;
+        let pg_control_bytes =
+            self.timeline
+                .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn, false)?;
         let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
         let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;

View File

@@ -28,12 +28,15 @@ const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000";
 const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
 const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
+const DEFAULT_WAL_REDOERS: usize = 1;
 /// String arguments that can be declared via CLI or config file
 #[derive(Serialize, Deserialize)]
 struct CfgFileParams {
     listen_addr: Option<String>,
     gc_horizon: Option<String>,
     gc_period: Option<String>,
+    wal_redoers: Option<String>,
     pg_distrib_dir: Option<String>,
 }
@@ -48,6 +51,7 @@ impl CfgFileParams {
             listen_addr: get_arg("listen"),
             gc_horizon: get_arg("gc_horizon"),
             gc_period: get_arg("gc_period"),
+            wal_redoers: get_arg("wal_redoers"),
             pg_distrib_dir: get_arg("postgres-distrib"),
         }
     }
@@ -59,6 +63,7 @@ impl CfgFileParams {
             listen_addr: self.listen_addr.or(other.listen_addr),
             gc_horizon: self.gc_horizon.or(other.gc_horizon),
             gc_period: self.gc_period.or(other.gc_period),
+            wal_redoers: self.wal_redoers.or(other.wal_redoers),
             pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
         }
     }
@@ -79,6 +84,11 @@ impl CfgFileParams {
             None => DEFAULT_GC_PERIOD,
         };
+        let wal_redoers = match self.wal_redoers.as_ref() {
+            Some(wal_redoers_str) => wal_redoers_str.parse::<usize>()?,
+            None => DEFAULT_WAL_REDOERS,
+        };
         let pg_distrib_dir = match self.pg_distrib_dir.as_ref() {
             Some(pg_distrib_dir_str) => PathBuf::from(pg_distrib_dir_str),
             None => env::current_dir()?.join("tmp_install"),
@@ -91,11 +101,12 @@ impl CfgFileParams {
         Ok(PageServerConf {
             daemonize: false,
             interactive: false,
+            materialize: false,
             listen_addr,
             gc_horizon,
             gc_period,
+            wal_redoers,
             workdir: PathBuf::from("."),
             pg_distrib_dir,
@@ -120,6 +131,12 @@ fn main() -> Result<()> {
                 .takes_value(false)
                 .help("Interactive mode"),
         )
+        .arg(
+            Arg::with_name("materialize")
+                .long("materialize")
+                .takes_value(false)
+                .help("Materialize pages constructed by get_page_at"),
+        )
         .arg(
             Arg::with_name("daemonize")
                 .short("d")
@@ -145,6 +162,12 @@ fn main() -> Result<()> {
                 .takes_value(true)
                 .help("Interval between garbage collector iterations"),
         )
+        .arg(
+            Arg::with_name("wal_redoers")
+                .long("wal_redoers")
+                .takes_value(true)
+                .help("Number of wal-redo postgres instances"),
+        )
         .arg(
             Arg::with_name("workdir")
                 .short("D")
@@ -181,6 +204,7 @@ fn main() -> Result<()> {
     conf.daemonize = arg_matches.is_present("daemonize");
     conf.interactive = arg_matches.is_present("interactive");
+    conf.materialize = arg_matches.is_present("materialize");
     if init && (conf.daemonize || conf.interactive) {
         eprintln!("--daemonize and --interactive may not be used with --init");
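
For reference, a minimal standalone sketch (with a hypothetical Params type rather than the real CfgFileParams) of the precedence rule the new wal_redoers option follows above: command-line values win, the config file fills the gaps, and anything still missing falls back to the compile-time default.

// Sketch only: hypothetical simplified type, not the pageserver's own config code.
#[derive(Debug)]
struct Params {
    wal_redoers: Option<String>,
}

impl Params {
    // Mirrors the `or` merge above: `self` (CLI) takes precedence over `other` (config file).
    fn or(self, other: Params) -> Params {
        Params {
            wal_redoers: self.wal_redoers.or(other.wal_redoers),
        }
    }
}

fn main() -> Result<(), std::num::ParseIntError> {
    const DEFAULT_WAL_REDOERS: usize = 1;

    let from_cli = Params { wal_redoers: None };
    let from_file = Params { wal_redoers: Some("4".to_string()) };

    // Same parse-or-default shape as the `wal_redoers` match in the diff above.
    let wal_redoers = match from_cli.or(from_file).wal_redoers.as_ref() {
        Some(s) => s.parse::<usize>()?,
        None => DEFAULT_WAL_REDOERS,
    };
    assert_eq!(wal_redoers, 4);
    println!("wal_redoers = {}", wal_redoers);
    Ok(())
}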

View File

@@ -101,6 +101,7 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
     // rapid init+start case now, but the general race condition remains if you restart the
     // server quickly.
     let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?;
+    //let storage = crate::inmem_storage::InmemObjectStore::create(conf)?;
     let repo = crate::object_repository::ObjectRepository::new(
         conf,

View File

@@ -0,0 +1,345 @@
//!
//! An implementation of the ObjectStore interface, backed by BTreeMap
//!
use crate::object_key::*;
use crate::object_store::ObjectStore;
use crate::repository::RelTag;
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashSet};
use std::fs::File;
use std::io::prelude::*;
use std::ops::Bound::*;
use std::sync::RwLock;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct StorageKey {
    obj_key: ObjectKey,
    lsn: Lsn,
}

impl StorageKey {
    /// The first key for a given timeline
    fn timeline_start(timeline: ZTimelineId) -> Self {
        Self {
            obj_key: ObjectKey {
                timeline,
                tag: ObjectTag::FirstTag,
            },
            lsn: Lsn(0),
        }
    }
}

pub struct InmemObjectStore {
    conf: &'static PageServerConf,
    db: RwLock<BTreeMap<StorageKey, Vec<u8>>>,
}

impl ObjectStore for InmemObjectStore {
    fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>> {
        let db = self.db.read().unwrap();
        let val = db.get(&StorageKey {
            obj_key: key.clone(),
            lsn,
        });
        if let Some(val) = val {
            Ok(val.clone())
        } else {
            bail!("could not find page {:?}", key);
        }
    }

    fn get_next_key(&self, key: &ObjectKey) -> Result<Option<ObjectKey>> {
        let search_key = StorageKey {
            obj_key: key.clone(),
            lsn: Lsn(0),
        };
        let db = self.db.read().unwrap();
        for pair in db.range(&search_key..) {
            let key = pair.0;
            return Ok(Some(key.obj_key.clone()));
        }
        Ok(None)
    }

    fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> {
        let mut db = self.db.write().unwrap();
        db.insert(
            StorageKey {
                obj_key: key.clone(),
                lsn,
            },
            value.to_vec(),
        );
        Ok(())
    }

    fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()> {
        let mut db = self.db.write().unwrap();
        db.remove(&StorageKey {
            obj_key: key.clone(),
            lsn,
        });
        Ok(())
    }

    /// Iterate through page versions of given page, starting from the given LSN.
    /// The versions are walked in descending LSN order.
    fn object_versions<'a>(
        &'a self,
        key: &ObjectKey,
        lsn: Lsn,
    ) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>> {
        let from = StorageKey {
            obj_key: key.clone(),
            lsn: Lsn(0),
        };
        let till = StorageKey {
            obj_key: key.clone(),
            lsn,
        };
        let db = self.db.read().unwrap();
        let versions: Vec<(Lsn, Vec<u8>)> = db
            .range(from..=till)
            .map(|pair| (pair.0.lsn, pair.1.clone()))
            .collect();
        Ok(Box::new(InmemObjectVersionIter::new(versions)))
    }

    /// Iterate through all timeline objects
    fn list_objects<'a>(
        &'a self,
        timeline: ZTimelineId,
        nonrel_only: bool,
        lsn: Lsn,
    ) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
        let curr_key = StorageKey::timeline_start(timeline);
        Ok(Box::new(InmemObjectIter {
            store: &self,
            curr_key,
            timeline,
            nonrel_only,
            lsn,
        }))
    }

    /// Get a list of all distinct relations in given tablespace and database.
    ///
    /// TODO: This implementation is very inefficient, it scans
    /// through all entries in the given database. In practice, this
    /// is used for CREATE DATABASE, and usually the template database is small.
    /// But if it's not, this will be slow.
    fn list_rels(
        &self,
        timelineid: ZTimelineId,
        spcnode: u32,
        dbnode: u32,
        lsn: Lsn,
    ) -> Result<HashSet<RelTag>> {
        // FIXME: This scans everything. Very slow
        let mut rels: HashSet<RelTag> = HashSet::new();
        let mut search_rel_tag = RelTag {
            spcnode,
            dbnode,
            relnode: 0,
            forknum: 0u8,
        };
        let db = self.db.read().unwrap();
        'outer: loop {
            let search_key = StorageKey {
                obj_key: ObjectKey {
                    timeline: timelineid,
                    tag: ObjectTag::RelationMetadata(search_rel_tag),
                },
                lsn: Lsn(0),
            };
            for pair in db.range(&search_key..) {
                let key = pair.0;
                if let ObjectTag::RelationMetadata(rel_tag) = key.obj_key.tag {
                    if spcnode != 0 && rel_tag.spcnode != spcnode
                        || dbnode != 0 && rel_tag.dbnode != dbnode
                    {
                        break 'outer;
                    }
                    if key.lsn <= lsn {
                        // visible in this snapshot
                        rels.insert(rel_tag);
                    }
                    search_rel_tag = rel_tag;
                    // skip to next relation
                    // FIXME: What if relnode is u32::MAX ?
                    search_rel_tag.relnode += 1;
                    continue 'outer;
                } else {
                    // no more relation metadata entries
                    break 'outer;
                }
            }
        }
        Ok(rels)
    }

    /// Iterate through versions of all objects in a timeline.
    ///
    /// Returns objects in increasing key-version order.
    /// Returns all versions up to and including the specified LSN.
    fn objects<'a>(
        &'a self,
        timeline: ZTimelineId,
        lsn: Lsn,
    ) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>> {
        let curr_key = StorageKey::timeline_start(timeline);
        Ok(Box::new(InmemObjects {
            store: &self,
            curr_key,
            timeline,
            lsn,
        }))
    }

    fn compact(&self) {}
}

impl Drop for InmemObjectStore {
    fn drop(&mut self) {
        let path = self.conf.workdir.join("objstore.dmp");
        let mut f = File::create(path).unwrap();
        f.write(&self.db.ser().unwrap()).unwrap();
    }
}

impl InmemObjectStore {
    pub fn open(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
        let path = conf.workdir.join("objstore.dmp");
        let mut f = File::open(path)?;
        let mut buffer = Vec::new();
        // read the whole file
        f.read_to_end(&mut buffer)?;
        let db = RwLock::new(BTreeMap::des(&buffer)?);
        Ok(InmemObjectStore { conf: conf, db })
    }

    pub fn create(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
        Ok(InmemObjectStore {
            conf: conf,
            db: RwLock::new(BTreeMap::new()),
        })
    }
}

///
/// Iterator for `object_versions`. Returns all page versions of a given block, in
/// reverse LSN order.
///
struct InmemObjectVersionIter {
    versions: Vec<(Lsn, Vec<u8>)>,
    curr: usize,
}

impl InmemObjectVersionIter {
    fn new(versions: Vec<(Lsn, Vec<u8>)>) -> InmemObjectVersionIter {
        let curr = versions.len();
        InmemObjectVersionIter { versions, curr }
    }
}

impl Iterator for InmemObjectVersionIter {
    type Item = (Lsn, Vec<u8>);

    fn next(&mut self) -> std::option::Option<Self::Item> {
        if self.curr == 0 {
            None
        } else {
            self.curr -= 1;
            Some(self.versions[self.curr].clone())
        }
    }
}

struct InmemObjects<'r> {
    store: &'r InmemObjectStore,
    curr_key: StorageKey,
    timeline: ZTimelineId,
    lsn: Lsn,
}

impl<'r> Iterator for InmemObjects<'r> {
    // TODO consider returning Box<[u8]>
    type Item = Result<(ObjectTag, Lsn, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_result().transpose()
    }
}

impl<'r> InmemObjects<'r> {
    fn next_result(&mut self) -> Result<Option<(ObjectTag, Lsn, Vec<u8>)>> {
        let db = self.store.db.read().unwrap();
        for pair in db.range((Excluded(&self.curr_key), Unbounded)) {
            let key = pair.0;
            if key.obj_key.timeline != self.timeline {
                return Ok(None);
            }
            if key.lsn > self.lsn {
                // TODO can speed up by seeking iterator
                continue;
            }
            self.curr_key = key.clone();
            let value = pair.1.clone();
            return Ok(Some((key.obj_key.tag, key.lsn, value)));
        }
        Ok(None)
    }
}

///
/// Iterator for `list_objects`. Returns all objects preceding specified LSN
///
struct InmemObjectIter<'a> {
    store: &'a InmemObjectStore,
    curr_key: StorageKey,
    timeline: ZTimelineId,
    nonrel_only: bool,
    lsn: Lsn,
}

impl<'a> Iterator for InmemObjectIter<'a> {
    type Item = ObjectTag;

    fn next(&mut self) -> std::option::Option<Self::Item> {
        let db = self.store.db.read().unwrap();
        'outer: loop {
            for pair in db.range((Excluded(&self.curr_key), Unbounded)) {
                let key = pair.0;
                if key.obj_key.timeline != self.timeline {
                    return None;
                }
                self.curr_key = key.clone();
                self.curr_key.lsn = Lsn(u64::MAX); // next seek should skip all versions
                if key.lsn <= self.lsn {
                    // visible in this snapshot
                    if self.nonrel_only {
                        match key.obj_key.tag {
                            ObjectTag::RelationMetadata(_) => return None,
                            ObjectTag::RelationBuffer(_) => return None,
                            _ => return Some(key.obj_key.tag),
                        }
                    } else {
                        return Some(key.obj_key.tag);
                    }
                }
                continue 'outer;
            }
            return None;
        }
    }
}
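
The store depends on the lexicographic ordering of (ObjectKey, Lsn) pairs, which is why ObjectKey and ZTimelineId also gain PartialOrd/Ord derives in this change. Below is a minimal standalone sketch (with simplified stand-in types rather than the real ObjectKey and Lsn) of how the BTreeMap range scans above recover versions of one object and find neighbouring keys.

use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

// Simplified stand-ins for ObjectKey and Lsn, just to show the ordering trick.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Key {
    page: u32, // plays the role of ObjectKey
    lsn: u64,  // plays the role of Lsn
}

fn main() {
    let mut db: BTreeMap<Key, &str> = BTreeMap::new();
    db.insert(Key { page: 7, lsn: 10 }, "v@10");
    db.insert(Key { page: 7, lsn: 20 }, "v@20");
    db.insert(Key { page: 7, lsn: 30 }, "v@30");
    db.insert(Key { page: 8, lsn: 5 }, "other page");

    // object_versions-style scan: all versions of page 7 up to LSN 25, in key order.
    let from = Key { page: 7, lsn: 0 };
    let till = Key { page: 7, lsn: 25 };
    let versions: Vec<_> = db.range(&from..=&till).map(|(k, v)| (k.lsn, *v)).collect();
    assert_eq!(versions, vec![(10, "v@10"), (20, "v@20")]);

    // iterator-style scan: first key strictly after the current key.
    let next = db
        .range((Excluded(&till), Unbounded))
        .next()
        .map(|(k, _)| k.clone());
    assert_eq!(next, Some(Key { page: 7, lsn: 30 }));
}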

View File

@@ -7,6 +7,7 @@ use std::time::Duration;
 pub mod basebackup;
 pub mod branches;
+pub mod inmem_storage;
 pub mod object_key;
 pub mod object_repository;
 pub mod object_store;
@@ -26,9 +27,11 @@ pub mod walredo;
 pub struct PageServerConf {
     pub daemonize: bool,
     pub interactive: bool,
+    pub materialize: bool,
     pub listen_addr: String,
     pub gc_horizon: u64,
     pub gc_period: Duration,
+    pub wal_redoers: usize,
     // Repository directory, relative to current working directory.
     // Normally, the page server changes the current working directory
@@ -103,7 +106,7 @@ impl PageServerConf {
 /// is separate from PostgreSQL timelines, and doesn't have those
 /// limitations. A zenith timeline is identified by a 128-bit ID, which
 /// is usually printed out as a hex string.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
 pub struct ZTimelineId([u8; 16]);
 impl FromStr for ZTimelineId {

View File

@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
 /// repository. It is shared between object_repository.rs and object_store.rs.
 /// It is mostly opaque to ObjectStore, it just stores and retrieves objects
 /// using the key given by the caller.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
 pub struct ObjectKey {
     pub timeline: ZTimelineId,
     pub tag: ObjectTag,

View File

@@ -71,6 +71,7 @@ impl Repository for ObjectRepository {
             Some(timeline) => Ok(timeline.clone()),
             None => {
                 let timeline = ObjectTimeline::open(
+                    self.conf,
                     Arc::clone(&self.obj_store),
                     timelineid,
                     self.walredo_mgr.clone(),
@@ -124,6 +125,7 @@ impl Repository for ObjectRepository {
         info!("Created empty timeline {}", timelineid);
         let timeline = ObjectTimeline::open(
+            self.conf,
             Arc::clone(&self.obj_store),
             timelineid,
             self.walredo_mgr.clone(),
@@ -163,7 +165,7 @@ impl Repository for ObjectRepository {
             match tag {
                 ObjectTag::TimelineMetadataTag => {} // skip it
                 _ => {
-                    let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?;
+                    let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn, false)?;
                     let val = ObjectValue::Page(PageEntry::Page(img));
                     let key = ObjectKey { timeline: dst, tag };
                     self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?;
@@ -197,6 +199,7 @@ pub struct ObjectTimeline {
     // Backing key-value store
     obj_store: Arc<dyn ObjectStore>,
+    conf: &'static PageServerConf,
     // WAL redo manager, for reconstructing page versions from WAL records.
     walredo_mgr: Arc<dyn WalRedoManager>,
@@ -231,6 +234,7 @@ impl ObjectTimeline {
     ///
     /// Loads the metadata for the timeline into memory.
     fn open(
+        conf: &'static PageServerConf,
         obj_store: Arc<dyn ObjectStore>,
         timelineid: ZTimelineId,
         walredo_mgr: Arc<dyn WalRedoManager>,
@@ -244,6 +248,7 @@ impl ObjectTimeline {
         let timeline = ObjectTimeline {
             timelineid,
             obj_store,
+            conf,
             walredo_mgr,
             last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
             last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
@@ -265,10 +270,15 @@ impl Timeline for ObjectTimeline {
     fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
         let lsn = self.wait_lsn(req_lsn)?;
-        self.get_page_at_lsn_nowait(tag, lsn)
+        self.get_page_at_lsn_nowait(tag, lsn, self.conf.materialize)
     }
-    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
+    fn get_page_at_lsn_nowait(
+        &self,
+        tag: ObjectTag,
+        req_lsn: Lsn,
+        materialize: bool,
+    ) -> Result<Bytes> {
         const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
         // Look up the page entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -290,11 +300,13 @@
                 let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
                 page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
-                // Garbage collection assumes that we remember the materialized page
-                // version. Otherwise we could opt to not do it, with the downside that
-                // the next GetPage@LSN call of the same page version would have to
-                // redo the WAL again.
-                self.put_page_image(tag, lsn, page_img.clone(), false)?;
+                if materialize {
+                    // Garbage collection assumes that we remember the materialized page
+                    // version. Otherwise we could opt to not do it, with the downside that
+                    // the next GetPage@LSN call of the same page version would have to
+                    // redo the WAL again.
+                    self.put_page_image(tag, lsn, page_img.clone(), false)?;
+                }
             }
             ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE),
             _ => bail!("Invalid object kind, expected a page entry or SLRU truncate"),
@@ -712,7 +724,7 @@ impl Timeline for ObjectTimeline {
                 {
                     if rel_size > tag.blknum {
                         // preserve and materialize last version before deleting all preceding
-                        self.get_page_at_lsn_nowait(obj, lsn)?;
+                        self.get_page_at_lsn_nowait(obj, lsn, true)?;
                         continue;
                     }
                     debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size);
@@ -755,7 +767,7 @@
                     }
                     ObjectValue::Page(PageEntry::WALRecord(_)) => {
                         // preserve and materialize last version before deleting all preceding
-                        self.get_page_at_lsn_nowait(obj, lsn)?;
+                        self.get_page_at_lsn_nowait(obj, lsn, true)?;
                     }
                     _ => {} // do nothing if already materialized
                 }
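
A toy sketch (hypothetical simplified types, not the pageserver's own) of what the new materialize flag controls in get_page_at_lsn_nowait above: a page reconstructed from WAL is written back as a page image only when the caller asks for it, so read-only paths such as basebackup and WAL import pass false, while GC passes true so it can rely on the materialized version.

use std::collections::HashMap;

// Sketch only: a tiny stand-in for the repository, keyed by (page, lsn).
struct Store {
    images: HashMap<(u32, u64), Vec<u8>>, // materialized page images
}

impl Store {
    fn reconstruct_from_wal(&self, page: u32, lsn: u64) -> Vec<u8> {
        // Stand-in for walredo_mgr.request_redo(...): pretend this is expensive.
        vec![page as u8, lsn as u8]
    }

    fn get_page(&mut self, page: u32, lsn: u64, materialize: bool) -> Vec<u8> {
        if let Some(img) = self.images.get(&(page, lsn)) {
            return img.clone();
        }
        let img = self.reconstruct_from_wal(page, lsn);
        if materialize {
            // Remember the reconstructed version so the next request for the
            // same page version does not have to redo the WAL again.
            self.images.insert((page, lsn), img.clone());
        }
        img
    }
}

fn main() {
    let mut store = Store { images: HashMap::new() };
    let _ = store.get_page(1, 100, false); // read-only path: nothing is cached
    assert!(store.images.is_empty());
    let _ = store.get_page(1, 100, true); // GC-style path: image is materialized
    assert_eq!(store.images.len(), 1);
}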

View File

@@ -6,6 +6,7 @@
 use crate::object_repository::ObjectRepository;
 use crate::repository::Repository;
 use crate::rocksdb_storage::RocksObjectStore;
+//use crate::inmem_storage::InmemObjectStore;
 use crate::walredo::PostgresRedoManager;
 use crate::PageServerConf;
 use lazy_static::lazy_static;
@@ -19,6 +20,7 @@ pub fn init(conf: &'static PageServerConf) {
     let mut m = REPOSITORY.lock().unwrap();
     let obj_store = RocksObjectStore::open(conf).unwrap();
+    //let obj_store = InmemObjectStore::open(conf).unwrap();
     // Set up a WAL redo manager, for applying WAL records.
     let walredo_mgr = PostgresRedoManager::new(conf);

View File

@@ -59,7 +59,7 @@ pub trait Timeline: Send + Sync {
     fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
     /// Look up given page in the cache.
-    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
+    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn, materialize: bool) -> Result<Bytes>;
     /// Get size of relation
     fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
@@ -144,13 +144,13 @@
     /// but it can be explicitly requested through page server API.
     ///
     /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval).
     /// `compact` parameter is used to force compaction of storage.
     /// Some storage implementations are based on LSM tree and require periodic merge (compaction).
     /// Usually storage implementation determines itself when compaction should be performed.
     /// But for GC tests it may be useful to force compaction just after completion of GC iteration
     /// to make sure that all detected garbage is removed.
     /// So right now `compact` is set to true when GC explicitly requested through page server API,
     /// and is set to false in GC threads which repeat GC iterations in an infinite loop.
     fn gc_iteration(&self, horizon: u64, compact: bool) -> Result<GcResult>;
     // Check transaction status
@@ -353,8 +353,10 @@ mod tests {
         let conf = PageServerConf {
             daemonize: false,
             interactive: false,
+            materialize: false,
             gc_horizon: 64 * 1024 * 1024,
             gc_period: Duration::from_secs(10),
+            wal_redoers: 1,
             listen_addr: "127.0.0.1:5430".to_string(),
             workdir: repo_dir,
             pg_distrib_dir: "".into(),

View File

@@ -70,7 +70,7 @@ pub fn import_timeline_from_postgres_datadir(
                 import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?;
                 // Extract checkpoint record from pg_control and store it as a separate object
                 let pg_control_bytes =
-                    timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn)?;
+                    timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn, false)?;
                 let pg_control = ControlFileData::decode(&pg_control_bytes)?;
                 let checkpoint_bytes = pg_control.checkPointCopy.encode();
                 timeline.put_page_image(ObjectTag::Checkpoint, lsn, checkpoint_bytes, false)?;
@@ -298,7 +298,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
     let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
     let mut last_lsn = startpoint;
-    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
+    let checkpoint_bytes =
+        timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
     let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     loop {
@@ -578,7 +579,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
                 blknum,
             });
-            let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?;
+            let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn, false)?;
             debug!("copying block {:?} to {:?}", src_key, dst_key);
@@ -598,7 +599,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
         match tag {
             ObjectTag::FileNodeMap(db) => {
                 if db.spcnode == src_tablespace_id && db.dbnode == src_db_id {
-                    let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?;
+                    let img = timeline.get_page_at_lsn_nowait(tag, req_lsn, false)?;
                     let new_tag = ObjectTag::FileNodeMap(DatabaseTag {
                         spcnode: tablespace_id,
                         dbnode: db_id,

View File

@@ -169,7 +169,8 @@ fn walreceiver_main(
     let mut waldecoder = WalStreamDecoder::new(startpoint);
-    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
+    let checkpoint_bytes =
+        timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
     let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);

View File

@@ -26,6 +26,7 @@ use std::io::Error;
 use std::path::PathBuf;
 use std::process::Stdio;
 use std::sync::mpsc;
+use std::sync::Arc;
 use std::sync::Mutex;
 use std::time::Duration;
 use std::time::Instant;
@@ -103,13 +104,16 @@ struct PostgresRedoManagerInternal {
 }
 #[derive(Debug)]
-struct WalRedoRequest {
+struct WalRedoRequestData {
     tag: ObjectTag,
     lsn: Lsn,
     base_img: Option<Bytes>,
     records: Vec<WALRecord>,
+}
+#[derive(Debug)]
+struct WalRedoRequest {
+    data: WalRedoRequestData,
     response_channel: mpsc::Sender<Result<Bytes, WalRedoError>>,
 }
@@ -175,10 +179,12 @@ impl WalRedoManager for PostgresRedoManager {
         let (tx, rx) = mpsc::channel::<Result<Bytes, WalRedoError>>();
         let request = WalRedoRequest {
-            tag,
-            lsn,
-            base_img,
-            records,
+            data: WalRedoRequestData {
+                tag,
+                lsn,
+                base_img,
+                records,
+            },
             response_channel: tx,
         };
@@ -229,40 +235,64 @@
             .build()
             .unwrap();
-        let process: PostgresRedoProcess;
+        let processes: Vec<PostgresRedoProcess>;
         info!("launching WAL redo postgres process");
-        process = runtime
-            .block_on(PostgresRedoProcess::launch(self.conf))
-            .unwrap();
+        let wal_redoers = self.conf.wal_redoers;
+        processes = (0..wal_redoers)
+            .map(|i| {
+                runtime
+                    .block_on(PostgresRedoProcess::launch(self.conf, i))
+                    .unwrap()
+            })
+            .collect();
         // Loop forever, handling requests as they come.
         loop {
-            let request = self
-                .request_rx
-                .recv()
-                .expect("WAL redo request channel was closed");
-            let result = runtime.block_on(self.handle_apply_request(&process, &request));
-            let result_ok = result.is_ok();
-            // Send the result to the requester
-            let _ = request.response_channel.send(result);
-            if !result_ok {
-                error!("wal-redo-postgres failed to apply request {:?}", request);
+            let mut requests: Vec<WalRedoRequest> = Vec::new();
+            requests.push(
+                self.request_rx
+                    .recv()
+                    .expect("WAL redo request channel was closed"),
+            );
+            loop {
+                let req = self.request_rx.try_recv();
+                match req {
+                    Ok(req) => requests.push(req),
+                    Err(_) => break,
+                }
+            }
+            let request_data = requests.iter().map(|req| &req.data);
+            let mut rr = 0; // round robin
+            let results = runtime.block_on(async {
+                let futures = request_data.map(|req| {
+                    rr += 1;
+                    self.handle_apply_request(&processes[rr % wal_redoers], &req)
+                });
+                let mut results: Vec<Result<Bytes, WalRedoError>> = Vec::new();
+                for future in futures {
+                    results.push(future.await);
+                }
+                results
+            });
+            for (result, request) in results.into_iter().zip(requests.iter()) {
+                let result_ok = result.is_ok();
+                // Send the result to the requester
+                let _ = request.response_channel.send(result);
+                if !result_ok {
+                    error!("wal-redo-postgres failed to apply request {:?}", request);
+                }
             }
         }
     }
+    ///
+    /// Process one request for WAL redo.
+    ///
     async fn handle_apply_request(
         &self,
         process: &PostgresRedoProcess,
-        request: &WalRedoRequest,
+        request: &WalRedoRequestData,
     ) -> Result<Bytes, WalRedoError> {
         let tag = request.tag;
         let lsn = request.lsn;
@@ -446,19 +476,19 @@
 }
 struct PostgresRedoProcess {
-    stdin: RefCell<ChildStdin>,
-    stdout: RefCell<ChildStdout>,
+    stdin: Arc<RefCell<ChildStdin>>,
+    stdout: Arc<RefCell<ChildStdout>>,
 }
 impl PostgresRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
     //
-    async fn launch(conf: &PageServerConf) -> Result<PostgresRedoProcess, Error> {
+    async fn launch(conf: &PageServerConf, id: usize) -> Result<PostgresRedoProcess, Error> {
         // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
         // just create one with constant name. That fails if you try to launch more than
         // one WAL redo manager concurrently.
-        let datadir = conf.workdir.join("wal-redo-datadir");
+        let datadir = conf.workdir.join(format!("wal-redo-datadir-{}", id));
         // Create empty data directory for wal-redo postgres, deleting old one first.
         if datadir.exists() {
@@ -538,8 +568,8 @@
         tokio::spawn(f_stderr);
         Ok(PostgresRedoProcess {
-            stdin: RefCell::new(stdin),
-            stdout: RefCell::new(stdout),
+            stdin: Arc::new(RefCell::new(stdin)),
+            stdout: Arc::new(RefCell::new(stdout)),
         })
     }
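
A stripped-down, synchronous sketch (plain std types, no tokio, hypothetical Worker type) of the new dispatch loop above: block for one request, drain whatever else is already queued, then spread the batch over the wal_redoers worker processes round-robin. In the real code the per-request work is async, so the batch can be applied concurrently rather than sequentially as it is here.

use std::sync::mpsc;

struct Worker {
    id: usize,
}

impl Worker {
    fn apply(&self, req: &str) -> String {
        format!("worker {} applied {}", self.id, req)
    }
}

fn main() {
    let workers: Vec<Worker> = (0..3).map(|id| Worker { id }).collect();
    let (tx, rx) = mpsc::channel::<String>();
    for i in 0..7 {
        tx.send(format!("request-{}", i)).unwrap();
    }

    // Block for the first request, then opportunistically drain the rest.
    let mut batch = vec![rx.recv().expect("request channel closed")];
    while let Ok(req) = rx.try_recv() {
        batch.push(req);
    }

    // Round-robin the batch over the workers.
    let results: Vec<String> = batch
        .iter()
        .enumerate()
        .map(|(i, req)| workers[i % workers.len()].apply(req))
        .collect();

    for r in &results {
        println!("{}", r);
    }
}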

View File

@@ -0,0 +1,25 @@
from contextlib import closing
import psycopg2.extras

pytest_plugins = ("fixtures.zenith_fixtures")


#
# Test insertion of a large number of records
#
# This test is pretty tightly coupled with the current implementation of page version storage
# and garbage collection in object_repository.rs.
#
def test_bulk_insert(zenith_cli, pageserver, postgres, pg_bin):
    zenith_cli.run(["branch", "test_bulk_insert", "empty"])
    pg = postgres.create_start('test_bulk_insert')

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("create table t(c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint)")
            cur.execute("create index on t(c1)")
            cur.execute("create index on t(c2)")
            cur.execute("create index on t(c3)")
            cur.execute("create index on t(c4)")
            cur.execute("create index on t(c5)")
            cur.execute("insert into t values (generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000))")
            cur.execute("insert into t values (generate_series(1,1000000),random()*1000000,random()*1000000,random()*1000000,random()*1000000)")

View File

@@ -0,0 +1,26 @@
from contextlib import closing
import psycopg2.extras
import time

pytest_plugins = ("fixtures.zenith_fixtures")


#
# Test sequential scans over a large number of records
#
# This test is pretty tightly coupled with the current implementation of page version storage
# and garbage collection in object_repository.rs.
#
def test_seq_scan(zenith_cli, pageserver, postgres, pg_bin):
    zenith_cli.run(["branch", "test_seq_scan", "empty"])
    pg = postgres.create_start('test_seq_scan')

    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("create table t(c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint)")
            cur.execute("insert into t values (generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000))")
            cur.execute("set max_parallel_workers_per_gather=0")
            for i in range(100):
                start = time.time()
                cur.execute("select count(*) from t")
                stop = time.time()
                print(f'Elapsed time for iterating through 1000000 records is {stop - start}')