mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
Compare commits
25 Commits
problame/w
...
problame/n
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2978c839c6 | ||
|
|
1fdde9e41e | ||
|
|
3c71003b7d | ||
|
|
a36be87680 | ||
|
|
39490ddd6c | ||
|
|
2b581eefa7 | ||
|
|
cedc0376ff | ||
|
|
12e9b2a909 | ||
|
|
918b03b3b0 | ||
|
|
b2ec54b8ac | ||
|
|
193ba2384a | ||
|
|
bcb8bed875 | ||
|
|
d36623ad74 | ||
|
|
23e36ae6a3 | ||
|
|
194981c16b | ||
|
|
415b489f18 | ||
|
|
689ad72e92 | ||
|
|
fd4cce9417 | ||
|
|
d52b81340f | ||
|
|
8dee9908f8 | ||
|
|
407c78cfaf | ||
|
|
800d3d1cee | ||
|
|
4204a7dd59 | ||
|
|
b602063f7f | ||
|
|
bcfd333c98 |
47
.github/workflows/build_and_test.yml
vendored
47
.github/workflows/build_and_test.yml
vendored
@@ -21,6 +21,9 @@ env:
|
||||
COPT: '-Werror'
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
|
||||
NEXTEST_RETRIES: 3
|
||||
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
|
||||
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
|
||||
|
||||
jobs:
|
||||
check-permissions:
|
||||
@@ -44,6 +47,20 @@ jobs:
|
||||
|
||||
exit 1
|
||||
|
||||
cancel-previous-e2e-tests:
|
||||
needs: [ check-permissions ]
|
||||
if: github.event_name == 'pull_request'
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Cancel previous e2e-tests runs for this PR
|
||||
env:
|
||||
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
|
||||
run: |
|
||||
gh workflow --repo neondatabase/cloud \
|
||||
run cancel-previous-in-concurrency-group.yml \
|
||||
--field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
|
||||
|
||||
tag:
|
||||
needs: [ check-permissions ]
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
@@ -186,7 +203,11 @@ jobs:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
|
||||
options: --init
|
||||
# Raise locked memory limit for tokio-epoll-uring.
|
||||
# On 5.10 LTS kernels < 5.10.162 (and generally mainline kernels < 5.12),
|
||||
# io_uring will account the memory of the CQ and SQ as locked.
|
||||
# More details: https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391
|
||||
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -341,7 +362,9 @@ jobs:
|
||||
|
||||
- name: Run rust tests
|
||||
run: |
|
||||
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
|
||||
for io_engine in std-fs tokio-epoll-uring ; do
|
||||
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
|
||||
done
|
||||
|
||||
# Run separate tests for real S3
|
||||
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
|
||||
@@ -419,8 +442,8 @@ jobs:
|
||||
runs-on: [ self-hosted, gen3, large ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
|
||||
# Default shared memory is 64mb
|
||||
options: --init --shm-size=512mb
|
||||
# for changed limits, see comments on `options:` earlier in this file
|
||||
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864 --cgroupns=private --security-opt umask=/sys/fs/cgroup
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
@@ -433,6 +456,9 @@ jobs:
|
||||
submodules: true
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Setup cgroup for use by test suite
|
||||
run: sudo bash -x /setup_neon_testsuite_cgroup.bash
|
||||
|
||||
- name: Pytest regression tests
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
@@ -448,6 +474,8 @@ jobs:
|
||||
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
|
||||
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
|
||||
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
NEON_TEST_SUITE_USE_CGROUPS: /sys/fs/cgroup/neon_testsuite
|
||||
|
||||
- name: Merge and upload coverage data
|
||||
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
|
||||
@@ -458,12 +486,13 @@ jobs:
|
||||
runs-on: [ self-hosted, gen3, small ]
|
||||
container:
|
||||
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
|
||||
# Default shared memory is 64mb
|
||||
options: --init --shm-size=512mb
|
||||
# for changed limits, see comments on `options:` earlier in this file
|
||||
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
|
||||
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
# the amount of groups (N) should be reflected in `extra_params: --splits N ...`
|
||||
pytest_split_group: [ 1, 2, 3, 4 ]
|
||||
build_type: [ release ]
|
||||
steps:
|
||||
@@ -477,11 +506,12 @@ jobs:
|
||||
test_selection: performance
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ github.ref_name == 'main' }}
|
||||
extra_params: --splits ${{ strategy.job-total }} --group ${{ matrix.pytest_split_group }}
|
||||
extra_params: --splits 4 --group ${{ matrix.pytest_split_group }}
|
||||
env:
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
|
||||
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
|
||||
# XXX: no coverage data handling here, since benchmarks are run on release builds,
|
||||
# while coverage is currently collected for the debug ones
|
||||
|
||||
@@ -695,7 +725,8 @@ jobs:
|
||||
\"commit_hash\": \"$COMMIT_SHA\",
|
||||
\"remote_repo\": \"${{ github.repository }}\",
|
||||
\"storage_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
|
||||
\"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\"
|
||||
\"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
|
||||
\"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\"
|
||||
}
|
||||
}"
|
||||
|
||||
|
||||
44
Cargo.lock
generated
44
Cargo.lock
generated
@@ -1342,6 +1342,7 @@ dependencies = [
|
||||
"regex",
|
||||
"reqwest",
|
||||
"safekeeper_api",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
@@ -2563,6 +2564,16 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "io-uring"
|
||||
version = "0.6.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.9.0"
|
||||
@@ -3361,6 +3372,7 @@ dependencies = [
|
||||
"tenant_size_model",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-epoll-uring",
|
||||
"tokio-io-timeout",
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
@@ -5382,18 +5394,18 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.40"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
|
||||
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "1.0.40"
|
||||
version = "1.0.47"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
|
||||
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -5517,6 +5529,21 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-epoll-uring"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0dd3a2f8bf3239d34a19719ef1a74146c093126f"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"once_cell",
|
||||
"scopeguard",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"uring-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-io-timeout"
|
||||
version = "1.2.0"
|
||||
@@ -6026,6 +6053,15 @@ dependencies = [
|
||||
"webpki-roots 0.23.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "uring-common"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0dd3a2f8bf3239d34a19719ef1a74146c093126f"
|
||||
dependencies = [
|
||||
"io-uring",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "url"
|
||||
version = "2.3.1"
|
||||
|
||||
@@ -151,6 +151,7 @@ test-context = "0.1"
|
||||
thiserror = "1.0"
|
||||
tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] }
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
tokio-postgres-rustls = "0.10.0"
|
||||
tokio-rustls = "0.24"
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
FROM debian:bullseye-slim
|
||||
|
||||
# Add nonroot user
|
||||
RUN useradd -ms /bin/bash nonroot -b /home
|
||||
SHELL ["/bin/bash", "-c"]
|
||||
|
||||
# System deps
|
||||
RUN set -e \
|
||||
@@ -43,6 +40,7 @@ RUN set -e \
|
||||
openssh-client \
|
||||
parallel \
|
||||
pkg-config \
|
||||
sudo \
|
||||
unzip \
|
||||
wget \
|
||||
xz-utils \
|
||||
@@ -50,6 +48,17 @@ RUN set -e \
|
||||
zstd \
|
||||
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
|
||||
|
||||
# Add nonroot user
|
||||
RUN useradd -ms /bin/bash nonroot -b /home
|
||||
SHELL ["/bin/bash", "-c"]
|
||||
RUN echo "#!/usr/bin/env bash \
|
||||
set -exuo pipefail \
|
||||
mkdir /sys/fs/cgroup/neon_testsuite \
|
||||
chown -R nonroot:nonroot /sys/fs/cgroup/neon_testsuite \
|
||||
echo SUCCESS: cgroup set up for user nonroot at /sys/fs/cgroup/neon_testsuite \
|
||||
" > /setup_neon_testsuite_cgroup.bash && chmod +x /setup_neon_testsuite_cgroup.bash
|
||||
RUN echo "ALL ALL = (ALL) NOPASSWD: /setup_neon_testsuite_cgroup.bash" >> /etc/sudoers
|
||||
|
||||
# protobuf-compiler (protoc)
|
||||
ENV PROTOC_VERSION 25.1
|
||||
RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-$(uname -m | sed 's/aarch64/aarch_64/g').zip" -o "protoc.zip" \
|
||||
|
||||
@@ -144,30 +144,23 @@ RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouti
|
||||
FROM build-deps AS plv8-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
ARG PG_VERSION
|
||||
RUN apt update && \
|
||||
apt install -y ninja-build python3-dev libncurses5 binutils clang
|
||||
|
||||
RUN case "${PG_VERSION}" in \
|
||||
"v14" | "v15") \
|
||||
export PLV8_VERSION=3.1.5 \
|
||||
export PLV8_CHECKSUM=1e108d5df639e4c189e1c5bdfa2432a521c126ca89e7e5a969d46899ca7bf106 \
|
||||
;; \
|
||||
"v16") \
|
||||
export PLV8_VERSION=3.1.8 \
|
||||
export PLV8_CHECKSUM=92b10c7db39afdae97ff748c9ec54713826af222c459084ad002571b79eb3f49 \
|
||||
;; \
|
||||
*) \
|
||||
echo "Export the valid PG_VERSION variable" && exit 1 \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/plv8/plv8/archive/refs/tags/v${PLV8_VERSION}.tar.gz -O plv8.tar.gz && \
|
||||
echo "${PLV8_CHECKSUM} plv8.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.10.tar.gz -O plv8.tar.gz && \
|
||||
echo "7096c3290928561f0d4901b7a52794295dc47f6303102fae3f8e42dd575ad97d plv8.tar.gz" | sha256sum --check && \
|
||||
mkdir plv8-src && cd plv8-src && tar xvzf ../plv8.tar.gz --strip-components=1 -C . && \
|
||||
# generate and copy upgrade scripts
|
||||
mkdir -p upgrade && ./generate_upgrade.sh 3.1.10 && \
|
||||
cp upgrade/* /usr/local/pgsql/share/extension/ && \
|
||||
export PATH="/usr/local/pgsql/bin:$PATH" && \
|
||||
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
rm -rf /plv8-* && \
|
||||
find /usr/local/pgsql/ -name "plv8-*.so" | xargs strip && \
|
||||
# don't break computes with installed old version of plv8
|
||||
cd /usr/local/pgsql/lib/ && \
|
||||
ln -s plv8-3.1.10.so plv8-3.1.5.so && \
|
||||
ln -s plv8-3.1.10.so plv8-3.1.8.so && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plcoffee.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plls.control
|
||||
|
||||
@@ -19,6 +19,7 @@ hex.workspace = true
|
||||
hyper.workspace = true
|
||||
regex.workspace = true
|
||||
reqwest = { workspace = true, features = ["blocking", "json"] }
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_with.workspace = true
|
||||
|
||||
@@ -9,7 +9,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt;
|
||||
use postgres_backend::AuthType;
|
||||
use postgres_connection::parse_host_port;
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use std::{path::PathBuf, process::Child, str::FromStr};
|
||||
use std::{path::PathBuf, str::FromStr};
|
||||
use tracing::instrument;
|
||||
use utils::{
|
||||
auth::{Claims, Scope},
|
||||
@@ -220,7 +220,7 @@ impl AttachmentService {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> anyhow::Result<Child> {
|
||||
pub async fn start(&self) -> anyhow::Result<()> {
|
||||
let path_str = self.path.to_string_lossy();
|
||||
|
||||
let mut args = vec!["-l", &self.listen, "-p", &path_str]
|
||||
@@ -254,6 +254,7 @@ impl AttachmentService {
|
||||
)
|
||||
.await;
|
||||
|
||||
// TODO: shouldn't we bail if we fail to spawn the process?
|
||||
for ps_conf in &self.env.pageservers {
|
||||
let (pg_host, pg_port) =
|
||||
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::io::Write;
|
||||
use std::os::unix::prelude::AsRawFd;
|
||||
use std::os::unix::process::CommandExt;
|
||||
use std::path::Path;
|
||||
use std::process::{Child, Command};
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use std::{fs, io, thread};
|
||||
|
||||
@@ -60,7 +60,7 @@ pub async fn start_process<F, Fut, AI, A, EI>(
|
||||
envs: EI,
|
||||
initial_pid_file: InitialPidFile,
|
||||
process_status_check: F,
|
||||
) -> anyhow::Result<Child>
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
F: Fn() -> Fut,
|
||||
Fut: std::future::Future<Output = anyhow::Result<bool>>,
|
||||
@@ -98,7 +98,7 @@ where
|
||||
InitialPidFile::Expect(path) => path,
|
||||
};
|
||||
|
||||
let mut spawned_process = filled_cmd.spawn().with_context(|| {
|
||||
let spawned_process = filled_cmd.spawn().with_context(|| {
|
||||
format!("Could not spawn {process_name}, see console output and log files for details.")
|
||||
})?;
|
||||
let pid = spawned_process.id();
|
||||
@@ -106,12 +106,26 @@ where
|
||||
i32::try_from(pid)
|
||||
.with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
|
||||
);
|
||||
// set up a scopeguard to kill & wait for the child in case we panic or bail below
|
||||
let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| {
|
||||
println!("SIGKILL & wait the started process");
|
||||
(|| {
|
||||
// TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e..g, walredo).
|
||||
spawned_process.kill().context("SIGKILL child")?;
|
||||
spawned_process.wait().context("wait() for child process")?;
|
||||
anyhow::Ok(())
|
||||
})()
|
||||
.with_context(|| format!("scopeguard kill&wait child {process_name:?}"))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
for retries in 0..RETRIES {
|
||||
match process_started(pid, pid_file_to_check, &process_status_check).await {
|
||||
Ok(true) => {
|
||||
println!("\n{process_name} started, pid: {pid}");
|
||||
return Ok(spawned_process);
|
||||
println!("\n{process_name} started and passed status check, pid: {pid}");
|
||||
// leak the child process, it'll outlive this neon_local invocation
|
||||
drop(scopeguard::ScopeGuard::into_inner(spawned_process));
|
||||
return Ok(());
|
||||
}
|
||||
Ok(false) => {
|
||||
if retries == NOTICE_AFTER_RETRIES {
|
||||
@@ -126,16 +140,15 @@ where
|
||||
thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
|
||||
}
|
||||
Err(e) => {
|
||||
println!("{process_name} failed to start: {e:#}");
|
||||
if let Err(e) = spawned_process.kill() {
|
||||
println!("Could not stop {process_name} subprocess: {e:#}")
|
||||
};
|
||||
println!("error starting process {process_name:?}: {e:#}");
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
println!();
|
||||
anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds");
|
||||
anyhow::bail!(
|
||||
"{process_name} did not start+pass status checks within {RETRY_UNTIL_SECS} seconds"
|
||||
);
|
||||
}
|
||||
|
||||
/// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
|
||||
|
||||
@@ -438,7 +438,7 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
|
||||
// TODO use background_process::stop_process instead
|
||||
// TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
|
||||
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
|
||||
let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
|
||||
let pid = nix::unistd::Pid::from_raw(pid as i32);
|
||||
@@ -583,9 +583,21 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
let child = cmd.spawn()?;
|
||||
// set up a scopeguard to kill & wait for the child in case we panic or bail below
|
||||
let child = scopeguard::guard(child, |mut child| {
|
||||
println!("SIGKILL & wait the started process");
|
||||
(|| {
|
||||
// TODO: use another signal that can be caught by the child so it can clean up any children it spawned
|
||||
child.kill().context("SIGKILL child")?;
|
||||
child.wait().context("wait() for child process")?;
|
||||
anyhow::Ok(())
|
||||
})()
|
||||
.with_context(|| format!("scopeguard kill&wait child {child:?}"))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
// Write down the pid so we can wait for it when we want to stop
|
||||
// TODO use background_process::start_process instead
|
||||
// TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
|
||||
let pid = child.id();
|
||||
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
|
||||
std::fs::write(pidfile_path, pid.to_string())?;
|
||||
@@ -634,6 +646,9 @@ impl Endpoint {
|
||||
std::thread::sleep(ATTEMPT_INTERVAL);
|
||||
}
|
||||
|
||||
// disarm the scopeguard, let the child outlive this function (and neon_local invoction)
|
||||
drop(scopeguard::ScopeGuard::into_inner(child));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ use std::io;
|
||||
use std::io::Write;
|
||||
use std::num::NonZeroU64;
|
||||
use std::path::PathBuf;
|
||||
use std::process::{Child, Command};
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
@@ -161,7 +161,7 @@ impl PageServerNode {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
|
||||
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
|
||||
self.start_node(config_overrides, false).await
|
||||
}
|
||||
|
||||
@@ -207,7 +207,7 @@ impl PageServerNode {
|
||||
&self,
|
||||
config_overrides: &[&str],
|
||||
update_config: bool,
|
||||
) -> anyhow::Result<Child> {
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO: using a thread here because start_process() is not async but we need to call check_status()
|
||||
let datadir = self.repo_path();
|
||||
print!(
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
//! ```
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Child;
|
||||
use std::{io, result};
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -104,7 +103,7 @@ impl SafekeeperNode {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<Child> {
|
||||
pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<()> {
|
||||
print!(
|
||||
"Starting safekeeper at '{}' in '{}'",
|
||||
self.pg_connection_config.raw_address(),
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
|
||||
use anyhow::Result;
|
||||
@@ -23,6 +24,7 @@ use futures::stream::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use http_types::{StatusCode, Url};
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::s3_bucket::RequestKind;
|
||||
@@ -370,6 +372,20 @@ impl RemoteStorage for AzureBlobStorage {
|
||||
copy_status = status;
|
||||
}
|
||||
}
|
||||
|
||||
async fn time_travel_recover(
|
||||
&self,
|
||||
_prefix: Option<&RemotePath>,
|
||||
_timestamp: SystemTime,
|
||||
_done_if_after: SystemTime,
|
||||
_cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
// TODO use Azure point in time recovery feature for this
|
||||
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
|
||||
Err(anyhow::anyhow!(
|
||||
"time travel recovery for azure blob storage is not implemented"
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
|
||||
@@ -25,6 +25,7 @@ use bytes::Bytes;
|
||||
use futures::stream::Stream;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use toml_edit::Item;
|
||||
use tracing::info;
|
||||
|
||||
@@ -210,6 +211,15 @@ pub trait RemoteStorage: Send + Sync + 'static {
|
||||
|
||||
/// Copy a remote object inside a bucket from one path to another.
|
||||
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
|
||||
|
||||
/// Resets the content of everything with the given prefix to the given state
|
||||
async fn time_travel_recover(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
|
||||
@@ -387,6 +397,33 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
|
||||
Self::Unreliable(s) => s.copy(from, to).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn time_travel_recover(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::LocalFs(s) => {
|
||||
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
|
||||
.await
|
||||
}
|
||||
Self::AwsS3(s) => {
|
||||
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
|
||||
.await
|
||||
}
|
||||
Self::AzureBlob(s) => {
|
||||
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
|
||||
.await
|
||||
}
|
||||
Self::Unreliable(s) => {
|
||||
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GenericRemoteStorage {
|
||||
@@ -674,6 +711,7 @@ impl ConcurrencyLimiter {
|
||||
RequestKind::List => &self.read,
|
||||
RequestKind::Delete => &self.write,
|
||||
RequestKind::Copy => &self.write,
|
||||
RequestKind::TimeTravel => &self.write,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//! This storage used in tests, but can also be used in cases when a certain persistent
|
||||
//! volume is mounted to the local FS.
|
||||
|
||||
use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin};
|
||||
use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin, time::SystemTime};
|
||||
|
||||
use anyhow::{bail, ensure, Context};
|
||||
use bytes::Bytes;
|
||||
@@ -14,7 +14,7 @@ use tokio::{
|
||||
fs,
|
||||
io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
|
||||
};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tokio_util::{io::ReaderStream, sync::CancellationToken};
|
||||
use tracing::*;
|
||||
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
|
||||
|
||||
@@ -422,6 +422,17 @@ impl RemoteStorage for LocalFs {
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::diverging_sub_expression)]
|
||||
async fn time_travel_recover(
|
||||
&self,
|
||||
_prefix: Option<&RemotePath>,
|
||||
_timestamp: SystemTime,
|
||||
_done_if_after: SystemTime,
|
||||
_cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
|
||||
|
||||
@@ -6,12 +6,14 @@
|
||||
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
collections::HashMap,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
use anyhow::Context as _;
|
||||
use anyhow::{anyhow, Context as _};
|
||||
use aws_config::{
|
||||
environment::credentials::EnvironmentVariableCredentialsProvider,
|
||||
imds::credentials::ImdsCredentialsProvider,
|
||||
@@ -27,17 +29,19 @@ use aws_sdk_s3::{
|
||||
config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
|
||||
error::SdkError,
|
||||
operation::get_object::GetObjectError,
|
||||
types::{Delete, ObjectIdentifier},
|
||||
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion},
|
||||
Client,
|
||||
};
|
||||
use aws_smithy_async::rt::sleep::TokioSleep;
|
||||
|
||||
use aws_smithy_types::body::SdkBody;
|
||||
use aws_smithy_types::byte_stream::ByteStream;
|
||||
use aws_smithy_types::{body::SdkBody, DateTime};
|
||||
use bytes::Bytes;
|
||||
use futures::stream::Stream;
|
||||
use hyper::Body;
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff;
|
||||
|
||||
use super::StorageMetadata;
|
||||
use crate::{
|
||||
@@ -270,6 +274,59 @@ impl S3Bucket {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_oids(
|
||||
&self,
|
||||
kind: RequestKind,
|
||||
delete_objects: &[ObjectIdentifier],
|
||||
) -> anyhow::Result<()> {
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let resp = self
|
||||
.client
|
||||
.delete_objects()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.delete(
|
||||
Delete::builder()
|
||||
.set_objects(Some(chunk.to_vec()))
|
||||
.build()?,
|
||||
)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &resp, started_at);
|
||||
|
||||
let resp = resp?;
|
||||
metrics::BUCKET_METRICS
|
||||
.deleted_objects_total
|
||||
.inc_by(chunk.len() as u64);
|
||||
if let Some(errors) = resp.errors {
|
||||
// Log a bounded number of the errors within the response:
|
||||
// these requests can carry 1000 keys so logging each one
|
||||
// would be too verbose, especially as errors may lead us
|
||||
// to retry repeatedly.
|
||||
const LOG_UP_TO_N_ERRORS: usize = 10;
|
||||
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
|
||||
tracing::warn!(
|
||||
"DeleteObjects key {} failed: {}: {}",
|
||||
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.message.as_ref().map(Cow::from).unwrap_or("".into())
|
||||
);
|
||||
}
|
||||
|
||||
return Err(anyhow::format_err!(
|
||||
"Failed to delete {} objects",
|
||||
errors.len()
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pin_project_lite::pin_project! {
|
||||
@@ -568,64 +625,168 @@ impl RemoteStorage for S3Bucket {
|
||||
delete_objects.push(obj_id);
|
||||
}
|
||||
|
||||
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
|
||||
let started_at = start_measuring_requests(kind);
|
||||
|
||||
let resp = self
|
||||
.client
|
||||
.delete_objects()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.delete(
|
||||
Delete::builder()
|
||||
.set_objects(Some(chunk.to_vec()))
|
||||
.build()?,
|
||||
)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let started_at = ScopeGuard::into_inner(started_at);
|
||||
metrics::BUCKET_METRICS
|
||||
.req_seconds
|
||||
.observe_elapsed(kind, &resp, started_at);
|
||||
|
||||
match resp {
|
||||
Ok(resp) => {
|
||||
metrics::BUCKET_METRICS
|
||||
.deleted_objects_total
|
||||
.inc_by(chunk.len() as u64);
|
||||
if let Some(errors) = resp.errors {
|
||||
// Log a bounded number of the errors within the response:
|
||||
// these requests can carry 1000 keys so logging each one
|
||||
// would be too verbose, especially as errors may lead us
|
||||
// to retry repeatedly.
|
||||
const LOG_UP_TO_N_ERRORS: usize = 10;
|
||||
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
|
||||
tracing::warn!(
|
||||
"DeleteObjects key {} failed: {}: {}",
|
||||
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
|
||||
e.message.as_ref().map(Cow::from).unwrap_or("".into())
|
||||
);
|
||||
}
|
||||
|
||||
return Err(anyhow::format_err!(
|
||||
"Failed to delete {} objects",
|
||||
errors.len()
|
||||
));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
self.delete_oids(kind, &delete_objects).await
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
|
||||
let paths = std::array::from_ref(path);
|
||||
self.delete_objects(paths).await
|
||||
}
|
||||
|
||||
async fn time_travel_recover(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let kind = RequestKind::TimeTravel;
|
||||
let _guard = self.permit(kind).await;
|
||||
|
||||
let timestamp = DateTime::from(timestamp);
|
||||
let done_if_after = DateTime::from(done_if_after);
|
||||
|
||||
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
|
||||
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let prefix = prefix
|
||||
.map(|p| self.relative_path_to_s3_object(p))
|
||||
.or_else(|| self.prefix_in_bucket.clone());
|
||||
|
||||
let warn_threshold = 3;
|
||||
let max_retries = 10;
|
||||
let is_permanent = |_e: &_| false;
|
||||
|
||||
let list = backoff::retry(
|
||||
|| async {
|
||||
Ok(self
|
||||
.client
|
||||
.list_object_versions()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.set_prefix(prefix.clone())
|
||||
.send()
|
||||
.await?)
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions for time_travel_recover",
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if list.is_truncated().unwrap_or_default() {
|
||||
anyhow::bail!("Received truncated ListObjectVersions response for prefix={prefix:?}");
|
||||
}
|
||||
|
||||
let mut versions_deletes = list
|
||||
.versions()
|
||||
.iter()
|
||||
.map(VerOrDelete::Version)
|
||||
.chain(list.delete_markers().iter().map(VerOrDelete::DeleteMarker))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
versions_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified()));
|
||||
|
||||
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
|
||||
|
||||
for vd in versions_deletes {
|
||||
let last_modified = vd.last_modified();
|
||||
let version_id = vd.version_id();
|
||||
let key = vd.key();
|
||||
let (Some(last_modified), Some(version_id), Some(key)) =
|
||||
(last_modified, version_id, key)
|
||||
else {
|
||||
anyhow::bail!(
|
||||
"One (or more) of last_modified, key, and id is None. \
|
||||
Is versioning enabled in the bucket? last_modified={:?} key={:?} version_id={:?}",
|
||||
last_modified, key, version_id,
|
||||
);
|
||||
};
|
||||
if version_id == "null" {
|
||||
anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \
|
||||
indicating either disabled versioning, or legacy objects with null version id values");
|
||||
}
|
||||
tracing::trace!(
|
||||
"Parsing version key={key} version_id={version_id} is_delete={}",
|
||||
matches!(vd, VerOrDelete::DeleteMarker(_))
|
||||
);
|
||||
|
||||
vds_for_key
|
||||
.entry(key)
|
||||
.or_default()
|
||||
.push((vd, last_modified, version_id));
|
||||
}
|
||||
for (key, versions) in vds_for_key {
|
||||
let (last_vd, last_last_modified, _version_id) = versions.last().unwrap();
|
||||
if last_last_modified > &&done_if_after {
|
||||
tracing::trace!("Key {key} has version later than done_if_after, skipping");
|
||||
continue;
|
||||
}
|
||||
// the version we want to restore to.
|
||||
let version_to_restore_to =
|
||||
match versions.binary_search_by_key(×tamp, |tpl| *tpl.1) {
|
||||
Ok(v) => v,
|
||||
Err(e) => e,
|
||||
};
|
||||
if version_to_restore_to == versions.len() {
|
||||
tracing::trace!("Key {key} has no changes since timestamp, skipping");
|
||||
continue;
|
||||
}
|
||||
let mut do_delete = false;
|
||||
if version_to_restore_to == 0 {
|
||||
// All versions more recent, so the key didn't exist at the specified time point.
|
||||
tracing::trace!(
|
||||
"All {} versions more recent for {key}, deleting",
|
||||
versions.len()
|
||||
);
|
||||
do_delete = true;
|
||||
} else {
|
||||
match &versions[version_to_restore_to - 1] {
|
||||
(VerOrDelete::Version(_), _last_modified, version_id) => {
|
||||
tracing::trace!("Copying old version {version_id} for {key}...");
|
||||
// Restore the state to the last version by copying
|
||||
let source_id =
|
||||
format!("{}/{key}?versionId={version_id}", self.bucket_name);
|
||||
|
||||
backoff::retry(
|
||||
|| async {
|
||||
Ok(self
|
||||
.client
|
||||
.copy_object()
|
||||
.bucket(self.bucket_name.clone())
|
||||
.key(key)
|
||||
.copy_source(&source_id)
|
||||
.send()
|
||||
.await?)
|
||||
},
|
||||
is_permanent,
|
||||
warn_threshold,
|
||||
max_retries,
|
||||
"listing object versions for time_travel_recover",
|
||||
backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
(VerOrDelete::DeleteMarker(_), _last_modified, _version_id) => {
|
||||
do_delete = true;
|
||||
}
|
||||
}
|
||||
};
|
||||
if do_delete {
|
||||
if matches!(last_vd, VerOrDelete::DeleteMarker(_)) {
|
||||
// Key has since been deleted (but there was some history), no need to do anything
|
||||
tracing::trace!("Key {key} already deleted, skipping.");
|
||||
} else {
|
||||
tracing::trace!("Deleting {key}...");
|
||||
|
||||
let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?;
|
||||
self.delete_oids(kind, &[oid]).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
|
||||
@@ -650,6 +811,32 @@ fn start_measuring_requests(
|
||||
})
|
||||
}
|
||||
|
||||
enum VerOrDelete<'a> {
|
||||
Version(&'a ObjectVersion),
|
||||
DeleteMarker(&'a DeleteMarkerEntry),
|
||||
}
|
||||
|
||||
impl<'a> VerOrDelete<'a> {
|
||||
fn last_modified(&self) -> Option<&'a DateTime> {
|
||||
match self {
|
||||
VerOrDelete::Version(v) => v.last_modified(),
|
||||
VerOrDelete::DeleteMarker(v) => v.last_modified(),
|
||||
}
|
||||
}
|
||||
fn version_id(&self) -> Option<&'a str> {
|
||||
match self {
|
||||
VerOrDelete::Version(v) => v.version_id(),
|
||||
VerOrDelete::DeleteMarker(v) => v.version_id(),
|
||||
}
|
||||
}
|
||||
fn key(&self) -> Option<&'a str> {
|
||||
match self {
|
||||
VerOrDelete::Version(v) => v.key(),
|
||||
VerOrDelete::DeleteMarker(v) => v.key(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use camino::Utf8Path;
|
||||
|
||||
@@ -12,6 +12,7 @@ pub(crate) enum RequestKind {
|
||||
Delete = 2,
|
||||
List = 3,
|
||||
Copy = 4,
|
||||
TimeTravel = 5,
|
||||
}
|
||||
|
||||
use RequestKind::*;
|
||||
@@ -24,6 +25,7 @@ impl RequestKind {
|
||||
Delete => "delete_object",
|
||||
List => "list_objects",
|
||||
Copy => "copy_object",
|
||||
TimeTravel => "time_travel_recover",
|
||||
}
|
||||
}
|
||||
const fn as_index(&self) -> usize {
|
||||
@@ -31,7 +33,7 @@ impl RequestKind {
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct RequestTyped<C>([C; 5]);
|
||||
pub(super) struct RequestTyped<C>([C; 6]);
|
||||
|
||||
impl<C> RequestTyped<C> {
|
||||
pub(super) fn get(&self, kind: RequestKind) -> &C {
|
||||
@@ -40,8 +42,8 @@ impl<C> RequestTyped<C> {
|
||||
|
||||
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
|
||||
use RequestKind::*;
|
||||
let mut it = [Get, Put, Delete, List, Copy].into_iter();
|
||||
let arr = std::array::from_fn::<C, 5, _>(|index| {
|
||||
let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
|
||||
let arr = std::array::from_fn::<C, 6, _>(|index| {
|
||||
let next = it.next().unwrap();
|
||||
assert_eq!(index, next.as_index());
|
||||
f(next)
|
||||
|
||||
@@ -5,7 +5,9 @@ use bytes::Bytes;
|
||||
use futures::stream::Stream;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
use std::time::SystemTime;
|
||||
use std::{collections::hash_map::Entry, sync::Arc};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{
|
||||
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
|
||||
@@ -30,6 +32,7 @@ enum RemoteOp {
|
||||
Download(RemotePath),
|
||||
Delete(RemotePath),
|
||||
DeleteObjects(Vec<RemotePath>),
|
||||
TimeTravelRecover(Option<RemotePath>),
|
||||
}
|
||||
|
||||
impl UnreliableWrapper {
|
||||
@@ -181,4 +184,17 @@ impl RemoteStorage for UnreliableWrapper {
|
||||
self.attempt(RemoteOp::Upload(to.clone()))?;
|
||||
self.inner.copy_object(from, to).await
|
||||
}
|
||||
|
||||
async fn time_travel_recover(
|
||||
&self,
|
||||
prefix: Option<&RemotePath>,
|
||||
timestamp: SystemTime,
|
||||
done_if_after: SystemTime,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?;
|
||||
self.inner
|
||||
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,19 @@
|
||||
use std::collections::HashSet;
|
||||
use std::env;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::Arc;
|
||||
use std::time::UNIX_EPOCH;
|
||||
use std::time::{Duration, UNIX_EPOCH};
|
||||
use std::{collections::HashSet, time::SystemTime};
|
||||
|
||||
use crate::common::{download_to_vec, upload_stream};
|
||||
use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use remote_storage::{
|
||||
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
|
||||
};
|
||||
use test_context::test_context;
|
||||
use test_context::AsyncTestContext;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
|
||||
mod common;
|
||||
@@ -23,6 +27,121 @@ const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_
|
||||
|
||||
const BASE_PREFIX: &str = "test";
|
||||
|
||||
#[test_context(MaybeEnabledStorage)]
|
||||
#[tokio::test]
|
||||
async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
|
||||
let ctx = match ctx {
|
||||
MaybeEnabledStorage::Enabled(ctx) => ctx,
|
||||
MaybeEnabledStorage::Disabled => return Ok(()),
|
||||
};
|
||||
// Our test depends on discrepancies in the clock between S3 and the environment the tests
|
||||
// run in. Therefore, wait a little bit before and after. The alternative would be
|
||||
// to take the time from S3 response headers.
|
||||
const WAIT_TIME: Duration = Duration::from_millis(3_000);
|
||||
|
||||
async fn time_point() -> SystemTime {
|
||||
tokio::time::sleep(WAIT_TIME).await;
|
||||
let ret = SystemTime::now();
|
||||
tokio::time::sleep(WAIT_TIME).await;
|
||||
ret
|
||||
}
|
||||
|
||||
async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
|
||||
Ok(client
|
||||
.list_files(None)
|
||||
.await
|
||||
.context("list root files failure")?
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>())
|
||||
}
|
||||
|
||||
let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
|
||||
.with_context(|| "RemotePath conversion")?;
|
||||
|
||||
let (data, len) = upload_stream("remote blob data1".as_bytes().into());
|
||||
ctx.client.upload(data, len, &path1, None).await?;
|
||||
|
||||
let t0_files = list_files(&ctx.client).await?;
|
||||
let t0 = time_point().await;
|
||||
println!("at t0: {t0_files:?}");
|
||||
|
||||
let old_data = "remote blob data2";
|
||||
let (data, len) = upload_stream(old_data.as_bytes().into());
|
||||
ctx.client.upload(data, len, &path2, None).await?;
|
||||
|
||||
let t1_files = list_files(&ctx.client).await?;
|
||||
let t1 = time_point().await;
|
||||
println!("at t1: {t1_files:?}");
|
||||
|
||||
// A little check to ensure that our clock is not too far off from the S3 clock
|
||||
{
|
||||
let dl = ctx.client.download(&path2).await?;
|
||||
let last_modified = dl.last_modified.unwrap();
|
||||
let half_wt = WAIT_TIME.mul_f32(0.5);
|
||||
let t0_hwt = t0 + half_wt;
|
||||
let t1_hwt = t1 - half_wt;
|
||||
if !(t0_hwt..=t1_hwt).contains(&last_modified) {
|
||||
panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
|
||||
This likely means a large lock discrepancy between S3 and the local clock.");
|
||||
}
|
||||
}
|
||||
|
||||
let (data, len) = upload_stream("remote blob data3".as_bytes().into());
|
||||
ctx.client.upload(data, len, &path3, None).await?;
|
||||
|
||||
let new_data = "new remote blob data2";
|
||||
let (data, len) = upload_stream(new_data.as_bytes().into());
|
||||
ctx.client.upload(data, len, &path2, None).await?;
|
||||
|
||||
ctx.client.delete(&path1).await?;
|
||||
|
||||
let t2_files = list_files(&ctx.client).await?;
|
||||
let t2 = time_point().await;
|
||||
println!("at t2: {t2_files:?}");
|
||||
|
||||
// No changes after recovery to t2 (no-op)
|
||||
let t_final = time_point().await;
|
||||
ctx.client
|
||||
.time_travel_recover(None, t2, t_final, CancellationToken::new())
|
||||
.await?;
|
||||
let t2_files_recovered = list_files(&ctx.client).await?;
|
||||
println!("after recovery to t2: {t2_files_recovered:?}");
|
||||
assert_eq!(t2_files, t2_files_recovered);
|
||||
let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?;
|
||||
assert_eq!(path2_recovered_t2, new_data.as_bytes());
|
||||
|
||||
// after recovery to t1: path1 is back, path2 has the old content
|
||||
let t_final = time_point().await;
|
||||
ctx.client
|
||||
.time_travel_recover(None, t1, t_final, CancellationToken::new())
|
||||
.await?;
|
||||
let t1_files_recovered = list_files(&ctx.client).await?;
|
||||
println!("after recovery to t1: {t1_files_recovered:?}");
|
||||
assert_eq!(t1_files, t1_files_recovered);
|
||||
let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?;
|
||||
assert_eq!(path2_recovered_t1, old_data.as_bytes());
|
||||
|
||||
// after recovery to t0: everything is gone except for path1
|
||||
let t_final = time_point().await;
|
||||
ctx.client
|
||||
.time_travel_recover(None, t0, t_final, CancellationToken::new())
|
||||
.await?;
|
||||
let t0_files_recovered = list_files(&ctx.client).await?;
|
||||
println!("after recovery to t0: {t0_files_recovered:?}");
|
||||
assert_eq!(t0_files, t0_files_recovered);
|
||||
|
||||
// cleanup
|
||||
ctx.client.delete_objects(&[path1, path2, path3]).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct EnabledS3 {
|
||||
client: Arc<GenericRemoteStorage>,
|
||||
base_prefix: &'static str,
|
||||
|
||||
@@ -61,6 +61,7 @@ sync_wrapper.workspace = true
|
||||
tokio-tar.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
|
||||
tokio-epoll-uring.workspace = true
|
||||
tokio-io-timeout.workspace = true
|
||||
tokio-postgres.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
|
||||
@@ -18,7 +18,7 @@ use pageserver::tenant::block_io::FileBlockReader;
|
||||
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
|
||||
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
|
||||
use pageserver::tenant::storage_layer::range_overlaps;
|
||||
use pageserver::virtual_file::VirtualFile;
|
||||
use pageserver::virtual_file::{self, VirtualFile};
|
||||
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let mut total_delta_layers = 0usize;
|
||||
|
||||
@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
|
||||
virtual_file::init(10);
|
||||
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let file = FileBlockReader::new(VirtualFile::open(path).await?);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
new_tenant_id,
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
|
||||
|
||||
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(10);
|
||||
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
|
||||
page_cache::init(100);
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
dump_layerfile_from_path(path, true, &ctx).await
|
||||
|
||||
@@ -130,7 +130,7 @@ fn main() -> anyhow::Result<()> {
|
||||
let scenario = failpoint_support::init();
|
||||
|
||||
// Basic initialization of things that don't change after startup
|
||||
virtual_file::init(conf.max_file_descriptors);
|
||||
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
|
||||
page_cache::init(conf.page_cache_size);
|
||||
|
||||
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{
|
||||
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
|
||||
};
|
||||
use crate::virtual_file;
|
||||
use crate::{
|
||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
|
||||
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
|
||||
@@ -43,6 +44,8 @@ use crate::{
|
||||
|
||||
use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
|
||||
|
||||
use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
|
||||
|
||||
pub mod defaults {
|
||||
use crate::tenant::config::defaults::*;
|
||||
use const_format::formatcp;
|
||||
@@ -79,6 +82,8 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
|
||||
|
||||
///
|
||||
/// Default built-in configuration file.
|
||||
///
|
||||
@@ -114,6 +119,8 @@ pub mod defaults {
|
||||
|
||||
#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
|
||||
|
||||
#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
|
||||
|
||||
[tenant_config]
|
||||
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
|
||||
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
|
||||
@@ -247,6 +254,8 @@ pub struct PageServerConf {
|
||||
|
||||
/// Maximum number of WAL records to be ingested and committed at the same time
|
||||
pub ingest_batch_size: u64,
|
||||
|
||||
pub virtual_file_io_engine: virtual_file::IoEngineKind,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -331,6 +340,8 @@ struct PageServerConfigBuilder {
|
||||
secondary_download_concurrency: BuilderValue<usize>,
|
||||
|
||||
ingest_batch_size: BuilderValue<u64>,
|
||||
|
||||
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
@@ -406,6 +417,8 @@ impl Default for PageServerConfigBuilder {
|
||||
secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
|
||||
|
||||
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
|
||||
|
||||
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -562,6 +575,10 @@ impl PageServerConfigBuilder {
|
||||
self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
|
||||
}
|
||||
|
||||
pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
|
||||
self.virtual_file_io_engine = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let concurrent_tenant_warmup = self
|
||||
.concurrent_tenant_warmup
|
||||
@@ -669,6 +686,9 @@ impl PageServerConfigBuilder {
|
||||
ingest_batch_size: self
|
||||
.ingest_batch_size
|
||||
.ok_or(anyhow!("missing ingest_batch_size"))?,
|
||||
virtual_file_io_engine: self
|
||||
.virtual_file_io_engine
|
||||
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -920,6 +940,9 @@ impl PageServerConf {
|
||||
builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
|
||||
},
|
||||
"ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
|
||||
"virtual_file_io_engine" => {
|
||||
builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
|
||||
}
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -993,6 +1016,7 @@ impl PageServerConf {
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1225,6 +1249,7 @@ background_task_maximum_delay = '334 s'
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1288,6 +1313,7 @@ background_task_maximum_delay = '334 s'
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: 100,
|
||||
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![recursion_limit = "300"]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
mod auth;
|
||||
|
||||
@@ -932,6 +932,7 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) mod virtual_file_descriptor_cache {
|
||||
use super::*;
|
||||
|
||||
@@ -951,6 +952,20 @@ pub(crate) mod virtual_file_descriptor_cache {
|
||||
// ```
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(crate) mod virtual_file_io_engine {
|
||||
use super::*;
|
||||
|
||||
pub(crate) static KIND: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_virtual_file_io_engine_kind",
|
||||
"The configured io engine for VirtualFile",
|
||||
&["kind"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct GlobalAndPerTimelineHistogram {
|
||||
global: Histogram,
|
||||
|
||||
@@ -5,10 +5,10 @@
|
||||
use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::ops::Deref;
|
||||
|
||||
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
|
||||
/// blocks, using the page cache
|
||||
@@ -39,6 +39,8 @@ pub enum BlockLease<'a> {
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
#[cfg(test)]
|
||||
Vec(Vec<u8>),
|
||||
}
|
||||
|
||||
impl From<PageReadGuard<'static>> for BlockLease<'static> {
|
||||
@@ -63,6 +65,10 @@ impl<'a> Deref for BlockLease<'a> {
|
||||
BlockLease::EphemeralFileMutableTail(v) => v,
|
||||
#[cfg(test)]
|
||||
BlockLease::Arc(v) => v.deref(),
|
||||
#[cfg(test)]
|
||||
BlockLease::Vec(v) => {
|
||||
TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -169,10 +175,14 @@ impl FileBlockReader {
|
||||
}
|
||||
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
|
||||
async fn fill_buffer(
|
||||
&self,
|
||||
buf: PageWriteGuard<'static>,
|
||||
blkno: u32,
|
||||
) -> Result<PageWriteGuard<'static>, std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.await
|
||||
}
|
||||
/// Read a block.
|
||||
@@ -196,9 +206,9 @@ impl FileBlockReader {
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
ReadBufResult::NotFound(write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
|
||||
let write_guard = self.fill_buffer(write_guard, blknum).await?;
|
||||
Ok(write_guard.mark_valid().into())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,11 +5,11 @@ use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::cmp::min;
|
||||
use std::fs::OpenOptions;
|
||||
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::ops::DerefMut;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
@@ -47,7 +47,10 @@ impl EphemeralFile {
|
||||
|
||||
let file = VirtualFile::open_with_options(
|
||||
&filename,
|
||||
OpenOptions::new().read(true).write(true).create(true),
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(true),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -89,11 +92,10 @@ impl EphemeralFile {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
page_cache::ReadBufResult::NotFound(write_guard) => {
|
||||
let write_guard = self
|
||||
.file
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
let read_guard = write_guard.mark_valid();
|
||||
return Ok(BlockLease::PageReadGuard(read_guard));
|
||||
|
||||
@@ -36,7 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{walrecord, TEMP_FILE_SUFFIX};
|
||||
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
@@ -649,7 +649,7 @@ impl DeltaLayer {
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
virtual_file::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
|
||||
@@ -34,7 +34,7 @@ use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::Timeline;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
@@ -327,7 +327,7 @@ impl ImageLayer {
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
virtual_file::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
@@ -492,11 +492,15 @@ impl ImageLayerWriterInner {
|
||||
},
|
||||
);
|
||||
info!("new image layer {path}");
|
||||
let mut file = VirtualFile::open_with_options(
|
||||
&path,
|
||||
std::fs::OpenOptions::new().write(true).create_new(true),
|
||||
)
|
||||
.await?;
|
||||
let mut file = {
|
||||
VirtualFile::open_with_options(
|
||||
&path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true),
|
||||
)
|
||||
.await?
|
||||
};
|
||||
// make room for the header block
|
||||
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
|
||||
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::TENANT_TASK_EVENTS;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::{Tenant, TenantState};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -181,8 +182,11 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
);
|
||||
error_run_count += 1;
|
||||
let wait_duration = Duration::from_secs_f64(wait_duration);
|
||||
error!(
|
||||
"Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
|
||||
log_compaction_error(
|
||||
&e,
|
||||
error_run_count,
|
||||
&wait_duration,
|
||||
cancel.is_cancelled(),
|
||||
);
|
||||
wait_duration
|
||||
} else {
|
||||
@@ -210,6 +214,58 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
|
||||
}
|
||||
|
||||
fn log_compaction_error(
|
||||
e: &CompactionError,
|
||||
error_run_count: u32,
|
||||
sleep_duration: &std::time::Duration,
|
||||
task_cancelled: bool,
|
||||
) {
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
use crate::tenant::PageReconstructError;
|
||||
use CompactionError::*;
|
||||
|
||||
enum LooksLike {
|
||||
Info,
|
||||
Error,
|
||||
}
|
||||
|
||||
let decision = match e {
|
||||
ShuttingDown => None,
|
||||
_ if task_cancelled => Some(LooksLike::Info),
|
||||
Other(e) => {
|
||||
let root_cause = e.root_cause();
|
||||
|
||||
let is_stopping = {
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
|
||||
upload_queue || timeline
|
||||
};
|
||||
|
||||
if is_stopping {
|
||||
Some(LooksLike::Info)
|
||||
} else {
|
||||
Some(LooksLike::Error)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match decision {
|
||||
Some(LooksLike::Info) => info!(
|
||||
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
|
||||
),
|
||||
Some(LooksLike::Error) => error!(
|
||||
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
|
||||
),
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// GC task's main loop
|
||||
///
|
||||
|
||||
@@ -392,8 +392,7 @@ pub(crate) enum PageReconstructError {
|
||||
#[error("Ancestor LSN wait error: {0}")]
|
||||
AncestorLsnTimeout(#[from] WaitLsnError),
|
||||
|
||||
/// The operation was cancelled
|
||||
#[error("Cancelled")]
|
||||
#[error("timeline shutting down")]
|
||||
Cancelled,
|
||||
|
||||
/// The ancestor of this is being stopped
|
||||
@@ -405,6 +404,19 @@ pub(crate) enum PageReconstructError {
|
||||
WalRedo(anyhow::Error),
|
||||
}
|
||||
|
||||
impl PageReconstructError {
|
||||
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
use PageReconstructError::*;
|
||||
match self {
|
||||
Other(_) => false,
|
||||
AncestorLsnTimeout(_) => false,
|
||||
Cancelled | AncestorStopping(_) => true,
|
||||
WalRedo(_) => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum CreateImageLayersError {
|
||||
#[error("timeline shutting down")]
|
||||
|
||||
@@ -126,6 +126,27 @@ pub(super) struct UploadQueueStopped {
|
||||
pub(super) deleted_at: SetDeletedFlagProgress,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum NotInitialized {
|
||||
#[error("queue is in state Uninitialized")]
|
||||
Uninitialized,
|
||||
#[error("queue is in state Stopping")]
|
||||
Stopped,
|
||||
#[error("queue is shutting down")]
|
||||
ShuttingDown,
|
||||
}
|
||||
|
||||
impl NotInitialized {
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
use NotInitialized::*;
|
||||
match self {
|
||||
Uninitialized => false,
|
||||
Stopped => true,
|
||||
ShuttingDown => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UploadQueue {
|
||||
pub(crate) fn initialize_empty_remote(
|
||||
&mut self,
|
||||
@@ -214,17 +235,17 @@ impl UploadQueue {
|
||||
}
|
||||
|
||||
pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
|
||||
use UploadQueue::*;
|
||||
match self {
|
||||
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
|
||||
anyhow::bail!("queue is in state {}", self.as_str())
|
||||
}
|
||||
UploadQueue::Initialized(x) => {
|
||||
if !x.shutting_down {
|
||||
Ok(x)
|
||||
Uninitialized => Err(NotInitialized::Uninitialized.into()),
|
||||
Initialized(x) => {
|
||||
if x.shutting_down {
|
||||
Err(NotInitialized::ShuttingDown.into())
|
||||
} else {
|
||||
anyhow::bail!("queue is shutting down")
|
||||
Ok(x)
|
||||
}
|
||||
}
|
||||
Stopped(_) => Err(NotInitialized::Stopped.into()),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,18 +11,28 @@
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
|
||||
use crate::page_cache::PageWriteGuard;
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use utils::fs_ext;
|
||||
|
||||
mod io_engine;
|
||||
mod open_options;
|
||||
pub use io_engine::IoEngineKind;
|
||||
pub(crate) use open_options::*;
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
/// the underlying file is closed if the system is low on file descriptors,
|
||||
@@ -106,7 +116,38 @@ struct SlotInner {
|
||||
tag: u64,
|
||||
|
||||
/// the underlying file
|
||||
file: Option<File>,
|
||||
file: Option<OwnedFd>,
|
||||
}
|
||||
|
||||
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
|
||||
struct PageWriteGuardBuf {
|
||||
page: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
|
||||
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.page.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.page.len()
|
||||
}
|
||||
}
|
||||
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
|
||||
// hence it's safe to hand out the `stable_mut_ptr()`.
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.page.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.page.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
@@ -274,6 +315,10 @@ macro_rules! with_file {
|
||||
let $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
|
||||
let mut $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
@@ -326,7 +371,9 @@ impl VirtualFile {
|
||||
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
|
||||
// where our caller doesn't get to use the returned VirtualFile before its
|
||||
// slot gets re-used by someone else.
|
||||
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
|
||||
let file = observe_duration!(StorageIoOperation::Open, {
|
||||
open_options.open(path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
@@ -395,15 +442,13 @@ impl VirtualFile {
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
with_file!(self, StorageIoOperation::Fsync, |file| file
|
||||
.as_ref()
|
||||
.sync_all())
|
||||
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.sync_all()))
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
with_file!(self, StorageIoOperation::Metadata, |file| file
|
||||
.as_ref()
|
||||
.metadata())
|
||||
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.metadata()))
|
||||
}
|
||||
|
||||
/// Helper function internal to `VirtualFile` that looks up the underlying File,
|
||||
@@ -412,7 +457,7 @@ impl VirtualFile {
|
||||
///
|
||||
/// We are doing it via a macro as Rust doesn't support async closures that
|
||||
/// take on parameters with lifetimes.
|
||||
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
|
||||
async fn lock_file(&self) -> Result<FileGuard, Error> {
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
@@ -458,10 +503,9 @@ impl VirtualFile {
|
||||
// NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
|
||||
// case from StorageIoOperation::Open. This helps with identifying thrashing
|
||||
// of the virtual file descriptor cache.
|
||||
let file = observe_duration!(
|
||||
StorageIoOperation::OpenAfterReplace,
|
||||
self.open_options.open(&self.path)
|
||||
)?;
|
||||
let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
|
||||
self.open_options.open(self.path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
@@ -486,9 +530,8 @@ impl VirtualFile {
|
||||
self.pos = offset;
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
|
||||
.as_ref()
|
||||
.seek(SeekFrom::End(offset)))?
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
|
||||
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
|
||||
}
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
@@ -507,25 +550,28 @@ impl VirtualFile {
|
||||
Ok(self.pos)
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &mut buf[n..];
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
pub async fn read_exact_at<B>(&self, buf: B, offset: u64) -> Result<B, Error>
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
{
|
||||
let (buf, res) =
|
||||
read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await;
|
||||
res.map(|()| buf)
|
||||
}
|
||||
|
||||
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
|
||||
pub async fn read_exact_at_page(
|
||||
&self,
|
||||
page: PageWriteGuard<'static>,
|
||||
offset: u64,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
let buf = PageWriteGuardBuf {
|
||||
page,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let res = self.read_exact_at(buf, offset).await;
|
||||
res.map(|PageWriteGuardBuf { page, .. }| page)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
@@ -575,22 +621,35 @@ impl VirtualFile {
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Read, |file| file
|
||||
.as_ref()
|
||||
.read_at(buf, offset));
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.shard_id, &self.timeline_id])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
|
||||
where
|
||||
B: tokio_epoll_uring::BoundedBufMut + Send,
|
||||
{
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
};
|
||||
|
||||
observe_duration!(StorageIoOperation::Read, {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
if let Ok(size) = res {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&[
|
||||
"read",
|
||||
&self.tenant_id,
|
||||
&self.shard_id,
|
||||
&self.timeline_id,
|
||||
])
|
||||
.add(size as i64);
|
||||
}
|
||||
(buf, res)
|
||||
})
|
||||
}
|
||||
|
||||
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file| file
|
||||
.as_ref()
|
||||
.write_at(buf, offset));
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
|
||||
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
|
||||
});
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["write", &self.tenant_id, &self.shard_id, &self.timeline_id])
|
||||
@@ -600,18 +659,241 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
struct FileGuard<'a> {
|
||||
slot_guard: RwLockReadGuard<'a, SlotInner>,
|
||||
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at_impl<B, F, Fut>(
|
||||
buf: B,
|
||||
mut offset: u64,
|
||||
mut read_at: F,
|
||||
) -> (B, std::io::Result<()>)
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
|
||||
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
|
||||
{
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
|
||||
while buf.bytes_total() != 0 {
|
||||
let res;
|
||||
(buf, res) = read_at(buf, offset).await;
|
||||
match res {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
buf = buf.slice(n..);
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return (buf.into_inner(), Err(e)),
|
||||
}
|
||||
}
|
||||
// NB: don't use `buf.is_empty()` here; it is from the
|
||||
// `impl Deref for Slice { Target = [u8] }`; the the &[u8]
|
||||
// returned by it only covers the initialized portion of `buf`.
|
||||
// Whereas we're interested in ensuring that we filled the entire
|
||||
// buffer that the user passed in.
|
||||
if buf.bytes_total() != 0 {
|
||||
(
|
||||
buf.into_inner(),
|
||||
Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
)),
|
||||
)
|
||||
} else {
|
||||
assert_eq!(buf.len(), buf.bytes_total());
|
||||
(buf.into_inner(), Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> AsRef<File> for FileGuard<'a> {
|
||||
fn as_ref(&self) -> &File {
|
||||
#[cfg(test)]
|
||||
mod test_read_exact_at_impl {
|
||||
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
|
||||
use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
|
||||
|
||||
use super::read_exact_at_impl;
|
||||
|
||||
struct Expectation {
|
||||
offset: u64,
|
||||
bytes_total: usize,
|
||||
result: std::io::Result<Vec<u8>>,
|
||||
}
|
||||
struct MockReadAt {
|
||||
expectations: VecDeque<Expectation>,
|
||||
}
|
||||
|
||||
impl MockReadAt {
|
||||
async fn read_at(
|
||||
&mut self,
|
||||
mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
|
||||
offset: u64,
|
||||
) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
|
||||
let exp = self
|
||||
.expectations
|
||||
.pop_front()
|
||||
.expect("read_at called but we have no expectations left");
|
||||
assert_eq!(exp.offset, offset);
|
||||
assert_eq!(exp.bytes_total, buf.bytes_total());
|
||||
match exp.result {
|
||||
Ok(bytes) => {
|
||||
assert!(bytes.len() <= buf.bytes_total());
|
||||
buf.put_slice(&bytes);
|
||||
(buf, Ok(bytes.len()))
|
||||
}
|
||||
Err(e) => (buf, Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MockReadAt {
|
||||
fn drop(&mut self) {
|
||||
assert_eq!(self.expectations.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic() {
|
||||
let buf = Vec::with_capacity(5);
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![Expectation {
|
||||
offset: 0,
|
||||
bytes_total: 5,
|
||||
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
|
||||
}]),
|
||||
}));
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_empty_buf_issues_no_syscall() {
|
||||
let buf = Vec::new();
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::new(),
|
||||
}));
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_two_read_at_calls_needed_until_buf_filled() {
|
||||
let buf = Vec::with_capacity(4);
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![
|
||||
Expectation {
|
||||
offset: 0,
|
||||
bytes_total: 4,
|
||||
result: Ok(vec![b'a', b'b']),
|
||||
},
|
||||
Expectation {
|
||||
offset: 2,
|
||||
bytes_total: 2,
|
||||
result: Ok(vec![b'c', b'd']),
|
||||
},
|
||||
]),
|
||||
}));
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_eof_before_buffer_full() {
|
||||
let buf = Vec::with_capacity(3);
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![
|
||||
Expectation {
|
||||
offset: 0,
|
||||
bytes_total: 3,
|
||||
result: Ok(vec![b'a']),
|
||||
},
|
||||
Expectation {
|
||||
offset: 1,
|
||||
bytes_total: 2,
|
||||
result: Ok(vec![b'b']),
|
||||
},
|
||||
Expectation {
|
||||
offset: 2,
|
||||
bytes_total: 1,
|
||||
result: Ok(vec![]),
|
||||
},
|
||||
]),
|
||||
}));
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
let Err(err) = res else {
|
||||
panic!("should return an error");
|
||||
};
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||
assert_eq!(format!("{err}"), "failed to fill whole buffer");
|
||||
// buffer contents on error are unspecified
|
||||
}
|
||||
}
|
||||
|
||||
struct FileGuard {
|
||||
slot_guard: RwLockReadGuard<'static, SlotInner>,
|
||||
}
|
||||
|
||||
impl AsRef<OwnedFd> for FileGuard {
|
||||
fn as_ref(&self) -> &OwnedFd {
|
||||
// This unwrap is safe because we only create `FileGuard`s
|
||||
// if we know that the file is Some.
|
||||
self.slot_guard.file.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileGuard {
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file<F, R>(&self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
|
||||
let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
|
||||
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&mut file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_epoll_uring::IoFd for FileGuard {
|
||||
unsafe fn as_fd(&self) -> RawFd {
|
||||
let owned_fd: &OwnedFd = self.as_ref();
|
||||
owned_fd.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -619,16 +901,19 @@ impl VirtualFile {
|
||||
blknum: u32,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let mut buf = [0; PAGE_SZ];
|
||||
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
let buf = vec![0; PAGE_SZ];
|
||||
let buf = self
|
||||
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
.await?;
|
||||
Ok(std::sync::Arc::new(buf).into())
|
||||
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
|
||||
let mut tmp = vec![0; 128];
|
||||
loop {
|
||||
let mut tmp = [0; 128];
|
||||
match self.read_at(&mut tmp, self.pos).await {
|
||||
let res;
|
||||
(tmp, res) = self.read_at(tmp, self.pos).await;
|
||||
match res {
|
||||
Ok(0) => return Ok(()),
|
||||
Ok(n) => {
|
||||
self.pos += n as u64;
|
||||
@@ -704,10 +989,12 @@ impl OpenFiles {
|
||||
/// Initialize the virtual file module. This must be called once at page
|
||||
/// server startup.
|
||||
///
|
||||
pub fn init(num_slots: usize) {
|
||||
#[cfg(not(test))]
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
io_engine::init(engine);
|
||||
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
|
||||
}
|
||||
|
||||
@@ -752,10 +1039,10 @@ mod tests {
|
||||
}
|
||||
|
||||
impl MaybeVirtualFile {
|
||||
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
|
||||
async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
|
||||
}
|
||||
}
|
||||
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
|
||||
@@ -797,14 +1084,14 @@ mod tests {
|
||||
|
||||
// Helper function to slurp a portion of a file into a string
|
||||
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
|
||||
let mut buf = vec![0; len];
|
||||
self.read_exact_at(&mut buf, pos).await?;
|
||||
let buf = vec![0; len];
|
||||
let buf = self.read_exact_at(buf, pos).await?;
|
||||
Ok(String::from_utf8(buf).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_virtual_files() -> Result<(), Error> {
|
||||
async fn test_virtual_files() -> anyhow::Result<()> {
|
||||
// The real work is done in the test_files() helper function. This
|
||||
// allows us to run the same set of tests against a native File, and
|
||||
// VirtualFile. We trust the native Files and wouldn't need to test them,
|
||||
@@ -820,14 +1107,17 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_physical_files() -> Result<(), Error> {
|
||||
async fn test_physical_files() -> anyhow::Result<()> {
|
||||
test_files("physical_files", |path, open_options| async move {
|
||||
Ok(MaybeVirtualFile::File(open_options.open(path)?))
|
||||
Ok(MaybeVirtualFile::File({
|
||||
let owned_fd = open_options.open(path.as_std_path()).await?;
|
||||
File::from(owned_fd)
|
||||
}))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
|
||||
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
|
||||
where
|
||||
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
|
||||
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
|
||||
@@ -971,11 +1261,11 @@ mod tests {
|
||||
for _threadno in 0..THREADS {
|
||||
let files = files.clone();
|
||||
let hdl = rt.spawn(async move {
|
||||
let mut buf = [0u8; SIZE];
|
||||
let mut buf = vec![0u8; SIZE];
|
||||
let mut rng = rand::rngs::OsRng;
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
f.read_exact_at(&mut buf, 0).await.unwrap();
|
||||
buf = f.read_exact_at(buf, 0).await.unwrap();
|
||||
assert!(buf == SAMPLE);
|
||||
}
|
||||
});
|
||||
|
||||
114
pageserver/src/virtual_file/io_engine.rs
Normal file
114
pageserver/src/virtual_file/io_engine.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
//! [`super::VirtualFile`] supports different IO engines.
|
||||
//!
|
||||
//! The [`IoEngineKind`] enum identifies them.
|
||||
//!
|
||||
//! The choice of IO engine is global.
|
||||
//! Initialize using [`init`].
|
||||
//!
|
||||
//! Then use [`get`] and [`super::OpenOptions`].
|
||||
|
||||
#[derive(
|
||||
Copy,
|
||||
Clone,
|
||||
PartialEq,
|
||||
Eq,
|
||||
Hash,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
Debug,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum IoEngineKind {
|
||||
StdFs,
|
||||
#[cfg(target_os = "linux")]
|
||||
TokioEpollUring,
|
||||
}
|
||||
|
||||
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(super) fn init(engine: IoEngineKind) {
|
||||
if IO_ENGINE.set(engine).is_err() {
|
||||
panic!("called twice");
|
||||
}
|
||||
crate::metrics::virtual_file_io_engine::KIND
|
||||
.with_label_values(&[&format!("{engine}")])
|
||||
.set(1);
|
||||
}
|
||||
|
||||
pub(super) fn get() -> &'static IoEngineKind {
|
||||
#[cfg(test)]
|
||||
{
|
||||
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
|
||||
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
|
||||
Ok(v) => match v.parse::<IoEngineKind>() {
|
||||
Ok(engine_kind) => engine_kind,
|
||||
Err(e) => {
|
||||
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
|
||||
}
|
||||
},
|
||||
Err(std::env::VarError::NotPresent) => {
|
||||
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
|
||||
.parse()
|
||||
.unwrap()
|
||||
}
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {env_var_name} is not unicode");
|
||||
}
|
||||
})
|
||||
}
|
||||
#[cfg(not(test))]
|
||||
IO_ENGINE.get().unwrap()
|
||||
}
|
||||
|
||||
use std::os::unix::prelude::FileExt;
|
||||
|
||||
use super::FileGuard;
|
||||
|
||||
impl IoEngineKind {
|
||||
pub(super) async fn read_at<B>(
|
||||
&self,
|
||||
file_guard: FileGuard,
|
||||
offset: u64,
|
||||
mut buf: B,
|
||||
) -> ((FileGuard, B), std::io::Result<usize>)
|
||||
where
|
||||
B: tokio_epoll_uring::BoundedBufMut + Send,
|
||||
{
|
||||
match self {
|
||||
IoEngineKind::StdFs => {
|
||||
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
|
||||
let dst = unsafe {
|
||||
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
|
||||
};
|
||||
let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
|
||||
if let Ok(nbytes) = &res {
|
||||
assert!(*nbytes <= buf.bytes_total());
|
||||
// SAFETY: see above assertion
|
||||
unsafe {
|
||||
buf.set_init(*nbytes);
|
||||
}
|
||||
}
|
||||
#[allow(dropping_references)]
|
||||
drop(dst);
|
||||
((file_guard, buf), res)
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngineKind::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
let (resources, res) = system.read(file_guard, offset, buf).await;
|
||||
(
|
||||
resources,
|
||||
res.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
138
pageserver/src/virtual_file/open_options.rs
Normal file
138
pageserver/src/virtual_file/open_options.rs
Normal file
@@ -0,0 +1,138 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use super::IoEngineKind;
|
||||
use std::{os::fd::OwnedFd, path::Path};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum OpenOptions {
|
||||
StdFs(std::fs::OpenOptions),
|
||||
#[cfg(target_os = "linux")]
|
||||
TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions),
|
||||
}
|
||||
|
||||
impl Default for OpenOptions {
|
||||
fn default() -> Self {
|
||||
match super::io_engine::get() {
|
||||
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
|
||||
#[cfg(target_os = "linux")]
|
||||
IoEngineKind::TokioEpollUring => {
|
||||
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenOptions {
|
||||
pub fn new() -> OpenOptions {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.read(read);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.read(read);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.write(write);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.write(write);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.create(create);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.create(create);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
system.open(path, x).await.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.mode(mode);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.mode(mode);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
#[cfg(target_os = "linux")]
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from collections import defaultdict
|
||||
from typing import DefaultDict, Dict
|
||||
|
||||
@@ -45,6 +46,15 @@ def main(args: argparse.Namespace):
|
||||
logging.error("cannot fetch flaky tests from the DB due to an error", exc)
|
||||
rows = []
|
||||
|
||||
# If a test run has non-default PAGESERVER_VIRTUAL_FILE_IO_ENGINE (i.e. not empty, not std-fs),
|
||||
# use it to parametrize test name along with build_type and pg_version
|
||||
#
|
||||
# See test_runner/fixtures/parametrize.py for details
|
||||
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
|
||||
pageserver_virtual_file_io_engine_parameter = f"-{io_engine}"
|
||||
else:
|
||||
pageserver_virtual_file_io_engine_parameter = ""
|
||||
|
||||
for row in rows:
|
||||
# We don't want to automatically rerun tests in a performance suite
|
||||
if row["parent_suite"] != "test_runner.regress":
|
||||
@@ -53,10 +63,10 @@ def main(args: argparse.Namespace):
|
||||
if row["name"].endswith("]"):
|
||||
parametrized_test = row["name"].replace(
|
||||
"[",
|
||||
f"[{build_type}-pg{pg_version}-",
|
||||
f"[{build_type}-pg{pg_version}{pageserver_virtual_file_io_engine_parameter}-",
|
||||
)
|
||||
else:
|
||||
parametrized_test = f"{row['name']}[{build_type}-pg{pg_version}]"
|
||||
parametrized_test = f"{row['name']}[{build_type}-pg{pg_version}{pageserver_virtual_file_io_engine_parameter}]"
|
||||
|
||||
res[row["parent_suite"]][row["suite"]][parametrized_test] = True
|
||||
|
||||
|
||||
@@ -2,11 +2,13 @@ from __future__ import annotations
|
||||
|
||||
import abc
|
||||
import asyncio
|
||||
import errno
|
||||
import filecmp
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
import tempfile
|
||||
import textwrap
|
||||
@@ -173,6 +175,35 @@ def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Ite
|
||||
yield versioned_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def test_cgroup_dir(request: FixtureRequest) -> Optional[Path]:
|
||||
## Use like so:
|
||||
# systemd-run --user --shell
|
||||
# cat /proc/self/cgroup
|
||||
# # will look like so: 0::/user.slice/user-1000.slice/user@1000.service/app.slice/run-u5.service
|
||||
# env \
|
||||
# NEON_TEST_SUITE_USE_CGROUPS=/sys/fs/cgroup/user.slice/user-1000.slice/user@1000.service/app.slice/run-u5.service \
|
||||
# BUILD_TYPE=debug \
|
||||
# DEFAULT_PG_VERSION=15 \
|
||||
# ./scripts/pytest
|
||||
|
||||
var = os.getenv("NEON_TEST_SUITE_USE_CGROUPS")
|
||||
if var is None:
|
||||
return None
|
||||
root = Path(var)
|
||||
assert root.is_dir()
|
||||
assert (
|
||||
"cgroup2fs"
|
||||
== subprocess.check_output(
|
||||
["stat", "--file-system", "--format=%T", root],
|
||||
text=True,
|
||||
).strip()
|
||||
)
|
||||
test_cgroup_dir: Path = get_test_cgroup_dir(request, root)
|
||||
test_cgroup_dir.mkdir(exist_ok=False, parents=False)
|
||||
return test_cgroup_dir
|
||||
|
||||
|
||||
def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "function"]:
|
||||
"""Return either session of function scope, depending on TEST_SHARED_FIXTURES envvar.
|
||||
|
||||
@@ -446,6 +477,8 @@ class NeonEnvBuilder:
|
||||
preserve_database_files: bool = False,
|
||||
initial_tenant: Optional[TenantId] = None,
|
||||
initial_timeline: Optional[TimelineId] = None,
|
||||
pageserver_virtual_file_io_engine: Optional[str] = None,
|
||||
test_cgroup_dir: Optional[Path] = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -481,6 +514,9 @@ class NeonEnvBuilder:
|
||||
self.config_init_force: Optional[str] = None
|
||||
self.top_output_dir = top_output_dir
|
||||
|
||||
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
|
||||
self.test_cgroup_dir = test_cgroup_dir
|
||||
|
||||
assert test_name.startswith(
|
||||
"test_"
|
||||
), "Unexpectedly instantiated from outside a test function"
|
||||
@@ -873,6 +909,15 @@ class NeonEnvBuilder:
|
||||
x.do_cleanup()
|
||||
|
||||
def __enter__(self) -> "NeonEnvBuilder":
|
||||
if self.test_cgroup_dir is not None:
|
||||
log.info(f"putting test runner into cgroup {self.test_cgroup_dir}")
|
||||
procs_file = self.test_cgroup_dir / "cgroup.procs"
|
||||
# TODO: auto-cleanup the cgroup, share code with __exit__
|
||||
pids = procs_file.read_text().split()
|
||||
assert pids == []
|
||||
mypid = os.getpid()
|
||||
with open(procs_file, "a") as f:
|
||||
f.write(f" {mypid}\n")
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
@@ -926,6 +971,57 @@ class NeonEnvBuilder:
|
||||
if cleanup_error is not None:
|
||||
cleanup_error = e
|
||||
|
||||
if self.test_cgroup_dir is not None:
|
||||
# TODO: ensure that we're the only python thread running;
|
||||
# otherwise, if a test leaks a thread,
|
||||
# our checking here would race with that other thread.
|
||||
# => https://github.com/neondatabase/neon/issues/6486
|
||||
log.info(f"check test runner cgroup for leaked processes: {self.test_cgroup_dir}")
|
||||
procs_file = self.test_cgroup_dir / "cgroup.procs"
|
||||
mypid = os.getpid()
|
||||
# move ourselves out of cgroup
|
||||
with open(self.test_cgroup_dir.parent / "cgroup.procs", "a") as f:
|
||||
f.write(f"{mypid}")
|
||||
# now freeze the test cgroup, so we can race-free list & SIGKILL the processes
|
||||
freeze_file = self.test_cgroup_dir / "cgroup.freeze"
|
||||
freeze_file.write_text("1")
|
||||
# inspect remaining pids
|
||||
pids = [int(pid) for pid in procs_file.read_text().split()]
|
||||
had_leaked_pids = len(pids) > 0
|
||||
for pid in pids:
|
||||
# dump info
|
||||
try:
|
||||
args_file = Path(f"/proc/{pid}/cmdline").read_bytes()
|
||||
args_parsed = [
|
||||
bytes.decode("utf-8") for bytes in args_file.split(b"\x00")
|
||||
] # NULL-byte separated
|
||||
args = f"{args_parsed}"
|
||||
except Exception as e:
|
||||
args = f"cmdline unavailable, {type(e)}, {e}"
|
||||
log.warning(f"SIGKILLing leaked process: {pid}: {args}")
|
||||
# send SIGKILL to the frozen process
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
# thaw the cgroup
|
||||
freeze_file.write_text("0")
|
||||
# wait for processes to exit
|
||||
while True:
|
||||
try:
|
||||
self.test_cgroup_dir.rmdir()
|
||||
break
|
||||
except OSError as e:
|
||||
if e.errno != errno.EBUSY:
|
||||
raise
|
||||
pids = [int(pid) for pid in procs_file.read_text().split()]
|
||||
if len(pids) == 0:
|
||||
break
|
||||
log.warning(f"waiting for pids to exit: {pids}")
|
||||
time.sleep(0.1)
|
||||
log.info("all pids exited and test cgroup has been deleted")
|
||||
cleanup_cmd = (
|
||||
f"for pid in $(cat '{procs_file}'); do; kill -9 $pid; done; rmdir {procs_file}"
|
||||
)
|
||||
assert not had_leaked_pids, f"had pids in test cgroup after NeonEnvBuilder teardown, see logs for details & cleanup help\ncleanup using bash command: {cleanup_cmd}\n"
|
||||
|
||||
|
||||
class NeonEnv:
|
||||
"""
|
||||
@@ -995,6 +1091,8 @@ class NeonEnv:
|
||||
self, config.auth_enabled
|
||||
)
|
||||
|
||||
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
|
||||
|
||||
# Create a config file corresponding to the options
|
||||
cfg: Dict[str, Any] = {
|
||||
"default_tenant_id": str(self.initial_tenant),
|
||||
@@ -1026,6 +1124,9 @@ class NeonEnv:
|
||||
"pg_auth_type": pg_auth_type,
|
||||
"http_auth_type": http_auth_type,
|
||||
}
|
||||
if self.pageserver_virtual_file_io_engine is not None:
|
||||
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
|
||||
|
||||
# Create a corresponding NeonPageserver object
|
||||
self.pageservers.append(
|
||||
NeonPageserver(
|
||||
@@ -1191,6 +1292,8 @@ def _shared_simple_env(
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
test_cgroup_dir: Optional[Path],
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
|
||||
@@ -1220,6 +1323,8 @@ def _shared_simple_env(
|
||||
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
|
||||
test_name=request.node.name,
|
||||
test_output_dir=test_output_dir,
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
test_cgroup_dir=test_cgroup_dir,
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
|
||||
@@ -1258,6 +1363,8 @@ def neon_env_builder(
|
||||
request: FixtureRequest,
|
||||
test_overlay_dir: Path,
|
||||
top_output_dir: Path,
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
test_cgroup_dir: Optional[Path],
|
||||
) -> Iterator[NeonEnvBuilder]:
|
||||
"""
|
||||
Fixture to create a Neon environment for test.
|
||||
@@ -1287,9 +1394,11 @@ def neon_env_builder(
|
||||
broker=default_broker,
|
||||
run_id=run_id,
|
||||
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
test_name=request.node.name,
|
||||
test_output_dir=test_output_dir,
|
||||
test_overlay_dir=test_overlay_dir,
|
||||
test_cgroup_dir=test_cgroup_dir,
|
||||
) as builder:
|
||||
yield builder
|
||||
|
||||
@@ -3624,6 +3733,10 @@ def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
|
||||
return get_test_output_dir(request, top_output_dir) / "repo"
|
||||
|
||||
|
||||
def get_test_cgroup_dir(request: FixtureRequest, top_cgroup_dir: Path) -> Path:
|
||||
return _get_test_dir(request, top_cgroup_dir, "")
|
||||
|
||||
|
||||
def pytest_addoption(parser: Parser):
|
||||
parser.addoption(
|
||||
"--preserve-database-files",
|
||||
|
||||
@@ -8,7 +8,7 @@ from _pytest.python import Metafunc
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
"""
|
||||
Dynamically parametrize tests by Postgres version and build type (debug/release/remote)
|
||||
Dynamically parametrize tests by Postgres version, build type (debug/release/remote), and possibly by other parameters
|
||||
"""
|
||||
|
||||
|
||||
@@ -31,11 +31,12 @@ def build_type(request: FixtureRequest) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc: Metafunc):
|
||||
# Do not parametrize performance tests yet, we need to prepare grafana charts first
|
||||
if "test_runner/performance" in metafunc.definition._nodeid:
|
||||
return
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def pageserver_virtual_file_io_engine(request: FixtureRequest) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc: Metafunc):
|
||||
if (v := os.environ.get("DEFAULT_PG_VERSION")) is None:
|
||||
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
|
||||
else:
|
||||
@@ -46,5 +47,12 @@ def pytest_generate_tests(metafunc: Metafunc):
|
||||
else:
|
||||
build_types = [bt.lower()]
|
||||
|
||||
metafunc.parametrize("build_type", build_types)
|
||||
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
|
||||
# Do not parametrize performance tests yet by Postgres version or build type, we need to prepare grafana charts first
|
||||
if "test_runner/performance" not in metafunc.definition._nodeid:
|
||||
metafunc.parametrize("build_type", build_types)
|
||||
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
|
||||
|
||||
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=tokio-epoll-uring`
|
||||
# And do not change test name for default `pageserver_virtual_file_io_engine=std-fs` to keep tests statistics
|
||||
if (io_engine := os.environ.get("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
|
||||
metafunc.parametrize("pageserver_virtual_file_io_engine", [io_engine])
|
||||
|
||||
@@ -29,7 +29,7 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
|
||||
@pytest.mark.parametrize("duration", [30])
|
||||
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
|
||||
@pytest.mark.parametrize("n_tenants", [1, 10, 100])
|
||||
@pytest.mark.parametrize("n_tenants", [1, 10])
|
||||
@pytest.mark.timeout(
|
||||
10000
|
||||
) # TODO: this value is just "a really high number"; have this per instance type
|
||||
|
||||
@@ -24,7 +24,8 @@ from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import subprocess_capture
|
||||
|
||||
|
||||
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
|
||||
# fixture order matters here: neon_env_builder leaked process assertions
|
||||
def test_import_from_vanilla(test_output_dir, pg_bin, neon_env_builder, vanilla_pg):
|
||||
# Put data in vanilla pg
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
|
||||
@@ -59,3 +59,5 @@ def test_neon_two_primary_endpoints_fail(
|
||||
env.neon_cli.endpoint_stop("ep1")
|
||||
# ep1 is stopped so create ep2 will succeed
|
||||
env.neon_cli.endpoint_start("ep2")
|
||||
# cleanup
|
||||
env.neon_cli.endpoint_stop("ep2")
|
||||
|
||||
Reference in New Issue
Block a user