mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 13:00:37 +00:00
safekeeper: add InterpretedWalRecord protocol support
This commit is contained in:
@@ -562,6 +562,7 @@ pub enum BeMessage<'a> {
|
||||
options: &'a [&'a str],
|
||||
},
|
||||
KeepAlive(WalSndKeepAlive),
|
||||
NeonInterpretedWalRecord(&'a [u8]), // TODO: use appropriate fields
|
||||
}
|
||||
|
||||
/// Common shorthands.
|
||||
@@ -996,6 +997,17 @@ impl BeMessage<'_> {
|
||||
Ok(())
|
||||
})?
|
||||
}
|
||||
|
||||
// Neon extension: send interpreted WAL records to relevant pageservers. This is
|
||||
// temporary until we move to a different protocol for Safekeeper->Pageserver WAL
|
||||
// (possibly gRPC).
|
||||
BeMessage::NeonInterpretedWalRecord(data) => {
|
||||
buf.put_u8(b'z'); // arbitrary unused value
|
||||
write_body(buf, |buf| {
|
||||
buf.put_u64(data.len() as u64);
|
||||
buf.put_slice(data);
|
||||
})
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -46,10 +46,16 @@ pub struct SafekeeperPostgresHandler {
|
||||
/// Parsed Postgres command.
|
||||
enum SafekeeperPostgresCommand {
|
||||
StartWalPush,
|
||||
StartReplication { start_lsn: Lsn, term: Option<Term> },
|
||||
StartReplication {
|
||||
start_lsn: Lsn,
|
||||
term: Option<Term>,
|
||||
interpret_wal: bool,
|
||||
},
|
||||
IdentifySystem,
|
||||
TimelineStatus,
|
||||
JSONCtrl { cmd: AppendLogicalMessage },
|
||||
JSONCtrl {
|
||||
cmd: AppendLogicalMessage,
|
||||
},
|
||||
}
|
||||
|
||||
fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
@@ -58,7 +64,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
} else if cmd.starts_with("START_REPLICATION") {
|
||||
let re = Regex::new(
|
||||
// We follow postgres START_REPLICATION LOGICAL options to pass term.
|
||||
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?",
|
||||
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?( interpret_wal)",
|
||||
)
|
||||
.unwrap();
|
||||
let caps = re
|
||||
@@ -71,7 +77,12 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term })
|
||||
let interpret_wal = caps.get(3).is_some();
|
||||
Ok(SafekeeperPostgresCommand::StartReplication {
|
||||
start_lsn,
|
||||
term,
|
||||
interpret_wal,
|
||||
})
|
||||
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
|
||||
Ok(SafekeeperPostgresCommand::IdentifySystem)
|
||||
} else if cmd.starts_with("TIMELINE_STATUS") {
|
||||
@@ -230,8 +241,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
.instrument(info_span!("WAL receiver"))
|
||||
.await
|
||||
}
|
||||
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
|
||||
self.handle_start_replication(pgb, start_lsn, term)
|
||||
SafekeeperPostgresCommand::StartReplication {
|
||||
start_lsn,
|
||||
term,
|
||||
interpret_wal,
|
||||
} => {
|
||||
self.handle_start_replication(pgb, start_lsn, term, interpret_wal)
|
||||
.instrument(info_span!("WAL sender"))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -380,17 +380,21 @@ impl SafekeeperPostgresHandler {
|
||||
/// Wrapper around handle_start_replication_guts handling result. Error is
|
||||
/// handled here while we're still in walsender ttid span; with API
|
||||
/// extension, this can probably be moved into postgres_backend.
|
||||
///
|
||||
/// If interpret_wal is true, change the protocol to send custom Neon InterpretedWalRecord
|
||||
/// instead of XLogData, for ingestion by Pageservers.
|
||||
pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
&mut self,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
start_pos: Lsn,
|
||||
term: Option<Term>,
|
||||
interpret_wal: bool,
|
||||
) -> Result<(), QueryError> {
|
||||
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
|
||||
let residence_guard = tli.wal_residence_guard().await?;
|
||||
|
||||
if let Err(end) = self
|
||||
.handle_start_replication_guts(pgb, start_pos, term, residence_guard)
|
||||
.handle_start_replication_guts(pgb, start_pos, term, interpret_wal, residence_guard)
|
||||
.await
|
||||
{
|
||||
let info = tli.get_safekeeper_info(&self.conf).await;
|
||||
@@ -407,6 +411,7 @@ impl SafekeeperPostgresHandler {
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
start_pos: Lsn,
|
||||
term: Option<Term>,
|
||||
interpret_wal: bool,
|
||||
tli: WalResidentTimeline,
|
||||
) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let appname = self.appname.clone();
|
||||
@@ -464,6 +469,7 @@ impl SafekeeperPostgresHandler {
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
interpret_wal,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
@@ -543,6 +549,8 @@ struct WalSender<'a, IO> {
|
||||
/// in. Streaming is stopped if local term changes to a different (higher)
|
||||
/// value.
|
||||
term: Option<Term>,
|
||||
/// If true, decode and filter WAL records and send InterpretedWalRecord instead of XLogRecord.
|
||||
interpret_wal: bool,
|
||||
/// Watch channel receiver to learn end of available WAL (and wait for its advancement).
|
||||
end_watch: EndWatch,
|
||||
ws_guard: Arc<WalSenderGuard>,
|
||||
@@ -571,45 +579,49 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
// try to send as much as available, capped by MAX_SEND_SIZE
|
||||
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= self.end_pos {
|
||||
chunk_end_pos = self.end_pos;
|
||||
let (msg, send_size) = if self.interpret_wal {
|
||||
(BeMessage::NeonInterpretedWalRecord(&[]), 0) // TODO
|
||||
} else {
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
|
||||
let send_buf = &mut self.send_buf[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = self.term {
|
||||
Some(self.tli.acquire_term(t).await?)
|
||||
// try to send as much as available, capped by MAX_SEND_SIZE
|
||||
let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= self.end_pos {
|
||||
chunk_end_pos = self.end_pos;
|
||||
} else {
|
||||
None
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
|
||||
let send_buf = &mut self.send_buf[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = self.term {
|
||||
Some(self.tli.acquire_term(t).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = self.wal_reader.read(send_buf).await?
|
||||
};
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = self.wal_reader.read(send_buf).await?
|
||||
};
|
||||
let send_buf = &send_buf[..send_size];
|
||||
|
||||
// and send it
|
||||
self.pgb
|
||||
.write_message(&BeMessage::XLogData(XLogDataBody {
|
||||
let send_buf = &send_buf[..send_size];
|
||||
let msg = BeMessage::XLogData(XLogDataBody {
|
||||
wal_start: self.start_pos.0,
|
||||
wal_end: self.end_pos.0,
|
||||
timestamp: get_current_timestamp(),
|
||||
data: send_buf,
|
||||
}))
|
||||
.await?;
|
||||
});
|
||||
(msg, send_size)
|
||||
};
|
||||
|
||||
// and send it
|
||||
self.pgb.write_message(&msg).await?;
|
||||
|
||||
if let Some(appname) = &self.appname {
|
||||
if appname == "replica" {
|
||||
|
||||
Reference in New Issue
Block a user