Compare commits

..

2 Commits

Author SHA1 Message Date
Conrad Ludgate
7b3fabbe8e docker build tokio_unstable 2024-05-11 11:58:01 +01:00
Conrad Ludgate
2aa74d1aab proxy: add measured-tokio 2024-05-11 11:33:24 +01:00
157 changed files with 2126 additions and 3072 deletions

View File

@@ -2,6 +2,7 @@
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.
rustdocflags = ["-Arustdoc::private_intra_doc_links"]
rustflags = ["--cfg=tokio_unstable"]
[alias]
build_testing = ["build", "--features", "testing"]

View File

@@ -5,7 +5,6 @@ self-hosted-runner:
- large
- large-arm64
- small
- small-arm64
- us-east-2
config-variables:
- REMOTE_STORAGE_AZURE_CONTAINER

View File

@@ -546,27 +546,9 @@ jobs:
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
report-benchmarks-failures:
needs: [ benchmarks, create-test-report ]
if: github.ref_name == 'main' && needs.benchmarks.result == 'failure'
runs-on: ubuntu-latest
steps:
- uses: slackapi/slack-github-action@v1
with:
channel-id: C060CNA47S9 # on-call-staging-storage-stream
slack-message: |
Benchmarks failed on main: ${{ github.event.head_commit.url }}
Allure report: ${{ needs.create-test-report.outputs.report-url }}
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
create-test-report:
needs: [ check-permissions, regress-tests, coverage-report, benchmarks, build-build-tools-image ]
if: ${{ !cancelled() && contains(fromJSON('["skipped", "success"]'), needs.check-permissions.result) }}
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, gen3, small ]
container:

View File

@@ -136,7 +136,7 @@ jobs:
check-linux-arm-build:
needs: [ check-permissions, build-build-tools-image ]
timeout-minutes: 90
runs-on: [ self-hosted, small-arm64 ]
runs-on: [ self-hosted, large-arm64 ]
env:
# Use release build only, to have less debug info around
@@ -260,7 +260,7 @@ jobs:
check-codestyle-rust-arm:
needs: [ check-permissions, build-build-tools-image ]
timeout-minutes: 90
runs-on: [ self-hosted, small-arm64 ]
runs-on: [ self-hosted, large-arm64 ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}

14
Cargo.lock generated
View File

