Spawn multiple wal-redo postgres instances

This commit is contained in:
Konstantin Knizhnik
2021-07-20 19:15:55 +03:00
parent 7c96c638aa
commit 1e0e3fbde0
8 changed files with 113 additions and 57 deletions

View File

@@ -132,7 +132,7 @@ impl<'a> Basebackup<'a> {
tag: &ObjectTag,
page: u32,
) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
// Zero length image indicates truncated segment: just skip it
if !img.is_empty() {
assert!(img.len() == pg_constants::BLCKSZ as usize);
@@ -172,7 +172,7 @@ impl<'a> Basebackup<'a> {
// Extract pg_filenode.map files from repository
//
fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> {
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
info!("add_relmap_file {:?}", db);
let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
String::from("global/pg_filenode.map") // filenode map for global tablespace
@@ -198,7 +198,7 @@ impl<'a> Basebackup<'a> {
if self.timeline.get_tx_status(xid, self.lsn)?
== pg_constants::TRANSACTION_STATUS_IN_PROGRESS
{
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?;
let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?;
let mut buf = BytesMut::new();
buf.extend_from_slice(&img[..]);
let crc = crc32c::crc32c(&img[..]);
@@ -216,10 +216,10 @@ impl<'a> Basebackup<'a> {
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
let checkpoint_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?;
.get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn, false)?;
let pg_control_bytes = self
.timeline
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?;
.get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn, false)?;
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;

View File

@@ -28,12 +28,15 @@ const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000";
const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
const DEFAULT_WAL_REDOERS: usize = 2;
/// String arguments that can be declared via CLI or config file
#[derive(Serialize, Deserialize)]
struct CfgFileParams {
listen_addr: Option<String>,
gc_horizon: Option<String>,
gc_period: Option<String>,
wal_redoers: Option<String>,
pg_distrib_dir: Option<String>,
}
@@ -48,6 +51,7 @@ impl CfgFileParams {
listen_addr: get_arg("listen"),
gc_horizon: get_arg("gc_horizon"),
gc_period: get_arg("gc_period"),
wal_redoers: get_arg("wal_redoers"),
pg_distrib_dir: get_arg("postgres-distrib"),
}
}
@@ -59,6 +63,7 @@ impl CfgFileParams {
listen_addr: self.listen_addr.or(other.listen_addr),
gc_horizon: self.gc_horizon.or(other.gc_horizon),
gc_period: self.gc_period.or(other.gc_period),
wal_redoers: self.wal_redoers.or(other.wal_redoers),
pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir),
}
}
@@ -79,6 +84,11 @@ impl CfgFileParams {
None => DEFAULT_GC_PERIOD,
};
let wal_redoers = match self.wal_redoers.as_ref() {
Some(wal_redoers_str) => wal_redoers_str.parse::<usize>()?,
None => DEFAULT_WAL_REDOERS,
};
let pg_distrib_dir = match self.pg_distrib_dir.as_ref() {
Some(pg_distrib_dir_str) => PathBuf::from(pg_distrib_dir_str),
None => env::current_dir()?.join("tmp_install"),
@@ -91,11 +101,12 @@ impl CfgFileParams {
Ok(PageServerConf {
daemonize: false,
interactive: false,
materialize: false,
listen_addr,
gc_horizon,
gc_period,
wal_redoers,
workdir: PathBuf::from("."),
pg_distrib_dir,
@@ -120,6 +131,12 @@ fn main() -> Result<()> {
.takes_value(false)
.help("Interactive mode"),
)
.arg(
Arg::with_name("materialize")
.long("materialize")
.takes_value(false)
.help("Materialize pages constructed by get_page_at"),
)
.arg(
Arg::with_name("daemonize")
.short("d")
@@ -145,6 +162,12 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Interval between garbage collector iterations"),
)
.arg(
Arg::with_name("wal_redoers")
.long("wal_redoers")
.takes_value(true)
.help("Number of wal-redo postgres instances"),
)
.arg(
Arg::with_name("workdir")
.short("D")
@@ -181,6 +204,7 @@ fn main() -> Result<()> {
conf.daemonize = arg_matches.is_present("daemonize");
conf.interactive = arg_matches.is_present("interactive");
conf.materialize = arg_matches.is_present("materialize");
if init && (conf.daemonize || conf.interactive) {
eprintln!("--daemonize and --interactive may not be used with --init");

View File

@@ -26,9 +26,11 @@ pub mod walredo;
pub struct PageServerConf {
pub daemonize: bool,
pub interactive: bool,
pub materialize: bool,
pub listen_addr: String,
pub gc_horizon: u64,
pub gc_period: Duration,
pub wal_redoers: usize,
// Repository directory, relative to current working directory.
// Normally, the page server changes the current working directory

View File

@@ -71,6 +71,7 @@ impl Repository for ObjectRepository {
Some(timeline) => Ok(timeline.clone()),
None => {
let timeline = ObjectTimeline::open(
self.conf,
Arc::clone(&self.obj_store),
timelineid,
self.walredo_mgr.clone(),
@@ -124,6 +125,7 @@ impl Repository for ObjectRepository {
info!("Created empty timeline {}", timelineid);
let timeline = ObjectTimeline::open(
self.conf,
Arc::clone(&self.obj_store),
timelineid,
self.walredo_mgr.clone(),
@@ -163,7 +165,7 @@ impl Repository for ObjectRepository {
match tag {
ObjectTag::TimelineMetadataTag => {} // skip it
_ => {
let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?;
let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn, false)?;
let val = ObjectValue::Page(PageEntry::Page(img));
let key = ObjectKey { timeline: dst, tag };
self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?;
@@ -197,6 +199,7 @@ pub struct ObjectTimeline {
// Backing key-value store
obj_store: Arc<dyn ObjectStore>,
conf: &'static PageServerConf,
// WAL redo manager, for reconstructing page versions from WAL records.
walredo_mgr: Arc<dyn WalRedoManager>,
@@ -231,6 +234,7 @@ impl ObjectTimeline {
///
/// Loads the metadata for the timeline into memory.
fn open(
conf: &'static PageServerConf,
obj_store: Arc<dyn ObjectStore>,
timelineid: ZTimelineId,
walredo_mgr: Arc<dyn WalRedoManager>,
@@ -244,6 +248,7 @@ impl ObjectTimeline {
let timeline = ObjectTimeline {
timelineid,
obj_store,
conf,
walredo_mgr,
last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
@@ -265,10 +270,10 @@ impl Timeline for ObjectTimeline {
fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
let lsn = self.wait_lsn(req_lsn)?;
self.get_page_at_lsn_nowait(tag, lsn)
self.get_page_at_lsn_nowait(tag, lsn, self.conf.materialize)
}
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result<Bytes> {
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn, materialize: bool) -> Result<Bytes> {
const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
// Look up the page entry. If it's a page image, return that. If it's a WAL record,
// ask the WAL redo service to reconstruct the page image from the WAL records.
@@ -290,11 +295,13 @@ impl Timeline for ObjectTimeline {
let (base_img, records) = self.collect_records_for_apply(tag, lsn)?;
page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?;
// Garbage collection assumes that we remember the materialized page
// version. Otherwise we could opt to not do it, with the downside that
// the next GetPage@LSN call of the same page version would have to
// redo the WAL again.
self.put_page_image(tag, lsn, page_img.clone(), false)?;
if materialize {
// Garbage collection assumes that we remember the materialized page
// version. Otherwise we could opt to not do it, with the downside that
// the next GetPage@LSN call of the same page version would have to
// redo the WAL again.
self.put_page_image(tag, lsn, page_img.clone(), false)?;
}
}
ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE),
_ => bail!("Invalid object kind, expected a page entry or SLRU truncate"),
@@ -712,7 +719,7 @@ impl Timeline for ObjectTimeline {
{
if rel_size > tag.blknum {
// preserve and materialize last version before deleting all preceding
self.get_page_at_lsn_nowait(obj, lsn)?;
self.get_page_at_lsn_nowait(obj, lsn, true)?;
continue;
}
debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size);
@@ -755,7 +762,7 @@ impl Timeline for ObjectTimeline {
}
ObjectValue::Page(PageEntry::WALRecord(_)) => {
// preserve and materialize last version before deleting all preceding
self.get_page_at_lsn_nowait(obj, lsn)?;
self.get_page_at_lsn_nowait(obj, lsn, true)?;
}
_ => {} // do nothing if already materialized
}

View File

@@ -59,7 +59,7 @@ pub trait Timeline: Send + Sync {
fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
/// Look up given page in the cache.
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn, materialize: bool) -> Result<Bytes>;
/// Get size of relation
fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;

View File

@@ -70,7 +70,7 @@ pub fn import_timeline_from_postgres_datadir(
import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?;
// Extract checkpoint record from pg_control and store is as separate object
let pg_control_bytes =
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn)?;
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn, false)?;
let pg_control = ControlFileData::decode(&pg_control_bytes)?;
let checkpoint_bytes = pg_control.checkPointCopy.encode();
timeline.put_page_image(ObjectTag::Checkpoint, lsn, checkpoint_bytes, false)?;
@@ -298,7 +298,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint:
let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
let mut last_lsn = startpoint;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
loop {
@@ -578,7 +578,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
blknum,
});
let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?;
let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn, false)?;
debug!("copying block {:?} to {:?}", src_key, dst_key);
@@ -598,7 +598,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab
match tag {
ObjectTag::FileNodeMap(db) => {
if db.spcnode == src_tablespace_id && db.dbnode == src_db_id {
let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?;
let img = timeline.get_page_at_lsn_nowait(tag, req_lsn, false)?;
let new_tag = ObjectTag::FileNodeMap(DatabaseTag {
spcnode: tablespace_id,
dbnode: db_id,

View File

@@ -169,7 +169,7 @@ fn walreceiver_main(
let mut waldecoder = WalStreamDecoder::new(startpoint);
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?;
let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);

View File

@@ -27,6 +27,7 @@ use std::path::PathBuf;
use std::process::Stdio;
use std::sync::mpsc;
use std::sync::Mutex;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::io::AsyncBufReadExt;
@@ -103,13 +104,16 @@ struct PostgresRedoManagerInternal {
}
#[derive(Debug)]
struct WalRedoRequest {
struct WalRedoRequestData {
tag: ObjectTag,
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
}
#[derive(Debug)]
struct WalRedoRequest {
data: WalRedoRequestData,
response_channel: mpsc::Sender<Result<Bytes, WalRedoError>>,
}
@@ -175,10 +179,12 @@ impl WalRedoManager for PostgresRedoManager {
let (tx, rx) = mpsc::channel::<Result<Bytes, WalRedoError>>();
let request = WalRedoRequest {
tag,
lsn,
base_img,
records,
data: WalRedoRequestData {
tag,
lsn,
base_img,
records,
},
response_channel: tx,
};
@@ -229,47 +235,64 @@ impl PostgresRedoManagerInternal {
.build()
.unwrap();
let process: PostgresRedoProcess;
let processes: Vec<PostgresRedoProcess>;
info!("launching WAL redo postgres process");
process = runtime
.block_on(PostgresRedoProcess::launch(self.conf))
.unwrap();
let wal_redoers = self.conf.wal_redoers;
processes = (0..wal_redoers).map(|i|runtime.block_on(PostgresRedoProcess::launch(self.conf, i)).unwrap()).collect();
// Loop forever, handling requests as they come.
loop {
let request = self
.request_rx
.recv()
.expect("WAL redo request channel was closed");
let mut requests: Vec<WalRedoRequest> = Vec::new();
requests.push(self
.request_rx
.recv()
.expect("WAL redo request channel was closed"));
loop {
let req = self.request_rx.try_recv();
match req {
Ok(req) => requests.push(req),
Err(_) => break,
}
}
let request_data = requests.iter().map(|req| &req.data);
let mut rr = 0; // round robin
let results = runtime.block_on(async {
let futures = request_data.map(|req| {
rr += 1;
self.handle_apply_request(&processes[rr % wal_redoers], &req)
});
let mut results : Vec<Result<Bytes, WalRedoError>> = Vec::new();
for future in futures {
results.push(future.await);
}
results
});
for (result, request) in results.into_iter().zip(requests.iter()) {
let result_ok = result.is_ok();
let result = runtime.block_on(self.handle_apply_request(&process, &request));
let result_ok = result.is_ok();
// Send the result to the requester
let _ = request.response_channel.send(result);
// Send the result to the requester
let _ = request.response_channel.send(result);
if !result_ok {
error!("wal-redo-postgres failed to apply request {:?}", request);
}
}
}
}
if !result_ok {
error!("wal-redo-postgres failed to apply request {:?}", request);
}
}
}
///
/// Process one request for WAL redo.
///
async fn handle_apply_request(
&self,
process: &PostgresRedoProcess,
request: &WalRedoRequest,
request: &WalRedoRequestData,
) -> Result<Bytes, WalRedoError> {
let tag = request.tag;
let lsn = request.lsn;
let base_img = request.base_img.clone();
let records = &request.records;
let nrecords = records.len();
let nrecords = records.len();
let start = Instant::now();
@@ -446,19 +469,19 @@ impl PostgresRedoManagerInternal {
}
struct PostgresRedoProcess {
stdin: RefCell<ChildStdin>,
stdout: RefCell<ChildStdout>,
stdin: Arc<RefCell<ChildStdin>>,
stdout: Arc<RefCell<ChildStdout>>,
}
impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
async fn launch(conf: &PageServerConf) -> Result<PostgresRedoProcess, Error> {
async fn launch(conf: &PageServerConf, id: usize) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = conf.workdir.join("wal-redo-datadir");
let datadir = conf.workdir.join(format!("wal-redo-datadir-{}",id));
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {
@@ -538,8 +561,8 @@ impl PostgresRedoProcess {
tokio::spawn(f_stderr);
Ok(PostgresRedoProcess {
stdin: RefCell::new(stdin),
stdout: RefCell::new(stdout),
stdin: Arc::new(RefCell::new(stdin)),
stdout: Arc::new(RefCell::new(stdout)),
})
}