Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-03 02:30:37 +00:00

Compare commits: jcsp/layer… → no_xid_ali… (1 commit)

Commit SHA1: ed8aea612f
@@ -9,7 +9,6 @@ use std::{
     collections::HashMap,
     io::{BufRead, Read},
     num::{NonZeroU64, NonZeroUsize},
-    str::FromStr,
     sync::atomic::AtomicUsize,
     time::{Duration, SystemTime},
 };
@@ -438,7 +437,18 @@ pub enum CompactionAlgorithm {
     Tiered,
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(
+    Debug,
+    Clone,
+    Copy,
+    PartialEq,
+    Eq,
+    Serialize,
+    Deserialize,
+    strum_macros::FromRepr,
+    strum_macros::EnumString,
+)]
+#[strum(serialize_all = "kebab-case")]
 pub enum ImageCompressionAlgorithm {
     /// Disabled for writes, and never decompress during reading.
     /// Never set this after you've enabled compression once!
@@ -458,31 +468,6 @@ impl ImageCompressionAlgorithm {
         }
     }
 
-impl FromStr for ImageCompressionAlgorithm {
-    type Err = anyhow::Error;
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        let mut components = s.split(['(', ')']);
-        let first = components
-            .next()
-            .ok_or_else(|| anyhow::anyhow!("empty string"))?;
-        match first {
-            "disabled-no-decompress" => Ok(ImageCompressionAlgorithm::DisabledNoDecompress),
-            "disabled" => Ok(ImageCompressionAlgorithm::Disabled),
-            "zstd" => {
-                let level = if let Some(v) = components.next() {
-                    let v: i8 = v.parse()?;
-                    Some(v)
-                } else {
-                    None
-                };
-
-                Ok(ImageCompressionAlgorithm::Zstd { level })
-            }
-            _ => anyhow::bail!("invalid specifier '{first}'"),
-        }
-    }
-}
-
 #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
 pub struct CompactionAlgorithmSettings {
     pub kind: CompactionAlgorithm,
@@ -1675,29 +1660,4 @@ mod tests {
             AuxFilePolicy::CrossValidation
         );
     }
-
-    #[test]
-    fn test_image_compression_algorithm_parsing() {
-        use ImageCompressionAlgorithm::*;
-        assert_eq!(
-            ImageCompressionAlgorithm::from_str("disabled").unwrap(),
-            Disabled
-        );
-        assert_eq!(
-            ImageCompressionAlgorithm::from_str("disabled-no-decompress").unwrap(),
-            DisabledNoDecompress
-        );
-        assert_eq!(
-            ImageCompressionAlgorithm::from_str("zstd").unwrap(),
-            Zstd { level: None }
-        );
-        assert_eq!(
-            ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
-            Zstd { level: Some(18) }
-        );
-        assert_eq!(
-            ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
-            Zstd { level: Some(-3) }
-        );
-    }
 }
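The strum derive added above takes over the role of the hand-written parser and test removed in these hunks, at least for plain variant names. A minimal sketch of how an `EnumString` derive with kebab-case serialization behaves (a hypothetical standalone enum, assuming the `strum_macros` crate; parameterized spellings like `zstd(18)` are beyond what the plain derive parses and are not covered here):

```rust
use std::str::FromStr;

// Hypothetical stand-in enum, not the one in this diff.
#[derive(Debug, PartialEq, strum_macros::EnumString)]
#[strum(serialize_all = "kebab-case")]
enum Compression {
    Disabled,
    DisabledNoDecompress,
}

fn main() {
    // Variant names are matched in their kebab-case spelling.
    assert_eq!(
        Compression::from_str("disabled-no-decompress"),
        Ok(Compression::DisabledNoDecompress)
    );
    assert!(Compression::from_str("no-such-variant").is_err());
}
```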
@@ -48,15 +48,6 @@ pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
 #[allow(clippy::identity_op)]
 pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
 
-/// Interval of checkpointing metadata file. We should store metadata file to enforce
-/// predicate that checkpoint.nextXid is larger than any XID in WAL.
-/// But flushing checkpoint file for each transaction seems to be too expensive,
-/// so XID_CHECKPOINT_INTERVAL is used to forward align nextXid and so perform
-/// metadata checkpoint only once per XID_CHECKPOINT_INTERVAL transactions.
-/// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
-/// in order to let CLOG_TRUNCATE mechanism correctly extend CLOG.
-const XID_CHECKPOINT_INTERVAL: u32 = 1024;
-
 pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo {
     (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo
 }
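Aside: the context function `XLogSegmentsPerXLogId` just divides the 4 GiB span of one xlogid by the segment size. A self-contained sketch of the arithmetic:

```rust
// One xlogid spans 4 GiB of WAL (0x1_0000_0000 bytes); with PostgreSQL's
// default 16 MiB segments that works out to 256 segments per xlogid.
fn xlog_segments_per_xlogid(wal_segsz_bytes: usize) -> u64 {
    0x1_0000_0000u64 / wal_segsz_bytes as u64
}

fn main() {
    assert_eq!(xlog_segments_per_xlogid(16 * 1024 * 1024), 256);
}
```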
@@ -331,14 +322,10 @@ impl CheckPoint {
     /// Returns 'true' if the XID was updated.
     pub fn update_next_xid(&mut self, xid: u32) -> bool {
         // nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
-        let mut new_xid = std::cmp::max(
+        let new_xid = std::cmp::max(
             xid.wrapping_add(1),
             pg_constants::FIRST_NORMAL_TRANSACTION_ID,
         );
-        // To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
-        // XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
-        new_xid =
-            new_xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1);
         let full_xid = self.nextXid.value;
         let old_xid = full_xid as u32;
         if new_xid.wrapping_sub(old_xid) as i32 > 0 {
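The dropped alignment is a standard power-of-two round-up. A small sketch of the arithmetic, assuming the 1024 interval from the removed `XID_CHECKPOINT_INTERVAL` constant:

```rust
// Round xid up to the next multiple of a power-of-two interval, using the
// same wrapping add-and-mask as the removed lines.
fn align_up(xid: u32, interval: u32) -> u32 {
    debug_assert!(interval.is_power_of_two());
    xid.wrapping_add(interval - 1) & !(interval - 1)
}

fn main() {
    assert_eq!(align_up(1, 1024), 1024);
    assert_eq!(align_up(1024, 1024), 1024);
    assert_eq!(align_up(1025, 1024), 2048);
}
```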
@@ -360,7 +347,7 @@ impl CheckPoint {
     /// Advance next multi-XID/offset to those given in arguments.
     ///
     /// It's important that this handles wraparound correctly. This should match the
-    /// MultiXactAdvanceNextMXact() logic in PostgreSQL's xlog_redo() function.
+    /// MultiXactAdvceNextMXact() logic in PostgreSQL's xlog_redo() function.
     ///
     /// Returns 'true' if the Checkpoint was updated.
     pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
@@ -500,15 +500,6 @@ static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
     .expect("failed to define a metric")
 });
 
-static VISIBLE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
-    register_uint_gauge_vec!(
-        "pageserver_visible_physical_size",
-        "The size of the layer files present in the pageserver's filesystem.",
-        &["tenant_id", "shard_id", "timeline_id"]
-    )
-    .expect("failed to define a metric")
-});
-
 pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
     register_uint_gauge!(
         "pageserver_resident_physical_size_global",
@@ -1465,12 +1456,10 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
     }
 }
 
-pub(crate) static LIVE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
-    register_int_counter_pair_vec!(
-        "pageserver_live_connections_started",
-        "Number of network connections that we started handling",
-        "pageserver_live_connections_finished",
-        "Number of network connections that we finished handling",
+pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "pageserver_live_connections",
+        "Number of live network connections",
         &["pageserver_connection_kind"]
     )
     .expect("failed to define a metric")
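For reference, the gauge on the right-hand side can be reproduced with the prometheus crate directly; a sketch under the assumption that neon's `register_int_gauge_vec!` wraps the prometheus macro of the same name (crates: `prometheus`, `once_cell`):

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

// One gauge per connection kind; incremented when a connection starts and
// decremented when it finishes.
static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "pageserver_live_connections",
        "Number of live network connections",
        &["pageserver_connection_kind"]
    )
    .expect("failed to define a metric")
});

fn main() {
    let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
    gauge.inc();
    assert_eq!(gauge.get(), 1);
    gauge.dec();
}
```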
@@ -2139,7 +2128,6 @@ pub(crate) struct TimelineMetrics {
     pub archival_size: UIntGauge,
     pub standby_horizon_gauge: IntGauge,
     pub resident_physical_size_gauge: UIntGauge,
-    pub visible_physical_size_gauge: UIntGauge,
     /// copy of LayeredTimeline.current_logical_size
     pub current_logical_size_gauge: UIntGauge,
     pub aux_file_size_gauge: IntGauge,
@@ -2226,9 +2214,6 @@ impl TimelineMetrics {
         let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
             .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
             .unwrap();
-        let visible_physical_size_gauge = VISIBLE_PHYSICAL_SIZE
-            .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
-            .unwrap();
         // TODO: we shouldn't expose this metric
         let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
             .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
@@ -2279,7 +2264,6 @@ impl TimelineMetrics {
             archival_size,
             standby_horizon_gauge,
             resident_physical_size_gauge,
-            visible_physical_size_gauge,
             current_logical_size_gauge,
             aux_file_size_gauge,
             directory_entries_count_gauge,
@@ -2331,7 +2315,6 @@ impl TimelineMetrics {
             RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
             let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
         }
-        let _ = VISIBLE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
         let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
         if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
             let _ = metric.remove_label_values(&[tenant_id, shard_id, timeline_id]);
@@ -55,7 +55,7 @@ use crate::basebackup::BasebackupError;
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::import_datadir::import_wal_from_tar;
 use crate::metrics;
-use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
+use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS_COUNT};
 use crate::pgdatadir_mapping::Version;
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
@@ -215,9 +215,14 @@ async fn page_service_conn_main(
     auth_type: AuthType,
     connection_ctx: RequestContext,
 ) -> anyhow::Result<()> {
-    let _guard = LIVE_CONNECTIONS
-        .with_label_values(&["page_service"])
-        .guard();
+    // Immediately increment the gauge, then create a job to decrement it on task exit.
+    // One of the pros of `defer!` is that this will *most probably*
+    // get called, even in presence of panics.
+    let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
+    gauge.inc();
+    scopeguard::defer! {
+        gauge.dec();
+    }
 
     socket
         .set_nodelay(true)
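The `defer!` pattern on the added side generalizes beyond prometheus gauges; a minimal sketch with a plain atomic standing in for the gauge (assumes the `scopeguard` crate, as used in the diff):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

static LIVE: AtomicI64 = AtomicI64::new(0);

fn handle_connection() {
    LIVE.fetch_add(1, Ordering::Relaxed);
    // The guard runs on every exit path out of this function, including
    // unwinding panics, so the count cannot leak.
    scopeguard::defer! {
        LIVE.fetch_sub(1, Ordering::Relaxed);
    }
    // ... serve the connection ...
}

fn main() {
    handle_connection();
    assert_eq!(LIVE.load(Ordering::Relaxed), 0);
}
```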
@@ -931,7 +931,7 @@ impl Timeline {
             result.to_keyspace(),
             /* AUX sparse key space */
             SparseKeySpace(KeySpace {
-                ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
+                ranges: vec![repl_origin_key_range(), Key::metadata_aux_key_range()],
             }),
         ))
     }
@@ -19,7 +19,6 @@ use enumset::EnumSet;
 use futures::stream::FuturesUnordered;
 use futures::FutureExt;
 use futures::StreamExt;
-use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models;
 use pageserver_api::models::AuxFilePolicy;
 use pageserver_api::models::TimelineState;
@@ -31,7 +30,6 @@ use pageserver_api::shard::TenantShardId;
 use remote_storage::DownloadError;
 use remote_storage::GenericRemoteStorage;
 use remote_storage::TimeoutOrCancel;
-use std::collections::BTreeMap;
 use std::fmt;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
@@ -94,12 +92,14 @@ use crate::tenant::storage_layer::ImageLayer;
 use crate::walredo;
 use crate::InitializationOrder;
 use std::collections::hash_map::Entry;
+use std::collections::BTreeSet;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt::Debug;
 use std::fmt::Display;
 use std::fs;
 use std::fs::File;
+use std::ops::Bound::Included;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
@@ -632,11 +632,6 @@ impl Tenant {
                 timeline.maybe_spawn_flush_loop();
             }
         }
-
-        if let Some(ancestor) = timeline.get_ancestor_timeline() {
-            let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
-            ancestor_gc_info.insert_child(timeline.timeline_id, timeline.get_ancestor_lsn());
-        }
     };
 
     // Sanity check: a timeline should have some content.
@@ -1738,9 +1733,6 @@ impl Tenant {
             .values()
             .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
 
-        // Before activation, populate each Timeline's GcInfo with information about its children
-        self.initialize_gc_info(&timelines_accessor);
-
         // Spawn gc and compaction loops. The loops will shut themselves
         // down when they notice that the tenant is inactive.
         tasks::start_background_loops(self, background_jobs_can_start);
@@ -2773,56 +2765,6 @@ impl Tenant {
         .await
     }
 
-    /// Populate all Timelines' `GcInfo` with information about their children. We do not set the
-    /// PITR cutoffs here, because that requires I/O: this is done later, before GC, by [`Self::refresh_gc_info_internal`]
-    ///
-    /// Subsequently, parent-child relationships are updated incrementally during timeline creation/deletion.
-    fn initialize_gc_info(
-        &self,
-        timelines: &std::sync::MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
-    ) {
-        // This function must be called before activation: after activation timeline create/delete operations
-        // might happen, and this function is not safe to run concurrently with those.
-        assert!(!self.is_active());
-
-        // Scan all timelines. For each timeline, remember the timeline ID and
-        // the branch point where it was created.
-        let mut all_branchpoints: BTreeMap<TimelineId, Vec<(Lsn, TimelineId, Option<KeySpace>)>> =
-            BTreeMap::new();
-        timelines.iter().for_each(|(timeline_id, timeline_entry)| {
-            if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() {
-                let ancestor_children = all_branchpoints.entry(*ancestor_timeline_id).or_default();
-                ancestor_children.push((timeline_entry.get_ancestor_lsn(), *timeline_id, None));
-            }
-        });
-
-        // The number of bytes we always keep, irrespective of PITR: this is a constant across timelines
-        let horizon = self.get_gc_horizon();
-
-        // Populate each timeline's GcInfo with information about its child branches
-        for timeline in timelines.values() {
-            let mut branchpoints: Vec<(Lsn, TimelineId, Option<KeySpace>)> = all_branchpoints
-                .remove(&timeline.timeline_id)
-                .unwrap_or_default();
-
-            branchpoints.sort_by_key(|b| b.0);
-
-            let mut target = timeline.gc_info.write().unwrap();
-
-            target.retain_lsns = branchpoints;
-
-            let horizon_cutoff = timeline
-                .get_last_record_lsn()
-                .checked_sub(horizon)
-                .unwrap_or(Lsn(0));
-
-            target.cutoffs = GcCutoffs {
-                horizon: horizon_cutoff,
-                pitr: Lsn::INVALID,
-            };
-        }
-    }
-
     async fn refresh_gc_info_internal(
         &self,
         target_timeline_id: Option<TimelineId>,
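The core move of the removed initializer is grouping each child under its ancestor. A toy sketch of the `entry().or_default()` pattern, with `u64` standing in for `TimelineId` and `Lsn`:

```rust
use std::collections::BTreeMap;

// Bucket children under their ancestor, as the removed initialize_gc_info did.
fn main() {
    let edges = [(1u64, 10u64), (1, 20), (2, 30)]; // (ancestor id, branch lsn)
    let mut children: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    for (ancestor, lsn) in edges {
        children.entry(ancestor).or_default().push(lsn);
    }
    assert_eq!(children[&1], vec![10, 20]);
    assert_eq!(children[&2], vec![30]);
}
```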
@@ -2845,11 +2787,6 @@ impl Tenant {
             .cloned()
             .collect::<Vec<_>>();
 
-        if target_timeline_id.is_some() && timelines.is_empty() {
-            // We were to act on a particular timeline and it wasn't found
-            return Err(GcError::TimelineNotFound);
-        }
-
         let mut gc_cutoffs: HashMap<TimelineId, GcCutoffs> =
             HashMap::with_capacity(timelines.len());
 
@@ -2872,15 +2809,68 @@ impl Tenant {
         // because that will stall branch creation.
         let gc_cs = self.gc_cs.lock().await;
 
+        // Scan all timelines. For each timeline, remember the timeline ID and
+        // the branch point where it was created.
+        let (all_branchpoints, timelines): (BTreeSet<(TimelineId, Lsn)>, _) = {
+            let timelines = self.timelines.lock().unwrap();
+            let mut all_branchpoints = BTreeSet::new();
+            let timelines = {
+                if let Some(target_timeline_id) = target_timeline_id.as_ref() {
+                    if timelines.get(target_timeline_id).is_none() {
+                        return Err(GcError::TimelineNotFound);
+                    }
+                };
+
+                timelines
+                    .iter()
+                    .map(|(_timeline_id, timeline_entry)| {
+                        if let Some(ancestor_timeline_id) =
+                            &timeline_entry.get_ancestor_timeline_id()
+                        {
+                            // If target_timeline is specified, we only need to know branchpoints of its children
+                            if let Some(timeline_id) = target_timeline_id {
+                                if ancestor_timeline_id == &timeline_id {
+                                    all_branchpoints.insert((
+                                        *ancestor_timeline_id,
+                                        timeline_entry.get_ancestor_lsn(),
+                                    ));
+                                }
+                            }
+                            // Collect branchpoints for all timelines
+                            else {
+                                all_branchpoints.insert((
+                                    *ancestor_timeline_id,
+                                    timeline_entry.get_ancestor_lsn(),
+                                ));
+                            }
+                        }
+
+                        timeline_entry.clone()
+                    })
+                    .collect::<Vec<_>>()
+            };
+            (all_branchpoints, timelines)
+        };
 
+        // Ok, we now know all the branch points.
+        // Update the GC information for each timeline.
         let mut gc_timelines = Vec::with_capacity(timelines.len());
         for timeline in timelines {
-            // We filtered the timeline list above
+            // If target_timeline is specified, ignore all other timelines
             if let Some(target_timeline_id) = target_timeline_id {
-                assert_eq!(target_timeline_id, timeline.timeline_id);
+                if timeline.timeline_id != target_timeline_id {
+                    continue;
+                }
             }
 
+            let branchpoints: Vec<Lsn> = all_branchpoints
+                .range((
+                    Included((timeline.timeline_id, Lsn(0))),
+                    Included((timeline.timeline_id, Lsn(u64::MAX))),
+                ))
+                .map(|&x| x.1)
+                .collect();
 
             {
                 let mut target = timeline.gc_info.write().unwrap();
 
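The added code keys branch points by `(timeline_id, lsn)` so one timeline's points come back as a single ordered range query. A toy sketch, again with `u64` in place of the real id types:

```rust
use std::collections::BTreeSet;
use std::ops::Bound::Included;

// Tuples order lexicographically, so one timeline's branch points are an
// inclusive range over (timeline_id, lsn) pairs.
fn branchpoints_for(all: &BTreeSet<(u64, u64)>, timeline_id: u64) -> Vec<u64> {
    all.range((
        Included((timeline_id, u64::MIN)),
        Included((timeline_id, u64::MAX)),
    ))
    .map(|&(_, lsn)| lsn)
    .collect()
}

fn main() {
    let all: BTreeSet<_> = [(1u64, 100u64), (1, 300), (2, 200)].into();
    assert_eq!(branchpoints_for(&all, 1), vec![100, 300]);
}
```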
@@ -2918,12 +2908,20 @@ impl Tenant {
                     .0,
             );
 
-            // Apply the cutoffs we found to the Timeline's GcInfo. Why might we _not_ have cutoffs for a timeline?
-            // - this timeline was created while we were finding cutoffs
-            // - lsn for timestamp search fails for this timeline repeatedly
-            if let Some(cutoffs) = gc_cutoffs.remove(&timeline.timeline_id) {
-                target.cutoffs = cutoffs;
-            }
+            match gc_cutoffs.remove(&timeline.timeline_id) {
+                Some(cutoffs) => {
+                    target.retain_lsns = branchpoints;
+                    target.cutoffs = cutoffs;
+                }
+                None => {
+                    // reasons for this being unavailable:
+                    // - this timeline was created while we were finding cutoffs
+                    // - lsn for timestamp search fails for this timeline repeatedly
+                    //
+                    // in both cases, refreshing the branchpoints is correct.
+                    target.retain_lsns = branchpoints;
+                }
+            };
         }
 
         gc_timelines.push(timeline);
@@ -4307,7 +4305,7 @@ mod tests {
         {
             let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
             assert_eq!(branchpoints.len(), 1);
-            assert_eq!(branchpoints[0], (Lsn(0x40), NEW_TIMELINE_ID, None));
+            assert_eq!(branchpoints[0], Lsn(0x40));
         }
 
         // You can read the key from the child branch even though the parent is
@@ -51,7 +51,7 @@ use crate::keyspace::KeyPartitioning;
 use crate::repository::Key;
 use crate::tenant::storage_layer::InMemoryLayer;
 use anyhow::Result;
-use pageserver_api::keyspace::{KeySpace, KeySpaceAccum, KeySpaceRandomAccum};
+use pageserver_api::keyspace::KeySpaceAccum;
 use std::collections::{HashMap, VecDeque};
 use std::iter::Peekable;
 use std::ops::Range;
@@ -61,7 +61,7 @@ use utils::lsn::Lsn;
 use historic_layer_coverage::BufferedHistoricLayerCoverage;
 pub use historic_layer_coverage::LayerKey;
 
-use super::storage_layer::{LayerVisibility, PersistentLayerDesc};
+use super::storage_layer::PersistentLayerDesc;
 
 ///
 /// LayerMap tracks what layers exist on a timeline.
@@ -870,164 +870,6 @@ impl LayerMap {
         println!("End dump LayerMap");
         Ok(())
     }
-
-    /// `read_points` represent the tip of a timeline and any branch points, i.e. the places
-    /// where we expect to serve reads.
-    ///
-    /// This function is O(N) and should be called infrequently. The caller is responsible for
-    /// looking up and updating the Layer objects for these layer descriptors.
-    pub(crate) fn get_visibility(
-        &self,
-        mut read_points: Vec<(Lsn, KeySpace)>,
-    ) -> (Vec<(Arc<PersistentLayerDesc>, LayerVisibility)>, KeySpace) {
-        // This is like a KeySpace, but written for efficient subtraction of layers and unions with KeySpaces
-        struct KeyShadow {
-            // FIXME: consider efficiency. KeySpace is a flat vector, so in principle fairly inefficient for
-            // repeatedly calling contains(), BUT as we iterate through the layermap we expect the shadow to shrink
-            // to something quite small, and for small collections an algorithmically expensive vector is often better
-            // for performance than a more algorithmically cheap data structure.
-            inner: KeySpace,
-        }
-
-        impl KeyShadow {
-            fn new(keyspace: KeySpace) -> Self {
-                Self { inner: keyspace }
-            }
-
-            fn contains(&self, range: Range<Key>) -> bool {
-                self.inner.overlaps(&range)
-            }
-
-            /// Return true if anything was removed.
-            fn subtract(&mut self, range: Range<Key>) -> bool {
-                let removed = self.inner.remove_overlapping_with(&KeySpace {
-                    ranges: vec![range],
-                });
-                !removed.ranges.is_empty()
-            }
-
-            fn union_with(&mut self, keyspace: KeySpace) {
-                let mut accum = KeySpaceRandomAccum::new();
-                let prev = std::mem::take(&mut self.inner);
-                accum.add_keyspace(prev);
-                accum.add_keyspace(keyspace);
-                self.inner = accum.to_keyspace();
-            }
-        }
-
-        // The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
-        // and a ReadPoint
-        read_points.sort_by_key(|rp| rp.0);
-        let mut shadow = KeyShadow::new(
-            read_points
-                .pop()
-                .expect("Every timeline has at least one read point")
-                .1,
-        );
-
-        // We will interleave all our read points and layers into a sorted collection
-        enum Item {
-            ReadPoint { lsn: Lsn, keyspace: KeySpace },
-            Layer(Arc<PersistentLayerDesc>),
-        }
-
-        let mut items = Vec::with_capacity(self.historic.len() + read_points.len());
-        items.extend(self.iter_historic_layers().map(Item::Layer));
-        items.extend(read_points.into_iter().map(|rp| Item::ReadPoint {
-            lsn: rp.0,
-            keyspace: rp.1,
-        }));
-
-        // Ordering: we want to iterate like this:
-        // 1. Highest LSNs first
-        // 2. Consider ReadPoints before image layers if they're at the same LSN
-        items.sort_by_key(|item| {
-            std::cmp::Reverse(match item {
-                Item::ReadPoint {
-                    lsn,
-                    keyspace: _keyspace,
-                } => (*lsn, 0),
-                Item::Layer(layer) => {
-                    if layer.is_delta() {
-                        (layer.get_lsn_range().end, 1)
-                    } else {
-                        (layer.image_layer_lsn(), 2)
-                    }
-                }
-            })
-        });
-
-        let mut results = Vec::with_capacity(self.historic.len());
-
-        // TODO: handle delta layers properly with multiple read points: if a read point intersects a delta layer, we might already
-        // have encountered it and marked it as not-visible. We need to keep track of which delta layers we are currently within, and
-        // when we encounter a ReadPoint, update the delta layer's visibility as needed.
-        // let mut pending_delta : Vec= ...
-        let mut maybe_covered_deltas: Vec<Arc<PersistentLayerDesc>> = Vec::new();
-
-        for item in items {
-            let (reached_lsn, is_readpoint) = match &item {
-                Item::ReadPoint {
-                    lsn,
-                    keyspace: _keyspace,
-                } => (lsn, true),
-                Item::Layer(layer) => (&layer.lsn_range.start, false),
-            };
-            maybe_covered_deltas.retain(|d| {
-                if *reached_lsn >= d.lsn_range.start && is_readpoint {
-                    // We encountered a readpoint within the delta layer: it is visible
-                    results.push((d.clone(), LayerVisibility::Visible));
-                    false
-                } else if *reached_lsn < d.lsn_range.start {
-                    // We passed the layer's range without encountering a read point: it is not visible
-                    results.push((d.clone(), LayerVisibility::Covered));
-                    false
-                } else {
-                    // We're still in the delta layer: continue iterating
-                    true
-                }
-            });
-
-            match item {
-                Item::ReadPoint {
-                    lsn: _lsn,
-                    keyspace,
-                } => {
-                    shadow.union_with(keyspace);
-                }
-                Item::Layer(layer) => {
-                    let visibility = if layer.is_delta() {
-                        if shadow.contains(layer.get_key_range()) {
-                            LayerVisibility::Visible
-                        } else {
-                            // If a layer isn't visible based on current state, we must defer deciding whether
-                            // it is truly not visible until we have advanced past the delta's range: we might
-                            // encounter another branch point within this delta layer's LSN range.
-                            maybe_covered_deltas.push(layer);
-                            continue;
-                        }
-                    } else if shadow.subtract(layer.get_key_range()) {
-                        // An image layer, which overlapped with the shadow
-                        LayerVisibility::Visible
-                    } else {
-                        // An image layer, which did not overlap with the shadow
-                        LayerVisibility::Covered
-                    };
-
-                    results.push((layer, visibility));
-                }
-            }
-        }
-
-        // Drain any remaining maybe_covered deltas
-        results.extend(
-            maybe_covered_deltas
-                .into_iter()
-                .map(|d| (d, LayerVisibility::Covered)),
-        );
-
-        (results, shadow.inner)
-    }
 }
 
 #[cfg(test)]
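The removed sweep revolves around a shrinking "shadow" keyspace: image layers subtract their key range, and a layer is visible only while the shadow still overlaps it. A toy sketch of that subtract-and-test idea over plain `u32` ranges (the real code uses `KeySpace` and `Key`):

```rust
use std::ops::Range;

// Toy KeyShadow: the set of keys that some read point still needs from
// older layers.
struct KeyShadow {
    inner: Vec<Range<u32>>,
}

impl KeyShadow {
    fn contains(&self, range: &Range<u32>) -> bool {
        self.inner
            .iter()
            .any(|r| r.start < range.end && range.start < r.end)
    }

    fn subtract(&mut self, range: &Range<u32>) {
        let mut next = Vec::new();
        for r in self.inner.drain(..) {
            if r.start < range.start {
                // Keep the part below the subtracted range.
                next.push(r.start..r.end.min(range.start));
            }
            if r.end > range.end {
                // Keep the part above the subtracted range.
                next.push(r.start.max(range.end)..r.end);
            }
        }
        self.inner = next;
    }
}

fn main() {
    let mut shadow = KeyShadow { inner: vec![0..100] };
    shadow.subtract(&(40..60)); // an image layer covering keys 40..60
    assert!(shadow.contains(&(10..20))); // still readable from older layers
    assert!(!shadow.contains(&(45..55))); // covered by the image layer
}
```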
@@ -521,10 +521,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
 
         Ok(&self.historic_coverage)
     }
-
-    pub(crate) fn len(&self) -> usize {
-        self.layers.len()
-    }
 }
 
 #[test]
@@ -271,14 +271,10 @@ pub(super) async fn gather_inputs(
     let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
         .retain_lsns
         .iter()
-        .filter_map(|(lsn, _child_id, _)| {
-            if lsn > &ancestor_lsn {
-                // this assumes there are no other retain_lsns than the branchpoints
-                Some((*lsn, LsnKind::BranchPoint))
-            } else {
-                None
-            }
-        })
+        .filter(|&&lsn| lsn > ancestor_lsn)
+        .copied()
+        // this assumes there are no other retain_lsns than the branchpoints
+        .map(|lsn| (lsn, LsnKind::BranchPoint))
         .collect::<Vec<_>>();
 
     lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
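Both formulations in this hunk yield the same vector; a self-contained sketch over plain integers:

```rust
fn main() {
    let ancestor_lsn = 5u64;
    let retain_lsns = vec![3u64, 7, 9];

    // filter_map style (left-hand side of the hunk):
    let a: Vec<(u64, &str)> = retain_lsns
        .iter()
        .filter_map(|lsn| {
            if lsn > &ancestor_lsn {
                Some((*lsn, "branch-point"))
            } else {
                None
            }
        })
        .collect();

    // filter + copied + map style (right-hand side):
    let b: Vec<(u64, &str)> = retain_lsns
        .iter()
        .filter(|&&lsn| lsn > ancestor_lsn)
        .copied()
        .map(|lsn| (lsn, "branch-point"))
        .collect();

    assert_eq!(a, b);
}
```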
@@ -457,26 +457,6 @@ pub enum ValueReconstructResult {
     Missing,
 }
 
-#[derive(Debug, Clone)]
-pub(crate) enum LayerVisibility {
-    /// A Visible layer might be read while serving a read, because there is not an image layer between it
-    /// and a readable LSN (the tip of the branch or a child's branch point)
-    Visible,
-    /// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
-    /// a branch or ephemeral endpoint at an LSN below the layer that covers this.
-    Covered,
-    /// Calculating layer visibilty requires I/O, so until this has happened layers are loaded
-    /// in this state. Note that newly written layers may be called Visible immediately, this uninitialized
-    /// state is for when existing layers are constructed while loading a timeline.
-    Uninitialized,
-}
-
-impl Default for LayerVisibility {
-    fn default() -> Self {
-        Self::Uninitialized
-    }
-}
-
 #[derive(Debug)]
 pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
@@ -488,7 +468,6 @@ pub struct LayerAccessStats(Mutex<LayerAccessStatsLocked>);
 struct LayerAccessStatsLocked {
     for_scraping_api: LayerAccessStatsInner,
     for_eviction_policy: LayerAccessStatsInner,
-    visibility: LayerVisibility,
 }
 
 impl LayerAccessStatsLocked {
@@ -612,13 +591,7 @@ impl LayerAccessStats {
             inner.count_by_access_kind[access_kind] += 1;
             inner.task_kind_flag |= ctx.task_kind();
             inner.last_accesses.write(this_access);
-        });
-
-        // We may access a layer marked as Covered, if a new branch was created that depends on
-        // this layer, and background updates to layer visibility didn't notice it yet
-        if !matches!(locked.visibility, LayerVisibility::Visible) {
-            locked.visibility = LayerVisibility::Visible;
-        }
-    })
+        })
     }
 
     fn as_api_model(
@@ -700,28 +673,6 @@ impl LayerAccessStats {
             },
         }
     }
-
-    pub(crate) fn set_visibility(&self, visibility: LayerVisibility) {
-        self.0.lock().unwrap().visibility = visibility;
-    }
-
-    pub(crate) fn get_visibility(&self) -> LayerVisibility {
-        self.0.lock().unwrap().visibility.clone()
-    }
-
-    /// Summarize how likely this layer is to be used: its access time (if accessed), and its visibility hint.
-    pub(crate) fn atime_visibility(&self) -> (Option<SystemTime>, LayerVisibility) {
-        let state = self.0.lock().unwrap();
-
-        (
-            state
-                .for_eviction_policy
-                .last_accesses
-                .recent()
-                .map(|a| a.when),
-            state.visibility.clone(),
-        )
-    }
 }
 
 /// Get a layer descriptor from a layer.
@@ -250,8 +250,6 @@ impl Layer {
             LayerResidenceStatus::Resident,
             LayerResidenceEventReason::LayerCreate,
         );
-        // Newly created layers are marked visible by default: the usual case is that they were created to be read.
-        access_stats.set_visibility(super::LayerVisibility::Visible);
 
         let local_path = local_layer_path(
             conf,
@@ -30,7 +30,7 @@ use pageserver_api::{
         InMemoryLayerInfo, LayerMapInfo, LsnLease, TimelineState,
     },
     reltag::BlockNumber,
-    shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId},
+    shard::{ShardIdentity, ShardNumber, TenantShardId},
 };
 use rand::Rng;
 use serde_with::serde_as;
@@ -135,7 +135,7 @@ use self::layer_manager::LayerManager;
 use self::logical_size::LogicalSize;
 use self::walreceiver::{WalReceiver, WalReceiverConf};
 
-use super::{config::TenantConf, storage_layer::LayerVisibility};
+use super::config::TenantConf;
 use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
 use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
 use super::{remote_timeline_client::RemoteTimelineClient, storage_layer::ReadableLayer};
@@ -453,12 +453,12 @@ pub struct WalReceiverInfo {
 /// Garbage Collection.
 #[derive(Default)]
 pub(crate) struct GcInfo {
-    /// Record which parts of this timeline's history are still needed by children
+    /// Specific LSNs that are needed.
     ///
-    /// Optionally store each child's keyspace at their branch LSN: parts of the keyspace not covered here may be dropped during GC, as
-    /// the child will never read them. For example, a child which has covered its whole keyspace with image layers
-    /// will put an empty keyspace here. Children populate this: if it is None, presume the child may read any part of the keyspace.
-    pub(crate) retain_lsns: Vec<(Lsn, TimelineId, Option<KeySpace>)>,
+    /// Currently, this includes all points where child branches have
+    /// been forked off from. In the future, could also include
+    /// explicit user-defined snapshot points.
+    pub(crate) retain_lsns: Vec<Lsn>,
 
     /// The cutoff coordinates, which are combined by selecting the minimum.
     pub(crate) cutoffs: GcCutoffs,
@@ -474,23 +474,6 @@ impl GcInfo {
     pub(crate) fn min_cutoff(&self) -> Lsn {
         self.cutoffs.select_min()
     }
-
-    pub(super) fn insert_child(&mut self, child_id: TimelineId, child_lsn: Lsn) {
-        self.retain_lsns.push((child_lsn, child_id, None));
-        self.retain_lsns.sort_by_key(|i| i.0);
-    }
-
-    pub(super) fn remove_child(&mut self, child_id: TimelineId) {
-        self.retain_lsns.retain(|i| i.1 != child_id);
-    }
-
-    /// When the child re-calculates which parts of the keyspace it will read from the ancestor, it posts
-    /// and update to the parent using this function, to enable the parent to perhaps GC more layers.
-    pub(super) fn notify_child_keyspace(&mut self, child_id: TimelineId, key_space: KeySpace) {
-        if let Ok(idx) = self.retain_lsns.binary_search_by_key(&child_id, |i| i.1) {
-            self.retain_lsns.get_mut(idx).unwrap().2 = Some(key_space);
-        }
-    }
 }
 
 /// The `GcInfo` component describing which Lsns need to be retained.
@@ -1810,26 +1793,9 @@ impl Timeline {
         }
 
         match self.get_compaction_algorithm_settings().kind {
-            CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await?,
-            CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await?,
+            CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
+            CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
         }
-
-        if self.shard_identity.count >= ShardCount::new(2) {
-            // Limit the number of layer rewrites to the number of partitions: this means its
-            // runtime should be comparable to a full round of image layer creations, rather than
-            // being potentially much longer.
-            // TODO: make `partitioning` a sync lock: see comment in `repartition()` for why there's no
-            // real async use.
-            let rewrite_max = self.partitioning.try_lock().unwrap().0 .0.parts.len();
-
-            self.compact_shard_ancestors(rewrite_max, ctx).await?;
-        }
-
-        // TODO: be more selective: call this once at startup, and thereafter only when some branching changes or
-        // when image layer are generated.
-        self.update_layer_visibility(ctx).await?;
-
-        Ok(())
     }
 
     /// Mutate the timeline with a [`TimelineWriter`].
@@ -3001,17 +2967,6 @@ impl Timeline {
             .set((calculated_size, metrics_guard.calculation_result_saved()))
             .ok()
             .expect("only this task sets it");
-
-        // As a nice-to-have, calculate layer visibilties. Otherwise this will
-        // be initialized on first compaction. Doing it as early as possible
-        // enables code that depends on layer visibility (like uploading heatmaps)
-        // to execute earlier, rather than waiting for compaction.
-        match self.update_layer_visibility(&background_ctx).await {
-            Ok(_) | Err(CompactionError::ShuttingDown) => {}
-            Err(e) => {
-                tracing::warn!("Initial layer visibility calculation failed: {e}");
-            }
-        }
     }
 
     pub(crate) fn spawn_ondemand_logical_size_calculation(
@@ -3189,8 +3144,7 @@ impl Timeline {
     }
 
     /// The timeline heatmap is a hint to secondary locations from the primary location,
-    /// indicating which layers should be downloaded on the secondary to give it a warm
-    /// cache, that will enable it to take over as the attached location without degrading performance.
+    /// indicating which layers are currently on-disk on the primary.
     ///
     /// None is returned if the Timeline is in a state where uploading a heatmap
     /// doesn't make sense, such as shutting down or initializing. The caller
@@ -3203,32 +3157,19 @@ impl Timeline {
 
         let guard = self.layers.read().await;
 
-        let mut resident_visible_layers = Vec::new();
-        let now = SystemTime::now();
-        for layer in guard.likely_resident_layers() {
-            let (atime, visibility) = layer.access_stats().atime_visibility();
+        let resident = guard.likely_resident_layers().map(|layer| {
+            let last_activity_ts = layer.access_stats().latest_activity_or_now();
 
-            match visibility {
-                LayerVisibility::Uninitialized => {
-                    // Refuse to generate a heatmap at all until layer visibilty is initialized
-                    return None;
-                }
-                LayerVisibility::Covered => {
-                    // This layer is covered: exclude it from the heatmap because a secondary
-                    // node is highly unlikely to need this layer in the event that it takes over as attached
-                }
-                LayerVisibility::Visible => resident_visible_layers.push(HeatMapLayer::new(
-                    layer.layer_desc().layer_name(),
-                    layer.metadata(),
-                    atime.unwrap_or(now),
-                )),
-            }
-        }
+            HeatMapLayer::new(
+                layer.layer_desc().layer_name(),
+                layer.metadata(),
+                last_activity_ts,
+            )
+        });
 
-        Some(HeatMapTimeline::new(
-            self.timeline_id,
-            resident_visible_layers,
-        ))
+        let layers = resident.collect();
+
+        Some(HeatMapTimeline::new(self.timeline_id, layers))
     }
 
     /// Returns true if the given lsn is or was an ancestor branchpoint.
@@ -5094,11 +5035,7 @@ impl Timeline {
 
         let horizon_cutoff = min(gc_info.cutoffs.horizon, self.get_disk_consistent_lsn());
         let pitr_cutoff = gc_info.cutoffs.pitr;
-        let retain_lsns = gc_info
-            .retain_lsns
-            .iter()
-            .map(|(lsn, _child_id, _)| *lsn)
-            .collect();
+        let retain_lsns = gc_info.retain_lsns.clone();
 
         // Gets the maximum LSN that holds the valid lease.
         //
@@ -19,14 +19,14 @@ use enumset::EnumSet;
 use fail::fail_point;
 use itertools::Itertools;
 use pageserver_api::keyspace::ShardedRange;
-use pageserver_api::shard::{ShardIdentity, TenantShardId};
+use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info, info_span, trace, warn, Instrument};
 use utils::id::TimelineId;
 
 use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
 use crate::page_cache;
-use crate::tenant::storage_layer::{AsLayerDesc, LayerVisibility, PersistentLayerDesc};
+use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc};
 use crate::tenant::timeline::{drop_rlock, Hole, ImageLayerCreationOutcome};
 use crate::tenant::timeline::{DeltaLayerWriter, ImageLayerWriter};
 use crate::tenant::timeline::{Layer, ResidentLayer};
@@ -100,7 +100,7 @@ impl Timeline {
         // Define partitioning schema if needed
 
         // FIXME: the match should only cover repartitioning, not the next steps
-        match self
+        let partition_count = match self
             .repartition(
                 self.get_last_record_lsn(),
                 self.get_compaction_target_size(),
@@ -140,6 +140,7 @@ impl Timeline {
                     .await?;
 
                 self.upload_new_image_layers(image_layers)?;
+                partitioning.parts.len()
             }
             Err(err) => {
                 // no partitioning? This is normal, if the timeline was just created
@@ -151,9 +152,19 @@ impl Timeline {
                 if !self.cancel.is_cancelled() {
                     tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
                 }
+                1
             }
         };
 
+        if self.shard_identity.count >= ShardCount::new(2) {
+            // Limit the number of layer rewrites to the number of partitions: this means its
+            // runtime should be comparable to a full round of image layer creations, rather than
+            // being potentially much longer.
+            let rewrite_max = partition_count;
+
+            self.compact_shard_ancestors(rewrite_max, ctx).await?;
+        }
+
         Ok(())
     }
 
@@ -165,7 +176,7 @@ impl Timeline {
     ///
     /// Note: this phase may read and write many gigabytes of data: use rewrite_max to bound
     /// how much work it will try to do in each compaction pass.
-    pub(super) async fn compact_shard_ancestors(
+    async fn compact_shard_ancestors(
         self: &Arc<Self>,
         rewrite_max: usize,
         ctx: &RequestContext,
@@ -347,88 +358,6 @@ impl Timeline {
         Ok(())
     }
 
-    /// A post-compaction step to update the LayerVisibility of layers covered by image layers. This
-    /// should also be called when new branches are created.
-    ///
-    /// Sweep through the layer map, identifying layers which are covered by image layers
-    /// such that they do not need to be available to service reads. The resulting LayerVisibility
-    /// result may be used as an input to eviction and secondary downloads to de-prioritize layers
-    /// that we know won't be needed for reads.
-    pub(super) async fn update_layer_visibility(
-        &self,
-        ctx: &RequestContext,
-    ) -> Result<(), CompactionError> {
-        // Start with a keyspace representing all the keys we need to read from the tip of the branch
-        let head_lsn = self.get_last_record_lsn();
-        let (mut head_keyspace, sparse_ks) = self.collect_keyspace(head_lsn, ctx).await?;
-
-        // Converting the sparse part of the keyspace into the dense keyspace is safe in this context
-        // because we will never iterate through the keys.
-        head_keyspace.merge(&sparse_ks.0);
-
-        // We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
-        // are implicitly visible, because LayerVisibility's default is Visible, and we never modify it here.
-        let layer_manager = self.layers.read().await;
-        let layer_map = layer_manager.layer_map();
-
-        let mut visible_size: u64 = 0;
-
-        // FIXME: we only get accurate keyspaces from children if they've already run update_layer_visibility themselves. At startup all the timelines
-        // initialize this in arbitrary order (at the end of initial_logical_size_calculation). We should coordinate these. Perhaps at the very start
-        // of the tenant compaction task we should do all the timelines' layer visibility calculations in a leaf-first order?
-        let readable_points = {
-            let children = self.gc_info.read().unwrap().retain_lsns.clone();
-
-            let mut readable_points = Vec::with_capacity(children.len() + 1);
-            for (child_lsn, _child_timeline_id, child_keyspace) in &children {
-                let keyspace = match child_keyspace {
-                    Some(ks) => ks.clone(),
-                    None => {
-                        // The child has not posted information about which parts of the keyspace they depend on: presume they depend on all of it.
-                        let (mut keyspace, sparse_keyspace) =
-                            self.collect_keyspace(*child_lsn, ctx).await?;
-                        keyspace.merge(&sparse_keyspace.0);
-                        keyspace
-                    }
-                };
-                readable_points.push((*child_lsn, keyspace));
-            }
-            readable_points.push((head_lsn, head_keyspace));
-            readable_points
-        };
-
-        let (layer_visibility, shadow) = layer_map.get_visibility(readable_points);
-        for (layer_desc, visibility) in layer_visibility {
-            // FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
-            let layer = layer_manager.get_from_desc(&layer_desc);
-            if matches!(visibility, LayerVisibility::Visible) {
-                visible_size += layer.metadata().file_size;
-            }
-
-            layer.access_stats().set_visibility(visibility);
-        }
-
-        if let Some(ancestor) = &self.ancestor_timeline {
-            // Having calculated the readable keyspace after walking back through all this timeline's layers, the resulting keyspace is the remaining
-            // keys for which reads may still fall through to the parent branch. Notify the parent branch of this, so that they may GC layers which
-            // do not overlap with this keyspace, and so that they may use this as an input to their own visibility updates.
-            ancestor
-                .gc_info
-                .write()
-                .unwrap()
-                .notify_child_keyspace(self.timeline_id, shadow);
-        }
-
-        // Also include in the visible size all the layers which we would never update visibility on
-        // TODO: getter that doesn't spuriously construct a Vec<>
-        for layer in layer_map.get_level0_deltas().unwrap() {
-            visible_size += layer.file_size;
-        }
-        self.metrics.visible_physical_size_gauge.set(visible_size);
-
-        Ok(())
-    }
-
     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
     /// as Level 1 files.
     async fn compact_level0(
@@ -148,14 +148,14 @@ async fn cleanup_remaining_timeline_fs_traces(
 /// For more context see comments in [`DeleteTimelineFlow::prepare`]
 async fn remove_timeline_from_tenant(
     tenant: &Tenant,
-    timeline: &Timeline,
+    timeline_id: TimelineId,
     _: &DeletionGuard, // using it as a witness
 ) -> anyhow::Result<()> {
     // Remove the timeline from the map.
     let mut timelines = tenant.timelines.lock().unwrap();
     let children_exist = timelines
         .iter()
-        .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id));
+        .any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
     // XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
     // We already deleted the layer files, so it's probably best to panic.
     // (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
@@ -163,14 +163,8 @@ async fn remove_timeline_from_tenant(
         panic!("Timeline grew children while we removed layer files");
     }
 
-    // Unlink from parent
-    if let Some(ancestor) = timeline.get_ancestor_timeline() {
-        let mut ancestor_gc_info = ancestor.gc_info.write().unwrap();
-        ancestor_gc_info.remove_child(timeline.timeline_id);
-    }
-
     timelines
-        .remove(&timeline.timeline_id)
+        .remove(&timeline_id)
         .expect("timeline that we were deleting was concurrently removed from 'timelines' map");
 
     drop(timelines);
@@ -299,9 +293,6 @@ impl DeleteTimelineFlow {
     {
         let mut locked = tenant.timelines.lock().unwrap();
         locked.insert(timeline_id, Arc::clone(&timeline));
-
-        // Note that we do not insert this into the parent branch's GcInfo: the parent is not obliged to retain
-        // any data for child timelines being deleted.
     }
 
     guard.mark_in_progress()?;
@@ -422,7 +413,7 @@ impl DeleteTimelineFlow {
 
     pausable_failpoint!("in_progress_delete");
 
-    remove_timeline_from_tenant(tenant, timeline, &guard).await?;
+    remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?;
 
     *guard = Self::Finished;
 
@@ -255,14 +255,6 @@ impl LayerManager {
             new_layer.layer_desc().lsn_range
         );
 
-        // Transfer visibilty hint from old to new layer, since the new layer covers the same key space. This is not guaranteed to
-        // be accurate (as the new layer may cover a different subset of the key range), but is a sensible default, and prevents
-        // always marking rewritten layers as visible.
-        new_layer
-            .as_ref()
-            .access_stats()
-            .set_visibility(old_layer.access_stats().get_visibility());
-
         // Safety: we may never rewrite the same file in-place. Callers are responsible
         // for ensuring that they only rewrite layers after something changes the path,
         // such as an increment in the generation number.
@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
 use super::TaskStateUpdate;
 use crate::{
     context::RequestContext,
-    metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
+    metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
     task_mgr::TaskKind,
     task_mgr::WALRECEIVER_RUNTIME,
     tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
@@ -208,9 +208,14 @@ pub(super) async fn handle_walreceiver_connection(
         .instrument(tracing::info_span!("poller")),
     );
 
-    let _guard = LIVE_CONNECTIONS
-        .with_label_values(&["wal_receiver"])
-        .guard();
+    // Immediately increment the gauge, then create a job to decrement it on task exit.
+    // One of the pros of `defer!` is that this will *most probably*
+    // get called, even in presence of panics.
+    let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
+    gauge.inc();
+    scopeguard::defer! {
+        gauge.dec();
+    }
 
     let identify = identify_system(&replication_client).await?;
     info!("{identify:?}");
@@ -1,4 +1,4 @@
-use futures::{StreamExt, TryStreamExt};
+use futures::StreamExt;
 use pageserver::tenant::storage_layer::LayerName;
 use serde::{Deserialize, Serialize};
@@ -29,7 +29,7 @@ impl LargeObjectKind {
     }
 }
 
-#[derive(Serialize, Deserialize, Clone)]
+#[derive(Serialize, Deserialize)]
 pub struct LargeObject {
     pub key: String,
     pub size: u64,
@@ -45,76 +45,53 @@ pub async fn find_large_objects(
     bucket_config: BucketConfig,
     min_size: u64,
     ignore_deltas: bool,
-    concurrency: usize,
 ) -> anyhow::Result<LargeObjectListing> {
     let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
-    let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
-
-    let objects_stream = tenants.map_ok(|tenant_shard_id| {
-        let mut tenant_root = target.tenant_root(&tenant_shard_id);
-        let s3_client = s3_client.clone();
-        async move {
-            let mut objects = Vec::new();
-            let mut total_objects_ctr = 0u64;
-            // We want the objects and not just common prefixes
-            tenant_root.delimiter.clear();
-            let mut continuation_token = None;
-            loop {
-                let fetch_response =
-                    list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
-                        .await?;
-                for obj in fetch_response.contents().iter().filter(|o| {
-                    if let Some(obj_size) = o.size {
-                        min_size as i64 <= obj_size
-                    } else {
-                        false
-                    }
-                }) {
-                    let key = obj.key().expect("couldn't get key").to_owned();
-                    let kind = LargeObjectKind::from_key(&key);
-                    if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
-                        continue;
-                    }
-                    objects.push(LargeObject {
-                        key,
-                        size: obj.size.unwrap() as u64,
-                        kind,
-                    })
-                }
-                total_objects_ctr += fetch_response.contents().len() as u64;
-                match fetch_response.next_continuation_token {
-                    Some(new_token) => continuation_token = Some(new_token),
-                    None => break,
-                }
-            }
-
-            Ok((tenant_shard_id, objects, total_objects_ctr))
-        }
-    });
-    let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
-
+    let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
     let mut objects = Vec::new();
 
     let mut tenant_ctr = 0u64;
     let mut object_ctr = 0u64;
-    while let Some(res) = objects_stream.next().await {
-        let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
-        objects.extend_from_slice(&objects_slice);
-
-        object_ctr += total_objects_ctr;
+    while let Some(tenant_shard_id) = tenants.next().await {
+        let tenant_shard_id = tenant_shard_id?;
+        let mut tenant_root = target.tenant_root(&tenant_shard_id);
+        // We want the objects and not just common prefixes
+        tenant_root.delimiter.clear();
+        let mut continuation_token = None;
+        loop {
+            let fetch_response =
+                list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
+                    .await?;
+            for obj in fetch_response.contents().iter().filter(|o| {
+                if let Some(obj_size) = o.size {
+                    min_size as i64 <= obj_size
+                } else {
+                    false
+                }
+            }) {
+                let key = obj.key().expect("couldn't get key").to_owned();
+                let kind = LargeObjectKind::from_key(&key);
+                if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
+                    continue;
+                }
+                objects.push(LargeObject {
+                    key,
+                    size: obj.size.unwrap() as u64,
+                    kind,
+                })
+            }
+            object_ctr += fetch_response.contents().len() as u64;
+            match fetch_response.next_continuation_token {
+                Some(new_token) => continuation_token = Some(new_token),
+                None => break,
+            }
+        }
+
         tenant_ctr += 1;
-        if tenant_ctr % 100 == 0 {
+        if tenant_ctr % 50 == 0 {
             tracing::info!(
-                "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
-                objects.len()
+                "Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
             );
         }
     }
 
     let bucket_name = target.bucket_name();
     tracing::info!(
         "Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
         objects.len()
     );
    Ok(LargeObjectListing { objects })
 }
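The removed left-hand side fans the per-tenant listings out with bounded concurrency. A minimal sketch of that `map_ok` + `try_buffer_unordered` shape, assuming the `futures`, `tokio`, and `anyhow` crates (the listing body is elided):

```rust
use futures::{stream, StreamExt, TryStreamExt};

// Map each item of a fallible stream to an async job, then run up to
// `concurrency` jobs at once, yielding results as they complete.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let concurrency = 4;
    let tenants = stream::iter((0u64..10).map(anyhow::Ok));
    let jobs = tenants.map_ok(|tenant| async move {
        // ... list this tenant's objects here ...
        anyhow::Ok(tenant * 2)
    });
    let mut results = std::pin::pin!(jobs.try_buffer_unordered(concurrency));
    while let Some(n) = results.try_next().await? {
        println!("scanned {n}");
    }
    Ok(())
}
```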
@@ -78,8 +78,6 @@ enum Command {
         min_size: u64,
         #[arg(short, long, default_value_t = false)]
         ignore_deltas: bool,
-        #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
-        concurrency: usize,
     },
 }
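The removed flag is ordinary clap derive syntax; a sketch in a standalone, hypothetical CLI (assumes the `clap` crate with the `derive` feature):

```rust
use clap::Parser;

#[derive(Parser)]
struct Opts {
    /// Maximum number of tenants listed in parallel (hypothetical doc text).
    #[arg(long = "concurrency", short = 'j', default_value_t = 64)]
    concurrency: usize,
}

fn main() {
    let opts = Opts::parse();
    println!("concurrency = {}", opts.concurrency);
}
```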
@@ -212,15 +210,10 @@ async fn main() -> anyhow::Result<()> {
         Command::FindLargeObjects {
             min_size,
             ignore_deltas,
-            concurrency,
         } => {
-            let summary = find_large_objects::find_large_objects(
-                bucket_config,
-                min_size,
-                ignore_deltas,
-                concurrency,
-            )
-            .await?;
+            let summary =
+                find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
+                    .await?;
             println!("{}", serde_json::to_string(&summary).unwrap());
             Ok(())
         }
@@ -54,4 +54,4 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
         pcur.execute(f"INSERT into t values ({n_records}, 0)")
         n_records += 1
         with sub.cursor() as scur:
-            wait_until(60, 0.5, check_that_changes_propagated)
+            wait_until(10, 0.5, check_that_changes_propagated)