review: parse logical message prefix on decode

This commit is contained in:
Vlad Lazar
2024-10-23 19:01:32 +02:00
parent 31434f1cb1
commit de5ecdc6b8

View File

@@ -218,11 +218,13 @@ struct RawXlogRecord {
enum LogicalMessageRecord {
Put(PutLogicalMessage),
#[cfg(feature = "testing")]
Failpoint,
}
struct PutLogicalMessage {
path: String,
buf: Bytes,
prefix_size: usize,
}
enum StandbyRecord {
@@ -454,6 +456,16 @@ impl WalIngest {
self.ingest_logical_message_put(put, modification, ctx)
.await?;
}
#[cfg(feature = "testing")]
LogicalMessageRecord::Failpoint => {
// This is a convenient way to make the WAL ingestion pause at
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
failpoint_support::sleep_millis_async!(
"wal-ingest-logical-message-sleep"
);
}
}
}
}
@@ -2183,21 +2195,8 @@ impl WalIngest {
modification: &mut DatadirModification<'_>,
ctx: &RequestContext,
) -> Result<()> {
let PutLogicalMessage { buf, prefix_size } = put;
let prefix = std::str::from_utf8(&buf[0..prefix_size - 1])?;
let message = &buf[prefix_size..];
if prefix == "neon-test" {
// This is a convenient way to make the WAL ingestion pause at
// particular point in the WAL. For more fine-grained control,
// we could peek into the message and only pause if it contains
// a particular string, for example, but this is enough for now.
failpoint_support::sleep_millis_async!("wal-ingest-logical-message-sleep");
} else if let Some(path) = prefix.strip_prefix("neon-file:") {
modification.put_file(path, message, ctx).await?;
}
Ok(())
let PutLogicalMessage { path, buf } = put;
modification.put_file(path.as_str(), &buf, ctx).await
}
fn decode_logical_message_record(
@@ -2208,13 +2207,21 @@ impl WalIngest {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_LOGICAL_MESSAGE {
let xlrec = crate::walrecord::XlLogicalMessage::decode(buf);
let buf_size = xlrec.prefix_size + xlrec.message_size;
// TODO change decode function interface to take ownership of buf
let buf = Bytes::copy_from_slice(&buf[..buf_size]);
return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage {
buf,
prefix_size: xlrec.prefix_size,
})));
let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?;
#[cfg(feature = "testing")]
if prefix == "neon-test" {
return Ok(Some(LogicalMessageRecord::Failpoint));
}
if let Some(path) = prefix.strip_prefix("neon-file:") {
let buf_size = xlrec.prefix_size + xlrec.message_size;
let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]);
return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage {
path: path.to_string(),
buf,
})));
}
}
Ok(None)