Refactor WAL redo to not use a separate thread.

My main motivation is to make it easier to attribute time spent in WAL
redo to the request that needed the WAL redo. With this patch, the WAL
redo is performed by the requester thread, so it shows up in stack traces
and in 'perf' report as part of the requester's call stack. This is also
slightly simpler (fewer lines of code) and should be a bit faster too.
This commit is contained in:
Heikki Linnakangas
2021-08-13 17:23:36 +03:00
parent f8de71eab0
commit 6e22a8f709

View File

@@ -1,6 +1,7 @@
//! //!
//! WAL redo. This service runs PostgreSQL in a special wal_redo mode //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
//! to apply given WAL records over an old page image and return new page image. //! to apply given WAL records over an old page image and return new
//! page image.
//! //!
//! We rely on Postgres to perform WAL redo for us. We launch a //! We rely on Postgres to perform WAL redo for us. We launch a
//! postgres process in special "wal redo" mode that's similar to //! postgres process in special "wal redo" mode that's similar to
@@ -12,8 +13,10 @@
//! See src/backend/tcop/zenith_wal_redo.c for the other side of //! See src/backend/tcop/zenith_wal_redo.c for the other side of
//! this communication. //! this communication.
//! //!
//! TODO: Even though the postgres code runs in a separate process, //! The Postgres process is assumed to be secure against malicious WAL
//! it's not a secure sandbox. //! records. It achieves this by dropping privileges before replaying
//! any WAL records, so that even if an attacker hijacks the Postgres
//! process, he cannot escape out of it.
//! //!
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -26,7 +29,6 @@ use std::io::prelude::*;
use std::io::Error; use std::io::Error;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Stdio; use std::process::Stdio;
use std::sync::mpsc;
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
@@ -49,7 +51,6 @@ use postgres_ffi::XLogRecord;
/// ///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
/// This is used as a part of the key inside key-value storage (RocksDB currently).
/// ///
/// In Postgres `BufferTag` structure is used for exactly the same purpose. /// In Postgres `BufferTag` structure is used for exactly the same purpose.
/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91). /// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
@@ -103,21 +104,18 @@ impl crate::walredo::WalRedoManager for DummyRedoManager {
static TIMEOUT: Duration = Duration::from_secs(20); static TIMEOUT: Duration = Duration::from_secs(20);
/// ///
/// The implementation consists of two parts: PostgresRedoManager, and /// This is the real implementation that uses a Postgres process to
/// PostgresRedoManagerInternal. PostgresRedoManager is the public struct /// perform WAL replay. Only one thread can use the process at a time,
/// that can be used to send redo requests to the manager. /// that is controlled by the Mutex. In the future, we might want to
/// PostgresRedoManagerInternal is used by the manager thread itself. /// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// ///
pub struct PostgresRedoManager { pub struct PostgresRedoManager {
request_tx: Mutex<mpsc::Sender<WalRedoRequest>>, tenantid: ZTenantId,
}
struct PostgresRedoManagerInternal {
conf: &'static PageServerConf, conf: &'static PageServerConf,
request_rx: mpsc::Receiver<WalRedoRequest>, runtime: tokio::runtime::Runtime,
process: Mutex<Option<PostgresRedoProcess>>,
tenantid: ZTenantId,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -128,8 +126,6 @@ struct WalRedoRequest {
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: Vec<WALRecord>, records: Vec<WALRecord>,
response_channel: mpsc::Sender<Result<Bytes, WalRedoError>>,
} }
/// An error happened in WAL redo /// An error happened in WAL redo
@@ -145,41 +141,6 @@ pub enum WalRedoError {
/// ///
/// Public interface of WAL redo manager /// Public interface of WAL redo manager
/// ///
impl PostgresRedoManager {
///
/// Create a new PostgresRedoManager.
///
/// This launches a new thread to handle the requests.
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
let (tx, rx) = mpsc::channel();
//
// Launch the WAL redo thread
//
// Get mutable references to the values that we need to pass to the
// thread.
let request_rx = rx;
// Currently, the join handle is not saved anywhere and we
// won't try restart the thread if it dies.
let _walredo_thread = std::thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
let mut internal = PostgresRedoManagerInternal {
conf,
request_rx,
tenantid,
};
internal.wal_redo_main();
})
.unwrap();
PostgresRedoManager {
request_tx: Mutex::new(tx),
}
}
}
impl WalRedoManager for PostgresRedoManager { impl WalRedoManager for PostgresRedoManager {
/// ///
/// Request the WAL redo manager to apply some WAL records /// Request the WAL redo manager to apply some WAL records
@@ -195,8 +156,6 @@ impl WalRedoManager for PostgresRedoManager {
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: Vec<WALRecord>, records: Vec<WALRecord>,
) -> Result<Bytes, WalRedoError> { ) -> Result<Bytes, WalRedoError> {
// Create a channel where to receive the response
let (tx, rx) = mpsc::channel::<Result<Bytes, WalRedoError>>();
let request = WalRedoRequest { let request = WalRedoRequest {
rel, rel,
@@ -204,17 +163,18 @@ impl WalRedoManager for PostgresRedoManager {
lsn, lsn,
base_img, base_img,
records, records,
response_channel: tx,
}; };
self.request_tx // launch the WAL redo process on first use
.lock() let mut process_guard = self.process.lock().unwrap();
.unwrap() if process_guard.is_none() {
.send(request) let p = self.runtime
.expect("could not send WAL redo request"); .block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
*process_guard = Some(p);
}
let process = (*process_guard).as_ref().unwrap();
rx.recv() self.runtime.block_on(self.handle_apply_request(&process, &request))
.expect("could not receive response to WAL redo request")
} }
} }
@@ -236,15 +196,12 @@ fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
+ (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize + (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize
} }
/// impl PostgresRedoManager {
/// WAL redo thread
/// ///
impl PostgresRedoManagerInternal { /// Create a new PostgresRedoManager.
// ///
// Main entry point for the WAL applicator thread. pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
//
fn wal_redo_main(&mut self) {
info!("WAL redo thread started for tenant: {}", self.tenantid);
// We block on waiting for requests on the walredo request channel, but // We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the // use async I/O to communicate with the child process. Initialize the
@@ -254,33 +211,12 @@ impl PostgresRedoManagerInternal {
.build() .build()
.unwrap(); .unwrap();
let process: PostgresRedoProcess; // The actual process is launched lazily, on first request.
PostgresRedoManager {
info!( runtime,
"launching WAL redo postgres process for tenant: {}", tenantid,
self.tenantid conf,
); process: Mutex::new(None),
process = runtime
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))
.unwrap();
// Loop forever, handling requests as they come.
loop {
let request = self
.request_rx
.recv()
.expect("WAL redo request channel was closed");
let result = runtime.block_on(self.handle_apply_request(&process, &request));
let result_ok = result.is_ok();
// Send the result to the requester
let _ = request.response_channel.send(result);
if !result_ok {
error!("wal-redo-postgres failed to apply request {:?}", request);
}
} }
} }
@@ -477,6 +413,9 @@ impl PostgresRedoManagerInternal {
} }
} }
///
/// Handle to the Postgres WAL redo process
///
struct PostgresRedoProcess { struct PostgresRedoProcess {
stdin: RefCell<ChildStdin>, stdin: RefCell<ChildStdin>,
stdout: RefCell<ChildStdout>, stdout: RefCell<ChildStdout>,