diff --git a/Cargo.lock b/Cargo.lock index 6d668d58b2..da38779279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1381,7 +1381,6 @@ dependencies = [ "chrono", "clap", "crc32c", - "crossbeam-channel", "daemonize", "futures", "hex", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index a198f6403a..5309a77591 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -8,7 +8,6 @@ edition = "2018" [dependencies] chrono = "0.4.19" -crossbeam-channel = "0.5.0" rand = "0.8.3" regex = "1.4.5" bytes = "1.0.1" diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8060b385f1..6132731688 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -7,11 +7,9 @@ use crate::restore_local_repo::restore_timeline; use crate::waldecoder::Oid; use crate::ZTimelineId; -use crate::{walredo, zenith_repo_dir, PageServerConf}; +use crate::{zenith_repo_dir, PageServerConf}; use anyhow::{bail, Context}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use crossbeam_channel::unbounded; -use crossbeam_channel::{Receiver, Sender}; use lazy_static::lazy_static; use log::*; use rocksdb; @@ -19,11 +17,12 @@ use std::cmp::min; use std::collections::HashMap; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{convert::TryInto, ops::AddAssign}; use zenith_utils::seqwait::SeqWait; +use crate::walredo::WalRedoManager; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -34,9 +33,8 @@ pub struct PageCache { // RocksDB handle db: rocksdb::DB, - // Channel for communicating with the WAL redo process here. - pub walredo_sender: Sender>, - pub walredo_receiver: Receiver>, + // WAL redo manager + walredo_mgr: WalRedoManager, // Allows .await on the arrival of a particular LSN. seqwait_lsn: SeqWait, @@ -131,20 +129,11 @@ pub fn get_or_restore_pagecache( let result = Arc::new(pcache); + // Launch the WAL redo thread + result.walredo_mgr.launch(result.clone()); + pcaches.insert(timelineid, result.clone()); - // Initialize the WAL redo thread - // - // Now join_handle is not saved any where and we won'try restart tharead - // if it is dead. We may later stop that treads after some inactivity period - // and restart them on demand. - let conf_copy = conf.clone(); - let _walredo_thread = thread::Builder::new() - .name("WAL redo thread".into()) - .spawn(move || { - walredo::wal_redo_main(&conf_copy, timelineid); - }) - .unwrap(); if conf.gc_horizon != 0 { let conf_copy = conf.clone(); let _gc_thread = thread::Builder::new() @@ -162,7 +151,10 @@ pub fn get_or_restore_pagecache( fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { info!("Garbage collection thread started {}", timelineid); let pcache = get_pagecache(conf, timelineid).unwrap(); - pcache.do_gc(conf).unwrap(); + + let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + + runtime.block_on(pcache.do_gc(conf)).unwrap(); } fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB { @@ -176,20 +168,19 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB } fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { - // Initialize the channel between the page cache and the WAL applicator - let (s, r) = unbounded(); PageCache { - db: open_rocksdb(&conf, timelineid), shared: Mutex::new(PageCacheShared { first_valid_lsn: 0, last_valid_lsn: 0, last_record_lsn: 0, }), - seqwait_lsn: SeqWait::new(0), - walredo_sender: s, - walredo_receiver: r, + db: open_rocksdb(&conf, timelineid), + + walredo_mgr: WalRedoManager::new(conf, timelineid), + + seqwait_lsn: SeqWait::new(0), num_entries: AtomicU64::new(0), num_page_images: AtomicU64::new(0), @@ -234,19 +225,6 @@ impl CacheKey { } } -pub struct CacheEntry { - pub key: CacheKey, - - pub content: Mutex, - - // Condition variable used by the WAL redo service, to wake up - // requester. - // - // FIXME: this takes quite a lot of space. Consider using parking_lot::Condvar - // or something else. - pub walredo_condvar: Condvar, -} - pub struct CacheEntryContent { pub page_image: Option, pub wal_record: Option, @@ -283,16 +261,6 @@ impl CacheEntryContent { } } -impl CacheEntry { - fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry { - CacheEntry { - key, - content: Mutex::new(content), - walredo_condvar: Condvar::new(), - } - } -} - #[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)] pub struct RelTag { pub spcnode: u32, @@ -378,7 +346,8 @@ impl WALRecord { // Public interface functions impl PageCache { - fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { + async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { + let mut minbuf = BytesMut::new(); let mut maxbuf = BytesMut::new(); let cf = self @@ -429,10 +398,10 @@ impl PageCache { minkey.lsn = 0; // reconstruct most recent page version - if content.wal_record.is_some() { - trace!("Reconstruct most recent page {:?}", key); + if let Some(rec) = content.wal_record { + trace!("Reconstruct most recent page {:?}", key); // force reconstruction of most recent page version - self.reconstruct_page(key, content)?; + self.walredo_mgr.request_redo(key.tag, rec.lsn).await?; reconstructed += 1; } @@ -449,12 +418,13 @@ impl PageCache { minbuf.clear(); minbuf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut minbuf); - if content.wal_record.is_some() { + if let Some(rec) = content.wal_record { minbuf.clear(); minbuf.extend_from_slice(&k); let key = CacheKey::unpack(&mut minbuf); + trace!("Reconstruct horizon page {:?}", key); - self.reconstruct_page(key, content)?; + self.walredo_mgr.request_redo(key.tag, rec.lsn).await?; truncated += 1; } } @@ -475,31 +445,6 @@ impl PageCache { } } - fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result { - let entry_rc = Arc::new(CacheEntry::new(key.clone(), content)); - - let mut entry_content = entry_rc.content.lock().unwrap(); - entry_content.apply_pending = true; - - let s = &self.walredo_sender; - s.send(entry_rc.clone())?; - - while entry_content.apply_pending { - entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); - } - // We should now have a page image. If we don't, it means that WAL redo - // failed to reconstruct it. WAL redo should've logged that error already. - let page_img = match &entry_content.page_image { - Some(p) => p.clone(), - None => { - error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); - bail!("could not apply WAL to reconstruct page image"); - } - }; - self.put_page_image(key.tag, key.lsn, page_img.clone()); - Ok(page_img) - } - async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { let mut lsn = req_lsn; //When invalid LSN is requested, it means "don't wait, return latest version of the page" @@ -561,7 +506,7 @@ impl PageCache { return Ok(Bytes::from_static(&ZERO_PAGE)); /* return Err("could not find page image")?; */ } - let (k, v) = entry_opt.unwrap(); + let (_k, v) = entry_opt.unwrap(); buf.clear(); buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); @@ -569,10 +514,9 @@ impl PageCache { if let Some(img) = &content.page_image { page_img = img.clone(); } else if content.wal_record.is_some() { - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); - page_img = self.reconstruct_page(key, content)?; + + // Request the WAL redo manager to apply the WAL records for us. + page_img = self.walredo_mgr.request_redo(tag, lsn).await?; } else { // No base image, and no WAL record. Huh? bail!("no page image or WAL record for requested page"); @@ -602,10 +546,10 @@ impl PageCache { // Returns an old page image (if any), and a vector of WAL records to apply // over it. // - pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option, Vec) { + pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option, Vec) { let minkey = CacheKey { tag: BufferTag { - rel: entry.key.tag.rel, + rel: tag.rel, blknum: 0, }, lsn: 0, @@ -617,8 +561,15 @@ impl PageCache { let mut readopts = rocksdb::ReadOptions::default(); readopts.set_iterate_lower_bound(buf.to_vec()); + let key = CacheKey { + tag: BufferTag { + rel: tag.rel, + blknum: tag.blknum, + }, + lsn: lsn, + }; buf.clear(); - entry.key.pack(&mut buf); + key.pack(&mut buf); let iter = self.db.iterator_opt( rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), readopts, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 10fa9caf8e..4110ceef43 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -23,216 +23,333 @@ use std::io::prelude::*; use std::io::Error; use std::path::PathBuf; use std::process::Stdio; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use std::time::Instant; use tokio::io::AsyncBufReadExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; -use tokio::runtime::Runtime; +use tokio::sync::{oneshot, mpsc}; use tokio::time::timeout; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use crate::page_cache; -use crate::page_cache::CacheEntry; +use crate::page_cache::BufferTag; +use crate::page_cache::PageCache; use crate::page_cache::WALRecord; use crate::ZTimelineId; -use crate::{page_cache::BufferTag, pg_constants, PageServerConf}; +use crate::{pg_constants, PageServerConf}; static TIMEOUT: Duration = Duration::from_secs(20); -// -// Main entry point for the WAL applicator thread. -// -pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) { - info!("WAL redo thread started {}", timelineid); +/// +/// A WAL redo manager consists of two parts: WalRedoManager, and +/// WalRedoManagerInternal. WalRedoManager is the public struct +/// that can be used to send redo requests to the manager. +/// WalRedoManagerInternal is used by the manager thread itself. +/// +pub struct WalRedoManager { + conf: PageServerConf, + timelineid: ZTimelineId, - // We block on waiting for requests on the walredo request channel, but - // use async I/O to communicate with the child process. Initialize the - // runtime for the async part. - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + request_tx: mpsc::UnboundedSender, + request_rx: Mutex>>, +} - let pcache = page_cache::get_pagecache(conf, timelineid).unwrap(); +struct WalRedoManagerInternal { + _conf: PageServerConf, + timelineid: ZTimelineId, - // Loop forever, handling requests as they come. - let walredo_channel_receiver = &pcache.walredo_receiver; - loop { - let mut process: WalRedoProcess; - let datadir = format!("wal-redo/{}", timelineid); + pcache: Arc, + request_rx: mpsc::UnboundedReceiver, +} - info!("launching WAL redo postgres process {}", timelineid); - { - let _guard = runtime.enter(); - process = WalRedoProcess::launch(&datadir, &runtime).unwrap(); +#[derive(Debug)] +struct WalRedoRequest { + tag: BufferTag, + lsn: u64, + + response_channel: oneshot::Sender>, +} + +/// An error happened in WAL redo +#[derive(Debug, thiserror::Error)] +pub enum WalRedoError { + #[error(transparent)] + IoError(#[from] std::io::Error), +} + +/// +/// Public interface of WAL redo manager +/// +impl WalRedoManager +{ + /// + /// Create a new WalRedoManager. + /// + /// This only initializes the struct. You need to call WalRedoManager::launch to + /// start the thread that processes the requests. + pub fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> WalRedoManager { + + let (tx, rx) = mpsc::unbounded_channel(); + + WalRedoManager { + conf: conf.clone(), + timelineid, + request_tx: tx, + request_rx: Mutex::new(Some(rx)) } - info!("WAL redo postgres started"); + } - // Pretty arbitrarily, reuse the same Postgres process for 100 requests. - // After that, kill it and start a new one. This is mostly to avoid - // using up all shared buffers in Postgres's shared buffer cache; we don't - // want to write any pages to disk in the WAL redo process. - for _i in 1..100000 { - let request = walredo_channel_receiver.recv().unwrap(); + /// + /// Launch the WAL redo thread + /// + pub fn launch(&self, pcache: Arc) { - let result = handle_apply_request(&pcache, &process, &runtime, request); - if result.is_err() { - // Something went wrong with handling the request. It's not clear - // if the request was faulty, and the next request would succeed - // again, or if the 'postgres' process went haywire. To be safe, - // kill the 'postgres' process so that we will start from a clean - // slate, with a new process, for the next request. - break; - } - } + // Get mutable references to the values that we need to pass to the + // thread. + let request_rx = self.request_rx.lock().unwrap().take().unwrap(); + let conf_copy = self.conf.clone(); + let timelineid = self.timelineid; - // Time to kill the 'postgres' process. A new one will be launched on next - // iteration of the loop. - info!("killing WAL redo postgres process"); - let _ = runtime.block_on(process.stdin.get_mut().shutdown()); - let mut child = process.child; - drop(process.stdin); - let _ = runtime.block_on(child.wait()); + // Currently, the join handle is not saved anywhere and we + // won't try restart the thread if it dies. + let _walredo_thread = std::thread::Builder::new() + .name("WAL redo thread".into()) + .spawn(move || { + + // We block on waiting for requests on the walredo request channel, but + // use async I/O to communicate with the child process. Initialize the + // runtime for the async part. + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let mut internal = WalRedoManagerInternal { + _conf: conf_copy, + timelineid: timelineid, + pcache: pcache, + request_rx: request_rx, + }; + + runtime.block_on(internal.wal_redo_main()); + }) + .unwrap(); + } + + /// + /// Request the WAL redo manager to apply WAL records, to reconstruct the page image + /// of the given page version. + /// + pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result { + + // Create a channel where to receive the response + let (tx, rx) = oneshot::channel::>(); + + let request = WalRedoRequest { + tag, lsn, + response_channel: tx, + }; + + self.request_tx.send(request).expect("could not send WAL redo request"); + + rx.await.expect("could not receive response to WAL redo request") } } -fn transaction_id_set_status_bit( - xl_info: u8, - xl_rmid: u8, - xl_xid: u32, - record: WALRecord, - page: &mut BytesMut, -) { - let info = xl_info & pg_constants::XLOG_XACT_OPMASK; - let mut status = 0; - if info == pg_constants::XLOG_XACT_COMMIT { - status = pg_constants::TRANSACTION_STATUS_COMMITTED; - } else if info == pg_constants::XLOG_XACT_ABORT { - status = pg_constants::TRANSACTION_STATUS_ABORTED; - } else { - trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}", - status, - record.lsn >> 32, - record.lsn & 0xffffffff, - record.main_data_offset, record.rec.len()); - return; - } +/// +/// WAL redo thread +/// +impl WalRedoManagerInternal { - trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}", - status, - record.lsn >> 32, - record.lsn & 0xffffffff, - record.main_data_offset, record.rec.len()); + // + // Main entry point for the WAL applicator thread. + // + async fn wal_redo_main(&mut self) { - let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) - / pg_constants::CLOG_XACTS_PER_BYTE) as usize; + info!("WAL redo thread started {}", self.timelineid); - let byteptr = &mut page[byteno..byteno + 1]; - let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE) - * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; + // Loop forever, handling requests as they come. + loop { + let mut process: WalRedoProcess; + let datadir = format!("wal-redo/{}", self.timelineid); - let mut curval = byteptr[0]; - curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK; + info!("launching WAL redo postgres process {}", self.timelineid); - let mut byteval = [0]; - byteval[0] = curval; - byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift); - byteval[0] |= status << bshift; + process = WalRedoProcess::launch(&datadir).await.unwrap(); + info!("WAL redo postgres started"); - byteptr.copy_from_slice(&byteval); - trace!( - "xl_xid {} byteno {} curval {} byteval {}", - xl_xid, - byteno, - curval, - byteval[0] - ); -} + // Pretty arbitrarily, reuse the same Postgres process for 100000 requests. + // After that, kill it and start a new one. This is mostly to avoid + // using up all shared buffers in Postgres's shared buffer cache; we don't + // want to write any pages to disk in the WAL redo process. + for _i in 1..100000 { + let request = self.request_rx.recv().await.unwrap(); -fn handle_apply_request( - pcache: &page_cache::PageCache, - process: &WalRedoProcess, - runtime: &Runtime, - entry_rc: Arc, -) -> Result<(), Error> { - let tag = entry_rc.key.tag; - let lsn = entry_rc.key.lsn; - let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref()); + let result = self.handle_apply_request(&process, &request).await; + let result_ok = result.is_ok(); - let mut entry = entry_rc.content.lock().unwrap(); - assert!(entry.apply_pending); - entry.apply_pending = false; + // Send the result to the requester + let _ = request.response_channel.send(result); - let nrecords = records.len(); - - let start = Instant::now(); - - let apply_result: Result; - if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 { - //TODO use base image if any - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - let zero_page_bytes: &[u8] = &ZERO_PAGE; - let mut page = BytesMut::from(zero_page_bytes); - - for record in records { - let mut buf = record.rec.clone(); - - // 1. Parse XLogRecord struct - // FIXME: refactor to avoid code duplication. - let _xl_tot_len = buf.get_u32_le(); - let xl_xid = buf.get_u32_le(); - let _xl_prev = buf.get_u64_le(); - let xl_info = buf.get_u8(); - let xl_rmid = buf.get_u8(); - buf.advance(2); // 2 bytes of padding - let _xl_crc = buf.get_u32_le(); - - if xl_rmid == pg_constants::RM_CLOG_ID { - let info = xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::CLOG_ZEROPAGE { - page.clone_from_slice(zero_page_bytes); - trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}", - record.lsn >> 32, - record.lsn & 0xffffffff, - record.main_data_offset, record.rec.len()); + if !result_ok { + // Something went wrong with handling the request. It's not clear + // if the request was faulty, and the next request would succeed + // again, or if the 'postgres' process went haywire. To be safe, + // kill the 'postgres' process so that we will start from a clean + // slate, with a new process, for the next request. + break; } - } else if xl_rmid == pg_constants::RM_XACT_ID { - transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page); } + + // Time to kill the 'postgres' process. A new one will be launched on next + // iteration of the loop. + info!("killing WAL redo postgres process"); + let _ = process.stdin.get_mut().shutdown().await; + let mut child = process.child; + drop(process.stdin); + let _ = child.wait().await; + } + } + + fn transaction_id_set_status_bit(&self, + xl_info: u8, + xl_rmid: u8, + xl_xid: u32, + record: WALRecord, + page: &mut BytesMut, + ) { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + let mut status = 0; + if info == pg_constants::XLOG_XACT_COMMIT { + status = pg_constants::TRANSACTION_STATUS_COMMITTED; + } else if info == pg_constants::XLOG_XACT_ABORT { + status = pg_constants::TRANSACTION_STATUS_ABORTED; + } else { + trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}", + status, + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + return; } - apply_result = Ok::(page.freeze()); - } else { - apply_result = process.apply_wal_records(runtime, tag, base_img, records); + trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}", + status, + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + + let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) + / pg_constants::CLOG_XACTS_PER_BYTE) as usize; + + let byteptr = &mut page[byteno..byteno + 1]; + let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE) + * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; + + let mut curval = byteptr[0]; + curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK; + + let mut byteval = [0]; + byteval[0] = curval; + byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift); + byteval[0] |= status << bshift; + + byteptr.copy_from_slice(&byteval); + trace!( + "xl_xid {} byteno {} curval {} byteval {}", + xl_xid, + byteno, + curval, + byteval[0] + ); } - let duration = start.elapsed(); + /// + /// Process one request for WAL redo. + /// + async fn handle_apply_request(&self, + process: &WalRedoProcess, + request: &WalRedoRequest, + ) -> Result { + let pcache = &self.pcache; + let tag = request.tag; + let lsn = request.lsn; + let (base_img, records) = pcache.collect_records_for_apply(tag, lsn); - let result; + let nrecords = records.len(); - trace!( - "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", - nrecords, - duration.as_millis(), - lsn >> 32, - lsn & 0xffff_ffff - ); + let start = Instant::now(); - if let Err(e) = apply_result { - error!("could not apply WAL records: {}", e); - result = Err(e); - } else { - entry.page_image = Some(apply_result.unwrap()); - result = Ok(()); + let apply_result: Result; + if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 { + //TODO use base image if any + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + let zero_page_bytes: &[u8] = &ZERO_PAGE; + let mut page = BytesMut::from(zero_page_bytes); + + for record in records { + let mut buf = record.rec.clone(); + + // 1. Parse XLogRecord struct + // FIXME: refactor to avoid code duplication. + let _xl_tot_len = buf.get_u32_le(); + let xl_xid = buf.get_u32_le(); + let _xl_prev = buf.get_u64_le(); + let xl_info = buf.get_u8(); + let xl_rmid = buf.get_u8(); + buf.advance(2); // 2 bytes of padding + let _xl_crc = buf.get_u32_le(); + + if xl_rmid == pg_constants::RM_CLOG_ID { + let info = xl_info & !pg_constants::XLR_INFO_MASK; + if info == pg_constants::CLOG_ZEROPAGE { + page.clone_from_slice(zero_page_bytes); + trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}", + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + } + } else if xl_rmid == pg_constants::RM_XACT_ID { + self.transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page); + } + } + + apply_result = Ok::(page.freeze()); + } else { + apply_result = process.apply_wal_records(tag, base_img, records).await; + } + + let duration = start.elapsed(); + + let result: Result; + + trace!( + "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", + nrecords, + duration.as_millis(), + lsn >> 32, + lsn & 0xffff_ffff + ); + + if let Err(e) = apply_result { + error!("could not apply WAL records: {}", e); + result = Err(WalRedoError::IoError(e)); + } else { + let img = apply_result.unwrap(); + + // Should we put the image back to the page cache? We don't have to, + // but then the next GetPage@LSN call will have to do the WAL redo + // again + pcache.put_page_image(tag, lsn, img.clone()); + + result = Ok(img); + } + + // The caller is responsible for sending the response + result } - - // Wake up the requester, whether the operation succeeded or not. - entry_rc.walredo_condvar.notify_all(); - - result } struct WalRedoProcess { @@ -248,17 +365,14 @@ impl WalRedoProcess { // Tests who run pageserver binary are setting proper PG_BIN_DIR // and PG_LIB_DIR so that WalRedo would start right postgres. We may later // switch to setting same things in pageserver config file. - fn launch(datadir: &str, runtime: &Runtime) -> Result { + async fn launch(datadir: &str) -> Result { // Create empty data directory for wal-redo postgres deleting old one. fs::remove_dir_all(datadir).ok(); - let initdb = runtime - .block_on( - Command::new("initdb") - .args(&["-D", datadir]) - .arg("-N") - .output(), - ) - .expect("failed to execute initdb"); + let initdb = Command::new("initdb") + .args(&["-D", datadir]) + .arg("-N") + .output() + .await.expect("failed to execute initdb"); if !initdb.status.success() { panic!( @@ -322,82 +436,76 @@ impl WalRedoProcess { // Apply given WAL records ('records') over an old page image. Returns // new page image. // - fn apply_wal_records( - &self, - runtime: &Runtime, + async fn apply_wal_records(&self, tag: BufferTag, base_img: Option, records: Vec, - ) -> Result { + ) -> Result { let mut stdin = self.stdin.borrow_mut(); let mut stdout = self.stdout.borrow_mut(); - runtime.block_on(async { - // - // This async block sends all the commands to the process. - // - // For reasons I don't understand, this needs to be a "move" block; - // otherwise the stdin pipe doesn't get closed, despite the shutdown() - // call. - // - let f_stdin = async { - // Send base image, if any. (If the record initializes the page, previous page - // version is not needed.) + + let f_stdin = async { + // Send base image, if any. (If the record initializes the page, previous page + // version is not needed.) + timeout( + TIMEOUT, + stdin.write_all(&build_begin_redo_for_block_msg(tag)), + ) + .await??; + if base_img.is_some() { timeout( TIMEOUT, - stdin.write_all(&build_begin_redo_for_block_msg(tag)), + stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())), ) - .await??; - if base_img.is_some() { - timeout( - TIMEOUT, - stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())), - ) .await??; - } + } - // Send WAL records. - for rec in records.iter() { - let r = rec.clone(); + // Send WAL records. + for rec in records.iter() { + let r = rec.clone(); - stdin - .write_all(&build_apply_record_msg(r.lsn, r.rec)) - .await?; + stdin + .write_all(&build_apply_record_msg(r.lsn, r.rec)) + .await?; - //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", - // r.lsn >> 32, r.lsn & 0xffff_ffff); - } - //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", - // records.len(), lsn >> 32, lsn & 0xffff_ffff); + //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", + // r.lsn >> 32, r.lsn & 0xffff_ffff); + } + //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", + // records.len(), lsn >> 32, lsn & 0xffff_ffff); - // Send GetPage command to get the result back - timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??; - timeout(TIMEOUT, stdin.flush()).await??; - //debug!("sent GetPage for {}", tag.blknum); - Ok::<(), Error>(()) - }; + // Send GetPage command to get the result back + timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??; + timeout(TIMEOUT, stdin.flush()).await??; + //debug!("sent GetPage for {}", tag.blknum); + Ok::<(), Error>(()) + }; - // Read back new page image - let f_stdout = async { - let mut buf = [0u8; 8192]; + // Read back new page image + let f_stdout = async { + let mut buf = [0u8; 8192]; - timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; - //debug!("got response for {}", tag.blknum); - Ok::<[u8; 8192], Error>(buf) - }; + timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; + //debug!("got response for {}", tag.blknum); + Ok::<[u8; 8192], Error>(buf) + }; - // Kill the process. This closes its stdin, which should signal the process - // to terminate. TODO: SIGKILL if needed - //child.wait(); + // Kill the process. This closes its stdin, which should signal the process + // to terminate. TODO: SIGKILL if needed + //child.wait(); - let res = futures::try_join!(f_stdout, f_stdin)?; + let res = futures::try_join!(f_stdout, f_stdin)?; - let buf = res.0; + let buf = res.0; - Ok::(Bytes::from(std::vec::Vec::from(buf))) - }) + Ok::(Bytes::from(std::vec::Vec::from(buf))) } } +// Functions for constructing messages to send to the postgres WAL redo +// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for +// explanation of the protocol. + fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { let len = 4 + 5 * 4; let mut buf = BytesMut::with_capacity(1 + len);