Compare commits

...

6 Commits

Author SHA1 Message Date
Konstantin Knizhnik
de02cc9ee4 Add in-memory storage engine 2021-07-22 10:59:30 +03:00
Konstantin Knizhnik
96e73fb585 Add in-memory storage engine-a 2021-07-22 10:59:28 +03:00
Konstantin Knizhnik
a533d22f71 Do not update relation metadata when materializing page in get_page_at_lsn 2021-07-20 23:03:35 +03:00
Konstantin Knizhnik
0ab4792943 Fix integration tests 2021-07-20 22:23:24 +03:00
Konstantin Knizhnik
c60e3e2337 Fix unit tests 2021-07-20 19:29:28 +03:00
Konstantin Knizhnik
1e0e3fbde0 Spawn multiple wal-redo postgres instances 2021-07-20 19:15:55 +03:00
15 changed files with 545 additions and 67 deletions

View File

@@ -132,7 +132,9 @@ impl<'a> Basebackup<'a> {
tag: &ObjectTag,
page: u32,
) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let img = self
.timeline
.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
// Zero length image indicates truncated segment: just skip it
if !img.is_empty() {
assert!(img.len() == pg_constants::BLCKSZ as usize);
@@ -172,7 +174,9 @@ impl<'a> Basebackup<'a> {
// Extract pg_filenode.map files from repository
//
fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let img = self
.timeline
.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
info!("add_relmap_file {:?}", db);
let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
String::from("global/pg_filenode.map") // filenode map for global tablespace
@@ -198,7 +202,9 @@ impl<'a> Basebackup<'a> {
if self.timeline.get_tx_status(xid, self.lsn)?
== pg_constants::TRANSACTION_STATUS_IN_PROGRESS
{
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let img = self
.timeline
.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
@@ -214,12 +220,12 @@ impl<'a> Basebackup<'a> {
// Add generated pg_control file
//
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
let checkpoint_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?;
let pg_control_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
let checkpoint_bytes =
self.timeline
.get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn, false)?;
let pg_control_bytes =
self.timeline
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn, false)?;
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
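Every basebackup read above now passes `materialize = false`: a one-off read while assembling a backup reconstructs the page through WAL redo if needed, but does not write the image back into the store. A minimal sketch of the two call patterns introduced by this change (surrounding types elided; the flag takes effect in object_repository.rs below):

// Read-only path (basebackup, WAL import): do not persist the reconstructed image.
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
// GC path: materialize the last surviving version before older ones are dropped.
let img = timeline.get_page_at_lsn_nowait(obj, lsn, true)?;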

View File

@@ -28,12 +28,15 @@ const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000";
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
const DEFAULT_WAL_REDOERS: usize = 1;
/// String arguments that can be declared via CLI or config file
#[derive(Serialize, Deserialize)]
struct CfgFileParams {
listen_addr: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
wal_redoers: Option<String>,
pg_distrib_dir: Option<String>,
}
@@ -48,6 +51,7 @@ impl CfgFileParams {
listen_addr: get_arg("listen"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
wal_redoers: get_arg("wal_redoers"),
pg_distrib_dir: get_arg("postgres-distrib"),
}
}
@@ -59,6 +63,7 @@ impl CfgFileParams {
listen_addr: self.listen_addr.or(other.listen_addr),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
wal_redoers: self.wal_redoers.or(other.wal_redoers),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
}
}
@@ -79,6 +84,11 @@ impl CfgFileParams {
None => DEFAULT_GC_PERIOD,
};
let wal_redoers = match self.wal_redoers.as_ref() {
Some(wal_redoers_str) => wal_redoers_str.parse::<usize>()?,
None => DEFAULT_WAL_REDOERS,
};
let pg_distrib_dir = match self.pg_distrib_dir.as_ref() {
Some(pg_distrib_dir_str) => PathBuf::from(pg_distrib_dir_str),
None => env::current_dir()?.join("tmp_install"),
@@ -91,11 +101,12 @@ impl CfgFileParams {
Ok(PageServerConf {
daemonize: false,
interactive: false,
materialize: false,
listen_addr,
gc_horizon,
gc_period,
wal_redoers,
workdir: PathBuf::from("."),
pg_distrib_dir,
@@ -120,6 +131,12 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Interactive mode"),
)
.arg(
Arg::with_name("materialize")
.long("materialize")
.takes_value(false)
.help("Materialize pages constructed by get_page_at"),
)
.arg(
Arg::with_name("daemonize")
.short("d")
@@ -145,6 +162,12 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Interval between garbage collector iterations"),
)
.arg(
Arg::with_name("wal_redoers")
.long("wal_redoers")
.takes_value(true)
.help("Number of wal-redo postgres instances"),
)
.arg(
Arg::with_name("workdir")
.short("D")
@@ -181,6 +204,7 @@ fn main() -> Result<()> {
conf.daemonize = arg_matches.is_present("daemonize");
conf.interactive = arg_matches.is_present("interactive");
conf.materialize = arg_matches.is_present("materialize");
if init && (conf.daemonize || conf.interactive) {
eprintln!("--daemonize and --interactive may not be used with --init");
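The `or_override` chain above gives command-line arguments precedence over config-file values: each field is an `Option<String>`, and `Option::or` keeps the first populated value, so the built-in default applies only when neither source set one. A standalone sketch of that precedence rule using the new `wal_redoers` field (the literal values here are invented for illustration):

// CLI wins over the config file; the default applies only when both are None.
let from_cli: Option<String> = Some("4".to_string()); // e.g. --wal_redoers 4
let from_file: Option<String> = None; // key absent from the config file
let wal_redoers = match from_cli.or(from_file) {
Some(s) => s.parse::<usize>()?, // same parse-or-fail as in the hunk above
None => DEFAULT_WAL_REDOERS, // 1
};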

View File

@@ -101,6 +101,7 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
// rapid init+start case now, but the general race condition remains if you restart the
// server quickly.
let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?;
//let storage = crate::inmem_storage::InmemObjectStore::create(conf)?;
let repo = crate::object_repository::ObjectRepository::new(
conf,
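The storage engine is still selected at compile time here: switching to the in-memory store means swapping the commented line. There is no runtime switch in this change; if one were wanted, a config flag could pick the backend, since both stores implement `ObjectStore`. A hypothetical sketch (`use_inmem_storage` is an invented flag, not part of this diff):

// Hypothetical runtime selection, NOT part of this change:
let storage: Arc<dyn ObjectStore> = if conf.use_inmem_storage {
    Arc::new(crate::inmem_storage::InmemObjectStore::create(conf)?)
} else {
    Arc::new(crate::rocksdb_storage::RocksObjectStore::create(conf)?)
};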

View File

@@ -0,0 +1,345 @@
//!
//! An implementation of the ObjectStore interface, backed by BTreeMap
//!
use crate::object_key::*;
use crate::object_store::ObjectStore;
use crate::repository::RelTag;
use crate::PageServerConf;
use crate::ZTimelineId;
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashSet};
use std::fs::File;
use std::io::prelude::*;
use std::ops::Bound::*;
use std::sync::RwLock;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
pub struct StorageKey {
obj_key: ObjectKey,
lsn: Lsn,
}
impl StorageKey {
/// The first key for a given timeline
fn timeline_start(timeline: ZTimelineId) -> Self {
Self {
obj_key: ObjectKey {
timeline,
tag: ObjectTag::FirstTag,
},
lsn: Lsn(0),
}
}
}
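// NOTE: the derived `Ord` compares fields in declaration order, so the map is
// sorted by (obj_key, lsn). All versions of one object are therefore adjacent,
// ordered by LSN, which is what lets `object_versions` below answer a query
// with one bounded range scan. It is also why `ObjectKey` and `ZTimelineId`
// gain `PartialOrd`/`Ord` derives elsewhere in this change.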
pub struct InmemObjectStore {
conf: &'static PageServerConf,
db: RwLock<BTreeMap<StorageKey, Vec<u8>>>,
}
impl ObjectStore for InmemObjectStore {
fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>> {
let db = self.db.read().unwrap();
let val = db.get(&StorageKey {
obj_key: key.clone(),
lsn,
});
if let Some(val) = val {
Ok(val.clone())
} else {
bail!("could not find page {:?}", key);
}
}
fn get_next_key(&self, key: &ObjectKey) -> Result<Option<ObjectKey>> {
let search_key = StorageKey {
obj_key: key.clone(),
lsn: Lsn(0),
};
let db = self.db.read().unwrap();
// Return the key of the first entry at or after the search key, if any.
Ok(db
.range(&search_key..)
.next()
.map(|pair| pair.0.obj_key.clone()))
}
fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> {
let mut db = self.db.write().unwrap();
db.insert(
StorageKey {
obj_key: key.clone(),
lsn,
},
value.to_vec(),
);
Ok(())
}
fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()> {
let mut db = self.db.write().unwrap();
db.remove(&StorageKey {
obj_key: key.clone(),
lsn,
});
Ok(())
}
/// Iterate through page versions of given page, starting from the given LSN.
/// The versions are walked in descending LSN order.
fn object_versions<'a>(
&'a self,
key: &ObjectKey,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>> {
let from = StorageKey {
obj_key: key.clone(),
lsn: Lsn(0),
};
let till = StorageKey {
obj_key: key.clone(),
lsn,
};
let db = self.db.read().unwrap();
let versions: Vec<(Lsn, Vec<u8>)> = db
.range(from..=till)
.map(|pair| (pair.0.lsn, pair.1.clone()))
.collect();
Ok(Box::new(InmemObjectVersionIter::new(versions)))
}
/// Iterate through all timeline objects
fn list_objects<'a>(
&'a self,
timeline: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
let curr_key = StorageKey::timeline_start(timeline);
Ok(Box::new(InmemObjectIter {
store: self,
curr_key,
timeline,
nonrel_only,
lsn,
}))
}
/// Get a list of all distinct relations in given tablespace and database.
///
/// TODO: This implementation is very inefficient, it scans
/// through all entries in the given database. In practice, this
/// is used for CREATE DATABASE, and usually the template database is small.
/// But if it's not, this will be slow.
fn list_rels(
&self,
timelineid: ZTimelineId,
spcnode: u32,
dbnode: u32,
lsn: Lsn,
) -> Result<HashSet<RelTag>> {
// FIXME: This scans everything. Very slow
let mut rels: HashSet<RelTag> = HashSet::new();
let mut search_rel_tag = RelTag {
spcnode,
dbnode,
relnode: 0,
forknum: 0u8,
};
let db = self.db.read().unwrap();
'outer: loop {
let search_key = StorageKey {
obj_key: ObjectKey {
timeline: timelineid,
tag: ObjectTag::RelationMetadata(search_rel_tag),
},
lsn: Lsn(0),
};
for pair in db.range(&search_key..) {
let key = pair.0;
if let ObjectTag::RelationMetadata(rel_tag) = key.obj_key.tag {
if spcnode != 0 && rel_tag.spcnode != spcnode
|| dbnode != 0 && rel_tag.dbnode != dbnode
{
break 'outer;
}
if key.lsn <= lsn {
// visible in this snapshot
rels.insert(rel_tag);
}
search_rel_tag = rel_tag;
// skip to next relation
// FIXME: What if relnode is u32::MAX ?
search_rel_tag.relnode += 1;
continue 'outer;
} else {
// no more relation metadata entries
break 'outer;
}
}
}
Ok(rels)
}
/// Iterate through versions of all objects in a timeline.
///
/// Returns objects in increasing key-version order.
/// Returns all versions up to and including the specified LSN.
fn objects<'a>(
&'a self,
timeline: ZTimelineId,
lsn: Lsn,
) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>> {
let curr_key = StorageKey::timeline_start(timeline);
Ok(Box::new(InmemObjects {
store: self,
curr_key,
timeline,
lsn,
}))
}
fn compact(&self) {}
}
impl Drop for InmemObjectStore {
fn drop(&mut self) {
let path = self.conf.workdir.join("objstore.dmp");
let mut f = File::create(path).unwrap();
f.write_all(&self.db.ser().unwrap()).unwrap();
}
}
impl InmemObjectStore {
pub fn open(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
let path = conf.workdir.join("objstore.dmp");
let mut f = File::open(path)?;
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer)?;
let db = RwLock::new(BTreeMap::des(&buffer)?);
Ok(InmemObjectStore { conf, db })
}
pub fn create(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
Ok(InmemObjectStore {
conf,
db: RwLock::new(BTreeMap::new()),
})
}
}
///
/// Iterator for `object_versions`. Returns all page versions of a given block, in
/// reverse LSN order.
///
struct InmemObjectVersionIter {
versions: Vec<(Lsn, Vec<u8>)>,
curr: usize,
}
impl InmemObjectVersionIter {
fn new(versions: Vec<(Lsn, Vec<u8>)>) -> InmemObjectVersionIter {
let curr = versions.len();
InmemObjectVersionIter { versions, curr }
}
}
impl Iterator for InmemObjectVersionIter {
type Item = (Lsn, Vec<u8>);
fn next(&mut self) -> std::option::Option<Self::Item> {
if self.curr == 0 {
None
} else {
self.curr -= 1;
Some(self.versions[self.curr].clone())
}
}
}
struct InmemObjects<'r> {
store: &'r InmemObjectStore,
curr_key: StorageKey,
timeline: ZTimelineId,
lsn: Lsn,
}
impl<'r> Iterator for InmemObjects<'r> {
// TODO consider returning Box<[u8]>
type Item = Result<(ObjectTag, Lsn, Vec<u8>)>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
}
}
impl<'r> InmemObjects<'r> {
fn next_result(&mut self) -> Result<Option<(ObjectTag, Lsn, Vec<u8>)>> {
let db = self.store.db.read().unwrap();
for pair in db.range((Excluded(&self.curr_key), Unbounded)) {
let key = pair.0;
if key.obj_key.timeline != self.timeline {
return Ok(None);
}
if key.lsn > self.lsn {
// TODO can speed up by seeking iterator
continue;
}
self.curr_key = key.clone();
let value = pair.1.clone();
return Ok(Some((key.obj_key.tag, key.lsn, value)));
}
Ok(None)
}
}
///
/// Iterator for `list_objects`. Returns all objects preceding the specified LSN.
///
struct InmemObjectIter<'a> {
store: &'a InmemObjectStore,
curr_key: StorageKey,
timeline: ZTimelineId,
nonrel_only: bool,
lsn: Lsn,
}
impl<'a> Iterator for InmemObjectIter<'a> {
type Item = ObjectTag;
fn next(&mut self) -> std::option::Option<Self::Item> {
let db = self.store.db.read().unwrap();
'outer: loop {
for pair in db.range((Excluded(&self.curr_key), Unbounded)) {
let key = pair.0;
if key.obj_key.timeline != self.timeline {
return None;
}
self.curr_key = key.clone();
self.curr_key.lsn = Lsn(u64::MAX); // next seek should skip all versions
if key.lsn <= self.lsn {
// visible in this snapshot
if self.nonrel_only {
match key.obj_key.tag {
ObjectTag::RelationMetadata(_) => return None,
ObjectTag::RelationBuffer(_) => return None,
_ => return Some(key.obj_key.tag),
}
} else {
return Some(key.obj_key.tag);
}
}
continue 'outer;
}
return None;
}
}
}
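A minimal usage sketch of the version semantics above, assuming a `conf` and illustrative `timeline` and `tag` values (note that `get` is an exact `(key, lsn)` lookup, not "latest version at or before lsn"):

let store = InmemObjectStore::create(conf)?;
let key = ObjectKey { timeline, tag };
store.put(&key, Lsn(10), b"v1")?;
store.put(&key, Lsn(20), b"v2")?;
// object_versions walks versions with lsn <= 25 newest-first: Lsn(20), then Lsn(10).
for (lsn, val) in store.object_versions(&key, Lsn(25))? {
    println!("{:?} -> {} bytes", lsn, val.len());
}
// An exact-match get at an LSN that was never written fails.
assert!(store.get(&key, Lsn(15)).is_err());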

View File

@@ -7,6 +7,7 @@ use std::time::Duration;
pub mod basebackup;
pub mod branches;
pub mod inmem_storage;
pub mod object_key;
pub mod object_repository;
pub mod object_store;
@@ -26,9 +27,11 @@ pub mod walredo;
pub struct PageServerConf {
pub daemonize: bool,
pub interactive: bool,
pub materialize: bool,
pub listen_addr: String,
pub gc_horizon: u64,
pub gc_period: Duration,
pub wal_redoers: usize,
// Repository directory, relative to current working directory.
// Normally, the page server changes the current working directory
@@ -103,7 +106,7 @@ impl PageServerConf {
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ZTimelineId([u8; 16]);
impl FromStr for ZTimelineId {

View File

@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
/// repository. It is shared between object_repository.rs and object_store.rs.
/// It is mostly opaque to ObjectStore, it just stores and retrieves objects
/// using the key given by the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct ObjectKey {
pub timeline: ZTimelineId,
pub tag: ObjectTag,

View File

@@ -71,6 +71,7 @@ impl Repository for ObjectRepository {
Some(timeline) => Ok(timeline.clone()),
None => {
let timeline = ObjectTimeline::open(
self.conf,
Arc::clone(&self.obj_store),
timelineid,
self.walredo_mgr.clone(),
@@ -124,6 +125,7 @@ impl Repository for ObjectRepository {
info!("Created empty timeline {}", timelineid);
let timeline = ObjectTimeline::open(
self.conf,
Arc::clone(&self.obj_store),
timelineid,
self.walredo_mgr.clone(),
@@ -163,7 +165,7 @@ impl Repository for ObjectRepository {
match tag {
ObjectTag::TimelineMetadataTag => {} // skip it
_ => {
let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?;
let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn, false)?;
let val = ObjectValue::Page(PageEntry::Page(img));
let key = ObjectKey { timeline: dst, tag };
self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?;
@@ -197,6 +199,7 @@ pub struct ObjectTimeline {
// Backing key-value store
obj_store: Arc<dyn ObjectStore>,
conf: &'static PageServerConf,
// WAL redo manager, for reconstructing page versions from WAL records.
walredo_mgr: Arc<dyn WalRedoManager>,
@@ -231,6 +234,7 @@ impl ObjectTimeline {
///
/// Loads the metadata for the timeline into memory.
fn open(
conf: &'static PageServerConf,
obj_store: Arc<dyn ObjectStore>,
timelineid: ZTimelineId,
walredo_mgr: Arc<dyn WalRedoManager>,
@@ -244,6 +248,7 @@ impl ObjectTimeline {
let timeline = ObjectTimeline {
timelineid,
obj_store,
conf,
walredo_mgr,
last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
@@ -265,10 +270,15 @@ impl Timeline for ObjectTimeline {
fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
let lsn = self.wait_lsn(req_lsn)?;
self.get_page_at_lsn_nowait(tag, lsn)
self.get_page_at_lsn_nowait(tag, lsn, self.conf.materialize)
}
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
fn get_page_at_lsn_nowait(
&self,
tag: ObjectTag,
req_lsn: Lsn,
materialize: bool,
) -> Result<Bytes> {
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -290,11 +300,13 @@ impl Timeline for ObjectTimeline {
let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
// Garbage collection assumes that we remember the materialized page
// version. Otherwise we could opt to not do it, with the downside that
// the next GetPage@LSN call of the same page version would have to
// redo the WAL again.
self.put_page_image(tag, lsn, page_img.clone(), false)?;
if materialize {
// Garbage collection assumes that we remember the materialized page
// version. Otherwise we could opt to not do it, with the downside that
// the next GetPage@LSN call of the same page version would have to
// redo the WAL again.
self.put_page_image(tag, lsn, page_img.clone(), false)?;
}
}
ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE),
_ => bail!("Invalid object kind, expected a page entry or SLRU truncate"),
@@ -712,7 +724,7 @@ impl Timeline for ObjectTimeline {
{
if rel_size > tag.blknum {
// preserve and materialize the last version before deleting all preceding ones
self.get_page_at_lsn_nowait(obj, lsn)?;
self.get_page_at_lsn_nowait(obj, lsn, true)?;
continue;
}
debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size);
@@ -755,7 +767,7 @@ impl Timeline for ObjectTimeline {
}
ObjectValue::Page(PageEntry::WALRecord(_)) => {
// preserve and materialize the last version before deleting all preceding ones
self.get_page_at_lsn_nowait(obj, lsn)?;
self.get_page_at_lsn_nowait(obj, lsn, true)?;
}
_ => {} // do nothing if already materialized
}
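Taken together, the policy for the new flag is: interactive GetPage@LSN requests follow the server-wide `--materialize` setting, one-off reads (basebackup, WAL import, branching) pass `false`, and GC passes `true` because it is about to drop the older versions that WAL redo would otherwise need. A summary of the call sites touched by this diff, in sketch form:

self.get_page_at_lsn_nowait(tag, lsn, self.conf.materialize)?; // GetPage@LSN
timeline.get_page_at_lsn_nowait(tag, lsn, false)?; // basebackup / import / branch
self.get_page_at_lsn_nowait(obj, lsn, true)?; // GC keeps the last version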

View File

@@ -6,6 +6,7 @@
use crate::object_repository::ObjectRepository;
use crate::repository::Repository;
use crate::rocksdb_storage::RocksObjectStore;
//use crate::inmem_storage::InmemObjectStore;
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use lazy_static::lazy_static;
@@ -19,6 +20,7 @@ pub fn init(conf: &'static PageServerConf) {
let mut m = REPOSITORY.lock().unwrap();
let obj_store = RocksObjectStore::open(conf).unwrap();
//let obj_store = InmemObjectStore::open(conf).unwrap();
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf);

View File

@@ -59,7 +59,7 @@ pub trait Timeline: Send + Sync {
fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
/// Look up given page in the cache.
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn, materialize: bool) -> Result<Bytes>;
/// Get size of relation
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
@@ -144,13 +144,13 @@ pub trait Timeline: Send + Sync {
/// but it can be explicitly requested through page server API.
///
/// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval).
/// The `compact` parameter is used to force compaction of the storage.
/// Some storage implementations are based on an LSM tree and require periodic merging (compaction).
/// Usually the storage implementation itself determines when compaction should be performed.
/// But for GC tests it may be useful to force compaction just after completion of a GC iteration,
/// to make sure that all detected garbage is removed.
/// So right now `compact` is set to true when GC is explicitly requested through the page server API,
/// and is set to false in the GC threads which repeat GC iterations in an infinite loop.
fn gc_iteration(&self, horizon: u64, compact: bool) -> Result<GcResult>;
// Check transaction status
@@ -353,8 +353,10 @@ mod tests {
let conf = PageServerConf {
daemonize: false,
interactive: false,
materialize: false,
gc_horizon: 64 * 1024 * 1024,
gc_period: Duration::from_secs(10),
wal_redoers: 1,
listen_addr: "127.0.0.1:5430".to_string(),
workdir: repo_dir,
pg_distrib_dir: "".into(),
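Per the doc comment on `gc_iteration`, the `compact` flag splits by caller; a hedged sketch of the two intended call patterns (assuming a `timeline` handle and the config fields shown above):

// Page server API handler: force compaction so detected garbage is
// physically removed before results are reported.
let result = timeline.gc_iteration(conf.gc_horizon, true)?;
// Background GC thread: let the storage engine schedule compaction itself.
loop {
    timeline.gc_iteration(conf.gc_horizon, false)?;
    std::thread::sleep(conf.gc_period);
}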

View File

@@ -70,7 +70,7 @@ pub fn import_timeline_from_postgres_datadir(
import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?;
// Extract checkpoint record from pg_control and store it as a separate object
let pg_control_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn)?;
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn, false)?;
let pg_control = ControlFileData::decode(&pg_control_bytes)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode();
timeline.put_page_image(ObjectTag::Checkpoint, lsn, checkpoint_bytes, false)?;
@@ -298,7 +298,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let checkpoint_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
loop {
@@ -578,7 +579,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
blknum,
});
let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?;
let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn, false)?;
debug!("copying block {:?} to {:?}", src_key, dst_key);
@@ -598,7 +599,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
match tag {
ObjectTag::FileNodeMap(db) => {
if db.spcnode == src_tablespace_id && db.dbnode == src_db_id {
let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?;
let img = timeline.get_page_at_lsn_nowait(tag, req_lsn, false)?;
let new_tag = ObjectTag::FileNodeMap(DatabaseTag {
spcnode: tablespace_id,
dbnode: db_id,

View File

@@ -169,7 +169,8 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint);
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let checkpoint_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);

View File

@@ -26,6 +26,7 @@ use std::io::Error;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
@@ -103,13 +104,16 @@ struct PostgresRedoManagerInternal {
}
#[derive(Debug)]
struct WalRedoRequest {
struct WalRedoRequestData {
tag: ObjectTag,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
}
#[derive(Debug)]
struct WalRedoRequest {
data: WalRedoRequestData,
response_channel: mpsc::Sender<Result<Bytes, WalRedoError>>,
}
@@ -175,10 +179,12 @@ impl WalRedoManager for PostgresRedoManager {
let (tx, rx) = mpsc::channel::<Result<Bytes, WalRedoError>>();
let request = WalRedoRequest {
tag,
lsn,
base_img,
records,
data: WalRedoRequestData {
tag,
lsn,
base_img,
records,
},
response_channel: tx,
};
@@ -229,40 +235,64 @@ impl PostgresRedoManagerInternal {
.build()
.unwrap();
let process: PostgresRedoProcess;
let processes: Vec<PostgresRedoProcess>;
info!("launching WAL redo postgres process");
process = runtime
.block_on(PostgresRedoProcess::launch(self.conf))
.unwrap();
let wal_redoers = self.conf.wal_redoers;
processes = (0..wal_redoers)
.map(|i| {
runtime
.block_on(PostgresRedoProcess::launch(self.conf, i))
.unwrap()
})
.collect();
// Loop forever, handling requests as they come.
loop {
let request = self
.request_rx
.recv()
.expect("WAL redo request channel was closed");
let mut requests: Vec<WalRedoRequest> = Vec::new();
requests.push(
self.request_rx
.recv()
.expect("WAL redo request channel was closed"),
);
while let Ok(req) = self.request_rx.try_recv() {
requests.push(req);
}
let request_data = requests.iter().map(|req| &req.data);
let mut rr = 0; // round robin
let results = runtime.block_on(async {
let futures = request_data.map(|req| {
rr += 1;
self.handle_apply_request(&processes[rr % wal_redoers], &req)
});
let mut results: Vec<Result<Bytes, WalRedoError>> = Vec::new();
for future in futures {
results.push(future.await);
}
results
});
for (result, request) in results.into_iter().zip(requests.iter()) {
let result_ok = result.is_ok();
let result = runtime.block_on(self.handle_apply_request(&process, &request));
let result_ok = result.is_ok();
// Send the result to the requester
let _ = request.response_channel.send(result);
// Send the result to the requester
let _ = request.response_channel.send(result);
if !result_ok {
error!("wal-redo-postgres failed to apply request {:?}", request);
if !result_ok {
error!("wal-redo-postgres failed to apply request {:?}", request);
}
}
}
}
///
/// Process one request for WAL redo.
///
async fn handle_apply_request(
&self,
process: &PostgresRedoProcess,
request: &WalRedoRequest,
request: &WalRedoRequestData,
) -> Result<Bytes, WalRedoError> {
let tag = request.tag;
let lsn = request.lsn;
@@ -446,19 +476,19 @@ impl PostgresRedoManagerInternal {
}
struct PostgresRedoProcess {
stdin: RefCell<ChildStdin>,
stdout: RefCell<ChildStdout>,
stdin: Arc<RefCell<ChildStdin>>,
stdout: Arc<RefCell<ChildStdout>>,
}
impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
async fn launch(conf: &PageServerConf) -> Result<PostgresRedoProcess, Error> {
async fn launch(conf: &PageServerConf, id: usize) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = conf.workdir.join("wal-redo-datadir");
let datadir = conf.workdir.join(format!("wal-redo-datadir-{}", id));
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {
@@ -538,8 +568,8 @@ impl PostgresRedoProcess {
tokio::spawn(f_stderr);
Ok(PostgresRedoProcess {
stdin: RefCell::new(stdin),
stdout: RefCell::new(stdout),
stdin: Arc::new(RefCell::new(stdin)),
stdout: Arc::new(RefCell::new(stdout)),
})
}
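The reworked main loop batches requests: it blocks on one `recv`, drains anything else queued via `try_recv`, and deals the batch round-robin across the `wal_redoers` processes. Since Rust futures are lazy and the loop awaits them one at a time, the batch still executes sequentially in arrival order, which keeps replies ordered. Driving it concurrently would need something like the following variant, sketched with `futures::future::join_all` (not what this diff does):

// Variant sketch: run the whole batch concurrently across the redo processes.
// Concurrent use of one process would also require guarding its stdin/stdout
// with an async mutex instead of RefCell.
let results = runtime.block_on(futures::future::join_all(
    requests.iter().enumerate().map(|(i, req)| {
        self.handle_apply_request(&processes[i % wal_redoers], &req.data)
    }),
));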

View File

@@ -0,0 +1,25 @@
from contextlib import closing
import psycopg2.extras
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test insertion of a large number of records
#
# This test is pretty tightly coupled with the current implementation of page version storage
# and garbage collection in object_repository.rs.
#
def test_bulk_insert(zenith_cli, pageserver, postgres, pg_bin):
zenith_cli.run(["branch", "test_bulk_insert", "empty"])
pg = postgres.create_start('test_bulk_insert')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t(c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint)")
cur.execute("create index on t(c1)")
cur.execute("create index on t(c2)")
cur.execute("create index on t(c3)")
cur.execute("create index on t(c4)")
cur.execute("create index on t(c5)")
cur.execute("insert into t values (generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000))")
cur.execute("insert into t values (generate_series(1,1000000),random()*1000000,random()*1000000,random()*1000000,random()*1000000)")

View File

@@ -0,0 +1,26 @@
from contextlib import closing
import psycopg2.extras
import time
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Test sequential scan of a large number of records
#
# This test is pretty tightly coupled with the current implementation of page version storage
# and garbage collection in object_repository.rs.
#
def test_seq_scan(zenith_cli, pageserver, postgres, pg_bin):
zenith_cli.run(["branch", "test_seq_scan", "empty"])
pg = postgres.create_start('test_seq_scan')
with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t(c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint)")
cur.execute("insert into t values (generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000))")
cur.execute("set max_parallel_workers_per_gather=0");
for i in range(100):
start = time.time()
cur.execute("select count(*) from t");
stop = time.time()
print(f'Elapsed time for iterating through 1000000 records is {stop - start}')