From e70d486281953031c3aa26a3969b8d67eab8bf00 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 9 Nov 2023 19:33:41 +0000 Subject: [PATCH] Add safekeeper protocol to send decoded records --- Cargo.lock | 1 + .../walreceiver/walreceiver_connection.rs | 66 ++++---- pageserver/src/walingest.rs | 7 + safekeeper/Cargo.toml | 1 + safekeeper/src/handler.rs | 26 ++- safekeeper/src/lib.rs | 1 + safekeeper/src/send_wal.rs | 116 ++++++++----- safekeeper/src/send_wal_sharded.rs | 160 ++++++++++++++++++ 8 files changed, 295 insertions(+), 83 deletions(-) create mode 100644 safekeeper/src/send_wal_sharded.rs diff --git a/Cargo.lock b/Cargo.lock index 0292cf5b47..5009d0e472 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4466,6 +4466,7 @@ dependencies = [ "hyper", "metrics", "once_cell", + "pageserver_api", "parking_lot 0.12.1", "postgres", "postgres-protocol", diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 3e56753ad4..233975adef 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -36,7 +36,7 @@ use crate::{ }; use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; -use postgres_ffi::waldecoder::WalStreamDecoder; + use utils::pageserver_feedback::PageserverFeedback; use utils::{id::NodeId, lsn::Lsn}; @@ -244,13 +244,22 @@ pub(super) async fn handle_walreceiver_connection( info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}..."); - let query = format!("START_REPLICATION PHYSICAL {startpoint}"); + let shard = timeline.get_shard(); + let shard_str = serde_json::to_string(&shard).map_err(|e| { + WalReceiverError::Other(anyhow!( + "Failed to serialize shard info for walreceiver: {e}" + )) + })?; + info!("starting replication for shard {shard_str}"); + + let query = format!( + "START_REPLICATION PHYSICAL {startpoint} (shard={})", + shard_str + ); let copy_stream = replication_client.copy_both_simple(&query).await?; let mut physical_stream = pin!(ReplicationStream::new(copy_stream)); - let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version); - let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?; while let Some(replication_message) = { @@ -273,9 +282,7 @@ pub(super) async fn handle_walreceiver_connection( ReplicationMessage::XLogData(xlog_data) => { connection_status.latest_connection_update = now; connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end())); - connection_status.streaming_lsn = Some(Lsn::from( - xlog_data.wal_start() + xlog_data.data().len() as u64, - )); + connection_status.streaming_lsn = Some(Lsn::from(xlog_data.wal_start())); if !xlog_data.data().is_empty() { connection_status.latest_wal_update = now; } @@ -293,44 +300,29 @@ pub(super) async fn handle_walreceiver_connection( let status_update = match replication_message { ReplicationMessage::XLogData(xlog_data) => { - // Pass the WAL data to the decoder, and see if we can decode - // more records as a result. - let data = xlog_data.data(); - let startlsn = Lsn::from(xlog_data.wal_start()); - let endlsn = startlsn + data.len() as u64; + // Process decoded WAL record. + let next_lsn = Lsn::from(xlog_data.wal_start()); + let data = xlog_data.into_data(); - trace!("received XLogData between {startlsn} and {endlsn}"); + trace!("received XLogData up to {next_lsn}"); - waldecoder.feed_bytes(data); + let mut decoded = DecodedWALRecord::default(); + let mut modification = timeline.begin_modification(next_lsn); + walingest + .ingest_record(data, next_lsn, &mut modification, &mut decoded, &ctx) + .await + .with_context(|| format!("could not ingest record at {next_lsn}"))?; - { - let mut decoded = DecodedWALRecord::default(); - let mut modification = timeline.begin_modification(endlsn); - while let Some((lsn, recdata)) = waldecoder.poll_decode()? { - // It is important to deal with the aligned records as lsn in getPage@LSN is - // aligned and can be several bytes bigger. Without this alignment we are - // at risk of hitting a deadlock. - if !lsn.is_aligned() { - return Err(WalReceiverError::Other(anyhow!("LSN not aligned"))); - } + fail_point!("walreceiver-after-ingest"); - walingest - .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) - .await - .with_context(|| format!("could not ingest record at {lsn}"))?; + last_rec_lsn = next_lsn; - fail_point!("walreceiver-after-ingest"); - - last_rec_lsn = lsn; - } - } - - if !caught_up && endlsn >= end_of_wal { - info!("caught up at LSN {endlsn}"); + if !caught_up && next_lsn >= end_of_wal { + info!("caught up at LSN {next_lsn}"); caught_up = true; } - Some(endlsn) + Some(last_rec_lsn) } ReplicationMessage::PrimaryKeepAlive(keepalive) => { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 19804da34f..61ceac1147 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -93,6 +93,13 @@ impl<'a> WalIngest<'a> { modification.lsn = lsn; decode_wal_record(recdata, decoded, self.timeline.pg_version)?; + tracing::trace!( + "decoded rmid={} xid={} xl_info={}", + decoded.xl_rmid, + decoded.xl_xid, + decoded.xl_info + ); + // Fast path: we may skip the entire record if it only references blocks on another shard. // Otherwise we proceed, and filter blocks later. let any_local_blocks = decoded.blocks.iter().any(|blk| { diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 30516c0763..91100a436e 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -41,6 +41,7 @@ toml_edit.workspace = true tracing.workspace = true url.workspace = true metrics.workspace = true +pageserver_api.workspace = true postgres_backend.workspace = true postgres_ffi.workspace = true pq_proto.workspace = true diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index d5333abae6..f86af15471 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -2,6 +2,7 @@ //! protocol commands. use anyhow::Context; + use std::str::FromStr; use std::str::{self}; use std::sync::Arc; @@ -12,7 +13,7 @@ use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED}; -use crate::safekeeper::Term; +use crate::send_wal::ReplicationOptions; use crate::timeline::TimelineError; use crate::wal_service::ConnectionId; use crate::{GlobalTimelines, SafeKeeperConf}; @@ -46,7 +47,7 @@ pub struct SafekeeperPostgresHandler { /// Parsed Postgres command. enum SafekeeperPostgresCommand { StartWalPush, - StartReplication { start_lsn: Lsn, term: Option }, + StartReplication(ReplicationOptions), IdentifySystem, TimelineStatus, JSONCtrl { cmd: AppendLogicalMessage }, @@ -58,7 +59,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { } else if cmd.starts_with("START_REPLICATION") { let re = Regex::new( // We follow postgres START_REPLICATION LOGICAL options to pass term. - r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?", + r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?(?: \(shard=(.+)\))?", ) .unwrap(); let caps = re @@ -71,7 +72,18 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { } else { None }; - Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term }) + let shard = if let Some(m) = caps.get(3) { + Some(serde_json::from_str(m.as_str())?) + } else { + None + }; + Ok(SafekeeperPostgresCommand::StartReplication( + ReplicationOptions { + start_lsn, + term, + shard, + }, + )) } else if cmd.starts_with("IDENTIFY_SYSTEM") { Ok(SafekeeperPostgresCommand::IdentifySystem) } else if cmd.starts_with("TIMELINE_STATUS") { @@ -86,7 +98,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { } } -fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str { +fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &'static str { match cmd { SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH", SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION", @@ -228,8 +240,8 @@ impl postgres_backend::Handler .instrument(info_span!("WAL receiver")) .await } - SafekeeperPostgresCommand::StartReplication { start_lsn, term } => { - self.handle_start_replication(pgb, start_lsn, term) + SafekeeperPostgresCommand::StartReplication(opts) => { + self.handle_start_replication(pgb, opts) .instrument(info_span!("WAL sender")) .await } diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 3a086f1f54..1de9bc8f12 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -27,6 +27,7 @@ pub mod recovery; pub mod remove_wal; pub mod safekeeper; pub mod send_wal; +pub mod send_wal_sharded; pub mod timeline; pub mod wal_backup; pub mod wal_service; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 44f14f8c7e..debade2349 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -6,13 +6,15 @@ use crate::safekeeper::{Term, TermLsn}; use crate::timeline::Timeline; use crate::wal_service::ConnectionId; use crate::wal_storage::WalReader; -use crate::GlobalTimelines; +use crate::{send_wal_sharded, GlobalTimelines}; use anyhow::{bail, Context as AnyhowContext}; use bytes::Bytes; +use pageserver_api::shard::ShardIdentity; use parking_lot::Mutex; use postgres_backend::PostgresBackend; use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError}; use postgres_ffi::get_current_timestamp; +use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; use serde::{Deserialize, Serialize}; @@ -31,6 +33,12 @@ use tokio::time::timeout; use tracing::*; use utils::{bin_ser::BeSer, lsn::Lsn}; +pub struct ReplicationOptions { + pub start_lsn: Lsn, + pub term: Option, + pub shard: Option, +} + // See: https://www.postgresql.org/docs/13/protocol-replication.html const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; @@ -349,6 +357,22 @@ impl Drop for WalSenderGuard { } } +impl WalSenderGuard { + pub async fn should_stop(&self, tli: &Arc) -> bool { + if let Some(remote_consistent_lsn) = self.walsenders.get_ws_remote_consistent_lsn(self.id) { + if tli.should_walsender_stop(remote_consistent_lsn).await { + // Terminate if there is nothing more to send. + // Note that "ending streaming" part of the string is used by + // pageserver to identify WalReceiverError::SuccessfulCompletion, + // do not change this string without updating pageserver. + return true; + } + } + + false + } +} + impl SafekeeperPostgresHandler { /// Wrapper around handle_start_replication_guts handling result. Error is /// handled here while we're still in walsender ttid span; with API @@ -356,13 +380,9 @@ impl SafekeeperPostgresHandler { pub async fn handle_start_replication( &mut self, pgb: &mut PostgresBackend, - start_pos: Lsn, - term: Option, + opts: ReplicationOptions, ) -> Result<(), QueryError> { - if let Err(end) = self - .handle_start_replication_guts(pgb, start_pos, term) - .await - { + if let Err(end) = self.handle_start_replication_guts(pgb, opts).await { // Log the result and probably send it to the client, closing the stream. pgb.handle_copy_stream_end(end).await; } @@ -372,12 +392,12 @@ impl SafekeeperPostgresHandler { pub async fn handle_start_replication_guts( &mut self, pgb: &mut PostgresBackend, - start_pos: Lsn, - term: Option, + opts: ReplicationOptions, ) -> Result<(), CopyStreamHandlerEnd> { let appname = self.appname.clone(); let tli = GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?; + let start_pos = opts.start_lsn; // Use a guard object to remove our entry from the timeline when we are done. let ws_guard = Arc::new(tli.get_walsenders().register( @@ -415,11 +435,13 @@ impl SafekeeperPostgresHandler { } info!( - "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}", + "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, addr={}, shard={:?}", start_pos, end_pos, matches!(end_watch, EndWatch::Flush(_)), - appname + appname, + pgb.get_peer_addr(), + opts.shard, ); // switch to copy @@ -438,23 +460,49 @@ impl SafekeeperPostgresHandler { // not synchronized with sends, so this avoids deadlocks. let reader = pgb.split().context("START_REPLICATION split")?; - let mut sender = WalSender { - pgb, - tli: tli.clone(), - appname, - start_pos, - end_pos, - term, - end_watch, - ws_guard: ws_guard.clone(), - wal_reader, - send_buf: [0; MAX_SEND_SIZE], + let ws_guard_clone = ws_guard.clone(); + let sender_future = async { + if let Some(_shard) = opts.shard { + send_wal_sharded::WalSender { + pgb, + tli: tli.clone(), + appname, + start_pos, + end_pos, + term: opts.term, + end_watch, + ws_guard: ws_guard_clone, + wal_reader, + send_buf: [0; MAX_SEND_SIZE], + waldecoder: WalStreamDecoder::new( + start_pos, + tli.get_state().await.1.server.pg_version / 10000, + ), + } + .run() + .await + } else { + WalSender { + pgb, + tli: tli.clone(), + appname, + start_pos, + end_pos, + term: opts.term, + end_watch, + ws_guard: ws_guard_clone, + wal_reader, + send_buf: [0; MAX_SEND_SIZE], + } + .run() + .await + } }; let mut reply_reader = ReplyReader { reader, ws_guard }; let res = tokio::select! { // todo: add read|write .context to these errors - r = sender.run() => r, + r = sender_future => r, r = reply_reader.run() => r, }; // Join pg backend back. @@ -466,14 +514,14 @@ impl SafekeeperPostgresHandler { /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the /// given term (recovery by walproposer or peer safekeeper). -enum EndWatch { +pub enum EndWatch { Commit(Receiver), Flush(Receiver), } impl EndWatch { /// Get current end of WAL. - fn get(&self) -> Lsn { + pub fn get(&self) -> Lsn { match self { EndWatch::Commit(r) => *r.borrow(), EndWatch::Flush(r) => r.borrow().lsn, @@ -481,7 +529,7 @@ impl EndWatch { } /// Wait for the update. - async fn changed(&mut self) -> anyhow::Result<()> { + pub async fn changed(&mut self) -> anyhow::Result<()> { match self { EndWatch::Commit(r) => r.changed().await?, EndWatch::Flush(r) => r.changed().await?, @@ -598,21 +646,11 @@ impl WalSender<'_, IO> { // Check for termination only if we are streaming up to commit_lsn // (to pageserver). if let EndWatch::Commit(_) = self.end_watch { - if let Some(remote_consistent_lsn) = self - .ws_guard - .walsenders - .get_ws_remote_consistent_lsn(self.ws_guard.id) - { - if self.tli.should_walsender_stop(remote_consistent_lsn).await { - // Terminate if there is nothing more to send. - // Note that "ending streaming" part of the string is used by - // pageserver to identify WalReceiverError::SuccessfulCompletion, - // do not change this string without updating pageserver. - return Err(CopyStreamHandlerEnd::ServerInitiated(format!( + if self.ws_guard.should_stop(&self.tli).await { + return Err(CopyStreamHandlerEnd::ServerInitiated(format!( "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", self.appname, self.start_pos, ))); - } } } @@ -685,7 +723,7 @@ const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); /// - Ok(None) if timeout expired; /// - Err in case of error -- only if 1) term changed while fetching in recovery /// mode 2) watch channel closed, which must never happen. -async fn wait_for_lsn( +pub async fn wait_for_lsn( rx: &mut EndWatch, client_term: Option, start_pos: Lsn, diff --git a/safekeeper/src/send_wal_sharded.rs b/safekeeper/src/send_wal_sharded.rs new file mode 100644 index 0000000000..fb57a0e455 --- /dev/null +++ b/safekeeper/src/send_wal_sharded.rs @@ -0,0 +1,160 @@ +use std::{cmp::min, sync::Arc}; + +use anyhow::Context; +use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend}; +use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder, MAX_SEND_SIZE}; +use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{trace}; +use utils::lsn::Lsn; + +use crate::{ + safekeeper::Term, + send_wal::{wait_for_lsn, EndWatch, WalSenderGuard}, + timeline::Timeline, + wal_storage::WalReader, +}; + +/// A half driving sending WAL. +pub struct WalSender<'a, IO> { + pub pgb: &'a mut PostgresBackend, + pub tli: Arc, + pub appname: Option, + // Position since which we are sending next chunk. + pub start_pos: Lsn, + // WAL up to this position is known to be locally available. + // Usually this is the same as the latest commit_lsn, but in case of + // walproposer recovery, this is flush_lsn. + // + // We send this LSN to the receiver as wal_end, so that it knows how much + // WAL this safekeeper has. This LSN should be as fresh as possible. + pub end_pos: Lsn, + /// When streaming uncommitted part, the term the client acts as the leader + /// in. Streaming is stopped if local term changes to a different (higher) + /// value. + pub term: Option, + /// Watch channel receiver to learn end of available WAL (and wait for its advancement). + pub end_watch: EndWatch, + pub ws_guard: Arc, + pub wal_reader: WalReader, + // buffer for readling WAL into to send it + pub send_buf: [u8; MAX_SEND_SIZE], + pub waldecoder: WalStreamDecoder, +} + +impl WalSender<'_, IO> { + /// Send WAL until + /// - an error occurs + /// - receiver is caughtup and there is no computes (if streaming up to commit_lsn) + /// + /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ? + /// convenience. + pub async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> { + loop { + // Wait for the next portion if it is not there yet, or just + // update our end of WAL available for sending value, we + // communicate it to the receiver. + self.wait_wal().await?; + assert!( + self.end_pos > self.start_pos, + "nothing to send after waiting for WAL" + ); + + // try to send as much as available, capped by MAX_SEND_SIZE + let mut send_size = self + .end_pos + .checked_sub(self.start_pos) + .context("reading wal without waiting for it first")? + .0 as usize; + send_size = min(send_size, self.send_buf.len()); + let send_buf = &mut self.send_buf[..send_size]; + let send_size: usize; + { + // If uncommitted part is being pulled, check that the term is + // still the expected one. + let _term_guard = if let Some(t) = self.term { + Some(self.tli.acquire_term(t).await?) + } else { + None + }; + // read wal into buffer + send_size = self.wal_reader.read(send_buf).await? + }; + let send_buf = &send_buf[..send_size]; + + // feed waldecoder with the data + self.waldecoder.feed_bytes(send_buf); + self.start_pos += send_size as u64; + + while let Some((lsn, recdata)) = + self.waldecoder.poll_decode().context("wal decoding")? + { + // It is important to deal with the aligned records as lsn in getPage@LSN is + // aligned and can be several bytes bigger. Without this alignment we are + // at risk of hitting a deadlock. + if !lsn.is_aligned() { + return Err(CopyStreamHandlerEnd::ServerInitiated(format!( + "unaligned record at {}", + lsn + ))); + } + + trace!( + "read record of {} bytes of WAL ending at {}", + recdata.len(), + lsn + ); + + // and send it + self.pgb + .write_message(&BeMessage::XLogData(XLogDataBody { + wal_start: lsn.0, + wal_end: self.end_pos.0, + timestamp: get_current_timestamp(), + data: &recdata, + })) + .await?; + } + } + } + + /// wait until we have WAL to stream, sending keepalives and checking for + /// exit in the meanwhile + async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> { + loop { + self.end_pos = self.end_watch.get(); + if self.end_pos > self.start_pos { + // We have something to send. + trace!("got end_pos {:?}, streaming", self.end_pos); + return Ok(()); + } + + // Wait for WAL to appear, now self.end_pos == self.start_pos. + if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? { + self.end_pos = lsn; + trace!("got end_pos {:?}, streaming", self.end_pos); + return Ok(()); + } + + // Timed out waiting for WAL, check for termination and send KA. + // Check for termination only if we are streaming up to commit_lsn + // (to pageserver). + if let EndWatch::Commit(_) = self.end_watch { + if self.ws_guard.should_stop(&self.tli).await { + return Err(CopyStreamHandlerEnd::ServerInitiated(format!( + "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", + self.appname, self.start_pos, + ))); + } + } + + self.pgb + .write_message(&BeMessage::KeepAlive(WalSndKeepAlive { + wal_end: self.end_pos.0, + timestamp: get_current_timestamp(), + request_reply: true, + })) + .await?; + } + } +}