From bb1446e33ae504ffdbc3bf1ced2f11af1bba1d39 Mon Sep 17 00:00:00 2001
From: Dmitry Ivanov
Date: Wed, 9 Jun 2021 11:24:55 +0300
Subject: [PATCH] Change behavior of ComputeControlPlane::new_node() (#235)

Previously, transaction commit could happen regardless of whether the
pageserver had caught up. This patch fixes that. There are two notable
changes:

1. ComputeControlPlane::new_node() now sets `synchronous_standby_names =
   'pageserver'` to delay transaction commit until the pageserver, acting
   as a standby, has fetched and ack'd the relevant portion of WAL.

2. pageserver now has to:
   - Specify `application_name = pageserver`, which matches the name in
     `synchronous_standby_names`.
   - Properly reply with the ack'd LSNs.

This means that some tests no longer need sleeps.

TODO: We should probably make this behavior configurable.

Fixes #187.
---
 control_plane/src/compute.rs                  |  7 +++-
 pageserver/src/walreceiver.rs                 | 38 +++++++++++--------
 .../batch_others/test_restart_compute.py      |  8 ----
 3 files changed, 28 insertions(+), 25 deletions(-)

diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index 882ea3157a..2e49776865 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -151,8 +151,11 @@ impl ComputeControlPlane {
         node.append_conf(
             "postgresql.conf",
             format!(
-                "shared_preload_libraries = zenith\n\
-                 zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping
+                concat!(
+                    "shared_preload_libraries = zenith\n",
+                    "synchronous_standby_names = 'pageserver'\n", // TODO: add a new function arg?
+                    "zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping
+                ),
                 node.connstr()
             )
             .as_str(),
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 3a466d4423..dc0e9e84db 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -120,7 +120,10 @@ fn walreceiver_main(
 ) -> Result<(), Error> {
     // Connect to the database in replication mode.
     info!("connecting to {:?}", wal_producer_connstr);
-    let connect_cfg = format!("{} replication=true", wal_producer_connstr);
+    let connect_cfg = format!(
+        "{} application_name=pageserver replication=true",
+        wal_producer_connstr
+    );
 
     let mut rclient = Client::connect(&connect_cfg, NoTls)?;
     info!("connected!");
@@ -166,7 +169,7 @@ fn walreceiver_main(
     let mut waldecoder = WalStreamDecoder::new(startpoint);
 
     while let Some(replication_message) = physical_stream.next()? {
-        match replication_message {
+        let status_update = match replication_message {
             ReplicationMessage::XLogData(xlog_data) => {
                 // Pass the WAL data to the decoder, and see if we can decode
                 // more records as a result.
@@ -226,12 +229,14 @@ fn walreceiver_main(
                     info!("caught up at LSN {}", endlsn);
                     caught_up = true;
                 }
+
+                Some(endlsn)
             }
 
             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
                 let wal_end = keepalive.wal_end();
                 let timestamp = keepalive.timestamp();
-                let reply_requested: bool = keepalive.reply() != 0;
+                let reply_requested = keepalive.reply() != 0;
 
                 trace!(
                     "received PrimaryKeepAlive(wal_end: {}, timestamp: {:?} reply: {})",
@@ -239,20 +244,23 @@ fn walreceiver_main(
                     timestamp,
                     reply_requested,
                 );
 
-                if reply_requested {
-                    // TODO: More thought should go into what values are sent here.
-                    let last_lsn = PgLsn::from(u64::from(timeline.get_last_valid_lsn()));
-                    let write_lsn = last_lsn;
-                    let flush_lsn = last_lsn;
-                    let apply_lsn = PgLsn::from(0);
-                    let ts = SystemTime::now();
-                    const NO_REPLY: u8 = 0u8;
-                    physical_stream
-                        .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
-                }
+                reply_requested.then(|| timeline.get_last_valid_lsn())
             }
-            _ => (),
+
+            _ => None,
+        };
+
+        if let Some(last_lsn) = status_update {
+            // TODO: More thought should go into what values are sent here.
+            let last_lsn = PgLsn::from(u64::from(last_lsn));
+            let write_lsn = last_lsn;
+            let flush_lsn = last_lsn;
+            let apply_lsn = PgLsn::from(0);
+            let ts = SystemTime::now();
+            const NO_REPLY: u8 = 0;
+
+            physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
         }
     }
     Ok(())
diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py
index eb8e335d98..a332753380 100644
--- a/test_runner/batch_others/test_restart_compute.py
+++ b/test_runner/batch_others/test_restart_compute.py
@@ -1,5 +1,4 @@
 import psycopg2
-import time
 
 pytest_plugins = ("fixtures.zenith_fixtures")
 
@@ -38,13 +37,6 @@ def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
         cur.execute('SELECT count(*) FROM foo')
         assert cur.fetchone() == (2, )
 
-    # FIXME: Currently, there is no guarantee that by the time the INSERT commits, the WAL
-    # has been streamed safely to the WAL safekeeper or page server. It is merely stored
-    # on the Postgres instance's local disk. Sleep a little, to give it time to be
-    # streamed. This should be removed, when we have the ability to run the Postgres
-    # instance -> safekeeper streaming in synchronous mode.
-    time.sleep(5)
-
     # Stop, and destroy the Postgres instance. Then recreate and restart it.
     pg_conn.close()
     pg.stop_and_destroy()
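
As a quick sanity check of the synchronous setup described above, the compute
node's pg_stat_replication view should list the pageserver's WAL receiver as
the synchronous standby once it has attached, since the walreceiver now
connects with application_name=pageserver and synchronous_standby_names names
exactly that. Below is a minimal sketch in the style of the test_runner tests;
`compute_connstr` and the helper name are placeholders (how the connection
string is obtained is not shown here):

    import psycopg2

    def assert_pageserver_is_sync_standby(compute_connstr):
        # `compute_connstr` is a placeholder for the compute Postgres instance's
        # connection string (whatever the control plane / test fixtures report).
        with psycopg2.connect(compute_connstr) as conn:
            with conn.cursor() as cur:
                # The walreceiver connects with application_name=pageserver, so it
                # should show up with sync_state = 'sync' once it has attached.
                cur.execute(
                    "SELECT sync_state FROM pg_stat_replication "
                    "WHERE application_name = 'pageserver'")
                row = cur.fetchone()
                assert row is not None, "pageserver walreceiver is not connected"
                assert row[0] == 'sync', f"unexpected sync_state: {row[0]}"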