From 980f5f8440201741ce6b392fbd5d2f49b120a8f4 Mon Sep 17 00:00:00 2001 From: anastasia Date: Thu, 16 Dec 2021 14:30:31 +0300 Subject: [PATCH] Propagate remote_consistent_lsn to safekeepers. Change meaning of lsns in HOT_STANDBY_FEEDBACK: flush_lsn = disk_consistent_lsn, apply_lsn = remote_consistent_lsn Update compute node backpressure configuration respectively. Update compute node configuration: set 'synchronous_commit=remote_write' in setup without safekeepers. This way compute node doesn't have to wait for data checkpoint on pageserver. This doesn't guarantee data durability, but we only use this setup for tests, so it's fine. --- control_plane/src/compute.rs | 17 ++++++++++++-- pageserver/src/layered_repository.rs | 4 ++-- pageserver/src/walreceiver.rs | 27 ++++++++-------------- test_runner/batch_others/test_multixact.py | 3 ++- vendor/postgres | 2 +- walkeeper/src/safekeeper.rs | 2 +- walkeeper/src/send_wal.rs | 7 +++--- walkeeper/src/timeline.rs | 7 ++++++ 8 files changed, 42 insertions(+), 27 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 09ff3894a5..a851126d67 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -300,8 +300,6 @@ impl PostgresNode { // wal_sender_timeout is the maximum time to wait for WAL replication. // It also defines how often the walreciever will send a feedback message to the wal sender. conf.append("wal_sender_timeout", "5s"); - conf.append("max_replication_flush_lag", "160MB"); - conf.append("max_replication_apply_lag", "1500MB"); conf.append("listen_addresses", &self.address.ip().to_string()); conf.append("port", &self.address.port().to_string()); @@ -343,6 +341,11 @@ impl PostgresNode { conf.append_line(""); if !self.env.safekeepers.is_empty() { + // Configure backpressure + // In setup with safekeepers apply_lag depends on + // speed of data checkpointing on pageserver (see disk_consistent_lsn). + conf.append("max_replication_apply_lag", "1500MB"); + // Configure the node to connect to the safekeepers conf.append("synchronous_standby_names", "walproposer"); @@ -355,6 +358,16 @@ impl PostgresNode { .join(","); conf.append("wal_acceptors", &wal_acceptors); } else { + // Configure backpressure + // In setup without safekeepers, flush_lag depends on + // speed of of data checkpointing on pageserver (see disk_consistent_lsn) + conf.append("max_replication_flush_lag", "1500MB"); + + // We only use setup without safekeepers for tests, + // and don't care about data durability on pageserver, + // so set more relaxed synchronous_commit. + conf.append("synchronous_commit", "remote_write"); + // Configure the node to stream WAL directly to the pageserver // This isn't really a supported configuration, but can be useful for // testing. diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 3df1bdd95d..3f5c092e10 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -794,8 +794,8 @@ impl Timeline for LayeredTimeline { .wait_for_timeout(lsn, TIMEOUT) .with_context(|| { format!( - "Timed out while waiting for WAL record at LSN {} to arrive, disk consistent LSN={}", - lsn, self.get_disk_consistent_lsn() + "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}", + lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn() ) })?; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 4c8157825d..8e7106355b 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -195,9 +195,6 @@ fn walreceiver_main( tenantid, timelineid, ) })?; - let _timeline_synced_disk_consistent_lsn = tenant_mgr::get_repository_for_tenant(tenantid)? - .get_timeline_state(timelineid) - .and_then(|state| state.remote_disk_consistent_lsn()); // // Start streaming the WAL, from where we left off previously. @@ -287,23 +284,19 @@ fn walreceiver_main( if let Some(last_lsn) = status_update { let last_lsn = PgLsn::from(u64::from(last_lsn)); + let timeline_synced_disk_consistent_lsn = + tenant_mgr::get_repository_for_tenant(tenantid)? + .get_timeline_state(timelineid) + .and_then(|state| state.remote_disk_consistent_lsn()) + .unwrap_or(Lsn(0)); // The last LSN we processed. It is not guaranteed to survive pageserver crash. let write_lsn = last_lsn; - // This value doesn't guarantee data durability, but it's ok. - // In setup with WAL service, pageserver durability is guaranteed by safekeepers. - // In setup without WAL service, we just don't care. - let flush_lsn = write_lsn; - // `disk_consistent_lsn` is the LSN at which page server guarantees persistence of all received data - // Depending on the setup we recieve WAL directly from Compute Node or - // from a WAL service. - // - // Senders use the feedback to determine if we are caught up: - // - Safekeepers are free to remove WAL preceding `apply_lsn`, - // as it will never be requested by this page server. - // - Compute Node uses 'apply_lsn' to calculate a lag for back pressure mechanism - // (delay WAL inserts to avoid lagging pageserver responses and WAL overflow). - let apply_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); + // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data + let flush_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn())); + // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash + // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`. + let apply_lsn = PgLsn::from(u64::from(timeline_synced_disk_consistent_lsn)); let ts = SystemTime::now(); const NO_REPLY: u8 = 0; physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; diff --git a/test_runner/batch_others/test_multixact.py b/test_runner/batch_others/test_multixact.py index 0b9ccf16c4..2203888ce9 100644 --- a/test_runner/batch_others/test_multixact.py +++ b/test_runner/batch_others/test_multixact.py @@ -53,7 +53,8 @@ def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir): # force wal flush cur.execute('checkpoint') - cur.execute('SELECT next_multixact_id, pg_current_wal_flush_lsn() FROM pg_control_checkpoint()') + cur.execute( + 'SELECT next_multixact_id, pg_current_wal_insert_lsn() FROM pg_control_checkpoint()') res = cur.fetchone() next_multixact_id = res[0] lsn = res[1] diff --git a/vendor/postgres b/vendor/postgres index 22938e1a7d..f7ce86a970 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 22938e1a7d044a2643b7f2283f299113a213d66e +Subproject commit f7ce86a9700dc47485fa633d9dd60d7942f7f0e1 diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index da2fff41e3..457ff2eb9b 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -740,7 +740,7 @@ where /* * Update truncate and commit LSN in control file. * To avoid negative impact on performance of extra fsync, do it only - * when restart_lsn delta exceeds WAL segment size. + * when truncate_lsn delta exceeds WAL segment size. */ sync_control_file |= self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn; diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index a27308079e..fdc56f729f 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -58,8 +58,8 @@ impl HotStandbyFeedback { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StandbyReply { pub write_lsn: Lsn, // last lsn received by pageserver - pub flush_lsn: Lsn, // not used - pub apply_lsn: Lsn, // pageserver's disk consistent lSN + pub flush_lsn: Lsn, // pageserver's disk consistent lSN + pub apply_lsn: Lsn, // pageserver's remote consistent lSN pub reply_ts: TimestampTz, pub reply_requested: bool, } @@ -145,8 +145,9 @@ impl ReplicationConn { Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let reply = StandbyReply::des(&m[1..]) .context("failed to deserialize StandbyReply")?; - state.disk_consistent_lsn = reply.apply_lsn; state.last_received_lsn = reply.write_lsn; + state.disk_consistent_lsn = reply.flush_lsn; + state.remote_consistent_lsn = reply.apply_lsn; timeline.update_replica_state(replica_id, Some(state)); } _ => warn!("unexpected message {:?}", msg), diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 96deba84ee..2abefbb7bc 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -42,6 +42,8 @@ pub struct ReplicaState { pub last_received_lsn: Lsn, // None means we don't know /// combined disk_consistent_lsn of pageservers pub disk_consistent_lsn: Lsn, + /// combined remote consistent lsn of pageservers + pub remote_consistent_lsn: Lsn, /// combined hot standby feedback from all replicas pub hs_feedback: HotStandbyFeedback, } @@ -57,6 +59,7 @@ impl ReplicaState { ReplicaState { last_received_lsn: Lsn::MAX, disk_consistent_lsn: Lsn(u64::MAX), + remote_consistent_lsn: Lsn(u64::MAX), hs_feedback: HotStandbyFeedback { ts: 0, xmin: u64::MAX, @@ -109,6 +112,10 @@ impl SharedState { acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn); // currently not used, but update it to be consistent acc.last_received_lsn = Lsn::min(acc.last_received_lsn, state.last_received_lsn); + // When at least one replica has preserved data up to remote_consistent_lsn, + // safekeeper is free to delete it, so chose max of all replicas. + acc.remote_consistent_lsn = + Lsn::max(acc.remote_consistent_lsn, state.remote_consistent_lsn); } acc }