From e2d008bd21d94bfddbb53a7b633984b4cbbce68d Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Sat, 19 Oct 2024 14:29:04 +0200 Subject: [PATCH] pageserer: refactor logical message record --- pageserver/src/walingest.rs | 66 ++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index bfc7b5e9e5..60345f170a 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -212,6 +212,10 @@ struct XlogRecord { buf: Bytes, } +struct LogicalMessageRecord { + buf: Bytes, + prefix_size: usize, +} impl WalIngest { pub async fn new( timeline: &Timeline, @@ -405,21 +409,11 @@ impl WalIngest { .await?; } pg_constants::RM_LOGICALMSG_ID => { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_LOGICAL_MESSAGE { - let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf); - let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; - let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size]; - if prefix == "neon-test" { - // This is a convenient way to make the WAL ingestion pause at - // particular point in the WAL. For more fine-grained control, - // we could peek into the message and only pause if it contains - // a particular string, for example, but this is enough for now. - failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep"); - } else if let Some(path) = prefix.strip_prefix("neon-file:") { - modification.put_file(path, message, ctx).await?; - } + let maybe_logical_message_record = + Self::decode_logical_message_record(&mut buf, &decoded, pg_version).unwrap(); + if let Some(logical_message_record) = maybe_logical_message_record { + self.ingest_logical_message_record(logical_message_record, modification, ctx) + .await?; } } pg_constants::RM_STANDBY_ID => { @@ -2254,6 +2248,48 @@ impl WalIngest { })) } + async fn ingest_logical_message_record( + &mut self, + record: LogicalMessageRecord, + modification: &mut DatadirModification<'_>, + ctx: &RequestContext, + ) -> Result<()> { + let LogicalMessageRecord { buf, prefix_size } = record; + + let prefix = std::str::from_utf8(&buf[0..prefix_size - 1])?; + let message = &buf[prefix_size..]; + if prefix == "neon-test" { + // This is a convenient way to make the WAL ingestion pause at + // particular point in the WAL. For more fine-grained control, + // we could peek into the message and only pause if it contains + // a particular string, for example, but this is enough for now. + failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep"); + } else if let Some(path) = prefix.strip_prefix("neon-file:") { + modification.put_file(path, message, ctx).await?; + } + + Ok(()) + } + + fn decode_logical_message_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + _pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_LOGICAL_MESSAGE { + let xlrec = crate::walrecord::XlLogicalMessage::decode(buf); + let buf_size = xlrec.prefix_size + xlrec.message_size; + // TODO change decode function interface to take ownership of buf + let buf = Bytes::copy_from_slice(&buf[..buf_size]); + return Ok(Some(LogicalMessageRecord { + buf, + prefix_size: xlrec.prefix_size, + })); + } + + Ok(None) + } async fn put_rel_creation( &mut self, modification: &mut DatadirModification<'_>,