Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-27 07:10:37 +00:00)

Compare commits: problame/f… ... iddm/postg… (11 commits)
| Author | SHA1 | Date |
|---|---|---|
| | e191daa152 | |
| | 7572ffc725 | |
| | 0aacbc2583 | |
| | cb9874dc4e | |
| | 891c1fe512 | |
| | 9cae494555 | |
| | c3e6d360b5 | |
| | 9e69e24a52 | |
| | 8dbf5a8c5b | |
| | 463429af97 | |
| | d8b0c0834e | |
Cargo.lock: 356 lines changed (generated file)
@@ -687,13 +687,40 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "axum"
+version = "0.7.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
+dependencies = [
+ "async-trait",
+ "axum-core 0.4.5",
+ "bytes",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "http-body-util",
+ "itoa",
+ "matchit 0.7.3",
+ "memchr",
+ "mime",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustversion",
+ "serde",
+ "sync_wrapper 1.0.1",
+ "tower 0.5.2",
+ "tower-layer",
+ "tower-service",
+]
+
 [[package]]
 name = "axum"
 version = "0.8.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8"
 dependencies = [
- "axum-core",
+ "axum-core 0.5.0",
  "base64 0.22.1",
  "bytes",
  "form_urlencoded",
@@ -704,7 +731,7 @@ dependencies = [
  "hyper 1.4.1",
  "hyper-util",
  "itoa",
- "matchit",
+ "matchit 0.8.4",
  "memchr",
  "mime",
  "percent-encoding",
@@ -724,6 +751,26 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "axum-core"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "http-body-util",
+ "mime",
+ "pin-project-lite",
+ "rustversion",
+ "sync_wrapper 1.0.1",
+ "tower-layer",
+ "tower-service",
+]
+
 [[package]]
 name = "axum-core"
 version = "0.5.0"
@@ -750,8 +797,8 @@ version = "0.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "460fc6f625a1f7705c6cf62d0d070794e94668988b1c38111baeec177c715f7b"
 dependencies = [
- "axum",
- "axum-core",
+ "axum 0.8.1",
+ "axum-core 0.5.0",
  "bytes",
  "form_urlencoded",
  "futures-util",
@@ -962,6 +1009,24 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "bindgen"
+version = "0.70.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
+dependencies = [
+ "bitflags 2.8.0",
+ "cexpr",
+ "clang-sys",
+ "itertools 0.12.1",
+ "proc-macro2",
+ "quote",
+ "regex",
+ "rustc-hash 1.1.0",
+ "shlex",
+ "syn 2.0.100",
+]
+
 [[package]]
 name = "bindgen"
 version = "0.71.1"
@@ -1321,7 +1386,7 @@ dependencies = [
  "aws-sdk-kms",
  "aws-sdk-s3",
  "aws-smithy-types",
- "axum",
+ "axum 0.8.1",
  "axum-extra",
  "base64 0.22.1",
  "bytes",
@@ -1330,15 +1395,17 @@ dependencies = [
  "chrono",
  "clap",
  "compute_api",
+ "fail",
  "flate2",
  "futures",
  "hostname-validator",
  "http 1.1.0",
  "indexmap 2.9.0",
+ "inferno 0.12.0",
  "itertools 0.10.5",
  "jsonwebtoken",
+ "libproc",
  "metrics",
- "neon_failpoint",
  "nix 0.30.1",
  "notify",
  "num_cpus",
@@ -1351,6 +1418,8 @@ dependencies = [
  "postgres-types",
  "postgres_initdb",
  "postgres_versioninfo",
+ "pprof 0.15.0",
+ "prost 0.12.6",
  "regex",
  "remote_storage",
  "reqwest",
@@ -1362,6 +1431,7 @@ dependencies = [
  "serde_with",
  "signal-hook",
  "tar",
+ "tempfile",
  "thiserror 1.0.69",
  "tokio",
  "tokio-postgres",
@@ -2082,7 +2152,7 @@ name = "endpoint_storage"
 version = "0.0.1"
 dependencies = [
  "anyhow",
- "axum",
+ "axum 0.8.1",
  "axum-extra",
  "camino",
  "camino-tempfile",
@@ -2198,12 +2268,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
 
 [[package]]
 name = "errno"
-version = "0.3.8"
+version = "0.3.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
+checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
 dependencies = [
  "libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]
@@ -2533,6 +2603,18 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "getrandom"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "r-efi",
+ "wasi 0.14.2+wasi-0.2.4",
+]
+
 [[package]]
 name = "gettid"
 version = "0.1.3"
@@ -2790,6 +2872,15 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "home"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
+dependencies = [
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "hostname"
 version = "0.4.0"
@@ -2891,15 +2982,15 @@ dependencies = [
  "arc-swap",
  "bytes",
  "camino",
+ "fail",
  "futures",
  "hyper 0.14.30",
  "itertools 0.10.5",
  "jemalloc_pprof",
  "jsonwebtoken",
  "metrics",
- "neon_failpoint",
  "once_cell",
- "pprof",
+ "pprof 0.14.0",
  "regex",
  "routerify",
  "rustls 0.23.27",
@@ -3561,7 +3652,7 @@ version = "1.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
 dependencies = [
- "spin",
+ "spin 0.9.8",
 ]
 
 [[package]]
@@ -3586,6 +3677,17 @@ version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
 
+[[package]]
+name = "libproc"
+version = "0.14.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e78a09b56be5adbcad5aa1197371688dc6bb249a26da3bca2011ee2fb987ebfb"
+dependencies = [
+ "bindgen 0.70.1",
+ "errno",
+ "libc",
+]
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.4.14"
@@ -3598,6 +3700,12 @@ version = "0.6.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f0b5399f6804fbab912acbd8878ed3532d506b7c951b8f9f164ef90fef39e3f4"
 
+[[package]]
+name = "linux-raw-sys"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
+
 [[package]]
 name = "litemap"
 version = "0.7.4"
@@ -3651,6 +3759,12 @@ dependencies = [
  "regex-automata 0.1.10",
 ]
 
+[[package]]
+name = "matchit"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
+
 [[package]]
 name = "matchit"
 version = "0.8.4"
@@ -3852,23 +3966,6 @@ dependencies = [
  "workspace_hack",
 ]
 
-[[package]]
-name = "neon_failpoint"
-version = "0.1.0"
-dependencies = [
- "anyhow",
- "either",
- "once_cell",
- "parking_lot 0.12.1",
- "rand 0.8.5",
- "regex",
- "serde",
- "tokio",
- "tokio-util",
- "tracing",
- "tracing-subscriber",
-]
-
 [[package]]
 name = "never-say-never"
 version = "6.6.666"
@@ -4374,6 +4471,7 @@ dependencies = [
  "either",
  "enum-map",
  "enumset",
+ "fail",
  "futures",
  "hashlink",
  "hex",
@@ -4388,7 +4486,6 @@ dependencies = [
  "jsonwebtoken",
  "md5",
  "metrics",
- "neon_failpoint",
  "nix 0.30.1",
  "num-traits",
  "num_cpus",
@@ -4407,7 +4504,7 @@ dependencies = [
  "postgres_ffi_types",
  "postgres_initdb",
  "posthog_client_lite",
- "pprof",
+ "pprof 0.14.0",
  "pq_proto",
  "procfs",
  "rand 0.8.5",
@@ -4982,7 +5079,7 @@ name = "postgres_ffi"
 version = "0.1.0"
 dependencies = [
  "anyhow",
- "bindgen",
+ "bindgen 0.71.1",
  "bytes",
  "crc32c",
  "criterion",
@@ -4993,7 +5090,7 @@ dependencies = [
  "postgres",
  "postgres_ffi_types",
  "postgres_versioninfo",
- "pprof",
+ "pprof 0.14.0",
  "regex",
  "serde",
  "thiserror 1.0.69",
@@ -5083,6 +5180,30 @@ dependencies = [
  "thiserror 1.0.69",
 ]
 
+[[package]]
+name = "pprof"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187"
+dependencies = [
+ "aligned-vec",
+ "backtrace",
+ "cfg-if",
+ "findshlibs",
+ "inferno 0.11.21",
+ "libc",
+ "log",
+ "nix 0.26.4",
+ "once_cell",
+ "protobuf",
+ "protobuf-codegen",
+ "smallvec",
+ "spin 0.10.0",
+ "symbolic-demangle",
+ "tempfile",
+ "thiserror 2.0.11",
+]
+
 [[package]]
 name = "pprof_util"
 version = "0.7.0"
@@ -5158,7 +5279,7 @@ dependencies = [
  "hex",
  "lazy_static",
  "procfs-core",
- "rustix",
+ "rustix 0.38.41",
 ]
 
 [[package]]
@@ -5294,6 +5415,57 @@ dependencies = [
  "prost 0.13.5",
 ]
 
+[[package]]
+name = "protobuf"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
+dependencies = [
+ "once_cell",
+ "protobuf-support",
+ "thiserror 1.0.69",
+]
+
+[[package]]
+name = "protobuf-codegen"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
+dependencies = [
+ "anyhow",
+ "once_cell",
+ "protobuf",
+ "protobuf-parse",
+ "regex",
+ "tempfile",
+ "thiserror 1.0.69",
+]
+
+[[package]]
+name = "protobuf-parse"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
+dependencies = [
+ "anyhow",
+ "indexmap 2.9.0",
+ "log",
+ "protobuf",
+ "protobuf-support",
+ "tempfile",
+ "thiserror 1.0.69",
+ "which",
+]
+
+[[package]]
+name = "protobuf-support"
+version = "3.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
+dependencies = [
+ "thiserror 1.0.69",
+]
+
 [[package]]
 name = "proxy"
 version = "0.1.0"
@@ -5465,6 +5637,12 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "r-efi"
+version = "5.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
+
 [[package]]
 name = "rand"
 version = "0.7.3"
@@ -5853,7 +6031,7 @@ dependencies = [
  "async-trait",
  "getrandom 0.2.11",
  "http 1.1.0",
- "matchit",
+ "matchit 0.8.4",
  "opentelemetry",
  "reqwest",
  "reqwest-middleware",
@@ -6044,6 +6222,19 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "rustix"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266"
+dependencies = [
+ "bitflags 2.8.0",
+ "errno",
+ "libc",
+ "linux-raw-sys 0.9.4",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "rustls"
 version = "0.21.12"
@@ -6208,6 +6399,7 @@ dependencies = [
  "criterion",
  "desim",
  "env_logger",
+ "fail",
  "futures",
  "hex",
  "http 1.1.0",
@@ -6217,7 +6409,6 @@ dependencies = [
  "itertools 0.10.5",
  "jsonwebtoken",
  "metrics",
- "neon_failpoint",
  "once_cell",
  "pageserver_api",
  "parking_lot 0.12.1",
@@ -6226,7 +6417,7 @@ dependencies = [
  "postgres_backend",
  "postgres_ffi",
  "postgres_versioninfo",
- "pprof",
+ "pprof 0.14.0",
  "pq_proto",
  "rand 0.8.5",
  "regex",
@@ -6814,6 +7005,18 @@ name = "spin"
 version = "0.9.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
 dependencies = [
  "lock_api",
 ]
 
+[[package]]
+name = "spin"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591"
+dependencies = [
+ "lock_api",
+]
+
 [[package]]
 name = "spinning_top"
@@ -6904,6 +7107,7 @@ dependencies = [
  "diesel",
  "diesel-async",
  "diesel_migrations",
+ "fail",
  "futures",
  "governor",
  "hex",
@@ -6916,7 +7120,6 @@ dependencies = [
  "lasso",
  "measured",
  "metrics",
- "neon_failpoint",
  "once_cell",
  "pageserver_api",
  "pageserver_client",
@@ -7174,14 +7377,14 @@ dependencies = [
 
 [[package]]
 name = "tempfile"
-version = "3.14.0"
+version = "3.20.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c"
+checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1"
 dependencies = [
  "cfg-if",
  "fastrand 2.2.0",
+ "getrandom 0.3.3",
  "once_cell",
- "rustix",
+ "rustix 1.0.7",
  "windows-sys 0.59.0",
 ]
 
@@ -7675,16 +7878,25 @@ version = "0.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
 dependencies = [
+ "async-stream",
  "async-trait",
+ "axum 0.7.9",
  "base64 0.22.1",
  "bytes",
+ "h2 0.4.4",
  "http 1.1.0",
  "http-body 1.0.0",
  "http-body-util",
+ "hyper 1.4.1",
+ "hyper-timeout",
+ "hyper-util",
  "percent-encoding",
  "pin-project",
  "prost 0.13.5",
+ "socket2",
  "tokio",
+ "tokio-stream",
+ "tower 0.4.13",
  "tower-layer",
  "tower-service",
  "tracing",
@@ -7697,7 +7909,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
 dependencies = [
  "async-trait",
- "axum",
+ "axum 0.8.1",
  "base64 0.22.1",
  "bytes",
  "flate2",
@@ -7758,11 +7970,16 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
 dependencies = [
  "futures-core",
  "futures-util",
+ "indexmap 1.9.3",
+ "pin-project",
  "pin-project-lite",
+ "rand 0.8.5",
+ "slab",
  "tokio",
+ "tokio-util",
  "tower-layer",
  "tower-service",
  "tracing",
 ]
 
 [[package]]
@@ -8181,7 +8398,7 @@ dependencies = [
  "const_format",
  "criterion",
  "diatomic-waker",
  "either",
+ "fail",
  "futures",
  "git-version",
  "hex",
@@ -8189,13 +8406,12 @@ dependencies = [
  "humantime",
  "jsonwebtoken",
  "metrics",
- "neon_failpoint",
  "nix 0.30.1",
  "once_cell",
  "pem",
  "pin-project-lite",
  "postgres_connection",
- "pprof",
+ "pprof 0.14.0",
  "pq_proto",
  "rand 0.8.5",
  "regex",
@@ -8247,7 +8463,7 @@ name = "vm_monitor"
 version = "0.1.0"
 dependencies = [
  "anyhow",
- "axum",
+ "axum 0.8.1",
  "cgroups-rs",
  "clap",
  "futures",
@@ -8302,7 +8518,7 @@ dependencies = [
  "pageserver_api",
  "postgres_ffi",
  "postgres_ffi_types",
- "pprof",
+ "pprof 0.14.0",
  "prost 0.13.5",
  "remote_storage",
  "serde",
@@ -8332,7 +8548,7 @@ name = "walproposer"
 version = "0.1.0"
 dependencies = [
  "anyhow",
- "bindgen",
+ "bindgen 0.71.1",
  "postgres_ffi",
  "utils",
 ]
@@ -8359,6 +8575,15 @@ version = "0.11.0+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
 
+[[package]]
+name = "wasi"
+version = "0.14.2+wasi-0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
+dependencies = [
+ "wit-bindgen-rt",
+]
+
 [[package]]
 name = "wasite"
 version = "0.1.0"
@@ -8488,6 +8713,18 @@ dependencies = [
  "rustls-pki-types",
 ]
 
+[[package]]
+name = "which"
+version = "4.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
+dependencies = [
+ "either",
+ "home",
+ "once_cell",
+ "rustix 0.38.41",
+]
+
 [[package]]
 name = "whoami"
 version = "1.5.1"
@@ -8716,6 +8953,15 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "wit-bindgen-rt"
+version = "0.39.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
+dependencies = [
+ "bitflags 2.8.0",
+]
+
 [[package]]
 name = "workspace_hack"
 version = "0.1.0"
@@ -8723,17 +8969,19 @@ dependencies = [
  "ahash",
  "anstream",
  "anyhow",
- "axum",
- "axum-core",
+ "axum 0.8.1",
+ "axum-core 0.5.0",
  "base64 0.21.7",
  "base64ct",
  "bytes",
  "camino",
  "cc",
  "chrono",
  "clang-sys",
  "clap",
  "clap_builder",
  "const-oid",
  "criterion",
  "crypto-bigint 0.5.5",
  "der 0.7.8",
  "deranged",
@@ -8760,6 +9008,7 @@ dependencies = [
  "hyper 0.14.30",
  "hyper 1.4.1",
  "hyper-util",
+ "indexmap 1.9.3",
  "indexmap 2.9.0",
  "itertools 0.12.1",
  "lazy_static",
@@ -8776,7 +9025,6 @@ dependencies = [
  "num-iter",
  "num-rational",
  "num-traits",
  "once_cell",
  "p256 0.13.2",
  "parquet",
  "prettyplease",
@@ -8798,6 +9046,7 @@ dependencies = [
  "sha2",
  "signature 2.2.0",
  "smallvec",
+ "spin 0.9.8",
  "spki 0.7.3",
  "stable_deref_trait",
  "subtle",
@@ -8812,6 +9061,7 @@ dependencies = [
  "tokio-stream",
  "tokio-util",
  "toml_edit",
+ "tonic 0.12.3",
  "tower 0.5.2",
  "tracing",
  "tracing-core",
@@ -21,7 +21,6 @@ members = [
     "workspace_hack",
     "libs/compute_api",
     "libs/http-utils",
-    "libs/neon_failpoint",
     "libs/pageserver_api",
     "libs/postgres_ffi",
     "libs/postgres_ffi_types",
@@ -98,6 +97,7 @@ diatomic-waker = { version = "0.2.3" }
 either = "1.8"
 enum-map = "2.4.2"
 enumset = "1.0.12"
+fail = "0.5.0"
 fallible-iterator = "0.2"
 framed-websockets = { version = "0.1.0", git = "https://github.com/neondatabase/framed-websockets" }
 futures = "0.3"
@@ -130,6 +130,7 @@ jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
 jsonwebtoken = "9"
 lasso = "0.7"
 libc = "0.2"
+libproc = "0.14"
 md5 = "0.7.0"
 measured = { version = "0.0.22", features=["lasso"] }
 measured-process = { version = "0.0.22" }
@@ -258,7 +259,6 @@ desim = { version = "0.1", path = "./libs/desim" }
 endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
 http-utils = { version = "0.1", path = "./libs/http-utils/" }
 metrics = { version = "0.1", path = "./libs/metrics/" }
-neon_failpoint = { version = "0.1", path = "./libs/neon_failpoint/" }
 neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
 pageserver = { path = "./pageserver" }
 pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
@@ -279,6 +279,7 @@ safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" }
 safekeeper_client = { path = "./safekeeper/client" }
 storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
 storage_controller_client = { path = "./storage_controller/client" }
+tempfile = "3"
 tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" }
 tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
 utils = { version = "0.1", path = "./libs/utils/" }
@@ -109,6 +109,8 @@ RUN set -e \
     libreadline-dev \
     libseccomp-dev \
     ca-certificates \
+    bpfcc-tools \
+    sudo \
     openssl \
     unzip \
     curl \

@@ -61,6 +61,9 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
     libpq5 \
     libpq-dev \
     libzstd-dev \
+    linux-perf \
+    bpfcc-tools \
+    linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \
     postgresql-16 \
     postgresql-server-dev-16 \
     postgresql-common \
@@ -105,15 +108,21 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
 #
 # 'gdb' is included so that we get backtraces of core dumps produced in
 # regression tests
-RUN set -e \
+RUN set -ex \
+    && KERNEL_VERSION="$(uname -r | cut -d'-' -f1 | sed 's/\.0$//')" \
+    && echo KERNEL_VERSION=${KERNEL_VERSION} >> /etc/environment \
+    && KERNEL_ARCH=$(uname -m | awk '{ if ($1 ~ /^(x86_64|i[3-6]86)$/) print "x86"; else if ($1 ~ /^(aarch64|arm.*)$/) print "aarch"; else print $1 }') \
+    && echo KERNEL_ARCH=${KERNEL_ARCH} >> /etc/environment \
     && apt update \
     && apt install -y \
     autoconf \
     automake \
+    bc \
     bison \
     build-essential \
     ca-certificates \
     cmake \
+    cpio \
     curl \
     flex \
     gdb \
@@ -122,8 +131,10 @@ RUN set -e \
     gzip \
     jq \
     jsonnet \
+    kmod \
     libcurl4-openssl-dev \
     libbz2-dev \
+    libelf-dev \
     libffi-dev \
     liblzma-dev \
     libncurses5-dev \
@@ -137,6 +148,11 @@ RUN set -e \
     libxml2-dev \
     libxmlsec1-dev \
     libxxhash-dev \
+    linux-perf \
+    bpfcc-tools \
+    libbpfcc \
+    libbpfcc-dev \
+    linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \
     lsof \
     make \
     netcat-openbsd \
@@ -144,6 +160,8 @@ RUN set -e \
     openssh-client \
     parallel \
     pkg-config \
+    rsync \
+    sudo \
     unzip \
     wget \
     xz-utils \
@@ -198,6 +216,8 @@ RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /
 # Configure sudo & docker
+RUN usermod -aG sudo nonroot && \
+    echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers && \
     mkdir -p /etc/sudoers.d && \
     echo 'nonroot ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/nonroot && \
     usermod -aG docker nonroot
 
 # AWS CLI
@@ -149,6 +149,9 @@ RUN case $DEBIAN_VERSION in \
     ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
     zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
     libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
+    bpfcc-tools \
+    libbpfcc \
+    libbpfcc-dev \
     libclang-dev \
     jsonnet \
     $VERSION_INSTALLS \
@@ -1988,6 +1991,10 @@ RUN apt update && \
     locales \
     lsof \
     procps \
+    bpfcc-tools \
+    libbpfcc \
+    libbpfcc-dev \
+    libclang-dev \
     rsyslog-gnutls \
     screen \
     tcpdump \
@@ -39,6 +39,14 @@ commands:
     user: nobody
     sysvInitAction: respawn
     shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
+  - name: enable-kernel-modules
+    user: root
+    sysvInitAction: sysinit
+    shell: mkdir -p /lib/ && ln -s /neonvm/tools/lib/modules /lib/
+  - name: enable-bpfs
+    user: root
+    sysvInitAction: sysinit
+    shell: mkdir -p /sys/kernel/debug && mount -t debugfs debugfs /sys/kernel/debug && mount -t bpf bpf /sys/fs/bpf && chmod 755 /sys/fs/bpf
   # Rsyslog by default creates a unix socket under /dev/log . That's where Postgres sends logs also.
   # We run syslog with postgres user so it can't create /dev/log. Instead we configure rsyslog to
   # use a different path for the socket. The symlink actually points to our custom path.
@@ -65,7 +73,7 @@ files:
       # regardless of hostname (ALL)
       #
       # Also allow it to shut down the VM. The fast_import job does that when it's finished.
-      postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
+      postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc
   - filename: cgconfig.conf
     content: |
       # Configuration for cgroups in VM compute nodes
@@ -152,6 +160,8 @@ merge: |
   RUN set -e \
       && chmod 0644 /etc/cgconfig.conf
 
+  ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf
+
   COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
   RUN chmod 0666 /etc/compute_rsyslog.conf
@@ -39,6 +39,14 @@ commands:
     user: nobody
     sysvInitAction: respawn
     shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
+  - name: enable-kernel-modules
+    user: root
+    sysvInitAction: sysinit
+    shell: mkdir -p /lib/ && ln -s /neonvm/tools/lib/modules /lib/
+  - name: enable-bpfs
+    user: root
+    sysvInitAction: sysinit
+    shell: mkdir -p /sys/kernel/debug && mount -t debugfs debugfs /sys/kernel/debug && mount -t bpf bpf /sys/fs/bpf && chmod 755 /sys/fs/bpf
   # Rsyslog by default creates a unix socket under /dev/log . That's where Postgres sends logs also.
   # We run syslog with postgres user so it can't create /dev/log. Instead we configure rsyslog to
   # use a different path for the socket. The symlink actually points to our custom path.
@@ -65,7 +73,7 @@ files:
      # regardless of hostname (ALL)
      #
      # Also allow it to shut down the VM. The fast_import job does that when it's finished.
-      postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd
+      postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc
   - filename: cgconfig.conf
     content: |
       # Configuration for cgroups in VM compute nodes
@@ -148,6 +156,8 @@ merge: |
   RUN set -e \
       && chmod 0644 /etc/cgconfig.conf
 
+  ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf
+
   COPY compute_rsyslog.conf /etc/compute_rsyslog.conf
   RUN chmod 0666 /etc/compute_rsyslog.conf
   RUN mkdir /var/log/rsyslog && chown -R postgres /var/log/rsyslog
@@ -7,7 +7,7 @@ license.workspace = true
 [features]
 default = []
 # Enables test specific features.
-testing = ["neon_failpoint/testing"]
+testing = ["fail/failpoints"]
 
 [dependencies]
 async-compression.workspace = true
@@ -23,7 +23,7 @@ camino.workspace = true
 chrono.workspace = true
 cfg-if.workspace = true
 clap.workspace = true
-neon_failpoint.workspace = true
+fail.workspace = true
 flate2.workspace = true
 futures.workspace = true
 http.workspace = true
@@ -31,6 +31,7 @@ hostname-validator = "1.1"
 indexmap.workspace = true
 itertools.workspace = true
 jsonwebtoken.workspace = true
+libproc.workspace = true
 metrics.workspace = true
 nix.workspace = true
 notify.workspace = true
@@ -49,6 +50,7 @@ serde_with.workspace = true
 serde_json.workspace = true
 signal-hook.workspace = true
 tar.workspace = true
+tempfile.workspace = true
 tower.workspace = true
 tower-http.workspace = true
 tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
@@ -78,3 +80,10 @@ zstd = "0.13"
 bytes = "1.0"
 rust-ini = "0.20.0"
 rlimit = "0.10.1"
+
+inferno = { version = "0.12", default-features = false, features = [
+    "multithreaded",
+    "nameattr",
+] }
+pprof = { version = "0.15", features = ["protobuf-codec", "flamegraph"] }
+prost = "0.12"
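Since `testing` now maps to `fail/failpoints`, the failpoint machinery is compiled in only when that feature is requested. A minimal sketch of enabling it for a local build (the `-p compute_tools` package name is taken from the file paths in this diff; adjust to your workspace layout):

```sh
# Failpoints compile to no-ops unless the feature is on:
cargo build -p compute_tools --features testing
cargo test  -p compute_tools --features testing
```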
@@ -154,7 +154,7 @@ impl Cli {
 fn main() -> Result<()> {
     let cli = Cli::parse();
 
-    failpoint_support::init().unwrap();
+    let scenario = failpoint_support::init();
 
     // For historical reasons, the main thread that processes the config and launches postgres
     // is synchronous, but we always have this tokio runtime available and we "enter" it so
@@ -201,6 +201,8 @@ fn main() -> Result<()> {
 
     let exit_code = compute_node.run()?;
 
+    scenario.teardown();
+
     deinit_and_exit(exit_code);
 }
@@ -371,7 +371,9 @@ fn maybe_cgexec(cmd: &str) -> Command {
     }
 }
 
-struct PostgresHandle {
+/// A handle to the Postgres process that is running in the compute
+/// node.
+pub struct PostgresHandle {
     postgres: std::process::Child,
     log_collector: JoinHandle<Result<()>>,
 }
@@ -1,9 +1,8 @@
 use axum::response::{IntoResponse, Response};
 use http::StatusCode;
-use neon_failpoint::{configure_failpoint, configure_failpoint_with_context};
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
 use tracing::info;
+use utils::failpoint_support::apply_failpoint;
 
 pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
 
@@ -12,16 +11,10 @@ pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
 pub struct FailpointConfig {
     /// Name of the fail point
     pub name: String,
-    /// List of actions to take, using the format described in neon_failpoint
+    /// List of actions to take, using the format described in `fail::cfg`
     ///
-    /// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)"
-    /// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action"
+    /// We also support `actions = "exit"` to cause the fail point to immediately exit.
     pub actions: String,
-    /// Optional context matching rules for conditional failpoints
-    /// Each key-value pair specifies a context key and a regex pattern to match against
-    /// All context matchers must match for the failpoint to trigger
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub context_matchers: Option<HashMap<String, String>>,
 }
 
 use crate::http::JsonResponse;
@@ -31,7 +24,7 @@ use crate::http::extract::Json;
 pub(in crate::http) async fn configure_failpoints(
     failpoints: Json<ConfigureFailpointsRequest>,
 ) -> Response {
-    if !neon_failpoint::has_failpoints() {
+    if !fail::has_failpoints() {
         return JsonResponse::error(
             StatusCode::PRECONDITION_FAILED,
             "Cannot manage failpoints because neon was compiled without failpoints support",
@@ -39,21 +32,16 @@ pub(in crate::http) async fn configure_failpoints(
     }
 
     for fp in &*failpoints {
-        info!(
-            "cfg failpoint: {} {} (context: {:?})",
-            fp.name, fp.actions, fp.context_matchers
-        );
+        info!("cfg failpoint: {} {}", fp.name, fp.actions);
 
-        let cfg_result = if let Some(context_matchers) = fp.context_matchers.clone() {
-            configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
-        } else {
-            configure_failpoint(&fp.name, &fp.actions)
-        };
+        // We recognize one extra "action" that's not natively recognized
+        // by the failpoints crate: exit, to immediately kill the process
+        let cfg_result = apply_failpoint(&fp.name, &fp.actions);
 
         if let Err(e) = cfg_result {
            return JsonResponse::error(
                StatusCode::BAD_REQUEST,
-                format!("failed to configure failpoint '{}': {e}", fp.name),
+                format!("failed to configure failpoints: {e}"),
            );
        }
    }
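For reference, `ConfigureFailpointsRequest` is `Vec<FailpointConfig>`, so after this change the handler expects a JSON array of name/actions pairs with no `context_matchers` field. A hypothetical request body sketch (the `compute-migration` name appears elsewhere in this diff; the second entry is illustrative only):

```json
[
  { "name": "compute-migration", "actions": "return(3)" },
  { "name": "some-other-failpoint", "actions": "off" }
]
```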
@@ -15,6 +15,7 @@ pub(in crate::http) mod lfc;
 pub(in crate::http) mod metrics;
 pub(in crate::http) mod metrics_json;
 pub(in crate::http) mod promote;
+pub(in crate::http) mod profile;
 pub(in crate::http) mod status;
 pub(in crate::http) mod terminate;
compute_tools/src/http/routes/profile.rs: new file, 217 lines

@@ -0,0 +1,217 @@
//! Contains the route for profiling the compute.
//!
//! Profiling the compute means generating a pprof profile of the
//! postgres processes.
//!
//! The profiling is done using the `perf` tool, which is expected to be
//! available somewhere in `$PATH`.
use std::sync::atomic::Ordering;

use axum::Json;
use axum::response::IntoResponse;
use http::StatusCode;
use nix::unistd::Pid;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;

use crate::http::JsonResponse;

static CANCEL_CHANNEL: Lazy<Mutex<Option<tokio::sync::broadcast::Sender<()>>>> =
    Lazy::new(|| Mutex::new(None));

fn default_sampling_frequency() -> u16 {
    100
}

fn default_timeout_seconds() -> u8 {
    5
}

fn deserialize_sampling_frequency<'de, D>(deserializer: D) -> Result<u16, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::Deserialize;

    const MIN_SAMPLING_FREQUENCY: u16 = 1;
    const MAX_SAMPLING_FREQUENCY: u16 = 1000;

    let value = u16::deserialize(deserializer)?;

    if !(MIN_SAMPLING_FREQUENCY..=MAX_SAMPLING_FREQUENCY).contains(&value) {
        return Err(serde::de::Error::custom(format!(
            "sampling_frequency must be between {MIN_SAMPLING_FREQUENCY} and {MAX_SAMPLING_FREQUENCY}, got {value}"
        )));
    }
    Ok(value)
}

fn deserialize_profiling_timeout<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::Deserialize;

    const MIN_TIMEOUT_SECONDS: u8 = 1;
    const MAX_TIMEOUT_SECONDS: u8 = 60;

    let value = u8::deserialize(deserializer)?;

    if !(MIN_TIMEOUT_SECONDS..=MAX_TIMEOUT_SECONDS).contains(&value) {
        return Err(serde::de::Error::custom(format!(
            "timeout_seconds must be between {MIN_TIMEOUT_SECONDS} and {MAX_TIMEOUT_SECONDS}, got {value}"
        )));
    }
    Ok(value)
}

/// Request parameters for profiling the compute.
#[derive(Debug, Clone, serde::Deserialize)]
pub(in crate::http) struct ProfileRequest {
    /// The profiling tool to use, currently only `perf` is supported.
    profiler: crate::profiling::ProfileGenerator,
    #[serde(default = "default_sampling_frequency")]
    #[serde(deserialize_with = "deserialize_sampling_frequency")]
    sampling_frequency: u16,
    #[serde(default = "default_timeout_seconds")]
    #[serde(deserialize_with = "deserialize_profiling_timeout")]
    timeout_seconds: u8,
    #[serde(default)]
    archive: bool,
}

/// The HTTP request handler for reporting the profiling status of
/// the compute.
pub(in crate::http) async fn profile_status() -> impl IntoResponse {
    tracing::info!("Profile status request received.");

    let cancel_channel = CANCEL_CHANNEL.lock().await;

    if let Some(tx) = cancel_channel.as_ref() {
        if tx.receiver_count() > 0 {
            return JsonResponse::create_response(
                StatusCode::OK,
                "Profiling is currently in progress.",
            );
        }
    }

    JsonResponse::create_response(StatusCode::NO_CONTENT, "Profiling is not in progress.")
}

/// The HTTP request handler for stopping profiling the compute.
pub(in crate::http) async fn profile_stop() -> impl IntoResponse {
    tracing::info!("Profile stop request received.");

    match CANCEL_CHANNEL.lock().await.take() {
        Some(tx) => {
            if tx.send(()).is_err() {
                tracing::error!("Failed to send cancellation signal.");
                return JsonResponse::create_response(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    "Failed to send cancellation signal",
                );
            }
            JsonResponse::create_response(StatusCode::OK, "Profiling stopped successfully.")
        }
        None => JsonResponse::create_response(
            StatusCode::PRECONDITION_FAILED,
            "Profiling is not in progress, there is nothing to stop.",
        ),
    }
}

/// The HTTP request handler for starting profiling the compute.
pub(in crate::http) async fn profile_start(
    Json(request): Json<ProfileRequest>,
) -> impl IntoResponse {
    tracing::info!("Profile start request received: {request:?}");

    let tx = tokio::sync::broadcast::Sender::<()>::new(1);

    {
        let mut cancel_channel = CANCEL_CHANNEL.lock().await;

        if cancel_channel.is_some() {
            return JsonResponse::create_response(
                StatusCode::CONFLICT,
                "Profiling is already in progress.",
            );
        }
        *cancel_channel = Some(tx.clone());
    }

    tracing::info!("Profiling will start with parameters: {request:?}");
    let pg_pid = Pid::from_raw(crate::compute::PG_PID.load(Ordering::SeqCst) as _);

    let run_with_sudo = !cfg!(feature = "testing");

    let options = crate::profiling::ProfileGenerationOptions {
        profiler: request.profiler,
        run_with_sudo,
        pids: [pg_pid].into_iter().collect(),
        follow_forks: true,
        sampling_frequency: request.sampling_frequency as u32,
        blocklist_symbols: vec![
            "libc".to_owned(),
            "libgcc".to_owned(),
            "pthread".to_owned(),
            "vdso".to_owned(),
        ],
        archive: request.archive,
    };

    let options = crate::profiling::ProfileGenerationTaskOptions {
        options,
        timeout: std::time::Duration::from_secs(request.timeout_seconds as u64),
        should_stop: Some(tx),
    };

    let pprof_data = crate::profiling::generate_pprof_profile(options).await;

    if CANCEL_CHANNEL.lock().await.take().is_none() {
        tracing::error!("Profiling was cancelled from another request.");

        return JsonResponse::create_response(
            StatusCode::NO_CONTENT,
            "Profiling was cancelled from another request.",
        );
    }

    let pprof_data = match pprof_data {
        Ok(data) => data,
        Err(e) => {
            tracing::error!(error = ?e, "failed to generate pprof data");
            return JsonResponse::create_response(
                StatusCode::INTERNAL_SERVER_ERROR,
                format!("Failed to generate pprof data: {e:?}"),
            );
        }
    };

    tracing::info!("Profiling has completed successfully.");

    let mut headers = http::HeaderMap::new();

    if request.archive {
        headers.insert(
            http::header::CONTENT_TYPE,
            http::HeaderValue::from_static("application/gzip"),
        );
        headers.insert(
            http::header::CONTENT_DISPOSITION,
            http::HeaderValue::from_static("attachment; filename=\"profile.pb.gz\""),
        );
    } else {
        headers.insert(
            http::header::CONTENT_TYPE,
            http::HeaderValue::from_static("application/octet-stream"),
        );
        headers.insert(
            http::header::CONTENT_DISPOSITION,
            http::HeaderValue::from_static("attachment; filename=\"profile.pb\""),
        );
    }

    (headers, pprof_data.0).into_response()
}
@@ -27,6 +27,7 @@ use super::{
     },
 };
 use crate::compute::ComputeNode;
+use crate::http::routes::profile;
 
 /// `compute_ctl` has two servers: internal and external. The internal server
 /// binds to the loopback interface and handles communication from clients on
@@ -81,8 +82,14 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
             Server::External {
                 config, compute_id, ..
             } => {
-                let unauthenticated_router =
-                    Router::<Arc<ComputeNode>>::new().route("/metrics", get(metrics::get_metrics));
+                let unauthenticated_router = Router::<Arc<ComputeNode>>::new()
+                    .route("/metrics", get(metrics::get_metrics))
+                    .route(
+                        "/profile/cpu",
+                        get(profile::profile_status)
+                            .post(profile::profile_start)
+                            .delete(profile::profile_stop),
+                    );
 
                 let authenticated_router = Router::<Arc<ComputeNode>>::new()
                     .route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
@@ -24,6 +24,7 @@ pub mod monitor;
 pub mod params;
 pub mod pg_helpers;
 pub mod pgbouncer;
+pub mod profiling;
 pub mod rsyslog;
 pub mod spec;
 mod spec_apply;
@@ -1,5 +1,5 @@
 use anyhow::{Context, Result};
-use neon_failpoint::fail_point;
+use fail::fail_point;
 use tokio_postgres::{Client, Transaction};
 use tracing::{error, info};
 
@@ -40,14 +40,13 @@ impl<'m> MigrationRunner<'m> {
         // middle of applying a series of migrations fails in an expected
         // manner
         if cfg!(feature = "testing") {
-            let fail = async {
-                fail_point!("compute-migration", |fail_migration_id: Option<String>| {
+            let fail = (|| {
+                fail_point!("compute-migration", |fail_migration_id| {
                     migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
                 });
 
                 false
-            }
-            .await;
+            })();
 
             if fail {
                 return Err(anyhow::anyhow!(format!(
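With the `fail` crate, the payload configured for a failpoint arrives in the closure as `Option<String>`, so a test can make exactly one migration fail by configuring `compute-migration` with that migration's id. A hedged sketch of how a test might drive this; the `run_migrations()` helper is hypothetical, only the failpoint name and payload format come from the diff above:

```rust
#[cfg(feature = "testing")]
#[test]
fn migration_three_fails_on_demand() {
    // "return(3)" makes fail_point!("compute-migration", ...) invoke its
    // closure with Some("3"); the closure in the runner then compares it
    // against the current migration_id and reports a failure for id 3.
    fail::cfg("compute-migration", "return(3)").unwrap();

    // ... run the migration runner here and assert that it errors out on
    // migration 3; run_migrations() is a placeholder for that call.

    // Clear the failpoint so later tests are unaffected.
    fail::cfg("compute-migration", "off").unwrap();
}
```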
compute_tools/src/profiling/mod.rs: new file, 1240 lines
File diff suppressed because it is too large.
docs/continuous-profiling.md: new file, 58 lines

@@ -0,0 +1,58 @@
# Continuous Profiling (Compute)

Continuous profiling of the compute node is performed by `perf` or `bcc-tools`; the latter is preferred.

Only the postgres-related executables are profiled, excluding the actual compute code (Rust). Profiling the Rust side is possible as well but was not the main goal.

## Tools

The aforementioned tools are available within the same Docker image as
the compute node itself, but the corresponding dependencies, like the
Linux kernel headers and the Linux kernel itself, are not and can't be
for obvious reasons. To solve that, as we run the compute nodes as a
virtual machine (qemu), we need to deliver these dependencies to it.
This is done by the `autoscaling` part, which builds and deploys the
kernel headers, needed modules, and the `perf` binary into an ext4-fs
disk image, which is later attached to the VM and is symlinked to be
made available for the compute node.

## Output

The output of the profiling is always a binary file in the `pprof`
format. It can additionally be archived with `gzip` if the
corresponding argument is provided in the JSON request.

## REST API

### Test profiling

One can test the profiling after connecting to the VM and running:

```sh
curl -X POST -H "Content-Type: application/json" http://localhost:3080/profile/cpu -d '{"profiler": {"BccProfile": null}, "sampling_frequency": 99, "timeout_seconds": 5, "archive": false}' -v --output profile.pb
```

This uses the `Bcc` profiler and does not archive the output. The
profiling data will be saved into the `profile.pb` file locally.
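To get a gzip-compressed profile instead, set `archive` to `true`; per the handler in this diff, the response is then served as `application/gzip` with a `profile.pb.gz` attachment filename. An illustrative invocation against the same endpoint:

```sh
curl -X POST -H "Content-Type: application/json" http://localhost:3080/profile/cpu -d '{"profiler": {"BccProfile": null}, "sampling_frequency": 99, "timeout_seconds": 5, "archive": true}' -v --output profile.pb.gz
```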
**Only one profiling session can be run at a time.**

To check the profiling status (to see whether it is already running or
not), one can perform a `GET` request:

```sh
curl http://localhost:3080/profile/cpu -v
```

The profiling can be stopped by performing a `DELETE` request:

```sh
curl -X DELETE http://localhost:3080/profile/cpu -v
```

## Supported profiling data

For now, only CPU profiling is done and there is no heap profiling.
Also, only the postgres-related executables are tracked; the compute
(Rust) part itself **is not tracked**.
@@ -9,7 +9,7 @@ anyhow.workspace = true
 arc-swap.workspace = true
 bytes.workspace = true
 camino.workspace = true
-neon_failpoint.workspace = true
+fail.workspace = true
 futures.workspace = true
 hyper0.workspace = true
 itertools.workspace = true
@@ -1,8 +1,7 @@
 use hyper::{Body, Request, Response, StatusCode};
-use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints};
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
 use tokio_util::sync::CancellationToken;
+use utils::failpoint_support::apply_failpoint;
 
 use crate::error::ApiError;
 use crate::json::{json_request, json_response};
@@ -14,16 +13,10 @@ pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
 pub struct FailpointConfig {
     /// Name of the fail point
     pub name: String,
-    /// List of actions to take, using the format described in neon_failpoint
+    /// List of actions to take, using the format described in `fail::cfg`
     ///
-    /// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)"
-    /// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action"
+    /// We also support `actions = "exit"` to cause the fail point to immediately exit.
     pub actions: String,
-    /// Optional context matching rules for conditional failpoints
-    /// Each key-value pair specifies a context key and a regex pattern to match against
-    /// All context matchers must match for the failpoint to trigger
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub context_matchers: Option<HashMap<String, String>>,
 }
 
 /// Configure failpoints through http.
@@ -31,7 +24,7 @@ pub async fn failpoints_handler(
     mut request: Request<Body>,
     _cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
-    if !has_failpoints() {
+    if !fail::has_failpoints() {
         return Err(ApiError::BadRequest(anyhow::anyhow!(
             "Cannot manage failpoints because neon was compiled without failpoints support"
         )));
@@ -39,24 +32,15 @@ pub async fn failpoints_handler(
 
     let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
     for fp in failpoints {
-        tracing::info!(
-            "cfg failpoint: {} {} (context: {:?})",
-            fp.name,
-            fp.actions,
-            fp.context_matchers
-        );
+        tracing::info!("cfg failpoint: {} {}", fp.name, fp.actions);
 
-        let cfg_result = if let Some(context_matchers) = fp.context_matchers {
-            configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
-        } else {
-            configure_failpoint(&fp.name, &fp.actions)
-        };
+        // We recognize one extra "action" that's not natively recognized
+        // by the failpoints crate: exit, to immediately kill the process
+        let cfg_result = apply_failpoint(&fp.name, &fp.actions);
 
-        if let Err(err) = cfg_result {
+        if let Err(err_msg) = cfg_result {
             return Err(ApiError::BadRequest(anyhow::anyhow!(
-                "Failed to configure failpoint '{}': {}",
-                fp.name,
-                err
+                "Failed to configure failpoints: {err_msg}"
             )));
         }
     }
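Both handlers now funnel through `utils::failpoint_support::apply_failpoint`, whose job, per the comments above, is to pass most actions straight to the `fail` crate while special-casing `exit`. A hedged sketch of the shape such a wrapper can take; the real implementation in `utils` may differ:

```rust
/// Sketch of an apply_failpoint-style wrapper; assumes only the
/// behaviour described in the diff comments, not the real source.
pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
    if actions == "exit" {
        // The fail crate has no "exit" action, so install a callback
        // that kills the process when the failpoint fires.
        fail::cfg_callback(name, || {
            tracing::info!("Exit requested by failpoint");
            std::process::exit(1);
        })
    } else {
        // Everything else ("off", "return(v)", "sleep(N)", "pause",
        // "N%action", ...) is native fail::cfg syntax.
        fail::cfg(name, actions)
    }
}
```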
@@ -6,8 +6,8 @@ license.workspace = true
 
 [dependencies]
 thiserror.workspace = true
-nix.workspace=true
+nix.workspace = true
 workspace_hack = { version = "0.1", path = "../../workspace_hack" }
 
 [target.'cfg(target_os = "macos")'.dependencies]
-tempfile = "3.14.0"
+tempfile = "3.20.0"
libs/neon_failpoint/Cargo.toml: file deleted

@@ -1,27 +0,0 @@
-[package]
-name = "neon_failpoint"
-version = "0.1.0"
-edition = "2021"
-
-[dependencies]
-tokio = { workspace = true, features = ["time", "sync", "rt-multi-thread"] }
-tokio-util = { workspace = true }
-tracing = { workspace = true }
-serde = { workspace = true, features = ["derive"] }
-anyhow = { workspace = true }
-regex = { workspace = true }
-once_cell = { workspace = true }
-parking_lot = { workspace = true }
-rand = { workspace = true }
-either = { workspace = true }
-
-[dev-dependencies]
-tracing-subscriber = { workspace = true, features = ["fmt"] }
-
-[features]
-default = []
-testing = []
-
-[[example]]
-name = "context_demo"
-required-features = ["testing"]
@@ -1,460 +0,0 @@
|
||||
# Neon Failpoint Library
|
||||
|
||||
A modern, async-first failpoint library for Neon, replacing the `fail` crate with enhanced functionality.
|
||||
|
||||
## Features
|
||||
|
||||
- **Async-first**: All failpoint operations are async and don't require `spawn_blocking`
|
||||
- **Context matching**: Failpoints can be configured to trigger only when specific context conditions are met
|
||||
- **Regex support**: Context values can be matched using regular expressions
|
||||
- **Cancellation support**: All operations support cancellation tokens
|
||||
- **Dynamic reconfiguration**: Paused and sleeping tasks automatically resume when failpoint configurations change
|
||||
- **Backward compatibility**: Drop-in replacement for existing `fail` crate usage
|
||||
|
||||
## Supported Actions
|
||||
|
||||
- `off` - Disable the failpoint
|
||||
- `pause` - Pause indefinitely until disabled, reconfigured, or cancelled
|
||||
- `sleep(N)` - Sleep for N milliseconds (can be interrupted by reconfiguration)
|
||||
- `return` - Return early (empty value)
|
||||
- `return(value)` - Return early with a specific value
|
||||
- `exit` - Exit the process immediately
|
||||
- `panic(message)` - Panic the process with a custom message
|
||||
- `N%return(value)` - Return with a specific value N% of the time (probability-based)
|
||||
- `N%M*return(value)` - Return with a specific value N% of the time, maximum M times
|
||||
- `N%action` - Execute any action N% of the time (probability-based)
|
||||
- `N%M*action` - Execute any action N% of the time, maximum M times
|
||||
|
||||
## Probability-Based Actions
|
||||
|
||||
The library supports probability-based failpoints that trigger only a percentage of the time:
|
||||
|
||||
```rust
|
||||
// 50% chance to return a value
|
||||
configure_failpoint("random_failure", "50%return(error)").unwrap();
|
||||
|
||||
// 10% chance to sleep, maximum 3 times
|
||||
configure_failpoint("occasional_delay", "10%3*sleep(1000)").unwrap();
|
||||
|
||||
// 25% chance to panic
|
||||
configure_failpoint("rare_panic", "25%panic(critical error)").unwrap();
|
||||
```
|
||||
|
||||
The probability system uses a counter to track how many times a probability-based action has been triggered, allowing for precise control over test scenarios.
|
||||
|
||||
## Dynamic Behavior
|
||||
|
||||
When a failpoint is reconfigured while tasks are waiting on it:
|
||||
|
||||
- **Paused tasks** will immediately resume and continue normal execution
|
||||
- **Sleeping tasks** will wake up early and continue normal execution
|
||||
- **Removed failpoints** will cause all waiting tasks to resume normally
|
||||
|
||||
The new configuration only applies to future hits of the failpoint, not to tasks that are already waiting. This allows for flexible testing scenarios where you can pause execution, inspect state, and then resume execution dynamically.
|
||||
|
||||
## Example: Dynamic Reconfiguration

```rust
use neon_failpoint::{configure_failpoint, failpoint, FailpointResult};
use tokio::time::Duration;

// Configure the failpoint to pause before spawning the task,
// so the task cannot race past an unconfigured failpoint.
configure_failpoint("test_pause", "pause").unwrap();

// Start a task that will hit the failpoint
let task = tokio::spawn(async {
    println!("About to hit failpoint");
    match failpoint("test_pause", None).await {
        FailpointResult::Return(value) => println!("Returned: {}", value),
        FailpointResult::Continue => println!("Continued normally"),
        FailpointResult::Cancelled => println!("Cancelled"),
    }
});

// Let the task hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;

// Change the failpoint configuration - this will wake up the paused task.
// The task will resume and continue normally; the new config is not applied
// to the already-waiting task.
configure_failpoint("test_pause", "return(not_applied)").unwrap();

// The task will complete with Continue, not Return
let result = task.await.unwrap();
```

## Basic Usage

```rust
use neon_failpoint::{configure_failpoint, failpoint, FailpointResult};

// Configure a failpoint
configure_failpoint("my_failpoint", "return(42)").unwrap();

// Use the failpoint
match failpoint("my_failpoint", None).await {
    FailpointResult::Return(value) => {
        println!("Failpoint returned: {}", value);
        return value.parse().unwrap_or_default();
    }
    FailpointResult::Continue => {
        // Continue normal execution
    }
    FailpointResult::Cancelled => {
        // Handle cancellation
    }
}
```

## Context-Based Failpoint Configuration

Context allows you to create **conditional failpoints** that only trigger when specific runtime conditions are met. This is particularly useful for testing scenarios where you want to inject failures only for specific tenants, operations, or other contextual conditions.

### Configuring Context-Based Failpoints

Use `configure_failpoint_with_context()` to set up failpoints with context matching:

```rust
use neon_failpoint::configure_failpoint_with_context;
use std::collections::HashMap;

let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());

configure_failpoint_with_context(
    "backup_operation",          // failpoint name
    "return(simulated_failure)", // action to take
    context_matchers,            // context matching rules
).unwrap();
```

### Context Matching Rules

The context matching system works as follows (a code sketch follows the list):

1. **Key-Value Matching**: Each entry in `context_matchers` specifies a key that must exist in the runtime context
2. **Regex Support**: Values in `context_matchers` are treated as regular expressions first
3. **Fallback to Exact Match**: If the regex compilation fails, it falls back to exact string matching
4. **ALL Must Match**: All context matchers must match for the failpoint to trigger
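
A minimal sketch of that logic, assuming the `regex` crate (this is an illustration, not the crate's actual internals):

```rust
use std::collections::HashMap;

use regex::Regex;

/// True only if every matcher key is present in `context` and its pattern
/// matches the value: regex first, exact string comparison as the fallback.
fn context_matches(
    matchers: &HashMap<String, String>,
    context: &HashMap<String, String>,
) -> bool {
    matchers.iter().all(|(key, pattern)| {
        context.get(key).is_some_and(|value| match Regex::new(pattern) {
            Ok(re) => re.is_match(value),
            Err(_) => pattern == value, // invalid regex: fall back to exact match
        })
    })
}
```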

### Runtime Context Usage

When code hits a failpoint, it provides context using a `HashMap<String, String>`:

```rust
use neon_failpoint::{failpoint, FailpointResult};
use std::collections::HashMap;

let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
context.insert("user_id".to_string(), "user_456".to_string());

match failpoint("backup_operation", Some(&context)) {
    either::Either::Left(result) => {
        match result {
            FailpointResult::Return(value) => {
                // This will only trigger if ALL context matchers match
                println!("Backup failed: {}", value);
            }
            FailpointResult::Continue => {
                // Continue with normal backup operation
            }
            FailpointResult::Cancelled => {}
        }
    }
    either::Either::Right(future) => {
        match future.await {
            FailpointResult::Return(value) => {
                // This will only trigger if ALL context matchers match
                println!("Backup failed: {}", value);
            }
            FailpointResult::Continue => {
                // Continue with normal backup operation
            }
            FailpointResult::Cancelled => {}
        }
    }
}
```

### Context Matching Examples

#### Regex Matching
```rust
// Configure to match test tenants only
let mut matchers = HashMap::new();
matchers.insert("tenant_id".to_string(), "test_.*".to_string());

configure_failpoint_with_context("test_failpoint", "pause", matchers).unwrap();

// This will match
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());

// This will NOT match
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "prod_123".to_string());
```

#### Multiple Conditions
```rust
// Must match BOTH tenant pattern AND operation
let mut matchers = HashMap::new();
matchers.insert("tenant_id".to_string(), "test_.*".to_string());
matchers.insert("operation".to_string(), "backup".to_string());

configure_failpoint_with_context("backup_test", "return(failed)", matchers).unwrap();

// This will match (both conditions met)
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());

// This will NOT match ("restore" does not satisfy the "backup" matcher)
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "restore".to_string());
```

#### Exact String Matching
```rust
// If regex compilation fails, matching falls back to exact string comparison
let mut matchers = HashMap::new();
matchers.insert("env".to_string(), "staging".to_string());

configure_failpoint_with_context("env_specific", "sleep(1000)", matchers).unwrap();

// This will match
let mut context = HashMap::new();
context.insert("env".to_string(), "staging".to_string());

// This will NOT match
let mut context = HashMap::new();
context.insert("env".to_string(), "production".to_string());
```

### Benefits of Context-Based Failpoints

1. **Selective Testing**: Only inject failures for specific tenants, environments, or operations
2. **Production Safety**: Avoid accidentally triggering failpoints in production by using context filters
3. **Complex Scenarios**: Test interactions between different components with targeted failures
4. **Debugging**: Isolate issues to specific contexts without affecting the entire system

### Context vs. Non-Context Failpoints

- **Without context**: `configure_failpoint("name", "action")` - triggers for ALL hits
- **With context**: `configure_failpoint_with_context("name", "action", matchers)` - triggers only when context matches

## Context-Specific Failpoints

```rust
use neon_failpoint::{configure_failpoint_with_context, failpoint, FailpointResult};
use std::collections::HashMap;

// Configure a failpoint that only triggers for specific tenants
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());

configure_failpoint_with_context(
    "backup_operation",
    "return(simulated_failure)",
    context_matchers,
).unwrap();

// Use with context
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());

match failpoint("backup_operation", Some(&context)) {
    either::Either::Left(result) => {
        match result {
            FailpointResult::Return(value) => {
                // This will trigger for tenant_id matching "test_.*"
                println!("Backup failed: {}", value);
            }
            FailpointResult::Continue => {
                // Continue with backup
            }
            FailpointResult::Cancelled => {}
        }
    }
    either::Either::Right(future) => {
        match future.await {
            FailpointResult::Return(value) => {
                // This will trigger for tenant_id matching "test_.*"
                println!("Backup failed: {}", value);
            }
            FailpointResult::Continue => {
                // Continue with backup
            }
            FailpointResult::Cancelled => {}
        }
    }
}
```

## Macros

The library provides convenient macros for common patterns:

### `fail_point!` - Basic Failpoint Macro

The `fail_point!` macro has three variants:

1. **Simple failpoint** - `fail_point!(name)`
   - Just checks the failpoint and continues or returns early (no value)
   - Panics if the failpoint is configured with `return(value)` since no closure is provided

2. **Failpoint with return handler** - `fail_point!(name, closure)`
   - Provides a closure to handle return values from the failpoint
   - The closure receives `Option<String>` and should return the appropriate value

3. **Conditional failpoint** - `fail_point!(name, condition, closure)`
   - Only checks the failpoint if the condition is true
   - Provides a closure to handle return values (receives `&str`)

```rust
use neon_failpoint::fail_point;

// Simple failpoint - just continue or return early
fail_point!("my_failpoint");

// Failpoint with return value handling
fail_point!("my_failpoint", |value: Option<String>| {
    match value {
        Some(v) => {
            println!("Got value: {}", v);
            return Ok(v.parse().unwrap_or_default());
        }
        None => return Ok(42), // Default return value
    }
});

// Conditional failpoint - only check if the condition is met.
// `some_condition()` stands in for your own predicate.
let should_fail = some_condition();
fail_point!("conditional_failpoint", should_fail, |value: &str| {
    println!("Conditional failpoint triggered with: {}", value);
    return Err(anyhow::anyhow!("Simulated failure"));
});
```

### `fail_point_with_context!` - Context-Aware Failpoint Macro

The `fail_point_with_context!` macro has three variants that mirror `fail_point!` but include context:

1. **Simple with context** - `fail_point_with_context!(name, context)`
2. **With context and return handler** - `fail_point_with_context!(name, context, closure)`
3. **Conditional with context** - `fail_point_with_context!(name, context, condition, closure)`

```rust
use neon_failpoint::fail_point_with_context;
use std::collections::HashMap;

let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());

// Simple context failpoint
fail_point_with_context!("backup_failpoint", &context);

// Context failpoint with return handler
fail_point_with_context!("backup_failpoint", &context, |value: Option<String>| {
    match value {
        Some(v) => return Err(anyhow::anyhow!("Backup failed: {}", v)),
        None => return Err(anyhow::anyhow!("Backup failed")),
    }
});

// Conditional context failpoint (`tenant_id` assumed to be in scope)
let is_test_tenant = tenant_id.starts_with("test_");
fail_point_with_context!("backup_failpoint", &context, is_test_tenant, |value: Option<String>| {
    // Only triggers for test tenants
    return Err(anyhow::anyhow!("Test tenant backup failure"));
});
```

### Other Utility Macros

```rust
use std::collections::HashMap;

use neon_failpoint::{pausable_failpoint, sleep_millis_async};
use tokio_util::sync::CancellationToken;

// Pausable failpoint with cancellation. The macro awaits internally and
// evaluates to a Result, so it is not awaited again here.
let cancel_token = CancellationToken::new();
if let Err(()) = pausable_failpoint!("pause_here", &cancel_token) {
    println!("Failpoint was cancelled");
}

// Sleep failpoint (also awaits internally)
sleep_millis_async!("sleep_here", &cancel_token);

// Building a context map for the context-aware macros
let mut context = HashMap::new();
context.insert("key1".to_string(), "value1".to_string());
context.insert("key2".to_string(), "value2".to_string());
```

### Argument Reference

- **`name`**: String literal - the name of the failpoint
- **`context`**: Expression that evaluates to `&HashMap<String, String>` - context for matching
- **`condition`**: Boolean expression - only check the failpoint if true
- **`closure`**: Closure that handles return values:
  - For `fail_point!` with closure: receives `Option<String>`
  - For conditional variants: receives `&str`
  - For `fail_point_with_context!` with closure: receives `Option<String>`
- **`cancel`**: `&CancellationToken` - for cancellation support

## Migration from `fail` crate

The library provides a compatibility layer in `libs/utils/src/failpoint_support.rs`. Most existing code should work without changes, but you can migrate to the new async APIs for better performance:

### Before (with `fail` crate):
```rust
use utils::failpoint_support::pausable_failpoint;

// This used spawn_blocking internally; the macro awaits it for you.
pausable_failpoint!("my_failpoint", &cancel_token)?;
```

### After (with `neon_failpoint`):
```rust
use neon_failpoint::{failpoint_with_cancellation, FailpointResult};

// This is fully async
match failpoint_with_cancellation("my_failpoint", None, &cancel_token).await {
    FailpointResult::Continue => {},
    FailpointResult::Cancelled => return Err(()),
    FailpointResult::Return(_) => {},
}
```

## Environment Variable Support

Failpoints can be configured via the `FAILPOINTS` environment variable:

```bash
FAILPOINTS="failpoint1=return(42);failpoint2=sleep(1000);failpoint3=exit"
```
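
The variable is consumed once at process startup. A minimal sketch, assuming `neon_failpoint::init()` parses `FAILPOINTS` and returns a `Result` (as the `init().unwrap()` call in the pageserver diff below suggests):

```rust
fn main() {
    // Parse FAILPOINTS before any worker threads are spawned.
    neon_failpoint::init().unwrap();

    // ... start the runtime and the rest of the service ...
}
```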

## Testing

The library includes comprehensive tests and examples. Run them with:

```bash
cargo test --features testing
cargo run --example context_demo --features testing
```

## HTTP Configuration

The library integrates with the existing HTTP failpoint configuration API. Send POST requests to `/v1/failpoints` with:

```json
[
    {
        "name": "my_failpoint",
        "actions": "return(42)"
    }
]
```
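
For example, with curl (the port is deployment-specific; 9898 is assumed here):

```bash
curl -X POST http://localhost:9898/v1/failpoints \
  -H 'Content-Type: application/json' \
  -d '[{"name": "my_failpoint", "actions": "return(42)"}]'
```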

@@ -1,82 +0,0 @@
use neon_failpoint::{configure_failpoint_with_context, failpoint, FailpointResult};
use std::collections::HashMap;

#[tokio::main]
async fn main() {
    // Initialize tracing for better output
    tracing_subscriber::fmt::init();

    // Set up a context-specific failpoint
    let mut context_matchers = HashMap::new();
    context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
    context_matchers.insert("operation".to_string(), "backup".to_string());

    configure_failpoint_with_context(
        "backup_operation",
        "return(simulated_failure)",
        context_matchers,
    )
    .unwrap();

    // Test with matching context
    let mut context = HashMap::new();
    context.insert("tenant_id".to_string(), "test_123".to_string());
    context.insert("operation".to_string(), "backup".to_string());

    println!("Testing with matching context...");
    match failpoint("backup_operation", Some(&context)) {
        either::Either::Left(result) => match result {
            FailpointResult::Return(value) => {
                println!("Failpoint triggered with value: {value:?}");
            }
            FailpointResult::Continue => {
                println!("Failpoint not triggered");
            }
            FailpointResult::Cancelled => {
                println!("Failpoint cancelled");
            }
        },
        either::Either::Right(future) => match future.await {
            FailpointResult::Return(value) => {
                println!("Failpoint triggered with value: {value:?}");
            }
            FailpointResult::Continue => {
                println!("Failpoint not triggered");
            }
            FailpointResult::Cancelled => {
                println!("Failpoint cancelled");
            }
        },
    }

    // Test with non-matching context
    let mut context = HashMap::new();
    context.insert("tenant_id".to_string(), "prod_456".to_string());
    context.insert("operation".to_string(), "backup".to_string());

    println!("Testing with non-matching context...");
    match failpoint("backup_operation", Some(&context)) {
        either::Either::Left(result) => match result {
            FailpointResult::Return(value) => {
                println!("Failpoint triggered with value: {value:?}");
            }
            FailpointResult::Continue => {
                println!("Failpoint not triggered (expected)");
            }
            FailpointResult::Cancelled => {
                println!("Failpoint cancelled");
            }
        },
        either::Either::Right(future) => match future.await {
            FailpointResult::Return(value) => {
                println!("Failpoint triggered with value: {value:?}");
            }
            FailpointResult::Continue => {
                println!("Failpoint not triggered (expected)");
            }
            FailpointResult::Cancelled => {
                println!("Failpoint cancelled");
            }
        },
    }
}
File diff suppressed because it is too large

@@ -1,356 +0,0 @@
//! Macros for convenient failpoint usage

/// Simple failpoint macro - async version that doesn't require spawn_blocking
#[macro_export]
macro_rules! fail_point {
    ($name:literal) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, None) {
                $crate::either::Either::Left(result) => {
                    match result {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(_) => {
                            panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
                $crate::either::Either::Right(future) => {
                    match future.await {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(_) => {
                            panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
            }
        }
    }};
    ($name:literal, $closure:expr) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, None) {
                $crate::either::Either::Left(result) => {
                    match result {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(value) => {
                            let closure = $closure;
                            return closure(value);
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
                $crate::either::Either::Right(future) => {
                    match future.await {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(value) => {
                            let closure = $closure;
                            return closure(value);
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
            }
        }
    }};
    ($name:literal, $condition:expr, $closure:expr) => {{
        if cfg!(feature = "testing") {
            if $condition {
                match $crate::failpoint($name, None) {
                    $crate::either::Either::Left(result) => {
                        match result {
                            $crate::FailpointResult::Continue => {},
                            $crate::FailpointResult::Return(value) => {
                                let closure = $closure;
                                return closure(value);
                            },
                            $crate::FailpointResult::Cancelled => {},
                        }
                    },
                    $crate::either::Either::Right(future) => {
                        match future.await {
                            $crate::FailpointResult::Continue => {},
                            $crate::FailpointResult::Return(value) => {
                                let closure = $closure;
                                return closure(value);
                            },
                            $crate::FailpointResult::Cancelled => {},
                        }
                    },
                }
            }
        }
    }};
}

/// Simple failpoint macro - sync version that panics if an async action is triggered
#[macro_export]
macro_rules! fail_point_sync {
    ($name:literal) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, None) {
                $crate::either::Either::Left(result) => {
                    match result {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(_) => {
                            panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
                $crate::either::Either::Right(_) => {
                    panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
                },
            }
        }
    }};
    ($name:literal, $closure:expr) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, None) {
                $crate::either::Either::Left(result) => {
                    match result {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(value) => {
                            let closure = $closure;
                            return closure(value);
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
                $crate::either::Either::Right(_) => {
                    panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
                },
            }
        }
    }};
    ($name:literal, $condition:expr, $closure:expr) => {{
        if cfg!(feature = "testing") {
            if $condition {
                match $crate::failpoint($name, None) {
                    $crate::either::Either::Left(result) => {
                        match result {
                            $crate::FailpointResult::Continue => {},
                            $crate::FailpointResult::Return(value) => {
                                let closure = $closure;
                                return closure(value);
                            },
                            $crate::FailpointResult::Cancelled => {},
                        }
                    },
                    $crate::either::Either::Right(_) => {
                        panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
                    },
                }
            }
        }
    }};
}

/// Failpoint macro with context support
#[macro_export]
macro_rules! fail_point_with_context {
    ($name:literal, $context:expr) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, Some($context)) {
                $crate::either::Either::Left(result) => {
                    match result {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(_) => {
                            panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
                $crate::either::Either::Right(future) => {
                    match future.await {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(_) => {
                            panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
            }
        }
    }};
    ($name:literal, $context:expr, $closure:expr) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, Some($context)) {
                $crate::either::Either::Left(result) => {
                    match result {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(value) => {
                            let closure = $closure;
                            return closure(value);
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
                $crate::either::Either::Right(future) => {
                    match future.await {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(value) => {
                            let closure = $closure;
                            return closure(value);
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
            }
        }
    }};
    ($name:literal, $context:expr, $condition:expr, $closure:expr) => {{
        if cfg!(feature = "testing") {
            if $condition {
                match $crate::failpoint($name, Some($context)) {
                    $crate::either::Either::Left(result) => {
                        match result {
                            $crate::FailpointResult::Continue => {},
                            $crate::FailpointResult::Return(value) => {
                                let closure = $closure;
                                return closure(value);
                            },
                            $crate::FailpointResult::Cancelled => {},
                        }
                    },
                    $crate::either::Either::Right(future) => {
                        match future.await {
                            $crate::FailpointResult::Continue => {},
                            $crate::FailpointResult::Return(value) => {
                                let closure = $closure;
                                return closure(value);
                            },
                            $crate::FailpointResult::Cancelled => {},
                        }
                    },
                }
            }
        }
    }};
}

/// Failpoint macro with context support - sync version
#[macro_export]
macro_rules! fail_point_with_context_sync {
    ($name:literal, $context:expr) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, Some($context)) {
                $crate::either::Either::Left(result) => {
                    match result {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(_) => {
                            panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
                $crate::either::Either::Right(_) => {
                    panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
                },
            }
        }
    }};
    ($name:literal, $context:expr, $closure:expr) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, Some($context)) {
                $crate::either::Either::Left(result) => {
                    match result {
                        $crate::FailpointResult::Continue => {},
                        $crate::FailpointResult::Return(value) => {
                            let closure = $closure;
                            return closure(value);
                        },
                        $crate::FailpointResult::Cancelled => {},
                    }
                },
                $crate::either::Either::Right(_) => {
                    panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
                },
            }
        }
    }};
    ($name:literal, $context:expr, $condition:expr, $closure:expr) => {{
        if cfg!(feature = "testing") {
            if $condition {
                match $crate::failpoint($name, Some($context)) {
                    $crate::either::Either::Left(result) => {
                        match result {
                            $crate::FailpointResult::Continue => {},
                            $crate::FailpointResult::Return(value) => {
                                let closure = $closure;
                                return closure(value);
                            },
                            $crate::FailpointResult::Cancelled => {},
                        }
                    },
                    $crate::either::Either::Right(_) => {
                        panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
                    },
                }
            }
        }
    }};
}

/// Pausable failpoint macro - equivalent to the old pausable_failpoint
#[macro_export]
macro_rules! pausable_failpoint {
    ($name:literal) => {{
        if cfg!(feature = "testing") {
            let cancel = ::tokio_util::sync::CancellationToken::new();
            let _ = $crate::pausable_failpoint!($name, &cancel);
        }
    }};
    ($name:literal, $cancel:expr) => {{
        if cfg!(feature = "testing") {
            ::tracing::info!("at failpoint {}", $name); // tests rely on this
            match $crate::failpoint_with_cancellation($name, None, $cancel) {
                $crate::either::Either::Left(result) => match result {
                    $crate::FailpointResult::Continue => Ok(()),
                    $crate::FailpointResult::Return(_) => Ok(()),
                    $crate::FailpointResult::Cancelled => Err(()),
                },
                $crate::either::Either::Right(future) => match future.await {
                    $crate::FailpointResult::Continue => Ok(()),
                    $crate::FailpointResult::Return(_) => Ok(()),
                    $crate::FailpointResult::Cancelled => Err(()),
                },
            }
        } else {
            Ok(())
        }
    }};
}

/// Sleep failpoint macro - for async sleep operations
#[macro_export]
macro_rules! sleep_millis_async {
    ($name:literal) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint($name, None) {
                $crate::either::Either::Left(_) => {}
                $crate::either::Either::Right(future) => {
                    future.await;
                }
            }
        }
    }};
    ($name:literal, $cancel:expr) => {{
        if cfg!(feature = "testing") {
            match $crate::failpoint_with_cancellation($name, None, $cancel) {
                $crate::either::Either::Left(_) => {}
                $crate::either::Either::Right(future) => {
                    future.await;
                }
            }
        }
    }};
}

// Re-export for convenience
pub use fail_point;
pub use fail_point_sync;
pub use fail_point_with_context;
pub use fail_point_with_context_sync;
pub use pausable_failpoint;
pub use sleep_millis_async;
@@ -9,7 +9,7 @@ default = ["rename_noreplace"]
|
||||
rename_noreplace = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["neon_failpoint/testing"]
|
||||
testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
arc-swap.workspace = true
|
||||
@@ -21,11 +21,10 @@ bytes.workspace = true
|
||||
camino.workspace = true
|
||||
chrono.workspace = true
|
||||
diatomic-waker.workspace = true
|
||||
either.workspace = true
|
||||
git-version.workspace = true
|
||||
hex = { workspace = true, features = ["serde"] }
|
||||
humantime.workspace = true
|
||||
neon_failpoint.workspace = true
|
||||
fail.workspace = true
|
||||
futures = { workspace = true }
|
||||
jsonwebtoken.workspace = true
|
||||
nix = { workspace = true, features = ["ioctl"] }
|
||||
|
||||
@@ -1,22 +1,59 @@
//! Failpoint support code shared between pageserver and safekeepers.
//!
//! This module provides a compatibility layer over the new neon_failpoint crate.

pub use neon_failpoint::{configure_failpoint as apply_failpoint, has_failpoints, init};
use tokio_util::sync::CancellationToken;

/// Merely forwards to neon_failpoint::pausable_failpoint
/// Declare a failpoint that can be used with the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
///
/// Optionally pass a cancellation token, and this failpoint will drop out of
/// its pause when the cancellation token fires. This is useful for testing
/// cases where we would like to block something, but test its clean shutdown behavior.
/// The macro evaluates to a Result in that case, where Ok(()) is the case
/// where the failpoint was not paused, and Err() is the case where the cancellation
/// token fired while evaluating the failpoint.
///
/// Remember to unpause the failpoint in the test; until that happens, one of the
/// limited number of spawn_blocking thread pool threads is leaked.
#[macro_export]
macro_rules! pausable_failpoint {
    ($name:literal) => {
        ::neon_failpoint::pausable_failpoint!($name)
    };
    ($name:literal, $cancel:expr) => {
        ::neon_failpoint::pausable_failpoint!($name, $cancel)
    };
    ($name:literal) => {{
        if cfg!(feature = "testing") {
            let cancel = ::tokio_util::sync::CancellationToken::new();
            let _ = $crate::pausable_failpoint!($name, &cancel);
        }
    }};
    ($name:literal, $cancel:expr) => {{
        if cfg!(feature = "testing") {
            let failpoint_fut = ::tokio::task::spawn_blocking({
                let current = ::tracing::Span::current();
                move || {
                    let _entered = current.entered();
                    ::tracing::info!("at failpoint {}", $name);
                    ::fail::fail_point!($name);
                }
            });
            let cancel_fut = async move {
                $cancel.cancelled().await;
            };
            ::tokio::select! {
                res = failpoint_fut => {
                    res.expect("spawn_blocking");
                    // continue with execution
                    Ok(())
                },
                _ = cancel_fut => {
                    Err(())
                }
            }
        } else {
            Ok(())
        }
    }};
}

/// DEPRECATED! - use with fail::cfg("$name", "return(2000)")
pub use pausable_failpoint;

/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that we use async
@@ -32,7 +69,7 @@ macro_rules! __failpoint_sleep_millis_async {
    // If the failpoint is used with a "return" action, set should_sleep to the
    // returned value (as string). Otherwise it's set to None.
    let should_sleep = (|| {
        ::neon_failpoint::fail_point_sync!($name, |x| x);
        ::fail::fail_point!($name, |x| x);
        ::std::option::Option::None
    })();

@@ -45,7 +82,7 @@ macro_rules! __failpoint_sleep_millis_async {
    // If the failpoint is used with a "return" action, set should_sleep to the
    // returned value (as string). Otherwise it's set to None.
    let should_sleep = (|| {
        ::neon_failpoint::fail_point_sync!($name, |x| x);
        ::fail::fail_point!($name, |x| x);
        ::std::option::Option::None
    })();

@@ -89,3 +126,60 @@ pub async fn failpoint_sleep_cancellable_helper(
    tokio::time::timeout(d, cancel.cancelled()).await.ok();
    tracing::info!("failpoint {:?}: sleep done", name);
}

/// Initialize the configured failpoints
///
/// You must call this function before any concurrent threads do operations.
pub fn init() -> fail::FailScenario<'static> {
    // The failpoints lib provides support for parsing the `FAILPOINTS` env var.
    // We want non-default behavior for `exit`, though, so, we handle it separately.
    //
    // Format for FAILPOINTS is "name=actions" separated by ";".
    let actions = std::env::var("FAILPOINTS");
    if actions.is_ok() {
        // SAFETY: this function should be called before any threads start and access env vars concurrently
        unsafe {
            std::env::remove_var("FAILPOINTS");
        }
    } else {
        // let the library handle non-utf8, or nothing for not present
    }

    let scenario = fail::FailScenario::setup();

    if let Ok(val) = actions {
        val.split(';')
            .enumerate()
            .map(|(i, s)| s.split_once('=').ok_or((i, s)))
            .for_each(|res| {
                let (name, actions) = match res {
                    Ok(t) => t,
                    Err((i, s)) => {
                        panic!(
                            "startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
                            i + 1,
                        );
                    }
                };
                if let Err(e) = apply_failpoint(name, actions) {
                    panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
                }
            });
    }

    scenario
}

pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
    if actions == "exit" {
        fail::cfg_callback(name, exit_failpoint)
    } else {
        fail::cfg(name, actions)
    }
}

#[inline(never)]
fn exit_failpoint() {
    tracing::info!("Exit requested by failpoint");
    std::process::exit(1);
}

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["neon_failpoint/testing", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]

fuzz-read-path = ["testing"]

@@ -33,7 +33,7 @@ crc32c.workspace = true
either.workspace = true
enum-map.workspace = true
enumset = { workspace = true, features = ["serde"]}
neon_failpoint.workspace = true
fail.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true
@@ -17,7 +17,6 @@ use anyhow::{Context, anyhow};
use async_compression::tokio::write::GzipEncoder;
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use neon_failpoint as fail;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};

@@ -68,7 +68,7 @@ const FEATURES: &[&str] = &[
fn version() -> String {
    format!(
        "{GIT_VERSION} failpoints: {}, features: {:?}",
        neon_failpoint::has_failpoints(),
        fail::has_failpoints(),
        FEATURES,
    )
}
@@ -84,7 +84,7 @@ fn main() -> anyhow::Result<()> {
    }

    // Initialize failpoint support
    failpoint_support::init().unwrap();
    let scenario = failpoint_support::init();

    let workdir = arg_matches
        .get_one::<String>("workdir")
@@ -221,6 +221,7 @@ fn main() -> anyhow::Result<()> {

    start_pageserver(launch_ts, conf, ignored, otel_guard).context("Failed to start pageserver")?;

    scenario.teardown();
    Ok(())
}

@@ -365,9 +366,16 @@ fn start_pageserver(

    // If any failpoints were set from FAILPOINTS environment variable,
    // print them to the log for debugging purposes
    let failpoints = neon_failpoint::list();
    for (name, actions) in failpoints {
        info!("starting with failpoint: {name} {actions}");
    let failpoints = fail::list();
    if !failpoints.is_empty() {
        info!(
            "started with failpoints: {}",
            failpoints
                .iter()
                .map(|(name, actions)| format!("{name}={actions}"))
                .collect::<Vec<String>>()
                .join(";")
        )
    }

    // Create and lock PID file. This ensures that there cannot be more than one

@@ -6,8 +6,6 @@ use camino::{Utf8Path, Utf8PathBuf};
use super::{NewMetricsRoot, NewRawMetric, RawMetric};
use crate::consumption_metrics::NewMetricsRefRoot;

use neon_failpoint as fail;

pub(super) fn read_metrics_from_serde_value(
    json_value: serde_json::Value,
) -> anyhow::Result<Vec<NewRawMetric>> {
@@ -131,7 +129,7 @@ pub(super) async fn flush_metrics_to_disk(
    tempfile.flush()?;
    tempfile.as_file().sync_all()?;

    fail::fail_point_sync!("before-persist-last-metrics-collected");
    fail::fail_point!("before-persist-last-metrics-collected");

    drop(tempfile.persist(&*path).map_err(|e| e.error)?);

@@ -8,7 +8,6 @@

use std::time::Duration;

use neon_failpoint as fail;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
@@ -28,7 +28,6 @@ use http_utils::{RequestExt, RouterBuilder};
use humantime::format_rfc3339;
use hyper::{Body, Request, Response, StatusCode, Uri, header};
use metrics::launch_timestamp::LaunchTimestamp;
use neon_failpoint as fail;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::{
    DetachBehavior, DownloadRemoteLayersTaskSpawnRequest, IngestAuxFilesRequest,
@@ -3976,7 +3975,7 @@ pub fn make_router(
    .get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
    .get("/profile/heap", |r| request_span(r, profile_heap_handler))
    .get("/v1/status", |r| api_handler(r, status_handler))
    .post("/v1/failpoints", |r| {
    .put("/v1/failpoints", |r| {
        testing_api_handler("manage failpoints", r, failpoints_handler)
    })
    .post("/v1/reload_auth_validation_keys", |r| {
@@ -19,7 +19,6 @@ use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
use jsonwebtoken::TokenData;
use neon_failpoint as fail;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
    GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
@@ -336,21 +335,18 @@ async fn page_service_conn_main(

    let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
    let socket_timeout_ms = (|| {
        fail::fail_point_sync!(
            "simulated-bad-compute-connection",
            |avg_timeout_ms: Option<String>| {
                // Exponential distribution for simulating
                // poor network conditions, expect about avg_timeout_ms to be around 15
                // in tests
                if let Some(avg_timeout_ms) = avg_timeout_ms {
                    let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
                    let u = rand::random::<f32>();
                    ((1.0 - u).ln() / (-avg)) as u64
                } else {
                    default_timeout_ms
                }
        fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
            // Exponential distribution for simulating
            // poor network conditions, expect about avg_timeout_ms to be around 15
            // in tests
            if let Some(avg_timeout_ms) = avg_timeout_ms {
                let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
                let u = rand::random::<f32>();
                ((1.0 - u).ln() / (-avg)) as u64
            } else {
                default_timeout_ms
            }
            );
        });
        default_timeout_ms
    })();

@@ -3047,7 +3043,7 @@ where
    _pgb: &mut PostgresBackend<IO>,
    sm: &FeStartupPacket,
) -> Result<(), QueryError> {
    fail::fail_point_sync!("ps::connection-start::startup-packet");
    fail::fail_point!("ps::connection-start::startup-packet");

    if let FeStartupPacket::StartupMessage { params, .. } = sm {
        if let Some(app_name) = params.get("application_name") {
@@ -14,7 +14,6 @@ use crate::{PERF_TRACE_TARGET, ensure_walingest};
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use neon_failpoint as fail;
use pageserver_api::key::{
    AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
    TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,
@@ -30,7 +30,6 @@ use enumset::EnumSet;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use itertools::Itertools as _;
use neon_failpoint as fail;
use once_cell::sync::Lazy;
pub use pageserver_api::models::TenantState;
use pageserver_api::models::{self, RelSizeMigration};
@@ -9572,7 +9571,7 @@ mod tests {
        writer.finish_write(Lsn(0x30));
        drop(writer);

        neon_failpoint::configure_failpoint(
        fail::cfg(
            "flush-layer-before-update-remote-consistent-lsn",
            "return()",
        )
@@ -12,7 +12,6 @@ use anyhow::Context;
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use neon_failpoint as fail;
use pageserver_api::key::Key;
use pageserver_api::models::{DetachBehavior, LocationConfigMode};
use pageserver_api::shard::{
@@ -194,7 +194,6 @@ pub(crate) use download::{
};
use index::GcCompactionState;
pub(crate) use index::LayerFileMetadata;
use neon_failpoint as fail;
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;
@@ -11,7 +11,6 @@ use std::time::SystemTime;

use anyhow::{Context, anyhow};
use camino::{Utf8Path, Utf8PathBuf};
use neon_failpoint as fail;
use pageserver_api::shard::TenantShardId;
use remote_storage::{
    DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
@@ -8,7 +8,6 @@ use anyhow::{Context, bail};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use neon_failpoint as fail;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
use tokio::fs::{self, File};
@@ -39,7 +39,6 @@ use layer_manager::{
    LayerManagerLockHolder, LayerManagerReadGuard, LayerManagerWriteGuard, LockedLayerManager,
    Shutdown,
};
use neon_failpoint as fail;

use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
@@ -5184,9 +5183,7 @@ impl Timeline {
            *self.applied_gc_cutoff_lsn.read(),
        );

        neon_failpoint::fail_point_sync!("checkpoint-before-saving-metadata", |x: Option<
            String,
        >| bail!(
        fail_point!("checkpoint-before-saving-metadata", |x| bail!(
            "{}",
            x.unwrap()
        ));

@@ -26,7 +26,6 @@ use enumset::EnumSet;
use fail::fail_point;
use futures::FutureExt;
use itertools::Itertools;
use neon_failpoint as fail;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
use pageserver_api::key::{KEY_SIZE, Key};
@@ -2,7 +2,6 @@ use std::ops::{Deref, DerefMut};
use std::sync::Arc;

use anyhow::Context;
use neon_failpoint as fail;
use pageserver_api::models::TimelineState;
use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
@@ -4,7 +4,6 @@ use std::sync::Arc;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
use neon_failpoint as fail;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::DetachBehavior;
@@ -1114,7 +1113,7 @@ pub(super) async fn detach_and_reparent(
    // others will fail as if those timelines had been stopped for whatever reason.
    #[cfg(feature = "testing")]
    let failpoint_sem = || -> Option<Arc<Semaphore>> {
        fail::fail_point_sync!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
        fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
            Arc::new(Semaphore::new(1))
        ));
        None
@@ -5,7 +5,6 @@ use std::sync::Arc;

use anyhow::Context;
use camino::Utf8PathBuf;
use neon_failpoint as fail;
use tracing::{error, info, info_span};
use utils::fs_ext;
use utils::id::TimelineId;
@@ -11,7 +11,6 @@ use bytes::BytesMut;
use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use neon_failpoint as fail;
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::WAL_SEGMENT_SIZE;
35
poetry.lock
generated
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.

[[package]]
name = "aiohappyeyeballs"
@@ -2326,6 +2326,25 @@ files = [
    {file = "propcache-0.2.0.tar.gz", hash = "sha256:df81779732feb9d01e5d513fad0122efb3d53bbc75f61b2a4f29a020bc985e70"},
]

[[package]]
name = "protobuf"
version = "6.31.1"
description = ""
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
    {file = "protobuf-6.31.1-cp310-abi3-win32.whl", hash = "sha256:7fa17d5a29c2e04b7d90e5e32388b8bfd0e7107cd8e616feef7ed3fa6bdab5c9"},
    {file = "protobuf-6.31.1-cp310-abi3-win_amd64.whl", hash = "sha256:426f59d2964864a1a366254fa703b8632dcec0790d8862d30034d8245e1cd447"},
    {file = "protobuf-6.31.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:6f1227473dc43d44ed644425268eb7c2e488ae245d51c6866d19fe158e207402"},
    {file = "protobuf-6.31.1-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:a40fc12b84c154884d7d4c4ebd675d5b3b5283e155f324049ae396b95ddebc39"},
    {file = "protobuf-6.31.1-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:4ee898bf66f7a8b0bd21bce523814e6fbd8c6add948045ce958b73af7e8878c6"},
    {file = "protobuf-6.31.1-cp39-cp39-win32.whl", hash = "sha256:0414e3aa5a5f3ff423828e1e6a6e907d6c65c1d5b7e6e975793d5590bdeecc16"},
    {file = "protobuf-6.31.1-cp39-cp39-win_amd64.whl", hash = "sha256:8764cf4587791e7564051b35524b72844f845ad0bb011704c3736cce762d8fe9"},
    {file = "protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e"},
    {file = "protobuf-6.31.1.tar.gz", hash = "sha256:d8cac4c982f0b957a4dc73a80e2ea24fab08e679c0de9deb835f4a12d69aca9a"},
]

[[package]]
name = "psutil"
version = "5.9.4"
@@ -3309,6 +3328,18 @@ files = [
[package.dependencies]
cryptography = "*"

[[package]]
name = "types-protobuf"
version = "6.30.2.20250516"
description = "Typing stubs for protobuf"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
    {file = "types_protobuf-6.30.2.20250516-py3-none-any.whl", hash = "sha256:8c226d05b5e8b2623111765fa32d6e648bbc24832b4c2fddf0fa340ba5d5b722"},
    {file = "types_protobuf-6.30.2.20250516.tar.gz", hash = "sha256:aecd1881770a9bb225ede66872ef7f0da4505edd0b193108edd9892e48d49a41"},
]

[[package]]
name = "types-psutil"
version = "5.9.5.12"
@@ -3847,4 +3878,4 @@ cffi = ["cffi (>=1.11)"]

[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"
content-hash = "7cc735f57c2760db6c994575a98d4f0e2670497ad9e909135a3bc67d479f5edf"
@@ -10,6 +10,8 @@ psycopg2-binary = "^2.9.10"
typing-extensions = "^4.12.2"
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
requests = "^2.32.4"
protobuf = "^6.31.1"
types-protobuf="^6.30.2"
pytest-xdist = "^3.3.1"
asyncpg = "^0.30.0"
aiopg = "^1.4.0"
@@ -63,6 +65,7 @@ build-backend = "poetry.core.masonry.api"
exclude = [
    "^vendor/",
    "^target/",
    "test_runner/regress/data/profile_pb2.py",
    "test_runner/performance/pgvector/loaddata.py",
]
check_untyped_defs = true
@@ -94,6 +97,7 @@ target-version = "py311"
extend-exclude = [
    "vendor/",
    "target/",
    "test_runner/regress/data/profile_pb2.py",
    "test_runner/stubs/", # Autogenerated by mypy's stubgen
]
line-length = 100 # this setting is rather guidance, it won't fail if it can't make the line shorter
@@ -6,7 +6,9 @@ license.workspace = true

[features]
default = []
testing = ["neon_failpoint/testing"]
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
benchmarking = []

[dependencies]
@@ -19,7 +21,7 @@ camino-tempfile.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
crc32c.workspace = true
neon_failpoint.workspace = true
fail.workspace = true
hex.workspace = true
humantime.workspace = true
http.workspace = true
@@ -65,7 +65,7 @@ const FEATURES: &[&str] = &[
fn version() -> String {
    format!(
        "{GIT_VERSION} failpoints: {}, features: {:?}",
        neon_failpoint::has_failpoints(),
        fail::has_failpoints(),
        FEATURES,
    )
}

@@ -717,7 +717,7 @@ pub fn make_router(
    .get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
    .get("/profile/heap", |r| request_span(r, profile_heap_handler))
    .get("/v1/status", |r| request_span(r, status_handler))
    .post("/v1/failpoints", |r| {
    .put("/v1/failpoints", |r| {
        request_span(r, move |r| async {
            check_permission(&r, None)?;
            let cancel = CancellationToken::new();
@@ -872,15 +872,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
    async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
        loop {
            self.end_pos = self.end_watch.get();
            let have_something_to_send = async {
                neon_failpoint::fail_point!(
            let have_something_to_send = (|| {
                fail::fail_point!(
                    "sk-pause-send",
                    self.appname.as_deref() != Some("pageserver"),
                    |_| { false }
                );
                self.end_pos > self.start_pos
            }
            .await;
            })();

            if have_something_to_send {
                trace!("got end_pos {:?}, streaming", self.end_pos);
@@ -932,15 +931,14 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
    /// - Err in case of error -- only if 1) term changed while fetching in recovery
    ///   mode 2) watch channel closed, which must never happen.
    async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
        let fp = async {
            neon_failpoint::fail_point!(
        let fp = (|| {
            fail::fail_point!(
                "sk-pause-send",
                self.appname.as_deref() != Some("pageserver"),
                |_| { true }
            );
            false
        }
        .await;
        })();
        if fp {
            tokio::time::sleep(POLL_STATE_TIMEOUT).await;
            return Ok(None);
@@ -657,7 +657,7 @@ pub async fn delete_timeline(

    pausable_failpoint!("sk-delete-timeline-remote-pause");

    neon_failpoint::fail_point!("sk-delete-timeline-remote", |_| {
    fail::fail_point!("sk-delete-timeline-remote", |_| {
        Err(anyhow::anyhow!("failpoint: sk-delete-timeline-remote"))
    });

@@ -301,7 +301,7 @@ impl PhysicalStorage {
        format!("Failed to open tmp wal file {:?}", &tmp_path)
    })?;

    neon_failpoint::fail_point!("sk-zero-segment", |_| {
    fail::fail_point!("sk-zero-segment", |_| {
        info!("sk-zero-segment failpoint hit");
        Err(anyhow::anyhow!("failpoint: sk-zero-segment"))
    });
@@ -22,7 +22,7 @@ clap.workspace = true
clashmap.workspace = true
compute_api.workspace = true
cron.workspace = true
neon_failpoint.workspace = true
fail.workspace = true
futures.workspace = true
governor.workspace = true
hex.workspace = true
@@ -25,7 +25,6 @@ use futures::stream::FuturesUnordered;
use http_utils::error::ApiError;
use hyper::Uri;
use itertools::Itertools;
use neon_failpoint as fail;
use pageserver_api::config::PostHogConfig;
use pageserver_api::controller_api::{
    AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
@@ -6027,7 +6026,7 @@ impl Service {
    tenant_id: TenantId,
    split_req: TenantShardSplitRequest,
) -> Result<ShardSplitAction, ApiError> {
    fail::fail_point_sync!("shard-split-validation", |_| Err(ApiError::BadRequest(
    fail::fail_point!("shard-split-validation", |_| Err(ApiError::BadRequest(
        anyhow::anyhow!("failpoint")
    )));

@@ -115,6 +115,39 @@ class EndpointHttpClient(requests.Session):
         json: dict[str, str] = res.json()
         return json

+    def start_profiling_cpu(
+        self, sampling_frequency: int, timeout_seconds: int, archive: bool = False
+    ) -> tuple[int, bytes]:
+        url = f"http://localhost:{self.external_port}/profile/cpu"
+        params = {
+            "profiler": {"BccProfile": None},
+            "sampling_frequency": sampling_frequency,
+            "timeout_seconds": timeout_seconds,
+            "archive": archive,
+        }
+
+        res = self.post(
+            url,
+            json=params,
+            auth=self.auth,
+        )
+
+        return res.status_code, res.content
+
+    def stop_profiling_cpu(self) -> int:
+        url = f"http://localhost:{self.external_port}/profile/cpu"
+        res = self.delete(url, auth=self.auth)
+        return res.status_code
+
+    def get_profiling_cpu_status(self) -> bool:
+        """
+        Returns True if CPU profiling is currently running, False otherwise.
+        """
+        url = f"http://localhost:{self.external_port}/profile/cpu"
+        res = self.get(url, auth=self.auth)
+        res.raise_for_status()
+        return res.status_code == 200
+
     def database_schema(self, database: str):
         res = self.get(
             f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",
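For orientation, a minimal sketch of how the new client methods compose; `client` stands in for an EndpointHttpClient as above, and the frequency/timeout values are illustrative only:

    # Illustrative only: drive the new profiling API end to end.
    # start_profiling_cpu() blocks until the profile completes or is stopped.
    status, payload = client.start_profiling_cpu(sampling_frequency=100, timeout_seconds=30)
    if status == 200:
        pprof_bytes = payload  # serialized pprof Profile message (gzipped when archive=True)

    # From another thread, poll until the profiler reports it is running,
    # then cut the run short; stop_profiling_cpu() returns 412 if nothing was in flight.
    if client.get_profiling_cpu_status():
        client.stop_profiling_cpu()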
@@ -159,59 +192,16 @@ class EndpointHttpClient(requests.Session):
         res.raise_for_status()
         return res.json()

-    def configure_failpoints(
-        self, *args: tuple[str, str] | list[dict[str, str | dict[str, str]]]
-    ) -> None:
-        """Configure failpoints for testing purposes.
-
-        Args:
-            *args: Can be one of:
-                - Variable number of (name, actions) tuples
-                - Single list of dicts with keys: name, actions, and optionally context_matchers
-
-        Examples:
-            # Basic failpoints
-            client.configure_failpoints(("test_fp", "return(error)"))
-            client.configure_failpoints(("fp1", "return"), ("fp2", "sleep(1000)"))
-
-            # Probability-based failpoint
-            client.configure_failpoints(("test_fp", "50%return(error)"))
-
-            # Context-based failpoint
-            client.configure_failpoints([{
-                "name": "test_fp",
-                "actions": "return(error)",
-                "context_matchers": {"tenant_id": ".*test.*"}
-            }])
-        """
-        request_body: list[dict[str, Any]] = []
-
-        if (
-            len(args) == 1
-            and isinstance(args[0], list)
-            and args[0]
-            and isinstance(args[0][0], dict)
-        ):
-            # Handle list of dicts (context-based failpoints)
-            failpoint_configs = args[0]
-            for config in failpoint_configs:
-                server_config: dict[str, Any] = {
-                    "name": config["name"],
-                    "actions": config["actions"],
-                }
-                if "context_matchers" in config:
-                    server_config["context_matchers"] = config["context_matchers"]
-                request_body.append(server_config)
-        else:
-            # Handle tuples (basic failpoints)
-            for fp in args:
-                request_body.append(
-                    {
-                        "name": fp[0],
-                        "actions": fp[1],
-                    }
-                )
-
-        res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=request_body)
+    def configure_failpoints(self, *args: tuple[str, str]) -> None:
+        body: list[dict[str, str]] = []
+
+        for fp in args:
+            body.append(
+                {
+                    "name": fp[0],
+                    "action": fp[1],
+                }
+            )
+
+        res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body)
         res.raise_for_status()
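After the revert, the compute client accepts only (name, actions) tuples; a sketch of a call and the JSON it posts (note this client serializes under the singular key "action", unlike the storage controller, pageserver, and safekeeper clients below, which use "actions"):

    # Illustrative only: the failpoint names here are made up.
    client.configure_failpoints(("my-compute-fp", "return(error)"), ("my-other-fp", "sleep(1000)"))
    # POSTs to http://localhost:<internal_port>/failpoints with the JSON body:
    # [{"name": "my-compute-fp", "action": "return(error)"},
    #  {"name": "my-other-fp", "action": "sleep(1000)"}]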
@@ -2614,68 +2614,22 @@ class NeonStorageController(MetricsGetter, LogUtils):
         )
         return res.json()

-    def configure_failpoints(
-        self,
-        config_strings: tuple[str, str]
-        | list[tuple[str, str]]
-        | list[dict[str, str | dict[str, str]]],
-    ):
-        """
-        Configure failpoints for testing purposes.
-
-        Args:
-            config_strings: Can be one of:
-                - Single tuple of (name, actions)
-                - List of tuples [(name, actions), ...]
-                - List of dicts with keys: name, actions, and optionally context_matchers
-
-        Examples:
-            # Basic failpoint
-            client.configure_failpoints(("test_fp", "return(error)"))
-
-            # Multiple basic failpoints
-            client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
-
-            # Probability-based failpoint
-            client.configure_failpoints(("test_fp", "50%return(error)"))
-
-            # Context-based failpoint
-            client.configure_failpoints([{
-                "name": "test_fp",
-                "actions": "return(error)",
-                "context_matchers": {"tenant_id": ".*test.*"}
-            }])
-        """
-        # Handle single tuple case
+    def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
         if isinstance(config_strings, tuple):
-            config_strings = [config_strings]
+            pairs = [config_strings]
+        else:
+            pairs = config_strings

-        # Convert to server format
-        body: list[dict[str, str | dict[str, str]]] = []
-        for config in config_strings:
-            if isinstance(config, tuple):
-                # Simple (name, actions) tuple
-                body.append({"name": config[0], "actions": config[1]})
-            elif isinstance(config, dict):
-                # Dict with name, actions, and optional context_matchers
-                server_config: dict[str, str | dict[str, str]] = {
-                    "name": config["name"],
-                    "actions": config["actions"],
-                }
-                if "context_matchers" in config:
-                    server_config["context_matchers"] = config["context_matchers"]
-                body.append(server_config)
-            else:
-                raise ValueError(f"Invalid config format: {config}")
+        log.info(f"Requesting config failpoints: {repr(pairs)}")

         res = self.request(
             "PUT",
-            f"{self.api_root()}/debug/v1/failpoints",
-            json=body,
+            f"{self.api}/debug/v1/failpoints",
+            json=[{"name": name, "actions": actions} for name, actions in pairs],
             headers=self.headers(TokenScope.ADMIN),
         )
-        if res.status_code != 200:
-            self.raise_api_exception(res)
+        log.info(f"Got failpoints request response code {res.status_code}")
+        res.raise_for_status()

     def get_tenants_placement(self) -> defaultdict[str, dict[str, Any]]:
         """
@@ -309,64 +309,25 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
     def check_status(self):
         self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()

-    def configure_failpoints(
-        self,
-        config_strings: tuple[str, str]
-        | list[tuple[str, str]]
-        | list[dict[str, str | dict[str, str]]],
-    ):
-        """
-        Configure failpoints for testing purposes.
-
-        Args:
-            config_strings: Can be one of:
-                - Single tuple of (name, actions)
-                - List of tuples [(name, actions), ...]
-                - List of dicts with keys: name, actions, and optionally context_matchers
-
-        Examples:
-            # Basic failpoint
-            client.configure_failpoints(("test_fp", "return(error)"))
-
-            # Multiple basic failpoints
-            client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
-
-            # Probability-based failpoint
-            client.configure_failpoints(("test_fp", "50%return(error)"))
-
-            # Context-based failpoint
-            client.configure_failpoints([{
-                "name": "test_fp",
-                "actions": "return(error)",
-                "context_matchers": {"tenant_id": ".*test.*"}
-            }])
-        """
+    def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
         self.is_testing_enabled_or_skip()

-        # Handle single tuple case
         if isinstance(config_strings, tuple):
-            config_strings = [config_strings]
+            pairs = [config_strings]
+        else:
+            pairs = config_strings

-        # Convert to server format
-        body: list[dict[str, str | dict[str, str]]] = []
-        for config in config_strings:
-            if isinstance(config, tuple):
-                # Simple (name, actions) tuple
-                body.append({"name": config[0], "actions": config[1]})
-            elif isinstance(config, dict):
-                # Dict with name, actions, and optional context_matchers
-                server_config = {"name": config["name"], "actions": config["actions"]}
-                if "context_matchers" in config:
-                    server_config["context_matchers"] = config["context_matchers"]
-                body.append(server_config)
-            else:
-                raise ValueError(f"Invalid config format: {config}")
+        log.info(f"Requesting config failpoints: {repr(pairs)}")

-        res = self.post(f"{self.base_url}/v1/failpoints", json=body)
-        if res.status_code != 200:
-            raise PageserverApiException(
-                f"Failed to configure failpoints: {res.text}", res.status_code
-            )
+        res = self.put(
+            f"http://localhost:{self.port}/v1/failpoints",
+            json=[{"name": name, "actions": actions} for name, actions in pairs],
+        )
+        log.info(f"Got failpoints request response code {res.status_code}")
         self.verbose_error(res)
         res_json = res.json()
         assert res_json is None
         return res_json

     def reload_auth_validation_keys(self):
         res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")
@@ -8,6 +8,7 @@ import pytest
 import requests

 from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
+from fixtures.log_helper import log
 from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
 from fixtures.utils import EnhancedJSONEncoder, wait_until
@@ -154,62 +155,25 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
         if not self.is_testing_enabled:
             pytest.skip("safekeeper was built without 'testing' feature")

-    def configure_failpoints(
-        self,
-        config_strings: tuple[str, str]
-        | list[tuple[str, str]]
-        | list[dict[str, str | dict[str, str]]],
-    ):
-        """
-        Configure failpoints for testing purposes.
-
-        Args:
-            config_strings: Can be one of:
-                - Single tuple of (name, actions)
-                - List of tuples [(name, actions), ...]
-                - List of dicts with keys: name, actions, and optionally context_matchers
-
-        Examples:
-            # Basic failpoint
-            client.configure_failpoints(("test_fp", "return(error)"))
-
-            # Multiple basic failpoints
-            client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
-
-            # Probability-based failpoint
-            client.configure_failpoints(("test_fp", "50%return(error)"))
-
-            # Context-based failpoint
-            client.configure_failpoints([{
-                "name": "test_fp",
-                "actions": "return(error)",
-                "context_matchers": {"tenant_id": ".*test.*"}
-            }])
-        """
+    def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
         self.is_testing_enabled_or_skip()

-        # Handle single tuple case
         if isinstance(config_strings, tuple):
-            config_strings = [config_strings]
+            pairs = [config_strings]
+        else:
+            pairs = config_strings

-        # Convert to server format
-        body: list[dict[str, str | dict[str, str]]] = []
-        for config in config_strings:
-            if isinstance(config, tuple):
-                # Simple (name, actions) tuple
-                body.append({"name": config[0], "actions": config[1]})
-            elif isinstance(config, dict):
-                # Dict with name, actions, and optional context_matchers
-                server_config = {"name": config["name"], "actions": config["actions"]}
-                if "context_matchers" in config:
-                    server_config["context_matchers"] = config["context_matchers"]
-                body.append(server_config)
-            else:
-                raise ValueError(f"Invalid config format: {config}")
+        log.info(f"Requesting config failpoints: {repr(pairs)}")

-        res = self.post(f"http://localhost:{self.port}/v1/failpoints", json=body)
-        if res.status_code != 200:
-            raise RuntimeError(f"Failed to configure failpoints: {res.text}")
+        res = self.put(
+            f"http://localhost:{self.port}/v1/failpoints",
+            json=[{"name": name, "actions": actions} for name, actions in pairs],
+        )
+        log.info(f"Got failpoints request response code {res.status_code}")
         res.raise_for_status()
         res_json = res.json()
         assert res_json is None
         return res_json

     def tenant_delete_force(self, tenant_id: TenantId) -> dict[Any, Any]:
         res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
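These helpers drive the failpoints declared in the Rust hunks above; for example, a test could pause WAL sending on a safekeeper roughly like this (sk_http standing in for a SafekeeperHttpClient):

    # Illustrative only: "sk-pause-send" is the failpoint from the WalSender
    # hunks above; "off" is the standard fail-crate action that clears it.
    sk_http.configure_failpoints(("sk-pause-send", "return"))
    # ... exercise behaviour while the sender is paused ...
    sk_http.configure_failpoints(("sk-pause-send", "off"))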
@@ -6,6 +6,7 @@ import json
 import os
 import re
 import subprocess
+import sys
 import tarfile
 import threading
 import time
@@ -717,6 +718,33 @@ def skip_in_debug_build(reason: str):
     )


+def run_only_on_linux_kernel_higher_than(version: float, reason: str):
+    """
+    Skip tests if the Linux kernel version is lower than the specified version.
+    The version is specified as a float, e.g. 5.4 for kernel 5.4.
+
+    Also skips if the host is not Linux.
+    """
+
+    should_skip = False
+    if sys.platform != "linux":
+        should_skip = True
+    else:
+        try:
+            kernel_version_list = os.uname()[2].split("-")[0].split(".")
+            kernel_version = float(f"{kernel_version_list[0]}.{kernel_version_list[1]}")
+            if kernel_version < version:
+                should_skip = True
+        except ValueError:
+            log.error(f"Failed to parse kernel version: {os.uname()[2]}")
+            should_skip = True
+
+    return pytest.mark.skipif(
+        should_skip,
+        reason=reason,
+    )
+
+
 def skip_on_ci(reason: str):
     # `CI` variable is always set to `true` on GitHub
     # Ref: https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/store-information-in-variables#default-environment-variables
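A usage sketch of the new marker, mirroring the profiling tests added below. One caveat worth noting: versions are compared as floats, so 6.10 parses as 6.1 and the helper is only reliable for minor versions below 10.

    # Illustrative only: skip unless the kernel is new enough for eBPF profiling.
    @run_only_on_linux_kernel_higher_than(version=6.0, reason="test requires eBPF support")
    def test_needs_recent_kernel():
        ...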
0    test_runner/regress/data/__init__.py    Normal file
41   test_runner/regress/data/profile_pb2.py    Normal file
@@ -0,0 +1,41 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# type: ignore
# source: profile.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder

# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rprofile.proto\x12\x12perftools.profiles\"\xd5\x03\n\x07Profile\x12\x32\n\x0bsample_type\x18\x01 \x03(\x0b\x32\x1d.perftools.profiles.ValueType\x12*\n\x06sample\x18\x02 \x03(\x0b\x32\x1a.perftools.profiles.Sample\x12,\n\x07mapping\x18\x03 \x03(\x0b\x32\x1b.perftools.profiles.Mapping\x12.\n\x08location\x18\x04 \x03(\x0b\x32\x1c.perftools.profiles.Location\x12.\n\x08\x66unction\x18\x05 \x03(\x0b\x32\x1c.perftools.profiles.Function\x12\x14\n\x0cstring_table\x18\x06 \x03(\t\x12\x13\n\x0b\x64rop_frames\x18\x07 \x01(\x03\x12\x13\n\x0bkeep_frames\x18\x08 \x01(\x03\x12\x12\n\ntime_nanos\x18\t \x01(\x03\x12\x16\n\x0e\x64uration_nanos\x18\n \x01(\x03\x12\x32\n\x0bperiod_type\x18\x0b \x01(\x0b\x32\x1d.perftools.profiles.ValueType\x12\x0e\n\x06period\x18\x0c \x01(\x03\x12\x0f\n\x07\x63omment\x18\r \x03(\x03\x12\x1b\n\x13\x64\x65\x66\x61ult_sample_type\x18\x0e \x01(\x03\"\'\n\tValueType\x12\x0c\n\x04type\x18\x01 \x01(\x03\x12\x0c\n\x04unit\x18\x02 \x01(\x03\"V\n\x06Sample\x12\x13\n\x0blocation_id\x18\x01 \x03(\x04\x12\r\n\x05value\x18\x02 \x03(\x03\x12(\n\x05label\x18\x03 \x03(\x0b\x32\x19.perftools.profiles.Label\"@\n\x05Label\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\x0b\n\x03str\x18\x02 \x01(\x03\x12\x0b\n\x03num\x18\x03 \x01(\x03\x12\x10\n\x08num_unit\x18\x04 \x01(\x03\"\xdd\x01\n\x07Mapping\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x14\n\x0cmemory_start\x18\x02 \x01(\x04\x12\x14\n\x0cmemory_limit\x18\x03 \x01(\x04\x12\x13\n\x0b\x66ile_offset\x18\x04 \x01(\x04\x12\x10\n\x08\x66ilename\x18\x05 \x01(\x03\x12\x10\n\x08\x62uild_id\x18\x06 \x01(\x03\x12\x15\n\rhas_functions\x18\x07 \x01(\x08\x12\x15\n\rhas_filenames\x18\x08 \x01(\x08\x12\x18\n\x10has_line_numbers\x18\t \x01(\x08\x12\x19\n\x11has_inline_frames\x18\n \x01(\x08\"v\n\x08Location\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x12\n\nmapping_id\x18\x02 \x01(\x04\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\x04\x12&\n\x04line\x18\x04 \x03(\x0b\x32\x18.perftools.profiles.Line\x12\x11\n\tis_folded\x18\x05 \x01(\x08\")\n\x04Line\x12\x13\n\x0b\x66unction_id\x18\x01 \x01(\x04\x12\x0c\n\x04line\x18\x02 \x01(\x03\"_\n\x08\x46unction\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\x03\x12\x13\n\x0bsystem_name\x18\x03 \x01(\x03\x12\x10\n\x08\x66ilename\x18\x04 \x01(\x03\x12\x12\n\nstart_line\x18\x05 \x01(\x03\x42-\n\x1d\x63om.google.perftools.profilesB\x0cProfileProtob\x06proto3')

_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'profile_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:

  DESCRIPTOR._options = None
  DESCRIPTOR._serialized_options = b'\n\035com.google.perftools.profilesB\014ProfileProto'
  _PROFILE._serialized_start=38
  _PROFILE._serialized_end=507
  _VALUETYPE._serialized_start=509
  _VALUETYPE._serialized_end=548
  _SAMPLE._serialized_start=550
  _SAMPLE._serialized_end=636
  _LABEL._serialized_start=638
  _LABEL._serialized_end=702
  _MAPPING._serialized_start=705
  _MAPPING._serialized_end=926
  _LOCATION._serialized_start=928
  _LOCATION._serialized_end=1046
  _LINE._serialized_start=1048
  _LINE._serialized_end=1089
  _FUNCTION._serialized_start=1091
  _FUNCTION._serialized_end=1186
# @@protoc_insertion_point(module_scope)
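The generated bindings are what the new test below uses to decode /profile/cpu responses; a minimal decoding sketch, with pprof_bytes standing in for the body of a 200 response:

    # Illustrative only: parse a serialized pprof payload with the bindings above.
    from data.profile_pb2 import Profile  # type: ignore

    profile = Profile()
    profile.ParseFromString(pprof_bytes)
    print(f"{len(profile.sample)} samples, {len(profile.function)} functions")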
322    test_runner/regress/test_compute_profiling.py    Normal file
@@ -0,0 +1,322 @@
from __future__ import annotations

import gzip
import io
import threading
import time
from typing import TYPE_CHECKING, Any

import pytest
from data.profile_pb2 import Profile  # type: ignore
from fixtures.log_helper import log
from fixtures.utils import run_only_on_default_postgres, run_only_on_linux_kernel_higher_than
from google.protobuf.message import Message
from requests import HTTPError

if TYPE_CHECKING:
    from fixtures.endpoint.http import EndpointHttpClient
    from fixtures.neon_fixtures import NeonEnv

LINUX_VERSION_REQUIRED = 6.0
PG_REASON = "test doesn't use postgres in a manner that requires any specific version"
LINUX_REASON = f"test requires linux {LINUX_VERSION_REQUIRED} for ebpfs"


def _start_profiling_cpu(
    client: EndpointHttpClient, event: threading.Event | None, archive: bool = False
) -> Profile | None:
    """
    Start CPU profiling for the compute node.
    """
    log.info("Starting CPU profiling...")

    if event is not None:
        event.set()

    status, response = client.start_profiling_cpu(100, 30, archive=archive)
    match status:
        case 200:
            log.debug("CPU profiling finished")
            profile: Any = Profile()
            if archive:
                log.debug("Unpacking gzipped profile data")
                with gzip.GzipFile(fileobj=io.BytesIO(response)) as f:
                    response = f.read()
            else:
                log.debug("Unpacking non-gzipped profile data")
            Message.ParseFromString(profile, response)
            return profile
        case 204:
            log.debug("CPU profiling was stopped")
            raise HTTPError("Failed to finish CPU profiling: was stopped.")
        case 409:
            log.debug("CPU profiling is already in progress, cannot start again")
            raise HTTPError("Failed to finish CPU profiling: profiling is already in progress.")
        case 500:
            response_str = response.decode("utf-8", errors="replace")
            log.debug(
                f"Failed to finish CPU profiling, was stopped or got an error: {status}: {response_str}"
            )
            raise HTTPError(f"Failed to finish CPU profiling, {status}: {response_str}")
        case _:
            log.error(f"Failed to start CPU profiling: {status}")
            raise HTTPError(f"Failed to start CPU profiling: {status}")


def _stop_profiling_cpu(client: EndpointHttpClient, event: threading.Event | None):
    """
    Stop CPU profiling for the compute node.
    """
    log.info("Manually stopping CPU profiling...")

    if event is not None:
        event.set()

    status = client.stop_profiling_cpu()
    match status:
        case 200:
            log.debug("CPU profiling stopped successfully")
        case 412:
            log.debug("CPU profiling is not running, nothing to do")
        case _:
            log.error(f"Failed to stop CPU profiling: {status}")
            raise HTTPError(f"Failed to stop CPU profiling: {status}")
    return status


def _wait_till_profiling_starts(
    http_client: EndpointHttpClient,
    event: threading.Event | None,
    repeat_delay_secs: float = 0.3,
    timeout: int = 60,
) -> bool:
    """
    Wait until CPU profiling starts.
    """
    log.info("Waiting for CPU profiling to start...")

    end_time = time.time() + timeout

    while not http_client.get_profiling_cpu_status():
        time.sleep(repeat_delay_secs)

        if end_time <= time.time():
            log.error("Timeout while waiting for CPU profiling to start")
            return False

    log.info("CPU profiling has started successfully and is in progress")

    if event is not None:
        event.set()

    return True


def _wait_and_assert_cpu_profiling(http_client: EndpointHttpClient, event: threading.Event | None):
    profile = _start_profiling_cpu(http_client, event)

    if profile is None:
        log.error("The received profiling data is malformed or empty.")
        return

    assert len(profile.sample) > 0, "No samples found in CPU profiling data"
    assert len(profile.mapping) > 0, "No mappings found in CPU profiling data"
    assert len(profile.location) > 0, "No locations found in CPU profiling data"
    assert len(profile.function) > 0, "No functions found in CPU profiling data"
    assert len(profile.string_table) > 0, "No string tables found in CPU profiling data"
    strings = [
        "PostgresMain",
        "ServerLoop",
        "BackgroundWorkerMain",
        "pq_recvbuf",
        "pq_getbyte",
    ]

    assert any(s in profile.string_table for s in strings), (
        f"Expected at least one of {strings} in string table, but none found"
    )


@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_with_timeout(neon_simple_env: NeonEnv):
    """
    Test that CPU profiling works correctly with timeout.
    """
    env = neon_simple_env
    endpoint = env.endpoints.create_start("main")
    pg_conn = endpoint.connect()
    pg_cur = pg_conn.cursor()
    pg_cur.execute("create database profiling_test2")
    http_client = endpoint.http_client()

    def _wait_and_assert_cpu_profiling_local():
        _wait_and_assert_cpu_profiling(http_client, None)

    thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local)
    thread.start()

    inserting_should_stop_event = threading.Event()

    def insert_rows():
        lfc_conn = endpoint.connect(dbname="profiling_test2")
        lfc_cur = lfc_conn.cursor()
        n_records = 0
        lfc_cur.execute(
            "create table t(pk integer primary key, payload text default repeat('?', 128))"
        )
        batch_size = 1000

        log.info("Inserting rows")

        while not inserting_should_stop_event.is_set():
            n_records += batch_size
            lfc_cur.execute(
f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))"
|
||||
)
|
||||
|
||||
log.info(f"Inserted {n_records} rows")
|
||||
|
||||
thread2 = threading.Thread(target=insert_rows)
|
||||
|
||||
assert _wait_till_profiling_starts(http_client, None)
|
||||
time.sleep(4) # Give some time for the profiling to start
|
||||
thread2.start()
|
||||
|
||||
thread.join(timeout=60)
|
||||
inserting_should_stop_event.set() # Stop the insertion thread
|
||||
thread2.join(timeout=60)
|
||||
|
||||
|
||||
@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_with_archiving_the_response(neon_simple_env: NeonEnv):
    """
    Test that CPU profiling works correctly with archiving the data.
    """
    env = neon_simple_env
    endpoint = env.endpoints.create_start("main")
    http_client = endpoint.http_client()

    assert _start_profiling_cpu(http_client, None, archive=True) is not None, (
        "Failed to start and finish CPU profiling with archiving enabled"
    )


@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_start_and_stop(neon_simple_env: NeonEnv):
    """
    Test that CPU profiling can be started and stopped correctly.
    """
    env = neon_simple_env
    endpoint = env.endpoints.create_start("main")
    http_client = endpoint.http_client()

    def _wait_and_assert_cpu_profiling():
        # Should raise as the profiling will be stopped.
        with pytest.raises(HTTPError) as _:
            _start_profiling_cpu(http_client, None)

    thread = threading.Thread(target=_wait_and_assert_cpu_profiling)
    thread.start()

    assert _wait_till_profiling_starts(http_client, None)
    _stop_profiling_cpu(http_client, None)

    # Should return 412 as the profiling is already stopped.
    assert _stop_profiling_cpu(http_client, None) == 412

    thread.join(timeout=60)


@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_conflict(neon_simple_env: NeonEnv):
    """
    Test that CPU profiling can be started only once: a second start
    attempt while it is already running throws an error.
"""
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
pg_conn = endpoint.connect()
|
||||
pg_cur = pg_conn.cursor()
|
||||
pg_cur.execute("create database profiling_test")
|
||||
http_client = endpoint.http_client()
|
||||
|
||||
def _wait_and_assert_cpu_profiling_local():
|
||||
_wait_and_assert_cpu_profiling(http_client, None)
|
||||
|
||||
thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local)
|
||||
thread.start()
|
||||
|
||||
assert _wait_till_profiling_starts(http_client, None)
|
||||
|
||||
inserting_should_stop_event = threading.Event()
|
||||
|
||||
def insert_rows():
|
||||
lfc_conn = endpoint.connect(dbname="profiling_test")
|
||||
lfc_cur = lfc_conn.cursor()
|
||||
n_records = 0
|
||||
lfc_cur.execute(
|
||||
"create table t(pk integer primary key, payload text default repeat('?', 128))"
|
||||
)
|
||||
batch_size = 1000
|
||||
|
||||
log.info("Inserting rows")
|
||||
|
||||
while not inserting_should_stop_event.is_set():
|
||||
n_records += batch_size
|
||||
lfc_cur.execute(
|
||||
f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))"
|
||||
)
|
||||
|
||||
log.info(f"Inserted {n_records} rows")
|
||||
|
||||
thread2 = threading.Thread(target=insert_rows)
|
||||
thread2.start()
|
||||
|
||||
# Should raise as the profiling is already in progress.
|
||||
with pytest.raises(HTTPError) as _:
|
||||
_start_profiling_cpu(http_client, None)
|
||||
|
||||
inserting_should_stop_event.set() # Stop the insertion thread
|
||||
|
||||

    # The profiling should still be running and finish normally.
    thread.join(timeout=600)
    thread2.join(timeout=600)


@run_only_on_default_postgres(reason=PG_REASON)
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
def test_compute_profiling_cpu_stop_when_not_running(neon_simple_env: NeonEnv):
    """
    Test that CPU profiling returns the expected error when an attempt
    is made to stop it while it isn't running.
"""
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
http_client = endpoint.http_client()
|
||||
|
||||
for _ in range(3):
|
||||
status = _stop_profiling_cpu(http_client, None)
|
||||
assert status == 412
|
||||
|
||||
|
||||
@run_only_on_default_postgres(reason=PG_REASON)
|
||||
@run_only_on_linux_kernel_higher_than(version=LINUX_VERSION_REQUIRED, reason=LINUX_REASON)
|
||||
def test_compute_profiling_cpu_start_arguments_validation_works(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
    Test that the CPU profiling start request properly validates its
    arguments and returns the expected error (422, unprocessable entity).
"""
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
http_client = endpoint.http_client()
|
||||
|
||||
for sampling_frequency in [-1, 0, 1000000]:
|
||||
status, _ = http_client.start_profiling_cpu(sampling_frequency, 5)
|
||||
assert status == 422
|
||||
for timeout_seconds in [-1, 0, 1000000]:
|
||||
status, _ = http_client.start_profiling_cpu(5, timeout_seconds)
|
||||
assert status == 422
|
||||
@@ -28,6 +28,7 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
const-oid = { version = "0.9", default-features = false, features = ["db", "std"] }
criterion = { version = "0.5", features = ["html_reports"] }
crypto-bigint = { version = "0.5", features = ["generic-array", "zeroize"] }
der = { version = "0.7", default-features = false, features = ["derive", "flagset", "oid", "pem", "std"] }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }

@@ -54,7 +55,8 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] }
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
indexmap = { version = "2", features = ["serde"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.12" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }

@@ -70,7 +72,6 @@ num-integer = { version = "0.1", features = ["i128"] }
num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
p256 = { version = "0.13", features = ["jwk"] }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }

@@ -89,6 +90,7 @@ serde_json = { version = "1", features = ["alloc", "raw_value"] }
sha2 = { version = "0.10", features = ["asm", "oid"] }
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
spin = { version = "0.9" }
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
stable_deref_trait = { version = "1" }
subtle = { version = "2" }

@@ -101,6 +103,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["loggin
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io-util", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tonic = { version = "0.12", default-features = false, features = ["codegen", "prost", "transport"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }

@@ -120,13 +123,15 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
clang-sys = { version = "1", default-features = false, features = ["clang_11_0", "runtime"] }
clap = { version = "4", features = ["derive", "env", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] }
either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", features = ["raw"] }
indexmap = { version = "2", features = ["serde"] }
indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] }
indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
itertools = { version = "0.12" }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }

@@ -139,7 +144,6 @@ num-integer = { version = "0.1", features = ["i128"] }
num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] }
num-rational = { version = "0.4", default-features = false, features = ["num-bigint-std", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
prettyplease = { version = "0.2", default-features = false, features = ["verbatim"] }
proc-macro2 = { version = "1" }