diff --git a/src/page_cache.rs b/src/page_cache.rs index 3468f3ed4d..560ad53e86 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -191,6 +191,28 @@ pub fn advance_last_valid_lsn(lsn: u64) shared.last_valid_lsn = lsn; } +// Advance first_valid_lsn to 'lsn'. Panics if that would move it backwards, or reach or pass an already-set (nonzero) last_valid_lsn. +pub fn advance_first_valid_lsn(lsn: u64) +{ + let mut shared = PAGECACHE.lock().unwrap(); + + // Can't move backwards. + assert!(lsn >= shared.first_valid_lsn); + + // Can't overtake last_valid_lsn (except when we're + // initializing the system and last_valid_lsn hasn't been set yet). + assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); + + shared.first_valid_lsn = lsn; +} + +pub fn get_last_valid_lsn() -> u64 +{ + let shared = PAGECACHE.lock().unwrap(); + + return shared.last_valid_lsn; +} + // // Simple test function for the WAL redo code: diff --git a/src/walreceiver.rs b/src/walreceiver.rs index cb6c4e373d..8da64c414f 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -60,16 +60,31 @@ async fn walreceiver_main() -> Result<(), Error> { let identify_system = rclient.identify_system().await?; // - // Start streaming the WAL. + // Start streaming the WAL, from where we left off previously. // - // TODO: currently, we start streaming at the primary's last insert location. - // We should start at the last LSN that we had streamed previously, instead. + // If this is the first time we start up, start streaming from the primary's + // current end of WAL. // - let mut physical_stream = rclient - .start_physical_replication(None, identify_system.xlogpos(), None) - .await.unwrap(); + // TODO: We should persist the last valid LSN over page server restarts (and + // all the data, too, of course). And have some mechanism of bootstrapping. 
+ // + let last_valid_lsn = page_cache::get_last_valid_lsn(); + let startpoint = { + if last_valid_lsn != 0 { + tokio_postgres::types::Lsn::from(last_valid_lsn) + } else { + let primary_lsn = identify_system.xlogpos(); + page_cache::advance_first_valid_lsn(u64::from(primary_lsn)); - let mut waldecoder = WalStreamDecoder::new(u64::from(identify_system.xlogpos())); + primary_lsn + } + }; + + let mut physical_stream = rclient + .start_physical_replication(None, startpoint, None) + .await?; + + let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint)); while let Some(replication_message) = physical_stream.next().await { match replication_message? { diff --git a/src/walredo.rs b/src/walredo.rs index 29c8f139ef..a33472c367 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -1,12 +1,15 @@ // // WAL redo // -// We rely on Postgres to perform WAL redo for us. We launch -// a postgres process in special "wal redo" mode that's similar -// to single-user mode. We then pass the WAL record and the previous -// page image, if any, to the postgress process, and ask the -// process to apply it. Then we get the page image back. Communication -// with the process happens via stdin/stdout +// We rely on Postgres to perform WAL redo for us. We launch a +// postgres process in special "wal redo" mode that's similar to +// single-user mode. We then pass the previous page image, if any, +// and all the WAL records we want to apply, to the postgres +// process. Then we get the page image back. Communication with the +// postgres process happens via stdin/stdout +// +// See src/backend/tcop/zenith_wal_redo.c for the other side of +// this communication. // // TODO: Even though the postgres code runs in a separate process, // it's not a secure sandbox.