diff --git a/Cargo.lock b/Cargo.lock index c528354053..65dcd5c3c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -962,6 +962,24 @@ dependencies = [ "serde", ] +[[package]] +name = "bindgen" +version = "0.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" +dependencies = [ + "bitflags 2.8.0", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "proc-macro2", + "quote", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.100", +] + [[package]] name = "bindgen" version = "0.71.1" @@ -1336,8 +1354,10 @@ dependencies = [ "hostname-validator", "http 1.1.0", "indexmap 2.9.0", + "inferno 0.12.0", "itertools 0.10.5", "jsonwebtoken", + "libproc", "metrics", "nix 0.30.1", "notify", @@ -1351,6 +1371,8 @@ dependencies = [ "postgres-types", "postgres_initdb", "postgres_versioninfo", + "pprof 0.15.0", + "prost 0.12.6", "regex", "remote_storage", "reqwest", @@ -1362,6 +1384,7 @@ dependencies = [ "serde_with", "signal-hook", "tar", + "tempfile", "thiserror 1.0.69", "tokio", "tokio-postgres", @@ -2198,12 +2221,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.8" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -2533,6 +2556,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + [[package]] name = "gettid" version = "0.1.3" @@ -2790,6 +2825,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "hostname" version = "0.4.0" @@ -2899,7 +2943,7 @@ dependencies = [ "jsonwebtoken", "metrics", "once_cell", - "pprof", + "pprof 0.14.0", "regex", "routerify", "rustls 0.23.27", @@ -3561,7 +3605,7 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" dependencies = [ - "spin", + "spin 0.9.8", ] [[package]] @@ -3586,6 +3630,17 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libproc" +version = "0.14.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78a09b56be5adbcad5aa1197371688dc6bb249a26da3bca2011ee2fb987ebfb" +dependencies = [ + "bindgen 0.70.1", + "errno", + "libc", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -3598,6 +3653,12 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0b5399f6804fbab912acbd8878ed3532d506b7c951b8f9f164ef90fef39e3f4" +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "litemap" version = "0.7.4" @@ -4390,7 +4451,7 @@ dependencies = [ "postgres_ffi_types", "postgres_initdb", "posthog_client_lite", - "pprof", + "pprof 0.14.0", "pq_proto", "procfs", "rand 0.8.5", @@ -4965,7 +5026,7 @@ name = "postgres_ffi" version = "0.1.0" dependencies = [ "anyhow", - "bindgen", + "bindgen 0.71.1", "bytes", "crc32c", "criterion", @@ -4976,7 +5037,7 @@ dependencies = [ "postgres", "postgres_ffi_types", "postgres_versioninfo", - "pprof", + "pprof 0.14.0", "regex", "serde", "thiserror 1.0.69", @@ -5066,6 +5127,30 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pprof" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38a01da47675efa7673b032bf8efd8214f1917d89685e07e395ab125ea42b187" +dependencies = [ + "aligned-vec", + "backtrace", + "cfg-if", + "findshlibs", + "inferno 0.11.21", + "libc", + "log", + "nix 0.26.4", + "once_cell", + "protobuf", + "protobuf-codegen", + "smallvec", + "spin 0.10.0", + "symbolic-demangle", + "tempfile", + "thiserror 2.0.11", +] + [[package]] name = "pprof_util" version = "0.7.0" @@ -5141,7 +5226,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix", + "rustix 0.38.41", ] [[package]] @@ -5277,6 +5362,57 @@ dependencies = [ "prost 0.13.5", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-codegen" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace" +dependencies = [ + "anyhow", + "once_cell", + "protobuf", + "protobuf-parse", + "regex", + "tempfile", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-parse" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973" +dependencies = [ + "anyhow", + "indexmap 2.9.0", + "log", + "protobuf", + "protobuf-support", + "tempfile", + "thiserror 1.0.69", + "which", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "proxy" version = "0.1.0" @@ -5448,6 +5584,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "rand" version = "0.7.3" @@ -6027,6 +6169,19 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustix" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +dependencies = [ + "bitflags 2.8.0", + "errno", + "libc", + "linux-raw-sys 0.9.4", + "windows-sys 0.59.0", +] + [[package]] name = "rustls" version = "0.21.12" @@ -6209,7 +6364,7 @@ dependencies = [ "postgres_backend", "postgres_ffi", "postgres_versioninfo", - "pprof", + "pprof 0.14.0", "pq_proto", "rand 0.8.5", "regex", @@ -6798,6 +6953,15 @@ version = "0.9.8" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spin" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5fe4ccb98d9c292d56fec89a5e07da7fc4cf0dc11e156b41793132775d3e591" +dependencies = [ + "lock_api", +] + [[package]] name = "spinning_top" version = "0.3.0" @@ -7157,14 +7321,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.14.0" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ - "cfg-if", "fastrand 2.2.0", + "getrandom 0.3.3", "once_cell", - "rustix", + "rustix 1.0.7", "windows-sys 0.59.0", ] @@ -8177,7 +8341,7 @@ dependencies = [ "pem", "pin-project-lite", "postgres_connection", - "pprof", + "pprof 0.14.0", "pq_proto", "rand 0.8.5", "regex", @@ -8284,7 +8448,7 @@ dependencies = [ "pageserver_api", "postgres_ffi", "postgres_ffi_types", - "pprof", + "pprof 0.14.0", "prost 0.13.5", "remote_storage", "serde", @@ -8314,7 +8478,7 @@ name = "walproposer" version = "0.1.0" dependencies = [ "anyhow", - "bindgen", + "bindgen 0.71.1", "postgres_ffi", "utils", ] @@ -8341,6 +8505,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasite" version = "0.1.0" @@ -8470,6 +8643,18 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "which" +version = "4.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7" +dependencies = [ + "either", + "home", + "once_cell", + "rustix 0.38.41", +] + [[package]] name = "whoami" version = "1.5.1" @@ -8698,6 +8883,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.8.0", +] + [[package]] name = "workspace_hack" version = "0.1.0" @@ -8713,6 +8907,7 @@ dependencies = [ "camino", "cc", "chrono", + "clang-sys", "clap", "clap_builder", "const-oid", diff --git a/Cargo.toml b/Cargo.toml index 0d521ee4d9..5fdf8e9a3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,7 @@ jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] } jsonwebtoken = "9" lasso = "0.7" libc = "0.2" +libproc = "0.14" md5 = "0.7.0" measured = { version = "0.0.22", features=["lasso"] } measured-process = { version = "0.0.22" } @@ -278,6 +279,7 @@ safekeeper_api = { version = "0.1", path = "./libs/safekeeper_api" } safekeeper_client = { path = "./safekeeper/client" } storage_broker = { version = "0.1", path = "./storage_broker/" } # Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy. 
storage_controller_client = { path = "./storage_controller/client" } +tempfile = "3" tenant_size_model = { version = "0.1", path = "./libs/tenant_size_model/" } tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" } utils = { version = "0.1", path = "./libs/utils/" } diff --git a/Dockerfile b/Dockerfile index 55b87d4012..231d0aef1d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -109,6 +109,8 @@ RUN set -e \ libreadline-dev \ libseccomp-dev \ ca-certificates \ + bpfcc-tools \ + sudo \ openssl \ unzip \ curl \ diff --git a/build-tools.Dockerfile b/build-tools.Dockerfile index 14a52bd736..ddc32b2c9c 100644 --- a/build-tools.Dockerfile +++ b/build-tools.Dockerfile @@ -61,6 +61,9 @@ RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \ libpq5 \ libpq-dev \ libzstd-dev \ + linux-perf \ + bpfcc-tools \ + linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \ postgresql-16 \ postgresql-server-dev-16 \ postgresql-common \ @@ -105,15 +108,21 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \ # # 'gdb' is included so that we get backtraces of core dumps produced in # regression tests -RUN set -e \ +RUN set -ex \ + && KERNEL_VERSION="$(uname -r | cut -d'-' -f1 | sed 's/\.0$//')" \ + && echo KERNEL_VERSION=${KERNEL_VERSION} >> /etc/environment \ + && KERNEL_ARCH=$(uname -m | awk '{ if ($1 ~ /^(x86_64|i[3-6]86)$/) print "x86"; else if ($1 ~ /^(aarch64|arm.*)$/) print "aarch"; else print $1 }') \ + && echo KERNEL_ARCH=${KERNEL_ARCH} >> /etc/environment \ && apt update \ && apt install -y \ autoconf \ automake \ + bc \ bison \ build-essential \ ca-certificates \ cmake \ + cpio \ curl \ flex \ gdb \ @@ -122,8 +131,10 @@ RUN set -e \ gzip \ jq \ jsonnet \ + kmod \ libcurl4-openssl-dev \ libbz2-dev \ + libelf-dev \ libffi-dev \ liblzma-dev \ libncurses5-dev \ @@ -137,6 +148,11 @@ RUN set -e \ libxml2-dev \ libxmlsec1-dev \ libxxhash-dev \ + linux-perf \ + bpfcc-tools \ + libbpfcc \ + libbpfcc-dev \ + linux-headers-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac) \ lsof \ make \ netcat-openbsd \ @@ -144,12 +160,35 @@ RUN set -e \ openssh-client \ parallel \ pkg-config \ + rsync \ + sudo \ unzip \ wget \ xz-utils \ zlib1g-dev \ zstd \ - && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \ + && git clone --depth 1 --branch v${KERNEL_VERSION} https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git \ + && cd linux \ + && make mrproper \ + && make defconfig \ + && scripts/config --module CONFIG_IKHEADERS \ + && scripts/config --enable CONFIG_MODULE_COMPRESS \ + && scripts/config --disable CONFIG_MODULE_COMPRESS_GZIP \ + && scripts/config --enable CONFIG_MODULE_COMPRESS_ZSTD \ + && make olddefconfig \ + && make modules_prepare -j \ + && make WERROR=0 NO_WERROR=1 -j 10 \ + && make WERROR=0 NO_WERROR=1 modules -j10 \ + && mkdir -p /lib/modules/$(uname -r)/build \ + && mkdir -p /lib/modules/$(uname -r)/kernel/kernel \ + && cp -a include arch/${KERNEL_ARCH}/include scripts Module.symvers .config Makefile /lib/modules/$(uname -r)/build/ \ + && make headers_install INSTALL_HDR_PATH=/lib/modules/$(uname -r)/build \ + && cp -a arch/${KERNEL_ARCH}/include /lib/modules/$(uname -r)/build/arch/${KERNEL_ARCH}/include \ + && zstd -19 ./kernel/kheaders.ko -o ./kernel/kheaders.ko.zst \ + && cp -a kernel/kheaders.ko.zst /lib/modules/$(uname -r)/kernel/kernel \ + && execsnoop-bpfcc \ + && rm -rf linux # sql_exporter @@ -198,6 +237,8 @@ RUN curl -fsSL 
https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o / # Configure sudo & docker RUN usermod -aG sudo nonroot && \ echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers && \ + mkdir -p /etc/sudoers.d && \ + echo 'nonroot ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/nonroot && \ usermod -aG docker nonroot # AWS CLI diff --git a/compute/vm-image-spec-bookworm.yaml b/compute/vm-image-spec-bookworm.yaml index 267e4c83b5..3372293f5d 100644 --- a/compute/vm-image-spec-bookworm.yaml +++ b/compute/vm-image-spec-bookworm.yaml @@ -65,7 +65,7 @@ files: # regardless of hostname (ALL) # # Also allow it to shut down the VM. The fast_import job does that when it's finished. - postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd + postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc - filename: cgconfig.conf content: | # Configuration for cgroups in VM compute nodes @@ -152,6 +152,8 @@ merge: | RUN set -e \ && chmod 0644 /etc/cgconfig.conf + ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf + COPY compute_rsyslog.conf /etc/compute_rsyslog.conf RUN chmod 0666 /etc/compute_rsyslog.conf diff --git a/compute/vm-image-spec-bullseye.yaml b/compute/vm-image-spec-bullseye.yaml index 2b6e77b656..4e5a0d3ca6 100644 --- a/compute/vm-image-spec-bullseye.yaml +++ b/compute/vm-image-spec-bullseye.yaml @@ -65,7 +65,7 @@ files: # regardless of hostname (ALL) # # Also allow it to shut down the VM. The fast_import job does that when it's finished. - postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd + postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff, /usr/sbin/rsyslogd, /neonvm/tools/bin/perf, /usr/sbin/profile-bpfcc - filename: cgconfig.conf content: | # Configuration for cgroups in VM compute nodes @@ -148,6 +148,8 @@ merge: | RUN set -e \ && chmod 0644 /etc/cgconfig.conf + ENV PERF_BINARY_PATH=/neonvm/tools/bin/perf + COPY compute_rsyslog.conf /etc/compute_rsyslog.conf RUN chmod 0666 /etc/compute_rsyslog.conf RUN mkdir /var/log/rsyslog && chown -R postgres /var/log/rsyslog diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 910bae3bda..b0261c3227 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -31,6 +31,7 @@ hostname-validator = "1.1" indexmap.workspace = true itertools.workspace = true jsonwebtoken.workspace = true +libproc.workspace = true metrics.workspace = true nix.workspace = true notify.workspace = true @@ -49,6 +50,7 @@ serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true tar.workspace = true +tempfile.workspace = true tower.workspace = true tower-http.workspace = true tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } @@ -78,3 +80,10 @@ zstd = "0.13" bytes = "1.0" rust-ini = "0.20.0" rlimit = "0.10.1" + +inferno = { version = "0.12", default-features = false, features = [ + "multithreaded", + "nameattr", +] } +pprof = { version = "0.15", features = ["protobuf-codec", "flamegraph"] } +prost = "0.12" diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c05cc229a2..6bac941892 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -371,7 +371,9 @@ fn maybe_cgexec(cmd: &str) -> Command { } } -struct PostgresHandle { +/// A handle to the Postgres process that is 
running in the compute
+/// node.
+pub struct PostgresHandle {
     postgres: std::process::Child,
     log_collector: JoinHandle<Result<()>>,
 }

diff --git a/compute_tools/src/http/routes/mod.rs b/compute_tools/src/http/routes/mod.rs
index dd71f663eb..4458714eef 100644
--- a/compute_tools/src/http/routes/mod.rs
+++ b/compute_tools/src/http/routes/mod.rs
@@ -15,6 +15,7 @@ pub(in crate::http) mod lfc;
 pub(in crate::http) mod metrics;
 pub(in crate::http) mod metrics_json;
 pub(in crate::http) mod promote;
+pub(in crate::http) mod profile;
 pub(in crate::http) mod status;
 pub(in crate::http) mod terminate;

diff --git a/compute_tools/src/http/routes/profile.rs b/compute_tools/src/http/routes/profile.rs
new file mode 100644
index 0000000000..0aed00f22a
--- /dev/null
+++ b/compute_tools/src/http/routes/profile.rs
@@ -0,0 +1,217 @@
+//! Contains the route for profiling the compute.
+//!
+//! Profiling the compute means generating a pprof profile of the
+//! postgres processes.
+//!
+//! The profiling is done using either the Linux `perf` tool or bcc's
+//! `profile` tool; the chosen binary is expected to be available
+//! somewhere in `$PATH`.
+use std::sync::atomic::Ordering;
+
+use axum::Json;
+use axum::response::IntoResponse;
+use http::StatusCode;
+use nix::unistd::Pid;
+use once_cell::sync::Lazy;
+use tokio::sync::Mutex;
+
+use crate::http::JsonResponse;
+
+static CANCEL_CHANNEL: Lazy<Mutex<Option<tokio::sync::broadcast::Sender<()>>>> =
+    Lazy::new(|| Mutex::new(None));
+
+fn default_sampling_frequency() -> u16 {
+    100
+}
+
+fn default_timeout_seconds() -> u8 {
+    5
+}
+
+fn deserialize_sampling_frequency<'de, D>(deserializer: D) -> Result<u16, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    use serde::Deserialize;
+
+    const MIN_SAMPLING_FREQUENCY: u16 = 1;
+    const MAX_SAMPLING_FREQUENCY: u16 = 1000;
+
+    let value = u16::deserialize(deserializer)?;
+
+    if !(MIN_SAMPLING_FREQUENCY..=MAX_SAMPLING_FREQUENCY).contains(&value) {
+        return Err(serde::de::Error::custom(format!(
+            "sampling_frequency must be between {MIN_SAMPLING_FREQUENCY} and {MAX_SAMPLING_FREQUENCY}, got {value}"
+        )));
+    }
+    Ok(value)
+}
+
+fn deserialize_profiling_timeout<'de, D>(deserializer: D) -> Result<u8, D::Error>
+where
+    D: serde::Deserializer<'de>,
+{
+    use serde::Deserialize;
+
+    const MIN_TIMEOUT_SECONDS: u8 = 1;
+    const MAX_TIMEOUT_SECONDS: u8 = 60;
+
+    let value = u8::deserialize(deserializer)?;
+
+    if !(MIN_TIMEOUT_SECONDS..=MAX_TIMEOUT_SECONDS).contains(&value) {
+        return Err(serde::de::Error::custom(format!(
+            "timeout_seconds must be between {MIN_TIMEOUT_SECONDS} and {MAX_TIMEOUT_SECONDS}, got {value}"
+        )));
+    }
+    Ok(value)
+}
+
+/// Request parameters for profiling the compute.
+#[derive(Debug, Clone, serde::Deserialize)]
+pub(in crate::http) struct ProfileRequest {
+    /// The profiling tool to use: the Linux `perf` tool or bcc's
+    /// `profile` tool.
+    profiler: crate::profiling::ProfileGenerator,
+    #[serde(default = "default_sampling_frequency")]
+    #[serde(deserialize_with = "deserialize_sampling_frequency")]
+    sampling_frequency: u16,
+    #[serde(default = "default_timeout_seconds")]
+    #[serde(deserialize_with = "deserialize_profiling_timeout")]
+    timeout_seconds: u8,
+    #[serde(default)]
+    archive: bool,
+}
+
+/// The HTTP request handler for reporting the profiling status of
+/// the compute.
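+///
+/// Responds with `200 OK` while a profiling session is in progress
+/// and with `204 No Content` otherwise.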
+pub(in crate::http) async fn profile_status() -> impl IntoResponse { + tracing::info!("Profile status request received."); + + let cancel_channel = CANCEL_CHANNEL.lock().await; + + if let Some(tx) = cancel_channel.as_ref() { + if tx.receiver_count() > 0 { + return JsonResponse::create_response( + StatusCode::OK, + "Profiling is currently in progress.", + ); + } + } + + JsonResponse::create_response(StatusCode::NO_CONTENT, "Profiling is not in progress.") +} + +/// The HTTP request handler for stopping profiling the compute. +pub(in crate::http) async fn profile_stop() -> impl IntoResponse { + tracing::info!("Profile stop request received."); + + match CANCEL_CHANNEL.lock().await.take() { + Some(tx) => { + if tx.send(()).is_err() { + tracing::error!("Failed to send cancellation signal."); + return JsonResponse::create_response( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to send cancellation signal", + ); + } + JsonResponse::create_response(StatusCode::OK, "Profiling stopped successfully.") + } + None => JsonResponse::create_response( + StatusCode::PRECONDITION_FAILED, + "Profiling is not in progress, there is nothing to stop.", + ), + } +} + +/// The HTTP request handler for starting profiling the compute. +pub(in crate::http) async fn profile_start( + Json(request): Json, +) -> impl IntoResponse { + tracing::info!("Profile start request received: {request:?}"); + + let tx = tokio::sync::broadcast::Sender::<()>::new(1); + + { + let mut cancel_channel = CANCEL_CHANNEL.lock().await; + + if cancel_channel.is_some() { + return JsonResponse::create_response( + StatusCode::CONFLICT, + "Profiling is already in progress.", + ); + } + *cancel_channel = Some(tx.clone()); + } + + tracing::info!("Profiling will start with parameters: {request:?}"); + let pg_pid = Pid::from_raw(crate::compute::PG_PID.load(Ordering::SeqCst) as _); + + let run_with_sudo = !cfg!(feature = "testing"); + + let options = crate::profiling::ProfileGenerationOptions { + profiler: request.profiler, + run_with_sudo, + pids: [pg_pid].into_iter().collect(), + follow_forks: true, + sampling_frequency: request.sampling_frequency as u32, + blocklist_symbols: vec![ + "libc".to_owned(), + "libgcc".to_owned(), + "pthread".to_owned(), + "vdso".to_owned(), + ], + archive: request.archive, + }; + + let options = crate::profiling::ProfileGenerationTaskOptions { + options, + timeout: std::time::Duration::from_secs(request.timeout_seconds as u64), + should_stop: Some(tx), + }; + + let pprof_data = crate::profiling::generate_pprof_profile(options).await; + + if CANCEL_CHANNEL.lock().await.take().is_none() { + tracing::error!("Profiling was cancelled from another request."); + + return JsonResponse::create_response( + StatusCode::NO_CONTENT, + "Profiling was cancelled from another request.", + ); + } + + let pprof_data = match pprof_data { + Ok(data) => data, + Err(e) => { + tracing::error!(error = ?e, "failed to generate pprof data"); + return JsonResponse::create_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Failed to generate pprof data: {e:?}"), + ); + } + }; + + tracing::info!("Profiling has completed successfully."); + + let mut headers = http::HeaderMap::new(); + + if request.archive { + headers.insert( + http::header::CONTENT_TYPE, + http::HeaderValue::from_static("application/gzip"), + ); + headers.insert( + http::header::CONTENT_DISPOSITION, + http::HeaderValue::from_static("attachment; filename=\"profile.pb.gz\""), + ); + } else { + headers.insert( + http::header::CONTENT_TYPE, + 
http::HeaderValue::from_static("application/octet-stream"), + ); + headers.insert( + http::header::CONTENT_DISPOSITION, + http::HeaderValue::from_static("attachment; filename=\"profile.pb\""), + ); + } + + (headers, pprof_data.0).into_response() +} diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs index 17939e39d4..4f993b6051 100644 --- a/compute_tools/src/http/server.rs +++ b/compute_tools/src/http/server.rs @@ -27,6 +27,7 @@ use super::{ }, }; use crate::compute::ComputeNode; +use crate::http::routes::profile; /// `compute_ctl` has two servers: internal and external. The internal server /// binds to the loopback interface and handles communication from clients on @@ -81,8 +82,14 @@ impl From<&Server> for Router> { Server::External { config, compute_id, .. } => { - let unauthenticated_router = - Router::>::new().route("/metrics", get(metrics::get_metrics)); + let unauthenticated_router = Router::>::new() + .route("/metrics", get(metrics::get_metrics)) + .route( + "/profile/cpu", + get(profile::profile_status) + .post(profile::profile_start) + .delete(profile::profile_stop), + ); let authenticated_router = Router::>::new() .route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm)) diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 2d5d4565b7..a9a6506b5e 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -24,6 +24,7 @@ pub mod monitor; pub mod params; pub mod pg_helpers; pub mod pgbouncer; +pub mod profiling; pub mod rsyslog; pub mod spec; mod spec_apply; diff --git a/compute_tools/src/profiling/mod.rs b/compute_tools/src/profiling/mod.rs new file mode 100644 index 0000000000..1a7c6ca3f3 --- /dev/null +++ b/compute_tools/src/profiling/mod.rs @@ -0,0 +1,862 @@ +use std::{ + collections::{HashMap, HashSet}, + io::{BufRead, BufReader, Cursor, Write}, + ops::{Deref, DerefMut}, + path::PathBuf, +}; + +use anyhow::{Context, anyhow}; +use flate2::{Compression, write::GzEncoder}; +use inferno::collapse::Collapse; +use nix::{libc::pid_t, unistd::Pid}; +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncReadExt; +use tracing::instrument; + +const SUDO_PATH: &str = "/usr/bin/sudo"; +const PERF_BINARY_DEFAULT_PATH: &str = "perf"; +const BCC_PROFILE_BINARY_DEFAULT_PATH: &str = "/usr/share/bcc/tools/profile"; +const STANDARD_PATH_PATHS: &str = "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"; + +/// Either returns the path to the `sudo` binary +/// from the environment variable `SUDO_BINARY_PATH` +/// or returns the default path defined by +/// [`SUDO_PATH`]. +fn sudo_path() -> String { + std::env::var("SUDO_BINARY_PATH").unwrap_or_else(|_| SUDO_PATH.to_owned()) +} + +/// Either returns the path to the `perf` binary +/// from the environment variable `PERF_BINARY_PATH` +/// or returns the default path defined by +/// [`PERF_BINARY_DEFAULT_PATH`]. +fn perf_binary_path() -> String { + std::env::var("PERF_BINARY_PATH").unwrap_or_else(|_| PERF_BINARY_DEFAULT_PATH.to_owned()) +} + +/// Either returns the path to the `bcc-profile` binary +/// from the environment variable `BCC_PROFILE_BINARY_PATH` +/// or returns the default path defined by +/// [`BCC_PROFILE_BINARY_DEFAULT_PATH`]. +fn bcc_profile_binary_path() -> String { + std::env::var("BCC_PROFILE_BINARY_PATH") + .unwrap_or_else(|_| BCC_PROFILE_BINARY_DEFAULT_PATH.to_owned()) +} + +/// Probes the executable at the given paths for the provided binary +/// names. +/// +/// To check that the binary can run, it is invoked with the arguments. 
+fn probe_executable_at_paths( + exact_matches: &[&str], + binary_names: &[&str], + paths: &[PathBuf], + arg: Option<&str>, + path_env_override: Option, +) -> anyhow::Result { + let check_binary_runs = move |path: &str| { + let mut command = vec![path]; + + if let Some(arg) = arg { + command.push(arg); + } + + check_binary_runs(&command, path_env_override.clone()) + }; + + let mut probed = Vec::new(); + + for exact_match in exact_matches { + match check_binary_runs(exact_match) { + Ok(_) => { + tracing::trace!("Found exact match for binary: {exact_match}"); + return Ok(exact_match.to_owned().to_owned()); + } + Err(e) => { + probed.push(exact_match.to_owned().to_owned()); + tracing::trace!("Failed to run the binary at path: {exact_match}: {e:?}"); + } + } + } + + for path in paths { + for name in binary_names { + let full_path = path.join(name); + + tracing::trace!("Looking at path: {}", full_path.to_string_lossy()); + + if full_path.exists() && full_path.is_file() { + tracing::trace!( + "There is an existing file at path: {}", + full_path.to_string_lossy() + ); + + match check_binary_runs(full_path.to_string_lossy().as_ref()) { + Ok(_) => { + tracing::trace!( + "Found valid binary at path: {}", + full_path.to_string_lossy() + ); + + return Ok(full_path.to_string_lossy().into_owned()); + } + Err(e) => { + tracing::trace!( + "Failed to run the binary at path: {}: {e:?}", + full_path.to_string_lossy(), + ); + } + } + } + + probed.push(full_path.to_string_lossy().into_owned()); + } + } + + Err(anyhow!( + "No valid binary found in the paths: {paths:?}, probed paths: {probed:?}" + )) +} + +/// Probes the executable in the known paths for the provided binary +/// names. At first, the function checks the exact matches +/// provided in the `exact_matches` slice. If any of those binaries +/// can be run with the provided `arg`, it returns the path to that +/// binary. If no exact matches are found, it checks the standard +/// paths defined by [`STANDARD_PATH_PATHS`] and looks for the binaries +/// with the names provided in the `binary_names` slice. +/// If any of those binaries can be run with the provided `arg`, it +/// returns the path to that binary. +/// If no valid binary is found, it returns an error. +fn probe_executable_in_known_paths( + exact_matches: &[&str], + binary_names: &[&str], + arg: Option<&str>, + path_env_override: Option, +) -> anyhow::Result { + let paths = STANDARD_PATH_PATHS + .split(':') + .map(PathBuf::from) + .collect::>(); + + probe_executable_at_paths(exact_matches, binary_names, &paths, arg, path_env_override) +} + +/// Returns the path to the `sudo` binary if it can be run. +fn probe_sudo_paths() -> anyhow::Result { + let sudo_path = sudo_path(); + let exact_matches = [sudo_path.as_ref()]; + let binary_names = ["sudo"]; + + probe_executable_in_known_paths(&exact_matches, &binary_names, Some("--version"), None) +} + +/// Checks the known paths for `bcc-profile` binary and verifies that it +/// can be run. +/// +/// First, if the provided path is `Some`, it checks if the binary +/// at that path can be run with `--help` argument. If it can, it +/// returns the path. +/// +/// If the provided path is `None`, it checks the default `bcc-profile` +/// binary path defined by [`BCC_PROFILE_BINARY_DEFAULT_PATH`] constant. +/// If that binary can be run, it returns the path. 
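+/// (The default location can be overridden via the
+/// `BCC_PROFILE_BINARY_PATH` environment variable.)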
+///
+/// If neither the provided path nor the default path can be run,
+/// it checks the standard paths defined by [`STANDARD_PATH_PATHS`]
+/// and looks for known binary names like `bcc-profile`, `profile.py`,
+/// or `profile-bpfcc` and other known names from various Linux
+/// distributions. If any of those binaries can be run, it returns the
+/// path to that binary. If no valid binary is found, it returns an
+/// error.
+fn probe_bcc_profile_binary_paths(provided_path: Option<PathBuf>) -> anyhow::Result<String> {
+    let binary_names = ["bcc-profile", "profile.py", "profile-bpfcc", "profile"];
+
+    let mut exact_matches = Vec::new();
+
+    if let Some(path) = provided_path.as_ref() {
+        if let Some(path) = path.to_str() {
+            exact_matches.push(path);
+        }
+    }
+
+    let bcc_profile_binary_path = bcc_profile_binary_path();
+    exact_matches.push(&bcc_profile_binary_path);
+
+    probe_executable_in_known_paths(
+        &exact_matches,
+        &binary_names,
+        Some("--help"),
+        Some(get_override_path_env()),
+    )
+}
+
+/// Represents the pprof data generated from a profiling tool.
+#[repr(transparent)]
+#[derive(Debug, Clone)]
+pub struct PprofData(pub(crate) Vec<u8>);
+
+impl PprofData {
+    /// Dumps the pprof data to a file.
+    pub fn write_to_file(&self, path: &PathBuf) -> anyhow::Result<()> {
+        let mut file = std::fs::File::create(path).context(format!(
+            "couldn't create a file for dumping pprof data at path: {path:?}"
+        ))?;
+        file.write_all(&self.0).context(format!(
+            "couldn't write pprof raw data to the file at path: {path:?}"
+        ))?;
+        Ok(())
+    }
+}
+
+impl Deref for PprofData {
+    type Target = Vec<u8>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl DerefMut for PprofData {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+/// Returns a list of child processes for a given parent process ID.
+fn list_children_processes(parent_pid: Pid) -> anyhow::Result<Vec<Pid>> {
+    use libproc::processes::{ProcFilter, pids_by_type};
+
+    let filter = ProcFilter::ByParentProcess {
+        ppid: parent_pid.as_raw() as _,
+    };
+
+    Ok(pids_by_type(filter)
+        .context("failed to list child processes")?
+        .into_iter()
+        .map(|p| Pid::from_raw(p as _))
+        .collect())
+}
+
+fn check_binary_runs(command: &[&str], override_path: Option<String>) -> anyhow::Result<()> {
+    if command.is_empty() {
+        return Err(anyhow!("Command cannot be empty"));
+    }
+
+    // The first element of `command` is the program to invoke; the
+    // remaining elements become its arguments.
+    let (first, rest) = command
+        .split_first()
+        .ok_or_else(|| anyhow!("Command must contain at least one argument (the binary name)"))?;
+
+    let mut cmd = std::process::Command::new(first);
+
+    for arg in rest {
+        cmd.arg(arg);
+    }
+
+    let command_invocation = get_command_invocation_string(&cmd);
+
+    if let Some(path) = override_path {
+        cmd.env("PATH", path);
+    }
+
+    let output = cmd.output().context(format!(
+        "failed to run the command to check if it can be run: {command_invocation}"
+    ))?;
+
+    if !output.status.success() {
+        return Err(anyhow!(
+            "The command invocation is not possible: {}",
+            output.status
+        ));
+    }
+
+    Ok(())
+}
+
+fn check_perf_runs(perf_binary_path: &str, run_with_sudo: bool) -> anyhow::Result<()> {
+    let sudo_path = sudo_path();
+
+    let command = if run_with_sudo {
+        vec![&sudo_path, perf_binary_path, "version"]
+    } else {
+        vec![perf_binary_path, "version"]
+    };
+
+    check_binary_runs(&command, None)
+}
+
+/// Returns the command invocation string for a given
+/// [`std::process::Command`].
+fn get_command_invocation_string(command: &std::process::Command) -> String { + let mut command_str = command.get_program().to_string_lossy().into_owned(); + + for arg in command.get_args() { + command_str.push(' '); + command_str.push_str(&arg.to_string_lossy()); + } + + command_str +} + +/// Returns the clear `PATH` environment variable, so that even the +/// Python scripts can run cleanly without any virtualenv or pyenv +/// interference. +fn get_override_path_env() -> String { + let path = std::env::var("PATH").unwrap_or_else(|_| STANDARD_PATH_PATHS.to_string()); + + #[cfg(feature = "testing")] + let path = path + .split(':') + .filter(|p| { + let p = p.to_lowercase(); + !p.contains("virtualenv") + && !p.contains("venv") + && !p.contains("pyenv") + && !p.contains("pypoetry") + && !p.contains("pyenv-virtualenv") + && !p.contains("pyenv-virtualenvs") + }) + .collect::>() + .join(":"); + + path +} + +/// The generator for the pprof profile. +/// +/// The generator tools have the path to the binary +/// that will be used to generate the profile. +/// If the path is `None`, the tool will be searched +/// in the system's `PATH` using the default name. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ProfileGenerator { + /// The `bcc` tool for generating profiles. + BccProfile(Option), + /// The Linux `perf` tool. + Perf(Option), +} + +impl ProfileGenerator { + /// Returns the path to the `profile` binary if it is set. + pub fn get_bcc_profile_binary_path(&self) -> Option { + match self { + ProfileGenerator::BccProfile(path) => path.clone(), + _ => None, + } + } +} + +/// The options for generating a pprof profile using the `perf` tool. +#[derive(Debug, Clone)] +pub struct ProfileGenerationOptions { + pub profiler: ProfileGenerator, + /// If set to `true`, the `perf` command will be run with `sudo`. + /// This is useful for profiling processes that require elevated + /// privileges. + /// If `false`, the command will be run without `sudo`. + pub run_with_sudo: bool, + /// The PID of the process to profile. This can be the main process + /// or a child process. For targeting postgres, this should be the + /// PID of the main postgres process. + pub pids: HashSet, + /// If set to `true`, the `perf` command will follow forks. + /// This means that it will continue to profile child processes + /// that are created by the main process after it has already + /// started profiling. + pub follow_forks: bool, + /// The sampling frequency in Hz. + /// This is the frequency at which stack traces will be sampled. + pub sampling_frequency: u32, + /// A list of symbols to block from the profiling output. + /// This is useful for filtering out noise from the profiling data, + /// such as system libraries or other irrelevant symbols. + pub blocklist_symbols: Vec, + /// Whether to archive the pprof profile data or not. If yes, + /// the profile data will be gzipped before returning. + pub archive: bool, +} + +/// The options for the task for generating a pprof profile using the +/// `perf` tool. +#[derive(Debug, Clone)] +pub struct ProfileGenerationTaskOptions { + /// The generation options. See [`ProfileGenerationOptions`]. + pub options: ProfileGenerationOptions, + /// The duration for which the profiling should run. + /// This is the maximum time the profiling will run before it is + /// stopped. If the profiling is not stopped manually, it will run + /// for this duration. 
+ pub timeout: std::time::Duration, + /// An optional channel receiver that can be used to signal the + /// profiling to stop early. If provided, the profiling will stop + /// when a message is received on this channel or when the timeout + /// is reached, whichever comes first. + pub should_stop: Option>, +} + +/// Profiles the processes using the `bcc` framework and `profile.py` +/// tool and returns the collapsed (folded) stack output as [`Vec`]. +#[instrument] +async fn profile_with_bcc_profile( + options: ProfileGenerationTaskOptions, +) -> anyhow::Result> { + let bcc_profile_binary_path = + probe_bcc_profile_binary_paths(options.options.profiler.get_bcc_profile_binary_path()) + .context("failed to probe bcc profile binary paths")?; + + let path = get_override_path_env(); + + let mut command = tokio::process::Command::new( + probe_sudo_paths().context("failed to find the sudo executable")?, + ); + + command + .arg(bcc_profile_binary_path) + .arg("-f") + .arg("-F") + .arg(options.options.sampling_frequency.to_string()) + .arg("-p") + .arg( + options + .options + .pids + .iter() + .map(|p| p.as_raw().to_string()) + .collect::>() + .join(","), + ) + .arg(options.timeout.as_secs().to_string()) + .env_clear() + .env("PATH", path); + + let command_str = get_command_invocation_string(command.as_std()); + + let result = command.output(); + + let result = match options.should_stop.as_ref().map(|s| s.subscribe()) { + Some(mut rx) => { + tokio::select! { + _ = rx.recv() => { + tracing::trace!("Received shutdown signal, stopping perf..."); + + return Err(anyhow!("Profiling was stopped by shutdown signal")); + } + result = result => { + tracing::trace!("Profiling completed, processing result..."); + result + } + } + } + None => { + tracing::trace!("No shutdown signal receiver provided, running bcc profile..."); + result.await + } + } + .context(format!("failed to invoke bcc-profile using {command_str}"))?; + + match result.status.success() { + true => Ok(result.stdout), + false => { + return Err(anyhow!( + "failed to run bcc profiler, invocation: {command_str}, stderr: {}", + String::from_utf8_lossy(&result.stderr) + )); + } + } +} + +/// Profiles the processes using the `perf` tool and returns the +/// collapsed (folded) stack output as [`Vec`]. +#[allow(unsafe_code)] +#[instrument] +async fn profile_with_perf(options: ProfileGenerationTaskOptions) -> anyhow::Result> { + let perf_binary_path = match options.options.profiler { + ProfileGenerator::BccProfile(_) => { + return Err(anyhow!( + "bcc profile generator is not supported for perf profiling" + )); + } + ProfileGenerator::Perf(v) => v, + } + .unwrap_or_else(|| PathBuf::from(perf_binary_path())); + + check_perf_runs( + perf_binary_path + .to_str() + .context("couldn't reconstruct perf binary path as string.")?, + options.options.run_with_sudo, + ) + .context("couldn't check that perf is available and can be run")?; + + let temp_file = tempfile::NamedTempFile::new() + .context("failed to create temporary file for perf output")?; + let temp_file_path = temp_file.path(); + + // Step 1: Run perf to collect stack traces + let mut perf_record_command = if options.options.run_with_sudo { + let mut cmd = + tokio::process::Command::new(probe_sudo_paths().context("failed to find sudo")?); + cmd.arg(&perf_binary_path); + cmd + } else { + tokio::process::Command::new(&perf_binary_path) + }; + + let mut perf_record_command = perf_record_command + .arg("record") + // Target the specified process IDs. 
.arg("-a") // NB: records system-wide; samples are narrowed to the target PIDs later via `perf script --pid`.
+        // Set the sampling frequency in Hz (validated by the HTTP
+        // route, which defaults to 100 Hz).
+        .arg("-F")
+        .arg(options.options.sampling_frequency.to_string())
+        // Enable call-graph (stack chain/backtrace) recording for both
+        // kernel space and user space.
+        .arg("-g")
+        // Disable buffering to ensure that the output is written
+        // immediately to the temporary file.
+        // This is important for real-time profiling and ensures that
+        // we get the most accurate stack traces and that after we "ask"
+        // perf to stop, we get the data dumped to the file properly.
+        .arg("--no-buffering");
+
+    if options.options.follow_forks {
+        perf_record_command = perf_record_command.arg("--inherit");
+    }
+
+    // Write the recording to the temporary file; stdout is discarded
+    // and stderr is captured for diagnostics.
+    let perf_record_command = perf_record_command
+        .arg("-o")
+        .arg(temp_file_path)
+        .stdout(std::process::Stdio::null())
+        .stderr(std::process::Stdio::piped());
+
+    let mut perf_record_command = perf_record_command
+        .spawn()
+        .context("failed to spawn a process for perf record")?;
+
+    let rx = options.should_stop.as_ref().map(|s| s.subscribe());
+
+    if let Some(mut rx) = rx {
+        tokio::select! {
+            _ = rx.recv() => {
+                tracing::trace!("Received shutdown signal, stopping perf...");
+            }
+            _ = tokio::time::sleep(options.timeout) => {
+                tracing::trace!("Timeout reached, stopping perf...");
+            }
+        }
+    } else {
+        tokio::time::sleep(options.timeout).await;
+        tracing::trace!("Timeout reached, stopping perf...");
+    }
+
+    let perf_record_pid = perf_record_command
+        .id()
+        .ok_or_else(|| anyhow!("failed to get perf record process ID"))?;
+
+    // Send SIGTERM to the perf record process to stop it
+    // gracefully and wait for it to finish.
+    let _ = nix::sys::signal::kill(
+        nix::unistd::Pid::from_raw(perf_record_pid as i32),
+        nix::sys::signal::Signal::SIGTERM,
+    );
+
+    tracing::trace!(
+        "Waiting for perf record to finish for pids = {:?}...",
+        options.options.pids
+    );
+
+    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+
+    // If perf has not exited yet (or its status cannot be determined),
+    // kill it outright.
+    if !matches!(perf_record_command.try_wait(), Ok(Some(_))) {
+        perf_record_command
+            .kill()
+            .await
+            .context("failed to kill perf record command")?;
+    }
+
+    let _ = perf_record_command
+        .wait()
+        .await
+        .context("failed to wait for perf record command")?;
+
+    let mut perf_record_stderr = perf_record_command
+        .stderr
+        .take()
+        .context("failed to take stderr from perf record command")?;
+
+    let mut perf_record_stderr_string: String = String::new();
+    perf_record_stderr
+        .read_to_string(&mut perf_record_stderr_string)
+        .await
+        .context("failed to read stderr from perf record command")?;
+
+    let temp_file_size = std::fs::metadata(temp_file_path)
+        .context(format!(
+            "couldn't get metadata for temp file: {temp_file_path:?}"
+        ))?
+ .len(); + + if temp_file_size == 0 { + tracing::warn!( + "perf record output file is empty, no data collected.\nPerf stderr: {perf_record_stderr_string}" + ); + + return Err(anyhow!( + "perf record output file is empty, no data collected" + )); + } + + let perf_script_output = tokio::process::Command::new(&perf_binary_path) + .arg("script") + .arg("-i") + .arg(temp_file_path) + .arg(format!( + "--pid={}", + options + .options + .pids + .iter() + .map(|p| p.as_raw().to_string()) + .collect::>() + .join(",") + )) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .output() + .await + .context(format!("couldn't run perf script -i {temp_file_path:?}"))?; + + if !perf_script_output.status.success() { + return Err(anyhow!( + "perf script command failed: {}", + String::from_utf8(perf_script_output.stderr) + .unwrap_or_else(|_| "Invalid UTF-8 output".to_string()) + )); + } + + if perf_script_output.stdout.is_empty() { + return Err(anyhow!(format!( + "Perf script output is empty for pid = {:?}.\n", + options.options.pids + ))); + } + + // Step 2: Collapse the stack traces into a folded stack trace + let mut folder = { + let mut options = inferno::collapse::perf::Options::default(); + options.annotate_jit = true; // Enable JIT annotation if needed + options.annotate_kernel = true; // Enable kernel annotation if needed + options.include_addrs = true; // Include addresses in the output + options.include_pid = true; // Include PIDs in the output + options.include_tid = true; // Include TIDs in the output + inferno::collapse::perf::Folder::from(options) + }; + + let mut collapsed = Vec::new(); + + folder + .collapse(perf_script_output.stdout.as_slice(), &mut collapsed) + .context("couldn't collapse the output of perf script into the folded format")?; + + if collapsed.is_empty() { + tracing::error!( + "collapsed stack trace is empty for output: {}", + String::from_utf8_lossy(&perf_script_output.stdout) + ); + + return Err(anyhow!("collapsed stack trace is empty")); + } + + Ok(collapsed) +} + +/// Run perf against a process with the given name and generate a pprof +/// profile from the output. +/// +/// The pipeline is as follows: +/// 1. Running perf for the specified process name and duration. +/// 2. Collapsing the perf output into a folded stack trace. +/// 3. Generating a pprof profile from the collapsed stack trace. +/// +/// This function blocks until the profiling is complete or the timeout +/// is reached. +/// +/// If the perf binary path is not provided, it defaults to "perf" in +/// the system's `PATH`. 
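+///
+/// Note that the supplied PIDs are first replaced by their current
+/// child processes, so for postgres it is the backend processes that
+/// get profiled.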
+pub async fn generate_pprof_profile(
+    mut options: ProfileGenerationTaskOptions,
+) -> anyhow::Result<PprofData> {
+    options.options.pids = options
+        .options
+        .pids
+        .into_iter()
+        .flat_map(list_children_processes)
+        .flatten()
+        .collect::<HashSet<_>>();
+
+    let blocklist_symbols = options.options.blocklist_symbols.clone();
+    let archive = options.options.archive;
+
+    let collapsed = match options.options.profiler {
+        ProfileGenerator::BccProfile(_) => profile_with_bcc_profile(options).await,
+        ProfileGenerator::Perf(_) => profile_with_perf(options).await,
+    }
+    .context("failed to run the profiler")?;
+
+    if collapsed.is_empty() {
+        return Err(anyhow!("collapsed output is empty, no data collected"));
+    }
+
+    generate_pprof_from_collapsed(&collapsed, &blocklist_symbols, archive)
+        .context("failed to generate pprof profile from collapsed stack trace")
+}
+
+fn archive_bytes<W: Write>(bytes: &[u8], output: &mut W) -> anyhow::Result<()> {
+    let mut encoder = GzEncoder::new(output, Compression::default());
+
+    encoder
+        .write_all(bytes)
+        .context("couldn't write the bytes into the gz encoder")?;
+    encoder
+        .finish()
+        .context("couldn't finish the gz encoder session")?;
+
+    Ok(())
+}
+
+/// Returns the process name for a given process ID, falling back to
+/// `fallback_name` if it cannot be resolved.
+fn get_exe_from_pid(pid: Pid, fallback_name: &str) -> String {
+    libproc::proc_pid::name(pid.as_raw()).unwrap_or_else(|_| fallback_name.to_owned())
+}
+
+/// Generate a pprof profile from a collapsed stack trace file.
+pub fn generate_pprof_from_collapsed<S: AsRef<str>>(
+    collapsed_bytes: &[u8],
+    blocklist_symbols: &[S],
+    archive: bool,
+) -> anyhow::Result<PprofData> {
+    use pprof::protos::Message;
+    use pprof::protos::profile::{Function, Line, Location, Profile, Sample, ValueType};
+
+    let mut profile = Profile::default();
+
+    profile.string_table.push("".to_string()); // 0: pprof requires the empty string first
+    profile.string_table.push("samples".to_string()); // 1
+    profile.string_table.push("count".to_string()); // 2
+
+    // The sample type is "samples", measured in "count" units.
+    let mut value_type = ValueType::new();
+    value_type.ty = 1;
+    value_type.unit = 2;
+    profile.sample_type.push(value_type);
+
+    let mut mapping_map = HashMap::new(); // binary_name -> mapping_id
+    let mut function_map = HashMap::new(); // function_name -> function_id
+    let mut location_map = HashMap::new(); // function_id -> location_id
+
+    let reader = BufReader::new(Cursor::new(collapsed_bytes));
+
+    for line in reader.lines() {
+        let line = line.context("invalid line found")?;
+        let Some((stack_str, count_str)) = line.rsplit_once(' ') else {
+            continue;
+        };
+        let count: i64 = count_str
+            .trim()
+            .parse()
+            .context("failed to parse the counter")?;
+
+        let mut parts = stack_str.trim().split(';');
+
+        // Extract binary name and function stack
+        let raw_binary = parts.next().unwrap_or("[unknown]");
+        let (bin_part, pid) = raw_binary
+            .rsplit_once('/')
+            .unwrap_or((raw_binary, "unknown"));
+        let binary_name = bin_part.strip_suffix("-?").unwrap_or(bin_part);
+
+        let mapping_id = *mapping_map
+            .entry(binary_name.to_string())
+            .or_insert_with(|| {
+                let exe =
+                    get_exe_from_pid(Pid::from_raw(pid.parse::<pid_t>().unwrap_or(0)), "unknown");
+                let id = profile.mapping.len() as u64 + 1;
+                profile.string_table.push(exe);
+                profile.mapping.push(pprof::protos::profile::Mapping {
+                    id,
+                    filename: profile.string_table.len() as i64 - 1,
+                    ..Default::default()
+                });
+                id
+            });
+
+        // Folded stacks list frames root-first; pprof expects the leaf
+        // first, which is why `location_id` is reversed when the sample
+        // is pushed below.
+        let stack: Vec<&str> = stack_str
+            .trim()
+            .split(';')
+            .filter(|s| {
+                // Filter out blocklisted symbols
+                !blocklist_symbols.iter().any(|b| s.contains(b.as_ref()))
+            })
.collect(); + + let mut location_ids = Vec::new(); + + for func_name in stack.iter() { + let func_id = *function_map + .entry(func_name.to_string()) + .or_insert_with(|| { + let id = profile.function.len() as u64 + 1; + profile.string_table.push(func_name.to_string()); + profile.function.push(Function { + id, + name: profile.string_table.len() as i64 - 1, + system_name: profile.string_table.len() as i64 - 1, + filename: 0, + start_line: 0, + special_fields: Default::default(), + }); + id + }); + + let loc_id = *location_map.entry(func_id).or_insert_with(|| { + let id = profile.location.len() as u64 + 1; + profile.location.push(Location { + id, + mapping_id, + address: 0, + line: vec![Line { + function_id: func_id, + line: 0, + special_fields: Default::default(), + }], + is_folded: true, + special_fields: Default::default(), + }); + id + }); + + location_ids.push(loc_id); + } + + profile.sample.push(Sample { + location_id: location_ids.into_iter().rev().collect(), + value: vec![count], + label: vec![], + special_fields: Default::default(), + }); + } + + let mut protobuf_encoded = Vec::new(); + profile + .write_to_vec(&mut protobuf_encoded) + .context("failed to write the encoded pprof-protobuf message into a vec")?; + + Ok(PprofData(if archive { + let mut archived = Vec::new(); + archive_bytes(&protobuf_encoded, &mut archived) + .context("couldn't archive the pprof-protobuf message data")?; + archived + } else { + protobuf_encoded + })) +} diff --git a/libs/neon-shmem/Cargo.toml b/libs/neon-shmem/Cargo.toml index 2a636bec40..4b8ab26da6 100644 --- a/libs/neon-shmem/Cargo.toml +++ b/libs/neon-shmem/Cargo.toml @@ -6,8 +6,8 @@ license.workspace = true [dependencies] thiserror.workspace = true -nix.workspace=true +nix.workspace = true workspace_hack = { version = "0.1", path = "../../workspace_hack" } [target.'cfg(target_os = "macos")'.dependencies] -tempfile = "3.14.0" +tempfile = "3.20.0" diff --git a/poetry.lock b/poetry.lock index 1bc5077eb7..cb1f2e92bb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" @@ -2326,6 +2326,25 @@ files = [ {file = "propcache-0.2.0.tar.gz", hash = "sha256:df81779732feb9d01e5d513fad0122efb3d53bbc75f61b2a4f29a020bc985e70"}, ] +[[package]] +name = "protobuf" +version = "6.31.1" +description = "" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "protobuf-6.31.1-cp310-abi3-win32.whl", hash = "sha256:7fa17d5a29c2e04b7d90e5e32388b8bfd0e7107cd8e616feef7ed3fa6bdab5c9"}, + {file = "protobuf-6.31.1-cp310-abi3-win_amd64.whl", hash = "sha256:426f59d2964864a1a366254fa703b8632dcec0790d8862d30034d8245e1cd447"}, + {file = "protobuf-6.31.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:6f1227473dc43d44ed644425268eb7c2e488ae245d51c6866d19fe158e207402"}, + {file = "protobuf-6.31.1-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:a40fc12b84c154884d7d4c4ebd675d5b3b5283e155f324049ae396b95ddebc39"}, + {file = "protobuf-6.31.1-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:4ee898bf66f7a8b0bd21bce523814e6fbd8c6add948045ce958b73af7e8878c6"}, + {file = "protobuf-6.31.1-cp39-cp39-win32.whl", hash = "sha256:0414e3aa5a5f3ff423828e1e6a6e907d6c65c1d5b7e6e975793d5590bdeecc16"}, + {file = "protobuf-6.31.1-cp39-cp39-win_amd64.whl", hash = "sha256:8764cf4587791e7564051b35524b72844f845ad0bb011704c3736cce762d8fe9"}, + {file = "protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e"}, + {file = "protobuf-6.31.1.tar.gz", hash = "sha256:d8cac4c982f0b957a4dc73a80e2ea24fab08e679c0de9deb835f4a12d69aca9a"}, +] + [[package]] name = "psutil" version = "5.9.4" @@ -3309,6 +3328,18 @@ files = [ [package.dependencies] cryptography = "*" +[[package]] +name = "types-protobuf" +version = "6.30.2.20250516" +description = "Typing stubs for protobuf" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "types_protobuf-6.30.2.20250516-py3-none-any.whl", hash = "sha256:8c226d05b5e8b2623111765fa32d6e648bbc24832b4c2fddf0fa340ba5d5b722"}, + {file = "types_protobuf-6.30.2.20250516.tar.gz", hash = "sha256:aecd1881770a9bb225ede66872ef7f0da4505edd0b193108edd9892e48d49a41"}, +] + [[package]] name = "types-psutil" version = "5.9.5.12" @@ -3847,4 +3878,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5" +content-hash = "7cc735f57c2760db6c994575a98d4f0e2670497ad9e909135a3bc67d479f5edf" diff --git a/pyproject.toml b/pyproject.toml index e7e314d144..21e6997376 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,8 @@ psycopg2-binary = "^2.9.10" typing-extensions = "^4.12.2" PyJWT = {version = "^2.1.0", extras = ["crypto"]} requests = "^2.32.4" +protobuf = "^6.31.1" +types-protobuf="^6.30.2" pytest-xdist = "^3.3.1" asyncpg = "^0.30.0" aiopg = "^1.4.0" @@ -63,6 +65,7 @@ build-backend = "poetry.core.masonry.api" exclude = [ "^vendor/", "^target/", + "test_runner/regress/data/profile_pb2.py", "test_runner/performance/pgvector/loaddata.py", ] check_untyped_defs = true @@ -94,6 +97,7 @@ target-version = "py311" extend-exclude = [ "vendor/", "target/", + "test_runner/regress/data/profile_pb2.py", "test_runner/stubs/", # Autogenerated by mypy's stubgen ] line-length = 100 # this setting is rather guidance, it won't fail if it can't make the shorter diff --git a/test_runner/fixtures/endpoint/http.py b/test_runner/fixtures/endpoint/http.py index 1d278095ce..8ac4eb1744 100644 --- a/test_runner/fixtures/endpoint/http.py +++ 
b/test_runner/fixtures/endpoint/http.py @@ -115,6 +115,39 @@ class EndpointHttpClient(requests.Session): json: dict[str, str] = res.json() return json + def start_profiling_cpu( + self, sampling_frequency: int, timeout_seconds: int, archive: bool = False + ) -> tuple[int, bytes]: + url = f"http://localhost:{self.external_port}/profile/cpu" + params = { + "profiler": {"BccProfile": None}, + "sampling_frequency": sampling_frequency, + "timeout_seconds": timeout_seconds, + "archive": archive, + } + + res = self.post( + url, + json=params, + auth=self.auth, + ) + + return res.status_code, res.content + + def stop_profiling_cpu(self) -> int: + url = f"http://localhost:{self.external_port}/profile/cpu" + res = self.delete(url, auth=self.auth) + return res.status_code + + def get_profiling_cpu_status(self) -> bool: + """ + Returns True if CPU profiling is currently running, False otherwise. + """ + url = f"http://localhost:{self.external_port}/profile/cpu" + res = self.get(url, auth=self.auth) + res.raise_for_status() + return res.status_code == 200 + def database_schema(self, database: str): res = self.get( f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}", diff --git a/test_runner/regress/data/__init__.py b/test_runner/regress/data/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test_runner/regress/data/profile_pb2.py b/test_runner/regress/data/profile_pb2.py new file mode 100644 index 0000000000..c507ef9f6c --- /dev/null +++ b/test_runner/regress/data/profile_pb2.py @@ -0,0 +1,41 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# type: ignore +# source: profile.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rprofile.proto\x12\x12perftools.profiles\"\xd5\x03\n\x07Profile\x12\x32\n\x0bsample_type\x18\x01 \x03(\x0b\x32\x1d.perftools.profiles.ValueType\x12*\n\x06sample\x18\x02 \x03(\x0b\x32\x1a.perftools.profiles.Sample\x12,\n\x07mapping\x18\x03 \x03(\x0b\x32\x1b.perftools.profiles.Mapping\x12.\n\x08location\x18\x04 \x03(\x0b\x32\x1c.perftools.profiles.Location\x12.\n\x08\x66unction\x18\x05 \x03(\x0b\x32\x1c.perftools.profiles.Function\x12\x14\n\x0cstring_table\x18\x06 \x03(\t\x12\x13\n\x0b\x64rop_frames\x18\x07 \x01(\x03\x12\x13\n\x0bkeep_frames\x18\x08 \x01(\x03\x12\x12\n\ntime_nanos\x18\t \x01(\x03\x12\x16\n\x0e\x64uration_nanos\x18\n \x01(\x03\x12\x32\n\x0bperiod_type\x18\x0b \x01(\x0b\x32\x1d.perftools.profiles.ValueType\x12\x0e\n\x06period\x18\x0c \x01(\x03\x12\x0f\n\x07\x63omment\x18\r \x03(\x03\x12\x1b\n\x13\x64\x65\x66\x61ult_sample_type\x18\x0e \x01(\x03\"\'\n\tValueType\x12\x0c\n\x04type\x18\x01 \x01(\x03\x12\x0c\n\x04unit\x18\x02 \x01(\x03\"V\n\x06Sample\x12\x13\n\x0blocation_id\x18\x01 \x03(\x04\x12\r\n\x05value\x18\x02 \x03(\x03\x12(\n\x05label\x18\x03 \x03(\x0b\x32\x19.perftools.profiles.Label\"@\n\x05Label\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\x0b\n\x03str\x18\x02 \x01(\x03\x12\x0b\n\x03num\x18\x03 \x01(\x03\x12\x10\n\x08num_unit\x18\x04 \x01(\x03\"\xdd\x01\n\x07Mapping\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x14\n\x0cmemory_start\x18\x02 \x01(\x04\x12\x14\n\x0cmemory_limit\x18\x03 
\x01(\x04\x12\x13\n\x0b\x66ile_offset\x18\x04 \x01(\x04\x12\x10\n\x08\x66ilename\x18\x05 \x01(\x03\x12\x10\n\x08\x62uild_id\x18\x06 \x01(\x03\x12\x15\n\rhas_functions\x18\x07 \x01(\x08\x12\x15\n\rhas_filenames\x18\x08 \x01(\x08\x12\x18\n\x10has_line_numbers\x18\t \x01(\x08\x12\x19\n\x11has_inline_frames\x18\n \x01(\x08\"v\n\x08Location\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x12\n\nmapping_id\x18\x02 \x01(\x04\x12\x0f\n\x07\x61\x64\x64ress\x18\x03 \x01(\x04\x12&\n\x04line\x18\x04 \x03(\x0b\x32\x18.perftools.profiles.Line\x12\x11\n\tis_folded\x18\x05 \x01(\x08\")\n\x04Line\x12\x13\n\x0b\x66unction_id\x18\x01 \x01(\x04\x12\x0c\n\x04line\x18\x02 \x01(\x03\"_\n\x08\x46unction\x12\n\n\x02id\x18\x01 \x01(\x04\x12\x0c\n\x04name\x18\x02 \x01(\x03\x12\x13\n\x0bsystem_name\x18\x03 \x01(\x03\x12\x10\n\x08\x66ilename\x18\x04 \x01(\x03\x12\x12\n\nstart_line\x18\x05 \x01(\x03\x42-\n\x1d\x63om.google.perftools.profilesB\x0cProfileProtob\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'profile_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b'\n\035com.google.perftools.profilesB\014ProfileProto' + _PROFILE._serialized_start=38 + _PROFILE._serialized_end=507 + _VALUETYPE._serialized_start=509 + _VALUETYPE._serialized_end=548 + _SAMPLE._serialized_start=550 + _SAMPLE._serialized_end=636 + _LABEL._serialized_start=638 + _LABEL._serialized_end=702 + _MAPPING._serialized_start=705 + _MAPPING._serialized_end=926 + _LOCATION._serialized_start=928 + _LOCATION._serialized_end=1046 + _LINE._serialized_start=1048 + _LINE._serialized_end=1089 + _FUNCTION._serialized_start=1091 + _FUNCTION._serialized_end=1186 +# @@protoc_insertion_point(module_scope) diff --git a/test_runner/regress/test_compute_profiling.py b/test_runner/regress/test_compute_profiling.py new file mode 100644 index 0000000000..145d2d035c --- /dev/null +++ b/test_runner/regress/test_compute_profiling.py @@ -0,0 +1,314 @@ +from __future__ import annotations + +import gzip +import io +import threading +import time +from typing import TYPE_CHECKING, Any + +import pytest +from data.profile_pb2 import Profile # type: ignore +from fixtures.log_helper import log +from fixtures.utils import run_only_on_default_postgres +from google.protobuf.message import Message +from requests import HTTPError + +if TYPE_CHECKING: + from fixtures.endpoint.http import EndpointHttpClient + from fixtures.neon_fixtures import NeonEnv + +REASON = "test doesn't use postgres" + + +def _start_profiling_cpu( + client: EndpointHttpClient, event: threading.Event | None, archive: bool = False +) -> Profile | None: + """ + Start CPU profiling for the compute node. 
+ """ + log.info("Starting CPU profiling...") + + if event is not None: + event.set() + + status, response = client.start_profiling_cpu(100, 30, archive=archive) + match status: + case 200: + log.debug("CPU profiling finished") + profile: Any = Profile() + if archive: + log.debug("Unpacking gzipped profile data") + with gzip.GzipFile(fileobj=io.BytesIO(response)) as f: + response = f.read() + else: + log.debug("Unpacking non-gzipped profile data") + Message.ParseFromString(profile, response) + return profile + case 204: + log.debug("CPU profiling was stopped") + raise HTTPError("Failed to finish CPU profiling: was stopped.") + case 409: + log.debug("CPU profiling is already in progress, cannot start again") + raise HTTPError("Failed to finish CPU profiling: profiling is already in progress.") + case 500: + response_str = response.decode("utf-8", errors="replace") + log.debug( + f"Failed to finish CPU profiling, was stopped or got an error: {status}: {response_str}" + ) + raise HTTPError(f"Failed to finish CPU profiling, {status}: {response_str}") + case _: + log.error(f"Failed to start CPU profiling: {status}") + raise HTTPError(f"Failed to start CPU profiling: {status}") + + +def _stop_profiling_cpu(client: EndpointHttpClient, event: threading.Event | None): + """ + Stop CPU profiling for the compute node. + """ + log.info("Manually stopping CPU profiling...") + + if event is not None: + event.set() + + status = client.stop_profiling_cpu() + match status: + case 200: + log.debug("CPU profiling stopped successfully") + case 412: + log.debug("CPU profiling is not running, nothing to do") + case _: + log.error(f"Failed to stop CPU profiling: {status}") + raise HTTPError(f"Failed to stop CPU profiling: {status}") + return status + + +def _wait_till_profiling_starts( + http_client: EndpointHttpClient, + event: threading.Event | None, + repeat_delay_secs: float = 0.3, + timeout: int = 60, +) -> bool: + """ + Wait until CPU profiling starts. + """ + log.info("Waiting for CPU profiling to start...") + + end_time = time.time() + timeout + + while not http_client.get_profiling_cpu_status(): + time.sleep(repeat_delay_secs) + + if end_time <= time.time(): + log.error("Timeout while waiting for CPU profiling to start") + return False + + log.info("CPU profiling has started successfully and is in progress") + + if event is not None: + event.set() + + return True + + +def _wait_and_assert_cpu_profiling(http_client: EndpointHttpClient, event: threading.Event | None): + profile = _start_profiling_cpu(http_client, event) + + if profile is None: + log.error("The received profiling data is malformed or empty.") + return + + assert len(profile.sample) > 0, "No samples found in CPU profiling data" + assert len(profile.mapping) > 0, "No mappings found in CPU profiling data" + assert len(profile.location) > 0, "No locations found in CPU profiling data" + assert len(profile.function) > 0, "No functions found in CPU profiling data" + assert len(profile.string_table) > 0, "No string tables found in CPU profiling data" + strings = [ + "PostgresMain", + "ServerLoop", + "BackgroundWorkerMain", + "pq_recvbuf", + "pq_getbyte", + ] + + assert any(s in profile.string_table for s in strings), ( + f"Expected at least one of {strings} in string table, but none found" + ) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_with_timeout(neon_simple_env: NeonEnv): + """ + Test that CPU profiling works correctly with timeout. 
+ """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + pg_conn = endpoint.connect() + pg_cur = pg_conn.cursor() + pg_cur.execute("create database profiling_test2") + http_client = endpoint.http_client() + + def _wait_and_assert_cpu_profiling_local(): + _wait_and_assert_cpu_profiling(http_client, None) + + thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local) + thread.start() + + inserting_should_stop_event = threading.Event() + + def insert_rows(): + lfc_conn = endpoint.connect(dbname="profiling_test2") + lfc_cur = lfc_conn.cursor() + n_records = 0 + lfc_cur.execute( + "create table t(pk integer primary key, payload text default repeat('?', 128))" + ) + batch_size = 1000 + + log.info("Inserting rows") + + while not inserting_should_stop_event.is_set(): + n_records += batch_size + lfc_cur.execute( + f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))" + ) + + log.info(f"Inserted {n_records} rows") + + thread2 = threading.Thread(target=insert_rows) + + assert _wait_till_profiling_starts(http_client, None) + time.sleep(4) # Give some time for the profiling to start + thread2.start() + + thread.join(timeout=60) + inserting_should_stop_event.set() # Stop the insertion thread + thread2.join(timeout=60) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_with_archiving_the_response(neon_simple_env: NeonEnv): + """ + Test that CPU profiling works correctly with archiving the data. + """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + http_client = endpoint.http_client() + + assert _start_profiling_cpu(http_client, None, archive=True) is not None, ( + "Failed to start and finish CPU profiling with archiving enabled" + ) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_start_and_stop(neon_simple_env: NeonEnv): + """ + Test that CPU profiling can be started and stopped correctly. + """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + http_client = endpoint.http_client() + + def _wait_and_assert_cpu_profiling(): + # Should raise as the profiling will be stopped. + with pytest.raises(HTTPError) as _: + _start_profiling_cpu(http_client, None) + + thread = threading.Thread(target=_wait_and_assert_cpu_profiling) + thread.start() + + assert _wait_till_profiling_starts(http_client, None) + _stop_profiling_cpu(http_client, None) + + # Should raise as the profiling is already stopped. + assert _stop_profiling_cpu(http_client, None) == 412 + + thread.join(timeout=60) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_conflict(neon_simple_env: NeonEnv): + """ + Test that CPU profiling can be started once and the second time + it will throw an error as it is already running. 
+ """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + pg_conn = endpoint.connect() + pg_cur = pg_conn.cursor() + pg_cur.execute("create database profiling_test") + http_client = endpoint.http_client() + + def _wait_and_assert_cpu_profiling_local(): + _wait_and_assert_cpu_profiling(http_client, None) + + thread = threading.Thread(target=_wait_and_assert_cpu_profiling_local) + thread.start() + + assert _wait_till_profiling_starts(http_client, None) + + inserting_should_stop_event = threading.Event() + + def insert_rows(): + lfc_conn = endpoint.connect(dbname="profiling_test") + lfc_cur = lfc_conn.cursor() + n_records = 0 + lfc_cur.execute( + "create table t(pk integer primary key, payload text default repeat('?', 128))" + ) + batch_size = 1000 + + log.info("Inserting rows") + + while not inserting_should_stop_event.is_set(): + n_records += batch_size + lfc_cur.execute( + f"insert into t (pk) values (generate_series({n_records - batch_size + 1},{batch_size}))" + ) + + log.info(f"Inserted {n_records} rows") + + thread2 = threading.Thread(target=insert_rows) + thread2.start() + + # Should raise as the profiling is already in progress. + with pytest.raises(HTTPError) as _: + _start_profiling_cpu(http_client, None) + + inserting_should_stop_event.set() # Stop the insertion thread + + # The profiling should still be running and finish normally. + thread.join(timeout=600) + thread2.join(timeout=600) + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_stop_when_not_running(neon_simple_env: NeonEnv): + """ + Test that CPU profiling throws the expected error when is attempted + to be stopped when it hasn't be running. + """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + http_client = endpoint.http_client() + + for _ in range(3): + status = _stop_profiling_cpu(http_client, None) + assert status == 412 + + +@run_only_on_default_postgres(reason=REASON) +def test_compute_profiling_cpu_start_arguments_validation_works(neon_simple_env: NeonEnv): + """ + Test that CPU profiling start request properly validated the + arguments and throws the expected error (bad request). + """ + env = neon_simple_env + endpoint = env.endpoints.create_start("main") + http_client = endpoint.http_client() + + for sampling_frequency in [-1, 0, 1000000]: + status, _ = http_client.start_profiling_cpu(sampling_frequency, 5) + assert status == 422 + for timeout_seconds in [-1, 0, 1000000]: + status, _ = http_client.start_profiling_cpu(5, timeout_seconds) + assert status == 422 diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index fc01deb92d..47160c0c8d 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -120,6 +120,7 @@ anyhow = { version = "1", features = ["backtrace"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] } +clang-sys = { version = "1", default-features = false, features = ["clang_11_0", "runtime"] } clap = { version = "4", features = ["derive", "env", "string"] } clap_builder = { version = "4", default-features = false, features = ["color", "env", "help", "std", "string", "suggestions", "usage"] } either = { version = "1" }