Make the debugging output from WAL receiver a bit nicer.

This commit is contained in:
Heikki Linnakangas
2021-03-31 21:47:06 +03:00
parent 79e4110cf0
commit 0bf3c3224e

View File

@@ -61,9 +61,11 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
}
});
let _identify_system = rclient.identify_system().await?;
let identify_system = rclient.identify_system().await?;
let end_of_wal = u64::from(identify_system.xlogpos());
let mut caught_up = false;
let sysid : u64 = _identify_system.systemid().parse().unwrap();
let sysid : u64 = identify_system.systemid().parse().unwrap();
let pcache = page_cache::get_pagecahe(conf, sysid);
//
@@ -71,9 +73,9 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
//
let mut startpoint = pcache.get_last_valid_lsn();
if startpoint == 0 {
// If we start here with _identify_system.xlogpos() we will have race condition with
// If we start here with identify_system.xlogpos() we will have race condition with
// postgres start: insert into postgres may request page that was modified with lsn
// smaller than _identify_system.xlogpos().
// smaller than identify_system.xlogpos().
//
// Current procedure for starting postgres will anyway be changed to something
// different like having 'initdb' method on a pageserver (or importing some shared
@@ -91,8 +93,10 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
startpoint += 8 - (startpoint % 8);
}
}
debug!("starting replication from {:X}/{:X}, server is at {:X}/{:X}...",
(startpoint >> 32), (startpoint & 0xffffffff),
(end_of_wal >> 32), (end_of_wal & 0xffffffff));
let startpoint = tokio_postgres::types::Lsn::from(startpoint);
debug!("starting replication from {:?}...", startpoint);
let mut physical_stream = rclient
.start_physical_replication(None, startpoint, None)
.await?;
@@ -108,7 +112,9 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
let startlsn = xlog_data.wal_start();
let endlsn = startlsn + data.len() as u64;
trace!("received XLogData between {} and {}", startlsn, endlsn);
trace!("received XLogData between {:X}/{:X} and {:X}/{:X}",
(startlsn >> 32), (startlsn & 0xffffffff),
(endlsn >> 32), (endlsn & 0xffffffff));
waldecoder.feed_bytes(data);
@@ -155,6 +161,11 @@ async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -
// middle of a record, if the request LSN was taken from the server's current
// flush ptr.
pcache.advance_last_valid_lsn(endlsn);
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {:X}/{:X}", (endlsn >> 32), (endlsn & 0xffffffff));
caught_up = true;
}
}
ReplicationMessage::PrimaryKeepAlive(_keepalive) => {