Compare commits

..

11 Commits

Author SHA1 Message Date
Victor Polevoy
e191daa152 Add a test for generating pprof from collapsed 2025-08-21 12:40:57 +02:00
Victor Polevoy
7572ffc725 Document the continuous profiling of computes 2025-08-21 11:53:34 +02:00
Victor Polevoy
0aacbc2583 Install bpfcc tools in the compute node docker image.
This provides the binaries and libraries required for the continuous
profiling at runtime.
2025-08-21 11:16:00 +02:00
Victor Polevoy
cb9874dc4e After switching to ext4 img, symlinking is possible 2025-08-20 16:24:07 +02:00
Victor Polevoy
891c1fe512 Fix the kernel headers enabling 2025-08-02 14:03:25 +02:00
Victor Polevoy
9cae494555 TRY TRY TRY (pink) 2025-07-31 13:02:45 +02:00
Victor Polevoy
c3e6d360b5 Clarify the reason for postgres comment 2025-07-18 12:52:57 +02:00
Victor Polevoy
9e69e24a52 Use the supplied kernel headers and modules 2025-07-18 12:52:45 +02:00
Victor Polevoy
8dbf5a8c5b Add annotation to skip on CI and macos 2025-07-15 12:30:34 +02:00
Victor Polevoy
463429af97 build-tools 2025-07-11 16:20:04 +02:00
Victor Polevoy
d8b0c0834e Implement HTTP endpoint for compute profiling.
Exposes an endpoint "/profile/cpu" for profiling the postgres
processes (currently spawned and the new ones) using "perf".

Adds the corresponding python test to test the added endpoint
and confirm the output expected is the profiling data in the
expected format.

Add "perf" binary to the sudo list.

Fix python poetry ruff

Address the clippy lints

Document the code

Format python code

Address code review

Prettify

Embed profile_pb2.py and small code/test fixes.

Make the code slightly better.

1. Makes optional the sampling_frequency parameter for profiling.
2. Avoids using unsafe code when killing a child.

Better code, better tests

More tests

Separate start and stop of profiling

Correctly check for the exceptions

Address clippy lint

Final fixes.

1. Allows the perf to be found in $PATH instead of having the path
hardcoded.
2. Changes the path to perf in the sudoers file so that the compute
can run it properly.
3. Changes the way perf is invoked, now it is with sudo and the path
from $PATH.
4. Removes the authentication requirement from the /profile/cpu/
endpoint.

hakari thing

Python fixes

Fix python formatting

More python fixes

Update poetry lock

Fix ruff

Address the review comments

Fix the tests

Try fixing the flaky test for pg17?

Try fixing the flaky test for pg17?

PYTHON

Fix the tests

Remove the PROGRESS parameter

Remove unused

Increase the timeout due to concurrency

Increase the timeout to 60

Increase the profiling window timeout

Try this

Lets see the error

Just log all the errors

Add perf into the build environment

uijdfghjdf

Update tempfile to 3.20

Snapshot

Use bbc-profile

Update tempfile to 3.20

Provide bpfcc-tools in debian

Properly respond with status

Python check

Fix build-tools dockerfile

Add path probation for the bcc profile

Try err printing

Refactor

Add bpfcc-tools to the final image

Add error context

sudo not found?

Print more errors for verbosity

Remove procfs and use libproc

Update hakari

Debug sudo in CI

Rebase and adjust hakari

remove leftover

Add archiving support

Correct the paths to the perf binary

Try hardcoded sudo path

Add sudo into build-tools dockerfile

Minor cleanup

Print out the sudoers file from github

Stop the tests earlier

Add the sudoers entry for nonroot, install kmod for modprobe for bcc-profile

Try hacking the kernel headers for bcc-profile

Redeclare the kernel version argument

Try using the kernel of the runner

Try another way

Check bpfcc-tools
2025-07-11 12:53:48 +02:00
66 changed files with 2600 additions and 2344 deletions

356
Cargo.lock generated
View File

@@ -687,13 +687,40 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core 0.4.5",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"itoa",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper 1.0.1",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8"
dependencies = [
"axum-core",
"axum-core 0.5.0",
"base64 0.22.1",
"bytes",
"form_urlencoded",
@@ -704,7 +731,7 @@ dependencies = [
"hyper 1.4.1",
"hyper-util",
"itoa",
"matchit",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
@@ -724,6 +751,26 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-core"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper 1.0.1",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.5.0"
@@ -750,8 +797,8 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b"
dependencies = [
"axum",
"axum-core",
"axum 0.8.1",
"axum-core 0.5.0",
"bytes",
"form_urlencoded",
"futures-util",
@@ -962,6 +1009,24 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
dependencies = [
"bitflags 2.8.0",
"cexpr",
"clang-sys",
"itertools 0.12.1",
"proc-macro2",
"quote",
"regex",
"rustc-hash 1.1.0",
"shlex",
"syn 2.0.100",
]
[[package]]
name = "bindgen"
version = "0.71.1"
@@ -1321,7 +1386,7 @@ dependencies = [
"aws-sdk-kms",
"aws-sdk-s3",
"aws-smithy-types",
"axum",
"axum 0.8.1",
"axum-extra",
"base64 0.22.1",
"bytes",
@@ -1330,15 +1395,17 @@ dependencies = [
"chrono",
"clap",
"compute_api",
"fail",
"flate2",
"futures",
"hostname-validator",
"http 1.1.0",
"indexmap 2.9.0",
"inferno 0.12.0",
"itertools 0.10.5",
"jsonwebtoken",
"libproc",
"metrics",
"neon_failpoint",
"nix 0.30.1",
"notify",
"num_cpus",
@@ -1351,6 +1418,8 @@ dependencies = [
"postgres-types",
"postgres_initdb",
"postgres_versioninfo",
"pprof 0.15.0",
"prost 0.12.6",
"regex",
"remote_storage",
"reqwest",
@@ -1362,6 +1431,7 @@ dependencies = [
"serde_with",
"signal-hook",
"tar",
"tempfile",
"thiserror 1.0.69",
"tokio",
"tokio-postgres",
@@ -2082,7 +2152,7 @@ name = "endpoint_storage"
version = "0.0.1"
dependencies = [
"anyhow",
"axum",
"axum 0.8.1",
"axum-extra",
"camino",
"camino-tempfile",
@@ -2198,12 +2268,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.8"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
dependencies = [
"libc",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -2533,6 +2603,18 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "getrandom"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"wasi 0.14.2+wasi-0.2.4",
]
[[package]]
name = "gettid"
version = "0.1.3"
@@ -2790,6 +2872,15 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "hostname"
version = "0.4.0"
@@ -2891,15 +2982,15 @@ dependencies = [
"arc-swap",
"bytes",
"camino",
"fail",
"futures",
"hyper 0.14.30",
"itertools 0.10.5",
"jemalloc_pprof",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"once_cell",
"pprof",
"pprof 0.14.0",
"regex",
"routerify",
"rustls 0.23.27",
@@ -3561,7 +3652,7 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
dependencies = [
"spin",
"spin 0.9.8",
]
[[package]]
@@ -3586,6 +3677,17 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
[[package]]
name = "libproc"
version = "0.14.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78a09b56be5adbcad5aa1197371688dc6bb249a26da3bca2011ee2fb987ebfb"
dependencies = [
"bindgen 0.70.1",
"errno",
"libc",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.14"
@@ -3598,6 +3700,12 @@ version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0b5399f6804fbab912acbd8878ed3532d506b7c951b8f9f164ef90fef39e3f4"
[[package]]
name = "linux-raw-sys"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
[[package]]
name = "litemap"
version = "0.7.4"
@@ -3651,6 +3759,12 @@ dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
@@ -3852,23 +3966,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "neon_failpoint"
version = "0.1.0"
dependencies = [
"anyhow",
"either",
"once_cell",
"parking_lot 0.12.1",
"rand 0.8.5",
"regex",
"serde",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "never-say-never"
version = "6.6.666"
@@ -4374,6 +4471,7 @@ dependencies = [
"either",
"enum-map",
"enumset",
"fail",
"futures",
"hashlink",
"hex",
@@ -4388,7 +4486,6 @@ dependencies = [
"jsonwebtoken",
"md5",
"metrics",
"neon_failpoint",
"nix 0.30.1",
"num-traits",
"num_cpus",
@@ -4407,7 +4504,7 @@ dependencies = [
"postgres_ffi_types",
"postgres_initdb",
"posthog_client_lite",
"pprof",
"pprof 0.14.0",
"pq_proto",
"procfs",
"rand 0.8.5",
@@ -4982,7 +5079,7 @@ name = "postgres_ffi"
version = "0.1.0"
dependencies = [
"anyhow",
"bindgen",
"bindgen 0.71.1",
"bytes",
"crc32c",
"criterion",
@@ -4993,7 +5090,7 @@ dependencies = [
"postgres",
"postgres_ffi_types",
"postgres_versioninfo",
"pprof",
"pprof 0.14.0",
"regex",
"serde",
"thiserror 1.0.69",
@@ -5083,6 +5180,30 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "pprof"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187"
dependencies = [
"aligned-vec",
"backtrace",
"cfg-if",
"findshlibs",
"inferno 0.11.21",
"libc",
"log",
"nix 0.26.4",
"once_cell",
"protobuf",
"protobuf-codegen",
"smallvec",
"spin 0.10.0",
"symbolic-demangle",
"tempfile",
"thiserror 2.0.11",
]
[[package]]
name = "pprof_util"
version = "0.7.0"
@@ -5158,7 +5279,7 @@ dependencies = [
"hex",
"lazy_static",
"procfs-core",
"rustix",
"rustix 0.38.41",
]
[[package]]
@@ -5294,6 +5415,57 @@ dependencies = [
"prost 0.13.5",
]
[[package]]
name = "protobuf"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
dependencies = [
"once_cell",
"protobuf-support",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-codegen"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
dependencies = [
"anyhow",
"once_cell",
"protobuf",
"protobuf-parse",
"regex",
"tempfile",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-parse"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
dependencies = [
"anyhow",
"indexmap 2.9.0",
"log",
"protobuf",
"protobuf-support",
"tempfile",
"thiserror 1.0.69",
"which",
]
[[package]]
name = "protobuf-support"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "proxy"
version = "0.1.0"
@@ -5465,6 +5637,12 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r-efi"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "rand"
version = "0.7.3"
@@ -5853,7 +6031,7 @@ dependencies = [
"async-trait",
"getrandom 0.2.11",
"http 1.1.0",
"matchit",
"matchit 0.8.4",
"opentelemetry",
"reqwest",
"reqwest-middleware",
@@ -6044,6 +6222,19 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustix"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
dependencies = [
"bitflags 2.8.0",
"errno",
"libc",
"linux-raw-sys 0.9.4",
"windows-sys 0.59.0",
]
[[package]]
name = "rustls"
version = "0.21.12"
@@ -6208,6 +6399,7 @@ dependencies = [
"criterion",
"desim",
"env_logger",
"fail",
"futures",
"hex",
"http 1.1.0",
@@ -6217,7 +6409,6 @@ dependencies = [
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
@@ -6226,7 +6417,7 @@ dependencies = [
"postgres_backend",
"postgres_ffi",
"postgres_versioninfo",
"pprof",
"pprof 0.14.0",
"pq_proto",
"rand 0.8.5",
"regex",
@@ -6814,6 +7005,18 @@ name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
dependencies = [
"lock_api",
]
[[package]]
name = "spin"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591"
dependencies = [
"lock_api",
]
[[package]]
name = "spinning_top"
@@ -6904,6 +7107,7 @@ dependencies = [
"diesel",
"diesel-async",
"diesel_migrations",
"fail",
"futures",
"governor",
"hex",
@@ -6916,7 +7120,6 @@ dependencies = [
"lasso",
"measured",
"metrics",
"neon_failpoint",
"once_cell",
"pageserver_api",
"pageserver_client",
@@ -7174,14 +7377,14 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.14.0"
version = "3.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c"
checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1"
dependencies = [
"cfg-if",
"fastrand 2.2.0",
"getrandom 0.3.3",
"once_cell",
"rustix",
"rustix 1.0.7",
"windows-sys 0.59.0",
]
@@ -7675,16 +7878,25 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-stream",
"async-trait",
"axum 0.7.9",
"base64 0.22.1",
"bytes",
"h2 0.4.4",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"prost 0.13.5",
"socket2",
"tokio",
"tokio-stream",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
@@ -7697,7 +7909,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [
"async-trait",
"axum",
"axum 0.8.1",
"base64 0.22.1",
"bytes",
"flate2",
@@ -7758,11 +7970,16 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap 1.9.3",
"pin-project",
"pin-project-lite",
"rand 0.8.5",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -8181,7 +8398,7 @@ dependencies = [
"const_format",
"criterion",
"diatomic-waker",
"either",
"fail",
"futures",
"git-version",
"hex",
@@ -8189,13 +8406,12 @@ dependencies = [
"humantime",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"nix 0.30.1",
"once_cell",
"pem",
"pin-project-lite",
"postgres_connection",
"pprof",
"pprof 0.14.0",
"pq_proto",
"rand 0.8.5",
"regex",
@@ -8247,7 +8463,7 @@ name = "vm_monitor"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"axum 0.8.1",
"cgroups-rs",
"clap",
"futures",
@@ -8302,7 +8518,7 @@ dependencies = [
"pageserver_api",
"postgres_ffi",
"postgres_ffi_types",
"pprof",
"pprof 0.14.0",
"prost 0.13.5",
"remote_storage",
"serde",
@@ -8332,7 +8548,7 @@ name = "walproposer"
version = "0.1.0"
dependencies = [
"anyhow",
"bindgen",
"bindgen 0.71.1",
"postgres_ffi",
"utils",
]
@@ -8359,6 +8575,15 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasi"
version = "0.14.2+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasite"
version = "0.1.0"
@@ -8488,6 +8713,18 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix 0.38.41",
]
[[package]]
name = "whoami"
version = "1.5.1"
@@ -8716,6 +8953,15 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "wit-bindgen-rt"
version = "0.39.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
dependencies = [
"bitflags 2.8.0",
]
[[package]]
name = "workspace_hack"
version = "0.1.0"
@@ -8723,17 +8969,19 @@ dependencies = [
"ahash",
"anstream",
"anyhow",
"axum",
"axum-core",
"axum 0.8.1",
"axum-core 0.5.0",
"base64 0.21.7",
"base64ct",
"bytes",
"camino",
"cc",
"chrono",
"clang-sys",
"clap",
"clap_builder",
"const-oid",
"criterion",
"crypto-bigint 0.5.5",
"der 0.7.8",
"deranged",
@@ -8760,6 +9008,7 @@ dependencies = [
"hyper 0.14.30",
"hyper 1.4.1",
"hyper-util",
"indexmap 1.9.3",
"indexmap 2.9.0",
"itertools 0.12.1",
"lazy_static",
@@ -8776,7 +9025,6 @@ dependencies = [
"num-iter",
"num-rational",
"num-traits",
"once_cell",
"p256 0.13.2",
"parquet",
"prettyplease",
@@ -8798,6 +9046,7 @@ dependencies = [
"sha2",
"signature 2.2.0",
"smallvec",
"spin 0.9.8",
"spki 0.7.3",
"stable_deref_trait",
"subtle",
@@ -8812,6 +9061,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"toml_edit",
"tonic 0.12.3",
"tower 0.5.2",
"tracing",
"tracing-core",

View File

@@ -21,7 +21,6 @@ members = [
"workspace_hack",
"libs/compute_api",
"libs/http-utils",
"libs/neon_failpoint",
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/postgres_ffi_types",
@@ -98,6 +97,7 @@ diatomic-waker = { version = "0.2.3" }
either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
fallible-iterator = "0.2"
framed-websockets = { version = "0.1.0", git = "https://github.com/neondatabase/framed-websockets" }
futures = "0.3"
@@ -130,6 +130,7 @@ jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
jsonwebtoken = "9"
lasso = "0.7"
libc = "0.2"
libproc = "0.14"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
@@ -258,7 +259,6 @@ desim = { version = "0.1", path = "./libs/desim" }
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
neon_failpoint = { version = "0.1", path = "./libs/neon_failpoint/" }
neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
@@ -279,6 +279,7 @@ safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
safekeeper_client = { path = "./safekeeper/client" }
storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
storage_controller_client = { path = "./storage_controller/client" }
tempfile = "3"
tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
utils = { version = "0.1", path = "./libs/utils/" }

View File

@@ -109,6 +109,8 @@ RUN set -e \
libreadline-dev \
libseccomp-dev \
ca-certificates \
bpfcc-tools \
sudo \
openssl \
unzip \
curl \

View File

@@ -61,6 +61,9 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
libpq5 \
libpq-dev \
libzstd-dev \
linux-perf \
bpfcc-tools \
linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \
postgresql-16 \
postgresql-server-dev-16 \
postgresql-common \
@@ -105,15 +108,21 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
#
# 'gdb' is included so that we get backtraces of core dumps produced in
# regression tests
RUN set -e \
RUN set -ex \
&& KERNEL_VERSION="$(uname -r | cut -d'-' -f1 | sed 's/\.0$//')" \
&& echo KERNEL_VERSION=${KERNEL_VERSION} >> /etc/environment \
&& KERNEL_ARCH=$(uname -m | awk '{ if ($1 ~ /^(x86_64|i[3-6]86)$/) print "x86"; else if ($1 ~ /^(aarch64|arm.*)$/) print "aarch"; else print $1 }') \
&& echo KERNEL_ARCH=${KERNEL_ARCH} >> /etc/environment \
&& apt update \
&& apt install -y \
autoconf \
automake \
bc \
bison \
build-essential \
ca-certificates \
cmake \
cpio \
curl \
flex \
gdb \
@@ -122,8 +131,10 @@ RUN set -e \
gzip \
jq \
jsonnet \
kmod \
libcurl4-openssl-dev \
libbz2-dev \
libelf-dev \
libffi-dev \
liblzma-dev \
libncurses5-dev \
@@ -137,6 +148,11 @@ RUN set -e \
libxml2-dev \
libxmlsec1-dev \
libxxhash-dev \
linux-perf \
bpfcc-tools \
libbpfcc \
libbpfcc-dev \
linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \
lsof \
make \
netcat-openbsd \
@@ -144,6 +160,8 @@ RUN set -e \
openssh-client \
parallel \
pkg-config \
rsync \
sudo \
unzip \
wget \
xz-utils \
@@ -198,6 +216,8 @@ RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /
# Configure sudo & docker
RUN usermod -aG sudo nonroot && \
echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers && \
mkdir -p /etc/sudoers.d && \
echo 'nonroot ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/nonroot && \
usermod -aG docker nonroot
# AWS CLI

View File

@@ -149,6 +149,9 @@ RUN case $DEBIAN_VERSION in \
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
bpfcc-tools \
libbpfcc \
libbpfcc-dev \
libclang-dev \
jsonnet \
$VERSION_INSTALLS \
@@ -1988,6 +1991,10 @@ RUN apt update && \
locales \
lsof \
procps \
bpfcc-tools \
libbpfcc \
libbpfcc-dev \
libclang-dev \
rsyslog-gnutls \
screen \
tcpdump \

View File

@@ -39,6 +39,14 @@ commands:
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
- name: enable-kernel-modules
user: root
sysvInitAction: sysinit
shell: mkdir -p /lib/ && ln -s /neonvm/tools/lib/modules /lib/
- name: enable-bpfs
user: root
sysvInitAction: sysinit
shell: mkdir -p /sys/kernel/debug && mount -t debugfs debugfs /sys/kernel/debug && mount -t bpf bpf /sys/fs/bpf && chmod 755 /sys/fs/bpf
# Rsyslog by default creates a unix socket under /dev/log . That's where Postgres sends logs also.
# We run syslog with postgres user so it can't create /dev/log. Instead we configure rsyslog to
# use a different path for the socket. The symlink actually points to our custom path.
@@ -65,7 +73,7 @@ files:
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
@@ -152,6 +160,8 @@ merge: |
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
RUN chmod 0666 /etc/compute_rsyslog.conf

View File

@@ -39,6 +39,14 @@ commands:
user: nobody
sysvInitAction: respawn
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
- name: enable-kernel-modules
user: root
sysvInitAction: sysinit
shell: mkdir -p /lib/ && ln -s /neonvm/tools/lib/modules /lib/
- name: enable-bpfs
user: root
sysvInitAction: sysinit
shell: mkdir -p /sys/kernel/debug && mount -t debugfs debugfs /sys/kernel/debug && mount -t bpf bpf /sys/fs/bpf && chmod 755 /sys/fs/bpf
# Rsyslog by default creates a unix socket under /dev/log . That's where Postgres sends logs also.
# We run syslog with postgres user so it can't create /dev/log. Instead we configure rsyslog to
# use a different path for the socket. The symlink actually points to our custom path.
@@ -65,7 +73,7 @@ files:
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes
@@ -148,6 +156,8 @@ merge: |
RUN set -e \
&& chmod 0644 /etc/cgconfig.conf
ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf
COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
RUN chmod 0666 /etc/compute_rsyslog.conf
RUN mkdir /var/log/rsyslog && chown -R postgres /var/log/rsyslog

View File

@@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
testing = ["neon_failpoint/testing"]
testing = ["fail/failpoints"]
[dependencies]
async-compression.workspace = true
@@ -23,7 +23,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
neon_failpoint.workspace = true
fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
@@ -31,6 +31,7 @@ hostname-validator = "1.1"
indexmap.workspace = true
itertools.workspace = true
jsonwebtoken.workspace = true
libproc.workspace = true
metrics.workspace = true
nix.workspace = true
notify.workspace = true
@@ -49,6 +50,7 @@ serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
tar.workspace = true
tempfile.workspace = true
tower.workspace = true
tower-http.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
@@ -78,3 +80,10 @@ zstd = "0.13"
bytes = "1.0"
rust-ini = "0.20.0"
rlimit = "0.10.1"
inferno = { version = "0.12", default-features = false, features = [
"multithreaded",
"nameattr",
] }
pprof = { version = "0.15", features = ["protobuf-codec", "flamegraph"] }
prost = "0.12"

View File

@@ -154,7 +154,7 @@ impl Cli {
fn main() -> Result<()> {
let cli = Cli::parse();
failpoint_support::init().unwrap();
let scenario = failpoint_support::init();
// For historical reasons, the main thread that processes the config and launches postgres
// is synchronous, but we always have this tokio runtime available and we "enter" it so
@@ -201,6 +201,8 @@ fn main() -> Result<()> {
let exit_code = compute_node.run()?;
scenario.teardown();
deinit_and_exit(exit_code);
}

View File

@@ -371,7 +371,9 @@ fn maybe_cgexec(cmd: &str) -> Command {
}
}
struct PostgresHandle {
/// A handle to the Postgres process that is running in the compute
/// node.
pub struct PostgresHandle {
postgres: std::process::Child,
log_collector: JoinHandle<Result<()>>,
}

View File

@@ -1,9 +1,8 @@
use axum::response::{IntoResponse, Response};
use http::StatusCode;
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::info;
use utils::failpoint_support::apply_failpoint;
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
@@ -12,16 +11,10 @@ pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in neon_failpoint
/// List of actions to take, using the format described in `fail::cfg`
///
/// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)"
/// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action"
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
/// Optional context matching rules for conditional failpoints
/// Each key-value pair specifies a context key and a regex pattern to match against
/// All context matchers must match for the failpoint to trigger
#[serde(default, skip_serializing_if = "Option::is_none")]
pub context_matchers: Option<HashMap<String, String>>,
}
use crate::http::JsonResponse;
@@ -31,7 +24,7 @@ use crate::http::extract::Json;
pub(in crate::http) async fn configure_failpoints(
failpoints: Json<ConfigureFailpointsRequest>,
) -> Response {
if !neon_failpoint::has_failpoints() {
if !fail::has_failpoints() {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"Cannot manage failpoints because neon was compiled without failpoints support",
@@ -39,21 +32,16 @@ pub(in crate::http) async fn configure_failpoints(
}
for fp in &*failpoints {
info!(
"cfg failpoint: {} {} (context: {:?})",
fp.name, fp.actions, fp.context_matchers
);
info!("cfg failpoint: {} {}", fp.name, fp.actions);
let cfg_result = if let Some(context_matchers) = fp.context_matchers.clone() {
configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
} else {
configure_failpoint(&fp.name, &fp.actions)
};
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
if let Err(e) = cfg_result {
return JsonResponse::error(
StatusCode::BAD_REQUEST,
format!("failed to configure failpoint '{}': {e}", fp.name),
format!("failed to configure failpoints: {e}"),
);
}
}

View File

@@ -15,6 +15,7 @@ pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod promote;
pub(in crate::http) mod profile;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;

View File

@@ -0,0 +1,217 @@
//! Contains the route for profiling the compute.
//!
//! Profiling the compute means generating a pprof profile of the
//! postgres processes.
//!
//! The profiling is done using the `perf` tool, which is expected to be
//! available somewhere in `$PATH`.
use std::sync::atomic::Ordering;
use axum::Json;
use axum::response::IntoResponse;
use http::StatusCode;
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use crate::http::JsonResponse;
static CANCEL_CHANNEL: Lazy<Mutex<Option<tokio::sync::broadcast::Sender<()>>>> =
Lazy::new(|| Mutex::new(None));
fn default_sampling_frequency() -> u16 {
100
}
fn default_timeout_seconds() -> u8 {
5
}
fn deserialize_sampling_frequency<'de, D>(deserializer: D) -> Result<u16, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
const MIN_SAMPLING_FREQUENCY: u16 = 1;
const MAX_SAMPLING_FREQUENCY: u16 = 1000;
let value = u16::deserialize(deserializer)?;
if !(MIN_SAMPLING_FREQUENCY..=MAX_SAMPLING_FREQUENCY).contains(&value) {
return Err(serde::de::Error::custom(format!(
"sampling_frequency must be between {MIN_SAMPLING_FREQUENCY} and {MAX_SAMPLING_FREQUENCY}, got {value}"
)));
}
Ok(value)
}
fn deserialize_profiling_timeout<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::Deserialize;
const MIN_TIMEOUT_SECONDS: u8 = 1;
const MAX_TIMEOUT_SECONDS: u8 = 60;
let value = u8::deserialize(deserializer)?;
if !(MIN_TIMEOUT_SECONDS..=MAX_TIMEOUT_SECONDS).contains(&value) {
return Err(serde::de::Error::custom(format!(
"timeout_seconds must be between {MIN_TIMEOUT_SECONDS} and {MAX_TIMEOUT_SECONDS}, got {value}"
)));
}
Ok(value)
}
/// Request parameters for profiling the compute.
#[derive(Debug, Clone, serde::Deserialize)]
pub(in crate::http) struct ProfileRequest {
/// The profiling tool to use, currently only `perf` is supported.
profiler: crate::profiling::ProfileGenerator,
#[serde(default = "default_sampling_frequency")]
#[serde(deserialize_with = "deserialize_sampling_frequency")]
sampling_frequency: u16,
#[serde(default = "default_timeout_seconds")]
#[serde(deserialize_with = "deserialize_profiling_timeout")]
timeout_seconds: u8,
#[serde(default)]
archive: bool,
}
/// The HTTP request handler for reporting the profiling status of
/// the compute.
pub(in crate::http) async fn profile_status() -> impl IntoResponse {
tracing::info!("Profile status request received.");
let cancel_channel = CANCEL_CHANNEL.lock().await;
if let Some(tx) = cancel_channel.as_ref() {
if tx.receiver_count() > 0 {
return JsonResponse::create_response(
StatusCode::OK,
"Profiling is currently in progress.",
);
}
}
JsonResponse::create_response(StatusCode::NO_CONTENT, "Profiling is not in progress.")
}
/// The HTTP request handler for stopping profiling the compute.
pub(in crate::http) async fn profile_stop() -> impl IntoResponse {
tracing::info!("Profile stop request received.");
match CANCEL_CHANNEL.lock().await.take() {
Some(tx) => {
if tx.send(()).is_err() {
tracing::error!("Failed to send cancellation signal.");
return JsonResponse::create_response(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to send cancellation signal",
);
}
JsonResponse::create_response(StatusCode::OK, "Profiling stopped successfully.")
}
None => JsonResponse::create_response(
StatusCode::PRECONDITION_FAILED,
"Profiling is not in progress, there is nothing to stop.",
),
}
}
/// The HTTP request handler for starting profiling the compute.
pub(in crate::http) async fn profile_start(
Json(request): Json<ProfileRequest>,
) -> impl IntoResponse {
tracing::info!("Profile start request received: {request:?}");
let tx = tokio::sync::broadcast::Sender::<()>::new(1);
{
let mut cancel_channel = CANCEL_CHANNEL.lock().await;
if cancel_channel.is_some() {
return JsonResponse::create_response(
StatusCode::CONFLICT,
"Profiling is already in progress.",
);
}
*cancel_channel = Some(tx.clone());
}
tracing::info!("Profiling will start with parameters: {request:?}");
let pg_pid = Pid::from_raw(crate::compute::PG_PID.load(Ordering::SeqCst) as _);
let run_with_sudo = !cfg!(feature = "testing");
let options = crate::profiling::ProfileGenerationOptions {
profiler: request.profiler,
run_with_sudo,
pids: [pg_pid].into_iter().collect(),
follow_forks: true,
sampling_frequency: request.sampling_frequency as u32,
blocklist_symbols: vec![
"libc".to_owned(),
"libgcc".to_owned(),
"pthread".to_owned(),
"vdso".to_owned(),
],
archive: request.archive,
};
let options = crate::profiling::ProfileGenerationTaskOptions {
options,
timeout: std::time::Duration::from_secs(request.timeout_seconds as u64),
should_stop: Some(tx),
};
let pprof_data = crate::profiling::generate_pprof_profile(options).await;
if CANCEL_CHANNEL.lock().await.take().is_none() {
tracing::error!("Profiling was cancelled from another request.");
return JsonResponse::create_response(
StatusCode::NO_CONTENT,
"Profiling was cancelled from another request.",
);
}
let pprof_data = match pprof_data {
Ok(data) => data,
Err(e) => {
tracing::error!(error = ?e, "failed to generate pprof data");
return JsonResponse::create_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to generate pprof data: {e:?}"),
);
}
};
tracing::info!("Profiling has completed successfully.");
let mut headers = http::HeaderMap::new();
if request.archive {
headers.insert(
http::header::CONTENT_TYPE,
http::HeaderValue::from_static("application/gzip"),
);
headers.insert(
http::header::CONTENT_DISPOSITION,
http::HeaderValue::from_static("attachment; filename=\"profile.pb.gz\""),
);
} else {
headers.insert(
http::header::CONTENT_TYPE,
http::HeaderValue::from_static("application/octet-stream"),
);
headers.insert(
http::header::CONTENT_DISPOSITION,
http::HeaderValue::from_static("attachment; filename=\"profile.pb\""),
);
}
(headers, pprof_data.0).into_response()
}

View File

@@ -27,6 +27,7 @@ use super::{
},
};
use crate::compute::ComputeNode;
use crate::http::routes::profile;
/// `compute_ctl` has two servers: internal and external. The internal server
/// binds to the loopback interface and handles communication from clients on
@@ -81,8 +82,14 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
Server::External {
config, compute_id, ..
} => {
let unauthenticated_router =
Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/metrics", get(metrics::get_metrics))
.route(
"/profile/cpu",
get(profile::profile_status)
.post(profile::profile_start)
.delete(profile::profile_stop),
);
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))

View File

@@ -24,6 +24,7 @@ pub mod monitor;
pub mod params;
pub mod pg_helpers;
pub mod pgbouncer;
pub mod profiling;
pub mod rsyslog;
pub mod spec;
mod spec_apply;

View File

@@ -1,5 +1,5 @@
use anyhow::{Context, Result};
use neon_failpoint::fail_point;
use fail::fail_point;
use tokio_postgres::{Client, Transaction};
use tracing::{error, info};
@@ -40,14 +40,13 @@ impl<'m> MigrationRunner<'m> {
// middle of applying a series of migrations fails in an expected
// manner
if cfg!(feature = "testing") {
let fail = async {
fail_point!("compute-migration", |fail_migration_id: Option<String>| {
let fail = (|| {
fail_point!("compute-migration", |fail_migration_id| {
migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
});
false
}
.await;
})();
if fail {
return Err(anyhow::anyhow!(format!(

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,58 @@
# Continuous Crofiling (Compute)
The continuous profiling of the compute node is performed by `perf` or `bcc-tools`, the latter is preferred.
The executables profiled are all the postgres-related ones only, excluding the actual compute code (Rust). This can be done as well but
was not the main goal.
## Tools
The aforementioned tools are available within the same Docker image as
the compute node itself, but the corresponding dependencies linux the
linux kernel headers and the linux kernel itself are not and can't be
for obvious reasons. To solve that, as we run the compute nodes as a
virtual machine (qemu), we need to deliver these dependencies to it.
This is done by the `autoscaling` part, which builds and deploys the
kernel headers, needed modules, and the `perf` binary into an ext4-fs
disk image, which is later attached to the VM and is symlinked to be
made available for the compute node.
## Output
The output of the profiling is always a binary file in the same format
of `pprof`. It can, however, be archived by `gzip` additionally, if the
corresponding argument is provided in the JSON request.
## REST API
### Test profiling
One can test the profiling after connecting to the VM and running:
```sh
curl -X POST -H "Content-Type: application/json" http://localhost:3080/profile/cpu -d '{"profiler": {"BccProfile": null}, "sampling_frequency": 99, "timeout_seconds": 5, "archive": false}' -v --output profile.pb
```
This uses the `Bcc` profiler and does not archive the output. The
profiling data will be saved into the `profile.pb` file locally.
**Only one profiling session can be run at a time.**
To check the profiling status (to see whether it is already running or
not), one can perform the `GET` request:
```sh
curl http://localhost:3080/profile/cpu -v
```
The profiling can be stopped by performing the `DELETE` request:
```sh
curl -X DELETE http://localhost:3080/profile/cpu -v
```
## Supported profiling data
For now, only the CPU profiling is done and ther is no heap profiling.
Also, only the postgres-related executables are tracked, the compute
(Rust) part itself **is not tracked**.

View File

@@ -9,7 +9,7 @@ anyhow.workspace = true
arc-swap.workspace = true
bytes.workspace = true
camino.workspace = true
neon_failpoint.workspace = true
fail.workspace = true
futures.workspace = true
hyper0.workspace = true
itertools.workspace = true

View File

@@ -1,8 +1,7 @@
use hyper::{Body, Request, Response, StatusCode};
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::apply_failpoint;
use crate::error::ApiError;
use crate::json::{json_request, json_response};
@@ -14,16 +13,10 @@ pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in neon_failpoint
/// List of actions to take, using the format described in `fail::cfg`
///
/// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)"
/// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action"
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
/// Optional context matching rules for conditional failpoints
/// Each key-value pair specifies a context key and a regex pattern to match against
/// All context matchers must match for the failpoint to trigger
#[serde(default, skip_serializing_if = "Option::is_none")]
pub context_matchers: Option<HashMap<String, String>>,
}
/// Configure failpoints through http.
@@ -31,7 +24,7 @@ pub async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !has_failpoints() {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
@@ -39,24 +32,15 @@ pub async fn failpoints_handler(
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
tracing::info!(
"cfg failpoint: {} {} (context: {:?})",
fp.name,
fp.actions,
fp.context_matchers
);
tracing::info!("cfg failpoint: {} {}", fp.name, fp.actions);
let cfg_result = if let Some(context_matchers) = fp.context_matchers {
configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
} else {
configure_failpoint(&fp.name, &fp.actions)
};
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
if let Err(err) = cfg_result {
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Failed to configure failpoint '{}': {}",
fp.name,
err
"Failed to configure failpoints: {err_msg}"
)));
}
}

View File

@@ -6,8 +6,8 @@ license.workspace = true
[dependencies]
thiserror.workspace = true
nix.workspace=true
nix.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[target.'cfg(target_os = "macos")'.dependencies]
tempfile = "3.14.0"
tempfile = "3.20.0"

View File

@@ -1,27 +0,0 @@
[package]
name = "neon_failpoint"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { workspace = true, features = ["time", "sync", "rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
anyhow = { workspace = true }
regex = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
either = { workspace = true }
[dev-dependencies]
tracing-subscriber = { workspace = true, features = ["fmt"] }
[features]
default = []
testing = []
[[example]]
name = "context_demo"
required-features = ["testing"]

View File

@@ -1,460 +0,0 @@
# Neon Failpoint Library
A modern, async-first failpoint library for Neon, replacing the `fail` crate with enhanced functionality.
## Features
- **Async-first**: All failpoint operations are async and don't require `spawn_blocking`
- **Context matching**: Failpoints can be configured to trigger only when specific context conditions are met
- **Regex support**: Context values can be matched using regular expressions
- **Cancellation support**: All operations support cancellation tokens
- **Dynamic reconfiguration**: Paused and sleeping tasks automatically resume when failpoint configurations change
- **Backward compatibility**: Drop-in replacement for existing `fail` crate usage
## Supported Actions
- `off` - Disable the failpoint
- `pause` - Pause indefinitely until disabled, reconfigured, or cancelled
- `sleep(N)` - Sleep for N milliseconds (can be interrupted by reconfiguration)
- `return` - Return early (empty value)
- `return(value)` - Return early with a specific value
- `exit` - Exit the process immediately
- `panic(message)` - Panic the process with a custom message
- `N%return(value)` - Return with a specific value N% of the time (probability-based)
- `N%M*return(value)` - Return with a specific value N% of the time, maximum M times
- `N%action` - Execute any action N% of the time (probability-based)
- `N%M*action` - Execute any action N% of the time, maximum M times
## Probability-Based Actions
The library supports probability-based failpoints that trigger only a percentage of the time:
```rust
// 50% chance to return a value
configure_failpoint("random_failure", "50%return(error)").unwrap();
// 10% chance to sleep, maximum 3 times
configure_failpoint("occasional_delay", "10%3*sleep(1000)").unwrap();
// 25% chance to panic
configure_failpoint("rare_panic", "25%panic(critical error)").unwrap();
```
The probability system uses a counter to track how many times a probability-based action has been triggered, allowing for precise control over test scenarios.
## Dynamic Behavior
When a failpoint is reconfigured while tasks are waiting on it:
- **Paused tasks** will immediately resume and continue normal execution
- **Sleeping tasks** will wake up early and continue normal execution
- **Removed failpoints** will cause all waiting tasks to resume normally
The new configuration only applies to future hits of the failpoint, not to tasks that are already waiting. This allows for flexible testing scenarios where you can pause execution, inspect state, and then resume execution dynamically.
## Example: Dynamic Reconfiguration
```rust
use neon_failpoint::{configure_failpoint, failpoint, FailpointResult};
use tokio::time::Duration;
// Start a task that will hit a failpoint
let task = tokio::spawn(async {
println!("About to hit failpoint");
match failpoint("test_pause", None).await {
FailpointResult::Return(value) => println!("Returned: {}", value),
FailpointResult::Continue => println!("Continued normally"),
FailpointResult::Cancelled => println!("Cancelled"),
}
});
// Configure the failpoint to pause
configure_failpoint("test_pause", "pause").unwrap();
// Let the task hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
// Change the failpoint configuration - this will wake up the paused task
// The task will resume and continue normally (not apply the new config)
configure_failpoint("test_pause", "return(not_applied)").unwrap();
// The task will complete with Continue, not Return
let result = task.await.unwrap();
```
## Basic Usage
```rust
use neon_failpoint::{configure_failpoint, failpoint, FailpointResult};
// Configure a failpoint
configure_failpoint("my_failpoint", "return(42)").unwrap();
// Use the failpoint
match failpoint("my_failpoint", None).await {
FailpointResult::Return(value) => {
println!("Failpoint returned: {}", value);
return value.parse().unwrap_or_default();
}
FailpointResult::Continue => {
// Continue normal execution
}
FailpointResult::Cancelled => {
// Handle cancellation
}
}
```
## Context-Based Failpoint Configuration
Context allows you to create **conditional failpoints** that only trigger when specific runtime conditions are met. This is particularly useful for testing scenarios where you want to inject failures only for specific tenants, operations, or other contextual conditions.
### Configuring Context-Based Failpoints
Use `configure_failpoint_with_context()` to set up failpoints with context matching:
```rust
use neon_failpoint::configure_failpoint_with_context;
use std::collections::HashMap;
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context(
"backup_operation", // failpoint name
"return(simulated_failure)", // action to take
context_matchers // context matching rules
).unwrap();
```
### Context Matching Rules
The context matching system works as follows:
1. **Key-Value Matching**: Each entry in `context_matchers` specifies a key that must exist in the runtime context
2. **Regex Support**: Values in `context_matchers` are treated as regular expressions first
3. **Fallback to Exact Match**: If the regex compilation fails, it falls back to exact string matching
4. **ALL Must Match**: All context matchers must match for the failpoint to trigger
### Runtime Context Usage
When code hits a failpoint, it provides context using a `HashMap<String, String>`:
```rust
use neon_failpoint::{failpoint, FailpointResult};
use std::collections::HashMap;
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
context.insert("user_id".to_string(), "user_456".to_string());
match failpoint("backup_operation", Some(&context)) {
either::Either::Left(result) => {
match result {
FailpointResult::Return(value) => {
// This will only trigger if ALL context matchers match
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with normal backup operation
}
FailpointResult::Cancelled => {}
}
}
either::Either::Right(future) => {
match future.await {
FailpointResult::Return(value) => {
// This will only trigger if ALL context matchers match
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with normal backup operation
}
FailpointResult::Cancelled => {}
}
}
}
```
### Context Matching Examples
#### Regex Matching
```rust
// Configure to match test tenants only
let mut matchers = HashMap::new();
matchers.insert("tenant_id".to_string(), "test_.*".to_string());
configure_failpoint_with_context("test_failpoint", "pause", matchers).unwrap();
// This will match
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
// This will NOT match
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "prod_123".to_string());
```
#### Multiple Conditions
```rust
// Must match BOTH tenant pattern AND operation
let mut matchers = HashMap::new();
matchers.insert("tenant_id".to_string(), "test_.*".to_string());
matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context("backup_test", "return(failed)", matchers).unwrap();
// This will match (both conditions met)
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
// This will NOT match (missing operation)
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "restore".to_string());
```
#### Exact String Matching
```rust
// If regex compilation fails, falls back to exact match
let mut matchers = HashMap::new();
matchers.insert("env".to_string(), "staging".to_string());
configure_failpoint_with_context("env_specific", "sleep(1000)", matchers).unwrap();
// This will match
let mut context = HashMap::new();
context.insert("env".to_string(), "staging".to_string());
// This will NOT match
let mut context = HashMap::new();
context.insert("env".to_string(), "production".to_string());
```
### Benefits of Context-Based Failpoints
1. **Selective Testing**: Only inject failures for specific tenants, environments, or operations
2. **Production Safety**: Avoid accidentally triggering failpoints in production by using context filters
3. **Complex Scenarios**: Test interactions between different components with targeted failures
4. **Debugging**: Isolate issues to specific contexts without affecting the entire system
### Context vs. Non-Context Failpoints
- **Without context**: `configure_failpoint("name", "action")` - triggers for ALL hits
- **With context**: `configure_failpoint_with_context("name", "action", matchers)` - triggers only when context matches
## Context-Specific Failpoints
```rust
use neon_failpoint::{configure_failpoint_with_context, failpoint};
use std::collections::HashMap;
// Configure a failpoint that only triggers for specific tenants
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context(
"backup_operation",
"return(simulated_failure)",
context_matchers
).unwrap();
// Use with context
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
match failpoint("backup_operation", Some(&context)) {
either::Either::Left(result) => {
match result {
FailpointResult::Return(value) => {
// This will trigger for tenant_id matching "test_.*"
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with backup
}
FailpointResult::Cancelled => {}
}
}
either::Either::Right(future) => {
match future.await {
FailpointResult::Return(value) => {
// This will trigger for tenant_id matching "test_.*"
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with backup
}
FailpointResult::Cancelled => {}
}
}
}
```
## Macros
The library provides convenient macros for common patterns:
### `fail_point!` - Basic Failpoint Macro
The `fail_point!` macro has three variants:
1. **Simple failpoint** - `fail_point!(name)`
- Just checks the failpoint and continues or returns early (no value)
- Panics if the failpoint is configured with `return(value)` since no closure is provided
2. **Failpoint with return handler** - `fail_point!(name, closure)`
- Provides a closure to handle return values from the failpoint
- The closure receives `Option<String>` and should return the appropriate value
3. **Conditional failpoint** - `fail_point!(name, condition, closure)`
- Only checks the failpoint if the condition is true
- Provides a closure to handle return values (receives `&str`)
```rust
use neon_failpoint::fail_point;
// Simple failpoint - just continue or return early
fail_point!("my_failpoint");
// Failpoint with return value handling
fail_point!("my_failpoint", |value: Option<String>| {
match value {
Some(v) => {
println!("Got value: {}", v);
return Ok(v.parse().unwrap_or_default());
}
None => return Ok(42), // Default return value
}
});
// Conditional failpoint - only check if condition is met
let should_fail = some_condition();
fail_point!("conditional_failpoint", should_fail, |value: &str| {
println!("Conditional failpoint triggered with: {}", value);
return Err(anyhow::anyhow!("Simulated failure"));
});
```
### `fail_point_with_context!` - Context-Aware Failpoint Macro
The `fail_point_with_context!` macro has three variants that mirror `fail_point!` but include context:
1. **Simple with context** - `fail_point_with_context!(name, context)`
2. **With context and return handler** - `fail_point_with_context!(name, context, closure)`
3. **Conditional with context** - `fail_point_with_context!(name, context, condition, closure)`
```rust
use neon_failpoint::{fail_point_with_context};
use std::collections::HashMap;
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
// Simple context failpoint
fail_point_with_context!("backup_failpoint", &context);
// Context failpoint with return handler
fail_point_with_context!("backup_failpoint", &context, |value: Option<String>| {
match value {
Some(v) => return Err(anyhow::anyhow!("Backup failed: {}", v)),
None => return Err(anyhow::anyhow!("Backup failed")),
}
});
// Conditional context failpoint
let is_test_tenant = tenant_id.starts_with("test_");
fail_point_with_context!("backup_failpoint", &context, is_test_tenant, |value: Option<String>| {
// Only triggers for test tenants
return Err(anyhow::anyhow!("Test tenant backup failure"));
});
```
### Other Utility Macros
```rust
use neon_failpoint::{pausable_failpoint, sleep_millis_async};
// Pausable failpoint with cancellation
let cancel_token = CancellationToken::new();
if let Err(()) = pausable_failpoint!("pause_here", &cancel_token).await {
println!("Failpoint was cancelled");
}
// Sleep failpoint
sleep_millis_async!("sleep_here", &cancel_token).await;
// Context creation helper
let mut context = HashMap::new();
context.insert("key1".to_string(), "value1".to_string());
context.insert("key2".to_string(), "value2".to_string());
```
### Argument Reference
- **`name`**: String literal - the name of the failpoint
- **`context`**: Expression that evaluates to `&HashMap<String, String>` - context for matching
- **`condition`**: Boolean expression - only check failpoint if true
- **`closure`**: Closure that handles return values:
- For `fail_point!` with closure: receives `Option<String>`
- For conditional variants: receives `&str`
- For `fail_point_with_context!` with closure: receives `Option<String>`
- **`cancel`**: `&CancellationToken` - for cancellation support
## Migration from `fail` crate
The library provides a compatibility layer in `libs/utils/src/failpoint_support.rs`. Most existing code should work without changes, but you can migrate to the new async APIs for better performance:
### Before (with `fail` crate):
```rust
use utils::failpoint_support::pausable_failpoint;
// This used spawn_blocking internally
pausable_failpoint!("my_failpoint", &cancel_token).await?;
```
### After (with `neon_failpoint`):
```rust
use neon_failpoint::{failpoint_with_cancellation, FailpointResult};
// This is fully async
match failpoint_with_cancellation("my_failpoint", None, &cancel_token).await {
FailpointResult::Continue => {},
FailpointResult::Cancelled => return Err(()),
FailpointResult::Return(_) => {},
}
```
## Environment Variable Support
Failpoints can be configured via the `FAILPOINTS` environment variable:
```bash
FAILPOINTS="failpoint1=return(42);failpoint2=sleep(1000);failpoint3=exit"
```
## Testing
The library includes comprehensive tests and examples. Run them with:
```bash
cargo test --features testing
cargo run --example context_demo --features testing
```
## HTTP Configuration
The library integrates with the existing HTTP failpoint configuration API. Send POST requests to `/v1/failpoints` with:
```json
[
{
"name": "my_failpoint",
"actions": "return(42)"
}
]
```

View File

@@ -1,82 +0,0 @@
use neon_failpoint::{configure_failpoint_with_context, failpoint, FailpointResult};
use std::collections::HashMap;
#[tokio::main]
async fn main() {
// Initialize tracing for better output
tracing_subscriber::fmt::init();
// Set up a context-specific failpoint
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context(
"backup_operation",
"return(simulated_failure)",
context_matchers,
)
.unwrap();
// Test with matching context
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
println!("Testing with matching context...");
match failpoint("backup_operation", Some(&context)) {
either::Either::Left(result) => match result {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {value:?}");
}
FailpointResult::Continue => {
println!("Failpoint not triggered");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
},
either::Either::Right(future) => match future.await {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {value:?}");
}
FailpointResult::Continue => {
println!("Failpoint not triggered");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
},
}
// Test with non-matching context
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "prod_456".to_string());
context.insert("operation".to_string(), "backup".to_string());
println!("Testing with non-matching context...");
match failpoint("backup_operation", Some(&context)) {
either::Either::Left(result) => match result {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {value:?}");
}
FailpointResult::Continue => {
println!("Failpoint not triggered (expected)");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
},
either::Either::Right(future) => match future.await {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {value:?}");
}
FailpointResult::Continue => {
println!("Failpoint not triggered (expected)");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
},
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,356 +0,0 @@
//! Macros for convenient failpoint usage
/// Simple failpoint macro - async version that doesn't require spawn_blocking
#[macro_export]
macro_rules! fail_point {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}};
($name:literal, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}};
($name:literal, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}
}};
}
/// Simple failpoint macro - sync version that panics if async action is triggered
#[macro_export]
macro_rules! fail_point_sync {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
}
}
}};
($name:literal, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
}
}
}};
($name:literal, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
}
}
}
}};
}
/// Failpoint macro with context support
#[macro_export]
macro_rules! fail_point_with_context {
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}};
($name:literal, $context:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}};
($name:literal, $context:expr, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}
}};
}
/// Failpoint macro with context support - sync version
#[macro_export]
macro_rules! fail_point_with_context_sync {
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
}
}
}};
($name:literal, $context:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
}
}
}};
($name:literal, $context:expr, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
}
}
}
}};
}
/// Pausable failpoint macro - equivalent to the old pausable_failpoint
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {{
if cfg!(feature = "testing") {
let cancel = ::tokio_util::sync::CancellationToken::new();
let _ = $crate::pausable_failpoint!($name, &cancel);
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
::tracing::info!("at failpoint {}", $name); // tests rely on this
match $crate::failpoint_with_cancellation($name, None, $cancel) {
$crate::either::Either::Left(result) => match result {
$crate::FailpointResult::Continue => Ok(()),
$crate::FailpointResult::Return(_) => Ok(()),
$crate::FailpointResult::Cancelled => Err(()),
},
$crate::either::Either::Right(future) => match future.await {
$crate::FailpointResult::Continue => Ok(()),
$crate::FailpointResult::Return(_) => Ok(()),
$crate::FailpointResult::Cancelled => Err(()),
},
}
} else {
Ok(())
}
}};
}
/// Sleep failpoint macro - for async sleep operations
#[macro_export]
macro_rules! sleep_millis_async {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(_) => {}
$crate::either::Either::Right(future) => {
future.await;
}
}
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint_with_cancellation($name, None, $cancel) {
$crate::either::Either::Left(_) => {}
$crate::either::Either::Right(future) => {
future.await;
}
}
}
}};
}
// Re-export for convenience
pub use fail_point;
pub use fail_point_sync;
pub use fail_point_with_context;
pub use fail_point_with_context_sync;
pub use pausable_failpoint;
pub use sleep_millis_async;

View File

@@ -9,7 +9,7 @@ default = ["rename_noreplace"]
rename_noreplace = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["neon_failpoint/testing"]
testing = ["fail/failpoints"]
[dependencies]
arc-swap.workspace = true
@@ -21,11 +21,10 @@ bytes.workspace = true
camino.workspace = true
chrono.workspace = true
diatomic-waker.workspace = true
either.workspace = true
git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true
neon_failpoint.workspace = true
fail.workspace = true
futures = { workspace = true }
jsonwebtoken.workspace = true
nix = { workspace = true, features = ["ioctl"] }

View File

@@ -1,22 +1,59 @@
//! Failpoint support code shared between pageserver and safekeepers.
//!
//! This module provides a compatibility layer over the new neon_failpoint crate.
pub use neon_failpoint::{configure_failpoint as apply_failpoint, has_failpoints, init};
use tokio_util::sync::CancellationToken;
/// Mere forward to neon_failpoint::pausable_failpoint
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
///
/// Optionally pass a cancellation token, and this failpoint will drop out of
/// its pause when the cancellation token fires. This is useful for testing
/// cases where we would like to block something, but test its clean shutdown behavior.
/// The macro evaluates to a Result in that case, where Ok(()) is the case
/// where the failpoint was not paused, and Err() is the case where cancellation
/// token fired while evaluating the failpoint.
///
/// Remember to unpause the failpoint in the test; until that happens, one of the
/// limited number of spawn_blocking thread pool threads is leaked.
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {
::neon_failpoint::pausable_failpoint!($name)
};
($name:literal, $cancel:expr) => {
::neon_failpoint::pausable_failpoint!($name, $cancel)
};
($name:literal) => {{
if cfg!(feature = "testing") {
let cancel = ::tokio_util::sync::CancellationToken::new();
let _ = $crate::pausable_failpoint!($name, &cancel);
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
let failpoint_fut = ::tokio::task::spawn_blocking({
let current = ::tracing::Span::current();
move || {
let _entered = current.entered();
::tracing::info!("at failpoint {}", $name);
::fail::fail_point!($name);
}
});
let cancel_fut = async move {
$cancel.cancelled().await;
};
::tokio::select! {
res = failpoint_fut => {
res.expect("spawn_blocking");
// continue with execution
Ok(())
},
_ = cancel_fut => {
Err(())
}
}
} else {
Ok(())
}
}};
}
/// DEPRECATED! - use with fail::cfg("$name", "return(2000)")
pub use pausable_failpoint;
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that we use async
@@ -32,7 +69,7 @@ macro_rules! __failpoint_sleep_millis_async {
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::neon_failpoint::fail_point_sync!($name, |x| x);
::fail::fail_point!($name, |x| x);
::std::option::Option::None
})();
@@ -45,7 +82,7 @@ macro_rules! __failpoint_sleep_millis_async {
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::neon_failpoint::fail_point_sync!($name, |x| x);
::fail::fail_point!($name, |x| x);
::std::option::Option::None
})();
@@ -89,3 +126,60 @@ pub async fn failpoint_sleep_cancellable_helper(
tokio::time::timeout(d, cancel.cancelled()).await.ok();
tracing::info!("failpoint {:?}: sleep done", name);
}
/// Initialize the configured failpoints
///
/// You must call this function before any concurrent threads do operations.
pub fn init() -> fail::FailScenario<'static> {
// The failpoints lib provides support for parsing the `FAILPOINTS` env var.
// We want non-default behavior for `exit`, though, so, we handle it separately.
//
// Format for FAILPOINTS is "name=actions" separated by ";".
let actions = std::env::var("FAILPOINTS");
if actions.is_ok() {
// SAFETY: this function should before any threads start and access env vars concurrently
unsafe {
std::env::remove_var("FAILPOINTS");
}
} else {
// let the library handle non-utf8, or nothing for not present
}
let scenario = fail::FailScenario::setup();
if let Ok(val) = actions {
val.split(';')
.enumerate()
.map(|(i, s)| s.split_once('=').ok_or((i, s)))
.for_each(|res| {
let (name, actions) = match res {
Ok(t) => t,
Err((i, s)) => {
panic!(
"startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
i + 1,
);
}
};
if let Err(e) = apply_failpoint(name, actions) {
panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
}
});
}
scenario
}
pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
if actions == "exit" {
fail::cfg_callback(name, exit_failpoint)
} else {
fail::cfg(name, actions)
}
}
#[inline(never)]
fn exit_failpoint() {
tracing::info!("Exit requested by failpoint");
std::process::exit(1);
}

