safekeeper: check for non-consecutive writes in safekeeper.rs

wal_storage.rs already checks this, but since non-consecutive writes are a quite
plausible scenario in practice, check for them at safekeeper.rs (the consensus
level) as well.

ref https://github.com/neondatabase/neon/issues/8212

This is take 2; the previous PR #8640 was reverted because its interplay
with another change broke test_last_log_term_switch.
Author: Arseny Sher
Committer: Arseny Sher
Date: 2024-08-07 19:26:06 +03:00
Parent: 83dd7f559c
Commit: c7187be8a1
3 changed files with 113 additions and 23 deletions
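
The core of the change is the pair of checks in the first hunk below. For illustration only (a standalone sketch, not code from this commit; check_consecutive is a made-up name and u64 stands in for Lsn), the invariant reduces to:

    // A sketch of the consecutive-write invariant. An LSN is a byte position
    // in the WAL; write_lsn is where the next byte is expected to land.
    fn check_consecutive(write_lsn: u64, begin_lsn: u64) -> Result<(), String> {
        if write_lsn > begin_lsn {
            // e.g. write_lsn = 200, begin_lsn = 150: the request would
            // overwrite WAL that was already written.
            return Err(format!(
                "append request rewrites WAL written before, write_lsn={write_lsn}, msg lsn={begin_lsn}"
            ));
        }
        if write_lsn < begin_lsn && write_lsn != 0 {
            // e.g. write_lsn = 200, begin_lsn = 300: bytes 200..300 would be
            // missing. write_lsn == 0 means nothing was written yet, so the
            // first write may start at any position.
            return Err(format!(
                "append request creates gap in written WAL, write_lsn={write_lsn}, msg lsn={begin_lsn}"
            ));
        }
        Ok(())
    }

The only legitimate way to move the write position is the truncation performed while handling ProposerElected, which is why the consensus-level check can afford to be this strict.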

safekeeper/src/safekeeper.rs

@@ -875,6 +875,29 @@ where
            return Ok(Some(AcceptorProposerMessage::AppendResponse(resp)));
        }

+        // Disallow any non-sequential writes, which can result in gaps or
+        // overwrites. If we need to move the pointer, the ProposerElected message
+        // should have truncated WAL first accordingly. Note that the first
+        // condition (WAL rewrite) is quite expected in the real world; it happens
+        // when walproposer reconnects to safekeeper and writes some more data
+        // while the first connection still gets some packets later. It might be
+        // better to not log this as error! above.
+        let write_lsn = self.wal_store.write_lsn();
+        if write_lsn > msg.h.begin_lsn {
+            bail!(
+                "append request rewrites WAL written before, write_lsn={}, msg lsn={}",
+                write_lsn,
+                msg.h.begin_lsn
+            );
+        }
+        if write_lsn < msg.h.begin_lsn && write_lsn != Lsn(0) {
+            bail!(
+                "append request creates gap in written WAL, write_lsn={}, msg lsn={}",
+                write_lsn,
+                msg.h.begin_lsn,
+            );
+        }

        // Now we know that we are in the same term as the proposer,
        // processing the message.
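
To make the "quite expected" rewrite case from the comment concrete, here is one hypothetical sequence (the LSN values are invented for illustration):

1. walproposer streams WAL over connection A; the safekeeper writes up to LSN 200, so write_lsn = 200.
2. Connection A stalls; walproposer reconnects as connection B, which elects itself and continues streaming.
3. A delayed AppendRequest from connection A finally arrives with begin_lsn = 150.
4. write_lsn (200) > begin_lsn (150), so the first bail! fires and the stale write is refused instead of silently rewriting bytes 150..200.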
@@ -960,10 +983,7 @@ mod tests {
     use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};

     use super::*;
-    use crate::{
-        state::{EvictionState, PersistedPeers, TimelinePersistentState},
-        wal_storage::Storage,
-    };
+    use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState};
     use std::{ops::Deref, str::FromStr, time::Instant};

     // fake storage for tests
@@ -1003,6 +1023,10 @@ mod tests {
     }

     impl wal_storage::Storage for DummyWalStore {
+        fn write_lsn(&self) -> Lsn {
+            self.lsn
+        }
+
         fn flush_lsn(&self) -> Lsn {
             self.lsn
         }
@@ -1076,7 +1100,7 @@ mod tests {
         let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();

         let mut ar_hdr = AppendRequestHeader {
-            term: 1,
+            term: 2,
             term_start_lsn: Lsn(3),
             begin_lsn: Lsn(1),
             end_lsn: Lsn(2),
@@ -1090,24 +1114,29 @@ mod tests {
         };

         let pem = ProposerElected {
-            term: 1,
-            start_streaming_at: Lsn(3),
-            term_history: TermHistory(vec![TermLsn {
-                term: 1,
-                lsn: Lsn(3),
-            }]),
-            timeline_start_lsn: Lsn(0),
+            term: 2,
+            start_streaming_at: Lsn(1),
+            term_history: TermHistory(vec![
+                TermLsn {
+                    term: 1,
+                    lsn: Lsn(1),
+                },
+                TermLsn {
+                    term: 2,
+                    lsn: Lsn(3),
+                },
+            ]),
+            timeline_start_lsn: Lsn(1),
         };
         sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
             .await
             .unwrap();

         // check that AppendRequest before term_start_lsn doesn't switch last_log_term.
-        let resp = sk
-            .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
-            .await;
-        assert!(resp.is_ok());
-        assert_eq!(sk.get_last_log_term(), 0);
+        sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
+            .await
+            .unwrap();
+        assert_eq!(sk.get_last_log_term(), 1);

         // but record at term_start_lsn does the switch
         ar_hdr.begin_lsn = Lsn(2);
@@ -1116,12 +1145,63 @@ mod tests {
             h: ar_hdr,
             wal_data: Bytes::from_static(b"b"),
         };
-        let resp = sk
-            .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
-            .await;
-        assert!(resp.is_ok());
+        sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record at 3 %)
         assert_eq!(sk.get_last_log_term(), 1);
+        sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
+            .await
+            .unwrap();
+        assert_eq!(sk.get_last_log_term(), 2);
     }
+
+    #[tokio::test]
+    async fn test_non_consecutive_write() {
+        let storage = InMemoryState {
+            persisted_state: test_sk_state(),
+        };
+        let wal_store = DummyWalStore { lsn: Lsn(0) };
+
+        let mut sk = SafeKeeper::new(TimelineState::new(storage), wal_store, NodeId(0)).unwrap();
+
+        let pem = ProposerElected {
+            term: 1,
+            start_streaming_at: Lsn(1),
+            term_history: TermHistory(vec![TermLsn {
+                term: 1,
+                lsn: Lsn(1),
+            }]),
+            timeline_start_lsn: Lsn(1),
+        };
+        sk.process_msg(&ProposerAcceptorMessage::Elected(pem))
+            .await
+            .unwrap();
+
+        let ar_hdr = AppendRequestHeader {
+            term: 1,
+            term_start_lsn: Lsn(3),
+            begin_lsn: Lsn(1),
+            end_lsn: Lsn(2),
+            commit_lsn: Lsn(0),
+            truncate_lsn: Lsn(0),
+            proposer_uuid: [0; 16],
+        };
+        let append_request = AppendRequest {
+            h: ar_hdr.clone(),
+            wal_data: Bytes::from_static(b"b"),
+        };
+
+        // do a write ending at 2; it should be ok
+        sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
+            .await
+            .unwrap();
+
+        let mut ar_hdr2 = ar_hdr.clone();
+        ar_hdr2.begin_lsn = Lsn(4);
+        ar_hdr2.end_lsn = Lsn(5);
+        let append_request = AppendRequest {
+            h: ar_hdr2,
+            wal_data: Bytes::from_static(b"b"),
+        };
+        // and now one starting at 4; it must fail
+        sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request))
+            .await
+            .unwrap_err();
+    }
+
     #[test]
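
The reworked test above hinges on how last_log_term is derived from the term history and the flushed position. Roughly (a simplified sketch with plain tuples, not the safekeeper's actual TermHistory code):

    // history holds (term, start_lsn) pairs ordered by start_lsn; the last
    // log term is the term of the last entry that has started by flush_lsn.
    fn last_log_term(history: &[(u64, u64)], flush_lsn: u64) -> u64 {
        history
            .iter()
            .take_while(|&&(_, start_lsn)| start_lsn <= flush_lsn)
            .last()
            .map(|&(term, _)| term)
            .unwrap_or(0)
    }

Under the new history [(1, Lsn(1)), (2, Lsn(3))], WAL flushed up to Lsn(2) still yields last_log_term 1; only a record reaching Lsn(3), the start of term 2, switches it to 2, which is what the updated assertions verify.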

safekeeper/src/wal_storage.rs

@@ -37,6 +37,8 @@ use pq_proto::SystemId;
 use utils::{id::TenantTimelineId, lsn::Lsn};

 pub trait Storage {
+    /// Last written LSN.
+    fn write_lsn(&self) -> Lsn;
     /// LSN of last durably stored WAL record.
     fn flush_lsn(&self) -> Lsn;
@@ -329,6 +331,10 @@ impl PhysicalStorage {
 }

 impl Storage for PhysicalStorage {
+    /// Last written LSN.
+    fn write_lsn(&self) -> Lsn {
+        self.write_lsn
+    }
     /// flush_lsn returns LSN of last durably stored WAL record.
     fn flush_lsn(&self) -> Lsn {
         self.flush_record_lsn
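
The trait now exposes both positions because they advance at different times: write_lsn moves on every write, while flush_lsn only moves once complete records are durably on disk. A minimal sketch of that bookkeeping (a hypothetical toy type, not PhysicalStorage):

    // Toy storage tracking both positions; u64 stands in for Lsn.
    struct SketchStorage {
        write_lsn: u64,        // end of the last byte written out
        flush_record_lsn: u64, // end of the last durably flushed record
    }

    impl SketchStorage {
        fn write(&mut self, startpos: u64, buf: &[u8]) {
            // the same consecutive-write rule, enforced at the storage level
            assert_eq!(startpos, self.write_lsn, "non-consecutive write");
            self.write_lsn += buf.len() as u64;
        }
        fn fsync_record(&mut self, record_end: u64) {
            // a record boundary at or below write_lsn becomes durable
            assert!(record_end <= self.write_lsn);
            self.flush_record_lsn = record_end;
        }
    }

The new consensus-level check compares against write_lsn rather than flush_lsn: a gap or rewrite is already wrong at write time, before anything has been fsynced.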

safekeeper/tests/walproposer_sim/wal_storage.rs

@@ -175,6 +175,10 @@ impl DiskWALStorage {
 }

 impl wal_storage::Storage for DiskWALStorage {
+    /// Last written LSN.
+    fn write_lsn(&self) -> Lsn {
+        self.write_lsn
+    }
     /// LSN of last durably stored WAL record.
     fn flush_lsn(&self) -> Lsn {
         self.flush_record_lsn