mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 20:20:38 +00:00
Compare commits
13 Commits
tristan957
...
jc/verify-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
313e01098a | ||
|
|
5e989a3148 | ||
|
|
985056be37 | ||
|
|
9c6ff3aa2b | ||
|
|
9d472c79ce | ||
|
|
b43203928f | ||
|
|
c35d489539 | ||
|
|
3a50d95b6d | ||
|
|
d43b8e73ae | ||
|
|
1808dad269 | ||
|
|
7ba8519b43 | ||
|
|
f8100d66d5 | ||
|
|
51cdb570eb |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -1238,7 +1238,7 @@ jobs:
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
TIMEOUT=1800 # 30 minutes, usually it takes ~2-3 minutes, but if runners are busy, it might take longer
|
||||
TIMEOUT=5400 # 90 minutes, usually it takes ~2-3 minutes, but if runners are busy, it might take longer
|
||||
INTERVAL=15 # try each N seconds
|
||||
|
||||
last_status="" # a variable to carry the last status of the "build-and-upload-extensions" context
|
||||
|
||||
78
Cargo.lock
generated
78
Cargo.lock
generated
@@ -40,7 +40,7 @@ dependencies = [
|
||||
"getrandom 0.2.11",
|
||||
"once_cell",
|
||||
"version_check",
|
||||
"zerocopy",
|
||||
"zerocopy 0.7.31",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4415,9 +4415,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "papaya"
|
||||
version = "0.2.0"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aab21828b6b5952fdadd6c377728ffae53ec3a21b2febc47319ab65741f7e2fd"
|
||||
checksum = "6827e3fc394523c21d4464d02c0bb1c19966ea4a58a9844ad6d746214179d2bc"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"seize",
|
||||
@@ -5204,7 +5204,7 @@ dependencies = [
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
"x509-cert",
|
||||
"zerocopy",
|
||||
"zerocopy 0.8.24",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5594,7 +5594,7 @@ dependencies = [
|
||||
"wasm-bindgen-futures",
|
||||
"wasm-streams",
|
||||
"web-sys",
|
||||
"webpki-roots 0.26.1",
|
||||
"webpki-roots",
|
||||
"winreg",
|
||||
]
|
||||
|
||||
@@ -6195,13 +6195,13 @@ checksum = "224e328af6e080cddbab3c770b1cf50f0351ba0577091ef2410c3951d835ff87"
|
||||
|
||||
[[package]]
|
||||
name = "sentry"
|
||||
version = "0.32.3"
|
||||
version = "0.37.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "00421ed8fa0c995f07cde48ba6c89e80f2b312f74ff637326f392fbfd23abe02"
|
||||
checksum = "255914a8e53822abd946e2ce8baa41d4cded6b8e938913b7f7b9da5b7ab44335"
|
||||
dependencies = [
|
||||
"httpdate",
|
||||
"reqwest",
|
||||
"rustls 0.21.12",
|
||||
"rustls 0.23.18",
|
||||
"sentry-backtrace",
|
||||
"sentry-contexts",
|
||||
"sentry-core",
|
||||
@@ -6209,14 +6209,14 @@ dependencies = [
|
||||
"sentry-tracing",
|
||||
"tokio",
|
||||
"ureq",
|
||||
"webpki-roots 0.25.2",
|
||||
"webpki-roots",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sentry-backtrace"
|
||||
version = "0.32.3"
|
||||
version = "0.37.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a79194074f34b0cbe5dd33896e5928bbc6ab63a889bd9df2264af5acb186921e"
|
||||
checksum = "00293cd332a859961f24fd69258f7e92af736feaeb91020cff84dac4188a4302"
|
||||
dependencies = [
|
||||
"backtrace",
|
||||
"once_cell",
|
||||
@@ -6226,9 +6226,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sentry-contexts"
|
||||
version = "0.32.3"
|
||||
version = "0.37.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eba8870c5dba2bfd9db25c75574a11429f6b95957b0a78ac02e2970dd7a5249a"
|
||||
checksum = "961990f9caa76476c481de130ada05614cd7f5aa70fb57c2142f0e09ad3fb2aa"
|
||||
dependencies = [
|
||||
"hostname",
|
||||
"libc",
|
||||
@@ -6240,9 +6240,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sentry-core"
|
||||
version = "0.32.3"
|
||||
version = "0.37.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46a75011ea1c0d5c46e9e57df03ce81f5c7f0a9e199086334a1f9c0a541e0826"
|
||||
checksum = "1a6409d845707d82415c800290a5d63be5e3df3c2e417b0997c60531dfbd35ef"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"rand 0.8.5",
|
||||
@@ -6253,9 +6253,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sentry-panic"
|
||||
version = "0.32.3"
|
||||
version = "0.37.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2eaa3ecfa3c8750c78dcfd4637cfa2598b95b52897ed184b4dc77fcf7d95060d"
|
||||
checksum = "609b1a12340495ce17baeec9e08ff8ed423c337c1a84dffae36a178c783623f3"
|
||||
dependencies = [
|
||||
"sentry-backtrace",
|
||||
"sentry-core",
|
||||
@@ -6263,9 +6263,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sentry-tracing"
|
||||
version = "0.32.3"
|
||||
version = "0.37.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f715932bf369a61b7256687c6f0554141b7ce097287e30e3f7ed6e9de82498fe"
|
||||
checksum = "49f4e86402d5c50239dc7d8fd3f6d5e048221d5fcb4e026d8d50ab57fe4644cb"
|
||||
dependencies = [
|
||||
"sentry-backtrace",
|
||||
"sentry-core",
|
||||
@@ -6275,9 +6275,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sentry-types"
|
||||
version = "0.32.3"
|
||||
version = "0.37.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4519c900ce734f7a0eb7aba0869dfb225a7af8820634a7dd51449e3b093cfb7c"
|
||||
checksum = "3d3f117b8755dbede8260952de2aeb029e20f432e72634e8969af34324591631"
|
||||
dependencies = [
|
||||
"debugid",
|
||||
"hex",
|
||||
@@ -6711,8 +6711,6 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
"aws-config",
|
||||
"aws-sdk-s3",
|
||||
"camino",
|
||||
"chrono",
|
||||
"clap",
|
||||
@@ -7801,7 +7799,7 @@ dependencies = [
|
||||
"rustls 0.23.18",
|
||||
"rustls-pki-types",
|
||||
"url",
|
||||
"webpki-roots 0.26.1",
|
||||
"webpki-roots",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -8169,12 +8167,6 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "webpki-roots"
|
||||
version = "0.25.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
|
||||
|
||||
[[package]]
|
||||
name = "webpki-roots"
|
||||
version = "0.26.1"
|
||||
@@ -8482,6 +8474,8 @@ dependencies = [
|
||||
"regex-syntax 0.8.2",
|
||||
"reqwest",
|
||||
"rustls 0.23.18",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki 0.102.8",
|
||||
"scopeguard",
|
||||
"sec1 0.7.3",
|
||||
"serde",
|
||||
@@ -8510,7 +8504,6 @@ dependencies = [
|
||||
"tracing-log",
|
||||
"url",
|
||||
"uuid",
|
||||
"zerocopy",
|
||||
"zeroize",
|
||||
"zstd",
|
||||
"zstd-safe",
|
||||
@@ -8614,8 +8607,16 @@ version = "0.7.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"zerocopy-derive",
|
||||
"zerocopy-derive 0.7.31",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.8.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879"
|
||||
dependencies = [
|
||||
"zerocopy-derive 0.8.24",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -8629,6 +8630,17 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy-derive"
|
||||
version = "0.8.24"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerofrom"
|
||||
version = "0.1.5"
|
||||
|
||||
@@ -164,7 +164,7 @@ scopeguard = "1.1"
|
||||
sysinfo = "0.29.2"
|
||||
sd-notify = "0.4.1"
|
||||
send-future = "0.1.0"
|
||||
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
|
||||
sentry = { version = "0.37", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
serde_path_to_error = "0.1"
|
||||
@@ -220,7 +220,7 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
|
||||
walkdir = "2.3.2"
|
||||
rustls-native-certs = "0.8"
|
||||
whoami = "1.5.1"
|
||||
zerocopy = { version = "0.7", features = ["derive"] }
|
||||
zerocopy = { version = "0.8", features = ["derive", "simd"] }
|
||||
json-structural-diff = { version = "0.2.0" }
|
||||
x509-cert = { version = "0.2.5" }
|
||||
|
||||
|
||||
@@ -173,7 +173,7 @@ RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v$
|
||||
&& rm -rf protoc.zip protoc
|
||||
|
||||
# s5cmd
|
||||
ENV S5CMD_VERSION=2.2.2
|
||||
ENV S5CMD_VERSION=2.3.0
|
||||
RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/s5cmd_${S5CMD_VERSION}_Linux-$(uname -m | sed 's/x86_64/64bit/g' | sed 's/aarch64/arm64/g').tar.gz" | tar zxvf - s5cmd \
|
||||
&& chmod +x s5cmd \
|
||||
&& mv s5cmd /usr/local/bin/s5cmd
|
||||
@@ -206,7 +206,7 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
|
||||
&& rm awscliv2.zip
|
||||
|
||||
# Mold: A Modern Linker
|
||||
ENV MOLD_VERSION=v2.34.1
|
||||
ENV MOLD_VERSION=v2.37.1
|
||||
RUN set -e \
|
||||
&& git clone https://github.com/rui314/mold.git \
|
||||
&& mkdir mold/build \
|
||||
@@ -268,7 +268,7 @@ WORKDIR /home/nonroot
|
||||
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
|
||||
|
||||
# Python
|
||||
ENV PYTHON_VERSION=3.11.10 \
|
||||
ENV PYTHON_VERSION=3.11.12 \
|
||||
PYENV_ROOT=/home/nonroot/.pyenv \
|
||||
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
|
||||
RUN set -e \
|
||||
@@ -296,12 +296,12 @@ ENV RUSTC_VERSION=1.86.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
ARG RUSTFILT_VERSION=0.2.1
|
||||
ARG CARGO_HAKARI_VERSION=0.9.33
|
||||
ARG CARGO_DENY_VERSION=0.16.2
|
||||
ARG CARGO_HACK_VERSION=0.6.33
|
||||
ARG CARGO_NEXTEST_VERSION=0.9.85
|
||||
ARG CARGO_HAKARI_VERSION=0.9.36
|
||||
ARG CARGO_DENY_VERSION=0.18.2
|
||||
ARG CARGO_HACK_VERSION=0.6.36
|
||||
ARG CARGO_NEXTEST_VERSION=0.9.94
|
||||
ARG CARGO_CHEF_VERSION=0.1.71
|
||||
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
|
||||
ARG CARGO_DIESEL_CLI_VERSION=2.2.9
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
|
||||
chmod +x rustup-init && \
|
||||
./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use metrics::core::{AtomicF64, Collector, GenericGauge};
|
||||
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
|
||||
use metrics::proto::MetricFamily;
|
||||
use metrics::{
|
||||
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
|
||||
register_int_gauge_vec, register_uint_gauge_vec,
|
||||
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -81,6 +81,22 @@ pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PG_CURR_DOWNTIME_MS: Lazy<GenericGauge<AtomicF64>> = Lazy::new(|| {
|
||||
register_gauge!(
|
||||
"compute_pg_current_downtime_ms",
|
||||
"Non-cumulative duration of Postgres downtime in ms; resets after successful check",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"compute_pg_downtime_ms_total",
|
||||
"Cumulative duration of Postgres downtime in ms",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
let mut metrics = COMPUTE_CTL_UP.collect();
|
||||
metrics.extend(INSTALLED_EXTENSIONS.collect());
|
||||
@@ -88,5 +104,7 @@ pub fn collect() -> Vec<MetricFamily> {
|
||||
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
|
||||
metrics.extend(DB_MIGRATION_FAILED.collect());
|
||||
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
|
||||
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
|
||||
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
|
||||
metrics
|
||||
}
|
||||
|
||||
@@ -6,197 +6,294 @@ use chrono::{DateTime, Utc};
|
||||
use compute_api::responses::ComputeStatus;
|
||||
use compute_api::spec::ComputeFeature;
|
||||
use postgres::{Client, NoTls};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing::{Level, error, info, instrument, span};
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
|
||||
|
||||
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
|
||||
|
||||
// Spin in a loop and figure out the last activity time in the Postgres.
|
||||
// Then update it in the shared state. This function never errors out.
|
||||
// NB: the only expected panic is at `Mutex` unwrap(), all other errors
|
||||
// should be handled gracefully.
|
||||
fn watch_compute_activity(compute: &ComputeNode) {
|
||||
// Suppose that `connstr` doesn't change
|
||||
let connstr = compute.params.connstr.clone();
|
||||
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
|
||||
struct ComputeMonitor {
|
||||
compute: Arc<ComputeNode>,
|
||||
|
||||
// During startup and configuration we connect to every Postgres database,
|
||||
// but we don't want to count this as some user activity. So wait until
|
||||
// the compute fully started before monitoring activity.
|
||||
wait_for_postgres_start(compute);
|
||||
/// The moment when Postgres had some activity,
|
||||
/// that should prevent compute from being suspended.
|
||||
last_active: Option<DateTime<Utc>>,
|
||||
|
||||
// Define `client` outside of the loop to reuse existing connection if it's active.
|
||||
let mut client = conf.connect(NoTls);
|
||||
/// The moment when we last tried to check Postgres.
|
||||
last_checked: DateTime<Utc>,
|
||||
/// The last moment we did a successful Postgres check.
|
||||
last_up: DateTime<Utc>,
|
||||
|
||||
let mut sleep = false;
|
||||
let mut prev_active_time: Option<f64> = None;
|
||||
let mut prev_sessions: Option<i64> = None;
|
||||
/// Only used for internal statistics change tracking
|
||||
/// between monitor runs and can be outdated.
|
||||
active_time: Option<f64>,
|
||||
/// Only used for internal statistics change tracking
|
||||
/// between monitor runs and can be outdated.
|
||||
sessions: Option<i64>,
|
||||
|
||||
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
|
||||
info!("starting experimental activity monitor for {}", connstr);
|
||||
} else {
|
||||
info!("starting activity monitor for {}", connstr);
|
||||
/// Use experimental statistics-based activity monitor. It's no longer
|
||||
/// 'experimental' per se, as it's enabled for everyone, but we still
|
||||
/// keep the flag as an option to turn it off in some cases if it will
|
||||
/// misbehave.
|
||||
experimental: bool,
|
||||
}
|
||||
|
||||
impl ComputeMonitor {
|
||||
fn report_down(&self) {
|
||||
let now = Utc::now();
|
||||
|
||||
// Calculate and report current downtime
|
||||
// (since the last time Postgres was up)
|
||||
let downtime = now.signed_duration_since(self.last_up);
|
||||
PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
|
||||
|
||||
// Calculate and update total downtime
|
||||
// (cumulative duration of Postgres downtime in ms)
|
||||
let inc = now
|
||||
.signed_duration_since(self.last_checked)
|
||||
.num_milliseconds();
|
||||
PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
|
||||
}
|
||||
|
||||
loop {
|
||||
// We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
|
||||
// But skip the first sleep, so we can connect to Postgres immediately.
|
||||
if sleep {
|
||||
// Should be outside of the mutex lock to allow others to read while we sleep.
|
||||
thread::sleep(MONITOR_CHECK_INTERVAL);
|
||||
} else {
|
||||
sleep = true;
|
||||
}
|
||||
fn report_up(&mut self) {
|
||||
self.last_up = Utc::now();
|
||||
PG_CURR_DOWNTIME_MS.set(0.0);
|
||||
}
|
||||
|
||||
match &mut client {
|
||||
Ok(cli) => {
|
||||
if cli.is_closed() {
|
||||
info!("connection to Postgres is closed, trying to reconnect");
|
||||
fn downtime_info(&self) -> String {
|
||||
format!(
|
||||
"total_ms: {}, current_ms: {}, last_up: {}",
|
||||
PG_TOTAL_DOWNTIME_MS.get(),
|
||||
PG_CURR_DOWNTIME_MS.get(),
|
||||
self.last_up
|
||||
)
|
||||
}
|
||||
|
||||
// Connection is closed, reconnect and try again.
|
||||
client = conf.connect(NoTls);
|
||||
continue;
|
||||
}
|
||||
/// Spin in a loop and figure out the last activity time in the Postgres.
|
||||
/// Then update it in the shared state. This function never errors out.
|
||||
/// NB: the only expected panic is at `Mutex` unwrap(), all other errors
|
||||
/// should be handled gracefully.
|
||||
#[instrument(skip_all)]
|
||||
pub fn run(&mut self) {
|
||||
// Suppose that `connstr` doesn't change
|
||||
let connstr = self.compute.params.connstr.clone();
|
||||
let conf = self
|
||||
.compute
|
||||
.get_conn_conf(Some("compute_ctl:compute_monitor"));
|
||||
|
||||
// This is a new logic, only enable if the feature flag is set.
|
||||
// TODO: remove this once we are sure that it works OR drop it altogether.
|
||||
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
|
||||
// First, check if the total active time or sessions across all databases has changed.
|
||||
// If it did, it means that user executed some queries. In theory, it can even go down if
|
||||
// some databases were dropped, but it's still a user activity.
|
||||
match get_database_stats(cli) {
|
||||
Ok((active_time, sessions)) => {
|
||||
let mut detected_activity = false;
|
||||
// During startup and configuration we connect to every Postgres database,
|
||||
// but we don't want to count this as some user activity. So wait until
|
||||
// the compute fully started before monitoring activity.
|
||||
wait_for_postgres_start(&self.compute);
|
||||
|
||||
prev_active_time = match prev_active_time {
|
||||
Some(prev_active_time) => {
|
||||
if active_time != prev_active_time {
|
||||
detected_activity = true;
|
||||
}
|
||||
Some(active_time)
|
||||
}
|
||||
None => Some(active_time),
|
||||
};
|
||||
prev_sessions = match prev_sessions {
|
||||
Some(prev_sessions) => {
|
||||
if sessions != prev_sessions {
|
||||
detected_activity = true;
|
||||
}
|
||||
Some(sessions)
|
||||
}
|
||||
None => Some(sessions),
|
||||
};
|
||||
// Define `client` outside of the loop to reuse existing connection if it's active.
|
||||
let mut client = conf.connect(NoTls);
|
||||
|
||||
if detected_activity {
|
||||
// Update the last active time and continue, we don't need to
|
||||
// check backends state change.
|
||||
compute.update_last_active(Some(Utc::now()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("could not get database statistics: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
info!("starting compute monitor for {}", connstr);
|
||||
|
||||
// Second, if database statistics is the same, check all backends state change,
|
||||
// maybe there is some with more recent activity. `get_backends_state_change()`
|
||||
// can return None or stale timestamp, so it's `compute.update_last_active()`
|
||||
// responsibility to check if the new timestamp is more recent than the current one.
|
||||
// This helps us to discover new sessions, that did nothing yet.
|
||||
match get_backends_state_change(cli) {
|
||||
Ok(last_active) => {
|
||||
compute.update_last_active(last_active);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("could not get backends state change: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, if there are existing (logical) walsenders, do not suspend.
|
||||
//
|
||||
// walproposer doesn't currently show up in pg_stat_replication,
|
||||
// but protect if it will be
|
||||
let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
|
||||
match cli.query_one(ws_count_query, &[]) {
|
||||
Ok(r) => match r.try_get::<&str, i64>("count") {
|
||||
Ok(num_ws) => {
|
||||
if num_ws > 0 {
|
||||
compute.update_last_active(Some(Utc::now()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse walsenders count: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("failed to get list of walsenders: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
//
|
||||
// Don't suspend compute if there is an active logical replication subscription
|
||||
//
|
||||
// `where pid is not null` – to filter out read only computes and subscription on branches
|
||||
//
|
||||
let logical_subscriptions_query =
|
||||
"select count(*) from pg_stat_subscription where pid is not null;";
|
||||
match cli.query_one(logical_subscriptions_query, &[]) {
|
||||
Ok(row) => match row.try_get::<&str, i64>("count") {
|
||||
Ok(num_subscribers) => {
|
||||
if num_subscribers > 0 {
|
||||
compute.update_last_active(Some(Utc::now()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"failed to get list of active logical replication subscriptions: {:?}",
|
||||
e
|
||||
loop {
|
||||
match &mut client {
|
||||
Ok(cli) => {
|
||||
if cli.is_closed() {
|
||||
info!(
|
||||
downtime_info = self.downtime_info(),
|
||||
"connection to Postgres is closed, trying to reconnect"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
//
|
||||
// Do not suspend compute if autovacuum is running
|
||||
//
|
||||
let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
|
||||
match cli.query_one(autovacuum_count_query, &[]) {
|
||||
Ok(r) => match r.try_get::<&str, i64>("count") {
|
||||
Ok(num_workers) => {
|
||||
if num_workers > 0 {
|
||||
compute.update_last_active(Some(Utc::now()));
|
||||
continue;
|
||||
self.report_down();
|
||||
|
||||
// Connection is closed, reconnect and try again.
|
||||
client = conf.connect(NoTls);
|
||||
} else {
|
||||
match self.check(cli) {
|
||||
Ok(_) => {
|
||||
self.report_up();
|
||||
self.compute.update_last_active(self.last_active);
|
||||
}
|
||||
Err(e) => {
|
||||
// Although we have many places where we can return errors in `check()`,
|
||||
// normally it shouldn't happen. I.e., we will likely return error if
|
||||
// connection got broken, query timed out, Postgres returned invalid data, etc.
|
||||
// In all such cases it's suspicious, so let's report this as downtime.
|
||||
self.report_down();
|
||||
error!(
|
||||
downtime_info = self.downtime_info(),
|
||||
"could not check Postgres: {}", e
|
||||
);
|
||||
|
||||
// Reconnect to Postgres just in case. During tests, I noticed
|
||||
// that queries in `check()` can fail with `connection closed`,
|
||||
// but `cli.is_closed()` above doesn't detect it. Even if old
|
||||
// connection is still alive, it will be dropped when we reassign
|
||||
// `client` to a new connection.
|
||||
client = conf.connect(NoTls);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to parse autovacuum workers count: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("failed to get list of autovacuum workers: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("could not connect to Postgres: {}, retrying", e);
|
||||
Err(e) => {
|
||||
info!(
|
||||
downtime_info = self.downtime_info(),
|
||||
"could not connect to Postgres: {}, retrying", e
|
||||
);
|
||||
self.report_down();
|
||||
|
||||
// Establish a new connection and try again.
|
||||
client = conf.connect(NoTls);
|
||||
// Establish a new connection and try again.
|
||||
client = conf.connect(NoTls);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the `last_checked` timestamp and sleep before the next iteration.
|
||||
self.last_checked = Utc::now();
|
||||
thread::sleep(MONITOR_CHECK_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
|
||||
// This is new logic, only enable if the feature flag is set.
|
||||
// TODO: remove this once we are sure that it works OR drop it altogether.
|
||||
if self.experimental {
|
||||
// Check if the total active time or sessions across all databases has changed.
|
||||
// If it did, it means that user executed some queries. In theory, it can even go down if
|
||||
// some databases were dropped, but it's still user activity.
|
||||
match get_database_stats(cli) {
|
||||
Ok((active_time, sessions)) => {
|
||||
let mut detected_activity = false;
|
||||
|
||||
if let Some(prev_active_time) = self.active_time {
|
||||
if active_time != prev_active_time {
|
||||
detected_activity = true;
|
||||
}
|
||||
}
|
||||
self.active_time = Some(active_time);
|
||||
|
||||
if let Some(prev_sessions) = self.sessions {
|
||||
if sessions != prev_sessions {
|
||||
detected_activity = true;
|
||||
}
|
||||
}
|
||||
self.sessions = Some(sessions);
|
||||
|
||||
if detected_activity {
|
||||
// Update the last active time and continue, we don't need to
|
||||
// check backends state change.
|
||||
self.last_active = Some(Utc::now());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!("could not get database statistics: {}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If database statistics are the same, check all backends for state changes.
|
||||
// Maybe there are some with more recent activity. `get_backends_state_change()`
|
||||
// can return None or stale timestamp, so it's `compute.update_last_active()`
|
||||
// responsibility to check if the new timestamp is more recent than the current one.
|
||||
// This helps us to discover new sessions that have not done anything yet.
|
||||
match get_backends_state_change(cli) {
|
||||
Ok(last_active) => match (last_active, self.last_active) {
|
||||
(Some(last_active), Some(prev_last_active)) => {
|
||||
if last_active > prev_last_active {
|
||||
self.last_active = Some(last_active);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
(Some(last_active), None) => {
|
||||
self.last_active = Some(last_active);
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"could not get backends state change: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// If there are existing (logical) walsenders, do not suspend.
|
||||
//
|
||||
// N.B. walproposer doesn't currently show up in pg_stat_replication,
|
||||
// but protect if it will.
|
||||
const WS_COUNT_QUERY: &str =
|
||||
"select count(*) from pg_stat_replication where application_name != 'walproposer';";
|
||||
match cli.query_one(WS_COUNT_QUERY, &[]) {
|
||||
Ok(r) => match r.try_get::<&str, i64>("count") {
|
||||
Ok(num_ws) => {
|
||||
if num_ws > 0 {
|
||||
self.last_active = Some(Utc::now());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let err: anyhow::Error = e.into();
|
||||
return Err(err.context("failed to parse walsenders count"));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
|
||||
}
|
||||
}
|
||||
|
||||
// Don't suspend compute if there is an active logical replication subscription
|
||||
//
|
||||
// `where pid is not null` – to filter out read only computes and subscription on branches
|
||||
const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
|
||||
"select count(*) from pg_stat_subscription where pid is not null;";
|
||||
match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
|
||||
Ok(row) => match row.try_get::<&str, i64>("count") {
|
||||
Ok(num_subscribers) => {
|
||||
if num_subscribers > 0 {
|
||||
self.last_active = Some(Utc::now());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to parse 'pg_stat_subscription' count: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to get list of active logical replication subscriptions: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Do not suspend compute if autovacuum is running
|
||||
const AUTOVACUUM_COUNT_QUERY: &str =
|
||||
"select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
|
||||
match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
|
||||
Ok(r) => match r.try_get::<&str, i64>("count") {
|
||||
Ok(num_workers) => {
|
||||
if num_workers > 0 {
|
||||
self.last_active = Some(Utc::now());
|
||||
return Ok(());
|
||||
};
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to parse autovacuum workers count: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"failed to get list of autovacuum workers: {}",
|
||||
e
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,9 +412,24 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
|
||||
/// Launch a separate compute monitor thread and return its `JoinHandle`.
|
||||
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
|
||||
let compute = Arc::clone(compute);
|
||||
let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
|
||||
let now = Utc::now();
|
||||
let mut monitor = ComputeMonitor {
|
||||
compute,
|
||||
last_active: None,
|
||||
last_checked: now,
|
||||
last_up: now,
|
||||
active_time: None,
|
||||
sessions: None,
|
||||
experimental,
|
||||
};
|
||||
|
||||
let span = span!(Level::INFO, "compute_monitor");
|
||||
thread::Builder::new()
|
||||
.name("compute-monitor".into())
|
||||
.spawn(move || watch_compute_activity(&compute))
|
||||
.spawn(move || {
|
||||
let _enter = span.enter();
|
||||
monitor.run();
|
||||
})
|
||||
.expect("cannot launch compute monitor thread")
|
||||
}
|
||||
|
||||
16
deny.toml
16
deny.toml
@@ -45,9 +45,7 @@ allow = [
|
||||
"ISC",
|
||||
"MIT",
|
||||
"MPL-2.0",
|
||||
"OpenSSL",
|
||||
"Unicode-3.0",
|
||||
"Zlib",
|
||||
]
|
||||
confidence-threshold = 0.8
|
||||
exceptions = [
|
||||
@@ -56,14 +54,6 @@ exceptions = [
|
||||
{ allow = ["Zlib"], name = "const_format", version = "*" },
|
||||
]
|
||||
|
||||
[[licenses.clarify]]
|
||||
name = "ring"
|
||||
version = "*"
|
||||
expression = "MIT AND ISC AND OpenSSL"
|
||||
license-files = [
|
||||
{ path = "LICENSE", hash = 0xbd0eed23 }
|
||||
]
|
||||
|
||||
[licenses.private]
|
||||
ignore = true
|
||||
registries = []
|
||||
@@ -116,7 +106,11 @@ name = "openssl"
|
||||
unknown-registry = "warn"
|
||||
unknown-git = "warn"
|
||||
allow-registry = ["https://github.com/rust-lang/crates.io-index"]
|
||||
allow-git = []
|
||||
allow-git = [
|
||||
# Crate pinned to commit in origin repo due to opentelemetry version.
|
||||
# TODO: Remove this once crate is fetched from crates.io again.
|
||||
"https://github.com/mattiapenati/tower-otel",
|
||||
]
|
||||
|
||||
[sources.allow-org]
|
||||
github = [
|
||||
|
||||
@@ -181,6 +181,7 @@ pub struct ConfigToml {
|
||||
pub generate_unarchival_heatmap: Option<bool>,
|
||||
pub tracing: Option<Tracing>,
|
||||
pub enable_tls_page_service_api: bool,
|
||||
pub dev_mode: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -657,6 +658,7 @@ impl Default for ConfigToml {
|
||||
generate_unarchival_heatmap: None,
|
||||
tracing: None,
|
||||
enable_tls_page_service_api: false,
|
||||
dev_mode: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -320,6 +320,35 @@ pub struct TimelineCreateRequest {
|
||||
pub mode: TimelineCreateRequestMode,
|
||||
}
|
||||
|
||||
impl TimelineCreateRequest {
|
||||
pub fn mode_tag(&self) -> &'static str {
|
||||
match &self.mode {
|
||||
TimelineCreateRequestMode::Branch { .. } => "branch",
|
||||
TimelineCreateRequestMode::ImportPgdata { .. } => "import",
|
||||
TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_import(&self) -> bool {
|
||||
matches!(self.mode, TimelineCreateRequestMode::ImportPgdata { .. })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ShardImportStatus {
|
||||
InProgress,
|
||||
Done,
|
||||
Error(String),
|
||||
}
|
||||
impl ShardImportStatus {
|
||||
pub fn is_terminal(&self) -> bool {
|
||||
match self {
|
||||
ShardImportStatus::InProgress => false,
|
||||
ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Storage controller specific extensions to [`TimelineInfo`].
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct TimelineCreateResponseStorcon {
|
||||
|
||||
@@ -4,10 +4,10 @@
|
||||
//! See docs/rfcs/025-generation-numbers.md
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::NodeId;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
|
||||
use crate::controller_api::NodeRegisterRequest;
|
||||
use crate::models::LocationConfigMode;
|
||||
use crate::models::{LocationConfigMode, ShardImportStatus};
|
||||
use crate::shard::TenantShardId;
|
||||
|
||||
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
|
||||
@@ -62,3 +62,10 @@ pub struct ValidateResponseTenant {
|
||||
pub id: TenantShardId,
|
||||
pub valid: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct PutTimelineImportStatusRequest {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub status: ShardImportStatus,
|
||||
}
|
||||
|
||||
@@ -14,8 +14,9 @@ use anyhow::{Context, Result};
|
||||
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
|
||||
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
|
||||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::blob::CopyStatus;
|
||||
use azure_storage_blobs::blob::operations::GetBlobBuilder;
|
||||
use azure_storage_blobs::blob::{Blob, CopyStatus};
|
||||
use azure_storage_blobs::container::operations::ListBlobsBuilder;
|
||||
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
|
||||
use bytes::Bytes;
|
||||
use futures::FutureExt;
|
||||
@@ -253,53 +254,15 @@ impl AzureBlobStorage {
|
||||
download
|
||||
}
|
||||
|
||||
async fn permit(
|
||||
&self,
|
||||
kind: RequestKind,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
|
||||
let acquire = self.concurrency_limiter.acquire(kind);
|
||||
|
||||
tokio::select! {
|
||||
permit = acquire => Ok(permit.expect("never closed")),
|
||||
_ = cancel.cancelled() => Err(Cancelled),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn container_name(&self) -> &str {
|
||||
&self.container_name
|
||||
}
|
||||
}
|
||||
|
||||
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
|
||||
let mut res = Metadata::new();
|
||||
for (k, v) in metadata.0.into_iter() {
|
||||
res.insert(k, v);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
fn to_download_error(error: azure_core::Error) -> DownloadError {
|
||||
if let Some(http_err) = error.as_http_error() {
|
||||
match http_err.status() {
|
||||
StatusCode::NotFound => DownloadError::NotFound,
|
||||
StatusCode::NotModified => DownloadError::Unmodified,
|
||||
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
|
||||
_ => DownloadError::Other(anyhow::Error::new(error)),
|
||||
}
|
||||
} else {
|
||||
DownloadError::Other(error.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteStorage for AzureBlobStorage {
|
||||
fn list_streaming(
|
||||
fn list_streaming_for_fn<T: Default + ListingCollector>(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> {
|
||||
request_kind: RequestKind,
|
||||
customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
|
||||
) -> impl Stream<Item = Result<T, DownloadError>> {
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
|
||||
self.prefix_in_container.clone().map(|mut s| {
|
||||
@@ -311,7 +274,7 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
});
|
||||
|
||||
async_stream::stream! {
|
||||
let _permit = self.permit(RequestKind::List, cancel).await?;
|
||||
let _permit = self.permit(request_kind, cancel).await?;
|
||||
|
||||
let mut builder = self.client.list_blobs();
|
||||
|
||||
@@ -327,6 +290,8 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
builder = builder.max_results(MaxResults::new(limit));
|
||||
}
|
||||
|
||||
builder = customize_builder(builder);
|
||||
|
||||
let mut next_marker = None;
|
||||
|
||||
let mut timeout_try_cnt = 1;
|
||||
@@ -382,26 +347,20 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
break;
|
||||
};
|
||||
|
||||
let mut res = Listing::default();
|
||||
let mut res = T::default();
|
||||
next_marker = entry.continuation();
|
||||
let prefix_iter = entry
|
||||
.blobs
|
||||
.prefixes()
|
||||
.map(|prefix| self.name_to_relative_path(&prefix.name));
|
||||
res.prefixes.extend(prefix_iter);
|
||||
res.add_prefixes(self, prefix_iter);
|
||||
|
||||
let blob_iter = entry
|
||||
.blobs
|
||||
.blobs()
|
||||
.map(|k| ListingObject{
|
||||
key: self.name_to_relative_path(&k.name),
|
||||
last_modified: k.properties.last_modified.into(),
|
||||
size: k.properties.content_length,
|
||||
}
|
||||
);
|
||||
.blobs();
|
||||
|
||||
for key in blob_iter {
|
||||
res.keys.push(key);
|
||||
res.add_blob(self, key);
|
||||
|
||||
if let Some(mut mk) = max_keys {
|
||||
assert!(mk > 0);
|
||||
@@ -423,6 +382,128 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
}
|
||||
}
|
||||
|
||||
async fn permit(
|
||||
&self,
|
||||
kind: RequestKind,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
|
||||
let acquire = self.concurrency_limiter.acquire(kind);
|
||||
|
||||
tokio::select! {
|
||||
permit = acquire => Ok(permit.expect("never closed")),
|
||||
_ = cancel.cancelled() => Err(Cancelled),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn container_name(&self) -> &str {
|
||||
&self.container_name
|
||||
}
|
||||
}
|
||||
|
||||
trait ListingCollector {
|
||||
fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
|
||||
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
|
||||
}
|
||||
|
||||
impl ListingCollector for Listing {
|
||||
fn add_prefixes(
|
||||
&mut self,
|
||||
_abs: &AzureBlobStorage,
|
||||
prefix_it: impl Iterator<Item = RemotePath>,
|
||||
) {
|
||||
self.prefixes.extend(prefix_it);
|
||||
}
|
||||
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
|
||||
self.keys.push(ListingObject {
|
||||
key: abs.name_to_relative_path(&blob.name),
|
||||
last_modified: blob.properties.last_modified.into(),
|
||||
size: blob.properties.content_length,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl ListingCollector for crate::VersionListing {
|
||||
fn add_prefixes(
|
||||
&mut self,
|
||||
_abs: &AzureBlobStorage,
|
||||
_prefix_it: impl Iterator<Item = RemotePath>,
|
||||
) {
|
||||
// nothing
|
||||
}
|
||||
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
|
||||
let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
|
||||
self.versions.push(crate::Version {
|
||||
key: abs.name_to_relative_path(&blob.name),
|
||||
last_modified: blob.properties.last_modified.into(),
|
||||
kind: crate::VersionKind::Version(id),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
|
||||
let mut res = Metadata::new();
|
||||
for (k, v) in metadata.0.into_iter() {
|
||||
res.insert(k, v);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
fn to_download_error(error: azure_core::Error) -> DownloadError {
|
||||
if let Some(http_err) = error.as_http_error() {
|
||||
match http_err.status() {
|
||||
StatusCode::NotFound => DownloadError::NotFound,
|
||||
StatusCode::NotModified => DownloadError::Unmodified,
|
||||
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
|
||||
_ => DownloadError::Other(anyhow::Error::new(error)),
|
||||
}
|
||||
} else {
|
||||
DownloadError::Other(error.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteStorage for AzureBlobStorage {
|
||||
fn list_streaming(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> impl Stream<Item = Result<Listing, DownloadError>> {
|
||||
let customize_builder = |builder| builder;
|
||||
let kind = RequestKind::ListVersions;
|
||||
self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> std::result::Result<crate::VersionListing, DownloadError> {
|
||||
let customize_builder = |mut builder: ListBlobsBuilder| {
|
||||
builder = builder.include_versions(true);
|
||||
builder
|
||||
};
|
||||
let kind = RequestKind::ListVersions;
|
||||
|
||||
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
|
||||
prefix,
|
||||
mode,
|
||||
max_keys,
|
||||
cancel,
|
||||
kind,
|
||||
customize_builder
|
||||
));
|
||||
let mut combined: crate::VersionListing =
|
||||
stream.next().await.expect("At least one item required")?;
|
||||
while let Some(list) = stream.next().await {
|
||||
let list = list?;
|
||||
combined.versions.extend(list.versions.into_iter());
|
||||
}
|
||||
Ok(combined)
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
@@ -532,7 +613,12 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
let mut builder = blob_client.get();
|
||||
|
||||
if let Some(ref etag) = opts.etag {
|
||||
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
|
||||
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
|
||||
}
|
||||
|
||||
if let Some(ref version_id) = opts.version_id {
|
||||
let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
|
||||
builder = builder.blob_versioning(version_id);
|
||||
}
|
||||
|
||||
if let Some((start, end)) = opts.byte_range() {
|
||||
|
||||
@@ -176,6 +176,32 @@ pub struct Listing {
|
||||
pub keys: Vec<ListingObject>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VersionListing {
|
||||
pub versions: Vec<Version>,
|
||||
}
|
||||
|
||||
pub struct Version {
|
||||
pub key: RemotePath,
|
||||
pub last_modified: SystemTime,
|
||||
pub kind: VersionKind,
|
||||
}
|
||||
|
||||
impl Version {
|
||||
pub fn version_id(&self) -> Option<&VersionId> {
|
||||
match &self.kind {
|
||||
VersionKind::Version(id) => Some(id),
|
||||
VersionKind::DeletionMarker => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum VersionKind {
|
||||
DeletionMarker,
|
||||
Version(VersionId),
|
||||
}
|
||||
|
||||
/// Options for downloads. The default value is a plain GET.
|
||||
pub struct DownloadOpts {
|
||||
/// If given, returns [`DownloadError::Unmodified`] if the object still has
|
||||
@@ -186,6 +212,8 @@ pub struct DownloadOpts {
|
||||
/// The end of the byte range to download, or unbounded. Must be after the
|
||||
/// start bound.
|
||||
pub byte_end: Bound<u64>,
|
||||
/// Optionally request a specific version of a key
|
||||
pub version_id: Option<VersionId>,
|
||||
/// Indicate whether we're downloading something small or large: this indirectly controls
|
||||
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
|
||||
/// for layer files
|
||||
@@ -197,12 +225,16 @@ pub enum DownloadKind {
|
||||
Small,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct VersionId(pub String);
|
||||
|
||||
impl Default for DownloadOpts {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
etag: Default::default(),
|
||||
byte_start: Bound::Unbounded,
|
||||
byte_end: Bound::Unbounded,
|
||||
version_id: None,
|
||||
kind: DownloadKind::Large,
|
||||
}
|
||||
}
|
||||
@@ -295,6 +327,14 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
Ok(combined)
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<VersionListing, DownloadError>;
|
||||
|
||||
/// Obtain metadata information about an object.
|
||||
async fn head_object(
|
||||
&self,
|
||||
@@ -475,6 +515,22 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
}
|
||||
}
|
||||
|
||||
// See [`RemoteStorage::list_versions`].
|
||||
pub async fn list_versions<'a>(
|
||||
&'a self,
|
||||
prefix: Option<&'a RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &'a CancellationToken,
|
||||
) -> Result<VersionListing, DownloadError> {
|
||||
match self {
|
||||
Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
|
||||
Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
|
||||
Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
|
||||
Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
|
||||
}
|
||||
}
|
||||
|
||||
// See [`RemoteStorage::head_object`].
|
||||
pub async fn head_object(
|
||||
&self,
|
||||
@@ -727,6 +783,7 @@ impl ConcurrencyLimiter {
|
||||
RequestKind::Copy => &self.write,
|
||||
RequestKind::TimeTravel => &self.write,
|
||||
RequestKind::Head => &self.read,
|
||||
RequestKind::ListVersions => &self.read,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -445,6 +445,16 @@ impl RemoteStorage for LocalFs {
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
_prefix: Option<&RemotePath>,
|
||||
_mode: ListingMode,
|
||||
_max_keys: Option<NonZeroU32>,
|
||||
_cancel: &CancellationToken,
|
||||
) -> Result<crate::VersionListing, DownloadError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
|
||||
@@ -14,6 +14,7 @@ pub(crate) enum RequestKind {
|
||||
Copy = 4,
|
||||
TimeTravel = 5,
|
||||
Head = 6,
|
||||
ListVersions = 7,
|
||||
}
|
||||
|
||||
use RequestKind::*;
|
||||
@@ -29,6 +30,7 @@ impl RequestKind {
|
||||
Copy => "copy_object",
|
||||
TimeTravel => "time_travel_recover",
|
||||
Head => "head_object",
|
||||
ListVersions => "list_versions",
|
||||
}
|
||||
}
|
||||
const fn as_index(&self) -> usize {
|
||||
@@ -36,7 +38,10 @@ impl RequestKind {
|
||||
}
|
||||
}
|
||||
|
||||
const REQUEST_KIND_COUNT: usize = 7;
|
||||
const REQUEST_KIND_LIST: &[RequestKind] =
|
||||
&[Get, Put, Delete, List, Copy, TimeTravel, Head, ListVersions];
|
||||
|
||||
const REQUEST_KIND_COUNT: usize = REQUEST_KIND_LIST.len();
|
||||
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
|
||||
|
||||
impl<C> RequestTyped<C> {
|
||||
@@ -45,12 +50,11 @@ impl<C> RequestTyped<C> {
|
||||
}
|
||||
|
||||
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
|
||||
use RequestKind::*;
|
||||
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
|
||||
let mut it = REQUEST_KIND_LIST.iter();
|
||||
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
|
||||
let next = it.next().unwrap();
|
||||
assert_eq!(index, next.as_index());
|
||||
f(next)
|
||||
f(*next)
|
||||
});
|
||||
|
||||
if let Some(next) = it.next() {
|
||||
|
||||
@@ -21,9 +21,8 @@ use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
|
||||
use aws_sdk_s3::error::SdkError;
|
||||
use aws_sdk_s3::operation::get_object::GetObjectError;
|
||||
use aws_sdk_s3::operation::head_object::HeadObjectError;
|
||||
use aws_sdk_s3::types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass};
|
||||
use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
use aws_smithy_types::DateTime;
|
||||
use aws_smithy_types::body::SdkBody;
|
||||
use aws_smithy_types::byte_stream::ByteStream;
|
||||
use aws_smithy_types::date_time::ConversionError;
|
||||
@@ -46,7 +45,7 @@ use crate::support::PermitCarrying;
|
||||
use crate::{
|
||||
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
|
||||
MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
|
||||
TimeTravelError, TimeoutOrCancel,
|
||||
TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
|
||||
};
|
||||
|
||||
/// AWS S3 storage.
|
||||
@@ -66,6 +65,7 @@ struct GetObjectRequest {
|
||||
key: String,
|
||||
etag: Option<String>,
|
||||
range: Option<String>,
|
||||
version_id: Option<String>,
|
||||
}
|
||||
impl S3Bucket {
|
||||
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
|
||||
@@ -251,6 +251,7 @@ impl S3Bucket {
|
||||
.get_object()
|
||||
.bucket(request.bucket)
|
||||
.key(request.key)
|
||||
.set_version_id(request.version_id)
|
||||
.set_range(request.range);
|
||||
|
||||
if let Some(etag) = request.etag {
|
||||
@@ -405,6 +406,124 @@ impl S3Bucket {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn list_versions_with_permit(
|
||||
&self,
|
||||
_permit: &tokio::sync::SemaphorePermit<'_>,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<crate::VersionListing, DownloadError> {
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let prefix = prefix
|
||||
.map(|p| self.relative_path_to_s3_object(p))
|
||||
.or_else(|| self.prefix_in_bucket.clone());
|
||||
|
||||
let warn_threshold = 3;
|
||||
let max_retries = 10;
|
||||
let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);
|
||||
|
||||
let mut key_marker = None;
|
||||
let mut version_id_marker = None;
|
||||
let mut versions_and_deletes = Vec::new();
|
||||
|
||||
loop {
|
||||
let response = backoff::retry(
|
||||
|| async {
|
||||
let mut request = self
|
||||
.client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(prefix.clone())
|
||||
.set_key_marker(key_marker.clone())
|
||||
.set_version_id_marker(version_id_marker.clone());
|
||||
|
||||
if let ListingMode::WithDelimiter = mode {
|
||||
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
|
||||
}
|
||||
|
||||
let op = request.send();
|
||||
|
||||
tokio::select! {
|
||||
res = op => res.map_err(|e| DownloadError::Other(e.into())),
|
||||
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
|
||||
}
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions",
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| DownloadError::Cancelled)
|
||||
.and_then(|x| x)?;
|
||||
|
||||
tracing::trace!(
|
||||
" Got List response version_id_marker={:?}, key_marker={:?}",
|
||||
response.version_id_marker,
|
||||
response.key_marker
|
||||
);
|
||||
let versions = response
|
||||
.versions
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|version| {
|
||||
let key = version.key.expect("response does not contain a key");
|
||||
let key = self.s3_object_to_relative_path(&key);
|
||||
let version_id = VersionId(version.version_id.expect("needing version id"));
|
||||
let last_modified =
|
||||
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
|
||||
Ok(Version {
|
||||
key,
|
||||
last_modified,
|
||||
kind: crate::VersionKind::Version(version_id),
|
||||
})
|
||||
});
|
||||
let deletes = response
|
||||
.delete_markers
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(|version| {
|
||||
let key = version.key.expect("response does not contain a key");
|
||||
let key = self.s3_object_to_relative_path(&key);
|
||||
let last_modified =
|
||||
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
|
||||
Ok(Version {
|
||||
key,
|
||||
last_modified,
|
||||
kind: crate::VersionKind::DeletionMarker,
|
||||
})
|
||||
});
|
||||
itertools::process_results(versions.chain(deletes), |n_vds| {
|
||||
versions_and_deletes.extend(n_vds)
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
fn none_if_empty(v: Option<String>) -> Option<String> {
|
||||
v.filter(|v| !v.is_empty())
|
||||
}
|
||||
version_id_marker = none_if_empty(response.next_version_id_marker);
|
||||
key_marker = none_if_empty(response.next_key_marker);
|
||||
if version_id_marker.is_none() {
|
||||
// The final response is not supposed to be truncated
|
||||
if response.is_truncated.unwrap_or_default() {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!(
|
||||
"Received truncated ListObjectVersions response for prefix={prefix:?}"
|
||||
)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
if let Some(max_keys) = max_keys {
|
||||
if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
|
||||
return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(VersionListing {
|
||||
versions: versions_and_deletes,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn bucket_name(&self) -> &str {
|
||||
&self.bucket_name
|
||||
}
|
||||
@@ -621,6 +740,19 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<crate::VersionListing, DownloadError> {
|
||||
let kind = RequestKind::ListVersions;
|
||||
let permit = self.permit(kind, cancel).await?;
|
||||
self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
@@ -801,6 +933,7 @@ impl RemoteStorage for S3Bucket {
|
||||
key: self.relative_path_to_s3_object(from),
|
||||
etag: opts.etag.as_ref().map(|e| e.to_string()),
|
||||
range: opts.byte_range_header(),
|
||||
version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
|
||||
},
|
||||
cancel,
|
||||
)
|
||||
@@ -845,94 +978,25 @@ impl RemoteStorage for S3Bucket {
|
||||
let kind = RequestKind::TimeTravel;
|
||||
let permit = self.permit(kind, cancel).await?;
|
||||
|
||||
let timestamp = DateTime::from(timestamp);
|
||||
let done_if_after = DateTime::from(done_if_after);
|
||||
|
||||
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
|
||||
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let prefix = prefix
|
||||
.map(|p| self.relative_path_to_s3_object(p))
|
||||
.or_else(|| self.prefix_in_bucket.clone());
|
||||
// Limit the number of versions deletions, mostly so that we don't
|
||||
// keep requesting forever if the list is too long, as we'd put the
|
||||
// list in RAM.
|
||||
// Building a list of 100k entries that reaches the limit roughly takes
|
||||
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
|
||||
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
|
||||
|
||||
let warn_threshold = 3;
|
||||
let max_retries = 10;
|
||||
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
|
||||
|
||||
let mut key_marker = None;
|
||||
let mut version_id_marker = None;
|
||||
let mut versions_and_deletes = Vec::new();
|
||||
|
||||
loop {
|
||||
let response = backoff::retry(
|
||||
|| async {
|
||||
let op = self
|
||||
.client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(prefix.clone())
|
||||
.set_key_marker(key_marker.clone())
|
||||
.set_version_id_marker(version_id_marker.clone())
|
||||
.send();
|
||||
|
||||
tokio::select! {
|
||||
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
|
||||
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
|
||||
}
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions for time_travel_recover",
|
||||
cancel,
|
||||
)
|
||||
let mode = ListingMode::NoDelimiter;
|
||||
let version_listing = self
|
||||
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
|
||||
.await
|
||||
.ok_or_else(|| TimeTravelError::Cancelled)
|
||||
.and_then(|x| x)?;
|
||||
|
||||
tracing::trace!(
|
||||
" Got List response version_id_marker={:?}, key_marker={:?}",
|
||||
response.version_id_marker,
|
||||
response.key_marker
|
||||
);
|
||||
let versions = response
|
||||
.versions
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(VerOrDelete::from_version);
|
||||
let deletes = response
|
||||
.delete_markers
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.map(VerOrDelete::from_delete_marker);
|
||||
itertools::process_results(versions.chain(deletes), |n_vds| {
|
||||
versions_and_deletes.extend(n_vds)
|
||||
})
|
||||
.map_err(TimeTravelError::Other)?;
|
||||
fn none_if_empty(v: Option<String>) -> Option<String> {
|
||||
v.filter(|v| !v.is_empty())
|
||||
}
|
||||
version_id_marker = none_if_empty(response.next_version_id_marker);
|
||||
key_marker = none_if_empty(response.next_key_marker);
|
||||
if version_id_marker.is_none() {
|
||||
// The final response is not supposed to be truncated
|
||||
if response.is_truncated.unwrap_or_default() {
|
||||
return Err(TimeTravelError::Other(anyhow::anyhow!(
|
||||
"Received truncated ListObjectVersions response for prefix={prefix:?}"
|
||||
)));
|
||||
}
|
||||
break;
|
||||
}
|
||||
// Limit the number of versions deletions, mostly so that we don't
|
||||
// keep requesting forever if the list is too long, as we'd put the
|
||||
// list in RAM.
|
||||
// Building a list of 100k entries that reaches the limit roughly takes
|
||||
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
|
||||
const COMPLEXITY_LIMIT: usize = 100_000;
|
||||
if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
|
||||
return Err(TimeTravelError::TooManyVersions);
|
||||
}
|
||||
}
|
||||
.map_err(|err| match err {
|
||||
DownloadError::Other(e) => TimeTravelError::Other(e),
|
||||
DownloadError::Cancelled => TimeTravelError::Cancelled,
|
||||
other => TimeTravelError::Other(other.into()),
|
||||
})?;
|
||||
let versions_and_deletes = version_listing.versions;
|
||||
|
||||
tracing::info!(
|
||||
"Built list for time travel with {} versions and deletions",
|
||||
@@ -948,24 +1012,26 @@ impl RemoteStorage for S3Bucket {
|
||||
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
|
||||
|
||||
for vd in &versions_and_deletes {
|
||||
let VerOrDelete {
|
||||
version_id, key, ..
|
||||
} = &vd;
|
||||
if version_id == "null" {
|
||||
let Version { key, .. } = &vd;
|
||||
let version_id = vd.version_id().map(|v| v.0.as_str());
|
||||
if version_id == Some("null") {
|
||||
return Err(TimeTravelError::Other(anyhow!(
|
||||
"Received ListVersions response for key={key} with version_id='null', \
|
||||
indicating either disabled versioning, or legacy objects with null version id values"
|
||||
)));
|
||||
}
|
||||
tracing::trace!(
|
||||
"Parsing version key={key} version_id={version_id} kind={:?}",
|
||||
vd.kind
|
||||
);
|
||||
tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
|
||||
|
||||
vds_for_key.entry(key).or_default().push(vd);
|
||||
}
|
||||
|
||||
let warn_threshold = 3;
|
||||
let max_retries = 10;
|
||||
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
|
||||
|
||||
for (key, versions) in vds_for_key {
|
||||
let last_vd = versions.last().unwrap();
|
||||
let key = self.relative_path_to_s3_object(key);
|
||||
if last_vd.last_modified > done_if_after {
|
||||
tracing::trace!("Key {key} has version later than done_if_after, skipping");
|
||||
continue;
|
||||
@@ -990,11 +1056,11 @@ impl RemoteStorage for S3Bucket {
|
||||
do_delete = true;
|
||||
} else {
|
||||
match &versions[version_to_restore_to - 1] {
|
||||
VerOrDelete {
|
||||
kind: VerOrDeleteKind::Version,
|
||||
version_id,
|
||||
Version {
|
||||
kind: VersionKind::Version(version_id),
|
||||
..
|
||||
} => {
|
||||
let version_id = &version_id.0;
|
||||
tracing::trace!("Copying old version {version_id} for {key}...");
|
||||
// Restore the state to the last version by copying
|
||||
let source_id =
|
||||
@@ -1006,7 +1072,7 @@ impl RemoteStorage for S3Bucket {
|
||||
.client
|
||||
.copy_object()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.key(key)
|
||||
.key(&key)
|
||||
.set_storage_class(self.upload_storage_class.clone())
|
||||
.copy_source(&source_id)
|
||||
.send();
|
||||
@@ -1027,8 +1093,8 @@ impl RemoteStorage for S3Bucket {
|
||||
.and_then(|x| x)?;
|
||||
tracing::info!(%version_id, %key, "Copied old version in S3");
|
||||
}
|
||||
VerOrDelete {
|
||||
kind: VerOrDeleteKind::DeleteMarker,
|
||||
Version {
|
||||
kind: VersionKind::DeletionMarker,
|
||||
..
|
||||
} => {
|
||||
do_delete = true;
|
||||
@@ -1036,7 +1102,7 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
};
|
||||
if do_delete {
|
||||
if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
|
||||
if matches!(last_vd.kind, VersionKind::DeletionMarker) {
|
||||
// Key has since been deleted (but there was some history), no need to do anything
|
||||
tracing::trace!("Key {key} already deleted, skipping.");
|
||||
} else {
|
||||
@@ -1064,62 +1130,6 @@ impl RemoteStorage for S3Bucket {
|
||||
}
|
||||
}
|
||||
|
||||
// Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
|
||||
struct VerOrDelete {
|
||||
kind: VerOrDeleteKind,
|
||||
last_modified: DateTime,
|
||||
version_id: String,
|
||||
key: String,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum VerOrDeleteKind {
|
||||
Version,
|
||||
DeleteMarker,
|
||||
}
|
||||
|
||||
impl VerOrDelete {
|
||||
fn with_kind(
|
||||
kind: VerOrDeleteKind,
|
||||
last_modified: Option<DateTime>,
|
||||
version_id: Option<String>,
|
||||
key: Option<String>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let lvk = (last_modified, version_id, key);
|
||||
let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
|
||||
anyhow::bail!(
|
||||
"One (or more) of last_modified, key, and id is None. \
|
||||
Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
|
||||
lvk.0,
|
||||
lvk.1,
|
||||
lvk.2,
|
||||
);
|
||||
};
|
||||
Ok(Self {
|
||||
kind,
|
||||
last_modified,
|
||||
version_id,
|
||||
key,
|
||||
})
|
||||
}
|
||||
fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
|
||||
Self::with_kind(
|
||||
VerOrDeleteKind::Version,
|
||||
v.last_modified,
|
||||
v.version_id,
|
||||
v.key,
|
||||
)
|
||||
}
|
||||
fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
|
||||
Self::with_kind(
|
||||
VerOrDeleteKind::DeleteMarker,
|
||||
v.last_modified,
|
||||
v.version_id,
|
||||
v.key,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::num::NonZeroUsize;
|
||||
|
||||
@@ -139,6 +139,20 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.inner.list(prefix, mode, max_keys, cancel).await
|
||||
}
|
||||
|
||||
async fn list_versions(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
mode: ListingMode,
|
||||
max_keys: Option<NonZeroU32>,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<crate::VersionListing, DownloadError> {
|
||||
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
|
||||
.map_err(DownloadError::Other)?;
|
||||
self.inner
|
||||
.list_versions(prefix, mode, max_keys, cancel)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn head_object(
|
||||
&self,
|
||||
key: &RemotePath,
|
||||
|
||||
@@ -11,6 +11,7 @@ use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::storage_layer::InMemoryLayer;
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::virtual_file::IoMode;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::value::Value;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -21,13 +22,14 @@ use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
// A very cheap hash for generating non-sequential keys.
|
||||
fn murmurhash32(mut h: u32) -> u32 {
|
||||
h ^= h >> 16;
|
||||
h = h.wrapping_mul(0x85ebca6b);
|
||||
h h.wrapping_mul(0x85ebca6b);
|
||||
h ^= h >> 13;
|
||||
h = h.wrapping_mul(0xc2b2ae35);
|
||||
h ^= h >> 16;
|
||||
h
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone, Copy, Debug)]
|
||||
enum KeyLayout {
|
||||
/// Sequential unique keys
|
||||
Sequential,
|
||||
@@ -37,6 +39,7 @@ enum KeyLayout {
|
||||
RandomReuse(u32),
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, Clone, Copy, Debug)]
|
||||
enum WriteDelta {
|
||||
Yes,
|
||||
No,
|
||||
@@ -138,12 +141,15 @@ async fn ingest(
|
||||
/// Wrapper to instantiate a tokio runtime
|
||||
fn ingest_main(
|
||||
conf: &'static PageServerConf,
|
||||
io_mode: IoMode,
|
||||
put_size: usize,
|
||||
put_count: usize,
|
||||
key_layout: KeyLayout,
|
||||
write_delta: WriteDelta,
|
||||
) {
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
pageserver::virtual_file::set_io_mode(io_mode);
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
@@ -174,93 +180,207 @@ fn criterion_benchmark(c: &mut Criterion) {
|
||||
virtual_file::init(
|
||||
16384,
|
||||
virtual_file::io_engine_for_bench(),
|
||||
// immaterial, each `ingest_main` invocation below overrides this
|
||||
conf.virtual_file_io_mode,
|
||||
// without actually doing syncs, buffered writes have an unfair advantage over direct IO writes
|
||||
virtual_file::SyncMode::Sync,
|
||||
);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
{
|
||||
let mut group = c.benchmark_group("ingest-small-values");
|
||||
let put_size = 100usize;
|
||||
let put_count = 128 * 1024 * 1024 / put_size;
|
||||
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
|
||||
group.sample_size(10);
|
||||
group.bench_function("ingest 128MB/100b seq", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Sequential,
|
||||
WriteDelta::Yes,
|
||||
)
|
||||
})
|
||||
});
|
||||
group.bench_function("ingest 128MB/100b rand", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Random,
|
||||
WriteDelta::Yes,
|
||||
)
|
||||
})
|
||||
});
|
||||
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::RandomReuse(0x3ff),
|
||||
WriteDelta::Yes,
|
||||
)
|
||||
})
|
||||
});
|
||||
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Sequential,
|
||||
WriteDelta::No,
|
||||
)
|
||||
})
|
||||
});
|
||||
#[derive(serde::Serialize)]
|
||||
struct ExplodedParameters {
|
||||
io_mode: IoMode,
|
||||
volume_mib: usize,
|
||||
key_size: usize,
|
||||
key_layout: KeyLayout,
|
||||
write_delta: WriteDelta,
|
||||
}
|
||||
|
||||
{
|
||||
let mut group = c.benchmark_group("ingest-big-values");
|
||||
let put_size = 8192usize;
|
||||
let put_count = 128 * 1024 * 1024 / put_size;
|
||||
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
|
||||
#[derive(Clone)]
|
||||
struct HandPickedParameters {
|
||||
volume_mib: usize,
|
||||
key_size: usize,
|
||||
key_layout: KeyLayout,
|
||||
write_delta: WriteDelta,
|
||||
}
|
||||
let expect = vec![
|
||||
// Small values (100b) tests
|
||||
HandPickedParameters {
|
||||
volume_mib: 128,
|
||||
key_size: 100,
|
||||
key_layout: KeyLayout::Sequential,
|
||||
write_delta: WriteDelta::Yes,
|
||||
},
|
||||
HandPickedParameters {
|
||||
volume_mib: 128,
|
||||
key_size: 100,
|
||||
key_layout: KeyLayout::Random,
|
||||
write_delta: WriteDelta::Yes,
|
||||
},
|
||||
HandPickedParameters {
|
||||
volume_mib: 128,
|
||||
key_size: 100,
|
||||
key_layout: KeyLayout::RandomReuse(0x3ff),
|
||||
write_delta: WriteDelta::Yes,
|
||||
},
|
||||
HandPickedParameters {
|
||||
volume_mib: 128,
|
||||
key_size: 100,
|
||||
key_layout: KeyLayout::Sequential,
|
||||
write_delta: WriteDelta::No,
|
||||
},
|
||||
// Large values (8k) tests
|
||||
HandPickedParameters {
|
||||
volume_mib: 128,
|
||||
key_size: 8192,
|
||||
key_layout: KeyLayout::Sequential,
|
||||
write_delta: WriteDelta::Yes,
|
||||
},
|
||||
HandPickedParameters {
|
||||
volume_mib: 128,
|
||||
key_size: 8192,
|
||||
key_layout: KeyLayout::Sequential,
|
||||
write_delta: WriteDelta::No,
|
||||
},
|
||||
];
|
||||
let exploded_parameters = {
|
||||
let mut out = Vec::new();
|
||||
for io_mode in [
|
||||
IoMode::Buffered,
|
||||
#[cfg(target_os = "linux")]
|
||||
IoMode::Direct,
|
||||
] {
|
||||
for param in expect.clone() {
|
||||
let HandPickedParameters {
|
||||
volume_mib,
|
||||
key_size,
|
||||
key_layout,
|
||||
write_delta,
|
||||
} = param;
|
||||
out.push(ExplodedParameters {
|
||||
io_mode,
|
||||
volume_mib,
|
||||
key_size,
|
||||
key_layout,
|
||||
write_delta,
|
||||
});
|
||||
}
|
||||
}
|
||||
out
|
||||
};
|
||||
impl ExplodedParameters {
|
||||
fn benchmark_id(&self) -> String {
|
||||
let ExplodedParameters {
|
||||
io_mode,
|
||||
volume_mib,
|
||||
key_size,
|
||||
key_layout,
|
||||
write_delta,
|
||||
} = self;
|
||||
format!(
|
||||
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
|
||||
)
|
||||
}
|
||||
}
|
||||
let mut group = c.benchmark_group("ingest");
|
||||
for params in exploded_parameters {
|
||||
let id = params.benchmark_id();
|
||||
let ExplodedParameters {
|
||||
io_mode,
|
||||
volume_mib,
|
||||
key_size,
|
||||
key_layout,
|
||||
write_delta,
|
||||
} = params;
|
||||
let put_count = volume_mib * 1024 * 1024 / key_size;
|
||||
group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
|
||||
group.sample_size(10);
|
||||
group.bench_function("ingest 128MB/8k seq", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Sequential,
|
||||
WriteDelta::Yes,
|
||||
)
|
||||
})
|
||||
});
|
||||
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
|
||||
b.iter(|| {
|
||||
ingest_main(
|
||||
conf,
|
||||
put_size,
|
||||
put_count,
|
||||
KeyLayout::Sequential,
|
||||
WriteDelta::No,
|
||||
)
|
||||
})
|
||||
group.bench_function(id, |b| {
|
||||
b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
criterion_group!(benches, criterion_benchmark);
|
||||
criterion_main!(benches);
|
||||
|
||||
/*
|
||||
cargo bench --bench bench_ingest
|
||||
|
||||
im4gn.2xlarge:
|
||||
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.8491 s 1.8540 s 1.8592 s]
|
||||
thrpt: [68.847 MiB/s 69.039 MiB/s 69.222 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [2.6976 s 2.7123 s 2.7286 s]
|
||||
thrpt: [46.911 MiB/s 47.193 MiB/s 47.450 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
|
||||
time: [1.7433 s 1.7510 s 1.7600 s]
|
||||
thrpt: [72.729 MiB/s 73.099 MiB/s 73.423 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [499.63 ms 500.07 ms 500.46 ms]
|
||||
thrpt: [255.77 MiB/s 255.96 MiB/s 256.19 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [456.97 ms 459.61 ms 461.92 ms]
|
||||
thrpt: [277.11 MiB/s 278.50 MiB/s 280.11 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [158.82 ms 159.16 ms 159.56 ms]
|
||||
thrpt: [802.22 MiB/s 804.24 MiB/s 805.93 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.8856 s 1.8997 s 1.9179 s]
|
||||
thrpt: [66.740 MiB/s 67.380 MiB/s 67.882 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [2.7468 s 2.7625 s 2.7785 s]
|
||||
thrpt: [46.068 MiB/s 46.335 MiB/s 46.600 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
|
||||
time: [1.7689 s 1.7726 s 1.7767 s]
|
||||
thrpt: [72.045 MiB/s 72.208 MiB/s 72.363 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [497.64 ms 498.60 ms 499.67 ms]
|
||||
thrpt: [256.17 MiB/s 256.72 MiB/s 257.21 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [493.72 ms 505.07 ms 518.03 ms]
|
||||
thrpt: [247.09 MiB/s 253.43 MiB/s 259.26 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [267.76 ms 267.85 ms 267.96 ms]
|
||||
thrpt: [477.69 MiB/s 477.88 MiB/s 478.03 MiB/s]
|
||||
|
||||
Hetzner AX102:
|
||||
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.0683 s 1.1006 s 1.1386 s]
|
||||
thrpt: [112.42 MiB/s 116.30 MiB/s 119.82 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [1.5719 s 1.6012 s 1.6228 s]
|
||||
thrpt: [78.877 MiB/s 79.938 MiB/s 81.430 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
|
||||
time: [1.1095 s 1.1331 s 1.1580 s]
|
||||
thrpt: [110.53 MiB/s 112.97 MiB/s 115.37 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [303.20 ms 307.83 ms 311.90 ms]
|
||||
thrpt: [410.39 MiB/s 415.81 MiB/s 422.16 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [406.34 ms 429.37 ms 451.63 ms]
|
||||
thrpt: [283.42 MiB/s 298.11 MiB/s 315.00 MiB/s]
|
||||
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [134.01 ms 135.78 ms 137.48 ms]
|
||||
thrpt: [931.03 MiB/s 942.68 MiB/s 955.12 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
|
||||
time: [1.0406 s 1.0580 s 1.0772 s]
|
||||
thrpt: [118.83 MiB/s 120.98 MiB/s 123.00 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
|
||||
time: [1.5059 s 1.5339 s 1.5625 s]
|
||||
thrpt: [81.920 MiB/s 83.448 MiB/s 84.999 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
|
||||
time: [1.0714 s 1.0934 s 1.1161 s]
|
||||
thrpt: [114.69 MiB/s 117.06 MiB/s 119.47 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
|
||||
time: [262.68 ms 265.14 ms 267.71 ms]
|
||||
thrpt: [478.13 MiB/s 482.76 MiB/s 487.29 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
|
||||
time: [375.19 ms 393.80 ms 411.40 ms]
|
||||
thrpt: [311.14 MiB/s 325.04 MiB/s 341.16 MiB/s]
|
||||
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
|
||||
time: [123.02 ms 123.85 ms 124.66 ms]
|
||||
thrpt: [1.0027 GiB/s 1.0093 GiB/s 1.0161 GiB/s]
|
||||
*/
|
||||
|
||||
@@ -419,6 +419,23 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn timeline_detail(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TimelineInfo> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
|
||||
self.mgmt_api_endpoint
|
||||
);
|
||||
|
||||
self.request(Method::GET, &uri, ())
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn timeline_archival_config(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -225,6 +225,11 @@ pub struct PageServerConf {
|
||||
/// Does not force TLS: the client negotiates TLS usage during the handshake.
|
||||
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
|
||||
pub enable_tls_page_service_api: bool,
|
||||
|
||||
/// Run in development mode, which disables certain safety checks
|
||||
/// such as authentication requirements for HTTP and PostgreSQL APIs.
|
||||
/// This is insecure and should only be used in development environments.
|
||||
pub dev_mode: bool,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -398,6 +403,7 @@ impl PageServerConf {
|
||||
generate_unarchival_heatmap,
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -449,6 +455,7 @@ impl PageServerConf {
|
||||
get_vectored_concurrent_io,
|
||||
tracing,
|
||||
enable_tls_page_service_api,
|
||||
dev_mode,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -3,10 +3,11 @@ use std::collections::HashMap;
|
||||
use futures::Future;
|
||||
use pageserver_api::config::NodeMetadata;
|
||||
use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
|
||||
use pageserver_api::models::ShardImportStatus;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
|
||||
ValidateRequestTenant, ValidateResponse,
|
||||
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
|
||||
ValidateRequest, ValidateRequestTenant, ValidateResponse,
|
||||
};
|
||||
use reqwest::Certificate;
|
||||
use serde::Serialize;
|
||||
@@ -14,7 +15,7 @@ use serde::de::DeserializeOwned;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use url::Url;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::NodeId;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::{backoff, failpoint_support};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
@@ -46,6 +47,12 @@ pub trait StorageControllerUpcallApi {
|
||||
&self,
|
||||
tenants: Vec<(TenantShardId, Generation)>,
|
||||
) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
|
||||
fn put_timeline_import_status(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
status: ShardImportStatus,
|
||||
) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
|
||||
}
|
||||
|
||||
impl StorageControllerUpcallClient {
|
||||
@@ -273,4 +280,30 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
|
||||
|
||||
Ok(result.into_iter().collect())
|
||||
}
|
||||
|
||||
/// Send a shard import status to the storage controller
|
||||
///
|
||||
/// The implementation must have at-least-once delivery semantics.
|
||||
/// To this end, we retry the request until it succeeds. If the pageserver
|
||||
/// restarts or crashes, the shard import will start again from the beggining.
|
||||
#[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
|
||||
async fn put_timeline_import_status(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
status: ShardImportStatus,
|
||||
) -> Result<(), RetryForeverError> {
|
||||
let url = self
|
||||
.base_url
|
||||
.join("timeline_import_status")
|
||||
.expect("Failed to build path");
|
||||
|
||||
let request = PutTimelineImportStatusRequest {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
status,
|
||||
};
|
||||
|
||||
self.retry_http_forever(&url, request).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -787,6 +787,15 @@ mod test {
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn put_timeline_import_status(
|
||||
&self,
|
||||
_tenant_shard_id: TenantShardId,
|
||||
_timeline_id: TimelineId,
|
||||
_status: pageserver_api::models::ShardImportStatus,
|
||||
) -> Result<(), RetryForeverError> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
|
||||
|
||||
@@ -28,7 +28,7 @@ use tracing::warn;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
@@ -218,7 +218,7 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
|
||||
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
|
||||
/// manually before dropping.
|
||||
pub struct BlobWriter<const BUFFERED: bool> {
|
||||
inner: VirtualFile,
|
||||
inner: TempVirtualFile,
|
||||
offset: u64,
|
||||
/// A buffer to save on write calls, only used if BUFFERED=true
|
||||
buf: Vec<u8>,
|
||||
@@ -228,7 +228,7 @@ pub struct BlobWriter<const BUFFERED: bool> {
|
||||
|
||||
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
pub fn new(
|
||||
inner: VirtualFile,
|
||||
inner: TempVirtualFile,
|
||||
start_offset: u64,
|
||||
_gate: &utils::sync::gate::Gate,
|
||||
_cancel: CancellationToken,
|
||||
@@ -476,30 +476,17 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
}
|
||||
}
|
||||
|
||||
impl BlobWriter<true> {
|
||||
/// Access the underlying `VirtualFile`.
|
||||
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
/// Finish this blob writer and return the underlying [`TempVirtualFile`].
|
||||
///
|
||||
/// This function flushes the internal buffer before giving access
|
||||
/// to the underlying `VirtualFile`.
|
||||
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
|
||||
self.flush_buffer(ctx).await?;
|
||||
/// If there is an internal buffer (depends on `BUFFERED`), it will
|
||||
/// be flushed before this method returns.
|
||||
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> {
|
||||
if BUFFERED {
|
||||
self.flush_buffer(ctx).await?;
|
||||
}
|
||||
Ok(self.inner)
|
||||
}
|
||||
|
||||
/// Access the underlying `VirtualFile`.
|
||||
///
|
||||
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
|
||||
/// the internal buffer before giving access.
|
||||
pub fn into_inner_no_flush(self) -> VirtualFile {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl BlobWriter<false> {
|
||||
/// Access the underlying `VirtualFile`.
|
||||
pub fn into_inner(self) -> VirtualFile {
|
||||
self.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -512,6 +499,7 @@ pub(crate) mod tests {
|
||||
use crate::context::DownloadBehavior;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::block_io::BlockReaderRef;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
|
||||
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
|
||||
round_trip_test_compressed::<BUFFERED>(blobs, false).await
|
||||
@@ -530,7 +518,10 @@ pub(crate) mod tests {
|
||||
// Write part (in block to drop the file)
|
||||
let mut offsets = Vec::new();
|
||||
{
|
||||
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
|
||||
let file = TempVirtualFile::new(
|
||||
VirtualFile::create(pathbuf.as_path(), ctx).await?,
|
||||
gate.enter().unwrap(),
|
||||
);
|
||||
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
|
||||
for blob in blobs.iter() {
|
||||
let (_, res) = if compression {
|
||||
@@ -553,7 +544,9 @@ pub(crate) mod tests {
|
||||
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
|
||||
let offs = res?;
|
||||
println!("Writing final blob at offs={offs}");
|
||||
wtr.flush_buffer(ctx).await?;
|
||||
|
||||
let file = wtr.into_inner(ctx).await?;
|
||||
file.disarm_into_inner();
|
||||
}
|
||||
Ok((temp_dir, pathbuf, offsets))
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use tokio_epoll_uring::{BoundedBuf, Slice};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info_span};
|
||||
use utils::id::TimelineId;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
|
||||
use crate::config::PageServerConf;
|
||||
@@ -21,16 +22,33 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
|
||||
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
|
||||
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
|
||||
use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};
|
||||
use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
|
||||
|
||||
use self::owned_buffers_io::write::OwnedAsyncWriter;
|
||||
|
||||
pub struct EphemeralFile {
|
||||
_tenant_shard_id: TenantShardId,
|
||||
_timeline_id: TimelineId,
|
||||
page_cache_file_id: page_cache::FileId,
|
||||
bytes_written: u64,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
|
||||
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
|
||||
_gate_guard: utils::sync::gate::GateGuard,
|
||||
file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
|
||||
buffered_writer: BufferedWriter,
|
||||
}
|
||||
|
||||
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
|
||||
IoBufferMut,
|
||||
TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
|
||||
>;
|
||||
|
||||
/// A TempVirtualFile that is co-owned by the [`EphemeralFile`]` and [`BufferedWriter`].
|
||||
///
|
||||
/// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
|
||||
/// The co-ownership is between [`EphemeralFile`] and that flush task.)
|
||||
///
|
||||
/// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
|
||||
#[derive(Debug, Clone)]
|
||||
struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
|
||||
inner: Arc<TempVirtualFile>,
|
||||
}
|
||||
|
||||
const TAIL_SZ: usize = 64 * 1024;
|
||||
@@ -44,9 +62,12 @@ impl EphemeralFile {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<EphemeralFile> {
|
||||
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
|
||||
// TempVirtualFile requires us to never reuse a filename while an old
|
||||
// instance of TempVirtualFile created with that filename is not done dropping yet.
|
||||
// So, we use a monotonic counter to disambiguate the filenames.
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let filename = conf
|
||||
.timeline_path(&tenant_shard_id, &timeline_id)
|
||||
@@ -54,7 +75,7 @@ impl EphemeralFile {
|
||||
"ephemeral-{filename_disambiguator}"
|
||||
)));
|
||||
|
||||
let file = Arc::new(
|
||||
let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
|
||||
VirtualFile::open_with_options_v2(
|
||||
&filename,
|
||||
virtual_file::OpenOptions::new()
|
||||
@@ -64,6 +85,7 @@ impl EphemeralFile {
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
gate.enter()?,
|
||||
);
|
||||
|
||||
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
|
||||
@@ -73,7 +95,8 @@ impl EphemeralFile {
|
||||
_timeline_id: timeline_id,
|
||||
page_cache_file_id,
|
||||
bytes_written: 0,
|
||||
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
|
||||
file: file.clone(),
|
||||
buffered_writer: BufferedWriter::new(
|
||||
file,
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
@@ -81,29 +104,42 @@ impl EphemeralFile {
|
||||
ctx,
|
||||
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
|
||||
),
|
||||
_gate_guard: gate.enter()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// unlink the file
|
||||
// we are clear to do this, because we have entered a gate
|
||||
let path = self.buffered_writer.as_inner().path();
|
||||
let res = std::fs::remove_file(path);
|
||||
if let Err(e) = res {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
// just never log the not found errors, we cannot do anything for them; on detach
|
||||
// the tenant directory is already gone.
|
||||
//
|
||||
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
|
||||
error!("could not remove ephemeral file '{path}': {e}");
|
||||
}
|
||||
impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
|
||||
fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
|
||||
fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> impl std::future::Future<
|
||||
Output = (
|
||||
owned_buffers_io::io_buf_ext::FullSlice<Buf>,
|
||||
std::io::Result<()>,
|
||||
),
|
||||
> + Send {
|
||||
self.inner.write_all_at(buf, offset, ctx)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
|
||||
type Target = VirtualFile;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum EphemeralFileWriteError {
|
||||
#[error("{0}")]
|
||||
@@ -262,9 +298,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
|
||||
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
|
||||
|
||||
let dst = if written_range.len() > 0 {
|
||||
let file: &VirtualFile = self.buffered_writer.as_inner();
|
||||
let bounds = dst.bounds();
|
||||
let slice = file
|
||||
let slice = self
|
||||
.file
|
||||
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
|
||||
.await?;
|
||||
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
|
||||
@@ -456,7 +492,7 @@ mod tests {
|
||||
assert_eq!(&buf, &content[range]);
|
||||
}
|
||||
|
||||
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
|
||||
let file_contents = std::fs::read(file.file.path()).unwrap();
|
||||
assert!(file_contents == content[0..cap * 2]);
|
||||
|
||||
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
|
||||
@@ -489,7 +525,7 @@ mod tests {
|
||||
// assert the state is as this test expects it to be
|
||||
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
|
||||
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
|
||||
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
|
||||
let md = file.file.path().metadata().unwrap();
|
||||
assert_eq!(
|
||||
md.len(),
|
||||
2 * cap.into_u64(),
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
use std::collections::HashSet;
|
||||
use std::future::Future;
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use anyhow::{Context, anyhow};
|
||||
@@ -15,7 +16,7 @@ use remote_storage::{
|
||||
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
|
||||
};
|
||||
use tokio::fs::{self, File, OpenOptions};
|
||||
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio_util::io::StreamReader;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
@@ -40,7 +41,10 @@ use crate::span::{
|
||||
use crate::tenant::Generation;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
|
||||
use crate::virtual_file;
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::virtual_file::{TempVirtualFile, owned_buffers_io};
|
||||
|
||||
///
|
||||
/// If 'metadata' is given, we will validate that the downloaded file's size matches that
|
||||
@@ -72,21 +76,36 @@ pub async fn download_layer_file<'a>(
|
||||
layer_metadata.generation,
|
||||
);
|
||||
|
||||
// Perform a rename inspired by durable_rename from file_utils.c.
|
||||
// The sequence:
|
||||
// write(tmp)
|
||||
// fsync(tmp)
|
||||
// rename(tmp, new)
|
||||
// fsync(new)
|
||||
// fsync(parent)
|
||||
// For more context about durable_rename check this email from postgres mailing list:
|
||||
// https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
|
||||
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
|
||||
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
|
||||
|
||||
let bytes_amount = download_retry(
|
||||
let (bytes_amount, temp_file) = download_retry(
|
||||
|| async {
|
||||
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
|
||||
// TempVirtualFile requires us to never reuse a filename while an old
|
||||
// instance of TempVirtualFile created with that filename is not done dropping yet.
|
||||
// So, we use a monotonic counter to disambiguate the filenames.
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
let temp_file_path = path_with_suffix_extension(
|
||||
local_path,
|
||||
&format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"),
|
||||
);
|
||||
|
||||
let temp_file = TempVirtualFile::new(
|
||||
// Not _v2 yet which is sensitive to virtual_file_io_mode.
|
||||
// That'll happen in PR https://github.com/neondatabase/neon/pull/11558
|
||||
VirtualFile::open_with_options(
|
||||
&temp_file_path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("create a temp file for layer download: {temp_file_path}"))
|
||||
.map_err(DownloadError::Other)?,
|
||||
gate.enter().map_err(|_| DownloadError::Cancelled)?,
|
||||
);
|
||||
download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
|
||||
},
|
||||
&format!("download {remote_path:?}"),
|
||||
cancel,
|
||||
@@ -96,7 +115,8 @@ pub async fn download_layer_file<'a>(
|
||||
let expected = layer_metadata.file_size;
|
||||
if expected != bytes_amount {
|
||||
return Err(DownloadError::Other(anyhow!(
|
||||
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
|
||||
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",
|
||||
temp_file.path()
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -106,11 +126,28 @@ pub async fn download_layer_file<'a>(
|
||||
)))
|
||||
});
|
||||
|
||||
fs::rename(&temp_file_path, &local_path)
|
||||
// Try rename before disarming the temp file.
|
||||
// That way, if rename fails for whatever reason, we clean up the temp file on the return path.
|
||||
|
||||
fs::rename(temp_file.path(), &local_path)
|
||||
.await
|
||||
.with_context(|| format!("rename download layer file to {local_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
// The temp file's VirtualFile points to the temp_file_path which we moved above.
|
||||
// Drop it immediately, it's invalid.
|
||||
// This will get better in https://github.com/neondatabase/neon/issues/11692
|
||||
let _: VirtualFile = temp_file.disarm_into_inner();
|
||||
// NB: The gate guard that was stored in `temp_file` is dropped but we continue
|
||||
// to operate on it and on the parent timeline directory.
|
||||
// Those operations are safe to do because higher-level code is holding another gate guard:
|
||||
// - attached mode: the download task spawned by struct Layer is holding the gate guard
|
||||
// - secondary mode: The TenantDownloader::download holds the gate open
|
||||
|
||||
// The rename above is not durable yet.
|
||||
// It doesn't matter for crash consistency because pageserver startup deletes temp
|
||||
// files and we'll re-download on demand if necessary.
|
||||
|
||||
// We use fatal_err() below because the after the rename above,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
@@ -146,147 +183,58 @@ pub async fn download_layer_file<'a>(
|
||||
async fn download_object(
|
||||
storage: &GenericRemoteStorage,
|
||||
src_path: &RemotePath,
|
||||
dst_path: &Utf8PathBuf,
|
||||
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
|
||||
destination_file: TempVirtualFile,
|
||||
gate: &utils::sync::gate::Gate,
|
||||
cancel: &CancellationToken,
|
||||
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
|
||||
) -> Result<u64, DownloadError> {
|
||||
let res = match crate::virtual_file::io_engine::get() {
|
||||
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
|
||||
crate::virtual_file::io_engine::IoEngine::StdFs => {
|
||||
async {
|
||||
let destination_file = tokio::fs::File::create(dst_path)
|
||||
.await
|
||||
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u64, TempVirtualFile), DownloadError> {
|
||||
let mut download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
|
||||
let download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
let dst_path = destination_file.path().to_owned();
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
|
||||
destination_file,
|
||||
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
|
||||
gate.enter().map_err(|_| DownloadError::Cancelled)?,
|
||||
cancel.child_token(),
|
||||
ctx,
|
||||
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
|
||||
);
|
||||
|
||||
let mut buf_writer =
|
||||
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
|
||||
|
||||
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
|
||||
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
|
||||
buf_writer.flush().await?;
|
||||
|
||||
let mut destination_file = buf_writer.into_inner();
|
||||
|
||||
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
|
||||
// A file will not be closed immediately when it goes out of scope if there are any IO operations
|
||||
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
|
||||
// you should call flush before dropping it.
|
||||
//
|
||||
// From the tokio code I see that it waits for pending operations to complete. There shouldt be any because
|
||||
// we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations.
|
||||
// But for additional safety lets check/wait for any pending operations.
|
||||
destination_file
|
||||
.flush()
|
||||
.await
|
||||
.maybe_fatal_err("download_object sync_all")
|
||||
.with_context(|| format!("flush source file at {dst_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
// not using sync_data because it can lose file size update
|
||||
destination_file
|
||||
.sync_all()
|
||||
.await
|
||||
.maybe_fatal_err("download_object sync_all")
|
||||
.with_context(|| format!("failed to fsync source file at {dst_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok(bytes_amount)
|
||||
}
|
||||
.await
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::virtual_file::{IoBufferMut, owned_buffers_io};
|
||||
async {
|
||||
let destination_file = Arc::new(
|
||||
VirtualFile::create(dst_path, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("create a destination file for layer '{dst_path}'")
|
||||
})
|
||||
.map_err(DownloadError::Other)?,
|
||||
);
|
||||
|
||||
let mut download = storage
|
||||
.download(src_path, &DownloadOpts::default(), cancel)
|
||||
.await?;
|
||||
|
||||
pausable_failpoint!("before-downloading-layer-stream-pausable");
|
||||
|
||||
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
|
||||
destination_file,
|
||||
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
|
||||
gate.enter().map_err(|_| DownloadError::Cancelled)?,
|
||||
cancel.child_token(),
|
||||
ctx,
|
||||
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
|
||||
);
|
||||
|
||||
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
|
||||
// There's chunks_vectored() on the stream.
|
||||
let (bytes_amount, destination_file) = async {
|
||||
while let Some(res) =
|
||||
futures::StreamExt::next(&mut download.download_stream).await
|
||||
{
|
||||
let chunk = match res {
|
||||
Ok(chunk) => chunk,
|
||||
Err(e) => return Err(DownloadError::from(e)),
|
||||
};
|
||||
buffered
|
||||
.write_buffered_borrowed(&chunk, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => DownloadError::Cancelled,
|
||||
})?;
|
||||
}
|
||||
let inner = buffered
|
||||
.flush_and_into_inner(ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => DownloadError::Cancelled,
|
||||
})?;
|
||||
Ok(inner)
|
||||
}
|
||||
.await?;
|
||||
|
||||
// not using sync_data because it can lose file size update
|
||||
destination_file
|
||||
.sync_all()
|
||||
.await
|
||||
.maybe_fatal_err("download_object sync_all")
|
||||
.with_context(|| format!("failed to fsync source file at {dst_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok(bytes_amount)
|
||||
}
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
// in case the download failed, clean up
|
||||
match res {
|
||||
Ok(bytes_amount) => Ok(bytes_amount),
|
||||
Err(e) => {
|
||||
if let Err(e) = tokio::fs::remove_file(dst_path).await {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
|
||||
}
|
||||
}
|
||||
Err(e)
|
||||
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
|
||||
// There's chunks_vectored() on the stream.
|
||||
let (bytes_amount, destination_file) = async {
|
||||
while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await {
|
||||
let chunk = match res {
|
||||
Ok(chunk) => chunk,
|
||||
Err(e) => return Err(DownloadError::from(e)),
|
||||
};
|
||||
buffered
|
||||
.write_buffered_borrowed(&chunk, ctx)
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => DownloadError::Cancelled,
|
||||
})?;
|
||||
}
|
||||
let inner = buffered.shutdown(ctx).await.map_err(|e| match e {
|
||||
FlushTaskError::Cancelled => DownloadError::Cancelled,
|
||||
})?;
|
||||
Ok(inner)
|
||||
}
|
||||
.await?;
|
||||
|
||||
// not using sync_data because it can lose file size update
|
||||
destination_file
|
||||
.sync_all()
|
||||
.await
|
||||
.maybe_fatal_err("download_object sync_all")
|
||||
.with_context(|| format!("failed to fsync source file at {dst_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok((bytes_amount, destination_file))
|
||||
}
|
||||
|
||||
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
|
||||
|
||||
@@ -646,7 +646,7 @@ enum UpdateError {
|
||||
NoData,
|
||||
#[error("Insufficient local storage space")]
|
||||
NoSpace,
|
||||
#[error("Failed to download")]
|
||||
#[error("Failed to download: {0}")]
|
||||
DownloadError(DownloadError),
|
||||
#[error(transparent)]
|
||||
Deserialize(#[from] serde_json::Error),
|
||||
|
||||
@@ -34,6 +34,7 @@ use std::ops::Range;
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -45,8 +46,6 @@ use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::value::Value;
|
||||
use rand::Rng;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_epoll_uring::IoBuf;
|
||||
@@ -74,6 +73,7 @@ use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadPlanner,
|
||||
};
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
@@ -288,19 +288,20 @@ impl DeltaLayer {
|
||||
key_start: Key,
|
||||
lsn_range: &Range<Lsn>,
|
||||
) -> Utf8PathBuf {
|
||||
let rand_string: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
// TempVirtualFile requires us to never reuse a filename while an old
|
||||
// instance of TempVirtualFile created with that filename is not done dropping yet.
|
||||
// So, we use a monotonic counter to disambiguate the filenames.
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
conf.timeline_path(tenant_shard_id, timeline_id)
|
||||
.join(format!(
|
||||
"{}-XXX__{:016X}-{:016X}.{}.{}",
|
||||
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
|
||||
key_start,
|
||||
u64::from(lsn_range.start),
|
||||
u64::from(lsn_range.end),
|
||||
rand_string,
|
||||
filename_disambiguator,
|
||||
TEMP_FILE_SUFFIX,
|
||||
))
|
||||
}
|
||||
@@ -421,7 +422,7 @@ impl DeltaLayerWriterInner {
|
||||
let path =
|
||||
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
|
||||
|
||||
let mut file = VirtualFile::create(&path, ctx).await?;
|
||||
let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?);
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
@@ -515,22 +516,6 @@ impl DeltaLayerWriterInner {
|
||||
self,
|
||||
key_end: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let temp_path = self.path.clone();
|
||||
let result = self.finish0(key_end, ctx).await;
|
||||
if let Err(ref e) = result {
|
||||
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
|
||||
if let Err(e) = std::fs::remove_file(&temp_path) {
|
||||
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
async fn finish0(
|
||||
self,
|
||||
key_end: Key,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -598,6 +583,10 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
trace!("created delta layer {}", self.path);
|
||||
|
||||
// The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction)
|
||||
// keep the gate open also, so that it's safe for them to rename the file to its final destination.
|
||||
file.disarm_into_inner();
|
||||
|
||||
Ok((desc, self.path))
|
||||
}
|
||||
}
|
||||
@@ -726,17 +715,6 @@ impl DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for DeltaLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
// We want to remove the virtual file here, so it's fine to not
|
||||
// having completely flushed unwritten data.
|
||||
let vfile = inner.blob_writer.into_inner_no_flush();
|
||||
vfile.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
@@ -1609,8 +1587,8 @@ pub(crate) mod test {
|
||||
use bytes::Bytes;
|
||||
use itertools::MinMaxResult;
|
||||
use pageserver_api::value::Value;
|
||||
use rand::RngCore;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use rand::{Rng, RngCore};
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
|
||||
@@ -32,6 +32,7 @@ use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use anyhow::{Context, Result, bail, ensure};
|
||||
use bytes::Bytes;
|
||||
@@ -43,8 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_api::value::Value;
|
||||
use rand::Rng;
|
||||
use rand::distributions::Alphanumeric;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
@@ -72,6 +71,7 @@ use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadPlanner,
|
||||
};
|
||||
use crate::virtual_file::TempVirtualFile;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
|
||||
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
@@ -252,14 +252,18 @@ impl ImageLayer {
|
||||
tenant_shard_id: TenantShardId,
|
||||
fname: &ImageLayerName,
|
||||
) -> Utf8PathBuf {
|
||||
let rand_string: String = rand::thread_rng()
|
||||
.sample_iter(&Alphanumeric)
|
||||
.take(8)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
// TempVirtualFile requires us to never reuse a filename while an old
|
||||
// instance of TempVirtualFile created with that filename is not done dropping yet.
|
||||
// So, we use a monotonic counter to disambiguate the filenames.
|
||||
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
|
||||
let filename_disambiguator =
|
||||
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
conf.timeline_path(&tenant_shard_id, &timeline_id)
|
||||
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
|
||||
.join(format!(
|
||||
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
|
||||
filename_disambiguator
|
||||
))
|
||||
}
|
||||
|
||||
///
|
||||
@@ -773,7 +777,7 @@ impl ImageLayerWriterInner {
|
||||
},
|
||||
);
|
||||
trace!("creating image layer {}", path);
|
||||
let mut file = {
|
||||
let mut file = TempVirtualFile::new(
|
||||
VirtualFile::open_with_options(
|
||||
&path,
|
||||
virtual_file::OpenOptions::new()
|
||||
@@ -781,8 +785,9 @@ impl ImageLayerWriterInner {
|
||||
.create_new(true),
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
.await?,
|
||||
gate.enter()?,
|
||||
);
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
|
||||
@@ -896,25 +901,6 @@ impl ImageLayerWriterInner {
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
end_key: Option<Key>,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let temp_path = self.path.clone();
|
||||
let result = self.finish0(ctx, end_key).await;
|
||||
if let Err(ref e) = result {
|
||||
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
|
||||
if let Err(e) = std::fs::remove_file(&temp_path) {
|
||||
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
async fn finish0(
|
||||
self,
|
||||
ctx: &RequestContext,
|
||||
end_key: Option<Key>,
|
||||
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
|
||||
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -932,7 +918,7 @@ impl ImageLayerWriterInner {
|
||||
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
|
||||
};
|
||||
|
||||
let mut file = self.blob_writer.into_inner();
|
||||
let mut file = self.blob_writer.into_inner(ctx).await?;
|
||||
|
||||
// Write out the index
|
||||
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
|
||||
@@ -1000,6 +986,10 @@ impl ImageLayerWriterInner {
|
||||
|
||||
trace!("created image layer {}", self.path);
|
||||
|
||||
// The gate guard stored in `destination_file` is dropped. Callers (e.g.. flush loop or compaction)
|
||||
// keep the gate open also, so that it's safe for them to rename the file to its final destination.
|
||||
file.disarm_into_inner();
|
||||
|
||||
Ok((desc, self.path))
|
||||
}
|
||||
}
|
||||
@@ -1125,14 +1115,6 @@ impl ImageLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ImageLayerWriter {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.inner.take() {
|
||||
inner.blob_writer.into_inner().remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ImageLayerIterator<'a> {
|
||||
image_layer: &'a ImageLayerInner,
|
||||
ctx: &'a RequestContext,
|
||||
|
||||
@@ -1,20 +1,21 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, bail};
|
||||
use pageserver_api::models::ShardImportStatus;
|
||||
use remote_storage::RemotePath;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, info, info_span};
|
||||
use tracing::info;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use super::Timeline;
|
||||
use crate::context::RequestContext;
|
||||
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
|
||||
mod flow;
|
||||
mod importbucket_client;
|
||||
mod importbucket_format;
|
||||
pub(crate) mod index_part_format;
|
||||
pub(crate) mod upcall_api;
|
||||
|
||||
pub async fn doit(
|
||||
timeline: &Arc<Timeline>,
|
||||
@@ -34,23 +35,6 @@ pub async fn doit(
|
||||
|
||||
let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
|
||||
|
||||
info!("get spec early so we know we'll be able to upcall when done");
|
||||
let Some(spec) = storage.get_spec().await? else {
|
||||
bail!("spec not found")
|
||||
};
|
||||
|
||||
let upcall_client =
|
||||
upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
|
||||
|
||||
//
|
||||
// send an early progress update to clean up k8s job early and generate potentially useful logs
|
||||
//
|
||||
info!("send early progress update");
|
||||
upcall_client
|
||||
.send_progress_until_success(&spec)
|
||||
.instrument(info_span!("early_progress_update"))
|
||||
.await?;
|
||||
|
||||
let status_prefix = RemotePath::from_string("status").unwrap();
|
||||
|
||||
//
|
||||
@@ -176,7 +160,21 @@ pub async fn doit(
|
||||
|
||||
//
|
||||
// Communicate that shard is done.
|
||||
// Ensure at-least-once delivery of the upcall to storage controller
|
||||
// before we mark the task as done and never come here again.
|
||||
//
|
||||
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
|
||||
.expect("storcon configured");
|
||||
storcon_client
|
||||
.put_timeline_import_status(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
// TODO(vlad): What about import errors?
|
||||
ShardImportStatus::Done,
|
||||
)
|
||||
.await
|
||||
.map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
|
||||
|
||||
storage
|
||||
.put_json(
|
||||
&shard_status_key,
|
||||
@@ -186,16 +184,6 @@ pub async fn doit(
|
||||
.context("put shard status")?;
|
||||
}
|
||||
|
||||
//
|
||||
// Ensure at-least-once deliver of the upcall to cplane
|
||||
// before we mark the task as done and never come here again.
|
||||
//
|
||||
info!("send final progress update");
|
||||
upcall_client
|
||||
.send_progress_until_success(&spec)
|
||||
.instrument(info_span!("final_progress_update"))
|
||||
.await?;
|
||||
|
||||
//
|
||||
// Mark as done in index_part.
|
||||
// This makes subsequent timeline loads enter the normal load code path
|
||||
|
||||
@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, info, instrument};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use super::{importbucket_format, index_part_format};
|
||||
use super::index_part_format;
|
||||
use crate::assert_u64_eq_usize::U64IsUsize;
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
@@ -173,12 +173,6 @@ impl RemoteStorageWrapper {
|
||||
res
|
||||
}
|
||||
|
||||
pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
|
||||
self.get_json(&RemotePath::from_string("spec.json").unwrap())
|
||||
.await
|
||||
.context("get spec")
|
||||
}
|
||||
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
|
||||
pub async fn get_json<T: DeserializeOwned>(
|
||||
&self,
|
||||
@@ -244,7 +238,8 @@ impl RemoteStorageWrapper {
|
||||
kind: DownloadKind::Large,
|
||||
etag: None,
|
||||
byte_start: Bound::Included(start_inclusive),
|
||||
byte_end: Bound::Excluded(end_exclusive)
|
||||
byte_end: Bound::Excluded(end_exclusive),
|
||||
version_id: None,
|
||||
},
|
||||
&self.cancel)
|
||||
.await?;
|
||||
|
||||
@@ -11,10 +11,3 @@ pub struct ShardStatus {
|
||||
pub done: bool,
|
||||
// TODO: remaining fields
|
||||
}
|
||||
|
||||
// TODO: dedupe with fast_import code
|
||||
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Spec {
|
||||
pub project_id: String,
|
||||
pub branch_id: String,
|
||||
}
|
||||
|
||||
@@ -1,124 +0,0 @@
|
||||
//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
|
||||
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
|
||||
use reqwest::{Certificate, Method};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
|
||||
use super::importbucket_format::Spec;
|
||||
use crate::config::PageServerConf;
|
||||
|
||||
pub struct Client {
|
||||
base_url: String,
|
||||
authorization_header: Option<String>,
|
||||
client: reqwest::Client,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct ImportProgressRequest {
|
||||
// no fields yet, not sure if there every will be any
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct ImportProgressResponse {
|
||||
// we don't care
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result<Self> {
|
||||
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
|
||||
anyhow::bail!("import_pgdata_upcall_api is not configured")
|
||||
};
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
Ok(Self {
|
||||
base_url: base_url.to_string(),
|
||||
client: http_client,
|
||||
cancel,
|
||||
authorization_header: conf
|
||||
.import_pgdata_upcall_api_token
|
||||
.as_ref()
|
||||
.map(|secret_string| secret_string.get_contents())
|
||||
.map(|jwt| format!("Bearer {jwt}")),
|
||||
})
|
||||
}
|
||||
|
||||
fn start_request<U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
) -> reqwest::RequestBuilder {
|
||||
let req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req.header(reqwest::header::AUTHORIZATION, value)
|
||||
} else {
|
||||
req
|
||||
}
|
||||
}
|
||||
|
||||
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
self.start_request(method, uri)
|
||||
.json(&body)
|
||||
.send()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
let res = self.request_noerror(method, uri, body).await?;
|
||||
let response = res.error_from_body().await?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> {
|
||||
let url = format!(
|
||||
"{}/projects/{}/branches/{}/import_progress",
|
||||
self.base_url, spec.project_id, spec.branch_id
|
||||
);
|
||||
let ImportProgressResponse {} = self
|
||||
.request(Method::POST, url, &ImportProgressRequest {})
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> {
|
||||
loop {
|
||||
match self.send_progress_once(spec).await {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")),
|
||||
Err(err) => {
|
||||
error!(?err, "error sending progress, retrying");
|
||||
if tokio::time::timeout(
|
||||
std::time::Duration::from_secs(10),
|
||||
self.cancel.cancelled(),
|
||||
)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
anyhow::bail!("cancelled while sending early progress update");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -25,29 +25,31 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig
|
||||
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
|
||||
use owned_buffers_io::io_buf_ext::FullSlice;
|
||||
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
|
||||
pub use pageserver_api::models::virtual_file as api;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
|
||||
|
||||
use self::owned_buffers_io::write::OwnedAsyncWriter;
|
||||
use crate::assert_u64_eq_usize::UsizeIsU64;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
|
||||
use crate::page_cache::{PAGE_SZ, PageWriteGuard};
|
||||
pub(crate) mod io_engine;
|
||||
|
||||
pub(crate) use api::IoMode;
|
||||
pub(crate) use io_engine::IoEngineKind;
|
||||
pub use io_engine::{
|
||||
FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
|
||||
io_engine_for_bench,
|
||||
};
|
||||
mod metadata;
|
||||
mod open_options;
|
||||
pub(crate) use api::IoMode;
|
||||
pub(crate) use io_engine::IoEngineKind;
|
||||
pub(crate) use metadata::Metadata;
|
||||
pub(crate) use open_options::*;
|
||||
pub use pageserver_api::models::virtual_file as api;
|
||||
pub use temporary::TempVirtualFile;
|
||||
|
||||
use self::owned_buffers_io::write::OwnedAsyncWriter;
|
||||
|
||||
pub(crate) mod io_engine;
|
||||
mod metadata;
|
||||
mod open_options;
|
||||
mod temporary;
|
||||
pub(crate) mod owned_buffers_io {
|
||||
//! Abstractions for IO with owned buffers.
|
||||
//!
|
||||
@@ -1369,7 +1371,7 @@ pub(crate) type IoPageSlice<'a> =
|
||||
static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
|
||||
once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
|
||||
|
||||
pub(crate) fn set_io_mode(mode: IoMode) {
|
||||
pub fn set_io_mode(mode: IoMode) {
|
||||
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
mod flush;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub(crate) use flush::FlushControl;
|
||||
use flush::FlushHandle;
|
||||
@@ -41,7 +40,6 @@ pub trait OwnedAsyncWriter {
|
||||
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
|
||||
// since we would avoid copying majority of the data into the internal buffer.
|
||||
pub struct BufferedWriter<B: Buffer, W> {
|
||||
writer: Arc<W>,
|
||||
/// Clone of the buffer that was last submitted to the flush loop.
|
||||
/// `None` if no flush request has been submitted, Some forever after.
|
||||
pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
|
||||
@@ -72,7 +70,7 @@ where
|
||||
///
|
||||
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
|
||||
pub fn new(
|
||||
writer: Arc<W>,
|
||||
writer: W,
|
||||
buf_new: impl Fn() -> B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
cancel: CancellationToken,
|
||||
@@ -80,7 +78,6 @@ where
|
||||
flush_task_span: tracing::Span,
|
||||
) -> Self {
|
||||
Self {
|
||||
writer: writer.clone(),
|
||||
mutable: Some(buf_new()),
|
||||
maybe_flushed: None,
|
||||
flush_handle: FlushHandle::spawn_new(
|
||||
@@ -95,10 +92,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_inner(&self) -> &W {
|
||||
&self.writer
|
||||
}
|
||||
|
||||
/// Returns the number of bytes submitted to the background flush task.
|
||||
pub fn bytes_submitted(&self) -> u64 {
|
||||
self.bytes_submitted
|
||||
@@ -116,20 +109,16 @@ where
|
||||
}
|
||||
|
||||
#[cfg_attr(target_os = "macos", allow(dead_code))]
|
||||
pub async fn flush_and_into_inner(
|
||||
mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u64, Arc<W>), FlushTaskError> {
|
||||
pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
|
||||
self.flush(ctx).await?;
|
||||
|
||||
let Self {
|
||||
mutable: buf,
|
||||
maybe_flushed: _,
|
||||
writer,
|
||||
mut flush_handle,
|
||||
bytes_submitted: bytes_amount,
|
||||
} = self;
|
||||
flush_handle.shutdown().await?;
|
||||
let writer = flush_handle.shutdown().await?;
|
||||
assert!(buf.is_some());
|
||||
Ok((bytes_amount, writer))
|
||||
}
|
||||
@@ -329,7 +318,7 @@ mod tests {
|
||||
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
|
||||
let ctx = test_ctx();
|
||||
let ctx = &ctx;
|
||||
let recorder = Arc::new(RecorderWriter::default());
|
||||
let recorder = RecorderWriter::default();
|
||||
let gate = utils::sync::gate::Gate::default();
|
||||
let cancel = CancellationToken::new();
|
||||
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
|
||||
@@ -350,7 +339,7 @@ mod tests {
|
||||
writer.write_buffered_borrowed(b"j", ctx).await?;
|
||||
writer.write_buffered_borrowed(b"klmno", ctx).await?;
|
||||
|
||||
let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
|
||||
let (_, recorder) = writer.shutdown(ctx).await?;
|
||||
assert_eq!(
|
||||
recorder.get_writes(),
|
||||
{
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, info, info_span, warn};
|
||||
@@ -21,7 +20,7 @@ pub struct FlushHandleInner<Buf, W> {
|
||||
/// and receives recyled buffer.
|
||||
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
|
||||
/// Join handle for the background flush task.
|
||||
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
|
||||
join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
|
||||
}
|
||||
|
||||
struct FlushRequest<Buf> {
|
||||
@@ -120,7 +119,7 @@ where
|
||||
/// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
|
||||
/// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
|
||||
pub fn spawn_new<B>(
|
||||
file: Arc<W>,
|
||||
file: W,
|
||||
buf: B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
cancel: CancellationToken,
|
||||
@@ -183,7 +182,7 @@ where
|
||||
}
|
||||
|
||||
/// Cleans up the channel, join the flush task.
|
||||
pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
|
||||
pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
|
||||
let handle = self
|
||||
.inner
|
||||
.take()
|
||||
@@ -207,7 +206,7 @@ pub struct FlushBackgroundTask<Buf, W> {
|
||||
/// and send back recycled buffer.
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
|
||||
/// A writter for persisting data to disk.
|
||||
writer: Arc<W>,
|
||||
writer: W,
|
||||
ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
/// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk.
|
||||
@@ -228,7 +227,7 @@ where
|
||||
/// Creates a new background flush task.
|
||||
fn new(
|
||||
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
|
||||
file: Arc<W>,
|
||||
file: W,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
cancel: CancellationToken,
|
||||
ctx: RequestContext,
|
||||
@@ -243,7 +242,7 @@ where
|
||||
}
|
||||
|
||||
/// Runs the background flush task.
|
||||
async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
|
||||
async fn run(mut self) -> Result<W, FlushTaskError> {
|
||||
// Exit condition: channel is closed and there is no remaining buffer to be flushed
|
||||
while let Some(request) = self.channel.recv().await {
|
||||
#[cfg(test)]
|
||||
|
||||
106
pageserver/src/virtual_file/temporary.rs
Normal file
106
pageserver/src/virtual_file/temporary.rs
Normal file
@@ -0,0 +1,106 @@
|
||||
use tracing::error;
|
||||
use utils::sync::gate::GateGuard;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
|
||||
use super::{
|
||||
MaybeFatalIo, VirtualFile,
|
||||
owned_buffers_io::{
|
||||
io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice, write::OwnedAsyncWriter,
|
||||
},
|
||||
};
|
||||
|
||||
/// A wrapper around [`super::VirtualFile`] that deletes the file on drop.
|
||||
/// For use as a [`OwnedAsyncWriter`] in [`super::owned_buffers_io::write::BufferedWriter`].
|
||||
#[derive(Debug)]
|
||||
pub struct TempVirtualFile {
|
||||
inner: Option<Inner>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Inner {
|
||||
file: VirtualFile,
|
||||
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
|
||||
_gate_guard: GateGuard,
|
||||
}
|
||||
|
||||
impl OwnedAsyncWriter for TempVirtualFile {
|
||||
fn write_all_at<Buf: IoBufAligned + Send>(
|
||||
&self,
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
|
||||
VirtualFile::write_all_at(self, buf, offset, ctx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TempVirtualFile {
|
||||
fn drop(&mut self) {
|
||||
let Some(Inner { file, _gate_guard }) = self.inner.take() else {
|
||||
return;
|
||||
};
|
||||
let path = file.path();
|
||||
if let Err(e) =
|
||||
std::fs::remove_file(path).maybe_fatal_err("failed to remove the virtual file")
|
||||
{
|
||||
error!(err=%e, path=%path, "failed to remove");
|
||||
}
|
||||
drop(_gate_guard);
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for TempVirtualFile {
|
||||
type Target = VirtualFile;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self
|
||||
.inner
|
||||
.as_ref()
|
||||
.expect("only None after into_inner or drop")
|
||||
.file
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for TempVirtualFile {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self
|
||||
.inner
|
||||
.as_mut()
|
||||
.expect("only None after into_inner or drop")
|
||||
.file
|
||||
}
|
||||
}
|
||||
|
||||
impl TempVirtualFile {
|
||||
/// The caller is responsible for ensuring that the path of `virtual_file` is not reused
|
||||
/// until after this TempVirtualFile's `Drop` impl has completed.
|
||||
/// Failure to do so will result in unlinking of the reused path by the original instance's Drop impl.
|
||||
/// The best way to do so is by using a monotonic counter as a disambiguator.
|
||||
/// TODO: centralize this disambiguator pattern inside this struct.
|
||||
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
|
||||
pub fn new(virtual_file: VirtualFile, gate_guard: GateGuard) -> Self {
|
||||
Self {
|
||||
inner: Some(Inner {
|
||||
file: virtual_file,
|
||||
_gate_guard: gate_guard,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Dismantle this wrapper and return the underlying [`VirtualFile`].
|
||||
/// This disables auto-unlinking functionality that is the essence of this wrapper.
|
||||
///
|
||||
/// The gate guard is dropped as well; it is the callers responsibility to ensure filesystem
|
||||
/// operations after calls to this functions are still gated by some other gate guard.
|
||||
///
|
||||
/// TODO:
|
||||
/// - centralize the common usage pattern of callers (sync_all(self), rename(self, dst), sync_all(dst.parent))
|
||||
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
|
||||
pub fn disarm_into_inner(mut self) -> VirtualFile {
|
||||
self.inner
|
||||
.take()
|
||||
.expect("only None after into_inner or drop, and we are into_inner, and we consume")
|
||||
.file
|
||||
}
|
||||
}
|
||||
@@ -81,23 +81,6 @@ static int neon_compute_mode = 0;
|
||||
static int max_reconnect_attempts = 60;
|
||||
static int stripe_size;
|
||||
|
||||
static char *pageserver_sslcert = NULL;
|
||||
static char *pageserver_sslcertmode = NULL;
|
||||
static char *pageserver_sslcompression = NULL;
|
||||
static char *pageserver_sslcrl = NULL;
|
||||
static char *pageserver_sslcrldir = NULL;
|
||||
static char *pageserver_sslkey = NULL;
|
||||
static char *pageserver_sslmode = NULL;
|
||||
static char *pageserver_sslpassword = NULL;
|
||||
static char *pageserver_sslrootcert = NULL;
|
||||
static char *pageserver_sslsni = NULL;
|
||||
static char *pageserver_ssl_min_protocol_version = NULL;
|
||||
static char *pageserver_ssl_max_protocol_version = NULL;
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
static char *pageserver_sslnegotiation = NULL;
|
||||
#endif
|
||||
|
||||
static int pageserver_response_log_timeout = 10000;
|
||||
/* 2.5 minutes. A bit higher than highest default TCP retransmission timeout */
|
||||
static int pageserver_response_disconnect_timeout = 150000;
|
||||
@@ -144,7 +127,7 @@ static uint64 pagestore_local_counter = 0;
|
||||
typedef enum PSConnectionState {
|
||||
PS_Disconnected, /* no connection yet */
|
||||
PS_Connecting_Startup, /* connection starting up */
|
||||
PS_Connecting_PageStream, /* negotiating pagestream */
|
||||
PS_Connecting_PageStream, /* negotiating pagestream */
|
||||
PS_Connected, /* connected, pagestream established */
|
||||
} PSConnectionState;
|
||||
|
||||
@@ -379,7 +362,7 @@ get_shard_number(BufferTag *tag)
|
||||
}
|
||||
|
||||
static inline void
|
||||
CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
{
|
||||
if (shard->wes_read)
|
||||
{
|
||||
@@ -401,7 +384,7 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
|
||||
* complete the connection (e.g. due to receiving an earlier cancellation
|
||||
* during connection start).
|
||||
* Returns true if successfully connected; false if the connection failed.
|
||||
*
|
||||
*
|
||||
* Throws errors in unrecoverable situations, or when this backend's query
|
||||
* is canceled.
|
||||
*/
|
||||
@@ -424,8 +407,8 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
{
|
||||
case PS_Disconnected:
|
||||
{
|
||||
const char *keywords[17];
|
||||
const char *values[17];
|
||||
const char *keywords[5];
|
||||
const char *values[5];
|
||||
char pid_str[16] = { 0 };
|
||||
char endpoint_str[36] = { 0 };
|
||||
int n_pgsql_params;
|
||||
@@ -499,92 +482,6 @@ pageserver_connect(shardno_t shard_no, int elevel)
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_sslcertmode)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslcertmode";
|
||||
values[n_pgsql_params] = pageserver_sslcertmode;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_sslcompression)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslcompression";
|
||||
values[n_pgsql_params] = pageserver_sslcompression;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_sslcrl)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslcrl";
|
||||
values[n_pgsql_params] = pageserver_sslcrl;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_sslcrldir)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslcrldir";
|
||||
values[n_pgsql_params] = pageserver_sslcrldir;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_sslkey)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslkey";
|
||||
values[n_pgsql_params] = pageserver_sslkey;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_sslmode)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslmode";
|
||||
values[n_pgsql_params] = pageserver_sslmode;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
if (pageserver_sslnegotiation)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslnegotiation";
|
||||
values[n_pgsql_params] = pageserver_sslnegotiation;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pageserver_sslpassword)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslpassword";
|
||||
values[n_pgsql_params] = pageserver_sslpassword;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_sslrootcert)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslrootcert";
|
||||
values[n_pgsql_params] = pageserver_sslrootcert;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_sslsni)
|
||||
{
|
||||
keywords[n_pgsql_params] = "sslsni";
|
||||
values[n_pgsql_params] = pageserver_sslsni;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_ssl_max_protocol_version)
|
||||
{
|
||||
keywords[n_pgsql_params] = "ssl_max_protocol_version";
|
||||
values[n_pgsql_params] = pageserver_ssl_max_protocol_version;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
if (pageserver_ssl_min_protocol_version)
|
||||
{
|
||||
keywords[n_pgsql_params] = "ssl_min_protocol_version";
|
||||
values[n_pgsql_params] = pageserver_ssl_min_protocol_version;
|
||||
n_pgsql_params++;
|
||||
}
|
||||
|
||||
{
|
||||
bool param_set = false;
|
||||
switch (neon_compute_mode)
|
||||
@@ -1580,125 +1477,6 @@ pg_init_libpagestore(void)
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslcert",
|
||||
"SSL certificate path",
|
||||
"Refer to the Postgres documentation on libpq's sslcert keyword.",
|
||||
&pageserver_sslcert,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslcertmode",
|
||||
"SSL certificate mode",
|
||||
"Refer to the Postgres documentation on libpq's sslcertmode keyword.",
|
||||
&pageserver_sslcertmode,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslcrl",
|
||||
"Path to the SSL server certificate revocation list",
|
||||
"Refer to the Postgres documentation on libpq's sslcrl keyword.",
|
||||
&pageserver_sslcrl,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslcrldir",
|
||||
"Path to the directory of the SSL server certificate revocation list",
|
||||
"Refer to the Postgres documentation on libpq's sslcrldir keyword.",
|
||||
&pageserver_sslcrldir,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslcompression",
|
||||
"SSL compression",
|
||||
"Refer to the Postgres documentation on libpq's sslcompression keyword.",
|
||||
&pageserver_sslcompression,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslkey",
|
||||
"SSL key",
|
||||
"Refer to the Postgres documentation on libpq's sslkey keyword.",
|
||||
&pageserver_sslkey,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslmode",
|
||||
"SSL mode",
|
||||
"Refer to the Postgres documentation on libpq's sslmode keyword.",
|
||||
&pageserver_sslmode,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslnegotiation",
|
||||
"SSL negotiation",
|
||||
"Refer to the Postgres documentation on libpq's sslnegotiation keyword.",
|
||||
&pageserver_sslnegotiation,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
#endif
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslpassword",
|
||||
"SSL passphrase",
|
||||
"Refer to the Postgres documentation on libpq's sslpassword keyword.",
|
||||
&pageserver_sslpassword,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslrootcert",
|
||||
"SSL root certificate",
|
||||
"Refer to the Postgres documentation on libpq's sslrootcert keyword.",
|
||||
&pageserver_sslrootcert,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_sslsni",
|
||||
"TLS SNI extension",
|
||||
"Refer to the Postgres documentation on libpq's sslsni keyword.",
|
||||
&pageserver_sslsni,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_ssl_max_protocol_version",
|
||||
"SSL maxiumum protocol version",
|
||||
"Refer to the Postgres documentation on libpq's ssl_max_protocol_version keyword.",
|
||||
&pageserver_ssl_max_protocol_version,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.pageserver_ssl_min_protocol_version",
|
||||
"SSL minimum protocol version",
|
||||
"Refer to the Postgres documentation on libpq's ssl_min_protocol_version keyword.",
|
||||
&pageserver_ssl_min_protocol_version,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
relsize_hash_init();
|
||||
|
||||
|
||||
@@ -64,22 +64,6 @@ char *wal_acceptors_list = "";
|
||||
int wal_acceptor_reconnect_timeout = 1000;
|
||||
int wal_acceptor_connection_timeout = 10000;
|
||||
int safekeeper_proto_version = 2;
|
||||
static char *safekeeper_sslcert = NULL;
|
||||
static char *safekeeper_sslcertmode = NULL;
|
||||
static char *safekeeper_sslcompression = NULL;
|
||||
static char *safekeeper_sslcrl = NULL;
|
||||
static char *safekeeper_sslcrldir = NULL;
|
||||
static char *safekeeper_sslkey = NULL;
|
||||
static char *safekeeper_sslmode = NULL;
|
||||
static char *safekeeper_sslpassword = NULL;
|
||||
static char *safekeeper_sslrootcert = NULL;
|
||||
static char *safekeeper_sslsni = NULL;
|
||||
static char *safekeeper_ssl_min_protocol_version = NULL;
|
||||
static char *safekeeper_ssl_max_protocol_version = NULL;
|
||||
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
static char *safekeeper_sslnegotiation = NULL;
|
||||
#endif
|
||||
|
||||
/* Set to true in the walproposer bgw. */
|
||||
static bool am_walproposer;
|
||||
@@ -248,125 +232,6 @@ nwp_register_gucs(void)
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslcert",
|
||||
"SSL certificate path",
|
||||
"Refer to the Postgres documentation on libpq's sslcert keyword.",
|
||||
&safekeeper_sslcert,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslcertmode",
|
||||
"SSL certificate mode",
|
||||
"Refer to the Postgres documentation on libpq's sslcertmode keyword.",
|
||||
&safekeeper_sslcertmode,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslcrl",
|
||||
"Path to the SSL server certificate revocation list",
|
||||
"Refer to the Postgres documentation on libpq's sslcrl keyword.",
|
||||
&safekeeper_sslcrl,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslcrldir",
|
||||
"Path to the directory of the SSL server certificate revocation list",
|
||||
"Refer to the Postgres documentation on libpq's sslcrldir keyword.",
|
||||
&safekeeper_sslcrldir,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslcompression",
|
||||
"SSL compression",
|
||||
"Refer to the Postgres documentation on libpq's sslcompression keyword.",
|
||||
&safekeeper_sslcompression,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslkey",
|
||||
"SSL key",
|
||||
"Refer to the Postgres documentation on libpq's sslkey keyword.",
|
||||
&safekeeper_sslkey,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslmode",
|
||||
"SSL mode",
|
||||
"Refer to the Postgres documentation on libpq's sslmode keyword.",
|
||||
&safekeeper_sslmode,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslnegotiation",
|
||||
"SSL negotiation",
|
||||
"Refer to the Postgres documentation on libpq's sslnegotiation keyword.",
|
||||
&safekeeper_sslnegotiation,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
#endif
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslpassword",
|
||||
"SSL passphrase",
|
||||
"Refer to the Postgres documentation on libpq's sslpassword keyword.",
|
||||
&safekeeper_sslpassword,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslrootcert",
|
||||
"SSL root certificate",
|
||||
"Refer to the Postgres documentation on libpq's sslrootcert keyword.",
|
||||
&safekeeper_sslrootcert,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_sslsni",
|
||||
"TLS SNI extension",
|
||||
"Refer to the Postgres documentation on libpq's sslsni keyword.",
|
||||
&safekeeper_sslsni,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_ssl_max_protocol_version",
|
||||
"SSL maxiumum protocol version",
|
||||
"Refer to the Postgres documentation on libpq's ssl_max_protocol_version keyword.",
|
||||
&safekeeper_ssl_max_protocol_version,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomStringVariable(
|
||||
"neon.safekeeper_ssl_min_protocol_version",
|
||||
"SSL minimum protocol version",
|
||||
"Refer to the Postgres documentation on libpq's ssl_min_protocol_version keyword.",
|
||||
&safekeeper_ssl_min_protocol_version,
|
||||
NULL,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
|
||||
@@ -978,13 +843,15 @@ walprop_status(Safekeeper *sk)
|
||||
WalProposerConn *
|
||||
libpqwp_connect_start(char *conninfo)
|
||||
{
|
||||
|
||||
PGconn *pg_conn;
|
||||
WalProposerConn *conn;
|
||||
const char *keywords[16];
|
||||
const char *values[16];
|
||||
const char *keywords[3];
|
||||
const char *values[3];
|
||||
int n;
|
||||
char *password = neon_auth_token;
|
||||
|
||||
|
||||
/*
|
||||
* Connect using the given connection string. If the NEON_AUTH_TOKEN
|
||||
* environment variable was set, use that as the password.
|
||||
@@ -1004,90 +871,9 @@ libpqwp_connect_start(char *conninfo)
|
||||
keywords[n] = "dbname";
|
||||
values[n] = conninfo;
|
||||
n++;
|
||||
if (safekeeper_sslcert)
|
||||
{
|
||||
keywords[n] = "sslcert";
|
||||
values[n] = safekeeper_sslcert;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_sslcertmode)
|
||||
{
|
||||
keywords[n] = "sslcertmode";
|
||||
values[n] = safekeeper_sslcertmode;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_sslcompression)
|
||||
{
|
||||
keywords[n] = "sslcompression";
|
||||
values[n] = safekeeper_sslcompression;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_sslcrl)
|
||||
{
|
||||
keywords[n] = "sslcrl";
|
||||
values[n] = safekeeper_sslcrl;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_sslcrldir)
|
||||
{
|
||||
keywords[n] = "sslcrldir";
|
||||
values[n] = safekeeper_sslcrldir;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_sslkey)
|
||||
{
|
||||
keywords[n] = "sslkey";
|
||||
values[n] = safekeeper_sslkey;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_sslmode)
|
||||
{
|
||||
keywords[n] = "sslmode";
|
||||
values[n] = safekeeper_sslmode;
|
||||
n++;
|
||||
}
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
if (safekeeper_sslnegotiation)
|
||||
{
|
||||
keywords[n] = "sslnegotiation";
|
||||
values[n] = safekeeper_sslnegotiation;
|
||||
n++;
|
||||
}
|
||||
#endif
|
||||
if (safekeeper_sslpassword)
|
||||
{
|
||||
keywords[n] = "sslpassword";
|
||||
values[n] = safekeeper_sslpassword;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_sslrootcert)
|
||||
{
|
||||
keywords[n] = "sslrootcert";
|
||||
values[n] = safekeeper_sslrootcert;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_sslsni)
|
||||
{
|
||||
keywords[n] = "sslsni";
|
||||
values[n] = safekeeper_sslsni;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_ssl_max_protocol_version)
|
||||
{
|
||||
keywords[n] = "ssl_max_protocol_version";
|
||||
values[n] = safekeeper_ssl_max_protocol_version;
|
||||
n++;
|
||||
}
|
||||
if (safekeeper_ssl_min_protocol_version)
|
||||
{
|
||||
keywords[n] = "ssl_min_protocol_version";
|
||||
values[n] = safekeeper_ssl_min_protocol_version;
|
||||
n++;
|
||||
}
|
||||
keywords[n] = NULL;
|
||||
values[n] = NULL;
|
||||
n++;
|
||||
|
||||
pg_conn = PQconnectStartParams(keywords, values, 1);
|
||||
|
||||
/*
|
||||
|
||||
@@ -12,7 +12,7 @@ use pin_project_lite::pin_project;
|
||||
use smol_str::SmolStr;
|
||||
use strum_macros::FromRepr;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
|
||||
use zerocopy::{FromBytes, FromZeroes};
|
||||
use zerocopy::{FromBytes, Immutable, KnownLayout, Unaligned, network_endian};
|
||||
|
||||
pin_project! {
|
||||
/// A chained [`AsyncRead`] with [`AsyncWrite`] passthrough
|
||||
@@ -339,49 +339,49 @@ trait BufExt: Sized {
|
||||
}
|
||||
impl BufExt for BytesMut {
|
||||
fn try_get<T: FromBytes>(&mut self) -> Option<T> {
|
||||
let res = T::read_from_prefix(self)?;
|
||||
let (res, _) = T::read_from_prefix(self).ok()?;
|
||||
self.advance(size_of::<T>());
|
||||
Some(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(C)]
|
||||
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
|
||||
#[repr(C, packed)]
|
||||
struct ProxyProtocolV2Header {
|
||||
signature: [u8; 12],
|
||||
version_and_command: u8,
|
||||
protocol_and_family: u8,
|
||||
len: zerocopy::byteorder::network_endian::U16,
|
||||
len: network_endian::U16,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(C)]
|
||||
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
|
||||
#[repr(C, packed)]
|
||||
struct ProxyProtocolV2HeaderV4 {
|
||||
src_addr: NetworkEndianIpv4,
|
||||
dst_addr: NetworkEndianIpv4,
|
||||
src_port: zerocopy::byteorder::network_endian::U16,
|
||||
dst_port: zerocopy::byteorder::network_endian::U16,
|
||||
src_port: network_endian::U16,
|
||||
dst_port: network_endian::U16,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(C)]
|
||||
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
|
||||
#[repr(C, packed)]
|
||||
struct ProxyProtocolV2HeaderV6 {
|
||||
src_addr: NetworkEndianIpv6,
|
||||
dst_addr: NetworkEndianIpv6,
|
||||
src_port: zerocopy::byteorder::network_endian::U16,
|
||||
dst_port: zerocopy::byteorder::network_endian::U16,
|
||||
src_port: network_endian::U16,
|
||||
dst_port: network_endian::U16,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(C)]
|
||||
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
|
||||
#[repr(C, packed)]
|
||||
struct TlvHeader {
|
||||
kind: u8,
|
||||
len: zerocopy::byteorder::network_endian::U16,
|
||||
len: network_endian::U16,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
|
||||
#[repr(transparent)]
|
||||
struct NetworkEndianIpv4(zerocopy::byteorder::network_endian::U32);
|
||||
struct NetworkEndianIpv4(network_endian::U32);
|
||||
impl NetworkEndianIpv4 {
|
||||
#[inline]
|
||||
fn get(self) -> Ipv4Addr {
|
||||
@@ -389,9 +389,9 @@ impl NetworkEndianIpv4 {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
|
||||
#[repr(transparent)]
|
||||
struct NetworkEndianIpv6(zerocopy::byteorder::network_endian::U128);
|
||||
struct NetworkEndianIpv6(network_endian::U128);
|
||||
impl NetworkEndianIpv6 {
|
||||
#[inline]
|
||||
fn get(self) -> Ipv6Addr {
|
||||
|
||||
@@ -226,11 +226,16 @@ struct Args {
|
||||
/// Path to the JWT auth token used to authenticate with other safekeepers.
|
||||
#[arg(long)]
|
||||
auth_token_path: Option<Utf8PathBuf>,
|
||||
|
||||
/// Enable TLS in WAL service API.
|
||||
/// Does not force TLS: the client negotiates TLS usage during the handshake.
|
||||
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
|
||||
#[arg(long)]
|
||||
enable_tls_wal_service_api: bool,
|
||||
|
||||
/// Run in development mode (disables security checks)
|
||||
#[arg(long, help = "Run in development mode (disables security checks)")]
|
||||
dev: bool,
|
||||
}
|
||||
|
||||
// Like PathBufValueParser, but allows empty string.
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
DROP TABLE timeline_imports;
|
||||
@@ -0,0 +1,6 @@
|
||||
CREATE TABLE timeline_imports (
|
||||
tenant_id VARCHAR NOT NULL,
|
||||
timeline_id VARCHAR NOT NULL,
|
||||
shard_statuses JSONB NOT NULL,
|
||||
PRIMARY KEY(tenant_id, timeline_id)
|
||||
);
|
||||
@@ -30,7 +30,9 @@ use pageserver_api::models::{
|
||||
TimelineArchivalConfigRequest, TimelineCreateRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
|
||||
use pageserver_api::upcall_api::{
|
||||
PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest,
|
||||
};
|
||||
use pageserver_client::{BlockUnblock, mgmt_api};
|
||||
use routerify::Middleware;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -154,6 +156,28 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
|
||||
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
|
||||
}
|
||||
|
||||
async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::GenerationsApi)?;
|
||||
|
||||
let mut req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
}
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
|
||||
|
||||
let state = get_state(&req);
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
state
|
||||
.service
|
||||
.handle_timeline_shard_import_progress_upcall(put_req)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
|
||||
/// (in the real control plane this is unnecessary, because the same program is managing
|
||||
/// generation numbers and doing attachments).
|
||||
@@ -1961,6 +1985,13 @@ pub fn make_router(
|
||||
.post("/upcall/v1/validate", |r| {
|
||||
named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
|
||||
})
|
||||
.post("/upcall/v1/timeline_import_status", |r| {
|
||||
named_request_span(
|
||||
r,
|
||||
handle_put_timeline_import_status,
|
||||
RequestName("upcall_v1_timeline_import_status"),
|
||||
)
|
||||
})
|
||||
// Test/dev/debug endpoints
|
||||
.post("/debug/v1/attach-hook", |r| {
|
||||
named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
|
||||
|
||||
@@ -23,6 +23,7 @@ mod scheduler;
|
||||
mod schema;
|
||||
pub mod service;
|
||||
mod tenant_shard;
|
||||
mod timeline_import;
|
||||
|
||||
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
|
||||
struct Sequence(u64);
|
||||
|
||||
@@ -212,6 +212,21 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn timeline_detail(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TimelineInfo> {
|
||||
measured_request!(
|
||||
"timeline_detail",
|
||||
crate::metrics::Method::Get,
|
||||
&self.node_id_label,
|
||||
self.inner
|
||||
.timeline_detail(tenant_shard_id, timeline_id)
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_shard_split(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -22,7 +22,7 @@ use pageserver_api::controller_api::{
|
||||
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
|
||||
SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
|
||||
};
|
||||
use pageserver_api::models::TenantConfig;
|
||||
use pageserver_api::models::{ShardImportStatus, TenantConfig};
|
||||
use pageserver_api::shard::{
|
||||
ShardConfigError, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
|
||||
};
|
||||
@@ -40,6 +40,9 @@ use crate::metrics::{
|
||||
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
|
||||
};
|
||||
use crate::node::Node;
|
||||
use crate::timeline_import::{
|
||||
TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp,
|
||||
};
|
||||
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
|
||||
|
||||
/// ## What do we store?
|
||||
@@ -127,6 +130,9 @@ pub(crate) enum DatabaseOperation {
|
||||
RemoveTimelineReconcile,
|
||||
ListTimelineReconcile,
|
||||
ListTimelineReconcileStartup,
|
||||
InsertTimelineImport,
|
||||
UpdateTimelineImport,
|
||||
DeleteTimelineImport,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -1614,6 +1620,129 @@ impl Persistence {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn insert_timeline_import(
|
||||
&self,
|
||||
import: TimelineImportPersistence,
|
||||
) -> DatabaseResult<bool> {
|
||||
self.with_measured_conn(DatabaseOperation::InsertTimelineImport, move |conn| {
|
||||
Box::pin({
|
||||
let import = import.clone();
|
||||
async move {
|
||||
let inserted = diesel::insert_into(crate::schema::timeline_imports::table)
|
||||
.values(import)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
Ok(inserted == 1)
|
||||
}
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline_import(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::timeline_imports::dsl;
|
||||
|
||||
self.with_measured_conn(DatabaseOperation::DeleteTimelineImport, move |conn| {
|
||||
Box::pin(async move {
|
||||
diesel::delete(crate::schema::timeline_imports::table)
|
||||
.filter(
|
||||
dsl::tenant_id
|
||||
.eq(tenant_id.to_string())
|
||||
.and(dsl::timeline_id.eq(timeline_id.to_string())),
|
||||
)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Idempotently update the status of one shard for an ongoing timeline import
|
||||
///
|
||||
/// If the update was persisted to the database, then the current state of the
|
||||
/// import is returned to the caller. In case of logical errors a bespoke
|
||||
/// [`TimelineImportUpdateError`] instance is returned. Other database errors
|
||||
/// are covered by the outer [`DatabaseError`].
|
||||
pub(crate) async fn update_timeline_import(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
shard_status: ShardImportStatus,
|
||||
) -> DatabaseResult<Result<Option<TimelineImport>, TimelineImportUpdateError>> {
|
||||
use crate::schema::timeline_imports::dsl;
|
||||
|
||||
self.with_measured_conn(DatabaseOperation::UpdateTimelineImport, move |conn| {
|
||||
Box::pin({
|
||||
let shard_status = shard_status.clone();
|
||||
async move {
|
||||
// Load the current state from the database
|
||||
let mut from_db: Vec<TimelineImportPersistence> = dsl::timeline_imports
|
||||
.filter(
|
||||
dsl::tenant_id
|
||||
.eq(tenant_shard_id.tenant_id.to_string())
|
||||
.and(dsl::timeline_id.eq(timeline_id.to_string())),
|
||||
)
|
||||
.load(conn)
|
||||
.await?;
|
||||
|
||||
assert!(from_db.len() <= 1);
|
||||
|
||||
let mut status = match from_db.pop() {
|
||||
Some(some) => TimelineImport::from_persistent(some).unwrap(),
|
||||
None => {
|
||||
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
|
||||
tenant_id: tenant_shard_id.tenant_id,
|
||||
timeline_id,
|
||||
}));
|
||||
}
|
||||
};
|
||||
|
||||
// Perform the update in-memory
|
||||
let follow_up = match status.update(tenant_shard_id.to_index(), shard_status) {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return Ok(Err(err));
|
||||
}
|
||||
};
|
||||
|
||||
let new_persistent = status.to_persistent();
|
||||
|
||||
// Write back if required (in the same transaction)
|
||||
match follow_up {
|
||||
TimelineImportUpdateFollowUp::Persist => {
|
||||
let updated = diesel::update(dsl::timeline_imports)
|
||||
.filter(
|
||||
dsl::tenant_id
|
||||
.eq(tenant_shard_id.tenant_id.to_string())
|
||||
.and(dsl::timeline_id.eq(timeline_id.to_string())),
|
||||
)
|
||||
.set(dsl::shard_statuses.eq(new_persistent.shard_statuses))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
if updated != 1 {
|
||||
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
|
||||
tenant_id: tenant_shard_id.tenant_id,
|
||||
timeline_id,
|
||||
}));
|
||||
}
|
||||
|
||||
Ok(Ok(Some(status)))
|
||||
}
|
||||
TimelineImportUpdateFollowUp::None => Ok(Ok(None)),
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
|
||||
@@ -2171,3 +2300,11 @@ impl ToSql<diesel::sql_types::VarChar, Pg> for SafekeeperTimelineOpKind {
|
||||
.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Clone)]
|
||||
#[diesel(table_name = crate::schema::timeline_imports)]
|
||||
pub(crate) struct TimelineImportPersistence {
|
||||
pub(crate) tenant_id: String,
|
||||
pub(crate) timeline_id: String,
|
||||
pub(crate) shard_statuses: serde_json::Value,
|
||||
}
|
||||
|
||||
@@ -76,6 +76,14 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
timeline_imports (tenant_id, timeline_id) {
|
||||
tenant_id -> Varchar,
|
||||
timeline_id -> Varchar,
|
||||
shard_statuses -> Jsonb,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
use diesel::sql_types::*;
|
||||
use super::sql_types::PgLsn;
|
||||
@@ -99,5 +107,6 @@ diesel::allow_tables_to_appear_in_same_query!(
|
||||
safekeeper_timeline_pending_ops,
|
||||
safekeepers,
|
||||
tenant_shards,
|
||||
timeline_imports,
|
||||
timelines,
|
||||
);
|
||||
|
||||
@@ -40,14 +40,14 @@ use pageserver_api::models::{
|
||||
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
|
||||
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
|
||||
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
|
||||
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
|
||||
TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest,
|
||||
};
|
||||
use pageserver_api::shard::{
|
||||
DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
|
||||
};
|
||||
use pageserver_api::upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
|
||||
ValidateResponseTenant,
|
||||
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
|
||||
ValidateRequest, ValidateResponse, ValidateResponseTenant,
|
||||
};
|
||||
use pageserver_client::{BlockUnblock, mgmt_api};
|
||||
use reqwest::{Certificate, StatusCode};
|
||||
@@ -97,6 +97,7 @@ use crate::tenant_shard::{
|
||||
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
|
||||
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
|
||||
};
|
||||
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
|
||||
|
||||
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
|
||||
@@ -3732,11 +3733,14 @@ impl Service {
|
||||
create_req: TimelineCreateRequest,
|
||||
) -> Result<TimelineCreateResponseStorcon, ApiError> {
|
||||
let safekeepers = self.config.timelines_onto_safekeepers;
|
||||
let timeline_id = create_req.new_timeline_id;
|
||||
|
||||
tracing::info!(
|
||||
mode=%create_req.mode_tag(),
|
||||
%safekeepers,
|
||||
"Creating timeline {}/{}",
|
||||
tenant_id,
|
||||
create_req.new_timeline_id,
|
||||
timeline_id,
|
||||
);
|
||||
|
||||
let _tenant_lock = trace_shared_lock(
|
||||
@@ -3746,15 +3750,62 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
|
||||
let create_mode = create_req.mode.clone();
|
||||
let is_import = create_req.is_import();
|
||||
|
||||
let timeline_info = self
|
||||
.tenant_timeline_create_pageservers(tenant_id, create_req)
|
||||
.await?;
|
||||
|
||||
let safekeepers = if safekeepers {
|
||||
let selected_safekeepers = if is_import {
|
||||
let shards = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.map(|(ts_id, _)| ts_id.to_index())
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
if !shards
|
||||
.iter()
|
||||
.map(|shard_index| shard_index.shard_count)
|
||||
.all_equal()
|
||||
{
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Inconsistent shard count"
|
||||
)));
|
||||
}
|
||||
|
||||
let import = TimelineImport {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard_statuses: ShardImportStatuses::new(shards),
|
||||
};
|
||||
|
||||
let inserted = self
|
||||
.persistence
|
||||
.insert_timeline_import(import.to_persistent())
|
||||
.await
|
||||
.context("timeline import insert")
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
match inserted {
|
||||
true => {
|
||||
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
|
||||
}
|
||||
false => {
|
||||
tracing::info!(%tenant_id, %timeline_id, "Timeline import entry already present");
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
} else if safekeepers {
|
||||
// Note that we do not support creating the timeline on the safekeepers
|
||||
// for imported timelines. The `start_lsn` of the timeline is not known
|
||||
// until the import finshes.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
let res = self
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
|
||||
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
|
||||
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
|
||||
.await?;
|
||||
Some(res)
|
||||
@@ -3764,10 +3815,168 @@ impl Service {
|
||||
|
||||
Ok(TimelineCreateResponseStorcon {
|
||||
timeline_info,
|
||||
safekeepers,
|
||||
safekeepers: selected_safekeepers,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
|
||||
self: &Arc<Self>,
|
||||
req: PutTimelineImportStatusRequest,
|
||||
) -> Result<(), ApiError> {
|
||||
let res = self
|
||||
.persistence
|
||||
.update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status)
|
||||
.await;
|
||||
let timeline_import = match res {
|
||||
Ok(Ok(Some(timeline_import))) => timeline_import,
|
||||
Ok(Ok(None)) => {
|
||||
// Idempotency: we've already seen and handled this update.
|
||||
return Ok(());
|
||||
}
|
||||
Ok(Err(logical_err)) => {
|
||||
return Err(logical_err.into());
|
||||
}
|
||||
Err(db_err) => {
|
||||
return Err(db_err.into());
|
||||
}
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
tenant_id=%req.tenant_shard_id.tenant_id,
|
||||
timeline_id=%req.timeline_id,
|
||||
shard_id=%req.tenant_shard_id.shard_slug(),
|
||||
"Updated timeline import status to: {timeline_import:?}");
|
||||
|
||||
if timeline_import.is_complete() {
|
||||
tokio::task::spawn({
|
||||
let this = self.clone();
|
||||
async move { this.finalize_timeline_import(timeline_import).await }
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(
|
||||
tenant_id=%import.tenant_id,
|
||||
shard_id=%import.timeline_id,
|
||||
))]
|
||||
async fn finalize_timeline_import(
|
||||
self: &Arc<Self>,
|
||||
import: TimelineImport,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO(vlad): On start-up, load up the imports and notify cplane of the
|
||||
// ones that have been completed. This assumes the new cplane API will
|
||||
// be idempotent. If that's not possible, bang a flag in the database.
|
||||
// https://github.com/neondatabase/neon/issues/11570
|
||||
|
||||
tracing::info!("Finalizing timeline import");
|
||||
|
||||
let import_failed = import.completion_error().is_some();
|
||||
|
||||
if !import_failed {
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
anyhow::bail!("Shut down requested while finalizing import");
|
||||
}
|
||||
|
||||
let active = self.timeline_active_on_all_shards(&import).await?;
|
||||
|
||||
match active {
|
||||
true => {
|
||||
tracing::info!("Timeline became active on all shards");
|
||||
break;
|
||||
}
|
||||
false => {
|
||||
tracing::info!("Timeline not active on all shards yet");
|
||||
|
||||
tokio::select! {
|
||||
_ = self.cancel.cancelled() => {
|
||||
anyhow::bail!("Shut down requested while finalizing import");
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(%import_failed, "Notifying cplane of import completion");
|
||||
|
||||
let client = UpcallClient::new(self.get_config(), self.cancel.child_token());
|
||||
client.notify_import_complete(&import).await?;
|
||||
|
||||
if let Err(err) = self
|
||||
.persistence
|
||||
.delete_timeline_import(import.tenant_id, import.timeline_id)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("Failed to delete timeline import entry from database: {err}");
|
||||
}
|
||||
|
||||
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
|
||||
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
|
||||
// https://github.com/neondatabase/neon/issues/11569
|
||||
tracing::info!(%import_failed, "Timeline import complete");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn timeline_active_on_all_shards(
|
||||
self: &Arc<Self>,
|
||||
import: &TimelineImport,
|
||||
) -> anyhow::Result<bool> {
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(import.tenant_id))
|
||||
{
|
||||
if !import
|
||||
.shard_statuses
|
||||
.0
|
||||
.contains_key(&tenant_shard_id.to_index())
|
||||
{
|
||||
anyhow::bail!("Shard layout change detected on completion");
|
||||
}
|
||||
|
||||
if let Some(node_id) = shard.intent.get_attached() {
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
} else {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
targets
|
||||
};
|
||||
|
||||
let results = self
|
||||
.tenant_for_shards_api(
|
||||
targets,
|
||||
|tenant_shard_id, client| async move {
|
||||
client
|
||||
.timeline_detail(tenant_shard_id, import.timeline_id)
|
||||
.await
|
||||
},
|
||||
1,
|
||||
1,
|
||||
SHORT_RECONCILE_TIMEOUT,
|
||||
&self.cancel,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(results.into_iter().all(|res| match res {
|
||||
Ok(info) => info.state == TimelineState::Active,
|
||||
Err(_) => false,
|
||||
}))
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_archival_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -15,7 +15,7 @@ use http_utils::error::ApiError;
|
||||
use pageserver_api::controller_api::{
|
||||
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
|
||||
};
|
||||
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
|
||||
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
|
||||
use safekeeper_api::membership::{MemberSet, SafekeeperId};
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -207,7 +207,6 @@ impl Service {
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_info: &TimelineInfo,
|
||||
create_mode: models::TimelineCreateRequestMode,
|
||||
) -> Result<SafekeepersInfo, ApiError> {
|
||||
let timeline_id = timeline_info.timeline_id;
|
||||
let pg_version = timeline_info.pg_version * 10000;
|
||||
@@ -217,15 +216,8 @@ impl Service {
|
||||
// previously existed as on retries in theory endpoint might have
|
||||
// already written some data and advanced last_record_lsn, while we want
|
||||
// safekeepers to have consistent start_lsn.
|
||||
let start_lsn = match create_mode {
|
||||
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
|
||||
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
|
||||
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
|
||||
)))?;
|
||||
}
|
||||
};
|
||||
let start_lsn = timeline_info.last_record_lsn;
|
||||
|
||||
// Choose initial set of safekeepers respecting affinity
|
||||
let sks = self.safekeepers_for_new_timeline().await?;
|
||||
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
|
||||
|
||||
260
storage_controller/src/timeline_import.rs
Normal file
260
storage_controller/src/timeline_import.rs
Normal file
@@ -0,0 +1,260 @@
|
||||
use std::time::Duration;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
|
||||
use http_utils::error::ApiError;
|
||||
use reqwest::Method;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use pageserver_api::models::ShardImportStatus;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
shard::ShardIndex,
|
||||
};
|
||||
|
||||
use crate::{persistence::TimelineImportPersistence, service::Config};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
|
||||
|
||||
impl ShardImportStatuses {
|
||||
pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
|
||||
ShardImportStatuses(
|
||||
shards
|
||||
.into_iter()
|
||||
.map(|ts_id| (ts_id, ShardImportStatus::InProgress))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TimelineImport {
|
||||
pub(crate) tenant_id: TenantId,
|
||||
pub(crate) timeline_id: TimelineId,
|
||||
pub(crate) shard_statuses: ShardImportStatuses,
|
||||
}
|
||||
|
||||
pub(crate) enum TimelineImportUpdateFollowUp {
|
||||
Persist,
|
||||
None,
|
||||
}
|
||||
|
||||
pub(crate) enum TimelineImportUpdateError {
|
||||
ImportNotFound {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
},
|
||||
MismatchedShards,
|
||||
UnexpectedUpdate,
|
||||
}
|
||||
|
||||
impl From<TimelineImportUpdateError> for ApiError {
|
||||
fn from(err: TimelineImportUpdateError) -> ApiError {
|
||||
match err {
|
||||
TimelineImportUpdateError::ImportNotFound {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} => ApiError::NotFound(
|
||||
anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
|
||||
),
|
||||
TimelineImportUpdateError::MismatchedShards => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Import shards do not match update request, likely a shard split happened during import, this is a bug"
|
||||
))
|
||||
}
|
||||
TimelineImportUpdateError::UnexpectedUpdate => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TimelineImport {
|
||||
pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
|
||||
let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
|
||||
let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
|
||||
let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
|
||||
|
||||
Ok(TimelineImport {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard_statuses,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
|
||||
TimelineImportPersistence {
|
||||
tenant_id: self.tenant_id.to_string(),
|
||||
timeline_id: self.timeline_id.to_string(),
|
||||
shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update(
|
||||
&mut self,
|
||||
shard: ShardIndex,
|
||||
status: ShardImportStatus,
|
||||
) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
|
||||
use std::collections::hash_map::Entry::*;
|
||||
|
||||
match self.shard_statuses.0.entry(shard) {
|
||||
Occupied(mut occ) => {
|
||||
let crnt = occ.get_mut();
|
||||
if *crnt == status {
|
||||
Ok(TimelineImportUpdateFollowUp::None)
|
||||
} else if crnt.is_terminal() && !status.is_terminal() {
|
||||
Err(TimelineImportUpdateError::UnexpectedUpdate)
|
||||
} else {
|
||||
*crnt = status;
|
||||
Ok(TimelineImportUpdateFollowUp::Persist)
|
||||
}
|
||||
}
|
||||
Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn is_complete(&self) -> bool {
|
||||
self.shard_statuses
|
||||
.0
|
||||
.values()
|
||||
.all(|status| status.is_terminal())
|
||||
}
|
||||
|
||||
pub(crate) fn completion_error(&self) -> Option<String> {
|
||||
assert!(self.is_complete());
|
||||
|
||||
let shard_errors: HashMap<_, _> = self
|
||||
.shard_statuses
|
||||
.0
|
||||
.iter()
|
||||
.filter_map(|(shard, status)| {
|
||||
if let ShardImportStatus::Error(err) = status {
|
||||
Some((*shard, err.clone()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
if shard_errors.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(serde_json::to_string(&shard_errors).unwrap())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct UpcallClient {
|
||||
authorization_header: Option<String>,
|
||||
client: reqwest::Client,
|
||||
cancel: CancellationToken,
|
||||
base_url: String,
|
||||
}
|
||||
|
||||
const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
struct ImportCompleteRequest {
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
impl UpcallClient {
|
||||
pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
|
||||
let authorization_header = config
|
||||
.control_plane_jwt_token
|
||||
.clone()
|
||||
.map(|jwt| format!("Bearer {}", jwt));
|
||||
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
|
||||
.build()
|
||||
.expect("Failed to construct HTTP client");
|
||||
|
||||
let base_url = config
|
||||
.control_plane_url
|
||||
.clone()
|
||||
.expect("must be configured");
|
||||
|
||||
Self {
|
||||
authorization_header,
|
||||
client,
|
||||
cancel,
|
||||
base_url,
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify control plane of a completed import
|
||||
///
|
||||
/// This method guarantees at least once delivery semantics assuming
|
||||
/// eventual cplane availability. The cplane API is idempotent.
|
||||
pub(crate) async fn notify_import_complete(
|
||||
&self,
|
||||
import: &TimelineImport,
|
||||
) -> anyhow::Result<()> {
|
||||
let endpoint = if self.base_url.ends_with('/') {
|
||||
format!("{}import_complete", self.base_url)
|
||||
} else {
|
||||
format!("{}/import_complete", self.base_url)
|
||||
};
|
||||
|
||||
tracing::info!("Endpoint is {endpoint}");
|
||||
|
||||
let request = self
|
||||
.client
|
||||
.request(Method::PUT, endpoint)
|
||||
.json(&ImportCompleteRequest {
|
||||
tenant_id: import.tenant_id,
|
||||
timeline_id: import.timeline_id,
|
||||
error: import.completion_error(),
|
||||
})
|
||||
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
|
||||
|
||||
let request = if let Some(auth) = &self.authorization_header {
|
||||
request.header(reqwest::header::AUTHORIZATION, auth)
|
||||
} else {
|
||||
request
|
||||
};
|
||||
|
||||
const RETRY_DELAY: Duration = Duration::from_secs(1);
|
||||
let mut attempt = 1;
|
||||
|
||||
loop {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Shutting down while notifying cplane of import completion"
|
||||
));
|
||||
}
|
||||
|
||||
match request.try_clone().unwrap().send().await {
|
||||
Ok(response) if response.status().is_success() => {
|
||||
return Ok(());
|
||||
}
|
||||
Ok(response) => {
|
||||
tracing::warn!(
|
||||
"Import complete notification failed with status {}, attempt {}",
|
||||
response.status(),
|
||||
attempt
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Import complete notification failed with error: {}, attempt {}",
|
||||
e,
|
||||
attempt
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(RETRY_DELAY) => {}
|
||||
_ = self.cancel.cancelled() => {
|
||||
return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
|
||||
}
|
||||
}
|
||||
attempt += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,8 +5,6 @@ edition = "2024"
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
aws-config.workspace = true
|
||||
aws-sdk-s3.workspace = true
|
||||
either.workspace = true
|
||||
anyhow.workspace = true
|
||||
hex.workspace = true
|
||||
|
||||
@@ -12,14 +12,9 @@ pub mod tenant_snapshot;
|
||||
|
||||
use std::env;
|
||||
use std::fmt::Display;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
use anyhow::Context;
|
||||
use aws_config::retry::{RetryConfigBuilder, RetryMode};
|
||||
use aws_sdk_s3::Client;
|
||||
use aws_sdk_s3::config::Region;
|
||||
use aws_sdk_s3::error::DisplayErrorContext;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::ValueEnum;
|
||||
use futures::{Stream, StreamExt};
|
||||
@@ -28,7 +23,7 @@ use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_time
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{
|
||||
DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
|
||||
RemoteStorageKind, S3Config,
|
||||
RemoteStorageKind, VersionId,
|
||||
};
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -351,21 +346,6 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn init_s3_client(bucket_region: Region) -> Client {
|
||||
let mut retry_config_builder = RetryConfigBuilder::new();
|
||||
|
||||
retry_config_builder
|
||||
.set_max_attempts(Some(3))
|
||||
.set_mode(Some(RetryMode::Adaptive));
|
||||
|
||||
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
|
||||
.region(bucket_region)
|
||||
.retry_config(retry_config_builder.build())
|
||||
.load()
|
||||
.await;
|
||||
Client::new(&config)
|
||||
}
|
||||
|
||||
fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
|
||||
match node_kind {
|
||||
NodeKind::Pageserver => "pageserver/v1/",
|
||||
@@ -385,23 +365,6 @@ fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeK
|
||||
}
|
||||
}
|
||||
|
||||
async fn init_remote_s3(
|
||||
bucket_config: S3Config,
|
||||
node_kind: NodeKind,
|
||||
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
|
||||
let bucket_region = Region::new(bucket_config.bucket_region);
|
||||
let s3_client = Arc::new(init_s3_client(bucket_region).await);
|
||||
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
|
||||
|
||||
let s3_root = make_root_target(
|
||||
bucket_config.bucket_name,
|
||||
bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
|
||||
node_kind,
|
||||
);
|
||||
|
||||
Ok((s3_client, s3_root))
|
||||
}
|
||||
|
||||
async fn init_remote(
|
||||
mut storage_config: BucketConfig,
|
||||
node_kind: NodeKind,
|
||||
@@ -499,7 +462,7 @@ async fn list_objects_with_retries(
|
||||
remote_client.bucket_name().unwrap_or_default(),
|
||||
s3_target.prefix_in_bucket,
|
||||
s3_target.delimiter,
|
||||
DisplayErrorContext(e),
|
||||
e,
|
||||
);
|
||||
let backoff_time = 1 << trial.min(5);
|
||||
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
|
||||
@@ -549,14 +512,18 @@ async fn download_object_with_retries(
|
||||
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
|
||||
}
|
||||
|
||||
async fn download_object_to_file_s3(
|
||||
s3_client: &Client,
|
||||
bucket_name: &str,
|
||||
key: &str,
|
||||
version_id: Option<&str>,
|
||||
async fn download_object_to_file(
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
key: &RemotePath,
|
||||
version_id: Option<VersionId>,
|
||||
local_path: &Utf8Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let opts = DownloadOpts {
|
||||
version_id: version_id.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
|
||||
let cancel = CancellationToken::new();
|
||||
for _ in 0..MAX_RETRIES {
|
||||
tokio::fs::remove_file(&tmp_path)
|
||||
.await
|
||||
@@ -566,28 +533,24 @@ async fn download_object_to_file_s3(
|
||||
.await
|
||||
.context("Opening output file")?;
|
||||
|
||||
let request = s3_client.get_object().bucket(bucket_name).key(key);
|
||||
let res = remote_storage.download(key, &opts, &cancel).await;
|
||||
|
||||
let request = match version_id {
|
||||
Some(version_id) => request.version_id(version_id),
|
||||
None => request,
|
||||
};
|
||||
|
||||
let response_stream = match request.send().await {
|
||||
let download = match res {
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to download object for key {key} version {}: {e:#}",
|
||||
version_id.unwrap_or("")
|
||||
"Failed to download object for key {key} version {:?}: {e:#}",
|
||||
&version_id.as_ref().unwrap_or(&VersionId(String::new()))
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let mut read_stream = response_stream.body.into_async_read();
|
||||
//response_stream.download_stream
|
||||
|
||||
tokio::io::copy(&mut read_stream, &mut file).await?;
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
tokio::io::copy(&mut body, &mut file).await?;
|
||||
|
||||
tokio::fs::rename(&tmp_path, local_path).await?;
|
||||
return Ok(());
|
||||
|
||||
@@ -1,31 +1,30 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use async_stream::stream;
|
||||
use aws_sdk_s3::Client;
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use pageserver::tenant::IndexPart;
|
||||
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use pageserver::tenant::remote_timeline_client::remote_layer_path;
|
||||
use pageserver::tenant::storage_layer::LayerName;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{GenericRemoteStorage, S3Config};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::generation::Generation;
|
||||
use utils::id::TenantId;
|
||||
|
||||
use crate::checks::{BlobDataParseResult, RemoteTimelineBlobData, list_timeline_blobs};
|
||||
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
|
||||
use crate::{
|
||||
BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file_s3,
|
||||
init_remote, init_remote_s3,
|
||||
BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file, init_remote,
|
||||
};
|
||||
|
||||
pub struct SnapshotDownloader {
|
||||
s3_client: Arc<Client>,
|
||||
s3_root: RootTarget,
|
||||
remote_client: GenericRemoteStorage,
|
||||
#[allow(dead_code)]
|
||||
target: RootTarget,
|
||||
bucket_config: BucketConfig,
|
||||
bucket_config_s3: S3Config,
|
||||
tenant_id: TenantId,
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
@@ -38,17 +37,13 @@ impl SnapshotDownloader {
|
||||
output_path: Utf8PathBuf,
|
||||
concurrency: usize,
|
||||
) -> anyhow::Result<Self> {
|
||||
let bucket_config_s3 = match &bucket_config.0.storage {
|
||||
remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
|
||||
_ => panic!("only S3 configuration is supported for snapshot downloading"),
|
||||
};
|
||||
let (s3_client, s3_root) =
|
||||
init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
|
||||
let (remote_client, target) =
|
||||
init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
|
||||
|
||||
Ok(Self {
|
||||
s3_client,
|
||||
s3_root,
|
||||
remote_client,
|
||||
target,
|
||||
bucket_config,
|
||||
bucket_config_s3,
|
||||
tenant_id,
|
||||
output_path,
|
||||
concurrency,
|
||||
@@ -61,6 +56,7 @@ impl SnapshotDownloader {
|
||||
layer_name: LayerName,
|
||||
layer_metadata: LayerFileMetadata,
|
||||
) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
|
||||
let cancel = CancellationToken::new();
|
||||
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
|
||||
// different layer names (remote-style has the generation suffix)
|
||||
let local_path = self.output_path.join(format!(
|
||||
@@ -82,30 +78,27 @@ impl SnapshotDownloader {
|
||||
} else {
|
||||
tracing::debug!("{} requires download...", local_path);
|
||||
|
||||
let timeline_root = self.s3_root.timeline_root(&ttid);
|
||||
let remote_layer_path = format!(
|
||||
"{}{}{}",
|
||||
timeline_root.prefix_in_bucket,
|
||||
layer_name,
|
||||
layer_metadata.generation.get_suffix()
|
||||
let remote_path = remote_layer_path(
|
||||
&ttid.tenant_shard_id.tenant_id,
|
||||
&ttid.timeline_id,
|
||||
layer_metadata.shard,
|
||||
&layer_name,
|
||||
layer_metadata.generation,
|
||||
);
|
||||
let mode = remote_storage::ListingMode::NoDelimiter;
|
||||
|
||||
// List versions: the object might be deleted.
|
||||
let versions = self
|
||||
.s3_client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_config_s3.bucket_name.clone())
|
||||
.prefix(&remote_layer_path)
|
||||
.send()
|
||||
.remote_client
|
||||
.list_versions(Some(&remote_path), mode, None, &cancel)
|
||||
.await?;
|
||||
let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
|
||||
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
|
||||
let Some(version) = versions.versions.first() else {
|
||||
return Err(anyhow::anyhow!("No versions found for {remote_path}"));
|
||||
};
|
||||
download_object_to_file_s3(
|
||||
&self.s3_client,
|
||||
&self.bucket_config_s3.bucket_name,
|
||||
&remote_layer_path,
|
||||
version.version_id.as_deref(),
|
||||
download_object_to_file(
|
||||
&self.remote_client,
|
||||
&remote_path,
|
||||
version.version_id().cloned(),
|
||||
&local_path,
|
||||
)
|
||||
.await?;
|
||||
|
||||
70
test_runner/regress/test_compute_monitor.py
Normal file
70
test_runner/regress/test_compute_monitor.py
Normal file
@@ -0,0 +1,70 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_compute_monitor(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can detect Postgres going down (unresponsive) and
|
||||
reconnect when it comes back online. Also check that the downtime metrics
|
||||
are properly emitted.
|
||||
"""
|
||||
TEST_DB = "test_compute_monitor"
|
||||
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
# Check that default postgres database is present
|
||||
with endpoint.cursor() as cursor:
|
||||
cursor.execute("SELECT datname FROM pg_database WHERE datname = 'postgres'")
|
||||
catalog_db = cursor.fetchone()
|
||||
assert catalog_db is not None
|
||||
assert len(catalog_db) == 1
|
||||
|
||||
# Create a new database
|
||||
cursor.execute(f"CREATE DATABASE {TEST_DB}")
|
||||
|
||||
# Drop database 'postgres'
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
# Use FORCE to terminate all connections to the database
|
||||
cursor.execute("DROP DATABASE postgres WITH (FORCE)")
|
||||
|
||||
client = endpoint.http_client()
|
||||
|
||||
def check_metrics_down():
|
||||
raw_metrics = client.metrics()
|
||||
metrics = parse_metrics(raw_metrics)
|
||||
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
|
||||
assert len(compute_pg_current_downtime_ms) == 1
|
||||
assert compute_pg_current_downtime_ms[0].value > 0
|
||||
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
|
||||
assert len(compute_pg_downtime_ms_total) == 1
|
||||
assert compute_pg_downtime_ms_total[0].value > 0
|
||||
|
||||
wait_until(check_metrics_down)
|
||||
|
||||
# Recreate postgres database
|
||||
with endpoint.cursor(dbname=TEST_DB) as cursor:
|
||||
cursor.execute("CREATE DATABASE postgres")
|
||||
|
||||
# Current downtime should reset to 0, but not total downtime
|
||||
def check_metrics_up():
|
||||
raw_metrics = client.metrics()
|
||||
metrics = parse_metrics(raw_metrics)
|
||||
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
|
||||
assert len(compute_pg_current_downtime_ms) == 1
|
||||
assert compute_pg_current_downtime_ms[0].value == 0
|
||||
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
|
||||
assert len(compute_pg_downtime_ms_total) == 1
|
||||
assert compute_pg_downtime_ms_total[0].value > 0
|
||||
|
||||
wait_until(check_metrics_up)
|
||||
|
||||
# Just a sanity check that we log the downtime info
|
||||
endpoint.log_contains("downtime_info")
|
||||
@@ -1,9 +1,9 @@
|
||||
import base64
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from threading import Event
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
@@ -14,12 +14,11 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
|
||||
from fixtures.pageserver.http import (
|
||||
ImportPgdataIdemptencyKey,
|
||||
PageserverApiException,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import MockS3Server, RemoteStorageKind
|
||||
from fixtures.utils import shared_buffers_for_max_cu
|
||||
from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
|
||||
from mypy_boto3_s3 import S3Client
|
||||
@@ -44,6 +43,7 @@ smoke_params = [
|
||||
]
|
||||
|
||||
|
||||
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
|
||||
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
|
||||
def test_pgdata_import_smoke(
|
||||
vanilla_pg: VanillaPostgres,
|
||||
@@ -56,24 +56,29 @@ def test_pgdata_import_smoke(
|
||||
#
|
||||
# Setup fake control plane for import progress
|
||||
#
|
||||
import_completion_signaled = Event()
|
||||
|
||||
def handler(request: Request) -> Response:
|
||||
log.info(f"control plane request: {request.json}")
|
||||
log.info(f"control plane /import_complete request: {request.json}")
|
||||
import_completion_signaled.set()
|
||||
return Response(json.dumps({}), status=200)
|
||||
|
||||
cplane_mgmt_api_server = make_httpserver
|
||||
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
|
||||
cplane_mgmt_api_server.expect_request(
|
||||
"/storage/api/v1/import_complete", method="PUT"
|
||||
).respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# The test needs LocalFs support, which is only built in testing mode.
|
||||
env.pageserver.is_testing_enabled_or_skip()
|
||||
|
||||
env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api"
|
||||
}
|
||||
)
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
@@ -193,40 +198,11 @@ def test_pgdata_import_smoke(
|
||||
)
|
||||
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
|
||||
|
||||
while True:
|
||||
locations = env.storage_controller.locate(tenant_id)
|
||||
active_count = 0
|
||||
for location in locations:
|
||||
shard_id = TenantShardId.parse(location["shard_id"])
|
||||
ps = env.get_pageserver(location["node_id"])
|
||||
try:
|
||||
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
|
||||
state = detail["state"]
|
||||
log.info(f"shard {shard_id} state: {state}")
|
||||
if state == "Active":
|
||||
active_count += 1
|
||||
except PageserverApiException as e:
|
||||
if e.status_code == 404:
|
||||
log.info("not found, import is in progress")
|
||||
continue
|
||||
elif e.status_code == 429:
|
||||
log.info("import is in progress")
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
def cplane_notified():
|
||||
assert import_completion_signaled.is_set()
|
||||
|
||||
shard_status_file = statusdir / f"shard-{shard_id.shard_index}"
|
||||
if state == "Active":
|
||||
shard_status_file_contents = (
|
||||
shard_status_file.read_text()
|
||||
) # Active state implies import is done
|
||||
shard_status = json.loads(shard_status_file_contents)
|
||||
assert shard_status["done"] is True
|
||||
|
||||
if active_count == len(locations):
|
||||
log.info("all shards are active")
|
||||
break
|
||||
time.sleep(1)
|
||||
# Generous timeout for the MULTIPLE_RELATION_SEGMENTS test variants
|
||||
wait_until(cplane_notified, timeout=90)
|
||||
|
||||
import_duration = time.monotonic() - start
|
||||
log.info(f"import complete; duration={import_duration:.2f}s")
|
||||
@@ -372,19 +348,27 @@ def test_fast_import_with_pageserver_ingest(
|
||||
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
|
||||
|
||||
# Setup pageserver and fake cplane for import progress
|
||||
import_completion_signaled = Event()
|
||||
|
||||
def handler(request: Request) -> Response:
|
||||
log.info(f"control plane request: {request.json}")
|
||||
log.info(f"control plane /import_complete request: {request.json}")
|
||||
import_completion_signaled.set()
|
||||
return Response(json.dumps({}), status=200)
|
||||
|
||||
cplane_mgmt_api_server = make_httpserver
|
||||
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
|
||||
cplane_mgmt_api_server.expect_request(
|
||||
"/storage/api/v1/import_complete", method="PUT"
|
||||
).respond_with_handler(handler)
|
||||
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
|
||||
)
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api",
|
||||
# because import_pgdata code uses this endpoint, not the one in common remote storage config
|
||||
# TODO: maybe use common remote_storage config in pageserver?
|
||||
"import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(),
|
||||
@@ -476,42 +460,10 @@ def test_fast_import_with_pageserver_ingest(
|
||||
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
|
||||
validate_vanilla_equivalence(conn)
|
||||
|
||||
# Poll pageserver statuses in s3
|
||||
while True:
|
||||
locations = env.storage_controller.locate(tenant_id)
|
||||
active_count = 0
|
||||
for location in locations:
|
||||
shard_id = TenantShardId.parse(location["shard_id"])
|
||||
ps = env.get_pageserver(location["node_id"])
|
||||
try:
|
||||
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
|
||||
log.info(f"timeline {tenant_id}/{timeline_id} detail: {detail}")
|
||||
state = detail["state"]
|
||||
log.info(f"shard {shard_id} state: {state}")
|
||||
if state == "Active":
|
||||
active_count += 1
|
||||
except PageserverApiException as e:
|
||||
if e.status_code == 404:
|
||||
log.info("not found, import is in progress")
|
||||
continue
|
||||
elif e.status_code == 429:
|
||||
log.info("import is in progress")
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
def cplane_notified():
|
||||
assert import_completion_signaled.is_set()
|
||||
|
||||
if state == "Active":
|
||||
key = f"{key_prefix}/status/shard-{shard_id.shard_index}"
|
||||
shard_status_file_contents = (
|
||||
mock_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
|
||||
)
|
||||
shard_status = json.loads(shard_status_file_contents)
|
||||
assert shard_status["done"] is True
|
||||
|
||||
if active_count == len(locations):
|
||||
log.info("all shards are active")
|
||||
break
|
||||
time.sleep(0.5)
|
||||
wait_until(cplane_notified, timeout=60)
|
||||
|
||||
import_duration = time.monotonic() - start
|
||||
log.info(f"import complete; duration={import_duration:.2f}s")
|
||||
|
||||
@@ -77,6 +77,8 @@ regex-automata = { version = "0.4", default-features = false, features = ["dfa-o
|
||||
regex-syntax = { version = "0.8" }
|
||||
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
|
||||
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
|
||||
rustls-pki-types = { version = "1", features = ["std"] }
|
||||
rustls-webpki = { version = "0.102", default-features = false, features = ["ring", "std"] }
|
||||
scopeguard = { version = "1" }
|
||||
sec1 = { version = "0.7", features = ["pem", "serde", "std", "subtle"] }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
@@ -103,7 +105,6 @@ tracing-core = { version = "0.1" }
|
||||
tracing-log = { version = "0.2" }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
uuid = { version = "1", features = ["serde", "v4", "v7"] }
|
||||
zerocopy = { version = "0.7", features = ["derive", "simd"] }
|
||||
zeroize = { version = "1", features = ["derive", "serde"] }
|
||||
zstd = { version = "0.13" }
|
||||
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
@@ -146,7 +147,6 @@ serde = { version = "1", features = ["alloc", "derive"] }
|
||||
syn = { version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
|
||||
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
zerocopy = { version = "0.7", features = ["derive", "simd"] }
|
||||
zstd = { version = "0.13" }
|
||||
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }
|
||||
|
||||
Reference in New Issue
Block a user