View File

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["neon_failpoint/testing", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
fuzz-read-path = ["testing"]
@@ -33,7 +33,7 @@ crc32c.workspace = true
either.workspace = true
enum-map.workspace = true
enumset = { workspace = true, features = ["serde"]}
neon_failpoint.workspace = true
fail.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true

View File

@@ -17,7 +17,6 @@ use anyhow::{Context, anyhow};
use async_compression::tokio::write::GzipEncoder;
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use neon_failpoint as fail;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};

View File

@@ -68,7 +68,7 @@ const FEATURES: &[&str] = &[
fn version() -> String {
format!(
"{GIT_VERSION} failpoints: {}, features: {:?}",
neon_failpoint::has_failpoints(),
fail::has_failpoints(),
FEATURES,
)
}
@@ -84,7 +84,7 @@ fn main() -> anyhow::Result<()> {
}
// Initialize up failpoints support
failpoint_support::init().unwrap();
let scenario = failpoint_support::init();
let workdir = arg_matches
.get_one::<String>("workdir")
@@ -221,6 +221,7 @@ fn main() -> anyhow::Result<()> {
start_pageserver(launch_ts, conf, ignored, otel_guard).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
}
@@ -365,9 +366,16 @@ fn start_pageserver(
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
let failpoints = neon_failpoint::list();
for (name, actions) in failpoints {
info!("starting with failpoint: {name} {actions}");
let failpoints = fail::list();
if !failpoints.is_empty() {
info!(
"started with failpoints: {}",
failpoints
.iter()
.map(|(name, actions)| format!("{name}={actions}"))
.collect::<Vec<String>>()
.join(";")
)
}
// Create and lock PID file. This ensures that there cannot be more than one

View File

@@ -6,8 +6,6 @@ use camino::{Utf8Path, Utf8PathBuf};
use super::{NewMetricsRoot, NewRawMetric, RawMetric};
use crate::consumption_metrics::NewMetricsRefRoot;
use neon_failpoint as fail;
pub(super) fn read_metrics_from_serde_value(
json_value: serde_json::Value,
) -> anyhow::Result<Vec<NewRawMetric>> {
@@ -131,7 +129,7 @@ pub(super) async fn flush_metrics_to_disk(
tempfile.flush()?;
tempfile.as_file().sync_all()?;
fail::fail_point_sync!("before-persist-last-metrics-collected");
fail::fail_point!("before-persist-last-metrics-collected");
drop(tempfile.persist(&*path).map_err(|e| e.error)?);

View File

@@ -8,7 +8,6 @@
use std::time::Duration;
use neon_failpoint as fail;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

View File

@@ -28,7 +28,6 @@ use http_utils::{RequestExt, RouterBuilder};
use humantime::format_rfc3339;
use hyper::{Body, Request, Response, StatusCode, Uri, header};
use metrics::launch_timestamp::LaunchTimestamp;
use neon_failpoint as fail;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::{
DetachBehavior, DownloadRemoteLayersTaskSpawnRequest, IngestAuxFilesRequest,
@@ -3976,7 +3975,7 @@ pub fn make_router(
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
.get("/v1/status", |r| api_handler(r, status_handler))
.post("/v1/failpoints", |r| {
.put("/v1/failpoints", |r| {
testing_api_handler("manage failpoints", r, failpoints_handler)
})
.post("/v1/reload_auth_validation_keys", |r| {

View File

@@ -19,7 +19,6 @@ use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
use jsonwebtoken::TokenData;
use neon_failpoint as fail;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
@@ -336,21 +335,18 @@ async fn page_service_conn_main(
let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
let socket_timeout_ms = (|| {
fail::fail_point_sync!(
"simulated-bad-compute-connection",
|avg_timeout_ms: Option<String>| {
// Exponential distribution for simulating
// poor network conditions, expect about avg_timeout_ms to be around 15
// in tests
if let Some(avg_timeout_ms) = avg_timeout_ms {
let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
let u = rand::random::<f32>();
((1.0 - u).ln() / (-avg)) as u64
} else {
default_timeout_ms
}
fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
// Exponential distribution for simulating
// poor network conditions, expect about avg_timeout_ms to be around 15
// in tests
if let Some(avg_timeout_ms) = avg_timeout_ms {
let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
let u = rand::random::<f32>();
((1.0 - u).ln() / (-avg)) as u64
} else {
default_timeout_ms
}
);
});
default_timeout_ms
})();
@@ -3047,7 +3043,7 @@ where
_pgb: &mut PostgresBackend<IO>,
sm: &FeStartupPacket,
) -> Result<(), QueryError> {
fail::fail_point_sync!("ps::connection-start::startup-packet");
fail::fail_point!("ps::connection-start::startup-packet");
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {

View File

@@ -14,7 +14,6 @@ use crate::{PERF_TRACE_TARGET, ensure_walingest};
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use neon_failpoint as fail;
use pageserver_api::key::{
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,

View File

@@ -30,7 +30,6 @@ use enumset::EnumSet;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use itertools::Itertools as _;
use neon_failpoint as fail;
use once_cell::sync::Lazy;
pub use pageserver_api::models::TenantState;
use pageserver_api::models::{self, RelSizeMigration};
@@ -9572,7 +9571,7 @@ mod tests {
writer.finish_write(Lsn(0x30));
drop(writer);
neon_failpoint::configure_failpoint(
fail::cfg(
"flush-layer-before-update-remote-consistent-lsn",
"return()",
)

View File

@@ -12,7 +12,6 @@ use anyhow::Context;
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use neon_failpoint as fail;
use pageserver_api::key::Key;
use pageserver_api::models::{DetachBehavior, LocationConfigMode};
use pageserver_api::shard::{

View File

@@ -194,7 +194,6 @@ pub(crate) use download::{
};
use index::GcCompactionState;
pub(crate) use index::LayerFileMetadata;
use neon_failpoint as fail;
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;

View File

@@ -11,7 +11,6 @@ use std::time::SystemTime;
use anyhow::{Context, anyhow};
use camino::{Utf8Path, Utf8PathBuf};
use neon_failpoint as fail;
use pageserver_api::shard::TenantShardId;
use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,

View File

@@ -8,7 +8,6 @@ use anyhow::{Context, bail};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use neon_failpoint as fail;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
use tokio::fs::{self, File};

View File

@@ -39,7 +39,6 @@ use layer_manager::{
LayerManagerLockHolder, LayerManagerReadGuard, LayerManagerWriteGuard, LockedLayerManager,
Shutdown,
};
use neon_failpoint as fail;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
@@ -5184,9 +5183,7 @@ impl Timeline {
*self.applied_gc_cutoff_lsn.read(),
);
neon_failpoint::fail_point_sync!("checkpoint-before-saving-metadata", |x: Option<
String,
>| bail!(
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
"{}",
x.unwrap()
));

View File

@@ -26,7 +26,6 @@ use enumset::EnumSet;
use fail::fail_point;
use futures::FutureExt;
use itertools::Itertools;
use neon_failpoint as fail;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
use pageserver_api::key::{KEY_SIZE, Key};

View File

@@ -2,7 +2,6 @@ use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use anyhow::Context;
use neon_failpoint as fail;
use pageserver_api::models::TimelineState;
use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;

View File

@@ -4,7 +4,6 @@ use std::sync::Arc;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
use neon_failpoint as fail;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::DetachBehavior;
@@ -1114,7 +1113,7 @@ pub(super) async fn detach_and_reparent(
// others will fail as if those timelines had been stopped for whatever reason.
#[cfg(feature = "testing")]
let failpoint_sem = || -> Option<Arc<Semaphore>> {
fail::fail_point_sync!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
Arc::new(Semaphore::new(1))
));
None

View File

@@ -5,7 +5,6 @@ use std::sync::Arc;
use anyhow::Context;
use camino::Utf8PathBuf;
use neon_failpoint as fail;
use tracing::{error, info, info_span};
use utils::fs_ext;
use utils::id::TimelineId;

View File

@@ -11,7 +11,6 @@ use bytes::BytesMut;
use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use neon_failpoint as fail;
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::WAL_SEGMENT_SIZE;

35
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -2326,6 +2326,25 @@ files = [
{file = "propcache-0.2.0.tar.gz", hash = "sha256:df81779732feb9d01e5d513fad0122efb3d53bbc75f61b2a4f29a020bc985e70"},
]
[[package]]
name = "protobuf"
version = "6.31.1"
description = ""
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "protobuf-6.31.1-cp310-abi3-win32.whl", hash = "sha256:7fa17d5a29c2e04b7d90e5e32388b8bfd0e7107cd8e616feef7ed3fa6bdab5c9"},
{file = "protobuf-6.31.1-cp310-abi3-win_amd64.whl", hash = "sha256:426f59d2964864a1a366254fa703b8632dcec0790d8862d30034d8245e1cd447"},
{file = "protobuf-6.31.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:6f1227473dc43d44ed644425268eb7c2e488ae245d51c6866d19fe158e207402"},
{file = "protobuf-6.31.1-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:a40fc12b84c154884d7d4c4ebd675d5b3b5283e155f324049ae396b95ddebc39"},
{file = "protobuf-6.31.1-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:4ee898bf66f7a8b0bd21bce523814e6fbd8c6add948045ce958b73af7e8878c6"},
{file = "protobuf-6.31.1-cp39-cp39-win32.whl", hash = "sha256:0414e3aa5a5f3ff423828e1e6a6e907d6c65c1d5b7e6e975793d5590bdeecc16"},
{file = "protobuf-6.31.1-cp39-cp39-win_amd64.whl", hash = "sha256:8764cf4587791e7564051b35524b72844f845ad0bb011704c3736cce762d8fe9"},
{file = "protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e"},
{file = "protobuf-6.31.1.tar.gz", hash = "sha256:d8cac4c982f0b957a4dc73a80e2ea24fab08e679c0de9deb835f4a12d69aca9a"},
]
[[package]]
name = "psutil"
version = "5.9.4"
@@ -3309,6 +3328,18 @@ files = [
[package.dependencies]
cryptography = "*"
[[package]]
name = "types-protobuf"
version = "6.30.2.20250516"
description = "Typing stubs for protobuf"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "types_protobuf-6.30.2.20250516-py3-none-any.whl", hash = "sha256:8c226d05b5e8b2623111765fa32d6e648bbc24832b4c2fddf0fa340ba5d5b722"},
{file = "types_protobuf-6.30.2.20250516.tar.gz", hash = "sha256:aecd1881770a9bb225ede66872ef7f0da4505edd0b193108edd9892e48d49a41"},
]
[[package]]
name = "types-psutil"
version = "5.9.5.12"
@@ -3847,4 +3878,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"
content-hash = "7cc735f57c2760db6c994575a98d4f0e2670497ad9e909135a3bc67d479f5edf"

View File

@@ -10,6 +10,8 @@ psycopg2-binary = "^2.9.10"
typing-extensions = "^4.12.2"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.32.4"
protobuf = "^6.31.1"
types-protobuf="^6.30.2"
pytest-xdist = "^3.3.1"
asyncpg = "^0.30.0"
aiopg = "^1.4.0"
@@ -63,6 +65,7 @@ build-backend = "poetry.core.masonry.api"
exclude = [
"^vendor/",
"^target/",
"test_runner/regress/data/profile_pb2.py",
"test_runner/performance/pgvector/loaddata.py",
]
check_untyped_defs = true
@@ -94,6 +97,7 @@ target-version = "py311"
extend-exclude = [
"vendor/",
"target/",
"test_runner/regress/data/profile_pb2.py",
"test_runner/stubs/", # Autogenerated by mypy's stubgen
]
line-length = 100 # this setting is rather guidance, it won't fail if it can't make the shorter

View File

@@ -6,7 +6,9 @@ license.workspace = true
[features]
default = []
testing = ["neon_failpoint/testing"]
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
benchmarking = []
[dependencies]
@@ -19,7 +21,7 @@ camino-tempfile.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
crc32c.workspace = true
neon_failpoint.workspace = true
fail.workspace = true
hex.workspace = true
humantime.workspace = true
http.workspace = true

View File

@@ -65,7 +65,7 @@ const FEATURES: &[&str] = &[
fn version() -> String {
format!(
"{GIT_VERSION} failpoints: {}, features: {:?}",
neon_failpoint::has_failpoints(),
fail::has_failpoints(),
FEATURES,
)
}

View File

@@ -717,7 +717,7 @@ pub fn make_router(
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
.get("/v1/status", |r| request_span(r, status_handler))
.post("/v1/failpoints", |r| {
.put("/v1/failpoints", |r| {
request_span(r, move |r| async {
check_permission(&r, None)?;
let cancel = CancellationToken::new();

View File

@@ -872,15 +872,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
self.end_pos = self.end_watch.get();
let have_something_to_send = async {
neon_failpoint::fail_point!(
let have_something_to_send = (|| {
fail::fail_point!(
"sk-pause-send",
self.appname.as_deref() != Some("pageserver"),
|_| { false }
);
self.end_pos > self.start_pos
}
.await;
})();
if have_something_to_send {
trace!("got end_pos {:?}, streaming", self.end_pos);
@@ -932,15 +931,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// - Err in case of error -- only if 1) term changed while fetching in recovery
/// mode 2) watch channel closed, which must never happen.
async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
let fp = async {
neon_failpoint::fail_point!(
let fp = (|| {
fail::fail_point!(
"sk-pause-send",
self.appname.as_deref() != Some("pageserver"),
|_| { true }
);
false
}
.await;
})();
if fp {
tokio::time::sleep(POLL_STATE_TIMEOUT).await;
return Ok(None);

View File

@@ -657,7 +657,7 @@ pub async fn delete_timeline(
pausable_failpoint!("sk-delete-timeline-remote-pause");
neon_failpoint::fail_point!("sk-delete-timeline-remote", |_| {
fail::fail_point!("sk-delete-timeline-remote", |_| {
Err(anyhow::anyhow!("failpoint: sk-delete-timeline-remote"))
});

View File

@@ -301,7 +301,7 @@ impl PhysicalStorage {
format!("Failed to open tmp wal file {:?}", &tmp_path)
})?;
neon_failpoint::fail_point!("sk-zero-segment", |_| {
fail::fail_point!("sk-zero-segment", |_| {
info!("sk-zero-segment failpoint hit");
Err(anyhow::anyhow!("failpoint: sk-zero-segment"))
});

View File

@@ -22,7 +22,7 @@ clap.workspace = true
clashmap.workspace = true
compute_api.workspace = true
cron.workspace = true
neon_failpoint.workspace = true
fail.workspace = true
futures.workspace = true
governor.workspace = true
hex.workspace = true

View File

@@ -25,7 +25,6 @@ use futures::stream::FuturesUnordered;
use http_utils::error::ApiError;
use hyper::Uri;
use itertools::Itertools;
use neon_failpoint as fail;
use pageserver_api::config::PostHogConfig;
use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
@@ -6027,7 +6026,7 @@ impl Service {
tenant_id: TenantId,
split_req: TenantShardSplitRequest,
) -> Result<ShardSplitAction, ApiError> {
fail::fail_point_sync!("shard-split-validation", |_| Err(ApiError::BadRequest(
fail::fail_point!("shard-split-validation", |_| Err(ApiError::BadRequest(
anyhow::anyhow!("failpoint")
)));

View File

@@ -115,6 +115,39 @@ class EndpointHttpClient(requests.Session):
json: dict[str, str] = res.json()
return json
def start_profiling_cpu(
self, sampling_frequency: int, timeout_seconds: int, archive: bool = False
) -> tuple[int, bytes]:
url = f"http://localhost:{self.external_port}/profile/cpu"
params = {
"profiler": {"BccProfile": None},
"sampling_frequency": sampling_frequency,
"timeout_seconds": timeout_seconds,
"archive": archive,
}
res = self.post(
url,
json=params,
auth=self.auth,
)
return res.status_code, res.content
def stop_profiling_cpu(self) -> int:
url = f"http://localhost:{self.external_port}/profile/cpu"
res = self.delete(url, auth=self.auth)
return res.status_code
def get_profiling_cpu_status(self) -> bool:
"""
Returns True if CPU profiling is currently running, False otherwise.
"""
url = f"http://localhost:{self.external_port}/profile/cpu"
res = self.get(url, auth=self.auth)
res.raise_for_status()
return res.status_code == 200
def database_schema(self, database: str):
res = self.get(
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",
@@ -159,59 +192,16 @@ class EndpointHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def configure_failpoints(
self, *args: tuple[str, str] | list[dict[str, str | dict[str, str]]]
) -> None:
"""Configure failpoints for testing purposes.
def configure_failpoints(self, *args: tuple[str, str]) -> None:
body: list[dict[str, str]] = []
Args:
*args: Can be one of:
- Variable number of (name, actions) tuples
- Single list of dicts with keys: name, actions, and optionally context_matchers
Examples:
# Basic failpoints
client.configure_failpoints(("test_fp", "return(error)"))
client.configure_failpoints(("fp1", "return"), ("fp2", "sleep(1000)"))
# Probability-based failpoint
client.configure_failpoints(("test_fp", "50%return(error)"))
# Context-based failpoint
client.configure_failpoints([{
"name": "test_fp",
"actions": "return(error)",
"context_matchers": {"tenant_id": ".*test.*"}
}])
"""
request_body: list[dict[str, Any]] = []
if (
len(args) == 1
and isinstance(args[0], list)
and args[0]
and isinstance(args[0][0], dict)
):
# Handle list of dicts (context-based failpoints)
failpoint_configs = args[0]
for config in failpoint_configs:
server_config: dict[str, Any] = {
"name": config["name"],
"actions": config["actions"],
for fp in args:
body.append(
{
"name": fp[0],
"action": fp[1],
}
if "context_matchers" in config:
server_config["context_matchers"] = config["context_matchers"]
request_body.append(server_config)
else:
# Handle tuples (basic failpoints)
for fp in args:
request_body.append(
{
"name": fp[0],
"actions": fp[1],
}
)
)
res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=request_body)
res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body)
res.raise_for_status()

View File

@@ -2614,68 +2614,22 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return res.json()
def configure_failpoints(
self,
config_strings: tuple[str, str]
| list[tuple[str, str]]
| list[dict[str, str | dict[str, str]]],
):
"""
Configure failpoints for testing purposes.
Args:
config_strings: Can be one of:
- Single tuple of (name, actions)
- List of tuples [(name, actions), ...]
- List of dicts with keys: name, actions, and optionally context_matchers
Examples:
# Basic failpoint
client.configure_failpoints(("test_fp", "return(error)"))
# Multiple basic failpoints
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
# Probability-based failpoint
client.configure_failpoints(("test_fp", "50%return(error)"))
# Context-based failpoint
client.configure_failpoints([{
"name": "test_fp",
"actions": "return(error)",
"context_matchers": {"tenant_id": ".*test.*"}
}])
"""
# Handle single tuple case
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
if isinstance(config_strings, tuple):
config_strings = [config_strings]
pairs = [config_strings]
else:
pairs = config_strings
# Convert to server format
body: list[dict[str, str | dict[str, str]]] = []
for config in config_strings:
if isinstance(config, tuple):
# Simple (name, actions) tuple
body.append({"name": config[0], "actions": config[1]})
elif isinstance(config, dict):
# Dict with name, actions, and optional context_matchers
server_config: dict[str, str | dict[str, str]] = {
"name": config["name"],
"actions": config["actions"],
}
if "context_matchers" in config:
server_config["context_matchers"] = config["context_matchers"]
body.append(server_config)
else:
raise ValueError(f"Invalid config format: {config}")
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.request(
"PUT",
f"{self.api_root()}/debug/v1/failpoints",
json=body,
f"{self.api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
headers=self.headers(TokenScope.ADMIN),
)
if res.status_code != 200:
self.raise_api_exception(res)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
def get_tenants_placement(self) -> defaultdict[str, dict[str, Any]]:
"""

View File

@@ -309,64 +309,25 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def configure_failpoints(
self,
config_strings: tuple[str, str]
| list[tuple[str, str]]
| list[dict[str, str | dict[str, str]]],
):
"""
Configure failpoints for testing purposes.
Args:
config_strings: Can be one of:
- Single tuple of (name, actions)
- List of tuples [(name, actions), ...]
- List of dicts with keys: name, actions, and optionally context_matchers
Examples:
# Basic failpoint
client.configure_failpoints(("test_fp", "return(error)"))
# Multiple basic failpoints
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
# Probability-based failpoint
client.configure_failpoints(("test_fp", "50%return(error)"))
# Context-based failpoint
client.configure_failpoints([{
"name": "test_fp",
"actions": "return(error)",
"context_matchers": {"tenant_id": ".*test.*"}
}])
"""
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
self.is_testing_enabled_or_skip()
# Handle single tuple case
if isinstance(config_strings, tuple):
config_strings = [config_strings]
pairs = [config_strings]
else:
pairs = config_strings
# Convert to server format
body: list[dict[str, str | dict[str, str]]] = []
for config in config_strings:
if isinstance(config, tuple):
# Simple (name, actions) tuple
body.append({"name": config[0], "actions": config[1]})
elif isinstance(config, dict):
# Dict with name, actions, and optional context_matchers
server_config = {"name": config["name"], "actions": config["actions"]}
if "context_matchers" in config:
server_config["context_matchers"] = config["context_matchers"]
body.append(server_config)
else:
raise ValueError(f"Invalid config format: {config}")
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.post(f"{self.base_url}/v1/failpoints", json=body)
if res.status_code != 200:
raise PageserverApiException(
f"Failed to configure failpoints: {res.text}", res.status_code
)
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
def reload_auth_validation_keys(self):
res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")

View File

@@ -8,6 +8,7 @@ import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.utils import EnhancedJSONEncoder, wait_until
@@ -154,62 +155,25 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
def configure_failpoints(
self,
config_strings: tuple[str, str]
| list[tuple[str, str]]
| list[dict[str, str | dict[str, str]]],
):
"""
Configure failpoints for testing purposes.
Args:
config_strings: Can be one of:
- Single tuple of (name, actions)
- List of tuples [(name, actions), ...]
- List of dicts with keys: name, actions, and optionally context_matchers
Examples:
# Basic failpoint
client.configure_failpoints(("test_fp", "return(error)"))
# Multiple basic failpoints
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
# Probability-based failpoint
client.configure_failpoints(("test_fp", "50%return(error)"))
# Context-based failpoint
client.configure_failpoints([{
"name": "test_fp",
"actions": "return(error)",
"context_matchers": {"tenant_id": ".*test.*"}
}])
"""
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
self.is_testing_enabled_or_skip()
# Handle single tuple case
if isinstance(config_strings, tuple):
config_strings = [config_strings]
pairs = [config_strings]
else:
pairs = config_strings
# Convert to server format
body: list[dict[str, str | dict[str, str]]] = []
for config in config_strings:
if isinstance(config, tuple):
# Simple (name, actions) tuple
body.append({"name": config[0], "actions": config[1]})
elif isinstance(config, dict):
# Dict with name, actions, and optional context_matchers
server_config = {"name": config["name"], "actions": config["actions"]}
if "context_matchers" in config:
server_config["context_matchers"] = config["context_matchers"]
body.append(server_config)
else:
raise ValueError(f"Invalid config format: {config}")
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.post(f"http://localhost:{self.port}/v1/failpoints", json=body)
if res.status_code != 200:
raise RuntimeError(f"Failed to configure failpoints: {res.text}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
res_json = res.json()
assert res_json is None
return res_json
def tenant_delete_force(self, tenant_id: TenantId) -> dict[Any, Any]:
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")

View File

@@ -6,6 +6,7 @@ import json
import os
import re
import subprocess
import sys
import tarfile
import threading
import time
@@ -717,6 +718,33 @@ def skip_in_debug_build(reason: str):
)
def run_only_on_linux_kernel_higher_than(version: float, reason: str):
"""
Skip tests if the Linux kernel version is lower than the specified version.
The version is specified as a float, e.g. 5.4 for kernel.
Also skips if the host is not Linux.
"""
should_skip = False
if sys.platform != "linux":
should_skip = True
else:
try:
kernel_version_list = os.uname()[2].split("-")[0].split(".")
kernel_version = float(f"{kernel_version_list[0]}.{kernel_version_list[1]}")
if kernel_version < version:
should_skip = True
except ValueError:
log.error(f"Failed to parse kernel version: {os.uname()[2]}")
should_skip = True
return pytest.mark.skipif(
should_skip,
reason=reason,
)
def skip_on_ci(reason: str):
# `CI` variable is always set to `true` on GitHub
# Ref: https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/store-information-in-variables#default-environment-variables

View File

View File

@@ -0,0 +1,41 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# type: ignore
# source: profile.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rprofile.proto\x12\x12perftools.profiles\"\xd5\x03\n\x07Profile\x12\x32\n\x0bsample_type\x18\x01 \x03(\x0b\x32\x1d.perftools.profiles.ValueType\x12*\n\x06sample\x18\x02 \x03(\x0b\x32\x1a.perftools.profiles.Sample\x12,\n\x07mapping\x18\x03 \x03(\x0b\x32\x1b.perftools.profiles.Mapping\x12.\n\x08location\x18\x04 \x03(\x0b\x32\x1c.perftools.profiles.Location\x12.\n\x08\x66unction\x18\x05 \x03(\x0b\x32\x1c.perftools.profiles.Function\x12\x14\n\x0cstring_table\x18\x06 \x03(\t\x12\x13\n\x0b\x64rop_frames\x18\x07 \x01(\x03\x12\x13\n\x0bkeep_frames\x18\x08 \x01(\x03\x12\x12\n\ntime_nanos\x18\t \x01(\x03\x12\x16\n\x0e\x64uration_nanos\x18\n \x01(\x03\x12\x32\n\x0bperiod_type\x18\x0b \x01(\x0b\x32\x1d.perftools.profiles.ValueType\x12\x0e\n\x06period\x18\x0c \x01(\x03\x12\x0f\n\x07\x63omment\x18\r \x03(\x03\x12\x1b\n\x13\x64\x65\x66\x61ult_sample_type\x18\x0e \x01(\x03\"\'\n\tValueType\x12\x0c\n\x04type\x18\x01 \x01(\x03\x12\x0c\n\x04unit\x18\x02 \x01(\x03\"V\n\x06Sample\x12\x13\n\x0blocation_id\x18\x01 \x03(\x04\x12\r\n\x05value\x18\x02 \x03(\x03\x12(\n\x05label\x18\x03 \x03(\x0b\x32\x19.perftools.profiles.Label\"@\n\x05Label\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\x0b\n\x03str\x18\x02 \x01(\x03\x12\x0b\n\x03num\x18\x03 \x01(\x03\x12\x10\n\x08num_unit\x18\x04 \x01(\x03\"\xdd\x01\n\x07Mapping\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x14\n\x0cmemory_start\x18\x02 \x01(\x04\x12\x14\n\x0cmemory_limit\x18\x03 \x01(\x04\x12\x13\n\x0b\x66ile_offset\x18\x04 \x01(\x04\x12\x10\n\x08\x66ilename\x18\x05 \x01(\x03\x12\x10\n\x08\x62uild_id\x18\x06 \x01(\x03\x12\x15\n\rhas_functions\x18\x07 \x01(\x08\x12\x15\n\rhas_filenames\x18\x08 \x01(\x08\x12\x18\n\x10has_line_numbers\x18\t \x01(\x08\x12\x19\n\x11has_inline_frames\x18\n \x01(\x08\"v\n\x08Location\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x12\n\nmapping_id\x18\x02 \x01(\x04\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\x04\x12&\n\x04line\x18\x04 \x03(\x0b\x32\x18.perftools.profiles.Line\x12\x11\n\tis_folded\x18\x05 \x01(\x08\")\n\x04Line\x12\x13\n\x0b\x66unction_id\x18\x01 \x01(\x04\x12\x0c\n\x04line\x18\x02 \x01(\x03\"_\n\x08\x46unction\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\x03\x12\x13\n\x0bsystem_name\x18\x03 \x01(\x03\x12\x10\n\x08\x66ilename\x18\x04 \x01(\x03\x12\x12\n\nstart_line\x18\x05 \x01(\x03\x42-\n\x1d\x63om.google.perftools.profilesB\x0cProfileProtob\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'profile_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\035com.google.perftools.profilesB\014ProfileProto'
_PROFILE._serialized_start=38
_PROFILE._serialized_end=507
_VALUETYPE._serialized_start=509
_VALUETYPE._serialized_end=548
_SAMPLE._serialized_start=550
_SAMPLE._serialized_end=636
_LABEL._serialized_start=638
_LABEL._serialized_end=702
_MAPPING._serialized_start=705
_MAPPING._serialized_end=926
_LOCATION._serialized_start=928
_LOCATION._serialized_end=1046
_LINE._serialized_start=1048
_LINE._serialized_end=1089
_FUNCTION._serialized_start=1091
_FUNCTION._serialized_end=1186
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,322 @@
from __future__ import annotations
import gzip
import io
import threading
import time
from typing import TYPE_CHECKING, Any
import pytest
from data.profile_pb2 import Profile # type: ignore
from fixtures.log_helper import log
from fixtures.utils import run_only_on_default_postgres, run_only_on_linux_kernel_higher_than
from google.protobuf.message import Message
from requests import HTTPError
if TYPE_CHECKING:
from fixtures.endpoint.http import EndpointHttpClient
from fixtures.neon_fixtures import NeonEnv
LINUX_VERSION_REQUIRED = 6.0
PG_REASON = "test doesn't use postgres in a manner that requires any specific version"
LINUX_REASON = f"test requires linux {LINUX_VERSION_REQUIRED} for ebpfs"
def _start_profiling_cpu(
client: EndpointHttpClient, event: threading.Event | None, archive: bool = False
) -> Profile | None:
"""
Start CPU profiling for the compute node.
"""
log.info("Starting CPU profiling...")
if event is not None:
event.set()
status, response = client.start_profiling_cpu(100, 30, archive=archive)
match status:
case 200:
log.debug("CPU profiling finished")
profile: Any = Profile()
if archive:
log.debug("Unpacking gzipped profile data")
with gzip.GzipFile(fileobj=io.BytesIO(response)) as f:
response = f.read()
else:
log.debug("Unpacking non-gzipped profile data")
Message.ParseFromString(profile, response)
return profile
case 204:
log.debug("CPU profiling was stopped")
raise HTTPError("Failed to finish CPU profiling: was stopped.")
case 409:
log.debug("CPU profiling is already in progress, cannot start again")
raise HTTPError("Failed to finish CPU profiling: profiling is already in progress.")
case 500:
response_str = response.decode("utf-8", errors="replace")
log.debug(
f"Failed to finish CPU profiling, was stopped or got an error: {status}: {response_str}"
)
raise HTTPError(f"Failed to finish CPU profiling, {status}: {response_str}")
case _:
log.error(f"Failed to start CPU profiling: {status}")
raise HTTPError(f"Failed to start CPU profiling: {status}")
def _stop_profiling_cpu(client: EndpointHttpClient, event: threading.Event | None):
"""
Stop CPU profiling for the compute node.
"""
log.info("Manually stopping CPU profiling...")
if event is not None:
event.set()
status = client.stop_profiling_cpu()
match status:
case 200:
log.debug("CPU profiling stopped successfully")
case 412:
log.debug("CPU profiling is not running, nothing to do")
case _:
log.error(f"Failed to stop CPU profiling: {status}")
raise HTTPError(f"Failed to stop CPU profiling: {status}")
return status
def _wait_till_profiling_starts(
http_client: EndpointHttpClient,
event: threading.Event | None,
repeat_delay_secs: float = 0.3,
timeout: int = 60,
) -> bool:
"""
Wait until CPU profiling starts.
"""
log.info("Waiting for CPU profiling to start...")
end_time = time.time() + timeout
while not http_client.get_profiling_cpu_status():
time.sleep(repeat_delay_secs)
if end_time <= time.time():
log.error("Timeout while waiting for CPU profiling to start")
return False
log.info("CPU profiling has started successfully and is in progress")
if event is not None:
event.set()
return True
def _wait_and_assert_cpu_profiling(http_client: EndpointHttpClient, event: threading.Event | None):
profile = _start_profiling_cpu(http_client, event)
if profile is None:
log.error("The received profiling data is malformed or empty.")
return
assert len(profile.sample) > 0, "No samples found in CPU profiling data"
assert len(profile.mapping) > 0, "No mappings found in CPU profiling data"
assert len(profile.location) > 0, "No locations found in CPU profiling data"
assert len(profile.function) > 0, "No functions found in CPU profiling data"
assert len(profile.string_table) > 0, "No string tables found in CPU profiling data"
strings = [
"PostgresMain",
"ServerLoop",
"BackgroundWorkerMain",
"pq_recvbuf",
"pq_getbyte",
]
assert any(s in profile.string_table for s in strings), (
f"Expected at least one of {strings} in string table, but none found"
)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_with_timeout(neon_simple_env: NeonEnv):
"""
Test that CPU profiling works correctly with timeout.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create database profiling_test2")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling_local():
_wait_and_assert_cpu_profiling(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local)
thread.start()
inserting_should_stop_event = threading.Event()
def insert_rows():
lfc_conn = endpoint.connect(dbname="profiling_test2")
lfc_cur = lfc_conn.cursor()
n_records = 0
lfc_cur.execute(
"create table t(pk integer primary key, payload text default repeat('?', 128))"
)
batch_size = 1000
log.info("Inserting rows")
while not inserting_should_stop_event.is_set():
n_records += batch_size
lfc_cur.execute(
f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))"
)
log.info(f"Inserted {n_records} rows")
thread2 = threading.Thread(target=insert_rows)
assert _wait_till_profiling_starts(http_client, None)
time.sleep(4) # Give some time for the profiling to start
thread2.start()
thread.join(timeout=60)
inserting_should_stop_event.set() # Stop the insertion thread
thread2.join(timeout=60)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_with_archiving_the_response(neon_simple_env: NeonEnv):
"""
Test that CPU profiling works correctly with archiving the data.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
assert _start_profiling_cpu(http_client, None, archive=True) is not None, (
"Failed to start and finish CPU profiling with archiving enabled"
)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_start_and_stop(neon_simple_env: NeonEnv):
"""
Test that CPU profiling can be started and stopped correctly.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling():
# Should raise as the profiling will be stopped.
with pytest.raises(HTTPError) as _:
_start_profiling_cpu(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling)
thread.start()
assert _wait_till_profiling_starts(http_client, None)
_stop_profiling_cpu(http_client, None)
# Should raise as the profiling is already stopped.
assert _stop_profiling_cpu(http_client, None) == 412
thread.join(timeout=60)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_conflict(neon_simple_env: NeonEnv):
"""
Test that CPU profiling can be started once and the second time
it will throw an error as it is already running.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create database profiling_test")
http_client = endpoint.http_client()
def _wait_and_assert_cpu_profiling_local():
_wait_and_assert_cpu_profiling(http_client, None)
thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local)
thread.start()
assert _wait_till_profiling_starts(http_client, None)
inserting_should_stop_event = threading.Event()
def insert_rows():
lfc_conn = endpoint.connect(dbname="profiling_test")
lfc_cur = lfc_conn.cursor()
n_records = 0
lfc_cur.execute(
"create table t(pk integer primary key, payload text default repeat('?', 128))"
)
batch_size = 1000
log.info("Inserting rows")
while not inserting_should_stop_event.is_set():
n_records += batch_size
lfc_cur.execute(
f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))"
)
log.info(f"Inserted {n_records} rows")
thread2 = threading.Thread(target=insert_rows)
thread2.start()
# Should raise as the profiling is already in progress.
with pytest.raises(HTTPError) as _:
_start_profiling_cpu(http_client, None)
inserting_should_stop_event.set() # Stop the insertion thread
# The profiling should still be running and finish normally.
thread.join(timeout=600)
thread2.join(timeout=600)
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_stop_when_not_running(neon_simple_env: NeonEnv):
"""
Test that CPU profiling throws the expected error when is attempted
to be stopped when it hasn't be running.
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
for _ in range(3):
status = _stop_profiling_cpu(http_client, None)
assert status == 412
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_start_arguments_validation_works(neon_simple_env: NeonEnv):
"""
Test that CPU profiling start request properly validated the
arguments and throws the expected error (bad request).
"""
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
http_client = endpoint.http_client()
for sampling_frequency in [-1, 0, 1000000]:
status, _ = http_client.start_profiling_cpu(sampling_frequency, 5)
assert status == 422
for timeout_seconds in [-1, 0, 1000000]:
status, _ = http_client.start_profiling_cpu(5, timeout_seconds)
assert status == 422

View File

@@ -28,6 +28,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
const-oid = { version = "0.9", default-features = false, features = ["db", "std"] }
criterion = { version = "0.5", features = ["html_reports"] }
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
der = { version = "0.7", default-features = false, features = ["derive", "flagset", "oid", "pem", "std"] }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
@@ -54,7 +55,8 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] }
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
indexmap = { version = "2", features = ["serde"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.12" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
@@ -70,7 +72,6 @@ num-integer = { version = "0.1", features = ["i128"] }
num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
@@ -89,6 +90,7 @@ serde_json = { version = "1", features = ["alloc", "raw_value"] }
sha2 = { version = "0.10", features = ["asm", "oid"] }
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
spin = { version = "0.9" }
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
stable_deref_trait = { version = "1" }
subtle = { version = "2" }
@@ -101,6 +103,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["loggin
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tonic = { version = "0.12", default-features = false, features = ["codegen", "prost", "transport"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
@@ -120,13 +123,15 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
clang-sys = { version = "1", default-features = false, features = ["clang_11_0", "runtime"] }
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = { version = "2", features = ["serde"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.12" }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -139,7 +144,6 @@ num-integer = { version = "0.1", features = ["i128"] }
num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prettyplease = { version = "0.2", default-features = false, features = ["verbatim"] }
proc-macro2 = { version = "1" }