mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-25 21:40:37 +00:00
Refactor WAL redo to not use a separate thread.
My main motivation is to make it easier to attribute time spent in WAL redo to the request that needed the WAL redo. With this patch, the WAL redo is performed by the requester thread, so it shows up in stack traces and in 'perf' report as part of the requester's call stack. This is also slightly simpler (less lines of code) and should be a bit faster too.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
//!
|
||||
//! WAL redo. This service runs PostgreSQL in a special wal_redo mode
|
||||
//! to apply given WAL records over an old page image and return new page image.
|
||||
//! to apply given WAL records over an old page image and return new
|
||||
//! page image.
|
||||
//!
|
||||
//! We rely on Postgres to perform WAL redo for us. We launch a
|
||||
//! postgres process in special "wal redo" mode that's similar to
|
||||
@@ -12,8 +13,10 @@
|
||||
//! See src/backend/tcop/zenith_wal_redo.c for the other side of
|
||||
//! this communication.
|
||||
//!
|
||||
//! TODO: Even though the postgres code runs in a separate process,
|
||||
//! it's not a secure sandbox.
|
||||
//! The Postgres process is assumed to be secure against malicious WAL
|
||||
//! records. It achieves it by dropping privileges before replaying
|
||||
//! any WAL records, so that even if an attacker hijacks the Postgres
|
||||
//! process, he cannot escape out of it.
|
||||
//!
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
@@ -26,7 +29,6 @@ use std::io::prelude::*;
|
||||
use std::io::Error;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
@@ -49,7 +51,6 @@ use postgres_ffi::XLogRecord;
|
||||
|
||||
///
|
||||
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
|
||||
/// This is used as a part of the key inside key-value storage (RocksDB currently).
|
||||
///
|
||||
/// In Postgres `BufferTag` structure is used for exactly the same purpose.
|
||||
/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
|
||||
@@ -103,21 +104,18 @@ impl crate::walredo::WalRedoManager for DummyRedoManager {
|
||||
static TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
///
|
||||
/// The implementation consists of two parts: PostgresRedoManager, and
|
||||
/// PostgresRedoManagerInternal. PostgresRedoManager is the public struct
|
||||
/// that can be used to send redo requests to the manager.
|
||||
/// PostgresRedoManagerInternal is used by the manager thread itself.
|
||||
/// This is the real implementation that uses a Postgres process to
|
||||
/// perform WAL replay. Only one thread can use the processs at a time,
|
||||
/// that is controlled by the Mutex. In the future, we might want to
|
||||
/// launch a pool of processes to allow concurrent replay of multiple
|
||||
/// records.
|
||||
///
|
||||
pub struct PostgresRedoManager {
|
||||
request_tx: Mutex<mpsc::Sender<WalRedoRequest>>,
|
||||
}
|
||||
|
||||
struct PostgresRedoManagerInternal {
|
||||
tenantid: ZTenantId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
request_rx: mpsc::Receiver<WalRedoRequest>,
|
||||
|
||||
tenantid: ZTenantId,
|
||||
runtime: tokio::runtime::Runtime,
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -128,8 +126,6 @@ struct WalRedoRequest {
|
||||
|
||||
base_img: Option<Bytes>,
|
||||
records: Vec<WALRecord>,
|
||||
|
||||
response_channel: mpsc::Sender<Result<Bytes, WalRedoError>>,
|
||||
}
|
||||
|
||||
/// An error happened in WAL redo
|
||||
@@ -145,41 +141,6 @@ pub enum WalRedoError {
|
||||
///
|
||||
/// Public interface of WAL redo manager
|
||||
///
|
||||
impl PostgresRedoManager {
|
||||
///
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
/// This launches a new thread to handle the requests.
|
||||
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
//
|
||||
// Launch the WAL redo thread
|
||||
//
|
||||
// Get mutable references to the values that we need to pass to the
|
||||
// thread.
|
||||
let request_rx = rx;
|
||||
|
||||
// Currently, the join handle is not saved anywhere and we
|
||||
// won't try restart the thread if it dies.
|
||||
let _walredo_thread = std::thread::Builder::new()
|
||||
.name("WAL redo thread".into())
|
||||
.spawn(move || {
|
||||
let mut internal = PostgresRedoManagerInternal {
|
||||
conf,
|
||||
request_rx,
|
||||
tenantid,
|
||||
};
|
||||
internal.wal_redo_main();
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
PostgresRedoManager {
|
||||
request_tx: Mutex::new(tx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WalRedoManager for PostgresRedoManager {
|
||||
///
|
||||
/// Request the WAL redo manager to apply some WAL records
|
||||
@@ -195,8 +156,6 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
base_img: Option<Bytes>,
|
||||
records: Vec<WALRecord>,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
// Create a channel where to receive the response
|
||||
let (tx, rx) = mpsc::channel::<Result<Bytes, WalRedoError>>();
|
||||
|
||||
let request = WalRedoRequest {
|
||||
rel,
|
||||
@@ -204,17 +163,18 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
lsn,
|
||||
base_img,
|
||||
records,
|
||||
response_channel: tx,
|
||||
};
|
||||
|
||||
self.request_tx
|
||||
.lock()
|
||||
.unwrap()
|
||||
.send(request)
|
||||
.expect("could not send WAL redo request");
|
||||
// launch the WAL redo process on first use
|
||||
let mut process_guard = self.process.lock().unwrap();
|
||||
if process_guard.is_none() {
|
||||
let p = self.runtime
|
||||
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
|
||||
*process_guard = Some(p);
|
||||
}
|
||||
let process = (*process_guard).as_ref().unwrap();
|
||||
|
||||
rx.recv()
|
||||
.expect("could not receive response to WAL redo request")
|
||||
self.runtime.block_on(self.handle_apply_request(&process, &request))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,15 +196,12 @@ fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
|
||||
+ (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize
|
||||
}
|
||||
|
||||
///
|
||||
/// WAL redo thread
|
||||
///
|
||||
impl PostgresRedoManagerInternal {
|
||||
//
|
||||
// Main entry point for the WAL applicator thread.
|
||||
//
|
||||
fn wal_redo_main(&mut self) {
|
||||
info!("WAL redo thread started for tenant: {}", self.tenantid);
|
||||
impl PostgresRedoManager {
|
||||
|
||||
///
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
|
||||
|
||||
// We block on waiting for requests on the walredo request channel, but
|
||||
// use async I/O to communicate with the child process. Initialize the
|
||||
@@ -254,33 +211,12 @@ impl PostgresRedoManagerInternal {
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let process: PostgresRedoProcess;
|
||||
|
||||
info!(
|
||||
"launching WAL redo postgres process for tenant: {}",
|
||||
self.tenantid
|
||||
);
|
||||
|
||||
process = runtime
|
||||
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))
|
||||
.unwrap();
|
||||
|
||||
// Loop forever, handling requests as they come.
|
||||
loop {
|
||||
let request = self
|
||||
.request_rx
|
||||
.recv()
|
||||
.expect("WAL redo request channel was closed");
|
||||
|
||||
let result = runtime.block_on(self.handle_apply_request(&process, &request));
|
||||
let result_ok = result.is_ok();
|
||||
|
||||
// Send the result to the requester
|
||||
let _ = request.response_channel.send(result);
|
||||
|
||||
if !result_ok {
|
||||
error!("wal-redo-postgres failed to apply request {:?}", request);
|
||||
}
|
||||
// The actual process is launched lazily, on first request.
|
||||
PostgresRedoManager {
|
||||
runtime,
|
||||
tenantid,
|
||||
conf,
|
||||
process: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -477,6 +413,9 @@ impl PostgresRedoManagerInternal {
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle to the Postgres WAL redo process
|
||||
///
|
||||
struct PostgresRedoProcess {
|
||||
stdin: RefCell<ChildStdin>,
|
||||
stdout: RefCell<ChildStdout>,
|
||||
|
||||
Reference in New Issue
Block a user