diff --git a/integration_tests/tests/control_plane/mod.rs b/integration_tests/tests/control_plane/mod.rs
index eab3f345af..5ace192ac0 100644
--- a/integration_tests/tests/control_plane/mod.rs
+++ b/integration_tests/tests/control_plane/mod.rs
@@ -178,6 +178,7 @@ impl PageServerNode {
             .arg("--skip-recovery")
             .env_clear()
             .env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
+            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
             .status()
             .expect("failed to start pageserver");
 
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index ea36de3af3..a4c108cd59 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -373,8 +373,8 @@ impl PageCache {
                         lsn
                     );
                     return Err(format!(
-                        "Timed out while waiting for WAL record at LSN {} to arrive",
-                        lsn
+                        "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
+                        lsn >> 32, lsn & 0xffff_ffff
                     ))?;
                 }
             }
@@ -383,11 +383,8 @@ impl PageCache {
             }
 
             if lsn < shared.first_valid_lsn {
-                error!(
-                    "LSN {} has already been removed",
-                    lsn
-                );
-                return Err(format!("LSN {} has already been removed", lsn))?;
+                return Err(format!("LSN {:X}/{:X} has already been removed",
+                                   lsn >> 32, lsn & 0xffff_ffff))?;
             }
         }
         let mut buf = BytesMut::new();
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 90d2ed470e..227cc6924f 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -154,12 +154,14 @@ impl WalRedoProcess {
             Command::new("initdb")
                 .args(&["-D", datadir.to_str().unwrap()])
                 .arg("-N")
-                .status(),
+                .output(),
         )
         .expect("failed to execute initdb");
 
-        if !initdb.success() {
-            panic!("initdb failed");
+        if !initdb.status.success() {
+            panic!("initdb failed: {}\nstderr:\n{}",
+                std::str::from_utf8(&initdb.stdout).unwrap(),
+                std::str::from_utf8(&initdb.stderr).unwrap());
         }
 
         // Start postgres itself
diff --git a/walkeeper/README b/walkeeper/README
new file mode 100644
index 0000000000..9672cc3b76
--- /dev/null
+++ b/walkeeper/README
@@ -0,0 +1,38 @@
+# WAL safekeeper
+
+Also known as the WAL service, WAL keeper or WAL acceptor.
+
+The WAL safekeeper acts as a holding area and redistribution center
+for recently generated WAL. The primary Postgres server streams the
+WAL to the WAL safekeeper, and treats it like a (synchronous)
+replica. A replication slot is used in the primary to prevent the
+primary from discarding WAL that hasn't been streamed to the
+safekeeper yet.
+
+The primary connects to the WAL safekeeper, so it works in a "push"
+fashion. That's different from how streaming replication usually
+works, where the replica initiates the connection. To do that, there
+is a component called "safekeeper_proxy". The safekeeper_proxy runs on
+the same host as the primary Postgres server and connects to it to do
+streaming replication. It also connects to the WAL safekeeper, and
+forwards all the WAL. (PostgreSQL's archive_command works in the
+"push" style, but it operates at WAL segment granularity. If
+PostgreSQL had a push-style API for streaming, we wouldn't need the
+proxy).
+
+The Page Server connects to the WAL safekeeper, using the same
+streaming replication protocol that's used between Postgres primary
+and standby. You can also connect the Page Server directly to a
+primary PostgreSQL node for testing.
+
+In a production installation, there are multiple WAL safekeepers
+running on different nodes, and there is a quorum mechanism using the
+Paxos algorithm to ensure that a piece of WAL is considered durable
+only after it has been flushed to disk on more than half of the WAL
+safekeepers. The Paxos and crash recovery algorithm ensures that only
+one primary node can be actively streaming WAL to the quorum of
+safekeepers.
+
+
+See vendor/postgres/src/bin/safekeeper/README.md for a more detailed
+description of the consensus protocol. (TODO: move the text here?)
diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs
index b91a966afa..d70c10ce22 100644
--- a/walkeeper/src/wal_service.rs
+++ b/walkeeper/src/wal_service.rs
@@ -585,7 +585,7 @@ impl Connection {
     fn set_system(&mut self, id: SystemId) -> Result<()> {
         let mut systems = SYSTEMS.lock().unwrap();
         if id == 0 {
-            // non-multitenant configuration: just sigle instance
+            // non-multitenant configuration: just a single instance
            if let Some(system) = systems.values().next() {
                 self.system = Some(system.clone());
                 return Ok(());
@@ -937,6 +937,10 @@ impl Connection {
 
         /*
          * Always start streaming at the beginning of a segment
+         *
+         * FIXME: It is common practice to start streaming at the beginning of
+         * the segment, but it should be up to the client to decide that. We
+         * shouldn't enforce that here.
          */
         start_pos -= XLogSegmentOffset(start_pos, wal_seg_size) as u64;
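
The page_cache.rs hunks above print a 64-bit LSN by splitting it into its high and low 32-bit halves (`lsn >> 32, lsn & 0xffff_ffff`) so that error messages use the familiar PostgreSQL-style hi/lo hex notation. Since the same shift/mask pair now appears in two error messages, a small helper could keep them consistent; the sketch below is illustrative only and not part of the patch (the `format_lsn` name is made up):

```rust
// Illustrative only, not part of this patch: formats a u64 LSN as its
// high and low 32-bit halves in hex, matching the `{:X}/{:X}` pattern
// introduced in page_cache.rs.
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff)
}

fn main() {
    // 0x1_6B37_4D48 splits into 1 (high half) and 6B374D48 (low half).
    assert_eq!(format_lsn(0x0000_0001_6B37_4D48), "1/6B374D48");
}
```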
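The new walkeeper/README states the durability rule: a piece of WAL counts as durable only once more than half of the safekeepers have flushed it to disk. As a rough sketch of that rule only (the function and its signature are hypothetical, not taken from the walkeeper code), the durable position could be computed from the per-safekeeper flush LSNs like this:

```rust
// Hypothetical sketch of the quorum rule described in walkeeper/README:
// a WAL position is durable once a strict majority of safekeepers have
// flushed it to disk. Not actual walkeeper code.
fn quorum_flush_lsn(mut flush_lsns: Vec<u64>) -> Option<u64> {
    if flush_lsns.is_empty() {
        return None;
    }
    // Sort descending; the entry at index n/2 has been reached by
    // n/2 + 1 safekeepers, i.e. by more than half of them.
    flush_lsns.sort_unstable_by(|a, b| b.cmp(a));
    Some(flush_lsns[flush_lsns.len() / 2])
}

fn main() {
    // Three safekeepers flushed up to 0x30, 0x20 and 0x10: two of the
    // three (a majority) have reached 0x20, so that is the durable LSN.
    assert_eq!(quorum_flush_lsn(vec![0x30, 0x20, 0x10]), Some(0x20));
}
```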