mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 15:49:58 +00:00
safekeeper: lift decoding and interpretation of WAL to the safekeeper (#9746)
## Problem For any given tenant shard, pageservers receive all of the tenant's WAL from the safekeeper. This soft-blocks us from using larger shard counts due to bandwidth concerns and CPU overhead of filtering out the records. ## Summary of changes This PR lifts the decoding and interpretation of WAL from the pageserver into the safekeeper. A customised PG replication protocol is used where instead of sending raw WAL, the safekeeper sends filtered, interpreted records. The receiver drives the protocol selection, so, on the pageserver side, usage of the new protocol is gated by a new pageserver config: `wal_receiver_protocol`. More granularly the changes are: 1. Optionally inject the protocol and shard identity into the arguments used for starting replication 2. On the safekeeper side, implement a new wal sending primitive which decodes and interprets records before sending them over 3. On the pageserver side, implement the ingestion of this new replication message type. It's very similar to what we already have for raw wal (minus decoding and interpreting). ## Notes * This PR currently uses my [branch of rust-postgres](https://github.com/neondatabase/rust-postgres/tree/vlad/interpreted-wal-record-replication-support) which includes the deserialization logic for the new replication message type. PR for that is open [here](https://github.com/neondatabase/rust-postgres/pull/32). * This PR contains changes for both pageservers and safekeepers. It's safe to merge because the new protocol is disabled by default on the pageserver side. We can gradually start enabling it in subsequent releases. * CI tests are running on https://github.com/neondatabase/neon/pull/9747 ## Links Related: https://github.com/neondatabase/neon/issues/9336 Epic: https://github.com/neondatabase/neon/issues/9329
This commit is contained in:
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -4133,7 +4133,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4146,7 +4146,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -4165,7 +4165,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -5364,6 +5364,7 @@ dependencies = [
|
||||
"itertools 0.10.5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
@@ -5395,6 +5396,7 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"utils",
|
||||
"wal_decoder",
|
||||
"walproposer",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -6466,7 +6468,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -7021,6 +7023,7 @@ dependencies = [
|
||||
"serde_assert",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"strum",
|
||||
"strum_macros",
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::{
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
use utils::logging::LogFormat;
|
||||
use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol};
|
||||
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
use crate::models::LsnLease;
|
||||
@@ -120,6 +120,7 @@ pub struct ConfigToml {
|
||||
pub no_sync: Option<bool>,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub server_side_batch_timeout: Option<Duration>,
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -330,6 +331,9 @@ pub mod defaults {
|
||||
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
|
||||
|
||||
pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None;
|
||||
|
||||
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
|
||||
utils::postgres_client::PostgresClientProtocol::Vanilla;
|
||||
}
|
||||
|
||||
impl Default for ConfigToml {
|
||||
@@ -418,6 +422,7 @@ impl Default for ConfigToml {
|
||||
.map(|duration| humantime::parse_duration(duration).unwrap()),
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
no_sync: None,
|
||||
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -562,6 +562,9 @@ pub enum BeMessage<'a> {
|
||||
options: &'a [&'a str],
|
||||
},
|
||||
KeepAlive(WalSndKeepAlive),
|
||||
/// Batch of interpreted, shard filtered WAL records,
|
||||
/// ready for the pageserver to ingest
|
||||
InterpretedWalRecords(InterpretedWalRecordsBody<'a>),
|
||||
}
|
||||
|
||||
/// Common shorthands.
|
||||
@@ -672,6 +675,25 @@ pub struct WalSndKeepAlive {
|
||||
pub request_reply: bool,
|
||||
}
|
||||
|
||||
/// Batch of interpreted WAL records used in the interpreted
|
||||
/// safekeeper to pageserver protocol.
|
||||
///
|
||||
/// Note that the pageserver uses the RawInterpretedWalRecordsBody
|
||||
/// counterpart of this from the neondatabase/rust-postgres repo.
|
||||
/// If you're changing this struct, you likely need to change its
|
||||
/// twin as well.
|
||||
#[derive(Debug)]
|
||||
pub struct InterpretedWalRecordsBody<'a> {
|
||||
/// End of raw WAL in [`Self::data`]
|
||||
pub streaming_lsn: u64,
|
||||
/// Current end of WAL on the server
|
||||
pub commit_lsn: u64,
|
||||
/// Start LSN of the next record in PG WAL.
|
||||
/// Is 0 if the portion of PG WAL did not contain any records.
|
||||
pub next_record_lsn: u64,
|
||||
pub data: &'a [u8],
|
||||
}
|
||||
|
||||
pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]);
|
||||
|
||||
// single text column
|
||||
@@ -996,6 +1018,20 @@ impl BeMessage<'_> {
|
||||
Ok(())
|
||||
})?
|
||||
}
|
||||
|
||||
BeMessage::InterpretedWalRecords(rec) => {
|
||||
// We use the COPY_DATA_TAG for our custom message
|
||||
// since this tag is interpreted as raw bytes.
|
||||
buf.put_u8(b'd');
|
||||
write_body(buf, |buf| {
|
||||
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
|
||||
// dependency
|
||||
buf.put_u64(rec.streaming_lsn);
|
||||
buf.put_u64(rec.commit_lsn);
|
||||
buf.put_u64(rec.next_record_lsn);
|
||||
buf.put_slice(rec.data);
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ pprof.workspace = true
|
||||
regex.workspace = true
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
signal-hook.workspace = true
|
||||
thiserror.workspace = true
|
||||
|
||||
@@ -7,29 +7,94 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
|
||||
use crate::id::TenantTimelineId;
|
||||
|
||||
/// Postgres client protocol types
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
Debug,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
#[repr(u8)]
|
||||
pub enum PostgresClientProtocol {
|
||||
/// Usual Postgres replication protocol
|
||||
Vanilla,
|
||||
/// Custom shard-aware protocol that replicates interpreted records.
|
||||
/// Used to send wal from safekeeper to pageserver.
|
||||
Interpreted,
|
||||
}
|
||||
|
||||
impl TryFrom<u8> for PostgresClientProtocol {
|
||||
type Error = u8;
|
||||
|
||||
fn try_from(value: u8) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
|
||||
v if v == (PostgresClientProtocol::Interpreted as u8) => {
|
||||
PostgresClientProtocol::Interpreted
|
||||
}
|
||||
x => return Err(x),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectionConfigArgs<'a> {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard_number: Option<u8>,
|
||||
pub shard_count: Option<u8>,
|
||||
pub shard_stripe_size: Option<u32>,
|
||||
|
||||
pub listen_pg_addr_str: &'a str,
|
||||
|
||||
pub auth_token: Option<&'a str>,
|
||||
pub availability_zone: Option<&'a str>,
|
||||
}
|
||||
|
||||
impl<'a> ConnectionConfigArgs<'a> {
|
||||
fn options(&'a self) -> Vec<String> {
|
||||
let mut options = vec![
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", self.ttid.timeline_id),
|
||||
format!("tenant_id={}", self.ttid.tenant_id),
|
||||
format!("protocol={}", self.protocol as u8),
|
||||
];
|
||||
|
||||
if self.shard_number.is_some() {
|
||||
assert!(self.shard_count.is_some());
|
||||
assert!(self.shard_stripe_size.is_some());
|
||||
|
||||
options.push(format!("shard_count={}", self.shard_count.unwrap()));
|
||||
options.push(format!("shard_number={}", self.shard_number.unwrap()));
|
||||
options.push(format!(
|
||||
"shard_stripe_size={}",
|
||||
self.shard_stripe_size.unwrap()
|
||||
));
|
||||
}
|
||||
|
||||
options
|
||||
}
|
||||
}
|
||||
|
||||
/// Create client config for fetching WAL from safekeeper on particular timeline.
|
||||
/// listen_pg_addr_str is in form host:\[port\].
|
||||
pub fn wal_stream_connection_config(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
}: TenantTimelineId,
|
||||
listen_pg_addr_str: &str,
|
||||
auth_token: Option<&str>,
|
||||
availability_zone: Option<&str>,
|
||||
args: ConnectionConfigArgs,
|
||||
) -> anyhow::Result<PgConnectionConfig> {
|
||||
let (host, port) =
|
||||
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
|
||||
let port = port.unwrap_or(5432);
|
||||
let mut connstr = PgConnectionConfig::new_host_port(host, port)
|
||||
.extend_options([
|
||||
"-c".to_owned(),
|
||||
format!("timeline_id={}", timeline_id),
|
||||
format!("tenant_id={}", tenant_id),
|
||||
])
|
||||
.set_password(auth_token.map(|s| s.to_owned()));
|
||||
.extend_options(args.options())
|
||||
.set_password(args.auth_token.map(|s| s.to_owned()));
|
||||
|
||||
if let Some(availability_zone) = availability_zone {
|
||||
if let Some(availability_zone) = args.availability_zone {
|
||||
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
|
||||
}
|
||||
|
||||
|
||||
@@ -65,6 +65,18 @@ pub struct InterpretedWalRecord {
|
||||
pub xid: TransactionId,
|
||||
}
|
||||
|
||||
impl InterpretedWalRecord {
|
||||
/// Checks if the WAL record is empty
|
||||
///
|
||||
/// An empty interpreted WAL record has no data or metadata and does not have to be sent to the
|
||||
/// pageserver.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.batch.is_empty()
|
||||
&& self.metadata_record.is_none()
|
||||
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
|
||||
}
|
||||
}
|
||||
|
||||
/// The interpreted part of the Postgres WAL record which requires metadata
|
||||
/// writes to the underlying storage engine.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -496,11 +496,16 @@ impl SerializedValueBatch {
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the batch is empty
|
||||
///
|
||||
/// A batch is empty when it contains no serialized values.
|
||||
/// Note that it may still contain observed values.
|
||||
/// Checks if the batch contains any serialized or observed values
|
||||
pub fn is_empty(&self) -> bool {
|
||||
!self.has_data() && self.metadata.is_empty()
|
||||
}
|
||||
|
||||
/// Checks if the batch contains data
|
||||
///
|
||||
/// Note that if this returns false, it may still contain observed values or
|
||||
/// a metadata record.
|
||||
pub fn has_data(&self) -> bool {
|
||||
let empty = self.raw.is_empty();
|
||||
|
||||
if cfg!(debug_assertions) && empty {
|
||||
@@ -510,7 +515,7 @@ impl SerializedValueBatch {
|
||||
.all(|meta| matches!(meta, ValueMeta::Observed(_))));
|
||||
}
|
||||
|
||||
empty
|
||||
!empty
|
||||
}
|
||||
|
||||
/// Returns the number of values serialized in the batch
|
||||
|
||||
@@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> {
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
|
||||
|
||||
// The tenants directory contains all the pageserver local disk state.
|
||||
// Create if not exists and make sure all the contents are durable before proceeding.
|
||||
|
||||
@@ -14,6 +14,7 @@ use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::logging::SecretString;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use reqwest::Url;
|
||||
@@ -190,6 +191,8 @@ pub struct PageServerConf {
|
||||
/// Maximum amount of time for which a get page request request
|
||||
/// might be held up for request merging.
|
||||
pub server_side_batch_timeout: Option<Duration>,
|
||||
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -350,6 +353,7 @@ impl PageServerConf {
|
||||
server_side_batch_timeout,
|
||||
tenant_config,
|
||||
no_sync,
|
||||
wal_receiver_protocol,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -393,6 +397,7 @@ impl PageServerConf {
|
||||
import_pgdata_upcall_api,
|
||||
import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from),
|
||||
import_pgdata_aws_endpoint_url,
|
||||
wal_receiver_protocol,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -1229,10 +1229,9 @@ impl<'a> DatadirModification<'a> {
|
||||
}
|
||||
|
||||
pub(crate) fn has_dirty_data(&self) -> bool {
|
||||
!self
|
||||
.pending_data_batch
|
||||
self.pending_data_batch
|
||||
.as_ref()
|
||||
.map_or(true, |b| b.is_empty())
|
||||
.map_or(false, |b| b.has_data())
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
@@ -1408,7 +1407,7 @@ impl<'a> DatadirModification<'a> {
|
||||
Some(pending_batch) => {
|
||||
pending_batch.extend(batch);
|
||||
}
|
||||
None if !batch.is_empty() => {
|
||||
None if batch.has_data() => {
|
||||
self.pending_data_batch = Some(batch);
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -2470,6 +2470,7 @@ impl Timeline {
|
||||
*guard = Some(WalReceiver::start(
|
||||
Arc::clone(self),
|
||||
WalReceiverConf {
|
||||
protocol: self.conf.wal_receiver_protocol,
|
||||
wal_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
@@ -5896,7 +5897,7 @@ impl<'a> TimelineWriter<'a> {
|
||||
batch: SerializedValueBatch,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
if batch.is_empty() {
|
||||
if !batch.has_data() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use self::connection_manager::ConnectionManagerStatus;
|
||||
|
||||
@@ -45,6 +46,7 @@ use super::Timeline;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WalReceiverConf {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
/// The timeout on the connection to safekeeper for WAL streaming.
|
||||
pub wal_connect_timeout: Duration,
|
||||
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
|
||||
|
||||
@@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig;
|
||||
use utils::backoff::{
|
||||
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
use utils::postgres_client::wal_stream_connection_config;
|
||||
use utils::postgres_client::{
|
||||
wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol,
|
||||
};
|
||||
use utils::{
|
||||
id::{NodeId, TenantTimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -984,15 +986,33 @@ impl ConnectionManagerState {
|
||||
if info.safekeeper_connstr.is_empty() {
|
||||
return None; // no connection string, ignore sk
|
||||
}
|
||||
match wal_stream_connection_config(
|
||||
self.id,
|
||||
info.safekeeper_connstr.as_ref(),
|
||||
match &self.conf.auth_token {
|
||||
None => None,
|
||||
Some(x) => Some(x),
|
||||
|
||||
let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
(None, None, None)
|
||||
},
|
||||
self.conf.availability_zone.as_deref(),
|
||||
) {
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
let shard_identity = self.timeline.get_shard_identity();
|
||||
(
|
||||
Some(shard_identity.number.0),
|
||||
Some(shard_identity.count.0),
|
||||
Some(shard_identity.stripe_size.0),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: self.conf.protocol,
|
||||
ttid: self.id,
|
||||
shard_number,
|
||||
shard_count,
|
||||
shard_stripe_size,
|
||||
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
|
||||
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
|
||||
availability_zone: self.conf.availability_zone.as_deref()
|
||||
};
|
||||
|
||||
match wal_stream_connection_config(connection_conf_args) {
|
||||
Ok(connstr) => Some((*sk_id, info, connstr)),
|
||||
Err(e) => {
|
||||
error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
|
||||
@@ -1096,6 +1116,7 @@ impl ReconnectReason {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
|
||||
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
|
||||
use url::Host;
|
||||
|
||||
fn dummy_broker_sk_timeline(
|
||||
@@ -1532,6 +1553,7 @@ mod tests {
|
||||
timeline,
|
||||
cancel: CancellationToken::new(),
|
||||
conf: WalReceiverConf {
|
||||
protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
wal_connect_timeout: Duration::from_secs(1),
|
||||
lagging_wal_timeout: Duration::from_secs(1),
|
||||
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::{
|
||||
use postgres_backend::is_expected_io_error;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{id::NodeId, lsn::Lsn};
|
||||
use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn};
|
||||
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
|
||||
|
||||
/// Status of the connection.
|
||||
@@ -291,6 +291,15 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
connection_status.latest_connection_update = now;
|
||||
connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
|
||||
}
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
connection_status.latest_connection_update = now;
|
||||
if !raw.data().is_empty() {
|
||||
connection_status.latest_wal_update = now;
|
||||
}
|
||||
|
||||
connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn()));
|
||||
connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
|
||||
}
|
||||
&_ => {}
|
||||
};
|
||||
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
|
||||
@@ -298,7 +307,130 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let status_update = match replication_message {
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
|
||||
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
// This is the end LSN of the raw WAL from which the records
|
||||
// were interpreted.
|
||||
let streaming_lsn = Lsn::from(raw.streaming_lsn());
|
||||
tracing::debug!(
|
||||
"Received WAL up to {streaming_lsn} with next_record_lsn={}",
|
||||
Lsn(raw.next_record_lsn().unwrap_or(0))
|
||||
);
|
||||
|
||||
let records = Vec::<InterpretedWalRecord>::des(raw.data()).with_context(|| {
|
||||
anyhow::anyhow!(
|
||||
"Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
|
||||
)
|
||||
})?;
|
||||
|
||||
// We start the modification at 0 because each interpreted record
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
for interpreted in records {
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let next_record_lsn = interpreted.next_record_lsn;
|
||||
let ingested = walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| format!("could not ingest record at {next_record_lsn}"))?;
|
||||
|
||||
if !ingested {
|
||||
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
uncommitted_records += 1;
|
||||
|
||||
// FIXME: this cannot be made pausable_failpoint without fixing the
|
||||
// failpoint library; in tests, the added amount of debugging will cause us
|
||||
// to timeout the tests.
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
// Commit every ingest_batch_size records. Even if we filtered out
|
||||
// all records, we still need to call commit to advance the LSN.
|
||||
if uncommitted_records >= ingest_batch_size
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Records might have been filtered out on the safekeeper side, but we still
|
||||
// need to advance last record LSN on all shards. If we've not ingested the latest
|
||||
// record, then set the LSN of the modification past it. This way all shards
|
||||
// advance their last record LSN at the same time.
|
||||
let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) {
|
||||
Some(lsn) if lsn > modification.get_lsn() => {
|
||||
modification.set_lsn(lsn).unwrap();
|
||||
true
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
// Commit any uncommitted records
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !caught_up && streaming_lsn >= end_of_wal {
|
||||
info!("caught up at LSN {streaming_lsn}");
|
||||
caught_up = true;
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
"Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
|
||||
timeline.get_last_record_lsn()
|
||||
);
|
||||
|
||||
Some(streaming_lsn)
|
||||
}
|
||||
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
// Pass the WAL data to the decoder, and see if we can decode
|
||||
// more records as a result.
|
||||
@@ -316,21 +448,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
|
||||
@@ -28,6 +28,7 @@ hyper0.workspace = true
|
||||
futures.workspace = true
|
||||
once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
pprof.workspace = true
|
||||
@@ -58,6 +59,7 @@ sd-notify.workspace = true
|
||||
storage_broker.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
utils.workspace = true
|
||||
wal_decoder.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
|
||||
@@ -2,11 +2,15 @@
|
||||
//! protocol commands.
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
|
||||
use std::future::Future;
|
||||
use std::str::{self, FromStr};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info, info_span, Instrument};
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::shard::{ShardCount, ShardNumber};
|
||||
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
@@ -35,6 +39,8 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub tenant_id: Option<TenantId>,
|
||||
pub timeline_id: Option<TimelineId>,
|
||||
pub ttid: TenantTimelineId,
|
||||
pub shard: Option<ShardIdentity>,
|
||||
pub protocol: Option<PostgresClientProtocol>,
|
||||
/// Unique connection id is logged in spans for observability.
|
||||
pub conn_id: ConnectionId,
|
||||
/// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured.
|
||||
@@ -107,11 +113,28 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
) -> Result<(), QueryError> {
|
||||
if let FeStartupPacket::StartupMessage { params, .. } = sm {
|
||||
if let Some(options) = params.options_raw() {
|
||||
let mut shard_count: Option<u8> = None;
|
||||
let mut shard_number: Option<u8> = None;
|
||||
let mut shard_stripe_size: Option<u32> = None;
|
||||
|
||||
for opt in options {
|
||||
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
|
||||
// remove these after the PR gets deployed:
|
||||
// https://github.com/neondatabase/neon/pull/2433#discussion_r970005064
|
||||
match opt.split_once('=') {
|
||||
Some(("protocol", value)) => {
|
||||
let raw_value = value
|
||||
.parse::<u8>()
|
||||
.with_context(|| format!("Failed to parse {value} as protocol"))?;
|
||||
|
||||
self.protocol = Some(
|
||||
PostgresClientProtocol::try_from(raw_value).map_err(|_| {
|
||||
QueryError::Other(anyhow::anyhow!(
|
||||
"Unexpected client protocol type: {raw_value}"
|
||||
))
|
||||
})?,
|
||||
);
|
||||
}
|
||||
Some(("ztenantid", value)) | Some(("tenant_id", value)) => {
|
||||
self.tenant_id = Some(value.parse().with_context(|| {
|
||||
format!("Failed to parse {value} as tenant id")
|
||||
@@ -127,9 +150,54 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
metrics.set_client_az(client_az)
|
||||
}
|
||||
}
|
||||
Some(("shard_count", value)) => {
|
||||
shard_count = Some(value.parse::<u8>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard count")
|
||||
})?);
|
||||
}
|
||||
Some(("shard_number", value)) => {
|
||||
shard_number = Some(value.parse::<u8>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard number")
|
||||
})?);
|
||||
}
|
||||
Some(("shard_stripe_size", value)) => {
|
||||
shard_stripe_size = Some(value.parse::<u32>().with_context(|| {
|
||||
format!("Failed to parse {value} as shard stripe size")
|
||||
})?);
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
match self.protocol() {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
if shard_count.is_some()
|
||||
|| shard_number.is_some()
|
||||
|| shard_stripe_size.is_some()
|
||||
{
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Shard params specified for vanilla protocol"
|
||||
)));
|
||||
}
|
||||
}
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
match (shard_count, shard_number, shard_stripe_size) {
|
||||
(Some(count), Some(number), Some(stripe_size)) => {
|
||||
let params = ShardParameters {
|
||||
count: ShardCount(count),
|
||||
stripe_size: ShardStripeSize(stripe_size),
|
||||
};
|
||||
self.shard =
|
||||
Some(ShardIdentity::from_params(ShardNumber(number), ¶ms));
|
||||
}
|
||||
_ => {
|
||||
return Err(QueryError::Other(anyhow::anyhow!(
|
||||
"Shard params were not specified"
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(app_name) = params.get("application_name") {
|
||||
@@ -150,6 +218,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
tracing::field::debug(self.appname.clone()),
|
||||
);
|
||||
|
||||
if let Some(shard) = self.shard.as_ref() {
|
||||
tracing::Span::current()
|
||||
.record("shard", tracing::field::display(shard.shard_slug()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err(QueryError::Other(anyhow::anyhow!(
|
||||
@@ -258,6 +331,8 @@ impl SafekeeperPostgresHandler {
|
||||
tenant_id: None,
|
||||
timeline_id: None,
|
||||
ttid: TenantTimelineId::empty(),
|
||||
shard: None,
|
||||
protocol: None,
|
||||
conn_id,
|
||||
claims: None,
|
||||
auth,
|
||||
@@ -265,6 +340,10 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn protocol(&self) -> PostgresClientProtocol {
|
||||
self.protocol.unwrap_or(PostgresClientProtocol::Vanilla)
|
||||
}
|
||||
|
||||
// when accessing management api supply None as an argument
|
||||
// when using to authorize tenant pass corresponding tenant id
|
||||
fn check_permission(&self, tenant_id: Option<TenantId>) -> Result<(), QueryError> {
|
||||
|
||||
@@ -29,6 +29,7 @@ pub mod receive_wal;
|
||||
pub mod recovery;
|
||||
pub mod remove_wal;
|
||||
pub mod safekeeper;
|
||||
pub mod send_interpreted_wal;
|
||||
pub mod send_wal;
|
||||
pub mod state;
|
||||
pub mod timeline;
|
||||
@@ -38,6 +39,7 @@ pub mod timeline_manager;
|
||||
pub mod timelines_set;
|
||||
pub mod wal_backup;
|
||||
pub mod wal_backup_partial;
|
||||
pub mod wal_reader_stream;
|
||||
pub mod wal_service;
|
||||
pub mod wal_storage;
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use tokio::{
|
||||
use tokio_postgres::replication::ReplicationStream;
|
||||
use tokio_postgres::types::PgLsn;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::{ConnectionConfigArgs, PostgresClientProtocol};
|
||||
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
|
||||
|
||||
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
|
||||
@@ -325,7 +326,17 @@ async fn recovery_stream(
|
||||
conf: &SafeKeeperConf,
|
||||
) -> anyhow::Result<String> {
|
||||
// TODO: pass auth token
|
||||
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: PostgresClientProtocol::Vanilla,
|
||||
ttid: tli.ttid,
|
||||
shard_number: None,
|
||||
shard_count: None,
|
||||
shard_stripe_size: None,
|
||||
listen_pg_addr_str: &donor.pg_connstr,
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
};
|
||||
let cfg = wal_stream_connection_config(connection_conf_args)?;
|
||||
let mut cfg = cfg.to_tokio_postgres_config();
|
||||
// It will make safekeeper give out not committed WAL (up to flush_lsn).
|
||||
cfg.application_name(&format!("safekeeper_{}", conf.my_id));
|
||||
|
||||
121
safekeeper/src/send_interpreted_wal.rs
Normal file
121
safekeeper/src/send_interpreted_wal.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
|
||||
use postgres_ffi::MAX_SEND_SIZE;
|
||||
use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
|
||||
use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::lsn::Lsn;
|
||||
use wal_decoder::models::InterpretedWalRecord;
|
||||
|
||||
use crate::send_wal::EndWatchView;
|
||||
use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
|
||||
|
||||
/// Shard-aware interpreted record sender.
|
||||
/// This is used for sending WAL to the pageserver. Said WAL
|
||||
/// is pre-interpreted and filtered for the shard.
|
||||
pub(crate) struct InterpretedWalSender<'a, IO> {
|
||||
pub(crate) pgb: &'a mut PostgresBackend<IO>,
|
||||
pub(crate) wal_stream_builder: WalReaderStreamBuilder,
|
||||
pub(crate) end_watch_view: EndWatchView,
|
||||
pub(crate) shard: ShardIdentity,
|
||||
pub(crate) pg_version: u32,
|
||||
pub(crate) appname: Option<String>,
|
||||
}
|
||||
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
|
||||
/// Send interpreted WAL to a receiver.
|
||||
/// Stops when an error occurs or the receiver is caught up and there's no active compute.
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
let mut wal_position = self.wal_stream_builder.start_pos();
|
||||
let mut wal_decoder =
|
||||
WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
|
||||
|
||||
let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
|
||||
let mut stream = std::pin::pin!(stream);
|
||||
|
||||
let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
|
||||
keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
keepalive_ticker.reset();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Get some WAL from the stream and then: decode, interpret and send it
|
||||
wal = stream.next() => {
|
||||
let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
|
||||
Some(some) => some?,
|
||||
None => { break; }
|
||||
};
|
||||
|
||||
wal_position = wal_end_lsn;
|
||||
wal_decoder.feed_bytes(&wal);
|
||||
|
||||
let mut records = Vec::new();
|
||||
let mut max_next_record_lsn = None;
|
||||
while let Some((next_record_lsn, recdata)) = wal_decoder
|
||||
.poll_decode()
|
||||
.with_context(|| "Failed to decode WAL")?
|
||||
{
|
||||
assert!(next_record_lsn.is_aligned());
|
||||
max_next_record_lsn = Some(next_record_lsn);
|
||||
|
||||
// Deserialize and interpret WAL record
|
||||
let interpreted = InterpretedWalRecord::from_bytes_filtered(
|
||||
recdata,
|
||||
&self.shard,
|
||||
next_record_lsn,
|
||||
self.pg_version,
|
||||
)
|
||||
.with_context(|| "Failed to interpret WAL")?;
|
||||
|
||||
if !interpreted.is_empty() {
|
||||
records.push(interpreted);
|
||||
}
|
||||
}
|
||||
|
||||
let mut buf = Vec::new();
|
||||
records
|
||||
.ser_into(&mut buf)
|
||||
.with_context(|| "Failed to serialize interpreted WAL")?;
|
||||
|
||||
// Reset the keep alive ticker since we are sending something
|
||||
// over the wire now.
|
||||
keepalive_ticker.reset();
|
||||
|
||||
self.pgb
|
||||
.write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
|
||||
streaming_lsn: wal_end_lsn.0,
|
||||
commit_lsn: available_wal_end_lsn.0,
|
||||
next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
|
||||
data: buf.as_slice(),
|
||||
})).await?;
|
||||
}
|
||||
|
||||
// Send a periodic keep alive when the connection has been idle for a while.
|
||||
_ = keepalive_ticker.tick() => {
|
||||
self.pgb
|
||||
.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
|
||||
wal_end: self.end_watch_view.get().0,
|
||||
timestamp: get_current_timestamp(),
|
||||
request_reply: true,
|
||||
}))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The loop above ends when the receiver is caught up and there's no more WAL to send.
|
||||
Err(CopyStreamHandlerEnd::ServerInitiated(format!(
|
||||
"ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
|
||||
self.appname, wal_position,
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -5,12 +5,15 @@ use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::metrics::RECEIVED_PS_FEEDBACKS;
|
||||
use crate::receive_wal::WalReceivers;
|
||||
use crate::safekeeper::{Term, TermLsn};
|
||||
use crate::send_interpreted_wal::InterpretedWalSender;
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
use crate::wal_reader_stream::WalReaderStreamBuilder;
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::{bail, Context as AnyhowContext};
|
||||
use bytes::Bytes;
|
||||
use futures::future::Either;
|
||||
use parking_lot::Mutex;
|
||||
use postgres_backend::PostgresBackend;
|
||||
use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
|
||||
@@ -22,6 +25,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use utils::failpoint_support;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::net::SocketAddr;
|
||||
@@ -226,7 +230,7 @@ impl WalSenders {
|
||||
|
||||
/// Get remote_consistent_lsn reported by the pageserver. Returns None if
|
||||
/// client is not pageserver.
|
||||
fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
|
||||
let shared = self.mutex.lock();
|
||||
let slot = shared.get_slot(id);
|
||||
match slot.feedback {
|
||||
@@ -370,6 +374,16 @@ pub struct WalSenderGuard {
|
||||
walsenders: Arc<WalSenders>,
|
||||
}
|
||||
|
||||
impl WalSenderGuard {
|
||||
pub fn id(&self) -> WalSenderId {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub fn walsenders(&self) -> &Arc<WalSenders> {
|
||||
&self.walsenders
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WalSenderGuard {
|
||||
fn drop(&mut self) {
|
||||
self.walsenders.unregister(self.id);
|
||||
@@ -440,11 +454,12 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
|
||||
info!(
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
|
||||
"starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}",
|
||||
start_pos,
|
||||
end_pos,
|
||||
matches!(end_watch, EndWatch::Flush(_)),
|
||||
appname
|
||||
appname,
|
||||
self.protocol(),
|
||||
);
|
||||
|
||||
// switch to copy
|
||||
@@ -456,21 +471,51 @@ impl SafekeeperPostgresHandler {
|
||||
// not synchronized with sends, so this avoids deadlocks.
|
||||
let reader = pgb.split().context("START_REPLICATION split")?;
|
||||
|
||||
let send_fut = match self.protocol() {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
let sender = WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: vec![0u8; MAX_SEND_SIZE],
|
||||
};
|
||||
|
||||
Either::Left(sender.run())
|
||||
}
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
|
||||
let end_watch_view = end_watch.view();
|
||||
let wal_stream_builder = WalReaderStreamBuilder {
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
wal_sender_guard: ws_guard.clone(),
|
||||
};
|
||||
|
||||
let sender = InterpretedWalSender {
|
||||
pgb,
|
||||
wal_stream_builder,
|
||||
end_watch_view,
|
||||
shard: self.shard.unwrap(),
|
||||
pg_version,
|
||||
appname,
|
||||
};
|
||||
|
||||
Either::Right(sender.run())
|
||||
}
|
||||
};
|
||||
|
||||
let tli_cancel = tli.cancel.clone();
|
||||
|
||||
let mut sender = WalSender {
|
||||
pgb,
|
||||
// should succeed since we're already holding another guard
|
||||
tli: tli.wal_residence_guard().await?,
|
||||
appname,
|
||||
start_pos,
|
||||
end_pos,
|
||||
term,
|
||||
end_watch,
|
||||
ws_guard: ws_guard.clone(),
|
||||
wal_reader,
|
||||
send_buf: vec![0u8; MAX_SEND_SIZE],
|
||||
};
|
||||
let mut reply_reader = ReplyReader {
|
||||
reader,
|
||||
ws_guard: ws_guard.clone(),
|
||||
@@ -479,7 +524,7 @@ impl SafekeeperPostgresHandler {
|
||||
|
||||
let res = tokio::select! {
|
||||
// todo: add read|write .context to these errors
|
||||
r = sender.run() => r,
|
||||
r = send_fut => r,
|
||||
r = reply_reader.run() => r,
|
||||
_ = tli_cancel.cancelled() => {
|
||||
return Err(CopyStreamHandlerEnd::Cancelled);
|
||||
@@ -504,16 +549,22 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// TODO(vlad): maybe lift this instead
|
||||
/// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
|
||||
/// given term (recovery by walproposer or peer safekeeper).
|
||||
enum EndWatch {
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum EndWatch {
|
||||
Commit(Receiver<Lsn>),
|
||||
Flush(Receiver<TermLsn>),
|
||||
}
|
||||
|
||||
impl EndWatch {
|
||||
pub(crate) fn view(&self) -> EndWatchView {
|
||||
EndWatchView(self.clone())
|
||||
}
|
||||
|
||||
/// Get current end of WAL.
|
||||
fn get(&self) -> Lsn {
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
match self {
|
||||
EndWatch::Commit(r) => *r.borrow(),
|
||||
EndWatch::Flush(r) => r.borrow().lsn,
|
||||
@@ -521,15 +572,44 @@ impl EndWatch {
|
||||
}
|
||||
|
||||
/// Wait for the update.
|
||||
async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
EndWatch::Commit(r) => r.changed().await?,
|
||||
EndWatch::Flush(r) => r.changed().await?,
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_lsn(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
client_term: Option<Term>,
|
||||
) -> anyhow::Result<Lsn> {
|
||||
loop {
|
||||
let end_pos = self.get();
|
||||
if end_pos > lsn {
|
||||
return Ok(end_pos);
|
||||
}
|
||||
if let EndWatch::Flush(rx) = &self {
|
||||
let curr_term = rx.borrow().term;
|
||||
if let Some(client_term) = client_term {
|
||||
if curr_term != client_term {
|
||||
bail!("term changed: requested {}, now {}", client_term, curr_term);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.changed().await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct EndWatchView(EndWatch);
|
||||
|
||||
impl EndWatchView {
|
||||
pub(crate) fn get(&self) -> Lsn {
|
||||
self.0.get()
|
||||
}
|
||||
}
|
||||
/// A half driving sending WAL.
|
||||
struct WalSender<'a, IO> {
|
||||
pgb: &'a mut PostgresBackend<IO>,
|
||||
@@ -566,7 +646,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
|
||||
///
|
||||
/// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
|
||||
/// convenience.
|
||||
async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
|
||||
loop {
|
||||
// Wait for the next portion if it is not there yet, or just
|
||||
// update our end of WAL available for sending value, we
|
||||
|
||||
149
safekeeper/src/wal_reader_stream.rs
Normal file
149
safekeeper/src/wal_reader_stream.rs
Normal file
@@ -0,0 +1,149 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_stream::try_stream;
|
||||
use bytes::Bytes;
|
||||
use futures::Stream;
|
||||
use postgres_backend::CopyStreamHandlerEnd;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::{
|
||||
safekeeper::Term,
|
||||
send_wal::{EndWatch, WalSenderGuard},
|
||||
timeline::WalResidentTimeline,
|
||||
};
|
||||
|
||||
pub(crate) struct WalReaderStreamBuilder {
|
||||
pub(crate) tli: WalResidentTimeline,
|
||||
pub(crate) start_pos: Lsn,
|
||||
pub(crate) end_pos: Lsn,
|
||||
pub(crate) term: Option<Term>,
|
||||
pub(crate) end_watch: EndWatch,
|
||||
pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
pub(crate) fn start_pos(&self) -> Lsn {
|
||||
self.start_pos
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct WalBytes {
|
||||
/// Raw PG WAL
|
||||
pub(crate) wal: Bytes,
|
||||
/// Start LSN of [`Self::wal`]
|
||||
#[allow(dead_code)]
|
||||
pub(crate) wal_start_lsn: Lsn,
|
||||
/// End LSN of [`Self::wal`]
|
||||
pub(crate) wal_end_lsn: Lsn,
|
||||
/// End LSN of WAL available on the safekeeper.
|
||||
///
|
||||
/// For pagservers this will be commit LSN,
|
||||
/// while for the compute it will be the flush LSN.
|
||||
pub(crate) available_wal_end_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl WalReaderStreamBuilder {
|
||||
/// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
|
||||
/// The stream terminates when the receiver (pageserver) is fully caught up
|
||||
/// and there's no active computes.
|
||||
pub(crate) async fn build(
|
||||
self,
|
||||
buffer_size: usize,
|
||||
) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
|
||||
// TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
|
||||
// We can make the raw WAL sender use this stream too and remove the duplication.
|
||||
let Self {
|
||||
tli,
|
||||
mut start_pos,
|
||||
mut end_pos,
|
||||
term,
|
||||
mut end_watch,
|
||||
wal_sender_guard,
|
||||
} = self;
|
||||
let mut wal_reader = tli.get_walreader(start_pos).await?;
|
||||
let mut buffer = vec![0; buffer_size];
|
||||
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
Ok(try_stream! {
|
||||
loop {
|
||||
let have_something_to_send = end_pos > start_pos;
|
||||
|
||||
if !have_something_to_send {
|
||||
// wait for lsn
|
||||
let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
|
||||
match res {
|
||||
Ok(ok) => {
|
||||
end_pos = ok?;
|
||||
},
|
||||
Err(_) => {
|
||||
if let EndWatch::Commit(_) = end_watch {
|
||||
if let Some(remote_consistent_lsn) = wal_sender_guard
|
||||
.walsenders()
|
||||
.get_ws_remote_consistent_lsn(wal_sender_guard.id())
|
||||
{
|
||||
if tli.should_walsender_stop(remote_consistent_lsn).await {
|
||||
// Stop streaming if the receivers are caught up and
|
||||
// there's no active compute. This causes the loop in
|
||||
// [`crate::send_interpreted_wal::InterpretedWalSender::run`]
|
||||
// to exit and terminate the WAL stream.
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
assert!(
|
||||
end_pos > start_pos,
|
||||
"nothing to send after waiting for WAL"
|
||||
);
|
||||
|
||||
// try to send as much as available, capped by the buffer size
|
||||
let mut chunk_end_pos = start_pos + buffer_size as u64;
|
||||
// if we went behind available WAL, back off
|
||||
if chunk_end_pos >= end_pos {
|
||||
chunk_end_pos = end_pos;
|
||||
} else {
|
||||
// If sending not up to end pos, round down to page boundary to
|
||||
// avoid breaking WAL record not at page boundary, as protocol
|
||||
// demands. See walsender.c (XLogSendPhysical).
|
||||
chunk_end_pos = chunk_end_pos
|
||||
.checked_sub(chunk_end_pos.block_offset())
|
||||
.unwrap();
|
||||
}
|
||||
let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
|
||||
let buffer = &mut buffer[..send_size];
|
||||
let send_size: usize;
|
||||
{
|
||||
// If uncommitted part is being pulled, check that the term is
|
||||
// still the expected one.
|
||||
let _term_guard = if let Some(t) = term {
|
||||
Some(tli.acquire_term(t).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Read WAL into buffer. send_size can be additionally capped to
|
||||
// segment boundary here.
|
||||
send_size = wal_reader.read(buffer).await?
|
||||
};
|
||||
let wal = Bytes::copy_from_slice(&buffer[..send_size]);
|
||||
|
||||
yield WalBytes {
|
||||
wal,
|
||||
wal_start_lsn: start_pos,
|
||||
wal_end_lsn: start_pos + send_size as u64,
|
||||
available_wal_end_lsn: end_pos
|
||||
};
|
||||
|
||||
start_pos += send_size as u64;
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -15,16 +15,21 @@ from fixtures.neon_fixtures import (
|
||||
|
||||
@pytest.mark.timeout(600)
|
||||
@pytest.mark.parametrize("shard_count", [1, 8, 32])
|
||||
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
|
||||
def test_sharded_ingest(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
shard_count: int,
|
||||
wal_receiver_protocol: str,
|
||||
):
|
||||
"""
|
||||
Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
|
||||
and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case
|
||||
(shard_count=1) to the sharded case indicates the overhead of sharding.
|
||||
"""
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
|
||||
)
|
||||
|
||||
ROW_COUNT = 100_000_000 # about 7 GB of WAL
|
||||
|
||||
@@ -50,7 +55,6 @@ def test_sharded_ingest(
|
||||
# Start the endpoint.
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
|
||||
|
||||
# Ingest data and measure WAL volume and duration.
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -68,4 +72,48 @@ def test_sharded_ingest(
|
||||
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
|
||||
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
|
||||
|
||||
total_ingested = 0
|
||||
total_records_received = 0
|
||||
ingested_by_ps = []
|
||||
for pageserver in env.pageservers:
|
||||
ingested = pageserver.http_client().get_metric_value(
|
||||
"pageserver_wal_ingest_bytes_received_total"
|
||||
)
|
||||
records_received = pageserver.http_client().get_metric_value(
|
||||
"pageserver_wal_ingest_records_received_total"
|
||||
)
|
||||
|
||||
if ingested is None:
|
||||
ingested = 0
|
||||
|
||||
if records_received is None:
|
||||
records_received = 0
|
||||
|
||||
ingested_by_ps.append(
|
||||
(
|
||||
pageserver.id,
|
||||
{
|
||||
"ingested": ingested,
|
||||
"records_received": records_received,
|
||||
},
|
||||
)
|
||||
)
|
||||
|
||||
total_ingested += int(ingested)
|
||||
total_records_received += int(records_received)
|
||||
|
||||
total_ingested_mb = total_ingested / (1024 * 1024)
|
||||
zenbenchmark.record("wal_ingested", total_ingested_mb, "MB", MetricReport.LOWER_IS_BETTER)
|
||||
zenbenchmark.record(
|
||||
"records_received", total_records_received, "records", MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
|
||||
ingested_by_ps.sort(key=lambda x: x[0])
|
||||
for _, stats in ingested_by_ps:
|
||||
for k in stats:
|
||||
if k != "records_received":
|
||||
stats[k] /= 1024**2
|
||||
|
||||
log.info(f"WAL ingested by each pageserver {ingested_by_ps}")
|
||||
|
||||
assert tenant_get_shards(env, tenant_id) == shards, "shards moved"
|
||||
|
||||
@@ -27,7 +27,8 @@ AGGRESIVE_COMPACTION_TENANT_CONF = {
|
||||
|
||||
|
||||
@skip_in_debug_build("only run with release build")
|
||||
def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
|
||||
def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str):
|
||||
"""
|
||||
This is a smoke test that compaction kicks in. The workload repeatedly churns
|
||||
a small number of rows and manually instructs the pageserver to run compaction
|
||||
@@ -38,8 +39,8 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Effectively disable the page cache to rely only on image layers
|
||||
# to shorten reads.
|
||||
neon_env_builder.pageserver_config_override = """
|
||||
page_cache_size=10
|
||||
neon_env_builder.pageserver_config_override = f"""
|
||||
page_cache_size=10; wal_receiver_protocol='{wal_receiver_protocol}'
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
|
||||
|
||||
@@ -19,7 +19,14 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
"wal_record_crossing_segment_followed_by_small_one",
|
||||
],
|
||||
)
|
||||
def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
|
||||
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
|
||||
def test_crafted_wal_end(
|
||||
neon_env_builder: NeonEnvBuilder, wal_type: str, wal_receiver_protocol: str
|
||||
):
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
env.create_branch("test_crafted_wal_end")
|
||||
env.pageserver.allowed_errors.extend(
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_content
|
||||
|
||||
|
||||
# Test subtransactions
|
||||
@@ -9,8 +10,13 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
# maintained in the pageserver, so subtransactions are not very exciting for
|
||||
# Neon. They are included in the commit record though and updated in the
|
||||
# CLOG.
|
||||
def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
|
||||
def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir, wal_receiver_protocol):
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
|
||||
@@ -622,8 +622,12 @@ async def run_segment_init_failure(env: NeonEnv):
|
||||
# Test (injected) failure during WAL segment init.
|
||||
# https://github.com/neondatabase/neon/issues/6401
|
||||
# https://github.com/neondatabase/neon/issues/6402
|
||||
def test_segment_init_failure(neon_env_builder: NeonEnvBuilder):
|
||||
@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"])
|
||||
def test_segment_init_failure(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"wal_receiver_protocol = '{wal_receiver_protocol}'"
|
||||
)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
asyncio.run(run_segment_init_failure(env))
|
||||
|
||||
Reference in New Issue
Block a user