diff --git a/Cargo.lock b/Cargo.lock index 7288d3230b..0f8115d7ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2876,7 +2876,6 @@ dependencies = [ "serde", "thiserror", "utils", - "wal_craft", "workspace_hack", ] @@ -4896,7 +4895,9 @@ dependencies = [ "once_cell", "postgres", "postgres_ffi", + "regex", "tempfile", + "utils", "workspace_hack", ] diff --git a/Cargo.toml b/Cargo.toml index 1cb8d65948..d7bffe67e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,20 @@ members = [ "storage_broker", "workspace_hack", "trace", - "libs/*", + "libs/compute_api", + "libs/pageserver_api", + "libs/postgres_ffi", + "libs/safekeeper_api", + "libs/utils", + "libs/consumption_metrics", + "libs/postgres_backend", + "libs/pq_proto", + "libs/tenant_size_model", + "libs/metrics", + "libs/postgres_connection", + "libs/remote_storage", + "libs/tracing-utils", + "libs/postgres_ffi/wal_craft", ] [workspace.package] diff --git a/Dockerfile.compute-node b/Dockerfile.compute-node index f8429e72b8..44e13a6c73 100644 --- a/Dockerfile.compute-node +++ b/Dockerfile.compute-node @@ -531,7 +531,7 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.0.tar.gz - mkdir pgx_ulid-src && cd pgx_ulid-src && tar xvzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \ sed -i 's/pgx = "=0.7.3"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \ cargo pgx install --release && \ - echo "trusted = true" >> /usr/local/pgsql/share/extension/pgx_ulid.control + echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control ######################################################################################### # diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 162bf6b294..ddce82324c 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -110,12 +110,11 @@ impl TenantState { Self::Active => Attached, // If the (initial or resumed) attach procedure fails, the tenant becomes Broken. 
// However, it also becomes Broken if the regular load fails. - // We would need a separate TenantState variant to distinguish these cases. - // However, there's no practical difference from Console's perspective. - // It will run a Postgres-level health check as soon as it observes Attached. - // That will fail on Broken tenants. - // Console can then rollback the attach, or, wait for operator to fix the Broken tenant. - Self::Broken { .. } => Attached, + // From Console's perspective there's no practical difference + // because attachment_status is polled by console only during attach operation execution. + Self::Broken { reason, .. } => Failed { + reason: reason.to_owned(), + }, // Why is Stopping a Maybe case? Because, during pageserver shutdown, // we set the Stopping state irrespective of whether the tenant // has finished attaching or not. @@ -312,10 +311,11 @@ impl std::ops::Deref for TenantAttachConfig { /// See [`TenantState::attachment_status`] and the OpenAPI docs for context. 
#[derive(Serialize, Deserialize, Clone)] -#[serde(rename_all = "snake_case")] +#[serde(tag = "slug", content = "data", rename_all = "snake_case")] pub enum TenantAttachmentStatus { Maybe, Attached, + Failed { reason: String }, } #[serde_as] @@ -809,7 +809,9 @@ mod tests { "slug": "Active", }, "current_physical_size": 42, - "attachment_status": "attached", + "attachment_status": { + "slug":"attached", + } }); let original_broken = TenantInfo { @@ -831,7 +833,9 @@ mod tests { } }, "current_physical_size": 42, - "attachment_status": "attached", + "attachment_status": { + "slug":"attached", + } }); assert_eq!( diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index 159fc5946d..86e72f6bdd 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -24,7 +24,6 @@ workspace_hack.workspace = true [dev-dependencies] env_logger.workspace = true postgres.workspace = true -wal_craft = { path = "wal_craft" } [build-dependencies] anyhow.workspace = true diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index b8eb469cb0..cc115664d5 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -33,6 +33,7 @@ macro_rules! postgres_ffi { } pub mod controlfile_utils; pub mod nonrelfile_utils; + pub mod wal_craft_test_export; pub mod waldecoder_handler; pub mod xlog_utils; @@ -45,8 +46,15 @@ macro_rules! postgres_ffi { }; } -postgres_ffi!(v14); -postgres_ffi!(v15); +#[macro_export] +macro_rules! for_all_postgres_versions { + ($macro:tt) => { + $macro!(v14); + $macro!(v15); + }; +} + +for_all_postgres_versions! { postgres_ffi } pub mod pg_constants; pub mod relfile_utils; diff --git a/libs/postgres_ffi/src/wal_craft_test_export.rs b/libs/postgres_ffi/src/wal_craft_test_export.rs new file mode 100644 index 0000000000..147567c442 --- /dev/null +++ b/libs/postgres_ffi/src/wal_craft_test_export.rs @@ -0,0 +1,6 @@ +//! This module is for WAL craft to test with postgres_ffi. 
Should not import any thing in normal usage. + +pub use super::PG_MAJORVERSION; +pub use super::xlog_utils::*; +pub use super::bindings::*; +pub use crate::WAL_SEGMENT_SIZE; diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 4d7bb61883..61a9c38a84 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -481,220 +481,4 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Vec { wal } -#[cfg(test)] -mod tests { - use super::super::PG_MAJORVERSION; - use super::*; - use regex::Regex; - use std::cmp::min; - use std::fs; - use std::{env, str::FromStr}; - use utils::const_assert; - - fn init_logging() { - let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or( - format!("wal_craft=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"), - )) - .is_test(true) - .try_init(); - } - - fn test_end_of_wal(test_name: &str) { - use wal_craft::*; - - let pg_version = PG_MAJORVERSION[1..3].parse::().unwrap(); - - // Craft some WAL - let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("..") - .join(".."); - let cfg = Conf { - pg_version, - pg_distrib_dir: top_path.join("pg_install"), - datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)), - }; - if cfg.datadir.exists() { - fs::remove_dir_all(&cfg.datadir).unwrap(); - } - cfg.initdb().unwrap(); - let srv = cfg.start_server().unwrap(); - let (intermediate_lsns, expected_end_of_wal_partial) = - C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap(); - let intermediate_lsns: Vec = intermediate_lsns - .iter() - .map(|&lsn| u64::from(lsn).into()) - .collect(); - let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into(); - srv.kill(); - - // Check find_end_of_wal on the initial WAL - let last_segment = cfg - .wal_dir() - .read_dir() - .unwrap() - .map(|f| f.unwrap().file_name().into_string().unwrap()) - .filter(|fname| IsXLogFileName(fname)) - .max() - 
.unwrap(); - check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal); - for start_lsn in intermediate_lsns - .iter() - .chain(std::iter::once(&expected_end_of_wal)) - { - // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`. - // We assume that `start_lsn` is non-decreasing. - info!( - "Checking with start_lsn={}, erasing WAL before it", - start_lsn - ); - for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() { - let fname = file.file_name().into_string().unwrap(); - if !IsXLogFileName(&fname) { - continue; - } - let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE); - let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); - if seg_start_lsn > u64::from(*start_lsn) { - continue; - } - let mut f = File::options().write(true).open(file.path()).unwrap(); - const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; - f.write_all( - &ZEROS[0..min( - WAL_SEGMENT_SIZE, - (u64::from(*start_lsn) - seg_start_lsn) as usize, - )], - ) - .unwrap(); - } - check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal); - } - } - - fn check_pg_waldump_end_of_wal( - cfg: &wal_craft::Conf, - last_segment: &str, - expected_end_of_wal: Lsn, - ) { - // Get the actual end of WAL by pg_waldump - let waldump_output = cfg - .pg_waldump("000000010000000000000001", last_segment) - .unwrap() - .stderr; - let waldump_output = std::str::from_utf8(&waldump_output).unwrap(); - let caps = match Regex::new(r"invalid record length at (.+):") - .unwrap() - .captures(waldump_output) - { - Some(caps) => caps, - None => { - error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output); - panic!(); - } - }; - let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap(); - info!( - "waldump erred on {}, expected wal end at {}", - waldump_wal_end, expected_end_of_wal - ); - assert_eq!(waldump_wal_end, expected_end_of_wal); - } - - fn check_end_of_wal( - cfg: &wal_craft::Conf, - last_segment: &str, - 
start_lsn: Lsn, - expected_end_of_wal: Lsn, - ) { - // Check end_of_wal on non-partial WAL segment (we treat it as fully populated) - // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); - // info!( - // "find_end_of_wal returned wal_end={} with non-partial WAL segment", - // wal_end - // ); - // assert_eq!(wal_end, expected_end_of_wal_non_partial); - - // Rename file to partial to actually find last valid lsn, then rename it back. - fs::rename( - cfg.wal_dir().join(last_segment), - cfg.wal_dir().join(format!("{}.partial", last_segment)), - ) - .unwrap(); - let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); - info!( - "find_end_of_wal returned wal_end={} with partial WAL segment", - wal_end - ); - assert_eq!(wal_end, expected_end_of_wal); - fs::rename( - cfg.wal_dir().join(format!("{}.partial", last_segment)), - cfg.wal_dir().join(last_segment), - ) - .unwrap(); - } - - const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024); - - #[test] - pub fn test_find_end_of_wal_simple() { - init_logging(); - test_end_of_wal::("test_find_end_of_wal_simple"); - } - - #[test] - pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() { - init_logging(); - test_end_of_wal::( - "test_find_end_of_wal_crossing_segment_followed_by_small_one", - ); - } - - #[test] - pub fn test_find_end_of_wal_last_crossing_segment() { - init_logging(); - test_end_of_wal::( - "test_find_end_of_wal_last_crossing_segment", - ); - } - - /// Check the math in update_next_xid - /// - /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL, - /// currently 1024. 
- #[test] - pub fn test_update_next_xid() { - let checkpoint_buf = [0u8; std::mem::size_of::()]; - let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap(); - - checkpoint.nextXid = FullTransactionId { value: 10 }; - assert_eq!(checkpoint.nextXid.value, 10); - - // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL - // boundary - checkpoint.update_next_xid(100); - assert_eq!(checkpoint.nextXid.value, 1024); - - // No change - checkpoint.update_next_xid(500); - assert_eq!(checkpoint.nextXid.value, 1024); - checkpoint.update_next_xid(1023); - assert_eq!(checkpoint.nextXid.value, 1024); - - // The function returns the *next* XID, given the highest XID seen so - // far. So when we pass 1024, the nextXid gets bumped up to the next - // XID_CHECKPOINT_INTERVAL boundary. - checkpoint.update_next_xid(1024); - assert_eq!(checkpoint.nextXid.value, 2048); - } - - #[test] - pub fn test_encode_logical_message() { - let expected = [ - 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, - 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, - 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, - ]; - let actual = encode_logical_message("prefix", "message"); - assert_eq!(expected, actual[..]); - } -} +// If you need to craft WAL and write tests for this module, put them in the wal_craft crate. 
diff --git a/libs/postgres_ffi/wal_craft/Cargo.toml b/libs/postgres_ffi/wal_craft/Cargo.toml index 992bf7460b..bea888b23e 100644 --- a/libs/postgres_ffi/wal_craft/Cargo.toml +++ b/libs/postgres_ffi/wal_craft/Cargo.toml @@ -15,3 +15,7 @@ postgres_ffi.workspace = true tempfile.workspace = true workspace_hack.workspace = true + +[dev-dependencies] +regex.workspace = true +utils.workspace = true diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index 9f3f4dc20d..d4aed88048 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -10,6 +10,20 @@ use std::process::Command; use std::time::{Duration, Instant}; use tempfile::{tempdir, TempDir}; +macro_rules! xlog_utils_test { + ($version:ident) => { + #[path = "."] + mod $version { + pub use postgres_ffi::$version::wal_craft_test_export::*; + #[allow(clippy::duplicate_mod)] + #[cfg(test)] + mod xlog_utils_test; + } + }; +} + +postgres_ffi::for_all_postgres_versions! { xlog_utils_test } + #[derive(Debug, Clone, PartialEq, Eq)] pub struct Conf { pub pg_version: u32, diff --git a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs new file mode 100644 index 0000000000..6ff4c563b2 --- /dev/null +++ b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs @@ -0,0 +1,219 @@ +//! Tests for postgres_ffi xlog_utils module. Put it here to break cyclic dependency. 
+ +use super::*; +use crate::{error, info}; +use regex::Regex; +use std::cmp::min; +use std::fs::{self, File}; +use std::io::Write; +use std::{env, str::FromStr}; +use utils::const_assert; +use utils::lsn::Lsn; + +fn init_logging() { + let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or( + format!("wal_craft=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"), + )) + .is_test(true) + .try_init(); +} + +fn test_end_of_wal(test_name: &str) { + use crate::*; + + let pg_version = PG_MAJORVERSION[1..3].parse::().unwrap(); + + // Craft some WAL + let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("..") + .join("..") + .join(".."); + let cfg = Conf { + pg_version, + pg_distrib_dir: top_path.join("pg_install"), + datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)), + }; + if cfg.datadir.exists() { + fs::remove_dir_all(&cfg.datadir).unwrap(); + } + cfg.initdb().unwrap(); + let srv = cfg.start_server().unwrap(); + let (intermediate_lsns, expected_end_of_wal_partial) = + C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap(); + let intermediate_lsns: Vec = intermediate_lsns + .iter() + .map(|&lsn| u64::from(lsn).into()) + .collect(); + let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into(); + srv.kill(); + + // Check find_end_of_wal on the initial WAL + let last_segment = cfg + .wal_dir() + .read_dir() + .unwrap() + .map(|f| f.unwrap().file_name().into_string().unwrap()) + .filter(|fname| IsXLogFileName(fname)) + .max() + .unwrap(); + check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal); + for start_lsn in intermediate_lsns + .iter() + .chain(std::iter::once(&expected_end_of_wal)) + { + // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`. + // We assume that `start_lsn` is non-decreasing. 
+ info!( + "Checking with start_lsn={}, erasing WAL before it", + start_lsn + ); + for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() { + let fname = file.file_name().into_string().unwrap(); + if !IsXLogFileName(&fname) { + continue; + } + let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE); + let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); + if seg_start_lsn > u64::from(*start_lsn) { + continue; + } + let mut f = File::options().write(true).open(file.path()).unwrap(); + const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; + f.write_all( + &ZEROS[0..min( + WAL_SEGMENT_SIZE, + (u64::from(*start_lsn) - seg_start_lsn) as usize, + )], + ) + .unwrap(); + } + check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal); + } +} + +fn check_pg_waldump_end_of_wal( + cfg: &crate::Conf, + last_segment: &str, + expected_end_of_wal: Lsn, +) { + // Get the actual end of WAL by pg_waldump + let waldump_output = cfg + .pg_waldump("000000010000000000000001", last_segment) + .unwrap() + .stderr; + let waldump_output = std::str::from_utf8(&waldump_output).unwrap(); + let caps = match Regex::new(r"invalid record length at (.+):") + .unwrap() + .captures(waldump_output) + { + Some(caps) => caps, + None => { + error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output); + panic!(); + } + }; + let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap(); + info!( + "waldump erred on {}, expected wal end at {}", + waldump_wal_end, expected_end_of_wal + ); + assert_eq!(waldump_wal_end, expected_end_of_wal); +} + +fn check_end_of_wal( + cfg: &crate::Conf, + last_segment: &str, + start_lsn: Lsn, + expected_end_of_wal: Lsn, +) { + // Check end_of_wal on non-partial WAL segment (we treat it as fully populated) + // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); + // info!( + // "find_end_of_wal returned wal_end={} with non-partial WAL segment", + // wal_end + // ); + // 
assert_eq!(wal_end, expected_end_of_wal_non_partial); + + // Rename file to partial to actually find last valid lsn, then rename it back. + fs::rename( + cfg.wal_dir().join(last_segment), + cfg.wal_dir().join(format!("{}.partial", last_segment)), + ) + .unwrap(); + let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); + info!( + "find_end_of_wal returned wal_end={} with partial WAL segment", + wal_end + ); + assert_eq!(wal_end, expected_end_of_wal); + fs::rename( + cfg.wal_dir().join(format!("{}.partial", last_segment)), + cfg.wal_dir().join(last_segment), + ) + .unwrap(); +} + +const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024); + +#[test] +pub fn test_find_end_of_wal_simple() { + init_logging(); + test_end_of_wal::("test_find_end_of_wal_simple"); +} + +#[test] +pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() { + init_logging(); + test_end_of_wal::( + "test_find_end_of_wal_crossing_segment_followed_by_small_one", + ); +} + +#[test] +pub fn test_find_end_of_wal_last_crossing_segment() { + init_logging(); + test_end_of_wal::( + "test_find_end_of_wal_last_crossing_segment", + ); +} + +/// Check the math in update_next_xid +/// +/// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL, +/// currently 1024. +#[test] +pub fn test_update_next_xid() { + let checkpoint_buf = [0u8; std::mem::size_of::()]; + let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap(); + + checkpoint.nextXid = FullTransactionId { value: 10 }; + assert_eq!(checkpoint.nextXid.value, 10); + + // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL + // boundary + checkpoint.update_next_xid(100); + assert_eq!(checkpoint.nextXid.value, 1024); + + // No change + checkpoint.update_next_xid(500); + assert_eq!(checkpoint.nextXid.value, 1024); + checkpoint.update_next_xid(1023); + assert_eq!(checkpoint.nextXid.value, 1024); + + // The function returns the *next* XID, given the highest XID seen so + // far. 
So when we pass 1024, the nextXid gets bumped up to the next + // XID_CHECKPOINT_INTERVAL boundary. + checkpoint.update_next_xid(1024); + assert_eq!(checkpoint.nextXid.value, 2048); +} + +#[test] +pub fn test_encode_logical_message() { + let expected = [ + 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, + 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, + 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, + ]; + let actual = encode_logical_message("prefix", "message"); + assert_eq!(expected, actual[..]); +} diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index ee5980212e..45dc9fad4a 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { min_lsn = min(min_lsn, lsn_range.start); max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1)); - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); } println!("min: {min_lsn}, max: {max_lsn}"); @@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) { is_incremental: false, short_id: format!("Layer {}", i), }; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); } updates.flush(); println!("Finished layer map init in {:?}", now.elapsed()); diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index e0731ba79b..1fa5e4ab3b 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -337,33 +337,114 @@ fn start_pageserver( // Startup staging or optimizing: // - // (init_done_tx, init_done_rx) are used to control when do background loops start. This is to - // avoid starving out the BACKGROUND_RUNTIME async worker threads doing heavy work, like - // initial repartitioning while we still have Loading tenants. 
+ // We want to minimize downtime for `page_service` connections, and trying not to overload + // BACKGROUND_RUNTIME by doing initial compactions and initial logical sizes at the same time. // - // init_done_rx is a barrier which stops waiting once all init_done_tx clones are dropped. + // init_done_rx will notify when all initial load operations have completed. + // + // background_jobs_can_start (same name used to hold off background jobs from starting at + // consumer side) will be dropped once we can start the background jobs. Currently it is behind + // completing all initial logical size calculations (init_logical_size_done_rx) and a timeout + // (background_task_maximum_delay). let (init_done_tx, init_done_rx) = utils::completion::channel(); + let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel(); + + let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel(); + + let order = pageserver::InitializationOrder { + initial_tenant_load: Some(init_done_tx), + initial_logical_size_can_start: init_done_rx.clone(), + initial_logical_size_attempt: init_logical_size_done_tx, + background_jobs_can_start: background_jobs_barrier.clone(), + }; + // Scan the local 'tenants/' directory and start loading the tenants let init_started_at = std::time::Instant::now(); + let shutdown_pageserver = tokio_util::sync::CancellationToken::new(); + BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( conf, broker_client.clone(), remote_storage.clone(), - (init_done_tx, init_done_rx.clone()), + order, ))?; BACKGROUND_RUNTIME.spawn({ - let init_done_rx = init_done_rx.clone(); - async move { - init_done_rx.wait().await; + let init_done_rx = init_done_rx; + let shutdown_pageserver = shutdown_pageserver.clone(); + let drive_init = async move { + // NOTE: unlike many futures in pageserver, this one is cancellation-safe + let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed")); - 
let elapsed = init_started_at.elapsed(); + init_done_rx.wait().await; + // initial logical sizes can now start, as they were waiting on init_done_rx. + + scopeguard::ScopeGuard::into_inner(guard); + + let init_done = std::time::Instant::now(); + let elapsed = init_done - init_started_at; tracing::info!( elapsed_millis = elapsed.as_millis(), - "Initial load completed." + "Initial load completed" ); + + let mut init_sizes_done = std::pin::pin!(init_logical_size_done_rx.wait()); + + let timeout = conf.background_task_maximum_delay; + + let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed")); + + let init_sizes_done = tokio::select! { + _ = &mut init_sizes_done => { + let now = std::time::Instant::now(); + tracing::info!( + from_init_done_millis = (now - init_done).as_millis(), + from_init_millis = (now - init_started_at).as_millis(), + "Initial logical sizes completed" + ); + None + } + _ = tokio::time::sleep(timeout) => { + tracing::info!( + timeout_millis = timeout.as_millis(), + "Initial logical size timeout elapsed; starting background jobs" + ); + Some(init_sizes_done) + } + }; + + scopeguard::ScopeGuard::into_inner(guard); + + // allow background jobs to start + drop(background_jobs_can_start); + + if let Some(init_sizes_done) = init_sizes_done { + // ending up here is not a bug; at the latest logical sizes will be queried by + // consumption metrics. 
+ let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed")); + init_sizes_done.await; + + scopeguard::ScopeGuard::into_inner(guard); + + let now = std::time::Instant::now(); + tracing::info!( + from_init_done_millis = (now - init_done).as_millis(), + from_init_millis = (now - init_started_at).as_millis(), + "Initial logical sizes completed after timeout (background jobs already started)" + ); + + } + }; + + async move { + let mut drive_init = std::pin::pin!(drive_init); + // just race these tasks + tokio::select! { + _ = shutdown_pageserver.cancelled() => {}, + _ = &mut drive_init => {}, + } } }); @@ -378,7 +459,7 @@ fn start_pageserver( conf, remote_storage.clone(), disk_usage_eviction_state.clone(), - init_done_rx.clone(), + background_jobs_barrier.clone(), )?; } @@ -416,7 +497,7 @@ fn start_pageserver( ); if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint { - let init_done_rx = init_done_rx; + let background_jobs_barrier = background_jobs_barrier; let metrics_ctx = RequestContext::todo_child( TaskKind::MetricsCollection, // This task itself shouldn't download anything. @@ -432,12 +513,17 @@ fn start_pageserver( "consumption metrics collection", true, async move { - // first wait for initial load to complete before first iteration. + // first wait until background jobs are cleared to launch. // // this is because we only process active tenants and timelines, and the // Timeline::get_current_logical_size will spawn the logical size calculation, // which will not be rate-limited. - init_done_rx.wait().await; + let cancel = task_mgr::shutdown_token(); + + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); }, + _ = background_jobs_barrier.wait() => {} + }; pageserver::consumption_metrics::collect_metrics( metric_collection_endpoint, @@ -487,6 +573,8 @@ fn start_pageserver( ); } + let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard()); + // All started up! 
Now just sit and wait for shutdown signal. ShutdownSignals::handle(|signal| match signal { Signal::Quit => { @@ -502,6 +590,11 @@ fn start_pageserver( "Got {}. Terminating gracefully in fast shutdown mode", signal.name() ); + + // This cancels the `shutdown_pageserver` cancellation tree. + // Right now that tree doesn't reach very far, and `task_mgr` is used instead. + // The plan is to change that over time. + shutdown_pageserver.take(); BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0)); unreachable!() } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 02763c9b7d..17e6e3fb2a 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -63,6 +63,7 @@ pub mod defaults { pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour"; pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option = None; pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min"; + pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s"; /// /// Default built-in configuration file. @@ -91,9 +92,10 @@ pub mod defaults { #cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}' #synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}' - #disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}} +#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}' + # [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -187,6 +189,15 @@ pub struct PageServerConf { pub test_remote_failures: u64, pub ondemand_download_behavior_treat_error_as_warn: bool, + + /// How long will background tasks be delayed at most after initial load of tenants. + /// + /// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works + /// as we now isolate initial loading, initial logical size calculation and background tasks. 
+ /// Smaller nodes will have background tasks "not running" for this long unless every timeline + /// has its initial logical size calculated. Not running background tasks for some seconds is + /// not terrible. + pub background_task_maximum_delay: Duration, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -259,6 +270,8 @@ struct PageServerConfigBuilder { test_remote_failures: BuilderValue, ondemand_download_behavior_treat_error_as_warn: BuilderValue, + + background_task_maximum_delay: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -316,6 +329,11 @@ impl Default for PageServerConfigBuilder { test_remote_failures: Set(0), ondemand_download_behavior_treat_error_as_warn: Set(false), + + background_task_maximum_delay: Set(humantime::parse_duration( + DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY, + ) + .unwrap()), } } } @@ -440,6 +458,10 @@ impl PageServerConfigBuilder { BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn); } + pub fn background_task_maximum_delay(&mut self, delay: Duration) { + self.background_task_maximum_delay = BuilderValue::Set(delay); + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_size_logical_size_queries = self .concurrent_tenant_size_logical_size_queries @@ -522,6 +544,9 @@ impl PageServerConfigBuilder { .ok_or(anyhow!( "missing ondemand_download_behavior_treat_error_as_warn" ))?, + background_task_maximum_delay: self + .background_task_maximum_delay + .ok_or(anyhow!("missing background_task_maximum_delay"))?, }) } } @@ -710,6 +735,7 @@ impl PageServerConf { ) }, "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?), + "background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -877,6 +903,7 @@ impl PageServerConf { disk_usage_based_eviction: None, test_remote_failures: 0, 
ondemand_download_behavior_treat_error_as_warn: false, + background_task_maximum_delay: Duration::ZERO, } } } @@ -1036,6 +1063,7 @@ metric_collection_endpoint = 'http://localhost:80/metrics' synthetic_size_calculation_interval = '333 s' log_format = 'json' +background_task_maximum_delay = '334 s' "#; @@ -1094,6 +1122,9 @@ log_format = 'json' disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, + background_task_maximum_delay: humantime::parse_duration( + defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY + )?, }, "Correct defaults should be used when no config values are provided" ); @@ -1148,6 +1179,7 @@ log_format = 'json' disk_usage_based_eviction: None, test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, + background_task_maximum_delay: Duration::from_secs(334), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 1a8886935c..ce5f81c44b 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -83,7 +83,7 @@ pub fn launch_disk_usage_global_eviction_task( conf: &'static PageServerConf, storage: GenericRemoteStorage, state: Arc, - init_done: completion::Barrier, + background_jobs_barrier: completion::Barrier, ) -> anyhow::Result<()> { let Some(task_config) = &conf.disk_usage_based_eviction else { info!("disk usage based eviction task not configured"); @@ -100,17 +100,16 @@ pub fn launch_disk_usage_global_eviction_task( "disk usage based eviction", false, async move { - // wait until initial load is complete, because we cannot evict from loading tenants. 
- init_done.wait().await; + let cancel = task_mgr::shutdown_token(); - disk_usage_eviction_task( - &state, - task_config, - storage, - &conf.tenants_path(), - task_mgr::shutdown_token(), - ) - .await; + // wait until initial load is complete, because we cannot evict from loading tenants. + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); }, + _ = background_jobs_barrier.wait() => { } + }; + + disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel) + .await; info!("disk usage based eviction task finishing"); Ok(()) }, diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 0d912c95e0..1f8298ca3e 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -928,12 +928,28 @@ components: writing to the tenant's S3 state, so, DO NOT ATTACH the tenant to any other pageserver, or we risk split-brain. - `attached` means that the attach operation has completed, - maybe successfully, maybe not. Perform a health check at - the Postgres level to determine healthiness of the tenant. + successfully + - `failed` means that attach has failed. For the reason, check the corresponding `reason` field. + `failed` is the terminal state; retrying the attach call won't resolve the issue. + For example, this can be caused by S3 being unreachable. The retry may be implemented + with a call to detach, though it would be better not to automate it and to inspect the failed state + manually before proceeding with a retry. See the tenant `/attach` endpoint for more information. 
- type: string - enum: [ "maybe", "attached" ] + type: object + required: + - slug + - data + properties: + slug: + type: string + enum: [ "maybe", "attached", "failed" ] + data: + - type: object + properties: + reason: + type: string + TenantCreateRequest: allOf: - $ref: '#/components/schemas/TenantConfig' diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 40a672bee3..5831091098 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -132,6 +132,29 @@ pub fn is_uninit_mark(path: &Path) -> bool { } } +/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by +/// blocking. +/// +/// The instances of this value exist only during startup, otherwise `None` is provided, meaning no +/// delaying is needed. +#[derive(Clone)] +pub struct InitializationOrder { + /// Each initial tenant load task carries this until completion. + pub initial_tenant_load: Option, + + /// Barrier for when we can start initial logical size calculations. + pub initial_logical_size_can_start: utils::completion::Barrier, + + /// Each timeline owns a clone of this to be consumed on the initial logical size calculation + /// attempt. It is important to drop this once the attempt has completed. + pub initial_logical_size_attempt: utils::completion::Completion, + + /// Barrier for when we can start any background jobs. + /// + /// This can be broken up later on, but right now there is just one class of a background job. 
+ pub background_jobs_can_start: utils::completion::Barrier, +} + #[cfg(test)] mod backoff_defaults_tests { use super::*; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 7ce0ed81bc..29086cae86 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -65,6 +65,7 @@ use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError; use crate::tenant::storage_layer::DeltaLayer; use crate::tenant::storage_layer::ImageLayer; use crate::tenant::storage_layer::Layer; +use crate::InitializationOrder; use crate::virtual_file::VirtualFile; use crate::walredo::PostgresRedoManager; @@ -510,6 +511,7 @@ impl Tenant { local_metadata: Option, ancestor: Option>, first_save: bool, + init_order: Option<&InitializationOrder>, ctx: &RequestContext, ) -> anyhow::Result<()> { let tenant_id = self.tenant_id; @@ -535,6 +537,7 @@ impl Tenant { up_to_date_metadata, ancestor.clone(), remote_client, + init_order, )?; let timeline = UninitializedTimeline { @@ -560,6 +563,7 @@ impl Tenant { up_to_date_metadata, ancestor.clone(), None, + None, ) .with_context(|| { format!("creating broken timeline data for {tenant_id}/{timeline_id}") @@ -858,6 +862,7 @@ impl Tenant { local_metadata, ancestor, true, + None, ctx, ) .await @@ -892,16 +897,13 @@ impl Tenant { /// /// If the loading fails for some reason, the Tenant will go into Broken /// state. - /// - /// `init_done` is an optional channel used during initial load to delay background task - /// start. It is not used later. 
#[instrument(skip_all, fields(tenant_id=%tenant_id))] pub fn spawn_load( conf: &'static PageServerConf, tenant_id: TenantId, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done: Option<(completion::Completion, completion::Barrier)>, + init_order: Option, ctx: &RequestContext, ) -> Arc { debug_assert_current_span_has_tenant_id(); @@ -937,17 +939,17 @@ impl Tenant { "initial tenant load", false, async move { - // keep the sender alive as long as we have the initial load ongoing; it will be - // None for loads spawned after init_tenant_mgr. - let (_tx, rx) = if let Some((tx, rx)) = init_done { - (Some(tx), Some(rx)) - } else { - (None, None) - }; - match tenant_clone.load(&ctx).await { + let mut init_order = init_order; + + // take the completion because initial tenant loading will complete when all of + // these tasks complete. + let _completion = init_order.as_mut().and_then(|x| x.initial_tenant_load.take()); + + match tenant_clone.load(init_order.as_ref(), &ctx).await { Ok(()) => { debug!("load finished, activating"); - tenant_clone.activate(broker_client, rx.as_ref(), &ctx); + let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start); + tenant_clone.activate(broker_client, background_jobs_can_start, &ctx); } Err(err) => { error!("load failed, setting tenant state to Broken: {err:?}"); @@ -974,7 +976,11 @@ impl Tenant { /// files on disk. Used at pageserver startup. /// /// No background tasks are started as part of this routine. - async fn load(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { + async fn load( + self: &Arc, + init_order: Option<&InitializationOrder>, + ctx: &RequestContext, + ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); debug!("loading tenant task"); @@ -1094,7 +1100,7 @@ impl Tenant { // 1. 
"Timeline has no ancestor and no layer files" for (timeline_id, local_metadata) in sorted_timelines { - self.load_local_timeline(timeline_id, local_metadata, ctx) + self.load_local_timeline(timeline_id, local_metadata, init_order, ctx) .await .with_context(|| format!("load local timeline {timeline_id}"))?; } @@ -1112,6 +1118,7 @@ impl Tenant { &self, timeline_id: TimelineId, local_metadata: TimelineMetadata, + init_order: Option<&InitializationOrder>, ctx: &RequestContext, ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); @@ -1181,6 +1188,7 @@ impl Tenant { Some(local_metadata), ancestor, false, + init_order, ctx, ) .await @@ -1724,12 +1732,12 @@ impl Tenant { /// Changes tenant status to active, unless shutdown was already requested. /// - /// `init_done` is an optional channel used during initial load to delay background task - /// start. It is not used later. + /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup + /// to delay background jobs. Background jobs can be started right away when None is given. fn activate( self: &Arc, broker_client: BrokerClientChannel, - init_done: Option<&completion::Barrier>, + background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { debug_assert_current_span_has_tenant_id(); @@ -1762,12 +1770,12 @@ impl Tenant { // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. 
- tasks::start_background_loops(self, init_done); + tasks::start_background_loops(self, background_jobs_can_start); let mut activated_timelines = 0; for timeline in not_broken_timelines { - timeline.activate(broker_client.clone(), init_done, ctx); + timeline.activate(broker_client.clone(), background_jobs_can_start, ctx); activated_timelines += 1; } @@ -2158,6 +2166,7 @@ impl Tenant { new_metadata: &TimelineMetadata, ancestor: Option>, remote_client: Option, + init_order: Option<&InitializationOrder>, ) -> anyhow::Result> { if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() { anyhow::ensure!( @@ -2166,6 +2175,9 @@ impl Tenant { ) } + let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start); + let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt); + let pg_version = new_metadata.pg_version(); Ok(Timeline::new( self.conf, @@ -2177,6 +2189,8 @@ impl Tenant { Arc::clone(&self.walredo_mgr), remote_client, pg_version, + initial_logical_size_can_start.cloned(), + initial_logical_size_attempt.cloned(), )) } @@ -2852,7 +2866,7 @@ impl Tenant { remote_client: Option, ) -> anyhow::Result> { let timeline_data = self - .create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client) + .create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client, None) .context("Failed to create timeline data structure")?; crashsafe::create_dir_all(timeline_path).context("Failed to create timeline directory")?; @@ -3420,7 +3434,7 @@ pub mod harness { timelines_to_load.insert(timeline_id, timeline_metadata); } tenant - .load(ctx) + .load(None, ctx) .instrument(info_span!("try_load", tenant_id=%self.tenant_id)) .await?; tenant.state.send_replace(TenantState::Active); diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 8d06ccd565..ca1a71b623 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -51,7 +51,9 @@ 
use crate::keyspace::KeyPartitioning; use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::Layer; +use anyhow::Context; use anyhow::Result; +use std::collections::HashMap; use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; @@ -61,6 +63,8 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage; pub use historic_layer_coverage::Replacement; use super::storage_layer::range_eq; +use super::storage_layer::PersistentLayerDesc; +use super::storage_layer::PersistentLayerKey; /// /// LayerMap tracks what layers exist on a timeline. @@ -86,11 +90,16 @@ pub struct LayerMap { pub frozen_layers: VecDeque>, /// Index of the historic layers optimized for search - historic: BufferedHistoricLayerCoverage>, + historic: BufferedHistoricLayerCoverage>, /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. - l0_delta_layers: Vec>, + l0_delta_layers: Vec>, + + /// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and + /// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and + /// RemoteLayer will be removed. + mapping: HashMap>, } impl Default for LayerMap { @@ -101,6 +110,7 @@ impl Default for LayerMap { frozen_layers: VecDeque::default(), l0_delta_layers: Vec::default(), historic: BufferedHistoricLayerCoverage::default(), + mapping: HashMap::default(), } } } @@ -125,8 +135,9 @@ where /// /// Insert an on-disk layer. 
/// - pub fn insert_historic(&mut self, layer: Arc) { - self.layer_map.insert_historic_noflush(layer) + // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap` + pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { + self.layer_map.insert_historic_noflush(layer_desc, layer) } /// @@ -134,8 +145,8 @@ where /// /// This should be called when the corresponding file on disk has been deleted. /// - pub fn remove_historic(&mut self, layer: Arc) { - self.layer_map.remove_historic_noflush(layer) + pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { + self.layer_map.remove_historic_noflush(layer_desc, layer) } /// Replaces existing layer iff it is the `expected`. @@ -150,12 +161,15 @@ where /// that we can replace values only by updating a hashmap. pub fn replace_historic( &mut self, + expected_desc: PersistentLayerDesc, expected: &Arc, + new_desc: PersistentLayerDesc, new: Arc, ) -> anyhow::Result>> { fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound)); - self.layer_map.replace_historic_noflush(expected, new) + self.layer_map + .replace_historic_noflush(expected_desc, expected, new_desc, new) } // We will flush on drop anyway, but this method makes it @@ -230,6 +244,7 @@ where (None, None) => None, (None, Some(image)) => { let lsn_floor = image.get_lsn_range().start; + let image = self.get_layer_from_mapping(&image.key()).clone(); Some(SearchResult { layer: image, lsn_floor, @@ -237,6 +252,7 @@ where } (Some(delta), None) => { let lsn_floor = delta.get_lsn_range().start; + let delta = self.get_layer_from_mapping(&delta.key()).clone(); Some(SearchResult { layer: delta, lsn_floor, @@ -247,6 +263,7 @@ where let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; let image_exact_match = img_lsn + 1 == end_lsn; if image_is_newer || image_exact_match { + let image = self.get_layer_from_mapping(&image.key()).clone(); Some(SearchResult { layer: image, 
lsn_floor: img_lsn, @@ -254,6 +271,7 @@ where } else { let lsn_floor = std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); + let delta = self.get_layer_from_mapping(&delta.key()).clone(); Some(SearchResult { layer: delta, lsn_floor, @@ -273,16 +291,33 @@ where /// /// Helper function for BatchedUpdates::insert_historic /// - pub(self) fn insert_historic_noflush(&mut self, layer: Arc) { + /// TODO(chi): remove L generic so that we do not need to pass layer object. + pub(self) fn insert_historic_noflush( + &mut self, + layer_desc: PersistentLayerDesc, + layer: Arc, + ) { + self.mapping.insert(layer_desc.key(), layer.clone()); + // TODO: See #3869, resulting #4088, attempted fix and repro #4094 - self.historic.insert( - historic_layer_coverage::LayerKey::from(&*layer), - Arc::clone(&layer), - ); if Self::is_l0(&layer) { - self.l0_delta_layers.push(layer); + self.l0_delta_layers.push(layer_desc.clone().into()); } + + self.historic.insert( + historic_layer_coverage::LayerKey::from(&*layer), + layer_desc.into(), + ); + } + + fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc { + let layer = self + .mapping + .get(key) + .with_context(|| format!("{key:?}")) + .expect("inconsistent layer mapping"); + layer } /// @@ -290,14 +325,16 @@ where /// /// Helper function for BatchedUpdates::remove_historic /// - pub fn remove_historic_noflush(&mut self, layer: Arc) { + pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { self.historic .remove(historic_layer_coverage::LayerKey::from(&*layer)); - if Self::is_l0(&layer) { let len_before = self.l0_delta_layers.len(); - self.l0_delta_layers - .retain(|other| !Self::compare_arced_layers(other, &layer)); + let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers); + l0_delta_layers.retain(|other| { + !Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer) + }); + self.l0_delta_layers = l0_delta_layers; // this assertion is 
related to use of Arc::ptr_eq in Self::compare_arced_layers, // there's a chance that the comparison fails at runtime due to it comparing (pointer, // vtable) pairs. @@ -307,11 +344,14 @@ where "failed to locate removed historic layer from l0_delta_layers" ); } + self.mapping.remove(&layer_desc.key()); } pub(self) fn replace_historic_noflush( &mut self, + expected_desc: PersistentLayerDesc, expected: &Arc, + new_desc: PersistentLayerDesc, new: Arc, ) -> anyhow::Result>> { let key = historic_layer_coverage::LayerKey::from(&**expected); @@ -332,10 +372,9 @@ where let l0_index = if expected_l0 { // find the index in case replace worked, we need to replace that as well - let pos = self - .l0_delta_layers - .iter() - .position(|slot| Self::compare_arced_layers(slot, expected)); + let pos = self.l0_delta_layers.iter().position(|slot| { + Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected) + }); if pos.is_none() { return Ok(Replacement::NotFound); @@ -345,16 +384,28 @@ where None }; - let replaced = self.historic.replace(&key, new.clone(), |existing| { - Self::compare_arced_layers(existing, expected) + let new_desc = Arc::new(new_desc); + let replaced = self.historic.replace(&key, new_desc.clone(), |existing| { + **existing == expected_desc }); if let Replacement::Replaced { .. 
} = &replaced { + self.mapping.remove(&expected_desc.key()); + self.mapping.insert(new_desc.key(), new); if let Some(index) = l0_index { - self.l0_delta_layers[index] = new; + self.l0_delta_layers[index] = new_desc; } } + let replaced = match replaced { + Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered }, + Replacement::NotFound => Replacement::NotFound, + Replacement::RemovalBuffered => Replacement::RemovalBuffered, + Replacement::Unexpected(x) => { + Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone()) + } + }; + Ok(replaced) } @@ -383,7 +434,7 @@ where let start = key.start.to_i128(); let end = key.end.to_i128(); - let layer_covers = |layer: Option>| match layer { + let layer_covers = |layer: Option>| match layer { Some(layer) => layer.get_lsn_range().start >= lsn.start, None => false, }; @@ -404,7 +455,9 @@ where } pub fn iter_historic_layers(&self) -> impl '_ + Iterator> { - self.historic.iter() + self.historic + .iter() + .map(|x| self.get_layer_from_mapping(&x.key()).clone()) } /// @@ -436,14 +489,24 @@ where // Loop through the change events and push intervals for (change_key, change_val) in version.image_coverage.range(start..end) { let kr = Key::from_i128(current_key)..Key::from_i128(change_key); - coverage.push((kr, current_val.take())); + coverage.push(( + kr, + current_val + .take() + .map(|l| self.get_layer_from_mapping(&l.key()).clone()), + )); current_key = change_key; current_val = change_val.clone(); } // Add the final interval let kr = Key::from_i128(current_key)..Key::from_i128(end); - coverage.push((kr, current_val.take())); + coverage.push(( + kr, + current_val + .take() + .map(|l| self.get_layer_from_mapping(&l.key()).clone()), + )); Ok(coverage) } @@ -532,7 +595,9 @@ where let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = Self::is_reimage_worthy(&val, key) as usize; + let base_count = + 
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key) + as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; @@ -555,7 +620,9 @@ where let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = Self::is_reimage_worthy(&val, key) as usize; + let base_count = + Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key) + as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; max_stacked_deltas = std::cmp::max( @@ -706,7 +773,11 @@ where /// Return all L0 delta layers pub fn get_level0_deltas(&self) -> Result>> { - Ok(self.l0_delta_layers.clone()) + Ok(self + .l0_delta_layers + .iter() + .map(|x| self.get_layer_from_mapping(&x.key()).clone()) + .collect()) } /// debugging function to print out the contents of the layer map @@ -809,12 +880,17 @@ mod tests { let layer = LayerDescriptor::from(layer); // same skeletan construction; see scenario below - let not_found: Arc = Arc::new(layer.clone()); - let new_version: Arc = Arc::new(layer); + let not_found = Arc::new(layer.clone()); + let new_version = Arc::new(layer); let mut map = LayerMap::default(); - let res = map.batch_update().replace_historic(&not_found, new_version); + let res = map.batch_update().replace_historic( + not_found.get_persistent_layer_desc(), + &not_found, + new_version.get_persistent_layer_desc(), + new_version, + ); assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}"); } @@ -823,8 +899,8 @@ mod tests { let name = LayerFileName::from_str(layer_name).unwrap(); let skeleton = LayerDescriptor::from(name); - let remote: Arc = Arc::new(skeleton.clone()); - let downloaded: Arc = Arc::new(skeleton); + let remote = Arc::new(skeleton.clone()); + let downloaded = Arc::new(skeleton); let mut map = LayerMap::default(); @@ -834,12 +910,18 @@ mod tests { let expected_in_counts = (1, 
usize::from(expected_l0)); - map.batch_update().insert_historic(remote.clone()); + map.batch_update() + .insert_historic(remote.get_persistent_layer_desc(), remote.clone()); assert_eq!(count_layer_in(&map, &remote), expected_in_counts); let replaced = map .batch_update() - .replace_historic(&remote, downloaded.clone()) + .replace_historic( + remote.get_persistent_layer_desc(), + &remote, + downloaded.get_persistent_layer_desc(), + downloaded.clone(), + ) .expect("name derived attributes are the same"); assert!( matches!(replaced, Replacement::Replaced { .. }), @@ -847,11 +929,12 @@ mod tests { ); assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts); - map.batch_update().remove_historic(downloaded.clone()); + map.batch_update() + .remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone()); assert_eq!(count_layer_in(&map, &downloaded), (0, 0)); } - fn count_layer_in(map: &LayerMap, layer: &Arc) -> (usize, usize) { + fn count_layer_in(map: &LayerMap, layer: &Arc) -> (usize, usize) { let historic = map .iter_historic_layers() .filter(|x| LayerMap::compare_arced_layers(x, layer)) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 740f9621b6..a1638e4a95 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -21,9 +21,8 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState}; -use crate::IGNORED_TENANT_FILE_NAME; +use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME}; -use utils::completion; use utils::fs_ext::PathExt; use utils::id::{TenantId, TimelineId}; @@ -65,7 +64,7 @@ pub async fn init_tenant_mgr( conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done: (completion::Completion, completion::Barrier), + init_order: InitializationOrder, ) -> 
anyhow::Result<()> { // Scan local filesystem for attached tenants let tenants_dir = conf.tenants_path(); @@ -122,7 +121,7 @@ pub async fn init_tenant_mgr( &tenant_dir_path, broker_client.clone(), remote_storage.clone(), - Some(init_done.clone()), + Some(init_order.clone()), &ctx, ) { Ok(tenant) => { @@ -153,14 +152,12 @@ pub async fn init_tenant_mgr( Ok(()) } -/// `init_done` is an optional channel used during initial load to delay background task -/// start. It is not used later. pub fn schedule_local_tenant_processing( conf: &'static PageServerConf, tenant_path: &Path, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, - init_done: Option<(completion::Completion, completion::Barrier)>, + init_order: Option, ctx: &RequestContext, ) -> anyhow::Result> { anyhow::ensure!( @@ -219,7 +216,7 @@ pub fn schedule_local_tenant_processing( tenant_id, broker_client, remote_storage, - init_done, + init_order, ctx, ) }; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 7c071463de..6ac4fd9470 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -38,7 +38,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter}; pub use filename::{DeltaFileName, ImageFileName, LayerFileName}; pub use image_layer::{ImageLayer, ImageLayerWriter}; pub use inmemory_layer::InMemoryLayer; -pub use layer_desc::PersistentLayerDesc; +pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub use remote_layer::RemoteLayer; use super::layer_map::BatchedUpdates; @@ -454,7 +454,9 @@ pub trait PersistentLayer: Layer { /// /// Should not change over the lifetime of the layer object because /// current_physical_size is computed as the som of this value. 
- fn file_size(&self) -> u64; + fn file_size(&self) -> u64 { + self.layer_desc().file_size + } fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo; @@ -483,6 +485,20 @@ pub struct LayerDescriptor { pub short_id: String, } +impl LayerDescriptor { + /// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta, + /// and the tenant / timeline id does not matter. + pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc { + PersistentLayerDesc::new_delta( + TenantId::from_array([0; 16]), + TimelineId::from_array([0; 16]), + self.key.clone(), + self.lsn.clone(), + 233, + ) + } +} + impl Layer for LayerDescriptor { fn get_key_range(&self) -> Range { self.key.clone() diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 5f2fb1ebea..624fe8dac4 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -182,8 +182,6 @@ pub struct DeltaLayer { pub desc: PersistentLayerDesc, - pub file_size: u64, - access_stats: LayerAccessStats, inner: RwLock, @@ -196,7 +194,7 @@ impl std::fmt::Debug for DeltaLayer { f.debug_struct("DeltaLayer") .field("key_range", &RangeDisplayDebug(&self.desc.key_range)) .field("lsn_range", &self.desc.lsn_range) - .field("file_size", &self.file_size) + .field("file_size", &self.desc.file_size) .field("inner", &self.inner) .finish() } @@ -439,10 +437,6 @@ impl PersistentLayer for DeltaLayer { Ok(()) } - fn file_size(&self) -> u64 { - self.file_size - } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); @@ -451,7 +445,7 @@ impl PersistentLayer for DeltaLayer { HistoricLayerInfo::Delta { layer_file_name, - layer_file_size: self.file_size, + layer_file_size: self.desc.file_size, lsn_start: lsn_range.start, lsn_end: lsn_range.end, remote: false, @@ -602,8 
+596,8 @@ impl DeltaLayer { timeline_id, filename.key_range.clone(), filename.lsn_range.clone(), + file_size, ), - file_size, access_stats, inner: RwLock::new(DeltaLayerInner { loaded: false, @@ -634,8 +628,8 @@ impl DeltaLayer { summary.timeline_id, summary.key_range, summary.lsn_range, + metadata.len(), ), - file_size: metadata.len(), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(DeltaLayerInner { loaded: false, @@ -803,8 +797,8 @@ impl DeltaLayerWriterInner { self.timeline_id, self.key_start..key_end, self.lsn_range.clone(), + metadata.len(), ), - file_size: metadata.len(), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(DeltaLayerInner { loaded: false, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index b55dd08a6d..07a16a7de2 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -109,8 +109,6 @@ pub struct ImageLayer { // This entry contains an image of all pages as of this LSN, should be the same as desc.lsn pub lsn: Lsn, - pub file_size: u64, - access_stats: LayerAccessStats, inner: RwLock, @@ -122,7 +120,7 @@ impl std::fmt::Debug for ImageLayer { f.debug_struct("ImageLayer") .field("key_range", &RangeDisplayDebug(&self.desc.key_range)) - .field("file_size", &self.file_size) + .field("file_size", &self.desc.file_size) .field("lsn", &self.lsn) .field("inner", &self.inner) .finish() @@ -258,17 +256,13 @@ impl PersistentLayer for ImageLayer { Ok(()) } - fn file_size(&self) -> u64 { - self.file_size - } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); HistoricLayerInfo::Image { layer_file_name, - layer_file_size: self.file_size, + layer_file_size: self.desc.file_size, lsn_start: lsn_range.start, remote: false, access_stats: 
self.access_stats.as_api_model(reset), @@ -411,9 +405,9 @@ impl ImageLayer { filename.key_range.clone(), filename.lsn, false, + file_size, ), // Now we assume image layer ALWAYS covers the full range. This may change in the future. lsn: filename.lsn, - file_size, access_stats, inner: RwLock::new(ImageLayerInner { loaded: false, @@ -443,9 +437,9 @@ impl ImageLayer { summary.key_range, summary.lsn, false, + metadata.len(), ), // Now we assume image layer ALWAYS covers the full range. This may change in the future. lsn: summary.lsn, - file_size: metadata.len(), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(ImageLayerInner { file: None, @@ -578,14 +572,6 @@ impl ImageLayerWriterInner { file.write_all(buf.as_ref())?; } - let desc = PersistentLayerDesc::new_img( - self.tenant_id, - self.timeline_id, - self.key_range.clone(), - self.lsn, - self.is_incremental, // for now, image layer ALWAYS covers the full range - ); - // Fill in the summary on blk 0 let summary = Summary { magic: IMAGE_FILE_MAGIC, @@ -604,6 +590,15 @@ impl ImageLayerWriterInner { .metadata() .context("get metadata to determine file size")?; + let desc = PersistentLayerDesc::new_img( + self.tenant_id, + self.timeline_id, + self.key_range.clone(), + self.lsn, + self.is_incremental, // for now, image layer ALWAYS covers the full range + metadata.len(), + ); + // Note: Because we open the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. That's why we don't // set inner.file here. The first read will have to re-open it. 
@@ -611,7 +606,6 @@ impl ImageLayerWriterInner { path_or_conf: PathOrConf::Conf(self.conf), desc, lsn: self.lsn, - file_size: metadata.len(), access_stats: LayerAccessStats::empty_will_record_residence_event_later(), inner: RwLock::new(ImageLayerInner { loaded: false, diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs index a9859681d3..d1cef70253 100644 --- a/pageserver/src/tenant/storage_layer/layer_desc.rs +++ b/pageserver/src/tenant/storage_layer/layer_desc.rs @@ -1,10 +1,11 @@ +use anyhow::Result; use std::ops::Range; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, }; -use crate::repository::Key; +use crate::{context::RequestContext, repository::Key}; use super::{DeltaFileName, ImageFileName, LayerFileName}; @@ -24,9 +25,27 @@ pub struct PersistentLayerDesc { /// always be equal to `is_delta`. If we land the partial image layer PR someday, image layer could also be /// incremental. pub is_incremental: bool, + /// File size + pub file_size: u64, +} + +/// A unique identifier of a persistent layer within the context of one timeline. 
+#[derive(Debug, PartialEq, Eq, Clone, Hash)] +pub struct PersistentLayerKey { + pub key_range: Range, + pub lsn_range: Range, + pub is_delta: bool, } impl PersistentLayerDesc { + pub fn key(&self) -> PersistentLayerKey { + PersistentLayerKey { + key_range: self.key_range.clone(), + lsn_range: self.lsn_range.clone(), + is_delta: self.is_delta, + } + } + pub fn short_id(&self) -> String { self.filename().file_name() } @@ -37,6 +56,7 @@ impl PersistentLayerDesc { key_range: Range, lsn: Lsn, is_incremental: bool, + file_size: u64, ) -> Self { Self { tenant_id, @@ -45,6 +65,7 @@ impl PersistentLayerDesc { lsn_range: Self::image_layer_lsn_range(lsn), is_delta: false, is_incremental, + file_size, } } @@ -53,6 +74,7 @@ impl PersistentLayerDesc { timeline_id: TimelineId, key_range: Range, lsn_range: Range, + file_size: u64, ) -> Self { Self { tenant_id, @@ -61,6 +83,7 @@ impl PersistentLayerDesc { lsn_range, is_delta: true, is_incremental: true, + file_size, } } @@ -106,4 +129,48 @@ impl PersistentLayerDesc { self.image_file_name().into() } } + + // TODO: remove this in the future once we refactor timeline APIs. 
+ + pub fn get_lsn_range(&self) -> Range { + self.lsn_range.clone() + } + + pub fn get_key_range(&self) -> Range { + self.key_range.clone() + } + + pub fn get_timeline_id(&self) -> TimelineId { + self.timeline_id + } + + pub fn get_tenant_id(&self) -> TenantId { + self.tenant_id + } + + pub fn is_incremental(&self) -> bool { + self.is_incremental + } + + pub fn is_delta(&self) -> bool { + self.is_delta + } + + pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { + println!( + "----- layer for ten {} tli {} keys {}-{} lsn {}-{} ----", + self.tenant_id, + self.timeline_id, + self.key_range.start, + self.key_range.end, + self.lsn_range.start, + self.lsn_range.end + ); + + Ok(()) + } + + pub fn file_size(&self) -> u64 { + self.file_size + } } diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index ff0f44da92..387bae5b1f 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -142,10 +142,6 @@ impl PersistentLayer for RemoteLayer { true } - fn file_size(&self) -> u64 { - self.layer_metadata.file_size() - } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo { let layer_file_name = self.filename().file_name(); let lsn_range = self.get_lsn_range(); @@ -190,6 +186,7 @@ impl RemoteLayer { fname.key_range.clone(), fname.lsn, false, + layer_metadata.file_size(), ), layer_metadata: layer_metadata.clone(), ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)), @@ -211,6 +208,7 @@ impl RemoteLayer { timelineid, fname.key_range.clone(), fname.lsn_range.clone(), + layer_metadata.file_size(), ), layer_metadata: layer_metadata.clone(), ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)), diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 1bbc1b1c08..360818b5a7 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -15,10 +15,10 @@ use 
tracing::*; use utils::completion; /// Start per tenant background loops: compaction and gc. -/// -/// `init_done` is an optional channel used during initial load to delay background task -/// start. It is not used later. -pub fn start_background_loops(tenant: &Arc, init_done: Option<&completion::Barrier>) { +pub fn start_background_loops( + tenant: &Arc, + background_jobs_can_start: Option<&completion::Barrier>, +) { let tenant_id = tenant.tenant_id; task_mgr::spawn( BACKGROUND_RUNTIME.handle(), @@ -29,10 +29,14 @@ pub fn start_background_loops(tenant: &Arc, init_done: Option<&completio false, { let tenant = Arc::clone(tenant); - let init_done = init_done.cloned(); + let background_jobs_can_start = background_jobs_can_start.cloned(); async move { - completion::Barrier::maybe_wait(init_done).await; - compaction_loop(tenant) + let cancel = task_mgr::shutdown_token(); + tokio::select! { + _ = cancel.cancelled() => { return Ok(()) }, + _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + }; + compaction_loop(tenant, cancel) .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) .await; Ok(()) @@ -48,10 +52,14 @@ pub fn start_background_loops(tenant: &Arc, init_done: Option<&completio false, { let tenant = Arc::clone(tenant); - let init_done = init_done.cloned(); + let background_jobs_can_start = background_jobs_can_start.cloned(); async move { - completion::Barrier::maybe_wait(init_done).await; - gc_loop(tenant) + let cancel = task_mgr::shutdown_token(); + tokio::select! 
{ + _ = cancel.cancelled() => { return Ok(()) }, + _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + }; + gc_loop(tenant, cancel) .instrument(info_span!("gc_loop", tenant_id = %tenant_id)) .await; Ok(()) @@ -63,12 +71,11 @@ pub fn start_background_loops(tenant: &Arc, init_done: Option<&completio /// /// Compaction task's main loop /// -async fn compaction_loop(tenant: Arc) { +async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { let wait_duration = Duration::from_secs(2); info!("starting"); TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); async { - let cancel = task_mgr::shutdown_token(); let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download); let mut first = true; loop { @@ -133,12 +140,11 @@ async fn compaction_loop(tenant: Arc) { /// /// GC task's main loop /// -async fn gc_loop(tenant: Arc) { +async fn gc_loop(tenant: Arc, cancel: CancellationToken) { let wait_duration = Duration::from_secs(2); info!("starting"); TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); async { - let cancel = task_mgr::shutdown_token(); // GC might require downloading, to find the cutoff LSN that corresponds to the // cutoff specified as time. let ctx = diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fdaad58e16..2a50a26a23 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -242,6 +242,13 @@ pub struct Timeline { pub delete_lock: tokio::sync::Mutex, eviction_task_timeline_state: tokio::sync::Mutex, + + /// Barrier to wait before doing initial logical size calculation. Used only during startup. + initial_logical_size_can_start: Option, + + /// Completion shared between all timelines loaded during startup; used to delay heavier + /// background tasks until some logical sizes have been calculated. + initial_logical_size_attempt: Mutex>, } /// Internal structure to hold all data needed for logical size calculation. 
@@ -932,12 +939,12 @@ impl Timeline { pub fn activate( self: &Arc, broker_client: BrokerClientChannel, - init_done: Option<&completion::Barrier>, + background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { self.launch_wal_receiver(ctx, broker_client); self.set_state(TimelineState::Active); - self.launch_eviction_task(init_done); + self.launch_eviction_task(background_jobs_can_start); } pub fn set_state(&self, new_state: TimelineState) { @@ -955,6 +962,14 @@ impl Timeline { error!("Not activating a Stopping timeline"); } (_, new_state) => { + if matches!(new_state, TimelineState::Stopping | TimelineState::Broken) { + // drop the completion guard, if any; it might be holding off the completion + // forever needlessly + self.initial_logical_size_attempt + .lock() + .unwrap_or_else(|e| e.into_inner()) + .take(); + } self.state.send_replace(new_state); } } @@ -1196,7 +1211,12 @@ impl Timeline { ), }); - let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? { + let replaced = match batch_updates.replace_historic( + local_layer.layer_desc().clone(), + local_layer, + new_remote_layer.layer_desc().clone(), + new_remote_layer, + )? { Replacement::Replaced { .. 
} => { if let Err(e) = local_layer.delete_resident_layer_file() { error!("failed to remove layer file on evict after replacement: {e:#?}"); @@ -1345,6 +1365,8 @@ impl Timeline { walredo_mgr: Arc, remote_client: Option, pg_version: u32, + initial_logical_size_can_start: Option, + initial_logical_size_attempt: Option, ) -> Arc { let disk_consistent_lsn = metadata.disk_consistent_lsn(); let (state, _) = watch::channel(TimelineState::Loading); @@ -1439,6 +1461,9 @@ impl Timeline { EvictionTaskTimelineState::default(), ), delete_lock: tokio::sync::Mutex::new(false), + + initial_logical_size_can_start, + initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt), }; result.repartition_threshold = result.get_checkpoint_distance() / 10; result @@ -1587,7 +1612,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. 
@@ -1619,7 +1644,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer)); + updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1718,7 +1743,7 @@ impl Timeline { anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); } else { self.metrics.resident_physical_size_gauge.sub(local_size); - updates.remove_historic(local_layer); + updates.remove_historic(local_layer.layer_desc().clone(), local_layer); // fall-through to adding the remote layer } } else { @@ -1757,7 +1782,7 @@ impl Timeline { ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer); + updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1784,7 +1809,7 @@ impl Timeline { ), ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer); + updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); } } } @@ -1927,7 +1952,27 @@ impl Timeline { false, // NB: don't log errors here, task_mgr will do that. async move { - // no cancellation here, because nothing really waits for this to complete compared + + let cancel = task_mgr::shutdown_token(); + + // in case we were created during pageserver initialization, wait for + // initialization to complete before proceeding. startup time init runs on the same + // runtime. + tokio::select! { + _ = cancel.cancelled() => { return Ok(()); }, + _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {} + }; + + // hold off background tasks from starting until all timelines get to try at least + // once initial logical size calculation; though retry will rarely be useful. 
+ // holding off is done because heavier tasks execute blockingly on the same + // runtime. + // + // dropping this at every outcome is probably better than trying to cling on to it, + // delay will be terminated by a timeout regardless. + let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() }; + + // no extra cancellation here, because nothing really waits for this to complete compared // to spawn_ondemand_logical_size_calculation. let cancel = CancellationToken::new(); @@ -2212,7 +2257,7 @@ impl Timeline { // won't be needed for page reconstruction for this timeline, // and mark what we can't delete yet as deleted from the layer // map index without actually rebuilding the index. - updates.remove_historic(layer); + updates.remove_historic(layer.layer_desc().clone(), layer); Ok(()) } @@ -2922,7 +2967,7 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - batch_updates.insert_historic(l); + batch_updates.insert_historic(l.layer_desc().clone(), l); batch_updates.flush(); // update the timeline's physical size @@ -3170,7 +3215,7 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic(l); + updates.insert_historic(l.layer_desc().clone(), l); } updates.flush(); drop(layers); @@ -3617,7 +3662,7 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic(x); + updates.insert_historic(x.layer_desc().clone(), x); } // Now that we have reshuffled the data to set of new delta layers, we can @@ -4152,7 +4197,7 @@ impl Timeline { { use crate::tenant::layer_map::Replacement; let l: Arc = remote_layer.clone(); - let failure = match updates.replace_historic(&l, new_layer) { + let failure = match updates.replace_historic(l.layer_desc().clone(), &l, new_layer.layer_desc().clone(), new_layer) { Ok(Replacement::Replaced { .. 
}) => false, Ok(Replacement::NotFound) => { // TODO: the downloaded file should probably be removed, otherwise diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 7029d75d63..1040dff63d 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -49,9 +49,12 @@ pub struct EvictionTaskTenantState { } impl Timeline { - pub(super) fn launch_eviction_task(self: &Arc, init_done: Option<&completion::Barrier>) { + pub(super) fn launch_eviction_task( + self: &Arc, + background_tasks_can_start: Option<&completion::Barrier>, + ) { let self_clone = Arc::clone(self); - let init_done = init_done.cloned(); + let background_tasks_can_start = background_tasks_can_start.cloned(); task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::Eviction, @@ -60,8 +63,13 @@ impl Timeline { &format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id), false, async move { - completion::Barrier::maybe_wait(init_done).await; - self_clone.eviction_task(task_mgr::shutdown_token()).await; + let cancel = task_mgr::shutdown_token(); + tokio::select! 
{ + _ = cancel.cancelled() => { return Ok(()); } + _ = completion::Barrier::maybe_wait(background_tasks_can_start) => {} + }; + + self_clone.eviction_task(cancel).await; info!("eviction task finishing"); Ok(()) }, diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index a99be40955..64d980d2e4 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -254,20 +254,20 @@ nwp_register_gucs(void) DefineCustomIntVariable( "neon.safekeeper_reconnect_timeout", - "Timeout for reconnecting to offline wal acceptor.", + "Walproposer reconnects to offline safekeepers once in this interval.", NULL, &wal_acceptor_reconnect_timeout, - 1000, 0, INT_MAX, /* default, min, max */ + 5000, 0, INT_MAX, /* default, min, max */ PGC_SIGHUP, /* context */ GUC_UNIT_MS, /* flags */ NULL, NULL, NULL); DefineCustomIntVariable( "neon.safekeeper_connect_timeout", - "Timeout for connection establishement and it's maintenance against safekeeper", + "Connection or connection attempt to safekeeper is terminated if no message is received (or connection attempt doesn't finish) within this period.", NULL, &wal_acceptor_connection_timeout, - 5000, 0, INT_MAX, + 10000, 0, INT_MAX, PGC_SIGHUP, GUC_UNIT_MS, NULL, NULL, NULL); @@ -441,7 +441,7 @@ WalProposerPoll(void) if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now, wal_acceptor_connection_timeout)) { - elog(WARNING, "failed to connect to node '%s:%s' in '%s' state: exceeded connection timeout %dms", + elog(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that", sk->host, sk->port, FormatSafekeeperState(sk->state), wal_acceptor_connection_timeout); ShutdownConnection(sk); } @@ -1035,9 +1035,16 @@ RecvAcceptorGreeting(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->greetResponse)) return; + elog(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port); + /* Protocol is all good, 
move to voting. */ sk->state = SS_VOTING; + /* + * Note: it would be better to track the counter on per safekeeper basis, + * but at worst walproposer would restart with 'term rejected', so leave as + * is for now. + */ ++n_connected; if (n_connected <= quorum) { diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index ba5e453e41..b1b0c032d7 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -7,6 +7,7 @@ use std::fs::{self, File, OpenOptions}; use std::io::{Read, Write}; use std::ops::Deref; use std::path::{Path, PathBuf}; +use std::time::Instant; use crate::control_file_upgrade::upgrade_control_file; use crate::metrics::PERSIST_CONTROL_FILE_SECONDS; @@ -28,6 +29,9 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); pub trait Storage: Deref { /// Persist safekeeper state on disk and update internal state. fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; + + /// Timestamp of last persist. + fn last_persist_at(&self) -> Instant; } #[derive(Debug)] @@ -38,6 +42,8 @@ pub struct FileStorage { /// Last state persisted to disk. state: SafeKeeperState, + /// Not preserved across restarts. 
+ last_persist_at: Instant, } impl FileStorage { @@ -51,6 +57,7 @@ impl FileStorage { timeline_dir, conf: conf.clone(), state, + last_persist_at: Instant::now(), }) } @@ -66,6 +73,7 @@ impl FileStorage { timeline_dir, conf: conf.clone(), state, + last_persist_at: Instant::now(), }; Ok(store) @@ -216,6 +224,10 @@ impl Storage for FileStorage { self.state = s.clone(); Ok(()) } + + fn last_persist_at(&self) -> Instant { + self.last_persist_at + } } #[cfg(test)] diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index b6d497f34e..ad9d655fae 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -17,6 +17,9 @@ pub fn thread_main(conf: SafeKeeperConf) { let ttid = tli.ttid; let _enter = info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered(); + if let Err(e) = tli.maybe_pesist_control_file() { + warn!("failed to persist control file: {e}"); + } if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { warn!("failed to remove WAL: {}", e); } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 33da0c8e5a..7378ccb994 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -10,6 +10,7 @@ use std::cmp::max; use std::cmp::min; use std::fmt; use std::io::Read; +use std::time::Duration; use storage_broker::proto::SafekeeperTimelineInfo; use tracing::*; @@ -634,7 +635,8 @@ where } // system_id will be updated on mismatch - if self.state.server.system_id != msg.system_id { + // sync-safekeepers doesn't know sysid and sends 0, ignore it + if self.state.server.system_id != msg.system_id && msg.system_id != 0 { if self.state.server.system_id != 0 { warn!( "unexpected system ID arrived, got {}, expected {}", @@ -836,6 +838,26 @@ where self.state.persist(&state) } + /// Persist control file if there is something to save and enough time + /// passed after the last save. 
+ pub fn maybe_persist_control_file(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> { + const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300); + if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL { + return Ok(()); + } + let need_persist = self.inmem.commit_lsn > self.state.commit_lsn + || self.inmem.backup_lsn > self.state.backup_lsn + || self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn + || inmem_remote_consistent_lsn > self.state.remote_consistent_lsn; + if need_persist { + let mut state = self.state.clone(); + state.remote_consistent_lsn = inmem_remote_consistent_lsn; + self.persist_control_file(state)?; + trace!("saved control file: {CF_SAVE_INTERVAL:?} passed"); + } + Ok(()) + } + /// Handle request to append WAL. #[allow(clippy::comparison_chain)] fn handle_append_request( @@ -948,9 +970,8 @@ where if sync_control_file { let mut state = self.state.clone(); - // Note: we do not persist remote_consistent_lsn in other paths of - // persisting cf -- that is not much needed currently. We could do - // that by storing Arc to walsenders in Safekeeper. + // Note: we could make remote_consistent_lsn update in cf common by + // storing Arc to walsenders in Safekeeper. 
state.remote_consistent_lsn = new_remote_consistent_lsn; self.persist_control_file(state)?; } @@ -980,7 +1001,7 @@ mod tests { use super::*; use crate::wal_storage::Storage; - use std::ops::Deref; + use std::{ops::Deref, time::Instant}; // fake storage for tests struct InMemoryState { @@ -992,6 +1013,10 @@ mod tests { self.persisted_state = s.clone(); Ok(()) } + + fn last_persist_at(&self) -> Instant { + Instant::now() + } } impl Deref for InMemoryState { diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2dbf215998..941f8dae54 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -234,7 +234,6 @@ impl SharedState { flush_lsn: self.sk.wal_store.flush_lsn().0, // note: this value is not flushed to control file yet and can be lost commit_lsn: self.sk.inmem.commit_lsn.0, - // TODO: rework feedbacks to avoid max here remote_consistent_lsn: remote_consistent_lsn.0, peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0, safekeeper_connstr: conf.listen_pg_addr.clone(), @@ -673,6 +672,17 @@ impl Timeline { Ok(()) } + /// Persist control file if there is something to save and enough time + /// passed after the last save. This helps to keep remote_consistent_lsn up + /// to date so that storage nodes restart doesn't cause many pageserver -> + /// safekeeper reconnections. + pub fn maybe_pesist_control_file(&self) -> Result<()> { + let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn(); + self.write_shared_state() + .sk + .maybe_persist_control_file(remote_consistent_lsn) + } + /// Returns full timeline info, required for the metrics. If the timeline is /// not active, returns None instead. 
pub fn info_for_metrics(&self) -> Option { diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 1b82bd754e..644c956fc1 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -379,6 +379,12 @@ impl Storage for PhysicalStorage { ); } + // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on + // disk (this happens on each connect). + if end_pos == self.write_lsn { + return Ok(()); + } + // Close previously opened file, if any if let Some(mut unflushed_file) = self.file.take() { self.fdatasync_file(&mut unflushed_file)?; diff --git a/test_runner/fixtures/pageserver/utils.py b/test_runner/fixtures/pageserver/utils.py index c558387413..d7ffa633fd 100644 --- a/test_runner/fixtures/pageserver/utils.py +++ b/test_runner/fixtures/pageserver/utils.py @@ -1,5 +1,5 @@ import time -from typing import Optional +from typing import Any, Dict, Optional from fixtures.log_helper import log from fixtures.pageserver.http import PageserverHttpClient @@ -72,7 +72,7 @@ def wait_until_tenant_state( expected_state: str, iterations: int, period: float = 1.0, -) -> bool: +) -> Dict[str, Any]: """ Does not use `wait_until` for debugging purposes """ @@ -81,7 +81,7 @@ def wait_until_tenant_state( tenant = pageserver_http.tenant_status(tenant_id=tenant_id) log.debug(f"Tenant {tenant_id} data: {tenant}") if tenant["state"]["slug"] == expected_state: - return True + return tenant except Exception as e: log.debug(f"Tenant {tenant_id} state retrieval failure: {e}") diff --git a/test_runner/regress/test_disk_usage_eviction.py b/test_runner/regress/test_disk_usage_eviction.py index ab67518092..0ec023b9e1 100644 --- a/test_runner/regress/test_disk_usage_eviction.py +++ b/test_runner/regress/test_disk_usage_eviction.py @@ -110,6 +110,12 @@ class EvictionEnv: overrides=( "--pageserver-config-override=disk_usage_based_eviction=" + enc.dump_inline_table(disk_usage_config).replace("\n", " "), + # Disk usage based eviction runs as a 
background task. + # But pageserver startup delays launch of background tasks for some time, to prioritize initial logical size calculations during startup. + # But, initial logical size calculation may not be triggered if safekeepers don't publish new broker messages. + # But, we only have a 10-second-timeout in this test. + # So, disable the delay for this test. + "--pageserver-config-override=background_task_maximum_delay='0s'", ), ) diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index baef8ecacc..742dbfff95 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -147,7 +147,12 @@ def test_remote_storage_backup_and_restore( # listing the remote timelines will fail because of the failpoint, # and the tenant will be marked as Broken. client.tenant_attach(tenant_id) - wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15) + + tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15) + assert tenant_info["attachment_status"] == { + "slug": "failed", + "data": {"reason": "storage-sync-list-remote-timelines"}, + } # Ensure that even though the tenant is broken, we can't attach it again. with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"): diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py index 9d0fdcfaf8..2a015d5d17 100644 --- a/test_runner/regress/test_tenant_detach.py +++ b/test_runner/regress/test_tenant_detach.py @@ -532,7 +532,7 @@ def test_ignored_tenant_reattach( ): neon_env_builder.enable_remote_storage( remote_storage_kind=remote_storage_kind, - test_name="test_remote_storage_backup_and_restore", + test_name="test_ignored_tenant_reattach", ) env = neon_env_builder.init_start() pageserver_http = env.pageserver.http_client()