mirror of https://github.com/neondatabase/neon.git, synced 2026-01-10 15:02:56 +00:00
fully rip it out, see which tests break
@@ -264,9 +264,6 @@ pub struct SkTimelineInfo {
     pub http_connstr: Option<String>,
     #[serde(default)]
     pub https_connstr: Option<String>,
-    // Minimum of all active RO replicas flush LSN
-    #[serde(default = "lsn_invalid")]
-    pub standby_horizon: Lsn,
 }

 #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -708,15 +708,6 @@ static TIMELINE_ARCHIVE_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
     .expect("failed to define a metric")
 });

-static STANDBY_HORIZON: Lazy<IntGaugeVec> = Lazy::new(|| {
-    register_int_gauge_vec!(
-        "pageserver_standby_horizon",
-        "Standby apply LSN for which GC is hold off, by timeline.",
-        &["tenant_id", "shard_id", "timeline_id"]
-    )
-    .expect("failed to define a metric")
-});
-
 static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
     register_uint_gauge_vec!(
         "pageserver_resident_physical_size",
@@ -3161,7 +3152,6 @@ pub(crate) struct TimelineMetrics {
     pub pitr_history_size: UIntGauge,
     pub archival_size: UIntGauge,
     pub layers_per_read: Histogram,
-    pub standby_horizon_gauge: IntGauge,
     pub resident_physical_size_gauge: UIntGauge,
     pub visible_physical_size_gauge: UIntGauge,
     /// copy of LayeredTimeline.current_logical_size
@@ -3263,9 +3253,6 @@ impl TimelineMetrics {
             .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
             .unwrap();

-        let standby_horizon_gauge = STANDBY_HORIZON
-            .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
-            .unwrap();
         let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
             .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
             .unwrap();
@@ -3349,7 +3336,6 @@ impl TimelineMetrics {
             pitr_history_size,
             archival_size,
             layers_per_read,
-            standby_horizon_gauge,
             resident_physical_size_gauge,
             visible_physical_size_gauge,
             current_logical_size_gauge,
@@ -3492,7 +3478,6 @@ impl TimelineMetrics {
         let shard_id = &self.shard_id;
         let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
         let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
-        let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
         {
             RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
             let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
@@ -250,8 +250,6 @@ pub struct Timeline {
     // Atomic would be more appropriate here.
     last_freeze_ts: RwLock<Instant>,

-    pub(crate) standby_horizon: AtomicLsn,
-
     // WAL redo manager. `None` only for broken tenants.
     walredo_mgr: Option<Arc<super::WalRedoManager>>,

@@ -3083,8 +3081,6 @@ impl Timeline {
             l0_compaction_trigger: resources.l0_compaction_trigger,
             gc_lock: tokio::sync::Mutex::default(),

-            standby_horizon: AtomicLsn::new(0),
-
             pagestream_throttle: resources.pagestream_throttle,

             aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
@@ -6469,32 +6465,7 @@ impl Timeline {
             )
         };

-        let mut new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
-        let standby_horizon = self.standby_horizon.load();
-        // Hold GC for the standby, but as a safety guard do it only within some
-        // reasonable lag.
-        if standby_horizon != Lsn::INVALID {
-            if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {
-                const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
-                if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
-                    new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
-                    trace!("holding off GC for standby apply LSN {}", standby_horizon);
-                } else {
-                    warn!(
-                        "standby is lagging for more than {}MB, not holding gc for it",
-                        MAX_ALLOWED_STANDBY_LAG / 1024 / 1024
-                    )
-                }
-            }
-        }
-
-        // Reset standby horizon to ignore it if it is not updated till next GC.
-        // It is an easy way to unset it when standby disappears without adding
-        // more conf options.
-        self.standby_horizon.store(Lsn::INVALID);
-        self.metrics
-            .standby_horizon_gauge
-            .set(Lsn::INVALID.0 as i64);
+        let new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());

         let res = self
             .gc_timeline(
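
For reference, the hold-off rule deleted in the hunk above can be isolated as the following sketch. It is a minimal reconstruction, not the pageserver's real API: the Lsn newtype stands in for utils::lsn::Lsn, and apply_standby_horizon is a hypothetical helper. Note that 10u64 << 30 is 10 GiB (10,737,418,240 bytes), which the warn message above reports as MAX_ALLOWED_STANDBY_LAG / 1024 / 1024 = 10240 MB.

// Stand-in for utils::lsn::Lsn: a byte position in the WAL stream.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Lsn(u64);

impl Lsn {
    const INVALID: Lsn = Lsn(0);
}

// Reconstruction of the deleted rule: pull the GC cutoff back to the
// standby's apply LSN, but only while the standby lags less than 10 GiB.
fn apply_standby_horizon(new_gc_cutoff: Lsn, standby_horizon: Lsn) -> Lsn {
    const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GiB
    if standby_horizon == Lsn::INVALID {
        return new_gc_cutoff; // no replica reported since the last GC run
    }
    match new_gc_cutoff.0.checked_sub(standby_horizon.0) {
        // Tolerable lag: hold GC at the standby's apply LSN.
        Some(lag) if lag < MAX_ALLOWED_STANDBY_LAG => standby_horizon,
        // Lagging 10 GiB or more: do not hold GC for this standby.
        Some(_) => new_gc_cutoff,
        // Standby is ahead of the cutoff: nothing to hold back.
        None => new_gc_cutoff,
    }
}

fn main() {
    let cutoff = Lsn(20 << 30);
    assert_eq!(apply_standby_horizon(cutoff, Lsn(15 << 30)), Lsn(15 << 30)); // 5 GiB lag: held
    assert_eq!(apply_standby_horizon(cutoff, Lsn(5 << 30)), cutoff); // 15 GiB lag: ignored
    assert_eq!(apply_standby_horizon(cutoff, Lsn::INVALID), cutoff); // no replica
}
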
@@ -2935,7 +2935,7 @@ impl Timeline {
     }

     /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
-    /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
+    /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff).
     /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
     /// here.
     pub(crate) fn get_gc_compaction_watermark(self: &Arc<Self>) -> Lsn {
@@ -2944,9 +2944,6 @@ impl Timeline {
             gc_info.min_cutoff()
         };

-        // TODO: standby horizon should use leases so we don't really need to consider it here.
-        // let watermark = watermark.min(self.standby_horizon.load());
-
         // TODO: ensure the child branches will not use anything below the watermark, or consider
         // them when computing the watermark.
         gc_cutoff_lsn.min(*self.get_applied_gc_cutoff_lsn())
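
With the standby term gone, the watermark described in the doc comment above is just the minimum of the remaining cutoffs. A minimal sketch, reusing the Lsn stand-in from the previous example; the three parameters are hypothetical inputs standing in for gc_info.min_cutoff() and the applied GC cutoff:

// After this commit: min(space_cutoff, time_cutoff, latest_gc_cutoff),
// with leases and retain_lsns handled inside the gc-compaction job itself.
fn gc_compaction_watermark(space_cutoff: Lsn, time_cutoff: Lsn, applied_gc_cutoff: Lsn) -> Lsn {
    let gc_cutoff = space_cutoff.min(time_cutoff); // roughly gc_info.min_cutoff()
    gc_cutoff.min(applied_gc_cutoff)
}
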
@@ -710,7 +710,6 @@ impl ConnectionManagerState {
                     commit_lsn: info.commit_lsn,
                     safekeeper_connstr: info.safekeeper_connstr,
                     availability_zone: info.availability_zone,
-                    standby_horizon: info.standby_horizon,
                 }
             }
             MessageType::SafekeeperDiscoveryResponse => {
@@ -731,21 +730,6 @@ impl ConnectionManagerState {

         WALRECEIVER_BROKER_UPDATES.inc();

-        trace!(
-            "safekeeper info update: standby_horizon(cutoff)={}",
-            timeline_update.standby_horizon
-        );
-        if timeline_update.standby_horizon != 0 {
-            // ignore reports from safekeepers not connected to replicas
-            self.timeline
-                .standby_horizon
-                .store(Lsn(timeline_update.standby_horizon));
-            self.timeline
-                .metrics
-                .standby_horizon_gauge
-                .set(timeline_update.standby_horizon as i64);
-        }
-
         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
         let old_entry = self.wal_stream_candidates.insert(
             new_safekeeper_id,
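
The block deleted above encoded a convention worth noting: in broker updates, standby_horizon == 0 was a sentinel meaning "this safekeeper has no replica attached", and such reports were ignored rather than stored. A minimal sketch of that filter, with TimelineState as a hypothetical stand-in for the timeline handle the real code reaches through self.timeline:

// Hypothetical stand-in for the pageserver timeline state the deleted
// block updated (an AtomicLsn plus a metrics gauge in the real code).
struct TimelineState {
    standby_horizon: u64, // stored apply LSN; 0 means "none reported"
}

// Convention removed by this commit: a reported standby_horizon of 0 comes
// from a safekeeper with no connected replica and must not overwrite state.
fn on_safekeeper_info(state: &mut TimelineState, reported_standby_horizon: u64) {
    if reported_standby_horizon != 0 {
        state.standby_horizon = reported_standby_horizon;
    }
}
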
@@ -1129,7 +1113,6 @@ mod tests {
                 commit_lsn,
                 safekeeper_connstr: safekeeper_connstr.to_owned(),
                 availability_zone: None,
-                standby_horizon: 0,
             },
             latest_update,
         }
@@ -199,7 +199,6 @@ async fn discover_loop(
             commit_lsn: sk_info.commit_lsn,
             safekeeper_connstr: sk_info.safekeeper_connstr,
             availability_zone: sk_info.availability_zone,
-            standby_horizon: 0,
         };

         // note this is a blocking call
@@ -554,7 +554,6 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
         backup_lsn: sk_info.backup_lsn.0,
         local_start_lsn: sk_info.local_start_lsn.0,
         availability_zone: None,
-        standby_horizon: sk_info.standby_horizon.0,
     };

     let global_timelines = get_global_timelines(&request);
@@ -346,7 +346,6 @@ impl SharedState {
         &self,
         ttid: &TenantTimelineId,
         conf: &SafeKeeperConf,
-        standby_apply_lsn: Lsn,
     ) -> SafekeeperTimelineInfo {
         SafekeeperTimelineInfo {
             safekeeper_id: conf.my_id.0,
@@ -370,7 +369,6 @@ impl SharedState {
             backup_lsn: self.sk.state().inmem.backup_lsn.0,
             local_start_lsn: self.sk.state().local_start_lsn.0,
             availability_zone: conf.availability_zone.clone(),
-            standby_horizon: standby_apply_lsn.0,
         }
     }

@@ -780,9 +778,8 @@ impl Timeline {

     /// Get safekeeper info for broadcasting to broker and other peers.
     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
-        let standby_apply_lsn = self.walsenders.get_hotstandby().reply.apply_lsn;
         let shared_state = self.read_shared_state().await;
-        shared_state.get_safekeeper_info(&self.ttid, conf, standby_apply_lsn)
+        shared_state.get_safekeeper_info(&self.ttid, conf)
     }

     /// Update timeline state with peer safekeeper data.
@@ -154,7 +154,6 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
             https_connstr: Some("zenith-1-sk-1.local:7678".to_owned()),
             local_start_lsn: 0,
             availability_zone: None,
-            standby_horizon: 0,
         };
         counter += 1;
         yield info;
@@ -25,6 +25,7 @@ message SubscribeSafekeeperInfoRequest {
   }
 }

+// Next ID: 16
 message SafekeeperTimelineInfo {
   uint64 safekeeper_id = 1;
   TenantTimelineId tenant_timeline_id = 2;
@@ -42,7 +43,6 @@ message SafekeeperTimelineInfo {
   uint64 remote_consistent_lsn = 7;
   uint64 peer_horizon_lsn = 8;
   uint64 local_start_lsn = 9;
-  uint64 standby_horizon = 14;
   // A connection string to use for WAL receiving.
   string safekeeper_connstr = 10;
   // HTTP endpoint connection string.
@@ -99,6 +99,7 @@ message SafekeeperDiscoveryRequest {
 }

 // Shorter version of SafekeeperTimelineInfo, contains only necessary fields.
+// Next ID: 7
 message SafekeeperDiscoveryResponse {
   uint64 safekeeper_id = 1;
   TenantTimelineId tenant_timeline_id = 2;
@@ -108,6 +109,4 @@ message SafekeeperDiscoveryResponse {
   string safekeeper_connstr = 4;
   // Availability zone of a safekeeper.
   optional string availability_zone = 5;
-  // Replica apply LSN
-  uint64 standby_horizon = 6;
 }
@@ -856,7 +856,6 @@ mod tests {
             https_connstr: Some("neon-1-sk-1.local:7678".to_owned()),
             local_start_lsn: 0,
             availability_zone: None,
-            standby_horizon: 0,
         })
     }

@@ -165,7 +165,6 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
     "pageserver_last_record_lsn",
     "pageserver_disk_consistent_lsn",
     "pageserver_projected_remote_consistent_lsn",
-    "pageserver_standby_horizon",
     "pageserver_smgr_query_seconds_bucket",
     "pageserver_smgr_query_seconds_count",
     "pageserver_smgr_query_seconds_sum",