mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
If WAL streaming connection is lost, restart at right place.
Need to restart at the end of last WAL record, not in the middle of a record if we had previously streamed a partial record.
This commit is contained in:
@@ -46,6 +46,7 @@ pub struct PageCache {
|
||||
// that they can be read without acquiring the mutex).
|
||||
pub first_valid_lsn: AtomicU64,
|
||||
pub last_valid_lsn: AtomicU64,
|
||||
pub last_record_lsn: AtomicU64,
|
||||
}
|
||||
|
||||
pub struct PageCacheStats {
|
||||
@@ -55,6 +56,7 @@ pub struct PageCacheStats {
|
||||
pub num_getpage_requests: u64,
|
||||
pub first_valid_lsn: u64,
|
||||
pub last_valid_lsn: u64,
|
||||
pub last_record_lsn: u64,
|
||||
}
|
||||
|
||||
//
|
||||
@@ -77,8 +79,18 @@ struct PageCacheShared {
|
||||
// page version. If we get a request > last_valid_lsn, we need to wait until
|
||||
// we receive all the WAL up to the request.
|
||||
//
|
||||
// last_record_lsn points to the end of last processed WAL record.
|
||||
// It can lag behind last_valid_lsn, if the WAL receiver has received some WAL
|
||||
// after the end of last record, but not the whole next record yet. In the
|
||||
// page cache, we care about last_valid_lsn, but if the WAL receiver needs to
|
||||
// restart the streaming, it needs to restart at the end of last record, so
|
||||
// we track them separately. last_record_lsn should perhaps be in
|
||||
// walreceiver.rs instead of here, but it seems convenient to keep all three
|
||||
// values together.
|
||||
//
|
||||
first_valid_lsn: u64,
|
||||
last_valid_lsn: u64,
|
||||
last_record_lsn: u64,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
@@ -96,6 +108,7 @@ fn init_page_cache() -> PageCache
|
||||
relsize_cache: HashMap::new(),
|
||||
first_valid_lsn: 0,
|
||||
last_valid_lsn: 0,
|
||||
last_record_lsn: 0,
|
||||
}),
|
||||
valid_lsn_condvar: Condvar::new(),
|
||||
|
||||
@@ -109,6 +122,7 @@ fn init_page_cache() -> PageCache
|
||||
|
||||
first_valid_lsn: AtomicU64::new(0),
|
||||
last_valid_lsn: AtomicU64::new(0),
|
||||
last_record_lsn: AtomicU64::new(0),
|
||||
}
|
||||
|
||||
}
|
||||
@@ -438,6 +452,25 @@ pub fn advance_last_valid_lsn(lsn: u64)
|
||||
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
//
|
||||
// NOTE: this updates last_valid_lsn as well.
|
||||
//
|
||||
pub fn advance_last_record_lsn(lsn: u64)
|
||||
{
|
||||
let mut shared = PAGECACHE.shared.lock().unwrap();
|
||||
|
||||
// Can't move backwards.
|
||||
assert!(lsn >= shared.last_valid_lsn);
|
||||
assert!(lsn >= shared.last_record_lsn);
|
||||
|
||||
shared.last_valid_lsn = lsn;
|
||||
shared.last_record_lsn = lsn;
|
||||
PAGECACHE.valid_lsn_condvar.notify_all();
|
||||
|
||||
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
|
||||
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
//
|
||||
pub fn _advance_first_valid_lsn(lsn: u64)
|
||||
{
|
||||
@@ -460,18 +493,21 @@ pub fn init_valid_lsn(lsn: u64)
|
||||
|
||||
assert!(shared.first_valid_lsn == 0);
|
||||
assert!(shared.last_valid_lsn == 0);
|
||||
assert!(shared.last_record_lsn == 0);
|
||||
|
||||
shared.first_valid_lsn = lsn;
|
||||
shared.last_valid_lsn = lsn;
|
||||
shared.last_record_lsn = lsn;
|
||||
PAGECACHE.first_valid_lsn.store(lsn, Ordering::Relaxed);
|
||||
PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed);
|
||||
PAGECACHE.last_record_lsn.store(lsn, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get_last_valid_lsn() -> u64
|
||||
pub fn get_last_record_lsn() -> u64
|
||||
{
|
||||
let shared = PAGECACHE.shared.lock().unwrap();
|
||||
|
||||
return shared.last_valid_lsn;
|
||||
return shared.last_record_lsn;
|
||||
}
|
||||
|
||||
//
|
||||
@@ -562,5 +598,6 @@ pub fn get_stats() -> PageCacheStats
|
||||
num_getpage_requests: PAGECACHE.num_getpage_requests.load(Ordering::Relaxed),
|
||||
first_valid_lsn: PAGECACHE.first_valid_lsn.load(Ordering::Relaxed),
|
||||
last_valid_lsn: PAGECACHE.last_valid_lsn.load(Ordering::Relaxed),
|
||||
last_record_lsn: PAGECACHE.last_record_lsn.load(Ordering::Relaxed),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,7 +260,10 @@ impl tui::widgets::Widget for MetricsWidget {
|
||||
let lsnrange = format!("{} - {}",
|
||||
format_lsn(page_cache_stats.first_valid_lsn),
|
||||
format_lsn(page_cache_stats.last_valid_lsn));
|
||||
let last_valid_recordlsn_str =
|
||||
format_lsn(page_cache_stats.last_record_lsn);
|
||||
lines.push(get_metric_str("Valid LSN range", &lsnrange));
|
||||
lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str));
|
||||
lines.push(get_metric_u64("# of cache entries", page_cache_stats.num_entries));
|
||||
lines.push(get_metric_u64("# of page images", page_cache_stats.num_page_images));
|
||||
lines.push(get_metric_u64("# of WAL records", page_cache_stats.num_wal_records));
|
||||
|
||||
@@ -138,6 +138,11 @@ impl WalStreamDecoder {
|
||||
// read xl_tot_len FIXME: assumes little-endian
|
||||
self.startlsn = self.lsn;
|
||||
let xl_tot_len = self.inputbuf.get_u32_le();
|
||||
if xl_tot_len < SizeOfXLogRecord {
|
||||
error!("invalid xl_tot_len {} at {:X}/{:X}", xl_tot_len,
|
||||
self.lsn >> 32, self.lsn & 0xffffffff);
|
||||
panic!();
|
||||
}
|
||||
self.lsn += 4;
|
||||
|
||||
self.recordbuf.clear();
|
||||
|
||||
@@ -66,18 +66,26 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
|
||||
//
|
||||
// Start streaming the WAL, from where we left off previously.
|
||||
//
|
||||
let mut startpoint = page_cache::get_last_valid_lsn();
|
||||
let mut startpoint = page_cache::get_last_record_lsn();
|
||||
if startpoint == 0 {
|
||||
// FIXME: Or should we just error out?
|
||||
page_cache::init_valid_lsn(u64::from(_identify_system.xlogpos()));
|
||||
startpoint = u64::from(_identify_system.xlogpos());
|
||||
} else {
|
||||
// There might be some padding after the last full record, skip it.
|
||||
//
|
||||
// FIXME: It probably would be better to always start streaming from the beginning
|
||||
// of the page, or the segment, so that we could check the page/segment headers
|
||||
// too. Just for the sake of paranoia.
|
||||
if startpoint % 8 != 0 {
|
||||
startpoint += 8 - (startpoint % 8);
|
||||
}
|
||||
}
|
||||
let startpoint = tokio_postgres::types::Lsn::from(startpoint);
|
||||
|
||||
debug!("starting replication from {:?}...", startpoint);
|
||||
let mut physical_stream = rclient
|
||||
.start_physical_replication(None, startpoint, None)
|
||||
.await?;
|
||||
|
||||
let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
|
||||
|
||||
while let Some(replication_message) = physical_stream.next().await {
|
||||
@@ -123,7 +131,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
|
||||
|
||||
// Now that this record has been handled, let the page cache know that
|
||||
// it is up-to-date to this LSN
|
||||
page_cache::advance_last_valid_lsn(lsn);
|
||||
page_cache::advance_last_record_lsn(lsn);
|
||||
|
||||
} else {
|
||||
break;
|
||||
|
||||
Reference in New Issue
Block a user