From 32a0e759bd57f40af6c168f3122a761fb154b5b1 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 13 Sep 2024 01:28:12 +0300 Subject: [PATCH 1/6] safekeeper: add wal_last_modified to debug_dump. Adds to debug_dump option to include highest modified time among all WAL segments. In passing replace some str with OsStr to have less unwraps. --- libs/postgres_ffi/src/xlog_utils.rs | 34 ++++++++++++++----- libs/postgres_ffi/wal_craft/src/lib.rs | 5 +-- .../wal_craft/src/xlog_utils_test.rs | 19 ++++++----- safekeeper/src/debug_dump.rs | 33 ++++++++++++++++++ safekeeper/src/http/routes.rs | 4 +++ safekeeper/src/wal_storage.rs | 25 ++++++-------- test_runner/regress/test_wal_acceptor.py | 1 + 7 files changed, 88 insertions(+), 33 deletions(-) diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 1873734753..a636bd2a97 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -26,6 +26,7 @@ use bytes::{Buf, Bytes}; use log::*; use serde::Serialize; +use std::ffi::OsStr; use std::fs::File; use std::io::prelude::*; use std::io::ErrorKind; @@ -78,19 +79,34 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize ) } -pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) { - let tli = u32::from_str_radix(&fname[0..8], 16).unwrap(); - let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo; - let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo; - (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli) +pub fn XLogFromFileName( + fname: &OsStr, + wal_seg_size: usize, +) -> anyhow::Result<(XLogSegNo, TimeLineID)> { + if let Some(fname_str) = fname.to_str() { + let tli = u32::from_str_radix(&fname_str[0..8], 16)?; + let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo; + let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo; + Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)) + } else { + anyhow::bail!("non-ut8 filename: {:?}", fname); + } } -pub fn IsXLogFileName(fname: &str) -> bool { - return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit()); +pub fn IsXLogFileName(fname: &OsStr) -> bool { + if let Some(fname) = fname.to_str() { + fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit()) + } else { + false + } } -pub fn IsPartialXLogFileName(fname: &str) -> bool { - fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8]) +pub fn IsPartialXLogFileName(fname: &OsStr) -> bool { + if let Some(fname) = fname.to_str() { + fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8])) + } else { + false + } } /// If LSN points to the beginning of the page, then shift it to first record, diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index ddaafe65f1..5c0abda522 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -7,6 +7,7 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; use postgres_ffi::{ XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD, }; +use std::ffi::OsStr; use std::path::{Path, PathBuf}; use std::process::Command; use std::time::{Duration, Instant}; @@ -135,8 +136,8 @@ impl Conf { pub fn pg_waldump( &self, - first_segment_name: &str, - last_segment_name: &str, + first_segment_name: &OsStr, + last_segment_name: &OsStr, ) -> anyhow::Result { let first_segment_file = self.datadir.join(first_segment_name); let last_segment_file = self.datadir.join(last_segment_name); diff --git a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs index 79d45de67a..9eb3f0e95a 100644 --- a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs +++ b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs @@ -4,6 +4,7 @@ use super::*; use crate::{error, info}; use regex::Regex; use std::cmp::min; +use std::ffi::OsStr; use std::fs::{self, File}; use std::io::Write; use std::{env, str::FromStr}; @@ -54,7 +55,7 @@ fn test_end_of_wal(test_name: &str) { .wal_dir() .read_dir() .unwrap() - .map(|f| f.unwrap().file_name().into_string().unwrap()) + .map(|f| f.unwrap().file_name()) .filter(|fname| IsXLogFileName(fname)) .max() .unwrap(); @@ -70,11 +71,11 @@ fn test_end_of_wal(test_name: &str) { start_lsn ); for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() { - let fname = file.file_name().into_string().unwrap(); + let fname = file.file_name(); if !IsXLogFileName(&fname) { continue; } - let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE); + let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE).unwrap(); let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); if seg_start_lsn > u64::from(*start_lsn) { continue; @@ -93,10 +94,10 @@ fn test_end_of_wal(test_name: &str) { } } -fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn { +fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn { // Get the actual end of WAL by pg_waldump let waldump_output = cfg - .pg_waldump("000000010000000000000001", last_segment) + .pg_waldump(OsStr::new("000000010000000000000001"), last_segment) .unwrap() .stderr; let waldump_output = std::str::from_utf8(&waldump_output).unwrap(); @@ -117,7 +118,7 @@ fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn { fn check_end_of_wal( cfg: &crate::Conf, - last_segment: &str, + last_segment: &OsStr, start_lsn: Lsn, expected_end_of_wal: Lsn, ) { @@ -132,7 +133,8 @@ fn check_end_of_wal( // Rename file to partial to actually find last valid lsn, then rename it back. fs::rename( cfg.wal_dir().join(last_segment), - cfg.wal_dir().join(format!("{}.partial", last_segment)), + cfg.wal_dir() + .join(format!("{}.partial", last_segment.to_str().unwrap())), ) .unwrap(); let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); @@ -142,7 +144,8 @@ fn check_end_of_wal( ); assert_eq!(wal_end, expected_end_of_wal); fs::rename( - cfg.wal_dir().join(format!("{}.partial", last_segment)), + cfg.wal_dir() + .join(format!("{}.partial", last_segment.to_str().unwrap())), cfg.wal_dir().join(last_segment), ) .unwrap(); diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index 15b0272cd9..589536c7a8 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -17,6 +17,7 @@ use postgres_ffi::MAX_SEND_SIZE; use serde::Deserialize; use serde::Serialize; +use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName}; use sha2::{Digest, Sha256}; use utils::id::NodeId; use utils::id::TenantTimelineId; @@ -51,6 +52,9 @@ pub struct Args { /// Dump full term history. True by default. pub dump_term_history: bool, + /// Dump last modified time of WAL segments. Uses value of `dump_all` by default. + pub dump_wal_last_modified: bool, + /// Filter timelines by tenant_id. pub tenant_id: Option, @@ -128,12 +132,19 @@ async fn build_from_tli_dump( None }; + let wal_last_modified = if args.dump_wal_last_modified { + get_wal_last_modified(timeline_dir).ok().flatten() + } else { + None + }; + Timeline { tenant_id: timeline.ttid.tenant_id, timeline_id: timeline.ttid.timeline_id, control_file, memory, disk_content, + wal_last_modified, } } @@ -156,6 +167,7 @@ pub struct Timeline { pub control_file: Option, pub memory: Option, pub disk_content: Option, + pub wal_last_modified: Option>, } #[derive(Debug, Serialize, Deserialize)] @@ -302,6 +314,27 @@ fn build_file_info(entry: DirEntry) -> Result { }) } +/// Get highest modified time of WAL segments in the directory. +fn get_wal_last_modified(path: &Utf8Path) -> Result>> { + let mut res = None; + for entry in fs::read_dir(path)? { + if entry.is_err() { + continue; + } + let entry = entry?; + /* Ignore files that are not XLOG segments */ + let fname = entry.file_name(); + if !IsXLogFileName(&fname) && !IsPartialXLogFileName(&fname) { + continue; + } + + let metadata = entry.metadata()?; + let modified: DateTime = DateTime::from(metadata.modified()?); + res = std::cmp::max(res, Some(modified)); + } + Ok(res) +} + /// Converts SafeKeeperConf to Config, filtering out the fields that are not /// supposed to be exposed. fn build_config(config: SafeKeeperConf) -> Config { diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index e482edea55..b4590fe3e5 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -481,6 +481,7 @@ async fn dump_debug_handler(mut request: Request) -> Result let mut dump_memory: Option = None; let mut dump_disk_content: Option = None; let mut dump_term_history: Option = None; + let mut dump_wal_last_modified: Option = None; let mut tenant_id: Option = None; let mut timeline_id: Option = None; @@ -494,6 +495,7 @@ async fn dump_debug_handler(mut request: Request) -> Result "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?), "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?), "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?), + "dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?), "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?), "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?), _ => Err(ApiError::BadRequest(anyhow::anyhow!( @@ -508,6 +510,7 @@ async fn dump_debug_handler(mut request: Request) -> Result let dump_memory = dump_memory.unwrap_or(dump_all); let dump_disk_content = dump_disk_content.unwrap_or(dump_all); let dump_term_history = dump_term_history.unwrap_or(true); + let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all); let args = debug_dump::Args { dump_all, @@ -515,6 +518,7 @@ async fn dump_debug_handler(mut request: Request) -> Result dump_memory, dump_disk_content, dump_term_history, + dump_wal_last_modified, tenant_id, timeline_id, }; diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 46c260901d..6e7da94973 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -539,20 +539,17 @@ async fn remove_segments_from_disk( while let Some(entry) = entries.next_entry().await? { let entry_path = entry.path(); let fname = entry_path.file_name().unwrap(); - - if let Some(fname_str) = fname.to_str() { - /* Ignore files that are not XLOG segments */ - if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) { - continue; - } - let (segno, _) = XLogFromFileName(fname_str, wal_seg_size); - if remove_predicate(segno) { - remove_file(entry_path).await?; - n_removed += 1; - min_removed = min(min_removed, segno); - max_removed = max(max_removed, segno); - REMOVED_WAL_SEGMENTS.inc(); - } + /* Ignore files that are not XLOG segments */ + if !IsXLogFileName(fname) && !IsPartialXLogFileName(fname) { + continue; + } + let (segno, _) = XLogFromFileName(fname, wal_seg_size)?; + if remove_predicate(segno) { + remove_file(entry_path).await?; + n_removed += 1; + min_removed = min(min_removed, segno); + max_removed = max(max_removed, segno); + REMOVED_WAL_SEGMENTS.inc(); } } diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 4bf8cfe88f..8ee548bdb0 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -892,6 +892,7 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): log.info(f"debug_dump before reboot {debug_dump_0}") assert debug_dump_0["timelines_count"] == 1 assert debug_dump_0["timelines"][0]["timeline_id"] == str(timeline_id) + assert debug_dump_0["timelines"][0]["wal_last_modified"] != "" endpoint.safe_psql("create table t(i int)") From 21eeafaaa58326bea8411b5b28db58fa0755e47e Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 19 Sep 2024 14:51:00 +0100 Subject: [PATCH 2/6] pageserver: simple fix for vectored read image layer skip (#9026) ## Problem Different keyspaces may require different floor LSNs in vectored delta layer visits. This patch adds support for such cases. ## Summary of changes Different keyspaces wishing to read the same layer might require different stop lsns (or lsn floor). The start LSN of the read (or the lsn ceil) will always be the same. With this observation, we fix skipping of image layers by indexing the fringe by layer id plus lsn floor. This is very simple, but means that we can visit delta layers twice in certain cases. Still, I think it's very unlikely for any extra merging to have taken place in this case, so perhaps it makes sense to go with the simpler patch. Fixes https://github.com/neondatabase/neon/issues/9012 Alternative to https://github.com/neondatabase/neon/pull/9025 --- pageserver/src/tenant.rs | 146 ++++++++++++++++++++++++- pageserver/src/tenant/storage_layer.rs | 52 +++++---- 2 files changed, 176 insertions(+), 22 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c6f0e48101..14cb6f508d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4164,9 +4164,18 @@ pub(crate) mod harness { let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1)); if records_neon { // For Neon wal records, we can decode without spawning postgres, so do so. - let base_img = base_img.expect("Neon WAL redo requires base image").1; - let mut page = BytesMut::new(); - page.extend_from_slice(&base_img); + let mut page = match (base_img, records.first()) { + (Some((_lsn, img)), _) => { + let mut page = BytesMut::new(); + page.extend_from_slice(&img); + page + } + (_, Some((_lsn, rec))) if rec.will_init() => BytesMut::new(), + _ => { + panic!("Neon WAL redo requires base image or will init record"); + } + }; + for (record_lsn, record) in records { apply_neon::apply_in_neon(&record, record_lsn, key, &mut page)?; } @@ -8470,4 +8479,135 @@ mod tests { Ok(()) } + + // Regression test for https://github.com/neondatabase/neon/issues/9012 + // Create an image arrangement where we have to read at different LSN ranges + // from a delta layer. This is achieved by overlapping an image layer on top of + // a delta layer. Like so: + // + // A B + // +----------------+ -> delta_layer + // | | ^ lsn + // | =========|-> nested_image_layer | + // | C | | + // +----------------+ | + // ======== -> baseline_image_layer +-------> key + // + // + // When querying the key range [A, B) we need to read at different LSN ranges + // for [A, C) and [C, B). This test checks that the described edge case is handled correctly. + #[tokio::test] + async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?; + let (tenant, ctx) = harness.load().await; + + let will_init_keys = [2, 6]; + fn get_key(id: u32) -> Key { + let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let mut expected_key_values = HashMap::new(); + + let baseline_image_layer_lsn = Lsn(0x10); + let mut baseline_img_layer = Vec::new(); + for i in 0..5 { + let key = get_key(i); + let value = format!("value {i}@{baseline_image_layer_lsn}"); + + let removed = expected_key_values.insert(key, value.clone()); + assert!(removed.is_none()); + + baseline_img_layer.push((key, Bytes::from(value))); + } + + let nested_image_layer_lsn = Lsn(0x50); + let mut nested_img_layer = Vec::new(); + for i in 5..10 { + let key = get_key(i); + let value = format!("value {i}@{nested_image_layer_lsn}"); + + let removed = expected_key_values.insert(key, value.clone()); + assert!(removed.is_none()); + + nested_img_layer.push((key, Bytes::from(value))); + } + + let mut delta_layer_spec = Vec::default(); + let delta_layer_start_lsn = Lsn(0x20); + let mut delta_layer_end_lsn = delta_layer_start_lsn; + + for i in 0..10 { + let key = get_key(i); + let key_in_nested = nested_img_layer + .iter() + .any(|(key_with_img, _)| *key_with_img == key); + let lsn = { + if key_in_nested { + Lsn(nested_image_layer_lsn.0 + 0x10) + } else { + delta_layer_start_lsn + } + }; + + let will_init = will_init_keys.contains(&i); + if will_init { + delta_layer_spec.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init()))); + + expected_key_values.insert(key, "".to_string()); + } else { + let delta = format!("@{lsn}"); + delta_layer_spec.push(( + key, + lsn, + Value::WalRecord(NeonWalRecord::wal_append(&delta)), + )); + + expected_key_values + .get_mut(&key) + .expect("An image exists for each key") + .push_str(delta.as_str()); + } + delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn); + } + + delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1); + + assert!( + nested_image_layer_lsn > delta_layer_start_lsn + && nested_image_layer_lsn < delta_layer_end_lsn + ); + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + baseline_image_layer_lsn, + DEFAULT_PG_VERSION, + &ctx, + vec![DeltaLayerTestDesc::new_with_inferred_key_range( + delta_layer_start_lsn..delta_layer_end_lsn, + delta_layer_spec, + )], // delta layers + vec![ + (baseline_image_layer_lsn, baseline_img_layer), + (nested_image_layer_lsn, nested_img_layer), + ], // image layers + delta_layer_end_lsn, + ) + .await?; + + let keyspace = KeySpace::single(get_key(0)..get_key(10)); + let results = tline + .get_vectored(keyspace, delta_layer_end_lsn, &ctx) + .await + .expect("No vectored errors"); + for (key, res) in results { + let value = res.expect("No key errors"); + let expected_value = expected_key_values.remove(&key).expect("No unknown keys"); + assert_eq!(value, Bytes::from(expected_value)); + } + + Ok(()) + } } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index dac6b2f893..cd252aa371 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -276,6 +276,16 @@ pub(crate) enum LayerId { InMemoryLayerId(InMemoryLayerFileId), } +/// Uniquely identify a layer visit by the layer +/// and LSN floor (or start LSN) of the reads. +/// The layer itself is not enough since we may +/// have different LSN lower bounds for delta layer reads. +#[derive(Debug, PartialEq, Eq, Clone, Hash)] +struct LayerToVisitId { + layer_id: LayerId, + lsn_floor: Lsn, +} + /// Layer wrapper for the read path. Note that it is valid /// to use these layers even after external operations have /// been performed on them (compaction, freeze, etc.). @@ -287,9 +297,9 @@ pub(crate) enum ReadableLayer { /// A partial description of a read to be done. #[derive(Debug, Clone)] -struct ReadDesc { +struct LayerVisit { /// An id used to resolve the readable layer within the fringe - layer_id: LayerId, + layer_to_visit_id: LayerToVisitId, /// Lsn range for the read, used for selecting the next read lsn_range: Range, } @@ -303,12 +313,12 @@ struct ReadDesc { /// a two layer indexing scheme. #[derive(Debug)] pub(crate) struct LayerFringe { - planned_reads_by_lsn: BinaryHeap, - layers: HashMap, + planned_visits_by_lsn: BinaryHeap, + visit_reads: HashMap, } #[derive(Debug)] -struct LayerKeyspace { +struct LayerVisitReads { layer: ReadableLayer, target_keyspace: KeySpaceRandomAccum, } @@ -316,23 +326,23 @@ struct LayerKeyspace { impl LayerFringe { pub(crate) fn new() -> Self { LayerFringe { - planned_reads_by_lsn: BinaryHeap::new(), - layers: HashMap::new(), + planned_visits_by_lsn: BinaryHeap::new(), + visit_reads: HashMap::new(), } } pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range)> { - let read_desc = match self.planned_reads_by_lsn.pop() { + let read_desc = match self.planned_visits_by_lsn.pop() { Some(desc) => desc, None => return None, }; - let removed = self.layers.remove_entry(&read_desc.layer_id); + let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id); match removed { Some(( _, - LayerKeyspace { + LayerVisitReads { layer, mut target_keyspace, }, @@ -351,20 +361,24 @@ impl LayerFringe { keyspace: KeySpace, lsn_range: Range, ) { - let layer_id = layer.id(); - let entry = self.layers.entry(layer_id.clone()); + let layer_to_visit_id = LayerToVisitId { + layer_id: layer.id(), + lsn_floor: lsn_range.start, + }; + + let entry = self.visit_reads.entry(layer_to_visit_id.clone()); match entry { Entry::Occupied(mut entry) => { entry.get_mut().target_keyspace.add_keyspace(keyspace); } Entry::Vacant(entry) => { - self.planned_reads_by_lsn.push(ReadDesc { + self.planned_visits_by_lsn.push(LayerVisit { lsn_range, - layer_id: layer_id.clone(), + layer_to_visit_id: layer_to_visit_id.clone(), }); let mut accum = KeySpaceRandomAccum::new(); accum.add_keyspace(keyspace); - entry.insert(LayerKeyspace { + entry.insert(LayerVisitReads { layer, target_keyspace: accum, }); @@ -379,7 +393,7 @@ impl Default for LayerFringe { } } -impl Ord for ReadDesc { +impl Ord for LayerVisit { fn cmp(&self, other: &Self) -> Ordering { let ord = self.lsn_range.end.cmp(&other.lsn_range.end); if ord == std::cmp::Ordering::Equal { @@ -390,19 +404,19 @@ impl Ord for ReadDesc { } } -impl PartialOrd for ReadDesc { +impl PartialOrd for LayerVisit { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl PartialEq for ReadDesc { +impl PartialEq for LayerVisit { fn eq(&self, other: &Self) -> bool { self.lsn_range == other.lsn_range } } -impl Eq for ReadDesc {} +impl Eq for LayerVisit {} impl ReadableLayer { pub(crate) fn id(&self) -> LayerId { From ff9f065c4386496193d62f1ff1fadd28cce92910 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Thu, 19 Sep 2024 10:43:12 -0400 Subject: [PATCH 3/6] impr(pageserver): log image layer creation (#9050) https://github.com/neondatabase/neon/pull/9028 changed the image layer creation log into trace level. However, I personally find logging image layer creation useful when reading the logs -- it makes it clear that the image layer creation is happening and gives a clear idea of the progress. Therefore, I propose to continue logging them for create_image_layers set of functions. ## Summary of changes * Add info logging for all image layers created in legacy compaction. * Add info logging for all layers creation in testing functions. Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f66491d962..a06cea2c66 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4015,6 +4015,7 @@ impl Timeline { // partition, so flush it to disk. let (desc, path) = image_layer_writer.finish(ctx).await?; let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?; + info!("created image layer for rel {}", image_layer.local_path()); Ok(ImageLayerCreationOutcome { image: Some(image_layer), next_start_key: img_range.end, @@ -4104,6 +4105,10 @@ impl Timeline { // partition, so flush it to disk. let (desc, path) = image_layer_writer.finish(ctx).await?; let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?; + info!( + "created image layer for metadata {}", + image_layer.local_path() + ); Ok(ImageLayerCreationOutcome { image: Some(image_layer), next_start_key: img_range.end, @@ -5407,7 +5412,7 @@ impl Timeline { } let (desc, path) = image_layer_writer.finish(ctx).await?; let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?; - + info!("force created image layer {}", image_layer.local_path()); { let mut guard = self.layers.write().await; guard.open_mut().unwrap().force_insert_layer(image_layer); @@ -5486,7 +5491,7 @@ impl Timeline { } let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?; let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?; - + info!("force created delta layer {}", delta_layer.local_path()); { let mut guard = self.layers.write().await; guard.open_mut().unwrap().force_insert_layer(delta_layer); From 0a1ca7670cbaddb2e83b4e41142b8a7f5fcf0aef Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 19 Sep 2024 16:09:30 +0100 Subject: [PATCH 4/6] proxy: remove auth info from http conn info & fixup jwt api trait (#9047) misc changes split out from #8855 - **allow cloning the request context in a read-only fashion for background tasks** - **propagate endpoint and request context through the jwk cache** - **only allow password based auth for md5 during testing** - **remove auth info from conn info** --- proxy/src/auth/backend.rs | 15 ++++------ proxy/src/auth/backend/hacks.rs | 14 ++++----- proxy/src/auth/backend/jwt.rs | 43 ++++++++++++++++++++++----- proxy/src/auth/backend/local.rs | 10 +++++-- proxy/src/console/provider.rs | 1 + proxy/src/context.rs | 34 +++++++++++++++++++++ proxy/src/metrics.rs | 20 +++++++++++++ proxy/src/serverless/backend.rs | 9 +----- proxy/src/serverless/conn_pool.rs | 9 ++++-- proxy/src/serverless/sql_over_http.rs | 21 ++++++------- test_runner/fixtures/neon_fixtures.py | 3 -- 11 files changed, 127 insertions(+), 52 deletions(-) diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index 5561c9c56d..5bc2f2ff65 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -163,6 +163,7 @@ impl ComputeUserInfo { } pub(crate) enum ComputeCredentialKeys { + #[cfg(any(test, feature = "testing"))] Password(Vec), AuthKeys(AuthKeys), None, @@ -293,16 +294,10 @@ async fn auth_quirks( // We now expect to see a very specific payload in the place of password. let (info, unauthenticated_password) = match user_info.try_into() { Err(info) => { - let res = hacks::password_hack_no_authentication(ctx, info, client).await?; - - ctx.set_endpoint_id(res.info.endpoint.clone()); - let password = match res.keys { - ComputeCredentialKeys::Password(p) => p, - ComputeCredentialKeys::AuthKeys(_) | ComputeCredentialKeys::None => { - unreachable!("password hack should return a password") - } - }; - (res.info, Some(password)) + let (info, password) = + hacks::password_hack_no_authentication(ctx, info, client).await?; + ctx.set_endpoint_id(info.endpoint.clone()); + (info, Some(password)) } Ok(info) => (info, None), }; diff --git a/proxy/src/auth/backend/hacks.rs b/proxy/src/auth/backend/hacks.rs index e9019ce2cf..15123a2623 100644 --- a/proxy/src/auth/backend/hacks.rs +++ b/proxy/src/auth/backend/hacks.rs @@ -1,6 +1,4 @@ -use super::{ - ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint, -}; +use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint}; use crate::{ auth::{self, AuthFlow}, config::AuthenticationConfig, @@ -63,7 +61,7 @@ pub(crate) async fn password_hack_no_authentication( ctx: &RequestMonitoring, info: ComputeUserInfoNoEndpoint, client: &mut stream::PqStream>, -) -> auth::Result { +) -> auth::Result<(ComputeUserInfo, Vec)> { warn!("project not specified, resorting to the password hack auth flow"); ctx.set_auth_method(crate::context::AuthMethod::Cleartext); @@ -79,12 +77,12 @@ pub(crate) async fn password_hack_no_authentication( info!(project = &*payload.endpoint, "received missing parameter"); // Report tentative success; compute node will check the password anyway. - Ok(ComputeCredentials { - info: ComputeUserInfo { + Ok(( + ComputeUserInfo { user: info.user, options: info.options, endpoint: payload.endpoint, }, - keys: ComputeCredentialKeys::Password(payload.password), - }) + payload.password, + )) } diff --git a/proxy/src/auth/backend/jwt.rs b/proxy/src/auth/backend/jwt.rs index 1f44e4af5d..94e5999a5f 100644 --- a/proxy/src/auth/backend/jwt.rs +++ b/proxy/src/auth/backend/jwt.rs @@ -25,6 +25,8 @@ const MAX_JWK_BODY_SIZE: usize = 64 * 1024; pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static { fn fetch_auth_rules( &self, + ctx: &RequestMonitoring, + endpoint: EndpointId, role_name: RoleName, ) -> impl Future>> + Send; } @@ -101,7 +103,9 @@ impl JwkCacheEntryLock { async fn renew_jwks( &self, _permit: JwkRenewalPermit<'_>, + ctx: &RequestMonitoring, client: &reqwest::Client, + endpoint: EndpointId, role_name: RoleName, auth_rules: &F, ) -> anyhow::Result> { @@ -115,7 +119,9 @@ impl JwkCacheEntryLock { } } - let rules = auth_rules.fetch_auth_rules(role_name).await?; + let rules = auth_rules + .fetch_auth_rules(ctx, endpoint, role_name) + .await?; let mut key_sets = ahash::HashMap::with_capacity_and_hasher(rules.len(), ahash::RandomState::new()); // TODO(conrad): run concurrently @@ -166,6 +172,7 @@ impl JwkCacheEntryLock { self: &Arc, ctx: &RequestMonitoring, client: &reqwest::Client, + endpoint: EndpointId, role_name: RoleName, fetch: &F, ) -> Result, anyhow::Error> { @@ -176,7 +183,9 @@ impl JwkCacheEntryLock { let Some(cached) = guard else { let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Compute); let permit = self.acquire_permit().await; - return self.renew_jwks(permit, client, role_name, fetch).await; + return self + .renew_jwks(permit, ctx, client, endpoint, role_name, fetch) + .await; }; let last_update = now.duration_since(cached.last_retrieved); @@ -187,7 +196,9 @@ impl JwkCacheEntryLock { let permit = self.acquire_permit().await; // it's been too long since we checked the keys. wait for them to update. - return self.renew_jwks(permit, client, role_name, fetch).await; + return self + .renew_jwks(permit, ctx, client, endpoint, role_name, fetch) + .await; } // every 5 minutes we should spawn a job to eagerly update the token. @@ -198,8 +209,12 @@ impl JwkCacheEntryLock { let entry = self.clone(); let client = client.clone(); let fetch = fetch.clone(); + let ctx = ctx.clone(); tokio::spawn(async move { - if let Err(e) = entry.renew_jwks(permit, &client, role_name, &fetch).await { + if let Err(e) = entry + .renew_jwks(permit, &ctx, &client, endpoint, role_name, &fetch) + .await + { tracing::warn!(error=?e, "could not fetch JWKs in background job"); } }); @@ -216,6 +231,7 @@ impl JwkCacheEntryLock { ctx: &RequestMonitoring, jwt: &str, client: &reqwest::Client, + endpoint: EndpointId, role_name: RoleName, fetch: &F, ) -> Result<(), anyhow::Error> { @@ -242,7 +258,7 @@ impl JwkCacheEntryLock { let kid = header.key_id.context("missing key id")?; let mut guard = self - .get_or_update_jwk_cache(ctx, client, role_name.clone(), fetch) + .get_or_update_jwk_cache(ctx, client, endpoint.clone(), role_name.clone(), fetch) .await?; // get the key from the JWKs if possible. If not, wait for the keys to update. @@ -254,7 +270,14 @@ impl JwkCacheEntryLock { let permit = self.acquire_permit().await; guard = self - .renew_jwks(permit, client, role_name.clone(), fetch) + .renew_jwks( + permit, + ctx, + client, + endpoint.clone(), + role_name.clone(), + fetch, + ) .await?; } _ => { @@ -318,7 +341,7 @@ impl JwkCache { jwt: &str, ) -> Result<(), anyhow::Error> { // try with just a read lock first - let key = (endpoint, role_name.clone()); + let key = (endpoint.clone(), role_name.clone()); let entry = self.map.get(&key).as_deref().map(Arc::clone); let entry = entry.unwrap_or_else(|| { // acquire a write lock after to insert. @@ -327,7 +350,7 @@ impl JwkCache { }); entry - .check_jwt(ctx, jwt, &self.client, role_name, fetch) + .check_jwt(ctx, jwt, &self.client, endpoint, role_name, fetch) .await } } @@ -688,6 +711,8 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL impl FetchAuthRules for Fetch { async fn fetch_auth_rules( &self, + _ctx: &RequestMonitoring, + _endpoint: EndpointId, _role_name: RoleName, ) -> anyhow::Result> { Ok(vec![ @@ -706,6 +731,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL } let role_name = RoleName::from("user"); + let endpoint = EndpointId::from("ep"); let jwk_cache = Arc::new(JwkCacheEntryLock::default()); @@ -715,6 +741,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL &RequestMonitoring::test(), &token, &client, + endpoint.clone(), role_name.clone(), &Fetch(addr), ) diff --git a/proxy/src/auth/backend/local.rs b/proxy/src/auth/backend/local.rs index 8124f568cf..2ff2ca00f0 100644 --- a/proxy/src/auth/backend/local.rs +++ b/proxy/src/auth/backend/local.rs @@ -9,8 +9,9 @@ use crate::{ messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo}, NodeInfo, }, + context::RequestMonitoring, intern::{BranchIdInt, BranchIdTag, EndpointIdTag, InternId, ProjectIdInt, ProjectIdTag}, - RoleName, + EndpointId, RoleName, }; use super::jwt::{AuthRule, FetchAuthRules, JwkCache}; @@ -57,7 +58,12 @@ pub struct JwksRoleSettings { } impl FetchAuthRules for StaticAuthRules { - async fn fetch_auth_rules(&self, role_name: RoleName) -> anyhow::Result> { + async fn fetch_auth_rules( + &self, + _ctx: &RequestMonitoring, + _endpoint: EndpointId, + role_name: RoleName, + ) -> anyhow::Result> { let mappings = JWKS_ROLE_MAP.load(); let role_mappings = mappings .as_deref() diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index 12a6e2f12a..16e8da605b 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -303,6 +303,7 @@ impl NodeInfo { pub(crate) fn set_keys(&mut self, keys: &ComputeCredentialKeys) { match keys { + #[cfg(any(test, feature = "testing"))] ComputeCredentialKeys::Password(password) => self.config.password(password), ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys), ComputeCredentialKeys::None => &mut self.config, diff --git a/proxy/src/context.rs b/proxy/src/context.rs index c013218ad9..021659e175 100644 --- a/proxy/src/context.rs +++ b/proxy/src/context.rs @@ -79,6 +79,40 @@ pub(crate) enum AuthMethod { Cleartext, } +impl Clone for RequestMonitoring { + fn clone(&self) -> Self { + let inner = self.0.try_lock().expect("should not deadlock"); + let new = RequestMonitoringInner { + peer_addr: inner.peer_addr, + session_id: inner.session_id, + protocol: inner.protocol, + first_packet: inner.first_packet, + region: inner.region, + span: info_span!("background_task"), + + project: inner.project, + branch: inner.branch, + endpoint_id: inner.endpoint_id.clone(), + dbname: inner.dbname.clone(), + user: inner.user.clone(), + application: inner.application.clone(), + error_kind: inner.error_kind, + auth_method: inner.auth_method.clone(), + success: inner.success, + rejected: inner.rejected, + cold_start_info: inner.cold_start_info, + pg_options: inner.pg_options.clone(), + + sender: None, + disconnect_sender: None, + latency_timer: LatencyTimer::noop(inner.protocol), + disconnect_timestamp: inner.disconnect_timestamp, + }; + + Self(TryLock::new(new)) + } +} + impl RequestMonitoring { pub fn new( session_id: Uuid, diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 2da7eac580..c2567e083a 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -397,6 +397,8 @@ pub struct LatencyTimer { protocol: Protocol, cold_start_info: ColdStartInfo, outcome: ConnectOutcome, + + skip_reporting: bool, } impl LatencyTimer { @@ -409,6 +411,20 @@ impl LatencyTimer { cold_start_info: ColdStartInfo::Unknown, // assume failed unless otherwise specified outcome: ConnectOutcome::Failed, + skip_reporting: false, + } + } + + pub(crate) fn noop(protocol: Protocol) -> Self { + Self { + start: time::Instant::now(), + stop: None, + accumulated: Accumulated::default(), + protocol, + cold_start_info: ColdStartInfo::Unknown, + // assume failed unless otherwise specified + outcome: ConnectOutcome::Failed, + skip_reporting: true, } } @@ -443,6 +459,10 @@ pub enum ConnectOutcome { impl Drop for LatencyTimer { fn drop(&mut self) { + if self.skip_reporting { + return; + } + let duration = self .stop .unwrap_or_else(time::Instant::now) diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index d163878528..aa236907db 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -27,7 +27,7 @@ use crate::{ Host, }; -use super::conn_pool::{poll_client, AuthData, Client, ConnInfo, GlobalConnPool}; +use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool}; pub(crate) struct PoolingBackend { pub(crate) pool: Arc>, @@ -274,13 +274,6 @@ impl ConnectMechanism for TokioMechanism { .dbname(&self.conn_info.dbname) .connect_timeout(timeout); - match &self.conn_info.auth { - AuthData::Jwt(_) => {} - AuthData::Password(pw) => { - config.password(pw); - } - } - let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute); let res = config.connect(tokio_postgres::NoTls).await; drop(pause); diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 6c32d5df0e..a850ecd2be 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -29,11 +29,16 @@ use tracing::{info, info_span, Instrument}; use super::backend::HttpConnError; +#[derive(Debug, Clone)] +pub(crate) struct ConnInfoWithAuth { + pub(crate) conn_info: ConnInfo, + pub(crate) auth: AuthData, +} + #[derive(Debug, Clone)] pub(crate) struct ConnInfo { pub(crate) user_info: ComputeUserInfo, pub(crate) dbname: DbName, - pub(crate) auth: AuthData, } #[derive(Debug, Clone)] @@ -787,7 +792,6 @@ mod tests { options: NeonOptions::default(), }, dbname: "dbname".into(), - auth: AuthData::Password("password".as_bytes().into()), }; let ep_pool = Arc::downgrade( &pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()), @@ -845,7 +849,6 @@ mod tests { options: NeonOptions::default(), }, dbname: "dbname".into(), - auth: AuthData::Password("password".as_bytes().into()), }; let ep_pool = Arc::downgrade( &pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()), diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 06e540d149..7c78439a0a 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -60,6 +60,7 @@ use super::backend::PoolingBackend; use super::conn_pool::AuthData; use super::conn_pool::Client; use super::conn_pool::ConnInfo; +use super::conn_pool::ConnInfoWithAuth; use super::http_util::json_response; use super::json::json_to_pg_text; use super::json::pg_text_row_to_json; @@ -148,7 +149,7 @@ fn get_conn_info( ctx: &RequestMonitoring, headers: &HeaderMap, tls: Option<&TlsConfig>, -) -> Result { +) -> Result { // HTTP only uses cleartext (for now and likely always) ctx.set_auth_method(crate::context::AuthMethod::Cleartext); @@ -235,11 +236,8 @@ fn get_conn_info( options: options.unwrap_or_default(), }; - Ok(ConnInfo { - user_info, - dbname, - auth, - }) + let conn_info = ConnInfo { user_info, dbname }; + Ok(ConnInfoWithAuth { conn_info, auth }) } // TODO: return different http error codes @@ -523,7 +521,10 @@ async fn handle_inner( // TLS config should be there. let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref())?; - info!(user = conn_info.user_info.user.as_str(), "credentials"); + info!( + user = conn_info.conn_info.user_info.user.as_str(), + "credentials" + ); // Allow connection pooling only if explicitly requested // or if we have decided that http pool is no longer opt-in @@ -568,20 +569,20 @@ async fn handle_inner( .authenticate_with_password( ctx, &config.authentication_config, - &conn_info.user_info, + &conn_info.conn_info.user_info, pw, ) .await? } AuthData::Jwt(jwt) => { backend - .authenticate_with_jwt(ctx, &conn_info.user_info, jwt) + .authenticate_with_jwt(ctx, &conn_info.conn_info.user_info, jwt) .await? } }; let client = backend - .connect_to_compute(ctx, conn_info, keys, !allow_pool) + .connect_to_compute(ctx, conn_info.conn_info, keys, !allow_pool) .await?; // not strictly necessary to mark success here, // but it's just insurance for if we forget it somewhere else diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index cbbb162cc6..fc83cf3f7c 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3863,9 +3863,6 @@ def static_proxy( dbname = vanilla_pg.default_options["dbname"] auth_endpoint = f"postgres://proxy:password@{host}:{port}/{dbname}" - # require password for 'http_auth' user - vanilla_pg.edit_hba([f"host {dbname} http_auth {host} password"]) - # For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql` vanilla_pg.start() vanilla_pg.safe_psql("create user proxy with login superuser password 'password'") From 1708743e786cde41eb1bba51d4e5267895d8227d Mon Sep 17 00:00:00 2001 From: Yuchen Liang <70461588+yliang412@users.noreply.github.com> Date: Thu, 19 Sep 2024 12:27:10 -0400 Subject: [PATCH 5/6] pageserver: wait for lsn lease duration after transition into AttachedSingle (#9024) Part of #7497, closes https://github.com/neondatabase/neon/issues/8890. ## Problem Since leases are in-memory objects, we need to take special care of them after pageserver restarts and while doing a live migration. The approach we took for pageserver restart is to wait for at least lease duration before doing first GC. We want to do the same for live migration. Since we do not do any GC when a tenant is in `AttachedStale` or `AttachedMulti` mode, only the transition from `AttachedMulti` to `AttachedSingle` requires this treatment. ## Summary of changes - Added `lsn_lease_deadline` field in `GcBlock::reasons`: the tenant is temporarily blocked from GC until we reach the deadline. This information does not persist to S3. - In `GCBlock::start`, skip the GC iteration if we are blocked by the lsn lease deadline. - In `TenantManager::upsert_location`, set the lsn_lease_deadline to `Instant::now() + lsn_lease_length` so the granted leases have a chance to be renewed before we run GC for the first time after transitioned from AttachedMulti to AttachedSingle. Signed-off-by: Yuchen Liang Co-authored-by: Joonas Koivunen --- pageserver/src/tenant/gc_block.rs | 83 ++++++++++++++----- pageserver/src/tenant/mgr.rs | 6 ++ pageserver/src/tenant/tasks.rs | 18 +--- test_runner/regress/test_branch_and_gc.py | 1 + test_runner/regress/test_branch_behind.py | 4 +- test_runner/regress/test_branching.py | 2 +- test_runner/regress/test_compaction.py | 1 + test_runner/regress/test_hot_standby.py | 2 +- test_runner/regress/test_layer_eviction.py | 1 + .../regress/test_pageserver_generations.py | 1 + test_runner/regress/test_remote_storage.py | 2 + test_runner/regress/test_sharding.py | 1 + .../regress/test_storage_controller.py | 2 +- test_runner/regress/test_storage_scrubber.py | 1 + test_runner/regress/test_tenant_detach.py | 4 +- .../regress/test_timeline_gc_blocking.py | 5 +- 16 files changed, 91 insertions(+), 43 deletions(-) diff --git a/pageserver/src/tenant/gc_block.rs b/pageserver/src/tenant/gc_block.rs index 8b41ba1746..1271d25b76 100644 --- a/pageserver/src/tenant/gc_block.rs +++ b/pageserver/src/tenant/gc_block.rs @@ -1,11 +1,29 @@ -use std::collections::HashMap; - -use utils::id::TimelineId; +use std::{collections::HashMap, time::Duration}; use super::remote_timeline_client::index::GcBlockingReason; +use tokio::time::Instant; +use utils::id::TimelineId; -type Storage = HashMap>; +type TimelinesBlocked = HashMap>; +#[derive(Default)] +struct Storage { + timelines_blocked: TimelinesBlocked, + /// The deadline before which we are blocked from GC so that + /// leases have a chance to be renewed. + lsn_lease_deadline: Option, +} + +impl Storage { + fn is_blocked_by_lsn_lease_deadline(&self) -> bool { + self.lsn_lease_deadline + .map(|d| Instant::now() < d) + .unwrap_or(false) + } +} + +/// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc +/// blocking. #[derive(Default)] pub(crate) struct GcBlock { /// The timelines which have current reasons to block gc. @@ -13,6 +31,12 @@ pub(crate) struct GcBlock { /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done /// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`. reasons: std::sync::Mutex, + + /// GC background task or manually run `Tenant::gc_iteration` holds a lock on this. + /// + /// Do not add any more features taking and forbidding taking this lock. It should be + /// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`] + /// synchronizes with gc attempts by locking and unlocking this mutex. blocking: tokio::sync::Mutex<()>, } @@ -42,6 +66,20 @@ impl GcBlock { } } + /// Sets a deadline before which we cannot proceed to GC due to lsn lease. + /// + /// We do this as the leases mapping are not persisted to disk. By delaying GC by lease + /// length, we guarantee that all the leases we granted before will have a chance to renew + /// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle. + pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) { + let deadline = Instant::now() + lsn_lease_length; + let mut g = self.reasons.lock().unwrap(); + g.lsn_lease_deadline = Some(deadline); + } + + /// Describe the current gc blocking reasons. + /// + /// TODO: make this json serializable. pub(crate) fn summary(&self) -> Option { let g = self.reasons.lock().unwrap(); @@ -64,7 +102,7 @@ impl GcBlock { ) -> anyhow::Result { let (added, uploaded) = { let mut g = self.reasons.lock().unwrap(); - let set = g.entry(timeline.timeline_id).or_default(); + let set = g.timelines_blocked.entry(timeline.timeline_id).or_default(); let added = set.insert(reason); // LOCK ORDER: intentionally hold the lock, see self.reasons. @@ -95,7 +133,7 @@ impl GcBlock { let (remaining_blocks, uploaded) = { let mut g = self.reasons.lock().unwrap(); - match g.entry(timeline.timeline_id) { + match g.timelines_blocked.entry(timeline.timeline_id) { Entry::Occupied(mut oe) => { let set = oe.get_mut(); set.remove(reason); @@ -109,7 +147,7 @@ impl GcBlock { } } - let remaining_blocks = g.len(); + let remaining_blocks = g.timelines_blocked.len(); // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons let uploaded = timeline @@ -134,11 +172,11 @@ impl GcBlock { pub(crate) fn before_delete(&self, timeline: &super::Timeline) { let unblocked = { let mut g = self.reasons.lock().unwrap(); - if g.is_empty() { + if g.timelines_blocked.is_empty() { return; } - g.remove(&timeline.timeline_id); + g.timelines_blocked.remove(&timeline.timeline_id); BlockingReasons::clean_and_summarize(g).is_none() }; @@ -149,10 +187,11 @@ impl GcBlock { } /// Initialize with the non-deleted timelines of this tenant. - pub(crate) fn set_scanned(&self, scanned: Storage) { + pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) { let mut g = self.reasons.lock().unwrap(); - assert!(g.is_empty()); - g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty())); + assert!(g.timelines_blocked.is_empty()); + g.timelines_blocked + .extend(scanned.into_iter().filter(|(_, v)| !v.is_empty())); if let Some(reasons) = BlockingReasons::clean_and_summarize(g) { tracing::info!(summary=?reasons, "initialized with gc blocked"); @@ -166,6 +205,7 @@ pub(super) struct Guard<'a> { #[derive(Debug)] pub(crate) struct BlockingReasons { + tenant_blocked_by_lsn_lease_deadline: bool, timelines: usize, reasons: enumset::EnumSet, } @@ -174,8 +214,8 @@ impl std::fmt::Display for BlockingReasons { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "{} timelines block for {:?}", - self.timelines, self.reasons + "tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}", + self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons ) } } @@ -183,13 +223,15 @@ impl std::fmt::Display for BlockingReasons { impl BlockingReasons { fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option { let mut reasons = enumset::EnumSet::empty(); - g.retain(|_key, value| { + g.timelines_blocked.retain(|_key, value| { reasons = reasons.union(*value); !value.is_empty() }); - if !g.is_empty() { + let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline(); + if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline { Some(BlockingReasons { - timelines: g.len(), + tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline, + timelines: g.timelines_blocked.len(), reasons, }) } else { @@ -198,14 +240,17 @@ impl BlockingReasons { } fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option { - if g.is_empty() { + let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline(); + if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline { None } else { let reasons = g + .timelines_blocked .values() .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next)); Some(BlockingReasons { - timelines: g.len(), + tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline, + timelines: g.timelines_blocked.len(), reasons, }) } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 2104f41531..1e7c1e10a5 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -949,6 +949,12 @@ impl TenantManager { (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { match attach_conf.generation.cmp(&tenant.generation) { Ordering::Equal => { + if attach_conf.attach_mode == AttachmentMode::Single { + tenant + .gc_block + .set_lsn_lease_deadline(tenant.get_lsn_lease_length()); + } + // A transition from Attached to Attached in the same generation, we may // take our fast path and just provide the updated configuration // to the tenant. diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 478e9bb4f0..57f0123d8f 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -346,6 +346,7 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download); let mut first = true; + tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length()); loop { tokio::select! { _ = cancel.cancelled() => { @@ -363,7 +364,6 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { first = false; let delays = async { - delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?; random_init_delay(period, &cancel).await?; Ok::<_, Cancelled>(()) }; @@ -538,28 +538,12 @@ pub(crate) async fn random_init_delay( let mut rng = rand::thread_rng(); rng.gen_range(Duration::ZERO..=period) }; - match tokio::time::timeout(d, cancel.cancelled()).await { Ok(_) => Err(Cancelled), Err(_) => Ok(()), } } -/// Delays GC by defaul lease length at restart. -/// -/// We do this as the leases mapping are not persisted to disk. By delaying GC by default -/// length, we gurantees that all the leases we granted before the restart will expire -/// when we run GC for the first time after the restart. -pub(crate) async fn delay_by_lease_length( - length: Duration, - cancel: &CancellationToken, -) -> Result<(), Cancelled> { - match tokio::time::timeout(length, cancel.cancelled()).await { - Ok(_) => Err(Cancelled), - Err(_) => Ok(()), - } -} - struct Iteration { started_at: Instant, period: Duration, diff --git a/test_runner/regress/test_branch_and_gc.py b/test_runner/regress/test_branch_and_gc.py index f2e3855c12..d7c4cf059a 100644 --- a/test_runner/regress/test_branch_and_gc.py +++ b/test_runner/regress/test_branch_and_gc.py @@ -142,6 +142,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv): "image_creation_threshold": "1", # set PITR interval to be small, so we can do GC "pitr_interval": "0 s", + "lsn_lease_length": "0s", } ) diff --git a/test_runner/regress/test_branch_behind.py b/test_runner/regress/test_branch_behind.py index 0a5336f5a2..2bf7041cf1 100644 --- a/test_runner/regress/test_branch_behind.py +++ b/test_runner/regress/test_branch_behind.py @@ -11,7 +11,9 @@ from fixtures.utils import print_gc_result, query_scalar # def test_branch_behind(neon_env_builder: NeonEnvBuilder): # Disable pitr, because here we want to test branch creation after GC - env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"}) + env = neon_env_builder.init_start( + initial_tenant_conf={"pitr_interval": "0 sec", "lsn_lease_length": "0s"} + ) error_regexes = [ ".*invalid branch start lsn.*", diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 1729e2fc98..3d5c34a595 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -419,7 +419,7 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder): def test_branching_while_stuck_find_gc_cutoffs(neon_env_builder: NeonEnvBuilder): - env = neon_env_builder.init_start() + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) client = env.pageserver.http_client() diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index be787e0642..cb34551b53 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -240,6 +240,7 @@ def test_uploads_and_deletions( "image_creation_threshold": "1", "image_layer_creation_check_threshold": "0", "compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}), + "lsn_lease_length": "0s", } env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index d94704012f..35e0c0decb 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -222,7 +222,7 @@ def pgbench_accounts_initialized(ep): # Without hs feedback enabled we'd see 'User query might have needed to see row # versions that must be removed.' errors. def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): - env = neon_env_builder.init_start() + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) agressive_vacuum_conf = [ "log_autovacuum_min_duration = 0", "autovacuum_naptime = 10s", diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py index 193149ea03..97093ea535 100644 --- a/test_runner/regress/test_layer_eviction.py +++ b/test_runner/regress/test_layer_eviction.py @@ -173,6 +173,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder): # "image_creation_threshold": set at runtime "compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers "image_layer_creation_check_threshold": "0", # always check if a new image layer can be created + "lsn_lease_length": "0s", } def tenant_update_config(changes): diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index c923713432..519994f774 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -53,6 +53,7 @@ TENANT_CONF = { # create image layers eagerly, so that GC can remove some layers "image_creation_threshold": "1", "image_layer_creation_check_threshold": "0", + "lsn_lease_length": "0s", } diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 2e5260ca78..0a57fc9605 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -244,6 +244,7 @@ def test_remote_storage_upload_queue_retries( # create image layers eagerly, so that GC can remove some layers "image_creation_threshold": "1", "image_layer_creation_check_threshold": "0", + "lsn_lease_length": "0s", } ) @@ -391,6 +392,7 @@ def test_remote_timeline_client_calls_started_metric( # disable background compaction and GC. We invoke it manually when we want it to happen. "gc_period": "0s", "compaction_period": "0s", + "lsn_lease_length": "0s", } ) diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 4a84dca399..1eb33b2d39 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -200,6 +200,7 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: # Disable automatic creation of image layers, as we will create them explicitly when we want them "image_creation_threshold": 9999, "image_layer_creation_check_threshold": 0, + "lsn_lease_length": "0s", } neon_env_builder.storage_controller_config = { diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index dc90a6e9a0..4106efd4f9 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -485,7 +485,7 @@ def test_storage_controller_compute_hook( httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler) # Start running - env = neon_env_builder.init_start() + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) # Initial notification from tenant creation assert len(notifications) == 1 diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py index 848e214c5e..b6c19f03f6 100644 --- a/test_runner/regress/test_storage_scrubber.py +++ b/test_runner/regress/test_storage_scrubber.py @@ -204,6 +204,7 @@ def test_scrubber_physical_gc_ancestors( # No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas # and makes them GC'able "pitr_interval": "0s", + "lsn_lease_length": "0s", }, ) diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index b165588636..e7c6d5a4c3 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -266,13 +266,13 @@ def test_tenant_reattach_while_busy( def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): - env = neon_env_builder.init_start() + env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"}) pageserver_http = env.pageserver.http_client() env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS) # create new nenant - tenant_id, timeline_id = env.neon_cli.create_tenant() + tenant_id, timeline_id = env.initial_tenant, env.initial_timeline # assert tenant exists on disk assert env.pageserver.tenant_dir(tenant_id).exists() diff --git a/test_runner/regress/test_timeline_gc_blocking.py b/test_runner/regress/test_timeline_gc_blocking.py index ddfe9b911f..765c72cf2a 100644 --- a/test_runner/regress/test_timeline_gc_blocking.py +++ b/test_runner/regress/test_timeline_gc_blocking.py @@ -45,7 +45,10 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool tenant_after = http.tenant_status(env.initial_tenant) assert tenant_before != tenant_after gc_blocking = tenant_after["gc_blocking"] - assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }" + assert ( + gc_blocking + == "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }" + ) wait_for_another_gc_round() pss.assert_log_contains(gc_skipped_line) From d0cbfda15c916ee75066919940d0c3da714c5b95 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Thu, 19 Sep 2024 16:29:28 -0400 Subject: [PATCH 6/6] refactor(pageserver): check layer map valid in one place (#9051) We have 3 places where we implement layer map checks. ## Summary of changes Now we have a single check function being called in all places. --------- Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 31 +++++++++++ pageserver/src/tenant/checks.rs | 55 +++++++++++++++++++ pageserver/src/tenant/timeline.rs | 33 ++---------- pageserver/src/tenant/timeline/compaction.rs | 21 +++----- storage_scrubber/src/checks.rs | 56 ++------------------ 5 files changed, 101 insertions(+), 95 deletions(-) create mode 100644 pageserver/src/tenant/checks.rs diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 14cb6f508d..d699d56075 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -140,6 +140,7 @@ pub mod metadata; pub mod remote_timeline_client; pub mod storage_layer; +pub mod checks; pub mod config; pub mod mgr; pub mod secondary; @@ -1573,6 +1574,9 @@ impl Tenant { image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>, end_lsn: Lsn, ) -> anyhow::Result> { + use checks::check_valid_layermap; + use itertools::Itertools; + let tline = self .create_test_timeline(new_timeline_id, initdb_lsn, pg_version, ctx) .await?; @@ -1587,6 +1591,18 @@ impl Tenant { .force_create_image_layer(lsn, images, Some(initdb_lsn), ctx) .await?; } + let layer_names = tline + .layers + .read() + .await + .layer_map() + .unwrap() + .iter_historic_layers() + .map(|layer| layer.layer_name()) + .collect_vec(); + if let Some(err) = check_valid_layermap(&layer_names) { + bail!("invalid layermap: {err}"); + } Ok(tline) } @@ -3197,6 +3213,9 @@ impl Tenant { image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>, end_lsn: Lsn, ) -> anyhow::Result> { + use checks::check_valid_layermap; + use itertools::Itertools; + let tline = self .branch_timeline_test(src_timeline, dst_id, ancestor_lsn, ctx) .await?; @@ -3217,6 +3236,18 @@ impl Tenant { .force_create_image_layer(lsn, images, Some(ancestor_lsn), ctx) .await?; } + let layer_names = tline + .layers + .read() + .await + .layer_map() + .unwrap() + .iter_historic_layers() + .map(|layer| layer.layer_name()) + .collect_vec(); + if let Some(err) = check_valid_layermap(&layer_names) { + bail!("invalid layermap: {err}"); + } Ok(tline) } diff --git a/pageserver/src/tenant/checks.rs b/pageserver/src/tenant/checks.rs new file mode 100644 index 0000000000..8eaa8a001c --- /dev/null +++ b/pageserver/src/tenant/checks.rs @@ -0,0 +1,55 @@ +use std::collections::BTreeSet; + +use itertools::Itertools; + +use super::storage_layer::LayerName; + +/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong). +/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example, +/// +/// ```plain +/// | | | | +/// | 1 | | 2 | | 3 | +/// | | | | | | +/// ``` +/// +/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have +/// the same LSN range. +/// +/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example, +/// +/// ```plain +/// | | | 2 | | | +/// | 1 | |-------| | 3 | +/// | | | 4 | | | +/// +/// If layer 2 and 4 contain the same single key, this is also a valid layer map. +pub fn check_valid_layermap(metadata: &[LayerName]) -> Option { + let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?) + let mut all_delta_layers = Vec::new(); + for name in metadata { + if let LayerName::Delta(layer) = name { + if layer.key_range.start.next() != layer.key_range.end { + all_delta_layers.push(layer.clone()); + } + } + } + for layer in &all_delta_layers { + let lsn_range = &layer.lsn_range; + lsn_split_point.insert(lsn_range.start); + lsn_split_point.insert(lsn_range.end); + } + for layer in &all_delta_layers { + let lsn_range = layer.lsn_range.clone(); + let intersects = lsn_split_point.range(lsn_range).collect_vec(); + if intersects.len() > 1 { + let err = format!( + "layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]", + layer, + intersects.into_iter().map(|lsn| lsn.to_string()).join(", ") + ); + return Some(err); + } + } + None +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a06cea2c66..f08f5caf95 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5378,7 +5378,8 @@ impl Timeline { /// Force create an image layer and place it into the layer map. /// /// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`] - /// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run. + /// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are + /// placed into the layer map in one run AND be validated. #[cfg(test)] pub(super) async fn force_create_image_layer( self: &Arc, @@ -5424,7 +5425,8 @@ impl Timeline { /// Force create a delta layer and place it into the layer map. /// /// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`] - /// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run. + /// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are + /// placed into the layer map in one run AND be validated. #[cfg(test)] pub(super) async fn force_create_delta_layer( self: &Arc, @@ -5450,33 +5452,6 @@ impl Timeline { if let Some(check_start_lsn) = check_start_lsn { assert!(deltas.lsn_range.start >= check_start_lsn); } - // check if the delta layer does not violate the LSN invariant, the legacy compaction should always produce a batch of - // layers of the same start/end LSN, and so should the force inserted layer - { - /// Checks if a overlaps with b, assume a/b = [start, end). - pub fn overlaps_with(a: &Range, b: &Range) -> bool { - !(a.end <= b.start || b.end <= a.start) - } - - if deltas.key_range.start.next() != deltas.key_range.end { - let guard = self.layers.read().await; - let mut invalid_layers = - guard.layer_map()?.iter_historic_layers().filter(|layer| { - layer.is_delta() - && overlaps_with(&layer.lsn_range, &deltas.lsn_range) - && layer.lsn_range != deltas.lsn_range - // skip single-key layer files - && layer.key_range.start.next() != layer.key_range.end - }); - if let Some(layer) = invalid_layers.next() { - // If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic - panic!( - "inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}", - deltas.lsn_range.start, deltas.lsn_range.end, layer.lsn_range.start, layer.lsn_range.end - ); - } - } - } let mut delta_layer_writer = DeltaLayerWriter::new( self.conf, self.timeline_id, diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index d1f06e3480..d1567b6b39 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -29,6 +29,7 @@ use utils::id::TimelineId; use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}; use crate::page_cache; +use crate::tenant::checks::check_valid_layermap; use crate::tenant::remote_timeline_client::WaitCompletionError; use crate::tenant::storage_layer::merge_iterator::MergeIterator; use crate::tenant::storage_layer::split_writer::{ @@ -1788,20 +1789,12 @@ impl Timeline { stat.visit_image_layer(desc.file_size()); } } - for layer in &layer_selection { - let desc = layer.layer_desc(); - let key_range = &desc.key_range; - if desc.is_delta() && key_range.start.next() != key_range.end { - let lsn_range = desc.lsn_range.clone(); - let intersects = lsn_split_point.range(lsn_range).collect_vec(); - if intersects.len() > 1 { - bail!( - "cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]", - desc.key(), - intersects.into_iter().map(|lsn| lsn.to_string()).join(", ") - ); - } - } + let layer_names: Vec = layer_selection + .iter() + .map(|layer| layer.layer_desc().layer_name()) + .collect_vec(); + if let Some(err) = check_valid_layermap(&layer_names) { + bail!("cannot run gc-compaction because {}", err); } // The maximum LSN we are processing in this compaction loop let end_lsn = layer_selection diff --git a/storage_scrubber/src/checks.rs b/storage_scrubber/src/checks.rs index 15dfb101b5..de6918b3da 100644 --- a/storage_scrubber/src/checks.rs +++ b/storage_scrubber/src/checks.rs @@ -1,7 +1,8 @@ -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use anyhow::Context; use itertools::Itertools; +use pageserver::tenant::checks::check_valid_layermap; use pageserver::tenant::layer_map::LayerMap; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; use pageserver_api::shard::ShardIndex; @@ -48,56 +49,6 @@ impl TimelineAnalysis { } } -/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong). -/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example, -/// -/// ```plain -/// | | | | -/// | 1 | | 2 | | 3 | -/// | | | | | | -/// ``` -/// -/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have -/// the same LSN range. -/// -/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example, -/// -/// ```plain -/// | | | 2 | | | -/// | 1 | |-------| | 3 | -/// | | | 4 | | | -/// -/// If layer 2 and 4 contain the same single key, this is also a valid layer map. -fn check_valid_layermap(metadata: &HashMap) -> Option { - let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?) - let mut all_delta_layers = Vec::new(); - for (name, _) in metadata.iter() { - if let LayerName::Delta(layer) = name { - if layer.key_range.start.next() != layer.key_range.end { - all_delta_layers.push(layer.clone()); - } - } - } - for layer in &all_delta_layers { - let lsn_range = &layer.lsn_range; - lsn_split_point.insert(lsn_range.start); - lsn_split_point.insert(lsn_range.end); - } - for layer in &all_delta_layers { - let lsn_range = layer.lsn_range.clone(); - let intersects = lsn_split_point.range(lsn_range).collect_vec(); - if intersects.len() > 1 { - let err = format!( - "layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]", - layer, - intersects.into_iter().map(|lsn| lsn.to_string()).join(", ") - ); - return Some(err); - } - } - None -} - pub(crate) async fn branch_cleanup_and_check_errors( remote_client: &GenericRemoteStorage, id: &TenantShardTimelineId, @@ -177,7 +128,8 @@ pub(crate) async fn branch_cleanup_and_check_errors( } } - if let Some(err) = check_valid_layermap(&index_part.layer_metadata) { + let layer_names = index_part.layer_metadata.keys().cloned().collect_vec(); + if let Some(err) = check_valid_layermap(&layer_names) { result.errors.push(format!( "index_part.json contains invalid layer map structure: {err}" ));