Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-03 19:42:55 +00:00
Advance "last valid LSN", even if it's not at WAL record boundary.
The GetPage@LSN requests use the last flushed WAL position as the request LSN, but that position can point into the middle of a WAL record (most likely at a page boundary), while we used to update the "last valid LSN" only after fully decoding a record. As a result, the following could happen:

1. Postgres generates two WAL records. They span from 0/10000 to 0/20000, and from 0/20000 to 0/30000.
2. Postgres flushes the WAL up to 0/25000.
3. The page server receives the WAL up to 0/25000. It decodes the first WAL record and advances the last valid LSN to the end of that record, 0/20000.
4. Postgres issues a GetPage@LSN request, using the flush position 0/25000 as the request LSN.
5. The GetPage@LSN request gets stuck in the page server, because the last valid LSN is 0/20000 while the request LSN is 0/25000.

This situation gets unwedged when something kicks off a new WAL flush in the Postgres server, like a new transaction, but that can take a long time.

Fix by updating the last valid LSN to the last received LSN, even if it points into the middle of a record.
Committed by: Stas Kelvich
Parent: 52e754f301
Commit: bfb20522eb
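The effect of the change is easiest to see as a compressed sketch of the receive path. The sketch below is illustrative only: WalDecoder and advance_last_valid_lsn here are minimal stand-ins, not the page server's real types or signatures.

// Minimal stand-ins for illustration; the real decoder and page cache differ.
struct WalDecoder { /* buffers WAL bytes and decodes complete records */ }

impl WalDecoder {
    fn feed_bytes(&mut self, _data: &[u8]) {}
    /// Returns (start_lsn, end_lsn, record_bytes) of the next complete record, if any.
    fn poll_decode(&mut self) -> Option<(u64, u64, Vec<u8>)> { None }
}

fn advance_last_valid_lsn(_lsn: u64) { /* wakes waiting GetPage@LSN requests */ }

fn handle_xlog_data(decoder: &mut WalDecoder, startlsn: u64, data: &[u8]) {
    // The primary may flush, and therefore send, WAL that stops in the middle
    // of a record, for example at a page boundary.
    let endlsn = startlsn + data.len() as u64;

    decoder.feed_bytes(data);

    // Advance once per fully decoded record, as before...
    while let Some((_rec_start, rec_end, _rec)) = decoder.poll_decode() {
        advance_last_valid_lsn(rec_end);
    }

    // ...and once more to the end of what was actually received, even if that
    // points into the middle of a partial record. GetPage@LSN request LSNs are
    // taken from the primary's flush position, which can point there too.
    advance_last_valid_lsn(endlsn);
}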
@@ -210,10 +210,12 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
     let entry_rc: Arc<CacheEntry>;
     {
         let mut shared = PAGECACHE.shared.lock().unwrap();
+        let mut waited = false;
 
         while lsn > shared.last_valid_lsn {
             // TODO: Wait for the WAL receiver to catch up
-            debug!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn);
+            waited = true;
+            trace!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn);
             let wait_result = PAGECACHE.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap();
 
             shared = wait_result.0;
@@ -221,6 +223,10 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>
                 return Err(format!("Timed out while waiting for WAL record at LSN {} to arrive", lsn))?;
             }
         }
+        if waited {
+            trace!("caught up now, continuing");
+        }
+
         if lsn < shared.first_valid_lsn {
             return Err(format!("LSN {} has already been removed", lsn))?;
         }
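For context, the wait in get_page_at_lsn follows the standard std::sync::Condvar pattern: re-check the condition in a loop and keep using the guard returned by wait_timeout. Below is a minimal self-contained version of that pattern, with simplified names rather than the page server's actual shared state.

use std::sync::{Condvar, Mutex};
use std::time::Duration;

const TIMEOUT: Duration = Duration::from_secs(60); // placeholder value

struct Shared {
    last_valid_lsn: u64,
}

struct PageCache {
    shared: Mutex<Shared>,
    valid_lsn_condvar: Condvar,
}

impl PageCache {
    /// Block until last_valid_lsn has reached `lsn`, or give up after TIMEOUT.
    fn wait_for_lsn(&self, lsn: u64) -> Result<(), String> {
        let mut shared = self.shared.lock().unwrap();
        while lsn > shared.last_valid_lsn {
            let (guard, timeout) = self
                .valid_lsn_condvar
                .wait_timeout(shared, TIMEOUT)
                .unwrap();
            shared = guard;
            if timeout.timed_out() && lsn > shared.last_valid_lsn {
                return Err(format!("timed out waiting for WAL at LSN {}", lsn));
            }
        }
        Ok(())
    }
}

Because the condvar releases the mutex while waiting, the walreceiver can take the same lock to advance last_valid_lsn and then notify waiters.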
@@ -84,11 +84,15 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
             match replication_message? {
                 ReplicationMessage::XLogData(xlog_data) => {
-
-                    trace!("received XLogData , lsn: {}", xlog_data.wal_start());
+                    let data = xlog_data.data();
+                    let startlsn = xlog_data.wal_start();
+                    let endlsn = startlsn + data.len() as u64;
+
+                    trace!("received XLogData between {} and {}", startlsn, endlsn);
 
                     // Pass the WAL data to the decoder, and see if we can decode
                     // more records as a result.
-                    waldecoder.feed_bytes(xlog_data.data());
+                    waldecoder.feed_bytes(data);
 
                     loop {
                         if let Some((startlsn, endlsn, recdata)) = waldecoder.poll_decode() {
@@ -125,7 +129,16 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> {
                             break;
                         }
                     }
+
+                    // Update the last_valid LSN value in the page cache one more time. We updated
+                    // it in the loop above, between each WAL record, but we might have received
+                    // a partial record after the last completed record. Our page cache's value
+                    // better reflect that, because GetPage@LSN requests might also point in the
+                    // middle of a record, if the request LSN was taken from the server's current
+                    // flush ptr.
+                    page_cache::advance_last_valid_lsn(endlsn);
                 }
+
                 ReplicationMessage::PrimaryKeepAlive(_keepalive) => {
                     trace!("received PrimaryKeepAlive");
                     // FIXME: Reply, or the connection will time out
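The call page_cache::advance_last_valid_lsn(endlsn) is the other half of the handshake, but its body is not part of this diff. A plausible shape, building on the PageCache sketch above and offered only as an assumption about what such a function does, is to move the watermark monotonically forward and wake all waiters.

impl PageCache {
    /// Assumed behavior, not the repository's actual implementation: never move
    /// the watermark backwards, and wake every waiting GetPage@LSN request.
    fn advance_last_valid_lsn(&self, lsn: u64) {
        let mut shared = self.shared.lock().unwrap();
        if lsn > shared.last_valid_lsn {
            shared.last_valid_lsn = lsn;
        }
        self.valid_lsn_condvar.notify_all();
    }
}

With the fix, this gets called both at each decoded record boundary and at the end of every received XLogData chunk, so a request LSN taken from the flush pointer can be satisfied without waiting for the next complete record.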