Add safekeeper protocol to send decoded records

This commit is contained in:
Arthur Petukhovsky
2023-11-09 19:33:41 +00:00
parent 56630b0eda
commit e70d486281
8 changed files with 295 additions and 83 deletions

1
Cargo.lock generated
View File

@@ -4466,6 +4466,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",

View File

@@ -36,7 +36,7 @@ use crate::{
};
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::NodeId, lsn::Lsn};
@@ -244,13 +244,22 @@ pub(super) async fn handle_walreceiver_connection(
info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
let query = format!("START_REPLICATION PHYSICAL {startpoint}");
let shard = timeline.get_shard();
let shard_str = serde_json::to_string(&shard).map_err(|e| {
WalReceiverError::Other(anyhow!(
"Failed to serialize shard info for walreceiver: {e}"
))
})?;
info!("starting replication for shard {shard_str}");
let query = format!(
"START_REPLICATION PHYSICAL {startpoint} (shard={})",
shard_str
);
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
while let Some(replication_message) = {
@@ -273,9 +282,7 @@ pub(super) async fn handle_walreceiver_connection(
ReplicationMessage::XLogData(xlog_data) => {
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
connection_status.streaming_lsn = Some(Lsn::from(
xlog_data.wal_start() + xlog_data.data().len() as u64,
));
connection_status.streaming_lsn = Some(Lsn::from(xlog_data.wal_start()));
if !xlog_data.data().is_empty() {
connection_status.latest_wal_update = now;
}
@@ -293,44 +300,29 @@ pub(super) async fn handle_walreceiver_connection(
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
// Process decoded WAL record.
let next_lsn = Lsn::from(xlog_data.wal_start());
let data = xlog_data.into_data();
trace!("received XLogData between {startlsn} and {endlsn}");
trace!("received XLogData up to {next_lsn}");
waldecoder.feed_bytes(data);
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(next_lsn);
walingest
.ingest_record(data, next_lsn, &mut modification, &mut decoded, &ctx)
.await
.with_context(|| format!("could not ingest record at {next_lsn}"))?;
{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(endlsn);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
if !lsn.is_aligned() {
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
}
fail_point!("walreceiver-after-ingest");
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
last_rec_lsn = next_lsn;
fail_point!("walreceiver-after-ingest");
last_rec_lsn = lsn;
}
}
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {endlsn}");
if !caught_up && next_lsn >= end_of_wal {
info!("caught up at LSN {next_lsn}");
caught_up = true;
}
Some(endlsn)
Some(last_rec_lsn)
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {

View File

@@ -93,6 +93,13 @@ impl<'a> WalIngest<'a> {
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
tracing::trace!(
"decoded rmid={} xid={} xl_info={}",
decoded.xl_rmid,
decoded.xl_xid,
decoded.xl_info
);
// Fast path: we may skip the entire record if it only references blocks on another shard.
// Otherwise we proceed, and filter blocks later.
let any_local_blocks = decoded.blocks.iter().any(|blk| {

View File

@@ -41,6 +41,7 @@ toml_edit.workspace = true
tracing.workspace = true
url.workspace = true
metrics.workspace = true
pageserver_api.workspace = true
postgres_backend.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true

View File

@@ -2,6 +2,7 @@
//! protocol commands.
use anyhow::Context;
use std::str::FromStr;
use std::str::{self};
use std::sync::Arc;
@@ -12,7 +13,7 @@ use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED};
use crate::safekeeper::Term;
use crate::send_wal::ReplicationOptions;
use crate::timeline::TimelineError;
use crate::wal_service::ConnectionId;
use crate::{GlobalTimelines, SafeKeeperConf};
@@ -46,7 +47,7 @@ pub struct SafekeeperPostgresHandler {
/// Parsed Postgres command.
enum SafekeeperPostgresCommand {
StartWalPush,
StartReplication { start_lsn: Lsn, term: Option<Term> },
StartReplication(ReplicationOptions),
IdentifySystem,
TimelineStatus,
JSONCtrl { cmd: AppendLogicalMessage },
@@ -58,7 +59,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
} else if cmd.starts_with("START_REPLICATION") {
let re = Regex::new(
// We follow postgres START_REPLICATION LOGICAL options to pass term.
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?",
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)(?: \(term='(\d+)'\))?(?: \(shard=(.+)\))?",
)
.unwrap();
let caps = re
@@ -71,7 +72,18 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
} else {
None
};
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn, term })
let shard = if let Some(m) = caps.get(3) {
Some(serde_json::from_str(m.as_str())?)
} else {
None
};
Ok(SafekeeperPostgresCommand::StartReplication(
ReplicationOptions {
start_lsn,
term,
shard,
},
))
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
Ok(SafekeeperPostgresCommand::IdentifySystem)
} else if cmd.starts_with("TIMELINE_STATUS") {
@@ -86,7 +98,7 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
}
}
fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &'static str {
match cmd {
SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
@@ -228,8 +240,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
.instrument(info_span!("WAL receiver"))
.await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
SafekeeperPostgresCommand::StartReplication(opts) => {
self.handle_start_replication(pgb, opts)
.instrument(info_span!("WAL sender"))
.await
}

View File

@@ -27,6 +27,7 @@ pub mod recovery;
pub mod remove_wal;
pub mod safekeeper;
pub mod send_wal;
pub mod send_wal_sharded;
pub mod timeline;
pub mod wal_backup;
pub mod wal_service;

View File

@@ -6,13 +6,15 @@ use crate::safekeeper::{Term, TermLsn};
use crate::timeline::Timeline;
use crate::wal_service::ConnectionId;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use crate::{send_wal_sharded, GlobalTimelines};
use anyhow::{bail, Context as AnyhowContext};
use bytes::Bytes;
use pageserver_api::shard::ShardIdentity;
use parking_lot::Mutex;
use postgres_backend::PostgresBackend;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
use postgres_ffi::get_current_timestamp;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
@@ -31,6 +33,12 @@ use tokio::time::timeout;
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn};
pub struct ReplicationOptions {
pub start_lsn: Lsn,
pub term: Option<Term>,
pub shard: Option<ShardIdentity>,
}
// See: https://www.postgresql.org/docs/13/protocol-replication.html
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
@@ -349,6 +357,22 @@ impl Drop for WalSenderGuard {
}
}
impl WalSenderGuard {
pub async fn should_stop(&self, tli: &Arc<Timeline>) -> bool {
if let Some(remote_consistent_lsn) = self.walsenders.get_ws_remote_consistent_lsn(self.id) {
if tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send.
// Note that "ending streaming" part of the string is used by
// pageserver to identify WalReceiverError::SuccessfulCompletion,
// do not change this string without updating pageserver.
return true;
}
}
false
}
}
impl SafekeeperPostgresHandler {
/// Wrapper around handle_start_replication_guts handling result. Error is
/// handled here while we're still in walsender ttid span; with API
@@ -356,13 +380,9 @@ impl SafekeeperPostgresHandler {
pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
start_pos: Lsn,
term: Option<Term>,
opts: ReplicationOptions,
) -> Result<(), QueryError> {
if let Err(end) = self
.handle_start_replication_guts(pgb, start_pos, term)
.await
{
if let Err(end) = self.handle_start_replication_guts(pgb, opts).await {
// Log the result and probably send it to the client, closing the stream.
pgb.handle_copy_stream_end(end).await;
}
@@ -372,12 +392,12 @@ impl SafekeeperPostgresHandler {
pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
start_pos: Lsn,
term: Option<Term>,
opts: ReplicationOptions,
) -> Result<(), CopyStreamHandlerEnd> {
let appname = self.appname.clone();
let tli =
GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
let start_pos = opts.start_lsn;
// Use a guard object to remove our entry from the timeline when we are done.
let ws_guard = Arc::new(tli.get_walsenders().register(
@@ -415,11 +435,13 @@ impl SafekeeperPostgresHandler {
}
info!(
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, addr={}, shard={:?}",
start_pos,
end_pos,
matches!(end_watch, EndWatch::Flush(_)),
appname
appname,
pgb.get_peer_addr(),
opts.shard,
);
// switch to copy
@@ -438,23 +460,49 @@ impl SafekeeperPostgresHandler {
// not synchronized with sends, so this avoids deadlocks.
let reader = pgb.split().context("START_REPLICATION split")?;
let mut sender = WalSender {
pgb,
tli: tli.clone(),
appname,
start_pos,
end_pos,
term,
end_watch,
ws_guard: ws_guard.clone(),
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
let ws_guard_clone = ws_guard.clone();
let sender_future = async {
if let Some(_shard) = opts.shard {
send_wal_sharded::WalSender {
pgb,
tli: tli.clone(),
appname,
start_pos,
end_pos,
term: opts.term,
end_watch,
ws_guard: ws_guard_clone,
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
waldecoder: WalStreamDecoder::new(
start_pos,
tli.get_state().await.1.server.pg_version / 10000,
),
}
.run()
.await
} else {
WalSender {
pgb,
tli: tli.clone(),
appname,
start_pos,
end_pos,
term: opts.term,
end_watch,
ws_guard: ws_guard_clone,
wal_reader,
send_buf: [0; MAX_SEND_SIZE],
}
.run()
.await
}
};
let mut reply_reader = ReplyReader { reader, ws_guard };
let res = tokio::select! {
// todo: add read|write .context to these errors
r = sender.run() => r,
r = sender_future => r,
r = reply_reader.run() => r,
};
// Join pg backend back.
@@ -466,14 +514,14 @@ impl SafekeeperPostgresHandler {
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
/// given term (recovery by walproposer or peer safekeeper).
enum EndWatch {
pub enum EndWatch {
Commit(Receiver<Lsn>),
Flush(Receiver<TermLsn>),
}
impl EndWatch {
/// Get current end of WAL.
fn get(&self) -> Lsn {
pub fn get(&self) -> Lsn {
match self {
EndWatch::Commit(r) => *r.borrow(),
EndWatch::Flush(r) => r.borrow().lsn,
@@ -481,7 +529,7 @@ impl EndWatch {
}
/// Wait for the update.
async fn changed(&mut self) -> anyhow::Result<()> {
pub async fn changed(&mut self) -> anyhow::Result<()> {
match self {
EndWatch::Commit(r) => r.changed().await?,
EndWatch::Flush(r) => r.changed().await?,
@@ -598,21 +646,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
// Check for termination only if we are streaming up to commit_lsn
// (to pageserver).
if let EndWatch::Commit(_) = self.end_watch {
if let Some(remote_consistent_lsn) = self
.ws_guard
.walsenders
.get_ws_remote_consistent_lsn(self.ws_guard.id)
{
if self.tli.should_walsender_stop(remote_consistent_lsn).await {
// Terminate if there is nothing more to send.
// Note that "ending streaming" part of the string is used by
// pageserver to identify WalReceiverError::SuccessfulCompletion,
// do not change this string without updating pageserver.
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
if self.ws_guard.should_stop(&self.tli).await {
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, self.start_pos,
)));
}
}
}
@@ -685,7 +723,7 @@ const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
/// - Ok(None) if timeout expired;
/// - Err in case of error -- only if 1) term changed while fetching in recovery
/// mode 2) watch channel closed, which must never happen.
async fn wait_for_lsn(
pub async fn wait_for_lsn(
rx: &mut EndWatch,
client_term: Option<Term>,
start_pos: Lsn,

View File

@@ -0,0 +1,160 @@
use std::{cmp::min, sync::Arc};
use anyhow::Context;
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{trace};
use utils::lsn::Lsn;
use crate::{
safekeeper::Term,
send_wal::{wait_for_lsn, EndWatch, WalSenderGuard},
timeline::Timeline,
wal_storage::WalReader,
};
/// A half driving sending WAL.
pub struct WalSender<'a, IO> {
pub pgb: &'a mut PostgresBackend<IO>,
pub tli: Arc<Timeline>,
pub appname: Option<String>,
// Position since which we are sending next chunk.
pub start_pos: Lsn,
// WAL up to this position is known to be locally available.
// Usually this is the same as the latest commit_lsn, but in case of
// walproposer recovery, this is flush_lsn.
//
// We send this LSN to the receiver as wal_end, so that it knows how much
// WAL this safekeeper has. This LSN should be as fresh as possible.
pub end_pos: Lsn,
/// When streaming uncommitted part, the term the client acts as the leader
/// in. Streaming is stopped if local term changes to a different (higher)
/// value.
pub term: Option<Term>,
/// Watch channel receiver to learn end of available WAL (and wait for its advancement).
pub end_watch: EndWatch,
pub ws_guard: Arc<WalSenderGuard>,
pub wal_reader: WalReader,
// buffer for readling WAL into to send it
pub send_buf: [u8; MAX_SEND_SIZE],
pub waldecoder: WalStreamDecoder,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// Send WAL until
/// - an error occurs
/// - receiver is caughtup and there is no computes (if streaming up to commit_lsn)
///
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
/// convenience.
pub async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
// Wait for the next portion if it is not there yet, or just
// update our end of WAL available for sending value, we
// communicate it to the receiver.
self.wait_wal().await?;
assert!(
self.end_pos > self.start_pos,
"nothing to send after waiting for WAL"
);
// try to send as much as available, capped by MAX_SEND_SIZE
let mut send_size = self
.end_pos
.checked_sub(self.start_pos)
.context("reading wal without waiting for it first")?
.0 as usize;
send_size = min(send_size, self.send_buf.len());
let send_buf = &mut self.send_buf[..send_size];
let send_size: usize;
{
// If uncommitted part is being pulled, check that the term is
// still the expected one.
let _term_guard = if let Some(t) = self.term {
Some(self.tli.acquire_term(t).await?)
} else {
None
};
// read wal into buffer
send_size = self.wal_reader.read(send_buf).await?
};
let send_buf = &send_buf[..send_size];
// feed waldecoder with the data
self.waldecoder.feed_bytes(send_buf);
self.start_pos += send_size as u64;
while let Some((lsn, recdata)) =
self.waldecoder.poll_decode().context("wal decoding")?
{
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
if !lsn.is_aligned() {
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"unaligned record at {}",
lsn
)));
}
trace!(
"read record of {} bytes of WAL ending at {}",
recdata.len(),
lsn
);
// and send it
self.pgb
.write_message(&BeMessage::XLogData(XLogDataBody {
wal_start: lsn.0,
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
data: &recdata,
}))
.await?;
}
}
}
/// wait until we have WAL to stream, sending keepalives and checking for
/// exit in the meanwhile
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
self.end_pos = self.end_watch.get();
if self.end_pos > self.start_pos {
// We have something to send.
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
}
// Wait for WAL to appear, now self.end_pos == self.start_pos.
if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
self.end_pos = lsn;
trace!("got end_pos {:?}, streaming", self.end_pos);
return Ok(());
}
// Timed out waiting for WAL, check for termination and send KA.
// Check for termination only if we are streaming up to commit_lsn
// (to pageserver).
if let EndWatch::Commit(_) = self.end_watch {
if self.ws_guard.should_stop(&self.tli).await {
return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
self.appname, self.start_pos,
)));
}
}
self.pgb
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
wal_end: self.end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.await?;
}
}
}