diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 09ff3894a5..a851126d67 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -300,8 +300,6 @@ impl PostgresNode { // wal_sender_timeout is the maximum time to wait for WAL replication. // It also defines how often the walreciever will send a feedback message to the wal sender. conf.append("wal_sender_timeout", "5s"); - conf.append("max_replication_flush_lag", "160MB"); - conf.append("max_replication_apply_lag", "1500MB"); conf.append("listen_addresses", &self.address.ip().to_string()); conf.append("port", &self.address.port().to_string()); @@ -343,6 +341,11 @@ impl PostgresNode { conf.append_line(""); if !self.env.safekeepers.is_empty() { + // Configure backpressure + // In setup with safekeepers apply_lag depends on + // speed of data checkpointing on pageserver (see disk_consistent_lsn). + conf.append("max_replication_apply_lag", "1500MB"); + // Configure the node to connect to the safekeepers conf.append("synchronous_standby_names", "walproposer"); @@ -355,6 +358,16 @@ impl PostgresNode { .join(","); conf.append("wal_acceptors", &wal_acceptors); } else { + // Configure backpressure + // In setup without safekeepers, flush_lag depends on + // speed of data checkpointing on pageserver (see disk_consistent_lsn) + conf.append("max_replication_flush_lag", "1500MB"); + + // We only use setup without safekeepers for tests, + // and don't care about data durability on pageserver, + // so set more relaxed synchronous_commit. + conf.append("synchronous_commit", "remote_write"); + // Configure the node to stream WAL directly to the pageserver // This isn't really a supported configuration, but can be useful for // testing. 
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 3df1bdd95d..3f5c092e10 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -794,8 +794,8 @@ impl Timeline for LayeredTimeline { .wait_for_timeout(lsn, TIMEOUT) .with_context(|| { format!( - "Timed out while waiting for WAL record at LSN {} to arrive, disk consistent LSN={}", - lsn, self.get_disk_consistent_lsn() + "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}", + lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn() ) })?; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 4c8157825d..8e7106355b 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -195,9 +195,6 @@ fn walreceiver_main( tenantid, timelineid, ) })?; - let _timeline_synced_disk_consistent_lsn = tenant_mgr::get_repository_for_tenant(tenantid)? - .get_timeline_state(timelineid) - .and_then(|state| state.remote_disk_consistent_lsn()); // // Start streaming the WAL, from where we left off previously. @@ -287,23 +284,19 @@ fn walreceiver_main( if let Some(last_lsn) = status_update { let last_lsn = PgLsn::from(u64::from(last_lsn)); + let timeline_synced_disk_consistent_lsn = + tenant_mgr::get_repository_for_tenant(tenantid)? + .get_timeline_state(timelineid) + .and_then(|state| state.remote_disk_consistent_lsn()) + .unwrap_or(Lsn(0)); // The last LSN we processed. It is not guaranteed to survive pageserver crash. let write_lsn = last_lsn; - // This value doesn't guarantee data durability, but it's ok. - // In setup with WAL service, pageserver durability is guaranteed by safekeepers. - // In setup without WAL service, we just don't care. 
- let flush_lsn = write_lsn; - // `disk_consistent_lsn` is the LSN at which page server guarantees persistence of all received data - // Depending on the setup we recieve WAL directly from Compute Node or - // from a WAL service. - // - // Senders use the feedback to determine if we are caught up: - // - Safekeepers are free to remove WAL preceding `apply_lsn`, - // as it will never be requested by this page server. - // - Compute Node uses 'apply_lsn' to calculate a lag for back pressure mechanism - // (delay WAL inserts to avoid lagging pageserver responses and WAL overflow). - let apply_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); + // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data + let flush_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); + // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash + // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`. 
+ let apply_lsn = PgLsn::from(u64::from(timeline_synced_disk_consistent_lsn)); let ts = SystemTime::now(); const NO_REPLY: u8 = 0; physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; diff --git a/test_runner/batch_others/test_multixact.py b/test_runner/batch_others/test_multixact.py index 0b9ccf16c4..2203888ce9 100644 --- a/test_runner/batch_others/test_multixact.py +++ b/test_runner/batch_others/test_multixact.py @@ -53,7 +53,8 @@ def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir): # force wal flush cur.execute('checkpoint') - cur.execute('SELECT next_multixact_id, pg_current_wal_flush_lsn() FROM pg_control_checkpoint()') + cur.execute( + 'SELECT next_multixact_id, pg_current_wal_insert_lsn() FROM pg_control_checkpoint()') res = cur.fetchone() next_multixact_id = res[0] lsn = res[1] diff --git a/vendor/postgres b/vendor/postgres index 22938e1a7d..f7ce86a970 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 22938e1a7d044a2643b7f2283f299113a213d66e +Subproject commit f7ce86a9700dc47485fa633d9dd60d7942f7f0e1 diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index da2fff41e3..457ff2eb9b 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -740,7 +740,7 @@ where /* * Update truncate and commit LSN in control file. * To avoid negative impact on performance of extra fsync, do it only - * when restart_lsn delta exceeds WAL segment size. + * when truncate_lsn delta exceeds WAL segment size. 
*/ sync_control_file |= self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn; diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index a27308079e..fdc56f729f 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -58,8 +58,8 @@ impl HotStandbyFeedback { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StandbyReply { pub write_lsn: Lsn, // last lsn received by pageserver - pub flush_lsn: Lsn, // not used - pub apply_lsn: Lsn, // pageserver's disk consistent lSN + pub flush_lsn: Lsn, // pageserver's disk consistent LSN + pub apply_lsn: Lsn, // pageserver's remote consistent LSN pub reply_ts: TimestampTz, pub reply_requested: bool, } @@ -145,8 +145,9 @@ impl ReplicationConn { Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let reply = StandbyReply::des(&m[1..]) .context("failed to deserialize StandbyReply")?; - state.disk_consistent_lsn = reply.apply_lsn; state.last_received_lsn = reply.write_lsn; + state.disk_consistent_lsn = reply.flush_lsn; + state.remote_consistent_lsn = reply.apply_lsn; timeline.update_replica_state(replica_id, Some(state)); } _ => warn!("unexpected message {:?}", msg), diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 96deba84ee..2abefbb7bc 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -42,6 +42,8 @@ pub struct ReplicaState { pub last_received_lsn: Lsn, // None means we don't know /// combined disk_consistent_lsn of pageservers pub disk_consistent_lsn: Lsn, + /// combined remote consistent lsn of pageservers + pub remote_consistent_lsn: Lsn, /// combined hot standby feedback from all replicas pub hs_feedback: HotStandbyFeedback, } @@ -57,6 +59,7 @@ impl ReplicaState { ReplicaState { last_received_lsn: Lsn::MAX, disk_consistent_lsn: Lsn(u64::MAX), + remote_consistent_lsn: Lsn(u64::MAX), hs_feedback: HotStandbyFeedback { ts: 0, xmin: u64::MAX, @@ -109,6 +112,10 @@ impl SharedState { acc.disk_consistent_lsn = 
Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn); // currently not used, but update it to be consistent acc.last_received_lsn = Lsn::min(acc.last_received_lsn, state.last_received_lsn); + // When at least one replica has preserved data up to remote_consistent_lsn, + // safekeeper is free to delete it, so choose max of all replicas. + acc.remote_consistent_lsn = + Lsn::max(acc.remote_consistent_lsn, state.remote_consistent_lsn); } acc }