diff --git a/Cargo.lock b/Cargo.lock index 98d2e0864a..c1a14210de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4133,7 +4133,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" dependencies = [ "bytes", "fallible-iterator", @@ -4146,7 +4146,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" dependencies = [ "base64 0.20.0", "byteorder", @@ -4165,7 +4165,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" dependencies = [ "bytes", "fallible-iterator", @@ -5364,6 +5364,7 @@ dependencies = [ "itertools 0.10.5", "metrics", "once_cell", + "pageserver_api", "parking_lot 0.12.1", "postgres", "postgres-protocol", @@ -5395,6 +5396,7 @@ dependencies = [ "tracing-subscriber", "url", "utils", + "wal_decoder", "walproposer", "workspace_hack", ] @@ -6466,7 +6468,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#a130197713830a0ea0004b539b1f51a66b4c3e18" +source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#2a2a7c56930dd5ad60676ce6da92e1cbe6fb3ef5" dependencies = [ "async-trait", "byteorder", @@ -7021,6 +7023,7 @@ dependencies = [ "serde_assert", "serde_json", "serde_path_to_error", + "serde_with", "signal-hook", "strum", "strum_macros", diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 7666728427..0abca5cdc2 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -18,7 +18,7 @@ use std::{ str::FromStr, time::Duration, }; -use utils::logging::LogFormat; +use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol}; use crate::models::ImageCompressionAlgorithm; use crate::models::LsnLease; @@ -120,6 +120,7 @@ pub struct ConfigToml { pub no_sync: Option, #[serde(with = "humantime_serde")] pub server_side_batch_timeout: Option, + pub wal_receiver_protocol: PostgresClientProtocol, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -330,6 +331,9 @@ pub mod defaults { pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512; pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None; + + pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol = + utils::postgres_client::PostgresClientProtocol::Vanilla; } impl Default for ConfigToml { @@ -418,6 +422,7 @@ impl Default for ConfigToml { .map(|duration| humantime::parse_duration(duration).unwrap()), tenant_config: TenantConfigToml::default(), no_sync: None, + wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL, } } } diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 6c40968496..b7871ab01f 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -562,6 +562,9 @@ pub enum BeMessage<'a> { options: &'a [&'a str], }, KeepAlive(WalSndKeepAlive), + /// Batch of interpreted, shard filtered WAL records, + /// ready for the pageserver to ingest + InterpretedWalRecords(InterpretedWalRecordsBody<'a>), } /// Common shorthands. @@ -672,6 +675,25 @@ pub struct WalSndKeepAlive { pub request_reply: bool, } +/// Batch of interpreted WAL records used in the interpreted +/// safekeeper to pageserver protocol. +/// +/// Note that the pageserver uses the RawInterpretedWalRecordsBody +/// counterpart of this from the neondatabase/rust-postgres repo. +/// If you're changing this struct, you likely need to change its +/// twin as well. +#[derive(Debug)] +pub struct InterpretedWalRecordsBody<'a> { + /// End of raw WAL in [`Self::data`] + pub streaming_lsn: u64, + /// Current end of WAL on the server + pub commit_lsn: u64, + /// Start LSN of the next record in PG WAL. + /// Is 0 if the portion of PG WAL did not contain any records. + pub next_record_lsn: u64, + pub data: &'a [u8], +} + pub static HELLO_WORLD_ROW: BeMessage = BeMessage::DataRow(&[Some(b"hello world")]); // single text column @@ -996,6 +1018,20 @@ impl BeMessage<'_> { Ok(()) })? } + + BeMessage::InterpretedWalRecords(rec) => { + // We use the COPY_DATA_TAG for our custom message + // since this tag is interpreted as raw bytes. + buf.put_u8(b'd'); + write_body(buf, |buf| { + buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol + // dependency + buf.put_u64(rec.streaming_lsn); + buf.put_u64(rec.commit_lsn); + buf.put_u64(rec.next_record_lsn); + buf.put_slice(rec.data); + }); + } } Ok(()) } diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 4aad0aee2c..f440b81d8f 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -33,6 +33,7 @@ pprof.workspace = true regex.workspace = true routerify.workspace = true serde.workspace = true +serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true thiserror.workspace = true diff --git a/libs/utils/src/postgres_client.rs b/libs/utils/src/postgres_client.rs index dba74f5b0b..3073bbde4c 100644 --- a/libs/utils/src/postgres_client.rs +++ b/libs/utils/src/postgres_client.rs @@ -7,29 +7,94 @@ use postgres_connection::{parse_host_port, PgConnectionConfig}; use crate::id::TenantTimelineId; +/// Postgres client protocol types +#[derive( + Copy, + Clone, + PartialEq, + Eq, + strum_macros::EnumString, + strum_macros::Display, + serde_with::DeserializeFromStr, + serde_with::SerializeDisplay, + Debug, +)] +#[strum(serialize_all = "kebab-case")] +#[repr(u8)] +pub enum PostgresClientProtocol { + /// Usual Postgres replication protocol + Vanilla, + /// Custom shard-aware protocol that replicates interpreted records. + /// Used to send wal from safekeeper to pageserver. + Interpreted, +} + +impl TryFrom for PostgresClientProtocol { + type Error = u8; + + fn try_from(value: u8) -> Result { + Ok(match value { + v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla, + v if v == (PostgresClientProtocol::Interpreted as u8) => { + PostgresClientProtocol::Interpreted + } + x => return Err(x), + }) + } +} + +pub struct ConnectionConfigArgs<'a> { + pub protocol: PostgresClientProtocol, + + pub ttid: TenantTimelineId, + pub shard_number: Option, + pub shard_count: Option, + pub shard_stripe_size: Option, + + pub listen_pg_addr_str: &'a str, + + pub auth_token: Option<&'a str>, + pub availability_zone: Option<&'a str>, +} + +impl<'a> ConnectionConfigArgs<'a> { + fn options(&'a self) -> Vec { + let mut options = vec![ + "-c".to_owned(), + format!("timeline_id={}", self.ttid.timeline_id), + format!("tenant_id={}", self.ttid.tenant_id), + format!("protocol={}", self.protocol as u8), + ]; + + if self.shard_number.is_some() { + assert!(self.shard_count.is_some()); + assert!(self.shard_stripe_size.is_some()); + + options.push(format!("shard_count={}", self.shard_count.unwrap())); + options.push(format!("shard_number={}", self.shard_number.unwrap())); + options.push(format!( + "shard_stripe_size={}", + self.shard_stripe_size.unwrap() + )); + } + + options + } +} + /// Create client config for fetching WAL from safekeeper on particular timeline. /// listen_pg_addr_str is in form host:\[port\]. pub fn wal_stream_connection_config( - TenantTimelineId { - tenant_id, - timeline_id, - }: TenantTimelineId, - listen_pg_addr_str: &str, - auth_token: Option<&str>, - availability_zone: Option<&str>, + args: ConnectionConfigArgs, ) -> anyhow::Result { let (host, port) = - parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; + parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?; let port = port.unwrap_or(5432); let mut connstr = PgConnectionConfig::new_host_port(host, port) - .extend_options([ - "-c".to_owned(), - format!("timeline_id={}", timeline_id), - format!("tenant_id={}", tenant_id), - ]) - .set_password(auth_token.map(|s| s.to_owned())); + .extend_options(args.options()) + .set_password(args.auth_token.map(|s| s.to_owned())); - if let Some(availability_zone) = availability_zone { + if let Some(availability_zone) = args.availability_zone { connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]); } diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index c69f8c869a..7ac425cb5f 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -65,6 +65,18 @@ pub struct InterpretedWalRecord { pub xid: TransactionId, } +impl InterpretedWalRecord { + /// Checks if the WAL record is empty + /// + /// An empty interpreted WAL record has no data or metadata and does not have to be sent to the + /// pageserver. + pub fn is_empty(&self) -> bool { + self.batch.is_empty() + && self.metadata_record.is_none() + && matches!(self.flush_uncommitted, FlushUncommittedRecords::No) + } +} + /// The interpreted part of the Postgres WAL record which requires metadata /// writes to the underlying storage engine. #[derive(Serialize, Deserialize)] diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index 9c0708ebbe..41294da7a0 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -496,11 +496,16 @@ impl SerializedValueBatch { } } - /// Checks if the batch is empty - /// - /// A batch is empty when it contains no serialized values. - /// Note that it may still contain observed values. + /// Checks if the batch contains any serialized or observed values pub fn is_empty(&self) -> bool { + !self.has_data() && self.metadata.is_empty() + } + + /// Checks if the batch contains data + /// + /// Note that if this returns false, it may still contain observed values or + /// a metadata record. + pub fn has_data(&self) -> bool { let empty = self.raw.is_empty(); if cfg!(debug_assertions) && empty { @@ -510,7 +515,7 @@ impl SerializedValueBatch { .all(|meta| matches!(meta, ValueMeta::Observed(_)))); } - empty + !empty } /// Returns the number of values serialized in the batch diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 033a9a4619..a8c2c2e992 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> { // after setting up logging, log the effective IO engine choice and read path implementations info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine"); info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode"); + info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol"); // The tenants directory contains all the pageserver local disk state. // Create if not exists and make sure all the contents are durable before proceeding. diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 59ea6fb941..2cf237e72b 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -14,6 +14,7 @@ use remote_storage::{RemotePath, RemoteStorageConfig}; use std::env; use storage_broker::Uri; use utils::logging::SecretString; +use utils::postgres_client::PostgresClientProtocol; use once_cell::sync::OnceCell; use reqwest::Url; @@ -190,6 +191,8 @@ pub struct PageServerConf { /// Maximum amount of time for which a get page request request /// might be held up for request merging. pub server_side_batch_timeout: Option, + + pub wal_receiver_protocol: PostgresClientProtocol, } /// Token for authentication to safekeepers @@ -350,6 +353,7 @@ impl PageServerConf { server_side_batch_timeout, tenant_config, no_sync, + wal_receiver_protocol, } = config_toml; let mut conf = PageServerConf { @@ -393,6 +397,7 @@ impl PageServerConf { import_pgdata_upcall_api, import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from), import_pgdata_aws_endpoint_url, + wal_receiver_protocol, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index f4f184be5a..c491bfe650 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1229,10 +1229,9 @@ impl<'a> DatadirModification<'a> { } pub(crate) fn has_dirty_data(&self) -> bool { - !self - .pending_data_batch + self.pending_data_batch .as_ref() - .map_or(true, |b| b.is_empty()) + .map_or(false, |b| b.has_data()) } /// Set the current lsn @@ -1408,7 +1407,7 @@ impl<'a> DatadirModification<'a> { Some(pending_batch) => { pending_batch.extend(batch); } - None if !batch.is_empty() => { + None if batch.has_data() => { self.pending_data_batch = Some(batch); } None => { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4881be33a6..f6a06e73a7 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2470,6 +2470,7 @@ impl Timeline { *guard = Some(WalReceiver::start( Arc::clone(self), WalReceiverConf { + protocol: self.conf.wal_receiver_protocol, wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag, @@ -5896,7 +5897,7 @@ impl<'a> TimelineWriter<'a> { batch: SerializedValueBatch, ctx: &RequestContext, ) -> anyhow::Result<()> { - if batch.is_empty() { + if !batch.has_data() { return Ok(()); } diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 4a3a5c621b..f831f5e48a 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -38,6 +38,7 @@ use storage_broker::BrokerClientChannel; use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::postgres_client::PostgresClientProtocol; use self::connection_manager::ConnectionManagerStatus; @@ -45,6 +46,7 @@ use super::Timeline; #[derive(Clone)] pub struct WalReceiverConf { + pub protocol: PostgresClientProtocol, /// The timeout on the connection to safekeeper for WAL streaming. pub wal_connect_timeout: Duration, /// The timeout to use to determine when the current connection is "stale" and reconnect to the other one. diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index de50f217d8..7a64703a30 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig; use utils::backoff::{ exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; -use utils::postgres_client::wal_stream_connection_config; +use utils::postgres_client::{ + wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol, +}; use utils::{ id::{NodeId, TenantTimelineId}, lsn::Lsn, @@ -984,15 +986,33 @@ impl ConnectionManagerState { if info.safekeeper_connstr.is_empty() { return None; // no connection string, ignore sk } - match wal_stream_connection_config( - self.id, - info.safekeeper_connstr.as_ref(), - match &self.conf.auth_token { - None => None, - Some(x) => Some(x), + + let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol { + PostgresClientProtocol::Vanilla => { + (None, None, None) }, - self.conf.availability_zone.as_deref(), - ) { + PostgresClientProtocol::Interpreted => { + let shard_identity = self.timeline.get_shard_identity(); + ( + Some(shard_identity.number.0), + Some(shard_identity.count.0), + Some(shard_identity.stripe_size.0), + ) + } + }; + + let connection_conf_args = ConnectionConfigArgs { + protocol: self.conf.protocol, + ttid: self.id, + shard_number, + shard_count, + shard_stripe_size, + listen_pg_addr_str: info.safekeeper_connstr.as_ref(), + auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()), + availability_zone: self.conf.availability_zone.as_deref() + }; + + match wal_stream_connection_config(connection_conf_args) { Ok(connstr) => Some((*sk_id, info, connstr)), Err(e) => { error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id); @@ -1096,6 +1116,7 @@ impl ReconnectReason { mod tests { use super::*; use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; + use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL; use url::Host; fn dummy_broker_sk_timeline( @@ -1532,6 +1553,7 @@ mod tests { timeline, cancel: CancellationToken::new(), conf: WalReceiverConf { + protocol: DEFAULT_WAL_RECEIVER_PROTOCOL, wal_connect_timeout: Duration::from_secs(1), lagging_wal_timeout: Duration::from_secs(1), max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 6ac6920d47..1a0e66ceb3 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -36,7 +36,7 @@ use crate::{ use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; -use utils::{id::NodeId, lsn::Lsn}; +use utils::{bin_ser::BeSer, id::NodeId, lsn::Lsn}; use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError}; /// Status of the connection. @@ -291,6 +291,15 @@ pub(super) async fn handle_walreceiver_connection( connection_status.latest_connection_update = now; connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end())); } + ReplicationMessage::RawInterpretedWalRecords(raw) => { + connection_status.latest_connection_update = now; + if !raw.data().is_empty() { + connection_status.latest_wal_update = now; + } + + connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn())); + connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn())); + } &_ => {} }; if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) { @@ -298,7 +307,130 @@ pub(super) async fn handle_walreceiver_connection( return Ok(()); } + async fn commit( + modification: &mut DatadirModification<'_>, + uncommitted: &mut u64, + filtered: &mut u64, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + WAL_INGEST + .records_committed + .inc_by(*uncommitted - *filtered); + modification.commit(ctx).await?; + *uncommitted = 0; + *filtered = 0; + Ok(()) + } + let status_update = match replication_message { + ReplicationMessage::RawInterpretedWalRecords(raw) => { + WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64); + + let mut uncommitted_records = 0; + let mut filtered_records = 0; + + // This is the end LSN of the raw WAL from which the records + // were interpreted. + let streaming_lsn = Lsn::from(raw.streaming_lsn()); + tracing::debug!( + "Received WAL up to {streaming_lsn} with next_record_lsn={}", + Lsn(raw.next_record_lsn().unwrap_or(0)) + ); + + let records = Vec::::des(raw.data()).with_context(|| { + anyhow::anyhow!( + "Failed to deserialize interpreted records ending at LSN {streaming_lsn}" + ) + })?; + + // We start the modification at 0 because each interpreted record + // advances it to its end LSN. 0 is just an initialization placeholder. + let mut modification = timeline.begin_modification(Lsn(0)); + + for interpreted in records { + if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) + && uncommitted_records > 0 + { + commit( + &mut modification, + &mut uncommitted_records, + &mut filtered_records, + &ctx, + ) + .await?; + } + + let next_record_lsn = interpreted.next_record_lsn; + let ingested = walingest + .ingest_record(interpreted, &mut modification, &ctx) + .await + .with_context(|| format!("could not ingest record at {next_record_lsn}"))?; + + if !ingested { + tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}"); + WAL_INGEST.records_filtered.inc(); + filtered_records += 1; + } + + uncommitted_records += 1; + + // FIXME: this cannot be made pausable_failpoint without fixing the + // failpoint library; in tests, the added amount of debugging will cause us + // to timeout the tests. + fail_point!("walreceiver-after-ingest"); + + // Commit every ingest_batch_size records. Even if we filtered out + // all records, we still need to call commit to advance the LSN. + if uncommitted_records >= ingest_batch_size + || modification.approx_pending_bytes() + > DatadirModification::MAX_PENDING_BYTES + { + commit( + &mut modification, + &mut uncommitted_records, + &mut filtered_records, + &ctx, + ) + .await?; + } + } + + // Records might have been filtered out on the safekeeper side, but we still + // need to advance last record LSN on all shards. If we've not ingested the latest + // record, then set the LSN of the modification past it. This way all shards + // advance their last record LSN at the same time. + let needs_last_record_lsn_advance = match raw.next_record_lsn().map(Lsn::from) { + Some(lsn) if lsn > modification.get_lsn() => { + modification.set_lsn(lsn).unwrap(); + true + } + _ => false, + }; + + if uncommitted_records > 0 || needs_last_record_lsn_advance { + // Commit any uncommitted records + commit( + &mut modification, + &mut uncommitted_records, + &mut filtered_records, + &ctx, + ) + .await?; + } + + if !caught_up && streaming_lsn >= end_of_wal { + info!("caught up at LSN {streaming_lsn}"); + caught_up = true; + } + + tracing::debug!( + "Ingested WAL up to {streaming_lsn}. Last record LSN is {}", + timeline.get_last_record_lsn() + ); + + Some(streaming_lsn) + } + ReplicationMessage::XLogData(xlog_data) => { // Pass the WAL data to the decoder, and see if we can decode // more records as a result. @@ -316,21 +448,6 @@ pub(super) async fn handle_walreceiver_connection( let mut uncommitted_records = 0; let mut filtered_records = 0; - async fn commit( - modification: &mut DatadirModification<'_>, - uncommitted: &mut u64, - filtered: &mut u64, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - WAL_INGEST - .records_committed - .inc_by(*uncommitted - *filtered); - modification.commit(ctx).await?; - *uncommitted = 0; - *filtered = 0; - Ok(()) - } - while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? { // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index ab77b63d54..635a9222e1 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -28,6 +28,7 @@ hyper0.workspace = true futures.workspace = true once_cell.workspace = true parking_lot.workspace = true +pageserver_api.workspace = true postgres.workspace = true postgres-protocol.workspace = true pprof.workspace = true @@ -58,6 +59,7 @@ sd-notify.workspace = true storage_broker.workspace = true tokio-stream.workspace = true utils.workspace = true +wal_decoder.workspace = true workspace_hack.workspace = true diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 3f00b69cde..cec7c3c7ee 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -2,11 +2,15 @@ //! protocol commands. use anyhow::Context; +use pageserver_api::models::ShardParameters; +use pageserver_api::shard::{ShardIdentity, ShardStripeSize}; use std::future::Future; use std::str::{self, FromStr}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, info, info_span, Instrument}; +use utils::postgres_client::PostgresClientProtocol; +use utils::shard::{ShardCount, ShardNumber}; use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; @@ -35,6 +39,8 @@ pub struct SafekeeperPostgresHandler { pub tenant_id: Option, pub timeline_id: Option, pub ttid: TenantTimelineId, + pub shard: Option, + pub protocol: Option, /// Unique connection id is logged in spans for observability. pub conn_id: ConnectionId, /// Auth scope allowed on the connections and public key used to check auth tokens. None if auth is not configured. @@ -107,11 +113,28 @@ impl postgres_backend::Handler ) -> Result<(), QueryError> { if let FeStartupPacket::StartupMessage { params, .. } = sm { if let Some(options) = params.options_raw() { + let mut shard_count: Option = None; + let mut shard_number: Option = None; + let mut shard_stripe_size: Option = None; + for opt in options { // FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy, // remove these after the PR gets deployed: // https://github.com/neondatabase/neon/pull/2433#discussion_r970005064 match opt.split_once('=') { + Some(("protocol", value)) => { + let raw_value = value + .parse::() + .with_context(|| format!("Failed to parse {value} as protocol"))?; + + self.protocol = Some( + PostgresClientProtocol::try_from(raw_value).map_err(|_| { + QueryError::Other(anyhow::anyhow!( + "Unexpected client protocol type: {raw_value}" + )) + })?, + ); + } Some(("ztenantid", value)) | Some(("tenant_id", value)) => { self.tenant_id = Some(value.parse().with_context(|| { format!("Failed to parse {value} as tenant id") @@ -127,9 +150,54 @@ impl postgres_backend::Handler metrics.set_client_az(client_az) } } + Some(("shard_count", value)) => { + shard_count = Some(value.parse::().with_context(|| { + format!("Failed to parse {value} as shard count") + })?); + } + Some(("shard_number", value)) => { + shard_number = Some(value.parse::().with_context(|| { + format!("Failed to parse {value} as shard number") + })?); + } + Some(("shard_stripe_size", value)) => { + shard_stripe_size = Some(value.parse::().with_context(|| { + format!("Failed to parse {value} as shard stripe size") + })?); + } _ => continue, } } + + match self.protocol() { + PostgresClientProtocol::Vanilla => { + if shard_count.is_some() + || shard_number.is_some() + || shard_stripe_size.is_some() + { + return Err(QueryError::Other(anyhow::anyhow!( + "Shard params specified for vanilla protocol" + ))); + } + } + PostgresClientProtocol::Interpreted => { + match (shard_count, shard_number, shard_stripe_size) { + (Some(count), Some(number), Some(stripe_size)) => { + let params = ShardParameters { + count: ShardCount(count), + stripe_size: ShardStripeSize(stripe_size), + }; + self.shard = + Some(ShardIdentity::from_params(ShardNumber(number), ¶ms)); + } + _ => { + return Err(QueryError::Other(anyhow::anyhow!( + "Shard params were not specified" + ))); + } + } + } + } } if let Some(app_name) = params.get("application_name") { @@ -150,6 +218,11 @@ impl postgres_backend::Handler tracing::field::debug(self.appname.clone()), ); + if let Some(shard) = self.shard.as_ref() { + tracing::Span::current() + .record("shard", tracing::field::display(shard.shard_slug())); + } + Ok(()) } else { Err(QueryError::Other(anyhow::anyhow!( @@ -258,6 +331,8 @@ impl SafekeeperPostgresHandler { tenant_id: None, timeline_id: None, ttid: TenantTimelineId::empty(), + shard: None, + protocol: None, conn_id, claims: None, auth, @@ -265,6 +340,10 @@ impl SafekeeperPostgresHandler { } } + pub fn protocol(&self) -> PostgresClientProtocol { + self.protocol.unwrap_or(PostgresClientProtocol::Vanilla) + } + // when accessing management api supply None as an argument // when using to authorize tenant pass corresponding tenant id fn check_permission(&self, tenant_id: Option) -> Result<(), QueryError> { diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 6d68b6b59b..abe6e00a66 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -29,6 +29,7 @@ pub mod receive_wal; pub mod recovery; pub mod remove_wal; pub mod safekeeper; +pub mod send_interpreted_wal; pub mod send_wal; pub mod state; pub mod timeline; @@ -38,6 +39,7 @@ pub mod timeline_manager; pub mod timelines_set; pub mod wal_backup; pub mod wal_backup_partial; +pub mod wal_reader_stream; pub mod wal_service; pub mod wal_storage; diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 9c4149d8f1..7b87166aa0 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -17,6 +17,7 @@ use tokio::{ use tokio_postgres::replication::ReplicationStream; use tokio_postgres::types::PgLsn; use tracing::*; +use utils::postgres_client::{ConnectionConfigArgs, PostgresClientProtocol}; use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config}; use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE}; @@ -325,7 +326,17 @@ async fn recovery_stream( conf: &SafeKeeperConf, ) -> anyhow::Result { // TODO: pass auth token - let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?; + let connection_conf_args = ConnectionConfigArgs { + protocol: PostgresClientProtocol::Vanilla, + ttid: tli.ttid, + shard_number: None, + shard_count: None, + shard_stripe_size: None, + listen_pg_addr_str: &donor.pg_connstr, + auth_token: None, + availability_zone: None, + }; + let cfg = wal_stream_connection_config(connection_conf_args)?; let mut cfg = cfg.to_tokio_postgres_config(); // It will make safekeeper give out not committed WAL (up to flush_lsn). cfg.application_name(&format!("safekeeper_{}", conf.my_id)); diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs new file mode 100644 index 0000000000..cf0ee276e9 --- /dev/null +++ b/safekeeper/src/send_interpreted_wal.rs @@ -0,0 +1,121 @@ +use std::time::Duration; + +use anyhow::Context; +use futures::StreamExt; +use pageserver_api::shard::ShardIdentity; +use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend}; +use postgres_ffi::MAX_SEND_SIZE; +use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder}; +use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::time::MissedTickBehavior; +use utils::bin_ser::BeSer; +use utils::lsn::Lsn; +use wal_decoder::models::InterpretedWalRecord; + +use crate::send_wal::EndWatchView; +use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder}; + +/// Shard-aware interpreted record sender. +/// This is used for sending WAL to the pageserver. Said WAL +/// is pre-interpreted and filtered for the shard. +pub(crate) struct InterpretedWalSender<'a, IO> { + pub(crate) pgb: &'a mut PostgresBackend, + pub(crate) wal_stream_builder: WalReaderStreamBuilder, + pub(crate) end_watch_view: EndWatchView, + pub(crate) shard: ShardIdentity, + pub(crate) pg_version: u32, + pub(crate) appname: Option, +} + +impl InterpretedWalSender<'_, IO> { + /// Send interpreted WAL to a receiver. + /// Stops when an error occurs or the receiver is caught up and there's no active compute. + /// + /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ? + /// convenience. + pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> { + let mut wal_position = self.wal_stream_builder.start_pos(); + let mut wal_decoder = + WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version); + + let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?; + let mut stream = std::pin::pin!(stream); + + let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1)); + keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip); + keepalive_ticker.reset(); + + loop { + tokio::select! { + // Get some WAL from the stream and then: decode, interpret and send it + wal = stream.next() => { + let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal { + Some(some) => some?, + None => { break; } + }; + + wal_position = wal_end_lsn; + wal_decoder.feed_bytes(&wal); + + let mut records = Vec::new(); + let mut max_next_record_lsn = None; + while let Some((next_record_lsn, recdata)) = wal_decoder + .poll_decode() + .with_context(|| "Failed to decode WAL")? + { + assert!(next_record_lsn.is_aligned()); + max_next_record_lsn = Some(next_record_lsn); + + // Deserialize and interpret WAL record + let interpreted = InterpretedWalRecord::from_bytes_filtered( + recdata, + &self.shard, + next_record_lsn, + self.pg_version, + ) + .with_context(|| "Failed to interpret WAL")?; + + if !interpreted.is_empty() { + records.push(interpreted); + } + } + + let mut buf = Vec::new(); + records + .ser_into(&mut buf) + .with_context(|| "Failed to serialize interpreted WAL")?; + + // Reset the keep alive ticker since we are sending something + // over the wire now. + keepalive_ticker.reset(); + + self.pgb + .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody { + streaming_lsn: wal_end_lsn.0, + commit_lsn: available_wal_end_lsn.0, + next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0, + data: buf.as_slice(), + })).await?; + } + + // Send a periodic keep alive when the connection has been idle for a while. + _ = keepalive_ticker.tick() => { + self.pgb + .write_message(&BeMessage::KeepAlive(WalSndKeepAlive { + wal_end: self.end_watch_view.get().0, + timestamp: get_current_timestamp(), + request_reply: true, + })) + .await?; + } + } + } + + // The loop above ends when the receiver is caught up and there's no more WAL to send. + Err(CopyStreamHandlerEnd::ServerInitiated(format!( + "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", + self.appname, wal_position, + ))) + } +} diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index aa65ec851b..1acfcad418 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -5,12 +5,15 @@ use crate::handler::SafekeeperPostgresHandler; use crate::metrics::RECEIVED_PS_FEEDBACKS; use crate::receive_wal::WalReceivers; use crate::safekeeper::{Term, TermLsn}; +use crate::send_interpreted_wal::InterpretedWalSender; use crate::timeline::WalResidentTimeline; +use crate::wal_reader_stream::WalReaderStreamBuilder; use crate::wal_service::ConnectionId; use crate::wal_storage::WalReader; use crate::GlobalTimelines; use anyhow::{bail, Context as AnyhowContext}; use bytes::Bytes; +use futures::future::Either; use parking_lot::Mutex; use postgres_backend::PostgresBackend; use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError}; @@ -22,6 +25,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use utils::failpoint_support; use utils::id::TenantTimelineId; use utils::pageserver_feedback::PageserverFeedback; +use utils::postgres_client::PostgresClientProtocol; use std::cmp::{max, min}; use std::net::SocketAddr; @@ -226,7 +230,7 @@ impl WalSenders { /// Get remote_consistent_lsn reported by the pageserver. Returns None if /// client is not pageserver. - fn get_ws_remote_consistent_lsn(self: &Arc, id: WalSenderId) -> Option { + pub fn get_ws_remote_consistent_lsn(self: &Arc, id: WalSenderId) -> Option { let shared = self.mutex.lock(); let slot = shared.get_slot(id); match slot.feedback { @@ -370,6 +374,16 @@ pub struct WalSenderGuard { walsenders: Arc, } +impl WalSenderGuard { + pub fn id(&self) -> WalSenderId { + self.id + } + + pub fn walsenders(&self) -> &Arc { + &self.walsenders + } +} + impl Drop for WalSenderGuard { fn drop(&mut self) { self.walsenders.unregister(self.id); @@ -440,11 +454,12 @@ impl SafekeeperPostgresHandler { } info!( - "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}", + "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={}", start_pos, end_pos, matches!(end_watch, EndWatch::Flush(_)), - appname + appname, + self.protocol(), ); // switch to copy @@ -456,21 +471,51 @@ impl SafekeeperPostgresHandler { // not synchronized with sends, so this avoids deadlocks. let reader = pgb.split().context("START_REPLICATION split")?; + let send_fut = match self.protocol() { + PostgresClientProtocol::Vanilla => { + let sender = WalSender { + pgb, + // should succeed since we're already holding another guard + tli: tli.wal_residence_guard().await?, + appname, + start_pos, + end_pos, + term, + end_watch, + ws_guard: ws_guard.clone(), + wal_reader, + send_buf: vec![0u8; MAX_SEND_SIZE], + }; + + Either::Left(sender.run()) + } + PostgresClientProtocol::Interpreted => { + let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000; + let end_watch_view = end_watch.view(); + let wal_stream_builder = WalReaderStreamBuilder { + tli: tli.wal_residence_guard().await?, + start_pos, + end_pos, + term, + end_watch, + wal_sender_guard: ws_guard.clone(), + }; + + let sender = InterpretedWalSender { + pgb, + wal_stream_builder, + end_watch_view, + shard: self.shard.unwrap(), + pg_version, + appname, + }; + + Either::Right(sender.run()) + } + }; + let tli_cancel = tli.cancel.clone(); - let mut sender = WalSender { - pgb, - // should succeed since we're already holding another guard - tli: tli.wal_residence_guard().await?, - appname, - start_pos, - end_pos, - term, - end_watch, - ws_guard: ws_guard.clone(), - wal_reader, - send_buf: vec![0u8; MAX_SEND_SIZE], - }; let mut reply_reader = ReplyReader { reader, ws_guard: ws_guard.clone(), @@ -479,7 +524,7 @@ impl SafekeeperPostgresHandler { let res = tokio::select! { // todo: add read|write .context to these errors - r = sender.run() => r, + r = send_fut => r, r = reply_reader.run() => r, _ = tli_cancel.cancelled() => { return Err(CopyStreamHandlerEnd::Cancelled); @@ -504,16 +549,22 @@ impl SafekeeperPostgresHandler { } } +/// TODO(vlad): maybe lift this instead /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the /// given term (recovery by walproposer or peer safekeeper). -enum EndWatch { +#[derive(Clone)] +pub(crate) enum EndWatch { Commit(Receiver), Flush(Receiver), } impl EndWatch { + pub(crate) fn view(&self) -> EndWatchView { + EndWatchView(self.clone()) + } + /// Get current end of WAL. - fn get(&self) -> Lsn { + pub(crate) fn get(&self) -> Lsn { match self { EndWatch::Commit(r) => *r.borrow(), EndWatch::Flush(r) => r.borrow().lsn, @@ -521,15 +572,44 @@ impl EndWatch { } /// Wait for the update. - async fn changed(&mut self) -> anyhow::Result<()> { + pub(crate) async fn changed(&mut self) -> anyhow::Result<()> { match self { EndWatch::Commit(r) => r.changed().await?, EndWatch::Flush(r) => r.changed().await?, } Ok(()) } + + pub(crate) async fn wait_for_lsn( + &mut self, + lsn: Lsn, + client_term: Option, + ) -> anyhow::Result { + loop { + let end_pos = self.get(); + if end_pos > lsn { + return Ok(end_pos); + } + if let EndWatch::Flush(rx) = &self { + let curr_term = rx.borrow().term; + if let Some(client_term) = client_term { + if curr_term != client_term { + bail!("term changed: requested {}, now {}", client_term, curr_term); + } + } + } + self.changed().await?; + } + } } +pub(crate) struct EndWatchView(EndWatch); + +impl EndWatchView { + pub(crate) fn get(&self) -> Lsn { + self.0.get() + } +} /// A half driving sending WAL. struct WalSender<'a, IO> { pgb: &'a mut PostgresBackend, @@ -566,7 +646,7 @@ impl WalSender<'_, IO> { /// /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ? /// convenience. - async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> { + async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> { loop { // Wait for the next portion if it is not there yet, or just // update our end of WAL available for sending value, we diff --git a/safekeeper/src/wal_reader_stream.rs b/safekeeper/src/wal_reader_stream.rs new file mode 100644 index 0000000000..f8c0c502cd --- /dev/null +++ b/safekeeper/src/wal_reader_stream.rs @@ -0,0 +1,149 @@ +use std::sync::Arc; + +use async_stream::try_stream; +use bytes::Bytes; +use futures::Stream; +use postgres_backend::CopyStreamHandlerEnd; +use std::time::Duration; +use tokio::time::timeout; +use utils::lsn::Lsn; + +use crate::{ + safekeeper::Term, + send_wal::{EndWatch, WalSenderGuard}, + timeline::WalResidentTimeline, +}; + +pub(crate) struct WalReaderStreamBuilder { + pub(crate) tli: WalResidentTimeline, + pub(crate) start_pos: Lsn, + pub(crate) end_pos: Lsn, + pub(crate) term: Option, + pub(crate) end_watch: EndWatch, + pub(crate) wal_sender_guard: Arc, +} + +impl WalReaderStreamBuilder { + pub(crate) fn start_pos(&self) -> Lsn { + self.start_pos + } +} + +pub(crate) struct WalBytes { + /// Raw PG WAL + pub(crate) wal: Bytes, + /// Start LSN of [`Self::wal`] + #[allow(dead_code)] + pub(crate) wal_start_lsn: Lsn, + /// End LSN of [`Self::wal`] + pub(crate) wal_end_lsn: Lsn, + /// End LSN of WAL available on the safekeeper. + /// + /// For pagservers this will be commit LSN, + /// while for the compute it will be the flush LSN. + pub(crate) available_wal_end_lsn: Lsn, +} + +impl WalReaderStreamBuilder { + /// Builds a stream of Postgres WAL starting from [`Self::start_pos`]. + /// The stream terminates when the receiver (pageserver) is fully caught up + /// and there's no active computes. + pub(crate) async fn build( + self, + buffer_size: usize, + ) -> anyhow::Result>> { + // TODO(vlad): The code below duplicates functionality from [`crate::send_wal`]. + // We can make the raw WAL sender use this stream too and remove the duplication. + let Self { + tli, + mut start_pos, + mut end_pos, + term, + mut end_watch, + wal_sender_guard, + } = self; + let mut wal_reader = tli.get_walreader(start_pos).await?; + let mut buffer = vec![0; buffer_size]; + + const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); + + Ok(try_stream! { + loop { + let have_something_to_send = end_pos > start_pos; + + if !have_something_to_send { + // wait for lsn + let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await; + match res { + Ok(ok) => { + end_pos = ok?; + }, + Err(_) => { + if let EndWatch::Commit(_) = end_watch { + if let Some(remote_consistent_lsn) = wal_sender_guard + .walsenders() + .get_ws_remote_consistent_lsn(wal_sender_guard.id()) + { + if tli.should_walsender_stop(remote_consistent_lsn).await { + // Stop streaming if the receivers are caught up and + // there's no active compute. This causes the loop in + // [`crate::send_interpreted_wal::InterpretedWalSender::run`] + // to exit and terminate the WAL stream. + return; + } + } + } + + continue; + } + } + } + + + assert!( + end_pos > start_pos, + "nothing to send after waiting for WAL" + ); + + // try to send as much as available, capped by the buffer size + let mut chunk_end_pos = start_pos + buffer_size as u64; + // if we went behind available WAL, back off + if chunk_end_pos >= end_pos { + chunk_end_pos = end_pos; + } else { + // If sending not up to end pos, round down to page boundary to + // avoid breaking WAL record not at page boundary, as protocol + // demands. See walsender.c (XLogSendPhysical). + chunk_end_pos = chunk_end_pos + .checked_sub(chunk_end_pos.block_offset()) + .unwrap(); + } + let send_size = (chunk_end_pos.0 - start_pos.0) as usize; + let buffer = &mut buffer[..send_size]; + let send_size: usize; + { + // If uncommitted part is being pulled, check that the term is + // still the expected one. + let _term_guard = if let Some(t) = term { + Some(tli.acquire_term(t).await?) + } else { + None + }; + // Read WAL into buffer. send_size can be additionally capped to + // segment boundary here. + send_size = wal_reader.read(buffer).await? + }; + let wal = Bytes::copy_from_slice(&buffer[..send_size]); + + yield WalBytes { + wal, + wal_start_lsn: start_pos, + wal_end_lsn: start_pos + send_size as u64, + available_wal_end_lsn: end_pos + }; + + start_pos += send_size as u64; + } + }) + } +} diff --git a/test_runner/performance/test_sharded_ingest.py b/test_runner/performance/test_sharded_ingest.py index 77e8f2cf17..e965aae5a0 100644 --- a/test_runner/performance/test_sharded_ingest.py +++ b/test_runner/performance/test_sharded_ingest.py @@ -15,16 +15,21 @@ from fixtures.neon_fixtures import ( @pytest.mark.timeout(600) @pytest.mark.parametrize("shard_count", [1, 8, 32]) +@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) def test_sharded_ingest( neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, shard_count: int, + wal_receiver_protocol: str, ): """ Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper and fanning out to a large number of shards on dedicated Pageservers. Comparing the base case (shard_count=1) to the sharded case indicates the overhead of sharding. """ + neon_env_builder.pageserver_config_override = ( + f"wal_receiver_protocol = '{wal_receiver_protocol}'" + ) ROW_COUNT = 100_000_000 # about 7 GB of WAL @@ -50,7 +55,6 @@ def test_sharded_ingest( # Start the endpoint. endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0]) - # Ingest data and measure WAL volume and duration. with closing(endpoint.connect()) as conn: with conn.cursor() as cur: @@ -68,4 +72,48 @@ def test_sharded_ingest( wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024)) zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM) + total_ingested = 0 + total_records_received = 0 + ingested_by_ps = [] + for pageserver in env.pageservers: + ingested = pageserver.http_client().get_metric_value( + "pageserver_wal_ingest_bytes_received_total" + ) + records_received = pageserver.http_client().get_metric_value( + "pageserver_wal_ingest_records_received_total" + ) + + if ingested is None: + ingested = 0 + + if records_received is None: + records_received = 0 + + ingested_by_ps.append( + ( + pageserver.id, + { + "ingested": ingested, + "records_received": records_received, + }, + ) + ) + + total_ingested += int(ingested) + total_records_received += int(records_received) + + total_ingested_mb = total_ingested / (1024 * 1024) + zenbenchmark.record("wal_ingested", total_ingested_mb, "MB", MetricReport.LOWER_IS_BETTER) + zenbenchmark.record( + "records_received", total_records_received, "records", MetricReport.LOWER_IS_BETTER + ) + + ingested_by_ps.sort(key=lambda x: x[0]) + for _, stats in ingested_by_ps: + for k in stats: + if k != "records_received": + stats[k] /= 1024**2 + + log.info(f"WAL ingested by each pageserver {ingested_by_ps}") + assert tenant_get_shards(env, tenant_id) == shards, "shards moved" diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index f71e05924a..79fd256304 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -27,7 +27,8 @@ AGGRESIVE_COMPACTION_TENANT_CONF = { @skip_in_debug_build("only run with release build") -def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str): """ This is a smoke test that compaction kicks in. The workload repeatedly churns a small number of rows and manually instructs the pageserver to run compaction @@ -38,8 +39,8 @@ def test_pageserver_compaction_smoke(neon_env_builder: NeonEnvBuilder): # Effectively disable the page cache to rely only on image layers # to shorten reads. - neon_env_builder.pageserver_config_override = """ -page_cache_size=10 + neon_env_builder.pageserver_config_override = f""" +page_cache_size=10; wal_receiver_protocol='{wal_receiver_protocol}' """ env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF) diff --git a/test_runner/regress/test_crafted_wal_end.py b/test_runner/regress/test_crafted_wal_end.py index 23c6fa3a5a..70e71d99cd 100644 --- a/test_runner/regress/test_crafted_wal_end.py +++ b/test_runner/regress/test_crafted_wal_end.py @@ -19,7 +19,14 @@ from fixtures.neon_fixtures import NeonEnvBuilder "wal_record_crossing_segment_followed_by_small_one", ], ) -def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str): +@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +def test_crafted_wal_end( + neon_env_builder: NeonEnvBuilder, wal_type: str, wal_receiver_protocol: str +): + neon_env_builder.pageserver_config_override = ( + f"wal_receiver_protocol = '{wal_receiver_protocol}'" + ) + env = neon_env_builder.init_start() env.create_branch("test_crafted_wal_end") env.pageserver.allowed_errors.extend( diff --git a/test_runner/regress/test_subxacts.py b/test_runner/regress/test_subxacts.py index 7a46f0140c..1d86c353be 100644 --- a/test_runner/regress/test_subxacts.py +++ b/test_runner/regress/test_subxacts.py @@ -1,6 +1,7 @@ from __future__ import annotations -from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content +import pytest +from fixtures.neon_fixtures import NeonEnvBuilder, check_restored_datadir_content # Test subtransactions @@ -9,8 +10,13 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content # maintained in the pageserver, so subtransactions are not very exciting for # Neon. They are included in the commit record though and updated in the # CLOG. -def test_subxacts(neon_simple_env: NeonEnv, test_output_dir): - env = neon_simple_env +@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir, wal_receiver_protocol): + neon_env_builder.pageserver_config_override = ( + f"wal_receiver_protocol = '{wal_receiver_protocol}'" + ) + + env = neon_env_builder.init_start() endpoint = env.endpoints.create_start("main") pg_conn = endpoint.connect() diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 18408b0619..094b10b576 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -622,8 +622,12 @@ async def run_segment_init_failure(env: NeonEnv): # Test (injected) failure during WAL segment init. # https://github.com/neondatabase/neon/issues/6401 # https://github.com/neondatabase/neon/issues/6402 -def test_segment_init_failure(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize("wal_receiver_protocol", ["vanilla", "interpreted"]) +def test_segment_init_failure(neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: str): neon_env_builder.num_safekeepers = 1 + neon_env_builder.pageserver_config_override = ( + f"wal_receiver_protocol = '{wal_receiver_protocol}'" + ) env = neon_env_builder.init_start() asyncio.run(run_segment_init_failure(env))