mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
pageserver: guard against WAL gaps in the interpreted protocol (#10858)
## Problem

The interpreted SK <-> PS protocol does not guard against gaps (neither does the vanilla one, but that's beside the point).

## Summary of changes

Extend the protocol to include the start LSN of the PG WAL section from which the records were interpreted. Validation is enabled via a config flag on the pageserver (`validate_wal_contiguity`) and works as follows:

**Case 1**: `raw_wal_start_lsn` is smaller than the requested LSN. There can't be gaps here, but we check that the shard received records which it hasn't seen before.

**Case 2**: `raw_wal_start_lsn` is equal to the requested LSN. This is the happy case: no gap and nothing to check.

**Case 3**: `raw_wal_start_lsn` is greater than the requested LSN. This is a gap.

To make Case 3 work I had to bend the protocol a bit. We read chunks of WAL which aren't record aligned and feed them to the decoder. Consider a shard which subscribes at a position somewhere within Record 2. We already have a WAL reader which is below that position, so we wait for it to catch up. Read 1 covers all of Record 1 and some of Record 2: the new shard doesn't need Record 1 (it has already processed it according to its starting position), but the read extends past its starting position. When we do Read 2, we decode Record 2 and ship it off to the shard, but the start position of Read 2 is greater than the start position the shard requested. This looks like a gap.

To make it work, we extend the protocol to send an empty `InterpretedWalRecords` to a shard if the WAL the records originated from ends after that shard's requested start position. On the pageserver, that just updates the tracking LSNs in memory (a no-op, really). This gives us a workaround for the fake gap.

As a drive-by, make `InterpretedWalRecords::next_record_lsn` mandatory in the application-level definition. It's always included.

Related: https://github.com/neondatabase/cloud/issues/23935
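The validation described above boils down to a three-way comparison between a batch's `raw_wal_start_lsn` and the LSN the receiver expects to continue from. Below is a condensed, self-contained sketch of that comparison; the real implementation lives in `handle_walreceiver_connection` in the diff further down, and the `Lsn` newtype and `Contiguity` enum here are illustrative stand-ins rather than types from the codebase.

```rust
use std::cmp::Ordering;

/// Stand-in for the pageserver's Lsn type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64);

/// Outcome of validating one interpreted-records batch against the WAL
/// position the receiver expects to continue from.
#[derive(Debug, PartialEq)]
enum Contiguity {
    /// Case 1: the batch was cut from WAL starting before our position.
    /// Valid, as long as the records themselves are new to this shard.
    OverlapsAlreadySeenWal,
    /// Case 2: the batch starts exactly where the previous one ended.
    Contiguous,
    /// Case 3: the batch starts past our position, so WAL is missing.
    Gap,
}

fn check_batch(expected_wal_start: Lsn, raw_wal_start_lsn: Lsn) -> Contiguity {
    match raw_wal_start_lsn.cmp(&expected_wal_start) {
        Ordering::Less => Contiguity::OverlapsAlreadySeenWal,
        Ordering::Equal => Contiguity::Contiguous,
        Ordering::Greater => Contiguity::Gap,
    }
}

fn main() {
    // Previous batch ended at 0x2000 but the next one was cut starting at
    // 0x3000: the bytes in between never arrived.
    assert_eq!(check_batch(Lsn(0x2000), Lsn(0x3000)), Contiguity::Gap);
    assert_eq!(check_batch(Lsn(0x2000), Lsn(0x2000)), Contiguity::Contiguous);
    assert_eq!(check_batch(Lsn(0x2000), Lsn(0x1000)), Contiguity::OverlapsAlreadySeenWal);
}
```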
@@ -122,6 +122,8 @@ pub struct ConfigToml {
     pub page_service_pipelining: PageServicePipeliningConfig,
     pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
     pub enable_read_path_debugging: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub validate_wal_contiguity: Option<bool>,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -521,6 +523,7 @@ impl Default for ConfigToml {
             } else {
                 None
             },
+            validate_wal_contiguity: None,
         }
     }
 }

@@ -5,6 +5,7 @@ package interpreted_wal;
 message InterpretedWalRecords {
   repeated InterpretedWalRecord records = 1;
   optional uint64 next_record_lsn = 2;
+  optional uint64 raw_wal_start_lsn = 3;
 }
 
 message InterpretedWalRecord {

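Because the new protobuf field is `optional`, the wire format stays backward compatible: a sender that never populates `raw_wal_start_lsn` decodes to `None` on the pageserver, which then simply skips the contiguity check for that batch (see the `if let Some(...)` guard in the walreceiver hunk further down). A tiny illustrative sketch of that decode-side behaviour, with a hypothetical `Lsn` newtype standing in for the real type:

```rust
/// Hypothetical stand-in for the pageserver's Lsn type.
#[derive(Debug, Clone, Copy)]
struct Lsn(u64);

/// Decode-side mapping for the optional field: absence means the sender does
/// not speak the extended protocol, and the contiguity check is skipped.
fn decode_raw_wal_start(field: Option<u64>) -> Option<Lsn> {
    field.map(Lsn)
}

fn main() {
    // Old sender: field 3 absent, validation is a no-op for this batch.
    assert!(decode_raw_wal_start(None).is_none());
    // New sender: field 3 present, the pageserver can validate contiguity.
    println!("{:?}", decode_raw_wal_start(Some(0x1000)));
}
```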
@@ -60,7 +60,11 @@ pub struct InterpretedWalRecords {
     pub records: Vec<InterpretedWalRecord>,
     // Start LSN of the next record after the batch.
     // Note that said record may not belong to the current shard.
-    pub next_record_lsn: Option<Lsn>,
+    pub next_record_lsn: Lsn,
+    // Inclusive start LSN of the PG WAL from which the interpreted
+    // WAL records were extracted. Note that this is not necessarily the
+    // start LSN of the first interpreted record in the batch.
+    pub raw_wal_start_lsn: Option<Lsn>,
 }
 
 /// An interpreted Postgres WAL record, ready to be handled by the pageserver

@@ -167,7 +167,8 @@ impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
             .collect::<Result<Vec<_>, _>>()?;
         Ok(proto::InterpretedWalRecords {
             records,
-            next_record_lsn: value.next_record_lsn.map(|l| l.0),
+            next_record_lsn: Some(value.next_record_lsn.0),
+            raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
         })
     }
 }
@@ -254,7 +255,11 @@ impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
 
         Ok(InterpretedWalRecords {
             records,
-            next_record_lsn: value.next_record_lsn.map(Lsn::from),
+            next_record_lsn: value
+                .next_record_lsn
+                .map(Lsn::from)
+                .expect("Always provided"),
+            raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
         })
     }
 }

@@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> {
     info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
     info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
     info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
+    info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
     info!(?conf.page_service_pipelining, "starting with page service pipelining config");
     info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");
 

@@ -197,6 +197,10 @@ pub struct PageServerConf {
     /// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
     /// files read.
     pub enable_read_path_debugging: bool,
+
+    /// Interpreted protocol feature: if enabled, validate that the logical WAL received from
+    /// safekeepers does not have gaps.
+    pub validate_wal_contiguity: bool,
 }
 
 /// Token for authentication to safekeepers
@@ -360,6 +364,7 @@ impl PageServerConf {
             page_service_pipelining,
             get_vectored_concurrent_io,
             enable_read_path_debugging,
+            validate_wal_contiguity,
         } = config_toml;
 
         let mut conf = PageServerConf {
@@ -446,6 +451,7 @@ impl PageServerConf {
             virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
             no_sync: no_sync.unwrap_or(false),
             enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
+            validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
         };
 
         // ------------------------------------------------------------

@@ -2874,6 +2874,7 @@ impl Timeline {
                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
                 availability_zone: self.conf.availability_zone.clone(),
                 ingest_batch_size: self.conf.ingest_batch_size,
+                validate_wal_contiguity: self.conf.validate_wal_contiguity,
             },
             broker_client,
             ctx,

@@ -56,6 +56,7 @@ pub struct WalReceiverConf {
     pub auth_token: Option<Arc<String>>,
     pub availability_zone: Option<String>,
     pub ingest_batch_size: u64,
+    pub validate_wal_contiguity: bool,
 }
 
 pub struct WalReceiver {

@@ -537,6 +537,7 @@ impl ConnectionManagerState {
         let connect_timeout = self.conf.wal_connect_timeout;
         let ingest_batch_size = self.conf.ingest_batch_size;
         let protocol = self.conf.protocol;
+        let validate_wal_contiguity = self.conf.validate_wal_contiguity;
         let timeline = Arc::clone(&self.timeline);
         let ctx = ctx.detached_child(
             TaskKind::WalReceiverConnectionHandler,
@@ -558,6 +559,7 @@ impl ConnectionManagerState {
                 ctx,
                 node_id,
                 ingest_batch_size,
+                validate_wal_contiguity,
             )
             .await;
 
@@ -1563,6 +1565,7 @@ mod tests {
                 auth_token: None,
                 availability_zone: None,
                 ingest_batch_size: 1,
+                validate_wal_contiguity: false,
             },
             wal_connection: None,
             wal_stream_candidates: HashMap::new(),

@@ -120,6 +120,7 @@ pub(super) async fn handle_walreceiver_connection(
     ctx: RequestContext,
     safekeeper_node: NodeId,
     ingest_batch_size: u64,
+    validate_wal_contiguity: bool,
 ) -> Result<(), WalReceiverError> {
     debug_assert_current_span_has_tenant_and_timeline_id();
 
@@ -274,6 +275,7 @@ pub(super) async fn handle_walreceiver_connection(
         } => Some((format, compression)),
     };
 
+    let mut expected_wal_start = startpoint;
     while let Some(replication_message) = {
         select! {
             _ = cancellation.cancelled() => {
@@ -340,13 +342,49 @@ pub(super) async fn handle_walreceiver_connection(
                     )
                 })?;
 
+                // Guard against WAL gaps. If the start LSN of the PG WAL section
+                // from which the interpreted records were extracted, doesn't match
+                // the end of the previous batch (or the starting point for the first batch),
+                // then kill this WAL receiver connection and start a new one.
+                if validate_wal_contiguity {
+                    if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
+                        match raw_wal_start_lsn.cmp(&expected_wal_start) {
+                            std::cmp::Ordering::Greater => {
+                                let msg = format!(
+                                    "Gap in streamed WAL: [{}, {})",
+                                    expected_wal_start, raw_wal_start_lsn
+                                );
+                                critical!("{msg}");
+                                return Err(WalReceiverError::Other(anyhow!(msg)));
+                            }
+                            std::cmp::Ordering::Less => {
+                                // Other shards are reading WAL behind us.
+                                // This is valid, but check that we received records
+                                // that we haven't seen before.
+                                if let Some(first_rec) = batch.records.first() {
+                                    if first_rec.next_record_lsn < last_rec_lsn {
+                                        let msg = format!(
+                                            "Received record with next_record_lsn multiple times ({} < {})",
+                                            first_rec.next_record_lsn, expected_wal_start
+                                        );
+                                        critical!("{msg}");
+                                        return Err(WalReceiverError::Other(anyhow!(msg)));
+                                    }
+                                }
+                            }
+                            std::cmp::Ordering::Equal => {}
+                        }
+                    }
+                }
+
                 let InterpretedWalRecords {
                     records,
                     next_record_lsn,
+                    raw_wal_start_lsn: _,
                 } = batch;
 
                 tracing::debug!(
-                    "Received WAL up to {} with next_record_lsn={:?}",
+                    "Received WAL up to {} with next_record_lsn={}",
                     streaming_lsn,
                     next_record_lsn
                 );
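To make the "fake gap" workaround concrete, here is a small self-contained walk-through of how the receiver's `expected_wal_start` evolves when the safekeeper sends an empty keep-alive batch. All LSN values are hypothetical and plain `u64`s stand in for `Lsn`:

```rust
fn main() {
    // A shard attaches mid-record and asks to stream from LSN 0x2000.
    // The shared WAL reader on the safekeeper is still behind, at 0x1000.
    let mut expected_wal_start: u64 = 0x2000;

    // Read 1 covers raw WAL [0x1000, 0x2200) but only completes records the
    // shard already has, so the safekeeper sends an *empty* batch with
    // raw_wal_start_lsn = 0x1000 (smaller than expected: Case 1, no gap)
    // and next_record_lsn equal to the shard's current position.
    let (read1_wal_start, read1_streaming_lsn) = (0x1000u64, 0x2200u64);
    assert!(read1_wal_start < expected_wal_start); // Ordering::Less: accepted
    expected_wal_start = read1_streaming_lsn; // receiver advances its tracking LSN

    // Read 2 covers [0x2200, 0x3000) and finally carries a record for the
    // shard. Its raw_wal_start_lsn now matches what the receiver expects
    // (Case 2). Without the empty batch above, the receiver would still
    // expect 0x2000 and would report a bogus gap [0x2000, 0x2200).
    let read2_wal_start = 0x2200u64;
    assert_eq!(read2_wal_start, expected_wal_start);
}
```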
@@ -423,12 +461,11 @@ pub(super) async fn handle_walreceiver_connection(
                 // need to advance last record LSN on all shards. If we've not ingested the latest
                 // record, then set the LSN of the modification past it. This way all shards
                 // advance their last record LSN at the same time.
-                let needs_last_record_lsn_advance = match next_record_lsn {
-                    Some(lsn) if lsn > modification.get_lsn() => {
-                        modification.set_lsn(lsn).unwrap();
-                        true
-                    }
-                    _ => false,
+                let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() {
+                    modification.set_lsn(next_record_lsn).unwrap();
+                    true
+                } else {
+                    false
                 };
 
                 if uncommitted_records > 0 || needs_last_record_lsn_advance {
@@ -446,9 +483,8 @@ pub(super) async fn handle_walreceiver_connection(
                     timeline.get_last_record_lsn()
                 );
 
-                if let Some(lsn) = next_record_lsn {
-                    last_rec_lsn = lsn;
-                }
+                last_rec_lsn = next_record_lsn;
+                expected_wal_start = streaming_lsn;
 
                 Some(streaming_lsn)
             }

@@ -295,6 +295,10 @@ impl InterpretedWalReader {
 
         let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
 
+        // Tracks the start of the PG WAL LSN from which the current batch of
+        // interpreted records originated.
+        let mut current_batch_wal_start_lsn: Option<Lsn> = None;
+
         loop {
             tokio::select! {
                 // Main branch for reading WAL and forwarding it
@@ -302,7 +306,7 @@ impl InterpretedWalReader {
                     let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
                     let WalBytes {
                         wal,
-                        wal_start_lsn: _,
+                        wal_start_lsn,
                         wal_end_lsn,
                         available_wal_end_lsn,
                     } = match wal {
@@ -315,6 +319,12 @@ impl InterpretedWalReader {
                        }
                    };
 
+                    // We will already have a value if the previous chunks of WAL
+                    // did not decode into anything useful.
+                    if current_batch_wal_start_lsn.is_none() {
+                        current_batch_wal_start_lsn = Some(wal_start_lsn);
+                    }
+
                    wal_decoder.feed_bytes(&wal);
 
                    // Deserialize and interpret WAL records from this batch of WAL.
@@ -363,7 +373,9 @@ impl InterpretedWalReader {
 
                     let max_next_record_lsn = match max_next_record_lsn {
                         Some(lsn) => lsn,
-                        None => { continue; }
+                        None => {
+                            continue;
+                        }
                     };
 
                     // Update the current position such that new receivers can decide
@@ -377,21 +389,38 @@ impl InterpretedWalReader {
                         }
                     }
 
+                    let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap();
+
                     // Send interpreted records downstream. Anything that has already been seen
                     // by a shard is filtered out.
                     let mut shard_senders_to_remove = Vec::new();
                     for (shard, states) in &mut self.shard_senders {
                         for state in states {
-                            if max_next_record_lsn <= state.next_record_lsn {
-                                continue;
-                            }
-
                             let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
-                            let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
-
-                            let batch = InterpretedWalRecords {
-                                records,
-                                next_record_lsn: Some(max_next_record_lsn),
+                            let batch = if max_next_record_lsn > state.next_record_lsn {
+                                // This batch contains at least one record that this shard has not
+                                // seen yet.
+                                let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
+
+                                InterpretedWalRecords {
+                                    records,
+                                    next_record_lsn: max_next_record_lsn,
+                                    raw_wal_start_lsn: Some(batch_wal_start_lsn),
+                                }
+                            } else if wal_end_lsn > state.next_record_lsn {
+                                // All the records in this batch were seen by the shard
+                                // However, the batch maps to a chunk of WAL that the
+                                // shard has not yet seen. Notify it of the start LSN
+                                // of the PG WAL chunk such that it doesn't look like a gap.
+                                InterpretedWalRecords {
+                                    records: Vec::default(),
+                                    next_record_lsn: state.next_record_lsn,
+                                    raw_wal_start_lsn: Some(batch_wal_start_lsn),
+                                }
+                            } else {
+                                // The shard has seen this chunk of WAL before. Skip it.
+                                continue;
                             };
 
                             let res = state.tx.send(Batch {
@@ -403,7 +432,7 @@ impl InterpretedWalReader {
                             if res.is_err() {
                                 shard_senders_to_remove.push(shard_sender_id);
                             } else {
-                                state.next_record_lsn = max_next_record_lsn;
+                                state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn);
                             }
                         }
                     }

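The per-shard decision made in the loop above can be summarized as a small pure function. This is an illustrative sketch with `u64` LSNs rather than the real types; the variable names mirror the ones in the hunk:

```rust
/// What the interpreted WAL reader sends to one shard sender for a decoded batch.
#[derive(Debug, PartialEq)]
enum SendDecision {
    /// The batch holds records the shard hasn't processed yet.
    Records,
    /// The shard already has every record, but the underlying WAL chunk
    /// extends past its position: send an empty batch so the pageserver's
    /// contiguity check doesn't see a fake gap.
    EmptyKeepAlive,
    /// The WAL chunk is entirely behind the shard: nothing to send.
    Skip,
}

fn decide(max_next_record_lsn: u64, wal_end_lsn: u64, shard_next_record_lsn: u64) -> SendDecision {
    if max_next_record_lsn > shard_next_record_lsn {
        SendDecision::Records
    } else if wal_end_lsn > shard_next_record_lsn {
        SendDecision::EmptyKeepAlive
    } else {
        SendDecision::Skip
    }
}

fn main() {
    // Shard position 0x2000; the batch only decoded records up to 0x1800
    // from a WAL chunk ending at 0x2200.
    assert_eq!(decide(0x1800, 0x2200, 0x2000), SendDecision::EmptyKeepAlive);
    // Same shard, next batch decodes a record ending at 0x2600.
    assert_eq!(decide(0x2600, 0x3000, 0x2000), SendDecision::Records);
    // A batch whose WAL the shard has fully seen already.
    assert_eq!(decide(0x1800, 0x1f00, 0x2000), SendDecision::Skip);
}
```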
@@ -1167,15 +1167,15 @@ class NeonEnv:
             "max_batch_size": 32,
         }
 
         # Concurrent IO (https://github.com/neondatabase/neon/issues/9378):
         # enable concurrent IO by default in tests and benchmarks.
         # Compat tests are exempt because old versions fail to parse the new config.
-        get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
         if config.test_may_use_compatibility_snapshot_binaries:
             log.info(
-                "Forcing use of binary-built-in default to avoid forward-compatibility related test failures"
+                "Skipping WAL contiguity validation to avoid forward-compatibility related test failures"
             )
-            get_vectored_concurrent_io = None
+        else:
+            # Look for gaps in WAL received from safekeepeers
+            ps_cfg["validate_wal_contiguity"] = True
 
+        get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
         if get_vectored_concurrent_io is not None:
             ps_cfg["get_vectored_concurrent_io"] = {
                 "mode": self.pageserver_get_vectored_concurrent_io,
