diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index f48c1febb5..e71fc14a35 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -18,7 +18,7 @@ use std::{ str::FromStr, time::Duration, }; -use utils::logging::LogFormat; +use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol}; use crate::models::ImageCompressionAlgorithm; use crate::models::LsnLease; @@ -109,6 +109,7 @@ pub struct ConfigToml { pub virtual_file_io_mode: Option, #[serde(skip_serializing_if = "Option::is_none")] pub no_sync: Option, + pub wal_receiver_protocol: PostgresClientProtocol, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -317,6 +318,9 @@ pub mod defaults { pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512; + + pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol = + utils::postgres_client::PostgresClientProtocol::Vanilla; } impl Default for ConfigToml { @@ -399,6 +403,7 @@ impl Default for ConfigToml { virtual_file_io_mode: None, tenant_config: TenantConfigToml::default(), no_sync: None, + wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL, } } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 033a9a4619..a8c2c2e992 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> { // after setting up logging, log the effective IO engine choice and read path implementations info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine"); info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode"); + info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol"); // The tenants directory contains all the pageserver local disk state. // Create if not exists and make sure all the contents are durable before proceeding. diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index b694a43599..6342d6e163 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -14,6 +14,7 @@ use remote_storage::{RemotePath, RemoteStorageConfig}; use std::env; use storage_broker::Uri; use utils::logging::SecretString; +use utils::postgres_client::PostgresClientProtocol; use once_cell::sync::OnceCell; use reqwest::Url; @@ -182,6 +183,8 @@ pub struct PageServerConf { /// Optionally disable disk syncs (unsafe!) pub no_sync: bool, + + pub wal_receiver_protocol: PostgresClientProtocol, } /// Token for authentication to safekeepers @@ -338,6 +341,7 @@ impl PageServerConf { virtual_file_io_engine, tenant_config, no_sync, + wal_receiver_protocol, } = config_toml; let mut conf = PageServerConf { @@ -377,6 +381,7 @@ impl PageServerConf { image_compression, timeline_offloading, ephemeral_bytes_per_memory_kb, + wal_receiver_protocol, // ------------------------------------------------------------ // fields that require additional validation or custom handling diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 00dfd728ce..8cd55abfb6 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2416,6 +2416,7 @@ impl Timeline { *guard = Some(WalReceiver::start( Arc::clone(self), WalReceiverConf { + protocol: self.conf.wal_receiver_protocol, wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag, diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 4a3a5c621b..f831f5e48a 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -38,6 +38,7 @@ use storage_broker::BrokerClientChannel; use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::postgres_client::PostgresClientProtocol; use self::connection_manager::ConnectionManagerStatus; @@ -45,6 +46,7 @@ use super::Timeline; #[derive(Clone)] pub struct WalReceiverConf { + pub protocol: PostgresClientProtocol, /// The timeout on the connection to safekeeper for WAL streaming. pub wal_connect_timeout: Duration, /// The timeout to use to determine when the current connection is "stale" and reconnect to the other one. diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 6fb0eeb8bc..7a64703a30 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -987,12 +987,26 @@ impl ConnectionManagerState { return None; // no connection string, ignore sk } + let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol { + PostgresClientProtocol::Vanilla => { + (None, None, None) + }, + PostgresClientProtocol::Interpreted => { + let shard_identity = self.timeline.get_shard_identity(); + ( + Some(shard_identity.number.0), + Some(shard_identity.count.0), + Some(shard_identity.stripe_size.0), + ) + } + }; + let connection_conf_args = ConnectionConfigArgs { - protocol: PostgresClientProtocol::Vanilla, + protocol: self.conf.protocol, ttid: self.id, - shard_number: None, - shard_count: None, - shard_stripe_size: None, + shard_number, + shard_count, + shard_stripe_size, listen_pg_addr_str: info.safekeeper_connstr.as_ref(), auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()), availability_zone: self.conf.availability_zone.as_deref() @@ -1102,6 +1116,7 @@ impl ReconnectReason { mod tests { use super::*; use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; + use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL; use url::Host; fn dummy_broker_sk_timeline( @@ -1538,6 +1553,7 @@ mod tests { timeline, cancel: CancellationToken::new(), conf: WalReceiverConf { + protocol: DEFAULT_WAL_RECEIVER_PROTOCOL, wal_connect_timeout: Duration::from_secs(1), lagging_wal_timeout: Duration::from_secs(1), max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),