diff --git a/.github/actionlint.yml b/.github/actionlint.yml index 942861ecd8..37983798b7 100644 --- a/.github/actionlint.yml +++ b/.github/actionlint.yml @@ -5,6 +5,7 @@ self-hosted-runner: - large - large-arm64 - small + - small-arm64 - us-east-2 config-variables: - REMOTE_STORAGE_AZURE_CONTAINER diff --git a/.github/workflows/neon_extra_builds.yml b/.github/workflows/neon_extra_builds.yml index fdb03963fb..7d2187e59c 100644 --- a/.github/workflows/neon_extra_builds.yml +++ b/.github/workflows/neon_extra_builds.yml @@ -136,7 +136,7 @@ jobs: check-linux-arm-build: needs: [ check-permissions, build-build-tools-image ] timeout-minutes: 90 - runs-on: [ self-hosted, large-arm64 ] + runs-on: [ self-hosted, small-arm64 ] env: # Use release build only, to have less debug info around @@ -260,7 +260,7 @@ jobs: check-codestyle-rust-arm: needs: [ check-permissions, build-build-tools-image ] timeout-minutes: 90 - runs-on: [ self-hosted, large-arm64 ] + runs-on: [ self-hosted, small-arm64 ] container: image: ${{ needs.build-build-tools-image.outputs.image }} diff --git a/Cargo.lock b/Cargo.lock index 6ce7180d67..961101b151 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5952,7 +5952,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "svg_fmt" version = "0.4.2" -source = "git+https://github.com/neondatabase/fork--nical--rust_debug?branch=neon#c1820b28664b5df68de7f043fccf2ed5d67b6ae8" +source = "git+https://github.com/nical/rust_debug?rev=28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4#28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4" [[package]] name = "syn" diff --git a/Cargo.toml b/Cargo.toml index 17f30a1327..3ccdabee18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,8 +158,8 @@ socket2 = "0.5" strum = "0.24" strum_macros = "0.24" "subtle" = "2.5.0" -# https://github.com/nical/rust_debug/pull/4 -svg_fmt = { git = "https://github.com/neondatabase/fork--nical--rust_debug", branch = "neon" } +# Our PR https://github.com/nical/rust_debug/pull/4 has been merged but no new version released yet +svg_fmt = { git = "https://github.com/nical/rust_debug", rev = "28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4" } sync_wrapper = "0.1.2" tar = "0.4" task-local-extensions = "0.1.4" diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index cde9ac43bd..d52fb5e93d 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -745,6 +745,16 @@ impl HistoricLayerInfo { }; *field = value; } + pub fn layer_file_size(&self) -> u64 { + match self { + HistoricLayerInfo::Delta { + layer_file_size, .. + } => *layer_file_size, + HistoricLayerInfo::Image { + layer_file_size, .. + } => *layer_file_size, + } + } } #[derive(Debug, Serialize, Deserialize)] @@ -776,9 +786,6 @@ pub struct TimelineGcRequest { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WalRedoManagerProcessStatus { pub pid: u32, - /// The strum-generated `into::<&'static str>()` for `pageserver::walredo::ProcessKind`. - /// `ProcessKind` are a transitory thing, so, they have no enum representation in `pageserver_api`. - pub kind: Cow<'static, str>, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 5b871c5d5e..5aab10e5d9 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -30,47 +30,27 @@ //! 2024-04-15 on i3en.3xlarge //! //! ```text -//! async-short/1 time: [24.584 µs 24.737 µs 24.922 µs] -//! 
async-short/2    time:   [33.479 µs 33.660 µs 33.888 µs]
-//! async-short/4    time:   [42.713 µs 43.046 µs 43.440 µs]
-//! async-short/8    time:   [71.814 µs 72.478 µs 73.240 µs]
-//! async-short/16   time:   [132.73 µs 134.45 µs 136.22 µs]
-//! async-short/32   time:   [258.31 µs 260.73 µs 263.27 µs]
-//! async-short/64   time:   [511.61 µs 514.44 µs 517.51 µs]
-//! async-short/128  time:   [992.64 µs 998.23 µs 1.0042 ms]
-//! async-medium/1   time:   [110.11 µs 110.50 µs 110.96 µs]
-//! async-medium/2   time:   [153.06 µs 153.85 µs 154.99 µs]
-//! async-medium/4   time:   [317.51 µs 319.92 µs 322.85 µs]
-//! async-medium/8   time:   [638.30 µs 644.68 µs 652.12 µs]
-//! async-medium/16  time:   [1.2651 ms 1.2773 ms 1.2914 ms]
-//! async-medium/32  time:   [2.5117 ms 2.5410 ms 2.5720 ms]
-//! async-medium/64  time:   [4.8088 ms 4.8555 ms 4.9047 ms]
-//! async-medium/128 time:   [8.8311 ms 8.9849 ms 9.1263 ms]
-//! sync-short/1     time:   [25.503 µs 25.626 µs 25.771 µs]
-//! sync-short/2     time:   [30.850 µs 31.013 µs 31.208 µs]
-//! sync-short/4     time:   [45.543 µs 45.856 µs 46.193 µs]
-//! sync-short/8     time:   [84.114 µs 84.639 µs 85.220 µs]
-//! sync-short/16    time:   [185.22 µs 186.15 µs 187.13 µs]
-//! sync-short/32    time:   [377.43 µs 378.87 µs 380.46 µs]
-//! sync-short/64    time:   [756.49 µs 759.04 µs 761.70 µs]
-//! sync-short/128   time:   [1.4825 ms 1.4874 ms 1.4923 ms]
-//! sync-medium/1    time:   [105.66 µs 106.01 µs 106.43 µs]
-//! sync-medium/2    time:   [153.10 µs 153.84 µs 154.72 µs]
-//! sync-medium/4    time:   [327.13 µs 329.44 µs 332.27 µs]
-//! sync-medium/8    time:   [654.26 µs 658.73 µs 663.63 µs]
-//! sync-medium/16   time:   [1.2682 ms 1.2748 ms 1.2816 ms]
-//! sync-medium/32   time:   [2.4456 ms 2.4595 ms 2.4731 ms]
-//! sync-medium/64   time:   [4.6523 ms 4.6890 ms 4.7256 ms]
-//! sync-medium/128  time:   [8.7215 ms 8.8323 ms 8.9344 ms]
+//! short/1      time:   [24.584 µs 24.737 µs 24.922 µs]
+//! short/2      time:   [33.479 µs 33.660 µs 33.888 µs]
+//! short/4      time:   [42.713 µs 43.046 µs 43.440 µs]
+//! short/8      time:   [71.814 µs 72.478 µs 73.240 µs]
+//! short/16     time:   [132.73 µs 134.45 µs 136.22 µs]
+//! short/32     time:   [258.31 µs 260.73 µs 263.27 µs]
+//! short/64     time:   [511.61 µs 514.44 µs 517.51 µs]
+//! short/128    time:   [992.64 µs 998.23 µs 1.0042 ms]
+//! medium/1     time:   [110.11 µs 110.50 µs 110.96 µs]
+//! medium/2     time:   [153.06 µs 153.85 µs 154.99 µs]
+//! medium/4     time:   [317.51 µs 319.92 µs 322.85 µs]
+//! medium/8     time:   [638.30 µs 644.68 µs 652.12 µs]
+//! medium/16    time:   [1.2651 ms 1.2773 ms 1.2914 ms]
+//! medium/32    time:   [2.5117 ms 2.5410 ms 2.5720 ms]
+//! medium/64    time:   [4.8088 ms 4.8555 ms 4.9047 ms]
+//! medium/128   time:   [8.8311 ms 8.9849 ms 9.1263 ms]
//!
``` use bytes::{Buf, Bytes}; use criterion::{BenchmarkId, Criterion}; -use pageserver::{ - config::PageServerConf, - walrecord::NeonWalRecord, - walredo::{PostgresRedoManager, ProcessKind}, -}; +use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager}; use pageserver_api::{key::Key, shard::TenantShardId}; use std::{ sync::Arc, @@ -80,39 +60,32 @@ use tokio::{sync::Barrier, task::JoinSet}; use utils::{id::TenantId, lsn::Lsn}; fn bench(c: &mut Criterion) { - for process_kind in &[ProcessKind::Async, ProcessKind::Sync] { - { - let nclients = [1, 2, 4, 8, 16, 32, 64, 128]; - for nclients in nclients { - let mut group = c.benchmark_group(format!("{process_kind}-short")); - group.bench_with_input( - BenchmarkId::from_parameter(nclients), - &nclients, - |b, nclients| { - let redo_work = Arc::new(Request::short_input()); - b.iter_custom(|iters| { - bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients) - }); - }, - ); - } + { + let nclients = [1, 2, 4, 8, 16, 32, 64, 128]; + for nclients in nclients { + let mut group = c.benchmark_group("short"); + group.bench_with_input( + BenchmarkId::from_parameter(nclients), + &nclients, + |b, nclients| { + let redo_work = Arc::new(Request::short_input()); + b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients)); + }, + ); } - - { - let nclients = [1, 2, 4, 8, 16, 32, 64, 128]; - for nclients in nclients { - let mut group = c.benchmark_group(format!("{process_kind}-medium")); - group.bench_with_input( - BenchmarkId::from_parameter(nclients), - &nclients, - |b, nclients| { - let redo_work = Arc::new(Request::medium_input()); - b.iter_custom(|iters| { - bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients) - }); - }, - ); - } + } + { + let nclients = [1, 2, 4, 8, 16, 32, 64, 128]; + for nclients in nclients { + let mut group = c.benchmark_group("medium"); + group.bench_with_input( + BenchmarkId::from_parameter(nclients), + &nclients, + |b, nclients| { + let redo_work = Arc::new(Request::medium_input()); + b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients)); + }, + ); } } } @@ -120,16 +93,10 @@ criterion::criterion_group!(benches, bench); criterion::criterion_main!(benches); // Returns the sum of each client's wall-clock time spent executing their share of the n_redos. 
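// Assuming the Criterion target is registered under this file's name, a single
// group/client pair from the results table above can be re-run with a filter, e.g.
// `cargo bench --bench bench_walredo -- short/8`; absolute numbers depend on hardware.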
-fn bench_impl( - process_kind: ProcessKind, - redo_work: Arc, - n_redos: u64, - nclients: u64, -) -> Duration { +fn bench_impl(redo_work: Arc, n_redos: u64, nclients: u64) -> Duration { let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap(); - let mut conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); - conf.walredo_process_kind = process_kind; + let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); let conf = Box::leak(Box::new(conf)); let tenant_shard_id = TenantShardId::unsharded(TenantId::generate()); @@ -158,27 +125,13 @@ fn bench_impl( }); } - let elapsed = rt.block_on(async move { + rt.block_on(async move { let mut total_wallclock_time = Duration::ZERO; while let Some(res) = tasks.join_next().await { total_wallclock_time += res.unwrap(); } total_wallclock_time - }); - - // consistency check to ensure process kind setting worked - if nredos_per_client > 0 { - assert_eq!( - manager - .status() - .process - .map(|p| p.kind) - .expect("the benchmark work causes a walredo process to be spawned"), - std::borrow::Cow::Borrowed(process_kind.into()) - ); - } - - elapsed + }) } async fn client( diff --git a/pageserver/ctl/src/draw_timeline_dir.rs b/pageserver/ctl/src/draw_timeline_dir.rs index 4dff8af1fc..389519c65a 100644 --- a/pageserver/ctl/src/draw_timeline_dir.rs +++ b/pageserver/ctl/src/draw_timeline_dir.rs @@ -52,7 +52,6 @@ use anyhow::{Context, Result}; use pageserver::repository::Key; -use pageserver::METADATA_FILE_NAME; use std::cmp::Ordering; use std::io::{self, BufRead}; use std::path::PathBuf; @@ -159,10 +158,6 @@ pub fn main() -> Result<()> { let line = PathBuf::from_str(&line).unwrap(); let filename = line.file_name().unwrap(); let filename = filename.to_str().unwrap(); - if filename == METADATA_FILE_NAME { - // Don't try and parse "metadata" like a key-lsn range - continue; - } let (key_range, lsn_range) = parse_filename(filename); files.push(Layer { filename: filename.to_owned(), diff --git a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs index 197e782dca..1bb71b9353 100644 --- a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs +++ b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs @@ -2,9 +2,11 @@ use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId}; use pageserver_client::mgmt_api; use rand::seq::SliceRandom; +use tokio_util::sync::CancellationToken; use tracing::{debug, info}; use utils::id::{TenantTimelineId, TimelineId}; +use std::{f64, sync::Arc}; use tokio::{ sync::{mpsc, OwnedSemaphorePermit}, task::JoinSet, @@ -12,10 +14,7 @@ use tokio::{ use std::{ num::NonZeroUsize, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::atomic::{AtomicU64, Ordering}, time::{Duration, Instant}, }; @@ -51,19 +50,31 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> { Ok(()) } +#[derive(serde::Serialize)] +struct Output { + downloads_count: u64, + downloads_bytes: u64, + evictions_count: u64, + timeline_restarts: u64, + #[serde(with = "humantime_serde")] + runtime: Duration, +} + #[derive(Debug, Default)] struct LiveStats { - evictions: AtomicU64, - downloads: AtomicU64, + evictions_count: AtomicU64, + downloads_count: AtomicU64, + downloads_bytes: AtomicU64, timeline_restarts: AtomicU64, } impl LiveStats { fn eviction_done(&self) { - self.evictions.fetch_add(1, Ordering::Relaxed); + self.evictions_count.fetch_add(1, Ordering::Relaxed); } - fn download_done(&self) { - self.downloads.fetch_add(1, Ordering::Relaxed); + 
fn download_done(&self, size: u64) { + self.downloads_count.fetch_add(1, Ordering::Relaxed); + self.downloads_bytes.fetch_add(size, Ordering::Relaxed); } fn timeline_restart_done(&self) { self.timeline_restarts.fetch_add(1, Ordering::Relaxed); @@ -92,28 +103,49 @@ async fn main_impl(args: Args) -> anyhow::Result<()> { ) .await?; + let token = CancellationToken::new(); let mut tasks = JoinSet::new(); - let live_stats = Arc::new(LiveStats::default()); + let periodic_stats = Arc::new(LiveStats::default()); + let total_stats = Arc::new(LiveStats::default()); + + let start = Instant::now(); tasks.spawn({ - let live_stats = Arc::clone(&live_stats); + let periodic_stats = Arc::clone(&periodic_stats); + let total_stats = Arc::clone(&total_stats); + let cloned_token = token.clone(); async move { let mut last_at = Instant::now(); loop { + if cloned_token.is_cancelled() { + return; + } tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await; let now = Instant::now(); let delta: Duration = now - last_at; last_at = now; let LiveStats { - evictions, - downloads, + evictions_count, + downloads_count, + downloads_bytes, timeline_restarts, - } = &*live_stats; - let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64(); - let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64(); + } = &*periodic_stats; + let evictions_count = evictions_count.swap(0, Ordering::Relaxed); + let downloads_count = downloads_count.swap(0, Ordering::Relaxed); + let downloads_bytes = downloads_bytes.swap(0, Ordering::Relaxed); let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed); - info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}"); + + total_stats.evictions_count.fetch_add(evictions_count, Ordering::Relaxed); + total_stats.downloads_count.fetch_add(downloads_count, Ordering::Relaxed); + total_stats.downloads_bytes.fetch_add(downloads_bytes, Ordering::Relaxed); + total_stats.timeline_restarts.fetch_add(timeline_restarts, Ordering::Relaxed); + + let evictions_per_s = evictions_count as f64 / delta.as_secs_f64(); + let downloads_per_s = downloads_count as f64 / delta.as_secs_f64(); + let downloads_mibs_per_s = downloads_bytes as f64 / delta.as_secs_f64() / ((1 << 20) as f64); + + info!("evictions={evictions_per_s:.2}/s downloads={downloads_per_s:.2}/s download_bytes={downloads_mibs_per_s:.2}MiB/s timeline_restarts={timeline_restarts}"); } } }); @@ -124,14 +156,42 @@ async fn main_impl(args: Args) -> anyhow::Result<()> { args, Arc::clone(&mgmt_api_client), tl, - Arc::clone(&live_stats), + Arc::clone(&periodic_stats), + token.clone(), )); } } + if let Some(runtime) = args.runtime { + tokio::spawn(async move { + tokio::time::sleep(runtime.into()).await; + token.cancel(); + }); + } while let Some(res) = tasks.join_next().await { res.unwrap(); } + let end = Instant::now(); + let duration: Duration = end - start; + + let output = { + let LiveStats { + evictions_count, + downloads_count, + downloads_bytes, + timeline_restarts, + } = &*total_stats; + Output { + downloads_count: downloads_count.load(Ordering::Relaxed), + downloads_bytes: downloads_bytes.load(Ordering::Relaxed), + evictions_count: evictions_count.load(Ordering::Relaxed), + timeline_restarts: timeline_restarts.load(Ordering::Relaxed), + runtime: duration, + } + }; + let output = serde_json::to_string_pretty(&output).unwrap(); + println!("{output}"); + Ok(()) } @@ -140,6 +200,7 @@ async fn timeline_actor( mgmt_api_client: Arc, timeline: 
TenantTimelineId, live_stats: Arc, + token: CancellationToken, ) { // TODO: support sharding let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id); @@ -149,7 +210,7 @@ async fn timeline_actor( layers: Vec>, concurrency: Arc, } - loop { + while !token.is_cancelled() { debug!("restarting timeline"); let layer_map_info = mgmt_api_client .layer_map_info(tenant_shard_id, timeline.timeline_id) @@ -185,7 +246,7 @@ async fn timeline_actor( live_stats.timeline_restart_done(); - loop { + while !token.is_cancelled() { assert!(!timeline.joinset.is_empty()); if let Some(res) = timeline.joinset.try_join_next() { debug!(?res, "a layer actor exited, should not happen"); @@ -255,7 +316,7 @@ async fn layer_actor( .layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name()) .await .unwrap(); - live_stats.download_done(); + live_stats.download_done(layer.layer_file_size()); did_it } }; diff --git a/pageserver/src/aux_file.rs b/pageserver/src/aux_file.rs index e6d950487d..38e1875db1 100644 --- a/pageserver/src/aux_file.rs +++ b/pageserver/src/aux_file.rs @@ -5,14 +5,35 @@ use bytes::{Buf, BufMut, Bytes}; use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE}; use tracing::warn; -/// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, first 13B of 128b xxhash]. +// BEGIN Copyright (c) 2017 Servo Contributors + +/// Const version of FNV hash. +#[inline] +#[must_use] +pub const fn fnv_hash(bytes: &[u8]) -> u128 { + const INITIAL_STATE: u128 = 0x6c62272e07bb014262b821756295c58d; + const PRIME: u128 = 0x0000000001000000000000000000013B; + + let mut hash = INITIAL_STATE; + let mut i = 0; + while i < bytes.len() { + hash ^= bytes[i] as u128; + hash = hash.wrapping_mul(PRIME); + i += 1; + } + hash +} + +// END Copyright (c) 2017 Servo Contributors + +/// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, least significant 13B of FNV hash]. fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key { - let mut key = [0; METADATA_KEY_SIZE]; - let hash = twox_hash::xxh3::hash128(data).to_be_bytes(); + let mut key: [u8; 16] = [0; METADATA_KEY_SIZE]; + let hash = fnv_hash(data).to_be_bytes(); key[0] = AUX_KEY_PREFIX; key[1] = dir_level1; key[2] = dir_level2; - key[3..16].copy_from_slice(&hash[0..13]); + key[3..16].copy_from_slice(&hash[3..16]); Key::from_metadata_key_fixed_size(&key) } @@ -200,15 +221,19 @@ mod tests { fn test_hash_portable() { // AUX file encoding requires the hash to be portable across all platforms. This test case checks // if the algorithm produces the same hash across different environments. + assert_eq!( - 305317690835051308206966631765527126151, - twox_hash::xxh3::hash128("test1".as_bytes()) + 265160408618497461376862998434862070044, + super::fnv_hash("test1".as_bytes()) ); assert_eq!( - 85104974691013376326742244813280798847, - twox_hash::xxh3::hash128("test/test2".as_bytes()) + 295486155126299629456360817749600553988, + super::fnv_hash("test/test2".as_bytes()) + ); + assert_eq!( + 144066263297769815596495629667062367629, + super::fnv_hash("".as_bytes()) ); - assert_eq!(0, twox_hash::xxh3::hash128("".as_bytes())); } #[test] @@ -216,28 +241,28 @@ mod tests { // To correct retrieve AUX files, the generated keys for the same file must be the same for all versions // of the page server. 
assert_eq!( - "6200000101E5B20C5F8DD5AA3289D6D9EAFA", - encode_aux_file_key("pg_logical/mappings/test1").to_string() + "62000001017F8B83D94F7081693471ABF91C", + encode_aux_file_key("pg_logical/mappings/test1").to_string(), ); assert_eq!( - "620000010239AAC544893139B26F501B97E6", - encode_aux_file_key("pg_logical/snapshots/test2").to_string() + "62000001027F8E83D94F7081693471ABFCCD", + encode_aux_file_key("pg_logical/snapshots/test2").to_string(), ); assert_eq!( - "620000010300000000000000000000000000", - encode_aux_file_key("pg_logical/replorigin_checkpoint").to_string() + "62000001032E07BB014262B821756295C58D", + encode_aux_file_key("pg_logical/replorigin_checkpoint").to_string(), ); assert_eq!( - "62000001FF8635AF2134B7266EC5B4189FD6", - encode_aux_file_key("pg_logical/unsupported").to_string() + "62000001FF4F38E1C74754E7D03C1A660178", + encode_aux_file_key("pg_logical/unsupported").to_string(), ); assert_eq!( - "6200000201772D0E5D71DE14DA86142A1619", + "62000002017F8D83D94F7081693471ABFB92", encode_aux_file_key("pg_replslot/test3").to_string() ); assert_eq!( - "620000FFFF1866EBEB53B807B26A2416F317", - encode_aux_file_key("other_file_not_supported").to_string() + "620000FFFF2B6ECC8AEF93F643DC44F15E03", + encode_aux_file_key("other_file_not_supported").to_string(), ); } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 49f8a41b37..ba5b2608bd 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -284,7 +284,6 @@ fn start_pageserver( )) .unwrap(); pageserver::preinitialize_metrics(); - pageserver::metrics::wal_redo::set_process_kind_metric(conf.walredo_process_kind); // If any failpoints were set from FAILPOINTS environment variable, // print them to the log for debugging purposes @@ -383,7 +382,7 @@ fn start_pageserver( let shutdown_pageserver = tokio_util::sync::CancellationToken::new(); // Set up remote storage client - let remote_storage = Some(create_remote_storage_client(conf)?); + let remote_storage = create_remote_storage_client(conf)?; // Set up deletion queue let (deletion_queue, deletion_workers) = DeletionQueue::new( @@ -516,16 +515,12 @@ fn start_pageserver( } }); - let secondary_controller = if let Some(remote_storage) = &remote_storage { - secondary::spawn_tasks( - tenant_manager.clone(), - remote_storage.clone(), - background_jobs_barrier.clone(), - shutdown_pageserver.clone(), - ) - } else { - secondary::null_controller() - }; + let secondary_controller = secondary::spawn_tasks( + tenant_manager.clone(), + remote_storage.clone(), + background_jobs_barrier.clone(), + shutdown_pageserver.clone(), + ); // shared state between the disk-usage backed eviction background task and the http endpoint // that allows triggering disk-usage based eviction manually. note that the http endpoint @@ -533,15 +528,13 @@ fn start_pageserver( // been configured. let disk_usage_eviction_state: Arc = Arc::default(); - if let Some(remote_storage) = &remote_storage { - launch_disk_usage_global_eviction_task( - conf, - remote_storage.clone(), - disk_usage_eviction_state.clone(), - tenant_manager.clone(), - background_jobs_barrier.clone(), - )?; - } + launch_disk_usage_global_eviction_task( + conf, + remote_storage.clone(), + disk_usage_eviction_state.clone(), + tenant_manager.clone(), + background_jobs_barrier.clone(), + )?; // Start up the service to handle HTTP mgmt API request. We created the // listener earlier already. 
@@ -654,17 +647,20 @@ fn start_pageserver( None, "libpq endpoint listener", true, - async move { - page_service::libpq_listener_main( - conf, - broker_client, - pg_auth, - pageserver_listener, - conf.pg_auth_type, - libpq_ctx, - task_mgr::shutdown_token(), - ) - .await + { + let tenant_manager = tenant_manager.clone(); + async move { + page_service::libpq_listener_main( + tenant_manager, + broker_client, + pg_auth, + pageserver_listener, + conf.pg_auth_type, + libpq_ctx, + task_mgr::shutdown_token(), + ) + .await + } }, ); } @@ -693,14 +689,7 @@ fn start_pageserver( // Right now that tree doesn't reach very far, and `task_mgr` is used instead. // The plan is to change that over time. shutdown_pageserver.take(); - let bg_remote_storage = remote_storage.clone(); - let bg_deletion_queue = deletion_queue.clone(); - pageserver::shutdown_pageserver( - &tenant_manager, - bg_remote_storage.map(|_| bg_deletion_queue), - 0, - ) - .await; + pageserver::shutdown_pageserver(&tenant_manager, deletion_queue.clone(), 0).await; unreachable!() }) } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 258eed0b12..b0afb6414b 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -99,7 +99,7 @@ pub mod defaults { pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; - pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "sync"; + pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "async"; /// /// Default built-in configuration file. diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index c937309d83..8790a9b0a8 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -632,7 +632,7 @@ impl DeletionQueue { /// /// If remote_storage is None, then the returned workers will also be None. pub fn new( - remote_storage: Option, + remote_storage: GenericRemoteStorage, control_plane_client: Option, conf: &'static PageServerConf, ) -> (Self, Option>) @@ -658,23 +658,6 @@ impl DeletionQueue { // longer to flush after Tenants have all been torn down. 
let cancel = CancellationToken::new(); - let remote_storage = match remote_storage { - None => { - return ( - Self { - client: DeletionQueueClient { - tx, - executor_tx, - lsn_table: lsn_table.clone(), - }, - cancel, - }, - None, - ) - } - Some(r) => r, - }; - ( Self { client: DeletionQueueClient { @@ -765,7 +748,7 @@ mod test { /// Simulate a pageserver restart by destroying and recreating the deletion queue async fn restart(&mut self) { let (deletion_queue, workers) = DeletionQueue::new( - Some(self.storage.clone()), + self.storage.clone(), Some(self.mock_control_plane.clone()), self.harness.conf, ); @@ -875,7 +858,7 @@ mod test { let mock_control_plane = MockControlPlane::new(); let (deletion_queue, worker) = DeletionQueue::new( - Some(storage.clone()), + storage.clone(), Some(mock_control_plane.clone()), harness.conf, ); diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 2469e2776d..b8d5c67ce0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -110,7 +110,7 @@ pub struct State { tenant_manager: Arc, auth: Option>, allowlist_routes: Vec, - remote_storage: Option, + remote_storage: GenericRemoteStorage, broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, @@ -124,7 +124,7 @@ impl State { conf: &'static PageServerConf, tenant_manager: Arc, auth: Option>, - remote_storage: Option, + remote_storage: GenericRemoteStorage, broker_client: storage_broker::BrokerClientChannel, disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, @@ -819,12 +819,6 @@ async fn tenant_attach_handler( let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?; - if state.remote_storage.is_none() { - return Err(ApiError::BadRequest(anyhow!( - "attach_tenant is not possible because pageserver was configured without remote storage" - ))); - } - let tenant_shard_id = TenantShardId::unsharded(tenant_id); let shard_params = ShardParameters::default(); let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params); @@ -1649,12 +1643,6 @@ async fn tenant_time_travel_remote_storage_handler( ))); } - let Some(storage) = state.remote_storage.as_ref() else { - return Err(ApiError::InternalServerError(anyhow::anyhow!( - "remote storage not configured, cannot run time travel" - ))); - }; - if timestamp > done_if_after { return Err(ApiError::BadRequest(anyhow!( "The done_if_after timestamp comes before the timestamp to recover to" @@ -1664,7 +1652,7 @@ async fn tenant_time_travel_remote_storage_handler( tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}"); remote_timeline_client::upload::time_travel_recover_tenant( - storage, + &state.remote_storage, &tenant_shard_id, timestamp, done_if_after, @@ -1909,11 +1897,6 @@ async fn deletion_queue_flush( ) -> Result, ApiError> { let state = get_state(&r); - if state.remote_storage.is_none() { - // Nothing to do if remote storage is disabled. 
- return json_response(StatusCode::OK, ()); - } - let execute = parse_query_param(&r, "execute")?.unwrap_or(false); let flush = async { @@ -2078,18 +2061,11 @@ async fn disk_usage_eviction_run( }; let state = get_state(&r); - - let Some(storage) = state.remote_storage.as_ref() else { - return Err(ApiError::InternalServerError(anyhow::anyhow!( - "remote storage not configured, cannot run eviction iteration" - ))); - }; - let eviction_state = state.disk_usage_eviction_state.clone(); let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( &eviction_state, - storage, + &state.remote_storage, usage, &state.tenant_manager, config.eviction_order, @@ -2126,29 +2102,23 @@ async fn tenant_scan_remote_handler( let state = get_state(&request); let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - let Some(remote_storage) = state.remote_storage.as_ref() else { - return Err(ApiError::BadRequest(anyhow::anyhow!( - "Remote storage not configured" - ))); - }; - let mut response = TenantScanRemoteStorageResponse::default(); let (shards, _other_keys) = - list_remote_tenant_shards(remote_storage, tenant_id, cancel.clone()) + list_remote_tenant_shards(&state.remote_storage, tenant_id, cancel.clone()) .await .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?; for tenant_shard_id in shards { let (timeline_ids, _other_keys) = - list_remote_timelines(remote_storage, tenant_shard_id, cancel.clone()) + list_remote_timelines(&state.remote_storage, tenant_shard_id, cancel.clone()) .await .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?; let mut generation = Generation::none(); for timeline_id in timeline_ids { match download_index_part( - remote_storage, + &state.remote_storage, &tenant_shard_id, &timeline_id, Generation::MAX, diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 930700e50c..c69fb8c83b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -57,7 +57,7 @@ pub use crate::metrics::preinitialize_metrics; #[tracing::instrument(skip_all, fields(%exit_code))] pub async fn shutdown_pageserver( tenant_manager: &TenantManager, - deletion_queue: Option, + mut deletion_queue: DeletionQueue, exit_code: i32, ) { use std::time::Duration; @@ -89,9 +89,7 @@ pub async fn shutdown_pageserver( .await; // Best effort to persist any outstanding deletions, to avoid leaking objects - if let Some(mut deletion_queue) = deletion_queue { - deletion_queue.shutdown(Duration::from_secs(5)).await; - } + deletion_queue.shutdown(Duration::from_secs(5)).await; // Shut down the HTTP endpoint last, so that you can still check the server's // status while it's shutting down. @@ -114,10 +112,6 @@ pub async fn shutdown_pageserver( std::process::exit(exit_code); } -/// The name of the metadata file pageserver creates per timeline. -/// Full path: `tenants//timelines//metadata`. -pub const METADATA_FILE_NAME: &str = "metadata"; - /// Per-tenant configuration file. /// Full path: `tenants//config`. 
pub(crate) const TENANT_CONFIG_NAME: &str = "config"; diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index c9dbad7ae6..5315f0b936 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1999,29 +1999,6 @@ impl Default for WalRedoProcessCounters { pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy = Lazy::new(WalRedoProcessCounters::default); -#[cfg(not(test))] -pub mod wal_redo { - use super::*; - - static PROCESS_KIND: Lazy> = Lazy::new(|| { - std::sync::Mutex::new( - register_uint_gauge_vec!( - "pageserver_wal_redo_process_kind", - "The configured process kind for walredo", - &["kind"], - ) - .unwrap(), - ) - }); - - pub fn set_process_kind_metric(kind: crate::walredo::ProcessKind) { - // use guard to avoid races around the next two steps - let guard = PROCESS_KIND.lock().unwrap(); - guard.reset(); - guard.with_label_values(&[&format!("{kind}")]).set(1); - } -} - /// Similar to `prometheus::HistogramTimer` but does not record on drop. pub(crate) struct StorageTimeMetricsTimer { metrics: StorageTimeMetrics, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f6b251283c..35aba044b2 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -32,6 +32,7 @@ use std::str; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use tokio::io::AsyncWriteExt; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::io::StreamReader; @@ -49,7 +50,6 @@ use utils::{ use crate::auth::check_permission; use crate::basebackup; use crate::basebackup::BasebackupError; -use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::import_datadir::import_wal_from_tar; use crate::metrics; @@ -59,13 +59,15 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::tenant::mgr; -use crate::tenant::mgr::get_active_tenant_with_timeout; use crate::tenant::mgr::GetActiveTenantError; +use crate::tenant::mgr::GetTenantError; +use crate::tenant::mgr::ShardResolveResult; use crate::tenant::mgr::ShardSelector; +use crate::tenant::mgr::TenantManager; use crate::tenant::timeline::WaitLsnError; use crate::tenant::GetTimelineError; use crate::tenant::PageReconstructError; +use crate::tenant::Tenant; use crate::tenant::Timeline; use crate::trace::Tracer; use pageserver_api::key::rel_block_to_key; @@ -135,7 +137,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<() /// Listens for connections, and launches a new handler task for each. /// pub async fn libpq_listener_main( - conf: &'static PageServerConf, + tenant_manager: Arc, broker_client: storage_broker::BrokerClientChannel, auth: Option>, listener: TcpListener, @@ -180,7 +182,7 @@ pub async fn libpq_listener_main( "serving compute connection task", false, page_service_conn_main( - conf, + tenant_manager.clone(), broker_client.clone(), local_auth, socket, @@ -203,7 +205,7 @@ pub async fn libpq_listener_main( #[instrument(skip_all, fields(peer_addr))] async fn page_service_conn_main( - conf: &'static PageServerConf, + tenant_manager: Arc, broker_client: storage_broker::BrokerClientChannel, auth: Option>, socket: tokio::net::TcpStream, @@ -260,7 +262,8 @@ async fn page_service_conn_main( // and create a child per-query context when it invokes process_query. 
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler // and create the per-query context in process_query ourselves. - let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx); + let mut conn_handler = + PageServerHandler::new(tenant_manager, broker_client, auth, connection_ctx); let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; match pgbackend @@ -291,11 +294,12 @@ struct HandlerTimeline { } struct PageServerHandler { - _conf: &'static PageServerConf, broker_client: storage_broker::BrokerClientChannel, auth: Option>, claims: Option, + tenant_manager: Arc, + /// The context created for the lifetime of the connection /// services by this PageServerHandler. /// For each query received over the connection, @@ -381,13 +385,13 @@ impl From for QueryError { impl PageServerHandler { pub fn new( - conf: &'static PageServerConf, + tenant_manager: Arc, broker_client: storage_broker::BrokerClientChannel, auth: Option>, connection_ctx: RequestContext, ) -> Self { PageServerHandler { - _conf: conf, + tenant_manager, broker_client, auth, claims: None, @@ -552,13 +556,9 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id(); - let tenant = mgr::get_active_tenant_with_timeout( - tenant_id, - ShardSelector::First, - ACTIVE_TENANT_TIMEOUT, - &task_mgr::shutdown_token(), - ) - .await?; + let tenant = self + .get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT) + .await?; // Make request tracer if needed let mut tracer = if tenant.get_trace_read_requests() { @@ -726,13 +726,9 @@ impl PageServerHandler { // Create empty timeline info!("creating new timeline"); - let tenant = get_active_tenant_with_timeout( - tenant_id, - ShardSelector::Zero, - ACTIVE_TENANT_TIMEOUT, - &task_mgr::shutdown_token(), - ) - .await?; + let tenant = self + .get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT) + .await?; let timeline = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .await?; @@ -1370,18 +1366,69 @@ impl PageServerHandler { timeline_id: TimelineId, selector: ShardSelector, ) -> Result, GetActiveTimelineError> { - let tenant = get_active_tenant_with_timeout( - tenant_id, - selector, - ACTIVE_TENANT_TIMEOUT, - &task_mgr::shutdown_token(), - ) - .await - .map_err(GetActiveTimelineError::Tenant)?; + let tenant = self + .get_active_tenant_with_timeout(tenant_id, selector, ACTIVE_TENANT_TIMEOUT) + .await + .map_err(GetActiveTimelineError::Tenant)?; let timeline = tenant.get_timeline(timeline_id, true)?; set_tracing_field_shard_id(&timeline); Ok(timeline) } + + /// Get a shard's [`Tenant`] in its active state, if present. If we don't find the shard and some + /// slots for this tenant are `InProgress` then we will wait. + /// If we find the [`Tenant`] and it's not yet in state [`TenantState::Active`], we will wait. + /// + /// `timeout` is used as a total timeout for the whole wait operation. + async fn get_active_tenant_with_timeout( + &self, + tenant_id: TenantId, + shard_selector: ShardSelector, + timeout: Duration, + ) -> Result, GetActiveTenantError> { + let wait_start = Instant::now(); + let deadline = wait_start + timeout; + + // Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is + // for handling the rare case that the slot we're accessing is InProgress. 
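+        // The `InProgress` arm below waits on the returned completion barrier, but races it
+        // against connection cancellation and the overall `deadline`, so a slot that stays
+        // InProgress cannot stall this page_service connection past `timeout`.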
+ let tenant_shard = loop { + let resolved = self + .tenant_manager + .resolve_attached_shard(&tenant_id, shard_selector); + match resolved { + ShardResolveResult::Found(tenant_shard) => break tenant_shard, + ShardResolveResult::NotFound => { + return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( + tenant_id, + ))); + } + ShardResolveResult::InProgress(barrier) => { + // We can't authoritatively answer right now: wait for InProgress state + // to end, then try again + tokio::select! { + _ = self.await_connection_cancelled() => { + return Err(GetActiveTenantError::Cancelled) + }, + _ = barrier.wait() => { + // The barrier completed: proceed around the loop to try looking up again + }, + _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => { + return Err(GetActiveTenantError::WaitForActiveTimeout { + latest_state: None, + wait_time: timeout, + }); + } + } + } + }; + }; + + tracing::debug!("Waiting for tenant to enter active state..."); + tenant_shard + .wait_to_become_active(deadline.duration_since(Instant::now())) + .await?; + Ok(tenant_shard) + } } #[async_trait::async_trait] @@ -1771,13 +1818,13 @@ where self.check_permission(Some(tenant_id))?; - let tenant = get_active_tenant_with_timeout( - tenant_id, - ShardSelector::Zero, - ACTIVE_TENANT_TIMEOUT, - &task_mgr::shutdown_token(), - ) - .await?; + let tenant = self + .get_active_tenant_with_timeout( + tenant_id, + ShardSelector::Zero, + ACTIVE_TENANT_TIMEOUT, + ) + .await?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f54fe9b920..54b63f7042 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -191,7 +191,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted"; #[derive(Clone)] pub struct TenantSharedResources { pub broker_client: storage_broker::BrokerClientChannel, - pub remote_storage: Option, + pub remote_storage: GenericRemoteStorage, pub deletion_queue_client: DeletionQueueClient, } @@ -293,7 +293,7 @@ pub struct Tenant { walredo_mgr: Option>, // provides access to timeline data sitting in the remote storage - pub(crate) remote_storage: Option, + pub(crate) remote_storage: GenericRemoteStorage, // Access to global deletion queue for when this tenant wants to schedule a deletion deletion_queue_client: DeletionQueueClient, @@ -552,21 +552,22 @@ impl Tenant { ); if let Some(index_part) = index_part.as_ref() { - timeline - .remote_client - .as_ref() - .unwrap() - .init_upload_queue(index_part)?; - } else if self.remote_storage.is_some() { + timeline.remote_client.init_upload_queue(index_part)?; + } else { // No data on the remote storage, but we have local metadata file. We can end up // here with timeline_create being interrupted before finishing index part upload. // By doing what we do here, the index part upload is retried. // If control plane retries timeline creation in the meantime, the mgmt API handler // for timeline creation will coalesce on the upload we queue here. + // FIXME: this branch should be dead code as we no longer write local metadata. 
- let rtc = timeline.remote_client.as_ref().unwrap(); - rtc.init_upload_queue_for_empty_remote(&metadata)?; - rtc.schedule_index_upload_for_full_metadata_update(&metadata)?; + + timeline + .remote_client + .init_upload_queue_for_empty_remote(&metadata)?; + timeline + .remote_client + .schedule_index_upload_for_full_metadata_update(&metadata)?; } timeline @@ -778,14 +779,14 @@ impl Tenant { AttachType::Normal }; - let preload = match (&mode, &remote_storage) { - (SpawnMode::Create, _) => { + let preload = match &mode { + SpawnMode::Create => { None }, - (SpawnMode::Eager | SpawnMode::Lazy, Some(remote_storage)) => { + SpawnMode::Eager | SpawnMode::Lazy => { let _preload_timer = TENANT.preload.start_timer(); let res = tenant_clone - .preload(remote_storage, task_mgr::shutdown_token()) + .preload(&remote_storage, task_mgr::shutdown_token()) .await; match res { Ok(p) => Some(p), @@ -795,10 +796,7 @@ impl Tenant { } } } - (_, None) => { - let _preload_timer = TENANT.preload.start_timer(); - None - } + }; // Remote preload is complete. @@ -1022,7 +1020,7 @@ impl Tenant { index_part, remote_metadata, TimelineResources { - remote_client: Some(remote_client), + remote_client, deletion_queue_client: self.deletion_queue_client.clone(), timeline_get_throttle: self.timeline_get_throttle.clone(), }, @@ -1048,7 +1046,7 @@ impl Tenant { Arc::clone(self), timeline_id, &index_part.metadata, - Some(remote_timeline_client), + remote_timeline_client, self.deletion_queue_client.clone(), ) .instrument(tracing::info_span!("timeline_delete", %timeline_id)) @@ -1140,9 +1138,7 @@ impl Tenant { let mut size = 0; for timeline in self.list_timelines() { - if let Some(remote_client) = &timeline.remote_client { - size += remote_client.get_remote_physical_size(); - } + size += timeline.remote_client.get_remote_physical_size(); } size @@ -1192,6 +1188,7 @@ impl Tenant { pub fn create_broken_tenant( conf: &'static PageServerConf, tenant_shard_id: TenantShardId, + remote_storage: GenericRemoteStorage, reason: String, ) -> Arc { Arc::new(Tenant::new( @@ -1206,7 +1203,7 @@ impl Tenant { ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count), None, tenant_shard_id, - None, + remote_storage, DeletionQueueClient::broken(), )) } @@ -1399,13 +1396,7 @@ impl Tenant { tline.freeze_and_flush().await.context("freeze_and_flush")?; // Make sure the freeze_and_flush reaches remote storage. - tline - .remote_client - .as_ref() - .unwrap() - .wait_completion() - .await - .unwrap(); + tline.remote_client.wait_completion().await.unwrap(); let tl = uninit_tl.finish_creation()?; // The non-test code would call tl.activate() here. @@ -1471,20 +1462,19 @@ impl Tenant { return Err(CreateTimelineError::Conflict); } - if let Some(remote_client) = existing.remote_client.as_ref() { - // Wait for uploads to complete, so that when we return Ok, the timeline - // is known to be durable on remote storage. Just like we do at the end of - // this function, after we have created the timeline ourselves. - // - // We only really care that the initial version of `index_part.json` has - // been uploaded. That's enough to remember that the timeline - // exists. However, there is no function to wait specifically for that so - // we just wait for all in-progress uploads to finish. - remote_client - .wait_completion() - .await - .context("wait for timeline uploads to complete")?; - } + // Wait for uploads to complete, so that when we return Ok, the timeline + // is known to be durable on remote storage. 
Just like we do at the end of + // this function, after we have created the timeline ourselves. + // + // We only really care that the initial version of `index_part.json` has + // been uploaded. That's enough to remember that the timeline + // exists. However, there is no function to wait specifically for that so + // we just wait for all in-progress uploads to finish. + existing + .remote_client + .wait_completion() + .await + .context("wait for timeline uploads to complete")?; return Ok(existing); } @@ -1560,14 +1550,14 @@ impl Tenant { // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must // not send a success to the caller until it is. The same applies to handling retries, // see the handling of [`TimelineExclusionError::AlreadyExists`] above. - if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { - let kind = ancestor_timeline_id - .map(|_| "branched") - .unwrap_or("bootstrapped"); - remote_client.wait_completion().await.with_context(|| { - format!("wait for {} timeline initial uploads to complete", kind) - })?; - } + let kind = ancestor_timeline_id + .map(|_| "branched") + .unwrap_or("bootstrapped"); + loaded_timeline + .remote_client + .wait_completion() + .await + .with_context(|| format!("wait for {} timeline initial uploads to complete", kind))?; loaded_timeline.activate(self.clone(), broker_client, None, ctx); @@ -2162,32 +2152,26 @@ impl Tenant { ) -> anyhow::Result<()> { let timelines = self.timelines.lock().unwrap().clone(); for timeline in timelines.values() { - let Some(tl_client) = &timeline.remote_client else { - anyhow::bail!("Remote storage is mandatory"); - }; - - let Some(remote_storage) = &self.remote_storage else { - anyhow::bail!("Remote storage is mandatory"); - }; - // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels // to ensure that they do not start a split if currently in the process of doing these. // Upload an index from the parent: this is partly to provide freshness for the // child tenants that will copy it, and partly for general ease-of-debugging: there will // always be a parent shard index in the same generation as we wrote the child shard index. - tl_client.schedule_index_upload_for_file_changes()?; - tl_client.wait_completion().await?; + timeline + .remote_client + .schedule_index_upload_for_file_changes()?; + timeline.remote_client.wait_completion().await?; // Shut down the timeline's remote client: this means that the indices we write // for child shards will not be invalidated by the parent shard deleting layers. - tl_client.shutdown().await; + timeline.remote_client.shutdown().await; // Download methods can still be used after shutdown, as they don't flow through the remote client's // queue. In principal the RemoteTimelineClient could provide this without downloading it, but this // operation is rare, so it's simpler to just download it (and robustly guarantees that the index // we use here really is the remotely persistent one). 
- let result = tl_client + let result = timeline.remote_client .download_index_file(&self.cancel) .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id)) .await?; @@ -2200,7 +2184,7 @@ impl Tenant { for child_shard in child_shards { upload_index_part( - remote_storage, + &self.remote_storage, child_shard, &timeline.timeline_id, self.generation, @@ -2227,9 +2211,9 @@ impl Tenant { result.physical_size += timeline .remote_client - .as_ref() - .map(|c| c.metrics.remote_physical_size_gauge.get()) - .unwrap_or(0); + .metrics + .remote_physical_size_gauge + .get(); result.max_logical_size = std::cmp::max( result.max_logical_size, timeline.metrics.current_logical_size_gauge.get(), @@ -2501,7 +2485,7 @@ impl Tenant { shard_identity: ShardIdentity, walredo_mgr: Option>, tenant_shard_id: TenantShardId, - remote_storage: Option, + remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, ) -> Tenant { let (state, mut rx) = watch::channel(state); @@ -3145,11 +3129,10 @@ impl Tenant { // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC // could get incorrect information and remove more layers, than needed. // See also https://github.com/neondatabase/neon/issues/3865 - if let Some(remote_client) = new_timeline.remote_client.as_ref() { - remote_client - .schedule_index_upload_for_full_metadata_update(&metadata) - .context("branch initial metadata upload")?; - } + new_timeline + .remote_client + .schedule_index_upload_for_full_metadata_update(&metadata) + .context("branch initial metadata upload")?; Ok(new_timeline) } @@ -3181,11 +3164,6 @@ impl Tenant { pgdata_path: &Utf8PathBuf, timeline_id: &TimelineId, ) -> anyhow::Result<()> { - let Some(storage) = &self.remote_storage else { - // No remote storage? No upload. 
- return Ok(()); - }; - let temp_path = timelines_path.join(format!( "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}" )); @@ -3209,7 +3187,7 @@ impl Tenant { backoff::retry( || async { self::remote_timeline_client::upload_initdb_dir( - storage, + &self.remote_storage, &self.tenant_shard_id.tenant_id, timeline_id, pgdata_zstd.try_clone().await?, @@ -3266,9 +3244,6 @@ impl Tenant { } } if let Some(existing_initdb_timeline_id) = load_existing_initdb { - let Some(storage) = &self.remote_storage else { - bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}"); - }; if existing_initdb_timeline_id != timeline_id { let source_path = &remote_initdb_archive_path( &self.tenant_shard_id.tenant_id, @@ -3278,7 +3253,7 @@ impl Tenant { &remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id); // if this fails, it will get retried by retried control plane requests - storage + self.remote_storage .copy_object(source_path, dest_path, &self.cancel) .await .context("copy initdb tar")?; @@ -3286,7 +3261,7 @@ impl Tenant { let (initdb_tar_zst_path, initdb_tar_zst) = self::remote_timeline_client::download_initdb_tar_zst( self.conf, - storage, + &self.remote_storage, &self.tenant_shard_id, &existing_initdb_timeline_id, &self.cancel, @@ -3381,20 +3356,14 @@ impl Tenant { /// Call this before constructing a timeline, to build its required structures fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources { - let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() { - let remote_client = RemoteTimelineClient::new( - remote_storage.clone(), - self.deletion_queue_client.clone(), - self.conf, - self.tenant_shard_id, - timeline_id, - self.generation, - ); - Some(remote_client) - } else { - None - }; - + let remote_client = RemoteTimelineClient::new( + self.remote_storage.clone(), + self.deletion_queue_client.clone(), + self.conf, + self.tenant_shard_id, + timeline_id, + self.generation, + ); TimelineResources { remote_client, deletion_queue_client: self.deletion_queue_client.clone(), @@ -3418,9 +3387,9 @@ impl Tenant { let tenant_shard_id = self.tenant_shard_id; let resources = self.build_timeline_resources(new_timeline_id); - if let Some(remote_client) = &resources.remote_client { - remote_client.init_upload_queue_for_empty_remote(new_metadata)?; - } + resources + .remote_client + .init_upload_queue_for_empty_remote(new_metadata)?; let timeline_struct = self .create_timeline_struct( @@ -3588,9 +3557,7 @@ impl Tenant { tracing::info!(timeline_id=%timeline.timeline_id, "Flushing..."); timeline.freeze_and_flush().await?; tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads..."); - if let Some(client) = &timeline.remote_client { - client.wait_completion().await?; - } + timeline.remote_client.wait_completion().await?; Ok(()) } @@ -3904,7 +3871,7 @@ pub(crate) mod harness { ShardIdentity::unsharded(), Some(walredo_mgr), self.tenant_shard_id, - Some(self.remote_storage.clone()), + self.remote_storage.clone(), self.deletion_queue.new_client(), )); diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 2e5259bfe2..3173a33dad 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -181,25 +181,23 @@ async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), Del async fn remove_tenant_remote_delete_mark( conf: &PageServerConf, - remote_storage: Option<&GenericRemoteStorage>, + remote_storage: &GenericRemoteStorage, 
tenant_shard_id: &TenantShardId, cancel: &CancellationToken, ) -> Result<(), DeleteTenantError> { - if let Some(remote_storage) = remote_storage { - let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?; - backoff::retry( - || async { remote_storage.delete(&path, cancel).await }, - TimeoutOrCancel::caused_by_cancel, - FAILED_UPLOAD_WARN_THRESHOLD, - FAILED_REMOTE_OP_RETRIES, - "remove_tenant_remote_delete_mark", - cancel, - ) - .await - .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) - .and_then(|x| x) - .context("remove_tenant_remote_delete_mark")?; - } + let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?; + backoff::retry( + || async { remote_storage.delete(&path, cancel).await }, + TimeoutOrCancel::caused_by_cancel, + FAILED_UPLOAD_WARN_THRESHOLD, + FAILED_REMOTE_OP_RETRIES, + "remove_tenant_remote_delete_mark", + cancel, + ) + .await + .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel)) + .and_then(|x| x) + .context("remove_tenant_remote_delete_mark")?; Ok(()) } @@ -297,7 +295,7 @@ impl DeleteTenantFlow { #[instrument(skip_all)] pub(crate) async fn run( conf: &'static PageServerConf, - remote_storage: Option, + remote_storage: GenericRemoteStorage, tenants: &'static std::sync::RwLock, tenant: Arc, cancel: &CancellationToken, @@ -308,9 +306,7 @@ impl DeleteTenantFlow { let mut guard = Self::prepare(&tenant).await?; - if let Err(e) = - Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant, cancel).await - { + if let Err(e) = Self::run_inner(&mut guard, conf, &remote_storage, &tenant, cancel).await { tenant.set_broken(format!("{e:#}")).await; return Err(e); } @@ -327,7 +323,7 @@ impl DeleteTenantFlow { async fn run_inner( guard: &mut OwnedMutexGuard, conf: &'static PageServerConf, - remote_storage: Option<&GenericRemoteStorage>, + remote_storage: &GenericRemoteStorage, tenant: &Tenant, cancel: &CancellationToken, ) -> Result<(), DeleteTenantError> { @@ -339,14 +335,9 @@ impl DeleteTenantFlow { ))? }); - // IDEA: implement detach as delete without remote storage. Then they would use the same lock (deletion_progress) so wont contend. - // Though sounds scary, different mark name? - // Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state. - if let Some(remote_storage) = &remote_storage { - create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel) - .await - .context("remote_mark")? 
- } + create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel) + .await + .context("remote_mark")?; fail::fail_point!("tenant-delete-before-create-local-mark", |_| { Err(anyhow::anyhow!( @@ -483,7 +474,7 @@ impl DeleteTenantFlow { fn schedule_background( guard: OwnedMutexGuard, conf: &'static PageServerConf, - remote_storage: Option, + remote_storage: GenericRemoteStorage, tenants: &'static std::sync::RwLock, tenant: Arc, ) { @@ -512,7 +503,7 @@ impl DeleteTenantFlow { async fn background( mut guard: OwnedMutexGuard, conf: &PageServerConf, - remote_storage: Option, + remote_storage: GenericRemoteStorage, tenants: &'static std::sync::RwLock, tenant: &Arc, ) -> Result<(), DeleteTenantError> { @@ -551,7 +542,7 @@ impl DeleteTenantFlow { remove_tenant_remote_delete_mark( conf, - remote_storage.as_ref(), + &remote_storage, &tenant.tenant_shard_id, &task_mgr::shutdown_token(), ) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 7a3e36bf02..1d8e2cf6d3 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -16,10 +16,9 @@ use std::cmp::Ordering; use std::collections::{BTreeMap, HashMap}; use std::ops::Deref; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use sysinfo::SystemExt; use tokio::fs; -use utils::timeout::{timeout_cancellable, TimeoutCancellableError}; use anyhow::Context; use once_cell::sync::Lazy; @@ -47,7 +46,7 @@ use crate::tenant::span::debug_assert_current_span_has_tenant_id; use crate::tenant::storage_layer::inmemory_layer; use crate::tenant::timeline::ShutdownMode; use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState}; -use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX}; +use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX}; use utils::crashsafe::path_with_suffix_extension; use utils::fs_ext::PathExt; @@ -119,6 +118,7 @@ pub(crate) enum TenantsMapRemoveResult { /// When resolving a TenantId to a shard, we may be looking for the 0th /// shard, or we might be looking for whichever shard holds a particular page. +#[derive(Copy, Clone)] pub(crate) enum ShardSelector { /// Only return the 0th shard, if it is present. If a non-0th shard is present, /// ignore it. @@ -169,6 +169,14 @@ impl TenantStartupMode { } } +/// Result type for looking up a TenantId to a specific shard +pub(crate) enum ShardResolveResult { + NotFound, + Found(Arc), + // Wait for this barrrier, then query again + InProgress(utils::completion::Barrier), +} + impl TenantsMap { /// Convenience function for typical usage, where we want to get a `Tenant` object, for /// working with attached tenants. If the TenantId is in the map but in Secondary state, @@ -182,51 +190,6 @@ impl TenantsMap { } } - /// A page service client sends a TenantId, and to look up the correct Tenant we must - /// resolve this to a fully qualified TenantShardId. 
- fn resolve_attached_shard( - &self, - tenant_id: &TenantId, - selector: ShardSelector, - ) -> Option { - let mut want_shard = None; - match self { - TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { - for slot in m.range(TenantShardId::tenant_range(*tenant_id)) { - // Ignore all slots that don't contain an attached tenant - let tenant = match &slot.1 { - TenantSlot::Attached(t) => t, - _ => continue, - }; - - match selector { - ShardSelector::First => return Some(*slot.0), - ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => { - return Some(*slot.0) - } - ShardSelector::Page(key) => { - // First slot we see for this tenant, calculate the expected shard number - // for the key: we will use this for checking if this and subsequent - // slots contain the key, rather than recalculating the hash each time. - if want_shard.is_none() { - want_shard = Some(tenant.shard_identity.get_shard_number(&key)); - } - - if Some(tenant.shard_identity.number) == want_shard { - return Some(*slot.0); - } - } - _ => continue, - } - } - - // Fall through: we didn't find an acceptable shard - None - } - } - } - /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map. /// /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded @@ -391,22 +354,17 @@ async fn init_load_generations( // deletion list entries may still be valid. We provide that by pushing a recovery operation into // the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions // are processed, even though we don't block on recovery completing here. - // - // Must only do this if remote storage is enabled, otherwise deletion queue - // is not running and channel push will fail. - if resources.remote_storage.is_some() { - let attached_tenants = generations - .iter() - .flat_map(|(id, start_mode)| { - match start_mode { - TenantStartupMode::Attached((_mode, generation)) => Some(generation), - TenantStartupMode::Secondary => None, - } - .map(|gen| (*id, *gen)) - }) - .collect(); - resources.deletion_queue_client.recover(attached_tenants)?; - } + let attached_tenants = generations + .iter() + .flat_map(|(id, start_mode)| { + match start_mode { + TenantStartupMode::Attached((_mode, generation)) => Some(generation), + TenantStartupMode::Secondary => None, + } + .map(|gen| (*id, *gen)) + }) + .collect(); + resources.deletion_queue_client.recover(attached_tenants)?; Ok(Some(generations)) } @@ -460,53 +418,6 @@ fn load_tenant_config( } }; - // Clean up legacy `metadata` files. - // Doing it here because every single tenant directory is visited here. - // In any later code, there's different treatment of tenant dirs - // ... depending on whether the tenant is in re-attach response or not - // ... 
epending on whether the tenant is ignored or not - assert_eq!( - &conf.tenant_path(&tenant_shard_id), - &tenant_dir_path, - "later use of conf....path() methods would be dubious" - ); - let timelines: Vec = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() { - Ok(iter) => { - let mut timelines = Vec::new(); - for res in iter { - let p = res?; - let Some(timeline_id) = p.file_name().parse::().ok() else { - // skip any entries that aren't TimelineId, such as - // - *.___temp dirs - // - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart) - continue; - }; - timelines.push(timeline_id); - } - timelines - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![], - Err(e) => return Err(anyhow::anyhow!(e)), - }; - for timeline_id in timelines { - let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id); - let metadata_path = timeline_path.join(METADATA_FILE_NAME); - match std::fs::remove_file(&metadata_path) { - Ok(()) => { - crashsafe::fsync(timeline_path) - .context("fsync timeline dir after removing legacy metadata file")?; - info!("removed legacy metadata file at {metadata_path}"); - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - // something removed the file earlier, or it was never there - // We don't care, this software version doesn't write it again, so, we're good. - } - Err(e) => { - anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}"); - } - } - } - let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME); if tenant_ignore_mark_file.exists() { info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant"); @@ -611,6 +522,7 @@ pub async fn init_tenant_mgr( TenantSlot::Attached(Tenant::create_broken_tenant( conf, tenant_shard_id, + resources.remote_storage.clone(), format!("{}", e), )), ); @@ -803,6 +715,7 @@ fn tenant_spawn( "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}" ); + let remote_storage = resources.remote_storage.clone(); let tenant = match Tenant::spawn( conf, tenant_shard_id, @@ -817,7 +730,7 @@ fn tenant_spawn( Ok(tenant) => tenant, Err(e) => { error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}"); - Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}")) + Tenant::create_broken_tenant(conf, tenant_shard_id, remote_storage, format!("{e:#}")) } }; @@ -2103,6 +2016,72 @@ impl TenantManager { Ok(reparented) } + + /// A page service client sends a TenantId, and to look up the correct Tenant we must + /// resolve this to a fully qualified TenantShardId. + /// + /// During shard splits: we shall see parent shards in InProgress state and skip them, and + /// instead match on child shards which should appear in Attached state. Very early in a shard + /// split, or in other cases where a shard is InProgress, we will return our own InProgress result + /// to instruct the caller to wait for that to finish before querying again. 
+ pub(crate) fn resolve_attached_shard( + &self, + tenant_id: &TenantId, + selector: ShardSelector, + ) -> ShardResolveResult { + let tenants = self.tenants.read().unwrap(); + let mut want_shard = None; + let mut any_in_progress = None; + + match &*tenants { + TenantsMap::Initializing => ShardResolveResult::NotFound, + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { + for slot in m.range(TenantShardId::tenant_range(*tenant_id)) { + // Ignore all slots that don't contain an attached tenant + let tenant = match &slot.1 { + TenantSlot::Attached(t) => t, + TenantSlot::InProgress(barrier) => { + // We might still find a usable shard, but in case we don't, remember that + // we saw at least one InProgress slot, so that we can distinguish this case + // from a simple NotFound in our return value. + any_in_progress = Some(barrier.clone()); + continue; + } + _ => continue, + }; + + match selector { + ShardSelector::First => return ShardResolveResult::Found(tenant.clone()), + ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => { + return ShardResolveResult::Found(tenant.clone()) + } + ShardSelector::Page(key) => { + // First slot we see for this tenant, calculate the expected shard number + // for the key: we will use this for checking if this and subsequent + // slots contain the key, rather than recalculating the hash each time. + if want_shard.is_none() { + want_shard = Some(tenant.shard_identity.get_shard_number(&key)); + } + + if Some(tenant.shard_identity.number) == want_shard { + return ShardResolveResult::Found(tenant.clone()); + } + } + _ => continue, + } + } + + // Fall through: we didn't find a slot that was in Attached state & matched our selector. If + // we found one or more InProgress slot, indicate to caller that they should retry later. Otherwise + // this requested shard simply isn't found. + if let Some(barrier) = any_in_progress { + ShardResolveResult::InProgress(barrier) + } else { + ShardResolveResult::NotFound + } + } + } + } } #[derive(Debug, thiserror::Error)] @@ -2151,105 +2130,6 @@ pub(crate) enum GetActiveTenantError { Broken(String), } -/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`] -/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`], -/// then wait for up to `timeout` (minus however long we waited for the slot). -pub(crate) async fn get_active_tenant_with_timeout( - tenant_id: TenantId, - shard_selector: ShardSelector, - timeout: Duration, - cancel: &CancellationToken, -) -> Result, GetActiveTenantError> { - enum WaitFor { - Barrier(utils::completion::Barrier), - Tenant(Arc), - } - - let wait_start = Instant::now(); - let deadline = wait_start + timeout; - - let (wait_for, tenant_shard_id) = { - let locked = TENANTS.read().unwrap(); - - // Resolve TenantId to TenantShardId - let tenant_shard_id = locked - .resolve_attached_shard(&tenant_id, shard_selector) - .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound( - tenant_id, - )))?; - - let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) - .map_err(GetTenantError::MapState)?; - match peek_slot { - Some(TenantSlot::Attached(tenant)) => { - match tenant.current_state() { - TenantState::Active => { - // Fast path: we don't need to do any async waiting. 
- return Ok(tenant.clone()); - } - _ => { - tenant.activate_now(); - (WaitFor::Tenant(tenant.clone()), tenant_shard_id) - } - } - } - Some(TenantSlot::Secondary(_)) => { - return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( - tenant_shard_id, - ))) - } - Some(TenantSlot::InProgress(barrier)) => { - (WaitFor::Barrier(barrier.clone()), tenant_shard_id) - } - None => { - return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( - tenant_id, - ))) - } - } - }; - - let tenant = match wait_for { - WaitFor::Barrier(barrier) => { - tracing::debug!("Waiting for tenant InProgress state to pass..."); - timeout_cancellable( - deadline.duration_since(Instant::now()), - cancel, - barrier.wait(), - ) - .await - .map_err(|e| match e { - TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout { - latest_state: None, - wait_time: wait_start.elapsed(), - }, - TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled, - })?; - { - let locked = TENANTS.read().unwrap(); - let peek_slot = - tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) - .map_err(GetTenantError::MapState)?; - match peek_slot { - Some(TenantSlot::Attached(tenant)) => tenant.clone(), - _ => { - return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive( - tenant_shard_id, - ))) - } - } - } - } - WaitFor::Tenant(tenant) => tenant, - }; - - tracing::debug!("Waiting for tenant to enter active state..."); - tenant - .wait_to_become_active(deadline.duration_since(Instant::now())) - .await?; - Ok(tenant) -} - #[derive(Debug, thiserror::Error)] pub(crate) enum DeleteTimelineError { #[error("Tenant {0}")] @@ -2276,7 +2156,7 @@ pub(crate) async fn load_tenant( tenant_id: TenantId, generation: Generation, broker_client: storage_broker::BrokerClientChannel, - remote_storage: Option, + remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { @@ -2937,7 +2817,7 @@ pub(crate) async fn immediate_gc( } let timeline = tenant.get_timeline(timeline_id, false).ok(); - let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref()); + let rtc = timeline.as_ref().map(|x| &x.remote_client); if let Some(rtc) = rtc { // layer drops schedule actions on remote timeline client to actually do the diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index afd0cc0c34..4844618316 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -2137,7 +2137,7 @@ mod tests { tenant_ctx: _tenant_ctx, } = test_setup; - let client = timeline.remote_client.as_ref().unwrap(); + let client = &timeline.remote_client; // Download back the index.json, and check that the list of files is correct let initial_index_part = match client @@ -2328,7 +2328,7 @@ mod tests { timeline, .. 
} = TestSetup::new("metrics").await.unwrap(); - let client = timeline.remote_client.as_ref().unwrap(); + let client = &timeline.remote_client; let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); let local_path = local_layer_path( diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index c28e041fa2..46a3d7e81f 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -26,7 +26,7 @@ use crate::{ tasks::{warn_when_period_overrun, BackgroundLoopKind}, }, virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}, - METADATA_FILE_NAME, TEMP_FILE_SUFFIX, + TEMP_FILE_SUFFIX, }; use super::{ @@ -1074,11 +1074,7 @@ async fn init_timeline_state( .fatal_err(&format!("Read metadata on {}", file_path)); let file_name = file_path.file_name().expect("created it from the dentry"); - if file_name == METADATA_FILE_NAME { - // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant. - warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config"); - continue; - } else if crate::is_temporary(&file_path) + if crate::is_temporary(&file_path) || is_temp_download_file(&file_path) || is_ephemeral_file(file_name) { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index b6f7702247..e8c712c4c6 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -585,9 +585,6 @@ struct LayerInner { /// [`Timeline::gate`] at the same time. timeline: Weak, - /// Cached knowledge of [`Timeline::remote_client`] being `Some`. - have_remote_client: bool, - access_stats: LayerAccessStats, /// This custom OnceCell is backed by std mutex, but only held for short time periods. @@ -732,23 +729,23 @@ impl Drop for LayerInner { if removed { timeline.metrics.resident_physical_size_sub(file_size); } - if let Some(remote_client) = timeline.remote_client.as_ref() { - let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]); + let res = timeline + .remote_client + .schedule_deletion_of_unlinked(vec![(file_name, meta)]); - if let Err(e) = res { - // test_timeline_deletion_with_files_stuck_in_upload_queue is good at - // demonstrating this deadlock (without spawn_blocking): stop will drop - // queued items, which will have ResidentLayer's, and those drops would try - // to re-entrantly lock the RemoteTimelineClient inner state. - if !timeline.is_active() { - tracing::info!("scheduling deletion on drop failed: {e:#}"); - } else { - tracing::warn!("scheduling deletion on drop failed: {e:#}"); - } - LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed); + if let Err(e) = res { + // test_timeline_deletion_with_files_stuck_in_upload_queue is good at + // demonstrating this deadlock (without spawn_blocking): stop will drop + // queued items, which will have ResidentLayer's, and those drops would try + // to re-entrantly lock the RemoteTimelineClient inner state. 
+ if !timeline.is_active() { + tracing::info!("scheduling deletion on drop failed: {e:#}"); } else { - LAYER_IMPL_METRICS.inc_completed_deletes(); + tracing::warn!("scheduling deletion on drop failed: {e:#}"); } + LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed); + } else { + LAYER_IMPL_METRICS.inc_completed_deletes(); } }); } @@ -786,7 +783,6 @@ impl LayerInner { path: local_path, desc, timeline: Arc::downgrade(timeline), - have_remote_client: timeline.remote_client.is_some(), access_stats, wanted_deleted: AtomicBool::new(false), inner, @@ -815,8 +811,6 @@ impl LayerInner { /// in a new attempt to evict OR join the previously started attempt. #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))] pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> { - assert!(self.have_remote_client); - let mut rx = self.status.as_ref().unwrap().subscribe(); { @@ -973,10 +967,6 @@ impl LayerInner { return Err(DownloadError::NotFile(ft)); } - if timeline.remote_client.as_ref().is_none() { - return Err(DownloadError::NoRemoteStorage); - } - if let Some(ctx) = ctx { self.check_expected_download(ctx)?; } @@ -1113,12 +1103,8 @@ impl LayerInner { permit: heavier_once_cell::InitPermit, ctx: &RequestContext, ) -> anyhow::Result> { - let client = timeline + let result = timeline .remote_client - .as_ref() - .expect("checked before download_init_and_wait"); - - let result = client .download_layer_file( &self.desc.layer_name(), &self.metadata(), @@ -1293,20 +1279,10 @@ impl LayerInner { /// `DownloadedLayer` is being dropped, so it calls this method. fn on_downloaded_layer_drop(self: Arc, only_version: usize) { - let can_evict = self.have_remote_client; - // we cannot know without inspecting LayerInner::inner if we should evict or not, even // though here it is very likely let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version); - if !can_evict { - // it would be nice to assert this case out, but we are in drop - span.in_scope(|| { - tracing::error!("bug in struct Layer: ResidentOrWantedEvicted has been downgraded while we have no remote storage"); - }); - return; - } - // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might // drop while the `self.inner` is being locked, leading to a deadlock. 
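Most hunks in `layer.rs` above follow the same shape: with `Timeline::remote_client` no longer optional, the cached `have_remote_client` flag, the `NoRemoteStorage` download error, and the surrounding `if let Some(..)` wrappers all disappear. Below is a minimal sketch of the resulting drop path, using hypothetical stand-in types rather than the pageserver's real API:

```rust
// Hypothetical stand-in types, not the pageserver's real definitions.
struct RemoteTimelineClient;

impl RemoteTimelineClient {
    fn schedule_deletion_of_unlinked(&self, layers: Vec<String>) -> Result<(), String> {
        // Stand-in for pushing a deletion onto the upload queue.
        println!("queued deletion of {} layer file(s)", layers.len());
        Ok(())
    }
}

struct Timeline {
    // Previously this field would have been `Option<RemoteTimelineClient>`.
    remote_client: RemoteTimelineClient,
    active: bool,
}

/// Sketch of drop-time cleanup in the spirit of the `LayerInner` drop hunk:
/// deletion is scheduled unconditionally, and only the log severity depends
/// on whether the timeline is already shutting down.
fn on_layer_drop(timeline: &Timeline, file_name: &str) {
    if let Err(e) = timeline
        .remote_client
        .schedule_deletion_of_unlinked(vec![file_name.to_string()])
    {
        if timeline.active {
            eprintln!("scheduling deletion on drop failed: {e}");
        } else {
            println!("scheduling deletion on drop failed (shutting down): {e}");
        }
    }
}

fn main() {
    let timeline = Timeline {
        remote_client: RemoteTimelineClient,
        active: true,
    };
    on_layer_drop(&timeline, "000000000000-FFFFFFFFFFFF__0000000001");
}
```

The design choice is the usual one for removing an `Option`: decide once at startup whether remote storage exists, so per-call-site branching (and the bug class of the two states disagreeing) goes away.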
@@ -1578,8 +1554,6 @@ pub(crate) enum EvictionError { pub(crate) enum DownloadError { #[error("timeline has already shutdown")] TimelineShutdown, - #[error("no remote storage configured")] - NoRemoteStorage, #[error("context denies downloading")] ContextAndConfigReallyDeniesDownloads, #[error("downloading is really required but not allowed by this method")] diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 52f62faa8d..fa9142d5e9 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -145,7 +145,7 @@ async fn smoke_test() { .await .expect("the local layer file still exists"); - let rtc = timeline.remote_client.as_ref().unwrap(); + let rtc = &timeline.remote_client; { let layers = &[layer]; @@ -761,13 +761,7 @@ async fn eviction_cancellation_on_drop() { timeline.freeze_and_flush().await.unwrap(); // wait for the upload to complete so our Arc::strong_count assertion holds - timeline - .remote_client - .as_ref() - .unwrap() - .wait_completion() - .await - .unwrap(); + timeline.remote_client.wait_completion().await.unwrap(); let (evicted_layer, not_evicted) = { let mut layers = { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ca34b4fadc..df9bc9b35b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -200,7 +200,7 @@ fn drop_wlock(rlock: tokio::sync::RwLockWriteGuard<'_, T>) { /// The outward-facing resources required to build a Timeline pub struct TimelineResources { - pub remote_client: Option, + pub remote_client: RemoteTimelineClient, pub deletion_queue_client: DeletionQueueClient, pub timeline_get_throttle: Arc< crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>, @@ -272,7 +272,7 @@ pub struct Timeline { /// Remote storage client. /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details. - pub remote_client: Option>, + pub remote_client: Arc, // What page versions do we hold in the repository? If we get a // request > last_record_lsn, we need to wait until we receive all @@ -1375,22 +1375,14 @@ impl Timeline { /// not validated with control plane yet. /// See [`Self::get_remote_consistent_lsn_visible`]. pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option { - if let Some(remote_client) = &self.remote_client { - remote_client.remote_consistent_lsn_projected() - } else { - None - } + self.remote_client.remote_consistent_lsn_projected() } /// remote_consistent_lsn which the tenant is guaranteed not to go backward from, /// i.e. a value of remote_consistent_lsn_projected which has undergone /// generation validation in the deletion queue. pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option { - if let Some(remote_client) = &self.remote_client { - remote_client.remote_consistent_lsn_visible() - } else { - None - } + self.remote_client.remote_consistent_lsn_visible() } /// The sum of the file size of all historic layers in the layer map. @@ -1760,16 +1752,14 @@ impl Timeline { match self.freeze_and_flush().await { Ok(_) => { // drain the upload queue - if let Some(client) = self.remote_client.as_ref() { - // if we did not wait for completion here, it might be our shutdown process - // didn't wait for remote uploads to complete at all, as new tasks can forever - // be spawned. 
- // - // what is problematic is the shutting down of RemoteTimelineClient, because - // obviously it does not make sense to stop while we wait for it, but what - // about corner cases like s3 suddenly hanging up? - client.shutdown().await; - } + // if we did not wait for completion here, it might be our shutdown process + // didn't wait for remote uploads to complete at all, as new tasks can forever + // be spawned. + // + // what is problematic is the shutting down of RemoteTimelineClient, because + // obviously it does not make sense to stop while we wait for it, but what + // about corner cases like s3 suddenly hanging up? + self.remote_client.shutdown().await; } Err(e) => { // Non-fatal. Shutdown is infallible. Failures to flush just mean that @@ -1785,18 +1775,16 @@ impl Timeline { // Transition the remote_client into a state where it's only useful for timeline deletion. // (The deletion use case is why we can't just hook up remote_client to Self::cancel).) - if let Some(remote_client) = self.remote_client.as_ref() { - remote_client.stop(); - // As documented in remote_client.stop()'s doc comment, it's our responsibility - // to shut down the upload queue tasks. - // TODO: fix that, task management should be encapsulated inside remote_client. - task_mgr::shutdown_tasks( - Some(TaskKind::RemoteUploadTask), - Some(self.tenant_shard_id), - Some(self.timeline_id), - ) - .await; - } + self.remote_client.stop(); + // As documented in remote_client.stop()'s doc comment, it's our responsibility + // to shut down the upload queue tasks. + // TODO: fix that, task management should be encapsulated inside remote_client. + task_mgr::shutdown_tasks( + Some(TaskKind::RemoteUploadTask), + Some(self.tenant_shard_id), + Some(self.timeline_id), + ) + .await; // TODO: work toward making this a no-op. See this funciton's doc comment for more context. tracing::debug!("Waiting for tasks..."); @@ -1922,10 +1910,6 @@ impl Timeline { return Ok(None); }; - if self.remote_client.is_none() { - return Ok(Some(false)); - } - layer.download().await?; Ok(Some(true)) @@ -2190,7 +2174,7 @@ impl Timeline { walredo_mgr, walreceiver: Mutex::new(None), - remote_client: resources.remote_client.map(Arc::new), + remote_client: Arc::new(resources.remote_client), // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. last_record_lsn: SeqWait::new(RecordLsn { @@ -2437,10 +2421,6 @@ impl Timeline { discovered_layers.push((layer_file_name, local_path, file_size)); continue; } - Discovered::Metadata => { - warn!("found legacy metadata file, these should have been removed in load_tenant_config"); - continue; - } Discovered::IgnoredBackup => { continue; } @@ -2487,12 +2467,10 @@ impl Timeline { if local.metadata.file_size() == remote.file_size() { // Use the local file, but take the remote metadata so that we pick up // the correct generation. 
- UseLocal( - LocalLayerFileMetadata { - metadata: remote, - local_path: local.local_path - } - ) + UseLocal(LocalLayerFileMetadata { + metadata: remote, + local_path: local.local_path, + }) } else { init::cleanup_local_file_for_remote(&local, &remote)?; UseRemote { local, remote } @@ -2501,7 +2479,11 @@ impl Timeline { Ok(decision) => decision, Err(DismissedLayer::Future { local }) => { if let Some(local) = local { - init::cleanup_future_layer(&local.local_path, &name, disk_consistent_lsn)?; + init::cleanup_future_layer( + &local.local_path, + &name, + disk_consistent_lsn, + )?; } needs_cleanup.push(name); continue; @@ -2523,7 +2505,8 @@ impl Timeline { let layer = match decision { UseLocal(local) => { total_physical_size += local.metadata.file_size(); - Layer::for_resident(conf, &this, local.local_path, name, local.metadata).drop_eviction_guard() + Layer::for_resident(conf, &this, local.local_path, name, local.metadata) + .drop_eviction_guard() } Evicted(remote) | UseRemote { remote, .. } => { Layer::for_evicted(conf, &this, name, remote) @@ -2543,36 +2526,36 @@ impl Timeline { guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1); - if let Some(rtc) = self.remote_client.as_ref() { - rtc.schedule_layer_file_deletion(&needs_cleanup)?; - rtc.schedule_index_upload_for_file_changes()?; - // This barrier orders above DELETEs before any later operations. - // This is critical because code executing after the barrier might - // create again objects with the same key that we just scheduled for deletion. - // For example, if we just scheduled deletion of an image layer "from the future", - // later compaction might run again and re-create the same image layer. - // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn. - // "same" here means same key range and LSN. - // - // Without a barrier between above DELETEs and the re-creation's PUTs, - // the upload queue may execute the PUT first, then the DELETE. - // In our example, we will end up with an IndexPart referencing a non-existent object. - // - // 1. a future image layer is created and uploaded - // 2. ps restart - // 3. the future layer from (1) is deleted during load layer map - // 4. image layer is re-created and uploaded - // 5. deletion queue would like to delete (1) but actually deletes (4) - // 6. delete by name works as expected, but it now deletes the wrong (later) version - // - // See https://github.com/neondatabase/neon/issues/5878 - // - // NB: generation numbers naturally protect against this because they disambiguate - // (1) and (4) - rtc.schedule_barrier()?; - // Tenant::create_timeline will wait for these uploads to happen before returning, or - // on retry. - } + self.remote_client + .schedule_layer_file_deletion(&needs_cleanup)?; + self.remote_client + .schedule_index_upload_for_file_changes()?; + // This barrier orders above DELETEs before any later operations. + // This is critical because code executing after the barrier might + // create again objects with the same key that we just scheduled for deletion. + // For example, if we just scheduled deletion of an image layer "from the future", + // later compaction might run again and re-create the same image layer. + // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn. + // "same" here means same key range and LSN. + // + // Without a barrier between above DELETEs and the re-creation's PUTs, + // the upload queue may execute the PUT first, then the DELETE. 
+ // In our example, we will end up with an IndexPart referencing a non-existent object. + // + // 1. a future image layer is created and uploaded + // 2. ps restart + // 3. the future layer from (1) is deleted during load layer map + // 4. image layer is re-created and uploaded + // 5. deletion queue would like to delete (1) but actually deletes (4) + // 6. delete by name works as expected, but it now deletes the wrong (later) version + // + // See https://github.com/neondatabase/neon/issues/5878 + // + // NB: generation numbers naturally protect against this because they disambiguate + // (1) and (4) + self.remote_client.schedule_barrier()?; + // Tenant::create_timeline will wait for these uploads to happen before returning, or + // on retry. info!( "loaded layer map with {} layers at {}, total physical size: {}", @@ -3025,9 +3008,6 @@ impl Timeline { /// should treat this as a cue to simply skip doing any heatmap uploading /// for this timeline. pub(crate) async fn generate_heatmap(&self) -> Option { - // no point in heatmaps without remote client - let _remote_client = self.remote_client.as_ref()?; - if !self.is_active() { return None; } @@ -3055,10 +3035,7 @@ impl Timeline { // branchpoint in the value in IndexPart::lineage self.ancestor_lsn == lsn || (self.ancestor_lsn == Lsn::INVALID - && self - .remote_client - .as_ref() - .is_some_and(|rtc| rtc.is_previous_ancestor_lsn(lsn))) + && self.remote_client.is_previous_ancestor_lsn(lsn)) } } @@ -3978,29 +3955,23 @@ impl Timeline { x.unwrap() )); - if let Some(remote_client) = &self.remote_client { - for layer in layers_to_upload { - remote_client.schedule_layer_file_upload(layer)?; - } - remote_client.schedule_index_upload_for_metadata_update(&update)?; + for layer in layers_to_upload { + self.remote_client.schedule_layer_file_upload(layer)?; } + self.remote_client + .schedule_index_upload_for_metadata_update(&update)?; Ok(()) } pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> { - if let Some(remote_client) = &self.remote_client { - remote_client - .preserve_initdb_archive( - &self.tenant_shard_id.tenant_id, - &self.timeline_id, - &self.cancel, - ) - .await?; - } else { - bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id); - } - Ok(()) + self.remote_client + .preserve_initdb_archive( + &self.tenant_shard_id.tenant_id, + &self.timeline_id, + &self.cancel, + ) + .await } // Write out the given frozen in-memory layer as a new L0 delta file. 
This L0 file will not be tracked @@ -4361,12 +4332,7 @@ impl Timeline { return; } - if self - .remote_client - .as_ref() - .map(|c| c.is_deleting()) - .unwrap_or(false) - { + if self.remote_client.is_deleting() { // The timeline was created in a deletion-resume state, we don't expect logical size to be populated return; } @@ -4534,9 +4500,8 @@ impl Timeline { // deletion will happen later, the layer file manager calls garbage_collect_on_drop guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics); - if let Some(remote_client) = self.remote_client.as_ref() { - remote_client.schedule_compaction_update(&remove_layers, new_deltas)?; - } + self.remote_client + .schedule_compaction_update(&remove_layers, new_deltas)?; drop_wlock(guard); @@ -4554,9 +4519,8 @@ impl Timeline { let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect(); - if let Some(remote_client) = self.remote_client.as_ref() { - remote_client.schedule_compaction_update(&drop_layers, &upload_layers)?; - } + self.remote_client + .schedule_compaction_update(&drop_layers, &upload_layers)?; Ok(()) } @@ -4566,16 +4530,14 @@ impl Timeline { self: &Arc, new_images: impl IntoIterator, ) -> anyhow::Result<()> { - let Some(remote_client) = &self.remote_client else { - return Ok(()); - }; for layer in new_images { - remote_client.schedule_layer_file_upload(layer)?; + self.remote_client.schedule_layer_file_upload(layer)?; } // should any new image layer been created, not uploading index_part will // result in a mismatch between remote_physical_size and layermap calculated // size, which will fail some tests, but should not be an issue otherwise. - remote_client.schedule_index_upload_for_file_changes()?; + self.remote_client + .schedule_index_upload_for_file_changes()?; Ok(()) } @@ -4861,9 +4823,7 @@ impl Timeline { result.layers_removed = gc_layers.len() as u64; - if let Some(remote_client) = self.remote_client.as_ref() { - remote_client.schedule_gc_update(&gc_layers)?; - } + self.remote_client.schedule_gc_update(&gc_layers)?; guard.finish_gc_timeline(&gc_layers); diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 4226bf431e..ed48b4c9cb 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -295,13 +295,11 @@ impl Timeline { // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage self.rewrite_layers(replace_layers, drop_layers).await?; - if let Some(remote_client) = self.remote_client.as_ref() { - // We wait for all uploads to complete before finishing this compaction stage. This is not - // necessary for correctness, but it simplifies testing, and avoids proceeding with another - // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O - // load. - remote_client.wait_completion().await?; - } + // We wait for all uploads to complete before finishing this compaction stage. This is not + // necessary for correctness, but it simplifies testing, and avoids proceeding with another + // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O + // load. 
+ self.remote_client.wait_completion().await?; Ok(()) } diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index d8701be170..901f5149b3 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -26,19 +26,21 @@ use super::{Timeline, TimelineResources}; /// during attach or pageserver restart. /// See comment in persist_index_part_with_deleted_flag. async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTimelineError> { - if let Some(remote_client) = timeline.remote_client.as_ref() { - match remote_client.persist_index_part_with_deleted_flag().await { - // If we (now, or already) marked it successfully as deleted, we can proceed - Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (), - // Bail out otherwise - // - // AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents - // two tasks from performing the deletion at the same time. The first task - // that starts deletion should run it to completion. - Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_)) - | Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => { - return Err(DeleteTimelineError::Other(anyhow::anyhow!(e))); - } + match timeline + .remote_client + .persist_index_part_with_deleted_flag() + .await + { + // If we (now, or already) marked it successfully as deleted, we can proceed + Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (), + // Bail out otherwise + // + // AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents + // two tasks from performing the deletion at the same time. The first task + // that starts deletion should run it to completion. + Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_)) + | Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => { + return Err(DeleteTimelineError::Other(anyhow::anyhow!(e))); } } Ok(()) @@ -117,11 +119,11 @@ pub(super) async fn delete_local_timeline_directory( /// Removes remote layers and an index file after them. async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> { - if let Some(remote_client) = &timeline.remote_client { - remote_client.delete_all().await.context("delete_all")? - }; - - Ok(()) + timeline + .remote_client + .delete_all() + .await + .context("delete_all") } // This function removs remaining traces of a timeline on disk. @@ -260,7 +262,7 @@ impl DeleteTimelineFlow { tenant: Arc, timeline_id: TimelineId, local_metadata: &TimelineMetadata, - remote_client: Option, + remote_client: RemoteTimelineClient, deletion_queue_client: DeletionQueueClient, ) -> anyhow::Result<()> { // Note: here we even skip populating layer map. Timeline is essentially uninitialized. 
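The `set_deleted_in_remote_index` hunk in `timeline/delete.rs` above is a pure de-nesting: the `if let Some(remote_client)` wrapper is gone and the `match` is re-indented, while the set of accepted and rejected outcomes is unchanged. A compact sketch of the surviving control flow, with stand-in names instead of the real `PersistIndexPartWithDeletedFlagError` type:

```rust
// Stand-in error type; the real code matches on the richer
// `PersistIndexPartWithDeletedFlagError` from the remote timeline client.
#[derive(Debug)]
enum PersistDeletedFlagError {
    AlreadyDeleted,
    AlreadyInProgress,
    Other(String),
}

fn persist_index_part_with_deleted_flag() -> Result<(), PersistDeletedFlagError> {
    // Pretend an earlier attempt already set the deleted flag remotely.
    Err(PersistDeletedFlagError::AlreadyDeleted)
}

/// Sketch of the control flow `set_deleted_in_remote_index` keeps after the
/// rewrite: "already deleted" counts as success, everything else aborts.
fn set_deleted_in_remote_index() -> Result<(), String> {
    match persist_index_part_with_deleted_flag() {
        Ok(()) | Err(PersistDeletedFlagError::AlreadyDeleted) => Ok(()),
        Err(e @ PersistDeletedFlagError::AlreadyInProgress)
        | Err(e @ PersistDeletedFlagError::Other(_)) => Err(format!("{e:?}")),
    }
}

fn main() {
    // An already-deleted index is not an error for the deletion flow.
    assert!(set_deleted_in_remote_index().is_ok());
}
```

Keeping "already deleted" on the success path is what lets the deletion flow resume after a pageserver restart without treating the pre-existing flag as a failure.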
diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 9471ba860f..7f59758c87 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -70,10 +70,6 @@ pub(super) async fn prepare( ) -> Result<(completion::Completion, PreparedTimelineDetach), Error> { use Error::*; - if detached.remote_client.as_ref().is_none() { - unimplemented!("no new code for running without remote storage"); - } - let Some((ancestor, ancestor_lsn)) = detached .ancestor_timeline .as_ref() @@ -315,8 +311,6 @@ async fn upload_rewritten_layer( // FIXME: better shuttingdown error target .remote_client - .as_ref() - .unwrap() .upload_layer_file(&copied, cancel) .await .map_err(UploadRewritten)?; @@ -406,8 +400,6 @@ async fn remote_copy( // FIXME: better shuttingdown error adoptee .remote_client - .as_ref() - .unwrap() .copy_timeline_layer(adopted, &owned, cancel) .await .map(move |()| owned) @@ -421,11 +413,6 @@ pub(super) async fn complete( prepared: PreparedTimelineDetach, _ctx: &RequestContext, ) -> Result, anyhow::Error> { - let rtc = detached - .remote_client - .as_ref() - .expect("has to have a remote timeline client for timeline ancestor detach"); - let PreparedTimelineDetach { layers } = prepared; let ancestor = detached @@ -442,11 +429,13 @@ pub(super) async fn complete( // // this is not perfect, but it avoids us a retry happening after a compaction or gc on restart // which could give us a completely wrong layer combination. - rtc.schedule_adding_existing_layers_to_index_detach_and_wait( - &layers, - (ancestor.timeline_id, ancestor_lsn), - ) - .await?; + detached + .remote_client + .schedule_adding_existing_layers_to_index_detach_and_wait( + &layers, + (ancestor.timeline_id, ancestor_lsn), + ) + .await?; let mut tasks = tokio::task::JoinSet::new(); @@ -491,8 +480,6 @@ pub(super) async fn complete( async move { let res = timeline .remote_client - .as_ref() - .expect("reparented has to have remote client because detached has one") .schedule_reparenting_and_wait(&new_parent) .await; diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 3567761b9a..8a8c38d0ce 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -23,7 +23,7 @@ use std::{ use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, info_span, instrument, warn, Instrument}; +use tracing::{debug, info, info_span, instrument, warn, Instrument}; use crate::{ context::{DownloadBehavior, RequestContext}, @@ -211,11 +211,6 @@ impl Timeline { // So, we just need to deal with this. 
- if self.remote_client.is_none() { - error!("no remote storage configured, cannot evict layers"); - return ControlFlow::Continue(()); - } - let mut js = tokio::task::JoinSet::new(); { let guard = self.layers.read().await; diff --git a/pageserver/src/tenant/timeline/init.rs b/pageserver/src/tenant/timeline/init.rs index 66aa765015..feadc79e5e 100644 --- a/pageserver/src/tenant/timeline/init.rs +++ b/pageserver/src/tenant/timeline/init.rs @@ -9,7 +9,6 @@ use crate::{ storage_layer::LayerName, Generation, }, - METADATA_FILE_NAME, }; use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; @@ -27,8 +26,6 @@ pub(super) enum Discovered { Temporary(String), /// Temporary on-demand download files, should be removed TemporaryDownload(String), - /// "metadata" file we persist locally and include in `index_part.json` - Metadata, /// Backup file from previously future layers IgnoredBackup, /// Unrecognized, warn about these @@ -49,9 +46,7 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result { - if file_name == METADATA_FILE_NAME { - Discovered::Metadata - } else if file_name.ends_with(".old") { + if file_name.ends_with(".old") { // ignore these Discovered::IgnoredBackup } else if remote_timeline_client::is_temp_download_file(direntry.path()) { diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 9776d4ce88..3decea0c6d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -153,10 +153,7 @@ impl PostgresRedoManager { process: self .redo_process .get() - .map(|p| WalRedoManagerProcessStatus { - pid: p.id(), - kind: std::borrow::Cow::Borrowed(p.kind().into()), - }), + .map(|p| WalRedoManagerProcessStatus { pid: p.id() }), } } } diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index ad6b4e5fe9..02c9c04bf1 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -1,7 +1,10 @@ +/// Layer of indirection previously used to support multiple implementations. 
+/// Subject to removal: use std::time::Duration; use bytes::Bytes; use pageserver_api::{reltag::RelTag, shard::TenantShardId}; +use tracing::warn; use utils::lsn::Lsn; use crate::{config::PageServerConf, walrecord::NeonWalRecord}; @@ -12,7 +15,6 @@ mod protocol; mod process_impl { pub(super) mod process_async; - pub(super) mod process_std; } #[derive( @@ -34,10 +36,7 @@ pub enum Kind { Async, } -pub(crate) enum Process { - Sync(process_impl::process_std::WalRedoProcess), - Async(process_impl::process_async::WalRedoProcess), -} +pub(crate) struct Process(process_impl::process_async::WalRedoProcess); impl Process { #[inline(always)] @@ -46,18 +45,17 @@ impl Process { tenant_shard_id: TenantShardId, pg_version: u32, ) -> anyhow::Result { - Ok(match conf.walredo_process_kind { - Kind::Sync => Self::Sync(process_impl::process_std::WalRedoProcess::launch( - conf, - tenant_shard_id, - pg_version, - )?), - Kind::Async => Self::Async(process_impl::process_async::WalRedoProcess::launch( - conf, - tenant_shard_id, - pg_version, - )?), - }) + if conf.walredo_process_kind != Kind::Async { + warn!( + configured = %conf.walredo_process_kind, + "the walredo_process_kind setting has been turned into a no-op, using async implementation" + ); + } + Ok(Self(process_impl::process_async::WalRedoProcess::launch( + conf, + tenant_shard_id, + pg_version, + )?)) } #[inline(always)] @@ -69,29 +67,12 @@ impl Process { records: &[(Lsn, NeonWalRecord)], wal_redo_timeout: Duration, ) -> anyhow::Result { - match self { - Process::Sync(p) => { - p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout) - .await - } - Process::Async(p) => { - p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout) - .await - } - } + self.0 + .apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout) + .await } pub(crate) fn id(&self) -> u32 { - match self { - Process::Sync(p) => p.id(), - Process::Async(p) => p.id(), - } - } - - pub(crate) fn kind(&self) -> Kind { - match self { - Process::Sync(_) => Kind::Sync, - Process::Async(_) => Kind::Async, - } + self.0.id() } } diff --git a/pageserver/src/walredo/process/process_impl/process_std.rs b/pageserver/src/walredo/process/process_impl/process_std.rs deleted file mode 100644 index e7a6c263c9..0000000000 --- a/pageserver/src/walredo/process/process_impl/process_std.rs +++ /dev/null @@ -1,405 +0,0 @@ -use self::no_leak_child::NoLeakChild; -use crate::{ - config::PageServerConf, - metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER}, - walrecord::NeonWalRecord, - walredo::process::{no_leak_child, protocol}, -}; -use anyhow::Context; -use bytes::Bytes; -use nix::poll::{PollFd, PollFlags}; -use pageserver_api::{reltag::RelTag, shard::TenantShardId}; -use postgres_ffi::BLCKSZ; -use std::os::fd::AsRawFd; -#[cfg(feature = "testing")] -use std::sync::atomic::AtomicUsize; -use std::{ - collections::VecDeque, - io::{Read, Write}, - process::{ChildStdin, ChildStdout, Command, Stdio}, - sync::{Mutex, MutexGuard}, - time::Duration, -}; -use tracing::{debug, error, instrument, Instrument}; -use utils::{lsn::Lsn, nonblock::set_nonblock}; - -pub struct WalRedoProcess { - #[allow(dead_code)] - conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, - // Some() on construction, only becomes None on Drop. - child: Option, - stdout: Mutex, - stdin: Mutex, - /// Counter to separate same sized walredo inputs failing at the same millisecond. 
- #[cfg(feature = "testing")] - dump_sequence: AtomicUsize, -} - -struct ProcessInput { - stdin: ChildStdin, - n_requests: usize, -} - -struct ProcessOutput { - stdout: ChildStdout, - pending_responses: VecDeque>, - n_processed_responses: usize, -} - -impl WalRedoProcess { - // - // Start postgres binary in special WAL redo mode. - // - #[instrument(skip_all,fields(pg_version=pg_version))] - pub(crate) fn launch( - conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, - pg_version: u32, - ) -> anyhow::Result { - crate::span::debug_assert_current_span_has_tenant_id(); - - let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible. - let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?; - - use no_leak_child::NoLeakChildCommandExt; - // Start postgres itself - let child = Command::new(pg_bin_dir_path.join("postgres")) - // the first arg must be --wal-redo so the child process enters into walredo mode - .arg("--wal-redo") - // the child doesn't process this arg, but, having it in the argv helps indentify the - // walredo process for a particular tenant when debugging a pagserver - .args(["--tenant-shard-id", &format!("{tenant_shard_id}")]) - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .env_clear() - .env("LD_LIBRARY_PATH", &pg_lib_dir_path) - .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) - // NB: The redo process is not trusted after we sent it the first - // walredo work. Before that, it is trusted. Specifically, we trust - // it to - // 1. close all file descriptors except stdin, stdout, stderr because - // pageserver might not be 100% diligent in setting FD_CLOEXEC on all - // the files it opens, and - // 2. to use seccomp to sandbox itself before processing the first - // walredo request. - .spawn_no_leak_child(tenant_shard_id) - .context("spawn process")?; - WAL_REDO_PROCESS_COUNTERS.started.inc(); - let mut child = scopeguard::guard(child, |child| { - error!("killing wal-redo-postgres process due to a problem during launch"); - child.kill_and_wait(WalRedoKillCause::Startup); - }); - - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - let stderr = tokio::process::ChildStderr::from_std(stderr) - .context("convert to tokio::ChildStderr")?; - macro_rules! set_nonblock_or_log_err { - ($file:ident) => {{ - let res = set_nonblock($file.as_raw_fd()); - if let Err(e) = &res { - error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed"); - } - res - }}; - } - set_nonblock_or_log_err!(stdin)?; - set_nonblock_or_log_err!(stdout)?; - - // all fallible operations post-spawn are complete, so get rid of the guard - let child = scopeguard::ScopeGuard::into_inner(child); - - tokio::spawn( - async move { - scopeguard::defer! { - debug!("wal-redo-postgres stderr_logger_task finished"); - crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc(); - } - debug!("wal-redo-postgres stderr_logger_task started"); - crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc(); - - use tokio::io::AsyncBufReadExt; - let mut stderr_lines = tokio::io::BufReader::new(stderr); - let mut buf = Vec::new(); - let res = loop { - buf.clear(); - // TODO we don't trust the process to cap its stderr length. - // Currently it can do unbounded Vec allocation. 
- match stderr_lines.read_until(b'\n', &mut buf).await { - Ok(0) => break Ok(()), // eof - Ok(num_bytes) => { - let output = String::from_utf8_lossy(&buf[..num_bytes]); - error!(%output, "received output"); - } - Err(e) => { - break Err(e); - } - } - }; - match res { - Ok(()) => (), - Err(e) => { - error!(error=?e, "failed to read from walredo stderr"); - } - } - }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version)) - ); - - Ok(Self { - conf, - tenant_shard_id, - child: Some(child), - stdin: Mutex::new(ProcessInput { - stdin, - n_requests: 0, - }), - stdout: Mutex::new(ProcessOutput { - stdout, - pending_responses: VecDeque::new(), - n_processed_responses: 0, - }), - #[cfg(feature = "testing")] - dump_sequence: AtomicUsize::default(), - }) - } - - pub(crate) fn id(&self) -> u32 { - self.child - .as_ref() - .expect("must not call this during Drop") - .id() - } - - // Apply given WAL records ('records') over an old page image. Returns - // new page image. - // - #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))] - pub(crate) async fn apply_wal_records( - &self, - rel: RelTag, - blknum: u32, - base_img: &Option, - records: &[(Lsn, NeonWalRecord)], - wal_redo_timeout: Duration, - ) -> anyhow::Result { - let tag = protocol::BufferTag { rel, blknum }; - let input = self.stdin.lock().unwrap(); - - // Serialize all the messages to send the WAL redo process first. - // - // This could be problematic if there are millions of records to replay, - // but in practice the number of records is usually so small that it doesn't - // matter, and it's better to keep this code simple. - // - // Most requests start with a before-image with BLCKSZ bytes, followed by - // by some other WAL records. Start with a buffer that can hold that - // comfortably. - let mut writebuf: Vec = Vec::with_capacity((BLCKSZ as usize) * 3); - protocol::build_begin_redo_for_block_msg(tag, &mut writebuf); - if let Some(img) = base_img { - protocol::build_push_page_msg(tag, img, &mut writebuf); - } - for (lsn, rec) in records.iter() { - if let NeonWalRecord::Postgres { - will_init: _, - rec: postgres_rec, - } = rec - { - protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf); - } else { - anyhow::bail!("tried to pass neon wal record to postgres WAL redo"); - } - } - protocol::build_get_page_msg(tag, &mut writebuf); - WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); - - let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout); - - if res.is_err() { - // not all of these can be caused by this particular input, however these are so rare - // in tests so capture all. - self.record_and_log(&writebuf); - } - - res - } - - fn apply_wal_records0( - &self, - writebuf: &[u8], - input: MutexGuard, - wal_redo_timeout: Duration, - ) -> anyhow::Result { - let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small. - let mut nwrite = 0usize; - - while nwrite < writebuf.len() { - let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)]; - let n = loop { - match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) { - Err(nix::errno::Errno::EINTR) => continue, - res => break res, - } - }?; - - if n == 0 { - anyhow::bail!("WAL redo timed out"); - } - - // If 'stdin' is writeable, do write. 
- let in_revents = stdin_pollfds[0].revents().unwrap(); - if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { - nwrite += proc.stdin.write(&writebuf[nwrite..])?; - } - if in_revents.contains(PollFlags::POLLHUP) { - // We still have more data to write, but the process closed the pipe. - anyhow::bail!("WAL redo process closed its stdin unexpectedly"); - } - } - let request_no = proc.n_requests; - proc.n_requests += 1; - drop(proc); - - // To improve walredo performance we separate sending requests and receiving - // responses. Them are protected by different mutexes (output and input). - // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process - // then there is not warranty that T1 will first granted output mutex lock. - // To address this issue we maintain number of sent requests, number of processed - // responses and ring buffer with pending responses. After sending response - // (under input mutex), threads remembers request number. Then it releases - // input mutex, locks output mutex and fetch in ring buffer all responses until - // its stored request number. The it takes correspondent element from - // pending responses ring buffer and truncate all empty elements from the front, - // advancing processed responses number. - - let mut output = self.stdout.lock().unwrap(); - let n_processed_responses = output.n_processed_responses; - while n_processed_responses + output.pending_responses.len() <= request_no { - // We expect the WAL redo process to respond with an 8k page image. We read it - // into this buffer. - let mut resultbuf = vec![0; BLCKSZ.into()]; - let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far - while nresult < BLCKSZ.into() { - let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)]; - // We do two things simultaneously: reading response from stdout - // and forward any logging information that the child writes to its stderr to the page server's log. - let n = loop { - match nix::poll::poll( - &mut stdout_pollfds[..], - wal_redo_timeout.as_millis() as i32, - ) { - Err(nix::errno::Errno::EINTR) => continue, - res => break res, - } - }?; - - if n == 0 { - anyhow::bail!("WAL redo timed out"); - } - - // If we have some data in stdout, read it to the result buffer. - let out_revents = stdout_pollfds[0].revents().unwrap(); - if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - nresult += output.stdout.read(&mut resultbuf[nresult..])?; - } - if out_revents.contains(PollFlags::POLLHUP) { - anyhow::bail!("WAL redo process closed its stdout unexpectedly"); - } - } - output - .pending_responses - .push_back(Some(Bytes::from(resultbuf))); - } - // Replace our request's response with None in `pending_responses`. - // Then make space in the ring buffer by clearing out any seqence of contiguous - // `None`'s from the front of `pending_responses`. 
- // NB: We can't pop_front() because other requests' responses because another - // requester might have grabbed the output mutex before us: - // T1: grab input mutex - // T1: send request_no 23 - // T1: release input mutex - // T2: grab input mutex - // T2: send request_no 24 - // T2: release input mutex - // T2: grab output mutex - // T2: n_processed_responses + output.pending_responses.len() <= request_no - // 23 0 24 - // T2: enters poll loop that reads stdout - // T2: put response for 23 into pending_responses - // T2: put response for 24 into pending_resposnes - // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back - // T2: takes its response_24 - // pending_responses now looks like this: Front Some(response_23) None Back - // T2: does the while loop below - // pending_responses now looks like this: Front Some(response_23) None Back - // T2: releases output mutex - // T1: grabs output mutex - // T1: n_processed_responses + output.pending_responses.len() > request_no - // 23 2 23 - // T1: skips poll loop that reads stdout - // T1: takes its response_23 - // pending_responses now looks like this: Front None None Back - // T2: does the while loop below - // pending_responses now looks like this: Front Back - // n_processed_responses now has value 25 - let res = output.pending_responses[request_no - n_processed_responses] - .take() - .expect("we own this request_no, nobody else is supposed to take it"); - while let Some(front) = output.pending_responses.front() { - if front.is_none() { - output.pending_responses.pop_front(); - output.n_processed_responses += 1; - } else { - break; - } - } - Ok(res) - } - - #[cfg(feature = "testing")] - fn record_and_log(&self, writebuf: &[u8]) { - use std::sync::atomic::Ordering; - - let millis = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis(); - - let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed); - - // these files will be collected to an allure report - let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len()); - - let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename); - - let res = std::fs::OpenOptions::new() - .write(true) - .create_new(true) - .read(true) - .open(path) - .and_then(|mut f| f.write_all(writebuf)); - - // trip up allowed_errors - if let Err(e) = res { - tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}"); - } else { - tracing::error!(filename, "erroring walredo input saved"); - } - } - - #[cfg(not(feature = "testing"))] - fn record_and_log(&self, _: &[u8]) {} -} - -impl Drop for WalRedoProcess { - fn drop(&mut self) { - self.child - .take() - .expect("we only do this once") - .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop); - // no way to wait for stderr_logger_task from Drop because that is async only - } -} diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 62a4b974a3..a6fd4792dd 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -59,6 +59,7 @@ from fixtures.pageserver.http import PageserverHttpClient from fixtures.pageserver.utils import ( wait_for_last_record_lsn, wait_for_upload, + wait_for_upload_queue_empty, ) from fixtures.pg_version import PgVersion from fixtures.port_distributor import PortDistributor @@ -79,6 +80,7 @@ from fixtures.utils import ( allure_attach_from_dir, assert_no_errors, get_self_dir, + print_gc_result, 
    subprocess_capture,
    wait_until,
 )
@@ -1390,8 +1392,8 @@ def neon_env_builder(
     test_overlay_dir: Path,
     top_output_dir: Path,
     pageserver_virtual_file_io_engine: str,
-    pageserver_aux_file_policy: Optional[AuxFileStore] = None,
-    pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None,
+    pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
+    pageserver_aux_file_policy: Optional[AuxFileStore],
 ) -> Iterator[NeonEnvBuilder]:
     """
     Fixture to create a Neon environment for test.
@@ -4419,3 +4421,79 @@ def parse_project_git_version_output(s: str) -> str:
             return commit
     raise ValueError(f"unable to parse --version output: '{s}'")
+
+
+def generate_uploads_and_deletions(
+    env: NeonEnv,
+    *,
+    init: bool = True,
+    tenant_id: Optional[TenantId] = None,
+    timeline_id: Optional[TimelineId] = None,
+    data: Optional[str] = None,
+    pageserver: NeonPageserver,
+):
+    """
+    Using the environment's default tenant + timeline, generate a load pattern
+    that results in some uploads and some deletions to remote storage.
+    """
+
+    if tenant_id is None:
+        tenant_id = env.initial_tenant
+    assert tenant_id is not None
+
+    if timeline_id is None:
+        timeline_id = env.initial_timeline
+    assert timeline_id is not None
+
+    ps_http = pageserver.http_client()
+
+    with env.endpoints.create_start(
+        "main", tenant_id=tenant_id, pageserver_id=pageserver.id
+    ) as endpoint:
+        if init:
+            endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
+            last_flush_lsn_upload(
+                env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
+            )
+
+        def churn(data):
+            endpoint.safe_psql_many(
+                [
+                    f"""
+            INSERT INTO foo (id, val)
+            SELECT g, '{data}'
+            FROM generate_series(1, 200) g
+            ON CONFLICT (id) DO UPDATE
+            SET val = EXCLUDED.val
+            """,
+                    # to ensure that GC can actually remove some layers
+                    "VACUUM foo",
+                ]
+            )
+            assert tenant_id is not None
+            assert timeline_id is not None
+            # We are waiting for uploads as well as local flush, in order to avoid leaving the system
+            # in a state where there are "future layers" in remote storage that will generate deletions
+            # after a restart.
+            last_flush_lsn_upload(
+                env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
+            )
+
+        # Compaction should generate some GC-eligible layers
+        for i in range(0, 2):
+            churn(f"{i if data is None else data}")
+
+        gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
+        print_gc_result(gc_result)
+        assert gc_result["layers_removed"] > 0
+
+        # Stop endpoint and flush all data to pageserver, then checkpoint it: this
+        # ensures that the pageserver is in a fully idle state: there will be no more
+        # background ingest, no more uploads pending, and therefore no non-determinism
+        # in subsequent actions like pageserver restarts.
+ final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id) + ps_http.timeline_checkpoint(tenant_id, timeline_id) + # Finish uploads + wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn) + # Finish all remote writes (including deletions) + wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) diff --git a/test_runner/performance/pageserver/pagebench/test_ondemand_download_churn.py b/test_runner/performance/pageserver/pagebench/test_ondemand_download_churn.py new file mode 100644 index 0000000000..644c1f559b --- /dev/null +++ b/test_runner/performance/pageserver/pagebench/test_ondemand_download_churn.py @@ -0,0 +1,175 @@ +import json +from pathlib import Path +from typing import Any, Dict, Tuple + +import pytest +from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn +from fixtures.pageserver.utils import wait_for_upload_queue_empty +from fixtures.remote_storage import s3_storage +from fixtures.utils import humantime_to_ms + + +@pytest.mark.parametrize("duration", [30]) +@pytest.mark.parametrize("io_engine", ["tokio-epoll-uring", "std-fs"]) +@pytest.mark.parametrize("concurrency_per_target", [1, 10, 100]) +@pytest.mark.timeout(1000) +def test_download_churn( + neon_env_builder: NeonEnvBuilder, + zenbenchmark: NeonBenchmarker, + pg_bin: PgBin, + io_engine: str, + concurrency_per_target: int, + duration: int, +): + def record(metric, **kwargs): + zenbenchmark.record(metric_name=f"pageserver_ondemand_download_churn.{metric}", **kwargs) + + params: Dict[str, Tuple[Any, Dict[str, Any]]] = {} + + # params from fixtures + params.update( + { + # we don't capture `duration`, but instead use the `runtime` output field from pagebench + } + ) + + # configure cache sizes like in prod + page_cache_size = 16384 + max_file_descriptors = 500000 + neon_env_builder.pageserver_config_override = ( + f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}" + ) + params.update( + { + "pageserver_config_override.page_cache_size": ( + page_cache_size * 8192, + {"unit": "byte"}, + ), + "pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}), + } + ) + + for param, (value, kwargs) in params.items(): + record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs) + + # Setup env + env = setup_env(neon_env_builder, pg_bin) + env.pageserver.allowed_errors.append( + f".*path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing" + ) + + run_benchmark(env, pg_bin, record, io_engine, concurrency_per_target, duration) + + +def setup_env(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): + remote_storage_kind = s3_storage() + neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind) + + # We configure tenant conf such that SQL query below produces a lot of layers. + # We don't care what's in the layers really, we just care that layers are created. 
+ bytes_per_layer = 10 * (1024**2) + env = neon_env_builder.init_start( + initial_tenant_conf={ + "pitr_interval": "1000d", # let's not make it get in the way + "gc_period": "0s", # disable periodic gc to avoid noise + "compaction_period": "0s", # disable L0=>L1 compaction + "checkpoint_timeout": "10years", # rely solely on checkpoint_distance + "checkpoint_distance": bytes_per_layer, # 10M instead of 256M to create more smaller layers + "image_creation_threshold": 100000, # don't create image layers ever + } + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + client = env.pageserver.http_client() + + with env.endpoints.create_start("main", tenant_id=tenant_id) as ep: + ep.safe_psql("CREATE TABLE data (random_text text)") + bytes_per_row = 512 # make big enough so WAL record size doesn't dominate + desired_layers = 300 + desired_bytes = bytes_per_layer * desired_layers + nrows = desired_bytes / bytes_per_row + ep.safe_psql( + f"INSERT INTO data SELECT lpad(i::text, {bytes_per_row}, '0') FROM generate_series(1, {int(nrows)}) as i", + options="-c statement_timeout=0", + ) + wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id) + # TODO: this is a bit imprecise, there could be frozen layers being written out that we don't observe here + wait_for_upload_queue_empty(client, tenant_id, timeline_id) + + return env + + +def run_benchmark( + env: NeonEnv, + pg_bin: PgBin, + record, + io_engine: str, + concurrency_per_target: int, + duration_secs: int, +): + ps_http = env.pageserver.http_client() + cmd = [ + str(env.neon_binpath / "pagebench"), + "ondemand-download-churn", + "--mgmt-api-endpoint", + ps_http.base_url, + "--runtime", + f"{duration_secs}s", + "--set-io-engine", + f"{io_engine}", + "--concurrency-per-target", + f"{concurrency_per_target}", + # don't specify the targets explicitly, let pagebench auto-discover them + ] + + log.info(f"command: {' '.join(cmd)}") + basepath = pg_bin.run_capture(cmd, with_command_header=False) + results_path = Path(basepath + ".stdout") + log.info(f"Benchmark results at: {results_path}") + + with open(results_path, "r") as f: + results = json.load(f) + log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}") + + metric = "downloads_count" + record( + metric, + metric_value=results[metric], + unit="", + report=MetricReport.HIGHER_IS_BETTER, + ) + + metric = "downloads_bytes" + record( + metric, + metric_value=results[metric], + unit="byte", + report=MetricReport.HIGHER_IS_BETTER, + ) + + metric = "evictions_count" + record( + metric, + metric_value=results[metric], + unit="", + report=MetricReport.HIGHER_IS_BETTER, + ) + + metric = "timeline_restarts" + record( + metric, + metric_value=results[metric], + unit="", + report=MetricReport.LOWER_IS_BETTER, + ) + + metric = "runtime" + record( + metric, + metric_value=humantime_to_ms(results[metric]) / 1000, + unit="s", + report=MetricReport.TEST_PARAM, + ) diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 7d4e101189..61afd820ca 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -56,14 +56,8 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder): (tenant0, timeline0, pg0) = tenant_timelines[0] log.info(f"Timeline {tenant0}/{timeline0} is left intact") - (tenant1, timeline1, pg1) = tenant_timelines[1] - metadata_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/metadata" - with open(metadata_path, "w") as f: - 
f.write("overwritten with garbage!") - log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled") - - (tenant2, timeline2, pg2) = tenant_timelines[2] - timeline_path = f"{env.pageserver.workdir}/tenants/{tenant2}/timelines/{timeline2}/" + (tenant1, timeline1, pg1) = tenant_timelines[2] + timeline_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/" for filename in os.listdir(timeline_path): if filename.startswith("00000"): # Looks like a layer file. Corrupt it @@ -72,7 +66,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder): with open(p, "wb") as f: f.truncate(0) f.truncate(size) - log.info(f"Timeline {tenant2}/{timeline2} got its local layer files spoiled") + log.info(f"Timeline {tenant1}/{timeline1} got its local layer files spoiled") env.pageserver.start() @@ -80,19 +74,15 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder): pg0.start() assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100 - # Tenant with corrupt local metadata works: remote storage is authoritative for metadata - pg1.start() - assert pg1.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100 - # Second timeline will fail during basebackup, because the local layer file is corrupt. # It will fail when we try to read (and reconstruct) a page from it, ergo the error message. # (We don't check layer file contents on startup, when loading the timeline) # # This will change when we implement checksums for layers with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err: - pg2.start() + pg1.start() log.info( - f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}" + f"As expected, compute startup failed for timeline {tenant1}/{timeline1} with corrupt layers: {err}" ) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index 43a3323462..93a16620a3 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -1,10 +1,12 @@ +import enum import json import os from typing import Optional import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions +from fixtures.pageserver.http import PageserverApiException from fixtures.workload import Workload AGGRESIVE_COMPACTION_TENANT_CONF = { @@ -190,3 +192,61 @@ def test_sharding_compaction( # Assert that everything is still readable workload.validate() + + +class CompactionAlgorithm(str, enum.Enum): + LEGACY = "Legacy" + TIERED = "Tiered" + + +@pytest.mark.parametrize( + "compaction_algorithm", [CompactionAlgorithm.LEGACY, CompactionAlgorithm.TIERED] +) +def test_uploads_and_deletions( + neon_env_builder: NeonEnvBuilder, + compaction_algorithm: CompactionAlgorithm, +): + """ + :param compaction_algorithm: the compaction algorithm to use. + """ + + tenant_conf = { + # small checkpointing and compaction targets to ensure we generate many upload operations + "checkpoint_distance": f"{128 * 1024}", + "compaction_threshold": "1", + "compaction_target_size": f"{128 * 1024}", + # no PITR horizon, we specify the horizon when we request on-demand GC + "pitr_interval": "0s", + # disable background compaction and GC. We invoke it manually when we want it to happen. 
+ "gc_period": "0s", + "compaction_period": "0s", + # create image layers eagerly, so that GC can remove some layers + "image_creation_threshold": "1", + "image_layer_creation_check_threshold": "0", + "compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}), + } + env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf) + + # TODO remove these allowed errors + # https://github.com/neondatabase/neon/issues/7707 + # https://github.com/neondatabase/neon/issues/7759 + allowed_errors = [ + ".*duplicated L1 layer.*", + ".*delta layer created with.*duplicate values.*", + ".*assertion failed: self.lsn_range.start <= lsn.*", + ".*HTTP request handler task panicked: task.*panicked.*", + ] + if compaction_algorithm == CompactionAlgorithm.TIERED: + env.pageserver.allowed_errors.extend(allowed_errors) + + try: + generate_uploads_and_deletions(env, pageserver=env.pageserver) + except PageserverApiException as e: + log.info(f"Obtained PageserverApiException: {e}") + + # The errors occur flakily and no error is ensured to occur, + # however at least one of them occurs. + if compaction_algorithm == CompactionAlgorithm.TIERED: + found_allowed_error = any(env.pageserver.log_contains(e) for e in allowed_errors) + if not found_allowed_error: + raise Exception("None of the allowed_errors occured in the log") diff --git a/test_runner/regress/test_pageserver_config.py b/test_runner/regress/test_pageserver_config.py deleted file mode 100644 index c04348b488..0000000000 --- a/test_runner/regress/test_pageserver_config.py +++ /dev/null @@ -1,35 +0,0 @@ -import pytest -from fixtures.neon_fixtures import ( - NeonEnvBuilder, - last_flush_lsn_upload, -) - - -@pytest.mark.parametrize("kind", ["sync", "async"]) -def test_walredo_process_kind_config(neon_env_builder: NeonEnvBuilder, kind: str): - neon_env_builder.pageserver_config_override = f"walredo_process_kind = '{kind}'" - # ensure it starts - env = neon_env_builder.init_start() - # ensure the metric is set - ps_http = env.pageserver.http_client() - metrics = ps_http.get_metrics() - samples = metrics.query_all("pageserver_wal_redo_process_kind") - assert [(s.labels, s.value) for s in samples] == [({"kind": kind}, 1)] - # ensure default tenant's config kind matches - # => write some data to force-spawn walredo - ep = env.endpoints.create_start("main") - with ep.connect() as conn: - with conn.cursor() as cur: - cur.execute("create table foo(bar text)") - cur.execute("insert into foo select from generate_series(1, 100)") - last_flush_lsn_upload(env, ep, env.initial_tenant, env.initial_timeline) - ep.stop() - ep.start() - with ep.connect() as conn: - with conn.cursor() as cur: - cur.execute("select count(*) from foo") - [(count,)] = cur.fetchall() - assert count == 100 - - status = ps_http.tenant_status(env.initial_tenant) - assert status["walredo"]["process"]["kind"] == kind diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 4fdc5852f5..9b97254410 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -21,11 +21,9 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, - NeonPageserver, PgBin, S3Scrubber, - flush_ep_to_pageserver, - last_flush_lsn_upload, + generate_uploads_and_deletions, ) from fixtures.pageserver.http import PageserverApiException from fixtures.pageserver.utils import ( @@ -33,12 +31,11 @@ from fixtures.pageserver.utils import ( list_prefix, 
wait_for_last_record_lsn, wait_for_upload, - wait_for_upload_queue_empty, ) from fixtures.remote_storage import ( RemoteStorageKind, ) -from fixtures.utils import print_gc_result, wait_until +from fixtures.utils import wait_until from fixtures.workload import Workload # A tenant configuration that is convenient for generating uploads and deletions @@ -59,82 +56,6 @@ TENANT_CONF = { } -def generate_uploads_and_deletions( - env: NeonEnv, - *, - init: bool = True, - tenant_id: Optional[TenantId] = None, - timeline_id: Optional[TimelineId] = None, - data: Optional[str] = None, - pageserver: NeonPageserver, -): - """ - Using the environment's default tenant + timeline, generate a load pattern - that results in some uploads and some deletions to remote storage. - """ - - if tenant_id is None: - tenant_id = env.initial_tenant - assert tenant_id is not None - - if timeline_id is None: - timeline_id = env.initial_timeline - assert timeline_id is not None - - ps_http = pageserver.http_client() - - with env.endpoints.create_start( - "main", tenant_id=tenant_id, pageserver_id=pageserver.id - ) as endpoint: - if init: - endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)") - last_flush_lsn_upload( - env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id - ) - - def churn(data): - endpoint.safe_psql_many( - [ - f""" - INSERT INTO foo (id, val) - SELECT g, '{data}' - FROM generate_series(1, 200) g - ON CONFLICT (id) DO UPDATE - SET val = EXCLUDED.val - """, - # to ensure that GC can actually remove some layers - "VACUUM foo", - ] - ) - assert tenant_id is not None - assert timeline_id is not None - # We are waiting for uploads as well as local flush, in order to avoid leaving the system - # in a state where there are "future layers" in remote storage that will generate deletions - # after a restart. - last_flush_lsn_upload( - env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id - ) - - # Compaction should generate some GC-elegible layers - for i in range(0, 2): - churn(f"{i if data is None else data}") - - gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0) - print_gc_result(gc_result) - assert gc_result["layers_removed"] > 0 - - # Stop endpoint and flush all data to pageserver, then checkpoint it: this - # ensures that the pageserver is in a fully idle state: there will be no more - # background ingest, no more uploads pending, and therefore no non-determinism - # in subsequent actions like pageserver restarts. - final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id) - ps_http.timeline_checkpoint(tenant_id, timeline_id) - # Finish uploads - wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn) - # Finish all remote writes (including deletions) - wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) - - def read_all( env: NeonEnv, tenant_id: Optional[TenantId] = None, timeline_id: Optional[TimelineId] = None ): diff --git a/vm-image-spec.yaml b/vm-image-spec.yaml index e9d983eba3..fa7cd014bf 100644 --- a/vm-image-spec.yaml +++ b/vm-image-spec.yaml @@ -293,6 +293,16 @@ files: values: [checkpoints_timed] query: | SELECT checkpoints_timed FROM pg_stat_bgwriter; + + # Number of slots is limited by max_replication_slots, so collecting position for all of them shouldn't be bad. 
+ - metric_name: logical_slot_restart_lsn + type: gauge + help: 'restart_lsn of logical slots' + key_labels: + - slot_name + values: [restart_lsn] + query: | + select slot_name, restart_lsn from pg_replication_slots where slot_type = 'logical'; - filename: neon_collector_autoscaling.yml content: | collector_name: neon_collector_autoscaling
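
The `logical_slot_restart_lsn` collector added above is driven by a single SQL query, so it can be sanity-checked end to end with the same test fixtures this patch touches. The sketch below is illustrative only and not part of the change set: the test name, the `sketch_slot` slot, the `test_decoding` output plugin, and the assumption that the endpoint runs with `wal_level=logical` are not taken from this diff, while `init_start`, `endpoints.create_start`, and `safe_psql` are fixture APIs that appear elsewhere in it.

```python
# Illustrative sketch only (not part of this patch): runs the same query that the
# new `logical_slot_restart_lsn` collector uses and checks that it returns one row
# per logical slot with a non-NULL restart_lsn.
from fixtures.neon_fixtures import NeonEnvBuilder


def test_logical_slot_restart_lsn_query(neon_env_builder: NeonEnvBuilder):  # hypothetical test
    env = neon_env_builder.init_start()
    with env.endpoints.create_start("main") as endpoint:
        # Create a logical slot so the collector query has something to report.
        # Assumes the endpoint runs with wal_level=logical and that the standard
        # `test_decoding` plugin is available; otherwise this call fails.
        endpoint.safe_psql(
            "SELECT pg_create_logical_replication_slot('sketch_slot', 'test_decoding')"
        )
        rows = endpoint.safe_psql(
            "select slot_name, restart_lsn from pg_replication_slots where slot_type = 'logical'"
        )
        assert [r[0] for r in rows] == ["sketch_slot"]
        assert all(r[1] is not None for r in rows)  # every logical slot reports a restart_lsn
```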