Compare commits

...

10 Commits

Author SHA1 Message Date
Alex Chi Z
c6d5ff944d fix(test): ensure fixtures are correctly used for pageserver_aux_file_policy (#7769)
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-15 18:29:12 +00:00
Alex Chi Z
4b97683338 feat(pageserver): use fnv hash for aux file encoding (#7742)
FNV hash is simple, portable, and stable. This pull request vendors the
FNV hash implementation from servo and modified it to use the u128
variant.

replaces https://github.com/neondatabase/neon/pull/7644

ref https://github.com/neondatabase/neon/issues/7462

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-05-15 13:17:57 -04:00
Jure Bajic
affc18f912 Add performance regress test_ondemand_download_churn.py (#7242)
Add performance regress test  for on-demand download throughput.

Closes https://github.com/neondatabase/neon/issues/7146

Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-05-15 18:41:12 +02:00
Christian Schwarz
3ef6e21211 fixup #7747: actually use the fixture for neon_env_builder (#7767)
The `= None` makes it not use the fixture.

This slipped due to last-minute changes.
2024-05-15 18:17:55 +02:00
Arpad Müller
1075386d77 Add test_uploads_and_deletions test (#7758)
Adds a test that is a reproducer for many tiered compaction bugs,
both ones that have since been fixed as well as still unfxied ones:
* (now fixed) #7296 
* #7707 
* #7759
* Likely also #7244 but I haven't tried that.

The key ordering bug can be reproduced by switching to
`merge_delta_keys` instead of `merge_delta_keys_buffered`, so reverting
a big part of #7661, although it only sometimes reproduces (30-50% of
cases).

part of https://github.com/neondatabase/neon/issues/7554
2024-05-15 15:32:47 +02:00
Christian Schwarz
c3dd646ab3 chore!: always use async walredo, warn if sync is configured (#7754)
refs https://github.com/neondatabase/neon/issues/7753

This PR is step (1) of removing sync walredo from Pageserver.

Changes:
* Remove the sync impl
* If sync is configured, warn! and use async instead
* Remove the metric that exposes `kind`
* Remove the tenant status API that exposes `kind`

Future Work
-----------

After we've released this change to prod and are sure we won't
roll back, we will

1. update the prod Ansible to remove the config flag from the prod
   pageserver.toml.
2. remove the remaining `kind` code in pageserver

These two changes need no release inbetween.

See  https://github.com/neondatabase/neon/issues/7753 for details.
2024-05-15 15:04:52 +02:00
Christian Schwarz
bc78b0e9cc chore(deps): use upstream svg_fmt after they merged our PR (#7764)
They have merged our PR https://github.com/nical/rust_debug/pull/4 but
they haven't released a new crate version yet.

refs https://github.com/neondatabase/neon/issues/7763
2024-05-15 14:18:02 +02:00
John Spray
f342b87f30 pageserver: remove Option<> around remote storage, clean up metadata file refs (#7752)
## Problem

This is historical baggage from when the pageserver could be run with
local disk only: we had a bunch of places where we had to treat remote
storage as optional.

Closes: https://github.com/neondatabase/neon/issues/6890

## Changes

- Remove Option<> around remote storage (in
https://github.com/neondatabase/neon/pull/7722 we made remote storage
clearly mandatory)
- Remove code for deleting old metadata files: they're all gone now.
- Remove other references to metadata files when loading directories, as
none exist.

I checked last 14 days of logs for "found legacy metadata", there are no
instances.
2024-05-15 12:05:24 +00:00
Alexander Bayandin
438bacc32e CI(neon-extra-builds): Use small-arm64 runners instead of large-arm64 (#7740)
## Problem
There are not enough arm runners and jobs in `neon-extra-builds` workflow
take about the same amount of time on a small-arm runner as on
large-arm.

## Summary of changes
- Switch `neon-extra-builds` workflow from `large-arm64` to
`small-arm64` runners
2024-05-15 14:29:12 +03:00
Arseny Sher
1a2a3cb446 Add restart_lsn metric for logical slots. 2024-05-15 11:19:33 +03:00
38 changed files with 823 additions and 1290 deletions

View File

@@ -5,6 +5,7 @@ self-hosted-runner:
- large
- large-arm64
- small
- small-arm64
- us-east-2
config-variables:
- REMOTE_STORAGE_AZURE_CONTAINER

View File

@@ -136,7 +136,7 @@ jobs:
check-linux-arm-build:
needs: [ check-permissions, build-build-tools-image ]
timeout-minutes: 90
runs-on: [ self-hosted, large-arm64 ]
runs-on: [ self-hosted, small-arm64 ]
env:
# Use release build only, to have less debug info around
@@ -260,7 +260,7 @@ jobs:
check-codestyle-rust-arm:
needs: [ check-permissions, build-build-tools-image ]
timeout-minutes: 90
runs-on: [ self-hosted, large-arm64 ]
runs-on: [ self-hosted, small-arm64 ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}

2
Cargo.lock generated
View File

@@ -5952,7 +5952,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "svg_fmt"
version = "0.4.2"
source = "git+https://github.com/neondatabase/fork--nical--rust_debug?branch=neon#c1820b28664b5df68de7f043fccf2ed5d67b6ae8"
source = "git+https://github.com/nical/rust_debug?rev=28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4#28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4"
[[package]]
name = "syn"

View File

@@ -158,8 +158,8 @@ socket2 = "0.5"
strum = "0.24"
strum_macros = "0.24"
"subtle" = "2.5.0"
# https://github.com/nical/rust_debug/pull/4
svg_fmt = { git = "https://github.com/neondatabase/fork--nical--rust_debug", branch = "neon" }
# Our PR https://github.com/nical/rust_debug/pull/4 has been merged but no new version released yet
svg_fmt = { git = "https://github.com/nical/rust_debug", rev = "28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4" }
sync_wrapper = "0.1.2"
tar = "0.4"
task-local-extensions = "0.1.4"

View File

@@ -745,6 +745,16 @@ impl HistoricLayerInfo {
};
*field = value;
}
pub fn layer_file_size(&self) -> u64 {
match self {
HistoricLayerInfo::Delta {
layer_file_size, ..
} => *layer_file_size,
HistoricLayerInfo::Image {
layer_file_size, ..
} => *layer_file_size,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -776,9 +786,6 @@ pub struct TimelineGcRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalRedoManagerProcessStatus {
pub pid: u32,
/// The strum-generated `into::<&'static str>()` for `pageserver::walredo::ProcessKind`.
/// `ProcessKind` are a transitory thing, so, they have no enum representation in `pageserver_api`.
pub kind: Cow<'static, str>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -30,47 +30,27 @@
//! 2024-04-15 on i3en.3xlarge
//!
//! ```text
//! async-short/1 time: [24.584 µs 24.737 µs 24.922 µs]
//! async-short/2 time: [33.479 µs 33.660 µs 33.888 µs]
//! async-short/4 time: [42.713 µs 43.046 µs 43.440 µs]
//! async-short/8 time: [71.814 µs 72.478 µs 73.240 µs]
//! async-short/16 time: [132.73 µs 134.45 µs 136.22 µs]
//! async-short/32 time: [258.31 µs 260.73 µs 263.27 µs]
//! async-short/64 time: [511.61 µs 514.44 µs 517.51 µs]
//! async-short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
//! async-medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
//! async-medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
//! async-medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
//! async-medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
//! async-medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
//! async-medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
//! async-medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
//! async-medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
//! sync-short/1 time: [25.503 µs 25.626 µs 25.771 µs]
//! sync-short/2 time: [30.850 µs 31.013 µs 31.208 µs]
//! sync-short/4 time: [45.543 µs 45.856 µs 46.193 µs]
//! sync-short/8 time: [84.114 µs 84.639 µs 85.220 µs]
//! sync-short/16 time: [185.22 µs 186.15 µs 187.13 µs]
//! sync-short/32 time: [377.43 µs 378.87 µs 380.46 µs]
//! sync-short/64 time: [756.49 µs 759.04 µs 761.70 µs]
//! sync-short/128 time: [1.4825 ms 1.4874 ms 1.4923 ms]
//! sync-medium/1 time: [105.66 µs 106.01 µs 106.43 µs]
//! sync-medium/2 time: [153.10 µs 153.84 µs 154.72 µs]
//! sync-medium/4 time: [327.13 µs 329.44 µs 332.27 µs]
//! sync-medium/8 time: [654.26 µs 658.73 µs 663.63 µs]
//! sync-medium/16 time: [1.2682 ms 1.2748 ms 1.2816 ms]
//! sync-medium/32 time: [2.4456 ms 2.4595 ms 2.4731 ms]
//! sync-medium/64 time: [4.6523 ms 4.6890 ms 4.7256 ms]
//! sync-medium/128 time: [8.7215 ms 8.8323 ms 8.9344 ms]
//! short/1 time: [24.584 µs 24.737 µs 24.922 µs]
//! short/2 time: [33.479 µs 33.660 µs 33.888 µs]
//! short/4 time: [42.713 µs 43.046 µs 43.440 µs]
//! short/8 time: [71.814 µs 72.478 µs 73.240 µs]
//! short/16 time: [132.73 µs 134.45 µs 136.22 µs]
//! short/32 time: [258.31 µs 260.73 µs 263.27 µs]
//! short/64 time: [511.61 µs 514.44 µs 517.51 µs]
//! short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
//! medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
//! medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
//! medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
//! medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
//! medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
//! medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
//! medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
//! medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
//! ```
use bytes::{Buf, Bytes};
use criterion::{BenchmarkId, Criterion};
use pageserver::{
config::PageServerConf,
walrecord::NeonWalRecord,
walredo::{PostgresRedoManager, ProcessKind},
};
use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
use pageserver_api::{key::Key, shard::TenantShardId};
use std::{
sync::Arc,
@@ -80,39 +60,32 @@ use tokio::{sync::Barrier, task::JoinSet};
use utils::{id::TenantId, lsn::Lsn};
fn bench(c: &mut Criterion) {
for process_kind in &[ProcessKind::Async, ProcessKind::Sync] {
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group(format!("{process_kind}-short"));
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::short_input());
b.iter_custom(|iters| {
bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients)
});
},
);
}
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("short");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::short_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
}
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group(format!("{process_kind}-medium"));
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::medium_input());
b.iter_custom(|iters| {
bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients)
});
},
);
}
}
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("medium");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::medium_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
}
}
}
@@ -120,16 +93,10 @@ criterion::criterion_group!(benches, bench);
criterion::criterion_main!(benches);
// Returns the sum of each client's wall-clock time spent executing their share of the n_redos.
fn bench_impl(
process_kind: ProcessKind,
redo_work: Arc<Request>,
n_redos: u64,
nclients: u64,
) -> Duration {
fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration {
let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
let mut conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
conf.walredo_process_kind = process_kind;
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
@@ -158,27 +125,13 @@ fn bench_impl(
});
}
let elapsed = rt.block_on(async move {
rt.block_on(async move {
let mut total_wallclock_time = Duration::ZERO;
while let Some(res) = tasks.join_next().await {
total_wallclock_time += res.unwrap();
}
total_wallclock_time
});
// consistency check to ensure process kind setting worked
if nredos_per_client > 0 {
assert_eq!(
manager
.status()
.process
.map(|p| p.kind)
.expect("the benchmark work causes a walredo process to be spawned"),
std::borrow::Cow::Borrowed(process_kind.into())
);
}
elapsed
})
}
async fn client(

View File

@@ -52,7 +52,6 @@
use anyhow::{Context, Result};
use pageserver::repository::Key;
use pageserver::METADATA_FILE_NAME;
use std::cmp::Ordering;
use std::io::{self, BufRead};
use std::path::PathBuf;
@@ -159,10 +158,6 @@ pub fn main() -> Result<()> {
let line = PathBuf::from_str(&line).unwrap();
let filename = line.file_name().unwrap();
let filename = filename.to_str().unwrap();
if filename == METADATA_FILE_NAME {
// Don't try and parse "metadata" like a key-lsn range
continue;
}
let (key_range, lsn_range) = parse_filename(filename);
files.push(Layer {
filename: filename.to_owned(),

View File

@@ -2,9 +2,11 @@ use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId};
use pageserver_client::mgmt_api;
use rand::seq::SliceRandom;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use utils::id::{TenantTimelineId, TimelineId};
use std::{f64, sync::Arc};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit},
task::JoinSet,
@@ -12,10 +14,7 @@ use tokio::{
use std::{
num::NonZeroUsize,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::atomic::{AtomicU64, Ordering},
time::{Duration, Instant},
};
@@ -51,19 +50,31 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
Ok(())
}
#[derive(serde::Serialize)]
struct Output {
downloads_count: u64,
downloads_bytes: u64,
evictions_count: u64,
timeline_restarts: u64,
#[serde(with = "humantime_serde")]
runtime: Duration,
}
#[derive(Debug, Default)]
struct LiveStats {
evictions: AtomicU64,
downloads: AtomicU64,
evictions_count: AtomicU64,
downloads_count: AtomicU64,
downloads_bytes: AtomicU64,
timeline_restarts: AtomicU64,
}
impl LiveStats {
fn eviction_done(&self) {
self.evictions.fetch_add(1, Ordering::Relaxed);
self.evictions_count.fetch_add(1, Ordering::Relaxed);
}
fn download_done(&self) {
self.downloads.fetch_add(1, Ordering::Relaxed);
fn download_done(&self, size: u64) {
self.downloads_count.fetch_add(1, Ordering::Relaxed);
self.downloads_bytes.fetch_add(size, Ordering::Relaxed);
}
fn timeline_restart_done(&self) {
self.timeline_restarts.fetch_add(1, Ordering::Relaxed);
@@ -92,28 +103,49 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
)
.await?;
let token = CancellationToken::new();
let mut tasks = JoinSet::new();
let live_stats = Arc::new(LiveStats::default());
let periodic_stats = Arc::new(LiveStats::default());
let total_stats = Arc::new(LiveStats::default());
let start = Instant::now();
tasks.spawn({
let live_stats = Arc::clone(&live_stats);
let periodic_stats = Arc::clone(&periodic_stats);
let total_stats = Arc::clone(&total_stats);
let cloned_token = token.clone();
async move {
let mut last_at = Instant::now();
loop {
if cloned_token.is_cancelled() {
return;
}
tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await;
let now = Instant::now();
let delta: Duration = now - last_at;
last_at = now;
let LiveStats {
evictions,
downloads,
evictions_count,
downloads_count,
downloads_bytes,
timeline_restarts,
} = &*live_stats;
let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
} = &*periodic_stats;
let evictions_count = evictions_count.swap(0, Ordering::Relaxed);
let downloads_count = downloads_count.swap(0, Ordering::Relaxed);
let downloads_bytes = downloads_bytes.swap(0, Ordering::Relaxed);
let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed);
info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}");
total_stats.evictions_count.fetch_add(evictions_count, Ordering::Relaxed);
total_stats.downloads_count.fetch_add(downloads_count, Ordering::Relaxed);
total_stats.downloads_bytes.fetch_add(downloads_bytes, Ordering::Relaxed);
total_stats.timeline_restarts.fetch_add(timeline_restarts, Ordering::Relaxed);
let evictions_per_s = evictions_count as f64 / delta.as_secs_f64();
let downloads_per_s = downloads_count as f64 / delta.as_secs_f64();
let downloads_mibs_per_s = downloads_bytes as f64 / delta.as_secs_f64() / ((1 << 20) as f64);
info!("evictions={evictions_per_s:.2}/s downloads={downloads_per_s:.2}/s download_bytes={downloads_mibs_per_s:.2}MiB/s timeline_restarts={timeline_restarts}");
}
}
});
@@ -124,14 +156,42 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
args,
Arc::clone(&mgmt_api_client),
tl,
Arc::clone(&live_stats),
Arc::clone(&periodic_stats),
token.clone(),
));
}
}
if let Some(runtime) = args.runtime {
tokio::spawn(async move {
tokio::time::sleep(runtime.into()).await;
token.cancel();
});
}
while let Some(res) = tasks.join_next().await {
res.unwrap();
}
let end = Instant::now();
let duration: Duration = end - start;
let output = {
let LiveStats {
evictions_count,
downloads_count,
downloads_bytes,
timeline_restarts,
} = &*total_stats;
Output {
downloads_count: downloads_count.load(Ordering::Relaxed),
downloads_bytes: downloads_bytes.load(Ordering::Relaxed),
evictions_count: evictions_count.load(Ordering::Relaxed),
timeline_restarts: timeline_restarts.load(Ordering::Relaxed),
runtime: duration,
}
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");
Ok(())
}
@@ -140,6 +200,7 @@ async fn timeline_actor(
mgmt_api_client: Arc<pageserver_client::mgmt_api::Client>,
timeline: TenantTimelineId,
live_stats: Arc<LiveStats>,
token: CancellationToken,
) {
// TODO: support sharding
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
@@ -149,7 +210,7 @@ async fn timeline_actor(
layers: Vec<mpsc::Sender<OwnedSemaphorePermit>>,
concurrency: Arc<tokio::sync::Semaphore>,
}
loop {
while !token.is_cancelled() {
debug!("restarting timeline");
let layer_map_info = mgmt_api_client
.layer_map_info(tenant_shard_id, timeline.timeline_id)
@@ -185,7 +246,7 @@ async fn timeline_actor(
live_stats.timeline_restart_done();
loop {
while !token.is_cancelled() {
assert!(!timeline.joinset.is_empty());
if let Some(res) = timeline.joinset.try_join_next() {
debug!(?res, "a layer actor exited, should not happen");
@@ -255,7 +316,7 @@ async fn layer_actor(
.layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name())
.await
.unwrap();
live_stats.download_done();
live_stats.download_done(layer.layer_file_size());
did_it
}
};

View File

@@ -5,14 +5,35 @@ use bytes::{Buf, BufMut, Bytes};
use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE};
use tracing::warn;
/// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, first 13B of 128b xxhash].
// BEGIN Copyright (c) 2017 Servo Contributors
/// Const version of FNV hash.
#[inline]
#[must_use]
pub const fn fnv_hash(bytes: &[u8]) -> u128 {
const INITIAL_STATE: u128 = 0x6c62272e07bb014262b821756295c58d;
const PRIME: u128 = 0x0000000001000000000000000000013B;
let mut hash = INITIAL_STATE;
let mut i = 0;
while i < bytes.len() {
hash ^= bytes[i] as u128;
hash = hash.wrapping_mul(PRIME);
i += 1;
}
hash
}
// END Copyright (c) 2017 Servo Contributors
/// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, least significant 13B of FNV hash].
fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key {
let mut key = [0; METADATA_KEY_SIZE];
let hash = twox_hash::xxh3::hash128(data).to_be_bytes();
let mut key: [u8; 16] = [0; METADATA_KEY_SIZE];
let hash = fnv_hash(data).to_be_bytes();
key[0] = AUX_KEY_PREFIX;
key[1] = dir_level1;
key[2] = dir_level2;
key[3..16].copy_from_slice(&hash[0..13]);
key[3..16].copy_from_slice(&hash[3..16]);
Key::from_metadata_key_fixed_size(&key)
}
@@ -200,15 +221,19 @@ mod tests {
fn test_hash_portable() {
// AUX file encoding requires the hash to be portable across all platforms. This test case checks
// if the algorithm produces the same hash across different environments.
assert_eq!(
305317690835051308206966631765527126151,
twox_hash::xxh3::hash128("test1".as_bytes())
265160408618497461376862998434862070044,
super::fnv_hash("test1".as_bytes())
);
assert_eq!(
85104974691013376326742244813280798847,
twox_hash::xxh3::hash128("test/test2".as_bytes())
295486155126299629456360817749600553988,
super::fnv_hash("test/test2".as_bytes())
);
assert_eq!(
144066263297769815596495629667062367629,
super::fnv_hash("".as_bytes())
);
assert_eq!(0, twox_hash::xxh3::hash128("".as_bytes()));
}
#[test]
@@ -216,28 +241,28 @@ mod tests {
// To correct retrieve AUX files, the generated keys for the same file must be the same for all versions
// of the page server.
assert_eq!(
"6200000101E5B20C5F8DD5AA3289D6D9EAFA",
encode_aux_file_key("pg_logical/mappings/test1").to_string()
"62000001017F8B83D94F7081693471ABF91C",
encode_aux_file_key("pg_logical/mappings/test1").to_string(),
);
assert_eq!(
"620000010239AAC544893139B26F501B97E6",
encode_aux_file_key("pg_logical/snapshots/test2").to_string()
"62000001027F8E83D94F7081693471ABFCCD",
encode_aux_file_key("pg_logical/snapshots/test2").to_string(),
);
assert_eq!(
"620000010300000000000000000000000000",
encode_aux_file_key("pg_logical/replorigin_checkpoint").to_string()
"62000001032E07BB014262B821756295C58D",
encode_aux_file_key("pg_logical/replorigin_checkpoint").to_string(),
);
assert_eq!(
"62000001FF8635AF2134B7266EC5B4189FD6",
encode_aux_file_key("pg_logical/unsupported").to_string()
"62000001FF4F38E1C74754E7D03C1A660178",
encode_aux_file_key("pg_logical/unsupported").to_string(),
);
assert_eq!(
"6200000201772D0E5D71DE14DA86142A1619",
"62000002017F8D83D94F7081693471ABFB92",
encode_aux_file_key("pg_replslot/test3").to_string()
);
assert_eq!(
"620000FFFF1866EBEB53B807B26A2416F317",
encode_aux_file_key("other_file_not_supported").to_string()
"620000FFFF2B6ECC8AEF93F643DC44F15E03",
encode_aux_file_key("other_file_not_supported").to_string(),
);
}

View File

@@ -284,7 +284,6 @@ fn start_pageserver(
))
.unwrap();
pageserver::preinitialize_metrics();
pageserver::metrics::wal_redo::set_process_kind_metric(conf.walredo_process_kind);
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
@@ -383,7 +382,7 @@ fn start_pageserver(
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
// Set up remote storage client
let remote_storage = Some(create_remote_storage_client(conf)?);
let remote_storage = create_remote_storage_client(conf)?;
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
@@ -516,16 +515,12 @@ fn start_pageserver(
}
});
let secondary_controller = if let Some(remote_storage) = &remote_storage {
secondary::spawn_tasks(
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.clone(),
)
} else {
secondary::null_controller()
};
let secondary_controller = secondary::spawn_tasks(
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.clone(),
);
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
@@ -533,15 +528,13 @@ fn start_pageserver(
// been configured.
let disk_usage_eviction_state: Arc<disk_usage_eviction_task::State> = Arc::default();
if let Some(remote_storage) = &remote_storage {
launch_disk_usage_global_eviction_task(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
}
launch_disk_usage_global_eviction_task(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
// Start up the service to handle HTTP mgmt API request. We created the
// listener earlier already.
@@ -693,14 +686,7 @@ fn start_pageserver(
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
)
.await;
pageserver::shutdown_pageserver(&tenant_manager, deletion_queue.clone(), 0).await;
unreachable!()
})
}

View File

@@ -99,7 +99,7 @@ pub mod defaults {
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "sync";
pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "async";
///
/// Default built-in configuration file.

View File

@@ -632,7 +632,7 @@ impl DeletionQueue {
///
/// If remote_storage is None, then the returned workers will also be None.
pub fn new<C>(
remote_storage: Option<GenericRemoteStorage>,
remote_storage: GenericRemoteStorage,
control_plane_client: Option<C>,
conf: &'static PageServerConf,
) -> (Self, Option<DeletionQueueWorkers<C>>)
@@ -658,23 +658,6 @@ impl DeletionQueue {
// longer to flush after Tenants have all been torn down.
let cancel = CancellationToken::new();
let remote_storage = match remote_storage {
None => {
return (
Self {
client: DeletionQueueClient {
tx,
executor_tx,
lsn_table: lsn_table.clone(),
},
cancel,
},
None,
)
}
Some(r) => r,
};
(
Self {
client: DeletionQueueClient {
@@ -765,7 +748,7 @@ mod test {
/// Simulate a pageserver restart by destroying and recreating the deletion queue
async fn restart(&mut self) {
let (deletion_queue, workers) = DeletionQueue::new(
Some(self.storage.clone()),
self.storage.clone(),
Some(self.mock_control_plane.clone()),
self.harness.conf,
);
@@ -875,7 +858,7 @@ mod test {
let mock_control_plane = MockControlPlane::new();
let (deletion_queue, worker) = DeletionQueue::new(
Some(storage.clone()),
storage.clone(),
Some(mock_control_plane.clone()),
harness.conf,
);

View File

@@ -104,7 +104,7 @@ pub struct State {
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
@@ -118,7 +118,7 @@ impl State {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
@@ -813,12 +813,6 @@ async fn tenant_attach_handler(
let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
if state.remote_storage.is_none() {
return Err(ApiError::BadRequest(anyhow!(
"attach_tenant is not possible because pageserver was configured without remote storage"
)));
}
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let shard_params = ShardParameters::default();
let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params);
@@ -1643,12 +1637,6 @@ async fn tenant_time_travel_remote_storage_handler(
)));
}
let Some(storage) = state.remote_storage.as_ref() else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run time travel"
)));
};
if timestamp > done_if_after {
return Err(ApiError::BadRequest(anyhow!(
"The done_if_after timestamp comes before the timestamp to recover to"
@@ -1658,7 +1646,7 @@ async fn tenant_time_travel_remote_storage_handler(
tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
remote_timeline_client::upload::time_travel_recover_tenant(
storage,
&state.remote_storage,
&tenant_shard_id,
timestamp,
done_if_after,
@@ -1903,11 +1891,6 @@ async fn deletion_queue_flush(
) -> Result<Response<Body>, ApiError> {
let state = get_state(&r);
if state.remote_storage.is_none() {
// Nothing to do if remote storage is disabled.
return json_response(StatusCode::OK, ());
}
let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
let flush = async {
@@ -2072,18 +2055,11 @@ async fn disk_usage_eviction_run(
};
let state = get_state(&r);
let Some(storage) = state.remote_storage.as_ref() else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run eviction iteration"
)));
};
let eviction_state = state.disk_usage_eviction_state.clone();
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&eviction_state,
storage,
&state.remote_storage,
usage,
&state.tenant_manager,
config.eviction_order,
@@ -2120,29 +2096,23 @@ async fn tenant_scan_remote_handler(
let state = get_state(&request);
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let Some(remote_storage) = state.remote_storage.as_ref() else {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Remote storage not configured"
)));
};
let mut response = TenantScanRemoteStorageResponse::default();
let (shards, _other_keys) =
list_remote_tenant_shards(remote_storage, tenant_id, cancel.clone())
list_remote_tenant_shards(&state.remote_storage, tenant_id, cancel.clone())
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
for tenant_shard_id in shards {
let (timeline_ids, _other_keys) =
list_remote_timelines(remote_storage, tenant_shard_id, cancel.clone())
list_remote_timelines(&state.remote_storage, tenant_shard_id, cancel.clone())
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
let mut generation = Generation::none();
for timeline_id in timeline_ids {
match download_index_part(
remote_storage,
&state.remote_storage,
&tenant_shard_id,
&timeline_id,
Generation::MAX,

View File

@@ -57,7 +57,7 @@ pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(
tenant_manager: &TenantManager,
deletion_queue: Option<DeletionQueue>,
mut deletion_queue: DeletionQueue,
exit_code: i32,
) {
use std::time::Duration;
@@ -89,9 +89,7 @@ pub async fn shutdown_pageserver(
.await;
// Best effort to persist any outstanding deletions, to avoid leaking objects
if let Some(mut deletion_queue) = deletion_queue {
deletion_queue.shutdown(Duration::from_secs(5)).await;
}
deletion_queue.shutdown(Duration::from_secs(5)).await;
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
@@ -114,10 +112,6 @@ pub async fn shutdown_pageserver(
std::process::exit(exit_code);
}
/// The name of the metadata file pageserver creates per timeline.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
pub const METADATA_FILE_NAME: &str = "metadata";
/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub(crate) const TENANT_CONFIG_NAME: &str = "config";

View File

@@ -1999,29 +1999,6 @@ impl Default for WalRedoProcessCounters {
pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
Lazy::new(WalRedoProcessCounters::default);
#[cfg(not(test))]
pub mod wal_redo {
use super::*;
static PROCESS_KIND: Lazy<std::sync::Mutex<UIntGaugeVec>> = Lazy::new(|| {
std::sync::Mutex::new(
register_uint_gauge_vec!(
"pageserver_wal_redo_process_kind",
"The configured process kind for walredo",
&["kind"],
)
.unwrap(),
)
});
pub fn set_process_kind_metric(kind: crate::walredo::ProcessKind) {
// use guard to avoid races around the next two steps
let guard = PROCESS_KIND.lock().unwrap();
guard.reset();
guard.with_label_values(&[&format!("{kind}")]).set(1);
}
}
/// Similar to `prometheus::HistogramTimer` but does not record on drop.
pub(crate) struct StorageTimeMetricsTimer {
metrics: StorageTimeMetrics,

View File

@@ -190,7 +190,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
#[derive(Clone)]
pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: Option<GenericRemoteStorage>,
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
}
@@ -292,7 +292,7 @@ pub struct Tenant {
walredo_mgr: Option<Arc<WalRedoManager>>,
// provides access to timeline data sitting in the remote storage
pub(crate) remote_storage: Option<GenericRemoteStorage>,
pub(crate) remote_storage: GenericRemoteStorage,
// Access to global deletion queue for when this tenant wants to schedule a deletion
deletion_queue_client: DeletionQueueClient,
@@ -551,21 +551,22 @@ impl Tenant {
);
if let Some(index_part) = index_part.as_ref() {
timeline
.remote_client
.as_ref()
.unwrap()
.init_upload_queue(index_part)?;
} else if self.remote_storage.is_some() {
timeline.remote_client.init_upload_queue(index_part)?;
} else {
// No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload.
// By doing what we do here, the index part upload is retried.
// If control plane retries timeline creation in the meantime, the mgmt API handler
// for timeline creation will coalesce on the upload we queue here.
// FIXME: this branch should be dead code as we no longer write local metadata.
let rtc = timeline.remote_client.as_ref().unwrap();
rtc.init_upload_queue_for_empty_remote(&metadata)?;
rtc.schedule_index_upload_for_full_metadata_update(&metadata)?;
timeline
.remote_client
.init_upload_queue_for_empty_remote(&metadata)?;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
}
timeline
@@ -777,14 +778,14 @@ impl Tenant {
AttachType::Normal
};
let preload = match (&mode, &remote_storage) {
(SpawnMode::Create, _) => {
let preload = match &mode {
SpawnMode::Create => {
None
},
(SpawnMode::Eager | SpawnMode::Lazy, Some(remote_storage)) => {
SpawnMode::Eager | SpawnMode::Lazy => {
let _preload_timer = TENANT.preload.start_timer();
let res = tenant_clone
.preload(remote_storage, task_mgr::shutdown_token())
.preload(&remote_storage, task_mgr::shutdown_token())
.await;
match res {
Ok(p) => Some(p),
@@ -794,10 +795,7 @@ impl Tenant {
}
}
}
(_, None) => {
let _preload_timer = TENANT.preload.start_timer();
None
}
};
// Remote preload is complete.
@@ -1021,7 +1019,7 @@ impl Tenant {
index_part,
remote_metadata,
TimelineResources {
remote_client: Some(remote_client),
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
},
@@ -1047,7 +1045,7 @@ impl Tenant {
Arc::clone(self),
timeline_id,
&index_part.metadata,
Some(remote_timeline_client),
remote_timeline_client,
self.deletion_queue_client.clone(),
)
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
@@ -1139,9 +1137,7 @@ impl Tenant {
let mut size = 0;
for timeline in self.list_timelines() {
if let Some(remote_client) = &timeline.remote_client {
size += remote_client.get_remote_physical_size();
}
size += timeline.remote_client.get_remote_physical_size();
}
size
@@ -1191,6 +1187,7 @@ impl Tenant {
pub fn create_broken_tenant(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
remote_storage: GenericRemoteStorage,
reason: String,
) -> Arc<Tenant> {
Arc::new(Tenant::new(
@@ -1205,7 +1202,7 @@ impl Tenant {
ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
None,
tenant_shard_id,
None,
remote_storage,
DeletionQueueClient::broken(),
))
}
@@ -1398,13 +1395,7 @@ impl Tenant {
tline.freeze_and_flush().await.context("freeze_and_flush")?;
// Make sure the freeze_and_flush reaches remote storage.
tline
.remote_client
.as_ref()
.unwrap()
.wait_completion()
.await
.unwrap();
tline.remote_client.wait_completion().await.unwrap();
let tl = uninit_tl.finish_creation()?;
// The non-test code would call tl.activate() here.
@@ -1470,20 +1461,19 @@ impl Tenant {
return Err(CreateTimelineError::Conflict);
}
if let Some(remote_client) = existing.remote_client.as_ref() {
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
}
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
existing
.remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
return Ok(existing);
}
@@ -1559,14 +1549,14 @@ impl Tenant {
// the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
// not send a success to the caller until it is. The same applies to handling retries,
// see the handling of [`TimelineExclusionError::AlreadyExists`] above.
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
remote_client.wait_completion().await.with_context(|| {
format!("wait for {} timeline initial uploads to complete", kind)
})?;
}
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
loaded_timeline
.remote_client
.wait_completion()
.await
.with_context(|| format!("wait for {} timeline initial uploads to complete", kind))?;
loaded_timeline.activate(self.clone(), broker_client, None, ctx);
@@ -2161,32 +2151,26 @@ impl Tenant {
) -> anyhow::Result<()> {
let timelines = self.timelines.lock().unwrap().clone();
for timeline in timelines.values() {
let Some(tl_client) = &timeline.remote_client else {
anyhow::bail!("Remote storage is mandatory");
};
let Some(remote_storage) = &self.remote_storage else {
anyhow::bail!("Remote storage is mandatory");
};
// We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
// to ensure that they do not start a split if currently in the process of doing these.
// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
tl_client.schedule_index_upload_for_file_changes()?;
tl_client.wait_completion().await?;
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
tl_client.shutdown().await;
timeline.remote_client.shutdown().await;
// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
// we use here really is the remotely persistent one).
let result = tl_client
let result = timeline.remote_client
.download_index_file(&self.cancel)
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
.await?;
@@ -2199,7 +2183,7 @@ impl Tenant {
for child_shard in child_shards {
upload_index_part(
remote_storage,
&self.remote_storage,
child_shard,
&timeline.timeline_id,
self.generation,
@@ -2475,7 +2459,7 @@ impl Tenant {
shard_identity: ShardIdentity,
walredo_mgr: Option<Arc<WalRedoManager>>,
tenant_shard_id: TenantShardId,
remote_storage: Option<GenericRemoteStorage>,
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
) -> Tenant {
let (state, mut rx) = watch::channel(state);
@@ -3119,11 +3103,10 @@ impl Tenant {
// We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
// could get incorrect information and remove more layers, than needed.
// See also https://github.com/neondatabase/neon/issues/3865
if let Some(remote_client) = new_timeline.remote_client.as_ref() {
remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)
.context("branch initial metadata upload")?;
}
new_timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)
.context("branch initial metadata upload")?;
Ok(new_timeline)
}
@@ -3155,11 +3138,6 @@ impl Tenant {
pgdata_path: &Utf8PathBuf,
timeline_id: &TimelineId,
) -> anyhow::Result<()> {
let Some(storage) = &self.remote_storage else {
// No remote storage? No upload.
return Ok(());
};
let temp_path = timelines_path.join(format!(
"{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
));
@@ -3183,7 +3161,7 @@ impl Tenant {
backoff::retry(
|| async {
self::remote_timeline_client::upload_initdb_dir(
storage,
&self.remote_storage,
&self.tenant_shard_id.tenant_id,
timeline_id,
pgdata_zstd.try_clone().await?,
@@ -3240,9 +3218,6 @@ impl Tenant {
}
}
if let Some(existing_initdb_timeline_id) = load_existing_initdb {
let Some(storage) = &self.remote_storage else {
bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
};
if existing_initdb_timeline_id != timeline_id {
let source_path = &remote_initdb_archive_path(
&self.tenant_shard_id.tenant_id,
@@ -3252,7 +3227,7 @@ impl Tenant {
&remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
// if this fails, it will get retried by retried control plane requests
storage
self.remote_storage
.copy_object(source_path, dest_path, &self.cancel)
.await
.context("copy initdb tar")?;
@@ -3260,7 +3235,7 @@ impl Tenant {
let (initdb_tar_zst_path, initdb_tar_zst) =
self::remote_timeline_client::download_initdb_tar_zst(
self.conf,
storage,
&self.remote_storage,
&self.tenant_shard_id,
&existing_initdb_timeline_id,
&self.cancel,
@@ -3355,20 +3330,14 @@ impl Tenant {
/// Call this before constructing a timeline, to build its required structures
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
let remote_client = RemoteTimelineClient::new(
remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_shard_id,
timeline_id,
self.generation,
);
Some(remote_client)
} else {
None
};
let remote_client = RemoteTimelineClient::new(
self.remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_shard_id,
timeline_id,
self.generation,
);
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
@@ -3392,9 +3361,9 @@ impl Tenant {
let tenant_shard_id = self.tenant_shard_id;
let resources = self.build_timeline_resources(new_timeline_id);
if let Some(remote_client) = &resources.remote_client {
remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
}
resources
.remote_client
.init_upload_queue_for_empty_remote(new_metadata)?;
let timeline_struct = self
.create_timeline_struct(
@@ -3562,9 +3531,7 @@ impl Tenant {
tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
timeline.freeze_and_flush().await?;
tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
if let Some(client) = &timeline.remote_client {
client.wait_completion().await?;
}
timeline.remote_client.wait_completion().await?;
Ok(())
}
@@ -3878,7 +3845,7 @@ pub(crate) mod harness {
ShardIdentity::unsharded(),
Some(walredo_mgr),
self.tenant_shard_id,
Some(self.remote_storage.clone()),
self.remote_storage.clone(),
self.deletion_queue.new_client(),
));

View File

@@ -181,25 +181,23 @@ async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), Del
async fn remove_tenant_remote_delete_mark(
conf: &PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
remote_storage: &GenericRemoteStorage,
tenant_shard_id: &TenantShardId,
cancel: &CancellationToken,
) -> Result<(), DeleteTenantError> {
if let Some(remote_storage) = remote_storage {
let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
backoff::retry(
|| async { remote_storage.delete(&path, cancel).await },
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("remove_tenant_remote_delete_mark")?;
}
let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
backoff::retry(
|| async { remote_storage.delete(&path, cancel).await },
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("remove_tenant_remote_delete_mark")?;
Ok(())
}
@@ -297,7 +295,7 @@ impl DeleteTenantFlow {
#[instrument(skip_all)]
pub(crate) async fn run(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
remote_storage: GenericRemoteStorage,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
cancel: &CancellationToken,
@@ -308,9 +306,7 @@ impl DeleteTenantFlow {
let mut guard = Self::prepare(&tenant).await?;
if let Err(e) =
Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant, cancel).await
{
if let Err(e) = Self::run_inner(&mut guard, conf, &remote_storage, &tenant, cancel).await {
tenant.set_broken(format!("{e:#}")).await;
return Err(e);
}
@@ -327,7 +323,7 @@ impl DeleteTenantFlow {
async fn run_inner(
guard: &mut OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
remote_storage: &GenericRemoteStorage,
tenant: &Tenant,
cancel: &CancellationToken,
) -> Result<(), DeleteTenantError> {
@@ -339,14 +335,9 @@ impl DeleteTenantFlow {
))?
});
// IDEA: implement detach as delete without remote storage. Then they would use the same lock (deletion_progress) so wont contend.
// Though sounds scary, different mark name?
// Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
if let Some(remote_storage) = &remote_storage {
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel)
.await
.context("remote_mark")?
}
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel)
.await
.context("remote_mark")?;
fail::fail_point!("tenant-delete-before-create-local-mark", |_| {
Err(anyhow::anyhow!(
@@ -483,7 +474,7 @@ impl DeleteTenantFlow {
fn schedule_background(
guard: OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
remote_storage: GenericRemoteStorage,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) {
@@ -512,7 +503,7 @@ impl DeleteTenantFlow {
async fn background(
mut guard: OwnedMutexGuard<Self>,
conf: &PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
remote_storage: GenericRemoteStorage,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: &Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
@@ -551,7 +542,7 @@ impl DeleteTenantFlow {
remove_tenant_remote_delete_mark(
conf,
remote_storage.as_ref(),
&remote_storage,
&tenant.tenant_shard_id,
&task_mgr::shutdown_token(),
)

View File

@@ -47,7 +47,7 @@ use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::timeline::ShutdownMode;
use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext::PathExt;
@@ -391,22 +391,17 @@ async fn init_load_generations(
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
// the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
// are processed, even though we don't block on recovery completing here.
//
// Must only do this if remote storage is enabled, otherwise deletion queue
// is not running and channel push will fail.
if resources.remote_storage.is_some() {
let attached_tenants = generations
.iter()
.flat_map(|(id, start_mode)| {
match start_mode {
TenantStartupMode::Attached((_mode, generation)) => Some(generation),
TenantStartupMode::Secondary => None,
}
.map(|gen| (*id, *gen))
})
.collect();
resources.deletion_queue_client.recover(attached_tenants)?;
}
let attached_tenants = generations
.iter()
.flat_map(|(id, start_mode)| {
match start_mode {
TenantStartupMode::Attached((_mode, generation)) => Some(generation),
TenantStartupMode::Secondary => None,
}
.map(|gen| (*id, *gen))
})
.collect();
resources.deletion_queue_client.recover(attached_tenants)?;
Ok(Some(generations))
}
@@ -460,53 +455,6 @@ fn load_tenant_config(
}
};
// Clean up legacy `metadata` files.
// Doing it here because every single tenant directory is visited here.
// In any later code, there's different treatment of tenant dirs
// ... depending on whether the tenant is in re-attach response or not
// ... epending on whether the tenant is ignored or not
assert_eq!(
&conf.tenant_path(&tenant_shard_id),
&tenant_dir_path,
"later use of conf....path() methods would be dubious"
);
let timelines: Vec<TimelineId> = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() {
Ok(iter) => {
let mut timelines = Vec::new();
for res in iter {
let p = res?;
let Some(timeline_id) = p.file_name().parse::<TimelineId>().ok() else {
// skip any entries that aren't TimelineId, such as
// - *.___temp dirs
// - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart)
continue;
};
timelines.push(timeline_id);
}
timelines
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![],
Err(e) => return Err(anyhow::anyhow!(e)),
};
for timeline_id in timelines {
let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id);
let metadata_path = timeline_path.join(METADATA_FILE_NAME);
match std::fs::remove_file(&metadata_path) {
Ok(()) => {
crashsafe::fsync(timeline_path)
.context("fsync timeline dir after removing legacy metadata file")?;
info!("removed legacy metadata file at {metadata_path}");
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// something removed the file earlier, or it was never there
// We don't care, this software version doesn't write it again, so, we're good.
}
Err(e) => {
anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}");
}
}
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
@@ -611,6 +559,7 @@ pub async fn init_tenant_mgr(
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_shard_id,
resources.remote_storage.clone(),
format!("{}", e),
)),
);
@@ -803,6 +752,7 @@ fn tenant_spawn(
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
);
let remote_storage = resources.remote_storage.clone();
let tenant = match Tenant::spawn(
conf,
tenant_shard_id,
@@ -817,7 +767,7 @@ fn tenant_spawn(
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
Tenant::create_broken_tenant(conf, tenant_shard_id, remote_storage, format!("{e:#}"))
}
};
@@ -2276,7 +2226,7 @@ pub(crate) async fn load_tenant(
tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -2937,7 +2887,7 @@ pub(crate) async fn immediate_gc(
}
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
let rtc = timeline.as_ref().map(|x| &x.remote_client);
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the

View File

@@ -2137,7 +2137,7 @@ mod tests {
tenant_ctx: _tenant_ctx,
} = test_setup;
let client = timeline.remote_client.as_ref().unwrap();
let client = &timeline.remote_client;
// Download back the index.json, and check that the list of files is correct
let initial_index_part = match client
@@ -2328,7 +2328,7 @@ mod tests {
timeline,
..
} = TestSetup::new("metrics").await.unwrap();
let client = timeline.remote_client.as_ref().unwrap();
let client = &timeline.remote_client;
let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let local_path = local_layer_path(

View File

@@ -26,7 +26,7 @@ use crate::{
tasks::{warn_when_period_overrun, BackgroundLoopKind},
},
virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
TEMP_FILE_SUFFIX,
};
use super::{
@@ -1074,11 +1074,7 @@ async fn init_timeline_state(
.fatal_err(&format!("Read metadata on {}", file_path));
let file_name = file_path.file_name().expect("created it from the dentry");
if file_name == METADATA_FILE_NAME {
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
continue;
} else if crate::is_temporary(&file_path)
if crate::is_temporary(&file_path)
|| is_temp_download_file(&file_path)
|| is_ephemeral_file(file_name)
{

View File

@@ -585,9 +585,6 @@ struct LayerInner {
/// [`Timeline::gate`] at the same time.
timeline: Weak<Timeline>,
/// Cached knowledge of [`Timeline::remote_client`] being `Some`.
have_remote_client: bool,
access_stats: LayerAccessStats,
/// This custom OnceCell is backed by std mutex, but only held for short time periods.
@@ -732,23 +729,23 @@ impl Drop for LayerInner {
if removed {
timeline.metrics.resident_physical_size_sub(file_size);
}
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
let res = timeline
.remote_client
.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
if let Err(e) = res {
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
// demonstrating this deadlock (without spawn_blocking): stop will drop
// queued items, which will have ResidentLayer's, and those drops would try
// to re-entrantly lock the RemoteTimelineClient inner state.
if !timeline.is_active() {
tracing::info!("scheduling deletion on drop failed: {e:#}");
} else {
tracing::warn!("scheduling deletion on drop failed: {e:#}");
}
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
if let Err(e) = res {
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
// demonstrating this deadlock (without spawn_blocking): stop will drop
// queued items, which will have ResidentLayer's, and those drops would try
// to re-entrantly lock the RemoteTimelineClient inner state.
if !timeline.is_active() {
tracing::info!("scheduling deletion on drop failed: {e:#}");
} else {
LAYER_IMPL_METRICS.inc_completed_deletes();
tracing::warn!("scheduling deletion on drop failed: {e:#}");
}
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
} else {
LAYER_IMPL_METRICS.inc_completed_deletes();
}
});
}
@@ -786,7 +783,6 @@ impl LayerInner {
path: local_path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats,
wanted_deleted: AtomicBool::new(false),
inner,
@@ -815,8 +811,6 @@ impl LayerInner {
/// in a new attempt to evict OR join the previously started attempt.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
assert!(self.have_remote_client);
let mut rx = self.status.as_ref().unwrap().subscribe();
{
@@ -973,10 +967,6 @@ impl LayerInner {
return Err(DownloadError::NotFile(ft));
}
if timeline.remote_client.as_ref().is_none() {
return Err(DownloadError::NoRemoteStorage);
}
if let Some(ctx) = ctx {
self.check_expected_download(ctx)?;
}
@@ -1113,12 +1103,8 @@ impl LayerInner {
permit: heavier_once_cell::InitPermit,
ctx: &RequestContext,
) -> anyhow::Result<Arc<DownloadedLayer>> {
let client = timeline
let result = timeline
.remote_client
.as_ref()
.expect("checked before download_init_and_wait");
let result = client
.download_layer_file(
&self.desc.layer_name(),
&self.metadata(),
@@ -1293,20 +1279,10 @@ impl LayerInner {
/// `DownloadedLayer` is being dropped, so it calls this method.
fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
let can_evict = self.have_remote_client;
// we cannot know without inspecting LayerInner::inner if we should evict or not, even
// though here it is very likely
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
if !can_evict {
// it would be nice to assert this case out, but we are in drop
span.in_scope(|| {
tracing::error!("bug in struct Layer: ResidentOrWantedEvicted has been downgraded while we have no remote storage");
});
return;
}
// NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
// drop while the `self.inner` is being locked, leading to a deadlock.
@@ -1578,8 +1554,6 @@ pub(crate) enum EvictionError {
pub(crate) enum DownloadError {
#[error("timeline has already shutdown")]
TimelineShutdown,
#[error("no remote storage configured")]
NoRemoteStorage,
#[error("context denies downloading")]
ContextAndConfigReallyDeniesDownloads,
#[error("downloading is really required but not allowed by this method")]

View File

@@ -145,7 +145,7 @@ async fn smoke_test() {
.await
.expect("the local layer file still exists");
let rtc = timeline.remote_client.as_ref().unwrap();
let rtc = &timeline.remote_client;
{
let layers = &[layer];
@@ -761,13 +761,7 @@ async fn eviction_cancellation_on_drop() {
timeline.freeze_and_flush().await.unwrap();
// wait for the upload to complete so our Arc::strong_count assertion holds
timeline
.remote_client
.as_ref()
.unwrap()
.wait_completion()
.await
.unwrap();
timeline.remote_client.wait_completion().await.unwrap();
let (evicted_layer, not_evicted) = {
let mut layers = {

View File

@@ -200,7 +200,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: Option<RemoteTimelineClient>,
pub remote_client: RemoteTimelineClient,
pub deletion_queue_client: DeletionQueueClient,
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
@@ -272,7 +272,7 @@ pub struct Timeline {
/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
pub remote_client: Option<Arc<RemoteTimelineClient>>,
pub remote_client: Arc<RemoteTimelineClient>,
// What page versions do we hold in the repository? If we get a
// request > last_record_lsn, we need to wait until we receive all
@@ -1375,22 +1375,14 @@ impl Timeline {
/// not validated with control plane yet.
/// See [`Self::get_remote_consistent_lsn_visible`].
pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
if let Some(remote_client) = &self.remote_client {
remote_client.remote_consistent_lsn_projected()
} else {
None
}
self.remote_client.remote_consistent_lsn_projected()
}
/// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
/// i.e. a value of remote_consistent_lsn_projected which has undergone
/// generation validation in the deletion queue.
pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
if let Some(remote_client) = &self.remote_client {
remote_client.remote_consistent_lsn_visible()
} else {
None
}
self.remote_client.remote_consistent_lsn_visible()
}
/// The sum of the file size of all historic layers in the layer map.
@@ -1760,16 +1752,14 @@ impl Timeline {
match self.freeze_and_flush().await {
Ok(_) => {
// drain the upload queue
if let Some(client) = self.remote_client.as_ref() {
// if we did not wait for completion here, it might be our shutdown process
// didn't wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
client.shutdown().await;
}
// if we did not wait for completion here, it might be our shutdown process
// didn't wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
self.remote_client.shutdown().await;
}
Err(e) => {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
@@ -1785,18 +1775,16 @@ impl Timeline {
// Transition the remote_client into a state where it's only useful for timeline deletion.
// (The deletion use case is why we can't just hook up remote_client to Self::cancel).)
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.stop();
// As documented in remote_client.stop()'s doc comment, it's our responsibility
// to shut down the upload queue tasks.
// TODO: fix that, task management should be encapsulated inside remote_client.
task_mgr::shutdown_tasks(
Some(TaskKind::RemoteUploadTask),
Some(self.tenant_shard_id),
Some(self.timeline_id),
)
.await;
}
self.remote_client.stop();
// As documented in remote_client.stop()'s doc comment, it's our responsibility
// to shut down the upload queue tasks.
// TODO: fix that, task management should be encapsulated inside remote_client.
task_mgr::shutdown_tasks(
Some(TaskKind::RemoteUploadTask),
Some(self.tenant_shard_id),
Some(self.timeline_id),
)
.await;
// TODO: work toward making this a no-op. See this funciton's doc comment for more context.
tracing::debug!("Waiting for tasks...");
@@ -1922,10 +1910,6 @@ impl Timeline {
return Ok(None);
};
if self.remote_client.is_none() {
return Ok(Some(false));
}
layer.download().await?;
Ok(Some(true))
@@ -2190,7 +2174,7 @@ impl Timeline {
walredo_mgr,
walreceiver: Mutex::new(None),
remote_client: resources.remote_client.map(Arc::new),
remote_client: Arc::new(resources.remote_client),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
@@ -2437,10 +2421,6 @@ impl Timeline {
discovered_layers.push((layer_file_name, local_path, file_size));
continue;
}
Discovered::Metadata => {
warn!("found legacy metadata file, these should have been removed in load_tenant_config");
continue;
}
Discovered::IgnoredBackup => {
continue;
}
@@ -2487,12 +2467,10 @@ impl Timeline {
if local.metadata.file_size() == remote.file_size() {
// Use the local file, but take the remote metadata so that we pick up
// the correct generation.
UseLocal(
LocalLayerFileMetadata {
metadata: remote,
local_path: local.local_path
}
)
UseLocal(LocalLayerFileMetadata {
metadata: remote,
local_path: local.local_path,
})
} else {
init::cleanup_local_file_for_remote(&local, &remote)?;
UseRemote { local, remote }
@@ -2501,7 +2479,11 @@ impl Timeline {
Ok(decision) => decision,
Err(DismissedLayer::Future { local }) => {
if let Some(local) = local {
init::cleanup_future_layer(&local.local_path, &name, disk_consistent_lsn)?;
init::cleanup_future_layer(
&local.local_path,
&name,
disk_consistent_lsn,
)?;
}
needs_cleanup.push(name);
continue;
@@ -2523,7 +2505,8 @@ impl Timeline {
let layer = match decision {
UseLocal(local) => {
total_physical_size += local.metadata.file_size();
Layer::for_resident(conf, &this, local.local_path, name, local.metadata).drop_eviction_guard()
Layer::for_resident(conf, &this, local.local_path, name, local.metadata)
.drop_eviction_guard()
}
Evicted(remote) | UseRemote { remote, .. } => {
Layer::for_evicted(conf, &this, name, remote)
@@ -2543,36 +2526,36 @@ impl Timeline {
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
if let Some(rtc) = self.remote_client.as_ref() {
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
rtc.schedule_index_upload_for_file_changes()?;
// This barrier orders above DELETEs before any later operations.
// This is critical because code executing after the barrier might
// create again objects with the same key that we just scheduled for deletion.
// For example, if we just scheduled deletion of an image layer "from the future",
// later compaction might run again and re-create the same image layer.
// "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
// "same" here means same key range and LSN.
//
// Without a barrier between above DELETEs and the re-creation's PUTs,
// the upload queue may execute the PUT first, then the DELETE.
// In our example, we will end up with an IndexPart referencing a non-existent object.
//
// 1. a future image layer is created and uploaded
// 2. ps restart
// 3. the future layer from (1) is deleted during load layer map
// 4. image layer is re-created and uploaded
// 5. deletion queue would like to delete (1) but actually deletes (4)
// 6. delete by name works as expected, but it now deletes the wrong (later) version
//
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
rtc.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
}
self.remote_client
.schedule_layer_file_deletion(&needs_cleanup)?;
self.remote_client
.schedule_index_upload_for_file_changes()?;
// This barrier orders above DELETEs before any later operations.
// This is critical because code executing after the barrier might
// create again objects with the same key that we just scheduled for deletion.
// For example, if we just scheduled deletion of an image layer "from the future",
// later compaction might run again and re-create the same image layer.
// "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
// "same" here means same key range and LSN.
//
// Without a barrier between above DELETEs and the re-creation's PUTs,
// the upload queue may execute the PUT first, then the DELETE.
// In our example, we will end up with an IndexPart referencing a non-existent object.
//
// 1. a future image layer is created and uploaded
// 2. ps restart
// 3. the future layer from (1) is deleted during load layer map
// 4. image layer is re-created and uploaded
// 5. deletion queue would like to delete (1) but actually deletes (4)
// 6. delete by name works as expected, but it now deletes the wrong (later) version
//
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
self.remote_client.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
@@ -3025,9 +3008,6 @@ impl Timeline {
/// should treat this as a cue to simply skip doing any heatmap uploading
/// for this timeline.
pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
// no point in heatmaps without remote client
let _remote_client = self.remote_client.as_ref()?;
if !self.is_active() {
return None;
}
@@ -3055,10 +3035,7 @@ impl Timeline {
// branchpoint in the value in IndexPart::lineage
self.ancestor_lsn == lsn
|| (self.ancestor_lsn == Lsn::INVALID
&& self
.remote_client
.as_ref()
.is_some_and(|rtc| rtc.is_previous_ancestor_lsn(lsn)))
&& self.remote_client.is_previous_ancestor_lsn(lsn))
}
}
@@ -3978,29 +3955,23 @@ impl Timeline {
x.unwrap()
));
if let Some(remote_client) = &self.remote_client {
for layer in layers_to_upload {
remote_client.schedule_layer_file_upload(layer)?;
}
remote_client.schedule_index_upload_for_metadata_update(&update)?;
for layer in layers_to_upload {
self.remote_client.schedule_layer_file_upload(layer)?;
}
self.remote_client
.schedule_index_upload_for_metadata_update(&update)?;
Ok(())
}
pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
if let Some(remote_client) = &self.remote_client {
remote_client
.preserve_initdb_archive(
&self.tenant_shard_id.tenant_id,
&self.timeline_id,
&self.cancel,
)
.await?;
} else {
bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
}
Ok(())
self.remote_client
.preserve_initdb_archive(
&self.tenant_shard_id.tenant_id,
&self.timeline_id,
&self.cancel,
)
.await
}
// Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
@@ -4361,12 +4332,7 @@ impl Timeline {
return;
}
if self
.remote_client
.as_ref()
.map(|c| c.is_deleting())
.unwrap_or(false)
{
if self.remote_client.is_deleting() {
// The timeline was created in a deletion-resume state, we don't expect logical size to be populated
return;
}
@@ -4534,9 +4500,8 @@ impl Timeline {
// deletion will happen later, the layer file manager calls garbage_collect_on_drop
guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&remove_layers, new_deltas)?;
}
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
drop_wlock(guard);
@@ -4554,9 +4519,8 @@ impl Timeline {
let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&drop_layers, &upload_layers)?;
}
self.remote_client
.schedule_compaction_update(&drop_layers, &upload_layers)?;
Ok(())
}
@@ -4566,16 +4530,14 @@ impl Timeline {
self: &Arc<Self>,
new_images: impl IntoIterator<Item = ResidentLayer>,
) -> anyhow::Result<()> {
let Some(remote_client) = &self.remote_client else {
return Ok(());
};
for layer in new_images {
remote_client.schedule_layer_file_upload(layer)?;
self.remote_client.schedule_layer_file_upload(layer)?;
}
// should any new image layer been created, not uploading index_part will
// result in a mismatch between remote_physical_size and layermap calculated
// size, which will fail some tests, but should not be an issue otherwise.
remote_client.schedule_index_upload_for_file_changes()?;
self.remote_client
.schedule_index_upload_for_file_changes()?;
Ok(())
}
@@ -4861,9 +4823,7 @@ impl Timeline {
result.layers_removed = gc_layers.len() as u64;
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_gc_update(&gc_layers)?;
}
self.remote_client.schedule_gc_update(&gc_layers)?;
guard.finish_gc_timeline(&gc_layers);

View File

@@ -295,13 +295,11 @@ impl Timeline {
// Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
self.rewrite_layers(replace_layers, drop_layers).await?;
if let Some(remote_client) = self.remote_client.as_ref() {
// We wait for all uploads to complete before finishing this compaction stage. This is not
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
remote_client.wait_completion().await?;
}
// We wait for all uploads to complete before finishing this compaction stage. This is not
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
self.remote_client.wait_completion().await?;
Ok(())
}

View File

@@ -26,19 +26,21 @@ use super::{Timeline, TimelineResources};
/// during attach or pageserver restart.
/// See comment in persist_index_part_with_deleted_flag.
async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
//
// AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents
// two tasks from performing the deletion at the same time. The first task
// that starts deletion should run it to completion.
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
}
match timeline
.remote_client
.persist_index_part_with_deleted_flag()
.await
{
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
//
// AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents
// two tasks from performing the deletion at the same time. The first task
// that starts deletion should run it to completion.
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
}
}
Ok(())
@@ -117,11 +119,11 @@ pub(super) async fn delete_local_timeline_directory(
/// Removes remote layers and an index file after them.
async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> {
if let Some(remote_client) = &timeline.remote_client {
remote_client.delete_all().await.context("delete_all")?
};
Ok(())
timeline
.remote_client
.delete_all()
.await
.context("delete_all")
}
// This function removs remaining traces of a timeline on disk.
@@ -260,7 +262,7 @@ impl DeleteTimelineFlow {
tenant: Arc<Tenant>,
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: Option<RemoteTimelineClient>,
remote_client: RemoteTimelineClient,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.

View File

@@ -70,10 +70,6 @@ pub(super) async fn prepare(
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
use Error::*;
if detached.remote_client.as_ref().is_none() {
unimplemented!("no new code for running without remote storage");
}
let Some((ancestor, ancestor_lsn)) = detached
.ancestor_timeline
.as_ref()
@@ -315,8 +311,6 @@ async fn upload_rewritten_layer(
// FIXME: better shuttingdown error
target
.remote_client
.as_ref()
.unwrap()
.upload_layer_file(&copied, cancel)
.await
.map_err(UploadRewritten)?;
@@ -406,8 +400,6 @@ async fn remote_copy(
// FIXME: better shuttingdown error
adoptee
.remote_client
.as_ref()
.unwrap()
.copy_timeline_layer(adopted, &owned, cancel)
.await
.map(move |()| owned)
@@ -421,11 +413,6 @@ pub(super) async fn complete(
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
let rtc = detached
.remote_client
.as_ref()
.expect("has to have a remote timeline client for timeline ancestor detach");
let PreparedTimelineDetach { layers } = prepared;
let ancestor = detached
@@ -442,11 +429,13 @@ pub(super) async fn complete(
//
// this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
// which could give us a completely wrong layer combination.
rtc.schedule_adding_existing_layers_to_index_detach_and_wait(
&layers,
(ancestor.timeline_id, ancestor_lsn),
)
.await?;
detached
.remote_client
.schedule_adding_existing_layers_to_index_detach_and_wait(
&layers,
(ancestor.timeline_id, ancestor_lsn),
)
.await?;
let mut tasks = tokio::task::JoinSet::new();
@@ -491,8 +480,6 @@ pub(super) async fn complete(
async move {
let res = timeline
.remote_client
.as_ref()
.expect("reparented has to have remote client because detached has one")
.schedule_reparenting_and_wait(&new_parent)
.await;

View File

@@ -23,7 +23,7 @@ use std::{
use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use tracing::{debug, info, info_span, instrument, warn, Instrument};
use crate::{
context::{DownloadBehavior, RequestContext},
@@ -211,11 +211,6 @@ impl Timeline {
// So, we just need to deal with this.
if self.remote_client.is_none() {
error!("no remote storage configured, cannot evict layers");
return ControlFlow::Continue(());
}
let mut js = tokio::task::JoinSet::new();
{
let guard = self.layers.read().await;

View File

@@ -9,7 +9,6 @@ use crate::{
storage_layer::LayerName,
Generation,
},
METADATA_FILE_NAME,
};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
@@ -27,8 +26,6 @@ pub(super) enum Discovered {
Temporary(String),
/// Temporary on-demand download files, should be removed
TemporaryDownload(String),
/// "metadata" file we persist locally and include in `index_part.json`
Metadata,
/// Backup file from previously future layers
IgnoredBackup,
/// Unrecognized, warn about these
@@ -49,9 +46,7 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
Discovered::Layer(file_name, direntry.path().to_owned(), file_size)
}
Err(_) => {
if file_name == METADATA_FILE_NAME {
Discovered::Metadata
} else if file_name.ends_with(".old") {
if file_name.ends_with(".old") {
// ignore these
Discovered::IgnoredBackup
} else if remote_timeline_client::is_temp_download_file(direntry.path()) {

View File

@@ -153,10 +153,7 @@ impl PostgresRedoManager {
process: self
.redo_process
.get()
.map(|p| WalRedoManagerProcessStatus {
pid: p.id(),
kind: std::borrow::Cow::Borrowed(p.kind().into()),
}),
.map(|p| WalRedoManagerProcessStatus { pid: p.id() }),
}
}
}

View File

@@ -1,7 +1,10 @@
/// Layer of indirection previously used to support multiple implementations.
/// Subject to removal: <https://github.com/neondatabase/neon/issues/7753>
use std::time::Duration;
use bytes::Bytes;
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use tracing::warn;
use utils::lsn::Lsn;
use crate::{config::PageServerConf, walrecord::NeonWalRecord};
@@ -12,7 +15,6 @@ mod protocol;
mod process_impl {
pub(super) mod process_async;
pub(super) mod process_std;
}
#[derive(
@@ -34,10 +36,7 @@ pub enum Kind {
Async,
}
pub(crate) enum Process {
Sync(process_impl::process_std::WalRedoProcess),
Async(process_impl::process_async::WalRedoProcess),
}
pub(crate) struct Process(process_impl::process_async::WalRedoProcess);
impl Process {
#[inline(always)]
@@ -46,18 +45,17 @@ impl Process {
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
Ok(match conf.walredo_process_kind {
Kind::Sync => Self::Sync(process_impl::process_std::WalRedoProcess::launch(
conf,
tenant_shard_id,
pg_version,
)?),
Kind::Async => Self::Async(process_impl::process_async::WalRedoProcess::launch(
conf,
tenant_shard_id,
pg_version,
)?),
})
if conf.walredo_process_kind != Kind::Async {
warn!(
configured = %conf.walredo_process_kind,
"the walredo_process_kind setting has been turned into a no-op, using async implementation"
);
}
Ok(Self(process_impl::process_async::WalRedoProcess::launch(
conf,
tenant_shard_id,
pg_version,
)?))
}
#[inline(always)]
@@ -69,29 +67,12 @@ impl Process {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
match self {
Process::Sync(p) => {
p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
.await
}
Process::Async(p) => {
p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
.await
}
}
self.0
.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
.await
}
pub(crate) fn id(&self) -> u32 {
match self {
Process::Sync(p) => p.id(),
Process::Async(p) => p.id(),
}
}
pub(crate) fn kind(&self) -> Kind {
match self {
Process::Sync(_) => Kind::Sync,
Process::Async(_) => Kind::Async,
}
self.0.id()
}
}

View File

@@ -1,405 +0,0 @@
use self::no_leak_child::NoLeakChild;
use crate::{
config::PageServerConf,
metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
walrecord::NeonWalRecord,
walredo::process::{no_leak_child, protocol},
};
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
collections::VecDeque,
io::{Read, Write},
process::{ChildStdin, ChildStdout, Command, Stdio},
sync::{Mutex, MutexGuard},
time::Duration,
};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, nonblock::set_nonblock};
pub struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
}
struct ProcessInput {
stdin: ChildStdin,
n_requests: usize,
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
pub(crate) fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
crate::span::debug_assert_current_span_has_tenant_id();
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
use no_leak_child::NoLeakChildCommandExt;
// Start postgres itself
let child = Command::new(pg_bin_dir_path.join("postgres"))
// the first arg must be --wal-redo so the child process enters into walredo mode
.arg("--wal-redo")
// the child doesn't process this arg, but, having it in the argv helps indentify the
// walredo process for a particular tenant when debugging a pagserver
.args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
// NB: The redo process is not trusted after we sent it the first
// walredo work. Before that, it is trusted. Specifically, we trust
// it to
// 1. close all file descriptors except stdin, stdout, stderr because
// pageserver might not be 100% diligent in setting FD_CLOEXEC on all
// the files it opens, and
// 2. to use seccomp to sandbox itself before processing the first
// walredo request.
.spawn_no_leak_child(tenant_shard_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
error!("killing wal-redo-postgres process due to a problem during launch");
child.kill_and_wait(WalRedoKillCause::Startup);
});
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
if let Err(e) = &res {
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
}
res
}};
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
tokio::spawn(
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
}
debug!("wal-redo-postgres stderr_logger_task started");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
use tokio::io::AsyncBufReadExt;
let mut stderr_lines = tokio::io::BufReader::new(stderr);
let mut buf = Vec::new();
let res = loop {
buf.clear();
// TODO we don't trust the process to cap its stderr length.
// Currently it can do unbounded Vec allocation.
match stderr_lines.read_until(b'\n', &mut buf).await {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
error!(%output, "received output");
}
Err(e) => {
break Err(e);
}
}
};
match res {
Ok(()) => (),
Err(e) => {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
);
Ok(Self {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
}
pub(crate) fn id(&self) -> u32 {
self.child
.as_ref()
.expect("must not call this during Drop")
.id()
}
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
pub(crate) async fn apply_wal_records(
&self,
rel: RelTag,
blknum: u32,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let tag = protocol::BufferTag { rel, blknum };
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
//
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
protocol::build_push_page_msg(tag, img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
}
}
protocol::build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
// in tests so capture all.
self.record_and_log(&writebuf);
}
res
}
fn apply_wal_records0(
&self,
writebuf: &[u8],
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
// To improve walredo performance we separate sending requests and receiving
// responses. Them are protected by different mutexes (output and input).
// If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
// then there is not warranty that T1 will first granted output mutex lock.
// To address this issue we maintain number of sent requests, number of processed
// responses and ring buffer with pending responses. After sending response
// (under input mutex), threads remembers request number. Then it releases
// input mutex, locks output mutex and fetch in ring buffer all responses until
// its stored request number. The it takes correspondent element from
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
}
// Replace our request's response with None in `pending_responses`.
// Then make space in the ring buffer by clearing out any seqence of contiguous
// `None`'s from the front of `pending_responses`.
// NB: We can't pop_front() because other requests' responses because another
// requester might have grabbed the output mutex before us:
// T1: grab input mutex
// T1: send request_no 23
// T1: release input mutex
// T2: grab input mutex
// T2: send request_no 24
// T2: release input mutex
// T2: grab output mutex
// T2: n_processed_responses + output.pending_responses.len() <= request_no
// 23 0 24
// T2: enters poll loop that reads stdout
// T2: put response for 23 into pending_responses
// T2: put response for 24 into pending_resposnes
// pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
// T2: takes its response_24
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: releases output mutex
// T1: grabs output mutex
// T1: n_processed_responses + output.pending_responses.len() > request_no
// 23 2 23
// T1: skips poll loop that reads stdout
// T1: takes its response_23
// pending_responses now looks like this: Front None None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Back
// n_processed_responses now has value 25
let res = output.pending_responses[request_no - n_processed_responses]
.take()
.expect("we own this request_no, nobody else is supposed to take it");
while let Some(front) = output.pending_responses.front() {
if front.is_none() {
output.pending_responses.pop_front();
output.n_processed_responses += 1;
} else {
break;
}
}
Ok(res)
}
#[cfg(feature = "testing")]
fn record_and_log(&self, writebuf: &[u8]) {
use std::sync::atomic::Ordering;
let millis = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.read(true)
.open(path)
.and_then(|mut f| f.write_all(writebuf));
// trip up allowed_errors
if let Err(e) = res {
tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
} else {
tracing::error!(filename, "erroring walredo input saved");
}
}
#[cfg(not(feature = "testing"))]
fn record_and_log(&self, _: &[u8]) {}
}
impl Drop for WalRedoProcess {
fn drop(&mut self) {
self.child
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
// no way to wait for stderr_logger_task from Drop because that is async only
}
}

View File

@@ -59,6 +59,7 @@ from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
@@ -79,6 +80,7 @@ from fixtures.utils import (
allure_attach_from_dir,
assert_no_errors,
get_self_dir,
print_gc_result,
subprocess_capture,
wait_until,
)
@@ -1390,8 +1392,8 @@ def neon_env_builder(
test_overlay_dir: Path,
top_output_dir: Path,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore] = None,
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None,
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_aux_file_policy: Optional[AuxFileStore],
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -4419,3 +4421,79 @@ def parse_project_git_version_output(s: str) -> str:
return commit
raise ValueError(f"unable to parse --version output: '{s}'")
def generate_uploads_and_deletions(
env: NeonEnv,
*,
init: bool = True,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
data: Optional[str] = None,
pageserver: NeonPageserver,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
that results in some uploads and some deletions to remote storage.
"""
if tenant_id is None:
tenant_id = env.initial_tenant
assert tenant_id is not None
if timeline_id is None:
timeline_id = env.initial_timeline
assert timeline_id is not None
ps_http = pageserver.http_client()
with env.endpoints.create_start(
"main", tenant_id=tenant_id, pageserver_id=pageserver.id
) as endpoint:
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)
def churn(data):
endpoint.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 200) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
]
)
assert tenant_id is not None
assert timeline_id is not None
# We are waiting for uploads as well as local flush, in order to avoid leaving the system
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)
# Compaction should generate some GC-elegible layers
for i in range(0, 2):
churn(f"{i if data is None else data}")
gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0
# Stop endpoint and flush all data to pageserver, then checkpoint it: this
# ensures that the pageserver is in a fully idle state: there will be no more
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Finish uploads
wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
# Finish all remote writes (including deletions)
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)

View File

@@ -0,0 +1,175 @@
import json
from pathlib import Path
from typing import Any, Dict, Tuple
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import s3_storage
from fixtures.utils import humantime_to_ms
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("io_engine", ["tokio-epoll-uring", "std-fs"])
@pytest.mark.parametrize("concurrency_per_target", [1, 10, 100])
@pytest.mark.timeout(1000)
def test_download_churn(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
io_engine: str,
concurrency_per_target: int,
duration: int,
):
def record(metric, **kwargs):
zenbenchmark.record(metric_name=f"pageserver_ondemand_download_churn.{metric}", **kwargs)
params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}
# params from fixtures
params.update(
{
# we don't capture `duration`, but instead use the `runtime` output field from pagebench
}
)
# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
params.update(
{
"pageserver_config_override.page_cache_size": (
page_cache_size * 8192,
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
}
)
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
# Setup env
env = setup_env(neon_env_builder, pg_bin)
env.pageserver.allowed_errors.append(
f".*path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)
run_benchmark(env, pg_bin, record, io_engine, concurrency_per_target, duration)
def setup_env(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# We configure tenant conf such that SQL query below produces a lot of layers.
# We don't care what's in the layers really, we just care that layers are created.
bytes_per_layer = 10 * (1024**2)
env = neon_env_builder.init_start(
initial_tenant_conf={
"pitr_interval": "1000d", # let's not make it get in the way
"gc_period": "0s", # disable periodic gc to avoid noise
"compaction_period": "0s", # disable L0=>L1 compaction
"checkpoint_timeout": "10years", # rely solely on checkpoint_distance
"checkpoint_distance": bytes_per_layer, # 10M instead of 256M to create more smaller layers
"image_creation_threshold": 100000, # don't create image layers ever
}
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=tenant_id) as ep:
ep.safe_psql("CREATE TABLE data (random_text text)")
bytes_per_row = 512 # make big enough so WAL record size doesn't dominate
desired_layers = 300
desired_bytes = bytes_per_layer * desired_layers
nrows = desired_bytes / bytes_per_row
ep.safe_psql(
f"INSERT INTO data SELECT lpad(i::text, {bytes_per_row}, '0') FROM generate_series(1, {int(nrows)}) as i",
options="-c statement_timeout=0",
)
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
# TODO: this is a bit imprecise, there could be frozen layers being written out that we don't observe here
wait_for_upload_queue_empty(client, tenant_id, timeline_id)
return env
def run_benchmark(
env: NeonEnv,
pg_bin: PgBin,
record,
io_engine: str,
concurrency_per_target: int,
duration_secs: int,
):
ps_http = env.pageserver.http_client()
cmd = [
str(env.neon_binpath / "pagebench"),
"ondemand-download-churn",
"--mgmt-api-endpoint",
ps_http.base_url,
"--runtime",
f"{duration_secs}s",
"--set-io-engine",
f"{io_engine}",
"--concurrency-per-target",
f"{concurrency_per_target}",
# don't specify the targets explicitly, let pagebench auto-discover them
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
metric = "downloads_count"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "downloads_bytes"
record(
metric,
metric_value=results[metric],
unit="byte",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "evictions_count"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "timeline_restarts"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.LOWER_IS_BETTER,
)
metric = "runtime"
record(
metric,
metric_value=humantime_to_ms(results[metric]) / 1000,
unit="s",
report=MetricReport.TEST_PARAM,
)

View File

@@ -56,14 +56,8 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
(tenant0, timeline0, pg0) = tenant_timelines[0]
log.info(f"Timeline {tenant0}/{timeline0} is left intact")
(tenant1, timeline1, pg1) = tenant_timelines[1]
metadata_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/metadata"
with open(metadata_path, "w") as f:
f.write("overwritten with garbage!")
log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled")
(tenant2, timeline2, pg2) = tenant_timelines[2]
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant2}/timelines/{timeline2}/"
(tenant1, timeline1, pg1) = tenant_timelines[2]
timeline_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Corrupt it
@@ -72,7 +66,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
with open(p, "wb") as f:
f.truncate(0)
f.truncate(size)
log.info(f"Timeline {tenant2}/{timeline2} got its local layer files spoiled")
log.info(f"Timeline {tenant1}/{timeline1} got its local layer files spoiled")
env.pageserver.start()
@@ -80,19 +74,15 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
pg0.start()
assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
# Tenant with corrupt local metadata works: remote storage is authoritative for metadata
pg1.start()
assert pg1.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
# Second timeline will fail during basebackup, because the local layer file is corrupt.
# It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
# (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err:
pg2.start()
pg1.start()
log.info(
f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"
f"As expected, compute startup failed for timeline {tenant1}/{timeline1} with corrupt layers: {err}"
)

View File

@@ -1,10 +1,12 @@
import enum
import json
import os
from typing import Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions
from fixtures.pageserver.http import PageserverApiException
from fixtures.workload import Workload
AGGRESIVE_COMPACTION_TENANT_CONF = {
@@ -190,3 +192,61 @@ def test_sharding_compaction(
# Assert that everything is still readable
workload.validate()
class CompactionAlgorithm(str, enum.Enum):
LEGACY = "Legacy"
TIERED = "Tiered"
@pytest.mark.parametrize(
"compaction_algorithm", [CompactionAlgorithm.LEGACY, CompactionAlgorithm.TIERED]
)
def test_uploads_and_deletions(
neon_env_builder: NeonEnvBuilder,
compaction_algorithm: CompactionAlgorithm,
):
"""
:param compaction_algorithm: the compaction algorithm to use.
"""
tenant_conf = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
# TODO remove these allowed errors
# https://github.com/neondatabase/neon/issues/7707
# https://github.com/neondatabase/neon/issues/7759
allowed_errors = [
".*duplicated L1 layer.*",
".*delta layer created with.*duplicate values.*",
".*assertion failed: self.lsn_range.start <= lsn.*",
".*HTTP request handler task panicked: task.*panicked.*",
]
if compaction_algorithm == CompactionAlgorithm.TIERED:
env.pageserver.allowed_errors.extend(allowed_errors)
try:
generate_uploads_and_deletions(env, pageserver=env.pageserver)
except PageserverApiException as e:
log.info(f"Obtained PageserverApiException: {e}")
# The errors occur flakily and no error is ensured to occur,
# however at least one of them occurs.
if compaction_algorithm == CompactionAlgorithm.TIERED:
found_allowed_error = any(env.pageserver.log_contains(e) for e in allowed_errors)
if not found_allowed_error:
raise Exception("None of the allowed_errors occured in the log")

View File

@@ -1,35 +0,0 @@
import pytest
from fixtures.neon_fixtures import (
NeonEnvBuilder,
last_flush_lsn_upload,
)
@pytest.mark.parametrize("kind", ["sync", "async"])
def test_walredo_process_kind_config(neon_env_builder: NeonEnvBuilder, kind: str):
neon_env_builder.pageserver_config_override = f"walredo_process_kind = '{kind}'"
# ensure it starts
env = neon_env_builder.init_start()
# ensure the metric is set
ps_http = env.pageserver.http_client()
metrics = ps_http.get_metrics()
samples = metrics.query_all("pageserver_wal_redo_process_kind")
assert [(s.labels, s.value) for s in samples] == [({"kind": kind}, 1)]
# ensure default tenant's config kind matches
# => write some data to force-spawn walredo
ep = env.endpoints.create_start("main")
with ep.connect() as conn:
with conn.cursor() as cur:
cur.execute("create table foo(bar text)")
cur.execute("insert into foo select from generate_series(1, 100)")
last_flush_lsn_upload(env, ep, env.initial_tenant, env.initial_timeline)
ep.stop()
ep.start()
with ep.connect() as conn:
with conn.cursor() as cur:
cur.execute("select count(*) from foo")
[(count,)] = cur.fetchall()
assert count == 100
status = ps_http.tenant_status(env.initial_tenant)
assert status["walredo"]["process"]["kind"] == kind

View File

@@ -21,11 +21,9 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PgBin,
S3Scrubber,
flush_ep_to_pageserver,
last_flush_lsn_upload,
generate_uploads_and_deletions,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
@@ -33,12 +31,11 @@ from fixtures.pageserver.utils import (
list_prefix,
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.utils import print_gc_result, wait_until
from fixtures.utils import wait_until
from fixtures.workload import Workload
# A tenant configuration that is convenient for generating uploads and deletions
@@ -59,82 +56,6 @@ TENANT_CONF = {
}
def generate_uploads_and_deletions(
env: NeonEnv,
*,
init: bool = True,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
data: Optional[str] = None,
pageserver: NeonPageserver,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
that results in some uploads and some deletions to remote storage.
"""
if tenant_id is None:
tenant_id = env.initial_tenant
assert tenant_id is not None
if timeline_id is None:
timeline_id = env.initial_timeline
assert timeline_id is not None
ps_http = pageserver.http_client()
with env.endpoints.create_start(
"main", tenant_id=tenant_id, pageserver_id=pageserver.id
) as endpoint:
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)
def churn(data):
endpoint.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 200) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
]
)
assert tenant_id is not None
assert timeline_id is not None
# We are waiting for uploads as well as local flush, in order to avoid leaving the system
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)
# Compaction should generate some GC-elegible layers
for i in range(0, 2):
churn(f"{i if data is None else data}")
gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0
# Stop endpoint and flush all data to pageserver, then checkpoint it: this
# ensures that the pageserver is in a fully idle state: there will be no more
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Finish uploads
wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
# Finish all remote writes (including deletions)
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
def read_all(
env: NeonEnv, tenant_id: Optional[TenantId] = None, timeline_id: Optional[TimelineId] = None
):

View File

@@ -293,6 +293,16 @@ files:
values: [checkpoints_timed]
query: |
SELECT checkpoints_timed FROM pg_stat_bgwriter;
# Number of slots is limited by max_replication_slots, so collecting position for all of them shouldn't be bad.
- metric_name: logical_slot_restart_lsn
type: gauge
help: 'restart_lsn of logical slots'
key_labels:
- slot_name
values: [restart_lsn]
query: |
select slot_name, restart_lsn from pg_replication_slots where slot_type = 'logical';
- filename: neon_collector_autoscaling.yml
content: |
collector_name: neon_collector_autoscaling