Propagate remote_consistent_lsn to safekeepers.

Change meaning of lsns in HOT_STANDBY_FEEDBACK:
flush_lsn = disk_consistent_lsn,
apply_lsn = remote_consistent_lsn
Update compute node backpressure configuration respectively.

Update compute node configuration:
set 'synchronous_commit=remote_write' in setup without safekeepers.
This way compute node doesn't have to wait for data checkpoint on pageserver.
This doesn't guarantee data durability, but we only use this setup for tests, so it's fine.
This commit is contained in:
anastasia
2021-12-16 14:30:31 +03:00
committed by lubennikovaav
parent 42647f606e
commit 980f5f8440
8 changed files with 42 additions and 27 deletions

View File

@@ -300,8 +300,6 @@ impl PostgresNode {
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreciever will send a feedback message to the wal sender.
conf.append("wal_sender_timeout", "5s");
conf.append("max_replication_flush_lag", "160MB");
conf.append("max_replication_apply_lag", "1500MB");
conf.append("listen_addresses", &self.address.ip().to_string());
conf.append("port", &self.address.port().to_string());
@@ -343,6 +341,11 @@ impl PostgresNode {
conf.append_line("");
if !self.env.safekeepers.is_empty() {
// Configure backpressure
// In setup with safekeepers apply_lag depends on
// speed of data checkpointing on pageserver (see disk_consistent_lsn).
conf.append("max_replication_apply_lag", "1500MB");
// Configure the node to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
@@ -355,6 +358,16 @@ impl PostgresNode {
.join(",");
conf.append("wal_acceptors", &wal_acceptors);
} else {
// Configure backpressure
// In setup without safekeepers, flush_lag depends on
// speed of of data checkpointing on pageserver (see disk_consistent_lsn)
conf.append("max_replication_flush_lag", "1500MB");
// We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver,
// so set more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");
// Configure the node to stream WAL directly to the pageserver
// This isn't really a supported configuration, but can be useful for
// testing.

View File

@@ -794,8 +794,8 @@ impl Timeline for LayeredTimeline {
.wait_for_timeout(lsn, TIMEOUT)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive, disk consistent LSN={}",
lsn, self.get_disk_consistent_lsn()
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}",
lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn()
)
})?;

View File

@@ -195,9 +195,6 @@ fn walreceiver_main(
tenantid, timelineid,
)
})?;
let _timeline_synced_disk_consistent_lsn = tenant_mgr::get_repository_for_tenant(tenantid)?
.get_timeline_state(timelineid)
.and_then(|state| state.remote_disk_consistent_lsn());
//
// Start streaming the WAL, from where we left off previously.
@@ -287,23 +284,19 @@ fn walreceiver_main(
if let Some(last_lsn) = status_update {
let last_lsn = PgLsn::from(u64::from(last_lsn));
let timeline_synced_disk_consistent_lsn =
tenant_mgr::get_repository_for_tenant(tenantid)?
.get_timeline_state(timelineid)
.and_then(|state| state.remote_disk_consistent_lsn())
.unwrap_or(Lsn(0));
// The last LSN we processed. It is not guaranteed to survive pageserver crash.
let write_lsn = last_lsn;
// This value doesn't guarantee data durability, but it's ok.
// In setup with WAL service, pageserver durability is guaranteed by safekeepers.
// In setup without WAL service, we just don't care.
let flush_lsn = write_lsn;
// `disk_consistent_lsn` is the LSN at which page server guarantees persistence of all received data
// Depending on the setup we recieve WAL directly from Compute Node or
// from a WAL service.
//
// Senders use the feedback to determine if we are caught up:
// - Safekeepers are free to remove WAL preceding `apply_lsn`,
// as it will never be requested by this page server.
// - Compute Node uses 'apply_lsn' to calculate a lag for back pressure mechanism
// (delay WAL inserts to avoid lagging pageserver responses and WAL overflow).
let apply_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
// `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
let flush_lsn = PgLsn::from(u64::from(timeline.get_disk_consistent_lsn()));
// The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
// Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
let apply_lsn = PgLsn::from(u64::from(timeline_synced_disk_consistent_lsn));
let ts = SystemTime::now();
const NO_REPLY: u8 = 0;
physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;

View File

@@ -53,7 +53,8 @@ def test_multixact(zenith_simple_env: ZenithEnv, test_output_dir):
# force wal flush
cur.execute('checkpoint')
cur.execute('SELECT next_multixact_id, pg_current_wal_flush_lsn() FROM pg_control_checkpoint()')
cur.execute(
'SELECT next_multixact_id, pg_current_wal_insert_lsn() FROM pg_control_checkpoint()')
res = cur.fetchone()
next_multixact_id = res[0]
lsn = res[1]

View File

@@ -740,7 +740,7 @@ where
/*
* Update truncate and commit LSN in control file.
* To avoid negative impact on performance of extra fsync, do it only
* when restart_lsn delta exceeds WAL segment size.
* when truncate_lsn delta exceeds WAL segment size.
*/
sync_control_file |=
self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn;

View File

@@ -58,8 +58,8 @@ impl HotStandbyFeedback {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // last lsn received by pageserver
pub flush_lsn: Lsn, // not used
pub apply_lsn: Lsn, // pageserver's disk consistent lSN
pub flush_lsn: Lsn, // pageserver's disk consistent lSN
pub apply_lsn: Lsn, // pageserver's remote consistent lSN
pub reply_ts: TimestampTz,
pub reply_requested: bool,
}
@@ -145,8 +145,9 @@ impl ReplicationConn {
Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
let reply = StandbyReply::des(&m[1..])
.context("failed to deserialize StandbyReply")?;
state.disk_consistent_lsn = reply.apply_lsn;
state.last_received_lsn = reply.write_lsn;
state.disk_consistent_lsn = reply.flush_lsn;
state.remote_consistent_lsn = reply.apply_lsn;
timeline.update_replica_state(replica_id, Some(state));
}
_ => warn!("unexpected message {:?}", msg),

View File

@@ -42,6 +42,8 @@ pub struct ReplicaState {
pub last_received_lsn: Lsn, // None means we don't know
/// combined disk_consistent_lsn of pageservers
pub disk_consistent_lsn: Lsn,
/// combined remote consistent lsn of pageservers
pub remote_consistent_lsn: Lsn,
/// combined hot standby feedback from all replicas
pub hs_feedback: HotStandbyFeedback,
}
@@ -57,6 +59,7 @@ impl ReplicaState {
ReplicaState {
last_received_lsn: Lsn::MAX,
disk_consistent_lsn: Lsn(u64::MAX),
remote_consistent_lsn: Lsn(u64::MAX),
hs_feedback: HotStandbyFeedback {
ts: 0,
xmin: u64::MAX,
@@ -109,6 +112,10 @@ impl SharedState {
acc.disk_consistent_lsn = Lsn::min(acc.disk_consistent_lsn, state.disk_consistent_lsn);
// currently not used, but update it to be consistent
acc.last_received_lsn = Lsn::min(acc.last_received_lsn, state.last_received_lsn);
// When at least one replica has preserved data up to remote_consistent_lsn,
// safekeeper is free to delete it, so chose max of all replicas.
acc.remote_consistent_lsn =
Lsn::max(acc.remote_consistent_lsn, state.remote_consistent_lsn);
}
acc
}