mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
pageserver: gate new protocol with pageserver config
This commit is contained in:
@@ -18,7 +18,7 @@ use std::{
|
||||
str::FromStr,
|
||||
time::Duration,
|
||||
};
|
||||
use utils::logging::LogFormat;
|
||||
use utils::{logging::LogFormat, postgres_client::PostgresClientProtocol};
|
||||
|
||||
use crate::models::ImageCompressionAlgorithm;
|
||||
use crate::models::LsnLease;
|
||||
@@ -109,6 +109,7 @@ pub struct ConfigToml {
|
||||
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub no_sync: Option<bool>,
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -317,6 +318,9 @@ pub mod defaults {
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
|
||||
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
|
||||
|
||||
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
|
||||
utils::postgres_client::PostgresClientProtocol::Vanilla;
|
||||
}
|
||||
|
||||
impl Default for ConfigToml {
|
||||
@@ -399,6 +403,7 @@ impl Default for ConfigToml {
|
||||
virtual_file_io_mode: None,
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
no_sync: None,
|
||||
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,6 +126,7 @@ fn main() -> anyhow::Result<()> {
|
||||
// after setting up logging, log the effective IO engine choice and read path implementations
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
|
||||
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
|
||||
|
||||
// The tenants directory contains all the pageserver local disk state.
|
||||
// Create if not exists and make sure all the contents are durable before proceeding.
|
||||
|
||||
@@ -14,6 +14,7 @@ use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use std::env;
|
||||
use storage_broker::Uri;
|
||||
use utils::logging::SecretString;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
use reqwest::Url;
|
||||
@@ -182,6 +183,8 @@ pub struct PageServerConf {
|
||||
|
||||
/// Optionally disable disk syncs (unsafe!)
|
||||
pub no_sync: bool,
|
||||
|
||||
pub wal_receiver_protocol: PostgresClientProtocol,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -338,6 +341,7 @@ impl PageServerConf {
|
||||
virtual_file_io_engine,
|
||||
tenant_config,
|
||||
no_sync,
|
||||
wal_receiver_protocol,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -377,6 +381,7 @@ impl PageServerConf {
|
||||
image_compression,
|
||||
timeline_offloading,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
wal_receiver_protocol,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -2416,6 +2416,7 @@ impl Timeline {
|
||||
*guard = Some(WalReceiver::start(
|
||||
Arc::clone(self),
|
||||
WalReceiverConf {
|
||||
protocol: self.conf.wal_receiver_protocol,
|
||||
wal_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
max_lsn_wal_lag,
|
||||
|
||||
@@ -38,6 +38,7 @@ use storage_broker::BrokerClientChannel;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::postgres_client::PostgresClientProtocol;
|
||||
|
||||
use self::connection_manager::ConnectionManagerStatus;
|
||||
|
||||
@@ -45,6 +46,7 @@ use super::Timeline;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WalReceiverConf {
|
||||
pub protocol: PostgresClientProtocol,
|
||||
/// The timeout on the connection to safekeeper for WAL streaming.
|
||||
pub wal_connect_timeout: Duration,
|
||||
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
|
||||
|
||||
@@ -987,12 +987,26 @@ impl ConnectionManagerState {
|
||||
return None; // no connection string, ignore sk
|
||||
}
|
||||
|
||||
let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
|
||||
PostgresClientProtocol::Vanilla => {
|
||||
(None, None, None)
|
||||
},
|
||||
PostgresClientProtocol::Interpreted => {
|
||||
let shard_identity = self.timeline.get_shard_identity();
|
||||
(
|
||||
Some(shard_identity.number.0),
|
||||
Some(shard_identity.count.0),
|
||||
Some(shard_identity.stripe_size.0),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let connection_conf_args = ConnectionConfigArgs {
|
||||
protocol: PostgresClientProtocol::Vanilla,
|
||||
protocol: self.conf.protocol,
|
||||
ttid: self.id,
|
||||
shard_number: None,
|
||||
shard_count: None,
|
||||
shard_stripe_size: None,
|
||||
shard_number,
|
||||
shard_count,
|
||||
shard_stripe_size,
|
||||
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
|
||||
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
|
||||
availability_zone: self.conf.availability_zone.as_deref()
|
||||
@@ -1102,6 +1116,7 @@ impl ReconnectReason {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
|
||||
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
|
||||
use url::Host;
|
||||
|
||||
fn dummy_broker_sk_timeline(
|
||||
@@ -1538,6 +1553,7 @@ mod tests {
|
||||
timeline,
|
||||
cancel: CancellationToken::new(),
|
||||
conf: WalReceiverConf {
|
||||
protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
|
||||
wal_connect_timeout: Duration::from_secs(1),
|
||||
lagging_wal_timeout: Duration::from_secs(1),
|
||||
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
|
||||
|
||||
Reference in New Issue
Block a user