diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index c8da31acc3..2a238b5bc0 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -3,27 +3,31 @@
//! isn't much here. If we implement multi-tenancy, this will probably be changed into
//! a hash map, keyed by the tenant ID.
-use crate::PageServerConf;
-//use crate::repository::Repository;
use crate::repository::rocksdb::RocksRepository;
+use crate::repository::Repository;
+use crate::walredo::PostgresRedoManager;
+use crate::PageServerConf;
use lazy_static::lazy_static;
use std::path::Path;
use std::sync::{Arc, Mutex};
lazy_static! {
- pub static ref REPOSITORY: Mutex<Option<Arc<RocksRepository>>> = Mutex::new(None);
+ pub static ref REPOSITORY: Mutex<Option<Arc<dyn Repository>>> = Mutex::new(None);
}
pub fn init(conf: &PageServerConf) {
let mut m = REPOSITORY.lock().unwrap();
+ // Set up a WAL redo manager, for applying WAL records.
+ let walredo_mgr = PostgresRedoManager::new(conf);
+
// we have already changed current dir to the repository.
- let repo = RocksRepository::new(conf, Path::new("."));
+ let repo = RocksRepository::new(conf, Path::new("."), Arc::new(walredo_mgr));
*m = Some(Arc::new(repo));
}
-pub fn get_repository() -> Arc<RocksRepository> {
+pub fn get_repository() -> Arc<dyn Repository> {
let o = &REPOSITORY.lock().unwrap();
Arc::clone(o.as_ref().unwrap())
}
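
With this change, page_cache builds a single PostgresRedoManager at startup and hands it to the repository, and callers only ever see the Repository trait object. A minimal sketch of the intended call order, using only names from this patch:

    page_cache::init(&conf);                     // creates PostgresRedoManager + RocksRepository
    let repo: Arc<dyn Repository> = page_cache::get_repository();
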
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 153ec15466..06bba6932b 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -25,7 +25,7 @@ use zenith_utils::lsn::Lsn;
use crate::basebackup;
use crate::page_cache;
-use crate::repository::{BufferTag, RelTag, Repository};
+use crate::repository::{BufferTag, RelTag};
use crate::restore_local_repo;
use crate::walreceiver;
use crate::PageServerConf;
diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs
index f5b15927b9..552ee429c2 100644
--- a/pageserver/src/repository/rocksdb.rs
+++ b/pageserver/src/repository/rocksdb.rs
@@ -34,6 +34,8 @@ pub struct RocksRepository {
repo_dir: PathBuf,
conf: PageServerConf,
timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>,
+
+ walredo_mgr: Arc<dyn WalRedoManager>,
}
pub struct RocksTimeline {
@@ -41,7 +43,7 @@ pub struct RocksTimeline {
db: rocksdb::DB,
// WAL redo manager
- walredo_mgr: WalRedoManager,
+ walredo_mgr: Arc<dyn WalRedoManager>,
// What page versions do we hold in the cache? If we get a request > last_valid_lsn,
// we need to wait until we receive all the WAL up to the request. The SeqWait
@@ -154,11 +156,16 @@ impl CacheEntryContent {
}
impl RocksRepository {
- pub fn new(conf: &PageServerConf, repo_dir: &Path) -> RocksRepository {
+ pub fn new(
+ conf: &PageServerConf,
+ repo_dir: &Path,
+ walredo_mgr: Arc<dyn WalRedoManager>,
+ ) -> RocksRepository {
RocksRepository {
repo_dir: PathBuf::from(repo_dir),
conf: conf.clone(),
timelines: Mutex::new(HashMap::new()),
+ walredo_mgr,
}
}
}
@@ -180,7 +187,8 @@ impl Repository for RocksRepository {
match timelines.get(&timelineid) {
Some(timeline) => Ok(timeline.clone()),
None => {
- let timeline = RocksTimeline::new(&self.conf, &self.repo_dir, timelineid);
+ let timeline =
+ RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
restore_timeline(&self.conf, &timeline, timelineid)?;
@@ -222,11 +230,15 @@ impl RocksTimeline {
rocksdb::DB::open(&opts, &path).unwrap()
}
- fn new(conf: &PageServerConf, repo_dir: &Path, timelineid: ZTimelineId) -> RocksTimeline {
+ fn new(
+ repo_dir: &Path,
+ timelineid: ZTimelineId,
+ walredo_mgr: Arc<dyn WalRedoManager>,
+ ) -> RocksTimeline {
RocksTimeline {
db: RocksTimeline::open_rocksdb(repo_dir, timelineid),
- walredo_mgr: WalRedoManager::new(conf, timelineid),
+ walredo_mgr,
last_valid_lsn: SeqWait::new(Lsn(0)),
last_record_lsn: AtomicLsn::new(0),
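
The repository now receives its WAL redo manager through the constructor instead of creating one per timeline, so every RocksTimeline shares the same Arc<dyn WalRedoManager>. A rough sketch of the wiring (construction only; assumes the PostgresRedoManager from walredo.rs):

    let walredo_mgr: Arc<dyn WalRedoManager> = Arc::new(PostgresRedoManager::new(&conf));
    let repo = RocksRepository::new(&conf, Path::new("."), walredo_mgr);
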
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index b3a06f4499..be1c8aac44 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -7,7 +7,6 @@
//!
use crate::page_cache;
-use crate::repository::Repository;
use crate::waldecoder::*;
use crate::PageServerConf;
use crate::ZTimelineId;
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index ae27bc7f7d..6239f553e4 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -1,19 +1,19 @@
-//
-// WAL redo
-//
-// We rely on Postgres to perform WAL redo for us. We launch a
-// postgres process in special "wal redo" mode that's similar to
-// single-user mode. We then pass the the previous page image, if any,
-// and all the WAL records we want to apply, to the postgress
-// process. Then we get the page image back. Communication with the
-// postgres process happens via stdin/stdout
-//
-// See src/backend/tcop/zenith_wal_redo.c for the other side of
-// this communication.
-//
-// TODO: Even though the postgres code runs in a separate process,
-// it's not a secure sandbox.
-//
+//!
+//! WAL redo
+//!
+//! We rely on Postgres to perform WAL redo for us. We launch a
+//! postgres process in special "wal redo" mode that's similar to
+//! single-user mode. We then pass the previous page image, if any,
+//! and all the WAL records we want to apply, to the postgres
+//! process. Then we get the page image back. Communication with the
+//! postgres process happens via stdin/stdout
+//!
+//! See src/backend/tcop/zenith_wal_redo.c for the other side of
+//! this communication.
+//!
+//! TODO: Even though the postgres code runs in a separate process,
+//! it's not a secure sandbox.
+//!
use bytes::{Buf, BufMut, Bytes, BytesMut};
use log::*;
use std::assert;
@@ -37,25 +37,43 @@ use zenith_utils::lsn::Lsn;
use crate::repository::BufferTag;
use crate::repository::WALRecord;
use crate::PageServerConf;
-use crate::ZTimelineId;
use postgres_ffi::pg_constants;
use postgres_ffi::xlog_utils::XLogRecord;
+///
+/// WAL Redo Manager is responsible for replaying WAL records.
+///
+/// Callers use the WAL redo manager through this abstract interface,
+/// which makes it easy to mock in tests.
+pub trait WalRedoManager: Send + Sync {
+ /// Apply some WAL records.
+ ///
+ /// The caller passes an old page image, and WAL records that should be
+ /// applied over it. The return value is a new page image, after applying
+/// the records.
+ fn request_redo(
+ &self,
+ tag: BufferTag,
+ lsn: Lsn,
+ base_img: Option<Bytes>,
+ records: Vec<WALRecord>,
+ ) -> Result<Bytes, WalRedoError>;
+}
+
static TIMEOUT: Duration = Duration::from_secs(20);
///
-/// A WAL redo manager consists of two parts: WalRedoManager, and
-/// WalRedoManagerInternal. WalRedoManager is the public struct
+/// The implementation consists of two parts: PostgresRedoManager, and
+/// PostgresRedoManagerInternal. PostgresRedoManager is the public struct
/// that can be used to send redo requests to the manager.
-/// WalRedoManagerInternal is used by the manager thread itself.
+/// PostgresRedoManagerInternal is used by the manager thread itself.
///
-pub struct WalRedoManager {
+pub struct PostgresRedoManager {
request_tx: Mutex<mpsc::Sender<WalRedoRequest>>,
}
-struct WalRedoManagerInternal {
+struct PostgresRedoManagerInternal {
_conf: PageServerConf,
- timelineid: ZTimelineId,
request_rx: mpsc::Receiver<WalRedoRequest>,
}
@@ -81,13 +99,12 @@ pub enum WalRedoError {
///
/// Public interface of WAL redo manager
///
-impl WalRedoManager {
+impl PostgresRedoManager {
///
- /// Create a new WalRedoManager.
+ /// Create a new PostgresRedoManager.
///
- /// This only initializes the struct. You need to call WalRedoManager::launch to
- /// start the thread that processes the requests.
- pub fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> WalRedoManager {
+ /// This launches a new thread to handle the requests.
+ pub fn new(conf: &PageServerConf) -> PostgresRedoManager {
let (tx, rx) = mpsc::channel();
//
@@ -103,25 +120,28 @@ impl WalRedoManager {
let _walredo_thread = std::thread::Builder::new()
.name("WAL redo thread".into())
.spawn(move || {
- let mut internal = WalRedoManagerInternal {
+ let mut internal = PostgresRedoManagerInternal {
_conf: conf_copy,
- timelineid,
request_rx,
};
internal.wal_redo_main();
})
.unwrap();
- WalRedoManager {
+ PostgresRedoManager {
request_tx: Mutex::new(tx),
}
}
+}
+impl WalRedoManager for PostgresRedoManager {
///
- /// Request the WAL redo manager to apply WAL records, to reconstruct the page image
- /// of the given page version.
+ /// Request the WAL redo manager to apply some WAL records
///
- pub fn request_redo(
+ /// The WAL redo is handled by a separate thread, so this just sends a request
+ /// to the thread and waits for response.
+ ///
+ fn request_redo(
&self,
tag: BufferTag,
lsn: Lsn,
@@ -153,12 +173,12 @@ impl WalRedoManager {
///
/// WAL redo thread
///
-impl WalRedoManagerInternal {
+impl PostgresRedoManagerInternal {
//
// Main entry point for the WAL applicator thread.
//
fn wal_redo_main(&mut self) {
- info!("WAL redo thread started {}", self.timelineid);
+ info!("WAL redo thread started");
// We block on waiting for requests on the walredo request channel, but
// use async I/O to communicate with the child process. Initialize the
@@ -168,12 +188,18 @@ impl WalRedoManagerInternal {
.build()
.unwrap();
- let process: WalRedoProcess;
- let datadir = format!("wal-redo/{}", self.timelineid);
+ let process: PostgresRedoProcess;
- info!("launching WAL redo postgres process {}", self.timelineid);
+ // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
+ // just create one with a constant name. That fails if you try to launch more than
+ // one WAL redo manager concurrently.
+ let datadir = format!("wal-redo-datadir");
- process = runtime.block_on(WalRedoProcess::launch(&datadir)).unwrap();
+ info!("launching WAL redo postgres process");
+
+ process = runtime
+ .block_on(PostgresRedoProcess::launch(&datadir))
+ .unwrap();
info!("WAL redo postgres started");
// Loop forever, handling requests as they come.
@@ -216,7 +242,7 @@ impl WalRedoManagerInternal {
///
async fn handle_apply_request(
&self,
- process: &WalRedoProcess,
+ process: &PostgresRedoProcess,
request: &WalRedoRequest,
) -> Result<Bytes, WalRedoError> {
let tag = request.tag;
@@ -351,19 +377,19 @@ impl WalRedoManagerInternal {
}
}
-struct WalRedoProcess {
+struct PostgresRedoProcess {
stdin: RefCell<ChildStdin>,
stdout: RefCell<ChildStdout>,
}
-impl WalRedoProcess {
+impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
// Tests who run pageserver binary are setting proper PG_BIN_DIR
// and PG_LIB_DIR so that WalRedo would start right postgres. We may later
// switch to setting same things in pageserver config file.
- async fn launch(datadir: &str) -> Result<WalRedoProcess, Error> {
+ async fn launch(datadir: &str) -> Result<PostgresRedoProcess, Error> {
// Create empty data directory for wal-redo postgres deleting old one.
fs::remove_dir_all(datadir).ok();
let initdb = Command::new("initdb")
@@ -426,7 +452,7 @@ impl WalRedoProcess {
};
tokio::spawn(f_stderr);
- Ok(WalRedoProcess {
+ Ok(PostgresRedoProcess {
stdin: RefCell::new(stdin),
stdout: RefCell::new(stdout),
})
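
Because callers now depend only on the WalRedoManager trait, tests can plug in a stub that never launches Postgres. A hypothetical sketch (the TestRedoManager name and the 8192-byte page are assumptions, not part of this patch):

    struct TestRedoManager;

    impl WalRedoManager for TestRedoManager {
        fn request_redo(
            &self,
            _tag: BufferTag,
            _lsn: Lsn,
            base_img: Option<Bytes>,
            _records: Vec<WALRecord>,
        ) -> Result<Bytes, WalRedoError> {
            // No real redo here: hand back the base image unchanged, or a zeroed page.
            Ok(base_img.unwrap_or_else(|| Bytes::from(vec![0u8; 8192])))
        }
    }
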