diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 9ffaaba584..d569119a4d 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -562,6 +562,7 @@ pub enum BeMessage<'a> { options: &'a [&'a str], }, KeepAlive(WalSndKeepAlive), + NeonInterpretedWalRecord(&'a [u8]), // TODO: use appropriate fields } /// Common shorthands. @@ -996,6 +997,17 @@ impl BeMessage<'_> { Ok(()) })? } + + // Neon extension: send interpreted WAL records to relevant pageservers. This is + // temporary until we move to a different protocol for Safekeeper->Pageserver WAL + // (possibly gRPC). + BeMessage::NeonInterpretedWalRecord(data) => { + buf.put_u8(b'z'); // arbitrary unused value + write_body(buf, |buf| { + buf.put_u64(data.len() as u64); + buf.put_slice(data); + }) + } } Ok(()) } diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 3f00b69cde..cacf024071 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -46,10 +46,16 @@ pub struct SafekeeperPostgresHandler { /// Parsed Postgres command. enum SafekeeperPostgresCommand { StartWalPush, - StartReplication { start_lsn: Lsn, term: Option }, + StartReplication { + start_lsn: Lsn, + term: Option, + interpret_wal: bool, + }, IdentifySystem, TimelineStatus, - JSONCtrl { cmd: AppendLogicalMessage }, + JSONCtrl { + cmd: AppendLogicalMessage, + }, } fn parse_cmd(cmd: &str) -> anyhow::Result { @@ -58,7 +64,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { } else if cmd.starts_with("START_REPLICATION") { let re = Regex::new( // We follow postgres START_REPLICATION LOGICAL options to pass term. - r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?", + r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?( interpret_wal)", ) .unwrap(); let caps = re @@ -71,7 +77,12 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { } else { None }; - Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term }) + let interpret_wal = caps.get(3).is_some(); + Ok(SafekeeperPostgresCommand::StartReplication { + start_lsn, + term, + interpret_wal, + }) } else if cmd.starts_with("IDENTIFY_SYSTEM") { Ok(SafekeeperPostgresCommand::IdentifySystem) } else if cmd.starts_with("TIMELINE_STATUS") { @@ -230,8 +241,12 @@ impl postgres_backend::Handler .instrument(info_span!("WAL receiver")) .await } - SafekeeperPostgresCommand::StartReplication { start_lsn, term } => { - self.handle_start_replication(pgb, start_lsn, term) + SafekeeperPostgresCommand::StartReplication { + start_lsn, + term, + interpret_wal, + } => { + self.handle_start_replication(pgb, start_lsn, term, interpret_wal) .instrument(info_span!("WAL sender")) .await } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 6d677f405a..070394d5db 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -380,17 +380,21 @@ impl SafekeeperPostgresHandler { /// Wrapper around handle_start_replication_guts handling result. Error is /// handled here while we're still in walsender ttid span; with API /// extension, this can probably be moved into postgres_backend. + /// + /// If interpret_wal is true, change the protocol to send custom Neon InterpretedWalRecord + /// instead of XLogData, for ingestion by Pageservers. pub async fn handle_start_replication( &mut self, pgb: &mut PostgresBackend, start_pos: Lsn, term: Option, + interpret_wal: bool, ) -> Result<(), QueryError> { let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?; let residence_guard = tli.wal_residence_guard().await?; if let Err(end) = self - .handle_start_replication_guts(pgb, start_pos, term, residence_guard) + .handle_start_replication_guts(pgb, start_pos, term, interpret_wal, residence_guard) .await { let info = tli.get_safekeeper_info(&self.conf).await; @@ -407,6 +411,7 @@ impl SafekeeperPostgresHandler { pgb: &mut PostgresBackend, start_pos: Lsn, term: Option, + interpret_wal: bool, tli: WalResidentTimeline, ) -> Result<(), CopyStreamHandlerEnd> { let appname = self.appname.clone(); @@ -464,6 +469,7 @@ impl SafekeeperPostgresHandler { start_pos, end_pos, term, + interpret_wal, end_watch, ws_guard: ws_guard.clone(), wal_reader, @@ -543,6 +549,8 @@ struct WalSender<'a, IO> { /// in. Streaming is stopped if local term changes to a different (higher) /// value. term: Option, + /// If true, decode and filter WAL records and send InterpretedWalRecord instead of XLogRecord. + interpret_wal: bool, /// Watch channel receiver to learn end of available WAL (and wait for its advancement). end_watch: EndWatch, ws_guard: Arc, @@ -571,45 +579,49 @@ impl WalSender<'_, IO> { "nothing to send after waiting for WAL" ); - // try to send as much as available, capped by MAX_SEND_SIZE - let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64; - // if we went behind available WAL, back off - if chunk_end_pos >= self.end_pos { - chunk_end_pos = self.end_pos; + let (msg, send_size) = if self.interpret_wal { + (BeMessage::NeonInterpretedWalRecord(&[]), 0) // TODO } else { - // If sending not up to end pos, round down to page boundary to - // avoid breaking WAL record not at page boundary, as protocol - // demands. See walsender.c (XLogSendPhysical). - chunk_end_pos = chunk_end_pos - .checked_sub(chunk_end_pos.block_offset()) - .unwrap(); - } - let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize; - let send_buf = &mut self.send_buf[..send_size]; - let send_size: usize; - { - // If uncommitted part is being pulled, check that the term is - // still the expected one. - let _term_guard = if let Some(t) = self.term { - Some(self.tli.acquire_term(t).await?) + // try to send as much as available, capped by MAX_SEND_SIZE + let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64; + // if we went behind available WAL, back off + if chunk_end_pos >= self.end_pos { + chunk_end_pos = self.end_pos; } else { - None + // If sending not up to end pos, round down to page boundary to + // avoid breaking WAL record not at page boundary, as protocol + // demands. See walsender.c (XLogSendPhysical). + chunk_end_pos = chunk_end_pos + .checked_sub(chunk_end_pos.block_offset()) + .unwrap(); + } + let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize; + let send_buf = &mut self.send_buf[..send_size]; + let send_size: usize; + { + // If uncommitted part is being pulled, check that the term is + // still the expected one. + let _term_guard = if let Some(t) = self.term { + Some(self.tli.acquire_term(t).await?) + } else { + None + }; + // Read WAL into buffer. send_size can be additionally capped to + // segment boundary here. + send_size = self.wal_reader.read(send_buf).await? }; - // Read WAL into buffer. send_size can be additionally capped to - // segment boundary here. - send_size = self.wal_reader.read(send_buf).await? - }; - let send_buf = &send_buf[..send_size]; - - // and send it - self.pgb - .write_message(&BeMessage::XLogData(XLogDataBody { + let send_buf = &send_buf[..send_size]; + let msg = BeMessage::XLogData(XLogDataBody { wal_start: self.start_pos.0, wal_end: self.end_pos.0, timestamp: get_current_timestamp(), data: send_buf, - })) - .await?; + }); + (msg, send_size) + }; + + // and send it + self.pgb.write_message(&msg).await?; if let Some(appname) = &self.appname { if appname == "replica" {