@@ -3038,6 +3038,17 @@ dependencies = [
"procfs 0.16.0",
]
[[package]]
name = "measured-tokio"
version = "0.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b4ed0773ddbda3a85e39d0e094934549c410ca686a5095bcd72fb62100252a0"
dependencies = [
"itoa",
"measured",
"tokio",
]
[[package]]
name = "memchr"
version = "2.6.4"
@@ -3079,6 +3090,7 @@ dependencies = [
"libc",
"measured",
"measured-process",
"measured-tokio",
"once_cell",
"procfs 0.14.2",
"prometheus",
@@ -5952,7 +5964,7 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "svg_fmt"
version = "0.4.2"
source = "git+https://github.com/nical/rust_debug?rev=28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4#28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4"
source = "git+https://github.com/neondatabase/fork--nical--rust_debug?branch=neon#c1820b28664b5df68de7f043fccf2ed5d67b6ae8"
[[package]]
name = "syn"

View File

@@ -109,6 +109,7 @@ leaky-bucket = "1.0.1"
libc = "0.2"
md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-tokio = { version = "0.0.21" }
measured-process = { version = "0.0.21" }
memoffset = "0.8"
native-tls = "0.2"
@@ -158,8 +159,8 @@ socket2 = "0.5"
strum = "0.24"
strum_macros = "0.24"
"subtle" = "2.5.0"
# Our PR https://github.com/nical/rust_debug/pull/4 has been merged but no new version released yet
svg_fmt = { git = "https://github.com/nical/rust_debug", rev = "28a7d96eecff2f28e75b1ea09f2d499a60d0e3b4" }
# https://github.com/nical/rust_debug/pull/4
svg_fmt = { git = "https://github.com/neondatabase/fork--nical--rust_debug", branch = "neon" }
sync_wrapper = "0.1.2"
tar = "0.4"
task-local-extensions = "0.1.4"

View File

@@ -47,7 +47,7 @@ COPY --chown=nonroot . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment --cfg=tokio_unstable" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -87,7 +87,7 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
&& rm awscliv2.zip
# Mold: A Modern Linker
ENV MOLD_VERSION v2.31.0
ENV MOLD_VERSION v2.4.0
RUN set -e \
&& git clone https://github.com/rui314/mold.git \
&& mkdir mold/build \

View File

@@ -152,9 +152,6 @@ pub struct NeonStorageControllerConf {
/// Heartbeat timeout before marking a node offline
#[serde(with = "humantime_serde")]
pub max_unavailable: Duration,
/// Threshold for auto-splitting a tenant into shards
pub split_threshold: Option<u64>,
}
impl NeonStorageControllerConf {
@@ -167,7 +164,6 @@ impl Default for NeonStorageControllerConf {
fn default() -> Self {
Self {
max_unavailable: Self::DEFAULT_MAX_UNAVAILABLE_INTERVAL,
split_threshold: None,
}
}
}

View File

@@ -305,10 +305,6 @@ impl StorageController {
));
}
if let Some(split_threshold) = self.config.split_threshold.as_ref() {
args.push(format!("--split-threshold={split_threshold}"))
}
background_process::start_process(
COMMAND,
&self.env.base_data_dir,

View File

@@ -18,6 +18,9 @@ workspace_hack.workspace = true
procfs.workspace = true
measured-process.workspace = true
[target.'cfg(tokio_unstable)'.dependencies]
measured-tokio.workspace = true
[dev-dependencies]
rand = "0.8"
rand_distr = "0.4.3"

View File

@@ -148,6 +148,11 @@ pub struct NeonMetrics {
#[metric(init = measured_process::ProcessCollector::for_self())]
process: measured_process::ProcessCollector,
#[cfg(tokio_unstable)]
#[metric(namespace = "tokio")]
#[metric(init = measured_tokio::NamedRuntimesCollector::new())]
pub tokio: measured_tokio::NamedRuntimesCollector,
#[metric(namespace = "libmetrics")]
#[metric(init = LibMetrics::new(build_info))]
libmetrics: LibMetrics,

View File

@@ -745,16 +745,6 @@ impl HistoricLayerInfo {
};
*field = value;
}
pub fn layer_file_size(&self) -> u64 {
match self {
HistoricLayerInfo::Delta {
layer_file_size, ..
} => *layer_file_size,
HistoricLayerInfo::Image {
layer_file_size, ..
} => *layer_file_size,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
@@ -786,6 +776,9 @@ pub struct TimelineGcRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalRedoManagerProcessStatus {
pub pid: u32,
/// The strum-generated `into::<&'static str>()` for `pageserver::walredo::ProcessKind`.
/// `ProcessKind` are a transitory thing, so, they have no enum representation in `pageserver_api`.
pub kind: Cow<'static, str>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -824,55 +817,6 @@ pub struct TenantScanRemoteStorageResponse {
pub shards: Vec<TenantScanRemoteStorageShard>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum TenantSorting {
ResidentSize,
MaxLogicalSize,
}
impl Default for TenantSorting {
fn default() -> Self {
Self::ResidentSize
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TopTenantShardsRequest {
// How would you like to sort the tenants?
pub order_by: TenantSorting,
// How many results?
pub limit: usize,
// Omit tenants with more than this many shards (e.g. if this is the max number of shards
// that the caller would ever split to)
pub where_shards_lt: Option<ShardCount>,
// Omit tenants where the ordering metric is less than this (this is an optimization to
// let us quickly exclude numerous tiny shards)
pub where_gt: Option<u64>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct TopTenantShardItem {
pub id: TenantShardId,
/// Total size of layers on local disk for all timelines in this tenant
pub resident_size: u64,
/// Total size of layers in remote storage for all timelines in this tenant
pub physical_size: u64,
/// The largest logical size of a timeline within this tenant
pub max_logical_size: u64,
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TopTenantShardsResponse {
pub shards: Vec<TopTenantShardItem>,
}
pub mod virtual_file {
#[derive(
Copy,

View File

@@ -125,7 +125,7 @@ impl ShardCount {
/// `v` may be zero, or the number of shards in the tenant. `v` is what
/// [`Self::literal`] would return.
pub const fn new(val: u8) -> Self {
pub fn new(val: u8) -> Self {
Self(val)
}
}

View File

@@ -29,7 +29,6 @@ use http_types::{StatusCode, Url};
use tokio_util::sync::CancellationToken;
use tracing::debug;
use crate::RemoteStorageActivity;
use crate::{
error::Cancelled, s3_bucket::RequestKind, AzureConfig, ConcurrencyLimiter, Download,
DownloadError, Listing, ListingMode, RemotePath, RemoteStorage, StorageMetadata,
@@ -526,10 +525,6 @@ impl RemoteStorage for AzureBlobStorage {
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
Err(TimeTravelError::Unimplemented)
}
fn activity(&self) -> RemoteStorageActivity {
self.concurrency_limiter.activity()
}
}
pin_project_lite::pin_project! {

View File

@@ -263,17 +263,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
done_if_after: SystemTime,
cancel: &CancellationToken,
) -> Result<(), TimeTravelError>;
/// Query how busy we currently are: may be used by callers which wish to politely
/// back off if there are already a lot of operations underway.
fn activity(&self) -> RemoteStorageActivity;
}
pub struct RemoteStorageActivity {
pub read_available: usize,
pub read_total: usize,
pub write_available: usize,
pub write_total: usize,
}
/// DownloadStream is sensitive to the timeout and cancellation used with the original
@@ -455,15 +444,6 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
}
pub fn activity(&self) -> RemoteStorageActivity {
match self {
Self::LocalFs(s) => s.activity(),
Self::AwsS3(s) => s.activity(),
Self::AzureBlob(s) => s.activity(),
Self::Unreliable(s) => s.activity(),
}
}
}
impl GenericRemoteStorage {
@@ -794,9 +774,6 @@ struct ConcurrencyLimiter {
// The helps to ensure we don't exceed the thresholds.
write: Arc<Semaphore>,
read: Arc<Semaphore>,
write_total: usize,
read_total: usize,
}
impl ConcurrencyLimiter {
@@ -825,21 +802,10 @@ impl ConcurrencyLimiter {
Arc::clone(self.for_kind(kind)).acquire_owned().await
}
fn activity(&self) -> RemoteStorageActivity {
RemoteStorageActivity {
read_available: self.read.available_permits(),
read_total: self.read_total,
write_available: self.write.available_permits(),
write_total: self.write_total,
}
}
fn new(limit: usize) -> ConcurrencyLimiter {
Self {
read: Arc::new(Semaphore::new(limit)),
write: Arc::new(Semaphore::new(limit)),
read_total: limit,
write_total: limit,
}
}
}

View File

@@ -23,8 +23,8 @@ use tokio_util::{io::ReaderStream, sync::CancellationToken};
use utils::crashsafe::path_with_suffix_extension;
use crate::{
Download, DownloadError, Listing, ListingMode, RemotePath, RemoteStorageActivity,
TimeTravelError, TimeoutOrCancel, REMOTE_STORAGE_PREFIX_SEPARATOR,
Download, DownloadError, Listing, ListingMode, RemotePath, TimeTravelError, TimeoutOrCancel,
REMOTE_STORAGE_PREFIX_SEPARATOR,
};
use super::{RemoteStorage, StorageMetadata};
@@ -605,16 +605,6 @@ impl RemoteStorage for LocalFs {
) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented)
}
fn activity(&self) -> RemoteStorageActivity {
// LocalFS has no concurrency limiting: give callers the impression that plenty of units are available
RemoteStorageActivity {
read_available: 16,
read_total: 16,
write_available: 16,
write_total: 16,
}
}
}
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {

View File

@@ -47,8 +47,8 @@ use utils::backoff;
use super::StorageMetadata;
use crate::{
error::Cancelled, support::PermitCarrying, ConcurrencyLimiter, Download, DownloadError,
Listing, ListingMode, RemotePath, RemoteStorage, RemoteStorageActivity, S3Config,
TimeTravelError, TimeoutOrCancel, MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
Listing, ListingMode, RemotePath, RemoteStorage, S3Config, TimeTravelError, TimeoutOrCancel,
MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
pub(super) mod metrics;
@@ -975,10 +975,6 @@ impl RemoteStorage for S3Bucket {
}
Ok(())
}
fn activity(&self) -> RemoteStorageActivity {
self.concurrency_limiter.activity()
}
}
/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].

View File

@@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;
use crate::{
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
RemoteStorageActivity, StorageMetadata, TimeTravelError,
StorageMetadata, TimeTravelError,
};
pub struct UnreliableWrapper {
@@ -213,8 +213,4 @@ impl RemoteStorage for UnreliableWrapper {
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
fn activity(&self) -> RemoteStorageActivity {
self.inner.activity()
}
}

View File

@@ -30,27 +30,47 @@
//! 2024-04-15 on i3en.3xlarge
//!
//! ```text
//! short/1 time: [24.584 µs 24.737 µs 24.922 µs]
//! short/2 time: [33.479 µs 33.660 µs 33.888 µs]
//! short/4 time: [42.713 µs 43.046 µs 43.440 µs]
//! short/8 time: [71.814 µs 72.478 µs 73.240 µs]
//! short/16 time: [132.73 µs 134.45 µs 136.22 µs]
//! short/32 time: [258.31 µs 260.73 µs 263.27 µs]
//! short/64 time: [511.61 µs 514.44 µs 517.51 µs]
//! short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
//! medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
//! medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
//! medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
//! medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
//! medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
//! medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
//! medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
//! medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
//! async-short/1 time: [24.584 µs 24.737 µs 24.922 µs]
//! async-short/2 time: [33.479 µs 33.660 µs 33.888 µs]
//! async-short/4 time: [42.713 µs 43.046 µs 43.440 µs]
//! async-short/8 time: [71.814 µs 72.478 µs 73.240 µs]
//! async-short/16 time: [132.73 µs 134.45 µs 136.22 µs]
//! async-short/32 time: [258.31 µs 260.73 µs 263.27 µs]
//! async-short/64 time: [511.61 µs 514.44 µs 517.51 µs]
//! async-short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
//! async-medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
//! async-medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
//! async-medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
//! async-medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
//! async-medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
//! async-medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
//! async-medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
//! async-medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
//! sync-short/1 time: [25.503 µs 25.626 µs 25.771 µs]
//! sync-short/2 time: [30.850 µs 31.013 µs 31.208 µs]
//! sync-short/4 time: [45.543 µs 45.856 µs 46.193 µs]
//! sync-short/8 time: [84.114 µs 84.639 µs 85.220 µs]
//! sync-short/16 time: [185.22 µs 186.15 µs 187.13 µs]
//! sync-short/32 time: [377.43 µs 378.87 µs 380.46 µs]
//! sync-short/64 time: [756.49 µs 759.04 µs 761.70 µs]
//! sync-short/128 time: [1.4825 ms 1.4874 ms 1.4923 ms]
//! sync-medium/1 time: [105.66 µs 106.01 µs 106.43 µs]
//! sync-medium/2 time: [153.10 µs 153.84 µs 154.72 µs]
//! sync-medium/4 time: [327.13 µs 329.44 µs 332.27 µs]
//! sync-medium/8 time: [654.26 µs 658.73 µs 663.63 µs]
//! sync-medium/16 time: [1.2682 ms 1.2748 ms 1.2816 ms]
//! sync-medium/32 time: [2.4456 ms 2.4595 ms 2.4731 ms]
//! sync-medium/64 time: [4.6523 ms 4.6890 ms 4.7256 ms]
//! sync-medium/128 time: [8.7215 ms 8.8323 ms 8.9344 ms]
//! ```
use bytes::{Buf, Bytes};
use criterion::{BenchmarkId, Criterion};
use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
use pageserver::{
config::PageServerConf,
walrecord::NeonWalRecord,
walredo::{PostgresRedoManager, ProcessKind},
};
use pageserver_api::{key::Key, shard::TenantShardId};
use std::{
sync::Arc,
@@ -60,32 +80,39 @@ use tokio::{sync::Barrier, task::JoinSet};
use utils::{id::TenantId, lsn::Lsn};
fn bench(c: &mut Criterion) {
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("short");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::short_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
for process_kind in &[ProcessKind::Async, ProcessKind::Sync] {
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group(format!("{process_kind}-short"));
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::short_input());
b.iter_custom(|iters| {
bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients)
});
},
);
}
}
}
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("medium");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::medium_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group(format!("{process_kind}-medium"));
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::medium_input());
b.iter_custom(|iters| {
bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients)
});
},
);
}
}
}
}
@@ -93,10 +120,16 @@ criterion::criterion_group!(benches, bench);
criterion::criterion_main!(benches);
// Returns the sum of each client's wall-clock time spent executing their share of the n_redos.
fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration {
fn bench_impl(
process_kind: ProcessKind,
redo_work: Arc<Request>,
n_redos: u64,
nclients: u64,
) -> Duration {
let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let mut conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
conf.walredo_process_kind = process_kind;
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
@@ -125,13 +158,27 @@ fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration
});
}
rt.block_on(async move {
let elapsed = rt.block_on(async move {
let mut total_wallclock_time = Duration::ZERO;
while let Some(res) = tasks.join_next().await {
total_wallclock_time += res.unwrap();
}
total_wallclock_time
})
});
// consistency check to ensure process kind setting worked
if nredos_per_client > 0 {
assert_eq!(
manager
.status()
.process
.map(|p| p.kind)
.expect("the benchmark work causes a walredo process to be spawned"),
std::borrow::Cow::Borrowed(process_kind.into())
);
}
elapsed
}
async fn client(

View File

@@ -486,18 +486,6 @@ impl Client {
.map_err(Error::ReceiveBody)
}
pub async fn top_tenant_shards(
&self,
request: TopTenantShardsRequest,
) -> Result<TopTenantShardsResponse> {
let uri = format!("{}/v1/top_tenants", self.mgmt_api_endpoint);
self.request(Method::POST, uri, request)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn layer_map_info(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -530,6 +530,8 @@ where
// If we have accumulated only a narrow band of keyspace, create an
// image layer. Otherwise write a delta layer.
// FIXME: deal with the case of lots of values for same key
// FIXME: we are ignoring images here. Did we already divide the work
// so that we won't encounter them here?
@@ -548,93 +550,38 @@ where
let mut new_jobs = Vec::new();
// Slide a window through the keyspace
let mut key_accum =
std::pin::pin!(accum_key_values(key_value_stream, self.target_file_size));
let mut key_accum = std::pin::pin!(accum_key_values(key_value_stream));
let mut all_in_window: bool = false;
let mut window = Window::new();
// Helper function to create a job for a new delta layer with given key-lsn
// rectangle.
let create_delta_job = |key_range, lsn_range: &Range<Lsn>, new_jobs: &mut Vec<_>| {
// The inputs for the job are all the input layers of the original job that
// overlap with the rectangle.
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
})
.cloned()
.collect();
assert!(!batch_layers.is_empty());
new_jobs.push(CompactionJob {
key_range,
lsn_range: lsn_range.clone(),
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
};
loop {
if all_in_window && window.is_empty() {
if all_in_window && window.elems.is_empty() {
// All done!
break;
}
// If we now have enough keyspace for next delta layer in the window, create a
// new delta layer
if let Some(key_range) = window.choose_next_delta(self.target_file_size, !all_in_window)
{
create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
continue;
}
assert!(!all_in_window);
// Process next key in the key space
match key_accum.next().await.transpose()? {
None => {
all_in_window = true;
}
Some(next_key) if next_key.partition_lsns.is_empty() => {
// Normal case: extend the window by the key
let batch_layers: Vec<LayerId> = job
.input_layers
.iter()
.filter(|layer_id| {
overlaps_with(self.layers[layer_id.0].layer.key_range(), &key_range)
})
.cloned()
.collect();
assert!(!batch_layers.is_empty());
new_jobs.push(CompactionJob {
key_range,
lsn_range: job.lsn_range.clone(),
strategy: CompactionStrategy::CreateDelta,
input_layers: batch_layers,
completed: false,
});
} else {
assert!(!all_in_window);
if let Some(next_key) = key_accum.next().await.transpose()? {
window.feed(next_key.key, next_key.size);
}
Some(next_key) => {
// A key with too large size impact for a single delta layer. This
// case occurs if you make a huge number of updates for a single key.
//
// Drain the window with has_more = false to make a clean cut before
// the key, and then make dedicated delta layers for the single key.
//
// We cannot cluster the key with the others, because we don't want
// layer files to overlap with each other in the lsn,key space (no
// overlaps for the rectangles).
let key = next_key.key;
debug!("key {key} with size impact larger than the layer size");
while !window.is_empty() {
let has_more = false;
let key_range = window.choose_next_delta(self.target_file_size, has_more)
.expect("with has_more==false, choose_next_delta always returns something for a non-empty Window");
create_delta_job(key_range, &job.lsn_range, &mut new_jobs);
}
// Not really required: but here for future resilience:
// We make a "gap" here, so any structure the window holds should
// probably be reset.
window = Window::new();
let mut prior_lsn = job.lsn_range.start;
let mut lsn_ranges = Vec::new();
for (lsn, _size) in next_key.partition_lsns.iter() {
lsn_ranges.push(prior_lsn..*lsn);
prior_lsn = *lsn;
}
lsn_ranges.push(prior_lsn..job.lsn_range.end);
for lsn_range in lsn_ranges {
let key_range = key..key.next();
create_delta_job(key_range, &lsn_range, &mut new_jobs);
}
} else {
all_in_window = true;
}
}
}
@@ -856,10 +803,6 @@ where
self.elems.front().unwrap().accum_size - self.splitoff_size
}
fn is_empty(&self) -> bool {
self.elems.is_empty()
}
fn commit_upto(&mut self, mut upto: usize) {
while upto > 1 {
let popped = self.elems.pop_front().unwrap();

View File

@@ -235,14 +235,9 @@ pub struct KeySize<K> {
pub key: K,
pub num_values: u64,
pub size: u64,
/// The lsns to partition at (if empty then no per-lsn partitioning)
pub partition_lsns: Vec<(Lsn, u64)>,
}
pub fn accum_key_values<'a, I, K, D, E>(
input: I,
target_size: u64,
) -> impl Stream<Item = Result<KeySize<K>, E>>
pub fn accum_key_values<'a, I, K, D, E>(input: I) -> impl Stream<Item = Result<KeySize<K>, E>>
where
K: Eq + PartialOrd + Display + Copy,
I: Stream<Item = Result<D, E>>,
@@ -254,35 +249,25 @@ where
if let Some(first) = input.next().await {
let first = first?;
let mut part_size = first.size();
let mut accum: KeySize<K> = KeySize {
key: first.key(),
num_values: 1,
size: part_size,
partition_lsns: Vec::new(),
size: first.size(),
};
let mut last_key = accum.key;
while let Some(this) = input.next().await {
let this = this?;
if this.key() == accum.key {
let add_size = this.size();
if part_size + add_size > target_size {
accum.partition_lsns.push((this.lsn(), part_size));
part_size = 0;
}
part_size += add_size;
accum.size += add_size;
accum.size += this.size();
accum.num_values += 1;
} else {
assert!(last_key <= accum.key, "last_key={last_key} <= accum.key={}", accum.key);
last_key = accum.key;
yield accum;
part_size = this.size();
accum = KeySize {
key: this.key(),
num_values: 1,
size: part_size,
partition_lsns: Vec::new(),
size: this.size(),
};
}
}

View File

@@ -184,12 +184,6 @@ impl<L> Level<L> {
}
let mut events: Vec<Event<K>> = Vec::new();
for (idx, l) in self.layers.iter().enumerate() {
let key_range = l.key_range();
if key_range.end == key_range.start.next() && l.is_delta() {
// Ignore single-key delta layers as they can be stacked on top of each other
// as that is the only way to cut further.
continue;
}
events.push(Event {
key: l.key_range().start,
layer_idx: idx,

View File

@@ -20,6 +20,10 @@ pub(crate) fn setup_logging() {
/// even if we produce an extremely narrow delta layer, spanning just that one
/// key, we still too many records to fit in the target file size. We need to
/// split in the LSN dimension too in that case.
///
/// TODO: The code to avoid this problem has not been implemented yet! So the
/// assertion currently fails, but we need to make it not fail.
#[ignore]
#[tokio::test]
async fn test_many_updates_for_single_key() {
setup_logging();
@@ -39,9 +43,9 @@ async fn test_many_updates_for_single_key() {
}
for l in executor.live_layers.iter() {
assert!(l.file_size() < executor.target_file_size * 2);
// Sanity check that none of the delta layers are empty either.
// sanity check that none of the delta layers are stupidly small either
if l.is_delta() {
assert!(l.file_size() > 0);
assert!(l.file_size() > executor.target_file_size / 2);
}
}
}

View File

@@ -52,6 +52,7 @@
use anyhow::{Context, Result};
use pageserver::repository::Key;
use pageserver::METADATA_FILE_NAME;
use std::cmp::Ordering;
use std::io::{self, BufRead};
use std::path::PathBuf;
@@ -82,11 +83,6 @@ fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
let split: Vec<&str> = name.split("__").collect();
let keys: Vec<&str> = split[0].split('-').collect();
let mut lsns: Vec<&str> = split[1].split('-').collect();
if lsns.last().expect("should").len() == 8 {
lsns.pop();
}
if lsns.len() == 1 {
lsns.push(lsns[0]);
}
@@ -158,6 +154,10 @@ pub fn main() -> Result<()> {
let line = PathBuf::from_str(&line).unwrap();
let filename = line.file_name().unwrap();
let filename = filename.to_str().unwrap();
if filename == METADATA_FILE_NAME {
// Don't try and parse "metadata" like a key-lsn range
continue;
}
let (key_range, lsn_range) = parse_filename(filename);
files.push(Layer {
filename: filename.to_owned(),

View File

@@ -100,7 +100,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> {
let file = VirtualFile::open(path, ctx).await?;
let file = VirtualFile::open(path).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;

View File

@@ -61,7 +61,7 @@ async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10, virtual_file::api::IoEngineKind::StdFs);
page_cache::init(100);
let file = VirtualFile::open(path, ctx).await?;
let file = VirtualFile::open(path).await?;
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader.read_blk(0, ctx).await?;

View File

@@ -2,11 +2,9 @@ use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId};
use pageserver_client::mgmt_api;
use rand::seq::SliceRandom;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use utils::id::{TenantTimelineId, TimelineId};
use std::{f64, sync::Arc};
use tokio::{
sync::{mpsc, OwnedSemaphorePermit},
task::JoinSet,
@@ -14,7 +12,10 @@ use tokio::{
use std::{
num::NonZeroUsize,
sync::atomic::{AtomicU64, Ordering},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
};
@@ -50,31 +51,19 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
Ok(())
}
#[derive(serde::Serialize)]
struct Output {
downloads_count: u64,
downloads_bytes: u64,
evictions_count: u64,
timeline_restarts: u64,
#[serde(with = "humantime_serde")]
runtime: Duration,
}
#[derive(Debug, Default)]
struct LiveStats {
evictions_count: AtomicU64,
downloads_count: AtomicU64,
downloads_bytes: AtomicU64,
evictions: AtomicU64,
downloads: AtomicU64,
timeline_restarts: AtomicU64,
}
impl LiveStats {
fn eviction_done(&self) {
self.evictions_count.fetch_add(1, Ordering::Relaxed);
self.evictions.fetch_add(1, Ordering::Relaxed);
}
fn download_done(&self, size: u64) {
self.downloads_count.fetch_add(1, Ordering::Relaxed);
self.downloads_bytes.fetch_add(size, Ordering::Relaxed);
fn download_done(&self) {
self.downloads.fetch_add(1, Ordering::Relaxed);
}
fn timeline_restart_done(&self) {
self.timeline_restarts.fetch_add(1, Ordering::Relaxed);
@@ -103,49 +92,28 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
)
.await?;
let token = CancellationToken::new();
let mut tasks = JoinSet::new();
let periodic_stats = Arc::new(LiveStats::default());
let total_stats = Arc::new(LiveStats::default());
let start = Instant::now();
let live_stats = Arc::new(LiveStats::default());
tasks.spawn({
let periodic_stats = Arc::clone(&periodic_stats);
let total_stats = Arc::clone(&total_stats);
let cloned_token = token.clone();
let live_stats = Arc::clone(&live_stats);
async move {
let mut last_at = Instant::now();
loop {
if cloned_token.is_cancelled() {
return;
}
tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await;
let now = Instant::now();
let delta: Duration = now - last_at;
last_at = now;
let LiveStats {
evictions_count,
downloads_count,
downloads_bytes,
evictions,
downloads,
timeline_restarts,
} = &*periodic_stats;
let evictions_count = evictions_count.swap(0, Ordering::Relaxed);
let downloads_count = downloads_count.swap(0, Ordering::Relaxed);
let downloads_bytes = downloads_bytes.swap(0, Ordering::Relaxed);
} = &*live_stats;
let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed);
total_stats.evictions_count.fetch_add(evictions_count, Ordering::Relaxed);
total_stats.downloads_count.fetch_add(downloads_count, Ordering::Relaxed);
total_stats.downloads_bytes.fetch_add(downloads_bytes, Ordering::Relaxed);
total_stats.timeline_restarts.fetch_add(timeline_restarts, Ordering::Relaxed);
let evictions_per_s = evictions_count as f64 / delta.as_secs_f64();
let downloads_per_s = downloads_count as f64 / delta.as_secs_f64();
let downloads_mibs_per_s = downloads_bytes as f64 / delta.as_secs_f64() / ((1 << 20) as f64);
info!("evictions={evictions_per_s:.2}/s downloads={downloads_per_s:.2}/s download_bytes={downloads_mibs_per_s:.2}MiB/s timeline_restarts={timeline_restarts}");
info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}");
}
}
});
@@ -156,42 +124,14 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
args,
Arc::clone(&mgmt_api_client),
tl,
Arc::clone(&periodic_stats),
token.clone(),
Arc::clone(&live_stats),
));
}
}
if let Some(runtime) = args.runtime {
tokio::spawn(async move {
tokio::time::sleep(runtime.into()).await;
token.cancel();
});
}
while let Some(res) = tasks.join_next().await {
res.unwrap();
}
let end = Instant::now();
let duration: Duration = end - start;
let output = {
let LiveStats {
evictions_count,
downloads_count,
downloads_bytes,
timeline_restarts,
} = &*total_stats;
Output {
downloads_count: downloads_count.load(Ordering::Relaxed),
downloads_bytes: downloads_bytes.load(Ordering::Relaxed),
evictions_count: evictions_count.load(Ordering::Relaxed),
timeline_restarts: timeline_restarts.load(Ordering::Relaxed),
runtime: duration,
}
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");
Ok(())
}
@@ -200,7 +140,6 @@ async fn timeline_actor(
mgmt_api_client: Arc<pageserver_client::mgmt_api::Client>,
timeline: TenantTimelineId,
live_stats: Arc<LiveStats>,
token: CancellationToken,
) {
// TODO: support sharding
let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
@@ -210,7 +149,7 @@ async fn timeline_actor(
layers: Vec<mpsc::Sender<OwnedSemaphorePermit>>,
concurrency: Arc<tokio::sync::Semaphore>,
}
while !token.is_cancelled() {
loop {
debug!("restarting timeline");
let layer_map_info = mgmt_api_client
.layer_map_info(tenant_shard_id, timeline.timeline_id)
@@ -246,7 +185,7 @@ async fn timeline_actor(
live_stats.timeline_restart_done();
while !token.is_cancelled() {
loop {
assert!(!timeline.joinset.is_empty());
if let Some(res) = timeline.joinset.try_join_next() {
debug!(?res, "a layer actor exited, should not happen");
@@ -316,7 +255,7 @@ async fn layer_actor(
.layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name())
.await
.unwrap();
live_stats.download_done(layer.layer_file_size());
live_stats.download_done();
did_it
}
};

View File

@@ -1,39 +1,15 @@
use std::sync::Arc;
use ::metrics::IntGauge;
use bytes::{Buf, BufMut, Bytes};
use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE};
use tracing::warn;
// BEGIN Copyright (c) 2017 Servo Contributors
/// Const version of FNV hash.
#[inline]
#[must_use]
pub const fn fnv_hash(bytes: &[u8]) -> u128 {
const INITIAL_STATE: u128 = 0x6c62272e07bb014262b821756295c58d;
const PRIME: u128 = 0x0000000001000000000000000000013B;
let mut hash = INITIAL_STATE;
let mut i = 0;
while i < bytes.len() {
hash ^= bytes[i] as u128;
hash = hash.wrapping_mul(PRIME);
i += 1;
}
hash
}
// END Copyright (c) 2017 Servo Contributors
/// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, least significant 13B of FNV hash].
/// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, first 13B of 128b xxhash].
fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key {
let mut key: [u8; 16] = [0; METADATA_KEY_SIZE];
let hash = fnv_hash(data).to_be_bytes();
let mut key = [0; METADATA_KEY_SIZE];
let hash = twox_hash::xxh3::hash128(data).to_be_bytes();
key[0] = AUX_KEY_PREFIX;
key[1] = dir_level1;
key[2] = dir_level2;
key[3..16].copy_from_slice(&hash[3..16]);
key[3..16].copy_from_slice(&hash[0..13]);
Key::from_metadata_key_fixed_size(&key)
}
@@ -164,55 +140,6 @@ pub fn encode_file_value(files: &[(&str, &[u8])]) -> anyhow::Result<Vec<u8>> {
Ok(encoded)
}
/// An estimation of the size of aux files.
pub struct AuxFileSizeEstimator {
aux_file_size_gauge: IntGauge,
size: Arc<std::sync::Mutex<Option<isize>>>,
}
impl AuxFileSizeEstimator {
pub fn new(aux_file_size_gauge: IntGauge) -> Self {
Self {
aux_file_size_gauge,
size: Arc::new(std::sync::Mutex::new(None)),
}
}
pub fn on_base_backup(&self, new_size: usize) {
let mut guard = self.size.lock().unwrap();
*guard = Some(new_size as isize);
self.report(new_size as isize);
}
pub fn on_add(&self, file_size: usize) {
let mut guard = self.size.lock().unwrap();
if let Some(size) = &mut *guard {
*size += file_size as isize;
self.report(*size);
}
}
pub fn on_remove(&self, file_size: usize) {
let mut guard = self.size.lock().unwrap();
if let Some(size) = &mut *guard {
*size -= file_size as isize;
self.report(*size);
}
}
pub fn on_update(&self, old_size: usize, new_size: usize) {
let mut guard = self.size.lock().unwrap();
if let Some(size) = &mut *guard {
*size += new_size as isize - old_size as isize;
self.report(*size);
}
}
pub fn report(&self, size: isize) {
self.aux_file_size_gauge.set(size as i64);
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -221,19 +148,15 @@ mod tests {
fn test_hash_portable() {
// AUX file encoding requires the hash to be portable across all platforms. This test case checks
// if the algorithm produces the same hash across different environments.
assert_eq!(
265160408618497461376862998434862070044,
super::fnv_hash("test1".as_bytes())
305317690835051308206966631765527126151,
twox_hash::xxh3::hash128("test1".as_bytes())
);
assert_eq!(
295486155126299629456360817749600553988,
super::fnv_hash("test/test2".as_bytes())
);
assert_eq!(
144066263297769815596495629667062367629,
super::fnv_hash("".as_bytes())
85104974691013376326742244813280798847,
twox_hash::xxh3::hash128("test/test2".as_bytes())
);
assert_eq!(0, twox_hash::xxh3::hash128("".as_bytes()));
}
#[test]
@@ -241,28 +164,28 @@ mod tests {
// To correct retrieve AUX files, the generated keys for the same file must be the same for all versions
// of the page server.
assert_eq!(
"62000001017F8B83D94F7081693471ABF91C",
encode_aux_file_key("pg_logical/mappings/test1").to_string(),
"6200000101E5B20C5F8DD5AA3289D6D9EAFA",
encode_aux_file_key("pg_logical/mappings/test1").to_string()
);
assert_eq!(
"62000001027F8E83D94F7081693471ABFCCD",
encode_aux_file_key("pg_logical/snapshots/test2").to_string(),
"620000010239AAC544893139B26F501B97E6",
encode_aux_file_key("pg_logical/snapshots/test2").to_string()
);
assert_eq!(
"62000001032E07BB014262B821756295C58D",
encode_aux_file_key("pg_logical/replorigin_checkpoint").to_string(),
"620000010300000000000000000000000000",
encode_aux_file_key("pg_logical/replorigin_checkpoint").to_string()
);
assert_eq!(
"62000001FF4F38E1C74754E7D03C1A660178",
encode_aux_file_key("pg_logical/unsupported").to_string(),
"62000001FF8635AF2134B7266EC5B4189FD6",
encode_aux_file_key("pg_logical/unsupported").to_string()
);
assert_eq!(
"62000002017F8D83D94F7081693471ABFB92",
"6200000201772D0E5D71DE14DA86142A1619",
encode_aux_file_key("pg_replslot/test3").to_string()
);
assert_eq!(
"620000FFFF2B6ECC8AEF93F643DC44F15E03",
encode_aux_file_key("other_file_not_supported").to_string(),
"620000FFFF1866EBEB53B807B26A2416F317",
encode_aux_file_key("other_file_not_supported").to_string()
);
}

View File

@@ -284,6 +284,7 @@ fn start_pageserver(
))
.unwrap();
pageserver::preinitialize_metrics();
pageserver::metrics::wal_redo::set_process_kind_metric(conf.walredo_process_kind);
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
@@ -515,12 +516,16 @@ fn start_pageserver(
}
});
let secondary_controller = secondary::spawn_tasks(
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.clone(),
);
let secondary_controller = if let Some(remote_storage) = &remote_storage {
secondary::spawn_tasks(
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.clone(),
)
} else {
secondary::null_controller()
};
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
@@ -528,13 +533,15 @@ fn start_pageserver(
// been configured.
let disk_usage_eviction_state: Arc<disk_usage_eviction_task::State> = Arc::default();
launch_disk_usage_global_eviction_task(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
if let Some(remote_storage) = &remote_storage {
launch_disk_usage_global_eviction_task(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
}
// Start up the service to handle HTTP mgmt API request. We created the
// listener earlier already.
@@ -647,20 +654,17 @@ fn start_pageserver(
None,
"libpq endpoint listener",
true,
{
let tenant_manager = tenant_manager.clone();
async move {
page_service::libpq_listener_main(
tenant_manager,
broker_client,
pg_auth,
pageserver_listener,
conf.pg_auth_type,
libpq_ctx,
task_mgr::shutdown_token(),
)
.await
}
async move {
page_service::libpq_listener_main(
conf,
broker_client,
pg_auth,
pageserver_listener,
conf.pg_auth_type,
libpq_ctx,
task_mgr::shutdown_token(),
)
.await
},
);
}
@@ -689,7 +693,14 @@ fn start_pageserver(
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
pageserver::shutdown_pageserver(&tenant_manager, deletion_queue.clone(), 0).await;
let bg_remote_storage = remote_storage.clone();
let bg_deletion_queue = deletion_queue.clone();
pageserver::shutdown_pageserver(
&tenant_manager,
bg_remote_storage.map(|_| bg_deletion_queue),
0,
)
.await;
unreachable!()
})
}
@@ -697,11 +708,12 @@ fn start_pageserver(
fn create_remote_storage_client(
conf: &'static PageServerConf,
) -> anyhow::Result<GenericRemoteStorage> {
) -> anyhow::Result<Option<GenericRemoteStorage>> {
let config = if let Some(config) = &conf.remote_storage_config {
config
} else {
anyhow::bail!("no remote storage configured, this is a deprecated configuration");
tracing::warn!("no remote storage configured, this is a deprecated configuration");
return Ok(None);
};
// Create the client
@@ -721,7 +733,7 @@ fn create_remote_storage_client(
GenericRemoteStorage::unreliable_wrapper(remote_storage, conf.test_remote_failures);
}
Ok(remote_storage)
Ok(Some(remote_storage))
}
fn cli() -> Command {

View File

@@ -99,7 +99,7 @@ pub mod defaults {
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "async";
pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "sync";
///
/// Default built-in configuration file.

View File

@@ -632,7 +632,7 @@ impl DeletionQueue {
///
/// If remote_storage is None, then the returned workers will also be None.
pub fn new<C>(
remote_storage: GenericRemoteStorage,
remote_storage: Option<GenericRemoteStorage>,
control_plane_client: Option<C>,
conf: &'static PageServerConf,
) -> (Self, Option<DeletionQueueWorkers<C>>)
@@ -658,6 +658,23 @@ impl DeletionQueue {
// longer to flush after Tenants have all been torn down.
let cancel = CancellationToken::new();
let remote_storage = match remote_storage {
None => {
return (
Self {
client: DeletionQueueClient {
tx,
executor_tx,
lsn_table: lsn_table.clone(),
},
cancel,
},
None,
)
}
Some(r) => r,
};
(
Self {
client: DeletionQueueClient {
@@ -748,7 +765,7 @@ mod test {
/// Simulate a pageserver restart by destroying and recreating the deletion queue
async fn restart(&mut self) {
let (deletion_queue, workers) = DeletionQueue::new(
self.storage.clone(),
Some(self.storage.clone()),
Some(self.mock_control_plane.clone()),
self.harness.conf,
);
@@ -858,7 +875,7 @@ mod test {
let mock_control_plane = MockControlPlane::new();
let (deletion_queue, worker) = DeletionQueue::new(
storage.clone(),
Some(storage.clone()),
Some(mock_control_plane.clone()),
harness.conf,
);

View File

@@ -1,8 +1,6 @@
//!
//! Management HTTP API
//!
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
@@ -26,11 +24,7 @@ use pageserver_api::models::TenantScanRemoteStorageShard;
use pageserver_api::models::TenantShardLocation;
use pageserver_api::models::TenantShardSplitRequest;
use pageserver_api::models::TenantShardSplitResponse;
use pageserver_api::models::TenantSorting;
use pageserver_api::models::TenantState;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::TopTenantShardsRequest;
use pageserver_api::models::TopTenantShardsResponse;
use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
@@ -110,7 +104,7 @@ pub struct State {
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: GenericRemoteStorage,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
@@ -124,7 +118,7 @@ impl State {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
remote_storage: GenericRemoteStorage,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
@@ -819,6 +813,12 @@ async fn tenant_attach_handler(
let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
if state.remote_storage.is_none() {
return Err(ApiError::BadRequest(anyhow!(
"attach_tenant is not possible because pageserver was configured without remote storage"
)));
}
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let shard_params = ShardParameters::default();
let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params);
@@ -1643,6 +1643,12 @@ async fn tenant_time_travel_remote_storage_handler(
)));
}
let Some(storage) = state.remote_storage.as_ref() else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run time travel"
)));
};
if timestamp > done_if_after {
return Err(ApiError::BadRequest(anyhow!(
"The done_if_after timestamp comes before the timestamp to recover to"
@@ -1652,7 +1658,7 @@ async fn tenant_time_travel_remote_storage_handler(
tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
remote_timeline_client::upload::time_travel_recover_tenant(
&state.remote_storage,
storage,
&tenant_shard_id,
timestamp,
done_if_after,
@@ -1709,7 +1715,12 @@ async fn timeline_gc_handler(
let gc_req: TimelineGcRequest = json_request(&mut request).await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let gc_result = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?;
let wait_task_done = mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx)?;
let gc_result = wait_task_done
.await
.context("wait for gc task")
.map_err(ApiError::InternalServerError)?
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, gc_result)
}
@@ -1897,6 +1908,11 @@ async fn deletion_queue_flush(
) -> Result<Response<Body>, ApiError> {
let state = get_state(&r);
if state.remote_storage.is_none() {
// Nothing to do if remote storage is disabled.
return json_response(StatusCode::OK, ());
}
let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
let flush = async {
@@ -2061,11 +2077,18 @@ async fn disk_usage_eviction_run(
};
let state = get_state(&r);
let Some(storage) = state.remote_storage.as_ref() else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"remote storage not configured, cannot run eviction iteration"
)));
};
let eviction_state = state.disk_usage_eviction_state.clone();
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&eviction_state,
&state.remote_storage,
storage,
usage,
&state.tenant_manager,
config.eviction_order,
@@ -2102,23 +2125,29 @@ async fn tenant_scan_remote_handler(
let state = get_state(&request);
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let Some(remote_storage) = state.remote_storage.as_ref() else {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Remote storage not configured"
)));
};
let mut response = TenantScanRemoteStorageResponse::default();
let (shards, _other_keys) =
list_remote_tenant_shards(&state.remote_storage, tenant_id, cancel.clone())
list_remote_tenant_shards(remote_storage, tenant_id, cancel.clone())
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
for tenant_shard_id in shards {
let (timeline_ids, _other_keys) =
list_remote_timelines(&state.remote_storage, tenant_shard_id, cancel.clone())
list_remote_timelines(remote_storage, tenant_shard_id, cancel.clone())
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
let mut generation = Generation::none();
for timeline_id in timeline_ids {
match download_index_part(
&state.remote_storage,
remote_storage,
&tenant_shard_id,
&timeline_id,
Generation::MAX,
@@ -2329,97 +2358,6 @@ async fn get_utilization(
.map_err(ApiError::InternalServerError)
}
/// Report on the largest tenants on this pageserver, for the storage controller to identify
/// candidates for splitting
async fn post_top_tenants(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let request: TopTenantShardsRequest = json_request(&mut r).await?;
let state = get_state(&r);
fn get_size_metric(sizes: &TopTenantShardItem, order_by: &TenantSorting) -> u64 {
match order_by {
TenantSorting::ResidentSize => sizes.resident_size,
TenantSorting::MaxLogicalSize => sizes.max_logical_size,
}
}
#[derive(Eq, PartialEq)]
struct HeapItem {
metric: u64,
sizes: TopTenantShardItem,
}
impl PartialOrd for HeapItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// Heap items have reverse ordering on their metric: this enables using BinaryHeap, which
/// supports popping the greatest item but not the smallest.
impl Ord for HeapItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
Reverse(self.metric).cmp(&Reverse(other.metric))
}
}
let mut top_n: BinaryHeap<HeapItem> = BinaryHeap::with_capacity(request.limit);
// FIXME: this is a lot of clones to take this tenant list
for (tenant_shard_id, tenant_slot) in state.tenant_manager.list() {
if let Some(shards_lt) = request.where_shards_lt {
// Ignore tenants which already have >= this many shards
if tenant_shard_id.shard_count >= shards_lt {
continue;
}
}
let sizes = match tenant_slot {
TenantSlot::Attached(tenant) => tenant.get_sizes(),
TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => {
continue;
}
};
let metric = get_size_metric(&sizes, &request.order_by);
if let Some(gt) = request.where_gt {
// Ignore tenants whose metric is <= the lower size threshold, to do less sorting work
if metric <= gt {
continue;
}
};
match top_n.peek() {
None => {
// Top N list is empty: candidate becomes first member
top_n.push(HeapItem { metric, sizes });
}
Some(i) if i.metric > metric && top_n.len() < request.limit => {
// Lowest item in list is greater than our candidate, but we aren't at limit yet: push to end
top_n.push(HeapItem { metric, sizes });
}
Some(i) if i.metric > metric => {
// List is at limit and lowest value is greater than our candidate, drop it.
}
Some(_) => top_n.push(HeapItem { metric, sizes }),
}
while top_n.len() > request.limit {
top_n.pop();
}
}
json_response(
StatusCode::OK,
TopTenantShardsResponse {
shards: top_n.into_iter().map(|i| i.sizes).collect(),
},
)
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -2706,6 +2644,5 @@ pub fn make_router(
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.post("/v1/top_tenants", |r| api_handler(r, post_top_tenants))
.any(handler_404))
}

View File

@@ -57,7 +57,7 @@ pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(
tenant_manager: &TenantManager,
mut deletion_queue: DeletionQueue,
deletion_queue: Option<DeletionQueue>,
exit_code: i32,
) {
use std::time::Duration;
@@ -89,7 +89,9 @@ pub async fn shutdown_pageserver(
.await;
// Best effort to persist any outstanding deletions, to avoid leaking objects
deletion_queue.shutdown(Duration::from_secs(5)).await;
if let Some(mut deletion_queue) = deletion_queue {
deletion_queue.shutdown(Duration::from_secs(5)).await;
}
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
@@ -112,6 +114,10 @@ pub async fn shutdown_pageserver(
std::process::exit(exit_code);
}
/// The name of the metadata file pageserver creates per timeline.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
pub const METADATA_FILE_NAME: &str = "metadata";
/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config`.
pub(crate) const TENANT_CONFIG_NAME: &str = "config";

View File

@@ -585,15 +585,6 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define current logical size metric")
});
static AUX_FILE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_aux_file_estimated_size",
"The size of all aux files for a timeline in aux file v2 store.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) mod initial_logical_size {
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
@@ -1999,6 +1990,29 @@ impl Default for WalRedoProcessCounters {
pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
Lazy::new(WalRedoProcessCounters::default);
#[cfg(not(test))]
pub mod wal_redo {
use super::*;
static PROCESS_KIND: Lazy<std::sync::Mutex<UIntGaugeVec>> = Lazy::new(|| {
std::sync::Mutex::new(
register_uint_gauge_vec!(
"pageserver_wal_redo_process_kind",
"The configured process kind for walredo",
&["kind"],
)
.unwrap(),
)
});
pub fn set_process_kind_metric(kind: crate::walredo::ProcessKind) {
// use guard to avoid races around the next two steps
let guard = PROCESS_KIND.lock().unwrap();
guard.reset();
guard.with_label_values(&[&format!("{kind}")]).set(1);
}
}
/// Similar to `prometheus::HistogramTimer` but does not record on drop.
pub(crate) struct StorageTimeMetricsTimer {
metrics: StorageTimeMetrics,
@@ -2098,10 +2112,9 @@ pub(crate) struct TimelineMetrics {
pub garbage_collect_histo: StorageTimeMetrics,
pub find_gc_cutoffs_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
resident_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub aux_file_size_gauge: IntGauge,
pub directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>>,
pub evictions: IntCounter,
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
@@ -2174,9 +2187,6 @@ impl TimelineMetrics {
let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let aux_file_size_gauge = AUX_FILE_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
// TODO use impl Trait syntax here once we have ability to use it: https://github.com/rust-lang/rust/issues/63065
let directory_entries_count_gauge_closure = {
let tenant_shard_id = *tenant_shard_id;
@@ -2214,7 +2224,6 @@ impl TimelineMetrics {
last_record_gauge,
resident_physical_size_gauge,
current_logical_size_gauge,
aux_file_size_gauge,
directory_entries_count_gauge,
evictions,
evictions_with_low_residence_duration: std::sync::RwLock::new(
@@ -2255,7 +2264,6 @@ impl TimelineMetrics {
let _ = metric.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
let _ = EVICTIONS.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = AUX_FILE_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
self.evictions_with_low_residence_duration
.write()
@@ -2312,7 +2320,6 @@ use pin_project_lite::pin_project;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
@@ -2322,35 +2329,35 @@ use crate::task_mgr::TaskKind;
use crate::tenant::mgr::TenantSlot;
/// Maintain a per timeline gauge in addition to the global gauge.
pub(crate) struct PerTimelineRemotePhysicalSizeGauge {
last_set: AtomicU64,
struct PerTimelineRemotePhysicalSizeGauge {
last_set: u64,
gauge: UIntGauge,
}
impl PerTimelineRemotePhysicalSizeGauge {
fn new(per_timeline_gauge: UIntGauge) -> Self {
Self {
last_set: AtomicU64::new(0),
last_set: per_timeline_gauge.get(),
gauge: per_timeline_gauge,
}
}
pub(crate) fn set(&self, sz: u64) {
fn set(&mut self, sz: u64) {
self.gauge.set(sz);
let prev = self.last_set.swap(sz, std::sync::atomic::Ordering::Relaxed);
if sz < prev {
REMOTE_PHYSICAL_SIZE_GLOBAL.sub(prev - sz);
if sz < self.last_set {
REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set - sz);
} else {
REMOTE_PHYSICAL_SIZE_GLOBAL.add(sz - prev);
REMOTE_PHYSICAL_SIZE_GLOBAL.add(sz - self.last_set);
};
self.last_set = sz;
}
pub(crate) fn get(&self) -> u64 {
fn get(&self) -> u64 {
self.gauge.get()
}
}
impl Drop for PerTimelineRemotePhysicalSizeGauge {
fn drop(&mut self) {
REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set.load(std::sync::atomic::Ordering::Relaxed));
REMOTE_PHYSICAL_SIZE_GLOBAL.sub(self.last_set);
}
}
@@ -2358,7 +2365,7 @@ pub(crate) struct RemoteTimelineClientMetrics {
tenant_id: String,
shard_id: String,
timeline_id: String,
pub(crate) remote_physical_size_gauge: PerTimelineRemotePhysicalSizeGauge,
remote_physical_size_gauge: Mutex<Option<PerTimelineRemotePhysicalSizeGauge>>,
calls: Mutex<HashMap<(&'static str, &'static str), IntCounterPair>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
@@ -2366,27 +2373,38 @@ pub(crate) struct RemoteTimelineClientMetrics {
impl RemoteTimelineClientMetrics {
pub fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
let tenant_id_str = tenant_shard_id.tenant_id.to_string();
let shard_id_str = format!("{}", tenant_shard_id.shard_slug());
let timeline_id_str = timeline_id.to_string();
let remote_physical_size_gauge = PerTimelineRemotePhysicalSizeGauge::new(
REMOTE_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id_str, &shard_id_str, &timeline_id_str])
.unwrap(),
);
RemoteTimelineClientMetrics {
tenant_id: tenant_id_str,
shard_id: shard_id_str,
timeline_id: timeline_id_str,
tenant_id: tenant_shard_id.tenant_id.to_string(),
shard_id: format!("{}", tenant_shard_id.shard_slug()),
timeline_id: timeline_id.to_string(),
calls: Mutex::new(HashMap::default()),
bytes_started_counter: Mutex::new(HashMap::default()),
bytes_finished_counter: Mutex::new(HashMap::default()),
remote_physical_size_gauge,
remote_physical_size_gauge: Mutex::new(None),
}
}
pub(crate) fn remote_physical_size_set(&self, sz: u64) {
let mut guard = self.remote_physical_size_gauge.lock().unwrap();
let gauge = guard.get_or_insert_with(|| {
PerTimelineRemotePhysicalSizeGauge::new(
REMOTE_PHYSICAL_SIZE
.get_metric_with_label_values(&[
&self.tenant_id,
&self.shard_id,
&self.timeline_id,
])
.unwrap(),
)
});
gauge.set(sz);
}
pub(crate) fn remote_physical_size_get(&self) -> u64 {
let guard = self.remote_physical_size_gauge.lock().unwrap();
guard.as_ref().map(|gauge| gauge.get()).unwrap_or(0)
}
pub fn remote_operation_time(
&self,
file_kind: &RemoteOpFileKind,

View File

@@ -32,7 +32,6 @@ use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
@@ -50,6 +49,7 @@ use utils::{
use crate::auth::check_permission;
use crate::basebackup;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
@@ -59,15 +59,13 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::GetTenantError;
use crate::tenant::mgr::ShardResolveResult;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::mgr::TenantManager;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Tenant;
use crate::tenant::Timeline;
use crate::trace::Tracer;
use pageserver_api::key::rel_block_to_key;
@@ -137,7 +135,7 @@ async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()
/// Listens for connections, and launches a new handler task for each.
///
pub async fn libpq_listener_main(
tenant_manager: Arc<TenantManager>,
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
listener: TcpListener,
@@ -182,7 +180,7 @@ pub async fn libpq_listener_main(
"serving compute connection task",
false,
page_service_conn_main(
tenant_manager.clone(),
conf,
broker_client.clone(),
local_auth,
socket,
@@ -205,7 +203,7 @@ pub async fn libpq_listener_main(
#[instrument(skip_all, fields(peer_addr))]
async fn page_service_conn_main(
tenant_manager: Arc<TenantManager>,
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
@@ -262,8 +260,7 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler =
PageServerHandler::new(tenant_manager, broker_client, auth, connection_ctx);
let mut conn_handler = PageServerHandler::new(conf, broker_client, auth, connection_ctx);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
@@ -294,12 +291,11 @@ struct HandlerTimeline {
}
struct PageServerHandler {
_conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
tenant_manager: Arc<TenantManager>,
/// The context created for the lifetime of the connection
/// services by this PageServerHandler.
/// For each query received over the connection,
@@ -385,13 +381,13 @@ impl From<WaitLsnError> for QueryError {
impl PageServerHandler {
pub fn new(
tenant_manager: Arc<TenantManager>,
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
connection_ctx: RequestContext,
) -> Self {
PageServerHandler {
tenant_manager,
_conf: conf,
broker_client,
auth,
claims: None,
@@ -556,9 +552,13 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let tenant = self
.get_active_tenant_with_timeout(tenant_id, ShardSelector::First, ACTIVE_TENANT_TIMEOUT)
.await?;
let tenant = mgr::get_active_tenant_with_timeout(
tenant_id,
ShardSelector::First,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await?;
// Make request tracer if needed
let mut tracer = if tenant.get_trace_read_requests() {
@@ -726,9 +726,13 @@ impl PageServerHandler {
// Create empty timeline
info!("creating new timeline");
let tenant = self
.get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT)
.await?;
let tenant = get_active_tenant_with_timeout(
tenant_id,
ShardSelector::Zero,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
@@ -1366,69 +1370,18 @@ impl PageServerHandler {
timeline_id: TimelineId,
selector: ShardSelector,
) -> Result<Arc<Timeline>, GetActiveTimelineError> {
let tenant = self
.get_active_tenant_with_timeout(tenant_id, selector, ACTIVE_TENANT_TIMEOUT)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let tenant = get_active_tenant_with_timeout(
tenant_id,
selector,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await
.map_err(GetActiveTimelineError::Tenant)?;
let timeline = tenant.get_timeline(timeline_id, true)?;
set_tracing_field_shard_id(&timeline);
Ok(timeline)
}
/// Get a shard's [`Tenant`] in its active state, if present. If we don't find the shard and some
/// slots for this tenant are `InProgress` then we will wait.
/// If we find the [`Tenant`] and it's not yet in state [`TenantState::Active`], we will wait.
///
/// `timeout` is used as a total timeout for the whole wait operation.
async fn get_active_tenant_with_timeout(
&self,
tenant_id: TenantId,
shard_selector: ShardSelector,
timeout: Duration,
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let wait_start = Instant::now();
let deadline = wait_start + timeout;
// Resolve TenantId to TenantShardId. This is usually a quick one-shot thing, the loop is
// for handling the rare case that the slot we're accessing is InProgress.
let tenant_shard = loop {
let resolved = self
.tenant_manager
.resolve_attached_shard(&tenant_id, shard_selector);
match resolved {
ShardResolveResult::Found(tenant_shard) => break tenant_shard,
ShardResolveResult::NotFound => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
tenant_id,
)));
}
ShardResolveResult::InProgress(barrier) => {
// We can't authoritatively answer right now: wait for InProgress state
// to end, then try again
tokio::select! {
_ = self.await_connection_cancelled() => {
return Err(GetActiveTenantError::Cancelled)
},
_ = barrier.wait() => {
// The barrier completed: proceed around the loop to try looking up again
},
_ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
return Err(GetActiveTenantError::WaitForActiveTimeout {
latest_state: None,
wait_time: timeout,
});
}
}
}
};
};
tracing::debug!("Waiting for tenant to enter active state...");
tenant_shard
.wait_to_become_active(deadline.duration_since(Instant::now()))
.await?;
Ok(tenant_shard)
}
}
#[async_trait::async_trait]
@@ -1818,13 +1771,13 @@ where
self.check_permission(Some(tenant_id))?;
let tenant = self
.get_active_tenant_with_timeout(
tenant_id,
ShardSelector::Zero,
ACTIVE_TENANT_TIMEOUT,
)
.await?;
let tenant = get_active_tenant_with_timeout(
tenant_id,
ShardSelector::Zero,
ACTIVE_TENANT_TIMEOUT,
&task_mgr::shutdown_token(),
)
.await?;
pgb.write_message_noflush(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),

View File

@@ -699,17 +699,13 @@ impl Timeline {
.await
.context("scan")?;
let mut result = HashMap::new();
let mut sz = 0;
for (_, v) in kv {
let v = v.context("get value")?;
let v = aux_file::decode_file_value_bytes(&v).context("value decode")?;
for (fname, content) in v {
sz += fname.len();
sz += content.len();
result.insert(fname, content);
}
}
self.aux_file_size_estimator.on_base_backup(sz);
Ok(result)
}
@@ -1478,45 +1474,23 @@ impl<'a> DatadirModification<'a> {
Err(PageReconstructError::MissingKey(_)) => None,
Err(e) => return Err(e.into()),
};
let files: Vec<(&str, &[u8])> = if let Some(ref old_val) = old_val {
let files = if let Some(ref old_val) = old_val {
aux_file::decode_file_value(old_val)?
} else {
Vec::new()
};
let mut other_files = Vec::with_capacity(files.len());
let mut modifying_file = None;
for file @ (p, content) in files {
if path == p {
assert!(
modifying_file.is_none(),
"duplicated entries found for {}",
path
);
modifying_file = Some(content);
} else {
other_files.push(file);
}
}
let mut new_files = other_files;
match (modifying_file, content.is_empty()) {
(Some(old_content), false) => {
self.tline
.aux_file_size_estimator
.on_update(old_content.len(), content.len());
new_files.push((path, content));
}
(Some(old_content), true) => {
self.tline
.aux_file_size_estimator
.on_remove(old_content.len());
// not adding the file key to the final `new_files` vec.
}
(None, false) => {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
(None, true) => anyhow::bail!("removing non-existing aux file: {}", path),
}
let new_files = if content.is_empty() {
files
.into_iter()
.filter(|(p, _)| &path != p)
.collect::<Vec<_>>()
} else {
files
.into_iter()
.filter(|(p, _)| &path != p)
.chain(std::iter::once((path, content)))
.collect::<Vec<_>>()
};
let new_val = aux_file::encode_file_value(&new_files)?;
self.put(key, Value::Image(new_val.into()));
}
@@ -1697,7 +1671,7 @@ impl<'a> DatadirModification<'a> {
}
if !self.pending_deletions.is_empty() {
writer.delete_batch(&self.pending_deletions, ctx).await?;
writer.delete_batch(&self.pending_deletions).await?;
self.pending_deletions.clear();
}

View File

@@ -21,7 +21,6 @@ use futures::FutureExt;
use futures::StreamExt;
use pageserver_api::models;
use pageserver_api::models::TimelineState;
use pageserver_api::models::TopTenantShardItem;
use pageserver_api::models::WalRedoManagerStatus;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::shard::ShardStripeSize;
@@ -191,7 +190,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
#[derive(Clone)]
pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: GenericRemoteStorage,
pub remote_storage: Option<GenericRemoteStorage>,
pub deletion_queue_client: DeletionQueueClient,
}
@@ -293,7 +292,7 @@ pub struct Tenant {
walredo_mgr: Option<Arc<WalRedoManager>>,
// provides access to timeline data sitting in the remote storage
pub(crate) remote_storage: GenericRemoteStorage,
pub(crate) remote_storage: Option<GenericRemoteStorage>,
// Access to global deletion queue for when this tenant wants to schedule a deletion
deletion_queue_client: DeletionQueueClient,
@@ -552,22 +551,21 @@ impl Tenant {
);
if let Some(index_part) = index_part.as_ref() {
timeline.remote_client.init_upload_queue(index_part)?;
} else {
timeline
.remote_client
.as_ref()
.unwrap()
.init_upload_queue(index_part)?;
} else if self.remote_storage.is_some() {
// No data on the remote storage, but we have local metadata file. We can end up
// here with timeline_create being interrupted before finishing index part upload.
// By doing what we do here, the index part upload is retried.
// If control plane retries timeline creation in the meantime, the mgmt API handler
// for timeline creation will coalesce on the upload we queue here.
// FIXME: this branch should be dead code as we no longer write local metadata.
timeline
.remote_client
.init_upload_queue_for_empty_remote(&metadata)?;
timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)?;
let rtc = timeline.remote_client.as_ref().unwrap();
rtc.init_upload_queue_for_empty_remote(&metadata)?;
rtc.schedule_index_upload_for_full_metadata_update(&metadata)?;
}
timeline
@@ -779,14 +777,14 @@ impl Tenant {
AttachType::Normal
};
let preload = match &mode {
SpawnMode::Create => {
let preload = match (&mode, &remote_storage) {
(SpawnMode::Create, _) => {
None
},
SpawnMode::Eager | SpawnMode::Lazy => {
(SpawnMode::Eager | SpawnMode::Lazy, Some(remote_storage)) => {
let _preload_timer = TENANT.preload.start_timer();
let res = tenant_clone
.preload(&remote_storage, task_mgr::shutdown_token())
.preload(remote_storage, task_mgr::shutdown_token())
.await;
match res {
Ok(p) => Some(p),
@@ -796,7 +794,10 @@ impl Tenant {
}
}
}
(_, None) => {
let _preload_timer = TENANT.preload.start_timer();
None
}
};
// Remote preload is complete.
@@ -1020,7 +1021,7 @@ impl Tenant {
index_part,
remote_metadata,
TimelineResources {
remote_client,
remote_client: Some(remote_client),
deletion_queue_client: self.deletion_queue_client.clone(),
timeline_get_throttle: self.timeline_get_throttle.clone(),
},
@@ -1046,7 +1047,7 @@ impl Tenant {
Arc::clone(self),
timeline_id,
&index_part.metadata,
remote_timeline_client,
Some(remote_timeline_client),
self.deletion_queue_client.clone(),
)
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
@@ -1138,7 +1139,9 @@ impl Tenant {
let mut size = 0;
for timeline in self.list_timelines() {
size += timeline.remote_client.get_remote_physical_size();
if let Some(remote_client) = &timeline.remote_client {
size += remote_client.get_remote_physical_size();
}
}
size
@@ -1188,7 +1191,6 @@ impl Tenant {
pub fn create_broken_tenant(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
remote_storage: GenericRemoteStorage,
reason: String,
) -> Arc<Tenant> {
Arc::new(Tenant::new(
@@ -1203,7 +1205,7 @@ impl Tenant {
ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
None,
tenant_shard_id,
remote_storage,
None,
DeletionQueueClient::broken(),
))
}
@@ -1396,7 +1398,13 @@ impl Tenant {
tline.freeze_and_flush().await.context("freeze_and_flush")?;
// Make sure the freeze_and_flush reaches remote storage.
tline.remote_client.wait_completion().await.unwrap();
tline
.remote_client
.as_ref()
.unwrap()
.wait_completion()
.await
.unwrap();
let tl = uninit_tl.finish_creation()?;
// The non-test code would call tl.activate() here.
@@ -1462,19 +1470,20 @@ impl Tenant {
return Err(CreateTimelineError::Conflict);
}
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
existing
.remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
if let Some(remote_client) = existing.remote_client.as_ref() {
// Wait for uploads to complete, so that when we return Ok, the timeline
// is known to be durable on remote storage. Just like we do at the end of
// this function, after we have created the timeline ourselves.
//
// We only really care that the initial version of `index_part.json` has
// been uploaded. That's enough to remember that the timeline
// exists. However, there is no function to wait specifically for that so
// we just wait for all in-progress uploads to finish.
remote_client
.wait_completion()
.await
.context("wait for timeline uploads to complete")?;
}
return Ok(existing);
}
@@ -1550,14 +1559,14 @@ impl Tenant {
// the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
// not send a success to the caller until it is. The same applies to handling retries,
// see the handling of [`TimelineExclusionError::AlreadyExists`] above.
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
loaded_timeline
.remote_client
.wait_completion()
.await
.with_context(|| format!("wait for {} timeline initial uploads to complete", kind))?;
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
let kind = ancestor_timeline_id
.map(|_| "branched")
.unwrap_or("bootstrapped");
remote_client.wait_completion().await.with_context(|| {
format!("wait for {} timeline initial uploads to complete", kind)
})?;
}
loaded_timeline.activate(self.clone(), broker_client, None, ctx);
@@ -2152,26 +2161,32 @@ impl Tenant {
) -> anyhow::Result<()> {
let timelines = self.timelines.lock().unwrap().clone();
for timeline in timelines.values() {
let Some(tl_client) = &timeline.remote_client else {
anyhow::bail!("Remote storage is mandatory");
};
let Some(remote_storage) = &self.remote_storage else {
anyhow::bail!("Remote storage is mandatory");
};
// We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
// to ensure that they do not start a split if currently in the process of doing these.
// Upload an index from the parent: this is partly to provide freshness for the
// child tenants that will copy it, and partly for general ease-of-debugging: there will
// always be a parent shard index in the same generation as we wrote the child shard index.
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
tl_client.schedule_index_upload_for_file_changes()?;
tl_client.wait_completion().await?;
// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
timeline.remote_client.shutdown().await;
tl_client.shutdown().await;
// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this
// operation is rare, so it's simpler to just download it (and robustly guarantees that the index
// we use here really is the remotely persistent one).
let result = timeline.remote_client
let result = tl_client
.download_index_file(&self.cancel)
.instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
.await?;
@@ -2184,7 +2199,7 @@ impl Tenant {
for child_shard in child_shards {
upload_index_part(
&self.remote_storage,
remote_storage,
child_shard,
&timeline.timeline_id,
self.generation,
@@ -2197,31 +2212,6 @@ impl Tenant {
Ok(())
}
pub(crate) fn get_sizes(&self) -> TopTenantShardItem {
let mut result = TopTenantShardItem {
id: self.tenant_shard_id,
resident_size: 0,
physical_size: 0,
max_logical_size: 0,
};
for timeline in self.timelines.lock().unwrap().values() {
result.resident_size += timeline.metrics.resident_physical_size_gauge.get();
result.physical_size += timeline
.remote_client
.metrics
.remote_physical_size_gauge
.get();
result.max_logical_size = std::cmp::max(
result.max_logical_size,
timeline.metrics.current_logical_size_gauge.get(),
);
}
result
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
@@ -2485,7 +2475,7 @@ impl Tenant {
shard_identity: ShardIdentity,
walredo_mgr: Option<Arc<WalRedoManager>>,
tenant_shard_id: TenantShardId,
remote_storage: GenericRemoteStorage,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
) -> Tenant {
let (state, mut rx) = watch::channel(state);
@@ -2810,7 +2800,7 @@ impl Tenant {
// See comments in [`Tenant::branch_timeline`] for more information about why branch
// creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if cancel.is_cancelled() {
if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
// We were requested to shut down. Stop and return with the progress we
// made.
break;
@@ -3129,10 +3119,11 @@ impl Tenant {
// We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
// could get incorrect information and remove more layers, than needed.
// See also https://github.com/neondatabase/neon/issues/3865
new_timeline
.remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)
.context("branch initial metadata upload")?;
if let Some(remote_client) = new_timeline.remote_client.as_ref() {
remote_client
.schedule_index_upload_for_full_metadata_update(&metadata)
.context("branch initial metadata upload")?;
}
Ok(new_timeline)
}
@@ -3164,6 +3155,11 @@ impl Tenant {
pgdata_path: &Utf8PathBuf,
timeline_id: &TimelineId,
) -> anyhow::Result<()> {
let Some(storage) = &self.remote_storage else {
// No remote storage? No upload.
return Ok(());
};
let temp_path = timelines_path.join(format!(
"{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
));
@@ -3187,7 +3183,7 @@ impl Tenant {
backoff::retry(
|| async {
self::remote_timeline_client::upload_initdb_dir(
&self.remote_storage,
storage,
&self.tenant_shard_id.tenant_id,
timeline_id,
pgdata_zstd.try_clone().await?,
@@ -3244,6 +3240,9 @@ impl Tenant {
}
}
if let Some(existing_initdb_timeline_id) = load_existing_initdb {
let Some(storage) = &self.remote_storage else {
bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
};
if existing_initdb_timeline_id != timeline_id {
let source_path = &remote_initdb_archive_path(
&self.tenant_shard_id.tenant_id,
@@ -3253,7 +3252,7 @@ impl Tenant {
&remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
// if this fails, it will get retried by retried control plane requests
self.remote_storage
storage
.copy_object(source_path, dest_path, &self.cancel)
.await
.context("copy initdb tar")?;
@@ -3261,7 +3260,7 @@ impl Tenant {
let (initdb_tar_zst_path, initdb_tar_zst) =
self::remote_timeline_client::download_initdb_tar_zst(
self.conf,
&self.remote_storage,
storage,
&self.tenant_shard_id,
&existing_initdb_timeline_id,
&self.cancel,
@@ -3356,14 +3355,20 @@ impl Tenant {
/// Call this before constructing a timeline, to build its required structures
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
let remote_client = RemoteTimelineClient::new(
self.remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_shard_id,
timeline_id,
self.generation,
);
let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
let remote_client = RemoteTimelineClient::new(
remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
self.tenant_shard_id,
timeline_id,
self.generation,
);
Some(remote_client)
} else {
None
};
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
@@ -3387,9 +3392,9 @@ impl Tenant {
let tenant_shard_id = self.tenant_shard_id;
let resources = self.build_timeline_resources(new_timeline_id);
resources
.remote_client
.init_upload_queue_for_empty_remote(new_metadata)?;
if let Some(remote_client) = &resources.remote_client {
remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
}
let timeline_struct = self
.create_timeline_struct(
@@ -3557,7 +3562,9 @@ impl Tenant {
tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
timeline.freeze_and_flush().await?;
tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
timeline.remote_client.wait_completion().await?;
if let Some(client) = &timeline.remote_client {
client.wait_completion().await?;
}
Ok(())
}
@@ -3871,7 +3878,7 @@ pub(crate) mod harness {
ShardIdentity::unsharded(),
Some(walredo_mgr),
self.tenant_shard_id,
self.remote_storage.clone(),
Some(self.remote_storage.clone()),
self.deletion_queue.new_client(),
));

View File

@@ -299,7 +299,7 @@ mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), &ctx).await?;
let file = VirtualFile::create(pathbuf.as_path()).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = wtr.write_blob(blob.clone(), &ctx).await;
@@ -314,7 +314,7 @@ mod tests {
wtr.flush_buffer(&ctx).await?;
}
let file = VirtualFile::open(pathbuf.as_path(), &ctx).await?;
let file = VirtualFile::open(pathbuf.as_path()).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {

View File

@@ -102,7 +102,7 @@ impl<'a> BlockReaderRef<'a> {
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
VirtualFile(r) => r.read_blk(blknum, ctx).await,
VirtualFile(r) => r.read_blk(blknum).await,
}
}
}
@@ -177,11 +177,10 @@ impl<'a> FileBlockReader<'a> {
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64, ctx)
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
.await
}
/// Read a block.
@@ -207,7 +206,7 @@ impl<'a> FileBlockReader<'a> {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
let write_guard = self.fill_buffer(write_guard, blknum, ctx).await?;
let write_guard = self.fill_buffer(write_guard, blknum).await?;
Ok(write_guard.mark_valid().into())
}
}

View File

@@ -181,23 +181,25 @@ async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), Del
async fn remove_tenant_remote_delete_mark(
conf: &PageServerConf,
remote_storage: &GenericRemoteStorage,
remote_storage: Option<&GenericRemoteStorage>,
tenant_shard_id: &TenantShardId,
cancel: &CancellationToken,
) -> Result<(), DeleteTenantError> {
let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
backoff::retry(
|| async { remote_storage.delete(&path, cancel).await },
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("remove_tenant_remote_delete_mark")?;
if let Some(remote_storage) = remote_storage {
let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
backoff::retry(
|| async { remote_storage.delete(&path, cancel).await },
TimeoutOrCancel::caused_by_cancel,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark",
cancel,
)
.await
.ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
.and_then(|x| x)
.context("remove_tenant_remote_delete_mark")?;
}
Ok(())
}
@@ -295,7 +297,7 @@ impl DeleteTenantFlow {
#[instrument(skip_all)]
pub(crate) async fn run(
conf: &'static PageServerConf,
remote_storage: GenericRemoteStorage,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
cancel: &CancellationToken,
@@ -306,7 +308,9 @@ impl DeleteTenantFlow {
let mut guard = Self::prepare(&tenant).await?;
if let Err(e) = Self::run_inner(&mut guard, conf, &remote_storage, &tenant, cancel).await {
if let Err(e) =
Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant, cancel).await
{
tenant.set_broken(format!("{e:#}")).await;
return Err(e);
}
@@ -323,7 +327,7 @@ impl DeleteTenantFlow {
async fn run_inner(
guard: &mut OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: &GenericRemoteStorage,
remote_storage: Option<&GenericRemoteStorage>,
tenant: &Tenant,
cancel: &CancellationToken,
) -> Result<(), DeleteTenantError> {
@@ -335,9 +339,14 @@ impl DeleteTenantFlow {
))?
});
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel)
.await
.context("remote_mark")?;
// IDEA: implement detach as delete without remote storage. Then they would use the same lock (deletion_progress) so wont contend.
// Though sounds scary, different mark name?
// Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
if let Some(remote_storage) = &remote_storage {
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel)
.await
.context("remote_mark")?
}
fail::fail_point!("tenant-delete-before-create-local-mark", |_| {
Err(anyhow::anyhow!(
@@ -474,7 +483,7 @@ impl DeleteTenantFlow {
fn schedule_background(
guard: OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: GenericRemoteStorage,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) {
@@ -503,7 +512,7 @@ impl DeleteTenantFlow {
async fn background(
mut guard: OwnedMutexGuard<Self>,
conf: &PageServerConf,
remote_storage: GenericRemoteStorage,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: &Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
@@ -542,7 +551,7 @@ impl DeleteTenantFlow {
remove_tenant_remote_delete_mark(
conf,
&remote_storage,
remote_storage.as_ref(),
&tenant.tenant_shard_id,
&task_mgr::shutdown_token(),
)

View File

@@ -28,7 +28,6 @@ impl EphemeralFile {
conf: &PageServerConf,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
ctx: &RequestContext,
) -> Result<EphemeralFile, io::Error> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
@@ -46,7 +45,6 @@ impl EphemeralFile {
.read(true)
.write(true)
.create(true),
ctx,
)
.await?;
@@ -155,7 +153,7 @@ mod tests {
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
let pos_foo = file.write_blob(b"foo", &ctx).await?;
assert_eq!(

View File

@@ -78,7 +78,7 @@ impl RW {
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = writer
.file
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64, ctx)
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));

View File

@@ -16,9 +16,10 @@ use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use sysinfo::SystemExt;
use tokio::fs;
use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
use anyhow::Context;
use once_cell::sync::Lazy;
@@ -46,7 +47,7 @@ use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::timeline::ShutdownMode;
use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
use utils::crashsafe::path_with_suffix_extension;
use utils::fs_ext::PathExt;
@@ -118,7 +119,6 @@ pub(crate) enum TenantsMapRemoveResult {
/// When resolving a TenantId to a shard, we may be looking for the 0th
/// shard, or we might be looking for whichever shard holds a particular page.
#[derive(Copy, Clone)]
pub(crate) enum ShardSelector {
/// Only return the 0th shard, if it is present. If a non-0th shard is present,
/// ignore it.
@@ -169,14 +169,6 @@ impl TenantStartupMode {
}
}
/// Result type for looking up a TenantId to a specific shard
pub(crate) enum ShardResolveResult {
NotFound,
Found(Arc<Tenant>),
// Wait for this barrrier, then query again
InProgress(utils::completion::Barrier),
}
impl TenantsMap {
/// Convenience function for typical usage, where we want to get a `Tenant` object, for
/// working with attached tenants. If the TenantId is in the map but in Secondary state,
@@ -190,6 +182,51 @@ impl TenantsMap {
}
}
/// A page service client sends a TenantId, and to look up the correct Tenant we must
/// resolve this to a fully qualified TenantShardId.
fn resolve_attached_shard(
&self,
tenant_id: &TenantId,
selector: ShardSelector,
) -> Option<TenantShardId> {
let mut want_shard = None;
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
// Ignore all slots that don't contain an attached tenant
let tenant = match &slot.1 {
TenantSlot::Attached(t) => t,
_ => continue,
};
match selector {
ShardSelector::First => return Some(*slot.0),
ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
return Some(*slot.0)
}
ShardSelector::Page(key) => {
// First slot we see for this tenant, calculate the expected shard number
// for the key: we will use this for checking if this and subsequent
// slots contain the key, rather than recalculating the hash each time.
if want_shard.is_none() {
want_shard = Some(tenant.shard_identity.get_shard_number(&key));
}
if Some(tenant.shard_identity.number) == want_shard {
return Some(*slot.0);
}
}
_ => continue,
}
}
// Fall through: we didn't find an acceptable shard
None
}
}
}
/// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
///
/// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
@@ -354,17 +391,22 @@ async fn init_load_generations(
// deletion list entries may still be valid. We provide that by pushing a recovery operation into
// the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
// are processed, even though we don't block on recovery completing here.
let attached_tenants = generations
.iter()
.flat_map(|(id, start_mode)| {
match start_mode {
TenantStartupMode::Attached((_mode, generation)) => Some(generation),
TenantStartupMode::Secondary => None,
}
.map(|gen| (*id, *gen))
})
.collect();
resources.deletion_queue_client.recover(attached_tenants)?;
//
// Must only do this if remote storage is enabled, otherwise deletion queue
// is not running and channel push will fail.
if resources.remote_storage.is_some() {
let attached_tenants = generations
.iter()
.flat_map(|(id, start_mode)| {
match start_mode {
TenantStartupMode::Attached((_mode, generation)) => Some(generation),
TenantStartupMode::Secondary => None,
}
.map(|gen| (*id, *gen))
})
.collect();
resources.deletion_queue_client.recover(attached_tenants)?;
}
Ok(Some(generations))
}
@@ -418,6 +460,53 @@ fn load_tenant_config(
}
};
// Clean up legacy `metadata` files.
// Doing it here because every single tenant directory is visited here.
// In any later code, there's different treatment of tenant dirs
// ... depending on whether the tenant is in re-attach response or not
// ... epending on whether the tenant is ignored or not
assert_eq!(
&conf.tenant_path(&tenant_shard_id),
&tenant_dir_path,
"later use of conf....path() methods would be dubious"
);
let timelines: Vec<TimelineId> = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() {
Ok(iter) => {
let mut timelines = Vec::new();
for res in iter {
let p = res?;
let Some(timeline_id) = p.file_name().parse::<TimelineId>().ok() else {
// skip any entries that aren't TimelineId, such as
// - *.___temp dirs
// - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart)
continue;
};
timelines.push(timeline_id);
}
timelines
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![],
Err(e) => return Err(anyhow::anyhow!(e)),
};
for timeline_id in timelines {
let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id);
let metadata_path = timeline_path.join(METADATA_FILE_NAME);
match std::fs::remove_file(&metadata_path) {
Ok(()) => {
crashsafe::fsync(timeline_path)
.context("fsync timeline dir after removing legacy metadata file")?;
info!("removed legacy metadata file at {metadata_path}");
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// something removed the file earlier, or it was never there
// We don't care, this software version doesn't write it again, so, we're good.
}
Err(e) => {
anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}");
}
}
}
let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
if tenant_ignore_mark_file.exists() {
info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
@@ -522,7 +611,6 @@ pub async fn init_tenant_mgr(
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_shard_id,
resources.remote_storage.clone(),
format!("{}", e),
)),
);
@@ -715,7 +803,6 @@ fn tenant_spawn(
"Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
);
let remote_storage = resources.remote_storage.clone();
let tenant = match Tenant::spawn(
conf,
tenant_shard_id,
@@ -730,7 +817,7 @@ fn tenant_spawn(
Ok(tenant) => tenant,
Err(e) => {
error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
Tenant::create_broken_tenant(conf, tenant_shard_id, remote_storage, format!("{e:#}"))
Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
}
};
@@ -2016,72 +2103,6 @@ impl TenantManager {
Ok(reparented)
}
/// A page service client sends a TenantId, and to look up the correct Tenant we must
/// resolve this to a fully qualified TenantShardId.
///
/// During shard splits: we shall see parent shards in InProgress state and skip them, and
/// instead match on child shards which should appear in Attached state. Very early in a shard
/// split, or in other cases where a shard is InProgress, we will return our own InProgress result
/// to instruct the caller to wait for that to finish before querying again.
pub(crate) fn resolve_attached_shard(
&self,
tenant_id: &TenantId,
selector: ShardSelector,
) -> ShardResolveResult {
let tenants = self.tenants.read().unwrap();
let mut want_shard = None;
let mut any_in_progress = None;
match &*tenants {
TenantsMap::Initializing => ShardResolveResult::NotFound,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
// Ignore all slots that don't contain an attached tenant
let tenant = match &slot.1 {
TenantSlot::Attached(t) => t,
TenantSlot::InProgress(barrier) => {
// We might still find a usable shard, but in case we don't, remember that
// we saw at least one InProgress slot, so that we can distinguish this case
// from a simple NotFound in our return value.
any_in_progress = Some(barrier.clone());
continue;
}
_ => continue,
};
match selector {
ShardSelector::First => return ShardResolveResult::Found(tenant.clone()),
ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
return ShardResolveResult::Found(tenant.clone())
}
ShardSelector::Page(key) => {
// First slot we see for this tenant, calculate the expected shard number
// for the key: we will use this for checking if this and subsequent
// slots contain the key, rather than recalculating the hash each time.
if want_shard.is_none() {
want_shard = Some(tenant.shard_identity.get_shard_number(&key));
}
if Some(tenant.shard_identity.number) == want_shard {
return ShardResolveResult::Found(tenant.clone());
}
}
_ => continue,
}
}
// Fall through: we didn't find a slot that was in Attached state & matched our selector. If
// we found one or more InProgress slot, indicate to caller that they should retry later. Otherwise
// this requested shard simply isn't found.
if let Some(barrier) = any_in_progress {
ShardResolveResult::InProgress(barrier)
} else {
ShardResolveResult::NotFound
}
}
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -2130,6 +2151,105 @@ pub(crate) enum GetActiveTenantError {
Broken(String),
}
/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
/// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
/// then wait for up to `timeout` (minus however long we waited for the slot).
pub(crate) async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
shard_selector: ShardSelector,
timeout: Duration,
cancel: &CancellationToken,
) -> Result<Arc<Tenant>, GetActiveTenantError> {
enum WaitFor {
Barrier(utils::completion::Barrier),
Tenant(Arc<Tenant>),
}
let wait_start = Instant::now();
let deadline = wait_start + timeout;
let (wait_for, tenant_shard_id) = {
let locked = TENANTS.read().unwrap();
// Resolve TenantId to TenantShardId
let tenant_shard_id = locked
.resolve_attached_shard(&tenant_id, shard_selector)
.ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
tenant_id,
)))?;
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => {
match tenant.current_state() {
TenantState::Active => {
// Fast path: we don't need to do any async waiting.
return Ok(tenant.clone());
}
_ => {
tenant.activate_now();
(WaitFor::Tenant(tenant.clone()), tenant_shard_id)
}
}
}
Some(TenantSlot::Secondary(_)) => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
tenant_shard_id,
)))
}
Some(TenantSlot::InProgress(barrier)) => {
(WaitFor::Barrier(barrier.clone()), tenant_shard_id)
}
None => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
tenant_id,
)))
}
}
};
let tenant = match wait_for {
WaitFor::Barrier(barrier) => {
tracing::debug!("Waiting for tenant InProgress state to pass...");
timeout_cancellable(
deadline.duration_since(Instant::now()),
cancel,
barrier.wait(),
)
.await
.map_err(|e| match e {
TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
latest_state: None,
wait_time: wait_start.elapsed(),
},
TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
})?;
{
let locked = TENANTS.read().unwrap();
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => tenant.clone(),
_ => {
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
tenant_shard_id,
)))
}
}
}
}
WaitFor::Tenant(tenant) => tenant,
};
tracing::debug!("Waiting for tenant to enter active state...");
tenant
.wait_to_become_active(deadline.duration_since(Instant::now()))
.await?;
Ok(tenant)
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTimelineError {
#[error("Tenant {0}")]
@@ -2156,7 +2276,7 @@ pub(crate) async fn load_tenant(
tenant_id: TenantId,
generation: Generation,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
@@ -2760,73 +2880,86 @@ use {
utils::http::error::ApiError,
};
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub(crate) async fn immediate_gc(
pub(crate) fn immediate_gc(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<GcResult, ApiError> {
let tenant = {
let guard = TENANTS.read().unwrap();
guard
.get(&tenant_shard_id)
.cloned()
.with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?
};
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
let guard = TENANTS.read().unwrap();
let tenant = guard
.get(&tenant_shard_id)
.cloned()
.with_context(|| format!("tenant {tenant_shard_id}"))
.map_err(|e| ApiError::NotFound(e.into()))?;
let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
// Use tenant's pitr setting
let pitr = tenant.get_pitr_interval();
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// Run in task_mgr to avoid race with tenant_detach operation
let ctx: RequestContext =
ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
let span = info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
// TODO: spawning is redundant now, need to hold the gate
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::GarbageCollector,
Some(tenant_shard_id),
Some(timeline_id),
&format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
false,
async move {
fail::fail_point!("immediate_gc_task_pre");
fail::fail_point!("immediate_gc_task_pre");
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[allow(unused_mut)]
let mut result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
// better once the types support it.
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
}
tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
}
}
#[cfg(feature = "testing")]
{
// we need to synchronize with drop completion for python tests without polling for
// log messages
if let Ok(result) = result.as_mut() {
let mut js = tokio::task::JoinSet::new();
for layer in std::mem::take(&mut result.doomed_layers) {
js.spawn(layer.wait_drop());
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
tracing::info!(
total = js.len(),
"starting to wait for the gc'd layers to be dropped"
);
while let Some(res) = js.join_next().await {
res.expect("wait_drop should not panic");
match task_done.send(result) {
Ok(_) => (),
Err(result) => error!("failed to send gc result: {result:?}"),
}
Ok(())
}
.instrument(span)
);
let timeline = tenant.get_timeline(timeline_id, false).ok();
let rtc = timeline.as_ref().map(|x| &x.remote_client);
// drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
drop(guard);
if let Some(rtc) = rtc {
// layer drops schedule actions on remote timeline client to actually do the
// deletions; don't care about the shutdown error, just exit fast
drop(rtc.wait_completion().await);
}
}
result.map_err(ApiError::InternalServerError)
Ok(wait_task_done)
}
#[cfg(test)]

View File

@@ -317,7 +317,7 @@ pub struct RemoteTimelineClient {
upload_queue: Mutex<UploadQueue>,
pub(crate) metrics: Arc<RemoteTimelineClientMetrics>,
metrics: Arc<RemoteTimelineClientMetrics>,
storage_impl: GenericRemoteStorage,
@@ -461,11 +461,11 @@ impl RemoteTimelineClient {
} else {
0
};
self.metrics.remote_physical_size_gauge.set(size);
self.metrics.remote_physical_size_set(size);
}
pub fn get_remote_physical_size(&self) -> u64 {
self.metrics.remote_physical_size_gauge.get()
self.metrics.remote_physical_size_get()
}
//
@@ -1127,11 +1127,6 @@ impl RemoteTimelineClient {
Ok(())
}
pub(crate) fn is_deleting(&self) -> bool {
let mut locked = self.upload_queue.lock().unwrap();
locked.stopped_mut().is_ok()
}
pub(crate) async fn preserve_initdb_archive(
self: &Arc<Self>,
tenant_id: &TenantId,
@@ -2137,7 +2132,7 @@ mod tests {
tenant_ctx: _tenant_ctx,
} = test_setup;
let client = &timeline.remote_client;
let client = timeline.remote_client.as_ref().unwrap();
// Download back the index.json, and check that the list of files is correct
let initial_index_part = match client
@@ -2328,7 +2323,7 @@ mod tests {
timeline,
..
} = TestSetup::new("metrics").await.unwrap();
let client = &timeline.remote_client;
let client = timeline.remote_client.as_ref().unwrap();
let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let local_path = local_layer_path(

View File

@@ -112,17 +112,14 @@ pub async fn download_layer_file<'a>(
// We use fatal_err() below because the after the rename above,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let work = {
let ctx = ctx.detached_child(ctx.task_kind(), ctx.download_behavior());
async move {
let timeline_dir = VirtualFile::open(&timeline_path, &ctx)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
}
let work = async move {
let timeline_dir = VirtualFile::open(&timeline_path)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
};
crate::virtual_file::io_engine::get()
.spawn_blocking_and_block_on_if_std(work)
@@ -199,7 +196,7 @@ async fn download_object<'a>(
use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer};
use bytes::BytesMut;
async {
let destination_file = VirtualFile::create(dst_path, ctx)
let destination_file = VirtualFile::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;

View File

@@ -26,7 +26,7 @@ use crate::{
tasks::{warn_when_period_overrun, BackgroundLoopKind},
},
virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
TEMP_FILE_SUFFIX,
METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
};
use super::{
@@ -45,10 +45,10 @@ use crate::tenant::{
use camino::Utf8PathBuf;
use chrono::format::{DelayedFormat, StrftimeItems};
use futures::{Future, StreamExt};
use futures::Future;
use pageserver_api::models::SecondaryProgress;
use pageserver_api::shard::TenantShardId;
use remote_storage::{DownloadError, Etag, GenericRemoteStorage, RemoteStorageActivity};
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
use tokio_util::sync::CancellationToken;
use tracing::{info_span, instrument, warn, Instrument};
@@ -71,12 +71,6 @@ use super::{
/// `<ttps://github.com/neondatabase/neon/issues/6200>`
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
/// Range of concurrency we may use when downloading layers within a timeline. This is independent
/// for each tenant we're downloading: the concurrency of _tenants_ is defined separately in
/// `PageServerConf::secondary_download_concurrency`
const MAX_LAYER_CONCURRENCY: usize = 16;
const MIN_LAYER_CONCURRENCY: usize = 1;
pub(super) async fn downloader_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
@@ -85,15 +79,14 @@ pub(super) async fn downloader_task(
cancel: CancellationToken,
root_ctx: RequestContext,
) {
// How many tenants' secondary download operations we will run concurrently
let tenant_concurrency = tenant_manager.get_conf().secondary_download_concurrency;
let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
let generator = SecondaryDownloader {
tenant_manager,
remote_storage,
root_ctx,
};
let mut scheduler = Scheduler::new(generator, tenant_concurrency);
let mut scheduler = Scheduler::new(generator, concurrency);
scheduler
.run(command_queue, background_jobs_can_start, cancel)
@@ -799,8 +792,6 @@ impl<'a> TenantDownloader<'a> {
tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
let mut download_futs = Vec::new();
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
@@ -883,33 +874,67 @@ impl<'a> TenantDownloader<'a> {
}
}
download_futs.push(self.download_layer(
tenant_shard_id,
&timeline.timeline_id,
layer,
// Failpoint for simulating slow remote storage
failpoint_support::sleep_millis_async!(
"secondary-layer-download-sleep",
&self.secondary_state.cancel
);
// Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
let downloaded_bytes = match download_layer_file(
self.conf,
self.remote_storage,
*tenant_shard_id,
timeline.timeline_id,
&layer.name,
&LayerFileMetadata::from(&layer.metadata),
&self.secondary_state.cancel,
ctx,
));
}
// Break up layer downloads into chunks, so that for each chunk we can re-check how much
// concurrency to use based on activity level of remote storage.
while !download_futs.is_empty() {
let chunk =
download_futs.split_off(download_futs.len().saturating_sub(MAX_LAYER_CONCURRENCY));
let concurrency = Self::layer_concurrency(self.remote_storage.activity());
let mut result_stream = futures::stream::iter(chunk).buffered(concurrency);
let mut result_stream = std::pin::pin!(result_stream);
while let Some(result) = result_stream.next().await {
match result {
Err(e) => return Err(e),
Ok(None) => {
// No error, but we didn't download the layer. Don't mark it touched
}
Ok(Some(layer)) => touched.push(layer),
)
.await
{
Ok(bytes) => bytes,
Err(DownloadError::NotFound) => {
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
// This is harmless: continue to download the next layer. It is expected during compaction
// GC.
tracing::debug!(
"Skipped downloading missing layer {}, raced with compaction/gc?",
layer.name
);
continue;
}
Err(e) => return Err(e.into()),
};
if downloaded_bytes != layer.metadata.file_size {
let local_path = local_layer_path(
self.conf,
tenant_shard_id,
&timeline.timeline_id,
&layer.name,
&layer.metadata.generation,
);
tracing::warn!(
"Downloaded layer {} with unexpected size {} != {}. Removing download.",
layer.name,
downloaded_bytes,
layer.metadata.file_size
);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
} else {
tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
let mut progress = self.secondary_state.progress.lock().unwrap();
progress.bytes_downloaded += downloaded_bytes;
progress.layers_downloaded += 1;
}
SECONDARY_MODE.download_layer.inc();
touched.push(layer)
}
// Write updates to state to record layers we just downloaded or touched.
@@ -941,90 +966,6 @@ impl<'a> TenantDownloader<'a> {
Ok(())
}
async fn download_layer(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
layer: HeatMapLayer,
ctx: &RequestContext,
) -> Result<Option<HeatMapLayer>, UpdateError> {
// Failpoint for simulating slow remote storage
failpoint_support::sleep_millis_async!(
"secondary-layer-download-sleep",
&self.secondary_state.cancel
);
// Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
let downloaded_bytes = match download_layer_file(
self.conf,
self.remote_storage,
*tenant_shard_id,
*timeline_id,
&layer.name,
&LayerFileMetadata::from(&layer.metadata),
&self.secondary_state.cancel,
ctx,
)
.await
{
Ok(bytes) => bytes,
Err(DownloadError::NotFound) => {
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
// This is harmless: continue to download the next layer. It is expected during compaction
// GC.
tracing::debug!(
"Skipped downloading missing layer {}, raced with compaction/gc?",
layer.name
);
return Ok(None);
}
Err(e) => return Err(e.into()),
};
if downloaded_bytes != layer.metadata.file_size {
let local_path = local_layer_path(
self.conf,
tenant_shard_id,
timeline_id,
&layer.name,
&layer.metadata.generation,
);
tracing::warn!(
"Downloaded layer {} with unexpected size {} != {}. Removing download.",
layer.name,
downloaded_bytes,
layer.metadata.file_size
);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
} else {
tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes);
let mut progress = self.secondary_state.progress.lock().unwrap();
progress.bytes_downloaded += downloaded_bytes;
progress.layers_downloaded += 1;
}
SECONDARY_MODE.download_layer.inc();
Ok(Some(layer))
}
/// Calculate the currently allowed parallelism of layer download tasks, based on activity level of the remote storage
fn layer_concurrency(activity: RemoteStorageActivity) -> usize {
// When less than 75% of units are available, use minimum concurrency. Else, do a linear mapping
// of our concurrency range to the units available within the remaining 25%.
let clamp_at = (activity.read_total * 3) / 4;
if activity.read_available > clamp_at {
(MAX_LAYER_CONCURRENCY * (activity.read_available - clamp_at))
/ (activity.read_total - clamp_at)
} else {
MIN_LAYER_CONCURRENCY
}
}
}
/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
@@ -1074,7 +1015,11 @@ async fn init_timeline_state(
.fatal_err(&format!("Read metadata on {}", file_path));
let file_name = file_path.file_name().expect("created it from the dentry");
if crate::is_temporary(&file_path)
if file_name == METADATA_FILE_NAME {
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
continue;
} else if crate::is_temporary(&file_path)
|| is_temp_download_file(&file_path)
|| is_ephemeral_file(file_name)
{
@@ -1147,58 +1092,3 @@ async fn init_timeline_state(
detail
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn layer_concurrency() {
// Totally idle
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 16,
read_total: 16,
write_available: 16,
write_total: 16
}),
MAX_LAYER_CONCURRENCY
);
// Totally busy
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 0,
read_total: 16,
write_available: 16,
write_total: 16
}),
MIN_LAYER_CONCURRENCY
);
// Edge of the range at which we interpolate
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 12,
read_total: 16,
write_available: 16,
write_total: 16
}),
MIN_LAYER_CONCURRENCY
);
// Midpoint of the range in which we interpolate
assert_eq!(
TenantDownloader::layer_concurrency(RemoteStorageActivity {
read_available: 14,
read_total: 16,
write_available: 16,
write_total: 16
}),
MAX_LAYER_CONCURRENCY / 2
);
}
}

View File

@@ -15,14 +15,6 @@ pub(super) struct HeatMapTenant {
pub(super) generation: Generation,
pub(super) timelines: Vec<HeatMapTimeline>,
/// Uploaders provide their own upload period in the heatmap, as a hint to downloaders
/// of how frequently it is worthwhile to check for updates.
///
/// This is optional for backward compat, and because we sometimes might upload
/// a heatmap explicitly via API for a tenant that has no periodic upload configured.
#[serde(default)]
pub(super) upload_period_ms: Option<u128>,
}
#[serde_as]
@@ -89,21 +81,4 @@ impl HeatMapTenant {
stats
}
pub(crate) fn strip_atimes(self) -> Self {
Self {
timelines: self
.timelines
.into_iter()
.map(|mut tl| {
for layer in &mut tl.layers {
layer.access_time = SystemTime::UNIX_EPOCH;
}
tl
})
.collect(),
generation: self.generation,
upload_period_ms: self.upload_period_ms,
}
}
}

View File

@@ -80,7 +80,7 @@ impl RunningJob for WriteInProgress {
struct UploadPending {
tenant: Arc<Tenant>,
last_upload: Option<LastUploadState>,
last_digest: Option<md5::Digest>,
target_time: Option<Instant>,
period: Option<Duration>,
}
@@ -94,7 +94,7 @@ impl scheduler::PendingJob for UploadPending {
struct WriteComplete {
tenant_shard_id: TenantShardId,
completed_at: Instant,
uploaded: Option<LastUploadState>,
digest: Option<md5::Digest>,
next_upload: Option<Instant>,
}
@@ -115,7 +115,10 @@ struct UploaderTenantState {
tenant: Weak<Tenant>,
/// Digest of the serialized heatmap that we last successfully uploaded
last_upload_state: Option<LastUploadState>,
///
/// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag,
/// which is also an md5sum.
last_digest: Option<md5::Digest>,
/// When the last upload attempt completed (may have been successful or failed)
last_upload: Option<Instant>,
@@ -184,7 +187,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
tenant: Arc::downgrade(&tenant),
last_upload: None,
next_upload: Some(now.checked_add(period_warmup(period)).unwrap_or(now)),
last_upload_state: None,
last_digest: None,
});
// Decline to do the upload if insufficient time has passed
@@ -192,10 +195,10 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
return;
}
let last_upload = state.last_upload_state.clone();
let last_digest = state.last_digest;
result.jobs.push(UploadPending {
tenant,
last_upload,
last_digest,
target_time: state.next_upload,
period: Some(period),
});
@@ -215,7 +218,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
) {
let UploadPending {
tenant,
last_upload,
last_digest,
target_time,
period,
} = job;
@@ -228,16 +231,16 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
let _completion = completion;
let started_at = Instant::now();
let uploaded = match upload_tenant_heatmap(remote_storage, &tenant, last_upload.clone()).await {
Ok(UploadHeatmapOutcome::Uploaded(uploaded)) => {
let digest = match upload_tenant_heatmap(remote_storage, &tenant, last_digest).await {
Ok(UploadHeatmapOutcome::Uploaded(digest)) => {
let duration = Instant::now().duration_since(started_at);
SECONDARY_MODE
.upload_heatmap_duration
.observe(duration.as_secs_f64());
SECONDARY_MODE.upload_heatmap.inc();
Some(uploaded)
Some(digest)
}
Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_upload,
Ok(UploadHeatmapOutcome::NoChange | UploadHeatmapOutcome::Skipped) => last_digest,
Err(UploadHeatmapError::Upload(e)) => {
tracing::warn!(
"Failed to upload heatmap for tenant {}: {e:#}",
@@ -248,11 +251,11 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
.upload_heatmap_duration
.observe(duration.as_secs_f64());
SECONDARY_MODE.upload_heatmap_errors.inc();
last_upload
last_digest
}
Err(UploadHeatmapError::Cancelled) => {
tracing::info!("Cancelled heatmap upload, shutting down");
last_upload
last_digest
}
};
@@ -274,7 +277,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
WriteComplete {
tenant_shard_id: *tenant.get_tenant_shard_id(),
completed_at: now,
uploaded,
digest,
next_upload,
}
}.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
@@ -296,7 +299,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
Ok(UploadPending {
// Ignore our state for last digest: this forces an upload even if nothing has changed
last_upload: None,
last_digest: None,
tenant,
target_time: None,
period: None,
@@ -309,7 +312,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
let WriteComplete {
tenant_shard_id,
completed_at,
uploaded,
digest,
next_upload,
} = completion;
use std::collections::hash_map::Entry;
@@ -319,7 +322,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
}
Entry::Occupied(mut entry) => {
entry.get_mut().last_upload = Some(completed_at);
entry.get_mut().last_upload_state = uploaded;
entry.get_mut().last_digest = digest;
entry.get_mut().next_upload = next_upload
}
}
@@ -328,7 +331,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
enum UploadHeatmapOutcome {
/// We successfully wrote to remote storage, with this digest.
Uploaded(LastUploadState),
Uploaded(md5::Digest),
/// We did not upload because the heatmap digest was unchanged since the last upload
NoChange,
/// We skipped the upload for some reason, such as tenant/timeline not ready
@@ -344,25 +347,12 @@ enum UploadHeatmapError {
Upload(#[from] anyhow::Error),
}
/// Digests describing the heatmap we most recently uploaded successfully.
///
/// md5 is generally a bad hash. We use it because it's convenient for interop with AWS S3's ETag,
/// which is also an md5sum.
#[derive(Clone)]
struct LastUploadState {
// Digest of json-encoded HeatMapTenant
uploaded_digest: md5::Digest,
// Digest without atimes set.
layers_only_digest: md5::Digest,
}
/// The inner upload operation. This will skip if `last_digest` is Some and matches the digest
/// of the object we would have uploaded.
async fn upload_tenant_heatmap(
remote_storage: GenericRemoteStorage,
tenant: &Arc<Tenant>,
last_upload: Option<LastUploadState>,
last_digest: Option<md5::Digest>,
) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
debug_assert_current_span_has_tenant_id();
@@ -378,7 +368,6 @@ async fn upload_tenant_heatmap(
let mut heatmap = HeatMapTenant {
timelines: Vec::new(),
generation,
upload_period_ms: tenant.get_heatmap_period().map(|p| p.as_millis()),
};
let timelines = tenant.timelines.lock().unwrap().clone();
@@ -407,31 +396,15 @@ async fn upload_tenant_heatmap(
// Serialize the heatmap
let bytes = serde_json::to_vec(&heatmap).map_err(|e| anyhow::anyhow!(e))?;
let bytes = bytes::Bytes::from(bytes);
let size = bytes.len();
// Drop out early if nothing changed since our last upload
let digest = md5::compute(&bytes);
if Some(&digest) == last_upload.as_ref().map(|d| &d.uploaded_digest) {
if Some(digest) == last_digest {
return Ok(UploadHeatmapOutcome::NoChange);
}
// Calculate a digest that omits atimes, so that we can distinguish actual changes in
// layers from changes only in atimes.
let heatmap_size_bytes = heatmap.get_stats().bytes;
let layers_only_bytes =
serde_json::to_vec(&heatmap.strip_atimes()).map_err(|e| anyhow::anyhow!(e))?;
let layers_only_digest = md5::compute(&layers_only_bytes);
if heatmap_size_bytes < tenant.get_checkpoint_distance() {
// For small tenants, skip upload if only atimes changed. This avoids doing frequent
// uploads from long-idle tenants whose atimes are just incremented by periodic
// size calculations.
if Some(&layers_only_digest) == last_upload.as_ref().map(|d| &d.layers_only_digest) {
return Ok(UploadHeatmapOutcome::NoChange);
}
}
let bytes = bytes::Bytes::from(bytes);
let size = bytes.len();
let path = remote_heatmap_path(tenant.get_tenant_shard_id());
let cancel = &tenant.cancel;
@@ -463,8 +436,5 @@ async fn upload_tenant_heatmap(
tracing::info!("Successfully uploaded {size} byte heatmap to {path}");
Ok(UploadHeatmapOutcome::Uploaded(LastUploadState {
uploaded_digest: digest,
layers_only_digest,
}))
Ok(UploadHeatmapOutcome::Uploaded(digest))
}

View File

@@ -394,7 +394,6 @@ impl DeltaLayerWriterInner {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename. We don't know
// the end key yet, so we cannot form the final filename yet. We will
@@ -405,7 +404,7 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let mut file = VirtualFile::create(&path, ctx).await?;
let mut file = VirtualFile::create(&path).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
@@ -587,7 +586,6 @@ impl DeltaLayerWriter {
tenant_shard_id: TenantShardId,
key_start: Key,
lsn_range: Range<Lsn>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
inner: Some(
@@ -597,7 +595,6 @@ impl DeltaLayerWriter {
tenant_shard_id,
key_start,
lsn_range,
ctx,
)
.await?,
),
@@ -704,7 +701,6 @@ impl DeltaLayer {
let mut file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
@@ -738,7 +734,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
@@ -912,7 +908,7 @@ impl DeltaLayerInner {
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
@@ -1016,7 +1012,6 @@ impl DeltaLayerInner {
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None;
@@ -1034,7 +1029,7 @@ impl DeltaLayerInner {
// track when a key is done.
for read in reads.into_iter().rev() {
let res = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.await;
let blobs_buf = match res {
@@ -1279,7 +1274,7 @@ impl DeltaLayerInner {
buf.clear();
buf.reserve(read.size());
let res = reader.read_blobs(&read, buf, ctx).await?;
let res = reader.read_blobs(&read, buf).await?;
for blob in res.blobs {
let key = blob.meta.key;
@@ -1796,7 +1791,6 @@ mod test {
harness.tenant_shard_id,
entries_meta.key_range.start,
entries_meta.lsn_range.clone(),
&ctx,
)
.await?;
@@ -1854,7 +1848,7 @@ mod test {
for read in vectored_reads {
let blobs_buf = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), &ctx)
.read_blobs(&read, buf.take().expect("Should have a buffer"))
.await?;
for meta in blobs_buf.blobs.iter() {
let value = &blobs_buf.buf[meta.start..meta.end];
@@ -1984,7 +1978,6 @@ mod test {
tenant.tenant_shard_id,
Key::MIN,
Lsn(0x11)..truncate_at,
ctx,
)
.await
.unwrap();

View File

@@ -343,7 +343,6 @@ impl ImageLayer {
let mut file = VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new().read(true).write(true),
ctx,
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
@@ -378,7 +377,7 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
@@ -475,7 +474,7 @@ impl ImageLayerInner {
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
Ok(())
@@ -538,7 +537,6 @@ impl ImageLayerInner {
&self,
reads: Vec<VectoredRead>,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) {
let max_vectored_read_bytes = self
.max_vectored_read_bytes
@@ -567,7 +565,7 @@ impl ImageLayerInner {
}
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
let res = vectored_blob_reader.read_blobs(&read, buf).await;
match res {
Ok(blobs_buf) => {
@@ -633,7 +631,6 @@ impl ImageLayerWriterInner {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
// Create the file initially with a temporary filename.
// We'll atomically rename it to the final name when we're done.
@@ -653,7 +650,6 @@ impl ImageLayerWriterInner {
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
ctx,
)
.await?
};
@@ -808,11 +804,10 @@ impl ImageLayerWriter {
tenant_shard_id: TenantShardId,
key_range: &Range<Key>,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<ImageLayerWriter> {
Ok(Self {
inner: Some(
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn)
.await?,
),
})

View File

@@ -473,11 +473,10 @@ impl InMemoryLayer {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_lsn: Lsn,
ctx: &RequestContext,
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
Ok(InMemoryLayer {
@@ -643,7 +642,6 @@ impl InMemoryLayer {
self.tenant_shard_id,
Key::MIN,
self.start_lsn..end_lsn,
ctx,
)
.await?;

View File

@@ -129,16 +129,19 @@ pub(crate) fn local_layer_path(
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
layer_file_name: &LayerName,
generation: &Generation,
_generation: &Generation,
) -> Utf8PathBuf {
let timeline_path = conf.timeline_path(tenant_shard_id, timeline_id);
if generation.is_none() {
// Without a generation, we may only use legacy path style
timeline_path.join(layer_file_name.to_string())
} else {
timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
}
timeline_path.join(layer_file_name.to_string())
// TODO: switch to enabling new-style layer paths after next release
// if generation.is_none() {
// // Without a generation, we may only use legacy path style
// timeline_path.join(layer_file_name.to_string())
// } else {
// timeline_path.join(format!("{}-v1{}", layer_file_name, generation.get_suffix()))
// }
}
impl Layer {
@@ -585,6 +588,9 @@ struct LayerInner {
/// [`Timeline::gate`] at the same time.
timeline: Weak<Timeline>,
/// Cached knowledge of [`Timeline::remote_client`] being `Some`.
have_remote_client: bool,
access_stats: LayerAccessStats,
/// This custom OnceCell is backed by std mutex, but only held for short time periods.
@@ -729,23 +735,23 @@ impl Drop for LayerInner {
if removed {
timeline.metrics.resident_physical_size_sub(file_size);
}
let res = timeline
.remote_client
.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
if let Err(e) = res {
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
// demonstrating this deadlock (without spawn_blocking): stop will drop
// queued items, which will have ResidentLayer's, and those drops would try
// to re-entrantly lock the RemoteTimelineClient inner state.
if !timeline.is_active() {
tracing::info!("scheduling deletion on drop failed: {e:#}");
if let Err(e) = res {
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
// demonstrating this deadlock (without spawn_blocking): stop will drop
// queued items, which will have ResidentLayer's, and those drops would try
// to re-entrantly lock the RemoteTimelineClient inner state.
if !timeline.is_active() {
tracing::info!("scheduling deletion on drop failed: {e:#}");
} else {
tracing::warn!("scheduling deletion on drop failed: {e:#}");
}
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
} else {
tracing::warn!("scheduling deletion on drop failed: {e:#}");
LAYER_IMPL_METRICS.inc_completed_deletes();
}
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
} else {
LAYER_IMPL_METRICS.inc_completed_deletes();
}
});
}
@@ -783,6 +789,7 @@ impl LayerInner {
path: local_path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats,
wanted_deleted: AtomicBool::new(false),
inner,
@@ -811,6 +818,8 @@ impl LayerInner {
/// in a new attempt to evict OR join the previously started attempt.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
assert!(self.have_remote_client);
let mut rx = self.status.as_ref().unwrap().subscribe();
{
@@ -967,6 +976,10 @@ impl LayerInner {
return Err(DownloadError::NotFile(ft));
}
if timeline.remote_client.as_ref().is_none() {
return Err(DownloadError::NoRemoteStorage);
}
if let Some(ctx) = ctx {
self.check_expected_download(ctx)?;
}
@@ -1103,8 +1116,12 @@ impl LayerInner {
permit: heavier_once_cell::InitPermit,
ctx: &RequestContext,
) -> anyhow::Result<Arc<DownloadedLayer>> {
let result = timeline
let client = timeline
.remote_client
.as_ref()
.expect("checked before download_init_and_wait");
let result = client
.download_layer_file(
&self.desc.layer_name(),
&self.metadata(),
@@ -1279,10 +1296,20 @@ impl LayerInner {
/// `DownloadedLayer` is being dropped, so it calls this method.
fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
let can_evict = self.have_remote_client;
// we cannot know without inspecting LayerInner::inner if we should evict or not, even
// though here it is very likely
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
if !can_evict {
// it would be nice to assert this case out, but we are in drop
span.in_scope(|| {
tracing::error!("bug in struct Layer: ResidentOrWantedEvicted has been downgraded while we have no remote storage");
});
return;
}
// NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
// drop while the `self.inner` is being locked, leading to a deadlock.
@@ -1554,6 +1581,8 @@ pub(crate) enum EvictionError {
pub(crate) enum DownloadError {
#[error("timeline has already shutdown")]
TimelineShutdown,
#[error("no remote storage configured")]
NoRemoteStorage,
#[error("context denies downloading")]
ContextAndConfigReallyDeniesDownloads,
#[error("downloading is really required but not allowed by this method")]

View File

@@ -145,7 +145,7 @@ async fn smoke_test() {
.await
.expect("the local layer file still exists");
let rtc = &timeline.remote_client;
let rtc = timeline.remote_client.as_ref().unwrap();
{
let layers = &[layer];
@@ -761,7 +761,13 @@ async fn eviction_cancellation_on_drop() {
timeline.freeze_and_flush().await.unwrap();
// wait for the upload to complete so our Arc::strong_count assertion holds
timeline.remote_client.wait_completion().await.unwrap();
timeline
.remote_client
.as_ref()
.unwrap()
.wait_completion()
.await
.unwrap();
let (evicted_layer, not_evicted) = {
let mut layers = {

View File

@@ -41,7 +41,7 @@ static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore
tokio::sync::Semaphore::new(permits)
});
#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr, enum_map::Enum)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, strum_macros::IntoStaticStr)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum BackgroundLoopKind {
Compaction,
@@ -57,25 +57,19 @@ pub(crate) enum BackgroundLoopKind {
impl BackgroundLoopKind {
fn as_static_str(&self) -> &'static str {
self.into()
let s: &'static str = self.into();
s
}
}
static PERMIT_GAUGES: once_cell::sync::Lazy<
enum_map::EnumMap<BackgroundLoopKind, metrics::IntCounterPair>,
> = once_cell::sync::Lazy::new(|| {
enum_map::EnumMap::from_array(std::array::from_fn(|i| {
let kind = <BackgroundLoopKind as enum_map::Enum>::from_usize(i);
crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE.with_label_values(&[kind.into()])
}))
});
/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
loop_kind: BackgroundLoopKind,
_ctx: &RequestContext,
) -> tokio::sync::SemaphorePermit<'static> {
let _guard = PERMIT_GAUGES[loop_kind].guard();
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
.with_label_values(&[loop_kind.as_static_str()])
.guard();
pausable_failpoint!(
"initial-size-calculation-permit-pause",

View File

@@ -61,12 +61,9 @@ use std::{
};
use crate::tenant::timeline::init::LocalLayerFileMetadata;
use crate::{
aux_file::AuxFileSizeEstimator,
tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
},
use crate::tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
};
use crate::{
context::{DownloadBehavior, RequestContext},
@@ -200,7 +197,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: RemoteTimelineClient,
pub remote_client: Option<RemoteTimelineClient>,
pub deletion_queue_client: DeletionQueueClient,
pub timeline_get_throttle: Arc<
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
@@ -272,7 +269,7 @@ pub struct Timeline {
/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
pub remote_client: Arc<RemoteTimelineClient>,
pub remote_client: Option<Arc<RemoteTimelineClient>>,
// What page versions do we hold in the repository? If we get a
// request > last_record_lsn, we need to wait until we receive all
@@ -412,8 +409,6 @@ pub struct Timeline {
/// Keep aux directory cache to avoid it's reconstruction on each update
pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
pub(crate) aux_file_size_estimator: AuxFileSizeEstimator,
}
pub struct WalReceiverInfo {
@@ -1375,14 +1370,22 @@ impl Timeline {
/// not validated with control plane yet.
/// See [`Self::get_remote_consistent_lsn_visible`].
pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
self.remote_client.remote_consistent_lsn_projected()
if let Some(remote_client) = &self.remote_client {
remote_client.remote_consistent_lsn_projected()
} else {
None
}
}
/// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
/// i.e. a value of remote_consistent_lsn_projected which has undergone
/// generation validation in the deletion queue.
pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
self.remote_client.remote_consistent_lsn_visible()
if let Some(remote_client) = &self.remote_client {
remote_client.remote_consistent_lsn_visible()
} else {
None
}
}
/// The sum of the file size of all historic layers in the layer map.
@@ -1752,14 +1755,16 @@ impl Timeline {
match self.freeze_and_flush().await {
Ok(_) => {
// drain the upload queue
// if we did not wait for completion here, it might be our shutdown process
// didn't wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
self.remote_client.shutdown().await;
if let Some(client) = self.remote_client.as_ref() {
// if we did not wait for completion here, it might be our shutdown process
// didn't wait for remote uploads to complete at all, as new tasks can forever
// be spawned.
//
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
client.shutdown().await;
}
}
Err(e) => {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
@@ -1775,16 +1780,18 @@ impl Timeline {
// Transition the remote_client into a state where it's only useful for timeline deletion.
// (The deletion use case is why we can't just hook up remote_client to Self::cancel).)
self.remote_client.stop();
// As documented in remote_client.stop()'s doc comment, it's our responsibility
// to shut down the upload queue tasks.
// TODO: fix that, task management should be encapsulated inside remote_client.
task_mgr::shutdown_tasks(
Some(TaskKind::RemoteUploadTask),
Some(self.tenant_shard_id),
Some(self.timeline_id),
)
.await;
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.stop();
// As documented in remote_client.stop()'s doc comment, it's our responsibility
// to shut down the upload queue tasks.
// TODO: fix that, task management should be encapsulated inside remote_client.
task_mgr::shutdown_tasks(
Some(TaskKind::RemoteUploadTask),
Some(self.tenant_shard_id),
Some(self.timeline_id),
)
.await;
}
// TODO: work toward making this a no-op. See this funciton's doc comment for more context.
tracing::debug!("Waiting for tasks...");
@@ -1910,6 +1917,10 @@ impl Timeline {
return Ok(None);
};
if self.remote_client.is_none() {
return Ok(Some(false));
}
layer.download().await?;
Ok(Some(true))
@@ -2150,16 +2161,6 @@ impl Timeline {
};
Arc::new_cyclic(|myself| {
let metrics = TimelineMetrics::new(
&tenant_shard_id,
&timeline_id,
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
"mtime",
evictions_low_residence_duration_metric_threshold,
),
);
let aux_file_metrics = metrics.aux_file_size_gauge.clone();
let mut result = Timeline {
conf,
tenant_conf,
@@ -2174,7 +2175,7 @@ impl Timeline {
walredo_mgr,
walreceiver: Mutex::new(None),
remote_client: Arc::new(resources.remote_client),
remote_client: resources.remote_client.map(Arc::new),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
@@ -2191,7 +2192,14 @@ impl Timeline {
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn(),
metrics,
metrics: TimelineMetrics::new(
&tenant_shard_id,
&timeline_id,
crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
"mtime",
evictions_low_residence_duration_metric_threshold,
),
),
query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
&tenant_shard_id,
@@ -2255,8 +2263,6 @@ impl Timeline {
dir: None,
n_deltas: 0,
}),
aux_file_size_estimator: AuxFileSizeEstimator::new(aux_file_metrics),
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
@@ -2421,6 +2427,10 @@ impl Timeline {
discovered_layers.push((layer_file_name, local_path, file_size));
continue;
}
Discovered::Metadata => {
warn!("found legacy metadata file, these should have been removed in load_tenant_config");
continue;
}
Discovered::IgnoredBackup => {
continue;
}
@@ -2467,10 +2477,12 @@ impl Timeline {
if local.metadata.file_size() == remote.file_size() {
// Use the local file, but take the remote metadata so that we pick up
// the correct generation.
UseLocal(LocalLayerFileMetadata {
metadata: remote,
local_path: local.local_path,
})
UseLocal(
LocalLayerFileMetadata {
metadata: remote,
local_path: local.local_path
}
)
} else {
init::cleanup_local_file_for_remote(&local, &remote)?;
UseRemote { local, remote }
@@ -2479,11 +2491,7 @@ impl Timeline {
Ok(decision) => decision,
Err(DismissedLayer::Future { local }) => {
if let Some(local) = local {
init::cleanup_future_layer(
&local.local_path,
&name,
disk_consistent_lsn,
)?;
init::cleanup_future_layer(&local.local_path, &name, disk_consistent_lsn)?;
}
needs_cleanup.push(name);
continue;
@@ -2505,8 +2513,7 @@ impl Timeline {
let layer = match decision {
UseLocal(local) => {
total_physical_size += local.metadata.file_size();
Layer::for_resident(conf, &this, local.local_path, name, local.metadata)
.drop_eviction_guard()
Layer::for_resident(conf, &this, local.local_path, name, local.metadata).drop_eviction_guard()
}
Evicted(remote) | UseRemote { remote, .. } => {
Layer::for_evicted(conf, &this, name, remote)
@@ -2526,36 +2533,36 @@ impl Timeline {
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
self.remote_client
.schedule_layer_file_deletion(&needs_cleanup)?;
self.remote_client
.schedule_index_upload_for_file_changes()?;
// This barrier orders above DELETEs before any later operations.
// This is critical because code executing after the barrier might
// create again objects with the same key that we just scheduled for deletion.
// For example, if we just scheduled deletion of an image layer "from the future",
// later compaction might run again and re-create the same image layer.
// "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
// "same" here means same key range and LSN.
//
// Without a barrier between above DELETEs and the re-creation's PUTs,
// the upload queue may execute the PUT first, then the DELETE.
// In our example, we will end up with an IndexPart referencing a non-existent object.
//
// 1. a future image layer is created and uploaded
// 2. ps restart
// 3. the future layer from (1) is deleted during load layer map
// 4. image layer is re-created and uploaded
// 5. deletion queue would like to delete (1) but actually deletes (4)
// 6. delete by name works as expected, but it now deletes the wrong (later) version
//
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
self.remote_client.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
if let Some(rtc) = self.remote_client.as_ref() {
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
rtc.schedule_index_upload_for_file_changes()?;
// This barrier orders above DELETEs before any later operations.
// This is critical because code executing after the barrier might
// create again objects with the same key that we just scheduled for deletion.
// For example, if we just scheduled deletion of an image layer "from the future",
// later compaction might run again and re-create the same image layer.
// "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
// "same" here means same key range and LSN.
//
// Without a barrier between above DELETEs and the re-creation's PUTs,
// the upload queue may execute the PUT first, then the DELETE.
// In our example, we will end up with an IndexPart referencing a non-existent object.
//
// 1. a future image layer is created and uploaded
// 2. ps restart
// 3. the future layer from (1) is deleted during load layer map
// 4. image layer is re-created and uploaded
// 5. deletion queue would like to delete (1) but actually deletes (4)
// 6. delete by name works as expected, but it now deletes the wrong (later) version
//
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
rtc.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
}
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
@@ -2614,7 +2621,6 @@ impl Timeline {
// Don't make noise.
} else {
warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
debug_assert!(false);
}
}
};
@@ -3008,6 +3014,9 @@ impl Timeline {
/// should treat this as a cue to simply skip doing any heatmap uploading
/// for this timeline.
pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
// no point in heatmaps without remote client
let _remote_client = self.remote_client.as_ref()?;
if !self.is_active() {
return None;
}
@@ -3035,7 +3044,10 @@ impl Timeline {
// branchpoint in the value in IndexPart::lineage
self.ancestor_lsn == lsn
|| (self.ancestor_lsn == Lsn::INVALID
&& self.remote_client.is_previous_ancestor_lsn(lsn))
&& self
.remote_client
.as_ref()
.is_some_and(|rtc| rtc.is_previous_ancestor_lsn(lsn)))
}
}
@@ -3548,11 +3560,7 @@ impl Timeline {
///
/// Get a handle to the latest layer for appending.
///
async fn get_layer_for_write(
&self,
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let layer = guard
.get_layer_for_write(
@@ -3561,7 +3569,6 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
ctx,
)
.await?;
Ok(layer)
@@ -3826,8 +3833,8 @@ impl Timeline {
);
self.create_delta_layer(
&frozen_layer,
Some(metadata_keyspace.0.ranges[0].clone()),
ctx,
Some(metadata_keyspace.0.ranges[0].clone()),
)
.await?
} else {
@@ -3856,7 +3863,7 @@ impl Timeline {
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
// We will remove frozen layer and add delta layer in one atomic operation later.
let Some(layer) = self.create_delta_layer(&frozen_layer, None, ctx).await? else {
let Some(layer) = self.create_delta_layer(&frozen_layer, ctx, None).await? else {
panic!("delta layer cannot be empty if no filter is applied");
};
(
@@ -3955,23 +3962,29 @@ impl Timeline {
x.unwrap()
));
for layer in layers_to_upload {
self.remote_client.schedule_layer_file_upload(layer)?;
if let Some(remote_client) = &self.remote_client {
for layer in layers_to_upload {
remote_client.schedule_layer_file_upload(layer)?;
}
remote_client.schedule_index_upload_for_metadata_update(&update)?;
}
self.remote_client
.schedule_index_upload_for_metadata_update(&update)?;
Ok(())
}
pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
self.remote_client
.preserve_initdb_archive(
&self.tenant_shard_id.tenant_id,
&self.timeline_id,
&self.cancel,
)
.await
if let Some(remote_client) = &self.remote_client {
remote_client
.preserve_initdb_archive(
&self.tenant_shard_id.tenant_id,
&self.timeline_id,
&self.cancel,
)
.await?;
} else {
bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
}
Ok(())
}
// Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
@@ -3979,8 +3992,8 @@ impl Timeline {
async fn create_delta_layer(
self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>,
key_range: Option<Range<Key>>,
ctx: &RequestContext,
key_range: Option<Range<Key>>,
) -> anyhow::Result<Option<ResidentLayer>> {
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
@@ -4003,7 +4016,6 @@ impl Timeline {
&self_clone
.conf
.timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id),
&ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
@@ -4197,7 +4209,6 @@ impl Timeline {
self.tenant_shard_id,
&img_range,
lsn,
ctx,
)
.await?;
@@ -4302,7 +4313,6 @@ impl Timeline {
&self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
@@ -4327,16 +4337,6 @@ impl Timeline {
/// this Timeline is shut down. Calling this function will cause the initial
/// logical size calculation to skip waiting for the background jobs barrier.
pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
if !self.shard_identity.is_shard_zero() {
// We don't populate logical size on shard >0: skip waiting for it.
return;
}
if self.remote_client.is_deleting() {
// The timeline was created in a deletion-resume state, we don't expect logical size to be populated
return;
}
if let Some(await_bg_cancel) = self
.current_logical_size
.cancel_wait_for_background_loop_concurrency_limit_semaphore
@@ -4348,10 +4348,9 @@ impl Timeline {
// the logical size cancellation to skip the concurrency limit semaphore.
// TODO: this is an unexpected case. We should restructure so that it
// can't happen.
tracing::warn!(
tracing::info!(
"await_initial_logical_size: can't get semaphore cancel token, skipping"
);
debug_assert!(false);
}
tokio::select!(
@@ -4500,8 +4499,9 @@ impl Timeline {
// deletion will happen later, the layer file manager calls garbage_collect_on_drop
guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&remove_layers, new_deltas)?;
}
drop_wlock(guard);
@@ -4519,8 +4519,9 @@ impl Timeline {
let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
self.remote_client
.schedule_compaction_update(&drop_layers, &upload_layers)?;
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&drop_layers, &upload_layers)?;
}
Ok(())
}
@@ -4530,14 +4531,16 @@ impl Timeline {
self: &Arc<Self>,
new_images: impl IntoIterator<Item = ResidentLayer>,
) -> anyhow::Result<()> {
let Some(remote_client) = &self.remote_client else {
return Ok(());
};
for layer in new_images {
self.remote_client.schedule_layer_file_upload(layer)?;
remote_client.schedule_layer_file_upload(layer)?;
}
// should any new image layer been created, not uploading index_part will
// result in a mismatch between remote_physical_size and layermap calculated
// size, which will fail some tests, but should not be an issue otherwise.
self.remote_client
.schedule_index_upload_for_file_changes()?;
remote_client.schedule_index_upload_for_file_changes()?;
Ok(())
}
@@ -4635,9 +4638,11 @@ impl Timeline {
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
// this is most likely the background tasks, but it might be the spawned task from
// immediate_gc
let cancel = crate::task_mgr::shutdown_token();
let _g = tokio::select! {
guard = self.gc_lock.lock() => guard,
_ = self.cancel.cancelled() => return Ok(GcResult::default()),
_ = cancel.cancelled() => return Ok(GcResult::default()),
};
let timer = self.metrics.garbage_collect_histo.start_timer();
@@ -4823,7 +4828,9 @@ impl Timeline {
result.layers_removed = gc_layers.len() as u64;
self.remote_client.schedule_gc_update(&gc_layers)?;
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_gc_update(&gc_layers)?;
}
guard.finish_gc_timeline(&gc_layers);
@@ -5207,7 +5214,7 @@ impl<'a> TimelineWriter<'a> {
let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
let action = self.get_open_layer_action(lsn, buf_size);
let layer = self.handle_open_layer_action(lsn, action, ctx).await?;
let layer = self.handle_open_layer_action(lsn, action).await?;
let res = layer.put_value(key, lsn, &buf, ctx).await;
if res.is_ok() {
@@ -5230,15 +5237,14 @@ impl<'a> TimelineWriter<'a> {
&mut self,
at: Lsn,
action: OpenLayerAction,
ctx: &RequestContext,
) -> anyhow::Result<&Arc<InMemoryLayer>> {
match action {
OpenLayerAction::Roll => {
let freeze_at = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
self.roll_layer(freeze_at).await?;
self.open_layer(at, ctx).await?;
self.open_layer(at).await?;
}
OpenLayerAction::Open => self.open_layer(at, ctx).await?,
OpenLayerAction::Open => self.open_layer(at).await?,
OpenLayerAction::None => {
assert!(self.write_guard.is_some());
}
@@ -5247,8 +5253,8 @@ impl<'a> TimelineWriter<'a> {
Ok(&self.write_guard.as_ref().unwrap().open_layer)
}
async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at, ctx).await?;
async fn open_layer(&mut self, at: Lsn) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at).await?;
let initial_size = layer.size().await?;
let last_freeze_at = self.last_freeze_at.load();
@@ -5325,14 +5331,10 @@ impl<'a> TimelineWriter<'a> {
Ok(())
}
pub(crate) async fn delete_batch(
&mut self,
batch: &[(Range<Key>, Lsn)],
ctx: &RequestContext,
) -> anyhow::Result<()> {
pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
if let Some((_, lsn)) = batch.first() {
let action = self.get_open_layer_action(*lsn, 0);
let layer = self.handle_open_layer_action(*lsn, action, ctx).await?;
let layer = self.handle_open_layer_action(*lsn, action).await?;
layer.put_tombstones(batch).await?;
}

View File

@@ -295,11 +295,13 @@ impl Timeline {
// Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
self.rewrite_layers(replace_layers, drop_layers).await?;
// We wait for all uploads to complete before finishing this compaction stage. This is not
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
self.remote_client.wait_completion().await?;
if let Some(remote_client) = self.remote_client.as_ref() {
// We wait for all uploads to complete before finishing this compaction stage. This is not
// necessary for correctness, but it simplifies testing, and avoids proceeding with another
// Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
// load.
remote_client.wait_completion().await?;
}
Ok(())
}
@@ -698,7 +700,6 @@ impl Timeline {
debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
lsn_range.clone()
},
ctx,
)
.await?,
);
@@ -754,7 +755,6 @@ impl Timeline {
&self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
@@ -1093,7 +1093,6 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range.start,
lsn_range.clone(),
ctx,
)
.await?;
@@ -1168,7 +1167,6 @@ impl TimelineAdaptor {
self.timeline.tenant_shard_id,
key_range,
lsn,
ctx,
)
.await?;

View File

@@ -26,21 +26,19 @@ use super::{Timeline, TimelineResources};
/// during attach or pageserver restart.
/// See comment in persist_index_part_with_deleted_flag.
async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
match timeline
.remote_client
.persist_index_part_with_deleted_flag()
.await
{
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
//
// AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents
// two tasks from performing the deletion at the same time. The first task
// that starts deletion should run it to completion.
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
//
// AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents
// two tasks from performing the deletion at the same time. The first task
// that starts deletion should run it to completion.
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
}
}
}
Ok(())
@@ -119,11 +117,11 @@ pub(super) async fn delete_local_timeline_directory(
/// Removes remote layers and an index file after them.
async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> {
timeline
.remote_client
.delete_all()
.await
.context("delete_all")
if let Some(remote_client) = &timeline.remote_client {
remote_client.delete_all().await.context("delete_all")?
};
Ok(())
}
// This function removs remaining traces of a timeline on disk.
@@ -262,7 +260,7 @@ impl DeleteTimelineFlow {
tenant: Arc<Tenant>,
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: RemoteTimelineClient,
remote_client: Option<RemoteTimelineClient>,
deletion_queue_client: DeletionQueueClient,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.

View File

@@ -70,6 +70,10 @@ pub(super) async fn prepare(
) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
use Error::*;
if detached.remote_client.as_ref().is_none() {
unimplemented!("no new code for running without remote storage");
}
let Some((ancestor, ancestor_lsn)) = detached
.ancestor_timeline
.as_ref()
@@ -211,7 +215,6 @@ pub(super) async fn prepare(
&detached
.conf
.timeline_path(&detached.tenant_shard_id, &detached.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
@@ -311,6 +314,8 @@ async fn upload_rewritten_layer(
// FIXME: better shuttingdown error
target
.remote_client
.as_ref()
.unwrap()
.upload_layer_file(&copied, cancel)
.await
.map_err(UploadRewritten)?;
@@ -334,7 +339,6 @@ async fn copy_lsn_prefix(
target_timeline.tenant_shard_id,
layer.layer_desc().key_range.start,
layer.layer_desc().lsn_range.start..end_lsn,
ctx,
)
.await
.map_err(CopyDeltaPrefix)?;
@@ -400,6 +404,8 @@ async fn remote_copy(
// FIXME: better shuttingdown error
adoptee
.remote_client
.as_ref()
.unwrap()
.copy_timeline_layer(adopted, &owned, cancel)
.await
.map(move |()| owned)
@@ -413,6 +419,11 @@ pub(super) async fn complete(
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
) -> Result<Vec<TimelineId>, anyhow::Error> {
let rtc = detached
.remote_client
.as_ref()
.expect("has to have a remote timeline client for timeline ancestor detach");
let PreparedTimelineDetach { layers } = prepared;
let ancestor = detached
@@ -429,13 +440,11 @@ pub(super) async fn complete(
//
// this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
// which could give us a completely wrong layer combination.
detached
.remote_client
.schedule_adding_existing_layers_to_index_detach_and_wait(
&layers,
(ancestor.timeline_id, ancestor_lsn),
)
.await?;
rtc.schedule_adding_existing_layers_to_index_detach_and_wait(
&layers,
(ancestor.timeline_id, ancestor_lsn),
)
.await?;
let mut tasks = tokio::task::JoinSet::new();
@@ -480,6 +489,8 @@ pub(super) async fn complete(
async move {
let res = timeline
.remote_client
.as_ref()
.expect("reparented has to have remote client because detached has one")
.schedule_reparenting_and_wait(&new_parent)
.await;

View File

@@ -23,7 +23,7 @@ use std::{
use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, info_span, instrument, warn, Instrument};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
use crate::{
context::{DownloadBehavior, RequestContext},
@@ -211,6 +211,11 @@ impl Timeline {
// So, we just need to deal with this.
if self.remote_client.is_none() {
error!("no remote storage configured, cannot evict layers");
return ControlFlow::Continue(());
}
let mut js = tokio::task::JoinSet::new();
{
let guard = self.layers.read().await;

View File

@@ -9,6 +9,7 @@ use crate::{
storage_layer::LayerName,
Generation,
},
METADATA_FILE_NAME,
};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
@@ -26,6 +27,8 @@ pub(super) enum Discovered {
Temporary(String),
/// Temporary on-demand download files, should be removed
TemporaryDownload(String),
/// "metadata" file we persist locally and include in `index_part.json`
Metadata,
/// Backup file from previously future layers
IgnoredBackup,
/// Unrecognized, warn about these
@@ -46,7 +49,9 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
Discovered::Layer(file_name, direntry.path().to_owned(), file_size)
}
Err(_) => {
if file_name.ends_with(".old") {
if file_name == METADATA_FILE_NAME {
Discovered::Metadata
} else if file_name.ends_with(".old") {
// ignore these
Discovered::IgnoredBackup
} else if remote_timeline_client::is_temp_download_file(direntry.path()) {

View File

@@ -9,7 +9,6 @@ use utils::{
use crate::{
config::PageServerConf,
context::RequestContext,
metrics::TimelineMetrics,
tenant::{
layer_map::{BatchedUpdates, LayerMap},
@@ -70,7 +69,6 @@ impl LayerManager {
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
ctx: &RequestContext,
) -> Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());
@@ -107,7 +105,7 @@ impl LayerManager {
);
let new_layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn).await?;
let layer = Arc::new(new_layer);
self.layer_map.open_layer = Some(layer.clone());

View File

@@ -23,7 +23,6 @@ use pageserver_api::key::Key;
use utils::lsn::Lsn;
use utils::vec_map::VecMap;
use crate::context::RequestContext;
use crate::virtual_file::VirtualFile;
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -286,7 +285,6 @@ impl<'a> VectoredBlobReader<'a> {
&self,
read: &VectoredRead,
buf: BytesMut,
ctx: &RequestContext,
) -> Result<VectoredBlobsBuf, std::io::Error> {
assert!(read.size() > 0);
assert!(
@@ -297,7 +295,7 @@ impl<'a> VectoredBlobReader<'a> {
);
let buf = self
.file
.read_exact_at_n(buf, read.start, read.size(), ctx)
.read_exact_at_n(buf, read.start, read.size())
.await?;
let blobs_at = read.blobs_at.as_slice();

View File

@@ -344,23 +344,16 @@ macro_rules! with_file {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(
path: &Utf8Path,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true), ctx).await
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(
path: &Utf8Path,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
.await
}
@@ -373,7 +366,6 @@ impl VirtualFile {
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
@@ -584,34 +576,21 @@ impl VirtualFile {
Ok(self.pos)
}
pub async fn read_exact_at<B>(
&self,
buf: B,
offset: u64,
ctx: &RequestContext,
) -> Result<B, Error>
pub async fn read_exact_at<B>(&self, buf: B, offset: u64) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| {
self.read_at(buf, offset, ctx)
})
.await;
let (buf, res) =
read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await;
res.map(|()| buf)
}
pub async fn read_exact_at_n<B>(
&self,
buf: B,
offset: u64,
count: usize,
ctx: &RequestContext,
) -> Result<B, Error>
pub async fn read_exact_at_n<B>(&self, buf: B, offset: u64, count: usize) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| {
self.read_at(buf, offset, ctx)
self.read_at(buf, offset)
})
.await;
res.map(|()| buf)
@@ -622,13 +601,12 @@ impl VirtualFile {
&self,
page: PageWriteGuard<'static>,
offset: u64,
ctx: &RequestContext,
) -> Result<PageWriteGuard<'static>, Error> {
let buf = PageWriteGuardBuf {
page,
init_up_to: 0,
};
let res = self.read_exact_at(buf, offset, ctx).await;
let res = self.read_exact_at(buf, offset).await;
res.map(|PageWriteGuardBuf { page, .. }| page)
.map_err(|e| Error::new(ErrorKind::Other, e))
}
@@ -721,12 +699,7 @@ impl VirtualFile {
(buf, Ok(n))
}
pub(crate) async fn read_at<B>(
&self,
buf: B,
offset: u64,
_ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */
) -> (B, Result<usize, Error>)
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
@@ -1047,21 +1020,20 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let buf = vec![0; PAGE_SZ];
let buf = self
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64), ctx)
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
.await?;
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>, ctx: &RequestContext) -> Result<(), Error> {
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let res;
(tmp, res) = self.read_at(tmp, self.pos, ctx).await;
(tmp, res) = self.read_at(tmp, self.pos).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
@@ -1187,6 +1159,7 @@ mod tests {
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::Rng;
use std::future::Future;
use std::io::Write;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
@@ -1203,14 +1176,9 @@ mod tests {
}
impl MaybeVirtualFile {
async fn read_exact_at(
&self,
mut buf: Vec<u8>,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, Error> {
async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset, ctx).await,
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
@@ -1262,13 +1230,13 @@ mod tests {
// Helper function to slurp contents of a file, starting at the current position,
// into a string
async fn read_string(&mut self, ctx: &RequestContext) -> Result<String, Error> {
async fn read_string(&mut self) -> Result<String, Error> {
use std::io::Read;
let mut buf = String::new();
match self {
MaybeVirtualFile::VirtualFile(file) => {
let mut buf = Vec::new();
file.read_to_end(&mut buf, ctx).await?;
file.read_to_end(&mut buf).await?;
return Ok(String::from_utf8(buf).unwrap());
}
MaybeVirtualFile::File(file) => {
@@ -1279,14 +1247,9 @@ mod tests {
}
// Helper function to slurp a portion of a file into a string
async fn read_string_at(
&mut self,
pos: u64,
len: usize,
ctx: &RequestContext,
) -> Result<String, Error> {
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos, ctx).await?;
let buf = self.read_exact_at(buf, pos).await?;
Ok(String::from_utf8(buf).unwrap())
}
}
@@ -1300,101 +1263,73 @@ mod tests {
// results with VirtualFiles as with native Files. (Except that with
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
struct A;
impl Adapter for A {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
let vf = VirtualFile::open_with_options(&path, &opts, ctx).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
}
}
test_files::<A>("virtual_files").await
test_files("virtual_files", |path, open_options| async move {
let vf = VirtualFile::open_with_options(&path, &open_options).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
})
.await
}
#[tokio::test]
async fn test_physical_files() -> anyhow::Result<()> {
struct B;
impl Adapter for B {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
_ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error> {
Ok(MaybeVirtualFile::File({
let owned_fd = opts.open(path.as_std_path()).await?;
File::from(owned_fd)
}))
}
}
test_files::<B>("physical_files").await
test_files("physical_files", |path, open_options| async move {
Ok(MaybeVirtualFile::File({
let owned_fd = open_options.open(path.as_std_path()).await?;
File::from(owned_fd)
}))
})
.await
}
/// This is essentially a closure which returns a MaybeVirtualFile, but because rust edition
/// 2024 is not yet out with new lifetime capture or outlives rules, this is a async function
/// in trait which benefits from the new lifetime capture rules already.
trait Adapter {
async fn open(
path: Utf8PathBuf,
opts: OpenOptions,
ctx: &RequestContext,
) -> Result<MaybeVirtualFile, anyhow::Error>;
}
async fn test_files<A>(testname: &str) -> anyhow::Result<()>
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
where
A: Adapter,
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
{
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
std::fs::create_dir_all(&testdir)?;
let path_a = testdir.join("file_a");
let mut file_a = A::open(
let mut file_a = openfunc(
path_a.clone(),
OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.to_owned(),
&ctx,
)
.await?;
file_a.write_all(b"foobar".to_vec(), &ctx).await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string(&ctx).await.unwrap_err();
let _ = file_a.read_string().await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = A::open(path_a, OpenOptions::new().read(true).to_owned(), &ctx).await?;
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar".to_vec(), &ctx).await.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string(&ctx).await?);
assert_eq!("foobar", file_a.read_string().await?);
// It's positioned at the EOF now.
assert_eq!("", file_a.read_string(&ctx).await?);
assert_eq!("", file_a.read_string().await?);
// Test seeks.
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!("oobar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::End(-2)).await?, 4);
assert_eq!("ar", file_a.read_string(&ctx).await?);
assert_eq!("ar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::Start(1)).await?, 1);
assert_eq!(file_a.seek(SeekFrom::Current(2)).await?, 3);
assert_eq!("bar", file_a.read_string(&ctx).await?);
assert_eq!("bar", file_a.read_string().await?);
assert_eq!(file_a.seek(SeekFrom::Current(-5)).await?, 1);
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!("oobar", file_a.read_string().await?);
// Test erroneous seeks to before byte 0
file_a.seek(SeekFrom::End(-7)).await.unwrap_err();
@@ -1402,11 +1337,11 @@ mod tests {
file_a.seek(SeekFrom::Current(-2)).await.unwrap_err();
// the erroneous seek should have left the position unchanged
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!("oobar", file_a.read_string().await?);
// Create another test file, and try FileExt functions on it.
let path_b = testdir.join("file_b");
let mut file_b = A::open(
let mut file_b = openfunc(
path_b.clone(),
OpenOptions::new()
.read(true)
@@ -1414,13 +1349,12 @@ mod tests {
.create(true)
.truncate(true)
.to_owned(),
&ctx,
)
.await?;
file_b.write_all_at(b"BAR".to_vec(), 3, &ctx).await?;
file_b.write_all_at(b"FOO".to_vec(), 0, &ctx).await?;
assert_eq!(file_b.read_string_at(2, 3, &ctx).await?, "OBA");
assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
// Open a lot of files, enough to cause some evictions. (Or to be precise,
// open the same file many times. The effect is the same.)
@@ -1430,13 +1364,9 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile = A::open(
path_b.clone(),
OpenOptions::new().read(true).to_owned(),
&ctx,
)
.await?;
assert_eq!("FOOBAR", vfile.read_string(&ctx).await?);
let mut vfile =
openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
assert_eq!("FOOBAR", vfile.read_string().await?);
vfiles.push(vfile);
}
@@ -1445,13 +1375,13 @@ mod tests {
// The underlying file descriptor for 'file_a' should be closed now. Try to read
// from it again. We left the file positioned at offset 1 above.
assert_eq!("oobar", file_a.read_string(&ctx).await?);
assert_eq!("oobar", file_a.read_string().await?);
// Check that all the other FDs still work too. Use them in random order for
// good measure.
vfiles.as_mut_slice().shuffle(&mut thread_rng());
for vfile in vfiles.iter_mut() {
assert_eq!("OOBAR", vfile.read_string_at(1, 5, &ctx).await?);
assert_eq!("OOBAR", vfile.read_string_at(1, 5).await?);
}
Ok(())
@@ -1467,7 +1397,6 @@ mod tests {
const THREADS: usize = 100;
const SAMPLE: [u8; SIZE] = [0xADu8; SIZE];
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency");
std::fs::create_dir_all(&testdir)?;
@@ -1481,12 +1410,8 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(
&test_file_path,
OpenOptions::new().read(true),
&ctx,
)
.await?;
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
.await?;
files.push(f);
}
let files = Arc::new(files);
@@ -1500,13 +1425,12 @@ mod tests {
let mut hdls = Vec::new();
for _threadno in 0..THREADS {
let files = files.clone();
let ctx = ctx.detached_child(TaskKind::UnitTest, DownloadBehavior::Error);
let hdl = rt.spawn(async move {
let mut buf = vec![0u8; SIZE];
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
buf = f.read_exact_at(buf, 0, &ctx).await.unwrap();
buf = f.read_exact_at(buf, 0).await.unwrap();
assert!(buf == SAMPLE);
}
});
@@ -1522,7 +1446,6 @@ mod tests {
#[tokio::test]
async fn test_atomic_overwrite_basic() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
std::fs::create_dir_all(&testdir).unwrap();
@@ -1532,8 +1455,8 @@ mod tests {
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
@@ -1541,8 +1464,8 @@ mod tests {
VirtualFile::crashsafe_overwrite(path.clone(), tmp_path.clone(), b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
assert_eq!(post, "bar");
assert!(!tmp_path.exists());
drop(file);
@@ -1550,7 +1473,6 @@ mod tests {
#[tokio::test]
async fn test_atomic_overwrite_preexisting_tmp() {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let testdir =
crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
std::fs::create_dir_all(&testdir).unwrap();
@@ -1565,8 +1487,8 @@ mod tests {
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path, &ctx).await.unwrap());
let post = file.read_string(&ctx).await.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);

View File

@@ -153,7 +153,10 @@ impl PostgresRedoManager {
process: self
.redo_process
.get()
.map(|p| WalRedoManagerProcessStatus { pid: p.id() }),
.map(|p| WalRedoManagerProcessStatus {
pid: p.id(),
kind: std::borrow::Cow::Borrowed(p.kind().into()),
}),
}
}
}

View File

@@ -1,10 +1,7 @@
/// Layer of indirection previously used to support multiple implementations.
/// Subject to removal: <https://github.com/neondatabase/neon/issues/7753>
use std::time::Duration;
use bytes::Bytes;
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use tracing::warn;
use utils::lsn::Lsn;
use crate::{config::PageServerConf, walrecord::NeonWalRecord};
@@ -15,6 +12,7 @@ mod protocol;
mod process_impl {
pub(super) mod process_async;
pub(super) mod process_std;
}
#[derive(
@@ -36,7 +34,10 @@ pub enum Kind {
Async,
}
pub(crate) struct Process(process_impl::process_async::WalRedoProcess);
pub(crate) enum Process {
Sync(process_impl::process_std::WalRedoProcess),
Async(process_impl::process_async::WalRedoProcess),
}
impl Process {
#[inline(always)]
@@ -45,17 +46,18 @@ impl Process {
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
if conf.walredo_process_kind != Kind::Async {
warn!(
configured = %conf.walredo_process_kind,
"the walredo_process_kind setting has been turned into a no-op, using async implementation"
);
}
Ok(Self(process_impl::process_async::WalRedoProcess::launch(
conf,
tenant_shard_id,
pg_version,
)?))
Ok(match conf.walredo_process_kind {
Kind::Sync => Self::Sync(process_impl::process_std::WalRedoProcess::launch(
conf,
tenant_shard_id,
pg_version,
)?),
Kind::Async => Self::Async(process_impl::process_async::WalRedoProcess::launch(
conf,
tenant_shard_id,
pg_version,
)?),
})
}
#[inline(always)]
@@ -67,12 +69,29 @@ impl Process {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
self.0
.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
.await
match self {
Process::Sync(p) => {
p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
.await
}
Process::Async(p) => {
p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
.await
}
}
}
pub(crate) fn id(&self) -> u32 {
self.0.id()
match self {
Process::Sync(p) => p.id(),
Process::Async(p) => p.id(),
}
}
pub(crate) fn kind(&self) -> Kind {
match self {
Process::Sync(_) => Kind::Sync,
Process::Async(_) => Kind::Async,
}
}
}

View File

@@ -0,0 +1,405 @@
use self::no_leak_child::NoLeakChild;
use crate::{
config::PageServerConf,
metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
walrecord::NeonWalRecord,
walredo::process::{no_leak_child, protocol},
};
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
collections::VecDeque,
io::{Read, Write},
process::{ChildStdin, ChildStdout, Command, Stdio},
sync::{Mutex, MutexGuard},
time::Duration,
};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, nonblock::set_nonblock};
pub struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
}
struct ProcessInput {
stdin: ChildStdin,
n_requests: usize,
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
pub(crate) fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
crate::span::debug_assert_current_span_has_tenant_id();
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
use no_leak_child::NoLeakChildCommandExt;
// Start postgres itself
let child = Command::new(pg_bin_dir_path.join("postgres"))
// the first arg must be --wal-redo so the child process enters into walredo mode
.arg("--wal-redo")
// the child doesn't process this arg, but, having it in the argv helps indentify the
// walredo process for a particular tenant when debugging a pagserver
.args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
// NB: The redo process is not trusted after we sent it the first
// walredo work. Before that, it is trusted. Specifically, we trust
// it to
// 1. close all file descriptors except stdin, stdout, stderr because
// pageserver might not be 100% diligent in setting FD_CLOEXEC on all
// the files it opens, and
// 2. to use seccomp to sandbox itself before processing the first
// walredo request.
.spawn_no_leak_child(tenant_shard_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
error!("killing wal-redo-postgres process due to a problem during launch");
child.kill_and_wait(WalRedoKillCause::Startup);
});
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
if let Err(e) = &res {
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
}
res
}};
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
tokio::spawn(
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
}
debug!("wal-redo-postgres stderr_logger_task started");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
use tokio::io::AsyncBufReadExt;
let mut stderr_lines = tokio::io::BufReader::new(stderr);
let mut buf = Vec::new();
let res = loop {
buf.clear();
// TODO we don't trust the process to cap its stderr length.
// Currently it can do unbounded Vec allocation.
match stderr_lines.read_until(b'\n', &mut buf).await {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
error!(%output, "received output");
}
Err(e) => {
break Err(e);
}
}
};
match res {
Ok(()) => (),
Err(e) => {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
);
Ok(Self {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
}
pub(crate) fn id(&self) -> u32 {
self.child
.as_ref()
.expect("must not call this during Drop")
.id()
}
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
pub(crate) async fn apply_wal_records(
&self,
rel: RelTag,
blknum: u32,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let tag = protocol::BufferTag { rel, blknum };
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
//
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
protocol::build_push_page_msg(tag, img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
}
}
protocol::build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
// in tests so capture all.
self.record_and_log(&writebuf);
}
res
}
fn apply_wal_records0(
&self,
writebuf: &[u8],
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
// To improve walredo performance we separate sending requests and receiving
// responses. Them are protected by different mutexes (output and input).
// If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
// then there is not warranty that T1 will first granted output mutex lock.
// To address this issue we maintain number of sent requests, number of processed
// responses and ring buffer with pending responses. After sending response
// (under input mutex), threads remembers request number. Then it releases
// input mutex, locks output mutex and fetch in ring buffer all responses until
// its stored request number. The it takes correspondent element from
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
}
// Replace our request's response with None in `pending_responses`.
// Then make space in the ring buffer by clearing out any seqence of contiguous
// `None`'s from the front of `pending_responses`.
// NB: We can't pop_front() because other requests' responses because another
// requester might have grabbed the output mutex before us:
// T1: grab input mutex
// T1: send request_no 23
// T1: release input mutex
// T2: grab input mutex
// T2: send request_no 24
// T2: release input mutex
// T2: grab output mutex
// T2: n_processed_responses + output.pending_responses.len() <= request_no
// 23 0 24
// T2: enters poll loop that reads stdout
// T2: put response for 23 into pending_responses
// T2: put response for 24 into pending_resposnes
// pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
// T2: takes its response_24
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: releases output mutex
// T1: grabs output mutex
// T1: n_processed_responses + output.pending_responses.len() > request_no
// 23 2 23
// T1: skips poll loop that reads stdout
// T1: takes its response_23
// pending_responses now looks like this: Front None None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Back
// n_processed_responses now has value 25
let res = output.pending_responses[request_no - n_processed_responses]
.take()
.expect("we own this request_no, nobody else is supposed to take it");
while let Some(front) = output.pending_responses.front() {
if front.is_none() {
output.pending_responses.pop_front();
output.n_processed_responses += 1;
} else {
break;
}
}
Ok(res)
}
#[cfg(feature = "testing")]
fn record_and_log(&self, writebuf: &[u8]) {
use std::sync::atomic::Ordering;
let millis = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.read(true)
.open(path)
.and_then(|mut f| f.write_all(writebuf));
// trip up allowed_errors
if let Err(e) = res {
tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
} else {
tracing::error!(filename, "erroring walredo input saved");
}
}
#[cfg(not(feature = "testing"))]
fn record_and_log(&self, _: &[u8]) {}
}
impl Drop for WalRedoProcess {
fn drop(&mut self) {
self.child
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
// no way to wait for stderr_logger_task from Drop because that is async only
}
}

View File

@@ -49,7 +49,7 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
int neon_protocol_version = 1;
int neon_protocol_version = 2;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;
@@ -860,7 +860,7 @@ pg_init_libpagestore(void)
"Version of compute<->page server protocol",
NULL,
&neon_protocol_version,
1, /* default to old protocol for now */
2, /* use protocol version 2 */
1, /* min */
2, /* max */
PGC_SU_BACKEND,

View File

@@ -237,50 +237,18 @@ extern void neon_zeroextend(SMgrRelation reln, ForkNumber forknum,
extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
/*
* LSN values associated with each request to the pageserver
*/
typedef struct
{
/*
* 'request_lsn' is the main value that determines which page version to
* fetch.
*/
XLogRecPtr request_lsn;
/*
* A hint to the pageserver that the requested page hasn't been modified
* between this LSN and 'request_lsn'. That allows the pageserver to
* return the page faster, without waiting for 'request_lsn' to arrive in
* the pageserver, as long as 'not_modified_since' has arrived.
*/
XLogRecPtr not_modified_since;
/*
* 'effective_request_lsn' is not included in the request that's sent to
* the pageserver, but is used to keep track of the latest LSN of when the
* request was made. In a standby server, this is always the same as the
* 'request_lsn', but in the primary we use UINT64_MAX as the
* 'request_lsn' to request the latest page version, so we need this
* separate field to remember that latest LSN was when the request was
* made. It's needed to manage prefetch request, to verify if the response
* to a prefetched request is still valid.
*/
XLogRecPtr effective_request_lsn;
} neon_request_lsns;
#if PG_MAJORVERSION_NUM < 16
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, char *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
#else
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void *buffer);
extern PGDLLEXPORT void neon_read_at_lsn(NRelFileInfo rnode, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, void *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
extern void neon_write(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, const void *buffer, bool skipFsync);
#endif

View File

@@ -168,7 +168,8 @@ typedef enum PrefetchStatus
typedef struct PrefetchRequest
{
BufferTag buftag; /* must be first entry in the struct */
neon_request_lsns request_lsns;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
NeonResponse *response; /* may be null */
PrefetchStatus status;
shardno_t shard_no;
@@ -270,15 +271,16 @@ static PrefetchState *MyPState;
static bool compact_prefetch_buffers(void);
static void consume_prefetch_responses(void);
static uint64 prefetch_register_buffer(BufferTag tag, neon_request_lsns *force_request_lsns);
static uint64 prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
static bool prefetch_read(PrefetchRequest *slot);
static void prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns);
static void prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since);
static bool prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup_trailing_unused(void);
static inline void prefetch_set_unused(uint64 ring_index);
static neon_request_lsns neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno);
static bool neon_prefetch_response_usable(neon_request_lsns request_lsns,
static void neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since);
static bool neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
PrefetchRequest *slot);
static bool
@@ -336,7 +338,8 @@ compact_prefetch_buffers(void)
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->response = source_slot->response;
target_slot->request_lsns = source_slot->request_lsns;
target_slot->request_lsn = source_slot->request_lsn;
target_slot->not_modified_since = source_slot->not_modified_since;
target_slot->my_ring_index = empty_ring_index;
prfh_delete(MyPState->prf_hash, source_slot);
@@ -355,9 +358,8 @@ compact_prefetch_buffers(void)
};
source_slot->response = NULL;
source_slot->my_ring_index = 0;
source_slot->request_lsns = (neon_request_lsns) {
InvalidXLogRecPtr, InvalidXLogRecPtr, InvalidXLogRecPtr
};
source_slot->request_lsn = InvalidXLogRecPtr;
source_slot->not_modified_since = InvalidXLogRecPtr;
/* update bookkeeping */
n_moved++;
@@ -687,7 +689,7 @@ prefetch_set_unused(uint64 ring_index)
* prefetch_wait_for().
*/
static void
prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns)
prefetch_do_request(PrefetchRequest *slot, XLogRecPtr *force_request_lsn, XLogRecPtr *force_not_modified_since)
{
bool found;
NeonGetPageRequest request = {
@@ -698,14 +700,23 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
.blkno = slot->buftag.blockNum,
};
if (force_request_lsns)
slot->request_lsns = *force_request_lsns;
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
if (force_request_lsn)
{
request.req.lsn = *force_request_lsn;
request.req.not_modified_since = *force_not_modified_since;
}
else
slot->request_lsns = neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum);
request.req.lsn = slot->request_lsns.request_lsn;
request.req.not_modified_since = slot->request_lsns.not_modified_since;
{
neon_get_request_lsn(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum,
&request.req.lsn,
&request.req.not_modified_since);
}
slot->request_lsn = request.req.lsn;
slot->not_modified_since = request.req.not_modified_since;
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
@@ -731,22 +742,25 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
*
* Register that we may want the contents of BufferTag in the near future.
*
* If force_request_lsns is not NULL, those values are sent to the
* pageserver. If NULL, we utilize the lastWrittenLsn -infrastructure
* to calculate the LSNs to send.
* If force_request_lsn and force_not_modified_since are not NULL, those
* values are sent to the pageserver. If they are NULL, we utilize the
* lastWrittenLsn -infrastructure to fill them in.
*
* NOTE: this function may indirectly update MyPState->pfs_hash; which
* invalidates any active pointers into the hash table.
*/
static uint64
prefetch_register_buffer(BufferTag tag, neon_request_lsns *force_request_lsns)
prefetch_register_buffer(BufferTag tag, XLogRecPtr *force_request_lsn,
XLogRecPtr *force_not_modified_since)
{
uint64 ring_index;
PrefetchRequest req;
PrefetchRequest *slot;
PrfHashEntry *entry;
Assert(((force_request_lsn != NULL) == (force_not_modified_since != NULL)));
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
req.buftag = tag;
Retry:
@@ -767,9 +781,10 @@ Retry:
* If the caller specified a request LSN to use, only accept prefetch
* responses that satisfy that request.
*/
if (force_request_lsns)
if (force_request_lsn)
{
if (!neon_prefetch_response_usable(*force_request_lsns, slot))
if (!neon_prefetch_response_usable(*force_request_lsn,
*force_not_modified_since, slot))
{
/* Wait for the old request to finish and discard it */
if (!prefetch_wait_for(ring_index))
@@ -871,7 +886,7 @@ Retry:
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
prefetch_do_request(slot, force_request_lsns);
prefetch_do_request(slot, force_request_lsn, force_not_modified_since);
Assert(slot->status == PRFS_REQUESTED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
@@ -1514,11 +1529,11 @@ nm_adjust_lsn(XLogRecPtr lsn)
/*
* Return LSN for requesting pages and number of blocks from page server
*/
static neon_request_lsns
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
static void
neon_get_request_lsn(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
XLogRecPtr *request_lsn, XLogRecPtr *not_modified_since)
{
XLogRecPtr last_written_lsn;
neon_request_lsns result;
last_written_lsn = GetLastWrittenLSN(rinfo, forknum, blkno);
last_written_lsn = nm_adjust_lsn(last_written_lsn);
@@ -1527,13 +1542,12 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
if (RecoveryInProgress())
{
/* Request the page at the last replayed LSN. */
result.request_lsn = GetXLogReplayRecPtr(NULL);
result.not_modified_since = last_written_lsn;
result.effective_request_lsn = result.request_lsn;
Assert(last_written_lsn <= result.request_lsn);
*request_lsn = GetXLogReplayRecPtr(NULL);
*not_modified_since = last_written_lsn;
Assert(last_written_lsn <= *request_lsn);
neon_log(DEBUG1, "neon_get_request_lsns request lsn %X/%X, not_modified_since %X/%X",
LSN_FORMAT_ARGS(result.request_lsn), LSN_FORMAT_ARGS(result.not_modified_since));
neon_log(DEBUG1, "neon_get_request_lsn request lsn %X/%X, not_modified_since %X/%X",
LSN_FORMAT_ARGS(*request_lsn), LSN_FORMAT_ARGS(*not_modified_since));
}
else
{
@@ -1545,7 +1559,7 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
* must still in the buffer cache, so our request cannot concern
* those.
*/
neon_log(DEBUG1, "neon_get_request_lsns GetLastWrittenLSN lsn %X/%X",
neon_log(DEBUG1, "neon_get_request_lsn GetLastWrittenLSN lsn %X/%X ",
LSN_FORMAT_ARGS(last_written_lsn));
/*
@@ -1571,33 +1585,16 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
}
/*
* Request the very latest version of the page. In principle we
* want to read the page at the current insert LSN, and we could
* use that value in the request. However, there's a corner case
* with pageserver's garbage collection. If the GC horizon is
* set to a very small value, it's possible that by the time
* that the pageserver processes our request, the GC horizon has
* already moved past the LSN we calculate here. Standby servers
* always have that problem as the can always lag behind the
* primary, but for the primary we can avoid it by always
* requesting the latest page, by setting request LSN to
* UINT64_MAX.
*
* Remember the current LSN, however, so that we can later
* correctly determine if the response to the request is still
* valid. The most up-to-date LSN we could use for that purpose
* would be the current insert LSN, but to avoid the overhead of
* looking it up, use 'flushlsn' instead. This relies on the
* assumption that if the page was modified since the last WAL
* flush, it should still be in the buffer cache, and we
* wouldn't be requesting it.
* Request the latest version of the page. The most up-to-date request
* LSN we could use would be the current insert LSN, but to avoid the
* overhead of looking it up, use 'flushlsn' instead. This relies on
* the assumption that if the page was modified since the last WAL
* flush, it should still be in the buffer cache, and we wouldn't be
* requesting it.
*/
result.request_lsn = UINT64_MAX;
result.not_modified_since = last_written_lsn;
result.effective_request_lsn = flushlsn;
*request_lsn = flushlsn;
*not_modified_since = last_written_lsn;
}
return result;
}
/*
@@ -1607,16 +1604,12 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno)
* satisfy a page read now.
*/
static bool
neon_prefetch_response_usable(neon_request_lsns request_lsns,
neon_prefetch_response_usable(XLogRecPtr request_lsn, XLogRecPtr not_modified_since,
PrefetchRequest *slot)
{
/* sanity check the LSN's on the old and the new request */
Assert(request_lsns.request_lsn >= request_lsns.not_modified_since);
Assert(request_lsns.effective_request_lsn >= request_lsns.not_modified_since);
Assert(request_lsns.effective_request_lsn <= request_lsns.request_lsn);
Assert(slot->request_lsns.request_lsn >= slot->request_lsns.not_modified_since);
Assert(slot->request_lsns.effective_request_lsn >= slot->request_lsns.not_modified_since);
Assert(slot->request_lsns.effective_request_lsn <= slot->request_lsns.request_lsn);
Assert(request_lsn >= not_modified_since);
Assert(slot->request_lsn >= slot->not_modified_since);
Assert(slot->status != PRFS_UNUSED);
/*
@@ -1634,40 +1627,26 @@ neon_prefetch_response_usable(neon_request_lsns request_lsns,
* calculate LSNs "out of order" with each other, but the prefetch queue
* is backend-private at the moment.)
*/
if (request_lsns.effective_request_lsn < slot->request_lsns.effective_request_lsn ||
request_lsns.not_modified_since < slot->request_lsns.not_modified_since)
if (request_lsn < slot->request_lsn || not_modified_since < slot->not_modified_since)
{
ereport(LOG,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "request with unexpected LSN after prefetch"),
errdetail("Request %X/%X not_modified_since %X/%X, prefetch %X/%X not_modified_since %X/%X)",
LSN_FORMAT_ARGS(request_lsns.effective_request_lsn),
LSN_FORMAT_ARGS(request_lsns.not_modified_since),
LSN_FORMAT_ARGS(slot->request_lsns.effective_request_lsn),
LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since))));
LSN_FORMAT_ARGS(request_lsn), LSN_FORMAT_ARGS(not_modified_since),
LSN_FORMAT_ARGS(slot->request_lsn), LSN_FORMAT_ARGS(slot->not_modified_since))));
return false;
}
/*---
* Each request to the pageserver has three LSN values associated with it:
* `not_modified_since`, `request_lsn`, and 'effective_request_lsn'.
* `not_modified_since` and `request_lsn` are sent to the pageserver, but
* in the primary node, we always use UINT64_MAX as the `request_lsn`, so
* we remember `effective_request_lsn` separately. In a primary,
* `effective_request_lsn` is the last flush WAL position when the request
* was sent to the pageserver. That's logically the LSN that we are
* requesting the page at, but we send UINT64_MAX to the pageserver so
* that if the GC horizon advances past that position, we still get a
* valid response instead of an error.
*
* To determine whether a response to a GetPage request issued earlier is
* still valid to satisfy a new page read, we look at the
* (not_modified_since, effective_request_lsn] range of the request. It is
* effectively a claim that the page has not been modified between those
* LSNs. If the range of the old request in the queue overlaps with the
* new request, we know that the page hasn't been modified in the union of
* the ranges. We can use the response to old request to satisfy the new
* request in that case. For example:
* Each request to the pageserver carries two LSN values:
* `not_modified_since` and `request_lsn`. The (not_modified_since,
* request_lsn] range of each request is effectively a claim that the page
* has not been modified between those LSNs. If the range of the old
* request in the queue overlaps with the new request, we know that the
* page hasn't been modified in the union of the ranges. We can use the
* response to old request to satisfy the new request in that case. For
* example:
*
* 100 500
* Old request: +--------+
@@ -1696,9 +1675,9 @@ neon_prefetch_response_usable(neon_request_lsns request_lsns,
*/
/* this follows from the checks above */
Assert(request_lsns.effective_request_lsn >= slot->request_lsns.not_modified_since);
Assert(request_lsn >= slot->not_modified_since);
return request_lsns.not_modified_since <= slot->request_lsns.effective_request_lsn;
return not_modified_since <= slot->request_lsn;
}
/*
@@ -1710,7 +1689,8 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
bool exists;
NeonResponse *resp;
BlockNumber n_blocks;
neon_request_lsns request_lsns;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -1765,15 +1745,15 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
return false;
}
request_lsns = neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonExistsRequest request = {
.req.tag = T_NeonExistsRequest,
.req.lsn = request_lsns.request_lsn,
.req.not_modified_since = request_lsns.not_modified_since,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forkNum
};
.forknum = forkNum};
resp = page_server_request(&request);
}
@@ -1790,7 +1770,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forkNum,
LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)),
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
@@ -2155,7 +2135,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
CopyNRelFileInfoToBufTag(tag, InfoFromSMgrRel(reln));
ring_index = prefetch_register_buffer(tag, NULL);
ring_index = prefetch_register_buffer(tag, NULL, NULL);
Assert(ring_index < MyPState->ring_unused &&
MyPState->ring_last <= ring_index);
@@ -2208,10 +2188,10 @@ neon_writeback(SMgrRelation reln, ForkNumber forknum,
void
#if PG_MAJORVERSION_NUM < 16
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, char *buffer)
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer)
#else
neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, void *buffer)
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer)
#endif
{
NeonResponse *resp;
@@ -2243,7 +2223,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* value of the LwLsn cache when the entry is not found.
*/
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
XLogWaitForReplayOf(request_lsns.request_lsn);
XLogWaitForReplayOf(request_lsn);
/*
* Try to find prefetched page in the list of received pages.
@@ -2254,7 +2234,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
if (entry != NULL)
{
slot = entry->slot;
if (neon_prefetch_response_usable(request_lsns, slot))
if (neon_prefetch_response_usable(request_lsn, not_modified_since, slot))
{
ring_index = slot->my_ring_index;
pgBufferUsage.prefetch.hits += 1;
@@ -2288,7 +2268,8 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
{
pgBufferUsage.prefetch.misses += 1;
ring_index = prefetch_register_buffer(buftag, &request_lsns);
ring_index = prefetch_register_buffer(buftag, &request_lsn,
&not_modified_since);
slot = GetPrfSlot(ring_index);
}
else
@@ -2329,7 +2310,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
slot->shard_no, blkno,
RelFileInfoFmt(rinfo),
forkNum,
LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)),
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
@@ -2352,7 +2333,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, char *buffer
neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer)
#endif
{
neon_request_lsns request_lsns;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -2377,8 +2359,9 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
return;
}
request_lsns = neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
neon_get_request_lsn(InfoFromSMgrRel(reln), forkNum, blkno,
&request_lsn, &not_modified_since);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsn, not_modified_since, buffer);
#ifdef DEBUG_COMPARE_LOCAL
if (forkNum == MAIN_FORKNUM && IS_LOCAL_REL(reln))
@@ -2547,7 +2530,8 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
{
NeonResponse *resp;
BlockNumber n_blocks;
neon_request_lsns request_lsns;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
switch (reln->smgr_relpersistence)
{
@@ -2574,12 +2558,13 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
return n_blocks;
}
request_lsns = neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(InfoFromSMgrRel(reln), forknum, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonNblocksRequest request = {
.req.tag = T_NeonNblocksRequest,
.req.lsn = request_lsns.request_lsn,
.req.not_modified_since = request_lsns.not_modified_since,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forknum,
};
@@ -2599,7 +2584,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)),
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
@@ -2610,10 +2595,10 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
LSN_FORMAT_ARGS(request_lsns.effective_request_lsn),
n_blocks);
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
n_blocks);
pfree(resp);
return n_blocks;
@@ -2627,15 +2612,17 @@ neon_dbsize(Oid dbNode)
{
NeonResponse *resp;
int64 db_size;
neon_request_lsns request_lsns;
XLogRecPtr request_lsn,
not_modified_since;
NRelFileInfo dummy_node = {0};
request_lsns = neon_get_request_lsns(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO);
neon_get_request_lsn(dummy_node, MAIN_FORKNUM, REL_METADATA_PSEUDO_BLOCKNO,
&request_lsn, &not_modified_since);
{
NeonDbSizeRequest request = {
.req.tag = T_NeonDbSizeRequest,
.req.lsn = request_lsns.request_lsn,
.req.not_modified_since = request_lsns.not_modified_since,
.req.lsn = request_lsn,
.req.not_modified_since = not_modified_since,
.dbNode = dbNode,
};
@@ -2652,7 +2639,8 @@ neon_dbsize(Oid dbNode)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X",
dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)),
dbNode,
(uint32) (request_lsn >> 32), (uint32) request_lsn),
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
@@ -2662,7 +2650,9 @@ neon_dbsize(Oid dbNode)
}
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), db_size);
dbNode,
(uint32) (request_lsn >> 32), (uint32) request_lsn,
db_size);
pfree(resp);
return db_size;
@@ -2907,10 +2897,6 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
XLogRecPtr request_lsn,
not_modified_since;
/*
* Compute a request LSN to use, similar to neon_get_request_lsns() but the
* logic is a bit simpler.
*/
if (RecoveryInProgress())
{
request_lsn = GetXLogReplayRecPtr(NULL);
@@ -2922,10 +2908,10 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
*/
request_lsn = GetRedoStartLsn();
}
request_lsn = nm_adjust_lsn(request_lsn);
}
else
request_lsn = UINT64_MAX;
request_lsn = GetXLogInsertRecPtr();
request_lsn = nm_adjust_lsn(request_lsn);
/*
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU

View File

@@ -48,10 +48,10 @@ PG_FUNCTION_INFO_V1(neon_xlogflush);
*/
#if PG_MAJORVERSION_NUM < 16
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, char *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, char *buffer);
#else
typedef void (*neon_read_at_lsn_type) (NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
neon_request_lsns request_lsns, void *buffer);
XLogRecPtr request_lsn, XLogRecPtr not_modified_since, void *buffer);
#endif
static neon_read_at_lsn_type neon_read_at_lsn_ptr;
@@ -298,7 +298,9 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
text *relname;
text *forkname;
uint32 blkno;
neon_request_lsns request_lsns;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
if (PG_NARGS() != 5)
elog(ERROR, "unexpected number of arguments in SQL function signature");
@@ -310,15 +312,8 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
forkname = PG_GETARG_TEXT_PP(1);
blkno = PG_GETARG_UINT32(2);
request_lsns.request_lsn = PG_ARGISNULL(3) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(3);
request_lsns.not_modified_since = PG_ARGISNULL(4) ? request_lsns.request_lsn : PG_GETARG_LSN(4);
/*
* For the time being, use the same LSN for request and
* effective request LSN. If any test needed to use UINT64_MAX
* as the request LSN, we'd need to add effective_request_lsn
* as a new argument.
*/
request_lsns.effective_request_lsn = request_lsns.request_lsn;
request_lsn = PG_ARGISNULL(3) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(3);
not_modified_since = PG_ARGISNULL(4) ? request_lsn : PG_GETARG_LSN(4);
if (!superuser())
ereport(ERROR,
@@ -372,8 +367,7 @@ get_raw_page_at_lsn(PG_FUNCTION_ARGS)
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, request_lsns,
raw_page_data);
neon_read_at_lsn(InfoFromRelation(rel), forknum, blkno, request_lsn, not_modified_since, raw_page_data);
relation_close(rel, AccessShareLock);
@@ -419,25 +413,19 @@ get_raw_page_at_lsn_ex(PG_FUNCTION_ARGS)
ForkNumber forknum = PG_GETARG_UINT32(3);
uint32 blkno = PG_GETARG_UINT32(4);
neon_request_lsns request_lsns;
XLogRecPtr request_lsn;
XLogRecPtr not_modified_since;
/* Initialize buffer to copy to */
bytea *raw_page = (bytea *) palloc(BLCKSZ + VARHDRSZ);
request_lsns.request_lsn = PG_ARGISNULL(5) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(5);
request_lsns.not_modified_since = PG_ARGISNULL(6) ? request_lsns.request_lsn : PG_GETARG_LSN(6);
/*
* For the time being, use the same LSN for request
* and effective request LSN. If any test needed to
* use UINT64_MAX as the request LSN, we'd need to add
* effective_request_lsn as a new argument.
*/
request_lsns.effective_request_lsn = request_lsns.request_lsn;
request_lsn = PG_ARGISNULL(5) ? GetXLogInsertRecPtr() : PG_GETARG_LSN(5);
not_modified_since = PG_ARGISNULL(6) ? request_lsn : PG_GETARG_LSN(6);
SET_VARSIZE(raw_page, BLCKSZ + VARHDRSZ);
raw_page_data = VARDATA(raw_page);
neon_read_at_lsn(rinfo, forknum, blkno, request_lsns, raw_page_data);
neon_read_at_lsn(rinfo, forknum, blkno, request_lsn, not_modified_since, raw_page_data);
PG_RETURN_BYTEA_P(raw_page);
}
}

View File

@@ -268,6 +268,10 @@ async fn main() -> anyhow::Result<()> {
build_tag: BUILD_TAG,
});
// add the current runtime to the collector
#[cfg(tokio_unstable)]
neon_metrics.tokio.add_current("proxy");
let jemalloc = match proxy::jemalloc::MetricRecorder::new() {
Ok(t) => Some(t),
Err(e) => {

View File

@@ -1,18 +0,0 @@
#!/usr/bin/env bash
set -eu
HELPER_DIR="$(dirname "${BASH_SOURCE[0]}")"
SCRIPT="test_runner/fixtures/pageserver/allowed_errors.py"
# first run to understand all of the errors:
#
# example: ./scripts/check_allowed_errors.sh -i - < pageserver.log
# example: ./scripts/check_allowed_errors.sh -i pageserver.log
#
# then edit the test local allowed_errors to the
# test_runner/fixtures/pageserver/allowed_errors.py, then re-run to make sure
# they are handled.
#
# finally revert any local changes to allowed_errors.py.
poetry run python3 "$HELPER_DIR/../$SCRIPT" $*

View File

@@ -5,11 +5,10 @@ import json
import logging
import os
from collections import defaultdict
from typing import Any, DefaultDict, Dict, Optional
from typing import DefaultDict, Dict
import psycopg2
import psycopg2.extras
import toml
FLAKY_TESTS_QUERY = """
SELECT
@@ -59,24 +58,6 @@ def main(args: argparse.Namespace):
else:
pageserver_virtual_file_io_engine_parameter = ""
# re-use existing records of flaky tests from before parametrization by compaction_algorithm
def get_pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict[str, Any]]:
"""Duplicated from parametrize.py"""
toml_table = os.getenv("PAGESERVER_DEFAULT_TENANT_CONFIG_COMPACTION_ALGORITHM")
if toml_table is None:
return None
v = toml.loads(toml_table)
assert isinstance(v, dict)
return v
pageserver_default_tenant_config_compaction_algorithm_parameter = ""
if (
explicit_default := get_pageserver_default_tenant_config_compaction_algorithm()
) is not None:
pageserver_default_tenant_config_compaction_algorithm_parameter = (
f"-{explicit_default['kind']}"
)
for row in rows:
# We don't want to automatically rerun tests in a performance suite
if row["parent_suite"] != "test_runner.regress":
@@ -85,10 +66,10 @@ def main(args: argparse.Namespace):
if row["name"].endswith("]"):
parametrized_test = row["name"].replace(
"[",
f"[{build_type}-pg{pg_version}{pageserver_virtual_file_io_engine_parameter}{pageserver_default_tenant_config_compaction_algorithm_parameter}-",
f"[{build_type}-pg{pg_version}{pageserver_virtual_file_io_engine_parameter}-",
)
else:
parametrized_test = f"{row['name']}[{build_type}-pg{pg_version}{pageserver_virtual_file_io_engine_parameter}{pageserver_default_tenant_config_compaction_algorithm_parameter}]"
parametrized_test = f"{row['name']}[{build_type}-pg{pg_version}{pageserver_virtual_file_io_engine_parameter}]"
res[row["parent_suite"]][row["suite"]][parametrized_test] = True

View File

@@ -66,10 +66,6 @@ struct Cli {
#[arg(long)]
max_unavailable_interval: Option<humantime::Duration>,
/// Size threshold for automatically splitting shards (disabled by default)
#[arg(long)]
split_threshold: Option<u64>,
/// Maximum number of reconcilers that may run in parallel
#[arg(long)]
reconciler_concurrency: Option<usize>,
@@ -259,7 +255,6 @@ async fn async_main() -> anyhow::Result<()> {
reconciler_concurrency: args
.reconciler_concurrency
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
split_threshold: args.split_threshold,
};
// After loading secrets & config, but before starting anything else, apply database migrations

View File

@@ -2,7 +2,7 @@ use pageserver_api::{
models::{
LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress,
TenantScanRemoteStorageResponse, TenantShardSplitRequest, TenantShardSplitResponse,
TimelineCreateRequest, TimelineInfo, TopTenantShardsRequest, TopTenantShardsResponse,
TimelineCreateRequest, TimelineInfo,
},
shard::TenantShardId,
};
@@ -234,16 +234,4 @@ impl PageserverClient {
self.inner.get_utilization().await
)
}
pub(crate) async fn top_tenant_shards(
&self,
request: TopTenantShardsRequest,
) -> Result<TopTenantShardsResponse> {
measured_request!(
"top_tenants",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.top_tenant_shards(request).await
)
}
}

View File

@@ -32,10 +32,10 @@ use pageserver_api::{
TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
UtilizationScore,
},
models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest},
models::{SecondaryProgress, TenantConfigRequest},
};
use reqwest::StatusCode;
use tracing::{instrument, Instrument};
use tracing::instrument;
use crate::pageserver_client::PageserverClient;
use pageserver_api::{
@@ -222,10 +222,6 @@ pub struct Config {
/// How many Reconcilers may be spawned concurrently
pub reconciler_concurrency: usize,
/// How large must a shard grow in bytes before we split it?
/// None disables auto-splitting.
pub split_threshold: Option<u64>,
}
impl From<DatabaseError> for ApiError {
@@ -703,7 +699,7 @@ impl Service {
/// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
/// for those retries.
#[instrument(skip_all)]
async fn background_reconcile(self: &Arc<Self>) {
async fn background_reconcile(&self) {
self.startup_complete.clone().wait().await;
const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
@@ -715,11 +711,7 @@ impl Service {
let reconciles_spawned = self.reconcile_all();
if reconciles_spawned == 0 {
// Run optimizer only when we didn't find any other work to do
let optimizations = self.optimize_all().await;
if optimizations == 0 {
// Run new splits only when no optimizations are pending
self.autosplit_tenants().await;
}
self.optimize_all().await;
}
}
_ = self.cancel.cancelled() => return
@@ -4774,104 +4766,6 @@ impl Service {
validated_work
}
/// Look for shards which are oversized and in need of splitting
async fn autosplit_tenants(self: &Arc<Self>) {
let Some(split_threshold) = self.config.split_threshold else {
// Auto-splitting is disabled
return;
};
let nodes = self.inner.read().unwrap().nodes.clone();
const SPLIT_TO_MAX: ShardCount = ShardCount::new(8);
let mut top_n = Vec::new();
// Call into each node to look for big tenants
let top_n_request = TopTenantShardsRequest {
// We currently split based on logical size, for simplicity: logical size is a signal of
// the user's intent to run a large database, whereas physical/resident size can be symptoms
// of compaction issues. Eventually we should switch to using resident size to bound the
// disk space impact of one shard.
order_by: models::TenantSorting::MaxLogicalSize,
limit: 10,
where_shards_lt: Some(SPLIT_TO_MAX),
where_gt: Some(split_threshold),
};
for node in nodes.values() {
let request_ref = &top_n_request;
match node
.with_client_retries(
|client| async move {
let request = request_ref.clone();
client.top_tenant_shards(request.clone()).await
},
&self.config.jwt_token,
3,
3,
Duration::from_secs(5),
&self.cancel,
)
.await
{
Some(Ok(node_top_n)) => {
top_n.extend(node_top_n.shards.into_iter());
}
Some(Err(mgmt_api::Error::Cancelled)) => {
continue;
}
Some(Err(e)) => {
tracing::warn!("Failed to fetch top N tenants from {node}: {e}");
continue;
}
None => {
// Node is shutting down
continue;
}
};
}
// Pick the biggest tenant to split first
top_n.sort_by_key(|i| i.resident_size);
let Some(split_candidate) = top_n.into_iter().next() else {
tracing::debug!("No split-elegible shards found");
return;
};
// We spawn a task to run this, so it's exactly like some external API client requesting it. We don't
// want to block the background reconcile loop on this.
tracing::info!("Auto-splitting tenant for size threshold {split_threshold}: current size {split_candidate:?}");
let this = self.clone();
tokio::spawn(
async move {
match this
.tenant_shard_split(
split_candidate.id.tenant_id,
TenantShardSplitRequest {
// Always split to the max number of shards: this avoids stepping through
// intervening shard counts and encountering the overrhead of a split+cleanup
// each time as a tenant grows, and is not too expensive because our max shard
// count is relatively low anyway.
// This policy will be adjusted in future once we support higher shard count.
new_shard_count: SPLIT_TO_MAX.literal(),
new_stripe_size: Some(ShardParameters::DEFAULT_STRIPE_SIZE),
},
)
.await
{
Ok(_) => {
tracing::info!("Successful auto-split");
}
Err(e) => {
tracing::error!("Auto-split failed: {e}");
}
}
}
.instrument(tracing::info_span!("auto_split", tenant_id=%split_candidate.id.tenant_id)),
);
}
/// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
/// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
/// put the system into a quiescent state where future background reconciliations won't do anything.

View File

@@ -19,9 +19,9 @@ from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from _pytest.terminal import TerminalReporter
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
from fixtures.types import TenantId, TimelineId
"""
This file contains fixtures for micro-benchmarks.

View File

@@ -5,8 +5,8 @@ import pytest
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
from fixtures.common_types import TenantId
from fixtures.log_helper import log
from fixtures.types import TenantId
class ComputeReconfigure:

View File

@@ -149,7 +149,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_storage_operations_seconds_sum_total",
"pageserver_evictions_total",
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# "pageserver_directory_entries_count", -- only used if above a certain threshold
# "pageserver_broken_tenants_count" -- used only for broken

View File

@@ -47,19 +47,17 @@ from urllib3.util.retry import Retry
from fixtures import overlayfs
from fixtures.broker import NeonBroker
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pageserver.allowed_errors import (
DEFAULT_PAGESERVER_ALLOWED_ERRORS,
DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS,
)
from fixtures.pageserver.common_types import IndexPartDump, LayerName, parse_layer_file_name
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.types import IndexPartDump, LayerName, parse_layer_file_name
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
@@ -74,13 +72,13 @@ from fixtures.remote_storage import (
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
allure_add_grafana_links,
allure_attach_from_dir,
assert_no_errors,
get_self_dir,
print_gc_result,
subprocess_capture,
wait_until,
)
@@ -469,7 +467,6 @@ class NeonEnvBuilder:
initial_timeline: Optional[TimelineId] = None,
pageserver_virtual_file_io_engine: Optional[str] = None,
pageserver_aux_file_policy: Optional[AuxFileStore] = None,
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -510,14 +507,6 @@ class NeonEnvBuilder:
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
self.pageserver_default_tenant_config_compaction_algorithm: Optional[
Dict[str, Any]
] = pageserver_default_tenant_config_compaction_algorithm
if self.pageserver_default_tenant_config_compaction_algorithm is not None:
log.debug(
f"Overriding pageserver default compaction algorithm to {self.pageserver_default_tenant_config_compaction_algorithm}"
)
self.pageserver_get_vectored_impl: Optional[str] = None
if os.getenv("PAGESERVER_GET_VECTORED_IMPL", "") == "vectored":
self.pageserver_get_vectored_impl = "vectored"
@@ -713,7 +702,8 @@ class NeonEnvBuilder:
config["branch_name_mappings"] = snapshot_config["branch_name_mappings"]
# Update the config with new neon + postgres path in case of compat test
config["pg_distrib_dir"] = str(self.pg_distrib_dir)
# FIXME: overriding pg_distrib_dir cause storage controller fail to start
# config["pg_distrib_dir"] = str(self.pg_distrib_dir)
config["neon_distrib_dir"] = str(self.neon_binpath)
with (self.repo_dir / "config").open("w") as f:
@@ -1114,11 +1104,6 @@ class NeonEnv:
ps_cfg["get_impl"] = config.pageserver_get_impl
if config.pageserver_validate_vectored_get is not None:
ps_cfg["validate_vectored_get"] = config.pageserver_validate_vectored_get
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config[
"compaction_algorithm"
] = config.pageserver_default_tenant_config_compaction_algorithm
if self.pageserver_remote_storage is not None:
ps_cfg["remote_storage"] = remote_storage_to_toml_dict(
@@ -1320,7 +1305,6 @@ def _shared_simple_env(
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
) -> Iterator[NeonEnv]:
"""
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
@@ -1352,7 +1336,6 @@ def _shared_simple_env(
test_output_dir=test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
) as builder:
env = builder.init_start()
@@ -1392,8 +1375,7 @@ def neon_env_builder(
test_overlay_dir: Path,
top_output_dir: Path,
pageserver_virtual_file_io_engine: str,
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_aux_file_policy: Optional[AuxFileStore],
pageserver_aux_file_policy: Optional[AuxFileStore] = None,
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1428,7 +1410,6 @@ def neon_env_builder(
test_output_dir=test_output_dir,
test_overlay_dir=test_overlay_dir,
pageserver_aux_file_policy=pageserver_aux_file_policy,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
) as builder:
yield builder
@@ -2344,10 +2325,11 @@ class NeonStorageController(MetricsGetter, LogUtils):
log.info(f"reconcile_all waited for {n} shards")
return n
def reconcile_until_idle(self, timeout_secs=30, delay_max=5):
def reconcile_until_idle(self, timeout_secs=30):
start_at = time.time()
n = 1
delay_sec = 0.5
delay_max = 5
while n > 0:
n = self.reconcile_all()
if n == 0:
@@ -4420,79 +4402,3 @@ def parse_project_git_version_output(s: str) -> str:
return commit
raise ValueError(f"unable to parse --version output: '{s}'")
def generate_uploads_and_deletions(
env: NeonEnv,
*,
init: bool = True,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
data: Optional[str] = None,
pageserver: NeonPageserver,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
that results in some uploads and some deletions to remote storage.
"""
if tenant_id is None:
tenant_id = env.initial_tenant
assert tenant_id is not None
if timeline_id is None:
timeline_id = env.initial_timeline
assert timeline_id is not None
ps_http = pageserver.http_client()
with env.endpoints.create_start(
"main", tenant_id=tenant_id, pageserver_id=pageserver.id
) as endpoint:
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)
def churn(data):
endpoint.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 200) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
]
)
assert tenant_id is not None
assert timeline_id is not None
# We are waiting for uploads as well as local flush, in order to avoid leaving the system
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)
# Compaction should generate some GC-elegible layers
for i in range(0, 2):
churn(f"{i if data is None else data}")
gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0
# Stop endpoint and flush all data to pageserver, then checkpoint it: this
# ensures that the pageserver is in a fully idle state: there will be no more
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Finish uploads
wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn)
# Finish all remote writes (including deletions)
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)

View File

@@ -131,10 +131,9 @@ if __name__ == "__main__":
"-i",
"--input",
type=argparse.FileType("r"),
help="Pageserver logs file. Use '-' for stdin.",
required=True,
default=sys.stdin,
help="Pageserver logs file. Reads from stdin if no file is provided.",
)
args = parser.parse_args()
errors = _check_allowed_errors(args.input)

View File

@@ -11,10 +11,10 @@ import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import Fn
@@ -890,18 +890,3 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
assert current_logical_size == non_incremental
assert isinstance(current_logical_size, int)
return current_logical_size
def top_tenants(
self, order_by: str, limit: int, where_shards_lt: int, where_gt: int
) -> dict[Any, Any]:
res = self.post(
f"http://localhost:{self.port}/v1/top_tenants",
json={
"order_by": order_by,
"limit": limit,
"where_shards_lt": where_shards_lt,
"where_gt": where_gt,
},
)
self.verbose_error(res)
return res.json() # type: ignore

View File

@@ -3,7 +3,6 @@ import time
from typing import Any, Callable, Dict, Tuple
import fixtures.pageserver.remote_storage
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -13,6 +12,7 @@ from fixtures.pageserver.utils import (
wait_until_tenant_state,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId, TimelineId
def single_timeline(

View File

@@ -6,13 +6,13 @@ import threading
from pathlib import Path
from typing import Any, List, Tuple
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv, Pagectl
from fixtures.pageserver.common_types import (
from fixtures.pageserver.types import (
InvalidFileName,
parse_layer_file_name,
)
from fixtures.remote_storage import LocalFsStorage
from fixtures.types import TenantId, TimelineId
def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId):

View File

@@ -2,7 +2,7 @@ import re
from dataclasses import dataclass
from typing import Any, Dict, Tuple, Union
from fixtures.common_types import KEY_MAX, KEY_MIN, Key, Lsn
from fixtures.types import KEY_MAX, KEY_MIN, Key, Lsn
@dataclass

View File

@@ -8,10 +8,10 @@ from mypy_boto3_s3.type_defs import (
ObjectTypeDef,
)
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.remote_storage import RemoteStorage, RemoteStorageKind, S3Storage
from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until

View File

@@ -1,8 +1,7 @@
import os
from typing import Any, Dict, Optional
from typing import Optional
import pytest
import toml
from _pytest.python import Metafunc
from fixtures.pg_version import PgVersion
@@ -38,20 +37,6 @@ def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
return None
def get_pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict[str, Any]]:
toml_table = os.getenv("PAGESERVER_DEFAULT_TENANT_CONFIG_COMPACTION_ALGORITHM")
if toml_table is None:
return None
v = toml.loads(toml_table)
assert isinstance(v, dict)
return v
@pytest.fixture(scope="function", autouse=True)
def pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict[str, Any]]:
return get_pageserver_default_tenant_config_compaction_algorithm()
def pytest_generate_tests(metafunc: Metafunc):
if (bt := os.getenv("BUILD_TYPE")) is None:
build_types = ["debug", "release"]
@@ -75,16 +60,6 @@ def pytest_generate_tests(metafunc: Metafunc):
):
metafunc.parametrize("pageserver_virtual_file_io_engine", [io_engine])
# Same hack for pageserver_default_tenant_config_compaction_algorithm
if (
explicit_default := get_pageserver_default_tenant_config_compaction_algorithm()
) is not None:
metafunc.parametrize(
"pageserver_default_tenant_config_compaction_algorithm",
[explicit_default],
ids=[explicit_default["kind"]],
)
# For performance tests, parametrize also by platform
if (
"test_runner/performance" in metafunc.definition._nodeid

View File

@@ -12,8 +12,8 @@ import boto3
import toml
from mypy_boto3_s3 import S3Client
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.types import TenantId, TimelineId
TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
TENANT_HEATMAP_FILE_NAME = "heatmap-v1.json"

View File

@@ -6,8 +6,8 @@ from typing import Any, Dict, List, Optional, Tuple, Union
import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.types import Lsn, TenantId, TimelineId
# Walreceiver as returned by sk's timeline status endpoint.

View File

@@ -1,6 +1,6 @@
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.types import TenantId, TimelineId
def are_walreceivers_absent(

View File

@@ -25,14 +25,14 @@ import zstandard
from psycopg2.extensions import cursor
from fixtures.log_helper import log
from fixtures.pageserver.common_types import (
from fixtures.pageserver.types import (
parse_delta_layer,
parse_image_layer,
)
if TYPE_CHECKING:
from fixtures.neon_fixtures import PgBin
from fixtures.common_types import TimelineId
from fixtures.types import TimelineId
Fn = TypeVar("Fn", bound=Callable[..., Any])
@@ -452,7 +452,6 @@ def humantime_to_ms(humantime: str) -> float:
def scan_log_for_errors(input: Iterable[str], allowed_errors: List[str]) -> List[Tuple[int, str]]:
# FIXME: this duplicates test_runner/fixtures/pageserver/allowed_errors.py
error_or_warn = re.compile(r"\s(ERROR|WARN)")
errors = []
for lineno, line in enumerate(input, start=1):
@@ -485,7 +484,7 @@ def assert_no_errors(log_file, service, allowed_errors):
for _lineno, error in errors:
log.info(f"not allowed {service} error: {error.strip()}")
assert not errors, f"First log error on {service}: {errors[0]}\nHint: use scripts/check_allowed_errors.sh to test any new allowed_error you add"
assert not errors, f"Log errors on {service}: {errors[0]}"
@enum.unique

View File

@@ -1,7 +1,6 @@
import threading
from typing import Any, Optional
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -11,6 +10,7 @@ from fixtures.neon_fixtures import (
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId
# neon_local doesn't handle creating/modifying endpoints concurrently, so we use a mutex
# to ensure we don't do that: this enables running lots of Workloads in parallel safely.

View File

@@ -1,175 +0,0 @@
import json
from pathlib import Path
from typing import Any, Dict, Tuple
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import s3_storage
from fixtures.utils import humantime_to_ms
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("io_engine", ["tokio-epoll-uring", "std-fs"])
@pytest.mark.parametrize("concurrency_per_target", [1, 10, 100])
@pytest.mark.timeout(1000)
def test_download_churn(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
io_engine: str,
concurrency_per_target: int,
duration: int,
):
def record(metric, **kwargs):
zenbenchmark.record(metric_name=f"pageserver_ondemand_download_churn.{metric}", **kwargs)
params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}
# params from fixtures
params.update(
{
# we don't capture `duration`, but instead use the `runtime` output field from pagebench
}
)
# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
params.update(
{
"pageserver_config_override.page_cache_size": (
page_cache_size * 8192,
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
}
)
for param, (value, kwargs) in params.items():
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
# Setup env
env = setup_env(neon_env_builder, pg_bin)
env.pageserver.allowed_errors.append(
f".*path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)
run_benchmark(env, pg_bin, record, io_engine, concurrency_per_target, duration)
def setup_env(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# We configure tenant conf such that SQL query below produces a lot of layers.
# We don't care what's in the layers really, we just care that layers are created.
bytes_per_layer = 10 * (1024**2)
env = neon_env_builder.init_start(
initial_tenant_conf={
"pitr_interval": "1000d", # let's not make it get in the way
"gc_period": "0s", # disable periodic gc to avoid noise
"compaction_period": "0s", # disable L0=>L1 compaction
"checkpoint_timeout": "10years", # rely solely on checkpoint_distance
"checkpoint_distance": bytes_per_layer, # 10M instead of 256M to create more smaller layers
"image_creation_threshold": 100000, # don't create image layers ever
}
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
client = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=tenant_id) as ep:
ep.safe_psql("CREATE TABLE data (random_text text)")
bytes_per_row = 512 # make big enough so WAL record size doesn't dominate
desired_layers = 300
desired_bytes = bytes_per_layer * desired_layers
nrows = desired_bytes / bytes_per_row
ep.safe_psql(
f"INSERT INTO data SELECT lpad(i::text, {bytes_per_row}, '0') FROM generate_series(1, {int(nrows)}) as i",
options="-c statement_timeout=0",
)
wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
# TODO: this is a bit imprecise, there could be frozen layers being written out that we don't observe here
wait_for_upload_queue_empty(client, tenant_id, timeline_id)
return env
def run_benchmark(
env: NeonEnv,
pg_bin: PgBin,
record,
io_engine: str,
concurrency_per_target: int,
duration_secs: int,
):
ps_http = env.pageserver.http_client()
cmd = [
str(env.neon_binpath / "pagebench"),
"ondemand-download-churn",
"--mgmt-api-endpoint",
ps_http.base_url,
"--runtime",
f"{duration_secs}s",
"--set-io-engine",
f"{io_engine}",
"--concurrency-per-target",
f"{concurrency_per_target}",
# don't specify the targets explicitly, let pagebench auto-discover them
]
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
metric = "downloads_count"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "downloads_bytes"
record(
metric,
metric_value=results[metric],
unit="byte",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "evictions_count"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "timeline_restarts"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.LOWER_IS_BETTER,
)
metric = "runtime"
record(
metric,
metric_value=humantime_to_ms(results[metric]) / 1000,
unit="s",
report=MetricReport.TEST_PARAM,
)

View File

@@ -5,13 +5,13 @@ Utilities used by all code in this sub-directory
from typing import Any, Callable, Dict, Tuple
import fixtures.pageserver.many_tenants as many_tenants
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.pageserver.utils import wait_until_all_tenants_state
from fixtures.types import TenantId, TimelineId
def ensure_pageserver_ready_for_benchmarking(env: NeonEnv, n_tenants: int):

View File

@@ -9,11 +9,11 @@ from typing import List
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonPageserver
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.types import Lsn
from fixtures.utils import wait_until
from prometheus_client.samples import Sample

View File

@@ -2,10 +2,10 @@ from contextlib import closing
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.pageserver.utils import wait_tenant_status_404
from fixtures.pg_version import PgVersion
from fixtures.types import Lsn
#

View File

@@ -1,296 +0,0 @@
import concurrent.futures
import re
import threading
from pathlib import Path
import pytest
from fixtures.common_types import TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
tenant_get_shards,
)
@pytest.mark.timeout(600)
def test_sharding_autosplit(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Check that sharding, including auto-splitting, "just works" under pgbench workloads.
This is not a benchmark, but it lives in the same place as benchmarks in order to be run
on a dedicated node that can sustain some significant throughput.
Other tests validate the details of shard splitting, error cases etc. This test is
the sanity check that it all really works as expected with realistic amounts of data
and under load.
Success conditions:
- Tenants auto-split when their capacity grows
- Client workloads are not interrupted while that happens
"""
neon_env_builder.num_pageservers = 8
neon_env_builder.storage_controller_config = {
# Split tenants at 500MB: it's up to the storage controller how it interprets this (logical
# sizes, physical sizes, etc). We will write this much data logically, therefore other sizes
# will reliably be greater.
"split_threshold": 1024 * 1024 * 500
}
tenant_conf = {
# We want layer rewrites to happen as soon as possible (this is the most stressful
# case for the system), so set PITR interval to something tiny.
"pitr_interval": "5s",
# Scaled down thresholds. We will run at ~1GB scale but would like to emulate
# the behavior of a system running at ~100GB scale.
"checkpoint_distance": f"{1024 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{1024 * 1024}",
"image_creation_threshold": "2",
"image_layer_creation_check_threshold": "0",
}
env = neon_env_builder.init_start()
for ps in env.pageservers:
ps.allowed_errors.extend(
[
# We shut down pageservers while they might have some compaction work going on
".*Compaction failed.*shutting down.*"
]
)
# Total tenants
tenant_count = 3
# Transaction rate: we set this rather than running at full-speed because we
# might run on a slow node that doesn't cope well with many full-speed pgbenches running concurrently.
transaction_rate = 50
# Choose a pgbench scale that is just high enough to hit the split threshold around the time init
# finishes (we want splits going on during the main read/write bench)
pgbench_scale = 40
# Runtime selected to give storage controller time to do all the shard splits while it runs
pgbench_runtime = 180
class TenantState:
def __init__(self, timeline_id, endpoint):
self.timeline_id = timeline_id
self.endpoint = endpoint
# Create tenants
tenants = {}
for tenant_id in set(TenantId.generate() for _i in range(0, tenant_count)):
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id, timeline_id, conf=tenant_conf, placement_policy='{"Attached":1}'
)
endpoint = env.endpoints.create("main", tenant_id=tenant_id)
tenants[tenant_id] = TenantState(timeline_id, endpoint)
endpoint.start()
def run_pgbench_init(endpoint):
pg_bin.run_capture(
[
"pgbench",
f"-s{pgbench_scale}",
"-i",
f"postgres://cloud_admin@localhost:{endpoint.pg_port}/postgres",
]
)
def check_pgbench_output(out_path: str):
"""
When we run pgbench, we want not just an absence of errors, but also continuous evidence
of I/O progressing: our shard splitting and migration should not interrrupt the benchmark.
"""
matched_lines = 0
stderr = Path(f"{out_path}.stderr").read_text()
low_watermark = None
# Apply this as a threshold for what we consider an unacceptable interruption to I/O
min_tps = transaction_rate // 10
for line in stderr.split("\n"):
match = re.match(r"progress: ([0-9\.]+) s, ([0-9\.]+) tps, .* ([0-9]+) failed", line)
if match is None:
# Fall back to older-version pgbench output (omits failure count)
match = re.match(r"progress: ([0-9\.]+) s, ([0-9\.]+) tps, .*", line)
if match is None:
continue
else:
(_time, tps) = match.groups()
tps = float(tps)
failed = 0
else:
(_time, tps, failed) = match.groups() # type: ignore
tps = float(tps)
failed = int(failed)
matched_lines += 1
if failed > 0:
raise RuntimeError(
f"pgbench on tenant {endpoint.tenant_id} run at {out_path} has failed > 0"
)
if low_watermark is None or low_watermark > tps:
low_watermark = tps
if tps < min_tps:
raise RuntimeError(
f"pgbench on tenant {endpoint.tenant_id} run at {out_path} has tps < {min_tps}"
)
log.info(f"Checked {matched_lines} progress lines, lowest TPS was {min_tps}")
if matched_lines == 0:
raise RuntimeError(f"pgbench output at {out_path} contained no progress lines")
def run_pgbench_main(endpoint):
out_path = pg_bin.run_capture(
[
"pgbench",
"-T",
f"{pgbench_runtime}",
"-R",
f"{transaction_rate}",
"-P",
"1",
f"postgres://cloud_admin@localhost:{endpoint.pg_port}/postgres",
]
)
check_pgbench_output(out_path)
def run_pgbench_read(endpoint):
out_path = pg_bin.run_capture(
[
"pgbench",
"-T",
"60",
"-R",
f"{transaction_rate}",
"-S",
"-P",
"1",
f"postgres://cloud_admin@localhost:{endpoint.pg_port}/postgres",
]
)
check_pgbench_output(out_path)
background_reconcile_stop = threading.Event()
def background_reconcile_task():
# The controller will do all this autonomously, but with a 20 second wait between each
# time it considers doing a split/optimization. To enable a shorter test, actively
# poll the reconcile_all endpoint to make it all happen faster.
#
# Note that this is mainly to drain the post-split migrations faster, rather than to
# prompt the splits themselves.
while not background_reconcile_stop.is_set():
env.storage_controller.reconcile_until_idle(timeout_secs=pgbench_runtime, delay_max=0.5)
background_reconcile_stop.wait(5)
with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count + 1) as pgbench_threads:
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_init, tenant_state.endpoint)
pgbench_futs.append(fut)
log.info("Waiting for pgbench inits")
for fut in pgbench_futs:
fut.result()
reconcile_fut = pgbench_threads.submit(background_reconcile_task)
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_main, tenant_state.endpoint)
pgbench_futs.append(fut)
log.info("Waiting for pgbench read/write pass")
for fut in pgbench_futs:
fut.result()
log.info("Waiting for background reconcile thread")
background_reconcile_stop.set()
reconcile_fut.result()
def assert_all_split():
for tenant_id in tenants.keys():
shards = tenant_get_shards(env, tenant_id)
assert len(shards) == 8
# This is not a wait_until, because we wanted the splits to happen _while_ pgbench is running: otherwise
# this test is not properly doing its job of validating that splits work nicely under load.
assert_all_split()
env.storage_controller.assert_log_contains(".*Successful auto-split.*")
# Log timeline sizes, useful for debug, and implicitly validates that the shards
# are available in the places the controller thinks they should be.
for tenant_id, tenant_state in tenants.items():
(shard_zero_id, shard_zero_ps) = tenant_get_shards(env, tenant_id)[0]
timeline_info = shard_zero_ps.http_client().timeline_detail(
shard_zero_id, tenant_state.timeline_id
)
log.info(f"{shard_zero_id} timeline: {timeline_info}")
# Run compaction for all tenants, restart endpoint so that on subsequent reads we will
# definitely hit pageserver for reads. This compaction passis expected to drop unwanted
# layers but not do any rewrites (we're still in the same generation)
for tenant_id, tenant_state in tenants.items():
tenant_state.endpoint.stop()
for shard_id, shard_ps in tenant_get_shards(env, tenant_id):
shard_ps.http_client().timeline_gc(shard_id, tenant_state.timeline_id, gc_horizon=None)
shard_ps.http_client().timeline_compact(shard_id, tenant_state.timeline_id)
tenant_state.endpoint.start()
with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count) as pgbench_threads:
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_read, tenant_state.endpoint)
pgbench_futs.append(fut)
log.info("Waiting for pgbench read pass")
for fut in pgbench_futs:
fut.result()
env.storage_controller.consistency_check()
# Restart the storage controller
env.storage_controller.stop()
env.storage_controller.start()
env.storage_controller.consistency_check()
# Restart all pageservers
for ps in env.pageservers:
ps.stop()
ps.start()
# Freshen gc_info in Timeline, so that when compaction runs in the background in the
# subsequent pgbench period, the last_gc_cutoff is updated and enables the conditions for a rewrite to pass.
for tenant_id, tenant_state in tenants.items():
for shard_id, shard_ps in tenant_get_shards(env, tenant_id):
shard_ps.http_client().timeline_gc(shard_id, tenant_state.timeline_id, gc_horizon=None)
# One last check data remains readable after everything has restarted
with concurrent.futures.ThreadPoolExecutor(max_workers=tenant_count) as pgbench_threads:
pgbench_futs = []
for tenant_state in tenants.values():
fut = pgbench_threads.submit(run_pgbench_read, tenant_state.endpoint)
pgbench_futs.append(fut)
log.info("Waiting for pgbench read pass")
for fut in pgbench_futs:
fut.result()
# Assert that some rewrites happened
# TODO: uncomment this after https://github.com/neondatabase/neon/pull/7531 is merged
# assert any(ps.log_contains(".*Rewriting layer after shard split.*") for ps in env.pageservers)

Some files were not shown because too many files have changed in this diff Show More