Refactor pagecache <-> Wal redo communication

After the rocksdb patch (commit 6aa38d3f7d), the CacheEntry struct was
used only momentarily in the communication between the page_cache and
the walredo modules. It was in fact not stored in any cache anymore.
For clarity, refactor the communication.

There is now a WalRedoManager struct, with `request_redo` function,
that can be used to request WAL replay of a particular page. It sends
a request to a queue like before, but the queue has been replaced with
tokio::sync::mpsc. Previously, the resulting page image was stored
directly in the CacheEntry, and the requestor was notified using a
condition variable. Now, the requestor includes a 'oneshot' channel in
the request, and the WAL redo manager sends the response there.
This commit is contained in:
Heikki Linnakangas
2021-04-24 12:24:04 +03:00
parent fe79082e29
commit 93d7d2ae2a
4 changed files with 376 additions and 319 deletions

1
Cargo.lock generated
View File

@@ -1381,7 +1381,6 @@ dependencies = [
"chrono",
"clap",
"crc32c",
"crossbeam-channel",
"daemonize",
"futures",
"hex",

View File

@@ -8,7 +8,6 @@ edition = "2018"
[dependencies]
chrono = "0.4.19"
crossbeam-channel = "0.5.0"
rand = "0.8.3"
regex = "1.4.5"
bytes = "1.0.1"

View File

@@ -7,11 +7,9 @@
use crate::restore_local_repo::restore_timeline;
use crate::waldecoder::Oid;
use crate::ZTimelineId;
use crate::{walredo, zenith_repo_dir, PageServerConf};
use crate::{zenith_repo_dir, PageServerConf};
use anyhow::{bail, Context};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
use lazy_static::lazy_static;
use log::*;
use rocksdb;
@@ -19,11 +17,12 @@ use std::cmp::min;
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use std::{convert::TryInto, ops::AddAssign};
use zenith_utils::seqwait::SeqWait;
use crate::walredo::WalRedoManager;
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
@@ -34,9 +33,8 @@ pub struct PageCache {
// RocksDB handle
db: rocksdb::DB,
// Channel for communicating with the WAL redo process here.
pub walredo_sender: Sender<Arc<CacheEntry>>,
pub walredo_receiver: Receiver<Arc<CacheEntry>>,
// WAL redo manager
walredo_mgr: WalRedoManager,
// Allows .await on the arrival of a particular LSN.
seqwait_lsn: SeqWait,
@@ -131,20 +129,11 @@ pub fn get_or_restore_pagecache(
let result = Arc::new(pcache);
// Launch the WAL redo thread
result.walredo_mgr.launch(result.clone());
pcaches.insert(timelineid, result.clone());
// Initialize the WAL redo thread
//
// Now join_handle is not saved any where and we won'try restart tharead
// if it is dead. We may later stop that treads after some inactivity period
// and restart them on demand.
let conf_copy = conf.clone();
let _walredo_thread = thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
walredo::wal_redo_main(&conf_copy, timelineid);
})
.unwrap();
if conf.gc_horizon != 0 {
let conf_copy = conf.clone();
let _gc_thread = thread::Builder::new()
@@ -162,7 +151,10 @@ pub fn get_or_restore_pagecache(
fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("Garbage collection thread started {}", timelineid);
let pcache = get_pagecache(conf, timelineid).unwrap();
pcache.do_gc(conf).unwrap();
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
runtime.block_on(pcache.do_gc(conf)).unwrap();
}
fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
@@ -176,20 +168,19 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB
}
fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache {
// Initialize the channel between the page cache and the WAL applicator
let (s, r) = unbounded();
PageCache {
db: open_rocksdb(&conf, timelineid),
shared: Mutex::new(PageCacheShared {
first_valid_lsn: 0,
last_valid_lsn: 0,
last_record_lsn: 0,
}),
seqwait_lsn: SeqWait::new(0),
walredo_sender: s,
walredo_receiver: r,
db: open_rocksdb(&conf, timelineid),
walredo_mgr: WalRedoManager::new(conf, timelineid),
seqwait_lsn: SeqWait::new(0),
num_entries: AtomicU64::new(0),
num_page_images: AtomicU64::new(0),
@@ -234,19 +225,6 @@ impl CacheKey {
}
}
pub struct CacheEntry {
pub key: CacheKey,
pub content: Mutex<CacheEntryContent>,
// Condition variable used by the WAL redo service, to wake up
// requester.
//
// FIXME: this takes quite a lot of space. Consider using parking_lot::Condvar
// or something else.
pub walredo_condvar: Condvar,
}
pub struct CacheEntryContent {
pub page_image: Option<Bytes>,
pub wal_record: Option<WALRecord>,
@@ -283,16 +261,6 @@ impl CacheEntryContent {
}
}
impl CacheEntry {
fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry {
CacheEntry {
key,
content: Mutex::new(content),
walredo_condvar: Condvar::new(),
}
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)]
pub struct RelTag {
pub spcnode: u32,
@@ -378,7 +346,8 @@ impl WALRecord {
// Public interface functions
impl PageCache {
fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> {
let mut minbuf = BytesMut::new();
let mut maxbuf = BytesMut::new();
let cf = self
@@ -429,10 +398,10 @@ impl PageCache {
minkey.lsn = 0;
// reconstruct most recent page version
if content.wal_record.is_some() {
trace!("Reconstruct most recent page {:?}", key);
if let Some(rec) = content.wal_record {
trace!("Reconstruct most recent page {:?}", key);
// force reconstruction of most recent page version
self.reconstruct_page(key, content)?;
self.walredo_mgr.request_redo(key.tag, rec.lsn).await?;
reconstructed += 1;
}
@@ -449,12 +418,13 @@ impl PageCache {
minbuf.clear();
minbuf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut minbuf);
if content.wal_record.is_some() {
if let Some(rec) = content.wal_record {
minbuf.clear();
minbuf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut minbuf);
trace!("Reconstruct horizon page {:?}", key);
self.reconstruct_page(key, content)?;
self.walredo_mgr.request_redo(key.tag, rec.lsn).await?;
truncated += 1;
}
}
@@ -475,31 +445,6 @@ impl PageCache {
}
}
fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result<Bytes> {
let entry_rc = Arc::new(CacheEntry::new(key.clone(), content));
let mut entry_content = entry_rc.content.lock().unwrap();
entry_content.apply_pending = true;
let s = &self.walredo_sender;
s.send(entry_rc.clone())?;
while entry_content.apply_pending {
entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap();
}
// We should now have a page image. If we don't, it means that WAL redo
// failed to reconstruct it. WAL redo should've logged that error already.
let page_img = match &entry_content.page_image {
Some(p) => p.clone(),
None => {
error!("could not apply WAL to reconstruct page image for GetPage@LSN request");
bail!("could not apply WAL to reconstruct page image");
}
};
self.put_page_image(key.tag, key.lsn, page_img.clone());
Ok(page_img)
}
async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result<u64> {
let mut lsn = req_lsn;
//When invalid LSN is requested, it means "don't wait, return latest version of the page"
@@ -561,7 +506,7 @@ impl PageCache {
return Ok(Bytes::from_static(&ZERO_PAGE));
/* return Err("could not find page image")?; */
}
let (k, v) = entry_opt.unwrap();
let (_k, v) = entry_opt.unwrap();
buf.clear();
buf.extend_from_slice(&v);
let content = CacheEntryContent::unpack(&mut buf);
@@ -569,10 +514,9 @@ impl PageCache {
if let Some(img) = &content.page_image {
page_img = img.clone();
} else if content.wal_record.is_some() {
buf.clear();
buf.extend_from_slice(&k);
let key = CacheKey::unpack(&mut buf);
page_img = self.reconstruct_page(key, content)?;
// Request the WAL redo manager to apply the WAL records for us.
page_img = self.walredo_mgr.request_redo(tag, lsn).await?;
} else {
// No base image, and no WAL record. Huh?
bail!("no page image or WAL record for requested page");
@@ -602,10 +546,10 @@ impl PageCache {
// Returns an old page image (if any), and a vector of WAL records to apply
// over it.
//
pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option<Bytes>, Vec<WALRecord>) {
pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option<Bytes>, Vec<WALRecord>) {
let minkey = CacheKey {
tag: BufferTag {
rel: entry.key.tag.rel,
rel: tag.rel,
blknum: 0,
},
lsn: 0,
@@ -617,8 +561,15 @@ impl PageCache {
let mut readopts = rocksdb::ReadOptions::default();
readopts.set_iterate_lower_bound(buf.to_vec());
let key = CacheKey {
tag: BufferTag {
rel: tag.rel,
blknum: tag.blknum,
},
lsn: lsn,
};
buf.clear();
entry.key.pack(&mut buf);
key.pack(&mut buf);
let iter = self.db.iterator_opt(
rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse),
readopts,

View File

@@ -23,216 +23,333 @@ use std::io::prelude::*;
use std::io::Error;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::Instant;
use tokio::io::AsyncBufReadExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::runtime::Runtime;
use tokio::sync::{oneshot, mpsc};
use tokio::time::timeout;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::page_cache;
use crate::page_cache::CacheEntry;
use crate::page_cache::BufferTag;
use crate::page_cache::PageCache;
use crate::page_cache::WALRecord;
use crate::ZTimelineId;
use crate::{page_cache::BufferTag, pg_constants, PageServerConf};
use crate::{pg_constants, PageServerConf};
static TIMEOUT: Duration = Duration::from_secs(20);
//
// Main entry point for the WAL applicator thread.
//
pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) {
info!("WAL redo thread started {}", timelineid);
///
/// A WAL redo manager consists of two parts: WalRedoManager, and
/// WalRedoManagerInternal. WalRedoManager is the public struct
/// that can be used to send redo requests to the manager.
/// WalRedoManagerInternal is used by the manager thread itself.
///
pub struct WalRedoManager {
conf: PageServerConf,
timelineid: ZTimelineId,
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
request_tx: mpsc::UnboundedSender<WalRedoRequest>,
request_rx: Mutex<Option<mpsc::UnboundedReceiver<WalRedoRequest>>>,
}
let pcache = page_cache::get_pagecache(conf, timelineid).unwrap();
struct WalRedoManagerInternal {
_conf: PageServerConf,
timelineid: ZTimelineId,
// Loop forever, handling requests as they come.
let walredo_channel_receiver = &pcache.walredo_receiver;
loop {
let mut process: WalRedoProcess;
let datadir = format!("wal-redo/{}", timelineid);
pcache: Arc<PageCache>,
request_rx: mpsc::UnboundedReceiver<WalRedoRequest>,
}
info!("launching WAL redo postgres process {}", timelineid);
{
let _guard = runtime.enter();
process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
#[derive(Debug)]
struct WalRedoRequest {
tag: BufferTag,
lsn: u64,
response_channel: oneshot::Sender<Result<Bytes, WalRedoError>>,
}
/// An error happened in WAL redo
#[derive(Debug, thiserror::Error)]
pub enum WalRedoError {
#[error(transparent)]
IoError(#[from] std::io::Error),
}
///
/// Public interface of WAL redo manager
///
impl WalRedoManager
{
///
/// Create a new WalRedoManager.
///
/// This only initializes the struct. You need to call WalRedoManager::launch to
/// start the thread that processes the requests.
pub fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> WalRedoManager {
let (tx, rx) = mpsc::unbounded_channel();
WalRedoManager {
conf: conf.clone(),
timelineid,
request_tx: tx,
request_rx: Mutex::new(Some(rx))
}
info!("WAL redo postgres started");
}
// Pretty arbitrarily, reuse the same Postgres process for 100 requests.
// After that, kill it and start a new one. This is mostly to avoid
// using up all shared buffers in Postgres's shared buffer cache; we don't
// want to write any pages to disk in the WAL redo process.
for _i in 1..100000 {
let request = walredo_channel_receiver.recv().unwrap();
///
/// Launch the WAL redo thread
///
pub fn launch(&self, pcache: Arc<PageCache>) {
let result = handle_apply_request(&pcache, &process, &runtime, request);
if result.is_err() {
// Something went wrong with handling the request. It's not clear
// if the request was faulty, and the next request would succeed
// again, or if the 'postgres' process went haywire. To be safe,
// kill the 'postgres' process so that we will start from a clean
// slate, with a new process, for the next request.
break;
}
}
// Get mutable references to the values that we need to pass to the
// thread.
let request_rx = self.request_rx.lock().unwrap().take().unwrap();
let conf_copy = self.conf.clone();
let timelineid = self.timelineid;
// Time to kill the 'postgres' process. A new one will be launched on next
// iteration of the loop.
info!("killing WAL redo postgres process");
let _ = runtime.block_on(process.stdin.get_mut().shutdown());
let mut child = process.child;
drop(process.stdin);
let _ = runtime.block_on(child.wait());
// Currently, the join handle is not saved anywhere and we
// won't try restart the thread if it dies.
let _walredo_thread = std::thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
// runtime for the async part.
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let mut internal = WalRedoManagerInternal {
_conf: conf_copy,
timelineid: timelineid,
pcache: pcache,
request_rx: request_rx,
};
runtime.block_on(internal.wal_redo_main());
})
.unwrap();
}
///
/// Request the WAL redo manager to apply WAL records, to reconstruct the page image
/// of the given page version.
///
pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, WalRedoError> {
// Create a channel where to receive the response
let (tx, rx) = oneshot::channel::<Result<Bytes, WalRedoError>>();
let request = WalRedoRequest {
tag, lsn,
response_channel: tx,
};
self.request_tx.send(request).expect("could not send WAL redo request");
rx.await.expect("could not receive response to WAL redo request")
}
}
fn transaction_id_set_status_bit(
xl_info: u8,
xl_rmid: u8,
xl_xid: u32,
record: WALRecord,
page: &mut BytesMut,
) {
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
if info == pg_constants::XLOG_XACT_COMMIT {
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
} else if info == pg_constants::XLOG_XACT_ABORT {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
} else {
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}",
status,
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
return;
}
///
/// WAL redo thread
///
impl WalRedoManagerInternal {
trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}",
status,
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
//
// Main entry point for the WAL applicator thread.
//
async fn wal_redo_main(&mut self) {
let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
/ pg_constants::CLOG_XACTS_PER_BYTE) as usize;
info!("WAL redo thread started {}", self.timelineid);
let byteptr = &mut page[byteno..byteno + 1];
let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE)
* pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
// Loop forever, handling requests as they come.
loop {
let mut process: WalRedoProcess;
let datadir = format!("wal-redo/{}", self.timelineid);
let mut curval = byteptr[0];
curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK;
info!("launching WAL redo postgres process {}", self.timelineid);
let mut byteval = [0];
byteval[0] = curval;
byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift);
byteval[0] |= status << bshift;
process = WalRedoProcess::launch(&datadir).await.unwrap();
info!("WAL redo postgres started");
byteptr.copy_from_slice(&byteval);
trace!(
"xl_xid {} byteno {} curval {} byteval {}",
xl_xid,
byteno,
curval,
byteval[0]
);
}
// Pretty arbitrarily, reuse the same Postgres process for 100000 requests.
// After that, kill it and start a new one. This is mostly to avoid
// using up all shared buffers in Postgres's shared buffer cache; we don't
// want to write any pages to disk in the WAL redo process.
for _i in 1..100000 {
let request = self.request_rx.recv().await.unwrap();
fn handle_apply_request(
pcache: &page_cache::PageCache,
process: &WalRedoProcess,
runtime: &Runtime,
entry_rc: Arc<CacheEntry>,
) -> Result<(), Error> {
let tag = entry_rc.key.tag;
let lsn = entry_rc.key.lsn;
let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
let result = self.handle_apply_request(&process, &request).await;
let result_ok = result.is_ok();
let mut entry = entry_rc.content.lock().unwrap();
assert!(entry.apply_pending);
entry.apply_pending = false;
// Send the result to the requester
let _ = request.response_channel.send(result);
let nrecords = records.len();
let start = Instant::now();
let apply_result: Result<Bytes, Error>;
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
//TODO use base image if any
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let zero_page_bytes: &[u8] = &ZERO_PAGE;
let mut page = BytesMut::from(zero_page_bytes);
for record in records {
let mut buf = record.rec.clone();
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let _xl_tot_len = buf.get_u32_le();
let xl_xid = buf.get_u32_le();
let _xl_prev = buf.get_u64_le();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
if xl_rmid == pg_constants::RM_CLOG_ID {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
page.clone_from_slice(zero_page_bytes);
trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}",
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
if !result_ok {
// Something went wrong with handling the request. It's not clear
// if the request was faulty, and the next request would succeed
// again, or if the 'postgres' process went haywire. To be safe,
// kill the 'postgres' process so that we will start from a clean
// slate, with a new process, for the next request.
break;
}
} else if xl_rmid == pg_constants::RM_XACT_ID {
transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page);
}
// Time to kill the 'postgres' process. A new one will be launched on next
// iteration of the loop.
info!("killing WAL redo postgres process");
let _ = process.stdin.get_mut().shutdown().await;
let mut child = process.child;
drop(process.stdin);
let _ = child.wait().await;
}
}
fn transaction_id_set_status_bit(&self,
xl_info: u8,
xl_rmid: u8,
xl_xid: u32,
record: WALRecord,
page: &mut BytesMut,
) {
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
let mut status = 0;
if info == pg_constants::XLOG_XACT_COMMIT {
status = pg_constants::TRANSACTION_STATUS_COMMITTED;
} else if info == pg_constants::XLOG_XACT_ABORT {
status = pg_constants::TRANSACTION_STATUS_ABORTED;
} else {
trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}",
status,
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
return;
}
apply_result = Ok::<Bytes, Error>(page.freeze());
} else {
apply_result = process.apply_wal_records(runtime, tag, base_img, records);
trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}",
status,
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
/ pg_constants::CLOG_XACTS_PER_BYTE) as usize;
let byteptr = &mut page[byteno..byteno + 1];
let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE)
* pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
let mut curval = byteptr[0];
curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK;
let mut byteval = [0];
byteval[0] = curval;
byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift);
byteval[0] |= status << bshift;
byteptr.copy_from_slice(&byteval);
trace!(
"xl_xid {} byteno {} curval {} byteval {}",
xl_xid,
byteno,
curval,
byteval[0]
);
}
let duration = start.elapsed();
///
/// Process one request for WAL redo.
///
async fn handle_apply_request(&self,
process: &WalRedoProcess,
request: &WalRedoRequest,
) -> Result<Bytes, WalRedoError> {
let pcache = &self.pcache;
let tag = request.tag;
let lsn = request.lsn;
let (base_img, records) = pcache.collect_records_for_apply(tag, lsn);
let result;
let nrecords = records.len();
trace!(
"applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
nrecords,
duration.as_millis(),
lsn >> 32,
lsn & 0xffff_ffff
);
let start = Instant::now();
if let Err(e) = apply_result {
error!("could not apply WAL records: {}", e);
result = Err(e);
} else {
entry.page_image = Some(apply_result.unwrap());
result = Ok(());
let apply_result: Result<Bytes, Error>;
if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
//TODO use base image if any
static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
let zero_page_bytes: &[u8] = &ZERO_PAGE;
let mut page = BytesMut::from(zero_page_bytes);
for record in records {
let mut buf = record.rec.clone();
// 1. Parse XLogRecord struct
// FIXME: refactor to avoid code duplication.
let _xl_tot_len = buf.get_u32_le();
let xl_xid = buf.get_u32_le();
let _xl_prev = buf.get_u64_le();
let xl_info = buf.get_u8();
let xl_rmid = buf.get_u8();
buf.advance(2); // 2 bytes of padding
let _xl_crc = buf.get_u32_le();
if xl_rmid == pg_constants::RM_CLOG_ID {
let info = xl_info & !pg_constants::XLR_INFO_MASK;
if info == pg_constants::CLOG_ZEROPAGE {
page.clone_from_slice(zero_page_bytes);
trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}",
record.lsn >> 32,
record.lsn & 0xffffffff,
record.main_data_offset, record.rec.len());
}
} else if xl_rmid == pg_constants::RM_XACT_ID {
self.transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page);
}
}
apply_result = Ok::<Bytes, Error>(page.freeze());
} else {
apply_result = process.apply_wal_records(tag, base_img, records).await;
}
let duration = start.elapsed();
let result: Result<Bytes, WalRedoError>;
trace!(
"applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
nrecords,
duration.as_millis(),
lsn >> 32,
lsn & 0xffff_ffff
);
if let Err(e) = apply_result {
error!("could not apply WAL records: {}", e);
result = Err(WalRedoError::IoError(e));
} else {
let img = apply_result.unwrap();
// Should we put the image back to the page cache? We don't have to,
// but then the next GetPage@LSN call will have to do the WAL redo
// again
pcache.put_page_image(tag, lsn, img.clone());
result = Ok(img);
}
// The caller is responsible for sending the response
result
}
// Wake up the requester, whether the operation succeeded or not.
entry_rc.walredo_condvar.notify_all();
result
}
struct WalRedoProcess {
@@ -248,17 +365,14 @@ impl WalRedoProcess {
// Tests who run pageserver binary are setting proper PG_BIN_DIR
// and PG_LIB_DIR so that WalRedo would start right postgres. We may later
// switch to setting same things in pageserver config file.
fn launch(datadir: &str, runtime: &Runtime) -> Result<WalRedoProcess, Error> {
async fn launch(datadir: &str) -> Result<WalRedoProcess, Error> {
// Create empty data directory for wal-redo postgres deleting old one.
fs::remove_dir_all(datadir).ok();
let initdb = runtime
.block_on(
Command::new("initdb")
.args(&["-D", datadir])
.arg("-N")
.output(),
)
.expect("failed to execute initdb");
let initdb = Command::new("initdb")
.args(&["-D", datadir])
.arg("-N")
.output()
.await.expect("failed to execute initdb");
if !initdb.status.success() {
panic!(
@@ -322,82 +436,76 @@ impl WalRedoProcess {
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
fn apply_wal_records(
&self,
runtime: &Runtime,
async fn apply_wal_records(&self,
tag: BufferTag,
base_img: Option<Bytes>,
records: Vec<WALRecord>,
) -> Result<Bytes, Error> {
) -> Result<Bytes, std::io::Error> {
let mut stdin = self.stdin.borrow_mut();
let mut stdout = self.stdout.borrow_mut();
runtime.block_on(async {
//
// This async block sends all the commands to the process.
//
// For reasons I don't understand, this needs to be a "move" block;
// otherwise the stdin pipe doesn't get closed, despite the shutdown()
// call.
//
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
timeout(
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
)
.await??;
if base_img.is_some() {
timeout(
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())),
)
.await??;
if base_img.is_some() {
timeout(
TIMEOUT,
stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())),
)
.await??;
}
}
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
// Send WAL records.
for rec in records.iter() {
let r = rec.clone();
stdin
.write_all(&build_apply_record_msg(r.lsn, r.rec))
.await?;
stdin
.write_all(&build_apply_record_msg(r.lsn, r.rec))
.await?;
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
}
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
}
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
// Send GetPage command to get the result back
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
timeout(TIMEOUT, stdin.flush()).await??;
//debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(())
};
// Send GetPage command to get the result back
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
timeout(TIMEOUT, stdin.flush()).await??;
//debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(())
};
// Read back new page image
let f_stdout = async {
let mut buf = [0u8; 8192];
// Read back new page image
let f_stdout = async {
let mut buf = [0u8; 8192];
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
//debug!("got response for {}", tag.blknum);
Ok::<[u8; 8192], Error>(buf)
};
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
//debug!("got response for {}", tag.blknum);
Ok::<[u8; 8192], Error>(buf)
};
// Kill the process. This closes its stdin, which should signal the process
// to terminate. TODO: SIGKILL if needed
//child.wait();
// Kill the process. This closes its stdin, which should signal the process
// to terminate. TODO: SIGKILL if needed
//child.wait();
let res = futures::try_join!(f_stdout, f_stdin)?;
let res = futures::try_join!(f_stdout, f_stdin)?;
let buf = res.0;
let buf = res.0;
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
})
Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
}
}
// Functions for constructing messages to send to the postgres WAL redo
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
// explanation of the protocol.
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
let len = 4 + 5 * 4;
let mut buf = BytesMut::with_capacity(1 + len);