mirror of https://github.com/neondatabase/neon.git
Change behavior of ComputeControlPlane::new_node() (#235)
Previously, a transaction commit could happen regardless of whether the
pageserver had caught up or not. This patch fixes that.

There are two notable changes:

1. ComputeControlPlane::new_node() now sets
   `synchronous_standby_names = 'pageserver'` to delay transaction commit
   until the pageserver, acting as a synchronous standby, has fetched and
   acknowledged the relevant portion of WAL.
2. The pageserver now has to:
   - Specify `application_name=pageserver`, which matches the name in
     `synchronous_standby_names`.
   - Properly reply with the ack'd LSNs.

As a result, some tests no longer need sleeps.

TODO: We should probably make this behavior configurable.

Fixes #187.
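For reference, the pageserver-side change boils down to the condensed sketch below. It is pieced together from the hunks in this commit and reuses the same identifiers and the forked rust-postgres replication API the diff relies on (ReplicationMessage, PgLsn, physical_stream.standby_status_update), so it illustrates the control flow rather than being a standalone program; the WAL decoding in the XLogData arm is elided.

    // Post-patch receive loop, condensed. Each replication message may yield an LSN to
    // acknowledge; a keepalive is acknowledged only when the primary requests a reply.
    while let Some(replication_message) = physical_stream.next()? {
        let status_update = match replication_message {
            ReplicationMessage::XLogData(xlog_data) => {
                // WAL decoding elided: feed xlog_data to the decoder, advance the
                // timeline's last valid LSN, then acknowledge the end of the batch.
                Some(endlsn)
            }
            ReplicationMessage::PrimaryKeepAlive(keepalive) => {
                let reply_requested = keepalive.reply() != 0;
                reply_requested.then(|| timeline.get_last_valid_lsn())
            }
            _ => None,
        };

        if let Some(last_lsn) = status_update {
            // Report the same LSN as both written and flushed; the apply position is
            // left at 0 and no further reply is requested from the primary.
            let last_lsn = PgLsn::from(u64::from(last_lsn));
            let ts = SystemTime::now();
            physical_stream.standby_status_update(last_lsn, last_lsn, PgLsn::from(0), ts, 0)?;
        }
    }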
@@ -151,8 +151,11 @@ impl ComputeControlPlane {
         node.append_conf(
             "postgresql.conf",
             format!(
-                "shared_preload_libraries = zenith\n\
-                 zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping
+                concat!(
+                    "shared_preload_libraries = zenith\n",
+                    "synchronous_standby_names = 'pageserver'\n", // TODO: add a new function arg?
+                    "zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping
+                ),
                 node.connstr()
             )
             .as_str(),
@@ -120,7 +120,10 @@ fn walreceiver_main(
 ) -> Result<(), Error> {
     // Connect to the database in replication mode.
     info!("connecting to {:?}", wal_producer_connstr);
-    let connect_cfg = format!("{} replication=true", wal_producer_connstr);
+    let connect_cfg = format!(
+        "{} application_name=pageserver replication=true",
+        wal_producer_connstr
+    );
 
     let mut rclient = Client::connect(&connect_cfg, NoTls)?;
     info!("connected!");
@@ -166,7 +169,7 @@ fn walreceiver_main(
     let mut waldecoder = WalStreamDecoder::new(startpoint);
 
     while let Some(replication_message) = physical_stream.next()? {
-        match replication_message {
+        let status_update = match replication_message {
             ReplicationMessage::XLogData(xlog_data) => {
                 // Pass the WAL data to the decoder, and see if we can decode
                 // more records as a result.
@@ -226,12 +229,14 @@ fn walreceiver_main(
                     info!("caught up at LSN {}", endlsn);
                     caught_up = true;
                 }
+
+                Some(endlsn)
             }
 
             ReplicationMessage::PrimaryKeepAlive(keepalive) => {
                 let wal_end = keepalive.wal_end();
                 let timestamp = keepalive.timestamp();
-                let reply_requested: bool = keepalive.reply() != 0;
+                let reply_requested = keepalive.reply() != 0;
 
                 trace!(
                     "received PrimaryKeepAlive(wal_end: {}, timestamp: {:?} reply: {})",
@@ -239,20 +244,23 @@ fn walreceiver_main(
                     timestamp,
                     reply_requested,
                 );
-                if reply_requested {
-                    // TODO: More thought should go into what values are sent here.
-                    let last_lsn = PgLsn::from(u64::from(timeline.get_last_valid_lsn()));
-                    let write_lsn = last_lsn;
-                    let flush_lsn = last_lsn;
-                    let apply_lsn = PgLsn::from(0);
-                    let ts = SystemTime::now();
-                    const NO_REPLY: u8 = 0u8;
 
-                    physical_stream
-                        .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
-                }
+                reply_requested.then(|| timeline.get_last_valid_lsn())
             }
-            _ => (),
-        }
+
+            _ => None,
+        };
+
+        if let Some(last_lsn) = status_update {
+            // TODO: More thought should go into what values are sent here.
+            let last_lsn = PgLsn::from(u64::from(last_lsn));
+            let write_lsn = last_lsn;
+            let flush_lsn = last_lsn;
+            let apply_lsn = PgLsn::from(0);
+            let ts = SystemTime::now();
+            const NO_REPLY: u8 = 0;
+
+            physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
+        }
     }
     Ok(())
@@ -1,5 +1,4 @@
 import psycopg2
-import time
 
 pytest_plugins = ("fixtures.zenith_fixtures")
 
@@ -38,13 +37,6 @@ def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin):
     cur.execute('SELECT count(*) FROM foo')
     assert cur.fetchone() == (2, )
 
-    # FIXME: Currently, there is no guarantee that by the time the INSERT commits, the WAL
-    # has been streamed safely to the WAL safekeeper or page server. It is merely stored
-    # on the Postgres instance's local disk. Sleep a little, to give it time to be
-    # streamed. This should be removed, when we have the ability to run the Postgres
-    # instance -> safekeeper streaming in synchronous mode.
-    time.sleep(5)
-
     # Stop, and destroy the Postgres instance. Then recreate and restart it.
     pg_conn.close()
     pg.stop_and_destroy()