mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
Refactor WalRedoManager for easier testing.
Turn WalRedoManager into an abstract trait, so that it can be easily mocked in unit tests. One change here is that the WAL redo manager is no longer tied to a specific zenith timeline. It didn't do anything with that information aside from using it in the dummy datadir's name. We could use any random string for that purpose; it's just to prevent two WAL redo managers from stepping on each other. But this commit actually changes things so that all timelines use the same WAL redo manager, so that's not necessary. We will probably want to maintain a pool of WAL redo processes in the future, but for now let's keep it simple. In passing, fix some comments.
This commit is contained in:
@@ -3,27 +3,31 @@
|
||||
//! isn't much here. If we implement multi-tenancy, this will probably be changed into
|
||||
//! a hash map, keyed by the tenant ID.
|
||||
|
||||
use crate::PageServerConf;
|
||||
//use crate::repository::Repository;
|
||||
use crate::repository::rocksdb::RocksRepository;
|
||||
use crate::repository::Repository;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::PageServerConf;
|
||||
use lazy_static::lazy_static;
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
lazy_static! {
|
||||
pub static ref REPOSITORY: Mutex<Option<Arc<RocksRepository>>> = Mutex::new(None);
|
||||
pub static ref REPOSITORY: Mutex<Option<Arc<dyn Repository + Send + Sync>>> = Mutex::new(None);
|
||||
}
|
||||
|
||||
pub fn init(conf: &PageServerConf) {
|
||||
let mut m = REPOSITORY.lock().unwrap();
|
||||
|
||||
// Set up a WAL redo manager, for applying WAL records.
|
||||
let walredo_mgr = PostgresRedoManager::new(conf);
|
||||
|
||||
// we have already changed current dir to the repository.
|
||||
let repo = RocksRepository::new(conf, Path::new("."));
|
||||
let repo = RocksRepository::new(conf, Path::new("."), Arc::new(walredo_mgr));
|
||||
|
||||
*m = Some(Arc::new(repo));
|
||||
}
|
||||
|
||||
pub fn get_repository() -> Arc<RocksRepository> {
|
||||
pub fn get_repository() -> Arc<dyn Repository + Send + Sync> {
|
||||
let o = &REPOSITORY.lock().unwrap();
|
||||
Arc::clone(o.as_ref().unwrap())
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ use zenith_utils::lsn::Lsn;
|
||||
|
||||
use crate::basebackup;
|
||||
use crate::page_cache;
|
||||
use crate::repository::{BufferTag, RelTag, Repository};
|
||||
use crate::repository::{BufferTag, RelTag};
|
||||
use crate::restore_local_repo;
|
||||
use crate::walreceiver;
|
||||
use crate::PageServerConf;
|
||||
|
||||
@@ -34,6 +34,8 @@ pub struct RocksRepository {
|
||||
repo_dir: PathBuf,
|
||||
conf: PageServerConf,
|
||||
timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>,
|
||||
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
}
|
||||
|
||||
pub struct RocksTimeline {
|
||||
@@ -41,7 +43,7 @@ pub struct RocksTimeline {
|
||||
db: rocksdb::DB,
|
||||
|
||||
// WAL redo manager
|
||||
walredo_mgr: WalRedoManager,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
|
||||
// What page versions do we hold in the cache? If we get a request > last_valid_lsn,
|
||||
// we need to wait until we receive all the WAL up to the request. The SeqWait
|
||||
@@ -154,11 +156,16 @@ impl CacheEntryContent {
|
||||
}
|
||||
|
||||
impl RocksRepository {
|
||||
pub fn new(conf: &PageServerConf, repo_dir: &Path) -> RocksRepository {
|
||||
pub fn new(
|
||||
conf: &PageServerConf,
|
||||
repo_dir: &Path,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
) -> RocksRepository {
|
||||
RocksRepository {
|
||||
repo_dir: PathBuf::from(repo_dir),
|
||||
conf: conf.clone(),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
walredo_mgr,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -180,7 +187,8 @@ impl Repository for RocksRepository {
|
||||
match timelines.get(&timelineid) {
|
||||
Some(timeline) => Ok(timeline.clone()),
|
||||
None => {
|
||||
let timeline = RocksTimeline::new(&self.conf, &self.repo_dir, timelineid);
|
||||
let timeline =
|
||||
RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
|
||||
|
||||
restore_timeline(&self.conf, &timeline, timelineid)?;
|
||||
|
||||
@@ -222,11 +230,15 @@ impl RocksTimeline {
|
||||
rocksdb::DB::open(&opts, &path).unwrap()
|
||||
}
|
||||
|
||||
fn new(conf: &PageServerConf, repo_dir: &Path, timelineid: ZTimelineId) -> RocksTimeline {
|
||||
fn new(
|
||||
repo_dir: &Path,
|
||||
timelineid: ZTimelineId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager>,
|
||||
) -> RocksTimeline {
|
||||
RocksTimeline {
|
||||
db: RocksTimeline::open_rocksdb(repo_dir, timelineid),
|
||||
|
||||
walredo_mgr: WalRedoManager::new(conf, timelineid),
|
||||
walredo_mgr,
|
||||
|
||||
last_valid_lsn: SeqWait::new(Lsn(0)),
|
||||
last_record_lsn: AtomicLsn::new(0),
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
//!
|
||||
|
||||
use crate::page_cache;
|
||||
use crate::repository::Repository;
|
||||
use crate::waldecoder::*;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
|
||||
@@ -1,19 +1,19 @@
|
||||
//
|
||||
// WAL redo
|
||||
//
|
||||
// We rely on Postgres to perform WAL redo for us. We launch a
|
||||
// postgres process in special "wal redo" mode that's similar to
|
||||
// single-user mode. We then pass the the previous page image, if any,
|
||||
// and all the WAL records we want to apply, to the postgress
|
||||
// process. Then we get the page image back. Communication with the
|
||||
// postgres process happens via stdin/stdout
|
||||
//
|
||||
// See src/backend/tcop/zenith_wal_redo.c for the other side of
|
||||
// this communication.
|
||||
//
|
||||
// TODO: Even though the postgres code runs in a separate process,
|
||||
// it's not a secure sandbox.
|
||||
//
|
||||
//!
|
||||
//! WAL redo
|
||||
//!
|
||||
//! We rely on Postgres to perform WAL redo for us. We launch a
|
||||
//! postgres process in special "wal redo" mode that's similar to
|
||||
//! single-user mode. We then pass the previous page image, if any,
|
||||
//! and all the WAL records we want to apply, to the postgres
|
||||
//! process. Then we get the page image back. Communication with the
|
||||
//! postgres process happens via stdin/stdout
|
||||
//!
|
||||
//! See src/backend/tcop/zenith_wal_redo.c for the other side of
|
||||
//! this communication.
|
||||
//!
|
||||
//! TODO: Even though the postgres code runs in a separate process,
|
||||
//! it's not a secure sandbox.
|
||||
//!
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use log::*;
|
||||
use std::assert;
|
||||
@@ -37,25 +37,43 @@ use zenith_utils::lsn::Lsn;
|
||||
use crate::repository::BufferTag;
|
||||
use crate::repository::WALRecord;
|
||||
use crate::PageServerConf;
|
||||
use crate::ZTimelineId;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::xlog_utils::XLogRecord;
|
||||
|
||||
///
|
||||
/// WAL Redo Manager is responsible for replaying WAL records.
|
||||
///
|
||||
/// Callers use the WAL redo manager through this abstract interface,
|
||||
/// which makes it easy to mock it in tests.
|
||||
pub trait WalRedoManager: Send + Sync {
|
||||
/// Apply some WAL records.
|
||||
///
|
||||
/// The caller passes an old page image, and WAL records that should be
|
||||
/// applied over it. The return value is a new page image, after applying
|
||||
/// the records.
|
||||
fn request_redo(
|
||||
&self,
|
||||
tag: BufferTag,
|
||||
lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
records: Vec<WALRecord>,
|
||||
) -> Result<Bytes, WalRedoError>;
|
||||
}
|
||||
|
||||
static TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
///
|
||||
/// A WAL redo manager consists of two parts: WalRedoManager, and
|
||||
/// WalRedoManagerInternal. WalRedoManager is the public struct
|
||||
/// The implementation consists of two parts: PostgresRedoManager, and
|
||||
/// PostgresRedoManagerInternal. PostgresRedoManager is the public struct
|
||||
/// that can be used to send redo requests to the manager.
|
||||
/// WalRedoManagerInternal is used by the manager thread itself.
|
||||
/// PostgresRedoManagerInternal is used by the manager thread itself.
|
||||
///
|
||||
pub struct WalRedoManager {
|
||||
pub struct PostgresRedoManager {
|
||||
request_tx: Mutex<mpsc::Sender<WalRedoRequest>>,
|
||||
}
|
||||
|
||||
struct WalRedoManagerInternal {
|
||||
struct PostgresRedoManagerInternal {
|
||||
_conf: PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
|
||||
request_rx: mpsc::Receiver<WalRedoRequest>,
|
||||
}
|
||||
@@ -81,13 +99,12 @@ pub enum WalRedoError {
|
||||
///
|
||||
/// Public interface of WAL redo manager
|
||||
///
|
||||
impl WalRedoManager {
|
||||
impl PostgresRedoManager {
|
||||
///
|
||||
/// Create a new WalRedoManager.
|
||||
/// Create a new PostgresRedoManager.
|
||||
///
|
||||
/// This only initializes the struct. You need to call WalRedoManager::launch to
|
||||
/// start the thread that processes the requests.
|
||||
pub fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> WalRedoManager {
|
||||
/// This launches a new thread to handle the requests.
|
||||
pub fn new(conf: &PageServerConf) -> PostgresRedoManager {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
//
|
||||
@@ -103,25 +120,28 @@ impl WalRedoManager {
|
||||
let _walredo_thread = std::thread::Builder::new()
|
||||
.name("WAL redo thread".into())
|
||||
.spawn(move || {
|
||||
let mut internal = WalRedoManagerInternal {
|
||||
let mut internal = PostgresRedoManagerInternal {
|
||||
_conf: conf_copy,
|
||||
timelineid,
|
||||
request_rx,
|
||||
};
|
||||
internal.wal_redo_main();
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
WalRedoManager {
|
||||
PostgresRedoManager {
|
||||
request_tx: Mutex::new(tx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WalRedoManager for PostgresRedoManager {
|
||||
///
|
||||
/// Request the WAL redo manager to apply WAL records, to reconstruct the page image
|
||||
/// of the given page version.
|
||||
/// Request the WAL redo manager to apply some WAL records
|
||||
///
|
||||
pub fn request_redo(
|
||||
/// The WAL redo is handled by a separate thread, so this just sends a request
|
||||
/// to the thread and waits for response.
|
||||
///
|
||||
fn request_redo(
|
||||
&self,
|
||||
tag: BufferTag,
|
||||
lsn: Lsn,
|
||||
@@ -153,12 +173,12 @@ impl WalRedoManager {
|
||||
///
|
||||
/// WAL redo thread
|
||||
///
|
||||
impl WalRedoManagerInternal {
|
||||
impl PostgresRedoManagerInternal {
|
||||
//
|
||||
// Main entry point for the WAL applicator thread.
|
||||
//
|
||||
fn wal_redo_main(&mut self) {
|
||||
info!("WAL redo thread started {}", self.timelineid);
|
||||
info!("WAL redo thread started");
|
||||
|
||||
// We block on waiting for requests on the walredo request channel, but
|
||||
// use async I/O to communicate with the child process. Initialize the
|
||||
@@ -168,12 +188,18 @@ impl WalRedoManagerInternal {
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let process: WalRedoProcess;
|
||||
let datadir = format!("wal-redo/{}", self.timelineid);
|
||||
let process: PostgresRedoProcess;
|
||||
|
||||
info!("launching WAL redo postgres process {}", self.timelineid);
|
||||
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
|
||||
// just create one with constant name. That fails if you try to launch more than
|
||||
// one WAL redo manager concurrently.
|
||||
let datadir = format!("wal-redo-datadir");
|
||||
|
||||
process = runtime.block_on(WalRedoProcess::launch(&datadir)).unwrap();
|
||||
info!("launching WAL redo postgres process");
|
||||
|
||||
process = runtime
|
||||
.block_on(PostgresRedoProcess::launch(&datadir))
|
||||
.unwrap();
|
||||
info!("WAL redo postgres started");
|
||||
|
||||
// Loop forever, handling requests as they come.
|
||||
@@ -216,7 +242,7 @@ impl WalRedoManagerInternal {
|
||||
///
|
||||
async fn handle_apply_request(
|
||||
&self,
|
||||
process: &WalRedoProcess,
|
||||
process: &PostgresRedoProcess,
|
||||
request: &WalRedoRequest,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let tag = request.tag;
|
||||
@@ -351,19 +377,19 @@ impl WalRedoManagerInternal {
|
||||
}
|
||||
}
|
||||
|
||||
struct WalRedoProcess {
|
||||
struct PostgresRedoProcess {
|
||||
stdin: RefCell<ChildStdin>,
|
||||
stdout: RefCell<ChildStdout>,
|
||||
}
|
||||
|
||||
impl WalRedoProcess {
|
||||
impl PostgresRedoProcess {
|
||||
//
|
||||
// Start postgres binary in special WAL redo mode.
|
||||
//
|
||||
// Tests that run the pageserver binary set the proper PG_BIN_DIR
|
||||
// and PG_LIB_DIR so that WalRedo starts the right postgres. We may later
|
||||
// switch to setting the same things in the pageserver config file.
|
||||
async fn launch(datadir: &str) -> Result<WalRedoProcess, Error> {
|
||||
async fn launch(datadir: &str) -> Result<PostgresRedoProcess, Error> {
|
||||
// Create an empty data directory for wal-redo postgres, deleting the old one.
|
||||
fs::remove_dir_all(datadir).ok();
|
||||
let initdb = Command::new("initdb")
|
||||
@@ -426,7 +452,7 @@ impl WalRedoProcess {
|
||||
};
|
||||
tokio::spawn(f_stderr);
|
||||
|
||||
Ok(WalRedoProcess {
|
||||
Ok(PostgresRedoProcess {
|
||||
stdin: RefCell::new(stdin),
|
||||
stdout: RefCell::new(stdout),
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user