diff --git a/docs/rfcs/cluster-size-limits.md b/docs/rfcs/cluster-size-limits.md
index bd12fb6eee..bd4cb9ef32 100644
--- a/docs/rfcs/cluster-size-limits.md
+++ b/docs/rfcs/cluster-size-limits.md
@@ -36,12 +36,12 @@ This is how the `LOGICAL_TIMELINE_SIZE` metric is implemented in the pageserver.
 
 Alternatively, we could count only relation data. As in pg_database_size(). This approach is somewhat more user-friendly because it is the data that is really affected by the user. On the other hand, it puts us in a weaker position than other services, i.e., RDS.
 
-We will need to refactor the timeline_size counter or add another counter to implement it. 
+We will need to refactor the timeline_size counter or add another counter to implement it.
 
 Timeline size is updated during wal digestion. It is not versioned and is valid at the last_received_lsn moment.
 
 Then this size should be reported to compute node.
 
-`current_timeline_size` value is included in the walreceiver's custom feedback message: `ZenithFeedback.`
+`current_timeline_size` value is included in the walreceiver's custom feedback message: `ReplicationFeedback`.
 
 (PR about protocol changes https://github.com/zenithdb/zenith/pull/1037).
 
@@ -64,11 +64,11 @@ We should warn users if the limit is soon to be reached.
 
 ### **Reliability, failure modes and corner cases**
 
 1. `current_timeline_size` is valid at the last received and digested by pageserver lsn.
-   
+
    If pageserver lags behind compute node, `current_timeline_size` will lag too. This lag can be tuned using backpressure, but it is not expected to be 0 all the time.
-   
+
    So transactions that happen in this lsn range may cause limit overflow. Especially operations that generate (i.e., CREATE DATABASE) or free (i.e., TRUNCATE) a lot of data pages while generating a small amount of WAL. Are there other operations like this?
-   
+
    Currently, CREATE DATABASE operations are restricted in the console. So this is not an issue.
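For context on how the reported size is meant to be used: once `current_timeline_size` travels back through the safekeeper to compute, the compute side can refuse operations that would grow the timeline past the limit. A minimal sketch of that check, with `max_cluster_size` as a purely hypothetical setting name; the actual enforcement mechanism is outside this patch:

```rust
// Hypothetical compute-side check, not part of this patch.
// `max_cluster_size` is an illustrative name for a configured limit.
fn check_cluster_size(current_timeline_size: u64, max_cluster_size: u64) -> Result<(), String> {
    if current_timeline_size > max_cluster_size {
        // Reject the operation that needs new pages, which is the
        // "limit overflow" failure mode the RFC describes.
        return Err(format!(
            "cluster size limit exceeded: {} > {} bytes",
            current_timeline_size, max_cluster_size
        ));
    }
    Ok(())
}
```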
diff --git a/libs/utils/src/pq_proto.rs b/libs/utils/src/pq_proto.rs
index 599af3fc68..0a320f123c 100644
--- a/libs/utils/src/pq_proto.rs
+++ b/libs/utils/src/pq_proto.rs
@@ -926,10 +926,10 @@ impl<'a> BeMessage<'a> {
     }
 }
 
-// Zenith extension of postgres replication protocol
-// See ZENITH_STATUS_UPDATE_TAG_BYTE
+// Neon extension of postgres replication protocol
+// See NEON_STATUS_UPDATE_TAG_BYTE
 #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
-pub struct ZenithFeedback {
+pub struct ReplicationFeedback {
     // Last known size of the timeline. Used to enforce timeline size limit.
     pub current_timeline_size: u64,
     // Parts of StandbyStatusUpdate we resend to compute via safekeeper
@@ -939,13 +939,13 @@ pub struct ZenithFeedback {
     pub ps_replytime: SystemTime,
 }
 
-// NOTE: Do not forget to increment this number when adding new fields to ZenithFeedback.
+// NOTE: Do not forget to increment this number when adding new fields to ReplicationFeedback.
 // Do not remove previously available fields because this might be backwards incompatible.
-pub const ZENITH_FEEDBACK_FIELDS_NUMBER: u8 = 5;
+pub const REPLICATION_FEEDBACK_FIELDS_NUMBER: u8 = 5;
 
-impl ZenithFeedback {
-    pub fn empty() -> ZenithFeedback {
-        ZenithFeedback {
+impl ReplicationFeedback {
+    pub fn empty() -> ReplicationFeedback {
+        ReplicationFeedback {
             current_timeline_size: 0,
             ps_writelsn: 0,
             ps_applylsn: 0,
@@ -954,7 +954,7 @@ impl ZenithFeedback {
         }
     }
 
-    // Serialize ZenithFeedback using custom format
+    // Serialize ReplicationFeedback using custom format
     // to support protocol extensibility.
     //
     // Following layout is used:
@@ -965,7 +965,7 @@ impl ZenithFeedback {
     //  uint32 - value length in bytes
     //  value itself
     pub fn serialize(&self, buf: &mut BytesMut) -> Result<()> {
-        buf.put_u8(ZENITH_FEEDBACK_FIELDS_NUMBER); // # of keys
+        buf.put_u8(REPLICATION_FEEDBACK_FIELDS_NUMBER); // # of keys
         write_cstr(&Bytes::from("current_timeline_size"), buf)?;
         buf.put_i32(8);
         buf.put_u64(self.current_timeline_size);
@@ -992,9 +992,9 @@ impl ZenithFeedback {
         Ok(())
     }
 
-    // Deserialize ZenithFeedback message
-    pub fn parse(mut buf: Bytes) -> ZenithFeedback {
-        let mut zf = ZenithFeedback::empty();
+    // Deserialize ReplicationFeedback message
+    pub fn parse(mut buf: Bytes) -> ReplicationFeedback {
+        let mut zf = ReplicationFeedback::empty();
         let nfields = buf.get_u8();
         let mut i = 0;
         while i < nfields {
@@ -1035,14 +1035,14 @@ impl ZenithFeedback {
                 _ => {
                     let len = buf.get_i32();
                     warn!(
-                        "ZenithFeedback parse. unknown key {} of len {}. Skip it.",
+                        "ReplicationFeedback parse. unknown key {} of len {}. Skip it.",
                         key, len
                     );
                     buf.advance(len as usize);
                 }
             }
         }
-        trace!("ZenithFeedback parsed is {:?}", zf);
+        trace!("ReplicationFeedback parsed is {:?}", zf);
         zf
     }
 }
@@ -1052,8 +1052,8 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_zenithfeedback_serialization() {
-        let mut zf = ZenithFeedback::empty();
+    fn test_replication_feedback_serialization() {
+        let mut zf = ReplicationFeedback::empty();
         // Fill zf with some values
         zf.current_timeline_size = 12345678;
         // Set rounded time to be able to compare it with deserialized value,
@@ -1062,13 +1062,13 @@ mod tests {
         let mut data = BytesMut::new();
         zf.serialize(&mut data).unwrap();
 
-        let zf_parsed = ZenithFeedback::parse(data.freeze());
+        let zf_parsed = ReplicationFeedback::parse(data.freeze());
         assert_eq!(zf, zf_parsed);
     }
 
     #[test]
-    fn test_zenithfeedback_unknown_key() {
-        let mut zf = ZenithFeedback::empty();
+    fn test_replication_feedback_unknown_key() {
+        let mut zf = ReplicationFeedback::empty();
         // Fill zf with some values
         zf.current_timeline_size = 12345678;
         // Set rounded time to be able to compare it with deserialized value,
@@ -1079,7 +1079,7 @@ mod tests {
 
         // Add an extra field to the buffer and adjust number of keys
         if let Some(first) = data.first_mut() {
-            *first = ZENITH_FEEDBACK_FIELDS_NUMBER + 1;
+            *first = REPLICATION_FEEDBACK_FIELDS_NUMBER + 1;
         }
 
         write_cstr(&Bytes::from("new_field_one"), &mut data).unwrap();
@@ -1087,7 +1087,7 @@ mod tests {
         data.put_i32(8);
         data.put_u64(42);
 
         // Parse serialized data and check that new field is not parsed
-        let zf_parsed = ZenithFeedback::parse(data.freeze());
+        let zf_parsed = ReplicationFeedback::parse(data.freeze());
         assert_eq!(zf, zf_parsed);
     }
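The key-value layout above exists so the message can grow without breaking older peers: a reader walks `nfields` entries and skips any key it does not recognize. A usage sketch mirroring the unit tests, assuming the `utils::pq_proto` paths introduced by this patch:

```rust
use bytes::BytesMut;
use std::time::SystemTime;
use utils::pq_proto::ReplicationFeedback;

// Round-trip a feedback message, as the serialization test above does.
fn feedback_roundtrip() {
    let mut feedback = ReplicationFeedback::empty();
    feedback.current_timeline_size = 42 * 1024 * 1024;
    feedback.ps_writelsn = 0x0169_9D78;
    feedback.ps_replytime = SystemTime::now();

    let mut buf = BytesMut::new();
    feedback.serialize(&mut buf).unwrap();

    // A reader built against an older field list would simply skip unknown
    // keys, which is what the length-prefixed key-value layout buys us.
    let parsed = ReplicationFeedback::parse(buf.freeze());
    assert_eq!(parsed.current_timeline_size, feedback.current_timeline_size);
}
```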
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 32bd88cf7c..82401e1d8c 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -71,7 +71,7 @@ use tokio::{
 use tracing::*;
 use url::Url;
 use utils::lsn::Lsn;
-use utils::pq_proto::ZenithFeedback;
+use utils::pq_proto::ReplicationFeedback;
 use utils::zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId};
 
 use self::connection_handler::{WalConnectionEvent, WalReceiverConnection};
@@ -521,7 +521,7 @@ struct WalConnectionData {
     safekeeper_id: NodeId,
     connection: WalReceiverConnection,
     connection_init_time: NaiveDateTime,
-    last_wal_receiver_data: Option<(ZenithFeedback, NaiveDateTime)>,
+    last_wal_receiver_data: Option<(ReplicationFeedback, NaiveDateTime)>,
 }
 
 #[derive(Debug, PartialEq, Eq)]
@@ -846,7 +846,7 @@ mod tests {
             .await;
         let now = Utc::now().naive_utc();
         dummy_connection_data.last_wal_receiver_data = Some((
-            ZenithFeedback {
+            ReplicationFeedback {
                 current_timeline_size: 1,
                 ps_writelsn: 1,
                 ps_applylsn: current_lsn,
@@ -1017,7 +1017,7 @@ mod tests {
         let time_over_threshold =
            Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
         dummy_connection_data.last_wal_receiver_data = Some((
-            ZenithFeedback {
+            ReplicationFeedback {
                 current_timeline_size: 1,
                 ps_writelsn: current_lsn.0,
                 ps_applylsn: 1,
diff --git a/pageserver/src/walreceiver/connection_handler.rs b/pageserver/src/walreceiver/connection_handler.rs
index aaccee9730..97b9b8cc9b 100644
--- a/pageserver/src/walreceiver/connection_handler.rs
+++ b/pageserver/src/walreceiver/connection_handler.rs
@@ -19,7 +19,7 @@ use tokio_stream::StreamExt;
 use tracing::{debug, error, info, info_span, trace, warn, Instrument};
 use utils::{
     lsn::Lsn,
-    pq_proto::ZenithFeedback,
+    pq_proto::ReplicationFeedback,
     zid::{NodeId, ZTenantTimelineId},
 };
 
@@ -33,7 +33,7 @@ use crate::{
 #[derive(Debug, Clone)]
 pub enum WalConnectionEvent {
     Started,
-    NewWal(ZenithFeedback),
+    NewWal(ReplicationFeedback),
     End(Result<(), String>),
 }
 
@@ -328,7 +328,7 @@ async fn handle_walreceiver_connection(
 
         // Send zenith feedback message.
         // Regular standby_status_update fields are put into this message.
-        let zenith_status_update = ZenithFeedback {
+        let zenith_status_update = ReplicationFeedback {
             current_timeline_size: timeline.get_current_logical_size() as u64,
             ps_writelsn: write_lsn,
             ps_flushlsn: flush_lsn,
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
index 5a2e5f125f..fe4f9d231c 100644
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -242,9 +242,9 @@ impl Collector for TimelineCollector {
         let timeline_id = tli.zttid.timeline_id.to_string();
         let labels = &[tenant_id.as_str(), timeline_id.as_str()];
 
-        let mut most_advanced: Option<ZenithFeedback> = None;
+        let mut most_advanced: Option<ReplicationFeedback> = None;
         for replica in tli.replicas.iter() {
-            if let Some(replica_feedback) = replica.zenith_feedback {
+            if let Some(replica_feedback) = replica.pageserver_feedback {
                 if let Some(current) = most_advanced {
                     if current.ps_writelsn < replica_feedback.ps_writelsn {
                         most_advanced = Some(replica_feedback);
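The loop in `TimelineCollector` above picks the feedback with the largest `ps_writelsn`. The same rule folded into iterator form, as an illustrative sketch rather than a drop-in replacement:

```rust
use utils::pq_proto::ReplicationFeedback;

// "Most advanced" simply means the feedback with the largest ps_writelsn,
// ignoring replicas that have not reported any pageserver feedback yet.
fn most_advanced_feedback<I>(replica_feedbacks: I) -> Option<ReplicationFeedback>
where
    I: Iterator<Item = Option<ReplicationFeedback>>,
{
    replica_feedbacks
        .flatten()
        .max_by_key(|feedback| feedback.ps_writelsn)
}
```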
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index eb6316dec2..7986fa5834 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -23,7 +23,7 @@ use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
 use utils::{
     bin_ser::LeSer,
     lsn::Lsn,
-    pq_proto::{SystemId, ZenithFeedback},
+    pq_proto::{ReplicationFeedback, SystemId},
     zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
 };
 
@@ -348,7 +348,7 @@ pub struct AppendResponse {
     // a criterion for walproposer --sync mode exit
     pub commit_lsn: Lsn,
     pub hs_feedback: HotStandbyFeedback,
-    pub zenith_feedback: ZenithFeedback,
+    pub pageserver_feedback: ReplicationFeedback,
 }
 
 impl AppendResponse {
@@ -358,7 +358,7 @@ impl AppendResponse {
             flush_lsn: Lsn(0),
             commit_lsn: Lsn(0),
             hs_feedback: HotStandbyFeedback::empty(),
-            zenith_feedback: ZenithFeedback::empty(),
+            pageserver_feedback: ReplicationFeedback::empty(),
         }
     }
 }
@@ -476,7 +476,7 @@ impl AcceptorProposerMessage {
                 buf.put_u64_le(msg.hs_feedback.xmin);
                 buf.put_u64_le(msg.hs_feedback.catalog_xmin);
 
-                msg.zenith_feedback.serialize(buf)?
+                msg.pageserver_feedback.serialize(buf)?
             }
         }
@@ -677,7 +677,7 @@ where
             commit_lsn: self.state.commit_lsn,
             // will be filled by the upper code to avoid bothering safekeeper
             hs_feedback: HotStandbyFeedback::empty(),
-            zenith_feedback: ZenithFeedback::empty(),
+            pageserver_feedback: ReplicationFeedback::empty(),
         };
         trace!("formed AppendResponse {:?}", ar);
         ar
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index fd82a55efa..11e5b963c9 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -21,7 +21,7 @@ use utils::{
     bin_ser::BeSer,
     lsn::Lsn,
     postgres_backend::PostgresBackend,
-    pq_proto::{BeMessage, FeMessage, WalSndKeepAlive, XLogDataBody, ZenithFeedback},
+    pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody},
     sock_split::ReadStream,
 };
 
@@ -29,7 +29,7 @@ use utils::{
 const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
 const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
 // zenith extension of replication protocol
-const ZENITH_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
+const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
 
 type FullTransactionId = u64;
 
@@ -122,15 +122,15 @@ impl ReplicationConn {
                     warn!("unexpected StandbyReply. Read-only postgres replicas are not supported in safekeepers yet.");
                     // timeline.update_replica_state(replica_id, Some(state));
                 }
-                Some(ZENITH_STATUS_UPDATE_TAG_BYTE) => {
+                Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
                     // Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
                     let buf = Bytes::copy_from_slice(&m[9..]);
-                    let reply = ZenithFeedback::parse(buf);
+                    let reply = ReplicationFeedback::parse(buf);
 
-                    trace!("ZenithFeedback is {:?}", reply);
-                    // Only pageserver sends ZenithFeedback, so set the flag.
+                    trace!("ReplicationFeedback is {:?}", reply);
+                    // Only pageserver sends ReplicationFeedback, so set the flag.
                     // This replica is the source of information to resend to compute.
-                    state.zenith_feedback = Some(reply);
+                    state.pageserver_feedback = Some(reply);
 
                     timeline.update_replica_state(replica_id, state);
                 }
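For reference, the `b'z'` message handled above is an ordinary copy-data payload: one tag byte, eight length bytes, then the serialized feedback, which is why the handler deserializes from `&m[9..]`. A standalone sketch of that framing check, assuming the `ReplicationFeedback` API from this patch:

```rust
use bytes::Bytes;
use utils::pq_proto::ReplicationFeedback;

const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';

// Accept only b'z'-tagged messages, skip the tag byte plus the 8 length
// bytes, and hand the remainder to ReplicationFeedback::parse, mirroring
// the `&m[9..]` slice in the handler above.
fn try_parse_status_update(m: &[u8]) -> Option<ReplicationFeedback> {
    if m.first() != Some(&NEON_STATUS_UPDATE_TAG_BYTE) || m.len() < 9 {
        return None;
    }
    Some(ReplicationFeedback::parse(Bytes::copy_from_slice(&m[9..])))
}
```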
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 42bb02c1ea..39f2593dbc 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -21,7 +21,7 @@ use tracing::*;
 
 use utils::{
     lsn::Lsn,
-    pq_proto::ZenithFeedback,
+    pq_proto::ReplicationFeedback,
     zid::{NodeId, ZTenantId, ZTenantTimelineId},
 };
 
@@ -48,8 +48,8 @@ pub struct ReplicaState {
     pub remote_consistent_lsn: Lsn,
     /// combined hot standby feedback from all replicas
     pub hs_feedback: HotStandbyFeedback,
-    /// Zenith specific feedback received from pageserver, if any
-    pub zenith_feedback: Option<ZenithFeedback>,
+    /// Replication specific feedback received from pageserver, if any
+    pub pageserver_feedback: Option<ReplicationFeedback>,
 }
 
 impl Default for ReplicaState {
@@ -68,7 +68,7 @@ impl ReplicaState {
                 xmin: u64::MAX,
                 catalog_xmin: u64::MAX,
             },
-            zenith_feedback: None,
+            pageserver_feedback: None,
         }
     }
 }
@@ -221,25 +221,25 @@ impl SharedState {
             // we need to know which pageserver compute node considers to be main.
             // See https://github.com/zenithdb/zenith/issues/1171
             //
-            if let Some(zenith_feedback) = state.zenith_feedback {
-                if let Some(acc_feedback) = acc.zenith_feedback {
-                    if acc_feedback.ps_writelsn < zenith_feedback.ps_writelsn {
+            if let Some(pageserver_feedback) = state.pageserver_feedback {
+                if let Some(acc_feedback) = acc.pageserver_feedback {
+                    if acc_feedback.ps_writelsn < pageserver_feedback.ps_writelsn {
                         warn!("More than one pageserver is streaming WAL for the timeline.
                         Feedback resolving is not fully supported yet.");
-                        acc.zenith_feedback = Some(zenith_feedback);
+                        acc.pageserver_feedback = Some(pageserver_feedback);
                     }
                 } else {
-                    acc.zenith_feedback = Some(zenith_feedback);
+                    acc.pageserver_feedback = Some(pageserver_feedback);
                 }
 
                 // last lsn received by pageserver
                 // FIXME if multiple pageservers are streaming WAL, last_received_lsn must be tracked per pageserver.
                 // See https://github.com/zenithdb/zenith/issues/1171
-                acc.last_received_lsn = Lsn::from(zenith_feedback.ps_writelsn);
+                acc.last_received_lsn = Lsn::from(pageserver_feedback.ps_writelsn);
 
                 // When at least one pageserver has preserved data up to remote_consistent_lsn,
                 // safekeeper is free to delete it, so choose max of all pageservers.
                 acc.remote_consistent_lsn = max(
-                    Lsn::from(zenith_feedback.ps_applylsn),
+                    Lsn::from(pageserver_feedback.ps_applylsn),
                     acc.remote_consistent_lsn,
                 );
             }
@@ -457,8 +457,8 @@ impl Timeline {
         if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
             let state = shared_state.get_replicas_state();
             resp.hs_feedback = state.hs_feedback;
-            if let Some(zenith_feedback) = state.zenith_feedback {
-                resp.zenith_feedback = zenith_feedback;
+            if let Some(pageserver_feedback) = state.pageserver_feedback {
+                resp.pageserver_feedback = pageserver_feedback;
             }
         }
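To summarize the aggregation rule implemented in `SharedState` above: the feedback with the highest `ps_writelsn` wins (with a warning when several pageservers stream the same timeline), and `remote_consistent_lsn` only moves forward, taking the maximum `ps_applylsn` across pageservers so the safekeeper never deletes WAL that no pageserver has persisted. A condensed sketch of that fold, not a drop-in replacement for the code above:

```rust
use std::cmp::max;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback};

// Fold one replica's pageserver feedback into the accumulated state:
// keep the most advanced feedback by ps_writelsn, and advance
// remote_consistent_lsn monotonically to the max ps_applylsn seen.
fn fold_pageserver_feedback(
    acc_feedback: &mut Option<ReplicationFeedback>,
    remote_consistent_lsn: &mut Lsn,
    feedback: ReplicationFeedback,
) {
    match acc_feedback {
        Some(current) if current.ps_writelsn >= feedback.ps_writelsn => {}
        _ => *acc_feedback = Some(feedback),
    }
    *remote_consistent_lsn = max(Lsn::from(feedback.ps_applylsn), *remote_consistent_lsn);
}
```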