Compare commits

..

13 Commits

Author SHA1 Message Date
Jan Christian Grünhage
313e01098a no merge: intentionally break benchmarks for verifying CI 2025-04-25 11:29:33 +02:00
Alexander Bayandin
5e989a3148 CI(build-tools): bump packages in build-tools image (#11697)
## Problem

`cargo-deny` 0.16.2 spits a bunch of warnings like:
```
warning[index-failure]: unable to check for yanked crates
```

The issue is fixed for the latest version of `cargo-deny` (0.18.2). And
while we're here, let's bump all the packages we have in `build-tools`
image

## Summary of changes
- bump cargo-hakari to 0.9.36
- bump cargo-deny to 0.18.2
- bump cargo-hack to 0.6.36
- bump cargo-nextest to 0.9.94
- bump diesel_cli to 2.2.9
- bump s5cmd to 2.3.0
- bump mold to 2.37.1
- bump python to 3.11.12
2025-04-24 14:13:04 +00:00
Alexey Kondratov
985056be37 feat(compute): Introduce Postgres downtime metrics (#11346)
## Problem

Currently, we only report the timestamp of the last moment we think
Postgres was active. The problem is that if Postgres gets completely
unresponsive, we still report some old timestamp, and it's impossible to
distinguish situations 'Postgres is effectively down' and 'Postgres is
running, but no client activity'.

## Summary of changes

Refactor the `compute_ctl`'s compute monitor so that it was easier to
track the connection errors and failed activity checks, and report
- `now() - last_successful_check` as current downtime on any failure
- cumulative Postgres downtime during the whole compute lifetime

After adding a test, I also noticed that the compute monitor may not
reconnect even though queries fail with `connection closed` or `error
communicating with the server: Connection reset by peer (os error 54)`,
but for some reason we do not catch it with `client.is_closed()`, so I
added an explicit reconnect in case of any failures.

Discussion:
https://neondb.slack.com/archives/C03TN5G758R/p1742489426966639
2025-04-24 13:51:09 +00:00
Christian Schwarz
9c6ff3aa2b refactor(BufferedWriter): flush task owns the VirtualFile & abstraction for cleanup on drop (#11549)
Main change:

- `BufferedWriter` owns the `W`; no more `Arc<W>`
- We introduce auto-delete-on-drop wrappers for `VirtualFile`.
  - `TempVirtualFile` for write-only users
- `TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter` for
EphemeralFile which requires read access to the immutable prefix of the
file (see doc comments for details)
- Users of `BufferedWriter` hand it such a wrapped `VirtualFile`.
- The wrapped `VirtualFile` moves to the background flush task.
- On `BufferedWriter` shutdown, ownership moves back.
- Callers remove the wrapper (`disarm_into_inner()`) after doing final
touches, e.g., flushing index blocks and summary for delta/image layer
writers.

If the BufferedWriter isn't shut down properly via
`BufferedWriter::shutdown`, or if there is an error during final
touches, the wrapper type ensures that the file gets unlinked.

We store a GateGuard inside the wrapper to ensure that the Timeline is
still alive when unlinking on drop.

Rust doesn't have async drop yet, so, the unlinking happens using a
synchronous syscall.
NB we don't fsync the surrounding directory.
This is how it's been before this PR; I believe it is correct because
all of these files are temporary paths that get cleaned up on timeline
load.
Again, timeline load does not need to fsync because the next timeline
load will unlink again if the file reappears.

The auto-delete-on-drop can happen after a higher-level mechanism
retries.
Therefore, we switch all users to monotonically increasing, never-reused
temp file disambiguators.

The aspects pointed out in the last two paragraphs will receive further
cleanup in follow-up task
- https://github.com/neondatabase/neon/issues/11692

Drive-by changes:
- It turns out we can remove the two-pronged code in the layer file
download code.
No need to make this a separate PR because all of production already
uses `tokio-epoll-uring` with the buffered writer for many weeks.


Refs
- epic https://github.com/neondatabase/neon/issues/9868
- alternative to https://github.com/neondatabase/neon/pull/11544
2025-04-24 13:07:57 +00:00
Folke Behrens
9d472c79ce Fix what's currently flagged by cargo deny (#11693)
* Replace yanked papaya version
* Remove unused allowed license: OpenSSL
* Remove Zlib license from general allow list since it's listed in the
exceptions section per crate
* Drop clarification for ring since they have separate LICENSE files now
* List the tower-otel repo as allowed source while we sort out the OTel
deps
2025-04-24 13:02:31 +00:00
Arpad Müller
b43203928f Switch tenant snapshot subcommand to remote_storage (#11685)
Switches the tenant snapshot subcommand of the storage scrubber to
`remote_storage`. As this is the last piece of the storage scrubber
still using the S3 SDK, this finishes the project started in #7547.

This allows us to do tenant snapshots on Azure as well.

Builds on #11671
Fixes #8830
2025-04-24 12:22:07 +00:00
Arpad Müller
c35d489539 versioning API for remote_storage (#11671)
Adds a versioning API to remote_storage. We want to use it in the
scrubber, both for tenant snapshot as well as for metadata checks.

for #8830
and for #11588
2025-04-24 11:41:48 +00:00
Vlad Lazar
3a50d95b6d storage_controller: coordinate imports across shards in the storage controller (#11345)
## Problem

Pageservers notify control plane directly when a shard import has
completed.
Control plane has to download the status of each shard from S3 and
figure out if everything is truly done,
before proceeding with branch activation.

Issues with this approach are:
* We can't control shard split behaviour on the storage controller side.
It's unsafe to split
during import.
* Control plane needs to know about shards and implement logic to check
all timelines are indeed ready.

## Summary of changes

In short, storage controller coordinates imports, and, only when
everything is done, notifies control plane.

Big rocks:
1. Store timeline imports in the storage controller database. Each
import stores the status of its shards in the database.
We hook into the timeline creation call as our entry point for this.
2. Pageservers get a new upcall endpoint to notify the storage
controller of shard import updates.
3. Storage controller handles these updates by updating persisted state.
If an update finalizes the import,
then poll pageservers until timeline activation, and, then, notify the
control plane that the import is complete.

Cplane side change with new endpoint is in
https://github.com/neondatabase/cloud/pull/26166

Closes https://github.com/neondatabase/neon/issues/11566
2025-04-24 11:26:06 +00:00
Arpad Müller
d43b8e73ae Update sentry to 0.37 (#11686)
Update the sentry crate to 0.37. This deduplicates the `webpki-roots`
crate in our crate graph, and brings another dependency onto newer
rustls `0.23.18`.
2025-04-24 11:20:41 +00:00
devin-ai-integration[bot]
1808dad269 Add --dev CLI flag to pageserver and safekeeper binaries (#11526)
# Add --dev CLI flag to pageserver and safekeeper binaries

This PR adds the `--dev` CLI flag to both the pageserver and safekeeper
binaries without implementing any functionality yet. This is a precursor
to PR #11517, which will implement the full functionality to require
authentication by default unless the `--dev` flag is specified.

## Changes
- Add `dev_mode` config field to pageserver binary
- Add `--dev` CLI flag to safekeeper binary

This PR is needed for forward compatibility tests to work properly, when
we try to merge #11517

Link to Devin run:
https://app.devin.ai/sessions/ad8231b4e2be430398072b6fc4e85d46
Requested by: John Spray (john@neon.tech)

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: John Spray <john@neon.tech>
2025-04-24 10:45:40 +00:00
Folke Behrens
7ba8519b43 proxy: Update zerocopy to 0.8 (#11681)
Also add some macros that might result in more efficient code.
2025-04-24 09:39:08 +00:00
Christian Schwarz
f8100d66d5 ci: extend 'Wait for extension build to finish' timeout (#11689)
Refs
- https://neondb.slack.com/archives/C059ZC138NR/p1745427571307149
2025-04-24 08:15:08 +00:00
Christian Schwarz
51cdb570eb bench_ingest: general overhaul & add parametrization over virtual_file_io_mode (#11667)
Changes:
- clean up existing parametrization & criterion `BenchmarkId`
- additional parametrization over `virtual_file_io_mode`
- switch to `multi_thread` to be closer to production ([Slack
thread](https://neondb.slack.com/archives/C033RQ5SPDH/p1745339543093159))

Refs
- epic https://github.com/neondatabase/neon/issues/9868
- extracted from https://github.com/neondatabase/neon/pull/11558
2025-04-24 07:38:18 +00:00
55 changed files with 2303 additions and 1671 deletions

View File

@@ -1238,7 +1238,7 @@ jobs:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
TIMEOUT=1800 # 30 minutes, usually it takes ~2-3 minutes, but if runners are busy, it might take longer
TIMEOUT=5400 # 90 minutes, usually it takes ~2-3 minutes, but if runners are busy, it might take longer
INTERVAL=15 # try each N seconds
last_status="" # a variable to carry the last status of the "build-and-upload-extensions" context

78
Cargo.lock generated
View File

@@ -40,7 +40,7 @@ dependencies = [
"getrandom 0.2.11",
"once_cell",
"version_check",
"zerocopy",
"zerocopy 0.7.31",
]
[[package]]
@@ -4415,9 +4415,9 @@ dependencies = [
[[package]]
name = "papaya"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aab21828b6b5952fdadd6c377728ffae53ec3a21b2febc47319ab65741f7e2fd"
checksum = "6827e3fc394523c21d4464d02c0bb1c19966ea4a58a9844ad6d746214179d2bc"
dependencies = [
"equivalent",
"seize",
@@ -5204,7 +5204,7 @@ dependencies = [
"walkdir",
"workspace_hack",
"x509-cert",
"zerocopy",
"zerocopy 0.8.24",
]
[[package]]
@@ -5594,7 +5594,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 0.26.1",
"webpki-roots",
"winreg",
]
@@ -6195,13 +6195,13 @@ checksum = "224e328af6e080cddbab3c770b1cf50f0351ba0577091ef2410c3951d835ff87"
[[package]]
name = "sentry"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00421ed8fa0c995f07cde48ba6c89e80f2b312f74ff637326f392fbfd23abe02"
checksum = "255914a8e53822abd946e2ce8baa41d4cded6b8e938913b7f7b9da5b7ab44335"
dependencies = [
"httpdate",
"reqwest",
"rustls 0.21.12",
"rustls 0.23.18",
"sentry-backtrace",
"sentry-contexts",
"sentry-core",
@@ -6209,14 +6209,14 @@ dependencies = [
"sentry-tracing",
"tokio",
"ureq",
"webpki-roots 0.25.2",
"webpki-roots",
]
[[package]]
name = "sentry-backtrace"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a79194074f34b0cbe5dd33896e5928bbc6ab63a889bd9df2264af5acb186921e"
checksum = "00293cd332a859961f24fd69258f7e92af736feaeb91020cff84dac4188a4302"
dependencies = [
"backtrace",
"once_cell",
@@ -6226,9 +6226,9 @@ dependencies = [
[[package]]
name = "sentry-contexts"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba8870c5dba2bfd9db25c75574a11429f6b95957b0a78ac02e2970dd7a5249a"
checksum = "961990f9caa76476c481de130ada05614cd7f5aa70fb57c2142f0e09ad3fb2aa"
dependencies = [
"hostname",
"libc",
@@ -6240,9 +6240,9 @@ dependencies = [
[[package]]
name = "sentry-core"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46a75011ea1c0d5c46e9e57df03ce81f5c7f0a9e199086334a1f9c0a541e0826"
checksum = "1a6409d845707d82415c800290a5d63be5e3df3c2e417b0997c60531dfbd35ef"
dependencies = [
"once_cell",
"rand 0.8.5",
@@ -6253,9 +6253,9 @@ dependencies = [
[[package]]
name = "sentry-panic"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eaa3ecfa3c8750c78dcfd4637cfa2598b95b52897ed184b4dc77fcf7d95060d"
checksum = "609b1a12340495ce17baeec9e08ff8ed423c337c1a84dffae36a178c783623f3"
dependencies = [
"sentry-backtrace",
"sentry-core",
@@ -6263,9 +6263,9 @@ dependencies = [
[[package]]
name = "sentry-tracing"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f715932bf369a61b7256687c6f0554141b7ce097287e30e3f7ed6e9de82498fe"
checksum = "49f4e86402d5c50239dc7d8fd3f6d5e048221d5fcb4e026d8d50ab57fe4644cb"
dependencies = [
"sentry-backtrace",
"sentry-core",
@@ -6275,9 +6275,9 @@ dependencies = [
[[package]]
name = "sentry-types"
version = "0.32.3"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4519c900ce734f7a0eb7aba0869dfb225a7af8820634a7dd51449e3b093cfb7c"
checksum = "3d3f117b8755dbede8260952de2aeb029e20f432e72634e8969af34324591631"
dependencies = [
"debugid",
"hex",
@@ -6711,8 +6711,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"aws-config",
"aws-sdk-s3",
"camino",
"chrono",
"clap",
@@ -7801,7 +7799,7 @@ dependencies = [
"rustls 0.23.18",
"rustls-pki-types",
"url",
"webpki-roots 0.26.1",
"webpki-roots",
]
[[package]]
@@ -8169,12 +8167,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "webpki-roots"
version = "0.26.1"
@@ -8482,6 +8474,8 @@ dependencies = [
"regex-syntax 0.8.2",
"reqwest",
"rustls 0.23.18",
"rustls-pki-types",
"rustls-webpki 0.102.8",
"scopeguard",
"sec1 0.7.3",
"serde",
@@ -8510,7 +8504,6 @@ dependencies = [
"tracing-log",
"url",
"uuid",
"zerocopy",
"zeroize",
"zstd",
"zstd-safe",
@@ -8614,8 +8607,16 @@ version = "0.7.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d"
dependencies = [
"byteorder",
"zerocopy-derive",
"zerocopy-derive 0.7.31",
]
[[package]]
name = "zerocopy"
version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879"
dependencies = [
"zerocopy-derive 0.8.24",
]
[[package]]
@@ -8629,6 +8630,17 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "zerofrom"
version = "0.1.5"

View File

@@ -164,7 +164,7 @@ scopeguard = "1.1"
sysinfo = "0.29.2"
sd-notify = "0.4.1"
send-future = "0.1.0"
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
sentry = { version = "0.37", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_path_to_error = "0.1"
@@ -220,7 +220,7 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.8"
whoami = "1.5.1"
zerocopy = { version = "0.7", features = ["derive"] }
zerocopy = { version = "0.8", features = ["derive", "simd"] }
json-structural-diff = { version = "0.2.0" }
x509-cert = { version = "0.2.5" }

View File

@@ -173,7 +173,7 @@ RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v$
&& rm -rf protoc.zip protoc
# s5cmd
ENV S5CMD_VERSION=2.2.2
ENV S5CMD_VERSION=2.3.0
RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/s5cmd_${S5CMD_VERSION}_Linux-$(uname -m | sed 's/x86_64/64bit/g' | sed 's/aarch64/arm64/g').tar.gz" | tar zxvf - s5cmd \
&& chmod +x s5cmd \
&& mv s5cmd /usr/local/bin/s5cmd
@@ -206,7 +206,7 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
&& rm awscliv2.zip
# Mold: A Modern Linker
ENV MOLD_VERSION=v2.34.1
ENV MOLD_VERSION=v2.37.1
RUN set -e \
&& git clone https://github.com/rui314/mold.git \
&& mkdir mold/build \
@@ -268,7 +268,7 @@ WORKDIR /home/nonroot
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
# Python
ENV PYTHON_VERSION=3.11.10 \
ENV PYTHON_VERSION=3.11.12 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \
@@ -296,12 +296,12 @@ ENV RUSTC_VERSION=1.86.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1
ARG CARGO_HAKARI_VERSION=0.9.33
ARG CARGO_DENY_VERSION=0.16.2
ARG CARGO_HACK_VERSION=0.6.33
ARG CARGO_NEXTEST_VERSION=0.9.85
ARG CARGO_HAKARI_VERSION=0.9.36
ARG CARGO_DENY_VERSION=0.18.2
ARG CARGO_HACK_VERSION=0.6.36
ARG CARGO_NEXTEST_VERSION=0.9.94
ARG CARGO_CHEF_VERSION=0.1.71
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
ARG CARGO_DIESEL_CLI_VERSION=2.2.9
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
chmod +x rustup-init && \
./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \

View File

@@ -1,8 +1,8 @@
use metrics::core::{AtomicF64, Collector, GenericGauge};
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
register_int_gauge_vec, register_uint_gauge_vec,
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -81,6 +81,22 @@ pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static PG_CURR_DOWNTIME_MS: Lazy<GenericGauge<AtomicF64>> = Lazy::new(|| {
register_gauge!(
"compute_pg_current_downtime_ms",
"Non-cumulative duration of Postgres downtime in ms; resets after successful check",
)
.expect("failed to define a metric")
});
pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::new(|| {
register_int_counter!(
"compute_pg_downtime_ms_total",
"Cumulative duration of Postgres downtime in ms",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -88,5 +104,7 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics
}

View File

@@ -6,197 +6,294 @@ use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{debug, error, info, warn};
use tracing::{Level, error, info, instrument, span};
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
// Spin in a loop and figure out the last activity time in the Postgres.
// Then update it in the shared state. This function never errors out.
// NB: the only expected panic is at `Mutex` unwrap(), all other errors
// should be handled gracefully.
fn watch_compute_activity(compute: &ComputeNode) {
// Suppose that `connstr` doesn't change
let connstr = compute.params.connstr.clone();
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
struct ComputeMonitor {
compute: Arc<ComputeNode>,
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(compute);
/// The moment when Postgres had some activity,
/// that should prevent compute from being suspended.
last_active: Option<DateTime<Utc>>,
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = conf.connect(NoTls);
/// The moment when we last tried to check Postgres.
last_checked: DateTime<Utc>,
/// The last moment we did a successful Postgres check.
last_up: DateTime<Utc>,
let mut sleep = false;
let mut prev_active_time: Option<f64> = None;
let mut prev_sessions: Option<i64> = None;
/// Only used for internal statistics change tracking
/// between monitor runs and can be outdated.
active_time: Option<f64>,
/// Only used for internal statistics change tracking
/// between monitor runs and can be outdated.
sessions: Option<i64>,
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
info!("starting experimental activity monitor for {}", connstr);
} else {
info!("starting activity monitor for {}", connstr);
/// Use experimental statistics-based activity monitor. It's no longer
/// 'experimental' per se, as it's enabled for everyone, but we still
/// keep the flag as an option to turn it off in some cases if it will
/// misbehave.
experimental: bool,
}
impl ComputeMonitor {
fn report_down(&self) {
let now = Utc::now();
// Calculate and report current downtime
// (since the last time Postgres was up)
let downtime = now.signed_duration_since(self.last_up);
PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
// Calculate and update total downtime
// (cumulative duration of Postgres downtime in ms)
let inc = now
.signed_duration_since(self.last_checked)
.num_milliseconds();
PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
}
loop {
// We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
// But skip the first sleep, so we can connect to Postgres immediately.
if sleep {
// Should be outside of the mutex lock to allow others to read while we sleep.
thread::sleep(MONITOR_CHECK_INTERVAL);
} else {
sleep = true;
}
fn report_up(&mut self) {
self.last_up = Utc::now();
PG_CURR_DOWNTIME_MS.set(0.0);
}
match &mut client {
Ok(cli) => {
if cli.is_closed() {
info!("connection to Postgres is closed, trying to reconnect");
fn downtime_info(&self) -> String {
format!(
"total_ms: {}, current_ms: {}, last_up: {}",
PG_TOTAL_DOWNTIME_MS.get(),
PG_CURR_DOWNTIME_MS.get(),
self.last_up
)
}
// Connection is closed, reconnect and try again.
client = conf.connect(NoTls);
continue;
}
/// Spin in a loop and figure out the last activity time in the Postgres.
/// Then update it in the shared state. This function never errors out.
/// NB: the only expected panic is at `Mutex` unwrap(), all other errors
/// should be handled gracefully.
#[instrument(skip_all)]
pub fn run(&mut self) {
// Suppose that `connstr` doesn't change
let connstr = self.compute.params.connstr.clone();
let conf = self
.compute
.get_conn_conf(Some("compute_ctl:compute_monitor"));
// This is a new logic, only enable if the feature flag is set.
// TODO: remove this once we are sure that it works OR drop it altogether.
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
// First, check if the total active time or sessions across all databases has changed.
// If it did, it means that user executed some queries. In theory, it can even go down if
// some databases were dropped, but it's still a user activity.
match get_database_stats(cli) {
Ok((active_time, sessions)) => {
let mut detected_activity = false;
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(&self.compute);
prev_active_time = match prev_active_time {
Some(prev_active_time) => {
if active_time != prev_active_time {
detected_activity = true;
}
Some(active_time)
}
None => Some(active_time),
};
prev_sessions = match prev_sessions {
Some(prev_sessions) => {
if sessions != prev_sessions {
detected_activity = true;
}
Some(sessions)
}
None => Some(sessions),
};
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = conf.connect(NoTls);
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
error!("could not get database statistics: {}", e);
continue;
}
}
}
info!("starting compute monitor for {}", connstr);
// Second, if database statistics is the same, check all backends state change,
// maybe there is some with more recent activity. `get_backends_state_change()`
// can return None or stale timestamp, so it's `compute.update_last_active()`
// responsibility to check if the new timestamp is more recent than the current one.
// This helps us to discover new sessions, that did nothing yet.
match get_backends_state_change(cli) {
Ok(last_active) => {
compute.update_last_active(last_active);
}
Err(e) => {
error!("could not get backends state change: {}", e);
}
}
// Finally, if there are existing (logical) walsenders, do not suspend.
//
// walproposer doesn't currently show up in pg_stat_replication,
// but protect if it will be
let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
match cli.query_one(ws_count_query, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
warn!("failed to parse walsenders count: {:?}", e);
continue;
}
},
Err(e) => {
warn!("failed to get list of walsenders: {:?}", e);
continue;
}
}
//
// Don't suspend compute if there is an active logical replication subscription
//
// `where pid is not null` to filter out read only computes and subscription on branches
//
let logical_subscriptions_query =
"select count(*) from pg_stat_subscription where pid is not null;";
match cli.query_one(logical_subscriptions_query, &[]) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
continue;
}
},
Err(e) => {
warn!(
"failed to get list of active logical replication subscriptions: {:?}",
e
loop {
match &mut client {
Ok(cli) => {
if cli.is_closed() {
info!(
downtime_info = self.downtime_info(),
"connection to Postgres is closed, trying to reconnect"
);
continue;
}
}
//
// Do not suspend compute if autovacuum is running
//
let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
match cli.query_one(autovacuum_count_query, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
self.report_down();
// Connection is closed, reconnect and try again.
client = conf.connect(NoTls);
} else {
match self.check(cli) {
Ok(_) => {
self.report_up();
self.compute.update_last_active(self.last_active);
}
Err(e) => {
// Although we have many places where we can return errors in `check()`,
// normally it shouldn't happen. I.e., we will likely return error if
// connection got broken, query timed out, Postgres returned invalid data, etc.
// In all such cases it's suspicious, so let's report this as downtime.
self.report_down();
error!(
downtime_info = self.downtime_info(),
"could not check Postgres: {}", e
);
// Reconnect to Postgres just in case. During tests, I noticed
// that queries in `check()` can fail with `connection closed`,
// but `cli.is_closed()` above doesn't detect it. Even if old
// connection is still alive, it will be dropped when we reassign
// `client` to a new connection.
client = conf.connect(NoTls);
}
}
Err(e) => {
warn!("failed to parse autovacuum workers count: {:?}", e);
continue;
}
},
Err(e) => {
warn!("failed to get list of autovacuum workers: {:?}", e);
continue;
}
}
}
Err(e) => {
debug!("could not connect to Postgres: {}, retrying", e);
Err(e) => {
info!(
downtime_info = self.downtime_info(),
"could not connect to Postgres: {}, retrying", e
);
self.report_down();
// Establish a new connection and try again.
client = conf.connect(NoTls);
// Establish a new connection and try again.
client = conf.connect(NoTls);
}
}
// Reset the `last_checked` timestamp and sleep before the next iteration.
self.last_checked = Utc::now();
thread::sleep(MONITOR_CHECK_INTERVAL);
}
}
#[instrument(skip_all)]
fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
// This is new logic, only enable if the feature flag is set.
// TODO: remove this once we are sure that it works OR drop it altogether.
if self.experimental {
// Check if the total active time or sessions across all databases has changed.
// If it did, it means that user executed some queries. In theory, it can even go down if
// some databases were dropped, but it's still user activity.
match get_database_stats(cli) {
Ok((active_time, sessions)) => {
let mut detected_activity = false;
if let Some(prev_active_time) = self.active_time {
if active_time != prev_active_time {
detected_activity = true;
}
}
self.active_time = Some(active_time);
if let Some(prev_sessions) = self.sessions {
if sessions != prev_sessions {
detected_activity = true;
}
}
self.sessions = Some(sessions);
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
return Err(anyhow::anyhow!("could not get database statistics: {}", e));
}
}
}
// If database statistics are the same, check all backends for state changes.
// Maybe there are some with more recent activity. `get_backends_state_change()`
// can return None or stale timestamp, so it's `compute.update_last_active()`
// responsibility to check if the new timestamp is more recent than the current one.
// This helps us to discover new sessions that have not done anything yet.
match get_backends_state_change(cli) {
Ok(last_active) => match (last_active, self.last_active) {
(Some(last_active), Some(prev_last_active)) => {
if last_active > prev_last_active {
self.last_active = Some(last_active);
return Ok(());
}
}
(Some(last_active), None) => {
self.last_active = Some(last_active);
return Ok(());
}
_ => {}
},
Err(e) => {
return Err(anyhow::anyhow!(
"could not get backends state change: {}",
e
));
}
}
// If there are existing (logical) walsenders, do not suspend.
//
// N.B. walproposer doesn't currently show up in pg_stat_replication,
// but protect if it will.
const WS_COUNT_QUERY: &str =
"select count(*) from pg_stat_replication where application_name != 'walproposer';";
match cli.query_one(WS_COUNT_QUERY, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
let err: anyhow::Error = e.into();
return Err(err.context("failed to parse walsenders count"));
}
},
Err(e) => {
return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
}
}
// Don't suspend compute if there is an active logical replication subscription
//
// `where pid is not null` to filter out read only computes and subscription on branches
const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
"select count(*) from pg_stat_subscription where pid is not null;";
match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
return Err(anyhow::anyhow!(
"failed to parse 'pg_stat_subscription' count: {}",
e
));
}
},
Err(e) => {
return Err(anyhow::anyhow!(
"failed to get list of active logical replication subscriptions: {}",
e
));
}
}
// Do not suspend compute if autovacuum is running
const AUTOVACUUM_COUNT_QUERY: &str =
"select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
self.last_active = Some(Utc::now());
return Ok(());
};
}
Err(e) => {
return Err(anyhow::anyhow!(
"failed to parse autovacuum workers count: {}",
e
));
}
},
Err(e) => {
return Err(anyhow::anyhow!(
"failed to get list of autovacuum workers: {}",
e
));
}
}
Ok(())
}
}
@@ -315,9 +412,24 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let compute = Arc::clone(compute);
let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
let now = Utc::now();
let mut monitor = ComputeMonitor {
compute,
last_active: None,
last_checked: now,
last_up: now,
active_time: None,
sessions: None,
experimental,
};
let span = span!(Level::INFO, "compute_monitor");
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&compute))
.spawn(move || {
let _enter = span.enter();
monitor.run();
})
.expect("cannot launch compute monitor thread")
}

View File

@@ -45,9 +45,7 @@ allow = [
"ISC",
"MIT",
"MPL-2.0",
"OpenSSL",
"Unicode-3.0",
"Zlib",
]
confidence-threshold = 0.8
exceptions = [
@@ -56,14 +54,6 @@ exceptions = [
{ allow = ["Zlib"], name = "const_format", version = "*" },
]
[[licenses.clarify]]
name = "ring"
version = "*"
expression = "MIT AND ISC AND OpenSSL"
license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 }
]
[licenses.private]
ignore = true
registries = []
@@ -116,7 +106,11 @@ name = "openssl"
unknown-registry = "warn"
unknown-git = "warn"
allow-registry = ["https://github.com/rust-lang/crates.io-index"]
allow-git = []
allow-git = [
# Crate pinned to commit in origin repo due to opentelemetry version.
# TODO: Remove this once crate is fetched from crates.io again.
"https://github.com/mattiapenati/tower-otel",
]
[sources.allow-org]
github = [

View File

@@ -181,6 +181,7 @@ pub struct ConfigToml {
pub generate_unarchival_heatmap: Option<bool>,
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -657,6 +658,7 @@ impl Default for ConfigToml {
generate_unarchival_heatmap: None,
tracing: None,
enable_tls_page_service_api: false,
dev_mode: false,
}
}
}

View File

@@ -320,6 +320,35 @@ pub struct TimelineCreateRequest {
pub mode: TimelineCreateRequestMode,
}
impl TimelineCreateRequest {
pub fn mode_tag(&self) -> &'static str {
match &self.mode {
TimelineCreateRequestMode::Branch { .. } => "branch",
TimelineCreateRequestMode::ImportPgdata { .. } => "import",
TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
}
}
pub fn is_import(&self) -> bool {
matches!(self.mode, TimelineCreateRequestMode::ImportPgdata { .. })
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum ShardImportStatus {
InProgress,
Done,
Error(String),
}
impl ShardImportStatus {
pub fn is_terminal(&self) -> bool {
match self {
ShardImportStatus::InProgress => false,
ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
}
}
}
/// Storage controller specific extensions to [`TimelineInfo`].
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon {

View File

@@ -4,10 +4,10 @@
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
use utils::id::NodeId;
use utils::id::{NodeId, TimelineId};
use crate::controller_api::NodeRegisterRequest;
use crate::models::LocationConfigMode;
use crate::models::{LocationConfigMode, ShardImportStatus};
use crate::shard::TenantShardId;
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
@@ -62,3 +62,10 @@ pub struct ValidateResponseTenant {
pub id: TenantShardId,
pub valid: bool,
}
#[derive(Serialize, Deserialize)]
pub struct PutTimelineImportStatusRequest {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
pub status: ShardImportStatus,
}

View File

@@ -14,8 +14,9 @@ use anyhow::{Context, Result};
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::CopyStatus;
use azure_storage_blobs::blob::operations::GetBlobBuilder;
use azure_storage_blobs::blob::{Blob, CopyStatus};
use azure_storage_blobs::container::operations::ListBlobsBuilder;
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
use bytes::Bytes;
use futures::FutureExt;
@@ -253,53 +254,15 @@ impl AzureBlobStorage {
download
}
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let acquire = self.concurrency_limiter.acquire(kind);
tokio::select! {
permit = acquire => Ok(permit.expect("never closed")),
_ = cancel.cancelled() => Err(Cancelled),
}
}
pub fn container_name(&self) -> &str {
&self.container_name
}
}
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
let mut res = Metadata::new();
for (k, v) in metadata.0.into_iter() {
res.insert(k, v);
}
res
}
fn to_download_error(error: azure_core::Error) -> DownloadError {
if let Some(http_err) = error.as_http_error() {
match http_err.status() {
StatusCode::NotFound => DownloadError::NotFound,
StatusCode::NotModified => DownloadError::Unmodified,
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
_ => DownloadError::Other(anyhow::Error::new(error)),
}
} else {
DownloadError::Other(error.into())
}
}
impl RemoteStorage for AzureBlobStorage {
fn list_streaming(
fn list_streaming_for_fn<T: Default + ListingCollector>(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
request_kind: RequestKind,
customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
) -> impl Stream<Item = Result<T, DownloadError>> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
self.prefix_in_container.clone().map(|mut s| {
@@ -311,7 +274,7 @@ impl RemoteStorage for AzureBlobStorage {
});
async_stream::stream! {
let _permit = self.permit(RequestKind::List, cancel).await?;
let _permit = self.permit(request_kind, cancel).await?;
let mut builder = self.client.list_blobs();
@@ -327,6 +290,8 @@ impl RemoteStorage for AzureBlobStorage {
builder = builder.max_results(MaxResults::new(limit));
}
builder = customize_builder(builder);
let mut next_marker = None;
let mut timeout_try_cnt = 1;
@@ -382,26 +347,20 @@ impl RemoteStorage for AzureBlobStorage {
break;
};
let mut res = Listing::default();
let mut res = T::default();
next_marker = entry.continuation();
let prefix_iter = entry
.blobs
.prefixes()
.map(|prefix| self.name_to_relative_path(&prefix.name));
res.prefixes.extend(prefix_iter);
res.add_prefixes(self, prefix_iter);
let blob_iter = entry
.blobs
.blobs()
.map(|k| ListingObject{
key: self.name_to_relative_path(&k.name),
last_modified: k.properties.last_modified.into(),
size: k.properties.content_length,
}
);
.blobs();
for key in blob_iter {
res.keys.push(key);
res.add_blob(self, key);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
@@ -423,6 +382,128 @@ impl RemoteStorage for AzureBlobStorage {
}
}
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let acquire = self.concurrency_limiter.acquire(kind);
tokio::select! {
permit = acquire => Ok(permit.expect("never closed")),
_ = cancel.cancelled() => Err(Cancelled),
}
}
pub fn container_name(&self) -> &str {
&self.container_name
}
}
trait ListingCollector {
fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
}
impl ListingCollector for Listing {
fn add_prefixes(
&mut self,
_abs: &AzureBlobStorage,
prefix_it: impl Iterator<Item = RemotePath>,
) {
self.prefixes.extend(prefix_it);
}
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
self.keys.push(ListingObject {
key: abs.name_to_relative_path(&blob.name),
last_modified: blob.properties.last_modified.into(),
size: blob.properties.content_length,
});
}
}
impl ListingCollector for crate::VersionListing {
fn add_prefixes(
&mut self,
_abs: &AzureBlobStorage,
_prefix_it: impl Iterator<Item = RemotePath>,
) {
// nothing
}
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
self.versions.push(crate::Version {
key: abs.name_to_relative_path(&blob.name),
last_modified: blob.properties.last_modified.into(),
kind: crate::VersionKind::Version(id),
});
}
}
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
let mut res = Metadata::new();
for (k, v) in metadata.0.into_iter() {
res.insert(k, v);
}
res
}
fn to_download_error(error: azure_core::Error) -> DownloadError {
if let Some(http_err) = error.as_http_error() {
match http_err.status() {
StatusCode::NotFound => DownloadError::NotFound,
StatusCode::NotModified => DownloadError::Unmodified,
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
_ => DownloadError::Other(anyhow::Error::new(error)),
}
} else {
DownloadError::Other(error.into())
}
}
impl RemoteStorage for AzureBlobStorage {
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
let customize_builder = |builder| builder;
let kind = RequestKind::ListVersions;
self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> std::result::Result<crate::VersionListing, DownloadError> {
let customize_builder = |mut builder: ListBlobsBuilder| {
builder = builder.include_versions(true);
builder
};
let kind = RequestKind::ListVersions;
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
prefix,
mode,
max_keys,
cancel,
kind,
customize_builder
));
let mut combined: crate::VersionListing =
stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.versions.extend(list.versions.into_iter());
}
Ok(combined)
}
async fn head_object(
&self,
key: &RemotePath,
@@ -532,7 +613,12 @@ impl RemoteStorage for AzureBlobStorage {
let mut builder = blob_client.get();
if let Some(ref etag) = opts.etag {
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
}
if let Some(ref version_id) = opts.version_id {
let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
builder = builder.blob_versioning(version_id);
}
if let Some((start, end)) = opts.byte_range() {

View File

@@ -176,6 +176,32 @@ pub struct Listing {
pub keys: Vec<ListingObject>,
}
#[derive(Default)]
pub struct VersionListing {
pub versions: Vec<Version>,
}
pub struct Version {
pub key: RemotePath,
pub last_modified: SystemTime,
pub kind: VersionKind,
}
impl Version {
pub fn version_id(&self) -> Option<&VersionId> {
match &self.kind {
VersionKind::Version(id) => Some(id),
VersionKind::DeletionMarker => None,
}
}
}
#[derive(Debug)]
pub enum VersionKind {
DeletionMarker,
Version(VersionId),
}
/// Options for downloads. The default value is a plain GET.
pub struct DownloadOpts {
/// If given, returns [`DownloadError::Unmodified`] if the object still has
@@ -186,6 +212,8 @@ pub struct DownloadOpts {
/// The end of the byte range to download, or unbounded. Must be after the
/// start bound.
pub byte_end: Bound<u64>,
/// Optionally request a specific version of a key
pub version_id: Option<VersionId>,
/// Indicate whether we're downloading something small or large: this indirectly controls
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
/// for layer files
@@ -197,12 +225,16 @@ pub enum DownloadKind {
Small,
}
#[derive(Debug, Clone)]
pub struct VersionId(pub String);
impl Default for DownloadOpts {
fn default() -> Self {
Self {
etag: Default::default(),
byte_start: Bound::Unbounded,
byte_end: Bound::Unbounded,
version_id: None,
kind: DownloadKind::Large,
}
}
@@ -295,6 +327,14 @@ pub trait RemoteStorage: Send + Sync + 'static {
Ok(combined)
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<VersionListing, DownloadError>;
/// Obtain metadata information about an object.
async fn head_object(
&self,
@@ -475,6 +515,22 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
}
}
// See [`RemoteStorage::list_versions`].
pub async fn list_versions<'a>(
&'a self,
prefix: Option<&'a RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &'a CancellationToken,
) -> Result<VersionListing, DownloadError> {
match self {
Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
}
}
// See [`RemoteStorage::head_object`].
pub async fn head_object(
&self,
@@ -727,6 +783,7 @@ impl ConcurrencyLimiter {
RequestKind::Copy => &self.write,
RequestKind::TimeTravel => &self.write,
RequestKind::Head => &self.read,
RequestKind::ListVersions => &self.read,
}
}

View File

@@ -445,6 +445,16 @@ impl RemoteStorage for LocalFs {
}
}
async fn list_versions(
&self,
_prefix: Option<&RemotePath>,
_mode: ListingMode,
_max_keys: Option<NonZeroU32>,
_cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
unimplemented!()
}
async fn head_object(
&self,
key: &RemotePath,

View File

@@ -14,6 +14,7 @@ pub(crate) enum RequestKind {
Copy = 4,
TimeTravel = 5,
Head = 6,
ListVersions = 7,
}
use RequestKind::*;
@@ -29,6 +30,7 @@ impl RequestKind {
Copy => "copy_object",
TimeTravel => "time_travel_recover",
Head => "head_object",
ListVersions => "list_versions",
}
}
const fn as_index(&self) -> usize {
@@ -36,7 +38,10 @@ impl RequestKind {
}
}
const REQUEST_KIND_COUNT: usize = 7;
const REQUEST_KIND_LIST: &[RequestKind] =
&[Get, Put, Delete, List, Copy, TimeTravel, Head, ListVersions];
const REQUEST_KIND_COUNT: usize = REQUEST_KIND_LIST.len();
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
impl<C> RequestTyped<C> {
@@ -45,12 +50,11 @@ impl<C> RequestTyped<C> {
}
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
use RequestKind::*;
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
let mut it = REQUEST_KIND_LIST.iter();
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(next)
f(*next)
});
if let Some(next) = it.next() {

View File

@@ -21,9 +21,8 @@ use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass};
use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_types::DateTime;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;
use aws_smithy_types::date_time::ConversionError;
@@ -46,7 +45,7 @@ use crate::support::PermitCarrying;
use crate::{
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
TimeTravelError, TimeoutOrCancel,
TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
};
/// AWS S3 storage.
@@ -66,6 +65,7 @@ struct GetObjectRequest {
key: String,
etag: Option<String>,
range: Option<String>,
version_id: Option<String>,
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
@@ -251,6 +251,7 @@ impl S3Bucket {
.get_object()
.bucket(request.bucket)
.key(request.key)
.set_version_id(request.version_id)
.set_range(request.range);
if let Some(etag) = request.etag {
@@ -405,6 +406,124 @@ impl S3Bucket {
Ok(())
}
async fn list_versions_with_permit(
&self,
_permit: &tokio::sync::SemaphorePermit<'_>,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let prefix = prefix
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);
let mut key_marker = None;
let mut version_id_marker = None;
let mut versions_and_deletes = Vec::new();
loop {
let response = backoff::retry(
|| async {
let mut request = self
.client
.list_object_versions()
.bucket(self.bucket_name.clone())
.set_prefix(prefix.clone())
.set_key_marker(key_marker.clone())
.set_version_id_marker(version_id_marker.clone());
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let op = request.send();
tokio::select! {
res = op => res.map_err(|e| DownloadError::Other(e.into())),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}
},
is_permanent,
warn_threshold,
max_retries,
"listing object versions",
cancel,
)
.await
.ok_or_else(|| DownloadError::Cancelled)
.and_then(|x| x)?;
tracing::trace!(
" Got List response version_id_marker={:?}, key_marker={:?}",
response.version_id_marker,
response.key_marker
);
let versions = response
.versions
.unwrap_or_default()
.into_iter()
.map(|version| {
let key = version.key.expect("response does not contain a key");
let key = self.s3_object_to_relative_path(&key);
let version_id = VersionId(version.version_id.expect("needing version id"));
let last_modified =
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
Ok(Version {
key,
last_modified,
kind: crate::VersionKind::Version(version_id),
})
});
let deletes = response
.delete_markers
.unwrap_or_default()
.into_iter()
.map(|version| {
let key = version.key.expect("response does not contain a key");
let key = self.s3_object_to_relative_path(&key);
let last_modified =
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
Ok(Version {
key,
last_modified,
kind: crate::VersionKind::DeletionMarker,
})
});
itertools::process_results(versions.chain(deletes), |n_vds| {
versions_and_deletes.extend(n_vds)
})
.map_err(DownloadError::Other)?;
fn none_if_empty(v: Option<String>) -> Option<String> {
v.filter(|v| !v.is_empty())
}
version_id_marker = none_if_empty(response.next_version_id_marker);
key_marker = none_if_empty(response.next_key_marker);
if version_id_marker.is_none() {
// The final response is not supposed to be truncated
if response.is_truncated.unwrap_or_default() {
return Err(DownloadError::Other(anyhow::anyhow!(
"Received truncated ListObjectVersions response for prefix={prefix:?}"
)));
}
break;
}
if let Some(max_keys) = max_keys {
if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
}
}
}
Ok(VersionListing {
versions: versions_and_deletes,
})
}
pub fn bucket_name(&self) -> &str {
&self.bucket_name
}
@@ -621,6 +740,19 @@ impl RemoteStorage for S3Bucket {
}
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
let kind = RequestKind::ListVersions;
let permit = self.permit(kind, cancel).await?;
self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
.await
}
async fn head_object(
&self,
key: &RemotePath,
@@ -801,6 +933,7 @@ impl RemoteStorage for S3Bucket {
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: opts.byte_range_header(),
version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
},
cancel,
)
@@ -845,94 +978,25 @@ impl RemoteStorage for S3Bucket {
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
let timestamp = DateTime::from(timestamp);
let done_if_after = DateTime::from(done_if_after);
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
// get the passed prefix or if it is not set use prefix_in_bucket value
let prefix = prefix
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
let mut key_marker = None;
let mut version_id_marker = None;
let mut versions_and_deletes = Vec::new();
loop {
let response = backoff::retry(
|| async {
let op = self
.client
.list_object_versions()
.bucket(self.bucket_name.clone())
.set_prefix(prefix.clone())
.set_key_marker(key_marker.clone())
.set_version_id_marker(version_id_marker.clone())
.send();
tokio::select! {
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
}
},
is_permanent,
warn_threshold,
max_retries,
"listing object versions for time_travel_recover",
cancel,
)
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
.await
.ok_or_else(|| TimeTravelError::Cancelled)
.and_then(|x| x)?;
tracing::trace!(
" Got List response version_id_marker={:?}, key_marker={:?}",
response.version_id_marker,
response.key_marker
);
let versions = response
.versions
.unwrap_or_default()
.into_iter()
.map(VerOrDelete::from_version);
let deletes = response
.delete_markers
.unwrap_or_default()
.into_iter()
.map(VerOrDelete::from_delete_marker);
itertools::process_results(versions.chain(deletes), |n_vds| {
versions_and_deletes.extend(n_vds)
})
.map_err(TimeTravelError::Other)?;
fn none_if_empty(v: Option<String>) -> Option<String> {
v.filter(|v| !v.is_empty())
}
version_id_marker = none_if_empty(response.next_version_id_marker);
key_marker = none_if_empty(response.next_key_marker);
if version_id_marker.is_none() {
// The final response is not supposed to be truncated
if response.is_truncated.unwrap_or_default() {
return Err(TimeTravelError::Other(anyhow::anyhow!(
"Received truncated ListObjectVersions response for prefix={prefix:?}"
)));
}
break;
}
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: usize = 100_000;
if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
return Err(TimeTravelError::TooManyVersions);
}
}
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),
DownloadError::Cancelled => TimeTravelError::Cancelled,
other => TimeTravelError::Other(other.into()),
})?;
let versions_and_deletes = version_listing.versions;
tracing::info!(
"Built list for time travel with {} versions and deletions",
@@ -948,24 +1012,26 @@ impl RemoteStorage for S3Bucket {
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
for vd in &versions_and_deletes {
let VerOrDelete {
version_id, key, ..
} = &vd;
if version_id == "null" {
let Version { key, .. } = &vd;
let version_id = vd.version_id().map(|v| v.0.as_str());
if version_id == Some("null") {
return Err(TimeTravelError::Other(anyhow!(
"Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values"
)));
}
tracing::trace!(
"Parsing version key={key} version_id={version_id} kind={:?}",
vd.kind
);
tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
vds_for_key.entry(key).or_default().push(vd);
}
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
for (key, versions) in vds_for_key {
let last_vd = versions.last().unwrap();
let key = self.relative_path_to_s3_object(key);
if last_vd.last_modified > done_if_after {
tracing::trace!("Key {key} has version later than done_if_after, skipping");
continue;
@@ -990,11 +1056,11 @@ impl RemoteStorage for S3Bucket {
do_delete = true;
} else {
match &versions[version_to_restore_to - 1] {
VerOrDelete {
kind: VerOrDeleteKind::Version,
version_id,
Version {
kind: VersionKind::Version(version_id),
..
} => {
let version_id = &version_id.0;
tracing::trace!("Copying old version {version_id} for {key}...");
// Restore the state to the last version by copying
let source_id =
@@ -1006,7 +1072,7 @@ impl RemoteStorage for S3Bucket {
.client
.copy_object()
.bucket(self.bucket_name.clone())
.key(key)
.key(&key)
.set_storage_class(self.upload_storage_class.clone())
.copy_source(&source_id)
.send();
@@ -1027,8 +1093,8 @@ impl RemoteStorage for S3Bucket {
.and_then(|x| x)?;
tracing::info!(%version_id, %key, "Copied old version in S3");
}
VerOrDelete {
kind: VerOrDeleteKind::DeleteMarker,
Version {
kind: VersionKind::DeletionMarker,
..
} => {
do_delete = true;
@@ -1036,7 +1102,7 @@ impl RemoteStorage for S3Bucket {
}
};
if do_delete {
if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
if matches!(last_vd.kind, VersionKind::DeletionMarker) {
// Key has since been deleted (but there was some history), no need to do anything
tracing::trace!("Key {key} already deleted, skipping.");
} else {
@@ -1064,62 +1130,6 @@ impl RemoteStorage for S3Bucket {
}
}
// Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
struct VerOrDelete {
kind: VerOrDeleteKind,
last_modified: DateTime,
version_id: String,
key: String,
}
#[derive(Debug)]
enum VerOrDeleteKind {
Version,
DeleteMarker,
}
impl VerOrDelete {
fn with_kind(
kind: VerOrDeleteKind,
last_modified: Option<DateTime>,
version_id: Option<String>,
key: Option<String>,
) -> anyhow::Result<Self> {
let lvk = (last_modified, version_id, key);
let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
anyhow::bail!(
"One (or more) of last_modified, key, and id is None. \
Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
lvk.0,
lvk.1,
lvk.2,
);
};
Ok(Self {
kind,
last_modified,
version_id,
key,
})
}
fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
Self::with_kind(
VerOrDeleteKind::Version,
v.last_modified,
v.version_id,
v.key,
)
}
fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
Self::with_kind(
VerOrDeleteKind::DeleteMarker,
v.last_modified,
v.version_id,
v.key,
)
}
}
#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;

View File

@@ -139,6 +139,20 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list(prefix, mode, max_keys, cancel).await
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner
.list_versions(prefix, mode, max_keys, cancel)
.await
}
async fn head_object(
&self,
key: &RemotePath,

View File

@@ -11,6 +11,7 @@ use pageserver::task_mgr::TaskKind;
use pageserver::tenant::storage_layer::InMemoryLayer;
use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
@@ -21,13 +22,14 @@ use wal_decoder::serialized_batch::SerializedValueBatch;
// A very cheap hash for generating non-sequential keys.
fn murmurhash32(mut h: u32) -> u32 {
h ^= h >> 16;
h = h.wrapping_mul(0x85ebca6b);
h h.wrapping_mul(0x85ebca6b);
h ^= h >> 13;
h = h.wrapping_mul(0xc2b2ae35);
h ^= h >> 16;
h
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
enum KeyLayout {
/// Sequential unique keys
Sequential,
@@ -37,6 +39,7 @@ enum KeyLayout {
RandomReuse(u32),
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
enum WriteDelta {
Yes,
No,
@@ -138,12 +141,15 @@ async fn ingest(
/// Wrapper to instantiate a tokio runtime
fn ingest_main(
conf: &'static PageServerConf,
io_mode: IoMode,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) {
let runtime = tokio::runtime::Builder::new_current_thread()
pageserver::virtual_file::set_io_mode(io_mode);
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
@@ -174,93 +180,207 @@ fn criterion_benchmark(c: &mut Criterion) {
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
// immaterial, each `ingest_main` invocation below overrides this
conf.virtual_file_io_mode,
// without actually doing syncs, buffered writes have an unfair advantage over direct IO writes
virtual_file::SyncMode::Sync,
);
page_cache::init(conf.page_cache_size);
{
let mut group = c.benchmark_group("ingest-small-values");
let put_size = 100usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/100b seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Random,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::RandomReuse(0x3ff),
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
#[derive(serde::Serialize)]
struct ExplodedParameters {
io_mode: IoMode,
volume_mib: usize,
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
}
{
let mut group = c.benchmark_group("ingest-big-values");
let put_size = 8192usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
#[derive(Clone)]
struct HandPickedParameters {
volume_mib: usize,
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
}
let expect = vec![
// Small values (100b) tests
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Random,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::RandomReuse(0x3ff),
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::No,
},
// Large values (8k) tests
HandPickedParameters {
volume_mib: 128,
key_size: 8192,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 8192,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::No,
},
];
let exploded_parameters = {
let mut out = Vec::new();
for io_mode in [
IoMode::Buffered,
#[cfg(target_os = "linux")]
IoMode::Direct,
] {
for param in expect.clone() {
let HandPickedParameters {
volume_mib,
key_size,
key_layout,
write_delta,
} = param;
out.push(ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
});
}
}
out
};
impl ExplodedParameters {
fn benchmark_id(&self) -> String {
let ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
} = self;
format!(
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
)
}
}
let mut group = c.benchmark_group("ingest");
for params in exploded_parameters {
let id = params.benchmark_id();
let ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
} = params;
let put_count = volume_mib * 1024 * 1024 / key_size;
group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/8k seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
group.bench_function(id, |b| {
b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
});
}
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
/*
cargo bench --bench bench_ingest
im4gn.2xlarge:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.8491 s 1.8540 s 1.8592 s]
thrpt: [68.847 MiB/s 69.039 MiB/s 69.222 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.6976 s 2.7123 s 2.7286 s]
thrpt: [46.911 MiB/s 47.193 MiB/s 47.450 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.7433 s 1.7510 s 1.7600 s]
thrpt: [72.729 MiB/s 73.099 MiB/s 73.423 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [499.63 ms 500.07 ms 500.46 ms]
thrpt: [255.77 MiB/s 255.96 MiB/s 256.19 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [456.97 ms 459.61 ms 461.92 ms]
thrpt: [277.11 MiB/s 278.50 MiB/s 280.11 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [158.82 ms 159.16 ms 159.56 ms]
thrpt: [802.22 MiB/s 804.24 MiB/s 805.93 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.8856 s 1.8997 s 1.9179 s]
thrpt: [66.740 MiB/s 67.380 MiB/s 67.882 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.7468 s 2.7625 s 2.7785 s]
thrpt: [46.068 MiB/s 46.335 MiB/s 46.600 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.7689 s 1.7726 s 1.7767 s]
thrpt: [72.045 MiB/s 72.208 MiB/s 72.363 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [497.64 ms 498.60 ms 499.67 ms]
thrpt: [256.17 MiB/s 256.72 MiB/s 257.21 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [493.72 ms 505.07 ms 518.03 ms]
thrpt: [247.09 MiB/s 253.43 MiB/s 259.26 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [267.76 ms 267.85 ms 267.96 ms]
thrpt: [477.69 MiB/s 477.88 MiB/s 478.03 MiB/s]
Hetzner AX102:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.0683 s 1.1006 s 1.1386 s]
thrpt: [112.42 MiB/s 116.30 MiB/s 119.82 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.5719 s 1.6012 s 1.6228 s]
thrpt: [78.877 MiB/s 79.938 MiB/s 81.430 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.1095 s 1.1331 s 1.1580 s]
thrpt: [110.53 MiB/s 112.97 MiB/s 115.37 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [303.20 ms 307.83 ms 311.90 ms]
thrpt: [410.39 MiB/s 415.81 MiB/s 422.16 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [406.34 ms 429.37 ms 451.63 ms]
thrpt: [283.42 MiB/s 298.11 MiB/s 315.00 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [134.01 ms 135.78 ms 137.48 ms]
thrpt: [931.03 MiB/s 942.68 MiB/s 955.12 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.0406 s 1.0580 s 1.0772 s]
thrpt: [118.83 MiB/s 120.98 MiB/s 123.00 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.5059 s 1.5339 s 1.5625 s]
thrpt: [81.920 MiB/s 83.448 MiB/s 84.999 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.0714 s 1.0934 s 1.1161 s]
thrpt: [114.69 MiB/s 117.06 MiB/s 119.47 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [262.68 ms 265.14 ms 267.71 ms]
thrpt: [478.13 MiB/s 482.76 MiB/s 487.29 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [375.19 ms 393.80 ms 411.40 ms]
thrpt: [311.14 MiB/s 325.04 MiB/s 341.16 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [123.02 ms 123.85 ms 124.66 ms]
thrpt: [1.0027 GiB/s 1.0093 GiB/s 1.0161 GiB/s]
*/

View File

@@ -419,6 +419,23 @@ impl Client {
}
}
pub async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<TimelineInfo> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
);
self.request(Method::GET, &uri, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn timeline_archival_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -225,6 +225,11 @@ pub struct PageServerConf {
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
pub enable_tls_page_service_api: bool,
/// Run in development mode, which disables certain safety checks
/// such as authentication requirements for HTTP and PostgreSQL APIs.
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
}
/// Token for authentication to safekeepers
@@ -398,6 +403,7 @@ impl PageServerConf {
generate_unarchival_heatmap,
tracing,
enable_tls_page_service_api,
dev_mode,
} = config_toml;
let mut conf = PageServerConf {
@@ -449,6 +455,7 @@ impl PageServerConf {
get_vectored_concurrent_io,
tracing,
enable_tls_page_service_api,
dev_mode,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -3,10 +3,11 @@ use std::collections::HashMap;
use futures::Future;
use pageserver_api::config::NodeMetadata;
use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
ValidateRequestTenant, ValidateResponse,
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
ValidateRequest, ValidateRequestTenant, ValidateResponse,
};
use reqwest::Certificate;
use serde::Serialize;
@@ -14,7 +15,7 @@ use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::generation::Generation;
use utils::id::NodeId;
use utils::id::{NodeId, TimelineId};
use utils::{backoff, failpoint_support};
use crate::config::PageServerConf;
@@ -46,6 +47,12 @@ pub trait StorageControllerUpcallApi {
&self,
tenants: Vec<(TenantShardId, Generation)>,
) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
fn put_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
status: ShardImportStatus,
) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
}
impl StorageControllerUpcallClient {
@@ -273,4 +280,30 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
Ok(result.into_iter().collect())
}
/// Send a shard import status to the storage controller
///
/// The implementation must have at-least-once delivery semantics.
/// To this end, we retry the request until it succeeds. If the pageserver
/// restarts or crashes, the shard import will start again from the beggining.
#[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
async fn put_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
status: ShardImportStatus,
) -> Result<(), RetryForeverError> {
let url = self
.base_url
.join("timeline_import_status")
.expect("Failed to build path");
let request = PutTimelineImportStatusRequest {
tenant_shard_id,
timeline_id,
status,
};
self.retry_http_forever(&url, request).await
}
}

View File

@@ -787,6 +787,15 @@ mod test {
Ok(result)
}
async fn put_timeline_import_status(
&self,
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
_status: pageserver_api::models::ShardImportStatus,
) -> Result<(), RetryForeverError> {
unimplemented!()
}
}
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {

View File

@@ -28,7 +28,7 @@ use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
#[derive(Copy, Clone, Debug)]
@@ -218,7 +218,7 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
inner: VirtualFile,
inner: TempVirtualFile,
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
@@ -228,7 +228,7 @@ pub struct BlobWriter<const BUFFERED: bool> {
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(
inner: VirtualFile,
inner: TempVirtualFile,
start_offset: u64,
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
@@ -476,30 +476,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}
}
impl BlobWriter<true> {
/// Access the underlying `VirtualFile`.
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// Finish this blob writer and return the underlying [`TempVirtualFile`].
///
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
self.flush_buffer(ctx).await?;
/// If there is an internal buffer (depends on `BUFFERED`), it will
/// be flushed before this method returns.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> {
if BUFFERED {
self.flush_buffer(ctx).await?;
}
Ok(self.inner)
}
/// Access the underlying `VirtualFile`.
///
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
/// the internal buffer before giving access.
pub fn into_inner_no_flush(self) -> VirtualFile {
self.inner
}
}
impl BlobWriter<false> {
/// Access the underlying `VirtualFile`.
pub fn into_inner(self) -> VirtualFile {
self.inner
}
}
#[cfg(test)]
@@ -512,6 +499,7 @@ pub(crate) mod tests {
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
use crate::virtual_file::VirtualFile;
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await
@@ -530,7 +518,10 @@ pub(crate) mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let file = TempVirtualFile::new(
VirtualFile::create(pathbuf.as_path(), ctx).await?,
gate.enter().unwrap(),
);
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
for blob in blobs.iter() {
let (_, res) = if compression {
@@ -553,7 +544,9 @@ pub(crate) mod tests {
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
wtr.flush_buffer(ctx).await?;
let file = wtr.into_inner(ctx).await?;
file.disarm_into_inner();
}
Ok((temp_dir, pathbuf, offsets))
}

View File

@@ -12,6 +12,7 @@ use tokio_epoll_uring::{BoundedBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::{error, info_span};
use utils::id::TimelineId;
use utils::sync::gate::GateGuard;
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
@@ -21,16 +22,33 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};
use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
buffered_writer: BufferedWriter,
}
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
IoBufferMut,
TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
>;
/// A TempVirtualFile that is co-owned by the [`EphemeralFile`]` and [`BufferedWriter`].
///
/// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
/// The co-ownership is between [`EphemeralFile`] and that flush task.)
///
/// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
#[derive(Debug, Clone)]
struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
inner: Arc<TempVirtualFile>,
}
const TAIL_SZ: usize = 64 * 1024;
@@ -44,9 +62,12 @@ impl EphemeralFile {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<EphemeralFile> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let filename = conf
.timeline_path(&tenant_shard_id, &timeline_id)
@@ -54,7 +75,7 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));
let file = Arc::new(
let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
VirtualFile::open_with_options_v2(
&filename,
virtual_file::OpenOptions::new()
@@ -64,6 +85,7 @@ impl EphemeralFile {
ctx,
)
.await?,
gate.enter()?,
);
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
@@ -73,7 +95,8 @@ impl EphemeralFile {
_timeline_id: timeline_id,
page_cache_file_id,
bytes_written: 0,
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file: file.clone(),
buffered_writer: BufferedWriter::new(
file,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
@@ -81,29 +104,42 @@ impl EphemeralFile {
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
_gate_guard: gate.enter()?,
})
}
}
impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = self.buffered_writer.as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
}
impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
Self {
inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
}
}
}
impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
&self,
buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<
Output = (
owned_buffers_io::io_buf_ext::FullSlice<Buf>,
std::io::Result<()>,
),
> + Send {
self.inner.write_all_at(buf, offset, ctx)
}
}
impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
type Target = VirtualFile;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
#[error("{0}")]
@@ -262,9 +298,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
let dst = if written_range.len() > 0 {
let file: &VirtualFile = self.buffered_writer.as_inner();
let bounds = dst.bounds();
let slice = file
let slice = self
.file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
@@ -456,7 +492,7 @@ mod tests {
assert_eq!(&buf, &content[range]);
}
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
let file_contents = std::fs::read(file.file.path()).unwrap();
assert!(file_contents == content[0..cap * 2]);
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
@@ -489,7 +525,7 @@ mod tests {
// assert the state is as this test expects it to be
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
let md = file.file.path().metadata().unwrap();
assert_eq!(
md.len(),
2 * cap.into_u64(),

View File

@@ -6,6 +6,7 @@
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::time::SystemTime;
use anyhow::{Context, anyhow};
@@ -15,7 +16,7 @@ use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
};
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::io::AsyncSeekExt;
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
@@ -40,7 +41,10 @@ use crate::span::{
use crate::tenant::Generation;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
use crate::virtual_file;
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{TempVirtualFile, owned_buffers_io};
///
/// If 'metadata' is given, we will validate that the downloaded file's size matches that
@@ -72,21 +76,36 @@ pub async fn download_layer_file<'a>(
layer_metadata.generation,
);
// Perform a rename inspired by durable_rename from file_utils.c.
// The sequence:
// write(tmp)
// fsync(tmp)
// rename(tmp, new)
// fsync(new)
// fsync(parent)
// For more context about durable_rename check this email from postgres mailing list:
// https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
let bytes_amount = download_retry(
let (bytes_amount, temp_file) = download_retry(
|| async {
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let temp_file_path = path_with_suffix_extension(
local_path,
&format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"),
);
let temp_file = TempVirtualFile::new(
// Not _v2 yet which is sensitive to virtual_file_io_mode.
// That'll happen in PR https://github.com/neondatabase/neon/pull/11558
VirtualFile::open_with_options(
&temp_file_path,
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
ctx,
)
.await
.with_context(|| format!("create a temp file for layer download: {temp_file_path}"))
.map_err(DownloadError::Other)?,
gate.enter().map_err(|_| DownloadError::Cancelled)?,
);
download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
},
&format!("download {remote_path:?}"),
cancel,
@@ -96,7 +115,8 @@ pub async fn download_layer_file<'a>(
let expected = layer_metadata.file_size;
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",
temp_file.path()
)));
}
@@ -106,11 +126,28 @@ pub async fn download_layer_file<'a>(
)))
});
fs::rename(&temp_file_path, &local_path)
// Try rename before disarming the temp file.
// That way, if rename fails for whatever reason, we clean up the temp file on the return path.
fs::rename(temp_file.path(), &local_path)
.await
.with_context(|| format!("rename download layer file to {local_path}"))
.map_err(DownloadError::Other)?;
// The temp file's VirtualFile points to the temp_file_path which we moved above.
// Drop it immediately, it's invalid.
// This will get better in https://github.com/neondatabase/neon/issues/11692
let _: VirtualFile = temp_file.disarm_into_inner();
// NB: The gate guard that was stored in `temp_file` is dropped but we continue
// to operate on it and on the parent timeline directory.
// Those operations are safe to do because higher-level code is holding another gate guard:
// - attached mode: the download task spawned by struct Layer is holding the gate guard
// - secondary mode: The TenantDownloader::download holds the gate open
// The rename above is not durable yet.
// It doesn't matter for crash consistency because pageserver startup deletes temp
// files and we'll re-download on demand if necessary.
// We use fatal_err() below because the after the rename above,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
@@ -146,147 +183,58 @@ pub async fn download_layer_file<'a>(
async fn download_object(
storage: &GenericRemoteStorage,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
destination_file: TempVirtualFile,
gate: &utils::sync::gate::Gate,
cancel: &CancellationToken,
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
) -> Result<u64, DownloadError> {
let res = match crate::virtual_file::io_engine::get() {
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
crate::virtual_file::io_engine::IoEngine::StdFs => {
async {
let destination_file = tokio::fs::File::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
ctx: &RequestContext,
) -> Result<(u64, TempVirtualFile), DownloadError> {
let mut download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
let download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
pausable_failpoint!("before-downloading-layer-stream-pausable");
let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
let mut buf_writer =
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
buf_writer.flush().await?;
let mut destination_file = buf_writer.into_inner();
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
// we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
// But for additional safety lets check/wait for any pending operations.
destination_file
.flush()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("flush source file at {dst_path}"))
.map_err(DownloadError::Other)?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use std::sync::Arc;
use crate::virtual_file::{IoBufferMut, owned_buffers_io};
async {
let destination_file = Arc::new(
VirtualFile::create(dst_path, ctx)
.await
.with_context(|| {
format!("create a destination file for layer '{dst_path}'")
})
.map_err(DownloadError::Other)?,
);
let mut download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(DownloadError::from(e)),
};
buffered
.write_buffered_borrowed(&chunk, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered
.flush_and_into_inner(ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
};
// in case the download failed, clean up
match res {
Ok(bytes_amount) => Ok(bytes_amount),
Err(e) => {
if let Err(e) = tokio::fs::remove_file(dst_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
}
}
Err(e)
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await {
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(DownloadError::from(e)),
};
buffered
.write_buffered_borrowed(&chunk, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered.shutdown(ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok((bytes_amount, destination_file))
}
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

View File

@@ -646,7 +646,7 @@ enum UpdateError {
NoData,
#[error("Insufficient local storage space")]
NoSpace,
#[error("Failed to download")]
#[error("Failed to download: {0}")]
DownloadError(DownloadError),
#[error(transparent)]
Deserialize(#[from] serde_json::Error),

View File

@@ -34,6 +34,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
@@ -45,8 +46,6 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
@@ -74,6 +73,7 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -288,19 +288,20 @@ impl DeltaLayer {
key_start: Key,
lsn_range: &Range<Lsn>,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(tenant_shard_id, timeline_id)
.join(format!(
"{}-XXX__{:016X}-{:016X}.{}.{}",
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end),
rand_string,
filename_disambiguator,
TEMP_FILE_SUFFIX,
))
}
@@ -421,7 +422,7 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let mut file = VirtualFile::create(&path, ctx).await?;
let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?);
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
@@ -515,22 +516,6 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, ctx).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
async fn finish0(
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -598,6 +583,10 @@ impl DeltaLayerWriterInner {
trace!("created delta layer {}", self.path);
// The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction)
// keep the gate open also, so that it's safe for them to rename the file to its final destination.
file.disarm_into_inner();
Ok((desc, self.path))
}
}
@@ -726,17 +715,6 @@ impl DeltaLayerWriter {
}
}
impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
// We want to remove the virtual file here, so it's fine to not
// having completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
vfile.remove();
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -1609,8 +1587,8 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use pageserver_api::value::Value;
use rand::RngCore;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
use super::*;
use crate::DEFAULT_PG_VERSION;

View File

@@ -32,6 +32,7 @@ use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
@@ -43,8 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
@@ -72,6 +71,7 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -252,14 +252,18 @@ impl ImageLayer {
tenant_shard_id: TenantShardId,
fname: &ImageLayerName,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
.join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
))
}
///
@@ -773,7 +777,7 @@ impl ImageLayerWriterInner {
},
);
trace!("creating image layer {}", path);
let mut file = {
let mut file = TempVirtualFile::new(
VirtualFile::open_with_options(
&path,
virtual_file::OpenOptions::new()
@@ -781,8 +785,9 @@ impl ImageLayerWriterInner {
.create_new(true),
ctx,
)
.await?
};
.await?,
gate.enter()?,
);
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
@@ -896,25 +901,6 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(ctx, end_key).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
///
/// Finish writing the image layer.
///
async fn finish0(
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -932,7 +918,7 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
};
let mut file = self.blob_writer.into_inner();
let mut file = self.blob_writer.into_inner(ctx).await?;
// Write out the index
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
@@ -1000,6 +986,10 @@ impl ImageLayerWriterInner {
trace!("created image layer {}", self.path);
// The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction)
// keep the gate open also, so that it's safe for them to rename the file to its final destination.
file.disarm_into_inner();
Ok((desc, self.path))
}
}
@@ -1125,14 +1115,6 @@ impl ImageLayerWriter {
}
}
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
}
}
}
pub struct ImageLayerIterator<'a> {
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,

View File

@@ -1,20 +1,21 @@
use std::sync::Arc;
use anyhow::{Context, bail};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span};
use tracing::info;
use utils::lsn::Lsn;
use super::Timeline;
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;
mod flow;
mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;
pub(crate) mod upcall_api;
pub async fn doit(
timeline: &Arc<Timeline>,
@@ -34,23 +35,6 @@ pub async fn doit(
let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
info!("get spec early so we know we'll be able to upcall when done");
let Some(spec) = storage.get_spec().await? else {
bail!("spec not found")
};
let upcall_client =
upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
//
// send an early progress update to clean up k8s job early and generate potentially useful logs
//
info!("send early progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("early_progress_update"))
.await?;
let status_prefix = RemotePath::from_string("status").unwrap();
//
@@ -176,7 +160,21 @@ pub async fn doit(
//
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
.expect("storcon configured");
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
.map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
storage
.put_json(
&shard_status_key,
@@ -186,16 +184,6 @@ pub async fn doit(
.context("put shard status")?;
}
//
// Ensure at-least-once deliver of the upcall to cplane
// before we mark the task as done and never come here again.
//
info!("send final progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("final_progress_update"))
.await?;
//
// Mark as done in index_part.
// This makes subsequent timeline loads enter the normal load code path

View File

@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
use utils::lsn::Lsn;
use super::{importbucket_format, index_part_format};
use super::index_part_format;
use crate::assert_u64_eq_usize::U64IsUsize;
use crate::config::PageServerConf;
@@ -173,12 +173,6 @@ impl RemoteStorageWrapper {
res
}
pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
self.get_json(&RemotePath::from_string("spec.json").unwrap())
.await
.context("get spec")
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get_json<T: DeserializeOwned>(
&self,
@@ -244,7 +238,8 @@ impl RemoteStorageWrapper {
kind: DownloadKind::Large,
etag: None,
byte_start: Bound::Included(start_inclusive),
byte_end: Bound::Excluded(end_exclusive)
byte_end: Bound::Excluded(end_exclusive),
version_id: None,
},
&self.cancel)
.await?;

View File

@@ -11,10 +11,3 @@ pub struct ShardStatus {
pub done: bool,
// TODO: remaining fields
}
// TODO: dedupe with fast_import code
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Spec {
pub project_id: String,
pub branch_id: String,
}

View File

@@ -1,124 +0,0 @@
//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
use reqwest::{Certificate, Method};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::error;
use super::importbucket_format::Spec;
use crate::config::PageServerConf;
pub struct Client {
base_url: String,
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressRequest {
// no fields yet, not sure if there every will be any
}
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressResponse {
// we don't care
}
impl Client {
pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result<Self> {
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
anyhow::bail!("import_pgdata_upcall_api is not configured")
};
let mut http_client = reqwest::Client::builder();
for cert in &conf.ssl_ca_certs {
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
}
let http_client = http_client.build()?;
Ok(Self {
base_url: base_url.to_string(),
client: http_client,
cancel,
authorization_header: conf
.import_pgdata_upcall_api_token
.as_ref()
.map(|secret_string| secret_string.get_contents())
.map(|jwt| format!("Bearer {jwt}")),
})
}
fn start_request<U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
) -> reqwest::RequestBuilder {
let req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
}
}
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.start_request(method, uri)
.json(&body)
.send()
.await
.map_err(Error::ReceiveBody)
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
Ok(response)
}
pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> {
let url = format!(
"{}/projects/{}/branches/{}/import_progress",
self.base_url, spec.project_id, spec.branch_id
);
let ImportProgressResponse {} = self
.request(Method::POST, url, &ImportProgressRequest {})
.await?
.json()
.await
.map_err(Error::ReceiveBody)?;
Ok(())
}
pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> {
loop {
match self.send_progress_once(spec).await {
Ok(()) => return Ok(()),
Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")),
Err(err) => {
error!(?err, "error sending progress, retrying");
if tokio::time::timeout(
std::time::Duration::from_secs(10),
self.cancel.cancelled(),
)
.await
.is_ok()
{
anyhow::bail!("cancelled while sending early progress update");
}
}
}
}
}
}

View File

@@ -25,29 +25,31 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
pub use pageserver_api::models::virtual_file as api;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use self::owned_buffers_io::write::OwnedAsyncWriter;
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::context::RequestContext;
use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
use crate::page_cache::{PAGE_SZ, PageWriteGuard};
pub(crate) mod io_engine;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub use io_engine::{
FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
io_engine_for_bench,
};
mod metadata;
mod open_options;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
pub use pageserver_api::models::virtual_file as api;
pub use temporary::TempVirtualFile;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) mod io_engine;
mod metadata;
mod open_options;
mod temporary;
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
//!
@@ -1369,7 +1371,7 @@ pub(crate) type IoPageSlice<'a> =
static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
pub(crate) fn set_io_mode(mode: IoMode) {
pub fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
}

View File

@@ -1,5 +1,4 @@
mod flush;
use std::sync::Arc;
pub(crate) use flush::FlushControl;
use flush::FlushHandle;
@@ -41,7 +40,6 @@ pub trait OwnedAsyncWriter {
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
// since we would avoid copying majority of the data into the internal buffer.
pub struct BufferedWriter<B: Buffer, W> {
writer: Arc<W>,
/// Clone of the buffer that was last submitted to the flush loop.
/// `None` if no flush request has been submitted, Some forever after.
pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
@@ -72,7 +70,7 @@ where
///
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
pub fn new(
writer: Arc<W>,
writer: W,
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -80,7 +78,6 @@ where
flush_task_span: tracing::Span,
) -> Self {
Self {
writer: writer.clone(),
mutable: Some(buf_new()),
maybe_flushed: None,
flush_handle: FlushHandle::spawn_new(
@@ -95,10 +92,6 @@ where
}
}
pub fn as_inner(&self) -> &W {
&self.writer
}
/// Returns the number of bytes submitted to the background flush task.
pub fn bytes_submitted(&self) -> u64 {
self.bytes_submitted
@@ -116,20 +109,16 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn flush_and_into_inner(
mut self,
ctx: &RequestContext,
) -> Result<(u64, Arc<W>), FlushTaskError> {
pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
self.flush(ctx).await?;
let Self {
mutable: buf,
maybe_flushed: _,
writer,
mut flush_handle,
bytes_submitted: bytes_amount,
} = self;
flush_handle.shutdown().await?;
let writer = flush_handle.shutdown().await?;
assert!(buf.is_some());
Ok((bytes_amount, writer))
}
@@ -329,7 +318,7 @@ mod tests {
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
let ctx = test_ctx();
let ctx = &ctx;
let recorder = Arc::new(RecorderWriter::default());
let recorder = RecorderWriter::default();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
@@ -350,7 +339,7 @@ mod tests {
writer.write_buffered_borrowed(b"j", ctx).await?;
writer.write_buffered_borrowed(b"klmno", ctx).await?;
let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
let (_, recorder) = writer.shutdown(ctx).await?;
assert_eq!(
recorder.get_writes(),
{

View File

@@ -1,5 +1,4 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
@@ -21,7 +20,7 @@ pub struct FlushHandleInner<Buf, W> {
/// and receives recyled buffer.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
}
struct FlushRequest<Buf> {
@@ -120,7 +119,7 @@ where
/// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
/// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
pub fn spawn_new<B>(
file: Arc<W>,
file: W,
buf: B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -183,7 +182,7 @@ where
}
/// Cleans up the channel, join the flush task.
pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
let handle = self
.inner
.take()
@@ -207,7 +206,7 @@ pub struct FlushBackgroundTask<Buf, W> {
/// and send back recycled buffer.
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
/// A writter for persisting data to disk.
writer: Arc<W>,
writer: W,
ctx: RequestContext,
cancel: CancellationToken,
/// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk.
@@ -228,7 +227,7 @@ where
/// Creates a new background flush task.
fn new(
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
file: Arc<W>,
file: W,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
ctx: RequestContext,
@@ -243,7 +242,7 @@ where
}
/// Runs the background flush task.
async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
async fn run(mut self) -> Result<W, FlushTaskError> {
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
#[cfg(test)]

View File

@@ -0,0 +1,106 @@
use tracing::error;
use utils::sync::gate::GateGuard;
use crate::context::RequestContext;
use super::{
MaybeFatalIo, VirtualFile,
owned_buffers_io::{
io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice, write::OwnedAsyncWriter,
},
};
/// A wrapper around [`super::VirtualFile`] that deletes the file on drop.
/// For use as a [`OwnedAsyncWriter`] in [`super::owned_buffers_io::write::BufferedWriter`].
#[derive(Debug)]
pub struct TempVirtualFile {
inner: Option<Inner>,
}
#[derive(Debug)]
struct Inner {
file: VirtualFile,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: GateGuard,
}
impl OwnedAsyncWriter for TempVirtualFile {
fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
VirtualFile::write_all_at(self, buf, offset, ctx)
}
}
impl Drop for TempVirtualFile {
fn drop(&mut self) {
let Some(Inner { file, _gate_guard }) = self.inner.take() else {
return;
};
let path = file.path();
if let Err(e) =
std::fs::remove_file(path).maybe_fatal_err("failed to remove the virtual file")
{
error!(err=%e, path=%path, "failed to remove");
}
drop(_gate_guard);
}
}
impl std::ops::Deref for TempVirtualFile {
type Target = VirtualFile;
fn deref(&self) -> &Self::Target {
&self
.inner
.as_ref()
.expect("only None after into_inner or drop")
.file
}
}
impl std::ops::DerefMut for TempVirtualFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self
.inner
.as_mut()
.expect("only None after into_inner or drop")
.file
}
}
impl TempVirtualFile {
/// The caller is responsible for ensuring that the path of `virtual_file` is not reused
/// until after this TempVirtualFile's `Drop` impl has completed.
/// Failure to do so will result in unlinking of the reused path by the original instance's Drop impl.
/// The best way to do so is by using a monotonic counter as a disambiguator.
/// TODO: centralize this disambiguator pattern inside this struct.
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
pub fn new(virtual_file: VirtualFile, gate_guard: GateGuard) -> Self {
Self {
inner: Some(Inner {
file: virtual_file,
_gate_guard: gate_guard,
}),
}
}
/// Dismantle this wrapper and return the underlying [`VirtualFile`].
/// This disables auto-unlinking functionality that is the essence of this wrapper.
///
/// The gate guard is dropped as well; it is the callers responsibility to ensure filesystem
/// operations after calls to this functions are still gated by some other gate guard.
///
/// TODO:
/// - centralize the common usage pattern of callers (sync_all(self), rename(self, dst), sync_all(dst.parent))
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
pub fn disarm_into_inner(mut self) -> VirtualFile {
self.inner
.take()
.expect("only None after into_inner or drop, and we are into_inner, and we consume")
.file
}
}

View File

@@ -81,23 +81,6 @@ static int neon_compute_mode = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
static char *pageserver_sslcert = NULL;
static char *pageserver_sslcertmode = NULL;
static char *pageserver_sslcompression = NULL;
static char *pageserver_sslcrl = NULL;
static char *pageserver_sslcrldir = NULL;
static char *pageserver_sslkey = NULL;
static char *pageserver_sslmode = NULL;
static char *pageserver_sslpassword = NULL;
static char *pageserver_sslrootcert = NULL;
static char *pageserver_sslsni = NULL;
static char *pageserver_ssl_min_protocol_version = NULL;
static char *pageserver_ssl_max_protocol_version = NULL;
#if PG_MAJORVERSION_NUM >= 17
static char *pageserver_sslnegotiation = NULL;
#endif
static int pageserver_response_log_timeout = 10000;
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
static int pageserver_response_disconnect_timeout = 150000;
@@ -144,7 +127,7 @@ static uint64 pagestore_local_counter = 0;
typedef enum PSConnectionState {
PS_Disconnected, /* no connection yet */
PS_Connecting_Startup, /* connection starting up */
PS_Connecting_PageStream, /* negotiating pagestream */
PS_Connecting_PageStream, /* negotiating pagestream */
PS_Connected, /* connected, pagestream established */
} PSConnectionState;
@@ -379,7 +362,7 @@ get_shard_number(BufferTag *tag)
}
static inline void
CLEANUP_AND_DISCONNECT(PageServer *shard)
CLEANUP_AND_DISCONNECT(PageServer *shard)
{
if (shard->wes_read)
{
@@ -401,7 +384,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
* complete the connection (e.g. due to receiving an earlier cancellation
* during connection start).
* Returns true if successfully connected; false if the connection failed.
*
*
* Throws errors in unrecoverable situations, or when this backend's query
* is canceled.
*/
@@ -424,8 +407,8 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
case PS_Disconnected:
{
const char *keywords[17];
const char *values[17];
const char *keywords[5];
const char *values[5];
char pid_str[16] = { 0 };
char endpoint_str[36] = { 0 };
int n_pgsql_params;
@@ -499,92 +482,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
n_pgsql_params++;
}
if (pageserver_sslcertmode)
{
keywords[n_pgsql_params] = "sslcertmode";
values[n_pgsql_params] = pageserver_sslcertmode;
n_pgsql_params++;
}
if (pageserver_sslcompression)
{
keywords[n_pgsql_params] = "sslcompression";
values[n_pgsql_params] = pageserver_sslcompression;
n_pgsql_params++;
}
if (pageserver_sslcrl)
{
keywords[n_pgsql_params] = "sslcrl";
values[n_pgsql_params] = pageserver_sslcrl;
n_pgsql_params++;
}
if (pageserver_sslcrldir)
{
keywords[n_pgsql_params] = "sslcrldir";
values[n_pgsql_params] = pageserver_sslcrldir;
n_pgsql_params++;
}
if (pageserver_sslkey)
{
keywords[n_pgsql_params] = "sslkey";
values[n_pgsql_params] = pageserver_sslkey;
n_pgsql_params++;
}
if (pageserver_sslmode)
{
keywords[n_pgsql_params] = "sslmode";
values[n_pgsql_params] = pageserver_sslmode;
n_pgsql_params++;
}
#if PG_MAJORVERSION_NUM >= 17
if (pageserver_sslnegotiation)
{
keywords[n_pgsql_params] = "sslnegotiation";
values[n_pgsql_params] = pageserver_sslnegotiation;
n_pgsql_params++;
}
#endif
if (pageserver_sslpassword)
{
keywords[n_pgsql_params] = "sslpassword";
values[n_pgsql_params] = pageserver_sslpassword;
n_pgsql_params++;
}
if (pageserver_sslrootcert)
{
keywords[n_pgsql_params] = "sslrootcert";
values[n_pgsql_params] = pageserver_sslrootcert;
n_pgsql_params++;
}
if (pageserver_sslsni)
{
keywords[n_pgsql_params] = "sslsni";
values[n_pgsql_params] = pageserver_sslsni;
n_pgsql_params++;
}
if (pageserver_ssl_max_protocol_version)
{
keywords[n_pgsql_params] = "ssl_max_protocol_version";
values[n_pgsql_params] = pageserver_ssl_max_protocol_version;
n_pgsql_params++;
}
if (pageserver_ssl_min_protocol_version)
{
keywords[n_pgsql_params] = "ssl_min_protocol_version";
values[n_pgsql_params] = pageserver_ssl_min_protocol_version;
n_pgsql_params++;
}
{
bool param_set = false;
switch (neon_compute_mode)
@@ -1580,125 +1477,6 @@ pg_init_libpagestore(void)
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslcert",
"SSL certificate path",
"Refer to the Postgres documentation on libpq's sslcert keyword.",
&pageserver_sslcert,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslcertmode",
"SSL certificate mode",
"Refer to the Postgres documentation on libpq's sslcertmode keyword.",
&pageserver_sslcertmode,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslcrl",
"Path to the SSL server certificate revocation list",
"Refer to the Postgres documentation on libpq's sslcrl keyword.",
&pageserver_sslcrl,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslcrldir",
"Path to the directory of the SSL server certificate revocation list",
"Refer to the Postgres documentation on libpq's sslcrldir keyword.",
&pageserver_sslcrldir,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslcompression",
"SSL compression",
"Refer to the Postgres documentation on libpq's sslcompression keyword.",
&pageserver_sslcompression,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslkey",
"SSL key",
"Refer to the Postgres documentation on libpq's sslkey keyword.",
&pageserver_sslkey,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslmode",
"SSL mode",
"Refer to the Postgres documentation on libpq's sslmode keyword.",
&pageserver_sslmode,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
#if PG_MAJORVERSION_NUM >= 17
DefineCustomStringVariable(
"neon.pageserver_sslnegotiation",
"SSL negotiation",
"Refer to the Postgres documentation on libpq's sslnegotiation keyword.",
&pageserver_sslnegotiation,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
#endif
DefineCustomStringVariable(
"neon.pageserver_sslpassword",
"SSL passphrase",
"Refer to the Postgres documentation on libpq's sslpassword keyword.",
&pageserver_sslpassword,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslrootcert",
"SSL root certificate",
"Refer to the Postgres documentation on libpq's sslrootcert keyword.",
&pageserver_sslrootcert,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_sslsni",
"TLS SNI extension",
"Refer to the Postgres documentation on libpq's sslsni keyword.",
&pageserver_sslsni,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_ssl_max_protocol_version",
"SSL maxiumum protocol version",
"Refer to the Postgres documentation on libpq's ssl_max_protocol_version keyword.",
&pageserver_ssl_max_protocol_version,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.pageserver_ssl_min_protocol_version",
"SSL minimum protocol version",
"Refer to the Postgres documentation on libpq's ssl_min_protocol_version keyword.",
&pageserver_ssl_min_protocol_version,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
relsize_hash_init();

View File

@@ -64,22 +64,6 @@ char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
int safekeeper_proto_version = 2;
static char *safekeeper_sslcert = NULL;
static char *safekeeper_sslcertmode = NULL;
static char *safekeeper_sslcompression = NULL;
static char *safekeeper_sslcrl = NULL;
static char *safekeeper_sslcrldir = NULL;
static char *safekeeper_sslkey = NULL;
static char *safekeeper_sslmode = NULL;
static char *safekeeper_sslpassword = NULL;
static char *safekeeper_sslrootcert = NULL;
static char *safekeeper_sslsni = NULL;
static char *safekeeper_ssl_min_protocol_version = NULL;
static char *safekeeper_ssl_max_protocol_version = NULL;
#if PG_MAJORVERSION_NUM >= 17
static char *safekeeper_sslnegotiation = NULL;
#endif
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
@@ -248,125 +232,6 @@ nwp_register_gucs(void)
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslcert",
"SSL certificate path",
"Refer to the Postgres documentation on libpq's sslcert keyword.",
&safekeeper_sslcert,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslcertmode",
"SSL certificate mode",
"Refer to the Postgres documentation on libpq's sslcertmode keyword.",
&safekeeper_sslcertmode,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslcrl",
"Path to the SSL server certificate revocation list",
"Refer to the Postgres documentation on libpq's sslcrl keyword.",
&safekeeper_sslcrl,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslcrldir",
"Path to the directory of the SSL server certificate revocation list",
"Refer to the Postgres documentation on libpq's sslcrldir keyword.",
&safekeeper_sslcrldir,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslcompression",
"SSL compression",
"Refer to the Postgres documentation on libpq's sslcompression keyword.",
&safekeeper_sslcompression,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslkey",
"SSL key",
"Refer to the Postgres documentation on libpq's sslkey keyword.",
&safekeeper_sslkey,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslmode",
"SSL mode",
"Refer to the Postgres documentation on libpq's sslmode keyword.",
&safekeeper_sslmode,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
#if PG_MAJORVERSION_NUM >= 17
DefineCustomStringVariable(
"neon.safekeeper_sslnegotiation",
"SSL negotiation",
"Refer to the Postgres documentation on libpq's sslnegotiation keyword.",
&safekeeper_sslnegotiation,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
#endif
DefineCustomStringVariable(
"neon.safekeeper_sslpassword",
"SSL passphrase",
"Refer to the Postgres documentation on libpq's sslpassword keyword.",
&safekeeper_sslpassword,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslrootcert",
"SSL root certificate",
"Refer to the Postgres documentation on libpq's sslrootcert keyword.",
&safekeeper_sslrootcert,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_sslsni",
"TLS SNI extension",
"Refer to the Postgres documentation on libpq's sslsni keyword.",
&safekeeper_sslsni,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_ssl_max_protocol_version",
"SSL maxiumum protocol version",
"Refer to the Postgres documentation on libpq's ssl_max_protocol_version keyword.",
&safekeeper_ssl_max_protocol_version,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
DefineCustomStringVariable(
"neon.safekeeper_ssl_min_protocol_version",
"SSL minimum protocol version",
"Refer to the Postgres documentation on libpq's ssl_min_protocol_version keyword.",
&safekeeper_ssl_min_protocol_version,
NULL,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
}
@@ -978,13 +843,15 @@ walprop_status(Safekeeper *sk)
WalProposerConn *
libpqwp_connect_start(char *conninfo)
{
PGconn *pg_conn;
WalProposerConn *conn;
const char *keywords[16];
const char *values[16];
const char *keywords[3];
const char *values[3];
int n;
char *password = neon_auth_token;
/*
* Connect using the given connection string. If the NEON_AUTH_TOKEN
* environment variable was set, use that as the password.
@@ -1004,90 +871,9 @@ libpqwp_connect_start(char *conninfo)
keywords[n] = "dbname";
values[n] = conninfo;
n++;
if (safekeeper_sslcert)
{
keywords[n] = "sslcert";
values[n] = safekeeper_sslcert;
n++;
}
if (safekeeper_sslcertmode)
{
keywords[n] = "sslcertmode";
values[n] = safekeeper_sslcertmode;
n++;
}
if (safekeeper_sslcompression)
{
keywords[n] = "sslcompression";
values[n] = safekeeper_sslcompression;
n++;
}
if (safekeeper_sslcrl)
{
keywords[n] = "sslcrl";
values[n] = safekeeper_sslcrl;
n++;
}
if (safekeeper_sslcrldir)
{
keywords[n] = "sslcrldir";
values[n] = safekeeper_sslcrldir;
n++;
}
if (safekeeper_sslkey)
{
keywords[n] = "sslkey";
values[n] = safekeeper_sslkey;
n++;
}
if (safekeeper_sslmode)
{
keywords[n] = "sslmode";
values[n] = safekeeper_sslmode;
n++;
}
#if PG_MAJORVERSION_NUM >= 17
if (safekeeper_sslnegotiation)
{
keywords[n] = "sslnegotiation";
values[n] = safekeeper_sslnegotiation;
n++;
}
#endif
if (safekeeper_sslpassword)
{
keywords[n] = "sslpassword";
values[n] = safekeeper_sslpassword;
n++;
}
if (safekeeper_sslrootcert)
{
keywords[n] = "sslrootcert";
values[n] = safekeeper_sslrootcert;
n++;
}
if (safekeeper_sslsni)
{
keywords[n] = "sslsni";
values[n] = safekeeper_sslsni;
n++;
}
if (safekeeper_ssl_max_protocol_version)
{
keywords[n] = "ssl_max_protocol_version";
values[n] = safekeeper_ssl_max_protocol_version;
n++;
}
if (safekeeper_ssl_min_protocol_version)
{
keywords[n] = "ssl_min_protocol_version";
values[n] = safekeeper_ssl_min_protocol_version;
n++;
}
keywords[n] = NULL;
values[n] = NULL;
n++;
pg_conn = PQconnectStartParams(keywords, values, 1);
/*

View File

@@ -12,7 +12,7 @@ use pin_project_lite::pin_project;
use smol_str::SmolStr;
use strum_macros::FromRepr;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use zerocopy::{FromBytes, FromZeroes};
use zerocopy::{FromBytes, Immutable, KnownLayout, Unaligned, network_endian};
pin_project! {
/// A chained [`AsyncRead`] with [`AsyncWrite`] passthrough
@@ -339,49 +339,49 @@ trait BufExt: Sized {
}
impl BufExt for BytesMut {
fn try_get<T: FromBytes>(&mut self) -> Option<T> {
let res = T::read_from_prefix(self)?;
let (res, _) = T::read_from_prefix(self).ok()?;
self.advance(size_of::<T>());
Some(res)
}
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct ProxyProtocolV2Header {
signature: [u8; 12],
version_and_command: u8,
protocol_and_family: u8,
len: zerocopy::byteorder::network_endian::U16,
len: network_endian::U16,
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct ProxyProtocolV2HeaderV4 {
src_addr: NetworkEndianIpv4,
dst_addr: NetworkEndianIpv4,
src_port: zerocopy::byteorder::network_endian::U16,
dst_port: zerocopy::byteorder::network_endian::U16,
src_port: network_endian::U16,
dst_port: network_endian::U16,
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct ProxyProtocolV2HeaderV6 {
src_addr: NetworkEndianIpv6,
dst_addr: NetworkEndianIpv6,
src_port: zerocopy::byteorder::network_endian::U16,
dst_port: zerocopy::byteorder::network_endian::U16,
src_port: network_endian::U16,
dst_port: network_endian::U16,
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
struct TlvHeader {
kind: u8,
len: zerocopy::byteorder::network_endian::U16,
len: network_endian::U16,
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(transparent)]
struct NetworkEndianIpv4(zerocopy::byteorder::network_endian::U32);
struct NetworkEndianIpv4(network_endian::U32);
impl NetworkEndianIpv4 {
#[inline]
fn get(self) -> Ipv4Addr {
@@ -389,9 +389,9 @@ impl NetworkEndianIpv4 {
}
}
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(transparent)]
struct NetworkEndianIpv6(zerocopy::byteorder::network_endian::U128);
struct NetworkEndianIpv6(network_endian::U128);
impl NetworkEndianIpv6 {
#[inline]
fn get(self) -> Ipv6Addr {

View File

@@ -226,11 +226,16 @@ struct Args {
/// Path to the JWT auth token used to authenticate with other safekeepers.
#[arg(long)]
auth_token_path: Option<Utf8PathBuf>,
/// Enable TLS in WAL service API.
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
#[arg(long)]
enable_tls_wal_service_api: bool,
/// Run in development mode (disables security checks)
#[arg(long, help = "Run in development mode (disables security checks)")]
dev: bool,
}
// Like PathBufValueParser, but allows empty string.

View File

@@ -0,0 +1 @@
DROP TABLE timeline_imports;

View File

@@ -0,0 +1,6 @@
CREATE TABLE timeline_imports (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
shard_statuses JSONB NOT NULL,
PRIMARY KEY(tenant_id, timeline_id)
);

View File

@@ -30,7 +30,9 @@ use pageserver_api::models::{
TimelineArchivalConfigRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
use pageserver_api::upcall_api::{
PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest,
};
use pageserver_client::{BlockUnblock, mgmt_api};
use routerify::Middleware;
use tokio_util::sync::CancellationToken;
@@ -154,6 +156,28 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
}
async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
.handle_timeline_shard_import_progress_upcall(put_req)
.await?,
)
}
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).
@@ -1961,6 +1985,13 @@ pub fn make_router(
.post("/upcall/v1/validate", |r| {
named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
})
.post("/upcall/v1/timeline_import_status", |r| {
named_request_span(
r,
handle_put_timeline_import_status,
RequestName("upcall_v1_timeline_import_status"),
)
})
// Test/dev/debug endpoints
.post("/debug/v1/attach-hook", |r| {
named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))

View File

@@ -23,6 +23,7 @@ mod scheduler;
mod schema;
pub mod service;
mod tenant_shard;
mod timeline_import;
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
struct Sequence(u64);

View File

@@ -212,6 +212,21 @@ impl PageserverClient {
)
}
pub(crate) async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<TimelineInfo> {
measured_request!(
"timeline_detail",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner
.timeline_detail(tenant_shard_id, timeline_id)
.await
)
}
pub(crate) async fn tenant_shard_split(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -22,7 +22,7 @@ use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
};
use pageserver_api::models::TenantConfig;
use pageserver_api::models::{ShardImportStatus, TenantConfig};
use pageserver_api::shard::{
ShardConfigError, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
@@ -40,6 +40,9 @@ use crate::metrics::{
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
};
use crate::node::Node;
use crate::timeline_import::{
TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp,
};
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// ## What do we store?
@@ -127,6 +130,9 @@ pub(crate) enum DatabaseOperation {
RemoveTimelineReconcile,
ListTimelineReconcile,
ListTimelineReconcileStartup,
InsertTimelineImport,
UpdateTimelineImport,
DeleteTimelineImport,
}
#[must_use]
@@ -1614,6 +1620,129 @@ impl Persistence {
Ok(())
}
pub(crate) async fn insert_timeline_import(
&self,
import: TimelineImportPersistence,
) -> DatabaseResult<bool> {
self.with_measured_conn(DatabaseOperation::InsertTimelineImport, move |conn| {
Box::pin({
let import = import.clone();
async move {
let inserted = diesel::insert_into(crate::schema::timeline_imports::table)
.values(import)
.execute(conn)
.await?;
Ok(inserted == 1)
}
})
})
.await
}
pub(crate) async fn delete_timeline_import(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::DeleteTimelineImport, move |conn| {
Box::pin(async move {
diesel::delete(crate::schema::timeline_imports::table)
.filter(
dsl::tenant_id
.eq(tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.execute(conn)
.await?;
Ok(())
})
})
.await
}
/// Idempotently update the status of one shard for an ongoing timeline import
///
/// If the update was persisted to the database, then the current state of the
/// import is returned to the caller. In case of logical errors a bespoke
/// [`TimelineImportUpdateError`] instance is returned. Other database errors
/// are covered by the outer [`DatabaseError`].
pub(crate) async fn update_timeline_import(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
shard_status: ShardImportStatus,
) -> DatabaseResult<Result<Option<TimelineImport>, TimelineImportUpdateError>> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::UpdateTimelineImport, move |conn| {
Box::pin({
let shard_status = shard_status.clone();
async move {
// Load the current state from the database
let mut from_db: Vec<TimelineImportPersistence> = dsl::timeline_imports
.filter(
dsl::tenant_id
.eq(tenant_shard_id.tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.load(conn)
.await?;
assert!(from_db.len() <= 1);
let mut status = match from_db.pop() {
Some(some) => TimelineImport::from_persistent(some).unwrap(),
None => {
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
tenant_id: tenant_shard_id.tenant_id,
timeline_id,
}));
}
};
// Perform the update in-memory
let follow_up = match status.update(tenant_shard_id.to_index(), shard_status) {
Ok(ok) => ok,
Err(err) => {
return Ok(Err(err));
}
};
let new_persistent = status.to_persistent();
// Write back if required (in the same transaction)
match follow_up {
TimelineImportUpdateFollowUp::Persist => {
let updated = diesel::update(dsl::timeline_imports)
.filter(
dsl::tenant_id
.eq(tenant_shard_id.tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.set(dsl::shard_statuses.eq(new_persistent.shard_statuses))
.execute(conn)
.await?;
if updated != 1 {
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
tenant_id: tenant_shard_id.tenant_id,
timeline_id,
}));
}
Ok(Ok(Some(status)))
}
TimelineImportUpdateFollowUp::None => Ok(Ok(None)),
}
}
})
})
.await
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -2171,3 +2300,11 @@ impl ToSql<diesel::sql_types::VarChar, Pg> for SafekeeperTimelineOpKind {
.map_err(Into::into)
}
}
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Clone)]
#[diesel(table_name = crate::schema::timeline_imports)]
pub(crate) struct TimelineImportPersistence {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) shard_statuses: serde_json::Value,
}

View File

@@ -76,6 +76,14 @@ diesel::table! {
}
}
diesel::table! {
timeline_imports (tenant_id, timeline_id) {
tenant_id -> Varchar,
timeline_id -> Varchar,
shard_statuses -> Jsonb,
}
}
diesel::table! {
use diesel::sql_types::*;
use super::sql_types::PgLsn;
@@ -99,5 +107,6 @@ diesel::allow_tables_to_appear_in_same_query!(
safekeeper_timeline_pending_ops,
safekeepers,
tenant_shards,
timeline_imports,
timelines,
);

View File

@@ -40,14 +40,14 @@ use pageserver_api::models::{
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest,
};
use pageserver_api::shard::{
DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
use pageserver_api::upcall_api::{
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
ValidateResponseTenant,
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
ValidateRequest, ValidateResponse, ValidateResponseTenant,
};
use pageserver_client::{BlockUnblock, mgmt_api};
use reqwest::{Certificate, StatusCode};
@@ -97,6 +97,7 @@ use crate::tenant_shard::{
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -3732,11 +3733,14 @@ impl Service {
create_req: TimelineCreateRequest,
) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = self.config.timelines_onto_safekeepers;
let timeline_id = create_req.new_timeline_id;
tracing::info!(
mode=%create_req.mode_tag(),
%safekeepers,
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
timeline_id,
);
let _tenant_lock = trace_shared_lock(
@@ -3746,15 +3750,62 @@ impl Service {
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let create_mode = create_req.mode.clone();
let is_import = create_req.is_import();
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
let safekeepers = if safekeepers {
let selected_safekeepers = if is_import {
let shards = {
let locked = self.inner.read().unwrap();
locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.map(|(ts_id, _)| ts_id.to_index())
.collect::<Vec<_>>()
};
if !shards
.iter()
.map(|shard_index| shard_index.shard_count)
.all_equal()
{
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Inconsistent shard count"
)));
}
let import = TimelineImport {
tenant_id,
timeline_id,
shard_statuses: ShardImportStatuses::new(shards),
};
let inserted = self
.persistence
.insert_timeline_import(import.to_persistent())
.await
.context("timeline import insert")
.map_err(ApiError::InternalServerError)?;
match inserted {
true => {
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
}
false => {
tracing::info!(%tenant_id, %timeline_id, "Timeline import entry already present");
}
}
None
} else if safekeepers {
// Note that we do not support creating the timeline on the safekeepers
// for imported timelines. The `start_lsn` of the timeline is not known
// until the import finshes.
// https://github.com/neondatabase/neon/issues/11569
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
.await?;
Some(res)
@@ -3764,10 +3815,168 @@ impl Service {
Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers,
safekeepers: selected_safekeepers,
})
}
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
self: &Arc<Self>,
req: PutTimelineImportStatusRequest,
) -> Result<(), ApiError> {
let res = self
.persistence
.update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status)
.await;
let timeline_import = match res {
Ok(Ok(Some(timeline_import))) => timeline_import,
Ok(Ok(None)) => {
// Idempotency: we've already seen and handled this update.
return Ok(());
}
Ok(Err(logical_err)) => {
return Err(logical_err.into());
}
Err(db_err) => {
return Err(db_err.into());
}
};
tracing::info!(
tenant_id=%req.tenant_shard_id.tenant_id,
timeline_id=%req.timeline_id,
shard_id=%req.tenant_shard_id.shard_slug(),
"Updated timeline import status to: {timeline_import:?}");
if timeline_import.is_complete() {
tokio::task::spawn({
let this = self.clone();
async move { this.finalize_timeline_import(timeline_import).await }
});
}
Ok(())
}
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
shard_id=%import.timeline_id,
))]
async fn finalize_timeline_import(
self: &Arc<Self>,
import: TimelineImport,
) -> anyhow::Result<()> {
// TODO(vlad): On start-up, load up the imports and notify cplane of the
// ones that have been completed. This assumes the new cplane API will
// be idempotent. If that's not possible, bang a flag in the database.
// https://github.com/neondatabase/neon/issues/11570
tracing::info!("Finalizing timeline import");
let import_failed = import.completion_error().is_some();
if !import_failed {
loop {
if self.cancel.is_cancelled() {
anyhow::bail!("Shut down requested while finalizing import");
}
let active = self.timeline_active_on_all_shards(&import).await?;
match active {
true => {
tracing::info!("Timeline became active on all shards");
break;
}
false => {
tracing::info!("Timeline not active on all shards yet");
tokio::select! {
_ = self.cancel.cancelled() => {
anyhow::bail!("Shut down requested while finalizing import");
},
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
};
}
}
}
}
tracing::info!(%import_failed, "Notifying cplane of import completion");
let client = UpcallClient::new(self.get_config(), self.cancel.child_token());
client.notify_import_complete(&import).await?;
if let Err(err) = self
.persistence
.delete_timeline_import(import.tenant_id, import.timeline_id)
.await
{
tracing::warn!("Failed to delete timeline import entry from database: {err}");
}
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
tracing::info!(%import_failed, "Timeline import complete");
Ok(())
}
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
) -> anyhow::Result<bool> {
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in locked
.tenants
.range(TenantShardId::tenant_range(import.tenant_id))
{
if !import
.shard_statuses
.0
.contains_key(&tenant_shard_id.to_index())
{
anyhow::bail!("Shard layout change detected on completion");
}
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
} else {
return Ok(false);
}
}
targets
};
let results = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.timeline_detail(tenant_shard_id, import.timeline_id)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
Ok(results.into_iter().all(|res| match res {
Ok(info) => info.state == TimelineState::Active,
Err(_) => false,
}))
}
pub(crate) async fn tenant_timeline_archival_config(
&self,
tenant_id: TenantId,

View File

@@ -15,7 +15,7 @@ use http_utils::error::ApiError;
use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -207,7 +207,6 @@ impl Service {
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000;
@@ -217,15 +216,8 @@ impl Service {
// previously existed as on retries in theory endpoint might have
// already written some data and advanced last_record_lsn, while we want
// safekeepers to have consistent start_lsn.
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};
let start_lsn = timeline_info.last_record_lsn;
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();

View File

@@ -0,0 +1,260 @@
use std::time::Duration;
use std::{collections::HashMap, str::FromStr};
use http_utils::error::ApiError;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use pageserver_api::models::ShardImportStatus;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
shard::ShardIndex,
};
use crate::{persistence::TimelineImportPersistence, service::Config};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
impl ShardImportStatuses {
pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
ShardImportStatuses(
shards
.into_iter()
.map(|ts_id| (ts_id, ShardImportStatus::InProgress))
.collect(),
)
}
}
#[derive(Debug)]
pub(crate) struct TimelineImport {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
pub(crate) shard_statuses: ShardImportStatuses,
}
pub(crate) enum TimelineImportUpdateFollowUp {
Persist,
None,
}
pub(crate) enum TimelineImportUpdateError {
ImportNotFound {
tenant_id: TenantId,
timeline_id: TimelineId,
},
MismatchedShards,
UnexpectedUpdate,
}
impl From<TimelineImportUpdateError> for ApiError {
fn from(err: TimelineImportUpdateError) -> ApiError {
match err {
TimelineImportUpdateError::ImportNotFound {
tenant_id,
timeline_id,
} => ApiError::NotFound(
anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
),
TimelineImportUpdateError::MismatchedShards => {
ApiError::InternalServerError(anyhow::anyhow!(
"Import shards do not match update request, likely a shard split happened during import, this is a bug"
))
}
TimelineImportUpdateError::UnexpectedUpdate => {
ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
}
}
}
}
impl TimelineImport {
pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
Ok(TimelineImport {
tenant_id,
timeline_id,
shard_statuses,
})
}
pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
TimelineImportPersistence {
tenant_id: self.tenant_id.to_string(),
timeline_id: self.timeline_id.to_string(),
shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
}
}
pub(crate) fn update(
&mut self,
shard: ShardIndex,
status: ShardImportStatus,
) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
use std::collections::hash_map::Entry::*;
match self.shard_statuses.0.entry(shard) {
Occupied(mut occ) => {
let crnt = occ.get_mut();
if *crnt == status {
Ok(TimelineImportUpdateFollowUp::None)
} else if crnt.is_terminal() && !status.is_terminal() {
Err(TimelineImportUpdateError::UnexpectedUpdate)
} else {
*crnt = status;
Ok(TimelineImportUpdateFollowUp::Persist)
}
}
Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
}
}
pub(crate) fn is_complete(&self) -> bool {
self.shard_statuses
.0
.values()
.all(|status| status.is_terminal())
}
pub(crate) fn completion_error(&self) -> Option<String> {
assert!(self.is_complete());
let shard_errors: HashMap<_, _> = self
.shard_statuses
.0
.iter()
.filter_map(|(shard, status)| {
if let ShardImportStatus::Error(err) = status {
Some((*shard, err.clone()))
} else {
None
}
})
.collect();
if shard_errors.is_empty() {
None
} else {
Some(serde_json::to_string(&shard_errors).unwrap())
}
}
}
pub(crate) struct UpcallClient {
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
base_url: String,
}
const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Serialize, Deserialize, Debug)]
struct ImportCompleteRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
error: Option<String>,
}
impl UpcallClient {
pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {}", jwt));
let client = reqwest::ClientBuilder::new()
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
.build()
.expect("Failed to construct HTTP client");
let base_url = config
.control_plane_url
.clone()
.expect("must be configured");
Self {
authorization_header,
client,
cancel,
base_url,
}
}
/// Notify control plane of a completed import
///
/// This method guarantees at least once delivery semantics assuming
/// eventual cplane availability. The cplane API is idempotent.
pub(crate) async fn notify_import_complete(
&self,
import: &TimelineImport,
) -> anyhow::Result<()> {
let endpoint = if self.base_url.ends_with('/') {
format!("{}import_complete", self.base_url)
} else {
format!("{}/import_complete", self.base_url)
};
tracing::info!("Endpoint is {endpoint}");
let request = self
.client
.request(Method::PUT, endpoint)
.json(&ImportCompleteRequest {
tenant_id: import.tenant_id,
timeline_id: import.timeline_id,
error: import.completion_error(),
})
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
let request = if let Some(auth) = &self.authorization_header {
request.header(reqwest::header::AUTHORIZATION, auth)
} else {
request
};
const RETRY_DELAY: Duration = Duration::from_secs(1);
let mut attempt = 1;
loop {
if self.cancel.is_cancelled() {
return Err(anyhow::anyhow!(
"Shutting down while notifying cplane of import completion"
));
}
match request.try_clone().unwrap().send().await {
Ok(response) if response.status().is_success() => {
return Ok(());
}
Ok(response) => {
tracing::warn!(
"Import complete notification failed with status {}, attempt {}",
response.status(),
attempt
);
}
Err(e) => {
tracing::warn!(
"Import complete notification failed with error: {}, attempt {}",
e,
attempt
);
}
}
tokio::select! {
_ = tokio::time::sleep(RETRY_DELAY) => {}
_ = self.cancel.cancelled() => {
return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
}
}
attempt += 1;
}
}
}

View File

@@ -5,8 +5,6 @@ edition = "2024"
license.workspace = true
[dependencies]
aws-config.workspace = true
aws-sdk-s3.workspace = true
either.workspace = true
anyhow.workspace = true
hex.workspace = true

View File

@@ -12,14 +12,9 @@ pub mod tenant_snapshot;
use std::env;
use std::fmt::Display;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use anyhow::Context;
use aws_config::retry::{RetryConfigBuilder, RetryMode};
use aws_sdk_s3::Client;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use futures::{Stream, StreamExt};
@@ -28,7 +23,7 @@ use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_time
use pageserver_api::shard::TenantShardId;
use remote_storage::{
DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
RemoteStorageKind, S3Config,
RemoteStorageKind, VersionId,
};
use reqwest::Url;
use serde::{Deserialize, Serialize};
@@ -351,21 +346,6 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
}
}
async fn init_s3_client(bucket_region: Region) -> Client {
let mut retry_config_builder = RetryConfigBuilder::new();
retry_config_builder
.set_max_attempts(Some(3))
.set_mode(Some(RetryMode::Adaptive));
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
.region(bucket_region)
.retry_config(retry_config_builder.build())
.load()
.await;
Client::new(&config)
}
fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
match node_kind {
NodeKind::Pageserver => "pageserver/v1/",
@@ -385,23 +365,6 @@ fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeK
}
}
async fn init_remote_s3(
bucket_config: S3Config,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
let bucket_region = Region::new(bucket_config.bucket_region);
let s3_client = Arc::new(init_s3_client(bucket_region).await);
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
let s3_root = make_root_target(
bucket_config.bucket_name,
bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
node_kind,
);
Ok((s3_client, s3_root))
}
async fn init_remote(
mut storage_config: BucketConfig,
node_kind: NodeKind,
@@ -499,7 +462,7 @@ async fn list_objects_with_retries(
remote_client.bucket_name().unwrap_or_default(),
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
e,
);
let backoff_time = 1 << trial.min(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
@@ -549,14 +512,18 @@ async fn download_object_with_retries(
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
async fn download_object_to_file_s3(
s3_client: &Client,
bucket_name: &str,
key: &str,
version_id: Option<&str>,
async fn download_object_to_file(
remote_storage: &GenericRemoteStorage,
key: &RemotePath,
version_id: Option<VersionId>,
local_path: &Utf8Path,
) -> anyhow::Result<()> {
let opts = DownloadOpts {
version_id: version_id.clone(),
..Default::default()
};
let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
let cancel = CancellationToken::new();
for _ in 0..MAX_RETRIES {
tokio::fs::remove_file(&tmp_path)
.await
@@ -566,28 +533,24 @@ async fn download_object_to_file_s3(
.await
.context("Opening output file")?;
let request = s3_client.get_object().bucket(bucket_name).key(key);
let res = remote_storage.download(key, &opts, &cancel).await;
let request = match version_id {
Some(version_id) => request.version_id(version_id),
None => request,
};
let response_stream = match request.send().await {
let download = match res {
Ok(response) => response,
Err(e) => {
error!(
"Failed to download object for key {key} version {}: {e:#}",
version_id.unwrap_or("")
"Failed to download object for key {key} version {:?}: {e:#}",
&version_id.as_ref().unwrap_or(&VersionId(String::new()))
);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let mut read_stream = response_stream.body.into_async_read();
//response_stream.download_stream
tokio::io::copy(&mut read_stream, &mut file).await?;
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
tokio::io::copy(&mut body, &mut file).await?;
tokio::fs::rename(&tmp_path, local_path).await?;
return Ok(());

View File

@@ -1,31 +1,30 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Context;
use async_stream::stream;
use aws_sdk_s3::Client;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::IndexPart;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::storage_layer::LayerName;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, S3Config};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::TenantId;
use crate::checks::{BlobDataParseResult, RemoteTimelineBlobData, list_timeline_blobs};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file_s3,
init_remote, init_remote_s3,
BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file, init_remote,
};
pub struct SnapshotDownloader {
s3_client: Arc<Client>,
s3_root: RootTarget,
remote_client: GenericRemoteStorage,
#[allow(dead_code)]
target: RootTarget,
bucket_config: BucketConfig,
bucket_config_s3: S3Config,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
@@ -38,17 +37,13 @@ impl SnapshotDownloader {
output_path: Utf8PathBuf,
concurrency: usize,
) -> anyhow::Result<Self> {
let bucket_config_s3 = match &bucket_config.0.storage {
remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
_ => panic!("only S3 configuration is supported for snapshot downloading"),
};
let (s3_client, s3_root) =
init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
let (remote_client, target) =
init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
Ok(Self {
s3_client,
s3_root,
remote_client,
target,
bucket_config,
bucket_config_s3,
tenant_id,
output_path,
concurrency,
@@ -61,6 +56,7 @@ impl SnapshotDownloader {
layer_name: LayerName,
layer_metadata: LayerFileMetadata,
) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
let cancel = CancellationToken::new();
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
// different layer names (remote-style has the generation suffix)
let local_path = self.output_path.join(format!(
@@ -82,30 +78,27 @@ impl SnapshotDownloader {
} else {
tracing::debug!("{} requires download...", local_path);
let timeline_root = self.s3_root.timeline_root(&ttid);
let remote_layer_path = format!(
"{}{}{}",
timeline_root.prefix_in_bucket,
layer_name,
layer_metadata.generation.get_suffix()
let remote_path = remote_layer_path(
&ttid.tenant_shard_id.tenant_id,
&ttid.timeline_id,
layer_metadata.shard,
&layer_name,
layer_metadata.generation,
);
let mode = remote_storage::ListingMode::NoDelimiter;
// List versions: the object might be deleted.
let versions = self
.s3_client
.list_object_versions()
.bucket(self.bucket_config_s3.bucket_name.clone())
.prefix(&remote_layer_path)
.send()
.remote_client
.list_versions(Some(&remote_path), mode, None, &cancel)
.await?;
let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
let Some(version) = versions.versions.first() else {
return Err(anyhow::anyhow!("No versions found for {remote_path}"));
};
download_object_to_file_s3(
&self.s3_client,
&self.bucket_config_s3.bucket_name,
&remote_layer_path,
version.version_id.as_deref(),
download_object_to_file(
&self.remote_client,
&remote_path,
version.version_id().cloned(),
&local_path,
)
.await?;

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fixtures.metrics import parse_metrics
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
def test_compute_monitor(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can detect Postgres going down (unresponsive) and
reconnect when it comes back online. Also check that the downtime metrics
are properly emitted.
"""
TEST_DB = "test_compute_monitor"
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
# Check that default postgres database is present
with endpoint.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datname = 'postgres'")
catalog_db = cursor.fetchone()
assert catalog_db is not None
assert len(catalog_db) == 1
# Create a new database
cursor.execute(f"CREATE DATABASE {TEST_DB}")
# Drop database 'postgres'
with endpoint.cursor(dbname=TEST_DB) as cursor:
# Use FORCE to terminate all connections to the database
cursor.execute("DROP DATABASE postgres WITH (FORCE)")
client = endpoint.http_client()
def check_metrics_down():
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
assert len(compute_pg_current_downtime_ms) == 1
assert compute_pg_current_downtime_ms[0].value > 0
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
assert len(compute_pg_downtime_ms_total) == 1
assert compute_pg_downtime_ms_total[0].value > 0
wait_until(check_metrics_down)
# Recreate postgres database
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute("CREATE DATABASE postgres")
# Current downtime should reset to 0, but not total downtime
def check_metrics_up():
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
assert len(compute_pg_current_downtime_ms) == 1
assert compute_pg_current_downtime_ms[0].value == 0
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
assert len(compute_pg_downtime_ms_total) == 1
assert compute_pg_downtime_ms_total[0].value > 0
wait_until(check_metrics_up)
# Just a sanity check that we log the downtime info
endpoint.log_contains("downtime_info")

View File

@@ -1,9 +1,9 @@
import base64
import json
import re
import time
from enum import Enum
from pathlib import Path
from threading import Event
import psycopg2
import psycopg2.errors
@@ -14,12 +14,11 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
PageserverApiException,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server, RemoteStorageKind
from fixtures.utils import shared_buffers_for_max_cu
from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client
@@ -44,6 +43,7 @@ smoke_params = [
]
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke(
vanilla_pg: VanillaPostgres,
@@ -56,24 +56,29 @@ def test_pgdata_import_smoke(
#
# Setup fake control plane for import progress
#
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane request: {request.json}")
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
env = neon_env_builder.init_start()
# The test needs LocalFs support, which is only built in testing mode.
env.pageserver.is_testing_enabled_or_skip()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api"
}
)
env.pageserver.stop()
env.pageserver.start()
@@ -193,40 +198,11 @@ def test_pgdata_import_smoke(
)
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
while True:
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
def cplane_notified():
assert import_completion_signaled.is_set()
shard_status_file = statusdir / f"shard-{shard_id.shard_index}"
if state == "Active":
shard_status_file_contents = (
shard_status_file.read_text()
) # Active state implies import is done
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(1)
# Generous timeout for the MULTIPLE_RELATION_SEGMENTS test variants
wait_until(cplane_notified, timeout=90)
import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s")
@@ -372,19 +348,27 @@ def test_fast_import_with_pageserver_ingest(
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
# Setup pageserver and fake cplane for import progress
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane request: {request.json}")
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api",
# because import_pgdata code uses this endpoint, not the one in common remote storage config
# TODO: maybe use common remote_storage config in pageserver?
"import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(),
@@ -476,42 +460,10 @@ def test_fast_import_with_pageserver_ingest(
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
validate_vanilla_equivalence(conn)
# Poll pageserver statuses in s3
while True:
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
log.info(f"timeline {tenant_id}/{timeline_id} detail: {detail}")
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
def cplane_notified():
assert import_completion_signaled.is_set()
if state == "Active":
key = f"{key_prefix}/status/shard-{shard_id.shard_index}"
shard_status_file_contents = (
mock_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
)
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(0.5)
wait_until(cplane_notified, timeout=60)
import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s")

View File

@@ -77,6 +77,8 @@ regex-automata = { version = "0.4", default-features = false, features = ["dfa-o
regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
rustls-pki-types = { version = "1", features = ["std"] }
rustls-webpki = { version = "0.102", default-features = false, features = ["ring", "std"] }
scopeguard = { version = "1" }
sec1 = { version = "0.7", features = ["pem", "serde", "std", "subtle"] }
serde = { version = "1", features = ["alloc", "derive"] }
@@ -103,7 +105,6 @@ tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4", "v7"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zeroize = { version = "1", features = ["derive", "serde"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
@@ -146,7 +147,6 @@ serde = { version = "1", features = ["alloc", "derive"] }
syn = { version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
toml_edit = { version = "0.22", features = ["serde"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }