diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 29314dab9e..0cf7ca184d 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -513,11 +513,6 @@ impl PageServerNode {
                 .map(|x| x.parse::())
                 .transpose()
                 .context("Failed to parse 'timeline_offloading' as bool")?,
-            wal_receiver_protocol_override: settings
-                .remove("wal_receiver_protocol_override")
-                .map(serde_json::from_str)
-                .transpose()
-                .context("parse `wal_receiver_protocol_override` from json")?,
             rel_size_v2_enabled: settings
                 .remove("rel_size_v2_enabled")
                 .map(|x| x.parse::())
diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs
index 46903965b1..30b0612082 100644
--- a/libs/pageserver_api/src/config.rs
+++ b/libs/pageserver_api/src/config.rs
@@ -20,7 +20,6 @@ use postgres_backend::AuthType;
 use remote_storage::RemoteStorageConfig;
 use serde_with::serde_as;
 use utils::logging::LogFormat;
-use utils::postgres_client::PostgresClientProtocol;
 
 use crate::models::{ImageCompressionAlgorithm, LsnLease};
 
@@ -189,7 +188,6 @@ pub struct ConfigToml {
     pub virtual_file_io_mode: Option,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub no_sync: Option,
-    pub wal_receiver_protocol: PostgresClientProtocol,
     pub page_service_pipelining: PageServicePipeliningConfig,
     pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
     pub enable_read_path_debugging: Option,
@@ -527,8 +525,6 @@ pub struct TenantConfigToml {
     /// (either this flag or the pageserver-global one need to be set)
     pub timeline_offloading: bool,
 
-    pub wal_receiver_protocol_override: Option,
-
     /// Enable rel_size_v2 for this tenant. Once enabled, the tenant will persist this information into
     /// `index_part.json`, and it cannot be reversed.
     pub rel_size_v2_enabled: bool,
@@ -609,12 +605,6 @@ pub mod defaults {
 
     pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
 
-    pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
-        utils::postgres_client::PostgresClientProtocol::Interpreted {
-            format: utils::postgres_client::InterpretedFormat::Protobuf,
-            compression: Some(utils::postgres_client::Compression::Zstd { level: 1 }),
-        };
-
     pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
     pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
 }
@@ -713,7 +703,6 @@ impl Default for ConfigToml {
             virtual_file_io_mode: None,
             tenant_config: TenantConfigToml::default(),
             no_sync: None,
-            wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
             page_service_pipelining: PageServicePipeliningConfig::Pipelined(
                 PageServicePipeliningConfigPipelined {
                     max_batch_size: NonZeroUsize::new(32).unwrap(),
@@ -858,7 +847,6 @@ impl Default for TenantConfigToml {
             lsn_lease_length: LsnLease::DEFAULT_LENGTH,
             lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
             timeline_offloading: true,
-            wal_receiver_protocol_override: None,
             rel_size_v2_enabled: false,
             gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
             gc_compaction_verification: DEFAULT_GC_COMPACTION_VERIFICATION,
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 28ced4a368..881f24b86c 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -20,7 +20,6 @@ use serde_with::serde_as;
 pub use utilization::PageserverUtilization;
 use utils::id::{NodeId, TenantId, TimelineId};
 use utils::lsn::Lsn;
-use utils::postgres_client::PostgresClientProtocol;
 use utils::{completion, serde_system_time};
 
 use crate::config::Ratio;
@@ -622,8 +621,6 @@ pub struct TenantConfigPatch {
     #[serde(skip_serializing_if = "FieldPatch::is_noop")]
     pub timeline_offloading: FieldPatch,
     #[serde(skip_serializing_if = "FieldPatch::is_noop")]
-    pub wal_receiver_protocol_override: FieldPatch,
-    #[serde(skip_serializing_if = "FieldPatch::is_noop")]
     pub rel_size_v2_enabled: FieldPatch,
     #[serde(skip_serializing_if = "FieldPatch::is_noop")]
     pub gc_compaction_enabled: FieldPatch,
@@ -748,9 +745,6 @@ pub struct TenantConfig {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub timeline_offloading: Option,
 
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub wal_receiver_protocol_override: Option,
-
     #[serde(skip_serializing_if = "Option::is_none")]
     pub rel_size_v2_enabled: Option,
 
@@ -812,7 +806,6 @@ impl TenantConfig {
             mut lsn_lease_length,
             mut lsn_lease_length_for_ts,
             mut timeline_offloading,
-            mut wal_receiver_protocol_override,
             mut rel_size_v2_enabled,
             mut gc_compaction_enabled,
             mut gc_compaction_verification,
@@ -905,9 +898,6 @@ impl TenantConfig {
             .map(|v| humantime::parse_duration(&v))?
             .apply(&mut lsn_lease_length_for_ts);
         patch.timeline_offloading.apply(&mut timeline_offloading);
-        patch
-            .wal_receiver_protocol_override
-            .apply(&mut wal_receiver_protocol_override);
         patch.rel_size_v2_enabled.apply(&mut rel_size_v2_enabled);
         patch
             .gc_compaction_enabled
@@ -960,7 +950,6 @@ impl TenantConfig {
             lsn_lease_length,
             lsn_lease_length_for_ts,
             timeline_offloading,
-            wal_receiver_protocol_override,
             rel_size_v2_enabled,
             gc_compaction_enabled,
             gc_compaction_verification,
@@ -1058,9 +1047,6 @@ impl TenantConfig {
             timeline_offloading: self
                 .timeline_offloading
                 .unwrap_or(global_conf.timeline_offloading),
-            wal_receiver_protocol_override: self
-                .wal_receiver_protocol_override
-                .or(global_conf.wal_receiver_protocol_override),
             rel_size_v2_enabled: self
                 .rel_size_v2_enabled
                 .unwrap_or(global_conf.rel_size_v2_enabled),
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index be7e634d4c..a1a95ad2d1 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -158,7 +158,6 @@ fn main() -> anyhow::Result<()> {
     // (maybe we should automate this with a visitor?).
     info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
     info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
-    info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
     info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
     info!(?conf.page_service_pipelining, "starting with page service pipelining config");
     info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 628d4f6021..3492a8d966 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -27,7 +27,6 @@ use reqwest::Url;
 use storage_broker::Uri;
 use utils::id::{NodeId, TimelineId};
 use utils::logging::{LogFormat, SecretString};
-use utils::postgres_client::PostgresClientProtocol;
 
 use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
 use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -211,8 +210,6 @@ pub struct PageServerConf {
     /// Optionally disable disk syncs (unsafe!)
     pub no_sync: bool,
 
-    pub wal_receiver_protocol: PostgresClientProtocol,
-
     pub page_service_pipelining: pageserver_api::config::PageServicePipeliningConfig,
 
     pub get_vectored_concurrent_io: pageserver_api::config::GetVectoredConcurrentIo,
@@ -421,7 +418,6 @@ impl PageServerConf {
             virtual_file_io_engine,
             tenant_config,
             no_sync,
-            wal_receiver_protocol,
             page_service_pipelining,
             get_vectored_concurrent_io,
             enable_read_path_debugging,
@@ -484,7 +480,6 @@ impl PageServerConf {
             import_pgdata_upcall_api,
             import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from),
             import_pgdata_aws_endpoint_url,
-            wal_receiver_protocol,
             page_service_pipelining,
             get_vectored_concurrent_io,
             tracing,
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 3173ab221f..3eb70ffac2 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -2855,7 +2855,6 @@ pub(crate) struct WalIngestMetrics {
     pub(crate) records_received: IntCounter,
     pub(crate) records_observed: IntCounter,
     pub(crate) records_committed: IntCounter,
-    pub(crate) records_filtered: IntCounter,
     pub(crate) values_committed_metadata_images: IntCounter,
    pub(crate) values_committed_metadata_deltas: IntCounter,
    pub(crate) values_committed_data_images: IntCounter,
@@ -2911,11 +2910,6 @@ pub(crate) static WAL_INGEST: Lazy = Lazy::new(|| {
             "Number of WAL records which resulted in writes to pageserver storage"
         )
         .expect("failed to define a metric"),
-        records_filtered: register_int_counter!(
-            "pageserver_wal_ingest_records_filtered",
-            "Number of WAL records filtered out due to sharding"
-        )
-        .expect("failed to define a metric"),
         values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
         values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
         values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 8fdf84b6d3..6798606141 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -2845,21 +2845,6 @@ impl Timeline {
         )
     }
 
-    /// Resolve the effective WAL receiver protocol to use for this tenant.
-    ///
-    /// Priority order is:
-    /// 1. Tenant config override
-    /// 2. Default value for tenant config override
-    /// 3. Pageserver config override
-    /// 4. Pageserver config default
-    pub fn resolve_wal_receiver_protocol(&self) -> PostgresClientProtocol {
-        let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
-        tenant_conf
-            .wal_receiver_protocol_override
-            .or(self.conf.default_tenant_conf.wal_receiver_protocol_override)
-            .unwrap_or(self.conf.wal_receiver_protocol)
-    }
-
     pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) {
         // NB: Most tenant conf options are read by background loops, so,
         // changes will automatically be picked up.
@@ -3215,10 +3200,16 @@ impl Timeline {
             guard.is_none(),
             "multiple launches / re-launches of WAL receiver are not supported"
         );
+
+        let protocol = PostgresClientProtocol::Interpreted {
+            format: utils::postgres_client::InterpretedFormat::Protobuf,
+            compression: Some(utils::postgres_client::Compression::Zstd { level: 1 }),
+        };
+
         *guard = Some(WalReceiver::start(
             Arc::clone(self),
             WalReceiverConf {
-                protocol: self.resolve_wal_receiver_protocol(),
+                protocol,
                 wal_connect_timeout,
                 lagging_wal_timeout,
                 max_lsn_wal_lag,
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index 3c3608d1bd..7e0b0e9b25 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -32,9 +32,7 @@ use utils::backoff::{
 };
 use utils::id::{NodeId, TenantTimelineId};
 use utils::lsn::Lsn;
-use utils::postgres_client::{
-    ConnectionConfigArgs, PostgresClientProtocol, wal_stream_connection_config,
-};
+use utils::postgres_client::{ConnectionConfigArgs, wal_stream_connection_config};
 
 use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
 use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
@@ -991,19 +989,12 @@ impl ConnectionManagerState {
             return None; // no connection string, ignore sk
         }
 
-        let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
-            PostgresClientProtocol::Vanilla => {
-                (None, None, None)
-            },
-            PostgresClientProtocol::Interpreted { .. } => {
-                let shard_identity = self.timeline.get_shard_identity();
-                (
-                    Some(shard_identity.number.0),
-                    Some(shard_identity.count.0),
-                    Some(shard_identity.stripe_size.0),
-                )
-            }
-        };
+        let shard_identity = self.timeline.get_shard_identity();
+        let (shard_number, shard_count, shard_stripe_size) = (
+            Some(shard_identity.number.0),
+            Some(shard_identity.count.0),
+            Some(shard_identity.stripe_size.0),
+        );
 
         let connection_conf_args = ConnectionConfigArgs {
             protocol: self.conf.protocol,
@@ -1120,8 +1111,8 @@ impl ReconnectReason {
 
 #[cfg(test)]
 mod tests {
-    use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
     use url::Host;
+    use utils::postgres_client::PostgresClientProtocol;
 
     use super::*;
     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
@@ -1552,6 +1543,11 @@ mod tests {
         .await
         .expect("Failed to create an empty timeline for dummy wal connection manager");
 
+        let protocol = PostgresClientProtocol::Interpreted {
+            format: utils::postgres_client::InterpretedFormat::Protobuf,
+            compression: Some(utils::postgres_client::Compression::Zstd { level: 1 }),
+        };
+
         ConnectionManagerState {
             id: TenantTimelineId {
                 tenant_id: harness.tenant_shard_id.tenant_id,
@@ -1560,7 +1556,7 @@ mod tests {
             timeline,
             cancel: CancellationToken::new(),
             conf: WalReceiverConf {
-                protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
+                protocol,
                 wal_connect_timeout: Duration::from_secs(1),
                 lagging_wal_timeout: Duration::from_secs(1),
                 max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 249849ac4b..343e04f5f0 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -15,7 +15,7 @@ use postgres_backend::is_expected_io_error;
 use postgres_connection::PgConnectionConfig;
 use postgres_ffi::WAL_SEGMENT_SIZE;
 use postgres_ffi::v14::xlog_utils::normalize_lsn;
-use postgres_ffi::waldecoder::{WalDecodeError, WalStreamDecoder};
+use postgres_ffi::waldecoder::WalDecodeError;
 use postgres_protocol::message::backend::ReplicationMessage;
 use postgres_types::PgLsn;
 use tokio::sync::watch;
@@ -31,7 +31,7 @@ use utils::lsn::Lsn;
 use utils::pageserver_feedback::PageserverFeedback;
 use utils::postgres_client::PostgresClientProtocol;
 use utils::sync::gate::GateError;
-use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords};
+use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecords};
 use wal_decoder::wire_format::FromWireFormat;
 
 use super::TaskStateUpdate;
@@ -275,8 +275,6 @@ pub(super) async fn handle_walreceiver_connection(
     let copy_stream = replication_client.copy_both_simple(&query).await?;
     let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
 
-    let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
-
     let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
         .await
         .map_err(|e| match e.kind {
@@ -284,14 +282,16 @@ pub(super) async fn handle_walreceiver_connection(
             _ => WalReceiverError::Other(e.into()),
         })?;
 
-    let shard = vec![*timeline.get_shard_identity()];
-
-    let interpreted_proto_config = match protocol {
-        PostgresClientProtocol::Vanilla => None,
+    let (format, compression) = match protocol {
         PostgresClientProtocol::Interpreted {
             format,
             compression,
-        } => Some((format, compression)),
+        } => (format, compression),
+        PostgresClientProtocol::Vanilla => {
+            return Err(WalReceiverError::Other(anyhow!(
+                "Vanilla WAL receiver protocol is no longer supported for ingest"
+            )));
+        }
     };
 
     let mut expected_wal_start = startpoint;
@@ -313,16 +313,6 @@ pub(super) async fn handle_walreceiver_connection(
         // Update the connection status before processing the message. If the message processing
         // fails (e.g. in walingest), we still want to know latests LSNs from the safekeeper.
         match &replication_message {
-            ReplicationMessage::XLogData(xlog_data) => {
-                connection_status.latest_connection_update = now;
-                connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
-                connection_status.streaming_lsn = Some(Lsn::from(
-                    xlog_data.wal_start() + xlog_data.data().len() as u64,
-                ));
-                if !xlog_data.data().is_empty() {
-                    connection_status.latest_wal_update = now;
-                }
-            }
             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
                 connection_status.latest_connection_update = now;
                 connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
@@ -353,7 +343,6 @@ pub(super) async fn handle_walreceiver_connection(
                 // were interpreted.
                 let streaming_lsn = Lsn::from(raw.streaming_lsn());
 
-                let (format, compression) = interpreted_proto_config.unwrap();
                 let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
                     .await
                     .with_context(|| {
@@ -509,138 +498,6 @@ pub(super) async fn handle_walreceiver_connection(
 
                 Some(streaming_lsn)
             }
-            ReplicationMessage::XLogData(xlog_data) => {
-                async fn commit(
-                    modification: &mut DatadirModification<'_>,
-                    uncommitted: &mut u64,
-                    filtered: &mut u64,
-                    ctx: &RequestContext,
-                ) -> anyhow::Result<()> {
-                    let stats = modification.stats();
-                    modification.commit(ctx).await?;
-                    WAL_INGEST
-                        .records_committed
-                        .inc_by(*uncommitted - *filtered);
-                    WAL_INGEST.inc_values_committed(&stats);
-                    *uncommitted = 0;
-                    *filtered = 0;
-                    Ok(())
-                }
-
-                // Pass the WAL data to the decoder, and see if we can decode
-                // more records as a result.
-                let data = xlog_data.data();
-                let startlsn = Lsn::from(xlog_data.wal_start());
-                let endlsn = startlsn + data.len() as u64;
-
-                trace!("received XLogData between {startlsn} and {endlsn}");
-
-                WAL_INGEST.bytes_received.inc_by(data.len() as u64);
-                waldecoder.feed_bytes(data);
-
-                {
-                    let mut modification = timeline.begin_modification(startlsn);
-                    let mut uncommitted_records = 0;
-                    let mut filtered_records = 0;
-
-                    while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
-                        // It is important to deal with the aligned records as lsn in getPage@LSN is
-                        // aligned and can be several bytes bigger. Without this alignment we are
-                        // at risk of hitting a deadlock.
-                        if !next_record_lsn.is_aligned() {
-                            return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
-                        }
-
-                        // Deserialize and interpret WAL record
-                        let interpreted = InterpretedWalRecord::from_bytes_filtered(
-                            recdata,
-                            &shard,
-                            next_record_lsn,
-                            modification.tline.pg_version,
-                        )?
-                        .remove(timeline.get_shard_identity())
-                        .unwrap();
-
-                        if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
-                            && uncommitted_records > 0
-                        {
-                            // Special case: legacy PG database creations operate by reading pages from a 'template' database:
-                            // these are the only kinds of WAL record that require reading data blocks while ingesting. Ensure
-                            // all earlier writes of data blocks are visible by committing any modification in flight.
-                            commit(
-                                &mut modification,
-                                &mut uncommitted_records,
-                                &mut filtered_records,
-                                &ctx,
-                            )
-                            .await?;
-                        }
-
-                        // Ingest the records without immediately committing them.
-                        timeline.metrics.wal_records_received.inc();
-                        let ingested = walingest
-                            .ingest_record(interpreted, &mut modification, &ctx)
-                            .await
-                            .with_context(|| {
-                                format!("could not ingest record at {next_record_lsn}")
-                            })
-                            .inspect_err(|err| {
-                                // TODO: we can't differentiate cancellation errors with
-                                // anyhow::Error, so just ignore it if we're cancelled.
-                                if !cancellation.is_cancelled() && !timeline.is_stopping() {
-                                    critical!("{err:?}")
-                                }
-                            })?;
-                        if !ingested {
-                            tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
-                            WAL_INGEST.records_filtered.inc();
-                            filtered_records += 1;
-                        }
-
-                        // FIXME: this cannot be made pausable_failpoint without fixing the
-                        // failpoint library; in tests, the added amount of debugging will cause us
-                        // to timeout the tests.
-                        fail_point!("walreceiver-after-ingest");
-
-                        last_rec_lsn = next_record_lsn;
-
-                        // Commit every ingest_batch_size records. Even if we filtered out
-                        // all records, we still need to call commit to advance the LSN.
-                        uncommitted_records += 1;
-                        if uncommitted_records >= ingest_batch_size
-                            || modification.approx_pending_bytes()
-                                > DatadirModification::MAX_PENDING_BYTES
-                        {
-                            commit(
-                                &mut modification,
-                                &mut uncommitted_records,
-                                &mut filtered_records,
-                                &ctx,
-                            )
-                            .await?;
-                        }
-                    }
-
-                    // Commit the remaining records.
-                    if uncommitted_records > 0 {
-                        commit(
-                            &mut modification,
-                            &mut uncommitted_records,
-                            &mut filtered_records,
-                            &ctx,
-                        )
-                        .await?;
-                    }
-                }
-
-                if !caught_up && endlsn >= end_of_wal {
-                    info!("caught up at LSN {endlsn}");
-                    caught_up = true;
-                }
-
-                Some(endlsn)
-            }
-
             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
                 let wal_end = keepalive.wal_end();
                 let timestamp = keepalive.timestamp();
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index f9337bed89..db3f080261 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -357,31 +357,6 @@ class PgProtocol:
         return TimelineId(cast("str", self.safe_psql("show neon.timeline_id")[0][0]))
 
 
-class PageserverWalReceiverProtocol(StrEnum):
-    VANILLA = "vanilla"
-    INTERPRETED = "interpreted"
-
-    @staticmethod
-    def to_config_key_value(proto) -> tuple[str, dict[str, Any]]:
-        if proto == PageserverWalReceiverProtocol.VANILLA:
-            return (
-                "wal_receiver_protocol",
-                {
-                    "type": "vanilla",
-                },
-            )
-        elif proto == PageserverWalReceiverProtocol.INTERPRETED:
-            return (
-                "wal_receiver_protocol",
-                {
-                    "type": "interpreted",
-                    "args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
-                },
-            )
-        else:
-            raise ValueError(f"Unknown protocol type: {proto}")
-
-
 @dataclass
 class PageserverTracingConfig:
     sampling_ratio: tuple[int, int]
@@ -475,7 +450,6 @@ class NeonEnvBuilder:
         safekeeper_extra_opts: list[str] | None = None,
         storage_controller_port_override: int | None = None,
         pageserver_virtual_file_io_mode: str | None = None,
-        pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
         pageserver_get_vectored_concurrent_io: str | None = None,
         pageserver_tracing_config: PageserverTracingConfig | None = None,
         pageserver_import_config: PageserverImportConfig | None = None,
@@ -552,11 +526,6 @@ class NeonEnvBuilder:
 
         self.pageserver_virtual_file_io_mode = pageserver_virtual_file_io_mode
 
-        if pageserver_wal_receiver_protocol is not None:
-            self.pageserver_wal_receiver_protocol = pageserver_wal_receiver_protocol
-        else:
-            self.pageserver_wal_receiver_protocol = PageserverWalReceiverProtocol.INTERPRETED
-
         assert test_name.startswith("test_"), (
             "Unexpectedly instantiated from outside a test function"
         )
@@ -1202,7 +1171,6 @@ class NeonEnv:
 
         self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
         self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
-        self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
        self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
        self.pageserver_tracing_config = config.pageserver_tracing_config
        if config.pageserver_import_config is None:
@@ -1334,13 +1302,6 @@ class NeonEnv:
             for key, value in override.items():
                 ps_cfg[key] = value
 
-            if self.pageserver_wal_receiver_protocol is not None:
-                key, value = PageserverWalReceiverProtocol.to_config_key_value(
-                    self.pageserver_wal_receiver_protocol
-                )
-                if key not in ps_cfg:
-                    ps_cfg[key] = value
-
             if self.pageserver_tracing_config is not None:
                 key, value = self.pageserver_tracing_config.to_config_key_value()
 
diff --git a/test_runner/performance/test_sharded_ingest.py b/test_runner/performance/test_sharded_ingest.py
index 293026d40a..364fcf3737 100644
--- a/test_runner/performance/test_sharded_ingest.py
+++ b/test_runner/performance/test_sharded_ingest.py
@@ -15,19 +15,10 @@ from fixtures.neon_fixtures import (
 
 @pytest.mark.timeout(1200)
 @pytest.mark.parametrize("shard_count", [1, 8, 32])
-@pytest.mark.parametrize(
-    "wal_receiver_protocol",
-    [
-        "vanilla",
-        "interpreted-bincode-compressed",
-        "interpreted-protobuf-compressed",
-    ],
-)
 def test_sharded_ingest(
     neon_env_builder: NeonEnvBuilder,
     zenbenchmark: NeonBenchmarker,
     shard_count: int,
-    wal_receiver_protocol: str,
 ):
     """
     Benchmarks sharded ingestion throughput, by ingesting a large amount of WAL into a Safekeeper
@@ -39,36 +30,6 @@ def test_sharded_ingest(
     neon_env_builder.num_pageservers = shard_count
     env = neon_env_builder.init_configs()
 
-    for ps in env.pageservers:
-        if wal_receiver_protocol == "vanilla":
-            ps.patch_config_toml_nonrecursive(
-                {
-                    "wal_receiver_protocol": {
-                        "type": "vanilla",
-                    }
-                }
-            )
-        elif wal_receiver_protocol == "interpreted-bincode-compressed":
-            ps.patch_config_toml_nonrecursive(
-                {
-                    "wal_receiver_protocol": {
-                        "type": "interpreted",
-                        "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
-                    }
-                }
-            )
-        elif wal_receiver_protocol == "interpreted-protobuf-compressed":
-            ps.patch_config_toml_nonrecursive(
-                {
-                    "wal_receiver_protocol": {
-                        "type": "interpreted",
-                        "args": {"format": "protobuf", "compression": {"zstd": {"level": 1}}},
-                    }
-                }
-            )
-        else:
-            raise AssertionError("Test must use explicit wal receiver protocol config")
-
     env.start()
 
     # Create a sharded tenant and timeline, and migrate it to the respective pageservers. Ensure
diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py
index 3eb6b7193c..dc44fc77db 100644
--- a/test_runner/regress/test_attach_tenant_config.py
+++ b/test_runner/regress/test_attach_tenant_config.py
@@ -182,10 +182,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
         "lsn_lease_length": "1m",
         "lsn_lease_length_for_ts": "5s",
         "timeline_offloading": False,
-        "wal_receiver_protocol_override": {
-            "type": "interpreted",
-            "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
-        },
         "rel_size_v2_enabled": True,
         "relsize_snapshot_cache_capacity": 10000,
         "gc_compaction_enabled": True,
diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py
index 370f57b19d..1570d40ae9 100644
--- a/test_runner/regress/test_compaction.py
+++ b/test_runner/regress/test_compaction.py
@@ -10,7 +10,6 @@ import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
-    PageserverWalReceiverProtocol,
     generate_uploads_and_deletions,
 )
 from fixtures.pageserver.http import PageserverApiException
@@ -68,14 +67,9 @@ PREEMPT_GC_COMPACTION_TENANT_CONF = {
 
 
 @skip_in_debug_build("only run with release build")
-@pytest.mark.parametrize(
-    "wal_receiver_protocol",
-    [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
-)
 @pytest.mark.timeout(900)
 def test_pageserver_compaction_smoke(
     neon_env_builder: NeonEnvBuilder,
-    wal_receiver_protocol: PageserverWalReceiverProtocol,
 ):
     """
     This is a smoke test that compaction kicks in. The workload repeatedly churns
@@ -85,8 +79,6 @@ def test_pageserver_compaction_smoke(
     observed bounds.
     """
 
-    neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
-
     # Effectively disable the page cache to rely only on image layers
     # to shorten reads.
     neon_env_builder.pageserver_config_override = """
diff --git a/test_runner/regress/test_crafted_wal_end.py b/test_runner/regress/test_crafted_wal_end.py
index 6b9dcbba07..89ff873ca3 100644
--- a/test_runner/regress/test_crafted_wal_end.py
+++ b/test_runner/regress/test_crafted_wal_end.py
@@ -1,9 +1,13 @@
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 import pytest
 from fixtures.log_helper import log
 from fixtures.neon_cli import WalCraft
-from fixtures.neon_fixtures import NeonEnvBuilder, PageserverWalReceiverProtocol
+
+if TYPE_CHECKING:
+    from fixtures.neon_fixtures import NeonEnvBuilder
 
 # Restart nodes with WAL end having specially crafted shape, like last record
 # crossing segment boundary, to test decoding issues.
@@ -19,17 +23,10 @@ from fixtures.neon_fixtures import NeonEnvBuilder, PageserverWalReceiverProtocol
         "wal_record_crossing_segment_followed_by_small_one",
     ],
 )
-@pytest.mark.parametrize(
-    "wal_receiver_protocol",
-    [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
-)
 def test_crafted_wal_end(
     neon_env_builder: NeonEnvBuilder,
     wal_type: str,
-    wal_receiver_protocol: PageserverWalReceiverProtocol,
 ):
-    neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
-
     env = neon_env_builder.init_start()
     env.create_branch("test_crafted_wal_end")
     env.pageserver.allowed_errors.extend(
diff --git a/test_runner/regress/test_subxacts.py b/test_runner/regress/test_subxacts.py
index b235da0bc7..92a21007c8 100644
--- a/test_runner/regress/test_subxacts.py
+++ b/test_runner/regress/test_subxacts.py
@@ -1,9 +1,7 @@
 from __future__ import annotations
 
-import pytest
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
-    PageserverWalReceiverProtocol,
     check_restored_datadir_content,
 )
 
@@ -14,13 +12,7 @@ from fixtures.neon_fixtures import (
 # maintained in the pageserver, so subtransactions are not very exciting for
 # Neon. They are included in the commit record though and updated in the
 # CLOG.
-@pytest.mark.parametrize(
-    "wal_receiver_protocol",
-    [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
-)
-def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir, wal_receiver_protocol):
-    neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
-
+def test_subxacts(neon_env_builder: NeonEnvBuilder, test_output_dir):
     env = neon_env_builder.init_start()
 
     endpoint = env.endpoints.create_start("main")
diff --git a/test_runner/regress/test_tenant_conf.py b/test_runner/regress/test_tenant_conf.py
index de6bdc0aec..d78b9d8817 100644
--- a/test_runner/regress/test_tenant_conf.py
+++ b/test_runner/regress/test_tenant_conf.py
@@ -348,7 +348,6 @@ def test_tenant_config_patch(neon_env_builder: NeonEnvBuilder, ps_managed_by: st
 
     def assert_tenant_conf_semantically_equal(lhs, rhs):
         """
-        Storcon returns None for fields that are not set while the pageserver does not.
         Compare two tenant's config overrides semantically, by dropping the None values.
         """
         lhs = {k: v for k, v in lhs.items() if v is not None}
@@ -375,10 +374,7 @@ def test_tenant_config_patch(neon_env_builder: NeonEnvBuilder, ps_managed_by: st
 
     patch: dict[str, Any | None] = {
         "gc_period": "3h",
-        "wal_receiver_protocol_override": {
-            "type": "interpreted",
-            "args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
-        },
+        "gc_compaction_ratio_percent": 10,
     }
     api.patch_tenant_config(env.initial_tenant, patch)
     tenant_conf_after_patch = api.tenant_config(env.initial_tenant).tenant_specific_overrides
@@ -391,7 +387,7 @@ def test_tenant_config_patch(neon_env_builder: NeonEnvBuilder, ps_managed_by: st
     assert_tenant_conf_semantically_equal(tenant_conf_after_patch, crnt_tenant_conf | patch)
     crnt_tenant_conf = tenant_conf_after_patch
 
-    patch = {"gc_period": "5h", "wal_receiver_protocol_override": None}
+    patch = {"gc_period": "5h", "gc_compaction_ratio_percent": None}
     api.patch_tenant_config(env.initial_tenant, patch)
     tenant_conf_after_patch = api.tenant_config(env.initial_tenant).tenant_specific_overrides
     if ps_managed_by == "storcon":
diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py
index 4070f99568..d8a7dc2a2b 100644
--- a/test_runner/regress/test_wal_acceptor_async.py
+++ b/test_runner/regress/test_wal_acceptor_async.py
@@ -14,7 +14,6 @@ from fixtures.neon_fixtures import (
     Endpoint,
     NeonEnv,
     NeonEnvBuilder,
-    PageserverWalReceiverProtocol,
     Safekeeper,
 )
 from fixtures.remote_storage import RemoteStorageKind
@@ -751,15 +750,8 @@ async def run_segment_init_failure(env: NeonEnv):
 # Test (injected) failure during WAL segment init.
 # https://github.com/neondatabase/neon/issues/6401
 # https://github.com/neondatabase/neon/issues/6402
-@pytest.mark.parametrize(
-    "wal_receiver_protocol",
-    [PageserverWalReceiverProtocol.VANILLA, PageserverWalReceiverProtocol.INTERPRETED],
-)
-def test_segment_init_failure(
-    neon_env_builder: NeonEnvBuilder, wal_receiver_protocol: PageserverWalReceiverProtocol
-):
+def test_segment_init_failure(neon_env_builder: NeonEnvBuilder):
     neon_env_builder.num_safekeepers = 1
-    neon_env_builder.pageserver_wal_receiver_protocol = wal_receiver_protocol
     env = neon_env_builder.init_start()
 
     asyncio.run(run_segment_init_failure(env))