mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
page server: If WAL receiver dies, restart from where it left off
This commit is contained in:
committed by
Stas Kelvich
parent
ab432dff01
commit
5d0ee11cde
@@ -191,6 +191,28 @@ pub fn advance_last_valid_lsn(lsn: u64)
|
||||
shared.last_valid_lsn = lsn;
|
||||
}
|
||||
|
||||
//
|
||||
pub fn advance_first_valid_lsn(lsn: u64)
|
||||
{
|
||||
let mut shared = PAGECACHE.lock().unwrap();
|
||||
|
||||
// Can't move backwards.
|
||||
assert!(lsn >= shared.first_valid_lsn);
|
||||
|
||||
// Can't overtake last_valid_lsn (except when we're
|
||||
// initializing the system and last_valid_lsn hasn't been set yet.
|
||||
assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn);
|
||||
|
||||
shared.first_valid_lsn = lsn;
|
||||
}
|
||||
|
||||
pub fn get_last_valid_lsn() -> u64
|
||||
{
|
||||
let shared = PAGECACHE.lock().unwrap();
|
||||
|
||||
return shared.last_valid_lsn;
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// Simple test function for the WAL redo code:
|
||||
|
||||
@@ -60,16 +60,31 @@ async fn walreceiver_main() -> Result<(), Error> {
|
||||
let identify_system = rclient.identify_system().await?;
|
||||
|
||||
//
|
||||
// Start streaming the WAL.
|
||||
// Start streaming the WAL, from where we left off previously.
|
||||
//
|
||||
// TODO: currently, we start streaming at the primary's last insert location.
|
||||
// We should start at the last LSN that we had streamed previously, instead.
|
||||
// If this is the first time we start up, start streaming from the primary's
|
||||
// current end of WAL.
|
||||
//
|
||||
let mut physical_stream = rclient
|
||||
.start_physical_replication(None, identify_system.xlogpos(), None)
|
||||
.await.unwrap();
|
||||
// TODO: We should persist the last valid LSN over page server restarts (and
|
||||
// all the data, too, of course). And have some mechanism of bootstrapping.
|
||||
//
|
||||
let last_valid_lsn = page_cache::get_last_valid_lsn();
|
||||
let startpoint = {
|
||||
if last_valid_lsn != 0 {
|
||||
tokio_postgres::types::Lsn::from(last_valid_lsn)
|
||||
} else {
|
||||
let primary_lsn = identify_system.xlogpos();
|
||||
page_cache::advance_first_valid_lsn(u64::from(primary_lsn));
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(u64::from(identify_system.xlogpos()));
|
||||
primary_lsn
|
||||
}
|
||||
};
|
||||
|
||||
let mut physical_stream = rclient
|
||||
.start_physical_replication(None, startpoint, None)
|
||||
.await?;
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
|
||||
|
||||
while let Some(replication_message) = physical_stream.next().await {
|
||||
match replication_message? {
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
//
|
||||
// WAL redo
|
||||
//
|
||||
// We rely on Postgres to perform WAL redo for us. We launch
|
||||
// a postgres process in special "wal redo" mode that's similar
|
||||
// to single-user mode. We then pass the WAL record and the previous
|
||||
// page image, if any, to the postgress process, and ask the
|
||||
// process to apply it. Then we get the page image back. Communication
|
||||
// with the process happens via stdin/stdout
|
||||
// We rely on Postgres to perform WAL redo for us. We launch a
|
||||
// postgres process in special "wal redo" mode that's similar to
|
||||
// single-user mode. We then pass the the previous page image, if any,
|
||||
// and all the WAL records we want to apply, to the postgress
|
||||
// process. Then we get the page image back. Communication with the
|
||||
// postgres process happens via stdin/stdout
|
||||
//
|
||||
// See src/backend/tcop/zenith_wal_redo.c for the other side of
|
||||
// this communication.
|
||||
//
|
||||
// TODO: Even though the postgres code runs in a separate process,
|
||||
// it's not a secure sandbox.
|
||||
|
||||
Reference in New Issue
Block a user