diff --git a/pageserver/src/bin/draw_layer-trace.rs b/pageserver/src/bin/draw_layer-trace.rs new file mode 100644 index 0000000000..ac4cf83431 --- /dev/null +++ b/pageserver/src/bin/draw_layer-trace.rs @@ -0,0 +1,493 @@ +use anyhow::Result; +use pageserver::repository::Key; +use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; +use std::io::{self, BufRead}; +use std::{ + collections::{BTreeMap, BTreeSet, HashMap}, + fmt::Write, + ops::Range, +}; +use svg_fmt::{rgb, BeginSvg, EndSvg, Fill, Stroke, Style}; +use utils::{lsn::Lsn, project_git_version}; + +project_git_version!(GIT_VERSION); + +// Map values to their compressed coordinate - the index the value +// would have in a sorted and deduplicated list of all values. +fn build_coordinate_compression_map(coords: Vec) -> BTreeMap { + let set: BTreeSet = coords.into_iter().collect(); + + let mut map: BTreeMap = BTreeMap::new(); + for (i, e) in set.iter().enumerate() { + map.insert(*e, i); + } + + map +} + +fn parse_filename(name: &str) -> (Range, Range) { + let split: Vec<&str> = name.split("__").collect(); + let keys: Vec<&str> = split[0].split('-').collect(); + let mut lsns: Vec<&str> = split[1].split('-').collect(); + if lsns.len() == 1 { + lsns.push(lsns[0]); + } + + let keys = Key::from_hex(keys[0]).unwrap()..Key::from_hex(keys[1]).unwrap(); + let lsns = Lsn::from_hex(lsns[0]).unwrap()..Lsn::from_hex(lsns[1]).unwrap(); + (keys, lsns) +} + +#[derive(Serialize, Deserialize, PartialEq)] +enum LayerTraceOp { + #[serde(rename = "evict")] + Evict, + #[serde(rename = "flush")] + Flush, + #[serde(rename = "compact_create")] + CompactCreate, + #[serde(rename = "compact_delete")] + CompactDelete, + #[serde(rename = "image_create")] + ImageCreate, + #[serde(rename = "gc_delete")] + GcDelete, + #[serde(rename = "gc_start")] + GcStart, +} + +impl std::fmt::Display for LayerTraceOp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + let op_str = match self { + LayerTraceOp::Evict => 
"evict", + LayerTraceOp::Flush => "flush", + LayerTraceOp::CompactCreate => "compact_create", + LayerTraceOp::CompactDelete => "compact_delete", + LayerTraceOp::ImageCreate => "image_create", + LayerTraceOp::GcDelete => "gc_delete", + LayerTraceOp::GcStart => "gc_start", + }; + f.write_str(op_str) + } +} + +#[serde_with::serde_as] +#[derive(Serialize, Deserialize)] +struct LayerTraceLine { + time: u64, + op: LayerTraceOp, + #[serde(default)] + filename: String, + #[serde_as(as = "Option")] + cutoff: Option, +} + +struct LayerTraceFile { + filename: String, + key_range: Range, + lsn_range: Range, +} + +impl LayerTraceFile { + fn is_image(&self) -> bool { + self.lsn_range.start == self.lsn_range.end + } +} + +struct LayerTraceEvent { + time_rel: u64, + op: LayerTraceOp, + filename: String, +} + +struct GcEvent { + time_rel: u64, + cutoff: Lsn, +} + +fn main() -> Result<()> { + // Parse trace lines from stdin + let stdin = io::stdin(); + + let mut files: HashMap = HashMap::new(); + let mut layer_events: Vec = Vec::new(); + let mut gc_events: Vec = Vec::new(); + let mut first_time: Option = None; + for line in stdin.lock().lines() { + let line = line.unwrap(); + let parsed_line: LayerTraceLine = serde_json::from_str(&line)?; + + let time_rel = if let Some(first_time) = first_time { + parsed_line.time - first_time + } else { + first_time = Some(parsed_line.time); + 0 + }; + + if parsed_line.op == LayerTraceOp::GcStart { + gc_events.push(GcEvent { + time_rel, + cutoff: parsed_line.cutoff.unwrap(), + }); + } else { + layer_events.push(LayerTraceEvent { + time_rel, + filename: parsed_line.filename.clone(), + op: parsed_line.op, + }); + + if !files.contains_key(&parsed_line.filename) { + let (key_range, lsn_range) = parse_filename(&parsed_line.filename); + files.insert(parsed_line.filename.clone(), LayerTraceFile { + filename: parsed_line.filename.clone(), + key_range, + lsn_range, + }); + }; + } + } + let mut last_time_rel = layer_events.last().unwrap().time_rel; + if let 
Some(last_gc) = gc_events.last() { + last_time_rel = std::cmp::min(last_gc.time_rel, last_time_rel); + } + + // Collect all coordinates + let mut keys: Vec = vec![]; + let mut lsns: Vec = vec![]; + for f in files.values() { + keys.push(f.key_range.start); + keys.push(f.key_range.end); + lsns.push(f.lsn_range.start); + lsns.push(f.lsn_range.end); + } + for gc_event in &gc_events { + lsns.push(gc_event.cutoff); + } + + // Analyze + let key_map = build_coordinate_compression_map(keys); + let lsn_map = build_coordinate_compression_map(lsns); + + // Initialize stats + let mut num_deltas = 0; + let mut num_images = 0; + + let mut svg = String::new(); + + // Draw + let stretch = 3.0; // Stretch out vertically for better visibility + writeln!(svg, + "{}", + BeginSvg { + w: key_map.len() as f32, + h: stretch * lsn_map.len() as f32 + } + )?; + let lsn_max = lsn_map.len(); + + // Sort the files by LSN, but so that image layers go after all delta layers + // The SVG is painted in the order the elements appear, and we want to draw + // image layers on top of the delta layers if they overlap + let mut files_sorted: Vec = files.into_values().collect(); + files_sorted.sort_by(|a, b| { + if a.is_image() && !b.is_image() { + Ordering::Greater + } else if !a.is_image() && b.is_image() { + Ordering::Less + } else { + a.lsn_range.end.cmp(&b.lsn_range.end) + } + }); + + for f in files_sorted { + let key_start = *key_map.get(&f.key_range.start).unwrap(); + let key_end = *key_map.get(&f.key_range.end).unwrap(); + let key_diff = key_end - key_start; + + if key_start >= key_end { + panic!("Invalid key range {}-{}", key_start, key_end); + } + + let lsn_start = *lsn_map.get(&f.lsn_range.start).unwrap(); + let lsn_end = *lsn_map.get(&f.lsn_range.end).unwrap(); + + // Fill in and thicken rectangle if it's an + // image layer so that we can see it. 
+ let mut style = Style::default(); + style.fill = Fill::Color(rgb(0x80, 0x80, 0x80)); + style.stroke = Stroke::Color(rgb(0, 0, 0), 0.5); + + let y_start = stretch * (lsn_max - lsn_start) as f32; + let y_end = stretch * (lsn_max - lsn_end) as f32; + + let x_margin = 0.25; + let y_margin = 0.5; + + match lsn_start.cmp(&lsn_end) { + Ordering::Less => { + num_deltas += 1; + write!(svg, + r#" "#, + f.filename, + key_start as f32 + x_margin, + y_end + y_margin, + key_diff as f32 - x_margin * 2.0, + y_start - y_end - y_margin * 2.0, + 1.0, // border_radius, + style.to_string(), + )?; + write!(svg, "{}<br>{} - {}", f.filename, lsn_end, y_end)?; + writeln!(svg, "")?; + } + Ordering::Equal => { + num_images += 1; + //lsn_diff = 0.3; + //lsn_offset = -lsn_diff / 2.0; + //margin = 0.05; + style.fill = Fill::Color(rgb(0x80, 0, 0x80)); + style.stroke = Stroke::Color(rgb(0x80, 0, 0x80), 3.0); + write!(svg, + r#" "#, + f.filename, + key_start as f32 + x_margin, + y_end, + key_end as f32 - x_margin, + y_end, + style.to_string(), + )?; + write!(svg, "{}<br>{} - {}", f.filename, lsn_end, y_end)?; + writeln!(svg, "")?; + } + Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end), + } + } + + for (idx, gc) in gc_events.iter().enumerate() { + let cutoff_lsn = *lsn_map.get(&gc.cutoff).unwrap(); + + let mut style = Style::default(); + style.fill = Fill::None; + style.stroke = Stroke::Color(rgb(0xff, 0, 0), 0.5); + + let y = stretch * (lsn_max as f32 - (cutoff_lsn as f32)); + writeln!(svg, + r#" "#, + idx, + 0, + y, + key_map.len() as f32, + y, + style.to_string(), + )?; + } + + writeln!(svg, "{}", EndSvg)?; + + let mut layer_events_str = String::new(); + let mut first = true; + for e in layer_events { + if !first { + writeln!(layer_events_str, ",")?; + } + write!(layer_events_str, + r#" {{"time_rel": {}, "filename": "{}", "op": "{}"}}"#, + e.time_rel, e.filename, e.op)?; + first = false; + } + writeln!(layer_events_str)?; + + let mut gc_events_str = String::new(); + 
let mut first = true; + for e in gc_events { + if !first { + writeln!(gc_events_str, ",")?; + } + write!(gc_events_str, + r#" {{"time_rel": {}, "cutoff_lsn": "{}"}}"#, + e.time_rel, e.cutoff)?; + first = false; + } + writeln!(gc_events_str)?; + + println!(r#" + + + + + + + + +
+
+ : +
+ + pos:
+ event:
+ gc:
+
+ + + + + + +
+ +
+{svg} +
+ + +"#); + + eprintln!("num_images: {}", num_images); + eprintln!("num_deltas: {}", num_deltas); + + Ok(()) +} diff --git a/pageserver/src/keyspace.rs b/pageserver/src/keyspace.rs index 64024a2d8d..2b51877ba0 100644 --- a/pageserver/src/keyspace.rs +++ b/pageserver/src/keyspace.rs @@ -1,11 +1,12 @@ use crate::repository::{key_range_size, singleton_range, Key}; use postgres_ffi::BLCKSZ; use std::ops::Range; +use tracing::debug; /// /// Represents a set of Keys, in a compact form. /// -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct KeySpace { /// Contiguous ranges of keys that belong to the key space. In key order, /// and with no overlap. @@ -61,6 +62,60 @@ impl KeySpace { KeyPartitioning { parts } } + + /// Add range to keyspace. + /// + /// Unlike KeySpaceAccum, it accepts key ranges in any order and overlapping ranges. + pub fn add_range(&mut self, range: Range) { + let start = range.start; + let mut end = range.end; + let mut prev_index = match self.ranges.binary_search_by_key(&end, |r| r.start) { + Ok(index) => index, + Err(0) => { + self.ranges.insert(0, range); + return; + } + Err(index) => index - 1, + }; + loop { + let mut prev = &mut self.ranges[prev_index]; + if prev.end >= start { + // two ranges overlap + if prev.start <= start { + // combine with prev range + if prev.end < end { + prev.end = end; + debug!("Extend wanted image {}..{}", prev.start, end); + } + return; + } else { + if prev.end > end { + end = prev.end; + } + self.ranges.remove(prev_index); + } + } else { + break; + } + if prev_index == 0 { + break; + } + prev_index -= 1; + } + debug!("Wanted image {}..{}", start, end); + self.ranges.insert(prev_index, start..end); + } + + /// + /// Check if key space contains overlapping range + /// + pub fn overlaps(&self, range: &Range) -> bool { + match self.ranges.binary_search_by_key(&range.end, |r| r.start) { + Ok(_) => false, + Err(0) => false, + Err(index) => self.ranges[index - 1].end > range.start, + } + } } /// diff --git 
a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 33909e749b..d0bca02184 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2,6 +2,7 @@ mod eviction_task; mod walreceiver; +mod layer_trace; use anyhow::{anyhow, bail, ensure, Context}; use bytes::Bytes; @@ -19,8 +20,7 @@ use tracing::*; use utils::id::TenantTimelineId; use std::cmp::{max, min, Ordering}; -use std::collections::BinaryHeap; -use std::collections::HashMap; +use std::collections::{BinaryHeap, HashMap}; use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; @@ -115,6 +115,17 @@ pub struct Timeline { pub(super) layers: RwLock>, + /// Set of key ranges which should be covered by image layers to + /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. + /// It is used by the compaction task when it checks if a new image layer should be created. + /// A newly created image layer doesn't help to remove the delta layer, until the + /// newly created image layer falls off the PITR horizon. So on the next GC cycle, + /// gc_timeline may still want the new image layer to be created. To avoid redundant + /// image layer creation we should check if an image layer already exists beyond the PITR horizon. + /// This is why we need to remember the GC cutoff LSN. + /// + wanted_image_layers: Mutex>, + last_freeze_at: AtomicLsn, // Atomic would be more appropriate here. last_freeze_ts: RwLock, @@ -216,6 +227,8 @@ pub struct Timeline { download_all_remote_layers_task_info: RwLock>, state: watch::Sender, + + layer_trace_file: Mutex>, } /// Internal structure to hold all data needed for logical size calculation. @@ -312,7 +325,7 @@ impl LogicalSize { // we change the type.
match self.initial_logical_size.get() { Some(initial_size) => { - initial_size.checked_add_signed(size_increment) + initial_size.checked_add(size_increment.try_into().unwrap()) .with_context(|| format!("Overflow during logical size calculation, initial_size: {initial_size}, size_increment: {size_increment}")) .map(CurrentLogicalSize::Exact) } @@ -857,6 +870,7 @@ impl Timeline { } pub fn activate(self: &Arc) { + self.start_layer_tracing(); self.set_state(TimelineState::Active); self.launch_wal_receiver(); self.launch_eviction_task(); @@ -1080,6 +1094,7 @@ impl Timeline { self.metrics .resident_physical_size_gauge .sub(layer_file_size); + self.trace_layer_evict(&local_layer.filename()); self.metrics.evictions.inc(); @@ -1186,6 +1201,7 @@ impl Timeline { tenant_id, pg_version, layers: RwLock::new(LayerMap::default()), + wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1248,6 +1264,8 @@ impl Timeline { download_all_remote_layers_task_info: RwLock::new(None), state, + + layer_trace_file: Mutex::new(None), }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result @@ -1747,7 +1765,7 @@ impl Timeline { .size_added_after_initial .load(AtomicOrdering::Relaxed); - let sum = calculated_size.saturating_add_signed(added); + let sum = calculated_size.saturating_add(added.try_into().unwrap()); // set the gauge value before it can be set in `update_current_logical_size`. 
self_clone.metrics.current_logical_size_gauge.set(sum); @@ -2628,6 +2646,8 @@ impl Timeline { self.conf.timeline_path(&self.timeline_id, &self.tenant_id), ])?; + self.trace_layer_flush(&new_delta.filename()); + // Add it to the layer map self.layers .write() @@ -2683,6 +2703,30 @@ impl Timeline { let layers = self.layers.read().unwrap(); let mut max_deltas = 0; + let wanted_image_layers = self.wanted_image_layers.lock().unwrap(); + if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers { + let img_range = + partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end; + if wanted.overlaps(&img_range) { + // + // gc_timeline only pays attention to image layers that are older than the GC cutoff, + // but create_image_layers creates image layers at last-record-lsn. + // So it's possible that gc_timeline decides that it wants a new image layer to be created for a key range, + // and on the next compaction create_image_layers creates the image layer. + // But on the next GC cycle, gc_timeline still wants the new image layer to be created, + // because the newly created image layer doesn't help to remove the delta layer, + // until the newly created image layer falls off the PITR horizon. + // + // So we should check if an image layer beyond the cutoff LSN already exists. + if !layers.image_layer_exists(&img_range, &(*cutoff_lsn..lsn))? { + debug!( + "Force generation of layer {}-{} wanted by GC)", + img_range.start, img_range.end + ); + return Ok(true); + } + } + } for part_range in &partition.ranges { let image_coverage = layers.image_coverage(part_range, lsn)?; @@ -2802,6 +2846,11 @@ impl Timeline { image_layers.push(image_layer); } } + // All wanted layers are taken into account by time_for_new_image_layer. + // The wanted_image_layers could get updated out of turn and we could + // clear something which hasn't been looked at at all. This is fine, because + // on the next GC round any wanted ranges will get added back in.
+ *self.wanted_image_layers.lock().unwrap() = None; // Sync the new layer to disk before adding it to the layer map, to make sure // we don't garbage collect something based on the new layer, before it has @@ -2838,6 +2887,7 @@ impl Timeline { self.metrics .resident_physical_size_gauge .add(metadata.len()); + self.trace_layer_image_create(&l.filename()); updates.insert_historic(Arc::new(l)); } updates.flush(); @@ -3268,6 +3318,7 @@ impl Timeline { self.metrics .resident_physical_size_gauge .add(metadata.len()); + self.trace_layer_compact_create(&l.filename()); new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); let x: Arc = Arc::new(l); @@ -3278,6 +3329,7 @@ impl Timeline { // delete the old ones let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); for l in deltas_to_compact { + self.trace_layer_compact_delete(&l.filename()); layer_names_to_delete.push(l.filename()); self.delete_historic_layer(layer_removal_cs, l, &mut updates)?; } @@ -3474,6 +3526,8 @@ impl Timeline { info!("GC starting"); + self.trace_gc_start(new_gc_cutoff); + debug!("retain_lsns: {:?}", retain_lsns); // Before deleting any layers, we need to wait for their upload ops to finish. @@ -3488,6 +3542,7 @@ impl Timeline { } let mut layers_to_remove = Vec::new(); + let mut wanted_image_layers = KeySpace::default(); // Scan all layers in the timeline (remote or on-disk). // @@ -3571,6 +3626,15 @@ impl Timeline { "keeping {} because it is the latest layer", l.filename().file_name() ); + // Collect delta key ranges that need image layers to allow garbage + // collecting the layers. + // It is not so obvious whether we need to propagate information only about + // delta layers. Image layers can form "stairs" preventing an old image from being deleted. + // But image layers are in any case less sparse than delta layers. Also we need some + // protection from replacing recent image layers with new ones after each GC iteration.
+ if l.is_incremental() && !LayerMap::is_l0(&*l) { + wanted_image_layers.add_range(l.get_key_range()); + } result.layers_not_updated += 1; continue 'outer; } @@ -3583,6 +3647,10 @@ impl Timeline { ); layers_to_remove.push(Arc::clone(&l)); } + self.wanted_image_layers + .lock() + .unwrap() + .replace((new_gc_cutoff, wanted_image_layers)); let mut updates = layers.batch_update(); if !layers_to_remove.is_empty() { @@ -3597,6 +3665,7 @@ impl Timeline { { for doomed_layer in layers_to_remove { layer_names_to_delete.push(doomed_layer.filename()); + self.trace_layer_gc_delete(&doomed_layer.filename()); self.delete_historic_layer(layer_removal_cs, doomed_layer, &mut updates)?; // FIXME: schedule succeeded deletions before returning? result.layers_removed += 1; } diff --git a/pageserver/src/tenant/timeline/layer_trace.rs b/pageserver/src/tenant/timeline/layer_trace.rs new file mode 100644 index 0000000000..d007ce4784 --- /dev/null +++ b/pageserver/src/tenant/timeline/layer_trace.rs @@ -0,0 +1,81 @@ +use crate::tenant::timeline::LayerFileName; +use crate::tenant::Timeline; +use std::io::Write; +use std::time::UNIX_EPOCH; +use tracing::*; +use std::fs::File; +use utils::lsn::Lsn; + +impl Timeline { + + pub(super) fn start_layer_tracing(&self) { + let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); + + let path = timeline_path.join("layer_trace"); + + match File::options() + .create(true) + .append(true) + .open(&path) + { + Ok(file) => { + info!("enabled layer tracing"); + self.layer_trace_file.lock().unwrap().replace(file); + }, + Err(e) => { + warn!("could not open layer tracing file \"{}\": {}", path.display(), e); + } + } + } + + fn trace_op(&self, op: &str, filename: &str) { + let opt_out = &self.layer_trace_file.lock().unwrap(); + if let Some(mut out) = opt_out.as_ref() { + if let Ok(elapsed) = UNIX_EPOCH.elapsed() { + let time = elapsed.as_millis(); + let _ = writeln!(out, "{{ \"time\": {time}, \"op\": \"{op}\", \"filename\": 
\"{filename}\"}}"); + } + else { + warn!("could not get current timestamp"); + } + } + } + + pub(super) fn trace_layer_evict(&self, filename: &LayerFileName) { + self.trace_op("evict", &filename.file_name()) + } + + pub(super) fn trace_layer_flush(&self, filename: &LayerFileName) { + self.trace_op("flush", &filename.file_name()) + } + + pub(super) fn trace_layer_compact_create(&self, filename: &LayerFileName) { + self.trace_op("compact_create", &filename.file_name()) + } + + pub(super) fn trace_layer_compact_delete(&self, filename: &LayerFileName) { + self.trace_op("compact_delete", &filename.file_name()) + } + + pub(super) fn trace_layer_image_create(&self, filename: &LayerFileName) { + self.trace_op("image_create", &filename.file_name()) + } + + pub(super) fn trace_layer_gc_delete(&self, filename: &LayerFileName) { + self.trace_op("gc_delete", &filename.file_name()) + } + + // TODO: also report 'retain_lsns' + pub(super) fn trace_gc_start(&self, cutoff_lsn: Lsn) { + let opt_out = &self.layer_trace_file.lock().unwrap(); + if let Some(mut out) = opt_out.as_ref() { + if let Ok(elapsed) = UNIX_EPOCH.elapsed() { + let time = elapsed.as_millis(); + let _ = writeln!(out, "{{ \"time\": {time}, \"op\": \"gc_start\", \"cutoff\": \"{cutoff_lsn}\"}}"); + } + else { + warn!("could not get current timestamp"); + } + } + } +}