Do not delete versions in GC

Konstantin Knizhnik
2021-04-24 23:52:50 +03:00
8 changed files with 839 additions and 729 deletions

.gitignore

@@ -3,3 +3,4 @@
 /tmp_install
 /tmp_check_cli
 .vscode
+.zenith

Cargo.lock

@@ -1381,7 +1381,6 @@ dependencies = [
 "chrono",
 "clap",
 "crc32c",
-"crossbeam-channel",
 "daemonize",
 "futures",
 "hex",

Cargo.toml

@@ -8,7 +8,6 @@ edition = "2018"
 [dependencies]
 chrono = "0.4.19"
-crossbeam-channel = "0.5.0"
 rand = "0.8.3"
 regex = "1.4.5"
 bytes = "1.0.1"
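
The crossbeam-channel dependency can be dropped because WAL redo requests now travel over tokio's own channel types (see the walredo.rs diff below). As a minimal sketch of that request/response pattern (illustrative only, not code from this commit): an unbounded mpsc queue carries requests, and each request carries a oneshot sender for its reply.

    // Sketch of the channel pattern that replaces crossbeam-channel.
    use tokio::sync::{mpsc, oneshot};

    struct Request {
        payload: u64,
        response_channel: oneshot::Sender<u64>,
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel::<Request>();

        // Server task: answer each request on its oneshot channel.
        tokio::spawn(async move {
            while let Some(req) = rx.recv().await {
                let _ = req.response_channel.send(req.payload + 1);
            }
        });

        // Client: send a request, then await the reply.
        let (resp_tx, resp_rx) = oneshot::channel();
        tx.send(Request { payload: 41, response_channel: resp_tx }).unwrap();
        assert_eq!(resp_rx.await.unwrap(), 42);
    }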

File diff suppressed because it is too large.

walreceiver.rs

@@ -266,7 +266,7 @@ async fn walreceiver_main(
                     rec: recdata.clone(),
                     main_data_offset: decoded.main_data_offset as u32,
                 };
-                pcache.put_rel_wal_record(tag, rec).await?;
+                pcache.put_rel_wal_record(tag, rec)?;
             }
         } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
             && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)

walredo.rs

@@ -23,216 +23,333 @@ use std::io::prelude::*;
 use std::io::Error;
 use std::path::PathBuf;
 use std::process::Stdio;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::time::Duration;
 use std::time::Instant;
 use tokio::io::AsyncBufReadExt;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::process::{Child, ChildStdin, ChildStdout, Command};
 use tokio::runtime::Runtime;
+use tokio::sync::{oneshot, mpsc};
 use tokio::time::timeout;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
-use crate::page_cache;
-use crate::page_cache::CacheEntry;
+use crate::page_cache::BufferTag;
+use crate::page_cache::PageCache;
 use crate::page_cache::WALRecord;
 use crate::ZTimelineId;
-use crate::{page_cache::BufferTag, pg_constants, PageServerConf};
+use crate::{pg_constants, PageServerConf};
 
 static TIMEOUT: Duration = Duration::from_secs(20);
 
-//
-// Main entry point for the WAL applicator thread.
-//
-pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) {
-    info!("WAL redo thread started {}", timelineid);
-
-    // We block on waiting for requests on the walredo request channel, but
-    // use async I/O to communicate with the child process. Initialize the
-    // runtime for the async part.
-    let runtime = tokio::runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap();
-
-    let pcache = page_cache::get_pagecache(conf, timelineid).unwrap();
-
-    // Loop forever, handling requests as they come.
-    let walredo_channel_receiver = &pcache.walredo_receiver;
-    loop {
-        let mut process: WalRedoProcess;
-        let datadir = format!("wal-redo/{}", timelineid);
-
-        info!("launching WAL redo postgres process {}", timelineid);
-        {
-            let _guard = runtime.enter();
-            process = WalRedoProcess::launch(&datadir, &runtime).unwrap();
-        }
-        info!("WAL redo postgres started");
-
-        // Pretty arbitrarily, reuse the same Postgres process for 100 requests.
-        // After that, kill it and start a new one. This is mostly to avoid
-        // using up all shared buffers in Postgres's shared buffer cache; we don't
-        // want to write any pages to disk in the WAL redo process.
-        for _i in 1..100000 {
-            let request = walredo_channel_receiver.recv().unwrap();
-
-            let result = handle_apply_request(&pcache, &process, &runtime, request);
-            if result.is_err() {
-                // Something went wrong with handling the request. It's not clear
-                // if the request was faulty, and the next request would succeed
-                // again, or if the 'postgres' process went haywire. To be safe,
-                // kill the 'postgres' process so that we will start from a clean
-                // slate, with a new process, for the next request.
-                break;
-            }
-        }
-
-        // Time to kill the 'postgres' process. A new one will be launched on next
-        // iteration of the loop.
-        info!("killing WAL redo postgres process");
-        let _ = runtime.block_on(process.stdin.get_mut().shutdown());
-        let mut child = process.child;
-        drop(process.stdin);
-        let _ = runtime.block_on(child.wait());
-    }
-}
-
-fn transaction_id_set_status_bit(
-    xl_info: u8,
-    xl_rmid: u8,
-    xl_xid: u32,
-    record: WALRecord,
-    page: &mut BytesMut,
-) {
-    let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
-    let mut status = 0;
-    if info == pg_constants::XLOG_XACT_COMMIT {
-        status = pg_constants::TRANSACTION_STATUS_COMMITTED;
-    } else if info == pg_constants::XLOG_XACT_ABORT {
-        status = pg_constants::TRANSACTION_STATUS_ABORTED;
-    } else {
-        trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}",
-            status,
-            record.lsn >> 32,
-            record.lsn & 0xffffffff,
-            record.main_data_offset, record.rec.len());
-        return;
-    }
-    trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}",
-        status,
-        record.lsn >> 32,
-        record.lsn & 0xffffffff,
-        record.main_data_offset, record.rec.len());
-    let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
-        / pg_constants::CLOG_XACTS_PER_BYTE) as usize;
-    let byteptr = &mut page[byteno..byteno + 1];
-    let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE)
-        * pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
-    let mut curval = byteptr[0];
-    curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK;
-    let mut byteval = [0];
-    byteval[0] = curval;
-    byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift);
-    byteval[0] |= status << bshift;
-    byteptr.copy_from_slice(&byteval);
-    trace!(
-        "xl_xid {} byteno {} curval {} byteval {}",
-        xl_xid,
-        byteno,
-        curval,
-        byteval[0]
-    );
-}
-
-fn handle_apply_request(
-    pcache: &page_cache::PageCache,
-    process: &WalRedoProcess,
-    runtime: &Runtime,
-    entry_rc: Arc<CacheEntry>,
-) -> Result<(), Error> {
-    let tag = entry_rc.key.tag;
-    let lsn = entry_rc.key.lsn;
-    let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref());
-
-    let mut entry = entry_rc.content.lock().unwrap();
-    assert!(entry.apply_pending);
-    entry.apply_pending = false;
-
-    let nrecords = records.len();
-    let start = Instant::now();
-    let apply_result: Result<Bytes, Error>;
-    if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
-        //TODO use base image if any
-        static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
-        let zero_page_bytes: &[u8] = &ZERO_PAGE;
-        let mut page = BytesMut::from(zero_page_bytes);
-        for record in records {
-            let mut buf = record.rec.clone();
-            // 1. Parse XLogRecord struct
-            // FIXME: refactor to avoid code duplication.
-            let _xl_tot_len = buf.get_u32_le();
-            let xl_xid = buf.get_u32_le();
-            let _xl_prev = buf.get_u64_le();
-            let xl_info = buf.get_u8();
-            let xl_rmid = buf.get_u8();
-            buf.advance(2); // 2 bytes of padding
-            let _xl_crc = buf.get_u32_le();
-            if xl_rmid == pg_constants::RM_CLOG_ID {
-                let info = xl_info & !pg_constants::XLR_INFO_MASK;
-                if info == pg_constants::CLOG_ZEROPAGE {
-                    page.clone_from_slice(zero_page_bytes);
-                    trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}",
-                        record.lsn >> 32,
-                        record.lsn & 0xffffffff,
-                        record.main_data_offset, record.rec.len());
-                }
-            } else if xl_rmid == pg_constants::RM_XACT_ID {
-                transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page);
-            }
-        }
-        apply_result = Ok::<Bytes, Error>(page.freeze());
-    } else {
-        apply_result = process.apply_wal_records(runtime, tag, base_img, records);
-    }
-
-    let duration = start.elapsed();
-    let result;
-    trace!(
-        "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
-        nrecords,
-        duration.as_millis(),
-        lsn >> 32,
-        lsn & 0xffff_ffff
-    );
-    if let Err(e) = apply_result {
-        error!("could not apply WAL records: {}", e);
-        result = Err(e);
-    } else {
-        entry.page_image = Some(apply_result.unwrap());
-        result = Ok(());
-    }
-    // Wake up the requester, whether the operation succeeded or not.
-    entry_rc.walredo_condvar.notify_all();
-    result
-}
+///
+/// A WAL redo manager consists of two parts: WalRedoManager, and
+/// WalRedoManagerInternal. WalRedoManager is the public struct
+/// that can be used to send redo requests to the manager.
+/// WalRedoManagerInternal is used by the manager thread itself.
+///
+pub struct WalRedoManager {
+    conf: PageServerConf,
+    timelineid: ZTimelineId,
+
+    request_tx: mpsc::UnboundedSender<WalRedoRequest>,
+    request_rx: Mutex<Option<mpsc::UnboundedReceiver<WalRedoRequest>>>,
+}
+
+struct WalRedoManagerInternal {
+    _conf: PageServerConf,
+    timelineid: ZTimelineId,
+    pcache: Arc<PageCache>,
+    request_rx: mpsc::UnboundedReceiver<WalRedoRequest>,
+}
+
+#[derive(Debug)]
+struct WalRedoRequest {
+    tag: BufferTag,
+    lsn: u64,
+    response_channel: oneshot::Sender<Result<Bytes, WalRedoError>>,
+}
+
+/// An error happened in WAL redo
+#[derive(Debug, thiserror::Error)]
+pub enum WalRedoError {
+    #[error(transparent)]
+    IoError(#[from] std::io::Error),
+}
+
+///
+/// Public interface of WAL redo manager
+///
+impl WalRedoManager
+{
+    ///
+    /// Create a new WalRedoManager.
+    ///
+    /// This only initializes the struct. You need to call WalRedoManager::launch to
+    /// start the thread that processes the requests.
+    pub fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> WalRedoManager {
+        let (tx, rx) = mpsc::unbounded_channel();
+        WalRedoManager {
+            conf: conf.clone(),
+            timelineid,
+            request_tx: tx,
+            request_rx: Mutex::new(Some(rx))
+        }
+    }
+
+    ///
+    /// Launch the WAL redo thread
+    ///
+    pub fn launch(&self, pcache: Arc<PageCache>) {
+        // Get mutable references to the values that we need to pass to the
+        // thread.
+        let request_rx = self.request_rx.lock().unwrap().take().unwrap();
+        let conf_copy = self.conf.clone();
+        let timelineid = self.timelineid;
+
+        // Currently, the join handle is not saved anywhere and we
+        // won't try restart the thread if it dies.
+        let _walredo_thread = std::thread::Builder::new()
+            .name("WAL redo thread".into())
+            .spawn(move || {
+                // We block on waiting for requests on the walredo request channel, but
+                // use async I/O to communicate with the child process. Initialize the
+                // runtime for the async part.
+                let runtime = tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .unwrap();
+                let mut internal = WalRedoManagerInternal {
+                    _conf: conf_copy,
+                    timelineid: timelineid,
+                    pcache: pcache,
+                    request_rx: request_rx,
+                };
+                runtime.block_on(internal.wal_redo_main());
+            })
+            .unwrap();
+    }
+
+    ///
+    /// Request the WAL redo manager to apply WAL records, to reconstruct the page image
+    /// of the given page version.
+    ///
+    pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, WalRedoError> {
+        // Create a channel where to receive the response
+        let (tx, rx) = oneshot::channel::<Result<Bytes, WalRedoError>>();
+        let request = WalRedoRequest {
+            tag, lsn,
+            response_channel: tx,
+        };
+        self.request_tx.send(request).expect("could not send WAL redo request");
+        rx.await.expect("could not receive response to WAL redo request")
+    }
+}
+
+///
+/// WAL redo thread
+///
+impl WalRedoManagerInternal {
+    //
+    // Main entry point for the WAL applicator thread.
+    //
+    async fn wal_redo_main(&mut self) {
+        info!("WAL redo thread started {}", self.timelineid);
+
+        // Loop forever, handling requests as they come.
+        loop {
+            let mut process: WalRedoProcess;
+            let datadir = format!("wal-redo/{}", self.timelineid);
+
+            info!("launching WAL redo postgres process {}", self.timelineid);
+            process = WalRedoProcess::launch(&datadir).await.unwrap();
+            info!("WAL redo postgres started");
+
+            // Pretty arbitrarily, reuse the same Postgres process for 100000 requests.
+            // After that, kill it and start a new one. This is mostly to avoid
+            // using up all shared buffers in Postgres's shared buffer cache; we don't
+            // want to write any pages to disk in the WAL redo process.
+            for _i in 1..100000 {
+                let request = self.request_rx.recv().await.unwrap();
+
+                let result = self.handle_apply_request(&process, &request).await;
+                let result_ok = result.is_ok();
+
+                // Send the result to the requester
+                let _ = request.response_channel.send(result);
+
+                if !result_ok {
+                    // Something went wrong with handling the request. It's not clear
+                    // if the request was faulty, and the next request would succeed
+                    // again, or if the 'postgres' process went haywire. To be safe,
+                    // kill the 'postgres' process so that we will start from a clean
+                    // slate, with a new process, for the next request.
+                    break;
+                }
+            }
+
+            // Time to kill the 'postgres' process. A new one will be launched on next
+            // iteration of the loop.
+            info!("killing WAL redo postgres process");
+            let _ = process.stdin.get_mut().shutdown().await;
+            let mut child = process.child;
+            drop(process.stdin);
+            let _ = child.wait().await;
+        }
+    }
+
+    fn transaction_id_set_status_bit(&self,
+        xl_info: u8,
+        xl_rmid: u8,
+        xl_xid: u32,
+        record: WALRecord,
+        page: &mut BytesMut,
+    ) {
+        let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
+        let mut status = 0;
+        if info == pg_constants::XLOG_XACT_COMMIT {
+            status = pg_constants::TRANSACTION_STATUS_COMMITTED;
+        } else if info == pg_constants::XLOG_XACT_ABORT {
+            status = pg_constants::TRANSACTION_STATUS_ABORTED;
+        } else {
+            trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}",
+                status,
+                record.lsn >> 32,
+                record.lsn & 0xffffffff,
+                record.main_data_offset, record.rec.len());
+            return;
+        }
+        trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}",
+            status,
+            record.lsn >> 32,
+            record.lsn & 0xffffffff,
+            record.main_data_offset, record.rec.len());
+        let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
+            / pg_constants::CLOG_XACTS_PER_BYTE) as usize;
+        let byteptr = &mut page[byteno..byteno + 1];
+        let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE)
+            * pg_constants::CLOG_BITS_PER_XACT as u32) as u8;
+        let mut curval = byteptr[0];
+        curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK;
+        let mut byteval = [0];
+        byteval[0] = curval;
+        byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift);
+        byteval[0] |= status << bshift;
+        byteptr.copy_from_slice(&byteval);
+        trace!(
+            "xl_xid {} byteno {} curval {} byteval {}",
+            xl_xid,
+            byteno,
+            curval,
+            byteval[0]
+        );
+    }
+
+    ///
+    /// Process one request for WAL redo.
+    ///
+    async fn handle_apply_request(&self,
+        process: &WalRedoProcess,
+        request: &WalRedoRequest,
+    ) -> Result<Bytes, WalRedoError> {
+        let pcache = &self.pcache;
+        let tag = request.tag;
+        let lsn = request.lsn;
+        let (base_img, records) = pcache.collect_records_for_apply(tag, lsn);
+
+        let nrecords = records.len();
+        let start = Instant::now();
+        let apply_result: Result<Bytes, Error>;
+        if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 {
+            //TODO use base image if any
+            static ZERO_PAGE: [u8; 8192] = [0u8; 8192];
+            let zero_page_bytes: &[u8] = &ZERO_PAGE;
+            let mut page = BytesMut::from(zero_page_bytes);
+            for record in records {
+                let mut buf = record.rec.clone();
+                // 1. Parse XLogRecord struct
+                // FIXME: refactor to avoid code duplication.
+                let _xl_tot_len = buf.get_u32_le();
+                let xl_xid = buf.get_u32_le();
+                let _xl_prev = buf.get_u64_le();
+                let xl_info = buf.get_u8();
+                let xl_rmid = buf.get_u8();
+                buf.advance(2); // 2 bytes of padding
+                let _xl_crc = buf.get_u32_le();
+                if xl_rmid == pg_constants::RM_CLOG_ID {
+                    let info = xl_info & !pg_constants::XLR_INFO_MASK;
+                    if info == pg_constants::CLOG_ZEROPAGE {
+                        page.clone_from_slice(zero_page_bytes);
+                        trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}",
+                            record.lsn >> 32,
+                            record.lsn & 0xffffffff,
+                            record.main_data_offset, record.rec.len());
+                    }
+                } else if xl_rmid == pg_constants::RM_XACT_ID {
+                    self.transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page);
+                }
+            }
+            apply_result = Ok::<Bytes, Error>(page.freeze());
+        } else {
+            apply_result = process.apply_wal_records(tag, base_img, records).await;
+        }
+
+        let duration = start.elapsed();
+        let result: Result<Bytes, WalRedoError>;
+        trace!(
+            "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
+            nrecords,
+            duration.as_millis(),
+            lsn >> 32,
+            lsn & 0xffff_ffff
+        );
+        if let Err(e) = apply_result {
+            error!("could not apply WAL records: {}", e);
+            result = Err(WalRedoError::IoError(e));
+        } else {
+            let img = apply_result.unwrap();
+            // Should we put the image back to the page cache? We don't have to,
+            // but then the next GetPage@LSN call will have to do the WAL redo
+            // again
+            pcache.put_page_image(tag, lsn, img.clone());
+            result = Ok(img);
+        }
+        // The caller is responsible for sending the response
+        result
+    }
+}
 
 struct WalRedoProcess {
@@ -248,17 +365,14 @@ impl WalRedoProcess {
     // Tests who run pageserver binary are setting proper PG_BIN_DIR
     // and PG_LIB_DIR so that WalRedo would start right postgres. We may later
    // switch to setting same things in pageserver config file.
-    fn launch(datadir: &str, runtime: &Runtime) -> Result<WalRedoProcess, Error> {
+    async fn launch(datadir: &str) -> Result<WalRedoProcess, Error> {
        // Create empty data directory for wal-redo postgres deleting old one.
        fs::remove_dir_all(datadir).ok();
-        let initdb = runtime
-            .block_on(
-                Command::new("initdb")
-                    .args(&["-D", datadir])
-                    .arg("-N")
-                    .output(),
-            )
-            .expect("failed to execute initdb");
+        let initdb = Command::new("initdb")
+            .args(&["-D", datadir])
+            .arg("-N")
+            .output()
+            .await.expect("failed to execute initdb");
 
         if !initdb.status.success() {
             panic!(
@@ -322,82 +436,76 @@ impl WalRedoProcess {
     // Apply given WAL records ('records') over an old page image. Returns
     // new page image.
     //
-    fn apply_wal_records(
-        &self,
-        runtime: &Runtime,
+    async fn apply_wal_records(&self,
         tag: BufferTag,
         base_img: Option<Bytes>,
         records: Vec<WALRecord>,
-    ) -> Result<Bytes, Error> {
+    ) -> Result<Bytes, std::io::Error> {
         let mut stdin = self.stdin.borrow_mut();
         let mut stdout = self.stdout.borrow_mut();
-        runtime.block_on(async {
-            //
-            // This async block sends all the commands to the process.
-            //
-            // For reasons I don't understand, this needs to be a "move" block;
-            // otherwise the stdin pipe doesn't get closed, despite the shutdown()
-            // call.
-            //
-            let f_stdin = async {
-                // Send base image, if any. (If the record initializes the page, previous page
-                // version is not needed.)
-                timeout(
-                    TIMEOUT,
-                    stdin.write_all(&build_begin_redo_for_block_msg(tag)),
-                )
-                .await??;
-                if base_img.is_some() {
-                    timeout(
-                        TIMEOUT,
-                        stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())),
-                    )
-                    .await??;
-                }
-                // Send WAL records.
-                for rec in records.iter() {
-                    let r = rec.clone();
-                    stdin
-                        .write_all(&build_apply_record_msg(r.lsn, r.rec))
-                        .await?;
-                    //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
-                    //        r.lsn >> 32, r.lsn & 0xffff_ffff);
-                }
-                //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
-                //        records.len(), lsn >> 32, lsn & 0xffff_ffff);
-                // Send GetPage command to get the result back
-                timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
-                timeout(TIMEOUT, stdin.flush()).await??;
-                //debug!("sent GetPage for {}", tag.blknum);
-                Ok::<(), Error>(())
-            };
-            // Read back new page image
-            let f_stdout = async {
-                let mut buf = [0u8; 8192];
-                timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
-                //debug!("got response for {}", tag.blknum);
-                Ok::<[u8; 8192], Error>(buf)
-            };
-            // Kill the process. This closes its stdin, which should signal the process
-            // to terminate. TODO: SIGKILL if needed
-            //child.wait();
-            let res = futures::try_join!(f_stdout, f_stdin)?;
-            let buf = res.0;
-            Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
-        })
+        let f_stdin = async {
+            // Send base image, if any. (If the record initializes the page, previous page
+            // version is not needed.)
+            timeout(
+                TIMEOUT,
+                stdin.write_all(&build_begin_redo_for_block_msg(tag)),
+            )
+            .await??;
+            if base_img.is_some() {
+                timeout(
+                    TIMEOUT,
+                    stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())),
+                )
+                .await??;
+            }
+            // Send WAL records.
+            for rec in records.iter() {
+                let r = rec.clone();
+                stdin
+                    .write_all(&build_apply_record_msg(r.lsn, r.rec))
+                    .await?;
+                //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
+                //        r.lsn >> 32, r.lsn & 0xffff_ffff);
+            }
+            //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
+            //        records.len(), lsn >> 32, lsn & 0xffff_ffff);
+            // Send GetPage command to get the result back
+            timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
+            timeout(TIMEOUT, stdin.flush()).await??;
+            //debug!("sent GetPage for {}", tag.blknum);
+            Ok::<(), Error>(())
+        };
+        // Read back new page image
+        let f_stdout = async {
+            let mut buf = [0u8; 8192];
+            timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
+            //debug!("got response for {}", tag.blknum);
+            Ok::<[u8; 8192], Error>(buf)
+        };
+        // Kill the process. This closes its stdin, which should signal the process
+        // to terminate. TODO: SIGKILL if needed
+        //child.wait();
+        let res = futures::try_join!(f_stdout, f_stdin)?;
+        let buf = res.0;
+        Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
     }
 }
 
 // Functions for constructing messages to send to the postgres WAL redo
 // process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
 // explanation of the protocol.
 fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes {
     let len = 4 + 5 * 4;
     let mut buf = BytesMut::with_capacity(1 + len);
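
For reference, here is how a caller would drive the new interface above (WalRedoManager::new, launch, request_redo). The sketch is illustrative and not part of the commit; it assumes you already hold a PageServerConf, a timeline id, and an Arc<PageCache> for the timeline.

    // Hypothetical caller of the new WAL redo manager. The types
    // WalRedoManager, PageCache, BufferTag and WalRedoError come from
    // the diff above; this function itself is not in the commit.
    async fn reconstruct_page(
        conf: &PageServerConf,
        timelineid: ZTimelineId,
        pcache: std::sync::Arc<PageCache>,
        tag: BufferTag,
        lsn: u64,
    ) -> Result<Bytes, WalRedoError> {
        // Create the manager and start its dedicated redo thread.
        let manager = WalRedoManager::new(conf, timelineid);
        manager.launch(pcache);

        // request_redo() queues a WalRedoRequest on the mpsc channel and
        // awaits the oneshot reply filled in by the redo thread.
        manager.request_redo(tag, lsn).await
    }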

zenith_utils/src/lib.rs

@@ -1,4 +1,5 @@
 //! zenith_utils is intended to be a place to put code that is shared
 //! between other crates in this repository.
 
+/// SeqWait allows waiting for a future sequence number to arrive
 pub mod seqwait;

zenith_utils/src/seqwait.rs

@@ -1,3 +1,5 @@
+#![warn(missing_docs)]
+
 use std::collections::BTreeMap;
 use std::mem;
 use std::sync::Mutex;
@@ -63,7 +65,7 @@ impl SeqWait {
             // This will steal the entire waiters map.
             // When we drop it all waiters will be woken.
-            mem::take(&mut internal.waiters);
+            mem::take(&mut internal.waiters)
             // Drop the lock as we exit this scope.
         };
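
The semicolon change in the last hunk is load-bearing: without the trailing semicolon, mem::take(&mut internal.waiters) becomes the value of the enclosing block, so the stolen waiters map outlives the scope that holds the lock, and per the comment its waiters are only woken when it is dropped, after the mutex is released. A standalone sketch of the pattern, with a hypothetical steal_waiters helper:

    use std::collections::BTreeMap;
    use std::mem;
    use std::sync::Mutex;

    // Steal the map while holding the lock, letting the caller drop it
    // after the lock is released. `steal_waiters` is illustrative, not
    // code from this commit.
    fn steal_waiters(shared: &Mutex<BTreeMap<u64, &'static str>>) -> BTreeMap<u64, &'static str> {
        let mut guard = shared.lock().unwrap();
        // Replace the map with an empty one; the old contents become the
        // return value. No trailing semicolon: the expression is the
        // function's result, and the MutexGuard is dropped on return,
        // before the caller ever touches the stolen map.
        mem::take(&mut *guard)
    }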