Rename ZenithFeedback (#1912)

Kirill Bulatov
2022-06-11 00:44:05 +03:00
committed by GitHub
parent e1336f451d
commit d8a37452c8
8 changed files with 61 additions and 61 deletions

View File

@@ -36,12 +36,12 @@ This is how the `LOGICAL_TIMELINE_SIZE` metric is implemented in the pageserver.
Alternatively, we could count only relation data, as in pg_database_size().
This approach is somewhat more user-friendly, because it counts only the data the user actually affects.
On the other hand, it puts us in a weaker position than other services, e.g., RDS.
We will need to refactor the timeline_size counter or add another counter to implement it.
Timeline size is updated during WAL digestion. It is not versioned and is only valid as of last_received_lsn.
This size is then reported to the compute node.
The `current_timeline_size` value is included in the walreceiver's custom feedback message: `ZenithFeedback`.
The `current_timeline_size` value is included in the walreceiver's custom feedback message: `ReplicationFeedback`.
(PR with the protocol changes: https://github.com/zenithdb/zenith/pull/1037).
@@ -64,11 +64,11 @@ We should warn users if the limit is soon to be reached.
### **Reliability, failure modes and corner cases**
1. `current_timeline_size` is valid at the last LSN received and digested by the pageserver.
If the pageserver lags behind the compute node, `current_timeline_size` will lag too. This lag can be tuned using backpressure, but it is not expected to be zero all the time.
So transactions that happen in this LSN range may push the timeline over the limit, especially operations that allocate (e.g., CREATE DATABASE) or free (e.g., TRUNCATE) many data pages while generating only a small amount of WAL. Are there other operations like this?
Currently, CREATE DATABASE operations are restricted in the console, so this is not an issue.
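Because the reported size trails the compute node by the backpressure window, any limit check needs headroom. Below is a minimal sketch of such a check under that assumption; `check_timeline_size`, `SizeStatus`, and `warn_margin` are hypothetical names, not part of this commit.

```rust
#[derive(Debug, PartialEq)]
enum SizeStatus {
    Ok,
    NearLimit,
    OverLimit,
}

// Hypothetical check (not from this commit): warn before the limit is hit,
// leaving headroom because current_timeline_size is only valid at the last
// LSN the pageserver has digested.
fn check_timeline_size(current_timeline_size: u64, size_limit: u64, warn_margin: u64) -> SizeStatus {
    if current_timeline_size >= size_limit {
        SizeStatus::OverLimit
    } else if current_timeline_size + warn_margin >= size_limit {
        // The true size may already be higher: writes between the last
        // digested LSN and the compute node's LSN are not counted yet.
        SizeStatus::NearLimit
    } else {
        SizeStatus::Ok
    }
}

fn main() {
    assert_eq!(check_timeline_size(9_500, 10_000, 1_000), SizeStatus::NearLimit);
    assert_eq!(check_timeline_size(2_000, 10_000, 1_000), SizeStatus::Ok);
}
```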

View File

@@ -926,10 +926,10 @@ impl<'a> BeMessage<'a> {
}
}
// Zenith extension of postgres replication protocol
// See ZENITH_STATUS_UPDATE_TAG_BYTE
// Neon extension of postgres replication protocol
// See NEON_STATUS_UPDATE_TAG_BYTE
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct ZenithFeedback {
pub struct ReplicationFeedback {
// Last known size of the timeline. Used to enforce timeline size limit.
pub current_timeline_size: u64,
// Parts of StandbyStatusUpdate we resend to compute via safekeeper
@@ -939,13 +939,13 @@ pub struct ZenithFeedback {
pub ps_replytime: SystemTime,
}
// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback.
// NOTE: Do not forget to increment this number when adding new fields to ReplicationFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const ZENITH_FEEDBACK_FIELDS_NUMBER: u8 = 5;
pub const REPLICATION_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl ZenithFeedback {
pub fn empty() -> ZenithFeedback {
ZenithFeedback {
impl ReplicationFeedback {
pub fn empty() -> ReplicationFeedback {
ReplicationFeedback {
current_timeline_size: 0,
ps_writelsn: 0,
ps_applylsn: 0,
@@ -954,7 +954,7 @@ impl ZenithFeedback {
}
}
// Serialize ZenithFeedback using custom format
// Serialize ReplicationFeedback using custom format
// to support protocol extensibility.
//
// Following layout is used:
@@ -965,7 +965,7 @@ impl ZenithFeedback {
// uint32 - value length in bytes
// value itself
pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
buf.put_u8(ZENITH_FEEDBACK_FIELDS_NUMBER); // # of keys
buf.put_u8(REPLICATION_FEEDBACK_FIELDS_NUMBER); // # of keys
write_cstr(&Bytes::from("current_timeline_size"), buf)?;
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
@@ -992,9 +992,9 @@ impl ZenithFeedback {
Ok(())
}
// Deserialize ZenithFeedback message
pub fn parse(mut buf: Bytes) -> ZenithFeedback {
let mut zf = ZenithFeedback::empty();
// Deserialize ReplicationFeedback message
pub fn parse(mut buf: Bytes) -> ReplicationFeedback {
let mut zf = ReplicationFeedback::empty();
let nfields = buf.get_u8();
let mut i = 0;
while i < nfields {
@@ -1035,14 +1035,14 @@ impl ZenithFeedback {
_ => {
let len = buf.get_i32();
warn!(
"ZenithFeedback parse. unknown key {} of len {}. Skip it.",
"ReplicationFeedback parse. unknown key {} of len {}. Skip it.",
key, len
);
buf.advance(len as usize);
}
}
}
trace!("ZenithFeedback parsed is {:?}", zf);
trace!("ReplicationFeedback parsed is {:?}", zf);
zf
}
}
@@ -1052,8 +1052,8 @@ mod tests {
use super::*;
#[test]
fn test_zenithfeedback_serialization() {
let mut zf = ZenithFeedback::empty();
fn test_replication_feedback_serialization() {
let mut zf = ReplicationFeedback::empty();
// Fill zf with some values
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
@@ -1062,13 +1062,13 @@ mod tests {
let mut data = BytesMut::new();
zf.serialize(&mut data).unwrap();
let zf_parsed = ZenithFeedback::parse(data.freeze());
let zf_parsed = ReplicationFeedback::parse(data.freeze());
assert_eq!(zf, zf_parsed);
}
#[test]
fn test_zenithfeedback_unknown_key() {
let mut zf = ZenithFeedback::empty();
fn test_replication_feedback_unknown_key() {
let mut zf = ReplicationFeedback::empty();
// Fill zf with some values
zf.current_timeline_size = 12345678;
// Set rounded time to be able to compare it with deserialized value,
@@ -1079,7 +1079,7 @@ mod tests {
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = ZENITH_FEEDBACK_FIELDS_NUMBER + 1;
*first = REPLICATION_FEEDBACK_FIELDS_NUMBER + 1;
}
write_cstr(&Bytes::from("new_field_one"), &mut data).unwrap();
@@ -1087,7 +1087,7 @@ mod tests {
data.put_u64(42);
// Parse serialized data and check that new field is not parsed
let zf_parsed = ZenithFeedback::parse(data.freeze());
let zf_parsed = ReplicationFeedback::parse(data.freeze());
assert_eq!(zf, zf_parsed);
}
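The serialize comments above describe the custom key-value layout: a one-byte field count, then for each field a null-terminated key name, an i32 value length, and the value itself. The sketch below illustrates that layout with the `bytes` crate; `write_key` and `put_u64_field` are hypothetical stand-ins for the codebase's helpers (such as `write_cstr`), not the actual implementation.

```rust
use bytes::{Buf, BufMut, Bytes, BytesMut};

// Stand-in for the codebase's `write_cstr` helper: a null-terminated key name.
fn write_key(key: &str, buf: &mut BytesMut) {
    buf.put_slice(key.as_bytes());
    buf.put_u8(0);
}

// One u64 field in the ReplicationFeedback layout: key (cstr), value length
// as i32, then the value bytes themselves.
fn put_u64_field(key: &str, value: u64, buf: &mut BytesMut) {
    write_key(key, buf);
    buf.put_i32(8);
    buf.put_u64(value);
}

fn main() {
    let mut buf = BytesMut::new();
    buf.put_u8(1); // number of fields
    put_u64_field("current_timeline_size", 12_345_678, &mut buf);

    // A reader that does not recognize a key can still skip its value,
    // because the length always precedes it. That is why new fields may be
    // added (incrementing REPLICATION_FEEDBACK_FIELDS_NUMBER) but existing
    // ones must never be removed.
    let mut rd: Bytes = buf.freeze();
    let nfields = rd.get_u8();
    for _ in 0..nfields {
        let key: Vec<u8> = rd.iter().take_while(|b| **b != 0).copied().collect();
        rd.advance(key.len() + 1); // skip the key and its null terminator
        let len = rd.get_i32();
        println!("field {}: {} bytes", String::from_utf8_lossy(&key), len);
        rd.advance(len as usize);
    }
}
```

Length-prefixed values are what lets `parse` skip unknown keys with a warning, as exercised by the unknown-key test above.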

View File

@@ -71,7 +71,7 @@ use tokio::{
use tracing::*;
use url::Url;
use utils::lsn::Lsn;
use utils::pq_proto::ZenithFeedback;
use utils::pq_proto::ReplicationFeedback;
use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId};
use self::connection_handler::{WalConnectionEvent, WalReceiverConnection};
@@ -521,7 +521,7 @@ struct WalConnectionData {
safekeeper_id: NodeId,
connection: WalReceiverConnection,
connection_init_time: NaiveDateTime,
last_wal_receiver_data: Option<(ZenithFeedback, NaiveDateTime)>,
last_wal_receiver_data: Option<(ReplicationFeedback, NaiveDateTime)>,
}
#[derive(Debug, PartialEq, Eq)]
@@ -846,7 +846,7 @@ mod tests {
.await;
let now = Utc::now().naive_utc();
dummy_connection_data.last_wal_receiver_data = Some((
ZenithFeedback {
ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: 1,
ps_applylsn: current_lsn,
@@ -1017,7 +1017,7 @@ mod tests {
let time_over_threshold =
Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
dummy_connection_data.last_wal_receiver_data = Some((
ZenithFeedback {
ReplicationFeedback {
current_timeline_size: 1,
ps_writelsn: current_lsn.0,
ps_applylsn: 1,
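The test above builds a feedback timestamp that is far older than `lagging_wal_timeout`. A minimal sketch of the kind of staleness check this exercises, assuming `chrono` types as in the code above; `connection_is_lagging` is a hypothetical helper, not the actual connection-manager logic.

```rust
use chrono::{Duration, NaiveDateTime, Utc};

// Hypothetical helper: true if the last ReplicationFeedback from this
// safekeeper connection is older than the allowed lag window.
fn connection_is_lagging(last_feedback_time: NaiveDateTime, lagging_wal_timeout: Duration) -> bool {
    let now = Utc::now().naive_utc();
    now.signed_duration_since(last_feedback_time) > lagging_wal_timeout
}

fn main() {
    let lagging_wal_timeout = Duration::seconds(10);
    // Mimic the test above: a timestamp well over the threshold.
    let time_over_threshold =
        Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
    assert!(connection_is_lagging(time_over_threshold, lagging_wal_timeout));
}
```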

View File

@@ -19,7 +19,7 @@ use tokio_stream::StreamExt;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantTimelineId},
};
@@ -33,7 +33,7 @@ use crate::{
#[derive(Debug, Clone)]
pub enum WalConnectionEvent {
Started,
NewWal(ZenithFeedback),
NewWal(ReplicationFeedback),
End(Result<(), String>),
}
@@ -328,7 +328,7 @@ async fn handle_walreceiver_connection(
// Send zenith feedback message.
// Regular standby_status_update fields are put into this message.
let zenith_status_update = ZenithFeedback {
let zenith_status_update = ReplicationFeedback {
current_timeline_size: timeline.get_current_logical_size() as u64,
ps_writelsn: write_lsn,
ps_flushlsn: flush_lsn,
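A consumer of `WalConnectionEvent` typically matches on the three variants shown above. Here is a minimal, self-contained sketch with simplified local copies of the types (the real `ReplicationFeedback` has more fields); the handler itself is illustrative, not the actual pageserver code.

```rust
// Simplified local copies of the types shown above, just enough to compile;
// the real ReplicationFeedback carries more fields.
#[derive(Debug, Clone, Copy)]
struct ReplicationFeedback {
    current_timeline_size: u64,
    ps_writelsn: u64,
}

#[derive(Debug)]
enum WalConnectionEvent {
    Started,
    NewWal(ReplicationFeedback),
    End(Result<(), String>),
}

// Illustrative consumer: remember the latest pageserver feedback and log
// connection state changes.
fn handle_event(event: WalConnectionEvent, last_feedback: &mut Option<ReplicationFeedback>) {
    match event {
        WalConnectionEvent::Started => println!("walreceiver connection started"),
        WalConnectionEvent::NewWal(feedback) => {
            // Latest timeline size and LSNs reported by the pageserver.
            *last_feedback = Some(feedback);
        }
        WalConnectionEvent::End(result) => match result {
            Ok(()) => println!("walreceiver connection closed cleanly"),
            Err(e) => eprintln!("walreceiver connection failed: {e}"),
        },
    }
}

fn main() {
    let mut last = None;
    handle_event(WalConnectionEvent::Started, &mut last);
    handle_event(
        WalConnectionEvent::NewWal(ReplicationFeedback {
            current_timeline_size: 1,
            ps_writelsn: 1,
        }),
        &mut last,
    );
    handle_event(WalConnectionEvent::End(Ok(())), &mut last);
    let fb = last.unwrap();
    assert_eq!((fb.current_timeline_size, fb.ps_writelsn), (1, 1));
}
```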

View File

@@ -242,9 +242,9 @@ impl Collector for TimelineCollector {
let timeline_id = tli.zttid.timeline_id.to_string();
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
let mut most_advanced: Option<utils::pq_proto::ZenithFeedback> = None;
let mut most_advanced: Option<utils::pq_proto::ReplicationFeedback> = None;
for replica in tli.replicas.iter() {
if let Some(replica_feedback) = replica.zenith_feedback {
if let Some(replica_feedback) = replica.pageserver_feedback {
if let Some(current) = most_advanced {
if current.ps_writelsn < replica_feedback.ps_writelsn {
most_advanced = Some(replica_feedback);
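The loop above keeps the feedback with the largest `ps_writelsn`. For illustration only, the same selection written with iterator combinators over simplified stand-in types (the real structs live in `utils::pq_proto` and the safekeeper):

```rust
// Simplified stand-ins for the types used above.
#[derive(Debug, Clone, Copy)]
struct ReplicationFeedback {
    ps_writelsn: u64,
}

struct Replica {
    pageserver_feedback: Option<ReplicationFeedback>,
}

// Same idea as the loop above: pick a feedback with the largest ps_writelsn.
fn most_advanced_feedback(replicas: &[Replica]) -> Option<ReplicationFeedback> {
    replicas
        .iter()
        .filter_map(|r| r.pageserver_feedback)
        .max_by_key(|fb| fb.ps_writelsn)
}

fn main() {
    let replicas = vec![
        Replica { pageserver_feedback: Some(ReplicationFeedback { ps_writelsn: 10 }) },
        Replica { pageserver_feedback: None },
        Replica { pageserver_feedback: Some(ReplicationFeedback { ps_writelsn: 42 }) },
    ];
    assert_eq!(most_advanced_feedback(&replicas).unwrap().ps_writelsn, 42);
}
```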

View File

@@ -23,7 +23,7 @@ use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
use utils::{
bin_ser::LeSer,
lsn::Lsn,
pq_proto::{SystemId, ZenithFeedback},
pq_proto::{ReplicationFeedback, SystemId},
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
@@ -348,7 +348,7 @@ pub struct AppendResponse {
// a criterion for walproposer --sync mode exit
pub commit_lsn: Lsn,
pub hs_feedback: HotStandbyFeedback,
pub zenith_feedback: ZenithFeedback,
pub pageserver_feedback: ReplicationFeedback,
}
impl AppendResponse {
@@ -358,7 +358,7 @@ impl AppendResponse {
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
hs_feedback: HotStandbyFeedback::empty(),
zenith_feedback: ZenithFeedback::empty(),
pageserver_feedback: ReplicationFeedback::empty(),
}
}
}
@@ -476,7 +476,7 @@ impl AcceptorProposerMessage {
buf.put_u64_le(msg.hs_feedback.xmin);
buf.put_u64_le(msg.hs_feedback.catalog_xmin);
msg.zenith_feedback.serialize(buf)?
msg.pageserver_feedback.serialize(buf)?
}
}
@@ -677,7 +677,7 @@ where
commit_lsn: self.state.commit_lsn,
// will be filled by the upper code to avoid bothering safekeeper
hs_feedback: HotStandbyFeedback::empty(),
zenith_feedback: ZenithFeedback::empty(),
pageserver_feedback: ReplicationFeedback::empty(),
};
trace!("formed AppendResponse {:?}", ar);
ar
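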

View File

@@ -21,7 +21,7 @@ use utils::{
bin_ser::BeSer,
lsn::Lsn,
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback},
pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody},
sock_split::ReadStream,
};
@@ -29,7 +29,7 @@ use utils::{
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
// zenith extension of replication protocol
const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
type FullTransactionId = u64;
@@ -122,15 +122,15 @@ impl ReplicationConn {
warn!("unexpected StandbyReply. Read-only postgres replicas are not supported in safekeepers yet.");
// timeline.update_replica_state(replica_id, Some(state));
}
Some(ZENITH_STATUS_UPDATE_TAG_BYTE) => {
Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
// Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
let buf = Bytes::copy_from_slice(&m[9..]);
let reply = ZenithFeedback::parse(buf);
let reply = ReplicationFeedback::parse(buf);
trace!("ZenithFeedback is {:?}", reply);
// Only pageserver sends ZenithFeedback, so set the flag.
trace!("ReplicationFeedback is {:?}", reply);
// Only pageserver sends ReplicationFeedback, so set the flag.
// This replica is the source of information to resend to compute.
state.zenith_feedback = Some(reply);
state.pageserver_feedback = Some(reply);
timeline.update_replica_state(replica_id, state);
}

View File

@@ -21,7 +21,7 @@ use tracing::*;
use utils::{
lsn::Lsn,
pq_proto::ZenithFeedback,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantId, ZTenantTimelineId},
};
@@ -48,8 +48,8 @@ pub struct ReplicaState {
pub remote_consistent_lsn: Lsn,
/// combined hot standby feedback from all replicas
pub hs_feedback: HotStandbyFeedback,
/// Zenith specific feedback received from pageserver, if any
pub zenith_feedback: Option<ZenithFeedback>,
/// Replication specific feedback received from pageserver, if any
pub pageserver_feedback: Option<ReplicationFeedback>,
}
impl Default for ReplicaState {
@@ -68,7 +68,7 @@ impl ReplicaState {
xmin: u64::MAX,
catalog_xmin: u64::MAX,
},
zenith_feedback: None,
pageserver_feedback: None,
}
}
}
@@ -221,25 +221,25 @@ impl SharedState {
// we need to know which pageserver the compute node considers to be the main one.
// See https://github.com/zenithdb/zenith/issues/1171
//
if let Some(zenith_feedback) = state.zenith_feedback {
if let Some(acc_feedback) = acc.zenith_feedback {
if acc_feedback.ps_writelsn < zenith_feedback.ps_writelsn {
if let Some(pageserver_feedback) = state.pageserver_feedback {
if let Some(acc_feedback) = acc.pageserver_feedback {
if acc_feedback.ps_writelsn < pageserver_feedback.ps_writelsn {
warn!("More than one pageserver is streaming WAL for the timeline. Feedback resolving is not fully supported yet.");
acc.zenith_feedback = Some(zenith_feedback);
acc.pageserver_feedback = Some(pageserver_feedback);
}
} else {
acc.zenith_feedback = Some(zenith_feedback);
acc.pageserver_feedback = Some(pageserver_feedback);
}
// last lsn received by pageserver
// FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver.
// See https://github.com/zenithdb/zenith/issues/1171
acc.last_received_lsn = Lsn::from(zenith_feedback.ps_writelsn);
acc.last_received_lsn = Lsn::from(pageserver_feedback.ps_writelsn);
// When at least one pageserver has preserved data up to remote_consistent_lsn,
// safekeeper is free to delete it, so choose max of all pageservers.
acc.remote_consistent_lsn = max(
Lsn::from(zenith_feedback.ps_applylsn),
Lsn::from(pageserver_feedback.ps_applylsn),
acc.remote_consistent_lsn,
);
}
@@ -457,8 +457,8 @@ impl Timeline {
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
let state = shared_state.get_replicas_state();
resp.hs_feedback = state.hs_feedback;
if let Some(zenith_feedback) = state.zenith_feedback {
resp.zenith_feedback = zenith_feedback;
if let Some(pageserver_feedback) = state.pageserver_feedback {
resp.pageserver_feedback = pageserver_feedback;
}
}
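To summarize the aggregation above: the most advanced pageserver (largest `ps_writelsn`) wins the feedback slot, `last_received_lsn` is taken from each processed feedback (so with several pageservers the last one wins, per the FIXME), and `remote_consistent_lsn` only moves forward as the max of all `ps_applylsn` values. Below is a condensed, self-contained sketch over simplified types, not the actual safekeeper code.

```rust
// Simplified stand-ins for the safekeeper types used above.
#[derive(Debug, Clone, Copy)]
struct ReplicationFeedback {
    ps_writelsn: u64,
    ps_applylsn: u64,
}

#[derive(Debug, Default)]
struct ReplicaAggregate {
    pageserver_feedback: Option<ReplicationFeedback>,
    last_received_lsn: u64,
    remote_consistent_lsn: u64,
}

fn aggregate(feedbacks: impl IntoIterator<Item = ReplicationFeedback>) -> ReplicaAggregate {
    let mut acc = ReplicaAggregate::default();
    for fb in feedbacks {
        // Keep the most advanced pageserver feedback (largest ps_writelsn);
        // the real code also warns when more than one pageserver streams WAL.
        match acc.pageserver_feedback {
            Some(current) if current.ps_writelsn >= fb.ps_writelsn => {}
            _ => acc.pageserver_feedback = Some(fb),
        }
        // Last LSN received by a pageserver; with several pageservers the
        // last one processed wins (see the FIXME above).
        acc.last_received_lsn = fb.ps_writelsn;
        // Once at least one pageserver has preserved data up to this point,
        // the safekeeper may delete older WAL, so take the max.
        acc.remote_consistent_lsn = acc.remote_consistent_lsn.max(fb.ps_applylsn);
    }
    acc
}

fn main() {
    let agg = aggregate([
        ReplicationFeedback { ps_writelsn: 100, ps_applylsn: 40 },
        ReplicationFeedback { ps_writelsn: 90, ps_applylsn: 60 },
    ]);
    assert_eq!(agg.pageserver_feedback.unwrap().ps_writelsn, 100);
    assert_eq!(agg.last_received_lsn, 90);
    assert_eq!(agg.remote_consistent_lsn, 60);
}
```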