Compare commits


1 Commit

Author: John Crowley
SHA1: d4326bfaf4
Message: Initial implementation of GCS provider.
Date: 2025-04-22 13:46:57 -05:00
87 changed files with 2584 additions and 2910 deletions

View File

@@ -19,7 +19,7 @@
!pageserver/
!pgxn/
!proxy/
!endpoint_storage/
!object_storage/
!storage_scrubber/
!safekeeper/
!storage_broker/

View File

@@ -133,7 +133,6 @@ runs:
fi
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
echo "PERF_REPORT_DIR=${PERF_REPORT_DIR}" >> ${GITHUB_ENV}
rm -rf $PERF_REPORT_DIR
TEST_SELECTION="test_runner/${{ inputs.test_selection }}"
@@ -210,12 +209,11 @@ runs:
--verbose \
-rA $TEST_SELECTION $EXTRA_PARAMS
- name: Upload performance report
if: ${{ !cancelled() && inputs.save_perf_report == 'true' }}
shell: bash -euxo pipefail {0}
run: |
export REPORT_FROM="${PERF_REPORT_DIR}"
scripts/generate_and_push_perf_report.sh
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
export REPORT_FROM="$PERF_REPORT_DIR"
export REPORT_TO="$PLATFORM"
scripts/generate_and_push_perf_report.sh
fi
- name: Upload compatibility snapshot
# Note, that we use `github.base_ref` which is a target branch for a PR

View File

@@ -1238,7 +1238,7 @@ jobs:
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
TIMEOUT=5400 # 90 minutes, usually it takes ~2-3 minutes, but if runners are busy, it might take longer
TIMEOUT=1800 # 30 minutes, usually it takes ~2-3 minutes, but if runners are busy, it might take longer
INTERVAL=15 # try each N seconds
last_status="" # a variable to carry the last status of the "build-and-upload-extensions" context

Cargo.lock (generated, 218 changed lines)
View File

@@ -40,7 +40,7 @@ dependencies = [
"getrandom 0.2.11",
"once_cell",
"version_check",
"zerocopy 0.7.31",
"zerocopy",
]
[[package]]
@@ -2037,33 +2037,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "endpoint_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "enum-map"
version = "2.5.0"
@@ -2451,6 +2424,33 @@ dependencies = [
"slab",
]
[[package]]
name = "gcp_auth"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf67f30198e045a039264c01fb44659ce82402d7771c50938beb41a5ac87733"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"chrono",
"home",
"http 1.1.0",
"http-body-util",
"hyper 1.4.1",
"hyper-rustls 0.27.5",
"hyper-util",
"ring",
"rustls-pemfile 2.1.1",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tracing",
"tracing-futures",
"url",
]
[[package]]
name = "gen_ops"
version = "0.4.0"
@@ -2749,6 +2749,15 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "hostname"
version = "0.4.0"
@@ -2978,6 +2987,24 @@ dependencies = [
"tower-service",
]
[[package]]
name = "hyper-rustls"
version = "0.27.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.4.1",
"hyper-util",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.0",
"tower-service",
]
[[package]]
name = "hyper-timeout"
version = "0.5.1"
@@ -3733,6 +3760,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -4025,6 +4062,33 @@ dependencies = [
"memchr",
]
[[package]]
name = "object_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum-extra",
"camino",
"camino-tempfile",
"futures",
"http-body-util",
"itertools 0.10.5",
"jsonwebtoken",
"prometheus",
"rand 0.8.5",
"remote_storage",
"serde",
"serde_json",
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "once_cell"
version = "1.20.2"
@@ -4415,9 +4479,9 @@ dependencies = [
[[package]]
name = "papaya"
version = "0.2.1"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6827e3fc394523c21d4464d02c0bb1c19966ea4a58a9844ad6d746214179d2bc"
checksum = "aab21828b6b5952fdadd6c377728ffae53ec3a21b2febc47319ab65741f7e2fd"
dependencies = [
"equivalent",
"seize",
@@ -5204,7 +5268,7 @@ dependencies = [
"walkdir",
"workspace_hack",
"x509-cert",
"zerocopy 0.8.24",
"zerocopy",
]
[[package]]
@@ -5522,8 +5586,11 @@ dependencies = [
"bytes",
"camino",
"camino-tempfile",
"chrono",
"futures",
"futures-util",
"gcp_auth",
"http 1.1.0",
"http-body-util",
"http-types",
"humantime-serde",
@@ -5544,7 +5611,9 @@ dependencies = [
"tokio-util",
"toml_edit",
"tracing",
"url",
"utils",
"uuid",
]
[[package]]
@@ -5574,6 +5643,7 @@ dependencies = [
"js-sys",
"log",
"mime",
"mime_guess",
"once_cell",
"percent-encoding",
"pin-project-lite",
@@ -5594,7 +5664,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
"webpki-roots 0.26.1",
"winreg",
]
@@ -6195,13 +6265,13 @@ checksum = "224e328af6e080cddbab3c770b1cf50f0351ba0577091ef2410c3951d835ff87"
[[package]]
name = "sentry"
version = "0.37.0"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "255914a8e53822abd946e2ce8baa41d4cded6b8e938913b7f7b9da5b7ab44335"
checksum = "00421ed8fa0c995f07cde48ba6c89e80f2b312f74ff637326f392fbfd23abe02"
dependencies = [
"httpdate",
"reqwest",
"rustls 0.23.18",
"rustls 0.21.12",
"sentry-backtrace",
"sentry-contexts",
"sentry-core",
@@ -6209,14 +6279,14 @@ dependencies = [
"sentry-tracing",
"tokio",
"ureq",
"webpki-roots",
"webpki-roots 0.25.2",
]
[[package]]
name = "sentry-backtrace"
version = "0.37.0"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00293cd332a859961f24fd69258f7e92af736feaeb91020cff84dac4188a4302"
checksum = "a79194074f34b0cbe5dd33896e5928bbc6ab63a889bd9df2264af5acb186921e"
dependencies = [
"backtrace",
"once_cell",
@@ -6226,9 +6296,9 @@ dependencies = [
[[package]]
name = "sentry-contexts"
version = "0.37.0"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "961990f9caa76476c481de130ada05614cd7f5aa70fb57c2142f0e09ad3fb2aa"
checksum = "eba8870c5dba2bfd9db25c75574a11429f6b95957b0a78ac02e2970dd7a5249a"
dependencies = [
"hostname",
"libc",
@@ -6240,9 +6310,9 @@ dependencies = [
[[package]]
name = "sentry-core"
version = "0.37.0"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a6409d845707d82415c800290a5d63be5e3df3c2e417b0997c60531dfbd35ef"
checksum = "46a75011ea1c0d5c46e9e57df03ce81f5c7f0a9e199086334a1f9c0a541e0826"
dependencies = [
"once_cell",
"rand 0.8.5",
@@ -6253,9 +6323,9 @@ dependencies = [
[[package]]
name = "sentry-panic"
version = "0.37.0"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "609b1a12340495ce17baeec9e08ff8ed423c337c1a84dffae36a178c783623f3"
checksum = "2eaa3ecfa3c8750c78dcfd4637cfa2598b95b52897ed184b4dc77fcf7d95060d"
dependencies = [
"sentry-backtrace",
"sentry-core",
@@ -6263,9 +6333,9 @@ dependencies = [
[[package]]
name = "sentry-tracing"
version = "0.37.0"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f4e86402d5c50239dc7d8fd3f6d5e048221d5fcb4e026d8d50ab57fe4644cb"
checksum = "f715932bf369a61b7256687c6f0554141b7ce097287e30e3f7ed6e9de82498fe"
dependencies = [
"sentry-backtrace",
"sentry-core",
@@ -6275,9 +6345,9 @@ dependencies = [
[[package]]
name = "sentry-types"
version = "0.37.0"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d3f117b8755dbede8260952de2aeb029e20f432e72634e8969af34324591631"
checksum = "4519c900ce734f7a0eb7aba0869dfb225a7af8820634a7dd51449e3b093cfb7c"
dependencies = [
"debugid",
"hex",
@@ -6711,6 +6781,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"aws-config",
"aws-sdk-s3",
"camino",
"chrono",
"clap",
@@ -7595,6 +7667,16 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
@@ -7748,6 +7830,12 @@ dependencies = [
"libc",
]
[[package]]
name = "unicase"
version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
[[package]]
name = "unicode-bidi"
version = "0.3.17"
@@ -7799,7 +7887,7 @@ dependencies = [
"rustls 0.23.18",
"rustls-pki-types",
"url",
"webpki-roots",
"webpki-roots 0.26.1",
]
[[package]]
@@ -8167,6 +8255,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc"
[[package]]
name = "webpki-roots"
version = "0.26.1"
@@ -8474,8 +8568,6 @@ dependencies = [
"regex-syntax 0.8.2",
"reqwest",
"rustls 0.23.18",
"rustls-pki-types",
"rustls-webpki 0.102.8",
"scopeguard",
"sec1 0.7.3",
"serde",
@@ -8504,6 +8596,7 @@ dependencies = [
"tracing-log",
"url",
"uuid",
"zerocopy",
"zeroize",
"zstd",
"zstd-safe",
@@ -8607,16 +8700,8 @@ version = "0.7.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d"
dependencies = [
"zerocopy-derive 0.7.31",
]
[[package]]
name = "zerocopy"
version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879"
dependencies = [
"zerocopy-derive 0.8.24",
"byteorder",
"zerocopy-derive",
]
[[package]]
@@ -8630,17 +8715,6 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]]
name = "zerofrom"
version = "0.1.5"

View File

@@ -40,7 +40,7 @@ members = [
"libs/proxy/postgres-protocol2",
"libs/proxy/postgres-types2",
"libs/proxy/tokio-postgres2",
"endpoint_storage",
"object_storage",
]
[workspace.package]
@@ -164,7 +164,7 @@ scopeguard = "1.1"
sysinfo = "0.29.2"
sd-notify = "0.4.1"
send-future = "0.1.0"
sentry = { version = "0.37", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
sentry = { version = "0.32", default-features = false, features = ["backtrace", "contexts", "panic", "rustls", "reqwest" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_path_to_error = "0.1"
@@ -220,7 +220,7 @@ uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.8"
whoami = "1.5.1"
zerocopy = { version = "0.8", features = ["derive", "simd"] }
zerocopy = { version = "0.7", features = ["derive"] }
json-structural-diff = { version = "0.2.0" }
x509-cert = { version = "0.2.5" }

View File

@@ -89,7 +89,7 @@ RUN set -e \
--bin storage_broker \
--bin storage_controller \
--bin proxy \
--bin endpoint_storage \
--bin object_storage \
--bin neon_local \
--bin storage_scrubber \
--locked --release
@@ -122,7 +122,7 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_broker /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_controller /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/endpoint_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/object_storage /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/neon_local /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/storage_scrubber /usr/local/bin

View File

@@ -173,7 +173,7 @@ RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v$
&& rm -rf protoc.zip protoc
# s5cmd
ENV S5CMD_VERSION=2.3.0
ENV S5CMD_VERSION=2.2.2
RUN curl -sL "https://github.com/peak/s5cmd/releases/download/v${S5CMD_VERSION}/s5cmd_${S5CMD_VERSION}_Linux-$(uname -m | sed 's/x86_64/64bit/g' | sed 's/aarch64/arm64/g').tar.gz" | tar zxvf - s5cmd \
&& chmod +x s5cmd \
&& mv s5cmd /usr/local/bin/s5cmd
@@ -206,7 +206,7 @@ RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-$(uname -m).zip" -o "aws
&& rm awscliv2.zip
# Mold: A Modern Linker
ENV MOLD_VERSION=v2.37.1
ENV MOLD_VERSION=v2.34.1
RUN set -e \
&& git clone https://github.com/rui314/mold.git \
&& mkdir mold/build \
@@ -268,7 +268,7 @@ WORKDIR /home/nonroot
RUN echo -e "--retry-connrefused\n--connect-timeout 15\n--retry 5\n--max-time 300\n" > /home/nonroot/.curlrc
# Python
ENV PYTHON_VERSION=3.11.12 \
ENV PYTHON_VERSION=3.11.10 \
PYENV_ROOT=/home/nonroot/.pyenv \
PATH=/home/nonroot/.pyenv/shims:/home/nonroot/.pyenv/bin:/home/nonroot/.poetry/bin:$PATH
RUN set -e \
@@ -296,12 +296,12 @@ ENV RUSTC_VERSION=1.86.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1
ARG CARGO_HAKARI_VERSION=0.9.36
ARG CARGO_DENY_VERSION=0.18.2
ARG CARGO_HACK_VERSION=0.6.36
ARG CARGO_NEXTEST_VERSION=0.9.94
ARG CARGO_HAKARI_VERSION=0.9.33
ARG CARGO_DENY_VERSION=0.16.2
ARG CARGO_HACK_VERSION=0.6.33
ARG CARGO_NEXTEST_VERSION=0.9.85
ARG CARGO_CHEF_VERSION=0.1.71
ARG CARGO_DIESEL_CLI_VERSION=2.2.9
ARG CARGO_DIESEL_CLI_VERSION=2.2.6
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
chmod +x rustup-init && \
./rustup-init -y --default-toolchain ${RUSTC_VERSION} && \

View File

@@ -1677,7 +1677,7 @@ RUN set -e \
&& apt clean && rm -rf /var/lib/apt/lists/*
# Use `dist_man_MANS=` to skip manpage generation (which requires python3/pandoc)
ENV PGBOUNCER_TAG=pgbouncer_1_24_1
ENV PGBOUNCER_TAG=pgbouncer_1_22_1
RUN set -e \
&& git clone --recurse-submodules --depth 1 --branch ${PGBOUNCER_TAG} https://github.com/pgbouncer/pgbouncer.git pgbouncer \
&& cd pgbouncer \

View File

@@ -11,14 +11,6 @@ index bf6edcb..89b4c7f 100644
USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/regress/expected/init-extension.out b/regress/expected/init-extension.out
index 9f2e171..f6e4f8d 100644
--- a/regress/expected/init-extension.out
+++ b/regress/expected/init-extension.out
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/expected/nosuper.out b/regress/expected/nosuper.out
index 8d0a94e..63b68bf 100644
--- a/regress/expected/nosuper.out
@@ -50,14 +42,6 @@ index 8d0a94e..63b68bf 100644
INFO: repacking table "public.tbl_cluster"
ERROR: query failed: ERROR: current transaction is aborted, commands ignored until end of transaction block
DETAIL: query was: RESET lock_timeout
diff --git a/regress/sql/init-extension.sql b/regress/sql/init-extension.sql
index 9f2e171..f6e4f8d 100644
--- a/regress/sql/init-extension.sql
+++ b/regress/sql/init-extension.sql
@@ -1,3 +1,2 @@
SET client_min_messages = warning;
CREATE EXTENSION pg_repack;
-RESET client_min_messages;
diff --git a/regress/sql/nosuper.sql b/regress/sql/nosuper.sql
index 072f0fa..dbe60f8 100644
--- a/regress/sql/nosuper.sql

View File

@@ -1,8 +1,8 @@
use metrics::core::{AtomicF64, AtomicU64, Collector, GenericCounter, GenericGauge};
use metrics::core::{AtomicF64, Collector, GenericGauge};
use metrics::proto::MetricFamily;
use metrics::{
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter,
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec,
IntCounterVec, IntGaugeVec, UIntGaugeVec, register_gauge, register_int_counter_vec,
register_int_gauge_vec, register_uint_gauge_vec,
};
use once_cell::sync::Lazy;
@@ -81,22 +81,6 @@ pub(crate) static COMPUTE_CTL_UP: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static PG_CURR_DOWNTIME_MS: Lazy<GenericGauge<AtomicF64>> = Lazy::new(|| {
register_gauge!(
"compute_pg_current_downtime_ms",
"Non-cumulative duration of Postgres downtime in ms; resets after successful check",
)
.expect("failed to define a metric")
});
pub(crate) static PG_TOTAL_DOWNTIME_MS: Lazy<GenericCounter<AtomicU64>> = Lazy::new(|| {
register_int_counter!(
"compute_pg_downtime_ms_total",
"Cumulative duration of Postgres downtime in ms",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -104,7 +88,5 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(REMOTE_EXT_REQUESTS_TOTAL.collect());
metrics.extend(DB_MIGRATION_FAILED.collect());
metrics.extend(AUDIT_LOG_DIR_SIZE.collect());
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics
}

View File

@@ -6,294 +6,197 @@ use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{Level, error, info, instrument, span};
use tracing::{debug, error, info, warn};
use crate::compute::ComputeNode;
use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
struct ComputeMonitor {
compute: Arc<ComputeNode>,
// Spin in a loop and figure out the last activity time in the Postgres.
// Then update it in the shared state. This function never errors out.
// NB: the only expected panic is at `Mutex` unwrap(), all other errors
// should be handled gracefully.
fn watch_compute_activity(compute: &ComputeNode) {
// Suppose that `connstr` doesn't change
let connstr = compute.params.connstr.clone();
let conf = compute.get_conn_conf(Some("compute_ctl:activity_monitor"));
/// The moment when Postgres had some activity,
/// that should prevent compute from being suspended.
last_active: Option<DateTime<Utc>>,
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(compute);
/// The moment when we last tried to check Postgres.
last_checked: DateTime<Utc>,
/// The last moment we did a successful Postgres check.
last_up: DateTime<Utc>,
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = conf.connect(NoTls);
/// Only used for internal statistics change tracking
/// between monitor runs and can be outdated.
active_time: Option<f64>,
/// Only used for internal statistics change tracking
/// between monitor runs and can be outdated.
sessions: Option<i64>,
let mut sleep = false;
let mut prev_active_time: Option<f64> = None;
let mut prev_sessions: Option<i64> = None;
/// Use experimental statistics-based activity monitor. It's no longer
/// 'experimental' per se, as it's enabled for everyone, but we still
/// keep the flag as an option to turn it off in some cases if it will
/// misbehave.
experimental: bool,
}
impl ComputeMonitor {
fn report_down(&self) {
let now = Utc::now();
// Calculate and report current downtime
// (since the last time Postgres was up)
let downtime = now.signed_duration_since(self.last_up);
PG_CURR_DOWNTIME_MS.set(downtime.num_milliseconds() as f64);
// Calculate and update total downtime
// (cumulative duration of Postgres downtime in ms)
let inc = now
.signed_duration_since(self.last_checked)
.num_milliseconds();
PG_TOTAL_DOWNTIME_MS.inc_by(inc as u64);
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
info!("starting experimental activity monitor for {}", connstr);
} else {
info!("starting activity monitor for {}", connstr);
}
fn report_up(&mut self) {
self.last_up = Utc::now();
PG_CURR_DOWNTIME_MS.set(0.0);
}
fn downtime_info(&self) -> String {
format!(
"total_ms: {}, current_ms: {}, last_up: {}",
PG_TOTAL_DOWNTIME_MS.get(),
PG_CURR_DOWNTIME_MS.get(),
self.last_up
)
}
/// Spin in a loop and figure out the last activity time in the Postgres.
/// Then update it in the shared state. This function never errors out.
/// NB: the only expected panic is at `Mutex` unwrap(), all other errors
/// should be handled gracefully.
#[instrument(skip_all)]
pub fn run(&mut self) {
// Suppose that `connstr` doesn't change
let connstr = self.compute.params.connstr.clone();
let conf = self
.compute
.get_conn_conf(Some("compute_ctl:compute_monitor"));
// During startup and configuration we connect to every Postgres database,
// but we don't want to count this as some user activity. So wait until
// the compute fully started before monitoring activity.
wait_for_postgres_start(&self.compute);
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = conf.connect(NoTls);
info!("starting compute monitor for {}", connstr);
loop {
match &mut client {
Ok(cli) => {
if cli.is_closed() {
info!(
downtime_info = self.downtime_info(),
"connection to Postgres is closed, trying to reconnect"
);
self.report_down();
// Connection is closed, reconnect and try again.
client = conf.connect(NoTls);
} else {
match self.check(cli) {
Ok(_) => {
self.report_up();
self.compute.update_last_active(self.last_active);
}
Err(e) => {
// Although we have many places where we can return errors in `check()`,
// normally it shouldn't happen. I.e., we will likely return error if
// connection got broken, query timed out, Postgres returned invalid data, etc.
// In all such cases it's suspicious, so let's report this as downtime.
self.report_down();
error!(
downtime_info = self.downtime_info(),
"could not check Postgres: {}", e
);
// Reconnect to Postgres just in case. During tests, I noticed
// that queries in `check()` can fail with `connection closed`,
// but `cli.is_closed()` above doesn't detect it. Even if old
// connection is still alive, it will be dropped when we reassign
// `client` to a new connection.
client = conf.connect(NoTls);
}
}
}
}
Err(e) => {
info!(
downtime_info = self.downtime_info(),
"could not connect to Postgres: {}, retrying", e
);
self.report_down();
// Establish a new connection and try again.
client = conf.connect(NoTls);
}
}
// Reset the `last_checked` timestamp and sleep before the next iteration.
self.last_checked = Utc::now();
loop {
// We use `continue` a lot, so it's more convenient to sleep at the top of the loop.
// But skip the first sleep, so we can connect to Postgres immediately.
if sleep {
// Should be outside of the mutex lock to allow others to read while we sleep.
thread::sleep(MONITOR_CHECK_INTERVAL);
} else {
sleep = true;
}
}
#[instrument(skip_all)]
fn check(&mut self, cli: &mut Client) -> anyhow::Result<()> {
// This is new logic, only enable if the feature flag is set.
// TODO: remove this once we are sure that it works OR drop it altogether.
if self.experimental {
// Check if the total active time or sessions across all databases has changed.
// If it did, it means that user executed some queries. In theory, it can even go down if
// some databases were dropped, but it's still user activity.
match get_database_stats(cli) {
Ok((active_time, sessions)) => {
let mut detected_activity = false;
match &mut client {
Ok(cli) => {
if cli.is_closed() {
info!("connection to Postgres is closed, trying to reconnect");
if let Some(prev_active_time) = self.active_time {
if active_time != prev_active_time {
detected_activity = true;
// Connection is closed, reconnect and try again.
client = conf.connect(NoTls);
continue;
}
// This is a new logic, only enable if the feature flag is set.
// TODO: remove this once we are sure that it works OR drop it altogether.
if compute.has_feature(ComputeFeature::ActivityMonitorExperimental) {
// First, check if the total active time or sessions across all databases has changed.
// If it did, it means that user executed some queries. In theory, it can even go down if
// some databases were dropped, but it's still a user activity.
match get_database_stats(cli) {
Ok((active_time, sessions)) => {
let mut detected_activity = false;
prev_active_time = match prev_active_time {
Some(prev_active_time) => {
if active_time != prev_active_time {
detected_activity = true;
}
Some(active_time)
}
None => Some(active_time),
};
prev_sessions = match prev_sessions {
Some(prev_sessions) => {
if sessions != prev_sessions {
detected_activity = true;
}
Some(sessions)
}
None => Some(sessions),
};
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
error!("could not get database statistics: {}", e);
continue;
}
}
self.active_time = Some(active_time);
}
if let Some(prev_sessions) = self.sessions {
if sessions != prev_sessions {
detected_activity = true;
// Second, if database statistics is the same, check all backends state change,
// maybe there is some with more recent activity. `get_backends_state_change()`
// can return None or stale timestamp, so it's `compute.update_last_active()`
// responsibility to check if the new timestamp is more recent than the current one.
// This helps us to discover new sessions, that did nothing yet.
match get_backends_state_change(cli) {
Ok(last_active) => {
compute.update_last_active(last_active);
}
Err(e) => {
error!("could not get backends state change: {}", e);
}
}
// Finally, if there are existing (logical) walsenders, do not suspend.
//
// walproposer doesn't currently show up in pg_stat_replication,
// but protect if it will be
let ws_count_query = "select count(*) from pg_stat_replication where application_name != 'walproposer';";
match cli.query_one(ws_count_query, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
}
self.sessions = Some(sessions);
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
self.last_active = Some(Utc::now());
return Ok(());
Err(e) => {
warn!("failed to parse walsenders count: {:?}", e);
continue;
}
},
Err(e) => {
warn!("failed to get list of walsenders: {:?}", e);
continue;
}
}
Err(e) => {
return Err(anyhow::anyhow!("could not get database statistics: {}", e));
//
// Don't suspend compute if there is an active logical replication subscription
//
// `where pid is not null` to filter out read only computes and subscription on branches
//
let logical_subscriptions_query =
"select count(*) from pg_stat_subscription where pid is not null;";
match cli.query_one(logical_subscriptions_query, &[]) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
warn!("failed to parse `pg_stat_subscription` count: {:?}", e);
continue;
}
},
Err(e) => {
warn!(
"failed to get list of active logical replication subscriptions: {:?}",
e
);
continue;
}
}
//
// Do not suspend compute if autovacuum is running
//
let autovacuum_count_query = "select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
match cli.query_one(autovacuum_count_query, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
compute.update_last_active(Some(Utc::now()));
continue;
}
}
Err(e) => {
warn!("failed to parse autovacuum workers count: {:?}", e);
continue;
}
},
Err(e) => {
warn!("failed to get list of autovacuum workers: {:?}", e);
continue;
}
}
}
}
// If database statistics are the same, check all backends for state changes.
// Maybe there are some with more recent activity. `get_backends_state_change()`
// can return None or stale timestamp, so it's `compute.update_last_active()`
// responsibility to check if the new timestamp is more recent than the current one.
// This helps us to discover new sessions that have not done anything yet.
match get_backends_state_change(cli) {
Ok(last_active) => match (last_active, self.last_active) {
(Some(last_active), Some(prev_last_active)) => {
if last_active > prev_last_active {
self.last_active = Some(last_active);
return Ok(());
}
}
(Some(last_active), None) => {
self.last_active = Some(last_active);
return Ok(());
}
_ => {}
},
Err(e) => {
return Err(anyhow::anyhow!(
"could not get backends state change: {}",
e
));
debug!("could not connect to Postgres: {}, retrying", e);
// Establish a new connection and try again.
client = conf.connect(NoTls);
}
}
// If there are existing (logical) walsenders, do not suspend.
//
// N.B. walproposer doesn't currently show up in pg_stat_replication,
// but protect if it will.
const WS_COUNT_QUERY: &str =
"select count(*) from pg_stat_replication where application_name != 'walproposer';";
match cli.query_one(WS_COUNT_QUERY, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
let err: anyhow::Error = e.into();
return Err(err.context("failed to parse walsenders count"));
}
},
Err(e) => {
return Err(anyhow::anyhow!("failed to get list of walsenders: {}", e));
}
}
// Don't suspend compute if there is an active logical replication subscription
//
// `where pid is not null` to filter out read only computes and subscription on branches
const LOGICAL_SUBSCRIPTIONS_QUERY: &str =
"select count(*) from pg_stat_subscription where pid is not null;";
match cli.query_one(LOGICAL_SUBSCRIPTIONS_QUERY, &[]) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
self.last_active = Some(Utc::now());
return Ok(());
}
}
Err(e) => {
return Err(anyhow::anyhow!(
"failed to parse 'pg_stat_subscription' count: {}",
e
));
}
},
Err(e) => {
return Err(anyhow::anyhow!(
"failed to get list of active logical replication subscriptions: {}",
e
));
}
}
// Do not suspend compute if autovacuum is running
const AUTOVACUUM_COUNT_QUERY: &str =
"select count(*) from pg_stat_activity where backend_type = 'autovacuum worker'";
match cli.query_one(AUTOVACUUM_COUNT_QUERY, &[]) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
self.last_active = Some(Utc::now());
return Ok(());
};
}
Err(e) => {
return Err(anyhow::anyhow!(
"failed to parse autovacuum workers count: {}",
e
));
}
},
Err(e) => {
return Err(anyhow::anyhow!(
"failed to get list of autovacuum workers: {}",
e
));
}
}
Ok(())
}
}
@@ -412,24 +315,9 @@ fn get_backends_state_change(cli: &mut Client) -> anyhow::Result<Option<DateTime
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let compute = Arc::clone(compute);
let experimental = compute.has_feature(ComputeFeature::ActivityMonitorExperimental);
let now = Utc::now();
let mut monitor = ComputeMonitor {
compute,
last_active: None,
last_checked: now,
last_up: now,
active_time: None,
sessions: None,
experimental,
};
let span = span!(Level::INFO, "compute_monitor");
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || {
let _enter = span.enter();
monitor.run();
})
.spawn(move || watch_compute_activity(&compute))
.expect("cannot launch compute monitor thread")
}
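
Editor's aside: the ComputeMonitor half of this hunk drives two metrics, a resettable gauge (compute_pg_current_downtime_ms) and a cumulative counter (compute_pg_downtime_ms_total). Below is a minimal, self-contained sketch of that accounting using plain std types in place of the real metrics handles; note that in the actual code last_checked is reset in the run() loop, not in report_down().

use std::time::{Duration, Instant};

struct Monitor {
    last_checked: Instant,  // last time we attempted a check
    last_up: Instant,       // last time a check succeeded
    curr_downtime_ms: f64,  // gauge: resets to 0 on success
    total_downtime_ms: u64, // counter: only ever grows
}

impl Monitor {
    fn report_down(&mut self) {
        let now = Instant::now();
        // Current downtime is measured from the last successful check...
        self.curr_downtime_ms = now.duration_since(self.last_up).as_millis() as f64;
        // ...while the cumulative counter advances only by the time since the
        // previous attempt, so repeated failures don't double-count downtime.
        self.total_downtime_ms += now.duration_since(self.last_checked).as_millis() as u64;
        self.last_checked = now; // the real loop resets this after every iteration
    }

    fn report_up(&mut self) {
        self.last_up = Instant::now();
        self.curr_downtime_ms = 0.0; // the non-cumulative gauge resets
    }
}

fn main() {
    let start = Instant::now();
    let mut m = Monitor { last_checked: start, last_up: start, curr_downtime_ms: 0.0, total_downtime_ms: 0 };
    std::thread::sleep(Duration::from_millis(10));
    m.report_down(); // gauge jumps, counter advances by time since last attempt
    m.report_up();   // gauge resets to 0; counter keeps its value
    println!("total downtime: {} ms", m.total_downtime_ms);
}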

View File

@@ -18,11 +18,12 @@ use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage};
use control_plane::local_env::{
EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf,
NeonLocalInitPageserverConf, SafekeeperConf,
InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf,
ObjectStorageConf, SafekeeperConf,
};
use control_plane::object_storage::OBJECT_STORAGE_DEFAULT_PORT;
use control_plane::object_storage::ObjectStorage;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
@@ -92,7 +93,7 @@ enum NeonLocalCmd {
#[command(subcommand)]
Safekeeper(SafekeeperCmd),
#[command(subcommand)]
EndpointStorage(EndpointStorageCmd),
ObjectStorage(ObjectStorageCmd),
#[command(subcommand)]
Endpoint(EndpointCmd),
#[command(subcommand)]
@@ -459,14 +460,14 @@ enum SafekeeperCmd {
#[derive(clap::Subcommand)]
#[clap(about = "Manage object storage")]
enum EndpointStorageCmd {
Start(EndpointStorageStartCmd),
Stop(EndpointStorageStopCmd),
enum ObjectStorageCmd {
Start(ObjectStorageStartCmd),
Stop(ObjectStorageStopCmd),
}
#[derive(clap::Args)]
#[clap(about = "Start object storage")]
struct EndpointStorageStartCmd {
struct ObjectStorageStartCmd {
#[clap(short = 't', long, help = "timeout until we fail the command")]
#[arg(default_value = "10s")]
start_timeout: humantime::Duration,
@@ -474,7 +475,7 @@ struct EndpointStorageStartCmd {
#[derive(clap::Args)]
#[clap(about = "Stop object storage")]
struct EndpointStorageStopCmd {
struct ObjectStorageStopCmd {
#[arg(value_enum, default_value = "fast")]
#[clap(
short = 'm',
@@ -796,9 +797,7 @@ fn main() -> Result<()> {
}
NeonLocalCmd::StorageBroker(subcmd) => rt.block_on(handle_storage_broker(&subcmd, env)),
NeonLocalCmd::Safekeeper(subcmd) => rt.block_on(handle_safekeeper(&subcmd, env)),
NeonLocalCmd::EndpointStorage(subcmd) => {
rt.block_on(handle_endpoint_storage(&subcmd, env))
}
NeonLocalCmd::ObjectStorage(subcmd) => rt.block_on(handle_object_storage(&subcmd, env)),
NeonLocalCmd::Endpoint(subcmd) => rt.block_on(handle_endpoint(&subcmd, env)),
NeonLocalCmd::Mappings(subcmd) => handle_mappings(&subcmd, env),
};
@@ -1015,8 +1014,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
}
})
.collect(),
endpoint_storage: EndpointStorageConf {
port: ENDPOINT_STORAGE_DEFAULT_PORT,
object_storage: ObjectStorageConf {
port: OBJECT_STORAGE_DEFAULT_PORT,
},
pg_distrib_dir: None,
neon_distrib_dir: None,
@@ -1736,15 +1735,12 @@ async fn handle_safekeeper(subcmd: &SafekeeperCmd, env: &local_env::LocalEnv) ->
Ok(())
}
async fn handle_endpoint_storage(
subcmd: &EndpointStorageCmd,
env: &local_env::LocalEnv,
) -> Result<()> {
use EndpointStorageCmd::*;
let storage = EndpointStorage::from_env(env);
async fn handle_object_storage(subcmd: &ObjectStorageCmd, env: &local_env::LocalEnv) -> Result<()> {
use ObjectStorageCmd::*;
let storage = ObjectStorage::from_env(env);
// In tests like test_forward_compatibility or test_graceful_cluster_restart
// old neon binaries (without endpoint_storage) are present
// old neon binaries (without object_storage) are present
if !storage.bin.exists() {
eprintln!(
"{} binary not found. Ignore if this is a compatibility test",
@@ -1754,13 +1750,13 @@ async fn handle_endpoint_storage(
}
match subcmd {
Start(EndpointStorageStartCmd { start_timeout }) => {
Start(ObjectStorageStartCmd { start_timeout }) => {
if let Err(e) = storage.start(start_timeout).await {
eprintln!("endpoint_storage start failed: {e}");
eprintln!("object_storage start failed: {e}");
exit(1);
}
}
Stop(EndpointStorageStopCmd { stop_mode }) => {
Stop(ObjectStorageStopCmd { stop_mode }) => {
let immediate = match stop_mode {
StopMode::Fast => false,
StopMode::Immediate => true,
@@ -1870,10 +1866,10 @@ async fn handle_start_all_impl(
}
js.spawn(async move {
EndpointStorage::from_env(env)
ObjectStorage::from_env(env)
.start(&retry_timeout)
.await
.map_err(|e| e.context("start endpoint_storage"))
.map_err(|e| e.context("start object_storage"))
});
})();
@@ -1972,9 +1968,9 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
}
}
let storage = EndpointStorage::from_env(env);
let storage = ObjectStorage::from_env(env);
if let Err(e) = storage.stop(immediate) {
eprintln!("endpoint_storage stop failed: {:#}", e);
eprintln!("object_storage stop failed: {:#}", e);
}
for ps_conf in &env.pageservers {

View File

@@ -9,8 +9,8 @@
mod background_process;
pub mod broker;
pub mod endpoint;
pub mod endpoint_storage;
pub mod local_env;
pub mod object_storage;
pub mod pageserver;
pub mod postgresql_conf;
pub mod safekeeper;

View File

@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
use utils::auth::encode_from_key_file;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage};
use crate::object_storage::{OBJECT_STORAGE_REMOTE_STORAGE_DIR, ObjectStorage};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
@@ -72,7 +72,7 @@ pub struct LocalEnv {
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
// Control plane upcall API for pageserver: if None, we will not run storage_controller If set, this will
// be propagated into each pageserver's configuration.
@@ -110,7 +110,7 @@ pub struct OnDiskConfig {
)]
pub pageservers: Vec<PageServerConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub control_plane_compute_hook_api: Option<Url>,
@@ -144,7 +144,7 @@ pub struct NeonLocalInitConf {
pub storage_controller: Option<NeonStorageControllerConf>,
pub pageservers: Vec<NeonLocalInitPageserverConf>,
pub safekeepers: Vec<SafekeeperConf>,
pub endpoint_storage: EndpointStorageConf,
pub object_storage: ObjectStorageConf,
pub control_plane_api: Option<Url>,
pub control_plane_hooks_api: Option<Url>,
pub generate_local_ssl_certs: bool,
@@ -152,7 +152,7 @@ pub struct NeonLocalInitConf {
#[derive(Serialize, Default, Deserialize, PartialEq, Eq, Clone, Debug)]
#[serde(default)]
pub struct EndpointStorageConf {
pub struct ObjectStorageConf {
pub port: u16,
}
@@ -413,8 +413,8 @@ impl LocalEnv {
self.pg_dir(pg_version, "lib")
}
pub fn endpoint_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("endpoint_storage")
pub fn object_storage_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("object_storage")
}
pub fn pageserver_bin(&self) -> PathBuf {
@@ -450,8 +450,8 @@ impl LocalEnv {
self.base_data_dir.join("safekeepers").join(data_dir_name)
}
pub fn endpoint_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("endpoint_storage")
pub fn object_storage_data_dir(&self) -> PathBuf {
self.base_data_dir.join("object_storage")
}
pub fn get_pageserver_conf(&self, id: NodeId) -> anyhow::Result<&PageServerConf> {
@@ -615,7 +615,7 @@ impl LocalEnv {
control_plane_compute_hook_api: _,
branch_name_mappings,
generate_local_ssl_certs,
endpoint_storage,
object_storage,
} = on_disk_config;
LocalEnv {
base_data_dir: repopath.to_owned(),
@@ -632,7 +632,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings,
generate_local_ssl_certs,
endpoint_storage,
object_storage,
}
};
@@ -742,7 +742,7 @@ impl LocalEnv {
control_plane_compute_hook_api: None,
branch_name_mappings: self.branch_name_mappings.clone(),
generate_local_ssl_certs: self.generate_local_ssl_certs,
endpoint_storage: self.endpoint_storage.clone(),
object_storage: self.object_storage.clone(),
},
)
}
@@ -849,7 +849,7 @@ impl LocalEnv {
control_plane_api,
generate_local_ssl_certs,
control_plane_hooks_api,
endpoint_storage,
object_storage,
} = conf;
// Find postgres binaries.
@@ -901,7 +901,7 @@ impl LocalEnv {
control_plane_hooks_api,
branch_name_mappings: Default::default(),
generate_local_ssl_certs,
endpoint_storage,
object_storage,
};
if generate_local_ssl_certs {
@@ -929,13 +929,13 @@ impl LocalEnv {
.context("pageserver init failed")?;
}
EndpointStorage::from_env(&env)
ObjectStorage::from_env(&env)
.init()
.context("object storage init failed")?;
// setup remote remote location for default LocalFs remote storage
std::fs::create_dir_all(env.base_data_dir.join(PAGESERVER_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR))?;
std::fs::create_dir_all(env.base_data_dir.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR))?;
env.persist_config()
}

View File

@@ -1,33 +1,34 @@
use crate::background_process::{self, start_process, stop_process};
use crate::local_env::LocalEnv;
use anyhow::anyhow;
use anyhow::{Context, Result};
use camino::Utf8PathBuf;
use std::io::Write;
use std::time::Duration;
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const ENDPOINT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/endpoint_storage";
pub const ENDPOINT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub const OBJECT_STORAGE_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/object_storage";
pub const OBJECT_STORAGE_DEFAULT_PORT: u16 = 9993;
pub struct EndpointStorage {
pub struct ObjectStorage {
pub bin: Utf8PathBuf,
pub data_dir: Utf8PathBuf,
pub pemfile: Utf8PathBuf,
pub port: u16,
}
impl EndpointStorage {
pub fn from_env(env: &LocalEnv) -> EndpointStorage {
EndpointStorage {
bin: Utf8PathBuf::from_path_buf(env.endpoint_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.endpoint_storage_data_dir()).unwrap(),
impl ObjectStorage {
pub fn from_env(env: &LocalEnv) -> ObjectStorage {
ObjectStorage {
bin: Utf8PathBuf::from_path_buf(env.object_storage_bin()).unwrap(),
data_dir: Utf8PathBuf::from_path_buf(env.object_storage_data_dir()).unwrap(),
pemfile: Utf8PathBuf::from_path_buf(env.public_key_path.clone()).unwrap(),
port: env.endpoint_storage.port,
port: env.object_storage.port,
}
}
fn config_path(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.json")
self.data_dir.join("object_storage.json")
}
fn listen_addr(&self) -> Utf8PathBuf {
@@ -48,7 +49,7 @@ impl EndpointStorage {
let cfg = Cfg {
listen: self.listen_addr(),
pemfile: parent.join(self.pemfile.clone()),
local_path: parent.join(ENDPOINT_STORAGE_REMOTE_STORAGE_DIR),
local_path: parent.join(OBJECT_STORAGE_REMOTE_STORAGE_DIR),
r#type: "LocalFs".to_string(),
};
std::fs::create_dir_all(self.config_path().parent().unwrap())?;
@@ -58,19 +59,24 @@ impl EndpointStorage {
}
pub async fn start(&self, retry_timeout: &Duration) -> Result<()> {
println!("Starting endpoint_storage at {}", self.listen_addr());
println!("Starting s3 proxy at {}", self.listen_addr());
std::io::stdout().flush().context("flush stdout")?;
let process_status_check = || async {
let res = reqwest::Client::new().get(format!("http://{}/metrics", self.listen_addr()));
match res.send().await {
Ok(res) => Ok(res.status().is_success()),
Err(_) => Ok(false),
tokio::time::sleep(Duration::from_millis(500)).await;
let res = reqwest::Client::new()
.get(format!("http://{}/metrics", self.listen_addr()))
.send()
.await;
match res {
Ok(response) if response.status().is_success() => Ok(true),
Ok(_) => Err(anyhow!("Failed to query /metrics")),
Err(e) => Err(anyhow!("Failed to check node status: {e}")),
}
};
let res = start_process(
"endpoint_storage",
"object_storage",
&self.data_dir.clone().into_std_path_buf(),
&self.bin.clone().into_std_path_buf(),
vec![self.config_path().to_string()],
@@ -88,14 +94,14 @@ impl EndpointStorage {
}
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
stop_process(immediate, "endpoint_storage", &self.pid_file())
stop_process(immediate, "object_storage", &self.pid_file())
}
fn log_file(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.log")
self.data_dir.join("object_storage.log")
}
fn pid_file(&self) -> Utf8PathBuf {
self.data_dir.join("endpoint_storage.pid")
self.data_dir.join("object_storage.pid")
}
}
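
Editor's aside: the start() path above polls the service's /metrics endpoint every 500 ms until it answers with a success status. A minimal sketch of that readiness probe, assuming a reqwest/tokio environment; the helper name and retry budget are illustrative, while the 9993 port is the OBJECT_STORAGE_DEFAULT_PORT from this diff.

use std::time::Duration;

async fn wait_until_ready(listen_addr: &str, attempts: u32) -> anyhow::Result<()> {
    let client = reqwest::Client::new();
    for _ in 0..attempts {
        tokio::time::sleep(Duration::from_millis(500)).await;
        match client.get(format!("http://{listen_addr}/metrics")).send().await {
            Ok(resp) if resp.status().is_success() => return Ok(()),
            // Connection refused or a non-2xx answer both mean "not up yet".
            _ => continue,
        }
    }
    anyhow::bail!("object_storage did not answer on {listen_addr} in time")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    wait_until_ready("127.0.0.1:9993", 20).await
}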

View File

@@ -45,7 +45,9 @@ allow = [
"ISC",
"MIT",
"MPL-2.0",
"OpenSSL",
"Unicode-3.0",
"Zlib",
]
confidence-threshold = 0.8
exceptions = [
@@ -54,6 +56,14 @@ exceptions = [
{ allow = ["Zlib"], name = "const_format", version = "*" },
]
[[licenses.clarify]]
name = "ring"
version = "*"
expression = "MIT AND ISC AND OpenSSL"
license-files = [
{ path = "LICENSE", hash = 0xbd0eed23 }
]
[licenses.private]
ignore = true
registries = []
@@ -106,11 +116,7 @@ name = "openssl"
unknown-registry = "warn"
unknown-git = "warn"
allow-registry = ["https://github.com/rust-lang/crates.io-index"]
allow-git = [
# Crate pinned to commit in origin repo due to opentelemetry version.
# TODO: Remove this once crate is fetched from crates.io again.
"https://github.com/mattiapenati/tower-otel",
]
allow-git = []
[sources.allow-org]
github = [

View File

@@ -242,22 +242,13 @@ impl RemoteExtSpec {
match self.extension_data.get(real_ext_name) {
Some(_ext_data) => {
// We have decided to use the Go naming convention due to Kubernetes.
let arch = match std::env::consts::ARCH {
"x86_64" => "amd64",
"aarch64" => "arm64",
arch => arch,
};
// Construct the path to the extension archive
// BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
//
// Keep it in sync with path generation in
// https://github.com/neondatabase/build-custom-extensions/tree/main
let archive_path_str = format!(
"{build_tag}/{arch}/{pg_major_version}/extensions/{real_ext_name}.tar.zst"
);
let archive_path_str =
format!("{build_tag}/{pg_major_version}/extensions/{real_ext_name}.tar.zst");
Ok((
real_ext_name.to_string(),
RemotePath::from_string(&archive_path_str)?,
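
Editor's aside: a worked example of the archive path scheme this hunk changes, with the Go-style arch segment (used on one side of the diff). The build tag, PG major version string, and extension name below are hypothetical inputs, not values from the diff.

fn archive_path(build_tag: &str, pg_major_version: &str, ext: &str) -> String {
    // Map Rust's arch names to the Go naming convention used by Kubernetes.
    let arch = match std::env::consts::ARCH {
        "x86_64" => "amd64",
        "aarch64" => "arm64",
        arch => arch,
    };
    format!("{build_tag}/{arch}/{pg_major_version}/extensions/{ext}.tar.zst")
}

fn main() {
    // Hypothetical inputs; on x86_64 this prints
    // "1234/amd64/v17/extensions/pg_repack.tar.zst".
    println!("{}", archive_path("1234", "v17", "pg_repack"));
}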

View File

@@ -181,7 +181,6 @@ pub struct ConfigToml {
pub generate_unarchival_heatmap: Option<bool>,
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -658,7 +657,6 @@ impl Default for ConfigToml {
generate_unarchival_heatmap: None,
tracing: None,
enable_tls_page_service_api: false,
dev_mode: false,
}
}
}

View File

@@ -320,35 +320,6 @@ pub struct TimelineCreateRequest {
pub mode: TimelineCreateRequestMode,
}
impl TimelineCreateRequest {
pub fn mode_tag(&self) -> &'static str {
match &self.mode {
TimelineCreateRequestMode::Branch { .. } => "branch",
TimelineCreateRequestMode::ImportPgdata { .. } => "import",
TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
}
}
pub fn is_import(&self) -> bool {
matches!(self.mode, TimelineCreateRequestMode::ImportPgdata { .. })
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub enum ShardImportStatus {
InProgress,
Done,
Error(String),
}
impl ShardImportStatus {
pub fn is_terminal(&self) -> bool {
match self {
ShardImportStatus::InProgress => false,
ShardImportStatus::Done | ShardImportStatus::Error(_) => true,
}
}
}
/// Storage controller specific extensions to [`TimelineInfo`].
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon {

View File

@@ -4,10 +4,10 @@
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TimelineId};
use utils::id::NodeId;
use crate::controller_api::NodeRegisterRequest;
use crate::models::{LocationConfigMode, ShardImportStatus};
use crate::models::LocationConfigMode;
use crate::shard::TenantShardId;
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
@@ -62,10 +62,3 @@ pub struct ValidateResponseTenant {
pub id: TenantShardId,
pub valid: bool,
}
#[derive(Serialize, Deserialize)]
pub struct PutTimelineImportStatusRequest {
pub tenant_shard_id: TenantShardId,
pub timeline_id: TimelineId,
pub status: ShardImportStatus,
}

View File

@@ -18,7 +18,8 @@ camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
hyper = { workspace = true, features = ["client"] }
futures.workspace = true
reqwest.workspace = true
reqwest = { workspace = true, features = ["multipart", "stream"] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
@@ -40,6 +41,10 @@ http-types.workspace = true
http-body-util.workspace = true
itertools.workspace = true
sync_wrapper = { workspace = true, features = ["futures"] }
gcp_auth = "0.12.3"
url.workspace = true
http.workspace = true
uuid.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true

View File

@@ -14,9 +14,8 @@ use anyhow::{Context, Result};
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::CopyStatus;
use azure_storage_blobs::blob::operations::GetBlobBuilder;
use azure_storage_blobs::blob::{Blob, CopyStatus};
use azure_storage_blobs::container::operations::ListBlobsBuilder;
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
use bytes::Bytes;
use futures::FutureExt;
@@ -254,15 +253,53 @@ impl AzureBlobStorage {
download
}
fn list_streaming_for_fn<T: Default + ListingCollector>(
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let acquire = self.concurrency_limiter.acquire(kind);
tokio::select! {
permit = acquire => Ok(permit.expect("never closed")),
_ = cancel.cancelled() => Err(Cancelled),
}
}
pub fn container_name(&self) -> &str {
&self.container_name
}
}
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
let mut res = Metadata::new();
for (k, v) in metadata.0.into_iter() {
res.insert(k, v);
}
res
}
fn to_download_error(error: azure_core::Error) -> DownloadError {
if let Some(http_err) = error.as_http_error() {
match http_err.status() {
StatusCode::NotFound => DownloadError::NotFound,
StatusCode::NotModified => DownloadError::Unmodified,
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
_ => DownloadError::Other(anyhow::Error::new(error)),
}
} else {
DownloadError::Other(error.into())
}
}
impl RemoteStorage for AzureBlobStorage {
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
request_kind: RequestKind,
customize_builder: impl Fn(ListBlobsBuilder) -> ListBlobsBuilder,
) -> impl Stream<Item = Result<T, DownloadError>> {
) -> impl Stream<Item = Result<Listing, DownloadError>> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
self.prefix_in_container.clone().map(|mut s| {
@@ -274,7 +311,7 @@ impl AzureBlobStorage {
});
async_stream::stream! {
let _permit = self.permit(request_kind, cancel).await?;
let _permit = self.permit(RequestKind::List, cancel).await?;
let mut builder = self.client.list_blobs();
@@ -290,8 +327,6 @@ impl AzureBlobStorage {
builder = builder.max_results(MaxResults::new(limit));
}
builder = customize_builder(builder);
let mut next_marker = None;
let mut timeout_try_cnt = 1;
@@ -347,20 +382,26 @@ impl AzureBlobStorage {
break;
};
let mut res = T::default();
let mut res = Listing::default();
next_marker = entry.continuation();
let prefix_iter = entry
.blobs
.prefixes()
.map(|prefix| self.name_to_relative_path(&prefix.name));
res.add_prefixes(self, prefix_iter);
res.prefixes.extend(prefix_iter);
let blob_iter = entry
.blobs
.blobs();
.blobs()
.map(|k| ListingObject{
key: self.name_to_relative_path(&k.name),
last_modified: k.properties.last_modified.into(),
size: k.properties.content_length,
}
);
for key in blob_iter {
res.add_blob(self, key);
res.keys.push(key);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
@@ -382,128 +423,6 @@ impl AzureBlobStorage {
}
}
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let acquire = self.concurrency_limiter.acquire(kind);
tokio::select! {
permit = acquire => Ok(permit.expect("never closed")),
_ = cancel.cancelled() => Err(Cancelled),
}
}
pub fn container_name(&self) -> &str {
&self.container_name
}
}
trait ListingCollector {
fn add_prefixes(&mut self, abs: &AzureBlobStorage, prefix_it: impl Iterator<Item = RemotePath>);
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob);
}
impl ListingCollector for Listing {
fn add_prefixes(
&mut self,
_abs: &AzureBlobStorage,
prefix_it: impl Iterator<Item = RemotePath>,
) {
self.prefixes.extend(prefix_it);
}
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
self.keys.push(ListingObject {
key: abs.name_to_relative_path(&blob.name),
last_modified: blob.properties.last_modified.into(),
size: blob.properties.content_length,
});
}
}
impl ListingCollector for crate::VersionListing {
fn add_prefixes(
&mut self,
_abs: &AzureBlobStorage,
_prefix_it: impl Iterator<Item = RemotePath>,
) {
// nothing
}
fn add_blob(&mut self, abs: &AzureBlobStorage, blob: &Blob) {
let id = crate::VersionId(blob.version_id.clone().expect("didn't find version ID"));
self.versions.push(crate::Version {
key: abs.name_to_relative_path(&blob.name),
last_modified: blob.properties.last_modified.into(),
kind: crate::VersionKind::Version(id),
});
}
}
fn to_azure_metadata(metadata: StorageMetadata) -> Metadata {
let mut res = Metadata::new();
for (k, v) in metadata.0.into_iter() {
res.insert(k, v);
}
res
}
fn to_download_error(error: azure_core::Error) -> DownloadError {
if let Some(http_err) = error.as_http_error() {
match http_err.status() {
StatusCode::NotFound => DownloadError::NotFound,
StatusCode::NotModified => DownloadError::Unmodified,
StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(error)),
_ => DownloadError::Other(anyhow::Error::new(error)),
}
} else {
DownloadError::Other(error.into())
}
}
impl RemoteStorage for AzureBlobStorage {
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
let customize_builder = |builder| builder;
let kind = RequestKind::ListVersions;
self.list_streaming_for_fn(prefix, mode, max_keys, cancel, kind, customize_builder)
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> std::result::Result<crate::VersionListing, DownloadError> {
let customize_builder = |mut builder: ListBlobsBuilder| {
builder = builder.include_versions(true);
builder
};
let kind = RequestKind::ListVersions;
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
prefix,
mode,
max_keys,
cancel,
kind,
customize_builder
));
let mut combined: crate::VersionListing =
stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.versions.extend(list.versions.into_iter());
}
Ok(combined)
}
async fn head_object(
&self,
key: &RemotePath,
@@ -613,12 +532,7 @@ impl RemoteStorage for AzureBlobStorage {
let mut builder = blob_client.get();
if let Some(ref etag) = opts.etag {
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()));
}
if let Some(ref version_id) = opts.version_id {
let version_id = azure_storage_blobs::prelude::VersionId::new(version_id.0.clone());
builder = builder.blob_versioning(version_id);
builder = builder.if_match(IfMatchCondition::NotMatch(etag.to_string()))
}
if let Some((start, end)) = opts.byte_range() {

View File

@@ -41,6 +41,7 @@ impl RemoteStorageKind {
RemoteStorageKind::LocalFs { .. } => None,
RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
RemoteStorageKind::GCS(config) => Some(&config.bucket_name),
}
}
}
@@ -51,6 +52,7 @@ impl RemoteStorageConfig {
match &self.storage {
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
RemoteStorageKind::GCS(c) => c.concurrency_limit.into(),
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
}
}
@@ -85,6 +87,9 @@ pub enum RemoteStorageKind {
/// Azure Blob based storage, storing all files in the container
/// specified by the config
AzureContainer(AzureConfig),
/// Google Cloud based storage, storing all files in the GCS bucket
/// specified by the config
GCS(GCSConfig),
}
/// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write).
@@ -154,6 +159,32 @@ impl Debug for S3Config {
}
}
#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct GCSConfig {
/// Name of the bucket to connect to.
pub bucket_name: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
pub prefix_in_bucket: Option<String>,
#[serde(default = "default_remote_storage_s3_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
}
impl Debug for GCSConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GCSConfig")
.field("bucket_name", &self.bucket_name)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
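For orientation, a config sketch in the TOML shape the parser expects (bucket name hypothetical; `concurrency_limit` and `max_keys_per_list_response` fall back to the serde defaults above when omitted, and `timeout` lives on the enclosing RemoteStorageConfig as in the tests below):
bucket_name = 'my-gcs-bucket'
prefix_in_bucket = '/pageserver'
concurrency_limit = 100
timeout = '120s'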
/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AzureConfig {
@@ -268,6 +299,30 @@ timeout = '5s'";
);
}
#[test]
fn test_gcs_parsing() {
let toml = "\
bucket_name = 'foo-bar'
prefix_in_bucket = '/pageserver'
";
let config = parse(toml).unwrap();
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::GCS(GCSConfig {
bucket_name: "foo-bar".into(),
prefix_in_bucket: Some("/pageserver".into()),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
concurrency_limit: std::num::NonZero::new(100).unwrap(),
}),
timeout: Duration::from_secs(120),
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
}
);
}
#[test]
fn test_s3_parsing() {
let toml = "\

View File

@@ -0,0 +1,978 @@
#![allow(dead_code)]
#![allow(unused)]
use crate::config::GCSConfig;
use crate::error::Cancelled;
pub(super) use crate::metrics::RequestKind;
use crate::metrics::{AttemptOutcome, start_counting_cancelled_wait, start_measuring_requests};
use crate::{
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, GCS_SCOPES, Listing, ListingMode,
ListingObject, MAX_KEYS_PER_DELETE_GCS, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath,
RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
};
use anyhow::Context;
use azure_core::Etag;
use bytes::Bytes;
use bytes::BytesMut;
use chrono::DateTime;
use futures::stream::Stream;
use futures::stream::TryStreamExt;
use futures_util::StreamExt;
use gcp_auth::{Token, TokenProvider};
use http::Method;
use http::StatusCode;
use reqwest::{Client, header};
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;
use std::num::NonZeroU32;
use std::pin::{Pin, pin};
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::sync::CancellationToken;
use tracing;
use url::Url;
use uuid::Uuid;
// ---------
pub struct GCSBucket {
token_provider: Arc<dyn TokenProvider>,
bucket_name: String,
prefix_in_bucket: Option<String>,
max_keys_per_list_response: Option<i32>,
concurrency_limiter: ConcurrencyLimiter,
pub timeout: Duration,
}
struct GetObjectRequest {
bucket: String,
key: String,
etag: Option<String>,
range: Option<String>,
}
// ---------
impl GCSBucket {
pub async fn new(remote_storage_config: &GCSConfig, timeout: Duration) -> anyhow::Result<Self> {
tracing::debug!(
"creating remote storage for gcs bucket {}",
remote_storage_config.bucket_name
);
// clean up 'prefix_in_bucket' if user provides '/pageserver' or 'pageserver/'
let prefix_in_bucket = remote_storage_config
.prefix_in_bucket
.as_deref()
.map(|prefix| {
let mut prefix = prefix;
while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
prefix = &prefix[1..];
}
let mut prefix = prefix.to_string();
if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
prefix.pop();
}
prefix
});
// get GOOGLE_APPLICATION_CREDENTIALS
let provider = gcp_auth::provider().await?;
Ok(GCSBucket {
token_provider: Arc::clone(&provider),
bucket_name: remote_storage_config.bucket_name.clone(),
prefix_in_bucket,
timeout,
max_keys_per_list_response: remote_storage_config.max_keys_per_list_response,
concurrency_limiter: ConcurrencyLimiter::new(
remote_storage_config.concurrency_limit.get(),
),
})
}
// convert `RemotePath` -> `String`
pub fn relative_path_to_gcs_object(&self, path: &RemotePath) -> String {
let path_string = path.get_path().as_str();
match &self.prefix_in_bucket {
Some(prefix) => prefix.clone() + "/" + path_string,
None => path_string.to_string(),
}
}
// convert `String` -> `RemotePath`
pub fn gcs_object_to_relative_path(&self, key: &str) -> RemotePath {
let relative_path =
match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
Some(stripped) => stripped,
// we rely on GCS to return properly prefixed paths
// for requests with a certain prefix
None => panic!(
"Key {} does not start with bucket prefix {:?}",
key, self.prefix_in_bucket
),
};
RemotePath(
relative_path
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
.collect(),
)
}
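The two helpers above are meant to be inverses once `new` has normalized the prefix. A minimal standalone sketch of that join/strip round trip (prefix and key are hypothetical; the real methods additionally rebuild a `RemotePath` from the stripped components):
fn to_object_name(prefix: Option<&str>, rel: &str) -> String {
    match prefix {
        Some(p) => format!("{p}/{rel}"),
        None => rel.to_owned(),
    }
}

fn to_relative(prefix: Option<&str>, key: &str) -> String {
    key.strip_prefix(prefix.unwrap_or_default())
        .expect("key must start with the bucket prefix")
        .trim_start_matches('/')
        .to_owned()
}

fn main() {
    let obj = to_object_name(Some("pageserver"), "tenants/t1/index_part.json");
    assert_eq!(obj, "pageserver/tenants/t1/index_part.json");
    assert_eq!(to_relative(Some("pageserver"), &obj), "tenants/t1/index_part.json");
}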
pub fn bucket_name(&self) -> &str {
&self.bucket_name
}
fn max_keys_per_delete(&self) -> usize {
MAX_KEYS_PER_DELETE_GCS
}
async fn permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::SemaphorePermit<'_>, Cancelled> {
let started_at = start_counting_cancelled_wait(kind);
let acquire = self.concurrency_limiter.acquire(kind);
let permit = tokio::select! {
permit = acquire => permit.expect("semaphore is never closed"),
_ = cancel.cancelled() => return Err(Cancelled),
};
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.wait_seconds
.observe_elapsed(kind, started_at);
Ok(permit)
}
async fn owned_permit(
&self,
kind: RequestKind,
cancel: &CancellationToken,
) -> Result<tokio::sync::OwnedSemaphorePermit, Cancelled> {
let started_at = start_counting_cancelled_wait(kind);
let acquire = self.concurrency_limiter.acquire_owned(kind);
let permit = tokio::select! {
permit = acquire => permit.expect("semaphore is never closed"),
_ = cancel.cancelled() => return Err(Cancelled),
};
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.wait_seconds
.observe_elapsed(kind, started_at);
Ok(permit)
}
async fn put_object(
&self,
byte_stream: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
fs_size: usize,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
// https://cloud.google.com/storage/docs/xml-api/reference-headers#chunked
let mut headers = header::HeaderMap::new();
headers.insert(
header::TRANSFER_ENCODING,
header::HeaderValue::from_static("chunked"),
);
// TODO: check whether we need a 'multipart/related' upload here to attach metadata,
// the way Neon's S3 `.upload()` does.
// https://cloud.google.com/storage/docs/uploading-objects#uploading-an-object
let upload_uri = format!(
"https://storage.googleapis.com/upload/storage/v1/b/{}/o?uploadType=media&name={}",
self.bucket_name,
self.relative_path_to_gcs_object(to).trim_start_matches('/')
);
let upload = Client::new()
.post(upload_uri)
.body(reqwest::Body::wrap_stream(byte_stream))
.headers(headers)
.bearer_auth(self.token_provider.token(GCS_SCOPES).await?.as_str())
.send();
// We await it in a race against the Tokio timeout
let upload = tokio::time::timeout(self.timeout, upload);
let res = tokio::select! {
res = upload => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
match res {
Ok(Ok(res)) => {
if !res.status().is_success() {
match res.status() {
StatusCode::NOT_FOUND => {
return Err(anyhow::anyhow!("GCS error: not found \n\t {:?}", res));
}
_ => {
return Err(anyhow::anyhow!(
"GCS PUT response contained no response body \n\t {:?}",
res
));
}
}
} else {
Ok(())
}
}
Ok(Err(reqw)) => Err(reqw.into()),
Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
}
}
async fn copy(
&self,
from: String,
to: String,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Copy;
let _permit = self.permit(kind, cancel).await?;
let timeout = tokio::time::sleep(self.timeout);
let started_at = start_measuring_requests(kind);
let copy_uri = format!(
"https://storage.googleapis.com/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
self.bucket_name.clone(),
&from,
self.bucket_name.clone(),
&to
);
let op = Client::new()
.post(copy_uri)
.bearer_auth(self.token_provider.token(GCS_SCOPES).await?.as_str())
.send();
let res = tokio::select! {
res = op => res,
_ = timeout => return Err(TimeoutOrCancel::Timeout.into()),
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &res, started_at);
res?;
Ok(())
}
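One caveat on the `copyTo` URI above: both object names are embedded as path segments, so they arguably need the same percent-encoding that `get_object` and `delete_oids` apply. A standalone sketch of that encoding step, using the `url` crate already imported in this file (object name illustrative):
fn encode_object_name(name: &str) -> String {
    // form_urlencoded percent-encodes '/' as %2F, which is what the
    // JSON API expects for object names embedded in the URL path.
    url::form_urlencoded::byte_serialize(name.trim_start_matches('/').as_bytes()).collect()
}

fn main() {
    assert_eq!(
        encode_object_name("pageserver/tenants/t1"),
        "pageserver%2Ftenants%2Ft1"
    );
}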
async fn delete_oids(
&self,
delete_objects: &[String],
cancel: &CancellationToken,
_permit: &tokio::sync::SemaphorePermit<'_>,
) -> anyhow::Result<()> {
let kind = RequestKind::Delete;
let mut cancel = std::pin::pin!(cancel.cancelled());
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_GCS) {
let started_at = start_measuring_requests(kind);
// Map content-id index -> key so keys that fail to delete can be reported below.
let mut delete_objects_status = HashMap::new();
let mut form = reqwest::multipart::Form::new();
let bulk_uri = "https://storage.googleapis.com/batch/storage/v1";
for (index, path) in chunk.iter().enumerate() {
delete_objects_status.insert(index + 1, path.clone());
let path_to_delete: String =
url::form_urlencoded::byte_serialize(path.trim_start_matches("/").as_bytes())
.collect();
let delete_req = format!(
"
DELETE /storage/v1/b/{}/o/{} HTTP/1.1\r\n\
Content-Type: application/json\r\n\
accept: application/json\r\n\
content-length: 0\r\n
",
self.bucket_name.clone(),
path_to_delete
)
.trim()
.to_string();
let content_id = format!("<{}+{}>", Uuid::new_v4(), index + 1);
let mut part_headers = header::HeaderMap::new();
part_headers.insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/http"),
);
// The batch API expects 'Content-Transfer-Encoding: binary' on each part.
part_headers.insert(
header::HeaderName::from_static("content-transfer-encoding"),
header::HeaderValue::from_static("binary"),
);
part_headers.insert(
header::HeaderName::from_static("content-id"),
header::HeaderValue::from_str(&content_id)?,
);
let part = reqwest::multipart::Part::text(delete_req).headers(part_headers);
form = form.part(format!("request-{}", index), part);
}
let mut headers = header::HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
header::HeaderValue::from_str(&format!(
"multipart/mixed; boundary={}",
form.boundary()
))?,
);
let req = Client::new()
.post(bulk_uri)
.bearer_auth(self.token_provider.token(GCS_SCOPES).await?.as_str())
.multipart(form)
.headers(headers)
.send();
let resp = tokio::select! {
resp = req => resp,
_ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()),
_ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()),
};
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &resp, started_at);
let resp = resp.context("request deletion")?;
crate::metrics::BUCKET_METRICS
.deleted_objects_total
.inc_by(chunk.len() as u64);
let res_headers = resp.headers().to_owned();
let boundary = res_headers
.get(header::CONTENT_TYPE)
.context("batch response is missing Content-Type")?
.to_str()?
.split('=')
.last()
.context("batch response Content-Type has no boundary")?;
let res_body = resp.text().await?;
let parsed: HashMap<String, String> = res_body
.split(&format!("--{}", boundary))
.filter_map(|c| {
let mut lines = c.lines();
let id = lines.find_map(|line| {
line.strip_prefix("Content-ID:")
.and_then(|suf| suf.split('+').last())
.and_then(|suf| suf.split('>').next())
.map(|x| x.trim().to_string())
});
let status_code = lines.find_map(|line| {
// Matching "HTTP/1.1" exactly may be stricter than necessary here.
line.strip_prefix("HTTP/1.1")
.and_then(|x| x.split_whitespace().next())
.map(|x| x.trim().to_string())
});
id.zip(status_code)
})
.collect();
// Gather failures
let errors: HashMap<usize, &String> = parsed
.iter()
.filter_map(|(x, y)| {
if y.chars().next() != Some('2') {
x.parse::<usize>().ok().map(|v| (v, y))
} else {
None
}
})
.collect();
if !errors.is_empty() {
// Report 10 of them like S3
const LOG_UP_TO_N_ERRORS: usize = 10;
for (id, code) in errors.iter().take(LOG_UP_TO_N_ERRORS) {
tracing::warn!(
"DeleteObjects key {} failed with code: {}",
delete_objects_status.get(id).unwrap(),
code
);
}
return Err(anyhow::anyhow!(
"Failed to delete {}/{} objects",
errors.len(),
chunk.len(),
));
}
}
Ok(())
}
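Each part built above is an `application/http` envelope around a bare `DELETE /storage/v1/b/<bucket>/o/<object> HTTP/1.1` request, and the batch response mirrors that shape. The pairing logic used here (Content-ID suffix matched with the embedded status line) can be checked in isolation; a self-contained sketch on a canned body, with an illustrative boundary and IDs:
fn main() {
    let boundary = "batch_x";
    let body = "--batch_x\r\nContent-Type: application/http\r\nContent-ID: <response-abc+1>\r\n\r\nHTTP/1.1 204 No Content\r\n\r\n--batch_x\r\nContent-ID: <response-def+2>\r\n\r\nHTTP/1.1 404 Not Found\r\n\r\n--batch_x--";
    let parsed: Vec<(String, String)> = body
        .split(&format!("--{boundary}"))
        .filter_map(|part| {
            let mut lines = part.lines();
            // first find the content-id suffix ("1", "2", ...)
            let id = lines.find_map(|l| {
                l.strip_prefix("Content-ID:")
                    .and_then(|s| s.split('+').last())
                    .and_then(|s| s.split('>').next())
                    .map(|s| s.trim().to_string())
            });
            // then the status code of the embedded HTTP response
            let status = lines.find_map(|l| {
                l.strip_prefix("HTTP/1.1")
                    .and_then(|s| s.split_whitespace().next())
                    .map(str::to_string)
            });
            id.zip(status)
        })
        .collect();
    assert_eq!(parsed, vec![("1".into(), "204".into()), ("2".into(), "404".into())]);
}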
async fn list_objects_v2(&self, list_uri: String) -> anyhow::Result<reqwest::RequestBuilder> {
let res = Client::new()
.get(list_uri)
.bearer_auth(self.token_provider.token(GCS_SCOPES).await?.as_str());
Ok(res)
}
// Needs a 'bucket', a 'key', and optionally a byte 'range'.
async fn get_object(
&self,
request: GetObjectRequest,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
let kind = RequestKind::Get;
let permit = self.owned_permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let encoded_path: String =
url::form_urlencoded::byte_serialize(request.key.as_bytes()).collect();
// We do this in two parts:
// 1. an initial JSON request that gives us the metadata (ETag, last-modified, etc.);
// 2. a second request whose body we do not await here: its pinned byte stream is
//    handed back to the caller of `get_object`.
// 1. Serialize metadata from the initial request
let metadata_uri_mod = "alt=json";
let download_uri = format!(
"https://storage.googleapis.com/storage/v1/b/{}/o/{}?{}",
self.bucket_name.clone(),
encoded_path,
metadata_uri_mod
);
let res = Client::new()
.get(download_uri)
.bearer_auth(
self.token_provider
.token(GCS_SCOPES)
.await
.map_err(|e: gcp_auth::Error| DownloadError::Other(e.into()))?
.as_str(),
)
.send()
.await
.map_err(|e: reqwest::Error| DownloadError::Other(e.into()))?;
if !res.status().is_success() {
match res.status() {
StatusCode::NOT_FOUND => return Err(DownloadError::NotFound),
_ => {
return Err(DownloadError::Other(anyhow::anyhow!(
"GCS GET resposne contained no response body"
)));
}
}
};
let body = res
.text()
.await
.map_err(|e: reqwest::Error| DownloadError::Other(e.into()))?;
let resp: GCSObject = serde_json::from_str(&body)
.map_err(|e: serde_json::Error| DownloadError::Other(e.into()))?;
// 2. Byte Stream request
let mut headers = header::HeaderMap::new();
// Honor the caller-provided byte range; default to streaming the whole object.
let range = request.range.clone().unwrap_or_else(|| "bytes=0-".to_string());
headers.insert(
header::RANGE,
header::HeaderValue::from_str(&range).map_err(|e| DownloadError::Other(e.into()))?,
);
let encoded_path: String =
url::form_urlencoded::byte_serialize(request.key.as_bytes()).collect();
let stream_uri_mod = "alt=media";
let stream_uri = format!(
"https://storage.googleapis.com/storage/v1/b/{}/o/{}?{}",
self.bucket_name.clone(),
encoded_path,
stream_uri_mod
);
let mut req = Client::new()
.get(stream_uri)
.headers(headers)
.bearer_auth(
self.token_provider
.token(GCS_SCOPES)
.await
.map_err(|e: gcp_auth::Error| DownloadError::Other(e.into()))?
.as_str(),
)
.send();
let get_object = tokio::select! {
res = req => res,
_ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout),
_ = cancel.cancelled() => return Err(DownloadError::Cancelled),
};
let started_at = ScopeGuard::into_inner(started_at);
let object_output = match get_object {
Ok(object_output) => {
if !object_output.status().is_success() {
match object_output.status() {
StatusCode::NOT_FOUND => return Err(DownloadError::NotFound),
_ => {
return Err(DownloadError::Other(anyhow::anyhow!(
"GCS GET resposne contained no response body"
)));
}
}
} else {
object_output
}
}
Err(e) => {
crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
kind,
AttemptOutcome::Err,
started_at,
);
return Err(DownloadError::Other(
anyhow::Error::new(e).context("download gcs object"),
));
}
};
let remaining = self.timeout.saturating_sub(started_at.elapsed());
let metadata = resp.metadata.map(StorageMetadata);
let etag = resp
.etag
.ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))?
.into();
let last_modified: SystemTime = resp
.updated
.and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
.map(|s| s.into())
.unwrap_or(SystemTime::now());
// But let data stream pass through
Ok(Download {
download_stream: Box::pin(object_output.bytes_stream().map(|item| {
item.map_err(|e: reqwest::Error| std::io::Error::new(std::io::ErrorKind::Other, e))
})),
etag,
last_modified,
metadata,
})
}
}
impl RemoteStorage for GCSBucket {
// ---------------------------------------
// Neon wrappers for GCS client functions
// ---------------------------------------
fn list_streaming(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
let kind = RequestKind::List;
let mut max_keys = max_keys.map(|mk| mk.get() as i32);
let list_prefix = prefix
.map(|p| self.relative_path_to_gcs_object(p))
.or_else(|| {
self.prefix_in_bucket.clone().map(|mut s| {
s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
s
})
})
// An empty prefix lists the whole bucket rather than panicking.
.unwrap_or_default();
let request_max_keys = self
.max_keys_per_list_response
.into_iter()
.chain(max_keys.into_iter())
.min()
// 1000 is the documented GCS default page size:
// https://cloud.google.com/storage/docs/json_api/v1/objects/list?hl=en#parameters
.unwrap_or(1000);
// We keep the base URI and re-derive the per-request URI from it, appending a
// `pageToken` whenever GCS returns one.
let mut list_uri = format!(
"https://storage.googleapis.com/storage/v1/b/{}/o?prefix={}&maxResults={}",
self.bucket_name, list_prefix, request_max_keys,
);
// on ListingMode:
// https://github.com/neondatabase/neon/blob/edc11253b65e12a10843711bd88ad277511396d7/libs/remote_storage/src/lib.rs#L158C1-L164C2
if let ListingMode::WithDelimiter = mode {
list_uri.push_str(&format!("&delimiter={}", REMOTE_STORAGE_PREFIX_SEPARATOR));
}
async_stream::stream! {
let base_list_uri = list_uri.clone();
'outer: loop {
let started_at = start_measuring_requests(kind);
let request = self.list_objects_v2(list_uri.clone())
.await
.map_err(DownloadError::Other)?
.send();
// Await the request, racing it against the timeout and cancellation.
let response = tokio::select! {
res = request => Ok(res),
_ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}?;
// Map the error variant of our `Result` into a `DownloadError`.
let response = response
.context("Failed to list GCS prefixes")
.map_err(DownloadError::Other);
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &response, started_at);
let response = match response {
Ok(response) => response,
Err(e) => {
// The error is potentially retryable, so we must rewind the loop after yielding.
yield Err(e);
continue 'outer;
},
};
let body = response.text()
.await
.map_err(|e: reqwest::Error| DownloadError::Other(e.into()))?;
let resp: GCSListResponse = serde_json::from_str(&body).map_err(|e: serde_json::Error| DownloadError::Other(e.into()))?;
let prefixes = resp.common_prefixes();
let keys = resp.contents();
tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len());
let mut result = Listing::default();
for res in keys.iter() {
let last_modified: SystemTime = res.updated.as_deref()
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|s| s.into())
.unwrap_or(SystemTime::now());
// Fall back to 0 rather than panicking on a missing or malformed size.
let size = res.size.as_deref().and_then(|s| s.parse::<u64>().ok()).unwrap_or(0);
let key = res.name.clone();
result.keys.push(
ListingObject{
key: self.gcs_object_to_relative_path(&key),
last_modified,
size,
}
);
if let Some(mut mk) = max_keys {
assert!(mk > 0);
mk -= 1;
if mk == 0 {
tracing::debug!("reached limit set by max_keys");
yield Ok(result);
break 'outer;
}
max_keys = Some(mk);
};
}
result.prefixes.extend(prefixes.iter().map(|p| {
self.gcs_object_to_relative_path(
p.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR),
)
}));
yield Ok(result);
match resp.next_page_token {
Some(token) => {
// Rebuild from the base URI so successive pages don't accumulate
// duplicate `pageToken` parameters.
list_uri = format!("{base_list_uri}&pageToken={token}");
}
None => break,
}
}
}
}
async fn head_object(
&self,
key: &RemotePath,
cancel: &CancellationToken,
) -> Result<ListingObject, DownloadError> {
let kind = RequestKind::Head;
todo!();
}
async fn upload(
&self,
from: impl Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static,
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Put;
let _permit = self.permit(kind, cancel).await?;
let started_at = start_measuring_requests(kind);
let upload = self.put_object(from, from_size_bytes, to, cancel);
let upload = tokio::time::timeout(self.timeout, upload);
let res = tokio::select! {
res = upload => res,
_ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()),
};
if let Ok(inner) = &res {
// Do not count timeouts as errors in metrics, but do count cancellations.
let started_at = ScopeGuard::into_inner(started_at);
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, inner, started_at);
}
match res {
Ok(Ok(_put)) => Ok(()),
Ok(Err(sdk)) => Err(sdk.into()),
Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
}
}
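A hypothetical call-site sketch for the wrapper above, uploading a small in-memory payload (bucket construction omitted; the key name is illustrative):
async fn upload_demo(
    gcs: &GCSBucket,
    cancel: &tokio_util::sync::CancellationToken,
) -> anyhow::Result<()> {
    let data = bytes::Bytes::from_static(b"hello");
    // `stream::iter` yields a Send + Sync stream, as the trait bound requires.
    let stream = futures::stream::iter([std::io::Result::Ok(data)]);
    let to = RemotePath::from_string("demo/key")?;
    gcs.upload(stream, 5, &to, None, cancel).await
}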
async fn copy(
&self,
from: &RemotePath,
to: &RemotePath,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Copy;
let _permit = self.permit(kind, cancel).await?;
let timeout = tokio::time::sleep(self.timeout);
let started_at = start_measuring_requests(kind);
// we need to specify bucket_name as a prefix
let copy_source = format!(
"{}/{}",
self.bucket_name,
self.relative_path_to_gcs_object(from)
);
todo!();
}
async fn download(
&self,
from: &RemotePath,
opts: &DownloadOpts,
cancel: &CancellationToken,
) -> Result<Download, DownloadError> {
// if prefix is not none then download file `prefix/from`
// if prefix is none then download file `from`
self.get_object(
GetObjectRequest {
bucket: self.bucket_name.clone(),
key: self
.relative_path_to_gcs_object(from)
.trim_start_matches("/")
.to_string(),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: opts.byte_range_header(),
},
cancel,
)
.await
}
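And the matching download side: with the range handling in `get_object`, a ranged read is a matter of filling in `DownloadOpts` (sketch; key name hypothetical):
async fn download_first_kib(
    gcs: &GCSBucket,
    cancel: &tokio_util::sync::CancellationToken,
) -> Result<Download, DownloadError> {
    let opts = DownloadOpts {
        byte_start: std::ops::Bound::Included(0),
        byte_end: std::ops::Bound::Excluded(1024),
        ..Default::default()
    };
    let key = RemotePath::from_string("demo/key").map_err(DownloadError::Other)?;
    gcs.download(&key, &opts, cancel).await
}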
async fn delete_objects(
&self,
paths: &[RemotePath],
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::Delete;
let permit = self.permit(kind, cancel).await?;
let delete_objects: Vec<String> = paths
.iter()
.map(|i| self.relative_path_to_gcs_object(i))
.collect();
self.delete_oids(&delete_objects, cancel, &permit).await
}
fn max_keys_per_delete(&self) -> usize {
MAX_KEYS_PER_DELETE_GCS
}
async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> {
let paths = std::array::from_ref(path);
self.delete_objects(paths, cancel).await
}
async fn time_travel_recover(
&self,
prefix: Option<&RemotePath>,
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
) -> Result<(), TimeTravelError> {
// Not implemented for GCS yet; report success so callers treat it as a no-op.
Ok(())
}
}
// ---------
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GCSListResponse {
pub next_page_token: Option<String>,
pub items: Option<Vec<GCSObject>>,
pub prefixes: Option<Vec<String>>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GCSObject {
pub name: String,
pub bucket: String,
pub generation: String,
pub metageneration: String,
pub content_type: Option<String>,
pub storage_class: String,
pub size: Option<String>,
pub md5_hash: Option<String>,
pub crc32c: String,
pub etag: Option<String>,
pub time_created: String,
pub updated: Option<String>,
pub time_storage_class_updated: String,
pub time_finalized: String,
pub metadata: Option<HashMap<String, String>>,
}
impl GCSListResponse {
pub fn contents(&self) -> &[GCSObject] {
self.items.as_deref().unwrap_or_default()
}
pub fn common_prefixes(&self) -> &[String] {
self.prefixes.as_deref().unwrap_or_default()
}
}
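A quick serde check of these response types against a trimmed, illustrative `objects.list` payload (real responses carry many more fields, which serde ignores):
#[test]
fn parses_trimmed_list_response() {
    let body = r#"{"nextPageToken": "tok", "prefixes": ["pageserver/tenants/"]}"#;
    let resp: GCSListResponse = serde_json::from_str(body).unwrap();
    assert_eq!(resp.next_page_token.as_deref(), Some("tok"));
    assert_eq!(resp.common_prefixes()[0], "pageserver/tenants/");
    assert!(resp.contents().is_empty());
}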
#[cfg(test)]
mod tests {
use super::*;
use gcp_auth;
use std::num::NonZero;
use std::pin::pin;
use std::sync::Arc;
const BUFFER_SIZE: usize = 32 * 1024;
// TODO what does Neon want here for integration tests?
const BUCKET: &str = "my-test-bucket";
#[tokio::test]
async fn list_returns_keys_from_bucket() {
let provider = gcp_auth::provider().await.unwrap();
let gcs = GCSBucket {
token_provider: Arc::clone(&provider),
bucket_name: BUCKET.to_string(),
prefix_in_bucket: None,
max_keys_per_list_response: Some(100),
concurrency_limiter: ConcurrencyLimiter::new(100),
timeout: std::time::Duration::from_secs(120),
};
let cancel = CancellationToken::new();
let remote_prefix = "box/tiff/2023/TN".to_string();
let max_keys: u32 = 100;
let mut stream = pin!(gcs.list_streaming(Some(remote_prefix), NonZero::new(max_keys)));
let mut combined = stream
.next()
.await
.expect("At least one item required")
.unwrap();
while let Some(list) = stream.next().await {
let list = list.unwrap();
combined.keys.extend(list.keys.into_iter());
combined.prefixes.extend_from_slice(&list.prefixes);
}
for key in combined.keys.iter() {
println!("Item: {} -- {:?}", key.key, key.last_modified);
}
assert_ne!(0, combined.keys.len());
}
}

View File

@@ -12,6 +12,7 @@
mod azure_blob;
mod config;
mod error;
mod gcs_bucket;
mod local_fs;
mod metrics;
mod s3_bucket;
@@ -42,6 +43,7 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
pub use self::azure_blob::AzureBlobStorage;
pub use self::gcs_bucket::GCSBucket;
pub use self::local_fs::LocalFs;
pub use self::s3_bucket::S3Bucket;
pub use self::simulate_failures::UnreliableWrapper;
@@ -80,8 +82,12 @@ pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
pub const MAX_KEYS_PER_DELETE_GCS: usize = 1000;
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
const GCS_SCOPES: &[&str] = &["https://www.googleapis.com/auth/cloud-platform"];
/// Path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail, that allows representing local paths
/// as the remote ones, stripping the local storage prefix away.
@@ -176,32 +182,6 @@ pub struct Listing {
pub keys: Vec<ListingObject>,
}
#[derive(Default)]
pub struct VersionListing {
pub versions: Vec<Version>,
}
pub struct Version {
pub key: RemotePath,
pub last_modified: SystemTime,
pub kind: VersionKind,
}
impl Version {
pub fn version_id(&self) -> Option<&VersionId> {
match &self.kind {
VersionKind::Version(id) => Some(id),
VersionKind::DeletionMarker => None,
}
}
}
#[derive(Debug)]
pub enum VersionKind {
DeletionMarker,
Version(VersionId),
}
/// Options for downloads. The default value is a plain GET.
pub struct DownloadOpts {
/// If given, returns [`DownloadError::Unmodified`] if the object still has
@@ -212,8 +192,6 @@ pub struct DownloadOpts {
/// The end of the byte range to download, or unbounded. Must be after the
/// start bound.
pub byte_end: Bound<u64>,
/// Optionally request a specific version of a key
pub version_id: Option<VersionId>,
/// Indicate whether we're downloading something small or large: this indirectly controls
/// timeouts: for something like an index/manifest/heatmap, we should time out faster than
/// for layer files
@@ -225,16 +203,12 @@ pub enum DownloadKind {
Small,
}
#[derive(Debug, Clone)]
pub struct VersionId(pub String);
impl Default for DownloadOpts {
fn default() -> Self {
Self {
etag: Default::default(),
byte_start: Bound::Unbounded,
byte_end: Bound::Unbounded,
version_id: None,
kind: DownloadKind::Large,
}
}
@@ -327,14 +301,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
Ok(combined)
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<VersionListing, DownloadError>;
/// Obtain metadata information about an object.
async fn head_object(
&self,
@@ -479,6 +445,7 @@ pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
AwsS3(Arc<S3Bucket>),
AzureBlob(Arc<AzureBlobStorage>),
Unreliable(Other),
GCS(Arc<GCSBucket>),
}
impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
@@ -495,6 +462,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::GCS(s) => s.list(prefix, mode, max_keys, cancel).await,
}
}
@@ -512,22 +480,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
}
}
// See [`RemoteStorage::list_versions`].
pub async fn list_versions<'a>(
&'a self,
prefix: Option<&'a RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &'a CancellationToken,
) -> Result<VersionListing, DownloadError> {
match self {
Self::LocalFs(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::GCS(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
}
}
@@ -542,6 +495,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.head_object(key, cancel).await,
Self::AzureBlob(s) => s.head_object(key, cancel).await,
Self::Unreliable(s) => s.head_object(key, cancel).await,
Self::GCS(_) => todo!(),
}
}
@@ -559,6 +513,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::GCS(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
}
}
@@ -574,6 +529,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.download(from, opts, cancel).await,
Self::AzureBlob(s) => s.download(from, opts, cancel).await,
Self::Unreliable(s) => s.download(from, opts, cancel).await,
Self::GCS(s) => s.download(from, opts, cancel).await,
}
}
@@ -588,6 +544,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete(path, cancel).await,
Self::AzureBlob(s) => s.delete(path, cancel).await,
Self::Unreliable(s) => s.delete(path, cancel).await,
Self::GCS(s) => s.delete(path, cancel).await,
}
}
@@ -602,6 +559,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
Self::GCS(s) => s.delete_objects(paths, cancel).await,
}
}
@@ -612,6 +570,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.max_keys_per_delete(),
Self::AzureBlob(s) => s.max_keys_per_delete(),
Self::Unreliable(s) => s.max_keys_per_delete(),
Self::GCS(s) => s.max_keys_per_delete(),
}
}
@@ -626,6 +585,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
Self::GCS(_) => todo!(),
}
}
@@ -641,6 +601,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.copy(from, to, cancel).await,
Self::AzureBlob(s) => s.copy(from, to, cancel).await,
Self::Unreliable(s) => s.copy(from, to, cancel).await,
Self::GCS(_) => todo!(),
}
}
@@ -669,17 +630,25 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::GCS(_) => todo!(),
}
}
}
impl GenericRemoteStorage {
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
info!("RemoteStorageConfig: {:?}", storage_config);
let timeout = storage_config.timeout;
// If somkeone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
// If someone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
info!(
"RemoteStorageConfig's storage attribute: {:?}",
storage_config.storage
);
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs { local_path: path } => {
info!("Using fs root '{path}' as a remote storage");
@@ -717,6 +686,16 @@ impl GenericRemoteStorage {
small_timeout,
)?))
}
RemoteStorageKind::GCS(gcs_config) => {
let google_application_credentials =
std::env::var("GOOGLE_APPLICATION_CREDENTIALS")
.unwrap_or_else(|_| "<none>".into());
info!(
"Using gcs bucket '{}' as a remote storage, prefix in bucket: '{:?}', GOOGLE_APPLICATION_CREDENTIALS: {google_application_credentials }",
gcs_config.bucket_name, gcs_config.prefix_in_bucket
);
Self::GCS(Arc::new(GCSBucket::new(gcs_config, timeout).await?))
}
})
}
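End to end, wiring a GCS bucket through the generic layer looks roughly like this (bucket and prefix hypothetical; assumes `GOOGLE_APPLICATION_CREDENTIALS` points at a service-account key file, and that `DEFAULT_TIMEOUT` is the config's default constant):
async fn gcs_storage_demo() -> anyhow::Result<GenericRemoteStorage> {
    let config = RemoteStorageConfig {
        storage: RemoteStorageKind::GCS(GCSConfig {
            bucket_name: "my-gcs-bucket".into(),
            prefix_in_bucket: Some("/pageserver".into()),
            concurrency_limit: std::num::NonZeroUsize::new(100).unwrap(),
            max_keys_per_list_response: None,
        }),
        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
        small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
    };
    GenericRemoteStorage::from_config(&config).await
}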
@@ -746,6 +725,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => Some(s.bucket_name()),
Self::AzureBlob(s) => Some(s.container_name()),
Self::Unreliable(_s) => None,
Self::GCS(s) => Some(s.bucket_name()),
}
}
}
@@ -783,7 +763,6 @@ impl ConcurrencyLimiter {
RequestKind::Copy => &self.write,
RequestKind::TimeTravel => &self.write,
RequestKind::Head => &self.read,
RequestKind::ListVersions => &self.read,
}
}

View File

@@ -445,16 +445,6 @@ impl RemoteStorage for LocalFs {
}
}
async fn list_versions(
&self,
_prefix: Option<&RemotePath>,
_mode: ListingMode,
_max_keys: Option<NonZeroU32>,
_cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
unimplemented!()
}
async fn head_object(
&self,
key: &RemotePath,

View File

@@ -14,7 +14,6 @@ pub(crate) enum RequestKind {
Copy = 4,
TimeTravel = 5,
Head = 6,
ListVersions = 7,
}
use RequestKind::*;
@@ -30,7 +29,6 @@ impl RequestKind {
Copy => "copy_object",
TimeTravel => "time_travel_recover",
Head => "head_object",
ListVersions => "list_versions",
}
}
const fn as_index(&self) -> usize {
@@ -38,10 +36,7 @@ impl RequestKind {
}
}
const REQUEST_KIND_LIST: &[RequestKind] =
&[Get, Put, Delete, List, Copy, TimeTravel, Head, ListVersions];
const REQUEST_KIND_COUNT: usize = REQUEST_KIND_LIST.len();
const REQUEST_KIND_COUNT: usize = 7;
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
impl<C> RequestTyped<C> {
@@ -50,11 +45,12 @@ impl<C> RequestTyped<C> {
}
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
let mut it = REQUEST_KIND_LIST.iter();
use RequestKind::*;
let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(*next)
f(next)
});
if let Some(next) = it.next() {

View File

@@ -21,8 +21,9 @@ use aws_sdk_s3::config::{AsyncSleep, IdentityCache, Region, SharedAsyncSleep};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::operation::head_object::HeadObjectError;
use aws_sdk_s3::types::{Delete, ObjectIdentifier, StorageClass};
use aws_sdk_s3::types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion, StorageClass};
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_types::DateTime;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;
use aws_smithy_types::date_time::ConversionError;
@@ -45,7 +46,7 @@ use crate::support::PermitCarrying;
use crate::{
ConcurrencyLimiter, Download, DownloadError, DownloadOpts, Listing, ListingMode, ListingObject,
MAX_KEYS_PER_DELETE_S3, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, RemoteStorage,
TimeTravelError, TimeoutOrCancel, Version, VersionId, VersionKind, VersionListing,
TimeTravelError, TimeoutOrCancel,
};
/// AWS S3 storage.
@@ -65,7 +66,6 @@ struct GetObjectRequest {
key: String,
etag: Option<String>,
range: Option<String>,
version_id: Option<String>,
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
@@ -251,7 +251,6 @@ impl S3Bucket {
.get_object()
.bucket(request.bucket)
.key(request.key)
.set_version_id(request.version_id)
.set_range(request.range);
if let Some(etag) = request.etag {
@@ -406,124 +405,6 @@ impl S3Bucket {
Ok(())
}
async fn list_versions_with_permit(
&self,
_permit: &tokio::sync::SemaphorePermit<'_>,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let prefix = prefix
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled);
let mut key_marker = None;
let mut version_id_marker = None;
let mut versions_and_deletes = Vec::new();
loop {
let response = backoff::retry(
|| async {
let mut request = self
.client
.list_object_versions()
.bucket(self.bucket_name.clone())
.set_prefix(prefix.clone())
.set_key_marker(key_marker.clone())
.set_version_id_marker(version_id_marker.clone());
if let ListingMode::WithDelimiter = mode {
request = request.delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string());
}
let op = request.send();
tokio::select! {
res = op => res.map_err(|e| DownloadError::Other(e.into())),
_ = cancel.cancelled() => Err(DownloadError::Cancelled),
}
},
is_permanent,
warn_threshold,
max_retries,
"listing object versions",
cancel,
)
.await
.ok_or_else(|| DownloadError::Cancelled)
.and_then(|x| x)?;
tracing::trace!(
" Got List response version_id_marker={:?}, key_marker={:?}",
response.version_id_marker,
response.key_marker
);
let versions = response
.versions
.unwrap_or_default()
.into_iter()
.map(|version| {
let key = version.key.expect("response does not contain a key");
let key = self.s3_object_to_relative_path(&key);
let version_id = VersionId(version.version_id.expect("needing version id"));
let last_modified =
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
Ok(Version {
key,
last_modified,
kind: crate::VersionKind::Version(version_id),
})
});
let deletes = response
.delete_markers
.unwrap_or_default()
.into_iter()
.map(|version| {
let key = version.key.expect("response does not contain a key");
let key = self.s3_object_to_relative_path(&key);
let last_modified =
SystemTime::try_from(version.last_modified.expect("no last_modified"))?;
Ok(Version {
key,
last_modified,
kind: crate::VersionKind::DeletionMarker,
})
});
itertools::process_results(versions.chain(deletes), |n_vds| {
versions_and_deletes.extend(n_vds)
})
.map_err(DownloadError::Other)?;
fn none_if_empty(v: Option<String>) -> Option<String> {
v.filter(|v| !v.is_empty())
}
version_id_marker = none_if_empty(response.next_version_id_marker);
key_marker = none_if_empty(response.next_key_marker);
if version_id_marker.is_none() {
// The final response is not supposed to be truncated
if response.is_truncated.unwrap_or_default() {
return Err(DownloadError::Other(anyhow::anyhow!(
"Received truncated ListObjectVersions response for prefix={prefix:?}"
)));
}
break;
}
if let Some(max_keys) = max_keys {
if versions_and_deletes.len() >= max_keys.get().try_into().unwrap() {
return Err(DownloadError::Other(anyhow::anyhow!("too many versions")));
}
}
}
Ok(VersionListing {
versions: versions_and_deletes,
})
}
pub fn bucket_name(&self) -> &str {
&self.bucket_name
}
@@ -740,19 +621,6 @@ impl RemoteStorage for S3Bucket {
}
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
let kind = RequestKind::ListVersions;
let permit = self.permit(kind, cancel).await?;
self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
.await
}
async fn head_object(
&self,
key: &RemotePath,
@@ -933,7 +801,6 @@ impl RemoteStorage for S3Bucket {
key: self.relative_path_to_s3_object(from),
etag: opts.etag.as_ref().map(|e| e.to_string()),
range: opts.byte_range_header(),
version_id: opts.version_id.as_ref().map(|v| v.0.to_owned()),
},
cancel,
)
@@ -978,25 +845,94 @@ impl RemoteStorage for S3Bucket {
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
let timestamp = DateTime::from(timestamp);
let done_if_after = DateTime::from(done_if_after);
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
// get the passed prefix or if it is not set use prefix_in_bucket value
let prefix = prefix
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
let mut key_marker = None;
let mut version_id_marker = None;
let mut versions_and_deletes = Vec::new();
loop {
let response = backoff::retry(
|| async {
let op = self
.client
.list_object_versions()
.bucket(self.bucket_name.clone())
.set_prefix(prefix.clone())
.set_key_marker(key_marker.clone())
.set_version_id_marker(version_id_marker.clone())
.send();
tokio::select! {
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
}
},
is_permanent,
warn_threshold,
max_retries,
"listing object versions for time_travel_recover",
cancel,
)
.await
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),
DownloadError::Cancelled => TimeTravelError::Cancelled,
other => TimeTravelError::Other(other.into()),
})?;
let versions_and_deletes = version_listing.versions;
.ok_or_else(|| TimeTravelError::Cancelled)
.and_then(|x| x)?;
tracing::trace!(
" Got List response version_id_marker={:?}, key_marker={:?}",
response.version_id_marker,
response.key_marker
);
let versions = response
.versions
.unwrap_or_default()
.into_iter()
.map(VerOrDelete::from_version);
let deletes = response
.delete_markers
.unwrap_or_default()
.into_iter()
.map(VerOrDelete::from_delete_marker);
itertools::process_results(versions.chain(deletes), |n_vds| {
versions_and_deletes.extend(n_vds)
})
.map_err(TimeTravelError::Other)?;
fn none_if_empty(v: Option<String>) -> Option<String> {
v.filter(|v| !v.is_empty())
}
version_id_marker = none_if_empty(response.next_version_id_marker);
key_marker = none_if_empty(response.next_key_marker);
if version_id_marker.is_none() {
// The final response is not supposed to be truncated
if response.is_truncated.unwrap_or_default() {
return Err(TimeTravelError::Other(anyhow::anyhow!(
"Received truncated ListObjectVersions response for prefix={prefix:?}"
)));
}
break;
}
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: usize = 100_000;
if versions_and_deletes.len() >= COMPLEXITY_LIMIT {
return Err(TimeTravelError::TooManyVersions);
}
}
tracing::info!(
"Built list for time travel with {} versions and deletions",
@@ -1012,26 +948,24 @@ impl RemoteStorage for S3Bucket {
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
for vd in &versions_and_deletes {
let Version { key, .. } = &vd;
let version_id = vd.version_id().map(|v| v.0.as_str());
if version_id == Some("null") {
let VerOrDelete {
version_id, key, ..
} = &vd;
if version_id == "null" {
return Err(TimeTravelError::Other(anyhow!(
"Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values"
)));
}
tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
tracing::trace!(
"Parsing version key={key} version_id={version_id} kind={:?}",
vd.kind
);
vds_for_key.entry(key).or_default().push(vd);
}
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
for (key, versions) in vds_for_key {
let last_vd = versions.last().unwrap();
let key = self.relative_path_to_s3_object(key);
if last_vd.last_modified > done_if_after {
tracing::trace!("Key {key} has version later than done_if_after, skipping");
continue;
@@ -1056,11 +990,11 @@ impl RemoteStorage for S3Bucket {
do_delete = true;
} else {
match &versions[version_to_restore_to - 1] {
Version {
kind: VersionKind::Version(version_id),
VerOrDelete {
kind: VerOrDeleteKind::Version,
version_id,
..
} => {
let version_id = &version_id.0;
tracing::trace!("Copying old version {version_id} for {key}...");
// Restore the state to the last version by copying
let source_id =
@@ -1072,7 +1006,7 @@ impl RemoteStorage for S3Bucket {
.client
.copy_object()
.bucket(self.bucket_name.clone())
.key(&key)
.key(key)
.set_storage_class(self.upload_storage_class.clone())
.copy_source(&source_id)
.send();
@@ -1093,8 +1027,8 @@ impl RemoteStorage for S3Bucket {
.and_then(|x| x)?;
tracing::info!(%version_id, %key, "Copied old version in S3");
}
Version {
kind: VersionKind::DeletionMarker,
VerOrDelete {
kind: VerOrDeleteKind::DeleteMarker,
..
} => {
do_delete = true;
@@ -1102,7 +1036,7 @@ impl RemoteStorage for S3Bucket {
}
};
if do_delete {
if matches!(last_vd.kind, VersionKind::DeletionMarker) {
if matches!(last_vd.kind, VerOrDeleteKind::DeleteMarker) {
// Key has since been deleted (but there was some history), no need to do anything
tracing::trace!("Key {key} already deleted, skipping.");
} else {
@@ -1130,6 +1064,62 @@ impl RemoteStorage for S3Bucket {
}
}
// Save RAM and only store the needed data instead of the entire ObjectVersion/DeleteMarkerEntry
struct VerOrDelete {
kind: VerOrDeleteKind,
last_modified: DateTime,
version_id: String,
key: String,
}
#[derive(Debug)]
enum VerOrDeleteKind {
Version,
DeleteMarker,
}
impl VerOrDelete {
fn with_kind(
kind: VerOrDeleteKind,
last_modified: Option<DateTime>,
version_id: Option<String>,
key: Option<String>,
) -> anyhow::Result<Self> {
let lvk = (last_modified, version_id, key);
let (Some(last_modified), Some(version_id), Some(key)) = lvk else {
anyhow::bail!(
"One (or more) of last_modified, key, and id is None. \
Is versioning enabled in the bucket? last_modified={:?}, version_id={:?}, key={:?}",
lvk.0,
lvk.1,
lvk.2,
);
};
Ok(Self {
kind,
last_modified,
version_id,
key,
})
}
fn from_version(v: ObjectVersion) -> anyhow::Result<Self> {
Self::with_kind(
VerOrDeleteKind::Version,
v.last_modified,
v.version_id,
v.key,
)
}
fn from_delete_marker(v: DeleteMarkerEntry) -> anyhow::Result<Self> {
Self::with_kind(
VerOrDeleteKind::DeleteMarker,
v.last_modified,
v.version_id,
v.key,
)
}
}
#[cfg(test)]
mod tests {
use std::num::NonZeroUsize;

View File

@@ -50,6 +50,7 @@ impl UnreliableWrapper {
GenericRemoteStorage::Unreliable(_s) => {
panic!("Can't wrap unreliable wrapper unreliably")
}
GenericRemoteStorage::GCS(_) => todo!(),
};
UnreliableWrapper {
inner,
@@ -139,20 +140,6 @@ impl RemoteStorage for UnreliableWrapper {
self.inner.list(prefix, mode, max_keys, cancel).await
}
async fn list_versions(
&self,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;
self.inner
.list_versions(prefix, mode, max_keys, cancel)
.await
}
async fn head_object(
&self,
key: &RemotePath,

View File

@@ -1,5 +1,5 @@
[package]
name = "endpoint_storage"
name = "object_storage"
version = "0.0.1"
edition.workspace = true
license.workspace = true

View File

@@ -2,7 +2,7 @@ use anyhow::anyhow;
use axum::body::{Body, Bytes};
use axum::response::{IntoResponse, Response};
use axum::{Router, http::StatusCode};
use endpoint_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use object_storage::{PrefixS3Path, S3Path, Storage, bad_request, internal_error, not_found, ok};
use remote_storage::TimeoutOrCancel;
use remote_storage::{DownloadError, DownloadOpts, GenericRemoteStorage, RemotePath};
use std::{sync::Arc, time::SystemTime, time::UNIX_EPOCH};
@@ -46,12 +46,12 @@ async fn metrics() -> Result {
async fn get(S3Path { path }: S3Path, state: State) -> Result {
info!(%path, "downloading");
let download_err = |err| {
if let DownloadError::NotFound = err {
info!(%path, %err, "downloading"); // 404 is not an issue of _this_ service
let download_err = |e| {
if let DownloadError::NotFound = e {
info!(%path, %e, "downloading"); // 404 is not an issue of _this_ service
return not_found(&path);
}
internal_error(err, &path, "downloading")
internal_error(e, &path, "downloading")
};
let cancel = state.cancel.clone();
let opts = &DownloadOpts::default();
@@ -249,7 +249,7 @@ mod tests {
};
let proxy = Storage {
auth: endpoint_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
auth: object_storage::JwtAuth::new(TEST_PUB_KEY_ED25519).unwrap(),
storage,
cancel: cancel.clone(),
max_upload_file_limit: usize::MAX,
@@ -343,14 +343,14 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
TimelineId::from_array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
fn token() -> String {
let claims = endpoint_storage::Claims {
let claims = object_storage::Claims {
tenant_id: TENANT_ID,
timeline_id: TIMELINE_ID,
endpoint_id: ENDPOINT_ID.into(),
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}
@@ -364,10 +364,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
vec![TIMELINE_ID.to_string(), TimelineId::generate().to_string()],
vec![ENDPOINT_ID, "ep-ololo"]
)
// first one is fully valid path, second path is valid for GET as
// read paths may have different endpoint if tenant and timeline matches
// (needed for prewarming RO->RW replica)
.skip(2);
.skip(1);
for ((uri, method), (tenant, timeline, endpoint)) in iproduct!(routes(), args) {
info!(%uri, %method, %tenant, %timeline, %endpoint);
@@ -478,16 +475,6 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
requests_chain(chain.into_iter(), |_| token()).await;
}
#[testlog(tokio::test)]
async fn read_other_endpoint_data() {
let uri = format!("/{TENANT_ID}/{TIMELINE_ID}/other_endpoint/key");
let chain = vec![
(uri.clone(), "GET", "", StatusCode::NOT_FOUND, false),
(uri.clone(), "PUT", "", StatusCode::UNAUTHORIZED, false),
];
requests_chain(chain.into_iter(), |_| token()).await;
}
fn delete_prefix_token(uri: &str) -> String {
use serde::Serialize;
let parts = uri.split("/").collect::<Vec<&str>>();
@@ -495,7 +482,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
struct PrefixClaims {
tenant_id: TenantId,
timeline_id: Option<TimelineId>,
endpoint_id: Option<endpoint_storage::EndpointId>,
endpoint_id: Option<object_storage::EndpointId>,
exp: u64,
}
let claims = PrefixClaims {
@@ -505,7 +492,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
exp: u64::MAX,
};
let key = jsonwebtoken::EncodingKey::from_ed_pem(TEST_PRIV_KEY_ED25519).unwrap();
let header = jsonwebtoken::Header::new(endpoint_storage::VALIDATION_ALGO);
let header = jsonwebtoken::Header::new(object_storage::VALIDATION_ALGO);
jsonwebtoken::encode(&header, &claims, &key).unwrap()
}

View File

@@ -169,19 +169,10 @@ impl FromRequestParts<Arc<Storage>> for S3Path {
.auth
.decode(bearer.token())
.map_err(|e| bad_request(e, "decoding token"))?;
// Read paths may have different endpoint ids. For readonly -> readwrite replica
// prewarming, endpoint must read other endpoint's data.
let endpoint_id = if parts.method == axum::http::Method::GET {
claims.endpoint_id.clone()
} else {
path.endpoint_id.clone()
};
let route = Claims {
tenant_id: path.tenant_id,
timeline_id: path.timeline_id,
endpoint_id,
endpoint_id: path.endpoint_id.clone(),
exp: claims.exp,
};
if route != claims {

View File

@@ -1,4 +1,4 @@
//! `endpoint_storage` is a service which provides API for uploading and downloading
//! `object_storage` is a service which provides API for uploading and downloading
//! files. It is used by compute and control plane for accessing LFC prewarm data.
//! This service is deployed either as a separate component or as part of compute image
//! for large computes.
@@ -33,7 +33,7 @@ async fn main() -> anyhow::Result<()> {
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: endpoint_storage config.json")
anyhow::bail!("Usage: object_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
@@ -41,7 +41,7 @@ async fn main() -> anyhow::Result<()> {
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
let auth = endpoint_storage::JwtAuth::new(&pemfile)?;
let auth = object_storage::JwtAuth::new(&pemfile)?;
let listener = tokio::net::TcpListener::bind(config.listen).await.unwrap();
info!("listening on {}", listener.local_addr().unwrap());
@@ -50,7 +50,7 @@ async fn main() -> anyhow::Result<()> {
let cancel = tokio_util::sync::CancellationToken::new();
app::check_storage_permissions(&storage, cancel.clone()).await?;
let proxy = std::sync::Arc::new(endpoint_storage::Storage {
let proxy = std::sync::Arc::new(object_storage::Storage {
auth,
storage,
cancel: cancel.clone(),

View File

@@ -11,7 +11,6 @@ use pageserver::task_mgr::TaskKind;
use pageserver::tenant::storage_layer::InMemoryLayer;
use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
@@ -22,14 +21,13 @@ use wal_decoder::serialized_batch::SerializedValueBatch;
// A very cheap hash for generating non-sequential keys.
fn murmurhash32(mut h: u32) -> u32 {
h ^= h >> 16;
h = h.wrapping_mul(0x85ebca6b);
h ^= h >> 13;
h = h.wrapping_mul(0xc2b2ae35);
h ^= h >> 16;
h
}
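A quick standalone look at what this finalizer does to nearby inputs (0 maps to 0, everything else scatters; uses the function above):
fn main() {
    for h in 0..4u32 {
        // consecutive nonzero inputs land far apart across the u32 range
        println!("{h} -> {:#010x}", murmurhash32(h));
    }
}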
#[derive(serde::Serialize, Clone, Copy, Debug)]
enum KeyLayout {
/// Sequential unique keys
Sequential,
@@ -39,7 +37,6 @@ enum KeyLayout {
RandomReuse(u32),
}
#[derive(serde::Serialize, Clone, Copy, Debug)]
enum WriteDelta {
Yes,
No,
@@ -141,15 +138,12 @@ async fn ingest(
/// Wrapper to instantiate a tokio runtime
fn ingest_main(
conf: &'static PageServerConf,
io_mode: IoMode,
put_size: usize,
put_count: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
) {
pageserver::virtual_file::set_io_mode(io_mode);
let runtime = tokio::runtime::Builder::new_multi_thread()
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
@@ -180,207 +174,93 @@ fn criterion_benchmark(c: &mut Criterion) {
virtual_file::init(
16384,
virtual_file::io_engine_for_bench(),
// immaterial, each `ingest_main` invocation below overrides this
conf.virtual_file_io_mode,
// without actually doing syncs, buffered writes have an unfair advantage over direct IO writes
virtual_file::SyncMode::Sync,
);
page_cache::init(conf.page_cache_size);
#[derive(serde::Serialize)]
struct ExplodedParameters {
io_mode: IoMode,
volume_mib: usize,
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
}
#[derive(Clone)]
struct HandPickedParameters {
volume_mib: usize,
key_size: usize,
key_layout: KeyLayout,
write_delta: WriteDelta,
}
let expect = vec![
// Small values (100b) tests
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Random,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::RandomReuse(0x3ff),
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 100,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::No,
},
// Large values (8k) tests
HandPickedParameters {
volume_mib: 128,
key_size: 8192,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::Yes,
},
HandPickedParameters {
volume_mib: 128,
key_size: 8192,
key_layout: KeyLayout::Sequential,
write_delta: WriteDelta::No,
},
];
let exploded_parameters = {
let mut out = Vec::new();
for io_mode in [
IoMode::Buffered,
#[cfg(target_os = "linux")]
IoMode::Direct,
] {
for param in expect.clone() {
let HandPickedParameters {
volume_mib,
key_size,
key_layout,
write_delta,
} = param;
out.push(ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
});
}
}
out
};
impl ExplodedParameters {
fn benchmark_id(&self) -> String {
let ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
} = self;
format!(
"io_mode={io_mode:?} volume_mib={volume_mib:?} key_size_bytes={key_size:?} key_layout={key_layout:?} write_delta={write_delta:?}"
)
}
}
let mut group = c.benchmark_group("ingest");
for params in exploded_parameters {
let id = params.benchmark_id();
let ExplodedParameters {
io_mode,
volume_mib,
key_size,
key_layout,
write_delta,
} = params;
let put_count = volume_mib * 1024 * 1024 / key_size;
group.throughput(criterion::Throughput::Bytes((key_size * put_count) as u64));
{
let mut group = c.benchmark_group("ingest-small-values");
let put_size = 100usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function(id, |b| {
b.iter(|| ingest_main(conf, io_mode, key_size, put_count, key_layout, write_delta))
group.bench_function("ingest 128MB/100b seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Random,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b rand-1024keys", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::RandomReuse(0x3ff),
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/100b seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
{
let mut group = c.benchmark_group("ingest-big-values");
let put_size = 8192usize;
let put_count = 128 * 1024 * 1024 / put_size;
group.throughput(criterion::Throughput::Bytes((put_size * put_count) as u64));
group.sample_size(10);
group.bench_function("ingest 128MB/8k seq", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::Yes,
)
})
});
group.bench_function("ingest 128MB/8k seq, no delta", |b| {
b.iter(|| {
ingest_main(
conf,
put_size,
put_count,
KeyLayout::Sequential,
WriteDelta::No,
)
})
});
}
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
/*
cargo bench --bench bench_ingest
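To run a single group, pass a Criterion substring filter, e.g.:
cargo bench --bench bench_ingest -- ingest-small-values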
im4gn.2xlarge:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.8491 s 1.8540 s 1.8592 s]
thrpt: [68.847 MiB/s 69.039 MiB/s 69.222 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.6976 s 2.7123 s 2.7286 s]
thrpt: [46.911 MiB/s 47.193 MiB/s 47.450 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.7433 s 1.7510 s 1.7600 s]
thrpt: [72.729 MiB/s 73.099 MiB/s 73.423 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [499.63 ms 500.07 ms 500.46 ms]
thrpt: [255.77 MiB/s 255.96 MiB/s 256.19 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [456.97 ms 459.61 ms 461.92 ms]
thrpt: [277.11 MiB/s 278.50 MiB/s 280.11 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [158.82 ms 159.16 ms 159.56 ms]
thrpt: [802.22 MiB/s 804.24 MiB/s 805.93 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.8856 s 1.8997 s 1.9179 s]
thrpt: [66.740 MiB/s 67.380 MiB/s 67.882 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [2.7468 s 2.7625 s 2.7785 s]
thrpt: [46.068 MiB/s 46.335 MiB/s 46.600 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.7689 s 1.7726 s 1.7767 s]
thrpt: [72.045 MiB/s 72.208 MiB/s 72.363 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [497.64 ms 498.60 ms 499.67 ms]
thrpt: [256.17 MiB/s 256.72 MiB/s 257.21 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [493.72 ms 505.07 ms 518.03 ms]
thrpt: [247.09 MiB/s 253.43 MiB/s 259.26 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [267.76 ms 267.85 ms 267.96 ms]
thrpt: [477.69 MiB/s 477.88 MiB/s 478.03 MiB/s]
Hetzner AX102:
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.0683 s 1.1006 s 1.1386 s]
thrpt: [112.42 MiB/s 116.30 MiB/s 119.82 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.5719 s 1.6012 s 1.6228 s]
thrpt: [78.877 MiB/s 79.938 MiB/s 81.430 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Y...
time: [1.1095 s 1.1331 s 1.1580 s]
thrpt: [110.53 MiB/s 112.97 MiB/s 115.37 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [303.20 ms 307.83 ms 311.90 ms]
thrpt: [410.39 MiB/s 415.81 MiB/s 422.16 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [406.34 ms 429.37 ms 451.63 ms]
thrpt: [283.42 MiB/s 298.11 MiB/s 315.00 MiB/s]
ingest/io_mode=Buffered volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [134.01 ms 135.78 ms 137.48 ms]
thrpt: [931.03 MiB/s 942.68 MiB/s 955.12 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=Yes
time: [1.0406 s 1.0580 s 1.0772 s]
thrpt: [118.83 MiB/s 120.98 MiB/s 123.00 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Random write_delta=Yes
time: [1.5059 s 1.5339 s 1.5625 s]
thrpt: [81.920 MiB/s 83.448 MiB/s 84.999 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=RandomReuse(1023) write_delta=Yes
time: [1.0714 s 1.0934 s 1.1161 s]
thrpt: [114.69 MiB/s 117.06 MiB/s 119.47 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=100 key_layout=Sequential write_delta=No
time: [262.68 ms 265.14 ms 267.71 ms]
thrpt: [478.13 MiB/s 482.76 MiB/s 487.29 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=Yes
time: [375.19 ms 393.80 ms 411.40 ms]
thrpt: [311.14 MiB/s 325.04 MiB/s 341.16 MiB/s]
ingest/io_mode=Direct volume_mib=128 key_size_bytes=8192 key_layout=Sequential write_delta=No
time: [123.02 ms 123.85 ms 124.66 ms]
thrpt: [1.0027 GiB/s 1.0093 GiB/s 1.0161 GiB/s]
*/

View File

@@ -419,23 +419,6 @@ impl Client {
}
}
pub async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<TimelineInfo> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
);
self.request(Method::GET, &uri, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn timeline_archival_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -225,11 +225,6 @@ pub struct PageServerConf {
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
pub enable_tls_page_service_api: bool,
/// Run in development mode, which disables certain safety checks
/// such as authentication requirements for HTTP and PostgreSQL APIs.
/// This is insecure and should only be used in development environments.
pub dev_mode: bool,
}
/// Token for authentication to safekeepers
@@ -403,7 +398,6 @@ impl PageServerConf {
generate_unarchival_heatmap,
tracing,
enable_tls_page_service_api,
dev_mode,
} = config_toml;
let mut conf = PageServerConf {
@@ -455,7 +449,6 @@ impl PageServerConf {
get_vectored_concurrent_io,
tracing,
enable_tls_page_service_api,
dev_mode,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -263,9 +263,7 @@ where
while let Some((tenant_id, tenant)) = tenants.next().await {
let mut tenant_resident_size = 0;
let timelines = tenant.list_timelines();
let timelines_len = timelines.len();
for timeline in timelines {
for timeline in tenant.list_timelines() {
let timeline_id = timeline.timeline_id;
match TimelineSnapshot::collect(&timeline, ctx) {
@@ -291,11 +289,6 @@ where
tenant_resident_size += timeline.resident_physical_size();
}
if timelines_len == 0 {
// Force-set it to 1 byte so the tenant still gets reported when all timelines are offloaded.
tenant_resident_size = 1;
}
let snap = TenantSnapshot::collect(&tenant, tenant_resident_size);
snap.to_metrics(tenant_id, Utc::now(), cache, &mut current_metrics);
}

View File

@@ -3,11 +3,10 @@ use std::collections::HashMap;
use futures::Future;
use pageserver_api::config::NodeMetadata;
use pageserver_api::controller_api::{AvailabilityZone, NodeRegisterRequest};
use pageserver_api::models::ShardImportStatus;
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
ValidateRequest, ValidateRequestTenant, ValidateResponse,
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
ValidateRequestTenant, ValidateResponse,
};
use reqwest::Certificate;
use serde::Serialize;
@@ -15,7 +14,7 @@ use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::id::NodeId;
use utils::{backoff, failpoint_support};
use crate::config::PageServerConf;
@@ -47,12 +46,6 @@ pub trait StorageControllerUpcallApi {
&self,
tenants: Vec<(TenantShardId, Generation)>,
) -> impl Future<Output = Result<HashMap<TenantShardId, bool>, RetryForeverError>> + Send;
fn put_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
status: ShardImportStatus,
) -> impl Future<Output = Result<(), RetryForeverError>> + Send;
}
impl StorageControllerUpcallClient {
@@ -280,30 +273,4 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
Ok(result.into_iter().collect())
}
/// Send a shard import status to the storage controller
///
/// The implementation must have at-least-once delivery semantics.
/// To this end, we retry the request until it succeeds. If the pageserver
/// restarts or crashes, the shard import will start again from the beginning.
#[tracing::instrument(skip_all)] // so that warning logs from retry_http_forever have context
async fn put_timeline_import_status(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
status: ShardImportStatus,
) -> Result<(), RetryForeverError> {
let url = self
.base_url
.join("timeline_import_status")
.expect("Failed to build path");
let request = PutTimelineImportStatusRequest {
tenant_shard_id,
timeline_id,
status,
};
self.retry_http_forever(&url, request).await
}
}

View File

@@ -787,15 +787,6 @@ mod test {
Ok(result)
}
async fn put_timeline_import_status(
&self,
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
_status: pageserver_api::models::ShardImportStatus,
) -> Result<(), RetryForeverError> {
unimplemented!()
}
}
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {

View File

@@ -28,7 +28,7 @@ use tracing::warn;
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
#[derive(Copy, Clone, Debug)]
@@ -218,7 +218,7 @@ pub(super) const BYTE_ZSTD: u8 = BYTE_UNCOMPRESSED | 0x10;
/// discarded. You need to call [`flush_buffer`](Self::flush_buffer)
/// manually before dropping.
pub struct BlobWriter<const BUFFERED: bool> {
inner: TempVirtualFile,
inner: VirtualFile,
offset: u64,
/// A buffer to save on write calls, only used if BUFFERED=true
buf: Vec<u8>,
@@ -228,7 +228,7 @@ pub struct BlobWriter<const BUFFERED: bool> {
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
pub fn new(
inner: TempVirtualFile,
inner: VirtualFile,
start_offset: u64,
_gate: &utils::sync::gate::Gate,
_cancel: CancellationToken,
@@ -476,17 +476,30 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
}
}
impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
/// Finish this blob writer and return the underlying [`TempVirtualFile`].
impl BlobWriter<true> {
/// Access the underlying `VirtualFile`.
///
/// If there is an internal buffer (depends on `BUFFERED`), it will
/// be flushed before this method returns.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<TempVirtualFile, Error> {
if BUFFERED {
self.flush_buffer(ctx).await?;
}
/// This function flushes the internal buffer before giving access
/// to the underlying `VirtualFile`.
pub async fn into_inner(mut self, ctx: &RequestContext) -> Result<VirtualFile, Error> {
self.flush_buffer(ctx).await?;
Ok(self.inner)
}
/// Access the underlying `VirtualFile`.
///
/// Unlike [`into_inner`](Self::into_inner), this doesn't flush
/// the internal buffer before giving access.
pub fn into_inner_no_flush(self) -> VirtualFile {
self.inner
}
}
impl BlobWriter<false> {
/// Access the underlying `VirtualFile`.
pub fn into_inner(self) -> VirtualFile {
self.inner
}
}
#[cfg(test)]
@@ -499,7 +512,6 @@ pub(crate) mod tests {
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::BlockReaderRef;
use crate::virtual_file::VirtualFile;
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
round_trip_test_compressed::<BUFFERED>(blobs, false).await
@@ -518,10 +530,7 @@ pub(crate) mod tests {
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = TempVirtualFile::new(
VirtualFile::create(pathbuf.as_path(), ctx).await?,
gate.enter().unwrap(),
);
let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
for blob in blobs.iter() {
let (_, res) = if compression {
@@ -544,9 +553,7 @@ pub(crate) mod tests {
let (_, res) = wtr.write_blob(vec![0; PAGE_SZ].slice_len(), ctx).await;
let offs = res?;
println!("Writing final blob at offs={offs}");
let file = wtr.into_inner(ctx).await?;
file.disarm_into_inner();
wtr.flush_buffer(ctx).await?;
}
Ok((temp_dir, pathbuf, offsets))
}

View File

@@ -12,7 +12,6 @@ use tokio_epoll_uring::{BoundedBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::{error, info_span};
use utils::id::TimelineId;
use utils::sync::gate::GateGuard;
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
@@ -22,33 +21,16 @@ use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
use self::owned_buffers_io::write::OwnedAsyncWriter;
use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};
pub struct EphemeralFile {
_tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
page_cache_file_id: page_cache::FileId,
bytes_written: u64,
file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
buffered_writer: BufferedWriter,
}
type BufferedWriter = owned_buffers_io::write::BufferedWriter<
IoBufferMut,
TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
>;
/// A TempVirtualFile that is co-owned by the [`EphemeralFile`] and the [`BufferedWriter`].
///
/// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
/// The co-ownership is between [`EphemeralFile`] and that flush task.)
///
/// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
#[derive(Debug, Clone)]
struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
inner: Arc<TempVirtualFile>,
buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: utils::sync::gate::GateGuard,
}
const TAIL_SZ: usize = 64 * 1024;
@@ -62,12 +44,9 @@ impl EphemeralFile {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<EphemeralFile> {
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let filename = conf
.timeline_path(&tenant_shard_id, &timeline_id)
@@ -75,7 +54,7 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));
let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
let file = Arc::new(
VirtualFile::open_with_options_v2(
&filename,
virtual_file::OpenOptions::new()
@@ -85,7 +64,6 @@ impl EphemeralFile {
ctx,
)
.await?,
gate.enter()?,
);
let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
@@ -95,8 +73,7 @@ impl EphemeralFile {
_timeline_id: timeline_id,
page_cache_file_id,
bytes_written: 0,
file: file.clone(),
buffered_writer: BufferedWriter::new(
buffered_writer: owned_buffers_io::write::BufferedWriter::new(
file,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
@@ -104,42 +81,29 @@ impl EphemeralFile {
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
_gate_guard: gate.enter()?,
})
}
}
impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
Self {
inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
impl Drop for EphemeralFile {
fn drop(&mut self) {
// unlink the file
// we are clear to do this, because we have entered a gate
let path = self.buffered_writer.as_inner().path();
let res = std::fs::remove_file(path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {
// just never log the not found errors, we cannot do anything for them; on detach
// the tenant directory is already gone.
//
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!("could not remove ephemeral file '{path}': {e}");
}
}
}
}
impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
&self,
buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<
Output = (
owned_buffers_io::io_buf_ext::FullSlice<Buf>,
std::io::Result<()>,
),
> + Send {
self.inner.write_all_at(buf, offset, ctx)
}
}
impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
type Target = VirtualFile;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
#[error("{0}")]
@@ -298,9 +262,9 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
let dst = if written_range.len() > 0 {
let file: &VirtualFile = self.buffered_writer.as_inner();
let bounds = dst.bounds();
let slice = self
.file
let slice = file
.read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
.await?;
Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
@@ -492,7 +456,7 @@ mod tests {
assert_eq!(&buf, &content[range]);
}
let file_contents = std::fs::read(file.file.path()).unwrap();
let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
assert!(file_contents == content[0..cap * 2]);
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
@@ -525,7 +489,7 @@ mod tests {
// assert the state is as this test expects it to be
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
let md = file.file.path().metadata().unwrap();
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
assert_eq!(
md.len(),
2 * cap.into_u64(),

View File

@@ -6,7 +6,6 @@
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::time::SystemTime;
use anyhow::{Context, anyhow};
@@ -16,7 +15,7 @@ use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
};
use tokio::fs::{self, File, OpenOptions};
use tokio::io::AsyncSeekExt;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
@@ -41,10 +40,7 @@ use crate::span::{
use crate::tenant::Generation;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::virtual_file;
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use crate::virtual_file::{IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{TempVirtualFile, owned_buffers_io};
use crate::virtual_file::{MaybeFatalIo, VirtualFile, on_fatal_io_error};
///
/// If 'metadata' is given, we will validate that the downloaded file's size matches that
@@ -76,36 +72,21 @@ pub async fn download_layer_file<'a>(
layer_metadata.generation,
);
let (bytes_amount, temp_file) = download_retry(
// Perform a rename inspired by durable_rename from file_utils.c.
// The sequence:
// write(tmp)
// fsync(tmp)
// rename(tmp, new)
// fsync(new)
// fsync(parent)
// For more context on durable_rename, see this email from the postgres mailing list:
// https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
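// A minimal sketch of that sequence using std::fs; illustrative only, the
// actual code below uses tokio::fs, and `durable_rename` is not a real API here:
//
//     use std::{fs, path::Path};
//     fn durable_rename(tmp: &Path, dst: &Path) -> std::io::Result<()> {
//         fs::File::open(tmp)?.sync_all()?;                   // fsync(tmp), assumes tmp is already written
//         fs::rename(tmp, dst)?;                              // rename(tmp, new)
//         fs::File::open(dst)?.sync_all()?;                   // fsync(new)
//         fs::File::open(dst.parent().unwrap())?.sync_all()?; // fsync(parent)
//         Ok(())
//     }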
let temp_file_path = path_with_suffix_extension(local_path, TEMP_DOWNLOAD_EXTENSION);
let bytes_amount = download_retry(
|| async {
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let temp_file_path = path_with_suffix_extension(
local_path,
&format!("{filename_disambiguator:x}.{TEMP_DOWNLOAD_EXTENSION}"),
);
let temp_file = TempVirtualFile::new(
// Not _v2 yet which is sensitive to virtual_file_io_mode.
// That'll happen in PR https://github.com/neondatabase/neon/pull/11558
VirtualFile::open_with_options(
&temp_file_path,
virtual_file::OpenOptions::new()
.create_new(true)
.write(true),
ctx,
)
.await
.with_context(|| format!("create a temp file for layer download: {temp_file_path}"))
.map_err(DownloadError::Other)?,
gate.enter().map_err(|_| DownloadError::Cancelled)?,
);
download_object(storage, &remote_path, temp_file, gate, cancel, ctx).await
download_object(storage, &remote_path, &temp_file_path, gate, cancel, ctx).await
},
&format!("download {remote_path:?}"),
cancel,
@@ -115,8 +96,7 @@ pub async fn download_layer_file<'a>(
let expected = layer_metadata.file_size;
if expected != bytes_amount {
return Err(DownloadError::Other(anyhow!(
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {:?}",
temp_file.path()
"According to layer file metadata should have downloaded {expected} bytes but downloaded {bytes_amount} bytes into file {temp_file_path:?}",
)));
}
@@ -126,28 +106,11 @@ pub async fn download_layer_file<'a>(
)))
});
// Try rename before disarming the temp file.
// That way, if rename fails for whatever reason, we clean up the temp file on the return path.
fs::rename(temp_file.path(), &local_path)
fs::rename(&temp_file_path, &local_path)
.await
.with_context(|| format!("rename download layer file to {local_path}"))
.map_err(DownloadError::Other)?;
// The temp file's VirtualFile points to the temp_file_path which we moved above.
// Drop it immediately, it's invalid.
// This will get better in https://github.com/neondatabase/neon/issues/11692
let _: VirtualFile = temp_file.disarm_into_inner();
// NB: The gate guard that was stored in `temp_file` is dropped but we continue
// to operate on it and on the parent timeline directory.
// Those operations are safe to do because higher-level code is holding another gate guard:
// - attached mode: the download task spawned by struct Layer is holding the gate guard
// - secondary mode: The TenantDownloader::download holds the gate open
// The rename above is not durable yet.
// It doesn't matter for crash consistency because pageserver startup deletes temp
// files and we'll re-download on demand if necessary.
// We use fatal_err() below because, after the rename above,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
@@ -183,58 +146,147 @@ pub async fn download_layer_file<'a>(
async fn download_object(
storage: &GenericRemoteStorage,
src_path: &RemotePath,
destination_file: TempVirtualFile,
gate: &utils::sync::gate::Gate,
dst_path: &Utf8PathBuf,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<(u64, TempVirtualFile), DownloadError> {
let mut download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
) -> Result<u64, DownloadError> {
let res = match crate::virtual_file::io_engine::get() {
crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"),
crate::virtual_file::io_engine::IoEngine::StdFs => {
async {
let destination_file = tokio::fs::File::create(dst_path)
.await
.with_context(|| format!("create a destination file for layer '{dst_path}'"))
.map_err(DownloadError::Other)?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
let download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
let dst_path = destination_file.path().to_owned();
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
pausable_failpoint!("before-downloading-layer-stream-pausable");
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
while let Some(res) = futures::StreamExt::next(&mut download.download_stream).await {
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(DownloadError::from(e)),
};
buffered
.write_buffered_borrowed(&chunk, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
let mut buf_writer =
tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file);
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?;
buf_writer.flush().await?;
let mut destination_file = buf_writer.into_inner();
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
// we assume that the `destination_file` is fully written, i.e., there are no pending .write(...).await operations.
// But for additional safety, let's check/wait for any pending operations.
destination_file
.flush()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("flush source file at {dst_path}"))
.map_err(DownloadError::Other)?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use std::sync::Arc;
use crate::virtual_file::{IoBufferMut, owned_buffers_io};
async {
let destination_file = Arc::new(
VirtualFile::create(dst_path, ctx)
.await
.with_context(|| {
format!("create a destination file for layer '{dst_path}'")
})
.map_err(DownloadError::Other)?,
);
let mut download = storage
.download(src_path, &DownloadOpts::default(), cancel)
.await?;
pausable_failpoint!("before-downloading-layer-stream-pausable");
let mut buffered = owned_buffers_io::write::BufferedWriter::<IoBufferMut, _>::new(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
// There's chunks_vectored() on the stream.
let (bytes_amount, destination_file) = async {
while let Some(res) =
futures::StreamExt::next(&mut download.download_stream).await
{
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(DownloadError::from(e)),
};
buffered
.write_buffered_borrowed(&chunk, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered
.flush_and_into_inner(ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok(bytes_amount)
}
.await
}
};
// in case the download failed, clean up
match res {
Ok(bytes_amount) => Ok(bytes_amount),
Err(e) => {
if let Err(e) = tokio::fs::remove_file(dst_path).await {
if e.kind() != std::io::ErrorKind::NotFound {
on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}"));
}
}
Err(e)
}
let inner = buffered.shutdown(ctx).await.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;
// not using sync_data because it can lose file size update
destination_file
.sync_all()
.await
.maybe_fatal_err("download_object sync_all")
.with_context(|| format!("failed to fsync source file at {dst_path}"))
.map_err(DownloadError::Other)?;
Ok((bytes_amount, destination_file))
}
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";

View File

@@ -646,7 +646,7 @@ enum UpdateError {
NoData,
#[error("Insufficient local storage space")]
NoSpace,
#[error("Failed to download: {0}")]
#[error("Failed to download")]
DownloadError(DownloadError),
#[error(transparent)]
Deserialize(#[from] serde_json::Error),

View File

@@ -34,7 +34,6 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
@@ -46,6 +45,8 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
@@ -73,7 +74,6 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -288,20 +288,19 @@ impl DeltaLayer {
key_start: Key,
lsn_range: &Range<Lsn>,
) -> Utf8PathBuf {
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
conf.timeline_path(tenant_shard_id, timeline_id)
.join(format!(
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
"{}-XXX__{:016X}-{:016X}.{}.{}",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end),
filename_disambiguator,
rand_string,
TEMP_FILE_SUFFIX,
))
}
@@ -422,7 +421,7 @@ impl DeltaLayerWriterInner {
let path =
DeltaLayer::temp_path_for(conf, &tenant_shard_id, &timeline_id, key_start, &lsn_range);
let mut file = TempVirtualFile::new(VirtualFile::create(&path, ctx).await?, gate.enter()?);
let mut file = VirtualFile::create(&path, ctx).await?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
@@ -516,6 +515,22 @@ impl DeltaLayerWriterInner {
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(key_end, ctx).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
async fn finish0(
self,
key_end: Key,
ctx: &RequestContext,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -583,10 +598,6 @@ impl DeltaLayerWriterInner {
trace!("created delta layer {}", self.path);
// The gate guard stored in `destination_file` is dropped. Callers (e.g., the flush loop or compaction)
// also keep the gate open, so that it's safe for them to rename the file to its final destination.
file.disarm_into_inner();
Ok((desc, self.path))
}
}
@@ -715,6 +726,17 @@ impl DeltaLayerWriter {
}
}
impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
// We want to remove the virtual file here, so it's fine not to have
// completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
vfile.remove();
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
@@ -1587,8 +1609,8 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use pageserver_api::value::Value;
use rand::RngCore;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
use super::*;
use crate::DEFAULT_PG_VERSION;

View File

@@ -32,7 +32,6 @@ use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
@@ -44,6 +43,8 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
@@ -71,7 +72,6 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, BufView, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadPlanner,
};
use crate::virtual_file::TempVirtualFile;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
@@ -252,18 +252,14 @@ impl ImageLayer {
tenant_shard_id: TenantShardId,
fname: &ImageLayerName,
) -> Utf8PathBuf {
// TempVirtualFile requires us to never reuse a filename while an old
// instance of TempVirtualFile created with that filename is not done dropping yet.
// So, we use a monotonic counter to disambiguate the filenames.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
))
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
}
///
@@ -777,7 +773,7 @@ impl ImageLayerWriterInner {
},
);
trace!("creating image layer {}", path);
let mut file = TempVirtualFile::new(
let mut file = {
VirtualFile::open_with_options(
&path,
virtual_file::OpenOptions::new()
@@ -785,9 +781,8 @@ impl ImageLayerWriterInner {
.create_new(true),
ctx,
)
.await?,
gate.enter()?,
);
.await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
@@ -901,6 +896,25 @@ impl ImageLayerWriterInner {
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let temp_path = self.path.clone();
let result = self.finish0(ctx, end_key).await;
if let Err(ref e) = result {
tracing::info!(%temp_path, "cleaning up temporary file after error during writing: {e}");
if let Err(e) = std::fs::remove_file(&temp_path) {
tracing::warn!(error=%e, %temp_path, "error cleaning up temporary layer file after error during writing");
}
}
result
}
///
/// Finish writing the image layer.
///
async fn finish0(
self,
ctx: &RequestContext,
end_key: Option<Key>,
) -> anyhow::Result<(PersistentLayerDesc, Utf8PathBuf)> {
let index_start_blk = self.blob_writer.size().div_ceil(PAGE_SZ as u64) as u32;
@@ -918,7 +932,7 @@ impl ImageLayerWriterInner {
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
};
let mut file = self.blob_writer.into_inner(ctx).await?;
let mut file = self.blob_writer.into_inner();
// Write out the index
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
@@ -986,10 +1000,6 @@ impl ImageLayerWriterInner {
trace!("created image layer {}", self.path);
// The gate guard stored in `destination_file` is dropped. Callers (e.g., the flush loop or compaction)
// also keep the gate open, so that it's safe for them to rename the file to its final destination.
file.disarm_into_inner();
Ok((desc, self.path))
}
}
@@ -1115,6 +1125,14 @@ impl ImageLayerWriter {
}
}
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
}
}
}
pub struct ImageLayerIterator<'a> {
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,

View File

@@ -1285,10 +1285,6 @@ impl Timeline {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
if query.is_empty() {
return Ok(BTreeMap::default());
}
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
Some(ReadPath::new(
query.total_keyspace(),

View File

@@ -1,21 +1,20 @@
use std::sync::Arc;
use anyhow::{Context, bail};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::{Instrument, info, info_span};
use utils::lsn::Lsn;
use super::Timeline;
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;
mod flow;
mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;
pub(crate) mod upcall_api;
pub async fn doit(
timeline: &Arc<Timeline>,
@@ -35,6 +34,23 @@ pub async fn doit(
let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
info!("get spec early so we know we'll be able to upcall when done");
let Some(spec) = storage.get_spec().await? else {
bail!("spec not found")
};
let upcall_client =
upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
//
// send an early progress update to clean up k8s job early and generate potentially useful logs
//
info!("send early progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("early_progress_update"))
.await?;
let status_prefix = RemotePath::from_string("status").unwrap();
//
@@ -160,21 +176,7 @@ pub async fn doit(
//
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.
//
let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
.expect("storcon configured");
storcon_client
.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
// TODO(vlad): What about import errors?
ShardImportStatus::Done,
)
.await
.map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;
storage
.put_json(
&shard_status_key,
@@ -184,6 +186,16 @@ pub async fn doit(
.context("put shard status")?;
}
//
// Ensure at-least-once delivery of the upcall to cplane
// before we mark the task as done and never come here again.
//
info!("send final progress update");
upcall_client
.send_progress_until_success(&spec)
.instrument(info_span!("final_progress_update"))
.await?;
//
// Mark as done in index_part.
// This makes subsequent timeline loads enter the normal load code path

View File

@@ -13,7 +13,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
use utils::lsn::Lsn;
use super::index_part_format;
use super::{importbucket_format, index_part_format};
use crate::assert_u64_eq_usize::U64IsUsize;
use crate::config::PageServerConf;
@@ -173,6 +173,12 @@ impl RemoteStorageWrapper {
res
}
pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
self.get_json(&RemotePath::from_string("spec.json").unwrap())
.await
.context("get spec")
}
#[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
pub async fn get_json<T: DeserializeOwned>(
&self,
@@ -238,8 +244,7 @@ impl RemoteStorageWrapper {
kind: DownloadKind::Large,
etag: None,
byte_start: Bound::Included(start_inclusive),
byte_end: Bound::Excluded(end_exclusive),
version_id: None,
byte_end: Bound::Excluded(end_exclusive)
},
&self.cancel)
.await?;

View File

@@ -11,3 +11,10 @@ pub struct ShardStatus {
pub done: bool,
// TODO: remaining fields
}
// TODO: dedupe with fast_import code
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
pub struct Spec {
pub project_id: String,
pub branch_id: String,
}
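// A hypothetical `spec.json` that would deserialize into the struct above:
//
//     { "project_id": "proj-1234", "branch_id": "br-5678" }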

View File

@@ -0,0 +1,124 @@
//! FIXME: most of this is copy-paste from mgmt_api.rs; dedupe into a `reqwest_utils::Client` crate.
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
use reqwest::{Certificate, Method};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::error;
use super::importbucket_format::Spec;
use crate::config::PageServerConf;
pub struct Client {
base_url: String,
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressRequest {
// no fields yet; not sure if there ever will be any
}
#[derive(Serialize, Deserialize, Debug)]
struct ImportProgressResponse {
// we don't care
}
impl Client {
pub fn new(conf: &PageServerConf, cancel: CancellationToken) -> anyhow::Result<Self> {
let Some(ref base_url) = conf.import_pgdata_upcall_api else {
anyhow::bail!("import_pgdata_upcall_api is not configured")
};
let mut http_client = reqwest::Client::builder();
for cert in &conf.ssl_ca_certs {
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
}
let http_client = http_client.build()?;
Ok(Self {
base_url: base_url.to_string(),
client: http_client,
cancel,
authorization_header: conf
.import_pgdata_upcall_api_token
.as_ref()
.map(|secret_string| secret_string.get_contents())
.map(|jwt| format!("Bearer {jwt}")),
})
}
fn start_request<U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
) -> reqwest::RequestBuilder {
let req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
}
}
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.start_request(method, uri)
.json(&body)
.send()
.await
.map_err(Error::ReceiveBody)
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
Ok(response)
}
pub async fn send_progress_once(&self, spec: &Spec) -> Result<()> {
let url = format!(
"{}/projects/{}/branches/{}/import_progress",
self.base_url, spec.project_id, spec.branch_id
);
let ImportProgressResponse {} = self
.request(Method::POST, url, &ImportProgressRequest {})
.await?
.json()
.await
.map_err(Error::ReceiveBody)?;
Ok(())
}
pub async fn send_progress_until_success(&self, spec: &Spec) -> anyhow::Result<()> {
loop {
match self.send_progress_once(spec).await {
Ok(()) => return Ok(()),
Err(Error::Cancelled) => return Err(anyhow::anyhow!("cancelled")),
Err(err) => {
error!(?err, "error sending progress, retrying");
if tokio::time::timeout(
std::time::Duration::from_secs(10),
self.cancel.cancelled(),
)
.await
.is_ok()
{
anyhow::bail!("cancelled while sending early progress update");
}
}
}
}
}
}

View File

@@ -25,31 +25,29 @@ use owned_buffers_io::aligned_buffer::{AlignedBufferMut, AlignedSlice, ConstAlig
use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut};
use owned_buffers_io::io_buf_ext::FullSlice;
use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT;
pub use pageserver_api::models::virtual_file as api;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice};
use self::owned_buffers_io::write::OwnedAsyncWriter;
use crate::assert_u64_eq_usize::UsizeIsU64;
use crate::context::RequestContext;
use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation};
use crate::page_cache::{PAGE_SZ, PageWriteGuard};
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) mod io_engine;
pub use io_engine::{
FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test,
io_engine_for_bench,
};
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
pub use pageserver_api::models::virtual_file as api;
pub use temporary::TempVirtualFile;
pub(crate) mod io_engine;
mod metadata;
mod open_options;
mod temporary;
pub(crate) use api::IoMode;
pub(crate) use io_engine::IoEngineKind;
pub(crate) use metadata::Metadata;
pub(crate) use open_options::*;
use self::owned_buffers_io::write::OwnedAsyncWriter;
pub(crate) mod owned_buffers_io {
//! Abstractions for IO with owned buffers.
//!
@@ -1371,7 +1369,7 @@ pub(crate) type IoPageSlice<'a> =
static IO_MODE: once_cell::sync::Lazy<AtomicU8> =
once_cell::sync::Lazy::new(|| AtomicU8::new(IoMode::preferred() as u8));
pub fn set_io_mode(mode: IoMode) {
pub(crate) fn set_io_mode(mode: IoMode) {
IO_MODE.store(mode as u8, std::sync::atomic::Ordering::Relaxed);
}

View File

@@ -1,4 +1,5 @@
mod flush;
use std::sync::Arc;
pub(crate) use flush::FlushControl;
use flush::FlushHandle;
@@ -40,6 +41,7 @@ pub trait OwnedAsyncWriter {
// TODO(yuchen): For large write, implementing buffer bypass for aligned parts of the write could be beneficial to throughput,
// since we would avoid copying majority of the data into the internal buffer.
pub struct BufferedWriter<B: Buffer, W> {
writer: Arc<W>,
/// Clone of the buffer that was last submitted to the flush loop.
/// `None` if no flush request has been submitted, Some forever after.
pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
@@ -70,7 +72,7 @@ where
///
/// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
pub fn new(
writer: W,
writer: Arc<W>,
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -78,6 +80,7 @@ where
flush_task_span: tracing::Span,
) -> Self {
Self {
writer: writer.clone(),
mutable: Some(buf_new()),
maybe_flushed: None,
flush_handle: FlushHandle::spawn_new(
@@ -92,6 +95,10 @@ where
}
}
pub fn as_inner(&self) -> &W {
&self.writer
}
/// Returns the number of bytes submitted to the background flush task.
pub fn bytes_submitted(&self) -> u64 {
self.bytes_submitted
@@ -109,16 +116,20 @@ where
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn shutdown(mut self, ctx: &RequestContext) -> Result<(u64, W), FlushTaskError> {
pub async fn flush_and_into_inner(
mut self,
ctx: &RequestContext,
) -> Result<(u64, Arc<W>), FlushTaskError> {
self.flush(ctx).await?;
let Self {
mutable: buf,
maybe_flushed: _,
writer,
mut flush_handle,
bytes_submitted: bytes_amount,
} = self;
let writer = flush_handle.shutdown().await?;
flush_handle.shutdown().await?;
assert!(buf.is_some());
Ok((bytes_amount, writer))
}
@@ -318,7 +329,7 @@ mod tests {
async fn test_write_all_borrowed_always_goes_through_buffer() -> anyhow::Result<()> {
let ctx = test_ctx();
let ctx = &ctx;
let recorder = RecorderWriter::default();
let recorder = Arc::new(RecorderWriter::default());
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
@@ -339,7 +350,7 @@ mod tests {
writer.write_buffered_borrowed(b"j", ctx).await?;
writer.write_buffered_borrowed(b"klmno", ctx).await?;
let (_, recorder) = writer.shutdown(ctx).await?;
let (_, recorder) = writer.flush_and_into_inner(ctx).await?;
assert_eq!(
recorder.get_writes(),
{

View File

@@ -1,4 +1,5 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
@@ -20,7 +21,7 @@ pub struct FlushHandleInner<Buf, W> {
/// and receives recycled buffers.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
}
struct FlushRequest<Buf> {
@@ -119,7 +120,7 @@ where
/// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
/// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
pub fn spawn_new<B>(
file: W,
file: Arc<W>,
buf: B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
@@ -182,7 +183,7 @@ where
}
/// Cleans up the channel, join the flush task.
pub async fn shutdown(&mut self) -> Result<W, FlushTaskError> {
pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
let handle = self
.inner
.take()
@@ -206,7 +207,7 @@ pub struct FlushBackgroundTask<Buf, W> {
/// and send back recycled buffer.
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
/// A writer for persisting data to disk.
writer: W,
writer: Arc<W>,
ctx: RequestContext,
cancel: CancellationToken,
/// Prevent the timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
@@ -227,7 +228,7 @@ where
/// Creates a new background flush task.
fn new(
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
file: W,
file: Arc<W>,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
ctx: RequestContext,
@@ -242,7 +243,7 @@ where
}
/// Runs the background flush task.
async fn run(mut self) -> Result<W, FlushTaskError> {
async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
#[cfg(test)]

View File

@@ -1,106 +0,0 @@
use tracing::error;
use utils::sync::gate::GateGuard;
use crate::context::RequestContext;
use super::{
MaybeFatalIo, VirtualFile,
owned_buffers_io::{
io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice, write::OwnedAsyncWriter,
},
};
/// A wrapper around [`super::VirtualFile`] that deletes the file on drop.
/// For use as a [`OwnedAsyncWriter`] in [`super::owned_buffers_io::write::BufferedWriter`].
#[derive(Debug)]
pub struct TempVirtualFile {
inner: Option<Inner>,
}
#[derive(Debug)]
struct Inner {
file: VirtualFile,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop)
_gate_guard: GateGuard,
}
impl OwnedAsyncWriter for TempVirtualFile {
fn write_all_at<Buf: IoBufAligned + Send>(
&self,
buf: FullSlice<Buf>,
offset: u64,
ctx: &RequestContext,
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send {
VirtualFile::write_all_at(self, buf, offset, ctx)
}
}
impl Drop for TempVirtualFile {
fn drop(&mut self) {
let Some(Inner { file, _gate_guard }) = self.inner.take() else {
return;
};
let path = file.path();
if let Err(e) =
std::fs::remove_file(path).maybe_fatal_err("failed to remove the virtual file")
{
error!(err=%e, path=%path, "failed to remove");
}
drop(_gate_guard);
}
}
impl std::ops::Deref for TempVirtualFile {
type Target = VirtualFile;
fn deref(&self) -> &Self::Target {
&self
.inner
.as_ref()
.expect("only None after into_inner or drop")
.file
}
}
impl std::ops::DerefMut for TempVirtualFile {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self
.inner
.as_mut()
.expect("only None after into_inner or drop")
.file
}
}
impl TempVirtualFile {
/// The caller is responsible for ensuring that the path of `virtual_file` is not reused
/// until after this TempVirtualFile's `Drop` impl has completed.
/// Failure to do so will result in unlinking of the reused path by the original instance's Drop impl.
/// The best way to do so is by using a monotonic counter as a disambiguator.
/// TODO: centralize this disambiguator pattern inside this struct.
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
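/// A sketch of that pattern as it appears at call sites in this codebase:
///
///     static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
///     let n = NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
///     let path = path_with_suffix_extension(local_path, &format!("{n:x}.{TEMP_FILE_SUFFIX}"));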
pub fn new(virtual_file: VirtualFile, gate_guard: GateGuard) -> Self {
Self {
inner: Some(Inner {
file: virtual_file,
_gate_guard: gate_guard,
}),
}
}
/// Dismantle this wrapper and return the underlying [`VirtualFile`].
/// This disables auto-unlinking functionality that is the essence of this wrapper.
///
/// The gate guard is dropped as well; it is the caller's responsibility to ensure filesystem
/// operations after calls to this function are still gated by some other gate guard.
///
/// TODO:
/// - centralize the common usage pattern of callers (sync_all(self), rename(self, dst), sync_all(dst.parent))
/// => <https://github.com/neondatabase/neon/pull/11549#issuecomment-2824592831>
pub fn disarm_into_inner(mut self) -> VirtualFile {
self.inner
.take()
.expect("only None after into_inner or drop, and we are into_inner, and we consume")
.file
}
}
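The deleted wrapper leans on `Drop` for cleanup: whoever creates the temp file never has to remember to unlink it on an error path, and `disarm_into_inner` opts out once the file is promoted to its final name. A standalone sketch of the same drop-guard idea (no gate guard or VirtualFile, just std):

use std::fs::File;
use std::path::PathBuf;

struct TempFile {
    path: PathBuf,
    file: Option<File>,
}

impl TempFile {
    fn create(path: PathBuf) -> std::io::Result<Self> {
        let file = File::create(&path)?;
        Ok(TempFile { path, file: Some(file) })
    }

    /// Keep the file: take ownership and skip the unlink in Drop.
    fn disarm(mut self) -> File {
        self.file.take().expect("only None after disarm or drop")
    }
}

impl Drop for TempFile {
    fn drop(&mut self) {
        if self.file.take().is_some() {
            if let Err(e) = std::fs::remove_file(&self.path) {
                eprintln!("failed to remove {}: {e}", self.path.display());
            }
        }
    }
}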

View File

@@ -803,13 +803,7 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
#ifdef DEBUG_COMPARE_LOCAL
mdcreate(reln, forkNum, forkNum == INIT_FORKNUM || isRedo);
if (forkNum == MAIN_FORKNUM)
mdcreate(reln, INIT_FORKNUM, true);
#else
mdcreate(reln, forkNum, isRedo);
#endif
return;
default:
@@ -1979,10 +1973,6 @@ neon_start_unlogged_build(SMgrRelation reln)
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel = reln;
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, INIT_FORKNUM, true);
#endif
return;
default:
@@ -2005,14 +1995,12 @@ neon_start_unlogged_build(SMgrRelation reln)
* FIXME: should we pass isRedo true to create the tablespace dir if it
* doesn't exist? Is it needed?
*/
if (!IsParallelWorker())
{
#ifndef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
mdcreate(reln, MAIN_FORKNUM, false);
#else
mdcreate(reln, INIT_FORKNUM, true);
mdcreate(reln, INIT_FORKNUM, false);
#endif
}
}
/*
@@ -2111,12 +2099,12 @@ neon_end_unlogged_build(SMgrRelation reln)
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */
mdunlink(rinfob, forknum, true);
#else
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
#ifdef DEBUG_COMPARE_LOCAL
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
unlogged_build_rel = NULL;
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}

View File

@@ -91,7 +91,6 @@ mod jemalloc;
mod logging;
mod metrics;
mod parse;
mod pglb;
mod protocol2;
mod proxy;
mod rate_limiter;

View File

@@ -1,193 +0,0 @@
#![allow(dead_code, reason = "TODO: work in progress")]
use std::pin::{Pin, pin};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{fmt, io};
use tokio::io::{AsyncRead, AsyncWrite, DuplexStream, ReadBuf};
use tokio::sync::mpsc;
const STREAM_CHANNEL_SIZE: usize = 16;
const MAX_STREAM_BUFFER_SIZE: usize = 4096;
#[derive(Debug)]
pub struct Connection {
stream_sender: mpsc::Sender<Stream>,
stream_receiver: mpsc::Receiver<Stream>,
stream_id_counter: Arc<AtomicUsize>,
}
impl Connection {
pub fn new() -> (Connection, Connection) {
let (sender_a, receiver_a) = mpsc::channel(STREAM_CHANNEL_SIZE);
let (sender_b, receiver_b) = mpsc::channel(STREAM_CHANNEL_SIZE);
let stream_id_counter = Arc::new(AtomicUsize::new(1));
let conn_a = Connection {
stream_sender: sender_a,
stream_receiver: receiver_b,
stream_id_counter: Arc::clone(&stream_id_counter),
};
let conn_b = Connection {
stream_sender: sender_b,
stream_receiver: receiver_a,
stream_id_counter,
};
(conn_a, conn_b)
}
#[inline]
fn next_stream_id(&self) -> StreamId {
StreamId(self.stream_id_counter.fetch_add(1, Ordering::Relaxed))
}
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
pub async fn open_stream(&self) -> io::Result<Stream> {
let (local, remote) = tokio::io::duplex(MAX_STREAM_BUFFER_SIZE);
let stream_id = self.next_stream_id();
tracing::Span::current().record("stream_id", stream_id.0);
let local = Stream {
inner: local,
id: stream_id,
};
let remote = Stream {
inner: remote,
id: stream_id,
};
self.stream_sender
.send(remote)
.await
.map_err(io::Error::other)?;
Ok(local)
}
#[tracing::instrument(skip_all, fields(stream_id = tracing::field::Empty, err))]
pub async fn accept_stream(&mut self) -> io::Result<Option<Stream>> {
Ok(self.stream_receiver.recv().await.inspect(|stream| {
tracing::Span::current().record("stream_id", stream.id.0);
}))
}
}
#[derive(Copy, Clone, Debug)]
pub struct StreamId(usize);
impl fmt::Display for StreamId {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
// TODO: Proper closing. Currently Streams can outlive their Connections.
// Carry WeakSender and check strong_count?
#[derive(Debug)]
pub struct Stream {
inner: DuplexStream,
id: StreamId,
}
impl Stream {
#[inline]
pub fn id(&self) -> StreamId {
self.id
}
}
impl AsyncRead for Stream {
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
pin!(&mut self.inner).poll_read(cx, buf)
}
}
impl AsyncWrite for Stream {
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
pin!(&mut self.inner).poll_write(cx, buf)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
pin!(&mut self.inner).poll_flush(cx)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
pin!(&mut self.inner).poll_shutdown(cx)
}
#[tracing::instrument(level = "debug", skip_all, fields(stream_id = %self.id))]
#[inline]
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
pin!(&mut self.inner).poll_write_vectored(cx, bufs)
}
#[inline]
fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
}
#[cfg(test)]
mod tests {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::*;
#[tokio::test]
async fn test_simple_roundtrip() {
let (client, mut server) = Connection::new();
let server_task = tokio::spawn(async move {
while let Some(mut stream) = server.accept_stream().await.unwrap() {
tokio::spawn(async move {
let mut buf = [0; 64];
loop {
match stream.read(&mut buf).await.unwrap() {
0 => break,
n => stream.write(&buf[..n]).await.unwrap(),
};
}
});
}
});
let mut stream = client.open_stream().await.unwrap();
stream.write_all(b"hello!").await.unwrap();
let mut buf = [0; 64];
let n = stream.read(&mut buf).await.unwrap();
assert_eq!(n, 6);
assert_eq!(&buf[..n], b"hello!");
drop(stream);
drop(client);
server_task.await.unwrap();
}
}

View File

@@ -1 +0,0 @@
pub mod inprocess;

View File

@@ -12,7 +12,7 @@ use pin_project_lite::pin_project;
use smol_str::SmolStr;
use strum_macros::FromRepr;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use zerocopy::{FromBytes, Immutable, KnownLayout, Unaligned, network_endian};
use zerocopy::{FromBytes, FromZeroes};
pin_project! {
/// A chained [`AsyncRead`] with [`AsyncWrite`] passthrough
@@ -339,49 +339,49 @@ trait BufExt: Sized {
}
impl BufExt for BytesMut {
fn try_get<T: FromBytes>(&mut self) -> Option<T> {
let (res, _) = T::read_from_prefix(self).ok()?;
let res = T::read_from_prefix(self)?;
self.advance(size_of::<T>());
Some(res)
}
}
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
struct ProxyProtocolV2Header {
signature: [u8; 12],
version_and_command: u8,
protocol_and_family: u8,
len: network_endian::U16,
len: zerocopy::byteorder::network_endian::U16,
}
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
struct ProxyProtocolV2HeaderV4 {
src_addr: NetworkEndianIpv4,
dst_addr: NetworkEndianIpv4,
src_port: network_endian::U16,
dst_port: network_endian::U16,
src_port: zerocopy::byteorder::network_endian::U16,
dst_port: zerocopy::byteorder::network_endian::U16,
}
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
struct ProxyProtocolV2HeaderV6 {
src_addr: NetworkEndianIpv6,
dst_addr: NetworkEndianIpv6,
src_port: network_endian::U16,
dst_port: network_endian::U16,
src_port: zerocopy::byteorder::network_endian::U16,
dst_port: zerocopy::byteorder::network_endian::U16,
}
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[repr(C, packed)]
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
struct TlvHeader {
kind: u8,
len: network_endian::U16,
len: zerocopy::byteorder::network_endian::U16,
}
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(transparent)]
struct NetworkEndianIpv4(network_endian::U32);
struct NetworkEndianIpv4(zerocopy::byteorder::network_endian::U32);
impl NetworkEndianIpv4 {
#[inline]
fn get(self) -> Ipv4Addr {
@@ -389,9 +389,9 @@ impl NetworkEndianIpv4 {
}
}
#[derive(FromBytes, KnownLayout, Immutable, Unaligned, Copy, Clone)]
#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(transparent)]
struct NetworkEndianIpv6(network_endian::U128);
struct NetworkEndianIpv6(zerocopy::byteorder::network_endian::U128);
impl NetworkEndianIpv6 {
#[inline]
fn get(self) -> Ipv6Addr {
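This section moves the proxy-protocol structs from the zerocopy 0.8 derives (`KnownLayout`, `Immutable`, `Unaligned`, `#[repr(C, packed)]`) back to the 0.7 API, where `FromBytes` pairs with `FromZeroes` and `read_from_prefix` returns `Option<Self>` rather than `Result<(Self, &[u8]), _>`. A minimal sketch of header parsing under zerocopy 0.7 (hypothetical struct, same derive set as above):

use zerocopy::{FromBytes, FromZeroes};
use zerocopy::byteorder::network_endian::U16;

#[derive(FromBytes, FromZeroes, Copy, Clone)]
#[repr(C)]
struct Header {
    kind: u8,
    flags: u8,
    len: U16, // big-endian on the wire; `get()` converts to host order
}

fn parse(buf: &[u8]) -> Option<Header> {
    // 0.7 returns None when the buffer is shorter than size_of::<Header>()
    Header::read_from_prefix(buf)
}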

View File

@@ -14,7 +14,6 @@ use clap::{ArgAction, Parser};
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use http_utils::tls_certs::ReloadingCertificateResolver;
use metrics::set_build_info_metric;
use remote_storage::RemoteStorageConfig;
use safekeeper::defaults::{
@@ -24,8 +23,8 @@ use safekeeper::defaults::{
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf, WAL_SERVICE_RUNTIME, broker,
control_file, http, wal_backup, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};
@@ -216,26 +215,16 @@ struct Args {
ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files.
#[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)]
ssl_cert_reload_period: Duration,
pub ssl_cert_reload_period: Duration,
/// Trusted root CA certificates to use in https APIs.
#[arg(long)]
ssl_ca_file: Option<Utf8PathBuf>,
pub ssl_ca_file: Option<Utf8PathBuf>,
/// Flag to use https for requests to peer's safekeeper API.
#[arg(long)]
use_https_safekeeper_api: bool,
pub use_https_safekeeper_api: bool,
/// Path to the JWT auth token used to authenticate with other safekeepers.
#[arg(long)]
auth_token_path: Option<Utf8PathBuf>,
/// Enable TLS in WAL service API.
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
#[arg(long)]
enable_tls_wal_service_api: bool,
/// Run in development mode (disables security checks)
#[arg(long, help = "Run in development mode (disables security checks)")]
dev: bool,
}
// Like PathBufValueParser, but allows empty string.
@@ -429,7 +418,6 @@ async fn main() -> anyhow::Result<()> {
ssl_cert_reload_period: args.ssl_cert_reload_period,
ssl_ca_certs,
use_https_safekeeper_api: args.use_https_safekeeper_api,
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
});
// initialize sentry if SENTRY_DSN is provided
@@ -529,36 +517,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
info!("running in current thread runtime");
}
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_wal_service_api {
let ssl_key_file = conf.ssl_key_file.clone();
let ssl_cert_file = conf.ssl_cert_file.clone();
let ssl_cert_reload_period = conf.ssl_cert_reload_period;
// Create resolver in BACKGROUND_RUNTIME, so the background certificate reloading
// task is run in this runtime.
let cert_resolver = current_thread_rt
.as_ref()
.unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
.spawn(async move {
ReloadingCertificateResolver::new(
"main",
&ssl_key_file,
&ssl_cert_file,
ssl_cert_reload_period,
)
.await
})
.await??;
let config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
Some(Arc::new(config))
} else {
None
};
let wal_service_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle())
@@ -566,9 +524,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
conf.clone(),
pg_listener,
Scope::SafekeeperData,
conf.enable_tls_wal_service_api
.then(|| tls_server_config.clone())
.flatten(),
global_timelines.clone(),
))
// wrap with task name for error reporting
@@ -597,9 +552,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
conf.clone(),
pg_listener_tenant_only,
Scope::Tenant,
conf.enable_tls_wal_service_api
.then(|| tls_server_config.clone())
.flatten(),
global_timelines.clone(),
))
// wrap with task name for error reporting
@@ -625,7 +577,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.spawn(http::task_main_https(
conf.clone(),
https_listener,
tls_server_config.expect("tls_server_config is set earlier if https is enabled"),
global_timelines.clone(),
))
.map(|res| ("HTTPS service main".to_owned(), res));

View File

@@ -1,6 +1,7 @@
pub mod routes;
use std::sync::Arc;
use http_utils::tls_certs::ReloadingCertificateResolver;
pub use routes::make_router;
pub use safekeeper_api::models;
use tokio_util::sync::CancellationToken;
@@ -27,10 +28,21 @@ pub async fn task_main_http(
pub async fn task_main_https(
conf: Arc<SafeKeeperConf>,
https_listener: std::net::TcpListener,
tls_config: Arc<rustls::ServerConfig>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config);
let cert_resolver = ReloadingCertificateResolver::new(
"main",
&conf.ssl_key_file,
&conf.ssl_cert_file,
conf.ssl_cert_reload_period,
)
.await?;
let server_config = rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(cert_resolver);
let tls_acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(server_config));
let router = make_router(conf, global_timelines)
.build()

View File

@@ -122,7 +122,6 @@ pub struct SafeKeeperConf {
pub ssl_cert_reload_period: Duration,
pub ssl_ca_certs: Vec<Pem>,
pub use_https_safekeeper_api: bool,
pub enable_tls_wal_service_api: bool,
}
impl SafeKeeperConf {
@@ -173,7 +172,6 @@ impl SafeKeeperConf {
ssl_cert_reload_period: Duration::from_secs(60),
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
}
}
}
@@ -211,12 +209,3 @@ pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
.build()
.expect("Failed to create WAL backup runtime")
});
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("background worker")
.worker_threads(1) // there is only one task now (ssl certificate reloading), having more threads doesn't make sense
.enable_all()
.build()
.expect("Failed to create background runtime")
});
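With the shared `tls_server_config` and the dedicated background runtime gone, certificate loading now happens inside `task_main_https` itself (see that hunk above). For reference, the general tokio-rustls accept loop looks like this (a sketch assuming an already-built `rustls::ServerConfig`, not the safekeeper code):

use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor;

async fn serve_tls(listener: TcpListener, config: Arc<rustls::ServerConfig>) -> std::io::Result<()> {
    let acceptor = TlsAcceptor::from(config);
    loop {
        let (stream, peer) = listener.accept().await?;
        let acceptor = acceptor.clone();
        tokio::spawn(async move {
            // The TLS handshake runs inside accept().
            match acceptor.accept(stream).await {
                Ok(_tls_stream) => { /* hand off to the protocol handler */ }
                Err(e) => eprintln!("TLS handshake with {peer} failed: {e}"),
            }
        });
    }
}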

View File

@@ -29,7 +29,6 @@ pub async fn task_main(
conf: Arc<SafeKeeperConf>,
pg_listener: std::net::TcpListener,
allowed_auth_scope: Scope,
tls_config: Option<Arc<rustls::ServerConfig>>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
// Tokio's from_std won't do this for us, per its comment.
@@ -44,10 +43,9 @@ pub async fn task_main(
let conf = conf.clone();
let conn_id = issue_connection_id(&mut connection_count);
let global_timelines = global_timelines.clone();
let tls_config = tls_config.clone();
tokio::spawn(
async move {
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, tls_config, global_timelines).await {
if let Err(err) = handle_socket(socket, conf, conn_id, allowed_auth_scope, global_timelines).await {
error!("connection handler exited: {}", err);
}
}
@@ -63,7 +61,6 @@ async fn handle_socket(
conf: Arc<SafeKeeperConf>,
conn_id: ConnectionId,
allowed_auth_scope: Scope,
tls_config: Option<Arc<rustls::ServerConfig>>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<(), QueryError> {
socket.set_nodelay(true)?;
@@ -113,8 +110,7 @@ async fn handle_socket(
auth_pair,
global_timelines,
);
let pgbackend =
PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, tls_config)?;
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend

View File

@@ -185,7 +185,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
ssl_cert_reload_period: Duration::ZERO,
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -1 +0,0 @@
DROP TABLE timeline_imports;

View File

@@ -1,6 +0,0 @@
CREATE TABLE timeline_imports (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
shard_statuses JSONB NOT NULL,
PRIMARY KEY(tenant_id, timeline_id)
);

View File

@@ -30,9 +30,7 @@ use pageserver_api::models::{
TimelineArchivalConfigRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_api::upcall_api::{
PutTimelineImportStatusRequest, ReAttachRequest, ValidateRequest,
};
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
use pageserver_client::{BlockUnblock, mgmt_api};
use routerify::Middleware;
use tokio_util::sync::CancellationToken;
@@ -156,28 +154,6 @@ async fn handle_validate(req: Request<Body>) -> Result<Response<Body>, ApiError>
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
}
async fn handle_put_timeline_import_status(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::GenerationsApi)?;
let mut req = match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
}
ForwardOutcome::NotForwarded(req) => req,
};
let put_req = json_request::<PutTimelineImportStatusRequest>(&mut req).await?;
let state = get_state(&req);
json_response(
StatusCode::OK,
state
.service
.handle_timeline_shard_import_progress_upcall(put_req)
.await?,
)
}
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).
@@ -1985,13 +1961,6 @@ pub fn make_router(
.post("/upcall/v1/validate", |r| {
named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
})
.post("/upcall/v1/timeline_import_status", |r| {
named_request_span(
r,
handle_put_timeline_import_status,
RequestName("upcall_v1_timeline_import_status"),
)
})
// Test/dev/debug endpoints
.post("/debug/v1/attach-hook", |r| {
named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))

View File

@@ -23,7 +23,6 @@ mod scheduler;
mod schema;
pub mod service;
mod tenant_shard;
mod timeline_import;
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
struct Sequence(u64);

View File

@@ -212,21 +212,6 @@ impl PageserverClient {
)
}
pub(crate) async fn timeline_detail(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<TimelineInfo> {
measured_request!(
"timeline_detail",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner
.timeline_detail(tenant_shard_id, timeline_id)
.await
)
}
pub(crate) async fn tenant_shard_split(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -22,7 +22,7 @@ use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, NodeSchedulingPolicy, PlacementPolicy,
SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
};
use pageserver_api::models::{ShardImportStatus, TenantConfig};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::{
ShardConfigError, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
@@ -40,9 +40,6 @@ use crate::metrics::{
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
};
use crate::node::Node;
use crate::timeline_import::{
TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp,
};
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// ## What do we store?
@@ -130,9 +127,6 @@ pub(crate) enum DatabaseOperation {
RemoveTimelineReconcile,
ListTimelineReconcile,
ListTimelineReconcileStartup,
InsertTimelineImport,
UpdateTimelineImport,
DeleteTimelineImport,
}
#[must_use]
@@ -1620,129 +1614,6 @@ impl Persistence {
Ok(())
}
pub(crate) async fn insert_timeline_import(
&self,
import: TimelineImportPersistence,
) -> DatabaseResult<bool> {
self.with_measured_conn(DatabaseOperation::InsertTimelineImport, move |conn| {
Box::pin({
let import = import.clone();
async move {
let inserted = diesel::insert_into(crate::schema::timeline_imports::table)
.values(import)
.execute(conn)
.await?;
Ok(inserted == 1)
}
})
})
.await
}
pub(crate) async fn delete_timeline_import(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::DeleteTimelineImport, move |conn| {
Box::pin(async move {
diesel::delete(crate::schema::timeline_imports::table)
.filter(
dsl::tenant_id
.eq(tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.execute(conn)
.await?;
Ok(())
})
})
.await
}
/// Idempotently update the status of one shard for an ongoing timeline import
///
/// If the update was persisted to the database, then the current state of the
/// import is returned to the caller. In case of logical errors a bespoke
/// [`TimelineImportUpdateError`] instance is returned. Other database errors
/// are covered by the outer [`DatabaseError`].
pub(crate) async fn update_timeline_import(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
shard_status: ShardImportStatus,
) -> DatabaseResult<Result<Option<TimelineImport>, TimelineImportUpdateError>> {
use crate::schema::timeline_imports::dsl;
self.with_measured_conn(DatabaseOperation::UpdateTimelineImport, move |conn| {
Box::pin({
let shard_status = shard_status.clone();
async move {
// Load the current state from the database
let mut from_db: Vec<TimelineImportPersistence> = dsl::timeline_imports
.filter(
dsl::tenant_id
.eq(tenant_shard_id.tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.load(conn)
.await?;
assert!(from_db.len() <= 1);
let mut status = match from_db.pop() {
Some(some) => TimelineImport::from_persistent(some).unwrap(),
None => {
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
tenant_id: tenant_shard_id.tenant_id,
timeline_id,
}));
}
};
// Perform the update in-memory
let follow_up = match status.update(tenant_shard_id.to_index(), shard_status) {
Ok(ok) => ok,
Err(err) => {
return Ok(Err(err));
}
};
let new_persistent = status.to_persistent();
// Write back if required (in the same transaction)
match follow_up {
TimelineImportUpdateFollowUp::Persist => {
let updated = diesel::update(dsl::timeline_imports)
.filter(
dsl::tenant_id
.eq(tenant_shard_id.tenant_id.to_string())
.and(dsl::timeline_id.eq(timeline_id.to_string())),
)
.set(dsl::shard_statuses.eq(new_persistent.shard_statuses))
.execute(conn)
.await?;
if updated != 1 {
return Ok(Err(TimelineImportUpdateError::ImportNotFound {
tenant_id: tenant_shard_id.tenant_id,
timeline_id,
}));
}
Ok(Ok(Some(status)))
}
TimelineImportUpdateFollowUp::None => Ok(Ok(None)),
}
}
})
})
.await
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -2300,11 +2171,3 @@ impl ToSql<diesel::sql_types::VarChar, Pg> for SafekeeperTimelineOpKind {
.map_err(Into::into)
}
}
#[derive(Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Clone)]
#[diesel(table_name = crate::schema::timeline_imports)]
pub(crate) struct TimelineImportPersistence {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) shard_statuses: serde_json::Value,
}

View File

@@ -76,14 +76,6 @@ diesel::table! {
}
}
diesel::table! {
timeline_imports (tenant_id, timeline_id) {
tenant_id -> Varchar,
timeline_id -> Varchar,
shard_statuses -> Jsonb,
}
}
diesel::table! {
use diesel::sql_types::*;
use super::sql_types::PgLsn;
@@ -107,6 +99,5 @@ diesel::allow_tables_to_appear_in_same_query!(
safekeeper_timeline_pending_ops,
safekeepers,
tenant_shards,
timeline_imports,
timelines,
);

View File

@@ -40,14 +40,14 @@ use pageserver_api::models::{
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
TimelineInfo, TimelineState, TopTenantShardItem, TopTenantShardsRequest,
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
};
use pageserver_api::shard::{
DEFAULT_STRIPE_SIZE, ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
use pageserver_api::upcall_api::{
PutTimelineImportStatusRequest, ReAttachRequest, ReAttachResponse, ReAttachResponseTenant,
ValidateRequest, ValidateResponse, ValidateResponseTenant,
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
ValidateResponseTenant,
};
use pageserver_client::{BlockUnblock, mgmt_api};
use reqwest::{Certificate, StatusCode};
@@ -97,7 +97,6 @@ use crate::tenant_shard::{
ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter,
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -3733,14 +3732,11 @@ impl Service {
create_req: TimelineCreateRequest,
) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = self.config.timelines_onto_safekeepers;
let timeline_id = create_req.new_timeline_id;
tracing::info!(
mode=%create_req.mode_tag(),
%safekeepers,
"Creating timeline {}/{}",
tenant_id,
timeline_id,
create_req.new_timeline_id,
);
let _tenant_lock = trace_shared_lock(
@@ -3750,62 +3746,15 @@ impl Service {
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let is_import = create_req.is_import();
let create_mode = create_req.mode.clone();
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
let selected_safekeepers = if is_import {
let shards = {
let locked = self.inner.read().unwrap();
locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.map(|(ts_id, _)| ts_id.to_index())
.collect::<Vec<_>>()
};
if !shards
.iter()
.map(|shard_index| shard_index.shard_count)
.all_equal()
{
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Inconsistent shard count"
)));
}
let import = TimelineImport {
tenant_id,
timeline_id,
shard_statuses: ShardImportStatuses::new(shards),
};
let inserted = self
.persistence
.insert_timeline_import(import.to_persistent())
.await
.context("timeline import insert")
.map_err(ApiError::InternalServerError)?;
match inserted {
true => {
tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import");
}
false => {
tracing::info!(%tenant_id, %timeline_id, "Timeline import entry already present");
}
}
None
} else if safekeepers {
// Note that we do not support creating the timeline on the safekeepers
// for imported timelines. The `start_lsn` of the timeline is not known
// until the import finishes.
// https://github.com/neondatabase/neon/issues/11569
let safekeepers = if safekeepers {
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info)
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
.await?;
Some(res)
@@ -3815,168 +3764,10 @@ impl Service {
Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers: selected_safekeepers,
safekeepers,
})
}
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
self: &Arc<Self>,
req: PutTimelineImportStatusRequest,
) -> Result<(), ApiError> {
let res = self
.persistence
.update_timeline_import(req.tenant_shard_id, req.timeline_id, req.status)
.await;
let timeline_import = match res {
Ok(Ok(Some(timeline_import))) => timeline_import,
Ok(Ok(None)) => {
// Idempotency: we've already seen and handled this update.
return Ok(());
}
Ok(Err(logical_err)) => {
return Err(logical_err.into());
}
Err(db_err) => {
return Err(db_err.into());
}
};
tracing::info!(
tenant_id=%req.tenant_shard_id.tenant_id,
timeline_id=%req.timeline_id,
shard_id=%req.tenant_shard_id.shard_slug(),
"Updated timeline import status to: {timeline_import:?}");
if timeline_import.is_complete() {
tokio::task::spawn({
let this = self.clone();
async move { this.finalize_timeline_import(timeline_import).await }
});
}
Ok(())
}
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
shard_id=%import.timeline_id,
))]
async fn finalize_timeline_import(
self: &Arc<Self>,
import: TimelineImport,
) -> anyhow::Result<()> {
// TODO(vlad): On start-up, load up the imports and notify cplane of the
// ones that have been completed. This assumes the new cplane API will
// be idempotent. If that's not possible, bang a flag in the database.
// https://github.com/neondatabase/neon/issues/11570
tracing::info!("Finalizing timeline import");
let import_failed = import.completion_error().is_some();
if !import_failed {
loop {
if self.cancel.is_cancelled() {
anyhow::bail!("Shut down requested while finalizing import");
}
let active = self.timeline_active_on_all_shards(&import).await?;
match active {
true => {
tracing::info!("Timeline became active on all shards");
break;
}
false => {
tracing::info!("Timeline not active on all shards yet");
tokio::select! {
_ = self.cancel.cancelled() => {
anyhow::bail!("Shut down requested while finalizing import");
},
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
};
}
}
}
}
tracing::info!(%import_failed, "Notifying cplane of import completion");
let client = UpcallClient::new(self.get_config(), self.cancel.child_token());
client.notify_import_complete(&import).await?;
if let Err(err) = self
.persistence
.delete_timeline_import(import.tenant_id, import.timeline_id)
.await
{
tracing::warn!("Failed to delete timeline import entry from database: {err}");
}
// TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn,
// so we can't create the timeline on the safekeepers. Fix by moving creation here.
// https://github.com/neondatabase/neon/issues/11569
tracing::info!(%import_failed, "Timeline import complete");
Ok(())
}
async fn timeline_active_on_all_shards(
self: &Arc<Self>,
import: &TimelineImport,
) -> anyhow::Result<bool> {
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in locked
.tenants
.range(TenantShardId::tenant_range(import.tenant_id))
{
if !import
.shard_statuses
.0
.contains_key(&tenant_shard_id.to_index())
{
anyhow::bail!("Shard layout change detected on completion");
}
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
} else {
return Ok(false);
}
}
targets
};
let results = self
.tenant_for_shards_api(
targets,
|tenant_shard_id, client| async move {
client
.timeline_detail(tenant_shard_id, import.timeline_id)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
Ok(results.into_iter().all(|res| match res {
Ok(info) => info.state == TimelineState::Active,
Err(_) => false,
}))
}
pub(crate) async fn tenant_timeline_archival_config(
&self,
tenant_id: TenantId,

View File

@@ -15,7 +15,7 @@ use http_utils::error::ApiError;
use pageserver_api::controller_api::{
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
};
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
@@ -151,39 +151,11 @@ impl Service {
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
remaining.len()
);
let target_sk_count = timeline_persistence.sk_set.len();
let quorum_size = match target_sk_count {
0 => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"timeline configured without any safekeepers",
)));
}
1 | 2 => {
#[cfg(feature = "testing")]
{
// In test settings, it is allowed to have one or two safekeepers
target_sk_count
}
#[cfg(not(feature = "testing"))]
{
// The region is misconfigured: we need at least three safekeepers to be configured
// in order to schedule work to them
tracing::warn!(
"couldn't find at least 3 safekeepers for timeline, found: {:?}",
timeline_persistence.sk_set
);
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find at least 3 safekeepers to put timeline to"
)));
}
}
_ => target_sk_count / 2 + 1,
};
let success_count = target_sk_count - remaining.len();
if success_count < quorum_size {
if remaining.len() >= 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
"not enough successful reconciliations to reach quorum, please retry: {} errored",
remaining.len()
)));
}
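The removed branch sized the write quorum as a strict majority of the configured safekeeper set (with test-only allowances for one- and two-node sets); the replacement simply rejects when two or more reconciliations failed. The majority rule itself is one line:

/// Majority quorum for a replica set: 3 -> 2, 5 -> 3; None for an empty set.
fn quorum_size(replicas: usize) -> Option<usize> {
    (replicas > 0).then(|| replicas / 2 + 1)
}

fn main() {
    assert_eq!(quorum_size(3), Some(2));
    assert_eq!(quorum_size(5), Some(3));
    assert_eq!(quorum_size(0), None);
}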
@@ -207,6 +179,7 @@ impl Service {
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000;
@@ -216,8 +189,15 @@ impl Service {
// previously existed, as on retries the endpoint might in theory have
// already written some data and advanced last_record_lsn, while we want
// safekeepers to have a consistent start_lsn.
let start_lsn = timeline_info.last_record_lsn;
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
@@ -512,6 +492,8 @@ impl Service {
pub(crate) async fn safekeepers_for_new_timeline(
&self,
) -> Result<Vec<SafekeeperInfo>, ApiError> {
// Number of safekeepers in different AZs we are looking for
let wanted_count = 3;
let mut all_safekeepers = {
let locked = self.inner.read().unwrap();
locked
@@ -550,19 +532,6 @@ impl Service {
sk.1.id.0,
)
});
// Number of safekeepers in different AZs we are looking for
let wanted_count = match all_safekeepers.len() {
0 => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find any active safekeeper for new timeline",
)));
}
// Have laxer requirements in testing mode, as we don't want to
// spin up three safekeepers for every single test
#[cfg(feature = "testing")]
1 | 2 => all_safekeepers.len(),
_ => 3,
};
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {

View File

@@ -1,260 +0,0 @@
use std::time::Duration;
use std::{collections::HashMap, str::FromStr};
use http_utils::error::ApiError;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use pageserver_api::models::ShardImportStatus;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
shard::ShardIndex,
};
use crate::{persistence::TimelineImportPersistence, service::Config};
#[derive(Serialize, Deserialize, Clone, Debug)]
pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
impl ShardImportStatuses {
pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
ShardImportStatuses(
shards
.into_iter()
.map(|ts_id| (ts_id, ShardImportStatus::InProgress))
.collect(),
)
}
}
#[derive(Debug)]
pub(crate) struct TimelineImport {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
pub(crate) shard_statuses: ShardImportStatuses,
}
pub(crate) enum TimelineImportUpdateFollowUp {
Persist,
None,
}
pub(crate) enum TimelineImportUpdateError {
ImportNotFound {
tenant_id: TenantId,
timeline_id: TimelineId,
},
MismatchedShards,
UnexpectedUpdate,
}
impl From<TimelineImportUpdateError> for ApiError {
fn from(err: TimelineImportUpdateError) -> ApiError {
match err {
TimelineImportUpdateError::ImportNotFound {
tenant_id,
timeline_id,
} => ApiError::NotFound(
anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
),
TimelineImportUpdateError::MismatchedShards => {
ApiError::InternalServerError(anyhow::anyhow!(
"Import shards do not match update request, likely a shard split happened during import, this is a bug"
))
}
TimelineImportUpdateError::UnexpectedUpdate => {
ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
}
}
}
}
impl TimelineImport {
pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
Ok(TimelineImport {
tenant_id,
timeline_id,
shard_statuses,
})
}
pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
TimelineImportPersistence {
tenant_id: self.tenant_id.to_string(),
timeline_id: self.timeline_id.to_string(),
shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
}
}
pub(crate) fn update(
&mut self,
shard: ShardIndex,
status: ShardImportStatus,
) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
use std::collections::hash_map::Entry::*;
match self.shard_statuses.0.entry(shard) {
Occupied(mut occ) => {
let crnt = occ.get_mut();
if *crnt == status {
Ok(TimelineImportUpdateFollowUp::None)
} else if crnt.is_terminal() && !status.is_terminal() {
Err(TimelineImportUpdateError::UnexpectedUpdate)
} else {
*crnt = status;
Ok(TimelineImportUpdateFollowUp::Persist)
}
}
Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
}
}
pub(crate) fn is_complete(&self) -> bool {
self.shard_statuses
.0
.values()
.all(|status| status.is_terminal())
}
pub(crate) fn completion_error(&self) -> Option<String> {
assert!(self.is_complete());
let shard_errors: HashMap<_, _> = self
.shard_statuses
.0
.iter()
.filter_map(|(shard, status)| {
if let ShardImportStatus::Error(err) = status {
Some((*shard, err.clone()))
} else {
None
}
})
.collect();
if shard_errors.is_empty() {
None
} else {
Some(serde_json::to_string(&shard_errors).unwrap())
}
}
}
pub(crate) struct UpcallClient {
authorization_header: Option<String>,
client: reqwest::Client,
cancel: CancellationToken,
base_url: String,
}
const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Serialize, Deserialize, Debug)]
struct ImportCompleteRequest {
tenant_id: TenantId,
timeline_id: TimelineId,
error: Option<String>,
}
impl UpcallClient {
pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
let authorization_header = config
.control_plane_jwt_token
.clone()
.map(|jwt| format!("Bearer {}", jwt));
let client = reqwest::ClientBuilder::new()
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
.build()
.expect("Failed to construct HTTP client");
let base_url = config
.control_plane_url
.clone()
.expect("must be configured");
Self {
authorization_header,
client,
cancel,
base_url,
}
}
/// Notify control plane of a completed import
///
/// This method guarantees at-least-once delivery semantics, assuming
/// eventual cplane availability. The cplane API is idempotent.
pub(crate) async fn notify_import_complete(
&self,
import: &TimelineImport,
) -> anyhow::Result<()> {
let endpoint = if self.base_url.ends_with('/') {
format!("{}import_complete", self.base_url)
} else {
format!("{}/import_complete", self.base_url)
};
tracing::info!("Endpoint is {endpoint}");
let request = self
.client
.request(Method::PUT, endpoint)
.json(&ImportCompleteRequest {
tenant_id: import.tenant_id,
timeline_id: import.timeline_id,
error: import.completion_error(),
})
.timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
let request = if let Some(auth) = &self.authorization_header {
request.header(reqwest::header::AUTHORIZATION, auth)
} else {
request
};
const RETRY_DELAY: Duration = Duration::from_secs(1);
let mut attempt = 1;
loop {
if self.cancel.is_cancelled() {
return Err(anyhow::anyhow!(
"Shutting down while notifying cplane of import completion"
));
}
match request.try_clone().unwrap().send().await {
Ok(response) if response.status().is_success() => {
return Ok(());
}
Ok(response) => {
tracing::warn!(
"Import complete notification failed with status {}, attempt {}",
response.status(),
attempt
);
}
Err(e) => {
tracing::warn!(
"Import complete notification failed with error: {}, attempt {}",
e,
attempt
);
}
}
tokio::select! {
_ = tokio::time::sleep(RETRY_DELAY) => {}
_ = self.cancel.cancelled() => {
return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
}
}
attempt += 1;
}
}
}
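The deleted `notify_import_complete` loop is a common shape: retry forever with a fixed delay, and bail out promptly when the cancellation token fires, via `tokio::select!` over the sleep and the token. Generalized (a sketch assuming tokio and tokio-util, not the storcon code):

use std::future::Future;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// Retry `op` until it succeeds or `cancel` fires; None means cancelled.
async fn retry_until_ok<F, Fut, T, E>(
    mut op: F,
    delay: Duration,
    cancel: &CancellationToken,
) -> Option<T>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    loop {
        if cancel.is_cancelled() {
            return None;
        }
        if let Ok(v) = op().await {
            return Some(v);
        }
        tokio::select! {
            _ = tokio::time::sleep(delay) => {} // back off, then retry
            _ = cancel.cancelled() => return None,
        }
    }
}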

View File

@@ -5,6 +5,8 @@ edition = "2024"
license.workspace = true
[dependencies]
aws-config.workspace = true
aws-sdk-s3.workspace = true
either.workspace = true
anyhow.workspace = true
hex.workspace = true

View File

@@ -12,9 +12,14 @@ pub mod tenant_snapshot;
use std::env;
use std::fmt::Display;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use anyhow::Context;
use aws_config::retry::{RetryConfigBuilder, RetryMode};
use aws_sdk_s3::Client;
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use futures::{Stream, StreamExt};
@@ -23,7 +28,7 @@ use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_time
use pageserver_api::shard::TenantShardId;
use remote_storage::{
DownloadOpts, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig,
RemoteStorageKind, VersionId,
RemoteStorageKind, S3Config,
};
use reqwest::Url;
use serde::{Deserialize, Serialize};
@@ -266,6 +271,7 @@ impl BucketConfig {
"container {}, storage account {:?}, region {}",
config.container_name, config.storage_account, config.container_region
),
RemoteStorageKind::GCS(config) => format!("bucket {}", config.bucket_name),
}
}
pub fn bucket_name(&self) -> Option<&str> {
@@ -346,6 +352,21 @@ pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
}
}
async fn init_s3_client(bucket_region: Region) -> Client {
let mut retry_config_builder = RetryConfigBuilder::new();
retry_config_builder
.set_max_attempts(Some(3))
.set_mode(Some(RetryMode::Adaptive));
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
.region(bucket_region)
.retry_config(retry_config_builder.build())
.load()
.await;
Client::new(&config)
}
fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
match node_kind {
NodeKind::Pageserver => "pageserver/v1/",
@@ -365,6 +386,23 @@ fn make_root_target(desc_str: String, prefix_in_bucket: String, node_kind: NodeK
}
}
async fn init_remote_s3(
bucket_config: S3Config,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
let bucket_region = Region::new(bucket_config.bucket_region);
let s3_client = Arc::new(init_s3_client(bucket_region).await);
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
let s3_root = make_root_target(
bucket_config.bucket_name,
bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
node_kind,
);
Ok((s3_client, s3_root))
}
async fn init_remote(
mut storage_config: BucketConfig,
node_kind: NodeKind,
@@ -381,6 +419,9 @@ async fn init_remote(
config.prefix_in_container.get_or_insert(default_prefix);
}
RemoteStorageKind::LocalFs { .. } => (),
RemoteStorageKind::GCS(config) => {
config.prefix_in_bucket.get_or_insert(default_prefix);
}
}
// We already pass the prefix to the remote client above
@@ -462,7 +503,7 @@ async fn list_objects_with_retries(
remote_client.bucket_name().unwrap_or_default(),
s3_target.prefix_in_bucket,
s3_target.delimiter,
e,
DisplayErrorContext(e),
);
let backoff_time = 1 << trial.min(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
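The `1 << trial.min(5)` above is a capped exponential backoff; spelled out:

/// Seconds to wait after the given attempt: 1, 2, 4, 8, 16, then 32 forever.
fn backoff_secs(attempt: u32) -> u64 {
    1u64 << attempt.min(5)
}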
@@ -512,18 +553,14 @@ async fn download_object_with_retries(
anyhow::bail!("Failed to download objects with key {key} {MAX_RETRIES} times")
}
async fn download_object_to_file(
remote_storage: &GenericRemoteStorage,
key: &RemotePath,
version_id: Option<VersionId>,
async fn download_object_to_file_s3(
s3_client: &Client,
bucket_name: &str,
key: &str,
version_id: Option<&str>,
local_path: &Utf8Path,
) -> anyhow::Result<()> {
let opts = DownloadOpts {
version_id: version_id.clone(),
..Default::default()
};
let tmp_path = Utf8PathBuf::from(format!("{local_path}.tmp"));
let cancel = CancellationToken::new();
for _ in 0..MAX_RETRIES {
tokio::fs::remove_file(&tmp_path)
.await
@@ -533,24 +570,28 @@ async fn download_object_to_file(
.await
.context("Opening output file")?;
let res = remote_storage.download(key, &opts, &cancel).await;
let request = s3_client.get_object().bucket(bucket_name).key(key);
let download = match res {
let request = match version_id {
Some(version_id) => request.version_id(version_id),
None => request,
};
let response_stream = match request.send().await {
Ok(response) => response,
Err(e) => {
error!(
"Failed to download object for key {key} version {:?}: {e:#}",
&version_id.as_ref().unwrap_or(&VersionId(String::new()))
"Failed to download object for key {key} version {}: {e:#}",
version_id.unwrap_or("")
);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let mut read_stream = response_stream.body.into_async_read();
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
tokio::io::copy(&mut body, &mut file).await?;
tokio::io::copy(&mut read_stream, &mut file).await?;
tokio::fs::rename(&tmp_path, local_path).await?;
return Ok(());

View File

@@ -1,30 +1,31 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Context;
use async_stream::stream;
use aws_sdk_s3::Client;
use camino::Utf8PathBuf;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::IndexPart;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::remote_layer_path;
use pageserver::tenant::storage_layer::LayerName;
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use remote_storage::{GenericRemoteStorage, S3Config};
use utils::generation::Generation;
use utils::id::TenantId;
use crate::checks::{BlobDataParseResult, RemoteTimelineBlobData, list_timeline_blobs};
use crate::metadata_stream::{stream_tenant_shards, stream_tenant_timelines};
use crate::{
BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file, init_remote,
BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, download_object_to_file_s3,
init_remote, init_remote_s3,
};
pub struct SnapshotDownloader {
remote_client: GenericRemoteStorage,
#[allow(dead_code)]
target: RootTarget,
s3_client: Arc<Client>,
s3_root: RootTarget,
bucket_config: BucketConfig,
bucket_config_s3: S3Config,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
@@ -37,13 +38,17 @@ impl SnapshotDownloader {
output_path: Utf8PathBuf,
concurrency: usize,
) -> anyhow::Result<Self> {
let (remote_client, target) =
init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let bucket_config_s3 = match &bucket_config.0.storage {
remote_storage::RemoteStorageKind::AwsS3(config) => config.clone(),
_ => panic!("only S3 configuration is supported for snapshot downloading"),
};
let (s3_client, s3_root) =
init_remote_s3(bucket_config_s3.clone(), NodeKind::Pageserver).await?;
Ok(Self {
remote_client,
target,
s3_client,
s3_root,
bucket_config,
bucket_config_s3,
tenant_id,
output_path,
concurrency,
@@ -56,7 +61,6 @@ impl SnapshotDownloader {
layer_name: LayerName,
layer_metadata: LayerFileMetadata,
) -> anyhow::Result<(LayerName, LayerFileMetadata)> {
let cancel = CancellationToken::new();
// Note this is local as in a local copy of S3 data, not local as in the pageserver's local format. They use
// different layer names (remote-style has the generation suffix)
let local_path = self.output_path.join(format!(
@@ -78,27 +82,30 @@ impl SnapshotDownloader {
} else {
tracing::debug!("{} requires download...", local_path);
let remote_path = remote_layer_path(
&ttid.tenant_shard_id.tenant_id,
&ttid.timeline_id,
layer_metadata.shard,
&layer_name,
layer_metadata.generation,
let timeline_root = self.s3_root.timeline_root(&ttid);
let remote_layer_path = format!(
"{}{}{}",
timeline_root.prefix_in_bucket,
layer_name,
layer_metadata.generation.get_suffix()
);
let mode = remote_storage::ListingMode::NoDelimiter;
// List versions: the object might be deleted.
let versions = self
.remote_client
.list_versions(Some(&remote_path), mode, None, &cancel)
.s3_client
.list_object_versions()
.bucket(self.bucket_config_s3.bucket_name.clone())
.prefix(&remote_layer_path)
.send()
.await?;
let Some(version) = versions.versions.first() else {
return Err(anyhow::anyhow!("No versions found for {remote_path}"));
let Some(version) = versions.versions.as_ref().and_then(|v| v.first()) else {
return Err(anyhow::anyhow!("No versions found for {remote_layer_path}"));
};
download_object_to_file(
&self.remote_client,
&remote_path,
version.version_id().cloned(),
download_object_to_file_s3(
&self.s3_client,
&self.bucket_config_s3.bucket_name,
&remote_layer_path,
version.version_id.as_deref(),
&local_path,
)
.await?;

View File

@@ -417,14 +417,14 @@ class NeonLocalCli(AbstractNeonCli):
cmd.append(f"--instance-id={instance_id}")
return self.raw_cli(cmd)
def endpoint_storage_start(self, timeout_in_seconds: int | None = None):
cmd = ["endpoint-storage", "start"]
def object_storage_start(self, timeout_in_seconds: int | None = None):
cmd = ["object-storage", "start"]
if timeout_in_seconds is not None:
cmd.append(f"--start-timeout={timeout_in_seconds}s")
return self.raw_cli(cmd)
def endpoint_storage_stop(self, immediate: bool):
cmd = ["endpoint-storage", "stop"]
def object_storage_stop(self, immediate: bool):
cmd = ["object-storage", "stop"]
if immediate:
cmd.extend(["-m", "immediate"])
return self.raw_cli(cmd)

View File

@@ -1029,7 +1029,7 @@ class NeonEnvBuilder:
self.env.broker.assert_no_errors()
self.env.endpoint_storage.assert_no_errors()
self.env.object_storage.assert_no_errors()
try:
self.overlay_cleanup_teardown()
@@ -1126,7 +1126,7 @@ class NeonEnv:
pagectl_env_vars["RUST_LOG"] = self.rust_log_override
self.pagectl = Pagectl(extra_env=pagectl_env_vars, binpath=self.neon_binpath)
self.endpoint_storage = EndpointStorage(self)
self.object_storage = ObjectStorage(self)
# The URL for the pageserver to use as its control_plane_api config
if config.storage_controller_port_override is not None:
@@ -1183,7 +1183,7 @@ class NeonEnv:
},
"safekeepers": [],
"pageservers": [],
"endpoint_storage": {"port": self.port_distributor.get_port()},
"object_storage": {"port": self.port_distributor.get_port()},
"generate_local_ssl_certs": self.generate_local_ssl_certs,
}
@@ -1420,7 +1420,7 @@ class NeonEnv:
self.storage_controller.on_safekeeper_deploy(sk_id, body)
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
self.object_storage.start(timeout_in_seconds=timeout_in_seconds)
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
"""
@@ -1439,7 +1439,7 @@ class NeonEnv:
except Exception as e:
raise_later = e
self.endpoint_storage.stop(immediate=immediate)
self.object_storage.stop(immediate=immediate)
# Stop storage controller before pageservers: we don't want it to spuriously
# detect a pageserver "failure" during test teardown
@@ -2660,24 +2660,24 @@ class NeonStorageController(MetricsGetter, LogUtils):
self.stop(immediate=True)
class EndpointStorage(LogUtils):
class ObjectStorage(LogUtils):
def __init__(self, env: NeonEnv):
service_dir = env.repo_dir / "endpoint_storage"
super().__init__(logfile=service_dir / "endpoint_storage.log")
self.conf_path = service_dir / "endpoint_storage.json"
service_dir = env.repo_dir / "object_storage"
super().__init__(logfile=service_dir / "object_storage.log")
self.conf_path = service_dir / "object_storage.json"
self.env = env
def base_url(self):
return json.loads(self.conf_path.read_text())["listen"]
def start(self, timeout_in_seconds: int | None = None):
self.env.neon_cli.endpoint_storage_start(timeout_in_seconds)
self.env.neon_cli.object_storage_start(timeout_in_seconds)
def stop(self, immediate: bool = False):
self.env.neon_cli.endpoint_storage_stop(immediate)
self.env.neon_cli.object_storage_stop(immediate)
def assert_no_errors(self):
assert_no_errors(self.logfile, "endpoint_storage", [])
assert_no_errors(self.logfile, "object_storage", [])
class NeonProxiedStorageController(NeonStorageController):

View File

@@ -65,7 +65,7 @@ def test_ro_replica_lag(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
error_occurred = False
try:
@@ -198,7 +198,7 @@ def test_replication_start_stop(
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
log.info("Project ID: %s", project_id)
log.info("Primary endpoint ID: %s", project["endpoints"][0]["id"])
log.info("Primary endpoint ID: %s", project["project"]["endpoints"][0]["id"])
neon_api.wait_for_operation_to_finish(project_id)
try:
branch_id = project["branch"]["id"]

View File

@@ -1,70 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fixtures.metrics import parse_metrics
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
def test_compute_monitor(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can detect Postgres going down (unresponsive) and
reconnect when it comes back online. Also check that the downtime metrics
are properly emitted.
"""
TEST_DB = "test_compute_monitor"
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
# Check that default postgres database is present
with endpoint.cursor() as cursor:
cursor.execute("SELECT datname FROM pg_database WHERE datname = 'postgres'")
catalog_db = cursor.fetchone()
assert catalog_db is not None
assert len(catalog_db) == 1
# Create a new database
cursor.execute(f"CREATE DATABASE {TEST_DB}")
# Drop database 'postgres'
with endpoint.cursor(dbname=TEST_DB) as cursor:
# Use FORCE to terminate all connections to the database
cursor.execute("DROP DATABASE postgres WITH (FORCE)")
client = endpoint.http_client()
def check_metrics_down():
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
assert len(compute_pg_current_downtime_ms) == 1
assert compute_pg_current_downtime_ms[0].value > 0
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
assert len(compute_pg_downtime_ms_total) == 1
assert compute_pg_downtime_ms_total[0].value > 0
wait_until(check_metrics_down)
# Recreate postgres database
with endpoint.cursor(dbname=TEST_DB) as cursor:
cursor.execute("CREATE DATABASE postgres")
# Current downtime should reset to 0, but not total downtime
def check_metrics_up():
raw_metrics = client.metrics()
metrics = parse_metrics(raw_metrics)
compute_pg_current_downtime_ms = metrics.query_all("compute_pg_current_downtime_ms")
assert len(compute_pg_current_downtime_ms) == 1
assert compute_pg_current_downtime_ms[0].value == 0
compute_pg_downtime_ms_total = metrics.query_all("compute_pg_downtime_ms_total")
assert len(compute_pg_downtime_ms_total) == 1
assert compute_pg_downtime_ms_total[0].value > 0
wait_until(check_metrics_up)
# Just a sanity check that we log the downtime info
endpoint.log_contains("downtime_info")

View File

@@ -1,7 +1,6 @@
from __future__ import annotations
import os
import platform
import shutil
import tarfile
from typing import TYPE_CHECKING
@@ -59,18 +58,7 @@ def test_remote_extensions(
extensions_endpoint = f"http://{host}:{port}/pg-ext-s3-gateway"
build_tag = os.environ.get("BUILD_TAG", "latest")
# We have decided to use the Go naming convention due to Kubernetes.
arch = platform.machine()
match arch:
case "aarch64":
arch = "arm64"
case "x86_64":
arch = "amd64"
case _:
pass
archive_route = f"{build_tag}/{arch}/v{pg_version}/extensions/test_extension.tar.zst"
archive_route = f"{build_tag}/v{pg_version}/extensions/test_extension.tar.zst"
tarball = test_output_dir / "test_extension.tar"
extension_dir = (
base_dir / "test_runner" / "regress" / "data" / "test_remote_extensions" / "test_extension"

View File

@@ -1,9 +1,9 @@
import base64
import json
import re
import time
from enum import Enum
from pathlib import Path
from threading import Event
import psycopg2
import psycopg2.errors
@@ -14,11 +14,12 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
PageserverApiException,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server, RemoteStorageKind
from fixtures.utils import shared_buffers_for_max_cu, skip_in_debug_build, wait_until
from fixtures.utils import shared_buffers_for_max_cu
from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client
@@ -43,7 +44,6 @@ smoke_params = [
]
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke(
vanilla_pg: VanillaPostgres,
@@ -56,29 +56,24 @@ def test_pgdata_import_smoke(
#
# Setup fake control plane for import progress
#
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
log.info(f"control plane request: {request.json}")
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
env = neon_env_builder.init_start()
# The test needs LocalFs support, which is only built in testing mode.
env.pageserver.is_testing_enabled_or_skip()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api"
}
)
env.pageserver.stop()
env.pageserver.start()
@@ -198,11 +193,40 @@ def test_pgdata_import_smoke(
)
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
def cplane_notified():
assert import_completion_signaled.is_set()
while True:
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
# Generous timeout for the MULTIPLE_RELATION_SEGMENTS test variants
wait_until(cplane_notified, timeout=90)
shard_status_file = statusdir / f"shard-{shard_id.shard_index}"
if state == "Active":
shard_status_file_contents = (
shard_status_file.read_text()
) # Active state implies import is done
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(1)
import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s")
@@ -348,27 +372,19 @@ def test_fast_import_with_pageserver_ingest(
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
# Setup pageserver and fake cplane for import progress
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
log.info(f"control plane request: {request.json}")
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api",
# because import_pgdata code uses this endpoint, not the one in the common remote storage config
# TODO: maybe use common remote_storage config in pageserver?
"import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(),
@@ -460,10 +476,42 @@ def test_fast_import_with_pageserver_ingest(
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
validate_vanilla_equivalence(conn)
def cplane_notified():
assert import_completion_signaled.is_set()
# Poll pageserver statuses in s3
while True:
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
log.info(f"timeline {tenant_id}/{timeline_id} detail: {detail}")
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
wait_until(cplane_notified, timeout=60)
if state == "Active":
key = f"{key_prefix}/status/shard-{shard_id.shard_index}"
shard_status_file_contents = (
mock_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
)
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(0.5)
import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s")

View File

@@ -138,7 +138,7 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
env.neon_cli.pageserver_stop(env.pageserver.id)
env.neon_cli.safekeeper_stop()
env.neon_cli.storage_controller_stop(False)
env.neon_cli.endpoint_storage_stop(False)
env.neon_cli.object_storage_stop(False)
env.neon_cli.storage_broker_stop()
# Keep NeonEnv state up to date, it usually owns starting/stopping services
@@ -185,7 +185,7 @@ def test_cli_start_stop_multi(neon_env_builder: NeonEnvBuilder):
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 1)
env.neon_cli.safekeeper_stop(neon_env_builder.safekeepers_id_start + 2)
env.neon_cli.endpoint_storage_stop(False)
env.neon_cli.object_storage_stop(False)
# Stop this to get out of the way of the following `start`
env.neon_cli.storage_controller_stop(False)

View File

@@ -8,7 +8,7 @@ from jwcrypto import jwk, jwt
@pytest.mark.asyncio
async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
async def test_object_storage_insert_retrieve_delete(neon_simple_env: NeonEnv):
"""
Inserts, retrieves, and deletes test file using a JWT token
"""
@@ -31,7 +31,7 @@ async def test_endpoint_storage_insert_retrieve_delete(neon_simple_env: NeonEnv)
token.make_signed_token(key)
token = token.serialize()
base_url = env.endpoint_storage.base_url()
base_url = env.object_storage.base_url()
key = f"http://{base_url}/{tenant_id}/{timeline_id}/{endpoint_id}/key"
headers = {"Authorization": f"Bearer {token}"}
log.info(f"cache key url {key}")

View File

@@ -95,7 +95,7 @@ def test_storage_controller_smoke(
env.pageservers[1].start()
for sk in env.safekeepers:
sk.start()
env.endpoint_storage.start()
env.object_storage.start()
# The pageservers we started should have registered with the sharding service on startup
nodes = env.storage_controller.node_list()
@@ -347,7 +347,7 @@ def prepare_onboarding_env(
env = neon_env_builder.init_configs()
env.broker.start()
env.storage_controller.start()
env.endpoint_storage.start()
env.object_storage.start()
# This is the pageserver where we'll initially create the tenant. Run it in emergency
# mode so that it doesn't talk to the storage controller, and do not register it.
@@ -1612,18 +1612,16 @@ def test_storage_controller_heartbeats(
env = neon_env_builder.init_configs()
env.start()
env.storage_controller.allowed_errors.extend(
[
# Default log allow list permits connection errors, but this test will use error responses on
# the utilization endpoint.
".*Call to node.*management API.*failed.*failpoint.*",
# The server starts listening to the socket before sending re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
".*Call to node.*management API.*failed.* Timeout.*",
# We will intentionally cause reconcile errors
".*Reconcile error.*",
]
# Default log allow list permits connection errors, but this test will use error responses on
# the utilization endpoint.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.*failpoint.*"
)
# The server starts listening to the socket before sending re-attach request,
# but it starts serving HTTP only when re-attach is completed.
# If re-attach is slow (last scenario), storcon's heartbeat requests will time out.
env.storage_controller.allowed_errors.append(
".*Call to node.*management API.*failed.* Timeout.*"
)
# Initially we have two online pageservers
@@ -4242,63 +4240,6 @@ def test_storcon_create_delete_sk_down(
wait_until(timeline_deleted_on_sk)
@run_only_on_default_postgres("PG version is not interesting here")
@pytest.mark.parametrize("num_safekeepers", [1, 2, 3])
@pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE])
def test_storcon_few_sk(
neon_env_builder: NeonEnvBuilder,
num_safekeepers: int,
deletetion_subject: DeletionSubject,
):
"""
Test that the storcon can create and delete tenants and timelines with a limited/special number of safekeepers
- num_safekeepers: number of safekeepers.
- deletion_subject: test that both single timeline and whole tenant deletion work.
"""
neon_env_builder.num_safekeepers = num_safekeepers
safekeeper_list = list(range(1, num_safekeepers + 1))
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
}
env = neon_env_builder.init_start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id)
child_timeline_id = env.create_branch("child_of_main", tenant_id)
env.safekeepers[0].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
if deletion_subject is DeletionSubject.TENANT:
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
else:
env.storage_controller.pageserver_api().timeline_delete(tenant_id, child_timeline_id)
# ensure that there are log messages for the third safekeeper too
def timeline_deleted_on_sk():
env.safekeepers[0].assert_log_contains(
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_sk)
@pytest.mark.parametrize("wrong_az", [True, False])
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
"""

View File

@@ -77,8 +77,6 @@ regex-automata = { version = "0.4", default-features = false, features = ["dfa-o
regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
rustls-pki-types = { version = "1", features = ["std"] }
rustls-webpki = { version = "0.102", default-features = false, features = ["ring", "std"] }
scopeguard = { version = "1" }
sec1 = { version = "0.7", features = ["pem", "serde", "std", "subtle"] }
serde = { version = "1", features = ["alloc", "derive"] }
@@ -105,6 +103,7 @@ tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4", "v7"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zeroize = { version = "1", features = ["derive", "serde"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
@@ -147,6 +146,7 @@ serde = { version = "1", features = ["alloc", "derive"] }
syn = { version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
toml_edit = { version = "0.22", features = ["serde"] }
zerocopy = { version = "0.7", features = ["derive", "simd"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }