diff --git a/src/walreceiver.rs b/src/walreceiver.rs index 9005aad43b..814721a541 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -61,9 +61,11 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) - } }); - let _identify_system = rclient.identify_system().await?; + let identify_system = rclient.identify_system().await?; + let end_of_wal = u64::from(identify_system.xlogpos()); + let mut caught_up = false; - let sysid : u64 = _identify_system.systemid().parse().unwrap(); + let sysid : u64 = identify_system.systemid().parse().unwrap(); let pcache = page_cache::get_pagecahe(conf, sysid); // @@ -71,9 +73,9 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) - // let mut startpoint = pcache.get_last_valid_lsn(); if startpoint == 0 { - // If we start here with _identify_system.xlogpos() we will have race condition with + // If we start here with identify_system.xlogpos() we will have race condition with // postgres start: insert into postgres may request page that was modified with lsn - // smaller than _identify_system.xlogpos(). + // smaller than identify_system.xlogpos(). // // Current procedure for starting postgres will anyway be changed to something // different like having 'initdb' method on a pageserver (or importing some shared @@ -91,8 +93,10 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) - startpoint += 8 - (startpoint % 8); } } + debug!("starting replication from {:X}/{:X}, server is at {:X}/{:X}...", + (startpoint >> 32), (startpoint & 0xffffffff), + (end_of_wal >> 32), (end_of_wal & 0xffffffff)); let startpoint = tokio_postgres::types::Lsn::from(startpoint); - debug!("starting replication from {:?}...", startpoint); let mut physical_stream = rclient .start_physical_replication(None, startpoint, None) .await?; @@ -108,7 +112,9 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) - let startlsn = xlog_data.wal_start(); let endlsn = startlsn + data.len() as u64; - trace!("received XLogData between {} and {}", startlsn, endlsn); + trace!("received XLogData between {:X}/{:X} and {:X}/{:X}", + (startlsn >> 32), (startlsn & 0xffffffff), + (endlsn >> 32), (endlsn & 0xffffffff)); waldecoder.feed_bytes(data); @@ -155,6 +161,11 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) - // middle of a record, if the request LSN was taken from the server's current // flush ptr. pcache.advance_last_valid_lsn(endlsn); + + if !caught_up && endlsn >= end_of_wal { + info!("caught up at LSN {:X}/{:X}", (endlsn >> 32), (endlsn & 0xffffffff)); + caught_up = true; + } } ReplicationMessage::PrimaryKeepAlive(_keepalive) => {