Compare commits

...

25 Commits

Author SHA1 Message Date
Christian Schwarz
2978c839c6 avoid --privileged and blanket passwdless sudo 2024-01-26 13:04:10 +00:00
Christian Schwarz
1fdde9e41e fixup cedc0376ff 2024-01-26 10:26:03 +00:00
Christian Schwarz
3c71003b7d Revert "[DO NOT MERGE] build only debug build for v14"
This reverts commit 800d3d1cee.
2024-01-26 10:18:45 +00:00
Christian Schwarz
a36be87680 Merge remote-tracking branch 'origin/main' into problame/neon-env-builder-cgroup 2024-01-26 10:17:25 +00:00
Christian Schwarz
39490ddd6c fix other detected process leakage in the lowest-effort-way possible 2024-01-26 10:11:08 +00:00
Christian Schwarz
2b581eefa7 add todo to protect against race condition with leaked threads 2024-01-26 10:09:35 +00:00
Christian Schwarz
cedc0376ff fix(neon_local): leaks compute_ctl child process if get_status() fails
Copy-pasting from #6474 here; as multiple TODO comments in this file
indicate, we should really be using background_process::start_process
for compute_ctl => https://github.com/neondatabase/neon/pull/6482
2024-01-26 10:08:33 +00:00
Vadim Kharitonov
12e9b2a909 Update plv8 (#6465) 2024-01-26 09:56:11 +00:00
Christian Schwarz
918b03b3b0 integrate tokio-epoll-uring as alternative VirtualFile IO engine (#5824) 2024-01-26 09:25:07 +01:00
Christian Schwarz
b2ec54b8ac Revert "run tests sequentially"
This reverts commit 407c78cfaf.
2024-01-25 20:31:20 +00:00
Christian Schwarz
193ba2384a Revert "[DO NOT MERGE] fail fast"
This reverts commit 194981c16b.
2024-01-25 20:31:04 +00:00
Christian Schwarz
bcb8bed875 Revert "guarantee leaked processes so we know this actually works"
This reverts commit 4204a7dd59.
2024-01-25 19:53:09 +00:00
Alexander Bayandin
d36623ad74 CI: cancel old e2e-tests on new commits (#6463)
## Problem

Triggered `e2e-tests` job is not cancelled along with other jobs in a PR
if the PR get new commits. We can improve the situation by setting
`concurrency_group` for the remote workflow
(https://github.com/neondatabase/cloud/pull/9622 adds
`concurrency_group` group input to the remote workflow).

Ref https://neondb.slack.com/archives/C059ZC138NR/p1706087124297569

Cloud's part added in https://github.com/neondatabase/cloud/pull/9622

## Summary of changes
- Set `concurrency_group` parameter when triggering `e2e-tests`
- At the beginning of a CI pipeline, trigger Cloud's
`cancel-previous-in-concurrency-group.yml` workflow which cancels
previously triggered e2e-tests
2024-01-25 19:25:29 +00:00
Christian Schwarz
23e36ae6a3 see if this fixes the permission denied issue 2024-01-25 19:02:35 +00:00
Christian Schwarz
194981c16b [DO NOT MERGE] fail fast 2024-01-25 19:01:04 +00:00
Christian Schwarz
415b489f18 neon_simple_env was not using test_cgroup_dir 2024-01-25 18:59:41 +00:00
Christian Schwarz
689ad72e92 fix(neon_local): leaks child process if it fails to start & pass checks (#6474)
refs https://github.com/neondatabase/neon/issues/6473

Before this PR, if process_started() didn't return Ok(true) until we
ran out of retries, we'd return an error but leave the process running.

Try it by adding a 20s sleep to the pageserver `main()`, e.g., right
before we claim the pidfile.

Without this PR, output looks like so:

```
(.venv) cs@devvm-mbp:[~/src/neon-work-2]: ./target/debug/neon_local start
Starting neon broker at 127.0.0.1:50051.
storage_broker started, pid: 2710939
.
attachment_service started, pid: 2710949
Starting pageserver node 1 at '127.0.0.1:64000' in ".neon/pageserver_1".....
pageserver has not started yet, continuing to wait.....
pageserver 1 start failed: pageserver did not start in 10 seconds
No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: ".neon/pageserver_1/pageserver.pid"
No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: ".neon/safekeepers/sk1/safekeeper.pid"
Stopping storage_broker with pid 2710939 immediately.......
storage_broker has not stopped yet, continuing to wait.....
neon broker stop failed: storage_broker with pid 2710939 did not stop in 10 seconds
Stopping attachment_service with pid 2710949 immediately.......
attachment_service has not stopped yet, continuing to wait.....
attachment service stop failed: attachment_service with pid 2710949 did not stop in 10 seconds
```

and we leak the pageserver process

```
(.venv) cs@devvm-mbp:[~/src/neon-work-2]: ps aux | grep pageserver
cs       2710959  0.0  0.2 2377960 47616 pts/4   Sl   14:36   0:00 /home/cs/src/neon-work-2/target/debug/pageserver -D .neon/pageserver_1 -c id=1 -c pg_distrib_dir='/home/cs/src/neon-work-2/pg_install' -c http_auth_type='Trust' -c pg_auth_type='Trust' -c listen_http_addr='127.0.0.1:9898' -c listen_pg_addr='127.0.0.1:64000' -c broker_endpoint='http://127.0.0.1:50051/' -c control_plane_api='http://127.0.0.1:1234/' -c remote_storage={local_path='../local_fs_remote_storage/pageserver'}
```

After this PR, there is no leaked process.
2024-01-25 19:20:02 +01:00
Christian Schwarz
fd4cce9417 test_pageserver_max_throughput_getpage_at_latest_lsn: remove n_tenants=100 combination (#6477)
Need to fix the neon_local timeouts first
(https://github.com/neondatabase/neon/issues/6473)
and also not run them on every merge, but only nightly:
https://github.com/neondatabase/neon/issues/6476
2024-01-25 18:17:53 +00:00
Arpad Müller
d52b81340f S3 based recovery (#6155)
Adds a new `time_travel_recover` function to the `RemoteStorage` trait
that allows time travel like functionality for S3 buckets, regardless of
their content (it is not even pageserver related). It takes a different
approach from [this
post](https://aws.amazon.com/blogs/storage/point-in-time-restore-for-amazon-s3-buckets/)
that is more complicated.

It takes as input a prefix a target timestamp, and a limit timestamp:

* executes [`ListObjectVersions`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html)
* obtains the latest version that comes before the target timestamp
* copies that latest version to the same prefix
* if there is versions newer than the limit timestamp, it doesn't do
anything for the file

The limit timestamp is meant to be some timestamp before the start of
the recovery operation and after any changes that one wants to revert.
For example, it might be the time point after a tenant was detached from
all involved pageservers. The limiting mechanism ensures that the
operation is idempotent and can be retried without causing additional
writes/copies.

The approach fulfills all the requirements laid out in 8233, and is a
recoverable operation. Nothing is deleted permanently, only new entries
added to the version log.

I also enable [nextest retries](https://nexte.st/book/retries.html) to
help with some general S3 flakiness (on top of low level retries).

Part of https://github.com/neondatabase/cloud/issues/8233
2024-01-25 18:23:18 +01:00
Joonas Koivunen
8dee9908f8 fix(compaction_task): wrong log levels (#6442)
Filter what we log on compaction task. Per discussion in last triage
call, fixing these by introducing and inspecting the root cause within
anyhow::Error instead of rolling out proper conversions.

Fixes: #6365
Fixes: #6367
2024-01-25 18:45:17 +02:00
Christian Schwarz
407c78cfaf run tests sequentially 2024-01-25 16:27:10 +00:00
Alexander Bayandin
800d3d1cee [DO NOT MERGE] build only debug build for v14 2024-01-25 16:20:55 +00:00
Christian Schwarz
4204a7dd59 guarantee leaked processes so we know this actually works 2024-01-25 13:05:29 +00:00
Christian Schwarz
b602063f7f attempt to make it work in CI 2024-01-25 13:05:29 +00:00
Christian Schwarz
bcfd333c98 feat(test suite): use cgroups to detect if a test leaks processes
Tested manually by commenting out NeonEnv.stop()'s self.attachment_service.stop() call.
2024-01-25 13:05:29 +00:00
42 changed files with 1544 additions and 232 deletions

View File

@@ -21,6 +21,9 @@ env:
COPT: '-Werror'
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY_DEV }}
NEXTEST_RETRIES: 3
# A concurrency group that we use for e2e-tests runs, matches `concurrency.group` above with `github.repository` as a prefix
E2E_CONCURRENCY_GROUP: ${{ github.repository }}-${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
jobs:
check-permissions:
@@ -44,6 +47,20 @@ jobs:
exit 1
cancel-previous-e2e-tests:
needs: [ check-permissions ]
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
steps:
- name: Cancel previous e2e-tests runs for this PR
env:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
gh workflow --repo neondatabase/cloud \
run cancel-previous-in-concurrency-group.yml \
--field concurrency_group="${{ env.E2E_CONCURRENCY_GROUP }}"
tag:
needs: [ check-permissions ]
runs-on: [ self-hosted, gen3, small ]
@@ -186,7 +203,11 @@ jobs:
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
options: --init
# Raise locked memory limit for tokio-epoll-uring.
# On 5.10 LTS kernels < 5.10.162 (and generally mainline kernels < 5.12),
# io_uring will account the memory of the CQ and SQ as locked.
# More details: https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
strategy:
fail-fast: false
matrix:
@@ -341,7 +362,9 @@ jobs:
- name: Run rust tests
run: |
${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
for io_engine in std-fs tokio-epoll-uring ; do
NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE=$io_engine ${cov_prefix} cargo nextest run $CARGO_FLAGS $CARGO_FEATURES
done
# Run separate tests for real S3
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
@@ -419,8 +442,8 @@ jobs:
runs-on: [ self-hosted, gen3, large ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
# Default shared memory is 64mb
options: --init --shm-size=512mb
# for changed limits, see comments on `options:` earlier in this file
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864 --cgroupns=private --security-opt umask=/sys/fs/cgroup
strategy:
fail-fast: false
matrix:
@@ -433,6 +456,9 @@ jobs:
submodules: true
fetch-depth: 1
- name: Setup cgroup for use by test suite
run: sudo bash -x /setup_neon_testsuite_cgroup.bash
- name: Pytest regression tests
uses: ./.github/actions/run-python-test-set
with:
@@ -448,6 +474,8 @@ jobs:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
NEON_TEST_SUITE_USE_CGROUPS: /sys/fs/cgroup/neon_testsuite
- name: Merge and upload coverage data
if: matrix.build_type == 'debug' && matrix.pg_version == 'v14'
@@ -458,12 +486,13 @@ jobs:
runs-on: [ self-hosted, gen3, small ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:${{ needs.build-buildtools-image.outputs.build-tools-tag }}
# Default shared memory is 64mb
options: --init --shm-size=512mb
# for changed limits, see comments on `options:` earlier in this file
options: --init --shm-size=512mb --ulimit memlock=67108864:67108864
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
strategy:
fail-fast: false
matrix:
# the amount of groups (N) should be reflected in `extra_params: --splits N ...`
pytest_split_group: [ 1, 2, 3, 4 ]
build_type: [ release ]
steps:
@@ -477,11 +506,12 @@ jobs:
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ github.ref_name == 'main' }}
extra_params: --splits ${{ strategy.job-total }} --group ${{ matrix.pytest_split_group }}
extra_params: --splits 4 --group ${{ matrix.pytest_split_group }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
@@ -695,7 +725,8 @@ jobs:
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\",
\"storage_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
\"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\"
\"compute_image_tag\": \"${{ needs.tag.outputs.build-tag }}\",
\"concurrency_group\": \"${{ env.E2E_CONCURRENCY_GROUP }}\"
}
}"

44
Cargo.lock generated
View File

@@ -1342,6 +1342,7 @@ dependencies = [
"regex",
"reqwest",
"safekeeper_api",
"scopeguard",
"serde",
"serde_json",
"serde_with",
@@ -2563,6 +2564,16 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "io-uring"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "ipnet"
version = "2.9.0"
@@ -3361,6 +3372,7 @@ dependencies = [
"tenant_size_model",
"thiserror",
"tokio",
"tokio-epoll-uring",
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
@@ -5382,18 +5394,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.40"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.40"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
dependencies = [
"proc-macro2",
"quote",
@@ -5517,6 +5529,21 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0dd3a2f8bf3239d34a19719ef1a74146c093126f"
dependencies = [
"futures",
"once_cell",
"scopeguard",
"thiserror",
"tokio",
"tokio-util",
"tracing",
"uring-common",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
@@ -6026,6 +6053,15 @@ dependencies = [
"webpki-roots 0.23.1",
]
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#0dd3a2f8bf3239d34a19719ef1a74146c093126f"
dependencies = [
"io-uring",
"libc",
]
[[package]]
name = "url"
version = "2.3.1"

View File

@@ -151,6 +151,7 @@ test-context = "0.1"
thiserror = "1.0"
tls-listener = { version = "0.7", features = ["rustls", "hyper-h1"] }
tokio = { version = "1.17", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.10.0"
tokio-rustls = "0.24"

View File

@@ -1,8 +1,5 @@
FROM debian:bullseye-slim
# Add nonroot user
RUN useradd -ms /bin/bash nonroot -b /home
SHELL ["/bin/bash", "-c"]
# System deps
RUN set -e \
@@ -43,6 +40,7 @@ RUN set -e \
openssh-client \
parallel \
pkg-config \
sudo \
unzip \
wget \
xz-utils \
@@ -50,6 +48,17 @@ RUN set -e \
zstd \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# Add nonroot user
RUN useradd -ms /bin/bash nonroot -b /home
SHELL ["/bin/bash", "-c"]
RUN echo "#!/usr/bin/env bash \
set -exuo pipefail \
mkdir /sys/fs/cgroup/neon_testsuite \
chown -R nonroot:nonroot /sys/fs/cgroup/neon_testsuite \
echo SUCCESS: cgroup set up for user nonroot at /sys/fs/cgroup/neon_testsuite \
" > /setup_neon_testsuite_cgroup.bash && chmod +x /setup_neon_testsuite_cgroup.bash
RUN echo "ALL ALL = (ALL) NOPASSWD: /setup_neon_testsuite_cgroup.bash" >> /etc/sudoers
# protobuf-compiler (protoc)
ENV PROTOC_VERSION 25.1
RUN curl -fsSL "https://github.com/protocolbuffers/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-$(uname -m | sed 's/aarch64/aarch_64/g').zip" -o "protoc.zip" \

View File

@@ -144,30 +144,23 @@ RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouti
FROM build-deps AS plv8-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ARG PG_VERSION
RUN apt update && \
apt install -y ninja-build python3-dev libncurses5 binutils clang
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
export PLV8_VERSION=3.1.5 \
export PLV8_CHECKSUM=1e108d5df639e4c189e1c5bdfa2432a521c126ca89e7e5a969d46899ca7bf106 \
;; \
"v16") \
export PLV8_VERSION=3.1.8 \
export PLV8_CHECKSUM=92b10c7db39afdae97ff748c9ec54713826af222c459084ad002571b79eb3f49 \
;; \
*) \
echo "Export the valid PG_VERSION variable" && exit 1 \
;; \
esac && \
wget https://github.com/plv8/plv8/archive/refs/tags/v${PLV8_VERSION}.tar.gz -O plv8.tar.gz && \
echo "${PLV8_CHECKSUM} plv8.tar.gz" | sha256sum --check && \
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.10.tar.gz -O plv8.tar.gz && \
echo "7096c3290928561f0d4901b7a52794295dc47f6303102fae3f8e42dd575ad97d plv8.tar.gz" | sha256sum --check && \
mkdir plv8-src && cd plv8-src && tar xvzf ../plv8.tar.gz --strip-components=1 -C . && \
# generate and copy upgrade scripts
mkdir -p upgrade && ./generate_upgrade.sh 3.1.10 && \
cp upgrade/* /usr/local/pgsql/share/extension/ && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) install && \
rm -rf /plv8-* && \
find /usr/local/pgsql/ -name "plv8-*.so" | xargs strip && \
# don't break computes with installed old version of plv8
cd /usr/local/pgsql/lib/ && \
ln -s plv8-3.1.10.so plv8-3.1.5.so && \
ln -s plv8-3.1.10.so plv8-3.1.8.so && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plcoffee.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plls.control

View File

@@ -19,6 +19,7 @@ hex.workspace = true
hyper.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = ["blocking", "json"] }
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true

View File

@@ -9,7 +9,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{path::PathBuf, process::Child, str::FromStr};
use std::{path::PathBuf, str::FromStr};
use tracing::instrument;
use utils::{
auth::{Claims, Scope},
@@ -220,7 +220,7 @@ impl AttachmentService {
.expect("non-Unicode path")
}
pub async fn start(&self) -> anyhow::Result<Child> {
pub async fn start(&self) -> anyhow::Result<()> {
let path_str = self.path.to_string_lossy();
let mut args = vec!["-l", &self.listen, "-p", &path_str]
@@ -254,6 +254,7 @@ impl AttachmentService {
)
.await;
// TODO: shouldn't we bail if we fail to spawn the process?
for ps_conf in &self.env.pageservers {
let (pg_host, pg_port) =
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");

View File

@@ -17,7 +17,7 @@ use std::io::Write;
use std::os::unix::prelude::AsRawFd;
use std::os::unix::process::CommandExt;
use std::path::Path;
use std::process::{Child, Command};
use std::process::Command;
use std::time::Duration;
use std::{fs, io, thread};
@@ -60,7 +60,7 @@ pub async fn start_process<F, Fut, AI, A, EI>(
envs: EI,
initial_pid_file: InitialPidFile,
process_status_check: F,
) -> anyhow::Result<Child>
) -> anyhow::Result<()>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = anyhow::Result<bool>>,
@@ -98,7 +98,7 @@ where
InitialPidFile::Expect(path) => path,
};
let mut spawned_process = filled_cmd.spawn().with_context(|| {
let spawned_process = filled_cmd.spawn().with_context(|| {
format!("Could not spawn {process_name}, see console output and log files for details.")
})?;
let pid = spawned_process.id();
@@ -106,12 +106,26 @@ where
i32::try_from(pid)
.with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
);
// set up a scopeguard to kill & wait for the child in case we panic or bail below
let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| {
println!("SIGKILL & wait the started process");
(|| {
// TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e..g, walredo).
spawned_process.kill().context("SIGKILL child")?;
spawned_process.wait().context("wait() for child process")?;
anyhow::Ok(())
})()
.with_context(|| format!("scopeguard kill&wait child {process_name:?}"))
.unwrap();
});
for retries in 0..RETRIES {
match process_started(pid, pid_file_to_check, &process_status_check).await {
Ok(true) => {
println!("\n{process_name} started, pid: {pid}");
return Ok(spawned_process);
println!("\n{process_name} started and passed status check, pid: {pid}");
// leak the child process, it'll outlive this neon_local invocation
drop(scopeguard::ScopeGuard::into_inner(spawned_process));
return Ok(());
}
Ok(false) => {
if retries == NOTICE_AFTER_RETRIES {
@@ -126,16 +140,15 @@ where
thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
}
Err(e) => {
println!("{process_name} failed to start: {e:#}");
if let Err(e) = spawned_process.kill() {
println!("Could not stop {process_name} subprocess: {e:#}")
};
println!("error starting process {process_name:?}: {e:#}");
return Err(e);
}
}
}
println!();
anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds");
anyhow::bail!(
"{process_name} did not start+pass status checks within {RETRY_UNTIL_SECS} seconds"
);
}
/// Stops the process, using the pid file given. Returns Ok also if the process is already not running.

View File

@@ -438,7 +438,7 @@ impl Endpoint {
}
fn wait_for_compute_ctl_to_exit(&self, send_sigterm: bool) -> Result<()> {
// TODO use background_process::stop_process instead
// TODO use background_process::stop_process instead: https://github.com/neondatabase/neon/pull/6482
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
let pid = nix::unistd::Pid::from_raw(pid as i32);
@@ -583,9 +583,21 @@ impl Endpoint {
}
let child = cmd.spawn()?;
// set up a scopeguard to kill & wait for the child in case we panic or bail below
let child = scopeguard::guard(child, |mut child| {
println!("SIGKILL & wait the started process");
(|| {
// TODO: use another signal that can be caught by the child so it can clean up any children it spawned
child.kill().context("SIGKILL child")?;
child.wait().context("wait() for child process")?;
anyhow::Ok(())
})()
.with_context(|| format!("scopeguard kill&wait child {child:?}"))
.unwrap();
});
// Write down the pid so we can wait for it when we want to stop
// TODO use background_process::start_process instead
// TODO use background_process::start_process instead: https://github.com/neondatabase/neon/pull/6482
let pid = child.id();
let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
std::fs::write(pidfile_path, pid.to_string())?;
@@ -634,6 +646,9 @@ impl Endpoint {
std::thread::sleep(ATTEMPT_INTERVAL);
}
// disarm the scopeguard, let the child outlive this function (and neon_local invoction)
drop(scopeguard::ScopeGuard::into_inner(child));
Ok(())
}

View File

@@ -11,7 +11,7 @@ use std::io;
use std::io::Write;
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::{Child, Command};
use std::process::Command;
use std::time::Duration;
use anyhow::{bail, Context};
@@ -161,7 +161,7 @@ impl PageServerNode {
.expect("non-Unicode path")
}
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
self.start_node(config_overrides, false).await
}
@@ -207,7 +207,7 @@ impl PageServerNode {
&self,
config_overrides: &[&str],
update_config: bool,
) -> anyhow::Result<Child> {
) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
print!(

View File

@@ -7,7 +7,6 @@
//! ```
use std::io::Write;
use std::path::PathBuf;
use std::process::Child;
use std::{io, result};
use anyhow::Context;
@@ -104,7 +103,7 @@ impl SafekeeperNode {
.expect("non-Unicode path")
}
pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<Child> {
pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<()> {
print!(
"Starting safekeeper at '{}' in '{}'",
self.pg_connection_config.raw_address(),

View File

@@ -8,6 +8,7 @@ use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
use anyhow::Result;
@@ -23,6 +24,7 @@ use futures::stream::Stream;
use futures_util::StreamExt;
use http_types::{StatusCode, Url};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use crate::s3_bucket::RequestKind;
@@ -370,6 +372,20 @@ impl RemoteStorage for AzureBlobStorage {
copy_status = status;
}
}
async fn time_travel_recover(
&self,
_prefix: Option<&RemotePath>,
_timestamp: SystemTime,
_done_if_after: SystemTime,
_cancel: CancellationToken,
) -> anyhow::Result<()> {
// TODO use Azure point in time recovery feature for this
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
Err(anyhow::anyhow!(
"time travel recovery for azure blob storage is not implemented"
))
}
}
pin_project_lite::pin_project! {

View File

@@ -25,6 +25,7 @@ use bytes::Bytes;
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use toml_edit::Item;
use tracing::info;
@@ -210,6 +211,15 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// Copy a remote object inside a bucket from one path to another.
async fn copy(&self, from: &RemotePath, to: &RemotePath) -> anyhow::Result<()>;
/// Resets the content of everything with the given prefix to the given state
async fn time_travel_recover(
&self,
prefix: Option<&RemotePath>,
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: CancellationToken,
) -> anyhow::Result<()>;
}
pub type DownloadStream = Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Unpin + Send + Sync>>;
@@ -387,6 +397,33 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::Unreliable(s) => s.copy(from, to).await,
}
}
pub async fn time_travel_recover(
&self,
prefix: Option<&RemotePath>,
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: CancellationToken,
) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::AwsS3(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::AzureBlob(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::Unreliable(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
}
}
}
impl GenericRemoteStorage {
@@ -674,6 +711,7 @@ impl ConcurrencyLimiter {
RequestKind::List => &self.read,
RequestKind::Delete => &self.write,
RequestKind::Copy => &self.write,
RequestKind::TimeTravel => &self.write,
}
}

View File

@@ -4,7 +4,7 @@
//! This storage used in tests, but can also be used in cases when a certain persistent
//! volume is mounted to the local FS.
use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin};
use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin, time::SystemTime};
use anyhow::{bail, ensure, Context};
use bytes::Bytes;
@@ -14,7 +14,7 @@ use tokio::{
fs,
io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tokio_util::io::ReaderStream;
use tokio_util::{io::ReaderStream, sync::CancellationToken};
use tracing::*;
use utils::{crashsafe::path_with_suffix_extension, fs_ext::is_directory_empty};
@@ -422,6 +422,17 @@ impl RemoteStorage for LocalFs {
})?;
Ok(())
}
#[allow(clippy::diverging_sub_expression)]
async fn time_travel_recover(
&self,
_prefix: Option<&RemotePath>,
_timestamp: SystemTime,
_done_if_after: SystemTime,
_cancel: CancellationToken,
) -> anyhow::Result<()> {
unimplemented!()
}
}
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {

View File

@@ -6,12 +6,14 @@
use std::{
borrow::Cow,
collections::HashMap,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::SystemTime,
};
use anyhow::Context as _;
use anyhow::{anyhow, Context as _};
use aws_config::{
environment::credentials::EnvironmentVariableCredentialsProvider,
imds::credentials::ImdsCredentialsProvider,
@@ -27,17 +29,19 @@ use aws_sdk_s3::{
config::{AsyncSleep, Builder, IdentityCache, Region, SharedAsyncSleep},
error::SdkError,
operation::get_object::GetObjectError,
types::{Delete, ObjectIdentifier},
types::{Delete, DeleteMarkerEntry, ObjectIdentifier, ObjectVersion},
Client,
};
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;
use aws_smithy_types::{body::SdkBody, DateTime};
use bytes::Bytes;
use futures::stream::Stream;
use hyper::Body;
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use utils::backoff;
use super::StorageMetadata;
use crate::{
@@ -270,6 +274,59 @@ impl S3Bucket {
}
}
}
async fn delete_oids(
&self,
kind: RequestKind,
delete_objects: &[ObjectIdentifier],
) -> anyhow::Result<()> {
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
let started_at = start_measuring_requests(kind);
let resp = self
.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(
Delete::builder()
.set_objects(Some(chunk.to_vec()))
.build()?,
)
.send()
.await;
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &resp, started_at);
let resp = resp?;
metrics::BUCKET_METRICS
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
// Log a bounded number of the errors within the response:
// these requests can carry 1000 keys so logging each one
// would be too verbose, especially as errors may lead us
// to retry repeatedly.
const LOG_UP_TO_N_ERRORS: usize = 10;
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
tracing::warn!(
"DeleteObjects key {} failed: {}: {}",
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
e.message.as_ref().map(Cow::from).unwrap_or("".into())
);
}
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()
));
}
}
Ok(())
}
}
pin_project_lite::pin_project! {
@@ -568,64 +625,168 @@ impl RemoteStorage for S3Bucket {
delete_objects.push(obj_id);
}
for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE) {
let started_at = start_measuring_requests(kind);
let resp = self
.client
.delete_objects()
.bucket(self.bucket_name.clone())
.delete(
Delete::builder()
.set_objects(Some(chunk.to_vec()))
.build()?,
)
.send()
.await;
let started_at = ScopeGuard::into_inner(started_at);
metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, &resp, started_at);
match resp {
Ok(resp) => {
metrics::BUCKET_METRICS
.deleted_objects_total
.inc_by(chunk.len() as u64);
if let Some(errors) = resp.errors {
// Log a bounded number of the errors within the response:
// these requests can carry 1000 keys so logging each one
// would be too verbose, especially as errors may lead us
// to retry repeatedly.
const LOG_UP_TO_N_ERRORS: usize = 10;
for e in errors.iter().take(LOG_UP_TO_N_ERRORS) {
tracing::warn!(
"DeleteObjects key {} failed: {}: {}",
e.key.as_ref().map(Cow::from).unwrap_or("".into()),
e.code.as_ref().map(Cow::from).unwrap_or("".into()),
e.message.as_ref().map(Cow::from).unwrap_or("".into())
);
}
return Err(anyhow::format_err!(
"Failed to delete {} objects",
errors.len()
));
}
}
Err(e) => {
return Err(e.into());
}
}
}
Ok(())
self.delete_oids(kind, &delete_objects).await
}
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
let paths = std::array::from_ref(path);
self.delete_objects(paths).await
}
async fn time_travel_recover(
&self,
prefix: Option<&RemotePath>,
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let kind = RequestKind::TimeTravel;
let _guard = self.permit(kind).await;
let timestamp = DateTime::from(timestamp);
let done_if_after = DateTime::from(done_if_after);
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
// get the passed prefix or if it is not set use prefix_in_bucket value
let prefix = prefix
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone());
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |_e: &_| false;
let list = backoff::retry(
|| async {
Ok(self
.client
.list_object_versions()
.bucket(self.bucket_name.clone())
.set_prefix(prefix.clone())
.send()
.await?)
},
is_permanent,
warn_threshold,
max_retries,
"listing object versions for time_travel_recover",
backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
)
.await?;
if list.is_truncated().unwrap_or_default() {
anyhow::bail!("Received truncated ListObjectVersions response for prefix={prefix:?}");
}
let mut versions_deletes = list
.versions()
.iter()
.map(VerOrDelete::Version)
.chain(list.delete_markers().iter().map(VerOrDelete::DeleteMarker))
.collect::<Vec<_>>();
versions_deletes.sort_by_key(|vd| (vd.key(), vd.last_modified()));
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
for vd in versions_deletes {
let last_modified = vd.last_modified();
let version_id = vd.version_id();
let key = vd.key();
let (Some(last_modified), Some(version_id), Some(key)) =
(last_modified, version_id, key)
else {
anyhow::bail!(
"One (or more) of last_modified, key, and id is None. \
Is versioning enabled in the bucket? last_modified={:?} key={:?} version_id={:?}",
last_modified, key, version_id,
);
};
if version_id == "null" {
anyhow::bail!("Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values");
}
tracing::trace!(
"Parsing version key={key} version_id={version_id} is_delete={}",
matches!(vd, VerOrDelete::DeleteMarker(_))
);
vds_for_key
.entry(key)
.or_default()
.push((vd, last_modified, version_id));
}
for (key, versions) in vds_for_key {
let (last_vd, last_last_modified, _version_id) = versions.last().unwrap();
if last_last_modified > &&done_if_after {
tracing::trace!("Key {key} has version later than done_if_after, skipping");
continue;
}
// the version we want to restore to.
let version_to_restore_to =
match versions.binary_search_by_key(&timestamp, |tpl| *tpl.1) {
Ok(v) => v,
Err(e) => e,
};
if version_to_restore_to == versions.len() {
tracing::trace!("Key {key} has no changes since timestamp, skipping");
continue;
}
let mut do_delete = false;
if version_to_restore_to == 0 {
// All versions more recent, so the key didn't exist at the specified time point.
tracing::trace!(
"All {} versions more recent for {key}, deleting",
versions.len()
);
do_delete = true;
} else {
match &versions[version_to_restore_to - 1] {
(VerOrDelete::Version(_), _last_modified, version_id) => {
tracing::trace!("Copying old version {version_id} for {key}...");
// Restore the state to the last version by copying
let source_id =
format!("{}/{key}?versionId={version_id}", self.bucket_name);
backoff::retry(
|| async {
Ok(self
.client
.copy_object()
.bucket(self.bucket_name.clone())
.key(key)
.copy_source(&source_id)
.send()
.await?)
},
is_permanent,
warn_threshold,
max_retries,
"listing object versions for time_travel_recover",
backoff::Cancel::new(cancel.clone(), || anyhow!("Cancelled")),
)
.await?;
}
(VerOrDelete::DeleteMarker(_), _last_modified, _version_id) => {
do_delete = true;
}
}
};
if do_delete {
if matches!(last_vd, VerOrDelete::DeleteMarker(_)) {
// Key has since been deleted (but there was some history), no need to do anything
tracing::trace!("Key {key} already deleted, skipping.");
} else {
tracing::trace!("Deleting {key}...");
let oid = ObjectIdentifier::builder().key(key.to_owned()).build()?;
self.delete_oids(kind, &[oid]).await?;
}
}
}
Ok(())
}
}
/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`].
@@ -650,6 +811,32 @@ fn start_measuring_requests(
})
}
enum VerOrDelete<'a> {
Version(&'a ObjectVersion),
DeleteMarker(&'a DeleteMarkerEntry),
}
impl<'a> VerOrDelete<'a> {
fn last_modified(&self) -> Option<&'a DateTime> {
match self {
VerOrDelete::Version(v) => v.last_modified(),
VerOrDelete::DeleteMarker(v) => v.last_modified(),
}
}
fn version_id(&self) -> Option<&'a str> {
match self {
VerOrDelete::Version(v) => v.version_id(),
VerOrDelete::DeleteMarker(v) => v.version_id(),
}
}
fn key(&self) -> Option<&'a str> {
match self {
VerOrDelete::Version(v) => v.key(),
VerOrDelete::DeleteMarker(v) => v.key(),
}
}
}
#[cfg(test)]
mod tests {
use camino::Utf8Path;

View File

@@ -12,6 +12,7 @@ pub(crate) enum RequestKind {
Delete = 2,
List = 3,
Copy = 4,
TimeTravel = 5,
}
use RequestKind::*;
@@ -24,6 +25,7 @@ impl RequestKind {
Delete => "delete_object",
List => "list_objects",
Copy => "copy_object",
TimeTravel => "time_travel_recover",
}
}
const fn as_index(&self) -> usize {
@@ -31,7 +33,7 @@ impl RequestKind {
}
}
pub(super) struct RequestTyped<C>([C; 5]);
pub(super) struct RequestTyped<C>([C; 6]);
impl<C> RequestTyped<C> {
pub(super) fn get(&self, kind: RequestKind) -> &C {
@@ -40,8 +42,8 @@ impl<C> RequestTyped<C> {
fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
use RequestKind::*;
let mut it = [Get, Put, Delete, List, Copy].into_iter();
let arr = std::array::from_fn::<C, 5, _>(|index| {
let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
let arr = std::array::from_fn::<C, 6, _>(|index| {
let next = it.next().unwrap();
assert_eq!(index, next.as_index());
f(next)

View File

@@ -5,7 +5,9 @@ use bytes::Bytes;
use futures::stream::Stream;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::SystemTime;
use std::{collections::hash_map::Entry, sync::Arc};
use tokio_util::sync::CancellationToken;
use crate::{
Download, DownloadError, GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorage,
@@ -30,6 +32,7 @@ enum RemoteOp {
Download(RemotePath),
Delete(RemotePath),
DeleteObjects(Vec<RemotePath>),
TimeTravelRecover(Option<RemotePath>),
}
impl UnreliableWrapper {
@@ -181,4 +184,17 @@ impl RemoteStorage for UnreliableWrapper {
self.attempt(RemoteOp::Upload(to.clone()))?;
self.inner.copy_object(from, to).await
}
async fn time_travel_recover(
&self,
prefix: Option<&RemotePath>,
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: CancellationToken,
) -> anyhow::Result<()> {
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))?;
self.inner
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
}

View File

@@ -1,15 +1,19 @@
use std::collections::HashSet;
use std::env;
use std::num::NonZeroUsize;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use std::time::{Duration, UNIX_EPOCH};
use std::{collections::HashSet, time::SystemTime};
use crate::common::{download_to_vec, upload_stream};
use anyhow::Context;
use camino::Utf8Path;
use remote_storage::{
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
};
use test_context::test_context;
use test_context::AsyncTestContext;
use tokio_util::sync::CancellationToken;
use tracing::info;
mod common;
@@ -23,6 +27,121 @@ const ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_S3_REMOTE_
const BASE_PREFIX: &str = "test";
#[test_context(MaybeEnabledStorage)]
#[tokio::test]
async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow::Result<()> {
let ctx = match ctx {
MaybeEnabledStorage::Enabled(ctx) => ctx,
MaybeEnabledStorage::Disabled => return Ok(()),
};
// Our test depends on discrepancies in the clock between S3 and the environment the tests
// run in. Therefore, wait a little bit before and after. The alternative would be
// to take the time from S3 response headers.
const WAIT_TIME: Duration = Duration::from_millis(3_000);
async fn time_point() -> SystemTime {
tokio::time::sleep(WAIT_TIME).await;
let ret = SystemTime::now();
tokio::time::sleep(WAIT_TIME).await;
ret
}
async fn list_files(client: &Arc<GenericRemoteStorage>) -> anyhow::Result<HashSet<RemotePath>> {
Ok(client
.list_files(None)
.await
.context("list root files failure")?
.into_iter()
.collect::<HashSet<_>>())
}
let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
let (data, len) = upload_stream("remote blob data1".as_bytes().into());
ctx.client.upload(data, len, &path1, None).await?;
let t0_files = list_files(&ctx.client).await?;
let t0 = time_point().await;
println!("at t0: {t0_files:?}");
let old_data = "remote blob data2";
let (data, len) = upload_stream(old_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None).await?;
let t1_files = list_files(&ctx.client).await?;
let t1 = time_point().await;
println!("at t1: {t1_files:?}");
// A little check to ensure that our clock is not too far off from the S3 clock
{
let dl = ctx.client.download(&path2).await?;
let last_modified = dl.last_modified.unwrap();
let half_wt = WAIT_TIME.mul_f32(0.5);
let t0_hwt = t0 + half_wt;
let t1_hwt = t1 - half_wt;
if !(t0_hwt..=t1_hwt).contains(&last_modified) {
panic!("last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
This likely means a large lock discrepancy between S3 and the local clock.");
}
}
let (data, len) = upload_stream("remote blob data3".as_bytes().into());
ctx.client.upload(data, len, &path3, None).await?;
let new_data = "new remote blob data2";
let (data, len) = upload_stream(new_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None).await?;
ctx.client.delete(&path1).await?;
let t2_files = list_files(&ctx.client).await?;
let t2 = time_point().await;
println!("at t2: {t2_files:?}");
// No changes after recovery to t2 (no-op)
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t2, t_final, CancellationToken::new())
.await?;
let t2_files_recovered = list_files(&ctx.client).await?;
println!("after recovery to t2: {t2_files_recovered:?}");
assert_eq!(t2_files, t2_files_recovered);
let path2_recovered_t2 = download_to_vec(ctx.client.download(&path2).await?).await?;
assert_eq!(path2_recovered_t2, new_data.as_bytes());
// after recovery to t1: path1 is back, path2 has the old content
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t1, t_final, CancellationToken::new())
.await?;
let t1_files_recovered = list_files(&ctx.client).await?;
println!("after recovery to t1: {t1_files_recovered:?}");
assert_eq!(t1_files, t1_files_recovered);
let path2_recovered_t1 = download_to_vec(ctx.client.download(&path2).await?).await?;
assert_eq!(path2_recovered_t1, old_data.as_bytes());
// after recovery to t0: everything is gone except for path1
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t0, t_final, CancellationToken::new())
.await?;
let t0_files_recovered = list_files(&ctx.client).await?;
println!("after recovery to t0: {t0_files_recovered:?}");
assert_eq!(t0_files, t0_files_recovered);
// cleanup
ctx.client.delete_objects(&[path1, path2, path3]).await?;
Ok(())
}
struct EnabledS3 {
client: Arc<GenericRemoteStorage>,
base_prefix: &'static str,

View File

@@ -61,6 +61,7 @@ sync_wrapper.workspace = true
tokio-tar.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-epoll-uring.workspace = true
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-stream.workspace = true

View File

@@ -18,7 +18,7 @@ use pageserver::tenant::block_io::FileBlockReader;
use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection};
use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE};
use pageserver::tenant::storage_layer::range_overlaps;
use pageserver::virtual_file::VirtualFile;
use pageserver::virtual_file::{self, VirtualFile};
use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -142,7 +142,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10);
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let mut total_delta_layers = 0usize;

View File

@@ -59,7 +59,7 @@ pub(crate) enum LayerCmd {
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
virtual_file::init(10);
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0, ctx).await?;
@@ -187,7 +187,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10);
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);

View File

@@ -123,7 +123,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
virtual_file::init(10, virtual_file::IoEngineKind::StdFs);
page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
dump_layerfile_from_path(path, true, &ctx).await

View File

@@ -130,7 +130,7 @@ fn main() -> anyhow::Result<()> {
let scenario = failpoint_support::init();
// Basic initialization of things that don't change after startup
virtual_file::init(conf.max_file_descriptors);
virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine);
page_cache::init(conf.page_cache_size);
start_pageserver(launch_ts, conf).context("Failed to start pageserver")?;

View File

@@ -36,6 +36,7 @@ use crate::tenant::config::TenantConfOpt;
use crate::tenant::{
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
};
use crate::virtual_file;
use crate::{
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
@@ -43,6 +44,8 @@ use crate::{
use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
use self::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE;
pub mod defaults {
use crate::tenant::config::defaults::*;
use const_format::formatcp;
@@ -79,6 +82,8 @@ pub mod defaults {
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
///
/// Default built-in configuration file.
///
@@ -114,6 +119,8 @@ pub mod defaults {
#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
#virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}'
[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -247,6 +254,8 @@ pub struct PageServerConf {
/// Maximum number of WAL records to be ingested and committed at the same time
pub ingest_batch_size: u64,
pub virtual_file_io_engine: virtual_file::IoEngineKind,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -331,6 +340,8 @@ struct PageServerConfigBuilder {
secondary_download_concurrency: BuilderValue<usize>,
ingest_batch_size: BuilderValue<u64>,
virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>,
}
impl Default for PageServerConfigBuilder {
@@ -406,6 +417,8 @@ impl Default for PageServerConfigBuilder {
secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()),
}
}
}
@@ -562,6 +575,10 @@ impl PageServerConfigBuilder {
self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
}
pub fn virtual_file_io_engine(&mut self, value: virtual_file::IoEngineKind) {
self.virtual_file_io_engine = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_warmup = self
.concurrent_tenant_warmup
@@ -669,6 +686,9 @@ impl PageServerConfigBuilder {
ingest_batch_size: self
.ingest_batch_size
.ok_or(anyhow!("missing ingest_batch_size"))?,
virtual_file_io_engine: self
.virtual_file_io_engine
.ok_or(anyhow!("missing virtual_file_io_engine"))?,
})
}
}
@@ -920,6 +940,9 @@ impl PageServerConf {
builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
},
"ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
"virtual_file_io_engine" => {
builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -993,6 +1016,7 @@ impl PageServerConf {
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
}
}
}
@@ -1225,6 +1249,7 @@ background_task_maximum_delay = '334 s'
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
},
"Correct defaults should be used when no config values are provided"
);
@@ -1288,6 +1313,7 @@ background_task_maximum_delay = '334 s'
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
ingest_batch_size: 100,
virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(),
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -1,3 +1,4 @@
#![recursion_limit = "300"]
#![deny(clippy::undocumented_unsafe_blocks)]
mod auth;

View File

@@ -932,6 +932,7 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
#[cfg(not(test))]
pub(crate) mod virtual_file_descriptor_cache {
use super::*;
@@ -951,6 +952,20 @@ pub(crate) mod virtual_file_descriptor_cache {
// ```
}
#[cfg(not(test))]
pub(crate) mod virtual_file_io_engine {
use super::*;
pub(crate) static KIND: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_virtual_file_io_engine_kind",
"The configured io engine for VirtualFile",
&["kind"],
)
.unwrap()
});
}
#[derive(Debug)]
struct GlobalAndPerTimelineHistogram {
global: Histogram,

View File

@@ -5,10 +5,10 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -39,6 +39,8 @@ pub enum BlockLease<'a> {
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
#[cfg(test)]
Vec(Vec<u8>),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
@@ -63,6 +65,10 @@ impl<'a> Deref for BlockLease<'a> {
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
#[cfg(test)]
BlockLease::Vec(v) => {
TryFrom::try_from(&v[..]).expect("caller must ensure that v has PAGE_SZ")
}
}
}
}
@@ -169,10 +175,14 @@ impl FileBlockReader {
}
/// Read a page from the underlying file into given buffer.
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
async fn fill_buffer(
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
.await
}
/// Read a block.
@@ -196,9 +206,9 @@ impl FileBlockReader {
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
let write_guard = self.fill_buffer(write_guard, blknum).await?;
Ok(write_guard.mark_valid().into())
}
}

View File

@@ -5,11 +5,11 @@ use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{self, VirtualFile};
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::sync::atomic::AtomicU64;
@@ -47,7 +47,10 @@ impl EphemeralFile {
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
virtual_file::OpenOptions::new()
.read(true)
.write(true)
.create(true),
)
.await?;
@@ -89,11 +92,10 @@ impl EphemeralFile {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = self
.file
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));

View File

@@ -36,7 +36,7 @@ use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, Fi
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
@@ -649,7 +649,7 @@ impl DeltaLayer {
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
virtual_file::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;

View File

@@ -34,7 +34,7 @@ use crate::tenant::storage_layer::{
LayerAccessStats, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
@@ -327,7 +327,7 @@ impl ImageLayer {
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
virtual_file::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
@@ -492,11 +492,15 @@ impl ImageLayerWriterInner {
},
);
info!("new image layer {path}");
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)
.await?;
let mut file = {
VirtualFile::open_with_options(
&path,
virtual_file::OpenOptions::new()
.write(true)
.create_new(true),
)
.await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);

View File

@@ -9,6 +9,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::TENANT_TASK_EVENTS;
use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -181,8 +182,11 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
error!(
"Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
log_compaction_error(
&e,
error_run_count,
&wait_duration,
cancel.is_cancelled(),
);
wait_duration
} else {
@@ -210,6 +214,58 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
fn log_compaction_error(
e: &CompactionError,
error_run_count: u32,
sleep_duration: &std::time::Duration,
task_cancelled: bool,
) {
use crate::tenant::upload_queue::NotInitialized;
use crate::tenant::PageReconstructError;
use CompactionError::*;
enum LooksLike {
Info,
Error,
}
let decision = match e {
ShuttingDown => None,
_ if task_cancelled => Some(LooksLike::Info),
Other(e) => {
let root_cause = e.root_cause();
let is_stopping = {
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
upload_queue || timeline
};
if is_stopping {
Some(LooksLike::Info)
} else {
Some(LooksLike::Error)
}
}
};
match decision {
Some(LooksLike::Info) => info!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
),
Some(LooksLike::Error) => error!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
),
None => {}
}
}
///
/// GC task's main loop
///

View File

@@ -392,8 +392,7 @@ pub(crate) enum PageReconstructError {
#[error("Ancestor LSN wait error: {0}")]
AncestorLsnTimeout(#[from] WaitLsnError),
/// The operation was cancelled
#[error("Cancelled")]
#[error("timeline shutting down")]
Cancelled,
/// The ancestor of this is being stopped
@@ -405,6 +404,19 @@ pub(crate) enum PageReconstructError {
WalRedo(anyhow::Error),
}
impl PageReconstructError {
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
pub(crate) fn is_stopping(&self) -> bool {
use PageReconstructError::*;
match self {
Other(_) => false,
AncestorLsnTimeout(_) => false,
Cancelled | AncestorStopping(_) => true,
WalRedo(_) => false,
}
}
}
#[derive(thiserror::Error, Debug)]
enum CreateImageLayersError {
#[error("timeline shutting down")]

View File

@@ -126,6 +126,27 @@ pub(super) struct UploadQueueStopped {
pub(super) deleted_at: SetDeletedFlagProgress,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum NotInitialized {
#[error("queue is in state Uninitialized")]
Uninitialized,
#[error("queue is in state Stopping")]
Stopped,
#[error("queue is shutting down")]
ShuttingDown,
}
impl NotInitialized {
pub(crate) fn is_stopping(&self) -> bool {
use NotInitialized::*;
match self {
Uninitialized => false,
Stopped => true,
ShuttingDown => true,
}
}
}
impl UploadQueue {
pub(crate) fn initialize_empty_remote(
&mut self,
@@ -214,17 +235,17 @@ impl UploadQueue {
}
pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
use UploadQueue::*;
match self {
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Initialized(x) => {
if !x.shutting_down {
Ok(x)
Uninitialized => Err(NotInitialized::Uninitialized.into()),
Initialized(x) => {
if x.shutting_down {
Err(NotInitialized::ShuttingDown.into())
} else {
anyhow::bail!("queue is shutting down")
Ok(x)
}
}
Stopped(_) => Err(NotInitialized::Stopped.into()),
}
}

View File

@@ -11,18 +11,28 @@
//! src/backend/storage/file/fd.c
//!
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
use crate::page_cache::PageWriteGuard;
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File, OpenOptions};
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::IoBufMut;
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
mod io_engine;
mod open_options;
pub use io_engine::IoEngineKind;
pub(crate) use open_options::*;
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
/// the underlying file is closed if the system is low on file descriptors,
@@ -106,7 +116,38 @@ struct SlotInner {
tag: u64,
/// the underlying file
file: Option<File>,
file: Option<OwnedFd>,
}
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
struct PageWriteGuardBuf {
page: PageWriteGuard<'static>,
init_up_to: usize,
}
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
fn stable_ptr(&self) -> *const u8 {
self.page.as_ptr()
}
fn bytes_init(&self) -> usize {
self.init_up_to
}
fn bytes_total(&self) -> usize {
self.page.len()
}
}
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
// hence it's safe to hand out the `stable_mut_ptr()`.
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.page.as_mut_ptr()
}
unsafe fn set_init(&mut self, pos: usize) {
assert!(pos <= self.page.len());
self.init_up_to = pos;
}
}
impl OpenFiles {
@@ -274,6 +315,10 @@ macro_rules! with_file {
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
let mut $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
impl VirtualFile {
@@ -326,7 +371,9 @@ impl VirtualFile {
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
// where our caller doesn't get to use the returned VirtualFile before its
// slot gets re-used by someone else.
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
let file = observe_duration!(StorageIoOperation::Open, {
open_options.open(path.as_std_path()).await?
});
// Strip all options other than read and write.
//
@@ -395,15 +442,13 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
.with_std_file(|std_file| std_file.sync_all()))
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
.with_std_file(|std_file| std_file.metadata()))
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
@@ -412,7 +457,7 @@ impl VirtualFile {
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
async fn lock_file(&self) -> Result<FileGuard, Error> {
let open_files = get_open_files();
let mut handle_guard = {
@@ -458,10 +503,9 @@ impl VirtualFile {
// NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
// case from StorageIoOperation::Open. This helps with identifying thrashing
// of the virtual file descriptor cache.
let file = observe_duration!(
StorageIoOperation::OpenAfterReplace,
self.open_options.open(&self.path)
)?;
let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
self.open_options.open(self.path.as_std_path()).await?
});
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -486,9 +530,8 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -507,25 +550,28 @@ impl VirtualFile {
Ok(self.pos)
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
}
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
pub async fn read_exact_at<B>(&self, buf: B, offset: u64) -> Result<B, Error>
where
B: IoBufMut + Send,
{
let (buf, res) =
read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await;
res.map(|()| buf)
}
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
pub async fn read_exact_at_page(
&self,
page: PageWriteGuard<'static>,
offset: u64,
) -> Result<PageWriteGuard<'static>, Error> {
let buf = PageWriteGuardBuf {
page,
init_up_to: 0,
};
let res = self.read_exact_at(buf, offset).await;
res.map(|PageWriteGuardBuf { page, .. }| page)
.map_err(|e| Error::new(ErrorKind::Other, e))
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
@@ -575,22 +621,35 @@ impl VirtualFile {
Ok(n)
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.shard_id, &self.timeline_id])
.add(size as i64);
}
result
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
};
observe_duration!(StorageIoOperation::Read, {
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
if let Ok(size) = res {
STORAGE_IO_SIZE
.with_label_values(&[
"read",
&self.tenant_id,
&self.shard_id,
&self.timeline_id,
])
.add(size as i64);
}
(buf, res)
})
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
});
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.shard_id, &self.timeline_id])
@@ -600,18 +659,241 @@ impl VirtualFile {
}
}
struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at_impl<B, F, Fut>(
buf: B,
mut offset: u64,
mut read_at: F,
) -> (B, std::io::Result<()>)
where
B: IoBufMut + Send,
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
use tokio_epoll_uring::BoundedBuf;
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
while buf.bytes_total() != 0 {
let res;
(buf, res) = read_at(buf, offset).await;
match res {
Ok(0) => break,
Ok(n) => {
buf = buf.slice(n..);
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (buf.into_inner(), Err(e)),
}
}
// NB: don't use `buf.is_empty()` here; it is from the
// `impl Deref for Slice { Target = [u8] }`; the the &[u8]
// returned by it only covers the initialized portion of `buf`.
// Whereas we're interested in ensuring that we filled the entire
// buffer that the user passed in.
if buf.bytes_total() != 0 {
(
buf.into_inner(),
Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
)),
)
} else {
assert_eq!(buf.len(), buf.bytes_total());
(buf.into_inner(), Ok(()))
}
}
impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
#[cfg(test)]
mod test_read_exact_at_impl {
use std::{collections::VecDeque, sync::Arc};
use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
use super::read_exact_at_impl;
struct Expectation {
offset: u64,
bytes_total: usize,
result: std::io::Result<Vec<u8>>,
}
struct MockReadAt {
expectations: VecDeque<Expectation>,
}
impl MockReadAt {
async fn read_at(
&mut self,
mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
offset: u64,
) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
let exp = self
.expectations
.pop_front()
.expect("read_at called but we have no expectations left");
assert_eq!(exp.offset, offset);
assert_eq!(exp.bytes_total, buf.bytes_total());
match exp.result {
Ok(bytes) => {
assert!(bytes.len() <= buf.bytes_total());
buf.put_slice(&bytes);
(buf, Ok(bytes.len()))
}
Err(e) => (buf, Err(e)),
}
}
}
impl Drop for MockReadAt {
fn drop(&mut self) {
assert_eq!(self.expectations.len(), 0);
}
}
#[tokio::test]
async fn test_basic() {
let buf = Vec::with_capacity(5);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![Expectation {
offset: 0,
bytes_total: 5,
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
}]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
assert!(res.is_ok());
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
}
#[tokio::test]
async fn test_empty_buf_issues_no_syscall() {
let buf = Vec::new();
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::new(),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
assert!(res.is_ok());
}
#[tokio::test]
async fn test_two_read_at_calls_needed_until_buf_filled() {
let buf = Vec::with_capacity(4);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![
Expectation {
offset: 0,
bytes_total: 4,
result: Ok(vec![b'a', b'b']),
},
Expectation {
offset: 2,
bytes_total: 2,
result: Ok(vec![b'c', b'd']),
},
]),
}));
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
assert!(res.is_ok());
assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
}
#[tokio::test]
async fn test_eof_before_buffer_full() {
let buf = Vec::with_capacity(3);
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
expectations: VecDeque::from(vec![
Expectation {
offset: 0,
bytes_total: 3,
result: Ok(vec![b'a']),
},
Expectation {
offset: 1,
bytes_total: 2,
result: Ok(vec![b'b']),
},
Expectation {
offset: 2,
bytes_total: 1,
result: Ok(vec![]),
},
]),
}));
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
let mock_read_at = Arc::clone(&mock_read_at);
async move { mock_read_at.lock().await.read_at(buf, offset).await }
})
.await;
let Err(err) = res else {
panic!("should return an error");
};
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
assert_eq!(format!("{err}"), "failed to fill whole buffer");
// buffer contents on error are unspecified
}
}
struct FileGuard {
slot_guard: RwLockReadGuard<'static, SlotInner>,
}
impl AsRef<OwnedFd> for FileGuard {
fn as_ref(&self) -> &OwnedFd {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
impl FileGuard {
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
fn with_std_file<F, R>(&self, with: F) -> R
where
F: FnOnce(&File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&file);
let _ = file.into_raw_fd();
res
}
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
where
F: FnOnce(&mut File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&mut file);
let _ = file.into_raw_fd();
res
}
}
impl tokio_epoll_uring::IoFd for FileGuard {
unsafe fn as_fd(&self) -> RawFd {
let owned_fd: &OwnedFd = self.as_ref();
owned_fd.as_raw_fd()
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
@@ -619,16 +901,19 @@ impl VirtualFile {
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
let buf = vec![0; PAGE_SZ];
let buf = self
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
.await?;
Ok(std::sync::Arc::new(buf).into())
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let mut tmp = [0; 128];
match self.read_at(&mut tmp, self.pos).await {
let res;
(tmp, res) = self.read_at(tmp, self.pos).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
@@ -704,10 +989,12 @@ impl OpenFiles {
/// Initialize the virtual file module. This must be called once at page
/// server startup.
///
pub fn init(num_slots: usize) {
#[cfg(not(test))]
pub fn init(num_slots: usize, engine: IoEngineKind) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
io_engine::init(engine);
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}
@@ -752,10 +1039,10 @@ mod tests {
}
impl MaybeVirtualFile {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
@@ -797,14 +1084,14 @@ mod tests {
// Helper function to slurp a portion of a file into a string
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
let mut buf = vec![0; len];
self.read_exact_at(&mut buf, pos).await?;
let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos).await?;
Ok(String::from_utf8(buf).unwrap())
}
}
#[tokio::test]
async fn test_virtual_files() -> Result<(), Error> {
async fn test_virtual_files() -> anyhow::Result<()> {
// The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them,
@@ -820,14 +1107,17 @@ mod tests {
}
#[tokio::test]
async fn test_physical_files() -> Result<(), Error> {
async fn test_physical_files() -> anyhow::Result<()> {
test_files("physical_files", |path, open_options| async move {
Ok(MaybeVirtualFile::File(open_options.open(path)?))
Ok(MaybeVirtualFile::File({
let owned_fd = open_options.open(path.as_std_path()).await?;
File::from(owned_fd)
}))
})
.await
}
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
where
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
@@ -971,11 +1261,11 @@ mod tests {
for _threadno in 0..THREADS {
let files = files.clone();
let hdl = rt.spawn(async move {
let mut buf = [0u8; SIZE];
let mut buf = vec![0u8; SIZE];
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
f.read_exact_at(&mut buf, 0).await.unwrap();
buf = f.read_exact_at(buf, 0).await.unwrap();
assert!(buf == SAMPLE);
}
});

View File

@@ -0,0 +1,114 @@
//! [`super::VirtualFile`] supports different IO engines.
//!
//! The [`IoEngineKind`] enum identifies them.
//!
//! The choice of IO engine is global.
//! Initialize using [`init`].
//!
//! Then use [`get`] and [`super::OpenOptions`].
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Hash,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
pub enum IoEngineKind {
StdFs,
#[cfg(target_os = "linux")]
TokioEpollUring,
}
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
#[cfg(not(test))]
pub(super) fn init(engine: IoEngineKind) {
if IO_ENGINE.set(engine).is_err() {
panic!("called twice");
}
crate::metrics::virtual_file_io_engine::KIND
.with_label_values(&[&format!("{engine}")])
.set(1);
}
pub(super) fn get() -> &'static IoEngineKind {
#[cfg(test)]
{
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
Ok(v) => match v.parse::<IoEngineKind>() {
Ok(engine_kind) => engine_kind,
Err(e) => {
panic!("invalid VirtualFile io engine for env var {env_var_name}: {e:#}: {v:?}")
}
},
Err(std::env::VarError::NotPresent) => {
crate::config::defaults::DEFAULT_VIRTUAL_FILE_IO_ENGINE
.parse()
.unwrap()
}
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode");
}
})
}
#[cfg(not(test))]
IO_ENGINE.get().unwrap()
}
use std::os::unix::prelude::FileExt;
use super::FileGuard;
impl IoEngineKind {
pub(super) async fn read_at<B>(
&self,
file_guard: FileGuard,
offset: u64,
mut buf: B,
) -> ((FileGuard, B), std::io::Result<usize>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
match self {
IoEngineKind::StdFs => {
// SAFETY: `dst` only lives at most as long as this match arm, during which buf remains valid memory.
let dst = unsafe {
std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total())
};
let res = file_guard.with_std_file(|std_file| std_file.read_at(dst, offset));
if let Ok(nbytes) = &res {
assert!(*nbytes <= buf.bytes_total());
// SAFETY: see above assertion
unsafe {
buf.set_init(*nbytes);
}
}
#[allow(dropping_references)]
drop(dst);
((file_guard, buf), res)
}
#[cfg(target_os = "linux")]
IoEngineKind::TokioEpollUring => {
let system = tokio_epoll_uring::thread_local_system().await;
let (resources, res) = system.read(file_guard, offset, buf).await;
(
resources,
res.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
}),
)
}
}
}
}

View File

@@ -0,0 +1,138 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use super::IoEngineKind;
use std::{os::fd::OwnedFd, path::Path};
#[derive(Debug, Clone)]
pub enum OpenOptions {
StdFs(std::fs::OpenOptions),
#[cfg(target_os = "linux")]
TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions),
}
impl Default for OpenOptions {
fn default() -> Self {
match super::io_engine::get() {
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
#[cfg(target_os = "linux")]
IoEngineKind::TokioEpollUring => {
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
}
}
}
}
impl OpenOptions {
pub fn new() -> OpenOptions {
Self::default()
}
pub fn read(&mut self, read: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.read(read);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let _ = x.read(read);
}
}
self
}
pub fn write(&mut self, write: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.write(write);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let _ = x.write(write);
}
}
self
}
pub fn create(&mut self, create: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.create(create);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let _ = x.create(create);
}
}
self
}
pub fn create_new(&mut self, create_new: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.create_new(create_new);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let _ = x.create_new(create_new);
}
}
self
}
pub fn truncate(&mut self, truncate: bool) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.truncate(truncate);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let _ = x.truncate(truncate);
}
}
self
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match self {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let system = tokio_epoll_uring::thread_local_system().await;
system.open(path, x).await.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})
}
}
}
}
impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.mode(mode);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let _ = x.mode(mode);
}
}
self
}
fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.custom_flags(flags);
}
#[cfg(target_os = "linux")]
OpenOptions::TokioEpollUring(x) => {
let _ = x.custom_flags(flags);
}
}
self
}
}

View File

@@ -3,6 +3,7 @@
import argparse
import json
import logging
import os
from collections import defaultdict
from typing import DefaultDict, Dict
@@ -45,6 +46,15 @@ def main(args: argparse.Namespace):
logging.error("cannot fetch flaky tests from the DB due to an error", exc)
rows = []
# If a test run has non-default PAGESERVER_VIRTUAL_FILE_IO_ENGINE (i.e. not empty, not std-fs),
# use it to parametrize test name along with build_type and pg_version
#
# See test_runner/fixtures/parametrize.py for details
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
pageserver_virtual_file_io_engine_parameter = f"-{io_engine}"
else:
pageserver_virtual_file_io_engine_parameter = ""
for row in rows:
# We don't want to automatically rerun tests in a performance suite
if row["parent_suite"] != "test_runner.regress":
@@ -53,10 +63,10 @@ def main(args: argparse.Namespace):
if row["name"].endswith("]"):
parametrized_test = row["name"].replace(
"[",
f"[{build_type}-pg{pg_version}-",
f"[{build_type}-pg{pg_version}{pageserver_virtual_file_io_engine_parameter}-",
)
else:
parametrized_test = f"{row['name']}[{build_type}-pg{pg_version}]"
parametrized_test = f"{row['name']}[{build_type}-pg{pg_version}{pageserver_virtual_file_io_engine_parameter}]"
res[row["parent_suite"]][row["suite"]][parametrized_test] = True

View File

@@ -2,11 +2,13 @@ from __future__ import annotations
import abc
import asyncio
import errno
import filecmp
import json
import os
import re
import shutil
import signal
import subprocess
import tempfile
import textwrap
@@ -173,6 +175,35 @@ def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Ite
yield versioned_dir
@pytest.fixture(scope="function")
def test_cgroup_dir(request: FixtureRequest) -> Optional[Path]:
## Use like so:
# systemd-run --user --shell
# cat /proc/self/cgroup
# # will look like so: 0::/user.slice/user-1000.slice/user@1000.service/app.slice/run-u5.service
# env \
# NEON_TEST_SUITE_USE_CGROUPS=/sys/fs/cgroup/user.slice/user-1000.slice/user@1000.service/app.slice/run-u5.service \
# BUILD_TYPE=debug \
# DEFAULT_PG_VERSION=15 \
# ./scripts/pytest
var = os.getenv("NEON_TEST_SUITE_USE_CGROUPS")
if var is None:
return None
root = Path(var)
assert root.is_dir()
assert (
"cgroup2fs"
== subprocess.check_output(
["stat", "--file-system", "--format=%T", root],
text=True,
).strip()
)
test_cgroup_dir: Path = get_test_cgroup_dir(request, root)
test_cgroup_dir.mkdir(exist_ok=False, parents=False)
return test_cgroup_dir
def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "function"]:
"""Return either session of function scope, depending on TEST_SHARED_FIXTURES envvar.
@@ -446,6 +477,8 @@ class NeonEnvBuilder:
preserve_database_files: bool = False,
initial_tenant: Optional[TenantId] = None,
initial_timeline: Optional[TimelineId] = None,
pageserver_virtual_file_io_engine: Optional[str] = None,
test_cgroup_dir: Optional[Path] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -481,6 +514,9 @@ class NeonEnvBuilder:
self.config_init_force: Optional[str] = None
self.top_output_dir = top_output_dir
self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
self.test_cgroup_dir = test_cgroup_dir
assert test_name.startswith(
"test_"
), "Unexpectedly instantiated from outside a test function"
@@ -873,6 +909,15 @@ class NeonEnvBuilder:
x.do_cleanup()
def __enter__(self) -> "NeonEnvBuilder":
if self.test_cgroup_dir is not None:
log.info(f"putting test runner into cgroup {self.test_cgroup_dir}")
procs_file = self.test_cgroup_dir / "cgroup.procs"
# TODO: auto-cleanup the cgroup, share code with __exit__
pids = procs_file.read_text().split()
assert pids == []
mypid = os.getpid()
with open(procs_file, "a") as f:
f.write(f" {mypid}\n")
return self
def __exit__(
@@ -926,6 +971,57 @@ class NeonEnvBuilder:
if cleanup_error is not None:
cleanup_error = e
if self.test_cgroup_dir is not None:
# TODO: ensure that we're the only python thread running;
# otherwise, if a test leaks a thread,
# our checking here would race with that other thread.
# => https://github.com/neondatabase/neon/issues/6486
log.info(f"check test runner cgroup for leaked processes: {self.test_cgroup_dir}")
procs_file = self.test_cgroup_dir / "cgroup.procs"
mypid = os.getpid()
# move ourselves out of cgroup
with open(self.test_cgroup_dir.parent / "cgroup.procs", "a") as f:
f.write(f"{mypid}")
# now freeze the test cgroup, so we can race-free list & SIGKILL the processes
freeze_file = self.test_cgroup_dir / "cgroup.freeze"
freeze_file.write_text("1")
# inspect remaining pids
pids = [int(pid) for pid in procs_file.read_text().split()]
had_leaked_pids = len(pids) > 0
for pid in pids:
# dump info
try:
args_file = Path(f"/proc/{pid}/cmdline").read_bytes()
args_parsed = [
bytes.decode("utf-8") for bytes in args_file.split(b"\x00")
] # NULL-byte separated
args = f"{args_parsed}"
except Exception as e:
args = f"cmdline unavailable, {type(e)}, {e}"
log.warning(f"SIGKILLing leaked process: {pid}: {args}")
# send SIGKILL to the frozen process
os.kill(pid, signal.SIGKILL)
# thaw the cgroup
freeze_file.write_text("0")
# wait for processes to exit
while True:
try:
self.test_cgroup_dir.rmdir()
break
except OSError as e:
if e.errno != errno.EBUSY:
raise
pids = [int(pid) for pid in procs_file.read_text().split()]
if len(pids) == 0:
break
log.warning(f"waiting for pids to exit: {pids}")
time.sleep(0.1)
log.info("all pids exited and test cgroup has been deleted")
cleanup_cmd = (
f"for pid in $(cat '{procs_file}'); do; kill -9 $pid; done; rmdir {procs_file}"
)
assert not had_leaked_pids, f"had pids in test cgroup after NeonEnvBuilder teardown, see logs for details & cleanup help\ncleanup using bash command: {cleanup_cmd}\n"
class NeonEnv:
"""
@@ -995,6 +1091,8 @@ class NeonEnv:
self, config.auth_enabled
)
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
# Create a config file corresponding to the options
cfg: Dict[str, Any] = {
"default_tenant_id": str(self.initial_tenant),
@@ -1026,6 +1124,9 @@ class NeonEnv:
"pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type,
}
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(
@@ -1191,6 +1292,8 @@ def _shared_simple_env(
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: PgVersion,
pageserver_virtual_file_io_engine: str,
test_cgroup_dir: Optional[Path],
) -> Iterator[NeonEnv]:
"""
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
@@ -1220,6 +1323,8 @@ def _shared_simple_env(
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
test_name=request.node.name,
test_output_dir=test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
test_cgroup_dir=test_cgroup_dir,
) as builder:
env = builder.init_start()
@@ -1258,6 +1363,8 @@ def neon_env_builder(
request: FixtureRequest,
test_overlay_dir: Path,
top_output_dir: Path,
pageserver_virtual_file_io_engine: str,
test_cgroup_dir: Optional[Path],
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1287,9 +1394,11 @@ def neon_env_builder(
broker=default_broker,
run_id=run_id,
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
test_name=request.node.name,
test_output_dir=test_output_dir,
test_overlay_dir=test_overlay_dir,
test_cgroup_dir=test_cgroup_dir,
) as builder:
yield builder
@@ -3624,6 +3733,10 @@ def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
return get_test_output_dir(request, top_output_dir) / "repo"
def get_test_cgroup_dir(request: FixtureRequest, top_cgroup_dir: Path) -> Path:
return _get_test_dir(request, top_cgroup_dir, "")
def pytest_addoption(parser: Parser):
parser.addoption(
"--preserve-database-files",

View File

@@ -8,7 +8,7 @@ from _pytest.python import Metafunc
from fixtures.pg_version import PgVersion
"""
Dynamically parametrize tests by Postgres version and build type (debug/release/remote)
Dynamically parametrize tests by Postgres version, build type (debug/release/remote), and possibly by other parameters
"""
@@ -31,11 +31,12 @@ def build_type(request: FixtureRequest) -> Optional[str]:
return None
def pytest_generate_tests(metafunc: Metafunc):
# Do not parametrize performance tests yet, we need to prepare grafana charts first
if "test_runner/performance" in metafunc.definition._nodeid:
return
@pytest.fixture(scope="function", autouse=True)
def pageserver_virtual_file_io_engine(request: FixtureRequest) -> Optional[str]:
return None
def pytest_generate_tests(metafunc: Metafunc):
if (v := os.environ.get("DEFAULT_PG_VERSION")) is None:
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
else:
@@ -46,5 +47,12 @@ def pytest_generate_tests(metafunc: Metafunc):
else:
build_types = [bt.lower()]
metafunc.parametrize("build_type", build_types)
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
# Do not parametrize performance tests yet by Postgres version or build type, we need to prepare grafana charts first
if "test_runner/performance" not in metafunc.definition._nodeid:
metafunc.parametrize("build_type", build_types)
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=tokio-epoll-uring`
# And do not change test name for default `pageserver_virtual_file_io_engine=std-fs` to keep tests statistics
if (io_engine := os.environ.get("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
metafunc.parametrize("pageserver_virtual_file_io_engine", [io_engine])

View File

@@ -29,7 +29,7 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
@pytest.mark.parametrize("n_tenants", [1, 10, 100])
@pytest.mark.parametrize("n_tenants", [1, 10])
@pytest.mark.timeout(
10000
) # TODO: this value is just "a really high number"; have this per instance type

View File

@@ -24,7 +24,8 @@ from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import subprocess_capture
def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_builder):
# fixture order matters here: neon_env_builder leaked process assertions
def test_import_from_vanilla(test_output_dir, pg_bin, neon_env_builder, vanilla_pg):
# Put data in vanilla pg
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")

View File

@@ -59,3 +59,5 @@ def test_neon_two_primary_endpoints_fail(
env.neon_cli.endpoint_stop("ep1")
# ep1 is stopped so create ep2 will succeed
env.neon_cli.endpoint_start("ep2")
# cleanup
env.neon_cli.endpoint_stop("ep2")