From d8b0c0834e1b6e850827e6fc34d9fd1c6087ab7b Mon Sep 17 00:00:00 2001
From: Victor Polevoy
Date: Tue, 17 Jun 2025 14:19:29 +0200
Subject: [PATCH] Implement HTTP endpoint for compute profiling.

Exposes a "/profile/cpu" endpoint for profiling the Postgres processes (both the ones already spawned and any started later) with "perf" or the bcc "profile" tool. Adds a Python regression test that exercises the new endpoint and verifies that the returned data is a valid pprof profile, and adds the "perf" binary to the compute's sudoers list.

Squashed fixup commits: Fix python poetry ruff Address the clippy lints Document the code Format python code Address code review Prettify Embed profile_pb2.py and small code/test fixes. Make the code slightly better: 1. Make the sampling_frequency parameter for profiling optional. 2. Avoid unsafe code when killing a child process. Better code, better tests More tests Separate start and stop of profiling Correctly check for the exceptions Address clippy lint Final fixes: 1. Allow perf to be found in $PATH instead of hardcoding its path. 2. Change the path to perf in the sudoers file so that the compute can run it properly. 3. Invoke perf with sudo using the path from $PATH. 4. Remove the authentication requirement from the /profile/cpu endpoint. hakari thing Python fixes Fix python formatting More python fixes Update poetry lock Fix ruff Address the review comments Fix the tests Try fixing the flaky test for pg17 Remove the PROGRESS parameter Remove unused code Increase the timeout due to concurrency Increase the timeout to 60 Increase the profiling window timeout Log all the errors Add perf into the build environment Update tempfile to 3.20 Use bcc-profile Provide bpfcc-tools in debian Properly respond with status Python check Fix build-tools dockerfile Add path probing for the bcc profile Try printing errors Refactor Add bpfcc-tools to the final image Add error context Handle missing sudo
Print more errors for verbosity Remove procfs and use libproc Update hakari Debug sudo in CI Rebase and adjust hakari remove leftover Add archiving support Correct the paths to the perf binary Try hardcoded sudo path Add sudo into build-tools dockerfile Minor cleanup Print out the sudoers file from github Stop the tests earlier Add the sudoers entry for nonroot, install kmod for modprobe for bcc-profile Try hacking the kernel headers for bcc-profile Redeclare the kernel version argument Try using the kernel of the runner Try another way Check bpfcc-tools --- Cargo.lock | 229 ++++- Cargo.toml | 2 + Dockerfile | 2 + build-tools.Dockerfile | 45 +- compute/vm-image-spec-bookworm.yaml | 4 +- compute/vm-image-spec-bullseye.yaml | 4 +- compute_tools/Cargo.toml | 9 + compute_tools/src/compute.rs | 4 +- compute_tools/src/http/routes/mod.rs | 1 + compute_tools/src/http/routes/profile.rs | 217 +++++ compute_tools/src/http/server.rs | 11 +- compute_tools/src/lib.rs | 1 + compute_tools/src/profiling/mod.rs | 862 ++++++++++++++++++ libs/neon-shmem/Cargo.toml | 4 +- poetry.lock | 35 +- pyproject.toml | 4 + test_runner/fixtures/endpoint/http.py | 33 + test_runner/regress/data/__init__.py | 0 test_runner/regress/data/profile_pb2.py | 41 + test_runner/regress/test_compute_profiling.py | 314 +++++++ workspace_hack/Cargo.toml | 1 + 21 files changed, 1795 insertions(+), 28 deletions(-) create mode 100644 compute_tools/src/http/routes/profile.rs create mode 100644 compute_tools/src/profiling/mod.rs create mode 100644 test_runner/regress/data/__init__.py create mode 100644 test_runner/regress/data/profile_pb2.py create mode 100644 test_runner/regress/test_compute_profiling.py diff --git a/Cargo.lock b/Cargo.lock index c528354053..65dcd5c3c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -962,6 +962,24 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" +dependencies = [ + "bitflags 2.8.0", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "proc-macro2", + "quote", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.100", +] + [[package]] name = "bindgen" version = "0.71.1" @@ -1336,8 +1354,10 @@ dependencies = [ "hostname-validator", "http 1.1.0", "indexmap 2.9.0", + "inferno 0.12.0", "itertools 0.10.5", "jsonwebtoken", + "libproc", "metrics", "nix 0.30.1", "notify", @@ -1351,6 +1371,8 @@ dependencies = [ "postgres-types", "postgres_initdb", "postgres_versioninfo", + "pprof 0.15.0", + "prost 0.12.6", "regex", "remote_storage", "reqwest", @@ -1362,6 +1384,7 @@ dependencies = [ "serde_with", "signal-hook", "tar", + "tempfile", "thiserror 1.0.69", "tokio", "tokio-postgres", @@ -2198,12 +2221,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.8" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2533,6 +2556,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", 
+ "wasi 0.14.2+wasi-0.2.4", +] + [[package]] name = "gettid" version = "0.1.3" @@ -2790,6 +2825,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "hostname" version = "0.4.0" @@ -2899,7 +2943,7 @@ dependencies = [ "jsonwebtoken", "metrics", "once_cell", - "pprof", + "pprof 0.14.0", "regex", "routerify", "rustls 0.23.27", @@ -3561,7 +3605,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" dependencies = [ - "spin", + "spin 0.9.8", ] [[package]] @@ -3586,6 +3630,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libproc" +version = "0.14.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78a09b56be5adbcad5aa1197371688dc6bb249a26da3bca2011ee2fb987ebfb" +dependencies = [ + "bindgen 0.70.1", + "errno", + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -3598,6 +3653,12 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0b5399f6804fbab912acbd8878ed3532d506b7c951b8f9f164ef90fef39e3f4" +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "litemap" version = "0.7.4" @@ -4390,7 +4451,7 @@ dependencies = [ "postgres_ffi_types", "postgres_initdb", "posthog_client_lite", - "pprof", + "pprof 0.14.0", "pq_proto", "procfs", "rand 0.8.5", @@ -4965,7 +5026,7 @@ name = "postgres_ffi" version = "0.1.0" dependencies = [ "anyhow", - "bindgen", + "bindgen 0.71.1", "bytes", "crc32c", "criterion", @@ -4976,7 +5037,7 @@ dependencies = [ "postgres", "postgres_ffi_types", "postgres_versioninfo", - "pprof", + "pprof 0.14.0", "regex", "serde", "thiserror 1.0.69", @@ -5066,6 +5127,30 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pprof" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187" +dependencies = [ + "aligned-vec", + "backtrace", + "cfg-if", + "findshlibs", + "inferno 0.11.21", + "libc", + "log", + "nix 0.26.4", + "once_cell", + "protobuf", + "protobuf-codegen", + "smallvec", + "spin 0.10.0", + "symbolic-demangle", + "tempfile", + "thiserror 2.0.11", +] + [[package]] name = "pprof_util" version = "0.7.0" @@ -5141,7 +5226,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix", + "rustix 0.38.41", ] [[package]] @@ -5277,6 +5362,57 @@ dependencies = [ "prost 0.13.5", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-codegen" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace" +dependencies = [ + "anyhow", + "once_cell", + "protobuf", + "protobuf-parse", + 
"regex", + "tempfile", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-parse" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973" +dependencies = [ + "anyhow", + "indexmap 2.9.0", + "log", + "protobuf", + "protobuf-support", + "tempfile", + "thiserror 1.0.69", + "which", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "proxy" version = "0.1.0" @@ -5448,6 +5584,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "rand" version = "0.7.3" @@ -6027,6 +6169,19 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustix" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +dependencies = [ + "bitflags 2.8.0", + "errno", + "libc", + "linux-raw-sys 0.9.4", + "windows-sys 0.59.0", +] + [[package]] name = "rustls" version = "0.21.12" @@ -6209,7 +6364,7 @@ dependencies = [ "postgres_backend", "postgres_ffi", "postgres_versioninfo", - "pprof", + "pprof 0.14.0", "pq_proto", "rand 0.8.5", "regex", @@ -6798,6 +6953,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spin" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" +dependencies = [ + "lock_api", +] + [[package]] name = "spinning_top" version = "0.3.0" @@ -7157,14 +7321,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.14.0" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ - "cfg-if", "fastrand 2.2.0", + "getrandom 0.3.3", "once_cell", - "rustix", + "rustix 1.0.7", "windows-sys 0.59.0", ] @@ -8177,7 +8341,7 @@ dependencies = [ "pem", "pin-project-lite", "postgres_connection", - "pprof", + "pprof 0.14.0", "pq_proto", "rand 0.8.5", "regex", @@ -8284,7 +8448,7 @@ dependencies = [ "pageserver_api", "postgres_ffi", "postgres_ffi_types", - "pprof", + "pprof 0.14.0", "prost 0.13.5", "remote_storage", "serde", @@ -8314,7 +8478,7 @@ name = "walproposer" version = "0.1.0" dependencies = [ "anyhow", - "bindgen", + "bindgen 0.71.1", "postgres_ffi", "utils", ] @@ -8341,6 +8505,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasite" version = "0.1.0" @@ -8470,6 +8643,18 @@ dependencies = [ "rustls-pki-types", ] +[[package]] 
+name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.41", +] + [[package]] name = "whoami" version = "1.5.1" @@ -8698,6 +8883,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "workspace_hack" version = "0.1.0" @@ -8713,6 +8907,7 @@ dependencies = [ "camino", "cc", "chrono", + "clang-sys", "clap", "clap_builder", "const-oid", diff --git a/Cargo.toml b/Cargo.toml index 0d521ee4d9..5fdf8e9a3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,7 @@ jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] } jsonwebtoken = "9" lasso = "0.7" libc = "0.2" +libproc = "0.14" md5 = "0.7.0" measured = { version = "0.0.22", features=["lasso"] } measured-process = { version = "0.0.22" } @@ -278,6 +279,7 @@ safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" } safekeeper_client = { path = "./safekeeper/client" } storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy. storage_controller_client = { path = "./storage_controller/client" } +tempfile = "3" tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" } tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" } utils = { version = "0.1", path = "./libs/utils/" } diff --git a/Dockerfile b/Dockerfile index 55b87d4012..231d0aef1d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -109,6 +109,8 @@ RUN set -e \ libreadline-dev \ libseccomp-dev \ ca-certificates \ + bpfcc-tools \ + sudo \ openssl \ unzip \ curl \ diff --git a/build-tools.Dockerfile b/build-tools.Dockerfile index 14a52bd736..ddc32b2c9c 100644 --- a/build-tools.Dockerfile +++ b/build-tools.Dockerfile @@ -61,6 +61,9 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \ libpq5 \ libpq-dev \ libzstd-dev \ + linux-perf \ + bpfcc-tools \ + linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \ postgresql-16 \ postgresql-server-dev-16 \ postgresql-common \ @@ -105,15 +108,21 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \ # # 'gdb' is included so that we get backtraces of core dumps produced in # regression tests -RUN set -e \ +RUN set -ex \ + && KERNEL_VERSION="$(uname -r | cut -d'-' -f1 | sed 's/\.0$//')" \ + && echo KERNEL_VERSION=${KERNEL_VERSION} >> /etc/environment \ + && KERNEL_ARCH=$(uname -m | awk '{ if ($1 ~ /^(x86_64|i[3-6]86)$/) print "x86"; else if ($1 ~ /^(aarch64|arm.*)$/) print "aarch"; else print $1 }') \ + && echo KERNEL_ARCH=${KERNEL_ARCH} >> /etc/environment \ && apt update \ && apt install -y \ autoconf \ automake \ + bc \ bison \ build-essential \ ca-certificates \ cmake \ + cpio \ curl \ flex \ gdb \ @@ -122,8 +131,10 @@ RUN set -e \ gzip \ jq \ jsonnet \ + kmod \ libcurl4-openssl-dev \ libbz2-dev \ + libelf-dev \ libffi-dev \ liblzma-dev \ libncurses5-dev \ @@ -137,6 +148,11 @@ RUN set -e \ libxml2-dev \ libxmlsec1-dev \ libxxhash-dev \ + linux-perf \ + bpfcc-tools \ + libbpfcc \ + libbpfcc-dev \ + linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \ lsof \ make 
\ netcat-openbsd \ @@ -144,12 +160,35 @@ RUN set -e \ openssh-client \ parallel \ pkg-config \ + rsync \ + sudo \ unzip \ wget \ xz-utils \ zlib1g-dev \ zstd \ - && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \ + && git clone --depth 1 --branch v${KERNEL_VERSION} https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git \ + && cd linux \ + && make mrproper \ + && make defconfig \ + && scripts/config --module CONFIG_IKHEADERS \ + && scripts/config --enable CONFIG_MODULE_COMPRESS \ + && scripts/config --disable CONFIG_MODULE_COMPRESS_GZIP \ + && scripts/config --enable CONFIG_MODULE_COMPRESS_ZSTD \ + && make olddefconfig \ + && make modules_prepare -j \ + && make WERROR=0 NO_WERROR=1 -j 10 \ + && make WERROR=0 NO_WERROR=1 modules -j10 \ + && mkdir -p /lib/modules/$(uname -r)/build \ + && mkdir -p /lib/modules/$(uname -r)/kernel/kernel \ + && cp -a include arch/${KERNEL_ARCH}/include scripts Module.symvers .config Makefile /lib/modules/$(uname -r)/build/ \ + && make headers_install INSTALL_HDR_PATH=/lib/modules/$(uname -r)/build \ + && cp -a arch/${KERNEL_ARCH}/include /lib/modules/$(uname -r)/build/arch/${KERNEL_ARCH}/include \ + && zstd -19 ./kernel/kheaders.ko -o ./kernel/kheaders.ko.zst \ + && cp -a kernel/kheaders.ko.zst /lib/modules/$(uname -r)/kernel/kernel \ + && execsnoop-bpfcc \ + && rm -rf linux # sql_exporter @@ -198,6 +237,8 @@ RUN curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o / # Configure sudo & docker RUN usermod -aG sudo nonroot && \ echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers && \ + mkdir -p /etc/sudoers.d && \ + echo 'nonroot ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/nonroot && \ usermod -aG docker nonroot # AWS CLI diff --git a/compute/vm-image-spec-bookworm.yaml b/compute/vm-image-spec-bookworm.yaml index 267e4c83b5..3372293f5d 100644 --- a/compute/vm-image-spec-bookworm.yaml +++ b/compute/vm-image-spec-bookworm.yaml @@ -65,7 +65,7 @@ files: # regardless of hostname (ALL) # # Also allow it to shut down the VM. The fast_import job does that when it's finished. - postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd + postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc - filename: cgconfig.conf content: | # Configuration for cgroups in VM compute nodes @@ -152,6 +152,8 @@ merge: | RUN set -e \ && chmod 0644 /etc/cgconfig.conf + ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf + COPY compute_rsyslog.conf /etc/compute_rsyslog.conf RUN chmod 0666 /etc/compute_rsyslog.conf diff --git a/compute/vm-image-spec-bullseye.yaml b/compute/vm-image-spec-bullseye.yaml index 2b6e77b656..4e5a0d3ca6 100644 --- a/compute/vm-image-spec-bullseye.yaml +++ b/compute/vm-image-spec-bullseye.yaml @@ -65,7 +65,7 @@ files: # regardless of hostname (ALL) # # Also allow it to shut down the VM. The fast_import job does that when it's finished. 
- postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd + postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc - filename: cgconfig.conf content: | # Configuration for cgroups in VM compute nodes @@ -148,6 +148,8 @@ merge: | RUN set -e \ && chmod 0644 /etc/cgconfig.conf + ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf + COPY compute_rsyslog.conf /etc/compute_rsyslog.conf RUN chmod 0666 /etc/compute_rsyslog.conf RUN mkdir /var/log/rsyslog && chown -R postgres /var/log/rsyslog diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 910bae3bda..b0261c3227 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -31,6 +31,7 @@ hostname-validator = "1.1" indexmap.workspace = true itertools.workspace = true jsonwebtoken.workspace = true +libproc.workspace = true metrics.workspace = true nix.workspace = true notify.workspace = true @@ -49,6 +50,7 @@ serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true tar.workspace = true +tempfile.workspace = true tower.workspace = true tower-http.workspace = true tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } @@ -78,3 +80,10 @@ zstd = "0.13" bytes = "1.0" rust-ini = "0.20.0" rlimit = "0.10.1" + +inferno = { version = "0.12", default-features = false, features = [ + "multithreaded", + "nameattr", +] } +pprof = { version = "0.15", features = ["protobuf-codec", "flamegraph"] } +prost = "0.12" diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c05cc229a2..6bac941892 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -371,7 +371,9 @@ fn maybe_cgexec(cmd: &str) -> Command { } } -struct PostgresHandle { +/// A handle to the Postgres process that is running in the compute +/// node. +pub struct PostgresHandle { postgres: std::process::Child, log_collector: JoinHandle>, } diff --git a/compute_tools/src/http/routes/mod.rs b/compute_tools/src/http/routes/mod.rs index dd71f663eb..4458714eef 100644 --- a/compute_tools/src/http/routes/mod.rs +++ b/compute_tools/src/http/routes/mod.rs @@ -15,6 +15,7 @@ pub(in crate::http) mod lfc; pub(in crate::http) mod metrics; pub(in crate::http) mod metrics_json; pub(in crate::http) mod promote; +pub(in crate::http) mod profile; pub(in crate::http) mod status; pub(in crate::http) mod terminate; diff --git a/compute_tools/src/http/routes/profile.rs b/compute_tools/src/http/routes/profile.rs new file mode 100644 index 0000000000..0aed00f22a --- /dev/null +++ b/compute_tools/src/http/routes/profile.rs @@ -0,0 +1,217 @@ +//! Contains the route for profiling the compute. +//! +//! Profiling the compute means generating a pprof profile of the +//! postgres processes. +//! +//! The profiling is done using the `perf` tool, which is expected to be +//! available somewhere in `$PATH`. 
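+//!
+//! As an illustrative sketch (not part of the handlers below; the host and
+//! port are placeholders for the compute's external HTTP address), a
+//! profiling session could be driven like this:
+//!
+//! ```text
+//! # start a 5 second perf profile at the default 100 Hz and save the pprof output
+//! curl -X POST http://localhost:<external-http-port>/profile/cpu \
+//!   -H 'Content-Type: application/json' \
+//!   -d '{"profiler": {"Perf": null}, "sampling_frequency": 100, "timeout_seconds": 5, "archive": false}' \
+//!   -o profile.pb
+//!
+//! # check whether profiling is in progress (200 = running, 204 = idle)
+//! curl -i http://localhost:<external-http-port>/profile/cpu
+//!
+//! # stop a running session early
+//! curl -X DELETE http://localhost:<external-http-port>/profile/cpu
+//! ```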
+use std::sync::atomic::Ordering; + +use axum::Json; +use axum::response::IntoResponse; +use http::StatusCode; +use nix::unistd::Pid; +use once_cell::sync::Lazy; +use tokio::sync::Mutex; + +use crate::http::JsonResponse; + +static CANCEL_CHANNEL: Lazy>>> = + Lazy::new(|| Mutex::new(None)); + +fn default_sampling_frequency() -> u16 { + 100 +} + +fn default_timeout_seconds() -> u8 { + 5 +} + +fn deserialize_sampling_frequency<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + use serde::Deserialize; + + const MIN_SAMPLING_FREQUENCY: u16 = 1; + const MAX_SAMPLING_FREQUENCY: u16 = 1000; + + let value = u16::deserialize(deserializer)?; + + if !(MIN_SAMPLING_FREQUENCY..=MAX_SAMPLING_FREQUENCY).contains(&value) { + return Err(serde::de::Error::custom(format!( + "sampling_frequency must be between {MIN_SAMPLING_FREQUENCY} and {MAX_SAMPLING_FREQUENCY}, got {value}" + ))); + } + Ok(value) +} + +fn deserialize_profiling_timeout<'de, D>(deserializer: D) -> Result +where + D: serde::Deserializer<'de>, +{ + use serde::Deserialize; + + const MIN_TIMEOUT_SECONDS: u8 = 1; + const MAX_TIMEOUT_SECONDS: u8 = 60; + + let value = u8::deserialize(deserializer)?; + + if !(MIN_TIMEOUT_SECONDS..=MAX_TIMEOUT_SECONDS).contains(&value) { + return Err(serde::de::Error::custom(format!( + "timeout_seconds must be between {MIN_TIMEOUT_SECONDS} and {MAX_TIMEOUT_SECONDS}, got {value}" + ))); + } + Ok(value) +} + +/// Request parameters for profiling the compute. +#[derive(Debug, Clone, serde::Deserialize)] +pub(in crate::http) struct ProfileRequest { + /// The profiling tool to use, currently only `perf` is supported. + profiler: crate::profiling::ProfileGenerator, + #[serde(default = "default_sampling_frequency")] + #[serde(deserialize_with = "deserialize_sampling_frequency")] + sampling_frequency: u16, + #[serde(default = "default_timeout_seconds")] + #[serde(deserialize_with = "deserialize_profiling_timeout")] + timeout_seconds: u8, + #[serde(default)] + archive: bool, +} + +/// The HTTP request handler for reporting the profiling status of +/// the compute. +pub(in crate::http) async fn profile_status() -> impl IntoResponse { + tracing::info!("Profile status request received."); + + let cancel_channel = CANCEL_CHANNEL.lock().await; + + if let Some(tx) = cancel_channel.as_ref() { + if tx.receiver_count() > 0 { + return JsonResponse::create_response( + StatusCode::OK, + "Profiling is currently in progress.", + ); + } + } + + JsonResponse::create_response(StatusCode::NO_CONTENT, "Profiling is not in progress.") +} + +/// The HTTP request handler for stopping profiling the compute. +pub(in crate::http) async fn profile_stop() -> impl IntoResponse { + tracing::info!("Profile stop request received."); + + match CANCEL_CHANNEL.lock().await.take() { + Some(tx) => { + if tx.send(()).is_err() { + tracing::error!("Failed to send cancellation signal."); + return JsonResponse::create_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to send cancellation signal", + ); + } + JsonResponse::create_response(StatusCode::OK, "Profiling stopped successfully.") + } + None => JsonResponse::create_response( + StatusCode::PRECONDITION_FAILED, + "Profiling is not in progress, there is nothing to stop.", + ), + } +} + +/// The HTTP request handler for starting profiling the compute. 
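+///
+/// On success the response body contains the pprof-encoded profile
+/// (gzip-compressed when `archive` is requested). A `409 Conflict` is
+/// returned if a profiling session is already in progress, `204 No Content`
+/// if the session was cancelled via the DELETE handler while it was
+/// running, and `500 Internal Server Error` if profile generation fails.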
+pub(in crate::http) async fn profile_start( + Json(request): Json, +) -> impl IntoResponse { + tracing::info!("Profile start request received: {request:?}"); + + let tx = tokio::sync::broadcast::Sender::<()>::new(1); + + { + let mut cancel_channel = CANCEL_CHANNEL.lock().await; + + if cancel_channel.is_some() { + return JsonResponse::create_response( + StatusCode::CONFLICT, + "Profiling is already in progress.", + ); + } + *cancel_channel = Some(tx.clone()); + } + + tracing::info!("Profiling will start with parameters: {request:?}"); + let pg_pid = Pid::from_raw(crate::compute::PG_PID.load(Ordering::SeqCst) as _); + + let run_with_sudo = !cfg!(feature = "testing"); + + let options = crate::profiling::ProfileGenerationOptions { + profiler: request.profiler, + run_with_sudo, + pids: [pg_pid].into_iter().collect(), + follow_forks: true, + sampling_frequency: request.sampling_frequency as u32, + blocklist_symbols: vec![ + "libc".to_owned(), + "libgcc".to_owned(), + "pthread".to_owned(), + "vdso".to_owned(), + ], + archive: request.archive, + }; + + let options = crate::profiling::ProfileGenerationTaskOptions { + options, + timeout: std::time::Duration::from_secs(request.timeout_seconds as u64), + should_stop: Some(tx), + }; + + let pprof_data = crate::profiling::generate_pprof_profile(options).await; + + if CANCEL_CHANNEL.lock().await.take().is_none() { + tracing::error!("Profiling was cancelled from another request."); + + return JsonResponse::create_response( + StatusCode::NO_CONTENT, + "Profiling was cancelled from another request.", + ); + } + + let pprof_data = match pprof_data { + Ok(data) => data, + Err(e) => { + tracing::error!(error = ?e, "failed to generate pprof data"); + return JsonResponse::create_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to generate pprof data: {e:?}"), + ); + } + }; + + tracing::info!("Profiling has completed successfully."); + + let mut headers = http::HeaderMap::new(); + + if request.archive { + headers.insert( + http::header::CONTENT_TYPE, + http::HeaderValue::from_static("application/gzip"), + ); + headers.insert( + http::header::CONTENT_DISPOSITION, + http::HeaderValue::from_static("attachment; filename=\"profile.pb.gz\""), + ); + } else { + headers.insert( + http::header::CONTENT_TYPE, + http::HeaderValue::from_static("application/octet-stream"), + ); + headers.insert( + http::header::CONTENT_DISPOSITION, + http::HeaderValue::from_static("attachment; filename=\"profile.pb\""), + ); + } + + (headers, pprof_data.0).into_response() +} diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index 17939e39d4..4f993b6051 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -27,6 +27,7 @@ use super::{ }, }; use crate::compute::ComputeNode; +use crate::http::routes::profile; /// `compute_ctl` has two servers: internal and external. The internal server /// binds to the loopback interface and handles communication from clients on @@ -81,8 +82,14 @@ impl From<&Server> for Router> { Server::External { config, compute_id, .. 
} => { - let unauthenticated_router = - Router::>::new().route("/metrics", get(metrics::get_metrics)); + let unauthenticated_router = Router::>::new() + .route("/metrics", get(metrics::get_metrics)) + .route( + "/profile/cpu", + get(profile::profile_status) + .post(profile::profile_start) + .delete(profile::profile_stop), + ); let authenticated_router = Router::>::new() .route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm)) diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 2d5d4565b7..a9a6506b5e 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -24,6 +24,7 @@ pub mod monitor; pub mod params; pub mod pg_helpers; pub mod pgbouncer; +pub mod profiling; pub mod rsyslog; pub mod spec; mod spec_apply; diff --git a/compute_tools/src/profiling/mod.rs b/compute_tools/src/profiling/mod.rs new file mode 100644 index 0000000000..1a7c6ca3f3 --- /dev/null +++ b/compute_tools/src/profiling/mod.rs @@ -0,0 +1,862 @@ +use std::{ + collections::{HashMap, HashSet}, + io::{BufRead, BufReader, Cursor, Write}, + ops::{Deref, DerefMut}, + path::PathBuf, +}; + +use anyhow::{Context, anyhow}; +use flate2::{Compression, write::GzEncoder}; +use inferno::collapse::Collapse; +use nix::{libc::pid_t, unistd::Pid}; +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncReadExt; +use tracing::instrument; + +const SUDO_PATH: &str = "/usr/bin/sudo"; +const PERF_BINARY_DEFAULT_PATH: &str = "perf"; +const BCC_PROFILE_BINARY_DEFAULT_PATH: &str = "/usr/share/bcc/tools/profile"; +const STANDARD_PATH_PATHS: &str = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"; + +/// Either returns the path to the `sudo` binary +/// from the environment variable `SUDO_BINARY_PATH` +/// or returns the default path defined by +/// [`SUDO_PATH`]. +fn sudo_path() -> String { + std::env::var("SUDO_BINARY_PATH").unwrap_or_else(|_| SUDO_PATH.to_owned()) +} + +/// Either returns the path to the `perf` binary +/// from the environment variable `PERF_BINARY_PATH` +/// or returns the default path defined by +/// [`PERF_BINARY_DEFAULT_PATH`]. +fn perf_binary_path() -> String { + std::env::var("PERF_BINARY_PATH").unwrap_or_else(|_| PERF_BINARY_DEFAULT_PATH.to_owned()) +} + +/// Either returns the path to the `bcc-profile` binary +/// from the environment variable `BCC_PROFILE_BINARY_PATH` +/// or returns the default path defined by +/// [`BCC_PROFILE_BINARY_DEFAULT_PATH`]. +fn bcc_profile_binary_path() -> String { + std::env::var("BCC_PROFILE_BINARY_PATH") + .unwrap_or_else(|_| BCC_PROFILE_BINARY_DEFAULT_PATH.to_owned()) +} + +/// Probes the executable at the given paths for the provided binary +/// names. +/// +/// To check that the binary can run, it is invoked with the arguments. 
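+///
+/// The `exact_matches` candidates are tried first, in order; after that
+/// every name in `binary_names` is looked up under each directory in
+/// `paths`. The first candidate that runs successfully is returned.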
+fn probe_executable_at_paths( + exact_matches: &[&str], + binary_names: &[&str], + paths: &[PathBuf], + arg: Option<&str>, + path_env_override: Option, +) -> anyhow::Result { + let check_binary_runs = move |path: &str| { + let mut command = vec![path]; + + if let Some(arg) = arg { + command.push(arg); + } + + check_binary_runs(&command, path_env_override.clone()) + }; + + let mut probed = Vec::new(); + + for exact_match in exact_matches { + match check_binary_runs(exact_match) { + Ok(_) => { + tracing::trace!("Found exact match for binary: {exact_match}"); + return Ok(exact_match.to_owned().to_owned()); + } + Err(e) => { + probed.push(exact_match.to_owned().to_owned()); + tracing::trace!("Failed to run the binary at path: {exact_match}: {e:?}"); + } + } + } + + for path in paths { + for name in binary_names { + let full_path = path.join(name); + + tracing::trace!("Looking at path: {}", full_path.to_string_lossy()); + + if full_path.exists() && full_path.is_file() { + tracing::trace!( + "There is an existing file at path: {}", + full_path.to_string_lossy() + ); + + match check_binary_runs(full_path.to_string_lossy().as_ref()) { + Ok(_) => { + tracing::trace!( + "Found valid binary at path: {}", + full_path.to_string_lossy() + ); + + return Ok(full_path.to_string_lossy().into_owned()); + } + Err(e) => { + tracing::trace!( + "Failed to run the binary at path: {}: {e:?}", + full_path.to_string_lossy(), + ); + } + } + } + + probed.push(full_path.to_string_lossy().into_owned()); + } + } + + Err(anyhow!( + "No valid binary found in the paths: {paths:?}, probed paths: {probed:?}" + )) +} + +/// Probes the executable in the known paths for the provided binary +/// names. At first, the function checks the exact matches +/// provided in the `exact_matches` slice. If any of those binaries +/// can be run with the provided `arg`, it returns the path to that +/// binary. If no exact matches are found, it checks the standard +/// paths defined by [`STANDARD_PATH_PATHS`] and looks for the binaries +/// with the names provided in the `binary_names` slice. +/// If any of those binaries can be run with the provided `arg`, it +/// returns the path to that binary. +/// If no valid binary is found, it returns an error. +fn probe_executable_in_known_paths( + exact_matches: &[&str], + binary_names: &[&str], + arg: Option<&str>, + path_env_override: Option, +) -> anyhow::Result { + let paths = STANDARD_PATH_PATHS + .split(':') + .map(PathBuf::from) + .collect::>(); + + probe_executable_at_paths(exact_matches, binary_names, &paths, arg, path_env_override) +} + +/// Returns the path to the `sudo` binary if it can be run. +fn probe_sudo_paths() -> anyhow::Result { + let sudo_path = sudo_path(); + let exact_matches = [sudo_path.as_ref()]; + let binary_names = ["sudo"]; + + probe_executable_in_known_paths(&exact_matches, &binary_names, Some("--version"), None) +} + +/// Checks the known paths for `bcc-profile` binary and verifies that it +/// can be run. +/// +/// First, if the provided path is `Some`, it checks if the binary +/// at that path can be run with `--help` argument. If it can, it +/// returns the path. +/// +/// If the provided path is `None`, it checks the default `bcc-profile` +/// binary path defined by [`BCC_PROFILE_BINARY_DEFAULT_PATH`] constant. +/// If that binary can be run, it returns the path. 
+/// +/// If neither the provided path nor the default path can be run, +/// it checks the standard paths defined by [`STANDARD_PATH_PATHS`] +/// and looks for known binary names like `bcc-profile`, `profile.py`, +/// or `profile-bpfcc` and other known names from various linux +/// distributions. If any of those binaries can be run, it returns the +/// path to that binary. If no valid binary is found, it returns an +/// error. +fn probe_bcc_profile_binary_paths(provided_path: Option) -> anyhow::Result { + let binary_names = ["bcc-profile", "profile.py", "profile-bpfcc", "profile"]; + + let mut exact_matches = Vec::new(); + + if let Some(path) = provided_path.as_ref() { + if let Some(path) = path.to_str() { + exact_matches.push(path); + } + } + + let bcc_profile_binary_path = bcc_profile_binary_path(); + exact_matches.push(&bcc_profile_binary_path); + + probe_executable_in_known_paths( + &exact_matches, + &binary_names, + Some("--help"), + Some(get_override_path_env()), + ) +} + +/// Represents the pprof data generated from a profiling tool. +#[repr(transparent)] +#[derive(Debug, Clone)] +pub struct PprofData(pub(crate) Vec); + +impl PprofData { + /// Dumps the pprof data to a file. + pub fn write_to_file(&self, path: &PathBuf) -> anyhow::Result<()> { + let mut file = std::fs::File::create(path).context(format!( + "couldn't create a file for dumping pprof data at path: {path:?}" + ))?; + file.write_all(&self.0) + .context("couldn't write pprof raw data to the file at path: {path:?}")?; + Ok(()) + } +} + +impl Deref for PprofData { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for PprofData { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +/// Returns a list of child processes for a given parent process ID. +fn list_children_processes(parent_pid: Pid) -> anyhow::Result> { + use libproc::processes::{ProcFilter, pids_by_type}; + + let filter = ProcFilter::ByParentProcess { + ppid: parent_pid.as_raw() as _, + }; + + Ok(pids_by_type(filter) + .context("failed to list child processes")? + .into_iter() + .map(|p| Pid::from_raw(p as _)) + .collect()) +} + +fn check_binary_runs(command: &[&str], override_path: Option) -> anyhow::Result<()> { + if command.is_empty() { + return Err(anyhow!("Command cannot be empty")); + } + + // One by one the strings from the command are joined to form the + // word in the invocation of the binary. + let (first, rest) = command + .split_first() + .ok_or_else(|| anyhow!("Command must contain at least one argument (the binary name)"))?; + + let mut output = std::process::Command::new(first); + + for arg in rest { + output.arg(arg); + } + + let command_invocation = get_command_invocation_string(&output); + + if let Some(path) = override_path { + output.env("PATH", path); + } + + let output = output.output().context(format!( + "failed to run the command to check if it can be run: {command_invocation}" + ))?; + + if !output.status.success() { + return Err(anyhow!( + "The command invocation is not possible: {}", + output.status + )); + } + + Ok(()) +} + +fn check_perf_runs(perf_binary_path: &str, run_with_sudo: bool) -> anyhow::Result<()> { + let sudo_path = sudo_path(); + + let command = if run_with_sudo { + vec![&sudo_path, perf_binary_path, "version"] + } else { + vec![perf_binary_path, "version"] + }; + + check_binary_runs(&command, None) +} + +/// Returns the command invocation string for a given +/// [`tokio::process::Command`]. 
+fn get_command_invocation_string(command: &std::process::Command) -> String { + let mut command_str = command.get_program().to_string_lossy().into_owned(); + + for arg in command.get_args() { + command_str.push(' '); + command_str.push_str(&arg.to_string_lossy()); + } + + command_str +} + +/// Returns the clear `PATH` environment variable, so that even the +/// Python scripts can run cleanly without any virtualenv or pyenv +/// interference. +fn get_override_path_env() -> String { + let path = std::env::var("PATH").unwrap_or_else(|_| STANDARD_PATH_PATHS.to_string()); + + #[cfg(feature = "testing")] + let path = path + .split(':') + .filter(|p| { + let p = p.to_lowercase(); + !p.contains("virtualenv") + && !p.contains("venv") + && !p.contains("pyenv") + && !p.contains("pypoetry") + && !p.contains("pyenv-virtualenv") + && !p.contains("pyenv-virtualenvs") + }) + .collect::>() + .join(":"); + + path +} + +/// The generator for the pprof profile. +/// +/// The generator tools have the path to the binary +/// that will be used to generate the profile. +/// If the path is `None`, the tool will be searched +/// in the system's `PATH` using the default name. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ProfileGenerator { + /// The `bcc` tool for generating profiles. + BccProfile(Option), + /// The Linux `perf` tool. + Perf(Option), +} + +impl ProfileGenerator { + /// Returns the path to the `profile` binary if it is set. + pub fn get_bcc_profile_binary_path(&self) -> Option { + match self { + ProfileGenerator::BccProfile(path) => path.clone(), + _ => None, + } + } +} + +/// The options for generating a pprof profile using the `perf` tool. +#[derive(Debug, Clone)] +pub struct ProfileGenerationOptions { + pub profiler: ProfileGenerator, + /// If set to `true`, the `perf` command will be run with `sudo`. + /// This is useful for profiling processes that require elevated + /// privileges. + /// If `false`, the command will be run without `sudo`. + pub run_with_sudo: bool, + /// The PID of the process to profile. This can be the main process + /// or a child process. For targeting postgres, this should be the + /// PID of the main postgres process. + pub pids: HashSet, + /// If set to `true`, the `perf` command will follow forks. + /// This means that it will continue to profile child processes + /// that are created by the main process after it has already + /// started profiling. + pub follow_forks: bool, + /// The sampling frequency in Hz. + /// This is the frequency at which stack traces will be sampled. + pub sampling_frequency: u32, + /// A list of symbols to block from the profiling output. + /// This is useful for filtering out noise from the profiling data, + /// such as system libraries or other irrelevant symbols. + pub blocklist_symbols: Vec, + /// Whether to archive the pprof profile data or not. If yes, + /// the profile data will be gzipped before returning. + pub archive: bool, +} + +/// The options for the task for generating a pprof profile using the +/// `perf` tool. +#[derive(Debug, Clone)] +pub struct ProfileGenerationTaskOptions { + /// The generation options. See [`ProfileGenerationOptions`]. + pub options: ProfileGenerationOptions, + /// The duration for which the profiling should run. + /// This is the maximum time the profiling will run before it is + /// stopped. If the profiling is not stopped manually, it will run + /// for this duration. 
+ pub timeout: std::time::Duration, + /// An optional channel receiver that can be used to signal the + /// profiling to stop early. If provided, the profiling will stop + /// when a message is received on this channel or when the timeout + /// is reached, whichever comes first. + pub should_stop: Option>, +} + +/// Profiles the processes using the `bcc` framework and `profile.py` +/// tool and returns the collapsed (folded) stack output as [`Vec`]. +#[instrument] +async fn profile_with_bcc_profile( + options: ProfileGenerationTaskOptions, +) -> anyhow::Result> { + let bcc_profile_binary_path = + probe_bcc_profile_binary_paths(options.options.profiler.get_bcc_profile_binary_path()) + .context("failed to probe bcc profile binary paths")?; + + let path = get_override_path_env(); + + let mut command = tokio::process::Command::new( + probe_sudo_paths().context("failed to find the sudo executable")?, + ); + + command + .arg(bcc_profile_binary_path) + .arg("-f") + .arg("-F") + .arg(options.options.sampling_frequency.to_string()) + .arg("-p") + .arg( + options + .options + .pids + .iter() + .map(|p| p.as_raw().to_string()) + .collect::>() + .join(","), + ) + .arg(options.timeout.as_secs().to_string()) + .env_clear() + .env("PATH", path); + + let command_str = get_command_invocation_string(command.as_std()); + + let result = command.output(); + + let result = match options.should_stop.as_ref().map(|s| s.subscribe()) { + Some(mut rx) => { + tokio::select! { + _ = rx.recv() => { + tracing::trace!("Received shutdown signal, stopping perf..."); + + return Err(anyhow!("Profiling was stopped by shutdown signal")); + } + result = result => { + tracing::trace!("Profiling completed, processing result..."); + result + } + } + } + None => { + tracing::trace!("No shutdown signal receiver provided, running bcc profile..."); + result.await + } + } + .context(format!("failed to invoke bcc-profile using {command_str}"))?; + + match result.status.success() { + true => Ok(result.stdout), + false => { + return Err(anyhow!( + "failed to run bcc profiler, invocation: {command_str}, stderr: {}", + String::from_utf8_lossy(&result.stderr) + )); + } + } +} + +/// Profiles the processes using the `perf` tool and returns the +/// collapsed (folded) stack output as [`Vec`]. +#[allow(unsafe_code)] +#[instrument] +async fn profile_with_perf(options: ProfileGenerationTaskOptions) -> anyhow::Result> { + let perf_binary_path = match options.options.profiler { + ProfileGenerator::BccProfile(_) => { + return Err(anyhow!( + "bcc profile generator is not supported for perf profiling" + )); + } + ProfileGenerator::Perf(v) => v, + } + .unwrap_or_else(|| PathBuf::from(perf_binary_path())); + + check_perf_runs( + perf_binary_path + .to_str() + .context("couldn't reconstruct perf binary path as string.")?, + options.options.run_with_sudo, + ) + .context("couldn't check that perf is available and can be run")?; + + let temp_file = tempfile::NamedTempFile::new() + .context("failed to create temporary file for perf output")?; + let temp_file_path = temp_file.path(); + + // Step 1: Run perf to collect stack traces + let mut perf_record_command = if options.options.run_with_sudo { + let mut cmd = + tokio::process::Command::new(probe_sudo_paths().context("failed to find sudo")?); + cmd.arg(&perf_binary_path); + cmd + } else { + tokio::process::Command::new(&perf_binary_path) + }; + + let mut perf_record_command = perf_record_command + .arg("record") + // Target the specified process IDs. 
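+        // Note: `-a` records system-wide; the requested PIDs are filtered
+        // out later when `perf script --pid=...` post-processes the data.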
+ .arg("-a") + // Specify the sampling frequency or default to 99 Hz. + .arg("-F") + .arg(options.options.sampling_frequency.to_string()) + // Enable call-graph (stack chain/backtrace) recording for both + // kernel space and user space. + .arg("-g") + // Disable buffering to ensure that the output is written + // immediately to the temporary file. + // This is important for real-time profiling and ensures that + // we get the most accurate stack traces and that after we "ask" + // perf to stop, we get the data dumped to the file properly. + .arg("--no-buffering"); + + if options.options.follow_forks { + perf_record_command = perf_record_command.arg("--inherit"); + } + + // Make it write to stdout instead of a file. + let perf_record_command = perf_record_command + .arg("-o") + .arg(temp_file_path) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::piped()); + + let mut perf_record_command = perf_record_command + .spawn() + .context("failed to spawn a process for perf record")?; + + let rx = options.should_stop.as_ref().map(|s| s.subscribe()); + + if let Some(mut rx) = rx { + tokio::select! { + _ = rx.recv() => { + tracing::trace!("Received shutdown signal, stopping perf..."); + } + _ = tokio::time::sleep(options.timeout) => { + tracing::trace!("Timeout reached, stopping perf..."); + } + } + } else { + tokio::time::sleep(options.timeout).await; + tracing::trace!("Timeout reached, stopping perf..."); + } + + let perf_record_pid = perf_record_command + .id() + .ok_or_else(|| anyhow!("failed to get perf record process ID"))?; + + // Send SIGTERM to the perf record process to stop it + // gracefully and wait for it to finish. + let _ = nix::sys::signal::kill( + nix::unistd::Pid::from_raw(perf_record_pid as i32), + nix::sys::signal::Signal::SIGTERM, + ); + + tracing::trace!( + "Waiting for perf record to finish for pids = {:?}...", + options.options.pids + ); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + if perf_record_command.try_wait().is_err() { + perf_record_command + .kill() + .await + .context("failed to kill perf record command")?; + } + + let _ = perf_record_command + .wait() + .await + .context("failed to wait for perf record command")?; + + let mut perf_record_stderr = perf_record_command + .stderr + .take() + .context("failed to take stderr from perf record command")?; + + let mut perf_record_stderr_string: String = String::new(); + perf_record_stderr + .read_to_string(&mut perf_record_stderr_string) + .await + .context("failed to read stderr from perf record command")?; + + let temp_file_size = std::fs::metadata(temp_file_path) + .context(format!( + "couldn't get metadata for temp file: {temp_file_path:?}" + ))? 
+ .len(); + + if temp_file_size == 0 { + tracing::warn!( + "perf record output file is empty, no data collected.\nPerf stderr: {perf_record_stderr_string}" + ); + + return Err(anyhow!( + "perf record output file is empty, no data collected" + )); + } + + let perf_script_output = tokio::process::Command::new(&perf_binary_path) + .arg("script") + .arg("-i") + .arg(temp_file_path) + .arg(format!( + "--pid={}", + options + .options + .pids + .iter() + .map(|p| p.as_raw().to_string()) + .collect::>() + .join(",") + )) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .output() + .await + .context(format!("couldn't run perf script -i {temp_file_path:?}"))?; + + if !perf_script_output.status.success() { + return Err(anyhow!( + "perf script command failed: {}", + String::from_utf8(perf_script_output.stderr) + .unwrap_or_else(|_| "Invalid UTF-8 output".to_string()) + )); + } + + if perf_script_output.stdout.is_empty() { + return Err(anyhow!(format!( + "Perf script output is empty for pid = {:?}.\n", + options.options.pids + ))); + } + + // Step 2: Collapse the stack traces into a folded stack trace + let mut folder = { + let mut options = inferno::collapse::perf::Options::default(); + options.annotate_jit = true; // Enable JIT annotation if needed + options.annotate_kernel = true; // Enable kernel annotation if needed + options.include_addrs = true; // Include addresses in the output + options.include_pid = true; // Include PIDs in the output + options.include_tid = true; // Include TIDs in the output + inferno::collapse::perf::Folder::from(options) + }; + + let mut collapsed = Vec::new(); + + folder + .collapse(perf_script_output.stdout.as_slice(), &mut collapsed) + .context("couldn't collapse the output of perf script into the folded format")?; + + if collapsed.is_empty() { + tracing::error!( + "collapsed stack trace is empty for output: {}", + String::from_utf8_lossy(&perf_script_output.stdout) + ); + + return Err(anyhow!("collapsed stack trace is empty")); + } + + Ok(collapsed) +} + +/// Run perf against a process with the given name and generate a pprof +/// profile from the output. +/// +/// The pipeline is as follows: +/// 1. Running perf for the specified process name and duration. +/// 2. Collapsing the perf output into a folded stack trace. +/// 3. Generating a pprof profile from the collapsed stack trace. +/// +/// This function blocks until the profiling is complete or the timeout +/// is reached. +/// +/// If the perf binary path is not provided, it defaults to "perf" in +/// the system's `PATH`. 
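+///
+/// A minimal usage sketch (illustrative only; the PID and output path are
+/// placeholders, and the crate is assumed to be used as the `compute_tools`
+/// library):
+///
+/// ```no_run
+/// use std::collections::HashSet;
+/// use std::path::PathBuf;
+/// use std::time::Duration;
+///
+/// use nix::unistd::Pid;
+///
+/// use compute_tools::profiling::{
+///     generate_pprof_profile, ProfileGenerationOptions, ProfileGenerationTaskOptions,
+///     ProfileGenerator,
+/// };
+///
+/// # async fn example() -> anyhow::Result<()> {
+/// let task = ProfileGenerationTaskOptions {
+///     options: ProfileGenerationOptions {
+///         // Resolve `perf` from $PATH (or from PERF_BINARY_PATH).
+///         profiler: ProfileGenerator::Perf(None),
+///         run_with_sudo: true,
+///         // Hypothetical postgres PID; its child processes are resolved internally.
+///         pids: HashSet::from([Pid::from_raw(1234)]),
+///         follow_forks: true,
+///         sampling_frequency: 100,
+///         blocklist_symbols: vec!["libc".to_owned(), "vdso".to_owned()],
+///         archive: false,
+///     },
+///     timeout: Duration::from_secs(5),
+///     should_stop: None,
+/// };
+/// let pprof = generate_pprof_profile(task).await?;
+/// pprof.write_to_file(&PathBuf::from("/tmp/profile.pb"))?;
+/// # Ok(())
+/// # }
+/// ```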
+pub async fn generate_pprof_profile( + mut options: ProfileGenerationTaskOptions, +) -> anyhow::Result { + options.options.pids = options + .options + .pids + .into_iter() + .flat_map(list_children_processes) + .flatten() + .collect::>(); + + let blocklist_symbols = options.options.blocklist_symbols.clone(); + let archive = options.options.archive; + + let collapsed = match options.options.profiler { + ProfileGenerator::BccProfile(_) => profile_with_bcc_profile(options).await, + ProfileGenerator::Perf(_) => profile_with_perf(options).await, + } + .context("failed to run the profiler")?; + + if collapsed.is_empty() { + return Err(anyhow!("collapsed output is empty, no data collected")); + } + + generate_pprof_from_collapsed(&collapsed, &blocklist_symbols, archive) + .context("failed to generate pprof profile from collapsed stack trace") +} + +fn archive_bytes(bytes: &[u8], output: &mut W) -> anyhow::Result<()> { + let mut encoder = GzEncoder::new(output, Compression::default()); + + encoder + .write_all(bytes) + .context("couldn't write the bytes into the gz encoder")?; + encoder + .finish() + .context("couldn't finish the gz encoder session")?; + + Ok(()) +} + +/// Returns the executable path for a given process ID. +fn get_exe_from_pid(pid: Pid, fallback_name: &str) -> String { + libproc::proc_pid::name(pid.as_raw()).unwrap_or_else(|_| fallback_name.to_owned()) +} + +/// Generate a pprof profile from a collapsed stack trace file. +pub fn generate_pprof_from_collapsed>( + collapsed_bytes: &[u8], + blocklist_symbols: &[S], + archive: bool, +) -> anyhow::Result { + use pprof::protos::Message; + use pprof::protos::profile::{Function, Line, Location, Profile, Sample, ValueType}; + + let mut profile = Profile::default(); + let mut value_type = ValueType::new(); + value_type.ty = profile.string_table.len() as i64; + value_type.unit = profile.string_table.len() as i64 + 1; + + profile.sample_type.push(value_type); + profile.string_table.push("".to_string()); // 0 + profile.string_table.push("samples".to_string()); // 1 + profile.string_table.push("count".to_string()); // 2 + + let mut mapping_map = HashMap::new(); // binary_name -> mapping_id + let mut function_map = HashMap::new(); // (function_name) -> function_id + let mut location_map = HashMap::new(); // (function_id) -> location_id + + let reader = BufReader::new(Cursor::new(collapsed_bytes)); + + for line in reader.lines() { + let line = line.context("invalid line found")?; + let Some((stack_str, count_str)) = line.rsplit_once(' ') else { + continue; + }; + let count: i64 = count_str + .trim() + .parse() + .context("failed to parse the counter")?; + + let mut parts = stack_str.trim().split(';'); + + // Extract binary name and function stack + let raw_binary = parts.next().unwrap_or("[unknown]"); + let (bin_part, pid) = raw_binary + .rsplit_once('/') + .unwrap_or((raw_binary, "unknown")); + let binary_name = bin_part.strip_suffix("-?").unwrap_or(bin_part); + + let mapping_id = *mapping_map + .entry(binary_name.to_string()) + .or_insert_with(|| { + let exe = + get_exe_from_pid(Pid::from_raw(pid.parse::().unwrap_or(0)), "unknown"); + let id = profile.mapping.len() as u64 + 1; + profile.string_table.push(exe); + profile.mapping.push(pprof::protos::profile::Mapping { + id, + filename: profile.string_table.len() as i64 - 1, + ..Default::default() + }); + id + }); + + let stack: Vec<&str> = stack_str + .trim() + .split(';') + .filter(|s| { + // Filter out blocklisted symbols + !blocklist_symbols.iter().any(|b| s.contains(b.as_ref())) + }) + 
.collect(); + + let mut location_ids = Vec::new(); + + for func_name in stack.iter() { + let func_id = *function_map + .entry(func_name.to_string()) + .or_insert_with(|| { + let id = profile.function.len() as u64 + 1; + profile.string_table.push(func_name.to_string()); + profile.function.push(Function { + id, + name: profile.string_table.len() as i64 - 1, + system_name: profile.string_table.len() as i64 - 1, + filename: 0, + start_line: 0, + special_fields: Default::default(), + }); + id + }); + + let loc_id = *location_map.entry(func_id).or_insert_with(|| { + let id = profile.location.len() as u64 + 1; + profile.location.push(Location { + id, + mapping_id, + address: 0, + line: vec![Line { + function_id: func_id, + line: 0, + special_fields: Default::default(), + }], + is_folded: true, + special_fields: Default::default(), + }); + id + }); + + location_ids.push(loc_id); + } + + profile.sample.push(Sample { + location_id: location_ids.into_iter().rev().collect(), + value: vec![count], + label: vec![], + special_fields: Default::default(), + }); + } + + let mut protobuf_encoded = Vec::new(); + profile + .write_to_vec(&mut protobuf_encoded) + .context("failed to write the encoded pprof-protobuf message into a vec")?; + + Ok(PprofData(if archive { + let mut archived = Vec::new(); + archive_bytes(&protobuf_encoded, &mut archived) + .context("couldn't archive the pprof-protobuf message data")?; + archived + } else { + protobuf_encoded + })) +} diff --git a/libs/neon-shmem/Cargo.toml b/libs/neon-shmem/Cargo.toml index 2a636bec40..4b8ab26da6 100644 --- a/libs/neon-shmem/Cargo.toml +++ b/libs/neon-shmem/Cargo.toml @@ -6,8 +6,8 @@ license.workspace = true [dependencies] thiserror.workspace = true -nix.workspace=true +nix.workspace = true workspace_hack = { version = "0.1", path = "../../workspace_hack" } [target.'cfg(target_os = "macos")'.dependencies] -tempfile = "3.14.0" +tempfile = "3.20.0" diff --git a/poetry.lock b/poetry.lock index 1bc5077eb7..cb1f2e92bb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" @@ -2326,6 +2326,25 @@ files = [ {file = "propcache-0.2.0.tar.gz", hash = "sha256:df81779732feb9d01e5d513fad0122efb3d53bbc75f61b2a4f29a020bc985e70"}, ] +[[package]] +name = "protobuf" +version = "6.31.1" +description = "" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "protobuf-6.31.1-cp310-abi3-win32.whl", hash = "sha256:7fa17d5a29c2e04b7d90e5e32388b8bfd0e7107cd8e616feef7ed3fa6bdab5c9"}, + {file = "protobuf-6.31.1-cp310-abi3-win_amd64.whl", hash = "sha256:426f59d2964864a1a366254fa703b8632dcec0790d8862d30034d8245e1cd447"}, + {file = "protobuf-6.31.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:6f1227473dc43d44ed644425268eb7c2e488ae245d51c6866d19fe158e207402"}, + {file = "protobuf-6.31.1-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:a40fc12b84c154884d7d4c4ebd675d5b3b5283e155f324049ae396b95ddebc39"}, + {file = "protobuf-6.31.1-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:4ee898bf66f7a8b0bd21bce523814e6fbd8c6add948045ce958b73af7e8878c6"}, + {file = "protobuf-6.31.1-cp39-cp39-win32.whl", hash = "sha256:0414e3aa5a5f3ff423828e1e6a6e907d6c65c1d5b7e6e975793d5590bdeecc16"}, + {file = "protobuf-6.31.1-cp39-cp39-win_amd64.whl", hash = "sha256:8764cf4587791e7564051b35524b72844f845ad0bb011704c3736cce762d8fe9"}, + {file = "protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e"}, + {file = "protobuf-6.31.1.tar.gz", hash = "sha256:d8cac4c982f0b957a4dc73a80e2ea24fab08e679c0de9deb835f4a12d69aca9a"}, +] + [[package]] name = "psutil" version = "5.9.4" @@ -3309,6 +3328,18 @@ files = [ [package.dependencies] cryptography = "*" +[[package]] +name = "types-protobuf" +version = "6.30.2.20250516" +description = "Typing stubs for protobuf" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "types_protobuf-6.30.2.20250516-py3-none-any.whl", hash = "sha256:8c226d05b5e8b2623111765fa32d6e648bbc24832b4c2fddf0fa340ba5d5b722"}, + {file = "types_protobuf-6.30.2.20250516.tar.gz", hash = "sha256:aecd1881770a9bb225ede66872ef7f0da4505edd0b193108edd9892e48d49a41"}, +] + [[package]] name = "types-psutil" version = "5.9.5.12" @@ -3847,4 +3878,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5" +content-hash = "7cc735f57c2760db6c994575a98d4f0e2670497ad9e909135a3bc67d479f5edf" diff --git a/pyproject.toml b/pyproject.toml index e7e314d144..21e6997376 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,8 @@ psycopg2-binary = "^2.9.10" typing-extensions = "^4.12.2" PyJWT = {version = "^2.1.0", extras = ["crypto"]} requests = "^2.32.4" +protobuf = "^6.31.1" +types-protobuf="^6.30.2" pytest-xdist = "^3.3.1" asyncpg = "^0.30.0" aiopg = "^1.4.0" @@ -63,6 +65,7 @@ build-backend = "poetry.core.masonry.api" exclude = [ "^vendor/", "^target/", + "test_runner/regress/data/profile_pb2.py", "test_runner/performance/pgvector/loaddata.py", ] check_untyped_defs = true @@ -94,6 +97,7 @@ target-version = "py311" extend-exclude = [ "vendor/", "target/", + "test_runner/regress/data/profile_pb2.py", "test_runner/stubs/", # Autogenerated by mypy's stubgen ] line-length = 100 # this setting is rather guidance, it won't fail if it can't make the shorter diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 1d278095ce..8ac4eb1744 100644 --- a/test_runner/fixtures/endpoint/http.py +++ 
b/test_runner/fixtures/endpoint/http.py @@ -115,6 +115,39 @@ class EndpointHttpClient(requests.Session): json: dict[str, str] = res.json() return json + def start_profiling_cpu( + self, sampling_frequency: int, timeout_seconds: int, archive: bool = False + ) -> tuple[int, bytes]: + url = f"http://localhost:{self.external_port}/profile/cpu" + params = { + "profiler": {"BccProfile": None}, + "sampling_frequency": sampling_frequency, + "timeout_seconds": timeout_seconds, + "archive": archive, + } + + res = self.post( + url, + json=params, + auth=self.auth, + ) + + return res.status_code, res.content + + def stop_profiling_cpu(self) -> int: + url = f"http://localhost:{self.external_port}/profile/cpu" + res = self.delete(url, auth=self.auth) + return res.status_code + + def get_profiling_cpu_status(self) -> bool: + """ + Returns True if CPU profiling is currently running, False otherwise. + """ + url = f"http://localhost:{self.external_port}/profile/cpu" + res = self.get(url, auth=self.auth) + res.raise_for_status() + return res.status_code == 200 + def database_schema(self, database: str): res = self.get( f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}", diff --git a/test_runner/regress/data/__init__.py b/test_runner/regress/data/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test_runner/regress/data/profile_pb2.py b/test_runner/regress/data/profile_pb2.py new file mode 100644 index 0000000000..c507ef9f6c --- /dev/null +++ b/test_runner/regress/data/profile_pb2.py @@ -0,0 +1,41 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# type: ignore +# source: profile.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rprofile.proto\x12\x12perftools.profiles\"\xd5\x03\n\x07Profile\x12\x32\n\x0bsample_type\x18\x01 \x03(\x0b\x32\x1d.perftools.profiles.ValueType\x12*\n\x06sample\x18\x02 \x03(\x0b\x32\x1a.perftools.profiles.Sample\x12,\n\x07mapping\x18\x03 \x03(\x0b\x32\x1b.perftools.profiles.Mapping\x12.\n\x08location\x18\x04 \x03(\x0b\x32\x1c.perftools.profiles.Location\x12.\n\x08\x66unction\x18\x05 \x03(\x0b\x32\x1c.perftools.profiles.Function\x12\x14\n\x0cstring_table\x18\x06 \x03(\t\x12\x13\n\x0b\x64rop_frames\x18\x07 \x01(\x03\x12\x13\n\x0bkeep_frames\x18\x08 \x01(\x03\x12\x12\n\ntime_nanos\x18\t \x01(\x03\x12\x16\n\x0e\x64uration_nanos\x18\n \x01(\x03\x12\x32\n\x0bperiod_type\x18\x0b \x01(\x0b\x32\x1d.perftools.profiles.ValueType\x12\x0e\n\x06period\x18\x0c \x01(\x03\x12\x0f\n\x07\x63omment\x18\r \x03(\x03\x12\x1b\n\x13\x64\x65\x66\x61ult_sample_type\x18\x0e \x01(\x03\"\'\n\tValueType\x12\x0c\n\x04type\x18\x01 \x01(\x03\x12\x0c\n\x04unit\x18\x02 \x01(\x03\"V\n\x06Sample\x12\x13\n\x0blocation_id\x18\x01 \x03(\x04\x12\r\n\x05value\x18\x02 \x03(\x03\x12(\n\x05label\x18\x03 \x03(\x0b\x32\x19.perftools.profiles.Label\"@\n\x05Label\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\x0b\n\x03str\x18\x02 \x01(\x03\x12\x0b\n\x03num\x18\x03 \x01(\x03\x12\x10\n\x08num_unit\x18\x04 \x01(\x03\"\xdd\x01\n\x07Mapping\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x14\n\x0cmemory_start\x18\x02 \x01(\x04\x12\x14\n\x0cmemory_limit\x18\x03 
\x01(\x04\x12\x13\n\x0b\x66ile_offset\x18\x04 \x01(\x04\x12\x10\n\x08\x66ilename\x18\x05 \x01(\x03\x12\x10\n\x08\x62uild_id\x18\x06 \x01(\x03\x12\x15\n\rhas_functions\x18\x07 \x01(\x08\x12\x15\n\rhas_filenames\x18\x08 \x01(\x08\x12\x18\n\x10has_line_numbers\x18\t \x01(\x08\x12\x19\n\x11has_inline_frames\x18\n \x01(\x08\"v\n\x08Location\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x12\n\nmapping_id\x18\x02 \x01(\x04\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\x04\x12&\n\x04line\x18\x04 \x03(\x0b\x32\x18.perftools.profiles.Line\x12\x11\n\tis_folded\x18\x05 \x01(\x08\")\n\x04Line\x12\x13\n\x0b\x66unction_id\x18\x01 \x01(\x04\x12\x0c\n\x04line\x18\x02 \x01(\x03\"_\n\x08\x46unction\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\x03\x12\x13\n\x0bsystem_name\x18\x03 \x01(\x03\x12\x10\n\x08\x66ilename\x18\x04 \x01(\x03\x12\x12\n\nstart_line\x18\x05 \x01(\x03\x42-\n\x1d\x63om.google.perftools.profilesB\x0cProfileProtob\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'profile_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b'\n\035com.google.perftools.profilesB\014ProfileProto' + _PROFILE._serialized_start=38 + _PROFILE._serialized_end=507 + _VALUETYPE._serialized_start=509 + _VALUETYPE._serialized_end=548 + _SAMPLE._serialized_start=550 + _SAMPLE._serialized_end=636 + _LABEL._serialized_start=638 + _LABEL._serialized_end=702 + _MAPPING._serialized_start=705 + _MAPPING._serialized_end=926 + _LOCATION._serialized_start=928 + _LOCATION._serialized_end=1046 + _LINE._serialized_start=1048 + _LINE._serialized_end=1089 + _FUNCTION._serialized_start=1091 + _FUNCTION._serialized_end=1186 +# @@protoc_insertion_point(module_scope) diff --git a/test_runner/regress/test_compute_profiling.py b/test_runner/regress/test_compute_profiling.py new file mode 100644 index 0000000000..145d2d035c --- /dev/null +++ b/test_runner/regress/test_compute_profiling.py @@ -0,0 +1,314 @@ +from __future__ import annotations + +import gzip +import io +import threading +import time +from typing import TYPE_CHECKING, Any + +import pytest +from data.profile_pb2 import Profile # type: ignore +from fixtures.log_helper import log +from fixtures.utils import run_only_on_default_postgres +from google.protobuf.message import Message +from requests import HTTPError + +if TYPE_CHECKING: + from fixtures.endpoint.http import EndpointHttpClient + from fixtures.neon_fixtures import NeonEnv + +REASON = "test doesn't use postgres" + + +def _start_profiling_cpu( + client: EndpointHttpClient, event: threading.Event | None, archive: bool = False +) -> Profile | None: + """ + Start CPU profiling for the compute node. 
+ """ + log.info("Starting CPU profiling...") + + if event is not None: + event.set() + + status, response = client.start_profiling_cpu(100, 30, archive=archive) + match status: + case 200: + log.debug("CPU profiling finished") + profile: Any = Profile() + if archive: + log.debug("Unpacking gzipped profile data") + with gzip.GzipFile(fileobj=io.BytesIO(response)) as f: + response = f.read() + else: + log.debug("Unpacking non-gzipped profile data") + Message.ParseFromString(profile, response) + return profile + case 204: + log.debug("CPU profiling was stopped") + raise HTTPError("Failed to finish CPU profiling: was stopped.") + case 409: + log.debug("CPU profiling is already in progress, cannot start again") + raise HTTPError("Failed to finish CPU profiling: profiling is already in progress.") + case 500: + response_str = response.decode("utf-8", errors="replace") + log.debug( + f"Failed to finish CPU profiling, was stopped or got an error: {status}: {response_str}" + ) + raise HTTPError(f"Failed to finish CPU profiling, {status}: {response_str}") + case _: + log.error(f"Failed to start CPU profiling: {status}") + raise HTTPError(f"Failed to start CPU profiling: {status}") + + +def _stop_profiling_cpu(client: EndpointHttpClient, event: threading.Event | None): + """ + Stop CPU profiling for the compute node. + """ + log.info("Manually stopping CPU profiling...") + + if event is not None: + event.set() + + status = client.stop_profiling_cpu() + match status: + case 200: + log.debug("CPU profiling stopped successfully") + case 412: + log.debug("CPU profiling is not running, nothing to do") + case _: + log.error(f"Failed to stop CPU profiling: {status}") + raise HTTPError(f"Failed to stop CPU profiling: {status}") + return status + + +def _wait_till_profiling_starts( + http_client: EndpointHttpClient, + event: threading.Event | None, + repeat_delay_secs: float = 0.3, + timeout: int = 60, +) -> bool: + """ + Wait until CPU profiling starts. + """ + log.info("Waiting for CPU profiling to start...") + + end_time = time.time() + timeout + + while not http_client.get_profiling_cpu_status(): + time.sleep(repeat_delay_secs) + + if end_time <= time.time(): + log.error("Timeout while waiting for CPU profiling to start") + return False + + log.info("CPU profiling has started successfully and is in progress") + + if event is not None: + event.set() + + return True + + +def _wait_and_assert_cpu_profiling(http_client: EndpointHttpClient, event: threading.Event | None): + profile = _start_profiling_cpu(http_client, event) + + if profile is None: + log.error("The received profiling data is malformed or empty.") + return + + assert len(profile.sample) > 0, "No samples found in CPU profiling data" + assert len(profile.mapping) > 0, "No mappings found in CPU profiling data" + assert len(profile.location) > 0, "No locations found in CPU profiling data" + assert len(profile.function) > 0, "No functions found in CPU profiling data" + assert len(profile.string_table) > 0, "No string tables found in CPU profiling data" + strings = [ + "PostgresMain", + "ServerLoop", + "BackgroundWorkerMain", + "pq_recvbuf", + "pq_getbyte", + ] + + assert any(s in profile.string_table for s in strings), ( + f"Expected at least one of {strings} in string table, but none found" + ) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_with_timeout(neon_simple_env: NeonEnv): + """ + Test that CPU profiling works correctly with timeout. 
+ """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + pg_conn = endpoint.connect() + pg_cur = pg_conn.cursor() + pg_cur.execute("create database profiling_test2") + http_client = endpoint.http_client() + + def _wait_and_assert_cpu_profiling_local(): + _wait_and_assert_cpu_profiling(http_client, None) + + thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local) + thread.start() + + inserting_should_stop_event = threading.Event() + + def insert_rows(): + lfc_conn = endpoint.connect(dbname="profiling_test2") + lfc_cur = lfc_conn.cursor() + n_records = 0 + lfc_cur.execute( + "create table t(pk integer primary key, payload text default repeat('?', 128))" + ) + batch_size = 1000 + + log.info("Inserting rows") + + while not inserting_should_stop_event.is_set(): + n_records += batch_size + lfc_cur.execute( + f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))" + ) + + log.info(f"Inserted {n_records} rows") + + thread2 = threading.Thread(target=insert_rows) + + assert _wait_till_profiling_starts(http_client, None) + time.sleep(4) # Give some time for the profiling to start + thread2.start() + + thread.join(timeout=60) + inserting_should_stop_event.set() # Stop the insertion thread + thread2.join(timeout=60) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_with_archiving_the_response(neon_simple_env: NeonEnv): + """ + Test that CPU profiling works correctly with archiving the data. + """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + http_client = endpoint.http_client() + + assert _start_profiling_cpu(http_client, None, archive=True) is not None, ( + "Failed to start and finish CPU profiling with archiving enabled" + ) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_start_and_stop(neon_simple_env: NeonEnv): + """ + Test that CPU profiling can be started and stopped correctly. + """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + http_client = endpoint.http_client() + + def _wait_and_assert_cpu_profiling(): + # Should raise as the profiling will be stopped. + with pytest.raises(HTTPError) as _: + _start_profiling_cpu(http_client, None) + + thread = threading.Thread(target=_wait_and_assert_cpu_profiling) + thread.start() + + assert _wait_till_profiling_starts(http_client, None) + _stop_profiling_cpu(http_client, None) + + # Should raise as the profiling is already stopped. + assert _stop_profiling_cpu(http_client, None) == 412 + + thread.join(timeout=60) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_conflict(neon_simple_env: NeonEnv): + """ + Test that CPU profiling can be started once and the second time + it will throw an error as it is already running. 
+ """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + pg_conn = endpoint.connect() + pg_cur = pg_conn.cursor() + pg_cur.execute("create database profiling_test") + http_client = endpoint.http_client() + + def _wait_and_assert_cpu_profiling_local(): + _wait_and_assert_cpu_profiling(http_client, None) + + thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local) + thread.start() + + assert _wait_till_profiling_starts(http_client, None) + + inserting_should_stop_event = threading.Event() + + def insert_rows(): + lfc_conn = endpoint.connect(dbname="profiling_test") + lfc_cur = lfc_conn.cursor() + n_records = 0 + lfc_cur.execute( + "create table t(pk integer primary key, payload text default repeat('?', 128))" + ) + batch_size = 1000 + + log.info("Inserting rows") + + while not inserting_should_stop_event.is_set(): + n_records += batch_size + lfc_cur.execute( + f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))" + ) + + log.info(f"Inserted {n_records} rows") + + thread2 = threading.Thread(target=insert_rows) + thread2.start() + + # Should raise as the profiling is already in progress. + with pytest.raises(HTTPError) as _: + _start_profiling_cpu(http_client, None) + + inserting_should_stop_event.set() # Stop the insertion thread + + # The profiling should still be running and finish normally. + thread.join(timeout=600) + thread2.join(timeout=600) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_stop_when_not_running(neon_simple_env: NeonEnv): + """ + Test that CPU profiling throws the expected error when is attempted + to be stopped when it hasn't be running. + """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + http_client = endpoint.http_client() + + for _ in range(3): + status = _stop_profiling_cpu(http_client, None) + assert status == 412 + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_start_arguments_validation_works(neon_simple_env: NeonEnv): + """ + Test that CPU profiling start request properly validated the + arguments and throws the expected error (bad request). + """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + http_client = endpoint.http_client() + + for sampling_frequency in [-1, 0, 1000000]: + status, _ = http_client.start_profiling_cpu(sampling_frequency, 5) + assert status == 422 + for timeout_seconds in [-1, 0, 1000000]: + status, _ = http_client.start_profiling_cpu(5, timeout_seconds) + assert status == 422 diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index fc01deb92d..47160c0c8d 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -120,6 +120,7 @@ anyhow = { version = "1", features = ["backtrace"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] } +clang-sys = { version = "1", default-features = false, features = ["clang_11_0", "runtime"] } clap = { version = "4", features = ["derive", "env", "string"] } clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] } either = { version = "1" }