pageserer: refactor logical message record

This commit is contained in:
Vlad Lazar
2024-10-19 14:29:04 +02:00
parent bdd8a5be6a
commit e2d008bd21

View File

@@ -212,6 +212,10 @@ struct XlogRecord {
buf: Bytes,
}
struct LogicalMessageRecord {
buf: Bytes,
prefix_size: usize,
}
impl WalIngest {
pub async fn new(
timeline: &Timeline,
@@ -405,21 +409,11 @@ impl WalIngest {
.await?;
}
pg_constants::RM_LOGICALMSG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
let xlrec = crate::walrecord::XlLogicalMessage::decode(&mut buf);
let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
let message = &buf[xlrec.prefix_size..xlrec.prefix_size + xlrec.message_size];
if prefix == "neon-test" {
// This is a convenient way to make the WAL ingestion pause at
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
}
let maybe_logical_message_record =
Self::decode_logical_message_record(&mut buf, &decoded, pg_version).unwrap();
if let Some(logical_message_record) = maybe_logical_message_record {
self.ingest_logical_message_record(logical_message_record, modification, ctx)
.await?;
}
}
pg_constants::RM_STANDBY_ID => {
@@ -2254,6 +2248,48 @@ impl WalIngest {
}))
}
async fn ingest_logical_message_record(
&mut self,
record: LogicalMessageRecord,
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
let LogicalMessageRecord { buf, prefix_size } = record;
let prefix = std::str::from_utf8(&buf[0..prefix_size - 1])?;
let message = &buf[prefix_size..];
if prefix == "neon-test" {
// This is a convenient way to make the WAL ingestion pause at
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
}
Ok(())
}
fn decode_logical_message_record(
buf: &mut Bytes,
decoded: &DecodedWALRecord,
_pg_version: u32,
) -> anyhow::Result<Option<LogicalMessageRecord>> {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
let xlrec = crate::walrecord::XlLogicalMessage::decode(buf);
let buf_size = xlrec.prefix_size + xlrec.message_size;
// TODO change decode function interface to take ownership of buf
let buf = Bytes::copy_from_slice(&buf[..buf_size]);
return Ok(Some(LogicalMessageRecord {
buf,
prefix_size: xlrec.prefix_size,
}));
}
Ok(None)
}
async fn put_rel_creation(
&mut self,
modification: &mut DatadirModification<'_>,