Mirror of https://github.com/neondatabase/neon.git
Synced 2026-02-03 02:30:37 +00:00

Compare commits: 6 commits on branch hackathon/...parallel_w

| Author | SHA1 | Date |
|---|---|---|
|  | de02cc9ee4 |  |
|  | 96e73fb585 |  |
|  | a533d22f71 |  |
|  | 0ab4792943 |  |
|  | c60e3e2337 |  |
|  | 1e0e3fbde0 |  |

@@ -132,7 +132,9 @@ impl<'a> Basebackup<'a> {
         tag: &ObjectTag,
         page: u32,
     ) -> anyhow::Result<()> {
-        let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
+        let img = self
+            .timeline
+            .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
         // Zero length image indicates truncated segment: just skip it
         if !img.is_empty() {
             assert!(img.len() == pg_constants::BLCKSZ as usize);
@@ -172,7 +174,9 @@ impl<'a> Basebackup<'a> {
     // Extract pg_filenode.map files from repository
     //
     fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> {
-        let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
+        let img = self
+            .timeline
+            .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
         info!("add_relmap_file {:?}", db);
         let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
             String::from("global/pg_filenode.map") // filenode map for global tablespace
@@ -198,7 +202,9 @@ impl<'a> Basebackup<'a> {
             if self.timeline.get_tx_status(xid, self.lsn)?
                 == pg_constants::TRANSACTION_STATUS_IN_PROGRESS
             {
-                let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
+                let img = self
+                    .timeline
+                    .get_page_at_lsn_nowait(*tag, self.lsn, false)?;
                 let mut buf = BytesMut::new();
                 buf.extend_from_slice(&img[..]);
                 let crc = crc32c::crc32c(&img[..]);
@@ -214,12 +220,12 @@ impl<'a> Basebackup<'a> {
     // Add generated pg_control file
     //
     fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
-        let checkpoint_bytes = self
-            .timeline
-            .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?;
+        let checkpoint_bytes =
+            self.timeline
+                .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn, false)?;
-        let pg_control_bytes = self
-            .timeline
-            .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
+        let pg_control_bytes =
+            self.timeline
+                .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn, false)?;
         let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
         let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;

@@ -28,12 +28,15 @@ const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000";
 const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
 const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
+
+const DEFAULT_WAL_REDOERS: usize = 1;
 
 /// String arguments that can be declared via CLI or config file
 #[derive(Serialize, Deserialize)]
 struct CfgFileParams {
     listen_addr: Option<String>,
     gc_horizon: Option<String>,
     gc_period: Option<String>,
+    wal_redoers: Option<String>,
     pg_distrib_dir: Option<String>,
 }
@@ -48,6 +51,7 @@ impl CfgFileParams {
             listen_addr: get_arg("listen"),
             gc_horizon: get_arg("gc_horizon"),
             gc_period: get_arg("gc_period"),
+            wal_redoers: get_arg("wal_redoers"),
             pg_distrib_dir: get_arg("postgres-distrib"),
         }
     }
@@ -59,6 +63,7 @@ impl CfgFileParams {
             listen_addr: self.listen_addr.or(other.listen_addr),
             gc_horizon: self.gc_horizon.or(other.gc_horizon),
             gc_period: self.gc_period.or(other.gc_period),
+            wal_redoers: self.wal_redoers.or(other.wal_redoers),
             pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
         }
     }
@@ -79,6 +84,11 @@ impl CfgFileParams {
             None => DEFAULT_GC_PERIOD,
         };
 
+        let wal_redoers = match self.wal_redoers.as_ref() {
+            Some(wal_redoers_str) => wal_redoers_str.parse::<usize>()?,
+            None => DEFAULT_WAL_REDOERS,
+        };
+
         let pg_distrib_dir = match self.pg_distrib_dir.as_ref() {
             Some(pg_distrib_dir_str) => PathBuf::from(pg_distrib_dir_str),
             None => env::current_dir()?.join("tmp_install"),
@@ -91,11 +101,12 @@ impl CfgFileParams {
         Ok(PageServerConf {
             daemonize: false,
             interactive: false,
+            materialize: false,
 
             listen_addr,
             gc_horizon,
             gc_period,
+            wal_redoers,
             workdir: PathBuf::from("."),
 
             pg_distrib_dir,
@@ -120,6 +131,12 @@ fn main() -> Result<()> {
                 .takes_value(false)
                 .help("Interactive mode"),
         )
+        .arg(
+            Arg::with_name("materialize")
+                .long("materialize")
+                .takes_value(false)
+                .help("Materialize pages constructed by get_page_at"),
+        )
         .arg(
             Arg::with_name("daemonize")
                 .short("d")
@@ -145,6 +162,12 @@ fn main() -> Result<()> {
                 .takes_value(true)
                 .help("Interval between garbage collector iterations"),
         )
+        .arg(
+            Arg::with_name("wal_redoers")
+                .long("wal_redoers")
+                .takes_value(true)
+                .help("Number of wal-redo postgres instances"),
+        )
         .arg(
             Arg::with_name("workdir")
                 .short("D")
@@ -181,6 +204,7 @@ fn main() -> Result<()> {
 
     conf.daemonize = arg_matches.is_present("daemonize");
     conf.interactive = arg_matches.is_present("interactive");
+    conf.materialize = arg_matches.is_present("materialize");
 
     if init && (conf.daemonize || conf.interactive) {
         eprintln!("--daemonize and --interactive may not be used with --init");

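The configuration handling above follows the existing CfgFileParams pattern: each new option travels as an Option<String>, CLI values are merged over config-file values with Option::or, and parsing into the final type happens once when PageServerConf is built. A minimal standalone sketch of that pattern; the Params type, its field, and the constant below are illustrative stand-ins, not the pageserver's real definitions:

```rust
// Sketch of the merge-then-parse pattern; not the pageserver's actual types.
const DEFAULT_WAL_REDOERS: usize = 1;

struct Params {
    wal_redoers: Option<String>,
}

impl Params {
    // CLI values win; fall back to the config-file values.
    fn or(self, other: Params) -> Params {
        Params {
            wal_redoers: self.wal_redoers.or(other.wal_redoers),
        }
    }

    // Parse into the final type only once, applying the default if unset.
    fn wal_redoers_or_default(&self) -> Result<usize, std::num::ParseIntError> {
        match self.wal_redoers.as_ref() {
            Some(s) => s.parse::<usize>(),
            None => Ok(DEFAULT_WAL_REDOERS),
        }
    }
}

fn main() {
    let from_cli = Params { wal_redoers: Some("4".to_string()) };
    let from_file = Params { wal_redoers: Some("2".to_string()) };
    let merged = from_cli.or(from_file);
    // The CLI value takes precedence over the config file.
    assert_eq!(merged.wal_redoers_or_default().unwrap(), 4);
    println!("wal_redoers = {}", merged.wal_redoers_or_default().unwrap());
}
```
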
@@ -101,6 +101,7 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> {
     // rapid init+start case now, but the general race condition remains if you restart the
     // server quickly.
     let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?;
+    //let storage = crate::inmem_storage::InmemObjectStore::create(conf)?;
 
     let repo = crate::object_repository::ObjectRepository::new(
         conf,

pageserver/src/inmem_storage.rs (new file, 345 lines)
@@ -0,0 +1,345 @@
+//!
+//! An implementation of the ObjectStore interface, backed by BTreeMap
+//!
+use crate::object_key::*;
+use crate::object_store::ObjectStore;
+use crate::repository::RelTag;
+use crate::PageServerConf;
+use crate::ZTimelineId;
+use anyhow::{bail, Result};
+use serde::{Deserialize, Serialize};
+use std::collections::{BTreeMap, HashSet};
+use std::fs::File;
+use std::io::prelude::*;
+use std::ops::Bound::*;
+use std::sync::RwLock;
+use zenith_utils::bin_ser::BeSer;
+use zenith_utils::lsn::Lsn;
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
+pub struct StorageKey {
+    obj_key: ObjectKey,
+    lsn: Lsn,
+}
+
+impl StorageKey {
+    /// The first key for a given timeline
+    fn timeline_start(timeline: ZTimelineId) -> Self {
+        Self {
+            obj_key: ObjectKey {
+                timeline,
+                tag: ObjectTag::FirstTag,
+            },
+            lsn: Lsn(0),
+        }
+    }
+}
+
+pub struct InmemObjectStore {
+    conf: &'static PageServerConf,
+    db: RwLock<BTreeMap<StorageKey, Vec<u8>>>,
+}
+
+impl ObjectStore for InmemObjectStore {
+    fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>> {
+        let db = self.db.read().unwrap();
+        let val = db.get(&StorageKey {
+            obj_key: key.clone(),
+            lsn,
+        });
+        if let Some(val) = val {
+            Ok(val.clone())
+        } else {
+            bail!("could not find page {:?}", key);
+        }
+    }
+
+    fn get_next_key(&self, key: &ObjectKey) -> Result<Option<ObjectKey>> {
+        let search_key = StorageKey {
+            obj_key: key.clone(),
+            lsn: Lsn(0),
+        };
+        let db = self.db.read().unwrap();
+        for pair in db.range(&search_key..) {
+            let key = pair.0;
+            return Ok(Some(key.obj_key.clone()));
+        }
+        Ok(None)
+    }
+
+    fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> {
+        let mut db = self.db.write().unwrap();
+        db.insert(
+            StorageKey {
+                obj_key: key.clone(),
+                lsn,
+            },
+            value.to_vec(),
+        );
+        Ok(())
+    }
+
+    fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()> {
+        let mut db = self.db.write().unwrap();
+        db.remove(&StorageKey {
+            obj_key: key.clone(),
+            lsn,
+        });
+        Ok(())
+    }
+
+    /// Iterate through page versions of given page, starting from the given LSN.
+    /// The versions are walked in descending LSN order.
+    fn object_versions<'a>(
+        &'a self,
+        key: &ObjectKey,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>> {
+        let from = StorageKey {
+            obj_key: key.clone(),
+            lsn: Lsn(0),
+        };
+        let till = StorageKey {
+            obj_key: key.clone(),
+            lsn,
+        };
+        let db = self.db.read().unwrap();
+        let versions: Vec<(Lsn, Vec<u8>)> = db
+            .range(from..=till)
+            .map(|pair| (pair.0.lsn, pair.1.clone()))
+            .collect();
+        Ok(Box::new(InmemObjectVersionIter::new(versions)))
+    }
+
+    /// Iterate through all timeline objects
+    fn list_objects<'a>(
+        &'a self,
+        timeline: ZTimelineId,
+        nonrel_only: bool,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>> {
+        let curr_key = StorageKey::timeline_start(timeline);
+
+        Ok(Box::new(InmemObjectIter {
+            store: &self,
+            curr_key,
+            timeline,
+            nonrel_only,
+            lsn,
+        }))
+    }
+
+    /// Get a list of all distinct relations in given tablespace and database.
+    ///
+    /// TODO: This implementation is very inefficient, it scans
+    /// through all entries in the given database. In practice, this
+    /// is used for CREATE DATABASE, and usually the template database is small.
+    /// But if it's not, this will be slow.
+    fn list_rels(
+        &self,
+        timelineid: ZTimelineId,
+        spcnode: u32,
+        dbnode: u32,
+        lsn: Lsn,
+    ) -> Result<HashSet<RelTag>> {
+        // FIXME: This scans everything. Very slow
+
+        let mut rels: HashSet<RelTag> = HashSet::new();
+
+        let mut search_rel_tag = RelTag {
+            spcnode,
+            dbnode,
+            relnode: 0,
+            forknum: 0u8,
+        };
+        let db = self.db.read().unwrap();
+        'outer: loop {
+            let search_key = StorageKey {
+                obj_key: ObjectKey {
+                    timeline: timelineid,
+                    tag: ObjectTag::RelationMetadata(search_rel_tag),
+                },
+                lsn: Lsn(0),
+            };
+            for pair in db.range(&search_key..) {
+                let key = pair.0;
+
+                if let ObjectTag::RelationMetadata(rel_tag) = key.obj_key.tag {
+                    if spcnode != 0 && rel_tag.spcnode != spcnode
+                        || dbnode != 0 && rel_tag.dbnode != dbnode
+                    {
+                        break 'outer;
+                    }
+                    if key.lsn <= lsn {
+                        // visible in this snapshot
+                        rels.insert(rel_tag);
+                    }
+                    search_rel_tag = rel_tag;
+                    // skip to next relation
+                    // FIXME: What if relnode is u32::MAX ?
+                    search_rel_tag.relnode += 1;
+                    continue 'outer;
+                } else {
+                    // no more relation metadata entries
+                    break 'outer;
+                }
+            }
+        }
+
+        Ok(rels)
+    }
+
+    /// Iterate through versions of all objects in a timeline.
+    ///
+    /// Returns objects in increasing key-version order.
+    /// Returns all versions up to and including the specified LSN.
+    fn objects<'a>(
+        &'a self,
+        timeline: ZTimelineId,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>> {
+        let curr_key = StorageKey::timeline_start(timeline);
+
+        Ok(Box::new(InmemObjects {
+            store: &self,
+            curr_key,
+            timeline,
+            lsn,
+        }))
+    }
+
+    fn compact(&self) {}
+}
+
+impl Drop for InmemObjectStore {
+    fn drop(&mut self) {
+        let path = self.conf.workdir.join("objstore.dmp");
+        let mut f = File::create(path).unwrap();
+        f.write(&self.db.ser().unwrap()).unwrap();
+    }
+}
+
+impl InmemObjectStore {
+    pub fn open(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
+        let path = conf.workdir.join("objstore.dmp");
+        let mut f = File::open(path)?;
+        let mut buffer = Vec::new();
+        // read the whole file
+        f.read_to_end(&mut buffer)?;
+        let db = RwLock::new(BTreeMap::des(&buffer)?);
+        Ok(InmemObjectStore { conf: conf, db })
+    }
+
+    pub fn create(conf: &'static PageServerConf) -> Result<InmemObjectStore> {
+        Ok(InmemObjectStore {
+            conf: conf,
+            db: RwLock::new(BTreeMap::new()),
+        })
+    }
+}
+
+///
+/// Iterator for `object_versions`. Returns all page versions of a given block, in
+/// reverse LSN order.
+///
+struct InmemObjectVersionIter {
+    versions: Vec<(Lsn, Vec<u8>)>,
+    curr: usize,
+}
+impl InmemObjectVersionIter {
+    fn new(versions: Vec<(Lsn, Vec<u8>)>) -> InmemObjectVersionIter {
+        let curr = versions.len();
+        InmemObjectVersionIter { versions, curr }
+    }
+}
+impl Iterator for InmemObjectVersionIter {
+    type Item = (Lsn, Vec<u8>);
+
+    fn next(&mut self) -> std::option::Option<Self::Item> {
+        if self.curr == 0 {
+            None
+        } else {
+            self.curr -= 1;
+            Some(self.versions[self.curr].clone())
+        }
+    }
+}
+
+struct InmemObjects<'r> {
+    store: &'r InmemObjectStore,
+    curr_key: StorageKey,
+    timeline: ZTimelineId,
+    lsn: Lsn,
+}
+
+impl<'r> Iterator for InmemObjects<'r> {
+    // TODO consider returning Box<[u8]>
+    type Item = Result<(ObjectTag, Lsn, Vec<u8>)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next_result().transpose()
+    }
+}
+
+impl<'r> InmemObjects<'r> {
+    fn next_result(&mut self) -> Result<Option<(ObjectTag, Lsn, Vec<u8>)>> {
+        let db = self.store.db.read().unwrap();
+        for pair in db.range((Excluded(&self.curr_key), Unbounded)) {
+            let key = pair.0;
+            if key.obj_key.timeline != self.timeline {
+                return Ok(None);
+            }
+            if key.lsn > self.lsn {
+                // TODO can speed up by seeking iterator
+                continue;
+            }
+            self.curr_key = key.clone();
+            let value = pair.1.clone();
+            return Ok(Some((key.obj_key.tag, key.lsn, value)));
+        }
+        Ok(None)
+    }
+}
+
+///
+/// Iterator for `list_objects`. Returns all objects preceeding specified LSN
+///
+struct InmemObjectIter<'a> {
+    store: &'a InmemObjectStore,
+    curr_key: StorageKey,
+    timeline: ZTimelineId,
+    nonrel_only: bool,
+    lsn: Lsn,
+}
+
+impl<'a> Iterator for InmemObjectIter<'a> {
+    type Item = ObjectTag;
+
+    fn next(&mut self) -> std::option::Option<Self::Item> {
+        let db = self.store.db.read().unwrap();
+        'outer: loop {
+            for pair in db.range((Excluded(&self.curr_key), Unbounded)) {
+                let key = pair.0;
+                if key.obj_key.timeline != self.timeline {
+                    return None;
+                }
+                self.curr_key = key.clone();
+                self.curr_key.lsn = Lsn(u64::MAX); // next seek should skip all versions
+                if key.lsn <= self.lsn {
+                    // visible in this snapshot
+                    if self.nonrel_only {
+                        match key.obj_key.tag {
+                            ObjectTag::RelationMetadata(_) => return None,
+                            ObjectTag::RelationBuffer(_) => return None,
+                            _ => return Some(key.obj_key.tag),
+                        }
+                    } else {
+                        return Some(key.obj_key.tag);
+                    }
+                }
+                continue 'outer;
+            }
+            return None;
+        }
+    }
+}

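The new in-memory store keeps every object version in one ordered map keyed by (object key, LSN), so "all versions of an object up to some LSN" and "next object after this one" are both plain range scans over a BTreeMap. A simplified, self-contained sketch of that idea; the (u32, u64) key below is a stand-in for the real ObjectKey/Lsn pair, not the repository's types:

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Included, Unbounded};

// Stand-in for (ObjectKey, Lsn): an object id plus a version number,
// ordered so that all versions of one object are adjacent in the map.
type Key = (u32, u64); // (object id, lsn)

fn main() {
    let mut db: BTreeMap<Key, Vec<u8>> = BTreeMap::new();

    // Three versions of object 7, one version of object 8.
    db.insert((7, 10), b"v10".to_vec());
    db.insert((7, 20), b"v20".to_vec());
    db.insert((7, 30), b"v30".to_vec());
    db.insert((8, 5), b"other".to_vec());

    // object_versions-style scan: all versions of object 7 at or below lsn 20,
    // collected in ascending order and then walked in descending LSN order.
    let upto = 20u64;
    let versions: Vec<(u64, Vec<u8>)> = db
        .range((Included((7, 0)), Included((7, upto))))
        .map(|(k, v)| (k.1, v.clone()))
        .collect();
    for (lsn, img) in versions.iter().rev() {
        println!("object 7 @ lsn {}: {:?}", lsn, img);
    }

    // get_next_key-style scan: the first key strictly after (7, u64::MAX)
    // skips the remaining versions of object 7 and lands on object 8.
    if let Some((k, _)) = db.range((Excluded((7, u64::MAX)), Unbounded)).next() {
        println!("next object after 7: {}", k.0);
    }
}
```
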
@@ -7,6 +7,7 @@ use std::time::Duration;
 
 pub mod basebackup;
 pub mod branches;
+pub mod inmem_storage;
 pub mod object_key;
 pub mod object_repository;
 pub mod object_store;
@@ -26,9 +27,11 @@ pub mod walredo;
 pub struct PageServerConf {
     pub daemonize: bool,
     pub interactive: bool,
+    pub materialize: bool,
     pub listen_addr: String,
     pub gc_horizon: u64,
     pub gc_period: Duration,
+    pub wal_redoers: usize,
 
     // Repository directory, relative to current working directory.
     // Normally, the page server changes the current working directory
@@ -103,7 +106,7 @@ impl PageServerConf {
 /// is separate from PostgreSQL timelines, and doesn't have those
 /// limitations. A zenith timeline is identified by a 128-bit ID, which
 /// is usually printed out as a hex string.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
 pub struct ZTimelineId([u8; 16]);
 
 impl FromStr for ZTimelineId {

@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
 /// repository. It is shared between object_repository.rs and object_store.rs.
 /// It is mostly opaque to ObjectStore, it just stores and retrieves objects
 /// using the key given by the caller.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
 pub struct ObjectKey {
     pub timeline: ZTimelineId,
     pub tag: ObjectTag,

@@ -71,6 +71,7 @@ impl Repository for ObjectRepository {
             Some(timeline) => Ok(timeline.clone()),
             None => {
                 let timeline = ObjectTimeline::open(
+                    self.conf,
                     Arc::clone(&self.obj_store),
                     timelineid,
                     self.walredo_mgr.clone(),
@@ -124,6 +125,7 @@ impl Repository for ObjectRepository {
         info!("Created empty timeline {}", timelineid);
 
         let timeline = ObjectTimeline::open(
+            self.conf,
             Arc::clone(&self.obj_store),
             timelineid,
             self.walredo_mgr.clone(),
@@ -163,7 +165,7 @@ impl Repository for ObjectRepository {
             match tag {
                 ObjectTag::TimelineMetadataTag => {} // skip it
                 _ => {
-                    let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?;
+                    let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn, false)?;
                     let val = ObjectValue::Page(PageEntry::Page(img));
                     let key = ObjectKey { timeline: dst, tag };
                     self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?;
@@ -197,6 +199,7 @@ pub struct ObjectTimeline {
 
     // Backing key-value store
     obj_store: Arc<dyn ObjectStore>,
+    conf: &'static PageServerConf,
 
     // WAL redo manager, for reconstructing page versions from WAL records.
     walredo_mgr: Arc<dyn WalRedoManager>,
@@ -231,6 +234,7 @@ impl ObjectTimeline {
     ///
     /// Loads the metadata for the timeline into memory.
     fn open(
+        conf: &'static PageServerConf,
         obj_store: Arc<dyn ObjectStore>,
         timelineid: ZTimelineId,
         walredo_mgr: Arc<dyn WalRedoManager>,
@@ -244,6 +248,7 @@ impl ObjectTimeline {
         let timeline = ObjectTimeline {
             timelineid,
             obj_store,
+            conf,
             walredo_mgr,
             last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
             last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
@@ -265,10 +270,15 @@ impl Timeline for ObjectTimeline {
     fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
         let lsn = self.wait_lsn(req_lsn)?;
 
-        self.get_page_at_lsn_nowait(tag, lsn)
+        self.get_page_at_lsn_nowait(tag, lsn, self.conf.materialize)
     }
 
-    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
+    fn get_page_at_lsn_nowait(
+        &self,
+        tag: ObjectTag,
+        req_lsn: Lsn,
+        materialize: bool,
+    ) -> Result<Bytes> {
         const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
         // Look up the page entry. If it's a page image, return that. If it's a WAL record,
         // ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -290,11 +300,13 @@ impl Timeline for ObjectTimeline {
                     let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
                     page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
 
-                    // Garbage collection assumes that we remember the materialized page
-                    // version. Otherwise we could opt to not do it, with the downside that
-                    // the next GetPage@LSN call of the same page version would have to
-                    // redo the WAL again.
-                    self.put_page_image(tag, lsn, page_img.clone(), false)?;
+                    if materialize {
+                        // Garbage collection assumes that we remember the materialized page
+                        // version. Otherwise we could opt to not do it, with the downside that
+                        // the next GetPage@LSN call of the same page version would have to
+                        // redo the WAL again.
+                        self.put_page_image(tag, lsn, page_img.clone(), false)?;
+                    }
                 }
                 ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE),
                 _ => bail!("Invalid object kind, expected a page entry or SLRU truncate"),
@@ -712,7 +724,7 @@ impl Timeline for ObjectTimeline {
                     {
                         if rel_size > tag.blknum {
                             // preserve and materialize last version before deleting all preceeding
-                            self.get_page_at_lsn_nowait(obj, lsn)?;
+                            self.get_page_at_lsn_nowait(obj, lsn, true)?;
                             continue;
                         }
                         debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size);
@@ -755,7 +767,7 @@ impl Timeline for ObjectTimeline {
                         }
                         ObjectValue::Page(PageEntry::WALRecord(_)) => {
                             // preserve and materialize last version before deleting all preceeding
-                            self.get_page_at_lsn_nowait(obj, lsn)?;
+                            self.get_page_at_lsn_nowait(obj, lsn, true)?;
                         }
                         _ => {} // do nothing if already materialized
                     }

@@ -6,6 +6,7 @@
 use crate::object_repository::ObjectRepository;
 use crate::repository::Repository;
 use crate::rocksdb_storage::RocksObjectStore;
+//use crate::inmem_storage::InmemObjectStore;
 use crate::walredo::PostgresRedoManager;
 use crate::PageServerConf;
 use lazy_static::lazy_static;
@@ -19,6 +20,7 @@ pub fn init(conf: &'static PageServerConf) {
     let mut m = REPOSITORY.lock().unwrap();
 
     let obj_store = RocksObjectStore::open(conf).unwrap();
+    //let obj_store = InmemObjectStore::open(conf).unwrap();
 
     // Set up a WAL redo manager, for applying WAL records.
     let walredo_mgr = PostgresRedoManager::new(conf);

@@ -59,7 +59,7 @@ pub trait Timeline: Send + Sync {
     fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
 
     /// Look up given page in the cache.
-    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
+    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn, materialize: bool) -> Result<Bytes>;
 
     /// Get size of relation
     fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
@@ -144,13 +144,13 @@ pub trait Timeline: Send + Sync {
     /// but it can be explicitly requested through page server API.
     ///
     /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval).
     /// `compact` parameter is used to force compaction of storage.
     /// Some storage implementation are based on LSM tree and require periodic merge (compaction).
     /// Usually storage implementation determines itself when compaction should be performed.
     /// But for GC tests it way be useful to force compaction just after completion of GC iteration
     /// to make sure that all detected garbage is removed.
     /// So right now `compact` is set to true when GC explicitly requested through page srver API,
     /// and is st to false in GC threads which infinitely repeats GC iterations in loop.
     fn gc_iteration(&self, horizon: u64, compact: bool) -> Result<GcResult>;
 
     // Check transaction status
@@ -353,8 +353,10 @@ mod tests {
         let conf = PageServerConf {
             daemonize: false,
             interactive: false,
+            materialize: false,
             gc_horizon: 64 * 1024 * 1024,
             gc_period: Duration::from_secs(10),
+            wal_redoers: 1,
             listen_addr: "127.0.0.1:5430".to_string(),
             workdir: repo_dir,
             pg_distrib_dir: "".into(),

@@ -70,7 +70,7 @@ pub fn import_timeline_from_postgres_datadir(
                 import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?;
                 // Extract checkpoint record from pg_control and store is as separate object
                 let pg_control_bytes =
-                    timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn)?;
+                    timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn, false)?;
                 let pg_control = ControlFileData::decode(&pg_control_bytes)?;
                 let checkpoint_bytes = pg_control.checkPointCopy.encode();
                 timeline.put_page_image(ObjectTag::Checkpoint, lsn, checkpoint_bytes, false)?;
@@ -298,7 +298,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
     let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
     let mut last_lsn = startpoint;
 
-    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
+    let checkpoint_bytes =
+        timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
     let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
 
     loop {
@@ -578,7 +579,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
             blknum,
         });
 
-        let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?;
+        let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn, false)?;
 
         debug!("copying block {:?} to {:?}", src_key, dst_key);
 
@@ -598,7 +599,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
         match tag {
             ObjectTag::FileNodeMap(db) => {
                 if db.spcnode == src_tablespace_id && db.dbnode == src_db_id {
-                    let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?;
+                    let img = timeline.get_page_at_lsn_nowait(tag, req_lsn, false)?;
                     let new_tag = ObjectTag::FileNodeMap(DatabaseTag {
                         spcnode: tablespace_id,
                         dbnode: db_id,

@@ -169,7 +169,8 @@ fn walreceiver_main(
 
     let mut waldecoder = WalStreamDecoder::new(startpoint);
 
-    let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
+    let checkpoint_bytes =
+        timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
     let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
     trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
 
@@ -26,6 +26,7 @@ use std::io::Error;
 use std::path::PathBuf;
 use std::process::Stdio;
 use std::sync::mpsc;
+use std::sync::Arc;
 use std::sync::Mutex;
 use std::time::Duration;
 use std::time::Instant;
@@ -103,13 +104,16 @@ struct PostgresRedoManagerInternal {
 }
 
 #[derive(Debug)]
-struct WalRedoRequest {
+struct WalRedoRequestData {
     tag: ObjectTag,
     lsn: Lsn,
 
     base_img: Option<Bytes>,
     records: Vec<WALRecord>,
+}
+
+#[derive(Debug)]
+struct WalRedoRequest {
+    data: WalRedoRequestData,
     response_channel: mpsc::Sender<Result<Bytes, WalRedoError>>,
 }
 
@@ -175,10 +179,12 @@ impl WalRedoManager for PostgresRedoManager {
         let (tx, rx) = mpsc::channel::<Result<Bytes, WalRedoError>>();
 
         let request = WalRedoRequest {
-            tag,
-            lsn,
-            base_img,
-            records,
+            data: WalRedoRequestData {
+                tag,
+                lsn,
+                base_img,
+                records,
+            },
             response_channel: tx,
         };
 
@@ -229,40 +235,64 @@ impl PostgresRedoManagerInternal {
             .build()
             .unwrap();
 
-        let process: PostgresRedoProcess;
+        let processes: Vec<PostgresRedoProcess>;
 
         info!("launching WAL redo postgres process");
 
-        process = runtime
-            .block_on(PostgresRedoProcess::launch(self.conf))
-            .unwrap();
+        let wal_redoers = self.conf.wal_redoers;
+        processes = (0..wal_redoers)
+            .map(|i| {
+                runtime
+                    .block_on(PostgresRedoProcess::launch(self.conf, i))
+                    .unwrap()
+            })
+            .collect();
 
         // Loop forever, handling requests as they come.
         loop {
-            let request = self
-                .request_rx
-                .recv()
-                .expect("WAL redo request channel was closed");
-
-            let result = runtime.block_on(self.handle_apply_request(&process, &request));
-            let result_ok = result.is_ok();
-
-            // Send the result to the requester
-            let _ = request.response_channel.send(result);
-
-            if !result_ok {
-                error!("wal-redo-postgres failed to apply request {:?}", request);
+            let mut requests: Vec<WalRedoRequest> = Vec::new();
+            requests.push(
+                self.request_rx
+                    .recv()
+                    .expect("WAL redo request channel was closed"),
+            );
+            loop {
+                let req = self.request_rx.try_recv();
+                match req {
+                    Ok(req) => requests.push(req),
+                    Err(_) => break,
+                }
+            }
+            let request_data = requests.iter().map(|req| &req.data);
+            let mut rr = 0; // round robin
+            let results = runtime.block_on(async {
+                let futures = request_data.map(|req| {
+                    rr += 1;
+                    self.handle_apply_request(&processes[rr % wal_redoers], &req)
+                });
+                let mut results: Vec<Result<Bytes, WalRedoError>> = Vec::new();
+                for future in futures {
+                    results.push(future.await);
+                }
+                results
+            });
+            for (result, request) in results.into_iter().zip(requests.iter()) {
+                let result_ok = result.is_ok();
+
+                // Send the result to the requester
+                let _ = request.response_channel.send(result);
+
+                if !result_ok {
+                    error!("wal-redo-postgres failed to apply request {:?}", request);
+                }
             }
         }
     }
 
     ///
     /// Process one request for WAL redo.
     ///
     async fn handle_apply_request(
         &self,
         process: &PostgresRedoProcess,
-        request: &WalRedoRequest,
+        request: &WalRedoRequestData,
     ) -> Result<Bytes, WalRedoError> {
         let tag = request.tag;
         let lsn = request.lsn;
@@ -446,19 +476,19 @@ impl PostgresRedoManagerInternal {
 }
 
 struct PostgresRedoProcess {
-    stdin: RefCell<ChildStdin>,
-    stdout: RefCell<ChildStdout>,
+    stdin: Arc<RefCell<ChildStdin>>,
+    stdout: Arc<RefCell<ChildStdout>>,
 }
 
 impl PostgresRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
    //
-    async fn launch(conf: &PageServerConf) -> Result<PostgresRedoProcess, Error> {
+    async fn launch(conf: &PageServerConf, id: usize) -> Result<PostgresRedoProcess, Error> {
         // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
         // just create one with constant name. That fails if you try to launch more than
         // one WAL redo manager concurrently.
-        let datadir = conf.workdir.join("wal-redo-datadir");
+        let datadir = conf.workdir.join(format!("wal-redo-datadir-{}", id));
 
         // Create empty data directory for wal-redo postgres, deleting old one first.
         if datadir.exists() {
@@ -538,8 +568,8 @@ impl PostgresRedoProcess {
         tokio::spawn(f_stderr);
 
         Ok(PostgresRedoProcess {
-            stdin: RefCell::new(stdin),
-            stdout: RefCell::new(stdout),
+            stdin: Arc::new(RefCell::new(stdin)),
+            stdout: Arc::new(RefCell::new(stdout)),
         })
     }
 
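The manager loop above blocks for one request, drains whatever else is already queued, and hands the batch out to the redo processes round-robin. A minimal, self-contained sketch of that dispatch pattern using plain threads and channels instead of tokio and wal-redo postgres processes; the Request type, worker count, and message payloads are stand-ins, not the pageserver's real definitions:

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for a WAL redo request: just an id plus a reply channel.
struct Request {
    id: usize,
    response: mpsc::Sender<String>,
}

fn main() {
    let (tx, rx) = mpsc::channel::<Request>();
    let wal_redoers = 3; // number of simulated redo workers

    // Producer: enqueue a burst of requests, then drop the sender.
    let producer = thread::spawn(move || {
        for id in 0..10 {
            let (resp_tx, resp_rx) = mpsc::channel();
            tx.send(Request { id, response: resp_tx }).unwrap();
            // In the real manager the caller blocks on resp_rx; here we just drop it.
            drop(resp_rx);
        }
    });

    // Dispatcher: block for one request, drain whatever else is queued,
    // and spread the batch over the workers round-robin.
    while let Ok(first) = rx.recv() {
        let mut batch = vec![first];
        while let Ok(req) = rx.try_recv() {
            batch.push(req);
        }
        let mut rr = 0; // round robin cursor
        for req in &batch {
            rr += 1;
            let worker = rr % wal_redoers;
            // Reply may fail because the receiver was dropped above; ignore it here.
            let _ = req.response.send(format!("request {} handled by worker {}", req.id, worker));
            println!("request {} -> worker {}", req.id, worker);
        }
    }

    producer.join().unwrap();
}
```
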
test_runner/batch_others/test_bulk_insert.py (new file, 25 lines)
@@ -0,0 +1,25 @@
+from contextlib import closing
+import psycopg2.extras
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+#
+# Test insertion of larg number of records
+#
+# This test is pretty tightly coupled with the current implementation of page version storage
+# and garbage collection in object_repository.rs.
+#
+def test_bulk_insert(zenith_cli, pageserver, postgres, pg_bin):
+    zenith_cli.run(["branch", "test_bulk_insert", "empty"])
+    pg = postgres.create_start('test_bulk_insert')
+
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("create table t(c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint)")
+            cur.execute("create index on t(c1)")
+            cur.execute("create index on t(c2)")
+            cur.execute("create index on t(c3)")
+            cur.execute("create index on t(c4)")
+            cur.execute("create index on t(c5)")
+            cur.execute("insert into t values (generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000))")
+            cur.execute("insert into t values (generate_series(1,1000000),random()*1000000,random()*1000000,random()*1000000,random()*1000000)")

test_runner/batch_others/test_seq_scan.py (new file, 26 lines)
@@ -0,0 +1,26 @@
+from contextlib import closing
+import psycopg2.extras
+import time
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+#
+# Test insertion of larg number of records
+#
+# This test is pretty tightly coupled with the current implementation of page version storage
+# and garbage collection in object_repository.rs.
+#
+def test_seq_scan(zenith_cli, pageserver, postgres, pg_bin):
+    zenith_cli.run(["branch", "test_seq_scan", "empty"])
+    pg = postgres.create_start('test_seq_scan')
+
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("create table t(c1 bigint, c2 bigint, c3 bigint, c4 bigint, c5 bigint)")
+            cur.execute("insert into t values (generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000),generate_series(1,1000000))")
+            cur.execute("set max_parallel_workers_per_gather=0");
+            for i in range(100):
+                start = time.time()
+                cur.execute("select count(*) from t");
+                stop = time.time()
+                print(f'Elapsed time for iterating through 1000000 records is {stop - start}')

vendor/postgres (vendored submodule)
Submodule vendor/postgres updated: 8ab674ad99...fedd3660a2