From 1e0e3fbde01d26c80975509603810ddec83ed44f Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 20 Jul 2021 19:15:55 +0300 Subject: [PATCH] Spawn multiple wal-redo postgres instances --- pageserver/src/basebackup.rs | 10 +-- pageserver/src/bin/pageserver.rs | 26 +++++++- pageserver/src/lib.rs | 2 + pageserver/src/object_repository.rs | 27 +++++--- pageserver/src/repository.rs | 2 +- pageserver/src/restore_local_repo.rs | 8 +-- pageserver/src/walreceiver.rs | 2 +- pageserver/src/walredo.rs | 93 +++++++++++++++++----------- 8 files changed, 113 insertions(+), 57 deletions(-) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 8b5e759eed..13c33621a5 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -132,7 +132,7 @@ impl<'a> Basebackup<'a> { tag: &ObjectTag, page: u32, ) -> anyhow::Result<()> { - let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?; // Zero length image indicates truncated segment: just skip it if !img.is_empty() { assert!(img.len() == pg_constants::BLCKSZ as usize); @@ -172,7 +172,7 @@ impl<'a> Basebackup<'a> { // Extract pg_filenode.map files from repository // fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> { - let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?; info!("add_relmap_file {:?}", db); let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID { String::from("global/pg_filenode.map") // filenode map for global tablespace @@ -198,7 +198,7 @@ impl<'a> Basebackup<'a> { if self.timeline.get_tx_status(xid, self.lsn)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS { - let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn, false)?; let mut buf = BytesMut::new(); buf.extend_from_slice(&img[..]); let crc = crc32c::crc32c(&img[..]); @@ -216,10 +216,10 @@ impl<'a> Basebackup<'a> { fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { let checkpoint_bytes = self .timeline - .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?; + .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn, false)?; let pg_control_bytes = self .timeline - .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?; + .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn, false)?; let mut pg_control = ControlFileData::decode(&pg_control_bytes)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 87b8354269..7d4b8614e3 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -28,12 +28,15 @@ const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000"; const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); +const DEFAULT_WAL_REDOERS: usize = 2; + /// String arguments that can be declared via CLI or config file #[derive(Serialize, Deserialize)] struct CfgFileParams { listen_addr: Option, gc_horizon: Option, gc_period: Option, + wal_redoers: Option, pg_distrib_dir: Option, } @@ -48,6 +51,7 @@ impl CfgFileParams { listen_addr: get_arg("listen"), gc_horizon: get_arg("gc_horizon"), gc_period: get_arg("gc_period"), + wal_redoers: get_arg("wal_redoers"), pg_distrib_dir: get_arg("postgres-distrib"), } } @@ -59,6 +63,7 @@ impl CfgFileParams { listen_addr: self.listen_addr.or(other.listen_addr), gc_horizon: self.gc_horizon.or(other.gc_horizon), gc_period: self.gc_period.or(other.gc_period), + wal_redoers: self.wal_redoers.or(other.wal_redoers), pg_distrib_dir: self.pg_distrib_dir.or(other.pg_distrib_dir), } } @@ -79,6 +84,11 @@ impl CfgFileParams { None => DEFAULT_GC_PERIOD, }; + let wal_redoers = match self.wal_redoers.as_ref() { + Some(wal_redoers_str) => wal_redoers_str.parse::()?, + None => DEFAULT_WAL_REDOERS, + }; + let pg_distrib_dir = match self.pg_distrib_dir.as_ref() { Some(pg_distrib_dir_str) => PathBuf::from(pg_distrib_dir_str), None => env::current_dir()?.join("tmp_install"), @@ -91,11 +101,12 @@ impl CfgFileParams { Ok(PageServerConf { daemonize: false, interactive: false, + materialize: false, listen_addr, gc_horizon, gc_period, - + wal_redoers, workdir: PathBuf::from("."), pg_distrib_dir, @@ -120,6 +131,12 @@ fn main() -> Result<()> { .takes_value(false) .help("Interactive mode"), ) + .arg( + Arg::with_name("materialize") + .long("materialize") + .takes_value(false) + .help("Materialize pages constructed by get_page_at"), + ) .arg( Arg::with_name("daemonize") .short("d") @@ -145,6 +162,12 @@ fn main() -> Result<()> { .takes_value(true) .help("Interval between garbage collector iterations"), ) + .arg( + Arg::with_name("wal_redoers") + .long("wal_redoers") + .takes_value(true) + .help("Number of wal-redo postgres instances"), + ) .arg( Arg::with_name("workdir") .short("D") @@ -181,6 +204,7 @@ fn main() -> Result<()> { conf.daemonize = arg_matches.is_present("daemonize"); conf.interactive = arg_matches.is_present("interactive"); + conf.materialize = arg_matches.is_present("materialize"); if init && (conf.daemonize || conf.interactive) { eprintln!("--daemonize and --interactive may not be used with --init"); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index d4cce49f9c..cd50c3a742 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -26,9 +26,11 @@ pub mod walredo; pub struct PageServerConf { pub daemonize: bool, pub interactive: bool, + pub materialize: bool, pub listen_addr: String, pub gc_horizon: u64, pub gc_period: Duration, + pub wal_redoers: usize, // Repository directory, relative to current working directory. // Normally, the page server changes the current working directory diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index dd1c0d0eba..6db9773b81 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -71,6 +71,7 @@ impl Repository for ObjectRepository { Some(timeline) => Ok(timeline.clone()), None => { let timeline = ObjectTimeline::open( + self.conf, Arc::clone(&self.obj_store), timelineid, self.walredo_mgr.clone(), @@ -124,6 +125,7 @@ impl Repository for ObjectRepository { info!("Created empty timeline {}", timelineid); let timeline = ObjectTimeline::open( + self.conf, Arc::clone(&self.obj_store), timelineid, self.walredo_mgr.clone(), @@ -163,7 +165,7 @@ impl Repository for ObjectRepository { match tag { ObjectTag::TimelineMetadataTag => {} // skip it _ => { - let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?; + let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn, false)?; let val = ObjectValue::Page(PageEntry::Page(img)); let key = ObjectKey { timeline: dst, tag }; self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?; @@ -197,6 +199,7 @@ pub struct ObjectTimeline { // Backing key-value store obj_store: Arc, + conf: &'static PageServerConf, // WAL redo manager, for reconstructing page versions from WAL records. walredo_mgr: Arc, @@ -231,6 +234,7 @@ impl ObjectTimeline { /// /// Loads the metadata for the timeline into memory. fn open( + conf: &'static PageServerConf, obj_store: Arc, timelineid: ZTimelineId, walredo_mgr: Arc, @@ -244,6 +248,7 @@ impl ObjectTimeline { let timeline = ObjectTimeline { timelineid, obj_store, + conf, walredo_mgr, last_valid_lsn: SeqWait::new(metadata.last_valid_lsn), last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0), @@ -265,10 +270,10 @@ impl Timeline for ObjectTimeline { fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result { let lsn = self.wait_lsn(req_lsn)?; - self.get_page_at_lsn_nowait(tag, lsn) + self.get_page_at_lsn_nowait(tag, lsn, self.conf.materialize) } - fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result { + fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn, materialize: bool) -> Result { const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; // Look up the page entry. If it's a page image, return that. If it's a WAL record, // ask the WAL redo service to reconstruct the page image from the WAL records. @@ -290,11 +295,13 @@ impl Timeline for ObjectTimeline { let (base_img, records) = self.collect_records_for_apply(tag, lsn)?; page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; - // Garbage collection assumes that we remember the materialized page - // version. Otherwise we could opt to not do it, with the downside that - // the next GetPage@LSN call of the same page version would have to - // redo the WAL again. - self.put_page_image(tag, lsn, page_img.clone(), false)?; + if materialize { + // Garbage collection assumes that we remember the materialized page + // version. Otherwise we could opt to not do it, with the downside that + // the next GetPage@LSN call of the same page version would have to + // redo the WAL again. + self.put_page_image(tag, lsn, page_img.clone(), false)?; + } } ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE), _ => bail!("Invalid object kind, expected a page entry or SLRU truncate"), @@ -712,7 +719,7 @@ impl Timeline for ObjectTimeline { { if rel_size > tag.blknum { // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(obj, lsn)?; + self.get_page_at_lsn_nowait(obj, lsn, true)?; continue; } debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size); @@ -755,7 +762,7 @@ impl Timeline for ObjectTimeline { } ObjectValue::Page(PageEntry::WALRecord(_)) => { // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(obj, lsn)?; + self.get_page_at_lsn_nowait(obj, lsn, true)?; } _ => {} // do nothing if already materialized } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 968db48f0a..7e381e8280 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -59,7 +59,7 @@ pub trait Timeline: Send + Sync { fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result; /// Look up given page in the cache. - fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result; + fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn, materialize: bool) -> Result; /// Get size of relation fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index ad5e7cb298..c530cc7be1 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -70,7 +70,7 @@ pub fn import_timeline_from_postgres_datadir( import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?; // Extract checkpoint record from pg_control and store is as separate object let pg_control_bytes = - timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn)?; + timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn, false)?; let pg_control = ControlFileData::decode(&pg_control_bytes)?; let checkpoint_bytes = pg_control.checkPointCopy.encode(); timeline.put_page_image(ObjectTag::Checkpoint, lsn, checkpoint_bytes, false)?; @@ -298,7 +298,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; - let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; + let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; loop { @@ -578,7 +578,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab blknum, }); - let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?; + let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn, false)?; debug!("copying block {:?} to {:?}", src_key, dst_key); @@ -598,7 +598,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab match tag { ObjectTag::FileNodeMap(db) => { if db.spcnode == src_tablespace_id && db.dbnode == src_db_id { - let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?; + let img = timeline.get_page_at_lsn_nowait(tag, req_lsn, false)?; let new_tag = ObjectTag::FileNodeMap(DatabaseTag { spcnode: tablespace_id, dbnode: db_id, diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 52c5f0e470..811c69217f 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -169,7 +169,7 @@ fn walreceiver_main( let mut waldecoder = WalStreamDecoder::new(startpoint); - let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; + let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint, false)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e43b9c354a..f7ff7faae5 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -27,6 +27,7 @@ use std::path::PathBuf; use std::process::Stdio; use std::sync::mpsc; use std::sync::Mutex; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; use tokio::io::AsyncBufReadExt; @@ -103,13 +104,16 @@ struct PostgresRedoManagerInternal { } #[derive(Debug)] -struct WalRedoRequest { +struct WalRedoRequestData { tag: ObjectTag, lsn: Lsn, - base_img: Option, records: Vec, +} +#[derive(Debug)] +struct WalRedoRequest { + data: WalRedoRequestData, response_channel: mpsc::Sender>, } @@ -175,10 +179,12 @@ impl WalRedoManager for PostgresRedoManager { let (tx, rx) = mpsc::channel::>(); let request = WalRedoRequest { - tag, - lsn, - base_img, - records, + data: WalRedoRequestData { + tag, + lsn, + base_img, + records, + }, response_channel: tx, }; @@ -229,47 +235,64 @@ impl PostgresRedoManagerInternal { .build() .unwrap(); - let process: PostgresRedoProcess; + let processes: Vec; info!("launching WAL redo postgres process"); - process = runtime - .block_on(PostgresRedoProcess::launch(self.conf)) - .unwrap(); + let wal_redoers = self.conf.wal_redoers; + processes = (0..wal_redoers).map(|i|runtime.block_on(PostgresRedoProcess::launch(self.conf, i)).unwrap()).collect(); // Loop forever, handling requests as they come. loop { - let request = self - .request_rx - .recv() - .expect("WAL redo request channel was closed"); + let mut requests: Vec = Vec::new(); + requests.push(self + .request_rx + .recv() + .expect("WAL redo request channel was closed")); + loop { + let req = self.request_rx.try_recv(); + match req { + Ok(req) => requests.push(req), + Err(_) => break, + } + } + let request_data = requests.iter().map(|req| &req.data); + let mut rr = 0; // round robin + let results = runtime.block_on(async { + let futures = request_data.map(|req| { + rr += 1; + self.handle_apply_request(&processes[rr % wal_redoers], &req) + }); + let mut results : Vec> = Vec::new(); + for future in futures { + results.push(future.await); + } + results + }); + for (result, request) in results.into_iter().zip(requests.iter()) { + let result_ok = result.is_ok(); - let result = runtime.block_on(self.handle_apply_request(&process, &request)); - let result_ok = result.is_ok(); + // Send the result to the requester + let _ = request.response_channel.send(result); - // Send the result to the requester - let _ = request.response_channel.send(result); + if !result_ok { + error!("wal-redo-postgres failed to apply request {:?}", request); + } + } + } + } - if !result_ok { - error!("wal-redo-postgres failed to apply request {:?}", request); - } - } - } - - /// - /// Process one request for WAL redo. - /// async fn handle_apply_request( &self, process: &PostgresRedoProcess, - request: &WalRedoRequest, + request: &WalRedoRequestData, ) -> Result { let tag = request.tag; let lsn = request.lsn; let base_img = request.base_img.clone(); let records = &request.records; - let nrecords = records.len(); + let nrecords = records.len(); let start = Instant::now(); @@ -446,19 +469,19 @@ impl PostgresRedoManagerInternal { } struct PostgresRedoProcess { - stdin: RefCell, - stdout: RefCell, + stdin: Arc>, + stdout: Arc>, } impl PostgresRedoProcess { // // Start postgres binary in special WAL redo mode. // - async fn launch(conf: &PageServerConf) -> Result { + async fn launch(conf: &PageServerConf, id: usize) -> Result { // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // just create one with constant name. That fails if you try to launch more than // one WAL redo manager concurrently. - let datadir = conf.workdir.join("wal-redo-datadir"); + let datadir = conf.workdir.join(format!("wal-redo-datadir-{}",id)); // Create empty data directory for wal-redo postgres, deleting old one first. if datadir.exists() { @@ -538,8 +561,8 @@ impl PostgresRedoProcess { tokio::spawn(f_stderr); Ok(PostgresRedoProcess { - stdin: RefCell::new(stdin), - stdout: RefCell::new(stdout), + stdin: Arc::new(RefCell::new(stdin)), + stdout: Arc::new(RefCell::new(stdout)), }) }