Compare commits


3 Commits

Author              SHA1        Message                                     Date
Heikki Linnakangas  7eefad691c  Improve the scaling, add Play/Stop buttons  2023-03-22 19:40:54 +02:00
Heikki Linnakangas  fe59a063ea  WIP: Collect and draw layer trace           2023-03-22 19:40:54 +02:00
Heikki Linnakangas  ae8e5b3a8e  Add test from PR #3673                      2023-03-22 19:40:54 +02:00
5 changed files with 818 additions and 5 deletions

View File

@@ -0,0 +1,541 @@
use anyhow::Result;
use pageserver::repository::Key;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::io::{self, BufRead};
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
fmt::Write,
ops::Range,
};
use svg_fmt::{rgb, BeginSvg, EndSvg, Fill, Stroke, Style};
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
// Map values to their compressed coordinate - the index the value
// would have in a sorted and deduplicated list of all values.
struct CoordinateMap<T: Ord + Copy> {
map: BTreeMap<T, usize>,
stretch: f32
}
impl<T: Ord + Copy> CoordinateMap<T> {
fn new(coords: Vec<T>, stretch: f32) -> Self {
let set: BTreeSet<T> = coords.into_iter().collect();
let mut map: BTreeMap<T, usize> = BTreeMap::new();
for (i, e) in set.iter().enumerate() {
map.insert(*e, i);
}
Self { map, stretch }
}
fn map(&self, val: T) -> f32 {
*self.map.get(&val).unwrap() as f32 * self.stretch
}
fn max(&self) -> f32 {
self.map.len() as f32 * self.stretch
}
}
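// A minimal test-style sketch of the compression above (illustrative values,
// not from a real trace): sparse values map to evenly spaced coordinates, so
// huge gaps in the key/LSN space collapse into adjacent rows of the drawing.
#[cfg(test)]
mod coordinate_map_tests {
    use super::CoordinateMap;

    #[test]
    fn compresses_sparse_values() {
        // 10 appears twice to check deduplication
        let map = CoordinateMap::new(vec![1_000_000u64, 10, 1000, 10], 2.0);
        assert_eq!(map.map(10), 0.0);
        assert_eq!(map.map(1000), 2.0);
        assert_eq!(map.map(1_000_000), 4.0);
        assert_eq!(map.max(), 6.0); // 3 distinct values * stretch
    }
}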
fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
let split: Vec<&str> = name.split("__").collect();
let keys: Vec<&str> = split[0].split('-').collect();
let mut lsns: Vec<&str> = split[1].split('-').collect();
if lsns.len() == 1 {
lsns.push(lsns[0]);
}
let keys = Key::from_hex(keys[0]).unwrap()..Key::from_hex(keys[1]).unwrap();
let lsns = Lsn::from_hex(lsns[0]).unwrap()..Lsn::from_hex(lsns[1]).unwrap();
(keys, lsns)
}
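// For example (hypothetical names, hex digits abbreviated): a delta layer
// file "000...000-111...111__0000000001000000-0000000002000000" parses into
// the key range 000...000..111...111 and a non-empty LSN range. An image
// layer name carries a single LSN after "__", which parse_filename turns
// into an empty (start == end) LSN range; is_image() below relies on that.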
#[derive(Serialize, Deserialize, PartialEq)]
enum LayerTraceOp {
#[serde(rename = "evict")]
Evict,
#[serde(rename = "flush")]
Flush,
#[serde(rename = "compact_create")]
CompactCreate,
#[serde(rename = "compact_delete")]
CompactDelete,
#[serde(rename = "image_create")]
ImageCreate,
#[serde(rename = "gc_delete")]
GcDelete,
#[serde(rename = "gc_start")]
GcStart,
}
impl std::fmt::Display for LayerTraceOp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
let op_str = match self {
LayerTraceOp::Evict => "evict",
LayerTraceOp::Flush => "flush",
LayerTraceOp::CompactCreate => "compact_create",
LayerTraceOp::CompactDelete => "compact_delete",
LayerTraceOp::ImageCreate => "image_create",
LayerTraceOp::GcDelete => "gc_delete",
LayerTraceOp::GcStart => "gc_start",
};
f.write_str(op_str)
}
}
#[serde_with::serde_as]
#[derive(Serialize, Deserialize)]
struct LayerTraceLine {
time: u64,
op: LayerTraceOp,
#[serde(default)]
filename: String,
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
cutoff: Option<Lsn>,
}
struct LayerTraceFile {
filename: String,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
}
impl LayerTraceFile {
fn is_image(&self) -> bool {
self.lsn_range.start == self.lsn_range.end
}
}
struct LayerTraceEvent {
time_rel: u64,
op: LayerTraceOp,
filename: String,
}
struct GcEvent {
time_rel: u64,
cutoff: Lsn,
}
fn main() -> Result<()> {
// Parse trace lines from stdin
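// Each input line is a JSON object written by the pageserver's layer_trace
// module, e.g. (illustrative values):
//   {"time": 1679500000000, "op": "flush", "filename": "<key range>__<lsn range>"}
//   {"time": 1679500001000, "op": "gc_start", "cutoff": "0/169F3D8"}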
let stdin = io::stdin();
let mut files: HashMap<String, LayerTraceFile> = HashMap::new();
let mut layer_events: Vec<LayerTraceEvent> = Vec::new();
let mut gc_events: Vec<GcEvent> = Vec::new();
let mut first_time: Option<u64> = None;
for line in stdin.lock().lines() {
let line = line?;
let parsed_line: LayerTraceLine = serde_json::from_str(&line)?;
let time_rel = if let Some(first_time) = first_time {
parsed_line.time - first_time
} else {
first_time = Some(parsed_line.time);
0
};
if parsed_line.op == LayerTraceOp::GcStart {
gc_events.push(GcEvent {
time_rel,
cutoff: parsed_line.cutoff.unwrap(),
});
} else {
layer_events.push(LayerTraceEvent {
time_rel,
filename: parsed_line.filename.clone(),
op: parsed_line.op,
});
if !files.contains_key(&parsed_line.filename) {
let (key_range, lsn_range) = parse_filename(&parsed_line.filename);
files.insert(parsed_line.filename.clone(), LayerTraceFile {
filename: parsed_line.filename.clone(),
key_range,
lsn_range,
});
};
}
}
let mut last_time_rel = layer_events.last().unwrap().time_rel;
if let Some(last_gc) = gc_events.last() {
// the slider must span the later of the two event streams
last_time_rel = std::cmp::max(last_gc.time_rel, last_time_rel);
}
// Collect all coordinates
let mut keys: Vec<Key> = vec![];
let mut lsns: Vec<Lsn> = vec![];
for f in files.values() {
keys.push(f.key_range.start);
keys.push(f.key_range.end);
lsns.push(f.lsn_range.start);
lsns.push(f.lsn_range.end);
}
for gc_event in &gc_events {
lsns.push(gc_event.cutoff);
}
// Analyze
let key_map = CoordinateMap::new(keys, 2.0);
// Stretch out vertically for better visibility
let lsn_map = CoordinateMap::new(lsns, 3.0);
// Initialize stats
let mut num_deltas = 0;
let mut num_images = 0;
let mut svg = String::new();
// Draw
writeln!(svg,
"{}",
BeginSvg {
w: key_map.max(),
h: lsn_map.max(),
}
)?;
let lsn_max = lsn_map.max();
// Sort the files by LSN, with image layers sorting after all delta layers.
// The SVG is painted in the order the elements appear, and we want to draw
// image layers on top of the delta layers where they overlap.
let mut files_sorted: Vec<LayerTraceFile> = files.into_values().collect();
files_sorted.sort_by(|a, b| {
if a.is_image() && !b.is_image() {
Ordering::Greater
} else if !a.is_image() && b.is_image() {
Ordering::Less
} else {
a.lsn_range.end.cmp(&b.lsn_range.end)
}
});
for f in files_sorted {
let key_start = key_map.map(f.key_range.start);
let key_end = key_map.map(f.key_range.end);
let key_diff = key_end - key_start;
if key_start >= key_end {
panic!("Invalid key range {}-{}", key_start, key_end);
}
let lsn_start = lsn_map.map(f.lsn_range.start);
let lsn_end = lsn_map.map(f.lsn_range.end);
// Default style (grey fill, thin black border) is used for delta layers.
// Image layers are restyled below with a thick purple stroke so that the
// zero-height lines remain visible.
let mut style = Style::default();
style.fill = Fill::Color(rgb(0x80, 0x80, 0x80));
style.stroke = Stroke::Color(rgb(0, 0, 0), 0.5);
let y_start = (lsn_max - lsn_start) as f32;
let y_end = (lsn_max - lsn_end) as f32;
let x_margin = 0.25;
let y_margin = 0.5;
match f.lsn_range.start.cmp(&f.lsn_range.end) {
Ordering::Less => {
num_deltas += 1;
write!(svg,
r#" <rect id="layer_{}" x="{}" y="{}" width="{}" height="{}" ry="{}" style="{}">"#,
f.filename,
key_start as f32 + x_margin,
y_end + y_margin,
key_diff as f32 - x_margin * 2.0,
y_start - y_end - y_margin * 2.0,
1.0, // border_radius,
style.to_string(),
)?;
write!(svg, "<title>{}<br>{} - {}</title>", f.filename, lsn_end, y_end)?;
writeln!(svg, "</rect>")?;
}
Ordering::Equal => {
num_images += 1;
//lsn_diff = 0.3;
//lsn_offset = -lsn_diff / 2.0;
//margin = 0.05;
style.fill = Fill::Color(rgb(0x80, 0, 0x80));
style.stroke = Stroke::Color(rgb(0x80, 0, 0x80), 3.0);
write!(svg,
r#" <line id="layer_{}" x1="{}" y1="{}" x2="{}" y2="{}" style="{}">"#,
f.filename,
key_start as f32 + x_margin,
y_end,
key_end as f32 - x_margin,
y_end,
style.to_string(),
)?;
write!(svg, "<title>{}<br>{} - {}</title>", f.filename, lsn_end, y_end)?;
writeln!(svg, "</line>")?;
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
}
}
for (idx, gc) in gc_events.iter().enumerate() {
let cutoff_lsn = lsn_map.map(gc.cutoff);
let mut style = Style::default();
style.fill = Fill::None;
style.stroke = Stroke::Color(rgb(0xff, 0, 0), 0.5);
let y = lsn_max - cutoff_lsn;
writeln!(svg,
r#" <line id="gc_{}" x1="{}" y1="{}" x2="{}" y2="{}" style="{}" />"#,
idx,
0,
y,
key_map.max(),
y,
style.to_string(),
)?;
}
writeln!(svg, "{}", EndSvg)?;
let mut layer_events_str = String::new();
let mut first = true;
for e in layer_events {
if !first {
writeln!(layer_events_str, ",")?;
}
write!(layer_events_str,
r#" {{"time_rel": {}, "filename": "{}", "op": "{}"}}"#,
e.time_rel, e.filename, e.op)?;
first = false;
}
writeln!(layer_events_str)?;
let mut gc_events_str = String::new();
let mut first = true;
for e in gc_events {
if !first {
writeln!(gc_events_str, ",")?;
}
write!(gc_events_str,
r#" {{"time_rel": {}, "cutoff_lsn": "{}"}}"#,
e.time_rel, e.cutoff)?;
first = false;
}
writeln!(gc_events_str)?;
println!(r#"<!DOCTYPE html>
<html>
<head>
<style>
/* Keep the slider pinned at top */
.topbar {{
display: block;
overflow: hidden;
background-color: lightgrey;
position: fixed;
top: 0;
width: 100%;
/* width: 500px; */
}}
.slidercontainer {{
float: left;
width: 50%;
margin-right: 200px;
}}
.slider {{
float: left;
width: 100%;
}}
.legend {{
width: 200px;
float: right;
}}
/* Main content */
.main {{
margin-top: 50px; /* Add a top margin to avoid content overlay */
}}
</style>
</head>
<body onload="init()">
<script type="text/javascript">
var layer_events = [{layer_events_str}]
var gc_events = [{gc_events_str}]
let ticker;
function init() {{
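// Sweep the slider to the end and back so that every SVG element gets
// the visibility it should have at time 0; the final call restores the
// previous slider position (a no-op on first load).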
moveSlider({last_time_rel})
moveSlider(0)
moveSlider(last_slider_pos)
}}
function startAnimation() {{
ticker = setInterval(animateStep, 100);
}}
function stopAnimation() {{
clearInterval(ticker);
}}
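// Runs on a 100 ms timer while playing: advances the slider to the
// timestamp of the next layer event past the current position.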
function animateStep() {{
if (last_layer_event < layer_events.length - 1) {{
var slider = document.getElementById("time-slider");
let prevPos = slider.value
let nextEvent = last_layer_event
while (nextEvent < layer_events.length - 1) {{
if (layer_events[nextEvent].time_rel > prevPos) {{
break;
}}
nextEvent += 1;
}}
let nextPos = layer_events[nextEvent].time_rel
slider.value = nextPos
moveSlider(nextPos)
}}
}}
function redoLayerEvent(n) {{
var layer = document.getElementById("layer_" + layer_events[n].filename);
switch (layer_events[n].op) {{
case "evict":
break;
case "flush":
layer.style.visibility = "visible";
break;
case "compact_create":
layer.style.visibility = "visible";
break;
case "image_create":
layer.style.visibility = "visible";
break;
case "compact_delete":
layer.style.visibility = "hidden";
break;
case "gc_delete":
layer.style.visibility = "hidden";
break;
case "gc_start":
layer.style.visibility = "hidden";
break;
}}
}}
function undoLayerEvent(n) {{
var layer = document.getElementById("layer_" + layer_events[n].filename);
switch (layer_events[n].op) {{
case "evict":
break;
case "flush":
layer.style.visibility = "hidden";
break;
case "compact_create":
layer.style.visibility = "hidden";
break;
case "image_create":
layer.style.visibility = "hidden";
break;
case "compact_delete":
layer.style.visibility = "visible";
break;
case "gc_delete":
layer.style.visibility = "visible";
break;
}}
}}
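// Only one GC-cutoff bar is visible at a time: redoGcEvent hides bar n-1
// and shows bar n; undoGcEvent does the reverse.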
function redoGcEvent(n) {{
var prev_gc_bar = document.getElementById("gc_" + (n - 1));
var new_gc_bar = document.getElementById("gc_" + n);
prev_gc_bar.style.visibility = "hidden"
new_gc_bar.style.visibility = "visible"
}}
function undoGcEvent(n) {{
var prev_gc_bar = document.getElementById("gc_" + n);
var new_gc_bar = document.getElementById("gc_" + (n - 1));
prev_gc_bar.style.visibility = "hidden"
new_gc_bar.style.visibility = "visible"
}}
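// Replay state: moveSlider redoes events in order when the slider moves
// right and undoes them in reverse when it moves left, so the SVG always
// shows exactly the layers that were alive at the slider position.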
var last_slider_pos = 0
var last_layer_event = 0
var last_gc_event = 0
var moveSlider = function(new_pos) {{
if (new_pos > last_slider_pos) {{
while (last_layer_event < layer_events.length - 1) {{
if (layer_events[last_layer_event + 1].time_rel > new_pos) {{
break;
}}
last_layer_event += 1;
redoLayerEvent(last_layer_event)
}}
while (last_gc_event < gc_events.length - 1) {{
if (gc_events[last_gc_event + 1].time_rel > new_pos) {{
break;
}}
last_gc_event += 1;
redoGcEvent(last_gc_event)
}}
}}
if (new_pos < last_slider_pos) {{
while (last_layer_event > 0) {{
if (layer_events[last_layer_event - 1].time_rel < new_pos) {{
break;
}}
undoLayerEvent(last_layer_event)
last_layer_event -= 1;
}}
while (last_gc_event > 0) {{
if (gc_events[last_gc_event - 1].time_rel < new_pos) {{
break;
}}
undoGcEvent(last_gc_event)
last_gc_event -= 1;
}}
}}
last_slider_pos = new_pos;
document.getElementById("debug_pos").textContent=new_pos;
document.getElementById("debug_layer_event").textContent=last_layer_event + " " + layer_events[last_layer_event].time_rel + " " + layer_events[last_layer_event].op;
document.getElementById("debug_gc_event").textContent=last_gc_event + " " + gc_events[last_gc_event].time_rel;
}}
</script>
<div class="topbar">
<div class="slidercontainer">
<label for="time-slider">TIME</label>:
<input id="time-slider" class="slider" type="range" min="0" max="{last_time_rel}" value="0" oninput="moveSlider(this.value)"><br>
pos: <span id="debug_pos"></span><br>
event: <span id="debug_layer_event"></span><br>
gc: <span id="debug_gc_event"></span><br>
</div>
<button onclick="startAnimation()">Play</button>
<button onclick="stopAnimation()">Stop</button>
<svg class="legend">
<rect x=5 y=0 width=20 height=20 style="fill:rgb(128,128,128);stroke:rgb(0,0,0);stroke-width:0.5;fill-opacity:1;stroke-opacity:1;"/>
<line x1=5 y1=30 x2=25 y2=30 style="fill:rgb(128,0,128);stroke:rgb(128,0,128);stroke-width:3;fill-opacity:1;stroke-opacity:1;"/>
<line x1=0 y1=40 x2=30 y2=40 style="fill:none;stroke:rgb(255,0,0);stroke-width:0.5;fill-opacity:1;stroke-opacity:1;"/>
</svg>
</div>
<div class="main">
{svg}
</div>
</body>
</html>
"#);
eprintln!("num_images: {}", num_images);
eprintln!("num_deltas: {}", num_deltas);
Ok(())
}

View File

@@ -1,11 +1,12 @@
use crate::repository::{key_range_size, singleton_range, Key};
use postgres_ffi::BLCKSZ;
use std::ops::Range;
use tracing::debug;
///
/// Represents a set of Keys, in a compact form.
///
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct KeySpace {
/// Contiguous ranges of keys that belong to the key space. In key order,
/// and with no overlap.
@@ -61,6 +62,60 @@ impl KeySpace {
KeyPartitioning { parts }
}
/// Add range to keyspace.
///
/// Unlike KeySpaceAccum, this accepts key ranges in any order, and the
/// ranges may overlap.
pub fn add_range(&mut self, range: Range<Key>) {
    let start = range.start;
    let mut end = range.end;
    // Find the last existing range that starts at or before the new range's
    // end; only ranges up to and including that index can overlap it.
    let mut prev_index = match self.ranges.binary_search_by_key(&end, |r| r.start) {
        Ok(index) => index,
        Err(0) => {
            // The new range ends before the first existing range starts.
            self.ranges.insert(0, range);
            return;
        }
        Err(index) => index - 1,
    };
    // Walk backwards, merging every existing range that overlaps the new one.
    loop {
        let prev = &mut self.ranges[prev_index];
        if prev.end >= start {
            // the two ranges overlap (or touch)
            if prev.start <= start {
                // the existing range covers the start of the new one:
                // extend it in place and we are done
                if prev.end < end {
                    prev.end = end;
                    debug!("Extend wanted image {}..{}", prev.start, end);
                }
                return;
            } else {
                // the new range covers the start of the existing one:
                // absorb the existing range and keep scanning backwards
                if prev.end > end {
                    end = prev.end;
                }
                self.ranges.remove(prev_index);
            }
        } else {
            // no overlap: the merged range belongs right after this one
            prev_index += 1;
            break;
        }
        if prev_index == 0 {
            break;
        }
        prev_index -= 1;
    }
    debug!("Wanted image {}..{}", start, end);
    self.ranges.insert(prev_index, start..end);
}
///
/// Check whether the key space overlaps the given key range
///
pub fn overlaps(&self, range: &Range<Key>) -> bool {
    // Only the last range that starts before `range.end` can overlap it.
    match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
        Ok(0) | Err(0) => false,
        Ok(index) | Err(index) => self.ranges[index - 1].end > range.start,
    }
}
}
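// A minimal test-style sketch of the intended add_range/overlaps semantics
// (assumes Key::from_i128 from crate::repository and a public `ranges`
// field; illustrative, not part of the original change): out-of-order and
// overlapping inserts collapse into sorted, non-overlapping ranges.
#[cfg(test)]
mod keyspace_sketch_tests {
    use super::*;

    fn key(i: i128) -> Key {
        Key::from_i128(i)
    }

    #[test]
    fn add_range_merges_overlaps() {
        let mut ks = KeySpace::default();
        ks.add_range(key(20)..key(30));
        ks.add_range(key(0)..key(10));
        ks.add_range(key(5)..key(25)); // bridges both existing ranges
        assert_eq!(ks.ranges, vec![key(0)..key(30)]);
        assert!(ks.overlaps(&(key(29)..key(40))));
        // half-open ranges: merely touching is not overlapping
        assert!(!ks.overlaps(&(key(30)..key(40))));
    }
}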
///

View File

@@ -2,6 +2,7 @@
mod eviction_task;
mod walreceiver;
mod layer_trace;
use anyhow::{anyhow, bail, ensure, Context};
use bytes::Bytes;
@@ -19,8 +20,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap};
use std::fs;
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
@@ -115,6 +115,17 @@ pub struct Timeline {
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges that should be covered by image layers so that GC can
/// remove old layers. This set is produced by GC, together with the cutoff
/// LSN that was used; the compaction task consults it when deciding whether
/// a new image layer should be created.
/// A newly created image layer doesn't allow removing the delta layers under
/// it until the image layer itself falls outside the PITR horizon, so on the
/// next GC cycle gc_timeline may still want the same image layer. To avoid
/// creating redundant image layers, we check whether an image layer already
/// exists beyond the PITR horizon, and for that we need to remember the GC
/// cutoff LSN.
///
wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
@@ -216,6 +227,8 @@ pub struct Timeline {
download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
state: watch::Sender<TimelineState>,
layer_trace_file: Mutex<Option<std::fs::File>>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -312,7 +325,7 @@ impl LogicalSize {
// we change the type.
match self.initial_logical_size.get() {
Some(initial_size) => {
initial_size.checked_add_signed(size_increment)
initial_size.checked_add(size_increment.try_into().unwrap())
.with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}"))
.map(CurrentLogicalSize::Exact)
}
@@ -857,6 +870,7 @@ impl Timeline {
}
pub fn activate(self: &Arc<Self>) {
self.start_layer_tracing();
self.set_state(TimelineState::Active);
self.launch_wal_receiver();
self.launch_eviction_task();
@@ -1080,6 +1094,7 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.sub(layer_file_size);
self.trace_layer_evict(&local_layer.filename());
self.metrics.evictions.inc();
@@ -1186,6 +1201,7 @@ impl Timeline {
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -1248,6 +1264,8 @@ impl Timeline {
download_all_remote_layers_task_info: RwLock::new(None),
state,
layer_trace_file: Mutex::new(None),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1747,7 +1765,7 @@ impl Timeline {
.size_added_after_initial
.load(AtomicOrdering::Relaxed);
let sum = calculated_size.saturating_add_signed(added);
let sum = calculated_size.saturating_add(added.try_into().unwrap());
// set the gauge value before it can be set in `update_current_logical_size`.
self_clone.metrics.current_logical_size_gauge.set(sum);
@@ -2628,6 +2646,8 @@ impl Timeline {
self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
])?;
self.trace_layer_flush(&new_delta.filename());
// Add it to the layer map
self.layers
.write()
@@ -2683,6 +2703,30 @@ impl Timeline {
let layers = self.layers.read().unwrap();
let mut max_deltas = 0;
let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
let img_range =
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
if wanted.overlaps(&img_range) {
//
// gc_timeline only pays attention to image layers that are older than the
// GC cutoff, but create_image_layers creates image layers at the
// last-record LSN. So it's possible that gc_timeline requests a new image
// layer for a key range and create_image_layers creates it on the next
// compaction, yet on the next GC cycle gc_timeline asks for it again,
// because the new image layer doesn't allow removing the delta layers
// until it falls outside the PITR horizon.
//
// So check whether an image layer beyond the cutoff LSN already exists.
if !layers.image_layer_exists(&img_range, &(*cutoff_lsn..lsn))? {
debug!(
"Force generation of layer {}-{} wanted by GC)",
img_range.start, img_range.end
);
return Ok(true);
}
}
}
for part_range in &partition.ranges {
let image_coverage = layers.image_coverage(part_range, lsn)?;
@@ -2802,6 +2846,11 @@ impl Timeline {
image_layers.push(image_layer);
}
}
// All wanted layers have been taken into account by time_for_new_image_layer.
// wanted_image_layers may have been updated concurrently, in which case we
// clear ranges that were never looked at; this is fine, because the next GC
// round will add any still-wanted ranges back in.
*self.wanted_image_layers.lock().unwrap() = None;
// Sync the new layer to disk before adding it to the layer map, to make sure
// we don't garbage collect something based on the new layer, before it has
@@ -2838,6 +2887,7 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
self.trace_layer_image_create(&l.filename());
updates.insert_historic(Arc::new(l));
}
updates.flush();
@@ -3268,6 +3318,7 @@ impl Timeline {
self.metrics
.resident_physical_size_gauge
.add(metadata.len());
self.trace_layer_compact_create(&l.filename());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
@@ -3278,6 +3329,7 @@ impl Timeline {
// delete the old ones
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
self.trace_layer_compact_delete(&l.filename());
layer_names_to_delete.push(l.filename());
self.delete_historic_layer(layer_removal_cs, l, &mut updates)?;
}
@@ -3474,6 +3526,8 @@ impl Timeline {
info!("GC starting");
self.trace_gc_start(new_gc_cutoff);
debug!("retain_lsns: {:?}", retain_lsns);
// Before deleting any layers, we need to wait for their upload ops to finish.
@@ -3488,6 +3542,7 @@ impl Timeline {
}
let mut layers_to_remove = Vec::new();
let mut wanted_image_layers = KeySpace::default();
// Scan all layers in the timeline (remote or on-disk).
//
@@ -3571,6 +3626,15 @@ impl Timeline {
"keeping {} because it is the latest layer",
l.filename().file_name()
);
// Collect the key ranges of delta layers that need covering image layers
// before they can be garbage collected.
// It is not obvious that only delta layers matter here: image layers can
// also form "stairs" that prevent an older image from being deleted. But
// image layers are in any case less sparse than delta layers, and we also
// need some protection against replacing recent image layers with new ones
// on every GC iteration.
if l.is_incremental() && !LayerMap::is_l0(&*l) {
wanted_image_layers.add_range(l.get_key_range());
}
result.layers_not_updated += 1;
continue 'outer;
}
@@ -3583,6 +3647,10 @@ impl Timeline {
);
layers_to_remove.push(Arc::clone(&l));
}
self.wanted_image_layers
.lock()
.unwrap()
.replace((new_gc_cutoff, wanted_image_layers));
let mut updates = layers.batch_update();
if !layers_to_remove.is_empty() {
@@ -3597,6 +3665,7 @@ impl Timeline {
{
for doomed_layer in layers_to_remove {
layer_names_to_delete.push(doomed_layer.filename());
self.trace_layer_gc_delete(&doomed_layer.filename());
self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning?
result.layers_removed += 1;
}

View File

@@ -0,0 +1,81 @@
use crate::tenant::timeline::LayerFileName;
use crate::tenant::Timeline;
use std::fs::File;
use std::io::Write;
use std::time::UNIX_EPOCH;
use tracing::*;
use utils::lsn::Lsn;
impl Timeline {
pub(super) fn start_layer_tracing(&self) {
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
let path = timeline_path.join("layer_trace");
match File::options()
.create(true)
.append(true)
.open(&path)
{
Ok(file) => {
info!("enabled layer tracing");
self.layer_trace_file.lock().unwrap().replace(file);
},
Err(e) => {
warn!("could not open layer tracing file \"{}\": {}", path.display(), e);
}
}
}
fn trace_op(&self, op: &str, filename: &str) {
let opt_out = &self.layer_trace_file.lock().unwrap();
if let Some(mut out) = opt_out.as_ref() {
if let Ok(elapsed) = UNIX_EPOCH.elapsed() {
let time = elapsed.as_millis();
let _ = writeln!(out, "{{ \"time\": {time}, \"op\": \"{op}\", \"filename\": \"{filename}\"}}");
}
else {
warn!("could not get current timestamp");
}
}
}
pub(super) fn trace_layer_evict(&self, filename: &LayerFileName) {
self.trace_op("evict", &filename.file_name())
}
pub(super) fn trace_layer_flush(&self, filename: &LayerFileName) {
self.trace_op("flush", &filename.file_name())
}
pub(super) fn trace_layer_compact_create(&self, filename: &LayerFileName) {
self.trace_op("compact_create", &filename.file_name())
}
pub(super) fn trace_layer_compact_delete(&self, filename: &LayerFileName) {
self.trace_op("compact_delete", &filename.file_name())
}
pub(super) fn trace_layer_image_create(&self, filename: &LayerFileName) {
self.trace_op("image_create", &filename.file_name())
}
pub(super) fn trace_layer_gc_delete(&self, filename: &LayerFileName) {
self.trace_op("gc_delete", &filename.file_name())
}
// TODO: also report 'retain_lsns'
pub(super) fn trace_gc_start(&self, cutoff_lsn: Lsn) {
let opt_out = &self.layer_trace_file.lock().unwrap();
if let Some(mut out) = opt_out.as_ref() {
if let Ok(elapsed) = UNIX_EPOCH.elapsed() {
let time = elapsed.as_millis();
let _ = writeln!(out, "{{ \"time\": {time}, \"op\": \"gc_start\", \"cutoff\": \"{cutoff_lsn}\"}}");
}
else {
warn!("could not get current timestamp");
}
}
}
}

View File

@@ -0,0 +1,67 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(10000)
def test_gc_old_layers(neon_env_builder: NeonEnvBuilder):
    """
    Test that GC is able to collect all old layers even if they form
    "stairs" and fewer than three delta layers have accumulated since the
    last image layer.

    Information about which image layers are needed to collect old layers
    is propagated by GC to the compaction task, which takes it into account
    when deciding which new image layers to create.
    """
    env = neon_env_builder.init_start()
    client = env.pageserver.http_client()
    tenant_id, _ = env.neon_cli.create_tenant(
        conf={
            # effectively disable the periodic GC and compaction tasks;
            # the test triggers both explicitly below
            "gc_period": "1000 m",
            "compaction_period": "0 s",
            "gc_horizon": f"{1024 ** 2}",
            "checkpoint_distance": f"{1024 ** 2}",
            "compaction_target_size": f"{1024 ** 2}",
            # set PITR interval to be small, so we can do GC
            "pitr_interval": "1 s",
            # "compaction_threshold": "3",
            # "image_creation_threshold": "2",
        }
    )
    pg = env.postgres.create_start("main", tenant_id=tenant_id)
    timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]

    n_steps = 10
    n_update_iters = 100
    step_size = 10000
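    # Each step inserts a fresh range of keys, then repeatedly updates a
    # window that slides across the previous and current step, so the
    # resulting delta layers cover staggered, "stair"-shaped key ranges.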
    with pg.cursor() as cur:
        cur.execute("SET statement_timeout='1000s'")
        cur.execute(
            "CREATE TABLE t(pk bigint primary key, count bigint default 0, payload text default repeat(' ', 100)) with (fillfactor=50)"
        )
        for step in range(n_steps):
            cur.execute(
                f"INSERT INTO t (pk) values (generate_series({step*step_size+1},{(step+1)*step_size}))"
            )
            for i in range(n_update_iters):
                cur.execute(
                    f"UPDATE t set count=count+1 where pk BETWEEN {(step-1)*step_size+1+i*step_size//n_update_iters} AND {step*step_size+i*step_size//n_update_iters}"
                )
                cur.execute("vacuum t")

            # cur.execute("select pg_table_size('t')")
            # logical_size = cur.fetchone()[0]
            logical_size = client.timeline_detail(tenant_id, timeline_id)["current_logical_size"]
            log.info(f"Logical storage size {logical_size}")

            physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
            log.info(f"Physical storage size {physical_size}")

            client.timeline_checkpoint(tenant_id, timeline_id)

            # Do compaction and GC
            client.timeline_gc(tenant_id, timeline_id, 0)
            client.timeline_compact(tenant_id, timeline_id)

            physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
            log.info(f"Physical after GC {physical_size}")