Compare commits

...

1 Commits

Author SHA1 Message Date
Erik Grinaker
444d812220 safekeeper: add InterpretedWalRecord protocol support 2024-11-04 13:25:36 +01:00
3 changed files with 78 additions and 39 deletions

View File

@@ -562,6 +562,7 @@ pub enum BeMessage<'a> {
options: &'a [&'a str],
},
KeepAlive(WalSndKeepAlive),
NeonInterpretedWalRecord(&'a [u8]), // TODO: use appropriate fields
}
/// Common shorthands.
@@ -996,6 +997,17 @@ impl BeMessage<'_> {
Ok(())
})?
}
// Neon extension: send interpreted WAL records to relevant pageservers. This is
// temporary until we move to a different protocol for Safekeeper->Pageserver WAL
// (possibly gRPC).
BeMessage::NeonInterpretedWalRecord(data) => {
buf.put_u8(b'z'); // arbitrary unused value
write_body(buf, |buf| {
buf.put_u64(data.len() as u64);
buf.put_slice(data);
})
}
}
Ok(())
}

View File

@@ -46,10 +46,16 @@ pub struct SafekeeperPostgresHandler {
/// Parsed Postgres command.
enum SafekeeperPostgresCommand {
StartWalPush,
StartReplication { start_lsn: Lsn, term: Option<Term> },
StartReplication {
start_lsn: Lsn,
term: Option<Term>,
interpret_wal: bool,
},
IdentifySystem,
TimelineStatus,
JSONCtrl { cmd: AppendLogicalMessage },
JSONCtrl {
cmd: AppendLogicalMessage,
},
}
fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
@@ -58,7 +64,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
} else if cmd.starts_with("START_REPLICATION") {
let re = Regex::new(
// We follow postgres START_REPLICATION LOGICAL options to pass term.
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?",
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?( interpret_wal)",
)
.unwrap();
let caps = re
@@ -71,7 +77,12 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
} else {
None
};
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term })
let interpret_wal = caps.get(3).is_some();
Ok(SafekeeperPostgresCommand::StartReplication {
start_lsn,
term,
interpret_wal,
})
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
Ok(SafekeeperPostgresCommand::IdentifySystem)
} else if cmd.starts_with("TIMELINE_STATUS") {
@@ -230,8 +241,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
.instrument(info_span!("WAL receiver"))
.await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
SafekeeperPostgresCommand::StartReplication {
start_lsn,
term,
interpret_wal,
} => {
self.handle_start_replication(pgb, start_lsn, term, interpret_wal)
.instrument(info_span!("WAL sender"))
.await
}

View File

@@ -380,17 +380,21 @@ impl SafekeeperPostgresHandler {
/// Wrapper around handle_start_replication_guts handling result. Error is
/// handled here while we're still in walsender ttid span; with API
/// extension, this can probably be moved into postgres_backend.
///
/// If interpret_wal is true, change the protocol to send custom Neon InterpretedWalRecord
/// instead of XLogData, for ingestion by Pageservers.
pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
start_pos: Lsn,
term: Option<Term>,
interpret_wal: bool,
) -> Result<(), QueryError> {
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
let residence_guard = tli.wal_residence_guard().await?;
if let Err(end) = self
.handle_start_replication_guts(pgb, start_pos, term, residence_guard)
.handle_start_replication_guts(pgb, start_pos, term, interpret_wal, residence_guard)
.await
{
let info = tli.get_safekeeper_info(&self.conf).await;
@@ -407,6 +411,7 @@ impl SafekeeperPostgresHandler {
pgb: &mut PostgresBackend<IO>,
start_pos: Lsn,
term: Option<Term>,
interpret_wal: bool,
tli: WalResidentTimeline,
) -> Result<(), CopyStreamHandlerEnd> {
let appname = self.appname.clone();
@@ -464,6 +469,7 @@ impl SafekeeperPostgresHandler {
start_pos,
end_pos,
term,
interpret_wal,
end_watch,
ws_guard: ws_guard.clone(),
wal_reader,
@@ -543,6 +549,8 @@ struct WalSender<'a, IO> {
/// in. Streaming is stopped if local term changes to a different (higher)
/// value.
term: Option<Term>,
/// If true, decode and filter WAL records and send InterpretedWalRecord instead of XLogRecord.
interpret_wal: bool,
/// Watch channel receiver to learn end of available WAL (and wait for its advancement).
end_watch: EndWatch,
ws_guard: Arc<WalSenderGuard>,
@@ -571,45 +579,49 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
"nothing to send after waiting for WAL"
);
// try to send as much as available, capped by MAX_SEND_SIZE
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
// if we went behind available WAL, back off
if chunk_end_pos >= self.end_pos {
chunk_end_pos = self.end_pos;
let (msg, send_size) = if self.interpret_wal {
(BeMessage::NeonInterpretedWalRecord(&[]), 0) // TODO
} else {
// If sending not up to end pos, round down to page boundary to
// avoid breaking WAL record not at page boundary, as protocol
// demands. See walsender.c (XLogSendPhysical).
chunk_end_pos = chunk_end_pos
.checked_sub(chunk_end_pos.block_offset())
.unwrap();
}
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
let send_buf = &mut self.send_buf[..send_size];
let send_size: usize;
{
// If uncommitted part is being pulled, check that the term is
// still the expected one.
let _term_guard = if let Some(t) = self.term {
Some(self.tli.acquire_term(t).await?)
// try to send as much as available, capped by MAX_SEND_SIZE
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
// if we went behind available WAL, back off
if chunk_end_pos >= self.end_pos {
chunk_end_pos = self.end_pos;
} else {
None
// If sending not up to end pos, round down to page boundary to
// avoid breaking WAL record not at page boundary, as protocol
// demands. See walsender.c (XLogSendPhysical).
chunk_end_pos = chunk_end_pos
.checked_sub(chunk_end_pos.block_offset())
.unwrap();
}
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
let send_buf = &mut self.send_buf[..send_size];
let send_size: usize;
{
// If uncommitted part is being pulled, check that the term is
// still the expected one.
let _term_guard = if let Some(t) = self.term {
Some(self.tli.acquire_term(t).await?)
} else {
None
};
// Read WAL into buffer. send_size can be additionally capped to
// segment boundary here.
send_size = self.wal_reader.read(send_buf).await?
};
// Read WAL into buffer. send_size can be additionally capped to
// segment boundary here.
send_size = self.wal_reader.read(send_buf).await?
};
let send_buf = &send_buf[..send_size];
// and send it
self.pgb
.write_message(&BeMessage::XLogData(XLogDataBody {
let send_buf = &send_buf[..send_size];
let msg = BeMessage::XLogData(XLogDataBody {
wal_start: self.start_pos.0,
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
data: send_buf,
}))
.await?;
});
(msg, send_size)
};
// and send it
self.pgb.write_message(&msg).await?;
if let Some(appname) = &self.appname {
if appname == "replica" {