diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index aaa79f7c27..1c08d49d8b 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -218,11 +218,13 @@ struct RawXlogRecord { enum LogicalMessageRecord { Put(PutLogicalMessage), + #[cfg(feature = "testing")] + Failpoint, } struct PutLogicalMessage { + path: String, buf: Bytes, - prefix_size: usize, } enum StandbyRecord { @@ -454,6 +456,16 @@ impl WalIngest { self.ingest_logical_message_put(put, modification, ctx) .await?; } + #[cfg(feature = "testing")] + LogicalMessageRecord::Failpoint => { + // This is a convenient way to make the WAL ingestion pause at + // particular point in the WAL. For more fine-grained control, + // we could peek into the message and only pause if it contains + // a particular string, for example, but this is enough for now. + failpoint_support::sleep_millis_async!( + "wal-ingest-logical-message-sleep" + ); + } } } } @@ -2183,21 +2195,8 @@ impl WalIngest { modification: &mut DatadirModification<'_>, ctx: &RequestContext, ) -> Result<()> { - let PutLogicalMessage { buf, prefix_size } = put; - - let prefix = std::str::from_utf8(&buf[0..prefix_size - 1])?; - let message = &buf[prefix_size..]; - if prefix == "neon-test" { - // This is a convenient way to make the WAL ingestion pause at - // particular point in the WAL. For more fine-grained control, - // we could peek into the message and only pause if it contains - // a particular string, for example, but this is enough for now. - failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep"); - } else if let Some(path) = prefix.strip_prefix("neon-file:") { - modification.put_file(path, message, ctx).await?; - } - - Ok(()) + let PutLogicalMessage { path, buf } = put; + modification.put_file(path.as_str(), &buf, ctx).await } fn decode_logical_message_record( @@ -2208,13 +2207,21 @@ impl WalIngest { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_LOGICAL_MESSAGE { let xlrec = crate::walrecord::XlLogicalMessage::decode(buf); - let buf_size = xlrec.prefix_size + xlrec.message_size; - // TODO change decode function interface to take ownership of buf - let buf = Bytes::copy_from_slice(&buf[..buf_size]); - return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage { - buf, - prefix_size: xlrec.prefix_size, - }))); + let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; + + #[cfg(feature = "testing")] + if prefix == "neon-test" { + return Ok(Some(LogicalMessageRecord::Failpoint)); + } + + if let Some(path) = prefix.strip_prefix("neon-file:") { + let buf_size = xlrec.prefix_size + xlrec.message_size; + let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]); + return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage { + path: path.to_string(), + buf, + }))); + } } Ok(None)